diff --git a/readme.md b/readme.md index 956475e..60f7544 100644 --- a/readme.md +++ b/readme.md @@ -1,170 +1,116 @@ -# B站关注清理工具(优化版) +# B站关注清理工具 - Scripts 版 -> 一键命令运行全流程:`python source/run_pipeline.py` +> 一键命令运行全流程:`python source/scripts/run_pipeline.py` -本项目保留并聚焦一条可用功能链: +python source/scripts/run_pipeline.py --input-json source/resources/export_uids_test5.json + +本工具包含7个步骤的完整流水线: 1. 抓取视频标题 2. 分批AI分析 -3. 生成取关UID(支持按100拆分) -4. 生成保留关注报告 +3. 生成保留关注报告 +4. 生成取关UID列表 5. 按首字母排序 6. 提取分组信息 +7. 删除最近10条标题 ## 快速开始 ```powershell # 完整流程(推荐) -python source/run_pipeline.py +python source/scripts/run_pipeline.py # 速度优先 -python source/run_pipeline.py --workers 8 --batch-size 30 --sleep-seconds 0 +python source/scripts/run_pipeline.py --workers 8 --batch-size 30 --sleep-seconds 0 # 试跑30个UP -python source/run_pipeline.py --max-ups 30 +python source/scripts/run_pipeline.py --max-ups 30 # 跳过抓取,使用已有标题报告 -python source/run_pipeline.py --skip-fetch +python source/scripts/run_pipeline.py --skip-fetch # 跳过分析,仅生成产物 -python source/run_pipeline.py --skip-analyze +python source/scripts/run_pipeline.py --skip-analyze -# 跳过排序/分组 -python source/run_pipeline.py --skip-sort --skip-group +# 跳过排序/分组/删除 +python source/scripts/run_pipeline.py --skip-sort --skip-group --skip-remove ``` -## 目录结构 +## 输出文件 -```text -source/ - resources/ # 资源文件 - export_uids.json - export_uids.txt +| 文件 | 说明 | +|------|------| +| `source/output/reports/1_up_titles_report.md` | 标题抓取报告 | +| `source/output/reports/2_up_analysis_full_auto.md` | AI分析报告(完整) | +| `source/output/reports/3_up_keep_follow_only.md` | 保留关注报告 | +| `source/output/uids/4_unfollow_mids_list.txt` | 取关UID列表 | +| `source/output/reports/5_sorted_up_analysis.md` | 按首字母排序报告 | +| `source/output/reports/6_group_info.md` | 提取分组信息报告 | +| `source/output/reports/7_no_titles.md` | 最终报告(删除最近10条) | - output/ # 产物目录 - reports/ # 报告文件 - up_titles_report.md - up_analysis_full_auto.md - up_keep_follow_only.md - uids/ # 取关UID结果 - unfollow_mids_list.txt - unfollow_mids_list_1.txt - unfollow_mids_list_2.txt - ... +## 常用参数 - analyze_up_content.py # 步骤1:抓取标题 - batch_ai_summary_from_report.py# 步骤2:分批分析 - extract_keep_follow_doc.py # 步骤3:保留关注报告 - extract_unfollow_list.py # 步骤4:取关UID - run_pipeline.py # 一键流水线 - README_up_analysis.md +| 参数 | 默认值 | 说明 | +|------|--------|------| +| `--workers` | 6 | 并发请求数 | +| `--batch-size` | 20 | 每批分析条数 | +| `--max-ups` | 0(全部) | 限制处理UP数量 | +| `--split-size` | 100 | UID拆分大小 | +| `--sleep-seconds` | 0 | 任务间隔秒数 | + +### 跳过参数 + +| 参数 | 说明 | +|------|------| +| `--skip-fetch` | 跳过抓取阶段 | +| `--skip-analyze` | 跳过分析阶段 | +| `--skip-sort` | 跳过排序阶段 | +| `--skip-group` | 跳过提取分组阶段 | +| `--skip-remove` | 跳过删除最近10条阶段 | + +## 分步执行 + +### 步骤1:抓取标题 +```powershell +python source/scripts/analyze_up_content.py --skip-ai ``` -## 先配置 API +### 步骤2:分批AI分析 +```powershell +python source/scripts/batch_ai_summary_from_report.py --run-all-batches +``` -编辑 [source/analyze_up_content.py](source/analyze_up_content.py) 顶部配置: +### 步骤3:生成保留关注报告 +```powershell +python source/scripts/extract_keep_follow_doc.py +``` + +### 步骤4:生成取关UID +```powershell +python source/scripts/extract_unfollow_list.py --format mid-only --split-size 100 +``` + +### 步骤5:按首字母排序 +```powershell +python source/scripts/sort_up_main.py +``` + +### 步骤6:提取分组信息 +```powershell +python source/scripts/extract_group_info.py +``` + +### 步骤7:删除最近10条标题 +```powershell +python source/scripts/remove_10content.py +``` + +## 先配置API + +编辑 [source/scripts/analyze_up_content.py](source/scripts/analyze_up_content.py) 顶部配置: ```python VOLCENGINE_API_KEY = "你的火山引擎API Key" VOLCENGINE_MODEL = "deepseek-v3-1-terminus" VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3" -``` - -`batch_ai_summary_from_report.py` 会自动读取该配置。 - -## 常用参数 - -```powershell -# 提升速度 -python source/run_pipeline.py --workers 8 --batch-size 30 --sleep-seconds 0 - -# 只先抓取前50个做试跑 -python source/run_pipeline.py --max-ups 50 - -# 仅处理带标签UP -python source/run_pipeline.py --only-tag "准备取关" - -# 跳过抓取(复用已有标题报告) -python source/run_pipeline.py --skip-fetch - -# 跳过分析(复用已有分析报告,仅生成产物) -python source/run_pipeline.py --skip-analyze - -# 修改UID拆分粒度 -python source/run_pipeline.py --split-size 200 -``` - -## 分步执行(可选) - -### 步骤1:抓取标题 - -```powershell -python source/analyze_up_content.py --skip-ai -``` - -默认输出: -- [source/output/reports/up_titles_report.md](source/output/reports/up_titles_report.md) - -### 步骤2:分批AI分析 - -```powershell -python source/batch_ai_summary_from_report.py --run-all-batches -# 小批量测试 -python source/batch_ai_summary_from_report.py - - -python source/batch_ai_summary_from_report.py --input source\output\reports\up_titles_report.md --output source\18_12.md --force - -python source/batch_ai_summary_from_report.py --input source\output\reports\up_titles_report.md --output source\19_06_all.md --force --run-all-batches -``` - -默认输入/输出: -- 输入 [source/output/reports/up_titles_report.md](source/output/reports/up_titles_report.md) -- 输出 [source/output/reports/up_analysis_full_auto.md](source/output/reports/up_analysis_full_auto.md) - -### 步骤3:生成保留关注报告 - -```powershell -python source/extract_keep_follow_doc.py - -python source/extract_keep_follow_doc.py --input source/19_06_all.md --output source/19_30_keep_follow.md -``` - -输出: -- [source/output/reports/up_keep_follow_only.md](source/output/reports/up_keep_follow_only.md) - -### 步骤4:生成取关UID - -```powershell -python source/extract_unfollow_list.py --format mid-only --split-size 100 -``` - -输出: -- 主文件 [source/output/uids/unfollow_mids_list.txt](source/output/uids/unfollow_mids_list.txt) -- 拆分文件 [source/output/uids/unfollow_mids_list_1.txt](source/output/uids/unfollow_mids_list_1.txt) 等 - -## 结果解释 - -- `up_analysis_full_auto.md`:完整分析报告(含取关/保留) -- `up_keep_follow_only.md`:仅保留关注UP的AI分析与分组建议 -- `unfollow_mids_list.txt`:可取关UID逗号分隔列表(可直接粘贴使用) - -## 建议参数 - -- 稳定优先:`--workers 4 --max-retries 2 --request-timeout 60` -- 速度优先:`--workers 8 --batch-size 30 --sleep-seconds 0` -- 低风险试跑:`--max-ups 30` 先验证再全量 - - - -### 结果按首字母排序 - -``` -python sort_up_main.py -``` - - -### 提取分组 -``` -python source/extract_group_info.py --input source/19_53_no_titles.md --output source/group_only.md ``` \ No newline at end of file diff --git a/source/.gitignore b/source/.gitignore index ceebbad..3ae93a1 100644 --- a/source/.gitignore +++ b/source/.gitignore @@ -38,6 +38,7 @@ MANIFEST *.tmp *.temp *.md +*.* # 5. 忽略敏感数据 *.env diff --git a/source/analyze_up_content.py b/source/analyze_up_content.py deleted file mode 100644 index d33a29d..0000000 --- a/source/analyze_up_content.py +++ /dev/null @@ -1,690 +0,0 @@ -#!/usr/bin/env python3 -"""Fetch recent Bilibili video titles for UIDs and analyze with Volcengine API. - -Input JSON format (list of objects): -[ - {"mid": 12345, "name": "UP Name", "tag": ["准备取关"]} -] -""" - -from __future__ import annotations - -import argparse -import hashlib -import html -import json -import random -import re -import sys -import time -from dataclasses import dataclass -from pathlib import Path -from typing import Any -from urllib import error, parse, request - - -BILIBILI_API = "https://api.bilibili.com/x/space/arc/search" -BILIBILI_WBI_API = "https://api.bilibili.com/x/space/wbi/arc/search" -BILIBILI_NAV_API = "https://api.bilibili.com/x/web-interface/nav" -# 可选:如果仍频繁触发412,可填浏览器里复制的Cookie字符串。 -BILIBILI_COOKIE = "buvid3=5D02D792-070F-79D0-4243-4F75C6277EC022345infoc; b_nut=1765807422; _uuid=1796ECEE-451E-E1B7-1D9A-5D7F5CCCDA5822634infoc; buvid_fp=993faeece85f3e3119d8331a4e5bf683; buvid4=785EC013-0E2C-BC9F-5CBD-B8B00C76D13024715-025121522-ba1d0oh5R0Q47E2dVDisZg%3D%3D; SESSDATA=875331b4%2C1781359476%2C70459%2Ac1CjAXAQicR89csAHVVl-X8yAIy0-eko5ey69tJAyAXIbHhSU5HaUgth-E2fW1e9ij0MESVll2anVrYXVOYkc3VzZ2RmtFQlZzUnNoR0JOdUNZYldWSXh4Y3NZVlVWc1lOaC04M2JRQ3VKZ0x5b2RMbXl1MWpCSE1XMjd2UjVDTUJoUko1bU96aE9BIIEC; bili_jct=2e6b55fe6837ee753c69cd477c1b1ac6; DedeUserID=440102691; DedeUserID__ckMd5=42ab71f1395d8071; theme-tip-show=SHOWED; rpdid=|(u~RklkYm)u0J'u~Yl)|~YuR; hit-dyn-v2=1; theme-avatar-tip-show=SHOWED; LIVE_BUVID=AUTO5117758855687732; PVID=3; CURRENT_QUALITY=64; theme-switch-show=SHOWED; home_feed_column=4; browser_resolution=1359-871; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3NzcyODE5NjAsImlhdCI6MTc3NzAyMjcwMCwicGx0IjotMX0.euCIXefcvPlg1SwKKQh2HLfYStrTdG8dN-qnKCeUBFU; bili_ticket_expires=1777281900; sid=7beimq93; CURRENT_FNVAL=2000; bp_t_offset_440102691=1195139899255160832; b_lsid=52AAA640_19DC3A11696" -RUNTIME_BILIBILI_COOKIE = "" -DEFAULT_USER_AGENT = ( - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/124.0.0.0 Safari/537.36" -) -MIXIN_KEY_ENC_TAB = [ - 46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35, - 27, 43, 5, 49, 33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13, - 37, 48, 7, 16, 24, 55, 40, 61, 26, 17, 0, 1, 60, 51, 30, 4, - 22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11, 36, 20, 34, 44, 52, -] - -# 在这里直接填写火山引擎配置。 -VOLCENGINE_API_KEY = "586d443c-5034-4810-9760-50ce77394e8a" -VOLCENGINE_MODEL = "deepseek-v3-1-terminus" -VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3" - - -@dataclass -class UpItem: - mid: int - name: str - tag: list[str] - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser( - description="抓取 UP 前10个视频标题,并调用火山引擎 API 生成分析报告" - ) - parser.add_argument( - "--input", - default="source/resources/export_uids.json", - help="输入 JSON 文件路径,默认: source/resources/export_uids.json", - ) - parser.add_argument( - "--output", - default="source/output/reports/up_titles_report.md", - help="输出 Markdown 报告路径,默认: source/output/reports/up_titles_report.md", - ) - parser.add_argument( - "--titles-per-up", - type=int, - default=10, - help="每个 UP 抓取的视频标题数量,默认: 10", - ) - parser.add_argument( - "--max-ups", - type=int, - default=0, - help="最多处理多少个 UP,0 表示全部", - ) - parser.add_argument( - "--only-tag", - default="", - help="只处理包含该标签的 UP,例如: 准备取关;留空表示不过滤", - ) - parser.add_argument( - "--sleep-seconds", - type=float, - default=0.8, - help="每个 UP 抓取后的等待秒数,默认: 0.8", - ) - parser.add_argument( - "--retry-times", - type=int, - default=3, - help="抓取重试次数(遇到412/-799时),默认: 3", - ) - parser.add_argument( - "--test-mid", - type=int, - default=0, - help="测试模式:只抓取这个mid,不读取输入文件", - ) - parser.add_argument( - "--test-name", - default="TEST_UP", - help="测试模式下显示名称,默认: TEST_UP", - ) - parser.add_argument( - "--skip-ai", - action="store_true", - help="只测试抓取,不调用AI分析", - ) - parser.add_argument( - "--debug", - action="store_true", - help="输出抓取调试信息", - ) - parser.add_argument( - "--bili-cookie", - default="", - help="可选:运行时传入B站Cookie,优先级高于脚本内BILIBILI_COOKIE", - ) - parser.add_argument( - "--fetch-mode", - choices=["auto", "api", "html"], - default="auto", - help="抓取模式: auto(先API后HTML)/api/html,默认: auto", - ) - parser.add_argument( - "--analyze-from-report", - default="", - help="从已有报告读取标题并仅执行AI分析,例如: source/up_analysis_report.md", - ) - parser.add_argument( - "--batch-size", - type=int, - default=30, - help="分批分析时每批数量,默认: 30", - ) - parser.add_argument( - "--batch-index", - type=int, - default=1, - help="分批分析批次序号(从1开始),默认: 1", - ) - return parser.parse_args() - - -def parse_report_items(report_path: Path) -> list[dict[str, Any]]: - lines = report_path.read_text(encoding="utf-8").splitlines() - items: list[dict[str, Any]] = [] - current: dict[str, Any] | None = None - section = "" - - for line in lines: - m = re.match(r"^##\s+\d+\.\s+(.*?)\s+\(mid:\s*(\d+)\)", line) - if m: - if current is not None: - items.append(current) - current = { - "mid": int(m.group(2)), - "name": m.group(1).strip(), - "tag": [], - "url": f"https://space.bilibili.com/{int(m.group(2))}/video", - "titles": [], - "analysis": "", - "error": "", - } - section = "" - continue - - if current is None: - continue - - if line.startswith("- 主页: "): - current["url"] = line.replace("- 主页: ", "", 1).strip() - continue - if line.startswith("- 标签: "): - raw_tag = line.replace("- 标签: ", "", 1).strip() - current["tag"] = [] if raw_tag in ("", "无") else [x.strip() for x in raw_tag.split(",") if x.strip()] - continue - if line == "### 最近10条标题": - section = "titles" - continue - if line == "### AI分析": - section = "analysis" - continue - if line == "### 异常": - section = "error" - continue - if line.startswith("### "): - section = "" - continue - - if section == "titles" and line.startswith("- "): - t = line[2:].strip() - if t and t != "(未抓取到标题)": - current["titles"].append(t) - elif section == "analysis": - if line.strip(): - if current["analysis"]: - current["analysis"] += "\n" + line.strip() - else: - current["analysis"] = line.strip() - elif section == "error" and line.startswith("- "): - current["error"] = line[2:].strip() - - if current is not None: - items.append(current) - return items - - -def run_batch_analysis_from_report(args: argparse.Namespace, output_path: Path) -> int: - report_path = Path(args.analyze_from_report) - if not report_path.exists(): - print(f"报告文件不存在: {report_path}", file=sys.stderr) - return 1 - - items = parse_report_items(report_path) - if not items: - print("报告中未解析到可分析条目", file=sys.stderr) - return 1 - - pending = [ - it for it in items - if it.get("titles") and (not it.get("analysis") or it.get("analysis") == "测试模式已跳过AI分析") - ] - if not pending: - print("报告中没有待分析条目(可能已全部分析完成)") - output_path.write_text(build_report(items), encoding="utf-8") - return 0 - - batch_size = max(args.batch_size, 1) - batch_index = max(args.batch_index, 1) - start = (batch_index - 1) * batch_size - end = start + batch_size - batch = pending[start:end] - if not batch: - print(f"批次为空: batch-index={batch_index}, batch-size={batch_size}, 待分析总数={len(pending)}") - output_path.write_text(build_report(items), encoding="utf-8") - return 0 - - print( - f"开始分批分析: 第{batch_index}批, 每批{batch_size}条, " - f"本批{len(batch)}条, 待分析总数{len(pending)}" - ) - - key_to_index = {f"{it['mid']}::{it['name']}": idx for idx, it in enumerate(items)} - for idx, it in enumerate(batch, start=1): - print(f"[batch {idx}/{len(batch)}] AI分析: {it['name']} ({it['mid']})") - try: - analysis = analyze_titles(it["name"], it["url"], it["titles"]) - origin_idx = key_to_index.get(f"{it['mid']}::{it['name']}") - if origin_idx is not None: - items[origin_idx]["analysis"] = analysis - items[origin_idx]["error"] = "" - except Exception as exc: # noqa: BLE001 - origin_idx = key_to_index.get(f"{it['mid']}::{it['name']}") - if origin_idx is not None: - items[origin_idx]["error"] = str(exc) - time.sleep(max(args.sleep_seconds, 0.0)) - - output_path.parent.mkdir(parents=True, exist_ok=True) - output_path.write_text(build_report(items), encoding="utf-8") - print(f"分批分析报告已生成: {output_path}") - return 0 - - -def load_up_items(input_path: Path) -> list[UpItem]: - raw = json.loads(input_path.read_text(encoding="utf-8")) - if not isinstance(raw, list): - raise ValueError("输入 JSON 必须是数组") - - items: list[UpItem] = [] - for idx, obj in enumerate(raw): - if not isinstance(obj, dict): - raise ValueError(f"第 {idx + 1} 项不是对象") - mid = obj.get("mid") - name = obj.get("name", "") - tags = obj.get("tag", []) - if mid is None: - continue - try: - mid_int = int(mid) - except (TypeError, ValueError): - continue - if not isinstance(name, str): - name = str(name) - if not isinstance(tags, list): - tags = [] - tags = [str(t) for t in tags] - items.append(UpItem(mid=mid_int, name=name.strip(), tag=tags)) - return items - - -def http_get_json( - url: str, - timeout: float = 20.0, - referer: str = "https://space.bilibili.com/", -) -> dict[str, Any]: - headers = { - "User-Agent": DEFAULT_USER_AGENT, - "Referer": referer, - "Origin": "https://www.bilibili.com", - "Accept": "application/json, text/plain, */*", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", - } - cookie = RUNTIME_BILIBILI_COOKIE.strip() or BILIBILI_COOKIE.strip() - if cookie: - headers["Cookie"] = cookie - req = request.Request(url, headers=headers, method="GET") - with request.urlopen(req, timeout=timeout) as resp: - body = resp.read().decode("utf-8", errors="replace") - return json.loads(body) - - -def http_get_text( - url: str, - timeout: float = 20.0, - referer: str = "https://space.bilibili.com/", -) -> str: - headers = { - "User-Agent": DEFAULT_USER_AGENT, - "Referer": referer, - "Origin": "https://www.bilibili.com", - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", - } - cookie = RUNTIME_BILIBILI_COOKIE.strip() or BILIBILI_COOKIE.strip() - if cookie: - headers["Cookie"] = cookie - req = request.Request(url, headers=headers, method="GET") - with request.urlopen(req, timeout=timeout) as resp: - return resp.read().decode("utf-8", errors="replace") - - -def get_mixin_key(img_key: str, sub_key: str) -> str: - origin = img_key + sub_key - mixed = "".join(origin[i] for i in MIXIN_KEY_ENC_TAB) - return mixed[:32] - - -def build_wbi_params(base_params: dict[str, Any], mixin_key: str) -> dict[str, Any]: - params = {k: str(v) for k, v in base_params.items()} - params["wts"] = str(int(time.time())) - params = dict(sorted(params.items())) - filtered = { - k: re.sub(r"[!'()*]", "", v) - for k, v in params.items() - } - query = parse.urlencode(filtered) - w_rid = hashlib.md5((query + mixin_key).encode("utf-8")).hexdigest() - filtered["w_rid"] = w_rid - return filtered - - -def get_wbi_mixin_key() -> str: - data = http_get_json(BILIBILI_NAV_API, referer="https://www.bilibili.com/") - if data.get("code") != 0: - raise RuntimeError( - f"获取wbi密钥失败 code={data.get('code')}, message={data.get('message')}" - ) - wbi_img = data.get("data", {}).get("wbi_img", {}) - img_url = wbi_img.get("img_url", "") - sub_url = wbi_img.get("sub_url", "") - if not img_url or not sub_url: - raise RuntimeError("获取wbi密钥失败: nav接口缺少img_url/sub_url") - img_key = img_url.rsplit("/", 1)[-1].split(".")[0] - sub_key = sub_url.rsplit("/", 1)[-1].split(".")[0] - return get_mixin_key(img_key, sub_key) - - -def parse_titles_from_data(data: dict[str, Any]) -> list[str]: - vlist = data.get("data", {}).get("list", {}).get("vlist", []) - if not isinstance(vlist, list): - return [] - titles: list[str] = [] - for item in vlist: - if not isinstance(item, dict): - continue - title = item.get("title", "") - if isinstance(title, str) and title.strip(): - titles.append(clean_html(title.strip())) - return titles - - -def fetch_titles_from_space_html(mid: int, titles_per_up: int, debug: bool = False) -> list[str]: - url = f"https://space.bilibili.com/{mid}/video" - html_text = http_get_text(url, referer="https://www.bilibili.com/") - - # 页面中视频封面常携带标题到alt字段,优先从这里提取。 - alt_candidates = re.findall( - r']*class="[^"]*b-img__inner[^"]*"[^>]*alt="([^"]+)"', - html_text, - flags=re.IGNORECASE, - ) - - titles: list[str] = [] - seen: set[str] = set() - for raw in alt_candidates: - t = clean_html(html.unescape(raw)).strip() - if not t or t in seen: - continue - seen.add(t) - titles.append(t) - if len(titles) >= titles_per_up: - break - - if debug: - print(f"[debug] HTML模式提取到 {len(titles)} 条标题") - return titles - - -def fetch_titles( - mid: int, - titles_per_up: int, - retry_times: int = 3, - debug: bool = False, - fetch_mode: str = "auto", -) -> list[str]: - base_params = { - "mid": str(mid), - "pn": "1", - "ps": str(titles_per_up), - "order": "pubdate", - "index": "1", - "jsonp": "json", - } - - errors: list[str] = [] - if fetch_mode in ("auto", "api"): - # 优先使用wbi接口,稳定性通常更好。 - mixin_key = "" - try: - mixin_key = get_wbi_mixin_key() - except Exception as exc: # noqa: BLE001 - if debug: - print(f"[debug] 获取wbi密钥失败: {exc}") - - for attempt in range(1, max(retry_times, 1) + 1): - try: - if mixin_key: - signed = build_wbi_params(base_params, mixin_key) - url = f"{BILIBILI_WBI_API}?{parse.urlencode(signed)}" - else: - url = f"{BILIBILI_API}?{parse.urlencode(base_params)}" - data = http_get_json(url, referer=f"https://space.bilibili.com/{mid}/video") - code = data.get("code", -1) - if code == 0: - titles = parse_titles_from_data(data) - if titles: - return titles - errors.append("接口返回成功但标题为空") - else: - errors.append(f"code={code}, message={data.get('message', 'unknown')} ") - except error.HTTPError as exc: - errors.append(f"HTTP {exc.code} {exc.reason}") - except Exception as exc: # noqa: BLE001 - errors.append(str(exc)) - - sleep_for = min(12.0, (1.8 ** attempt) + random.uniform(0.2, 1.0)) - if debug: - print(f"[debug] mid={mid} API第{attempt}次失败: {errors[-1]},{sleep_for:.1f}s后重试") - time.sleep(sleep_for) - - if fetch_mode in ("auto", "html"): - try: - html_titles = fetch_titles_from_space_html(mid, titles_per_up, debug=debug) - if html_titles: - return html_titles - errors.append("HTML模式未提取到标题") - except Exception as exc: # noqa: BLE001 - errors.append(f"HTML模式失败: {exc}") - - joined = "; ".join(errors[-3:]) - if ("412" in joined) or ("-799" in joined): - hint = "提示: 请在脚本里填写BILIBILI_COOKIE,或运行时加 --bili-cookie \"SESSDATA=...; buvid3=...\"" - raise RuntimeError(f"{joined}; {hint}") - raise RuntimeError(joined) - - -def clean_html(text: str) -> str: - return re.sub(r"<[^>]+>", "", text) - - -def call_volcengine_chat(system_prompt: str, user_prompt: str) -> str: - api_key = VOLCENGINE_API_KEY.strip() - base_url = VOLCENGINE_BASE_URL.strip() - model = VOLCENGINE_MODEL.strip() - - if (not api_key) or ("在这里填" in api_key): - raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_API_KEY") - if (not model) or ("在这里填" in model): - raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_MODEL") - if not base_url: - raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_BASE_URL") - - url = f"{base_url.rstrip('/')}/chat/completions" - payload = { - "model": model, - "messages": [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ], - "temperature": 0.4, - } - data = json.dumps(payload, ensure_ascii=False).encode("utf-8") - - req = request.Request( - url, - data=data, - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {api_key}", - }, - method="POST", - ) - - with request.urlopen(req, timeout=60) as resp: - body = resp.read().decode("utf-8", errors="replace") - result = json.loads(body) - content = result.get("choices", [{}])[0].get("message", {}).get("content", "") - if not isinstance(content, str) or not content.strip(): - raise RuntimeError(f"火山引擎返回结构异常: {body[:500]}") - return content.strip() - - -def analyze_titles(up_name: str, up_url: str, titles: list[str]) -> str: - system_prompt = ( - "你是一个内容分析助手。根据视频标题判断UP主内容方向,并给出是否建议取关。" - "输出必须是简体中文,且严格按照用户给定的Markdown格式。" - ) - joined_titles = "\n".join(f"- {t}" for t in titles) - user_prompt = f""" -请分析以下UP主最近视频标题: - -UP主:{up_name} -主页:{up_url} -标题: -{joined_titles} - -请按以下格式输出(不要增加其它段落): -1) 内容定位:一句话 -2) 受众画像:一句话 -3) 近期内容倾向:2-3点,使用-开头 -4) 质量评价:80-120字 -5) 取关建议:保留关注/可以取关(二选一) -6) 建议理由:50-100字 -""".strip() - return call_volcengine_chat(system_prompt, user_prompt) - - -def build_report(results: list[dict[str, Any]]) -> str: - now = time.strftime("%Y-%m-%d %H:%M:%S") - lines: list[str] = [] - lines.append("# UP主内容分析报告") - lines.append("") - lines.append(f"- 生成时间: {now}") - lines.append(f"- 分析数量: {len(results)}") - lines.append("") - - for idx, item in enumerate(results, start=1): - lines.append(f"## {idx}. {item['name']} (mid: {item['mid']})") - lines.append("") - lines.append(f"- 主页: {item['url']}") - tags = item.get("tag", []) - lines.append(f"- 标签: {', '.join(tags) if tags else '无'}") - lines.append("") - lines.append("### 最近10条标题") - lines.append("") - titles = item.get("titles", []) - if titles: - for t in titles: - lines.append(f"- {t}") - else: - lines.append("- (未抓取到标题)") - lines.append("") - - analysis = item.get("analysis", "") - if analysis: - lines.append("### AI分析") - lines.append("") - lines.append(analysis) - lines.append("") - - error_msg = item.get("error", "") - if error_msg: - lines.append("### 异常") - lines.append("") - lines.append(f"- {error_msg}") - lines.append("") - - return "\n".join(lines).rstrip() + "\n" - - -def main() -> int: - global RUNTIME_BILIBILI_COOKIE - args = parse_args() - RUNTIME_BILIBILI_COOKIE = (args.bili_cookie or "").strip() - input_path = Path(args.input) - output_path = Path(args.output) - - if args.analyze_from_report: - return run_batch_analysis_from_report(args, output_path) - - if args.test_mid > 0: - items = [UpItem(mid=args.test_mid, name=args.test_name, tag=["测试模式"]) ] - print(f"测试模式: 仅处理 mid={args.test_mid}") - else: - if not input_path.exists(): - print(f"输入文件不存在: {input_path}", file=sys.stderr) - return 1 - - try: - items = load_up_items(input_path) - except Exception as exc: - print(f"加载输入文件失败: {exc}", file=sys.stderr) - return 1 - - if args.only_tag: - items = [it for it in items if args.only_tag in it.tag] - - if args.max_ups and args.max_ups > 0: - items = items[: args.max_ups] - - if not items: - print("没有可处理的 UP 数据", file=sys.stderr) - return 1 - - print(f"开始处理 {len(items)} 个 UP...") - if args.skip_ai: - print("已启用 --skip-ai,仅测试抓取标题") - if args.debug: - print(f"[debug] 当前抓取模式: {args.fetch_mode}") - - results: list[dict[str, Any]] = [] - for idx, item in enumerate(items, start=1): - up_url = f"https://space.bilibili.com/{item.mid}/video" - row: dict[str, Any] = { - "mid": item.mid, - "name": item.name or f"mid_{item.mid}", - "tag": item.tag, - "url": up_url, - "titles": [], - "analysis": "", - "error": "", - } - - print(f"[{idx}/{len(items)}] 抓取: {row['name']} ({item.mid})") - try: - titles = fetch_titles( - item.mid, - args.titles_per_up, - retry_times=args.retry_times, - debug=args.debug, - fetch_mode=args.fetch_mode, - ) - row["titles"] = titles - if not titles: - row["error"] = "未抓取到标题,可能是接口限制或UP无公开视频" - elif args.skip_ai: - row["analysis"] = "测试模式已跳过AI分析" - else: - row["analysis"] = analyze_titles(row["name"], up_url, titles) - except error.HTTPError as exc: - row["error"] = f"HTTP错误: {exc.code} {exc.reason}" - except error.URLError as exc: - row["error"] = f"网络错误: {exc.reason}" - except Exception as exc: # noqa: BLE001 - row["error"] = str(exc) - - if args.debug and row["titles"]: - sample = row["titles"][: min(3, len(row["titles"]))] - print(f"[debug] mid={item.mid} 成功抓取 {len(row['titles'])} 条,样例: {sample}") - - results.append(row) - time.sleep(max(args.sleep_seconds, 0)) - - report = build_report(results) - output_path.parent.mkdir(parents=True, exist_ok=True) - output_path.write_text(report, encoding="utf-8") - print(f"报告已生成: {output_path}") - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/source/batch_ai_summary_from_report copy.py b/source/batch_ai_summary_from_report copy.py deleted file mode 100644 index 7592b4c..0000000 --- a/source/batch_ai_summary_from_report copy.py +++ /dev/null @@ -1,573 +0,0 @@ -#!/usr/bin/env python3 -"""Batch AI summary from existing UP markdown report. - -Read an existing report (e.g. source/up_analysis_report.md), -extract each UP's title list, and generate AI summaries in batches. -""" - -from __future__ import annotations - -import argparse -from concurrent.futures import ThreadPoolExecutor, as_completed -import json -import math -import re -import sys -import time -from pathlib import Path -from typing import Any -from urllib import request - -# Fill your Volcengine Ark settings here. -VOLCENGINE_API_KEY = "586d443c-5034-4810-9760-50ce77394e8a" -VOLCENGINE_MODEL = "deepseek-v3-1-terminus" -VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3" - -SKIP_MARKERS = { - "", - "测试模式已跳过AI分析", - "(待分析)", -} - -# 预设分组及关键词规则(可自行扩展)。 -PRESET_GROUPS: dict[str, list[str]] = { - "AAA_核心每日必读":[ - "编程", "算法", "工程", "干货", "新闻", "趋势", - ], - "AA_编程信息干货必留": [ - "编程", "算法", "工程", "教程", "实战", "课程", "新技术", "开源", "工具", "效率", "技术", "架构", - ], - "A_硬核知识保留": [ - "科普", "数学", "物理", "编程", "算法", "工程", "历史", "新闻", "深度", - ], - "B_技能学习保留": [ - "英语", "四六级", "考研", "面试", "教程", "实战", "学习", "课程", "写作", - ], - "C_资讯快餐观察": [ - "热点", "速览", "信息差", "快报", "盘点", "吐槽", "观点", "趋势", - ], - "D_娱乐消遣可取关": [ - "搞笑", "整活", "抽象", "乐子", "娱乐", "段子", "鬼畜", "日常", "情侣", - ], - "E_营销带货谨慎": [ - "好物", "测评", "种草", "直播", "带货", "优惠", "开箱", "广告", "激活", - ], -} - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="基于现有报告分批做AI总结") - parser.add_argument( - "--input-report", - default="source/output/reports/up_titles_report.md", - help="已有标题报告路径,默认: source/output/reports/up_titles_report.md", - ) - parser.add_argument( - "--output-report", - default="source/output/reports/up_analysis_full_auto.md", - help="输出报告路径,默认: source/output/reports/up_analysis_full_auto.md", - ) - parser.add_argument( - "--batch-size", - type=int, - default=20, - help="每批处理数量,默认: 20", - ) - parser.add_argument( - "--batch-index", - type=int, - default=1, - help="批次序号(从1开始),默认: 1", - ) - parser.add_argument( - "--sleep-seconds", - type=float, - default=0.0, - help="提交任务间隔秒数,默认: 0(并发模式建议0)", - ) - parser.add_argument( - "--workers", - type=int, - default=4, - help="并发请求数,默认: 4", - ) - parser.add_argument( - "--max-retries", - type=int, - default=2, - help="单个UP分析最大重试次数,默认: 2", - ) - parser.add_argument( - "--request-timeout", - type=float, - default=60.0, - help="单次AI请求超时秒数,默认: 60", - ) - parser.add_argument( - "--force", - action="store_true", - help="强制覆盖已有AI分析(默认只处理待分析项)", - ) - parser.add_argument( - "--debug", - action="store_true", - help="输出调试信息", - ) - parser.add_argument( - "--config-from", - default="source/analyze_up_content.py", - help="自动读取API配置的脚本路径,默认: source/analyze_up_content.py", - ) - parser.add_argument( - "--run-all-batches", - action="store_true", - help="自动连续跑完所有批次(忽略batch-index)", - ) - return parser.parse_args() - - -def load_api_config_from_script(path: Path) -> dict[str, str]: - if not path.exists(): - return {} - text = path.read_text(encoding="utf-8", errors="replace") - result: dict[str, str] = {} - for key in ("VOLCENGINE_API_KEY", "VOLCENGINE_MODEL", "VOLCENGINE_BASE_URL"): - m = re.search(rf"^{key}\s*=\s*\"([^\"]*)\"", text, flags=re.MULTILINE) - if m: - result[key] = m.group(1).strip() - return result - - -def parse_report(path: Path) -> list[dict[str, Any]]: - lines = path.read_text(encoding="utf-8").splitlines() - - items: list[dict[str, Any]] = [] - current: dict[str, Any] | None = None - section = "" - - for line in lines: - m = re.match(r"^##\s+\d+\.\s+(.*?)\s+\(mid:\s*(\d+)\)", line) - if m: - if current is not None: - items.append(current) - mid = int(m.group(2)) - current = { - "mid": mid, - "name": m.group(1).strip(), - "tag": [], - "url": f"https://space.bilibili.com/{mid}/video", - "titles": [], - "analysis": "", - "error": "", - } - section = "" - continue - - if current is None: - continue - - if line.startswith("- 主页: "): - current["url"] = line.replace("- 主页: ", "", 1).strip() - continue - if line.startswith("- 标签: "): - raw = line.replace("- 标签: ", "", 1).strip() - current["tag"] = [] if raw in ("", "无") else [x.strip() for x in raw.split(",") if x.strip()] - continue - - if line == "### 最近10条标题": - section = "titles" - continue - if line == "### AI分析": - section = "analysis" - continue - if line == "### 异常": - section = "error" - continue - if line.startswith("### "): - section = "" - continue - - if section == "titles" and line.startswith("- "): - text = line[2:].strip() - if text and text != "(未抓取到标题)": - current["titles"].append(text) - elif section == "analysis" and line.strip(): - current["analysis"] = (current["analysis"] + "\n" + line.strip()).strip() - elif section == "error" and line.startswith("- "): - current["error"] = line[2:].strip() - - if current is not None: - items.append(current) - - return items - - -def call_volcengine_chat( - system_prompt: str, - user_prompt: str, - cfg: dict[str, str], - timeout: float, -) -> str: - api_key = cfg.get("VOLCENGINE_API_KEY", "").strip() - model = cfg.get("VOLCENGINE_MODEL", "").strip() - base_url = cfg.get("VOLCENGINE_BASE_URL", "").strip() - - if (not api_key) or ("在这里填" in api_key): - raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_API_KEY") - if (not model) or ("在这里填" in model): - raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_MODEL") - if not base_url: - raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_BASE_URL") - - payload = { - "model": model, - "messages": [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ], - "temperature": 0.4, - } - - body = json.dumps(payload, ensure_ascii=False).encode("utf-8") - req = request.Request( - f"{base_url.rstrip('/')}/chat/completions", - data=body, - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {api_key}", - }, - method="POST", - ) - - with request.urlopen(req, timeout=timeout) as resp: - text = resp.read().decode("utf-8", errors="replace") - - data = json.loads(text) - content = data.get("choices", [{}])[0].get("message", {}).get("content", "") - if not isinstance(content, str) or not content.strip(): - raise RuntimeError(f"AI响应异常: {text[:500]}") - return content.strip() - - -def summarize_one_up( - name: str, - mid: int, - titles: list[str], - tags: list[str], - cfg: dict[str, str], - timeout: float, -) -> dict[str, str]: - system_prompt = ( - "你是内容定位与订阅决策助手。" - "你必须输出合法JSON,不要输出其它文本。" - ) - joined_titles = "\n".join(f"- {t}" for t in titles) - joined_tags = "、".join(tags) if tags else "无" - rule_hint = heuristic_group_hint(titles, tags) - groups_desc = "\n".join(f"- {k}" for k in PRESET_GROUPS) - - user_prompt = f""" -请基于以下信息完成分组与总结。 - -UP主: {name} -mid: {mid} -标签: {joined_tags} -最近标题: -{joined_titles} - -预设分组: -{groups_desc} - -代码规则初判: -{rule_hint} - -要求: -1) 输出JSON对象,字段严格为: summary, group, action, reason。 -2) summary: 一段中文总结,50-100字。 -3) group: 必须从预设分组里选一个。给出详细的分组类别和命中分组中的规则词。 -4) action: 只能是"保留关注"或"可以取关"。敏感一点,只保留真正核心优质的up,其他都建议取关。 -5) reason: 30-60字,解释为什么分到该组并给出该动作。 -""".strip() - - content = call_volcengine_chat(system_prompt, user_prompt, cfg, timeout=timeout) - return parse_ai_json(content) - - -def parse_ai_json(content: str) -> dict[str, str]: - text = content.strip() - if text.startswith("```"): - text = re.sub(r"^```[a-zA-Z]*\n?", "", text) - text = re.sub(r"\n?```$", "", text).strip() - m = re.search(r"\{.*\}", text, flags=re.DOTALL) - if m: - text = m.group(0) - data = json.loads(text) - summary = str(data.get("summary", "")).strip() - group = str(data.get("group", "")).strip() - action = str(data.get("action", "")).strip() - reason = str(data.get("reason", "")).strip() - if not summary: - raise RuntimeError("AI返回缺少summary") - if group not in PRESET_GROUPS: - raise RuntimeError(f"AI返回未知group: {group}") - if action not in ("保留关注", "可以取关"): - raise RuntimeError(f"AI返回未知action: {action}") - if not reason: - reason = "基于标题内容与更新风格综合判断。" - return { - "summary": summary, - "group": group, - "action": action, - "reason": reason, - } - - -def heuristic_group_hint(titles: list[str], tags: list[str]) -> str: - text = "\n".join(titles) + "\n" + " ".join(tags) - score: dict[str, int] = {k: 0 for k in PRESET_GROUPS} - lower_text = text.lower() - for group, words in PRESET_GROUPS.items(): - for w in words: - w_lower = w.lower() - if w_lower in lower_text: - score[group] += 1 - ranked = sorted(score.items(), key=lambda x: x[1], reverse=True) - best_group, best_score = ranked[0] - if best_score <= 0: - return "未命中关键词,倾向按内容专业度与稳定性判断。" - top3 = ", ".join(f"{g}:{s}" for g, s in ranked[:3]) - return f"关键词命中最高组={best_group}(score={best_score}),参考分布: {top3}" - - -def summarize_one_up_with_retry( - item: dict[str, Any], - cfg: dict[str, str], - max_retries: int, - timeout: float, - debug: bool, -) -> dict[str, str]: - last_exc: Exception | None = None - total_try = max(1, max_retries) - for attempt in range(1, total_try + 1): - try: - return summarize_one_up( - item["name"], - item["mid"], - item.get("titles", []), - item.get("tag", []), - cfg, - timeout=timeout, - ) - except Exception as exc: # noqa: BLE001 - last_exc = exc - if debug: - print(f"[debug] {item['name']} 第{attempt}次失败: {exc}") - if attempt < total_try: - time.sleep(min(2.0, 0.5 * attempt)) - raise RuntimeError(str(last_exc) if last_exc else "未知错误") - - -def build_report(items: list[dict[str, Any]], batch_note: str) -> str: - now = time.strftime("%Y-%m-%d %H:%M:%S") - lines: list[str] = [ - "# UP主内容分析报告(分批AI总结)", - "", - f"- 生成时间: {now}", - f"- 分析数量: {len(items)}", - f"- 处理说明: {batch_note}", - "", - ] - - group_stats: dict[str, int] = {k: 0 for k in PRESET_GROUPS} - action_stats: dict[str, int] = {"保留关注": 0, "可以取关": 0} - for item in items: - g = item.get("group", "") - a = item.get("action", "") - if g in group_stats: - group_stats[g] += 1 - if a in action_stats: - action_stats[a] += 1 - - lines.append("## 分组统计") - lines.append("") - for g, c in group_stats.items(): - lines.append(f"- {g}: {c}") - lines.append(f"- 保留关注: {action_stats['保留关注']}") - lines.append(f"- 可以取关: {action_stats['可以取关']}") - lines.append("") - - for idx, item in enumerate(items, start=1): - lines.append(f"## {idx}. {item['name']} (mid: {item['mid']})") - lines.append("") - lines.append(f"- 主页: {item['url']}") - tags = item.get("tag", []) - lines.append(f"- 标签: {', '.join(tags) if tags else '无'}") - lines.append("") - lines.append("### 最近10条标题") - lines.append("") - titles = item.get("titles", []) - if titles: - for t in titles: - lines.append(f"- {t}") - else: - lines.append("- (未抓取到标题)") - lines.append("") - - lines.append("### AI分析") - lines.append("") - analysis = item.get("analysis", "") - lines.append(analysis if analysis else "(待分析)") - lines.append("") - - lines.append("### 分组建议") - lines.append("") - group = item.get("group", "") - action = item.get("action", "") - reason = item.get("group_reason", "") - if group and action: - lines.append(f"- 预设分组: {group}") - lines.append(f"- 建议动作: {action}") - lines.append(f"- 判断依据: {reason if reason else '基于标题与更新风格综合判断。'}") - else: - lines.append("- (待分组)") - lines.append("") - - error = item.get("error", "") - if error: - lines.append("### 异常") - lines.append("") - lines.append(f"- {error}") - lines.append("") - - return "\n".join(lines).rstrip() + "\n" - - -def main() -> int: - args = parse_args() - input_report = Path(args.input_report) - output_report = Path(args.output_report) - - if not input_report.exists(): - print(f"输入报告不存在: {input_report}", file=sys.stderr) - return 1 - - items = parse_report(input_report) - if not items: - print("输入报告未解析出任何UP条目", file=sys.stderr) - return 1 - - config = { - "VOLCENGINE_API_KEY": VOLCENGINE_API_KEY, - "VOLCENGINE_MODEL": VOLCENGINE_MODEL, - "VOLCENGINE_BASE_URL": VOLCENGINE_BASE_URL, - } - if ("在这里填" in config["VOLCENGINE_API_KEY"]) or ("在这里填" in config["VOLCENGINE_MODEL"]): - inherited = load_api_config_from_script(Path(args.config_from)) - if inherited: - config.update(inherited) - - if args.force: - pending = [it for it in items if it.get("titles")] - else: - pending = [ - it for it in items - if it.get("titles") and it.get("analysis", "").strip() in SKIP_MARKERS - ] - - if not pending: - print("没有待分析条目,直接输出当前报告") - output_report.write_text(build_report(items, "无待分析条目"), encoding="utf-8") - return 0 - - index_map = {f"{it['mid']}::{it['name']}": idx for idx, it in enumerate(items)} - success_total = 0 - failed_total = 0 - - batch_size = max(1, args.batch_size) - if args.run_all_batches: - total_batches = math.ceil(len(pending) / batch_size) - batch_indexes = list(range(1, total_batches + 1)) - print(f"自动连续模式: 共{total_batches}批, 待分析总数{len(pending)}") - else: - batch_indexes = [max(1, args.batch_index)] - - workers = max(1, args.workers) - print(f"并发配置: workers={workers}, retries={max(1, args.max_retries)}, timeout={args.request_timeout}s") - - for batch_index in batch_indexes: - start = (batch_index - 1) * batch_size - end = start + batch_size - batch = pending[start:end] - if not batch: - continue - - print( - f"开始分批AI总结: 第{batch_index}批, 每批{batch_size}条, " - f"本批{len(batch)}条, 待分析总数{len(pending)}" - ) - - success = 0 - failed = 0 - future_to_item: dict[Any, dict[str, Any]] = {} - with ThreadPoolExecutor(max_workers=workers) as executor: - for i, it in enumerate(batch, start=1): - print(f"[submit {i}/{len(batch)}] {it['name']} ({it['mid']})") - future = executor.submit( - summarize_one_up_with_retry, - it, - config, - max(1, args.max_retries), - float(args.request_timeout), - args.debug, - ) - future_to_item[future] = it - if args.sleep_seconds > 0: - time.sleep(args.sleep_seconds) - - done_count = 0 - for future in as_completed(future_to_item): - done_count += 1 - it = future_to_item[future] - idx = index_map.get(f"{it['mid']}::{it['name']}") - try: - ai_res = future.result() - if idx is not None: - items[idx]["analysis"] = ai_res["summary"] - items[idx]["group"] = ai_res["group"] - items[idx]["action"] = ai_res["action"] - items[idx]["group_reason"] = ai_res["reason"] - items[idx]["error"] = "" - success += 1 - print(f"[done {done_count}/{len(batch)}] 成功: {it['name']} ({it['mid']})") - except Exception as exc: # noqa: BLE001 - if idx is not None: - items[idx]["error"] = str(exc) - failed += 1 - print(f"[done {done_count}/{len(batch)}] 失败: {it['name']} ({it['mid']})") - if args.debug: - print(f"[debug] 失败详情: {exc}") - - success_total += success - failed_total += failed - - step_note = ( - f"第{batch_index}批完成: 成功{success}, 失败{failed}, " - f"本批{len(batch)}, 待分析总数{len(pending)}" - ) - output_report.parent.mkdir(parents=True, exist_ok=True) - output_report.write_text(build_report(items, step_note), encoding="utf-8") - print(f"第{batch_index}批写入完成: {output_report}") - - mode_text = "自动连续" if args.run_all_batches else "单批" - note = ( - f"{mode_text}模式完成: 成功{success_total}, 失败{failed_total}, " - f"处理批次数={len(batch_indexes)}, 待分析总数={len(pending)}" - ) - output_report.parent.mkdir(parents=True, exist_ok=True) - output_report.write_text(build_report(items, note), encoding="utf-8") - print(f"输出完成: {output_report}") - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/source/batch_ai_summary_from_report.py b/source/batch_ai_summary_from_report.py deleted file mode 100644 index 9e1a921..0000000 --- a/source/batch_ai_summary_from_report.py +++ /dev/null @@ -1,598 +0,0 @@ -#!/usr/bin/env python3 -"""Batch AI summary from existing UP markdown report. - -Read an existing report (e.g. source/up_analysis_report.md), -extract each UP's title list, and generate AI summaries in batches. -""" - -from __future__ import annotations - -import argparse -from concurrent.futures import ThreadPoolExecutor, as_completed -import json -import math -import re -import sys -import time -from pathlib import Path -from typing import Any -from urllib import request - -# Fill your Volcengine Ark settings here. -VOLCENGINE_API_KEY = "586d443c-5034-4810-9760-50ce77394e8a" -VOLCENGINE_MODEL = "deepseek-v3-1-terminus" -VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3" - -SKIP_MARKERS = { - "", - "测试模式已跳过AI分析", - "(待分析)", -} - -# 预设分组及关键词规则(可自行扩展)。 -PRESET_GROUPS: dict[str, list[str]] = { - "AAA_核心每日必读":[ - "编程", "算法", "工程", "干货", "新闻", "趋势", - ], - "AA_编程信息干货必留": [ - "编程", "算法", "工程", "教程", "实战", "课程", "新技术", "开源", "工具", "效率", "技术", "架构", - ], - "A_硬核知识保留": [ - "科普", "数学", "物理", "编程", "算法", "工程", "历史", "新闻", "深度", - ], - "B_技能学习保留": [ - "英语", "四六级", "考研", "面试", "教程", "实战", "学习", "课程", "写作", - ], - "C_资讯快餐观察": [ - "热点", "速览", "信息差", "快报", "盘点", "吐槽", "观点", "趋势", - ], - "D_娱乐消遣可取关": [ - "搞笑", "整活", "抽象", "乐子", "娱乐", "段子", "鬼畜", "日常", "情侣", - ], - "E_营销带货谨慎": [ - "好物", "测评", "种草", "直播", "带货", "优惠", "开箱", "广告", "激活", - ], -} - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="基于现有报告分批做AI总结") - parser.add_argument( - "--input-report", - default="source/output/reports/up_titles_report.md", - help="已有标题报告路径,默认: source/output/reports/up_titles_report.md", - ) - parser.add_argument( - "--output-report", - default="source/output/reports/up_analysis_full_auto.md", - help="输出报告路径,默认: source/output/reports/up_analysis_full_auto.md", - ) - parser.add_argument( - "--batch-size", - type=int, - default=20, - help="每批处理数量,默认: 20", - ) - parser.add_argument( - "--batch-index", - type=int, - default=1, - help="批次序号(从1开始),默认: 1", - ) - parser.add_argument( - "--sleep-seconds", - type=float, - default=0.0, - help="提交任务间隔秒数,默认: 0(并发模式建议0)", - ) - parser.add_argument( - "--workers", - type=int, - default=4, - help="并发请求数,默认: 4", - ) - parser.add_argument( - "--max-retries", - type=int, - default=2, - help="单个UP分析最大重试次数,默认: 2", - ) - parser.add_argument( - "--request-timeout", - type=float, - default=60.0, - help="单次AI请求超时秒数,默认: 60", - ) - parser.add_argument( - "--force", - action="store_true", - help="强制覆盖已有AI分析(默认只处理待分析项)", - ) - parser.add_argument( - "--debug", - action="store_true", - help="输出调试信息", - ) - parser.add_argument( - "--config-from", - default="source/analyze_up_content.py", - help="自动读取API配置的脚本路径,默认: source/analyze_up_content.py", - ) - parser.add_argument( - "--run-all-batches", - action="store_true", - help="自动连续跑完所有批次(忽略batch-index)", - ) - return parser.parse_args() - - -def load_api_config_from_script(path: Path) -> dict[str, str]: - if not path.exists(): - return {} - text = path.read_text(encoding="utf-8", errors="replace") - result: dict[str, str] = {} - for key in ("VOLCENGINE_API_KEY", "VOLCENGINE_MODEL", "VOLCENGINE_BASE_URL"): - m = re.search(rf"^{key}\s*=\s*\"([^\"]*)\"", text, flags=re.MULTILINE) - if m: - result[key] = m.group(1).strip() - return result - - -def parse_report(path: Path) -> list[dict[str, Any]]: - lines = path.read_text(encoding="utf-8").splitlines() - - items: list[dict[str, Any]] = [] - current: dict[str, Any] | None = None - section = "" - - for line in lines: - m = re.match(r"^##\s+\d+\.\s+(.*?)\s+\(mid:\s*(\d+)\)", line) - if m: - if current is not None: - items.append(current) - mid = int(m.group(2)) - current = { - "mid": mid, - "name": m.group(1).strip(), - "tag": [], - "url": f"https://space.bilibili.com/{mid}/video", - "titles": [], - "analysis": "", - "group": "", - "action": "", - "reason": "", - "error": "", - } - section = "" - continue - - if current is None: - continue - - if line.startswith("- 主页: "): - current["url"] = line.replace("- 主页: ", "", 1).strip() - continue - if line.startswith("- 标签: "): - raw = line.replace("- 标签: ", "", 1).strip() - current["tag"] = [] if raw in ("", "无") else [x.strip() for x in raw.split(",") if x.strip()] - continue - - if line == "### 最近10条标题": - section = "titles" - continue - if line == "### AI分析": - section = "analysis" - continue - if line == "### 分组建议": - section = "group" - continue - if line == "### 异常": - section = "error" - continue - if line.startswith("### "): - section = "" - continue - - if section == "titles" and line.startswith("- "): - text = line[2:].strip() - if text and text != "(未抓取到标题)": - current["titles"].append(text) - elif section == "analysis" and line.strip(): - current["analysis"] = (current["analysis"] + "\n" + line.strip()).strip() - elif section == "group": - if line.startswith("- 预设分组: "): - current["group"] = line.replace("- 预设分组: ", "", 1).strip() - elif line.startswith("- 建议动作: "): - current["action"] = line.replace("- 建议动作: ", "", 1).strip() - elif line.startswith("- 判断依据: "): - current["reason"] = line.replace("- 判断依据: ", "", 1).strip() - elif line.strip() == "(待分组)": - current["group"] = "" - current["action"] = "" - current["reason"] = "" - elif section == "error" and line.startswith("- "): - current["error"] = line[2:].strip() - - if current is not None: - items.append(current) - - return items - - -def call_volcengine_chat( - system_prompt: str, - user_prompt: str, - cfg: dict[str, str], - timeout: float, -) -> str: - api_key = cfg.get("VOLCENGINE_API_KEY", "").strip() - model = cfg.get("VOLCENGINE_MODEL", "").strip() - base_url = cfg.get("VOLCENGINE_BASE_URL", "").strip() - - if (not api_key) or ("在这里填" in api_key): - raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_API_KEY") - if (not model) or ("在这里填" in model): - raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_MODEL") - if not base_url: - raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_BASE_URL") - - payload = { - "model": model, - "messages": [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ], - "temperature": 0.4, - } - - body = json.dumps(payload, ensure_ascii=False).encode("utf-8") - req = request.Request( - f"{base_url.rstrip('/')}/chat/completions", - data=body, - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {api_key}", - }, - method="POST", - ) - - with request.urlopen(req, timeout=timeout) as resp: - text = resp.read().decode("utf-8", errors="replace") - - data = json.loads(text) - content = data.get("choices", [{}])[0].get("message", {}).get("content", "") - if not isinstance(content, str) or not content.strip(): - raise RuntimeError(f"AI响应异常: {text[:500]}") - return content.strip() - - -def summarize_one_up( - name: str, - mid: int, - titles: list[str], - tags: list[str], - cfg: dict[str, str], - timeout: float, -) -> dict[str, str]: - system_prompt = ( - "你是内容定位与订阅决策助手。" - "你必须输出合法JSON,不要输出其它文本。" - ) - joined_titles = "\n".join(f"- {t}" for t in titles) - joined_tags = "、".join(tags) if tags else "无" - rule_hint = heuristic_group_hint(titles, tags) - groups_desc = "\n".join(f"- {k}" for k in PRESET_GROUPS) - - user_prompt = f""" -请基于以下信息完成分组与总结。 - -UP主: {name} -mid: {mid} -标签: {joined_tags} -最近标题: -{joined_titles} - -预设分组: -{groups_desc} - -代码规则初判: -{rule_hint} - -要求: -1) 输出JSON对象,字段严格为: summary, group, action, reason。 -2) summary: 一段中文总结,50-100字。 -3) group: 必须从预设分组里选一个。给出详细的分组类别和命中分组中的规则词。 -4) action: 只能是"保留关注"或"可以取关"。敏感一点,只保留真正核心优质的up,其他都建议取关。 -5) reason: 30-60字,解释为什么分到该组并给出该动作。 -""".strip() - - content = call_volcengine_chat(system_prompt, user_prompt, cfg, timeout=timeout) - return parse_ai_json(content) - - -def parse_ai_json(content: str) -> dict[str, str]: - text = content.strip() - if text.startswith("```"): - text = re.sub(r"^```[a-zA-Z]*\n?", "", text) - text = re.sub(r"\n?```$", "", text).strip() - m = re.search(r"\{.*\}", text, flags=re.DOTALL) - if m: - text = m.group(0) - data = json.loads(text) - summary = str(data.get("summary", "")).strip() - group = str(data.get("group", "")).strip() - action = str(data.get("action", "")).strip() - reason = str(data.get("reason", "")).strip() - if not summary: - raise RuntimeError("AI返回缺少summary") - if group not in PRESET_GROUPS: - raise RuntimeError(f"AI返回未知group: {group}") - if action not in ("保留关注", "可以取关"): - raise RuntimeError(f"AI返回未知action: {action}") - if not reason: - reason = "基于标题内容与更新风格综合判断。" - return { - "summary": summary, - "group": group, - "action": action, - "reason": reason, - } - - -def heuristic_group_hint(titles: list[str], tags: list[str]) -> str: - text = "\n".join(titles) + "\n" + " ".join(tags) - score: dict[str, int] = {k: 0 for k in PRESET_GROUPS} - lower_text = text.lower() - for group, words in PRESET_GROUPS.items(): - for w in words: - w_lower = w.lower() - if w_lower in lower_text: - score[group] += 1 - ranked = sorted(score.items(), key=lambda x: x[1], reverse=True) - best_group, best_score = ranked[0] - if best_score <= 0: - return "未命中关键词,倾向按内容专业度与稳定性判断。" - top3 = ", ".join(f"{g}:{s}" for g, s in ranked[:3]) - return f"关键词命中最高组={best_group}(score={best_score}),参考分布: {top3}" - - -def summarize_one_up_with_retry( - item: dict[str, Any], - cfg: dict[str, str], - max_retries: int, - timeout: float, - debug: bool, -) -> dict[str, str]: - last_exc: Exception | None = None - total_try = max(1, max_retries) - for attempt in range(1, total_try + 1): - try: - return summarize_one_up( - item["name"], - item["mid"], - item.get("titles", []), - item.get("tag", []), - cfg, - timeout=timeout, - ) - except Exception as exc: # noqa: BLE001 - last_exc = exc - if debug: - print(f"[debug] {item['name']} 第{attempt}次失败: {exc}") - if attempt < total_try: - time.sleep(min(2.0, 0.5 * attempt)) - raise RuntimeError(str(last_exc) if last_exc else "未知错误") - - -def build_report(items: list[dict[str, Any]], batch_note: str) -> str: - now = time.strftime("%Y-%m-%d %H:%M:%S") - lines: list[str] = [ - "# UP主内容分析报告(分批AI总结)", - "", - f"- 生成时间: {now}", - f"- 分析数量: {len(items)}", - f"- 处理说明: {batch_note}", - "", - ] - - group_stats: dict[str, int] = {k: 0 for k in PRESET_GROUPS} - action_stats: dict[str, int] = {"保留关注": 0, "可以取关": 0} - for item in items: - g = item.get("group", "") - a = item.get("action", "") - if g in group_stats: - group_stats[g] += 1 - if a in action_stats: - action_stats[a] += 1 - - lines.append("## 分组统计") - lines.append("") - for g, c in group_stats.items(): - lines.append(f"- {g}: {c}") - lines.append(f"- 保留关注: {action_stats['保留关注']}") - lines.append(f"- 可以取关: {action_stats['可以取关']}") - lines.append("") - - for idx, item in enumerate(items, start=1): - lines.append(f"## {idx}. {item['name']} (mid: {item['mid']})") - lines.append("") - lines.append(f"- 主页: {item['url']}") - tags = item.get("tag", []) - lines.append(f"- 标签: {', '.join(tags) if tags else '无'}") - lines.append("") - lines.append("### 最近10条标题") - lines.append("") - titles = item.get("titles", []) - if titles: - for t in titles: - lines.append(f"- {t}") - else: - lines.append("- (未抓取到标题)") - lines.append("") - - lines.append("### AI分析") - lines.append("") - analysis = item.get("analysis", "") - lines.append(analysis if analysis else "(待分析)") - lines.append("") - - lines.append("### 分组建议") - lines.append("") - group = item.get("group", "") - action = item.get("action", "") - reason = item.get("group_reason", "") - if group and action: - lines.append(f"- 预设分组: {group}") - lines.append(f"- 建议动作: {action}") - lines.append(f"- 判断依据: {reason if reason else '基于标题与更新风格综合判断。'}") - else: - lines.append("- (待分组)") - lines.append("") - - error = item.get("error", "") - if error: - lines.append("### 异常") - lines.append("") - lines.append(f"- {error}") - lines.append("") - - return "\n".join(lines).rstrip() + "\n" - - -def main() -> int: - args = parse_args() - input_report = Path(args.input_report) - output_report = Path(args.output_report) - - if not input_report.exists(): - print(f"输入报告不存在: {input_report}", file=sys.stderr) - return 1 - - items = parse_report(input_report) - if not items: - print("输入报告未解析出任何UP条目", file=sys.stderr) - return 1 - - config = { - "VOLCENGINE_API_KEY": VOLCENGINE_API_KEY, - "VOLCENGINE_MODEL": VOLCENGINE_MODEL, - "VOLCENGINE_BASE_URL": VOLCENGINE_BASE_URL, - } - if ("在这里填" in config["VOLCENGINE_API_KEY"]) or ("在这里填" in config["VOLCENGINE_MODEL"]): - inherited = load_api_config_from_script(Path(args.config_from)) - if inherited: - config.update(inherited) - - if args.force: - pending = [it for it in items if it.get("titles")] - # else: - # pending = [ - # it for it in items - # if it.get("titles") and it.get("analysis", "").strip() in SKIP_MARKERS - # ] - else: - pending = [ - it for it in items - if it.get("titles") and ( - it.get("analysis", "").strip() in SKIP_MARKERS - or not it.get("group") # 没有分组也要重跑 - ) - ] - - if not pending: - print("没有待分析条目,直接输出当前报告") - output_report.write_text(build_report(items, "无待分析条目"), encoding="utf-8") - return 0 - - index_map = {f"{it['mid']}::{it['name']}": idx for idx, it in enumerate(items)} - success_total = 0 - failed_total = 0 - - batch_size = max(1, args.batch_size) - if args.run_all_batches: - total_batches = math.ceil(len(pending) / batch_size) - batch_indexes = list(range(1, total_batches + 1)) - print(f"自动连续模式: 共{total_batches}批, 待分析总数{len(pending)}") - else: - batch_indexes = [max(1, args.batch_index)] - - workers = max(1, args.workers) - print(f"并发配置: workers={workers}, retries={max(1, args.max_retries)}, timeout={args.request_timeout}s") - - for batch_index in batch_indexes: - start = (batch_index - 1) * batch_size - end = start + batch_size - batch = pending[start:end] - if not batch: - continue - - print( - f"开始分批AI总结: 第{batch_index}批, 每批{batch_size}条, " - f"本批{len(batch)}条, 待分析总数{len(pending)}" - ) - - success = 0 - failed = 0 - future_to_item: dict[Any, dict[str, Any]] = {} - with ThreadPoolExecutor(max_workers=workers) as executor: - for i, it in enumerate(batch, start=1): - print(f"[submit {i}/{len(batch)}] {it['name']} ({it['mid']})") - future = executor.submit( - summarize_one_up_with_retry, - it, - config, - max(1, args.max_retries), - float(args.request_timeout), - args.debug, - ) - future_to_item[future] = it - if args.sleep_seconds > 0: - time.sleep(args.sleep_seconds) - - done_count = 0 - for future in as_completed(future_to_item): - done_count += 1 - it = future_to_item[future] - idx = index_map.get(f"{it['mid']}::{it['name']}") - try: - ai_res = future.result() - if idx is not None: - items[idx]["analysis"] = ai_res["summary"] - items[idx]["group"] = ai_res["group"] - items[idx]["action"] = ai_res["action"] - items[idx]["group_reason"] = ai_res["reason"] - items[idx]["error"] = "" - success += 1 - print(f"[done {done_count}/{len(batch)}] 成功: {it['name']} ({it['mid']})") - except Exception as exc: # noqa: BLE001 - if idx is not None: - items[idx]["error"] = str(exc) - failed += 1 - print(f"[done {done_count}/{len(batch)}] 失败: {it['name']} ({it['mid']})") - if args.debug: - print(f"[debug] 失败详情: {exc}") - - success_total += success - failed_total += failed - - step_note = ( - f"第{batch_index}批完成: 成功{success}, 失败{failed}, " - f"本批{len(batch)}, 待分析总数{len(pending)}" - ) - output_report.parent.mkdir(parents=True, exist_ok=True) - output_report.write_text(build_report(items, step_note), encoding="utf-8") - print(f"第{batch_index}批写入完成: {output_report}") - - mode_text = "自动连续" if args.run_all_batches else "单批" - note = ( - f"{mode_text}模式完成: 成功{success_total}, 失败{failed_total}, " - f"处理批次数={len(batch_indexes)}, 待分析总数={len(pending)}" - ) - output_report.parent.mkdir(parents=True, exist_ok=True) - output_report.write_text(build_report(items, note), encoding="utf-8") - print(f"输出完成: {output_report}") - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/source/extract_group_info.py b/source/extract_group_info.py deleted file mode 100644 index 9f9c2ae..0000000 --- a/source/extract_group_info.py +++ /dev/null @@ -1,101 +0,0 @@ -import argparse -import re - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="提取UP分组信息") - parser.add_argument( - "--input", - default="source/19_53_no_titles.md", - help="输入报告路径", - ) - parser.add_argument( - "--output", - help="输出报告路径(默认覆盖输入)", - ) - return parser.parse_args() - -def main(): - args = parse_args() - input_file = args.input - output_file = args.output or input_file - - with open(input_file, 'r', encoding='utf-8') as f: - content = f.read() - - lines = content.split('\n') - section_starts = [] - for i, line in enumerate(lines): - if line.startswith('## '): - section_starts.append(i) - - if len(section_starts) < 2: - print('No sections found') - return 1 - - header = '\n'.join(lines[:section_starts[0]]) - sections = [] - - for idx in range(len(section_starts)): - start = section_starts[idx] - end = section_starts[idx + 1] if idx + 1 < len(section_starts) else len(lines) - section = '\n'.join(lines[start:end]) - sections.append(section) - - sections = sections[1:] - - parsed = [] - for sec in sections: - match = re.match(r'^## (\d+)\. (.+) \(mid: (\d+)\)', sec) - if match: - num = int(match.group(1)) - name = match.group(2) - mid = match.group(3) - - group_m = re.search(r'- 预设分组: (.+)', sec) - action_m = re.search(r'- 建议动作: (.+)', sec) - reason_m = re.search(r'- 判断依据: (.+)', sec) - error_m = re.search(r'AI返回未知group: (.+)', sec) - - group = group_m.group(1).strip() if group_m else "" - action = action_m.group(1).strip() if action_m else "" - reason = reason_m.group(1).strip() if reason_m else "" - error = error_m.group(1).strip() if error_m else "" - - parsed.append({ - 'num': num, - 'name': name, - 'mid': mid, - 'group': group, - 'action': action, - 'reason': reason, - 'error': error - }) - - parsed.sort(key=lambda x: (x['name'].casefold(), int(x['mid']))) - - lines_out = [header, ""] - - for p in parsed: - lines_out.append(f"## {p['num']}. {p['name']} (mid: {p['mid']})") - lines_out.append("") - if p['group']: - lines_out.append(f"- 预设分组: {p['group']}") - if p['action']: - lines_out.append(f"- 建议动作: {p['action']}") - if p['reason']: - lines_out.append(f"- 判断依据: {p['reason']}") - if p['error']: - lines_out.append(f"- 异常: {p['error']}") - lines_out.append("") - - result = '\n'.join(lines_out) - result = re.sub(r'\n{3,}', '\n\n', result) - - with open(output_file, 'w', encoding='utf-8') as f: - f.write(result) - - print(f'Extracted {len(parsed)} sections') - return 0 - -if __name__ == "__main__": - raise SystemExit(main()) \ No newline at end of file diff --git a/source/extract_keep_follow_doc.py b/source/extract_keep_follow_doc.py deleted file mode 100644 index 8a96896..0000000 --- a/source/extract_keep_follow_doc.py +++ /dev/null @@ -1,104 +0,0 @@ -#!/usr/bin/env python3 -from __future__ import annotations - -import argparse -import re -import time -from pathlib import Path - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="提取非取关UP的AI分析与分组建议") - parser.add_argument( - "--input-report", - default="source/output/reports/up_analysis_full_auto.md", - help="输入分析报告路径,默认: source/output/reports/up_analysis_full_auto.md", - ) - parser.add_argument( - "--output-report", - default="source/output/reports/up_keep_follow_only.md", - help="输出保留关注报告路径,默认: source/output/reports/up_keep_follow_only.md", - ) - return parser.parse_args() - - -def main() -> int: - args = parse_args() - src = Path(args.input_report) - dst = Path(args.output_report) - - if not src.exists(): - print(f"来源文件不存在: {src}") - return 1 - - text = src.read_text(encoding="utf-8") - pattern = r"^##\s+\d+\.\s+(.+?)\s+\(mid:\s*(\d+)\)\s*$" - matches = list(re.finditer(pattern, text, re.MULTILINE)) - - items: list[tuple[str, str, str, str, str, str]] = [] - for i, m in enumerate(matches): - start = m.start() - end = matches[i + 1].start() if i + 1 < len(matches) else len(text) - section = text[start:end] - - name = m.group(1).strip() - mid = m.group(2).strip() - - action_m = re.search(r"-\s*建议动作:\s*(.+)", section) - action = action_m.group(1).strip() if action_m else "" - # 反逻辑:没有"建议动作: 可以取关"就保留 - if action == "可以取关": - continue - - ai_m = re.search(r"###\s*AI分析\s*\n([\s\S]*?)(?=\n###\s|\Z)", section) - ai_text = ai_m.group(1).strip() if ai_m else "" - - group_m = re.search(r"###\s*分组建议\s*\n([\s\S]*?)(?=\n###\s|\Z)", section) - group_text = group_m.group(1).strip() if group_m else "" - - error_m = re.search(r"###\s*异常\s*\n([\s\S]*?)(?=\n###\s|\Z)", section) - error_text = error_m.group(1).strip() if error_m else "" - - items.append((name, mid, ai_text, group_text, action, error_text)) - - # 按昵称首字母A-Z排序(同名时按mid升序) - items.sort(key=lambda x: (x[0].casefold(), int(x[1]))) - - lines = [ - "# 保留关注UP主分析与分组建议", - "", - f"- 生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}", - f"- 来源文件: {src.name}", - f"- 条目数: {len(items)}", - "", - ] - - for idx, (name, mid, ai_text, group_text, action, error_text) in enumerate(items, 1): - lines.append(f"## {idx}. {name} (mid: {mid})") - lines.append("") - - lines.append("### AI分析") - lines.append("") - lines.append(ai_text if ai_text else "(无)") - lines.append("") - - lines.append("### 分组建议") - lines.append("") - lines.append(group_text if group_text else f"- 建议动作: {action if action else '(无)'}") - lines.append("") - - if error_text: - lines.append("### 异常") - lines.append("") - lines.append(error_text) - lines.append("") - - dst.parent.mkdir(parents=True, exist_ok=True) - dst.write_text("\n".join(lines), encoding="utf-8") - print(f"已生成: {dst}") - print(f"保留条目: {len(items)}") - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) \ No newline at end of file diff --git a/source/extract_unfollow_list.py b/source/extract_unfollow_list.py deleted file mode 100644 index 0b5b919..0000000 --- a/source/extract_unfollow_list.py +++ /dev/null @@ -1,174 +0,0 @@ -#!/usr/bin/env python3 -"""Extract UPs marked as "可以取关" and output their mids to CSV. - -Read an UP analysis report and extract all UPs with action "可以取关", -then output their mids to a CSV file. -""" - -from __future__ import annotations - -import argparse -import csv -import re -import sys -from pathlib import Path -from typing import Any - - -def parse_report(report_path: Path) -> list[dict[str, Any]]: - """解析Markdown格式的UP分析报告,返回UP列表""" - if not report_path.exists(): - return [] - - text = report_path.read_text(encoding="utf-8") - items = [] - - # 按UP项分割(每个UP项以"## N. 名字 (mid: ...)"开头) - pattern = r"^## \d+\. (.+?)\s+\(mid:\s*(\d+)\)" - matches = list(re.finditer(pattern, text, re.MULTILINE)) - - for i, match in enumerate(matches): - start = match.start() - end = matches[i + 1].start() if i + 1 < len(matches) else len(text) - section = text[start:end] - - name = match.group(1).strip() - mid = int(match.group(2)) - - # 提取建议动作 - action_match = re.search(r"- 建议动作: (.+?)(?:\n|$)", section) - action = action_match.group(1).strip() if action_match else "" - - items.append({ - "mid": mid, - "name": name, - "action": action, - }) - - return items - - -def main() -> int: - parser = argparse.ArgumentParser(description="从UP分析报告中提取可以取关的UP") - parser.add_argument( - "--input-report", - default="source/output/reports/up_analysis_full_auto.md", - help="输入报告路径,默认: source/output/reports/up_analysis_full_auto.md", - ) - parser.add_argument( - "--output-csv", - default="source/output/uids/unfollow_mids_list.txt", - help="输出文件路径,默认: source/output/uids/unfollow_mids_list.txt", - ) - parser.add_argument( - "--format", - choices=["csv", "mid-only", "json"], - default="mid-only", - help="输出格式:csv(mid,name), mid-only(仅mid逗号分隔), json(JSON格式)", - ) - parser.add_argument( - "--with-names", - action="store_true", - help="在mid后添加UP名称(仅mid-only格式生效)", - ) - parser.add_argument( - "--split-size", - type=int, - default=0, - help="可选:将mid-only结果按N个一组拆分多个文件,例如100", - ) - - args = parser.parse_args() - - input_report = Path(args.input_report) - output_csv = Path(args.output_csv) - - if not input_report.exists(): - print(f"错误: 输入报告不存在: {input_report}", file=sys.stderr) - return 1 - - print(f"读取报告: {input_report}") - items = parse_report(input_report) - - if not items: - print("未能从报告中解析任何UP", file=sys.stderr) - return 1 - - # 筛选可以取关的UP - unfollow_items = [it for it in items if it.get("action") == "可以取关"] - - print(f"总 UP 数: {len(items)}") - print(f"可以取关: {len(unfollow_items)}") - - if not unfollow_items: - print("没有可以取关的UP") - return 0 - - # 输出格式 - if args.format == "csv": - # 标准CSV格式:mid, name - output_csv.parent.mkdir(parents=True, exist_ok=True) - with open(output_csv, "w", newline="", encoding="utf-8") as f: - writer = csv.DictWriter(f, fieldnames=["mid", "name"]) - writer.writeheader() - for item in unfollow_items: - writer.writerow({"mid": item["mid"], "name": item["name"]}) - - print(f"\n✓ 已输出CSV格式到: {output_csv}") - print(f" 格式: mid,name") - print(f" 行数: {len(unfollow_items)}") - - elif args.format == "mid-only": - # 仅mid,逗号分隔 - mids = [str(it["mid"]) for it in unfollow_items] - - if args.with_names: - # mid:name 格式 - content = ",".join([f"{it['mid']}:{it['name']}" for it in unfollow_items]) - print(f"\n✓ 已输出mid:name列表到: {output_csv}") - print(f" 格式: mid1:name1,mid2:name2,...") - else: - # 仅mid - content = ",".join(mids) - print(f"\n✓ 已输出mid列表到: {output_csv}") - print(f" 格式: mid1,mid2,mid3,...") - - output_csv.parent.mkdir(parents=True, exist_ok=True) - output_csv.write_text(content, encoding="utf-8") - print(f" 数量: {len(mids)}") - - split_size = max(0, int(args.split_size)) - if split_size > 0: - groups = [mids[i:i + split_size] for i in range(0, len(mids), split_size)] - stem = output_csv.stem - suffix = output_csv.suffix or ".txt" - for i, group in enumerate(groups, start=1): - part_path = output_csv.with_name(f"{stem}_{i}{suffix}") - part_path.write_text(",".join(group), encoding="utf-8") - print(f" 已按每组{split_size}个拆分为{len(groups)}个文件") - - elif args.format == "json": - # JSON格式 - import json - - data = [{"mid": it["mid"], "name": it["name"]} for it in unfollow_items] - output_csv.parent.mkdir(parents=True, exist_ok=True) - output_csv.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") - - print(f"\n✓ 已输出JSON格式到: {output_csv}") - print(f" 数量: {len(data)}") - - # 显示前10个示例 - if len(unfollow_items) > 0: - print(f"\n📋 示例(前10个):") - for item in unfollow_items[:10]: - print(f" - {item['mid']}: {item['name']}") - - if len(unfollow_items) > 10: - print(f" ... 还有 {len(unfollow_items) - 10} 个") - - return 0 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/source/remove_10content.py b/source/remove_10content.py deleted file mode 100644 index 6846bb6..0000000 --- a/source/remove_10content.py +++ /dev/null @@ -1,67 +0,0 @@ -import argparse -import re - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="删除最近10条标题内容") - parser.add_argument( - "--input", - default="source/output/reports/up_analysis_full_auto.md", - help="输入报告路径", - ) - parser.add_argument( - "--output", - help="输出报告路径(默认覆盖输入)", - ) - return parser.parse_args() - -def main(): - args = parse_args() - input_file = args.input - output_file = args.output or input_file - - with open(input_file, 'r', encoding='utf-8') as f: - content = f.read() - - lines = content.split('\n') - new_lines = [] - i = 0 - while i < len(lines): - line = lines[i] - new_lines.append(line) - - if line.startswith('## '): - i += 1 - while i < len(lines): - curr = lines[i] - if curr.startswith('## '): - break - if curr.startswith('### '): - if '最近10条标题' in curr: - i += 1 - while i < len(lines) and lines[i].startswith(' - '): - i += 1 - continue - else: - break - if curr.startswith('- ') and not curr.startswith(' - '): - i += 1 - continue - if curr.startswith(' - '): - i += 1 - continue - new_lines.append(curr) - i += 1 - else: - i += 1 - - result = '\n'.join(new_lines) - result = re.sub(r'\n{3,}', '\n\n', result) - - with open(output_file, 'w', encoding='utf-8') as f: - f.write(result) - - print(f'Done: {output_file}') - return 0 - -if __name__ == "__main__": - raise SystemExit(main()) \ No newline at end of file diff --git a/source/run_pipeline.py b/source/run_pipeline.py deleted file mode 100644 index 472a00f..0000000 --- a/source/run_pipeline.py +++ /dev/null @@ -1,200 +0,0 @@ -#!/usr/bin/env python3 -"""One-command pipeline: fetch titles -> batch analyze -> outputs. - -Pipeline outputs: -1) source/output/reports/up_titles_report.md -2) source/output/reports/up_analysis_full_auto.md -3) source/output/reports/up_keep_follow_only.md -4) source/output/uids/unfollow_mids_list.txt (+ split files) - -Pipeline steps: -1) 抓取视频标题 (analyze_up_content.py) -2) 分批AI分析 (batch_ai_summary_from_report.py) -3) 生成保留关注报告 (extract_keep_follow_doc.py) -4) 生成取关UID列表 (extract_unfollow_list.py) -5) 按首字母排序 (sort_up_main.py) -6) 提取分组信息 (extract_group_info.py) -""" - -from __future__ import annotations - -import argparse -import subprocess -import sys -from pathlib import Path - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="一键运行完整功能链") - parser.add_argument( - "--input-json", - default="source/resources/export_uids.json", - help="UP资源文件路径,默认: source/resources/export_uids.json", - ) - parser.add_argument( - "--titles-report", - default="source/output/reports/up_titles_report.md", - help="标题抓取报告输出路径", - ) - parser.add_argument( - "--analysis-report", - default="source/output/reports/up_analysis_full_auto.md", - help="分批分析报告输出路径", - ) - parser.add_argument( - "--keep-report", - default="source/output/reports/up_keep_follow_only.md", - help="保留关注报告输出路径", - ) - parser.add_argument( - "--unfollow-uids", - default="source/output/uids/unfollow_mids_list.txt", - help="取关UID输出路径", - ) - parser.add_argument( - "--group_info", - default="source/output/uids/only_group_info.md", - help="分组信息输出路径", - ) - parser.add_argument("--titles-per-up", type=int, default=10, help="每个UP抓取标题数量") - parser.add_argument("--batch-size", type=int, default=20, help="分批分析每批条数") - parser.add_argument("--workers", type=int, default=6, help="并发请求数") - parser.add_argument("--max-retries", type=int, default=2, help="单条分析重试次数") - parser.add_argument("--request-timeout", type=float, default=60.0, help="单次请求超时") - parser.add_argument("--split-size", type=int, default=100, help="取关UID拆分分组大小") - parser.add_argument("--sleep-seconds", type=float, default=0.0, help="任务间隔秒数") - parser.add_argument("--retry-times", type=int, default=3, help="抓取重试次数") - parser.add_argument("--fetch-mode", choices=["auto", "api", "html"], default="auto", help="标题抓取模式") - parser.add_argument("--only-tag", default="", help="可选:仅处理包含该标签的UP") - parser.add_argument("--max-ups", type=int, default=0, help="可选:限制处理UP数量") - parser.add_argument("--bili-cookie", default="", help="可选:运行时传入B站Cookie") - parser.add_argument("--skip-fetch", action="store_true", help="跳过抓取阶段,直接使用已有标题报告") - parser.add_argument("--skip-analyze", action="store_true", help="跳过分析阶段,直接做产物提取") - parser.add_argument("--skip-sort", action="store_true", help="跳过排序阶段") - parser.add_argument("--skip-group", action="store_true", help="跳过提取分组阶段") - parser.add_argument("--python", default=sys.executable, help="指定Python解释器") - return parser.parse_args() - - -def run_cmd(cmd: list[str], title: str) -> None: - print(f"\n=== {title} ===") - print("$", " ".join(cmd)) - subprocess.run(cmd, check=True) - - -def main() -> int: - args = parse_args() - - for p in [ - Path(args.titles_report).parent, - Path(args.analysis_report).parent, - Path(args.keep_report).parent, - Path(args.unfollow_uids).parent, - ]: - p.mkdir(parents=True, exist_ok=True) - - if not args.skip_fetch: - fetch_cmd = [ - args.python, - "source/analyze_up_content.py", - "--input", - args.input_json, - "--output", - args.titles_report, - "--titles-per-up", - str(max(1, args.titles_per_up)), - "--retry-times", - str(max(1, args.retry_times)), - "--fetch-mode", - args.fetch_mode, - "--sleep-seconds", - str(max(0.0, args.sleep_seconds)), - "--skip-ai", - ] - if args.only_tag: - fetch_cmd += ["--only-tag", args.only_tag] - if args.max_ups > 0: - fetch_cmd += ["--max-ups", str(args.max_ups)] - if args.bili_cookie: - fetch_cmd += ["--bili-cookie", args.bili_cookie] - - run_cmd(fetch_cmd, "步骤1/6 抓取视频标题") - - if not args.skip_analyze: - analyze_cmd = [ - args.python, - "source/batch_ai_summary_from_report.py", - "--input-report", - args.titles_report, - "--output-report", - args.analysis_report, - "--batch-size", - str(max(1, args.batch_size)), - "--run-all-batches", - "--workers", - str(max(1, args.workers)), - "--max-retries", - str(max(1, args.max_retries)), - "--request-timeout", - str(max(1.0, args.request_timeout)), - "--sleep-seconds", - str(max(0.0, args.sleep_seconds)), - ] - run_cmd(analyze_cmd, "步骤2/6 分批AI分析") - - keep_cmd = [ - args.python, - "source/extract_keep_follow_doc.py", - "--input-report", - args.analysis_report, - "--output-report", - args.keep_report, - ] - run_cmd(keep_cmd, "步骤3/6 生成保留关注报告") - - uid_cmd = [ - args.python, - "source/extract_unfollow_list.py", - "--input-report", - args.analysis_report, - "--output-csv", - args.unfollow_uids, - "--format", - "mid-only", - "--split-size", - str(max(0, args.split_size)), - ] - run_cmd(uid_cmd, "步骤4/6 生成取关UID列表") - - if not args.skip_sort: - sort_cmd = [ - args.python, - "source/sort_up_main.py", - "--input", - args.analysis_report, - "--output", - args.analysis_report, - ] - run_cmd(sort_cmd, "步骤5/6 按首字母排序") - - if not args.skip_group: - group_cmd = [ - args.python, - "source/extract_group_info.py", - "--input", - args.analysis_report, - "--output", - args.analysis_report, - ] - run_cmd(group_cmd, "步骤6/6 提取分组信息") - - print("\n流水线完成。") - print(f"- 标题报告: {args.titles_report}") - print(f"- 分析报告: {args.analysis_report}") - print(f"- 保留报告: {args.keep_report}") - print(f"- 取关UID: {args.unfollow_uids}") - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/source/sort_up_main.py b/source/sort_up_main.py deleted file mode 100644 index 0fb4800..0000000 --- a/source/sort_up_main.py +++ /dev/null @@ -1,93 +0,0 @@ -import argparse -import re - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="对UP主按首字母排序") - parser.add_argument( - "--input", - default="source/output/reports/up_analysis_full_auto.md", - help="输入报告路径", - ) - parser.add_argument( - "--output", - help="输出报告路径(默认覆盖输入)", - ) - return parser.parse_args() - -def main(): - args = parse_args() - input_file = args.input - output_file = args.output or input_file - - with open(input_file, 'r', encoding='utf-8') as f: - content = f.read() - - lines = content.split('\n') - - header_lines = [] - section_starts = [] - for i, line in enumerate(lines): - if line.startswith('## '): - section_starts.append(i) - - if len(section_starts) < 2: - print('No sections found') - return 1 - - header = '\n'.join(lines[:section_starts[0]]) - sections_data = [] - - for idx in range(len(section_starts)): - start = section_starts[idx] - if idx + 1 < len(section_starts): - end = section_starts[idx + 1] - else: - end = len(lines) - - section_lines = lines[start:end] - section_text = '\n'.join(section_lines) - sections_data.append(section_text) - - sections_data = sections_data[1:] - - parsed = [] - for sec in sections_data: - match = re.match(r'^## (\d+)\. (.+) \(mid: (\d+)\)', sec) - if match: - num = int(match.group(1)) - name = match.group(2) - mid = match.group(3) - parsed.append({ - 'num': num, - 'name': name, - 'mid': mid, - 'content': sec - }) - - def sort_key(item): - name = item['name'] - first_char = name[0].lower() if name else '' - if first_char.isdigit(): - return '0' + first_char - elif first_char.isalpha(): - return '1' + first_char - else: - return '2' + first_char - - parsed.sort(key=sort_key) - - new_content = header + '\n' - for i, sec in enumerate(parsed): - new_content += sec['content'] + '\n' - - with open(output_file, 'w', encoding='utf-8') as f: - f.write(new_content) - - print(f'Sorted {len(parsed)} sections') - print('First 10:') - for s in parsed[:10]: - print(f' {s["name"]}') - return 0 - -if __name__ == "__main__": - raise SystemExit(main()) \ No newline at end of file diff --git a/source/unfollow_mids_list.txt b/source/unfollow_mids_list.txt deleted file mode 100644 index 34abdcb..0000000 --- a/source/unfollow_mids_list.txt +++ /dev/null @@ -1 +0,0 @@ -439478093,1044673687,1481344732,1858861103,444728505,23947287,35807625,111714204,1587138171,440798355,33291981,11914415,436700803,3493282273299102,612593877,2125857107,2000819931,2053592854,507448807,505935166,14524124,385200931,669397302,1769820463,1562896062,3493285194632125,3493264275540254,479424216,604710494,1016523517,1428318343,700817047,1634586116,543931674,1590538073,1574721168,432752294,15247078,3494376355400290,1795221360,4848323,495224316,3493258518858434,379247856,32360194,381653678,274928598,475656605,365212208,3546378525477862,35339643,1747335,1263990139,3493263038220393,251642119,387412319,1212367465,589747109,1025542770,23770618,3494350482836026,54091976,599449178,1715594148,3493127266503448,1767282898,487505057,630874464,1264711195,3537118481615036,319358609,518742534,385172962,4401694,474803476,629408291,525382468,3546595513600180,295993972,287591832,476819048,21435789,1725223092,2114928296,174471602,1480366563,17095888,295100453,1305776725,25694274,14797570,166828,385126080,3546610074126569,3461582166166488,3537120815745590,489302782,73674032,1500074803,68134500,1047158092,3546571071293861,1246792669,124806013,26055664,441631812,243680430,601300995,108526737,2100151539,3546603229023143,1749224369,3493133887211865,56300844,255139870,1029508158,23244398,3493291869866324,3494354444355822,3546593938639500,1098004826,94577838,21849780,35105301,423319981,535023713,224560702,524899324,3546637651675315,3494361759221832,1640934198,1710911403,14342271,2031277323,603430640,3546568640694467,1741962246,1304346514,283389925,3461575868418125,3546622413768823,3494364269513335,185549749,502539494,73528331,510767506,3461579156752681,238171381,3546627212052911,448165099,1975692083,542824499,16243913,3494354016537425,316627722,1944667205,1433031509,3546387566299549,496787581,3546643550963789,382423121,600428973,430426421,325848853,735958,35162124,668794433,3546390949005555,478548163,3546672034482563,250584301,485234598,1555665460,6776617,30947486,3493262402783627,108709998,437840703,28378491,67079745,21612876,1606682745,629101318,452161580,3493089637305282,374377163,213845897,323713206,272107494,622986240,1773278179,3546656899336980,67141499,318331,285027361,114366178,203983793,1283676771,1965933018,470624011,3546583482239276,3493281239402498,1475977561,2016676980,1209319826,1335124945,416206486,129860965,1780480185,1809567655,245645656,1937416537,1060544882,1335713025,3546617688886097,3546752326044595,3546613148551357,652060948,2116071253,97407861,3546908731639909,3546693165386233,278761367,323588182,486989780,3494353494345852,96609715,264869770,478849208,1679822121,19414347,3493127314737312,702915816,482867012,3546969421122388,3546590214097572,501642082,458165375,3546662484052067,481153145,1159873315,1037607360,3546857594685834,1508100119,111900,1732848825,3546606469123022,106685726,490494088,1511660367 \ No newline at end of file