diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..12717ad --- /dev/null +++ b/.gitignore @@ -0,0 +1,80 @@ +# 1. 忽略操作系统自动生成的文件 +.DS_Store +Thumbs.db +*.lnk + +# 2. 忽略编译/构建产物 +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# 3. 忽略IDE配置 +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# 4. 忽略日志文件 +*.log +*.tmp +*.temp +*.md + + +# 5. 忽略敏感数据 +*.env +*.key +*.pem +*.cert +config.yaml +secrets/ + +# 6. 忽略大型媒体文件 +*.mp4 +*.mov +*.avi +*.wav +*.mp3 +*.zip +*.tar +*.gz +*.7z +*.rar + +# 7. 忽略数据分析/机器学习特有 +*.model +*.h5 +*.pkl +*.joblib +.ipynb_checkpoints/ + +# 8. 忽略你项目中的自动生成目录 +# 根据你的目录结构,忽略source/output/和source/reports/下的所有文件 +# 但保留目录结构本身(可以添加空的.gitkeep文件来保持空目录) +source/output/**/* +!source/output/.gitkeep +source/reports/**/* +!source/reports/.gitkeep +source/.note +source/.test_output +source/.all_i_need +source/.all_i_need/ diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..60f7544 --- /dev/null +++ b/readme.md @@ -0,0 +1,116 @@ +# B站关注清理工具 - Scripts 版 + +> 一键命令运行全流程:`python source/scripts/run_pipeline.py` + +python source/scripts/run_pipeline.py --input-json source/resources/export_uids_test5.json + +本工具包含7个步骤的完整流水线: + +1. 抓取视频标题 +2. 分批AI分析 +3. 生成保留关注报告 +4. 生成取关UID列表 +5. 按首字母排序 +6. 提取分组信息 +7. 
删除最近10条标题 + +## 快速开始 + +```powershell +# 完整流程(推荐) +python source/scripts/run_pipeline.py + +# 速度优先 +python source/scripts/run_pipeline.py --workers 8 --batch-size 30 --sleep-seconds 0 + +# 试跑30个UP +python source/scripts/run_pipeline.py --max-ups 30 + +# 跳过抓取,使用已有标题报告 +python source/scripts/run_pipeline.py --skip-fetch + +# 跳过分析,仅生成产物 +python source/scripts/run_pipeline.py --skip-analyze + +# 跳过排序/分组/删除 +python source/scripts/run_pipeline.py --skip-sort --skip-group --skip-remove +``` + +## 输出文件 + +| 文件 | 说明 | +|------|------| +| `source/output/reports/1_up_titles_report.md` | 标题抓取报告 | +| `source/output/reports/2_up_analysis_full_auto.md` | AI分析报告(完整) | +| `source/output/reports/3_up_keep_follow_only.md` | 保留关注报告 | +| `source/output/uids/4_unfollow_mids_list.txt` | 取关UID列表 | +| `source/output/reports/5_sorted_up_analysis.md` | 按首字母排序报告 | +| `source/output/reports/6_group_info.md` | 提取分组信息报告 | +| `source/output/reports/7_no_titles.md` | 最终报告(删除最近10条) | + +## 常用参数 + +| 参数 | 默认值 | 说明 | +|------|--------|------| +| `--workers` | 6 | 并发请求数 | +| `--batch-size` | 20 | 每批分析条数 | +| `--max-ups` | 0(全部) | 限制处理UP数量 | +| `--split-size` | 100 | UID拆分大小 | +| `--sleep-seconds` | 0 | 任务间隔秒数 | + +### 跳过参数 + +| 参数 | 说明 | +|------|------| +| `--skip-fetch` | 跳过抓取阶段 | +| `--skip-analyze` | 跳过分析阶段 | +| `--skip-sort` | 跳过排序阶段 | +| `--skip-group` | 跳过提取分组阶段 | +| `--skip-remove` | 跳过删除最近10条阶段 | + +## 分步执行 + +### 步骤1:抓取标题 +```powershell +python source/scripts/analyze_up_content.py --skip-ai +``` + +### 步骤2:分批AI分析 +```powershell +python source/scripts/batch_ai_summary_from_report.py --run-all-batches +``` + +### 步骤3:生成保留关注报告 +```powershell +python source/scripts/extract_keep_follow_doc.py +``` + +### 步骤4:生成取关UID +```powershell +python source/scripts/extract_unfollow_list.py --format mid-only --split-size 100 +``` + +### 步骤5:按首字母排序 +```powershell +python source/scripts/sort_up_main.py +``` + +### 步骤6:提取分组信息 +```powershell +python source/scripts/extract_group_info.py +``` + +### 步骤7:删除最近10条标题 
+```powershell +python source/scripts/remove_10content.py +``` + +## 先配置API + +编辑 [source/scripts/analyze_up_content.py](source/scripts/analyze_up_content.py) 顶部配置: + +```python +VOLCENGINE_API_KEY = "你的火山引擎API Key" +VOLCENGINE_MODEL = "deepseek-v3-1-terminus" +VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3" +``` \ No newline at end of file diff --git a/source/.gitignore b/source/.gitignore new file mode 100644 index 0000000..12717ad --- /dev/null +++ b/source/.gitignore @@ -0,0 +1,80 @@ +# 1. 忽略操作系统自动生成的文件 +.DS_Store +Thumbs.db +*.lnk + +# 2. 忽略编译/构建产物 +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# 3. 忽略IDE配置 +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# 4. 忽略日志文件 +*.log +*.tmp +*.temp +*.md + + +# 5. 忽略敏感数据 +*.env +*.key +*.pem +*.cert +config.yaml +secrets/ + +# 6. 忽略大型媒体文件 +*.mp4 +*.mov +*.avi +*.wav +*.mp3 +*.zip +*.tar +*.gz +*.7z +*.rar + +# 7. 忽略数据分析/机器学习特有 +*.model +*.h5 +*.pkl +*.joblib +.ipynb_checkpoints/ + +# 8. 忽略你项目中的自动生成目录 +# 根据你的目录结构,忽略source/output/和source/reports/下的所有文件 +# 但保留目录结构本身(可以添加空的.gitkeep文件来保持空目录) +source/output/**/* +!source/output/.gitkeep +source/reports/**/* +!source/reports/.gitkeep +source/.note +source/.test_output +source/.all_i_need +source/.all_i_need/ diff --git a/source/scripts/analyze_up_content.py b/source/scripts/analyze_up_content.py new file mode 100644 index 0000000..fb7c973 --- /dev/null +++ b/source/scripts/analyze_up_content.py @@ -0,0 +1,690 @@ +#!/usr/bin/env python3 +"""Fetch recent Bilibili video titles for UIDs and analyze with Volcengine API. 
import os

# Bilibili endpoints used by this script: the legacy video-list API, its
# WBI-signed variant, and the nav API from which the WBI key material is read.
BILIBILI_API = "https://api.bilibili.com/x/space/arc/search"
BILIBILI_WBI_API = "https://api.bilibili.com/x/space/wbi/arc/search"
BILIBILI_NAV_API = "https://api.bilibili.com/x/web-interface/nav"

# Optional browser cookie string; when non-empty it is sent with every
# Bilibili request, which helps when requests keep being rejected with 412.
# SECURITY: a logged-in cookie (SESSDATA, bili_jct, ...) is an account
# credential and must not live in version control, so it is read from the
# BILIBILI_COOKIE environment variable. Any cookie that was ever committed
# here has to be treated as leaked: log that session out to invalidate it.
BILIBILI_COOKIE = os.environ.get("BILIBILI_COOKIE", "")
# Cookie given on the command line (--bili-cookie); main() assigns it and the
# HTTP helpers prefer it over BILIBILI_COOKIE.
RUNTIME_BILIBILI_COOKIE = ""
DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/124.0.0.0 Safari/537.36"
)
# Index permutation applied to img_key + sub_key by get_mixin_key(); the first
# 32 characters of the permuted string form the WBI mixin key.
MIXIN_KEY_ENC_TAB = [
    46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35,
    27, 43, 5, 49, 33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13,
    37, 48, 7, 16, 24, 55, 40, 61, 26, 17, 0, 1, 60, 51, 30, 4,
    22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11, 36, 20, 34, 44, 52,
]

# Volcengine Ark configuration.
# SECURITY: the API key is read from the VOLCENGINE_API_KEY environment
# variable instead of being hard-coded; a key that was ever committed must be
# revoked and re-issued. An empty value makes call_volcengine_chat() raise.
# NOTE(review): batch_ai_summary_from_report.py scrapes quoted assignments out
# of this file by regex; it will no longer find the key here and has to get it
# from its own configuration - confirm that fallback.
VOLCENGINE_API_KEY = os.environ.get("VOLCENGINE_API_KEY", "")
VOLCENGINE_MODEL = "deepseek-v3-1-terminus"
VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"
"--fetch-mode", + choices=["auto", "api", "html"], + default="auto", + help="抓取模式: auto(先API后HTML)/api/html,默认: auto", + ) + parser.add_argument( + "--analyze-from-report", + default="", + help="从已有报告读取标题并仅执行AI分析,例如: source/up_analysis_report.md", + ) + parser.add_argument( + "--batch-size", + type=int, + default=30, + help="分批分析时每批数量,默认: 30", + ) + parser.add_argument( + "--batch-index", + type=int, + default=1, + help="分批分析批次序号(从1开始),默认: 1", + ) + return parser.parse_args() + + +def parse_report_items(report_path: Path) -> list[dict[str, Any]]: + lines = report_path.read_text(encoding="utf-8").splitlines() + items: list[dict[str, Any]] = [] + current: dict[str, Any] | None = None + section = "" + + for line in lines: + m = re.match(r"^##\s+\d+\.\s+(.*?)\s+\(mid:\s*(\d+)\)", line) + if m: + if current is not None: + items.append(current) + current = { + "mid": int(m.group(2)), + "name": m.group(1).strip(), + "tag": [], + "url": f"https://space.bilibili.com/{int(m.group(2))}/video", + "titles": [], + "analysis": "", + "error": "", + } + section = "" + continue + + if current is None: + continue + + if line.startswith("- 主页: "): + current["url"] = line.replace("- 主页: ", "", 1).strip() + continue + if line.startswith("- 标签: "): + raw_tag = line.replace("- 标签: ", "", 1).strip() + current["tag"] = [] if raw_tag in ("", "无") else [x.strip() for x in raw_tag.split(",") if x.strip()] + continue + if line == "### 最近10条标题": + section = "titles" + continue + if line == "### AI分析": + section = "analysis" + continue + if line == "### 异常": + section = "error" + continue + if line.startswith("### "): + section = "" + continue + + if section == "titles" and line.startswith("- "): + t = line[2:].strip() + if t and t != "(未抓取到标题)": + current["titles"].append(t) + elif section == "analysis": + if line.strip(): + if current["analysis"]: + current["analysis"] += "\n" + line.strip() + else: + current["analysis"] = line.strip() + elif section == "error" and line.startswith("- "): + 
def run_batch_analysis_from_report(args: argparse.Namespace, output_path: Path) -> int:
    """Run AI analysis for one batch of entries taken from an existing report.

    The report named by ``args.analyze_from_report`` is parsed, the entries
    that have titles but no real analysis yet are selected, and the slice
    chosen by ``args.batch_index`` / ``args.batch_size`` is analysed. The
    complete report (all entries, updated in place) is then written to
    *output_path*.

    Args:
        args: Parsed CLI arguments; reads ``analyze_from_report``,
            ``batch_size``, ``batch_index`` and ``sleep_seconds``.
        output_path: Destination of the regenerated Markdown report.

    Returns:
        0 on success (including "nothing to do"), 1 when the input report is
        missing or contains no parsable entries.
    """
    report_path = Path(args.analyze_from_report)
    if not report_path.exists():
        print(f"报告文件不存在: {report_path}", file=sys.stderr)
        return 1

    items = parse_report_items(report_path)
    if not items:
        print("报告中未解析到可分析条目", file=sys.stderr)
        return 1

    def write_report() -> None:
        # Every path that writes the report must create the directory first;
        # previously only the final path did, so the early exits crashed when
        # the output directory did not exist yet.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(build_report(items), encoding="utf-8")

    pending = [
        it for it in items
        if it.get("titles") and (not it.get("analysis") or it.get("analysis") == "测试模式已跳过AI分析")
    ]
    if not pending:
        print("报告中没有待分析条目(可能已全部分析完成)")
        write_report()
        return 0

    batch_size = max(args.batch_size, 1)
    batch_index = max(args.batch_index, 1)
    start = (batch_index - 1) * batch_size
    batch = pending[start:start + batch_size]
    if not batch:
        print(f"批次为空: batch-index={batch_index}, batch-size={batch_size}, 待分析总数={len(pending)}")
        write_report()
        return 0

    print(
        f"开始分批分析: 第{batch_index}批, 每批{batch_size}条, "
        f"本批{len(batch)}条, 待分析总数{len(pending)}"
    )

    # The dicts in `batch` are the very same objects stored in `items`, so the
    # result is written straight onto the entry. Routing it through a
    # "mid::name" index would send it to the wrong entry when the report lists
    # the same UP twice.
    for idx, it in enumerate(batch, start=1):
        print(f"[batch {idx}/{len(batch)}] AI分析: {it['name']} ({it['mid']})")
        try:
            analysis = analyze_titles(it["name"], it["url"], it["titles"])
        except Exception as exc:  # noqa: BLE001 - one failed UP must not abort the batch
            it["error"] = str(exc)
        else:
            it["analysis"] = analysis
            it["error"] = ""
        time.sleep(max(args.sleep_seconds, 0.0))

    write_report()
    print(f"分批分析报告已生成: {output_path}")
    return 0
def parse_titles_from_data(data: dict[str, Any]) -> list[str]:
    """Extract the video titles from a decoded video-list API response.

    The titles are expected at ``data["data"]["list"]["vlist"][i]["title"]``.
    Any level that is missing, ``null`` or of an unexpected type yields an
    empty result instead of an ``AttributeError``, so the caller can report
    "no titles" rather than an opaque attribute error.

    Args:
        data: The decoded JSON object returned by the API.

    Returns:
        Non-blank titles, stripped and passed through ``clean_html``, in
        response order; ``[]`` when none can be found.
    """
    payload = data.get("data") if isinstance(data, dict) else None
    listing = payload.get("list") if isinstance(payload, dict) else None
    vlist = listing.get("vlist") if isinstance(listing, dict) else None
    if not isinstance(vlist, list):
        return []

    titles: list[str] = []
    for item in vlist:
        if not isinstance(item, dict):
            continue
        title = item.get("title", "")
        if isinstance(title, str) and title.strip():
            titles.append(clean_html(title.strip()))
    return titles
def fetch_titles(
    mid: int,
    titles_per_up: int,
    retry_times: int = 3,
    debug: bool = False,
    fetch_mode: str = "auto",
) -> list[str]:
    """Fetch the most recent video titles of one UP.

    In ``"auto"`` and ``"api"`` mode the JSON API is tried up to
    ``max(retry_times, 1)`` times, using the WBI-signed endpoint when a mixin
    key could be obtained and the legacy endpoint otherwise. In ``"auto"`` and
    ``"html"`` mode the space page is scraped afterwards as a fallback.

    Args:
        mid: UID of the UP.
        titles_per_up: Page size requested from the API / maximum number of
            titles taken from the HTML page.
        retry_times: Number of API attempts (values below 1 count as 1).
        debug: Print diagnostic messages.
        fetch_mode: ``"auto"``, ``"api"`` or ``"html"``.

    Returns:
        The titles from the first source that yields any.

    Raises:
        RuntimeError: When no source yields titles; the message joins the
            last three collected errors and, for 412 / -799 failures, appends
            a hint about supplying a cookie.
    """
    base_params = {
        "mid": str(mid),
        "pn": "1",
        "ps": str(titles_per_up),
        "order": "pubdate",
        "index": "1",
        "jsonp": "json",
    }

    errors: list[str] = []
    if fetch_mode in ("auto", "api"):
        # Prefer the WBI-signed endpoint; fall back to the unsigned one when
        # the key material cannot be fetched.
        mixin_key = ""
        try:
            mixin_key = get_wbi_mixin_key()
        except Exception as exc:  # noqa: BLE001
            if debug:
                print(f"[debug] 获取wbi密钥失败: {exc}")

        total_attempts = max(retry_times, 1)
        for attempt in range(1, total_attempts + 1):
            try:
                if mixin_key:
                    signed = build_wbi_params(base_params, mixin_key)
                    url = f"{BILIBILI_WBI_API}?{parse.urlencode(signed)}"
                else:
                    url = f"{BILIBILI_API}?{parse.urlencode(base_params)}"
                data = http_get_json(url, referer=f"https://space.bilibili.com/{mid}/video")
                code = data.get("code", -1)
                if code == 0:
                    titles = parse_titles_from_data(data)
                    if titles:
                        return titles
                    errors.append("接口返回成功但标题为空")
                else:
                    errors.append(f"code={code}, message={data.get('message', 'unknown')} ")
            except error.HTTPError as exc:
                errors.append(f"HTTP {exc.code} {exc.reason}")
            except Exception as exc:  # noqa: BLE001
                errors.append(str(exc))

            if attempt >= total_attempts:
                # Nothing left to retry: backing off here would only delay the
                # HTML fallback (or the final error) by several seconds per UP.
                if debug:
                    print(f"[debug] mid={mid} API第{attempt}次失败: {errors[-1]}")
                break

            # Exponential back-off with jitter, capped at 12 seconds.
            sleep_for = min(12.0, (1.8 ** attempt) + random.uniform(0.2, 1.0))
            if debug:
                print(f"[debug] mid={mid} API第{attempt}次失败: {errors[-1]},{sleep_for:.1f}s后重试")
            time.sleep(sleep_for)

    if fetch_mode in ("auto", "html"):
        try:
            html_titles = fetch_titles_from_space_html(mid, titles_per_up, debug=debug)
            if html_titles:
                return html_titles
            errors.append("HTML模式未提取到标题")
        except Exception as exc:  # noqa: BLE001
            errors.append(f"HTML模式失败: {exc}")

    joined = "; ".join(errors[-3:])
    if ("412" in joined) or ("-799" in joined):
        hint = "提示: 请在脚本里填写BILIBILI_COOKIE,或运行时加 --bili-cookie \"SESSDATA=...; buvid3=...\""
        raise RuntimeError(f"{joined}; {hint}")
    raise RuntimeError(joined)
def build_report(results: list[dict[str, Any]]) -> str:
    """Render the collected per-UP results as the Markdown report text.

    The report starts with a title, the generation time and the entry count,
    followed by one numbered section per result holding the home page, the
    tags, the title list and - only when non-empty - the AI analysis and the
    error message. The section headings are the ones parse_report_items()
    looks for when the report is read back.

    Args:
        results: One dict per UP; ``name``, ``mid`` and ``url`` are required,
            ``tag``, ``titles``, ``analysis`` and ``error`` are optional.

    Returns:
        The report text, ending with exactly one newline.
    """
    generated_at = time.strftime("%Y-%m-%d %H:%M:%S")
    out: list[str] = [
        "# UP主内容分析报告",
        "",
        f"- 生成时间: {generated_at}",
        f"- 分析数量: {len(results)}",
        "",
    ]

    for number, entry in enumerate(results, start=1):
        out.extend(
            (
                f"## {number}. {entry['name']} (mid: {entry['mid']})",
                "",
                f"- 主页: {entry['url']}",
            )
        )
        tag_list = entry.get("tag", [])
        tag_text = ", ".join(tag_list) if tag_list else "无"
        out.extend((f"- 标签: {tag_text}", "", "### 最近10条标题", ""))

        title_list = entry.get("titles", [])
        if title_list:
            out.extend(f"- {title}" for title in title_list)
        else:
            out.append("- (未抓取到标题)")
        out.append("")

        # Optional sections are emitted only when they carry content.
        analysis_text = entry.get("analysis", "")
        if analysis_text:
            out.extend(("### AI分析", "", analysis_text, ""))

        failure = entry.get("error", "")
        if failure:
            out.extend(("### 异常", "", f"- {failure}", ""))

    return "\n".join(out).rstrip() + "\n"
args.max_ups and args.max_ups > 0: + items = items[: args.max_ups] + + if not items: + print("没有可处理的 UP 数据", file=sys.stderr) + return 1 + + print(f"开始处理 {len(items)} 个 UP...") + if args.skip_ai: + print("已启用 --skip-ai,仅测试抓取标题") + if args.debug: + print(f"[debug] 当前抓取模式: {args.fetch_mode}") + + results: list[dict[str, Any]] = [] + for idx, item in enumerate(items, start=1): + up_url = f"https://space.bilibili.com/{item.mid}/video" + row: dict[str, Any] = { + "mid": item.mid, + "name": item.name or f"mid_{item.mid}", + "tag": item.tag, + "url": up_url, + "titles": [], + "analysis": "", + "error": "", + } + + print(f"[{idx}/{len(items)}] 抓取: {row['name']} ({item.mid})") + try: + titles = fetch_titles( + item.mid, + args.titles_per_up, + retry_times=args.retry_times, + debug=args.debug, + fetch_mode=args.fetch_mode, + ) + row["titles"] = titles + if not titles: + row["error"] = "未抓取到标题,可能是接口限制或UP无公开视频" + elif args.skip_ai: + row["analysis"] = "测试模式已跳过AI分析" + else: + row["analysis"] = analyze_titles(row["name"], up_url, titles) + except error.HTTPError as exc: + row["error"] = f"HTTP错误: {exc.code} {exc.reason}" + except error.URLError as exc: + row["error"] = f"网络错误: {exc.reason}" + except Exception as exc: # noqa: BLE001 + row["error"] = str(exc) + + if args.debug and row["titles"]: + sample = row["titles"][: min(3, len(row["titles"]))] + print(f"[debug] mid={item.mid} 成功抓取 {len(row['titles'])} 条,样例: {sample}") + + results.append(row) + time.sleep(max(args.sleep_seconds, 0)) + + report = build_report(results) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(report, encoding="utf-8") + print(f"报告已生成: {output_path}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/source/scripts/batch_ai_summary_from_report.py b/source/scripts/batch_ai_summary_from_report.py new file mode 100644 index 0000000..9308608 --- /dev/null +++ b/source/scripts/batch_ai_summary_from_report.py @@ -0,0 +1,598 @@ +#!/usr/bin/env 
import os

# Volcengine Ark settings.
# SECURITY: the API key is a credential and must not be committed; it is read
# from the VOLCENGINE_API_KEY environment variable. A key that was ever
# committed here has to be revoked and re-issued.
VOLCENGINE_API_KEY = os.environ.get("VOLCENGINE_API_KEY", "")
VOLCENGINE_MODEL = "deepseek-v3-1-terminus"
VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"

# Analysis texts that stand for "no analysis".
# NOTE(review): the code reading this set is outside the visible chunk;
# presumably entries whose analysis is one of these are still pending.
SKIP_MARKERS = {
    "",
    "测试模式已跳过AI分析",
    "(待分析)",
}

# Preset groups and their keyword rules (extend as needed). The keys are the
# only group names accepted from the model; the keywords feed the heuristic
# hint that is included in the prompt.
PRESET_GROUPS: dict[str, list[str]] = {
    "AAA_核心每日必读":[
        "编程", "算法", "工程", "干货", "新闻", "趋势",
    ],
    "AA_编程信息干货必留": [
        "编程", "算法", "工程", "教程", "实战", "课程", "新技术", "开源", "工具", "效率", "技术", "架构",
    ],
    "A_硬核知识保留": [
        "科普", "数学", "物理", "编程", "算法", "工程", "历史", "新闻", "深度",
    ],
    "B_技能学习保留": [
        "英语", "四六级", "考研", "面试", "教程", "实战", "学习", "课程", "写作",
    ],
    "C_资讯快餐观察": [
        "热点", "速览", "信息差", "快报", "盘点", "吐槽", "观点", "趋势",
    ],
    "D_娱乐消遣可取关": [
        "搞笑", "整活", "抽象", "乐子", "娱乐", "段子", "鬼畜", "日常", "情侣",
    ],
    "E_营销带货谨慎": [
        "好物", "测评", "种草", "直播", "带货", "优惠", "开箱", "广告", "激活",
    ],
}
def load_api_config_from_script(path: Path) -> dict[str, str]:
    """Read the Volcengine settings out of another script's source text.

    The file is scanned, without being imported or executed, for assignments
    of ``VOLCENGINE_API_KEY``, ``VOLCENGINE_MODEL`` and ``VOLCENGINE_BASE_URL``
    that start at the beginning of a line and whose value is a plain quoted
    string. Both double- and single-quoted values are recognised (previously a
    single-quoted value was silently ignored). Values computed at runtime,
    e.g. read from the environment, cannot be recovered this way and are
    simply absent from the result.

    Args:
        path: Path of the script to scan.

    Returns:
        A dict holding the stripped value of every setting that was found;
        empty when the file does not exist.
    """
    if not path.exists():
        return {}
    text = path.read_text(encoding="utf-8", errors="replace")
    result: dict[str, str] = {}
    for key in ("VOLCENGINE_API_KEY", "VOLCENGINE_MODEL", "VOLCENGINE_BASE_URL"):
        # Group 1 captures a double-quoted value, group 2 a single-quoted one.
        m = re.search(
            rf"^{key}\s*=\s*(?:\"([^\"]*)\"|'([^']*)')",
            text,
            flags=re.MULTILINE,
        )
        if m:
            value = m.group(1) if m.group(1) is not None else m.group(2)
            result[key] = value.strip()
    return result
def call_volcengine_chat(
    system_prompt: str,
    user_prompt: str,
    cfg: dict[str, str],
    timeout: float,
) -> str:
    """Send one chat-completion request and return the reply text.

    Args:
        system_prompt: Content of the system message.
        user_prompt: Content of the user message.
        cfg: Settings dict providing ``VOLCENGINE_API_KEY``,
            ``VOLCENGINE_MODEL`` and ``VOLCENGINE_BASE_URL``.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The stripped ``choices[0].message.content`` of the response.

    Raises:
        RuntimeError: When a setting is missing or still a placeholder, or
            when the response is not JSON or carries no non-empty text reply
            (including an empty ``choices`` list, which used to surface as a
            bare ``IndexError``).
        urllib.error.URLError: Propagated from ``urlopen`` on transport or
            HTTP errors.
    """
    api_key = cfg.get("VOLCENGINE_API_KEY", "").strip()
    model = cfg.get("VOLCENGINE_MODEL", "").strip()
    base_url = cfg.get("VOLCENGINE_BASE_URL", "").strip()

    if (not api_key) or ("在这里填" in api_key):
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_API_KEY")
    if (not model) or ("在这里填" in model):
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_MODEL")
    if not base_url:
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_BASE_URL")

    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.4,
    }

    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = request.Request(
        f"{base_url.rstrip('/')}/chat/completions",
        data=body,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )

    with request.urlopen(req, timeout=timeout) as resp:
        text = resp.read().decode("utf-8", errors="replace")

    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"AI响应异常: {text[:500]}") from exc

    # Walk choices[0].message.content defensively: every malformed shape ends
    # in the same RuntimeError, which carries the response excerpt needed for
    # debugging.
    choices = data.get("choices") if isinstance(data, dict) else None
    first = choices[0] if isinstance(choices, list) and choices else None
    message = first.get("message") if isinstance(first, dict) else None
    content = message.get("content") if isinstance(message, dict) else None
    if not isinstance(content, str) or not content.strip():
        raise RuntimeError(f"AI响应异常: {text[:500]}")
    return content.strip()
def build_report(items: list[dict[str, Any]], batch_note: str) -> str:
    """Render the Markdown analysis report for every UP entry.

    Args:
        items: Entry dicts. ``name``, ``mid`` and ``url`` are read by index;
            ``tag``, ``titles``, ``analysis``, ``group``, ``action``,
            ``group_reason`` / ``reason`` and ``error`` are optional.
        batch_note: Free text written on the "处理说明" header line.

    Returns:
        The report text, ending with exactly one newline.
    """
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    lines: list[str] = [
        "# UP主内容分析报告(分批AI总结)",
        "",
        f"- 生成时间: {now}",
        f"- 分析数量: {len(items)}",
        f"- 处理说明: {batch_note}",
        "",
    ]

    # Only preset groups and the two known actions are counted; anything
    # else (including entries not analysed yet) is left out of the tally.
    group_stats: dict[str, int] = {k: 0 for k in PRESET_GROUPS}
    action_stats: dict[str, int] = {"保留关注": 0, "可以取关": 0}
    for item in items:
        g = item.get("group", "")
        a = item.get("action", "")
        if g in group_stats:
            group_stats[g] += 1
        if a in action_stats:
            action_stats[a] += 1

    lines.append("## 分组统计")
    lines.append("")
    lines.extend(f"- {g}: {c}" for g, c in group_stats.items())
    lines.append(f"- 保留关注: {action_stats['保留关注']}")
    lines.append(f"- 可以取关: {action_stats['可以取关']}")
    lines.append("")

    for idx, item in enumerate(items, start=1):
        lines.append(f"## {idx}. {item['name']} (mid: {item['mid']})")
        lines.append("")
        lines.append(f"- 主页: {item['url']}")
        tags = item.get("tag", [])
        lines.append(f"- 标签: {', '.join(tags) if tags else '无'}")
        lines.append("")

        lines.append("### 最近10条标题")
        lines.append("")
        titles = item.get("titles", [])
        if titles:
            lines.extend(f"- {t}" for t in titles)
        else:
            lines.append("- (未抓取到标题)")
        lines.append("")

        lines.append("### AI分析")
        lines.append("")
        analysis = item.get("analysis", "")
        lines.append(analysis if analysis else "(待分析)")
        lines.append("")

        lines.append("### 分组建议")
        lines.append("")
        group = item.get("group", "")
        action = item.get("action", "")
        # Freshly analysed entries carry the rationale under "group_reason";
        # entries re-read from an existing report carry it under "reason".
        # Accept both so a carried-over rationale is not replaced by the
        # generic fallback sentence.
        reason = item.get("group_reason") or item.get("reason", "")
        if group and action:
            lines.append(f"- 预设分组: {group}")
            lines.append(f"- 建议动作: {action}")
            lines.append(f"- 判断依据: {reason if reason else '基于标题与更新风格综合判断。'}")
        else:
            lines.append("- (待分组)")
        lines.append("")

        error = item.get("error", "")
        if error:
            lines.append("### 异常")
            lines.append("")
            lines.append(f"- {error}")
            lines.append("")

    return "\n".join(lines).rstrip() + "\n"
config = { + "VOLCENGINE_API_KEY": VOLCENGINE_API_KEY, + "VOLCENGINE_MODEL": VOLCENGINE_MODEL, + "VOLCENGINE_BASE_URL": VOLCENGINE_BASE_URL, + } + if ("在这里填" in config["VOLCENGINE_API_KEY"]) or ("在这里填" in config["VOLCENGINE_MODEL"]): + inherited = load_api_config_from_script(Path(args.config_from)) + if inherited: + config.update(inherited) + + if args.force: + pending = [it for it in items if it.get("titles")] + # else: + # pending = [ + # it for it in items + # if it.get("titles") and it.get("analysis", "").strip() in SKIP_MARKERS + # ] + else: + pending = [ + it for it in items + if it.get("titles") and ( + it.get("analysis", "").strip() in SKIP_MARKERS + or not it.get("group") # 没有分组也要重跑 + ) + ] + + if not pending: + print("没有待分析条目,直接输出当前报告") + output_report.write_text(build_report(items, "无待分析条目"), encoding="utf-8") + return 0 + + index_map = {f"{it['mid']}::{it['name']}": idx for idx, it in enumerate(items)} + success_total = 0 + failed_total = 0 + + batch_size = max(1, args.batch_size) + if args.run_all_batches: + total_batches = math.ceil(len(pending) / batch_size) + batch_indexes = list(range(1, total_batches + 1)) + print(f"自动连续模式: 共{total_batches}批, 待分析总数{len(pending)}") + else: + batch_indexes = [max(1, args.batch_index)] + + workers = max(1, args.workers) + print(f"并发配置: workers={workers}, retries={max(1, args.max_retries)}, timeout={args.request_timeout}s") + + for batch_index in batch_indexes: + start = (batch_index - 1) * batch_size + end = start + batch_size + batch = pending[start:end] + if not batch: + continue + + print( + f"开始分批AI总结: 第{batch_index}批, 每批{batch_size}条, " + f"本批{len(batch)}条, 待分析总数{len(pending)}" + ) + + success = 0 + failed = 0 + future_to_item: dict[Any, dict[str, Any]] = {} + with ThreadPoolExecutor(max_workers=workers) as executor: + for i, it in enumerate(batch, start=1): + print(f"[submit {i}/{len(batch)}] {it['name']} ({it['mid']})") + future = executor.submit( + summarize_one_up_with_retry, + it, + config, + max(1, 
def parse_args(argv=None) -> argparse.Namespace:
    """Parse the command-line options ``--input`` and ``--output``.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="提取UP分组信息")
    parser.add_argument(
        "--input",
        default="./source/19_53_no_titles.md",
        help="输入报告路径",
    )
    parser.add_argument(
        "--output",
        help="输出报告路径(默认覆盖输入)",
    )
    return parser.parse_args(argv)


def main(argv=None) -> int:
    """Reduce a report to each UP's group, action, reason and group error.

    Every ``## N. name (mid: M)`` section of the input is condensed to the
    heading plus its "预设分组" / "建议动作" / "判断依据" values and any
    "AI返回未知group" error. Entries are written sorted by case-folded name,
    then by mid. Sections whose heading does not match that pattern (such as
    a statistics block) are left out of the output.

    Args:
        argv: Argument list forwarded to ``parse_args``; ``None`` reads
            ``sys.argv[1:]``.

    Returns:
        0 on success, 1 when the input contains no UP section.
    """
    args = parse_args(argv)
    input_file = args.input
    output_file = args.output or input_file

    with open(input_file, 'r', encoding='utf-8') as f:
        lines = f.read().split('\n')

    section_starts = [i for i, line in enumerate(lines) if line.startswith('## ')]
    section_ends = section_starts[1:] + [len(lines)]

    # Pick UP sections by their heading instead of assuming the first "## "
    # section is a statistics block: an input without such a block would
    # otherwise lose its first entry.
    heading_re = re.compile(r'^## (\d+)\. (.+) \(mid: (\d+)\)')
    parsed = []
    for start, end in zip(section_starts, section_ends):
        sec = '\n'.join(lines[start:end])
        match = heading_re.match(sec)
        if not match:
            continue

        group_m = re.search(r'- 预设分组: (.+)', sec)
        action_m = re.search(r'- 建议动作: (.+)', sec)
        reason_m = re.search(r'- 判断依据: (.+)', sec)
        error_m = re.search(r'AI返回未知group: (.+)', sec)

        parsed.append({
            'num': int(match.group(1)),
            'name': match.group(2),
            'mid': match.group(3),
            'group': group_m.group(1).strip() if group_m else "",
            'action': action_m.group(1).strip() if action_m else "",
            'reason': reason_m.group(1).strip() if reason_m else "",
            'error': error_m.group(1).strip() if error_m else "",
        })

    if not parsed:
        print('No sections found')
        return 1

    # Everything before the first "## " heading is kept as the header.
    header = '\n'.join(lines[:section_starts[0]])

    parsed.sort(key=lambda x: (x['name'].casefold(), int(x['mid'])))

    lines_out = [header, ""]
    for p in parsed:
        lines_out.append(f"## {p['num']}. {p['name']} (mid: {p['mid']})")
        lines_out.append("")
        if p['group']:
            lines_out.append(f"- 预设分组: {p['group']}")
        if p['action']:
            lines_out.append(f"- 建议动作: {p['action']}")
        if p['reason']:
            lines_out.append(f"- 判断依据: {p['reason']}")
        if p['error']:
            lines_out.append(f"- 异常: {p['error']}")
        lines_out.append("")

    result = '\n'.join(lines_out)
    # Collapse runs of blank lines left by the header/section joins.
    result = re.sub(r'\n{3,}', '\n\n', result)

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(result)

    print(f'Extracted {len(parsed)} sections')
    return 0
re.search(r"-\s*建议动作:\s*(.+)", section) + action = action_m.group(1).strip() if action_m else "" + # 反逻辑:没有"建议动作: 可以取关"就保留 + if action == "可以取关": + continue + + ai_m = re.search(r"###\s*AI分析\s*\n([\s\S]*?)(?=\n###\s|\Z)", section) + ai_text = ai_m.group(1).strip() if ai_m else "" + + group_m = re.search(r"###\s*分组建议\s*\n([\s\S]*?)(?=\n###\s|\Z)", section) + group_text = group_m.group(1).strip() if group_m else "" + + error_m = re.search(r"###\s*异常\s*\n([\s\S]*?)(?=\n###\s|\Z)", section) + error_text = error_m.group(1).strip() if error_m else "" + + items.append((name, mid, ai_text, group_text, action, error_text)) + + # 按昵称首字母A-Z排序(同名时按mid升序) + items.sort(key=lambda x: (x[0].casefold(), int(x[1]))) + + lines = [ + "# 保留关注UP主分析与分组建议", + "", + f"- 生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}", + f"- 来源文件: {src.name}", + f"- 条目数: {len(items)}", + "", + ] + + for idx, (name, mid, ai_text, group_text, action, error_text) in enumerate(items, 1): + lines.append(f"## {idx}. {name} (mid: {mid})") + lines.append("") + + lines.append("### AI分析") + lines.append("") + lines.append(ai_text if ai_text else "(无)") + lines.append("") + + lines.append("### 分组建议") + lines.append("") + lines.append(group_text if group_text else f"- 建议动作: {action if action else '(无)'}") + lines.append("") + + if error_text: + lines.append("### 异常") + lines.append("") + lines.append(error_text) + lines.append("") + + dst.parent.mkdir(parents=True, exist_ok=True) + dst.write_text("\n".join(lines), encoding="utf-8") + print(f"已生成: {dst}") + print(f"保留条目: {len(items)}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) \ No newline at end of file diff --git a/source/scripts/extract_unfollow_list.py b/source/scripts/extract_unfollow_list.py new file mode 100644 index 0000000..62fc57e --- /dev/null +++ b/source/scripts/extract_unfollow_list.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +"""Extract UPs marked as "可以取关" and output their mids to CSV. 
def parse_report(report_path: Path) -> list[dict[str, Any]]:
    """Read a Markdown UP analysis report and list its entries.

    An entry starts at a ``## N. name (mid: M)`` heading and runs up to the
    next such heading (or the end of the file).

    Args:
        report_path: Location of the report file.

    Returns:
        One dict per entry with ``mid`` (int), ``name`` and ``action`` (the
        text after "- 建议动作: ", or ``""`` when the entry has none). An
        empty list when the file does not exist.
    """
    if not report_path.exists():
        return []

    content = report_path.read_text(encoding="utf-8")
    heading_re = re.compile(r"^## \d+\. (.+?)\s+\(mid:\s*(\d+)\)", re.MULTILINE)
    action_re = re.compile(r"- 建议动作: (.+?)(?:\n|$)")

    headings = list(heading_re.finditer(content))
    # Each entry ends where the following heading begins.
    stops = [h.start() for h in headings[1:]] + [len(content)]

    entries = []
    for heading, stop in zip(headings, stops):
        found = action_re.search(content[heading.start():stop])
        entries.append({
            "mid": int(heading.group(2)),
            "name": heading.group(1).strip(),
            "action": found.group(1).strip() if found else "",
        })
    return entries
return 1 + + print(f"读取报告: {input_report}") + items = parse_report(input_report) + + if not items: + print("未能从报告中解析任何UP", file=sys.stderr) + return 1 + + # 筛选可以取关的UP + unfollow_items = [it for it in items if it.get("action") == "可以取关"] + + print(f"总 UP 数: {len(items)}") + print(f"可以取关: {len(unfollow_items)}") + + if not unfollow_items: + print("没有可以取关的UP") + return 0 + + # 输出格式 + if args.format == "csv": + # 标准CSV格式:mid, name + output_csv.parent.mkdir(parents=True, exist_ok=True) + with open(output_csv, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=["mid", "name"]) + writer.writeheader() + for item in unfollow_items: + writer.writerow({"mid": item["mid"], "name": item["name"]}) + + print(f"\n✓ 已输出CSV格式到: {output_csv}") + print(f" 格式: mid,name") + print(f" 行数: {len(unfollow_items)}") + + elif args.format == "mid-only": + # 仅mid,逗号分隔 + mids = [str(it["mid"]) for it in unfollow_items] + + if args.with_names: + # mid:name 格式 + content = ",".join([f"{it['mid']}:{it['name']}" for it in unfollow_items]) + print(f"\n✓ 已输出mid:name列表到: {output_csv}") + print(f" 格式: mid1:name1,mid2:name2,...") + else: + # 仅mid + content = ",".join(mids) + print(f"\n✓ 已输出mid列表到: {output_csv}") + print(f" 格式: mid1,mid2,mid3,...") + + output_csv.parent.mkdir(parents=True, exist_ok=True) + output_csv.write_text(content, encoding="utf-8") + print(f" 数量: {len(mids)}") + + split_size = max(0, int(args.split_size)) + if split_size > 0: + groups = [mids[i:i + split_size] for i in range(0, len(mids), split_size)] + stem = output_csv.stem + suffix = output_csv.suffix or ".txt" + for i, group in enumerate(groups, start=1): + part_path = output_csv.with_name(f"{stem}_{i}{suffix}") + part_path.write_text(",".join(group), encoding="utf-8") + print(f" 已按每组{split_size}个拆分为{len(groups)}个文件") + + elif args.format == "json": + # JSON格式 + import json + + data = [{"mid": it["mid"], "name": it["name"]} for it in unfollow_items] + output_csv.parent.mkdir(parents=True, 
def main() -> int:
    """Filter bullet lines out of the head of every ``## `` section.

    Reads ``--input``, copies it line by line except for the lines dropped
    below, collapses runs of three or more newlines to two, and writes the
    result to ``--output`` (or back over the input when ``--output`` is not
    given).

    Inside each ``## `` section, scanning stops at the next ``## `` heading
    or at the first ``### `` heading that is not "最近10条标题"; up to that
    point every line starting with ``- `` or `` - `` is dropped, as is the
    "最近10条标题" heading itself.

    Returns:
        Always 0.
    """
    args = parse_args()
    input_file = args.input
    output_file = args.output or input_file

    with open(input_file, 'r', encoding='utf-8') as f:
        content = f.read()

    lines = content.split('\n')
    new_lines = []
    i = 0
    while i < len(lines):
        line = lines[i]
        # Every line reached by the outer loop is copied verbatim.
        new_lines.append(line)

        if line.startswith('## '):
            # NOTE(review): any "## " heading triggers this, so bullet lines
            # directly under a non-UP heading are dropped too - confirm that
            # is intended.
            i += 1
            while i < len(lines):
                curr = lines[i]
                if curr.startswith('## '):
                    # Next section: leave it for the outer loop to copy.
                    break
                if curr.startswith('### '):
                    if '最近10条标题' in curr:
                        # Skip the heading and any immediately following
                        # lines that start with ' - '.
                        # NOTE(review): the whitespace inside ' - ' may have
                        # been altered in this view - verify the literal.
                        i += 1
                        while i < len(lines) and lines[i].startswith(' - '):
                            i += 1
                        continue
                    else:
                        # Any other sub-heading ends the filtered region; the
                        # outer loop copies it and everything after it.
                        break
                # Bullet lines at the section head are dropped.
                if curr.startswith('- ') and not curr.startswith(' - '):
                    i += 1
                    continue
                if curr.startswith(' - '):
                    i += 1
                    continue
                # Anything else (e.g. blank lines) is kept.
                new_lines.append(curr)
                i += 1
        else:
            i += 1

    result = '\n'.join(new_lines)
    # Dropping lines leaves stacked blank lines; squeeze them.
    result = re.sub(r'\n{3,}', '\n\n', result)

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(result)

    print(f'Done: {output_file}')
    return 0
"__main__": + raise SystemExit(main()) \ No newline at end of file diff --git a/source/scripts/run_pipeline.py b/source/scripts/run_pipeline.py new file mode 100644 index 0000000..688bff5 --- /dev/null +++ b/source/scripts/run_pipeline.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python3 +"""One-command pipeline: fetch titles -> batch analyze -> outputs. + +Pipeline outputs: +1) source/output/reports/1_up_titles_report.md +2) source/output/reports/2_up_analysis_full_auto.md +3) source/output/reports/3_up_keep_follow_only.md +4) source/output/uids/4_unfollow_mids_list.txt (+ split files) + +Pipeline steps: +1) 抓取视频标题 (analyze_up_content.py) +2) 分批AI分析 (batch_ai_summary_from_report.py) +3) 生成保留关注报告 (extract_keep_follow_doc.py) +4) 生成取关UID列表 (extract_unfollow_list.py) +5) 按首字母排序 (sort_up_main.py) +6) 提取分组信息 (extract_group_info.py) +""" + +from __future__ import annotations + +import argparse +import subprocess +import sys +from pathlib import Path + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="一键运行完整功能链") + parser.add_argument( + "--input-json", + default="source/resources/export_uids.json", + help="UP资源文件路径,默认: source/resources/export_uids.json", + ) + parser.add_argument( + "--titles-report", + default="source/output/reports/1_up_titles_report.md", + help="标题抓取报告输出路径", + ) + parser.add_argument( + "--analysis-report", + default="source/output/reports/2_up_analysis_full_auto.md", + help="分批分析报告输出路径", + ) + parser.add_argument( + "--keep-report", + default="source/output/reports/3_up_keep_follow_only.md", + help="保留关注报告输出路径", + ) + parser.add_argument( + "--unfollow-uids", + default="source/output/uids/4_unfollow_mids_list.txt", + help="取关UID输出路径", + ) + parser.add_argument( + "--group_info", + default="source/output/uids/only_group_info.md", + help="分组信息输出路径", + ) + parser.add_argument("--titles-per-up", type=int, default=10, help="每个UP抓取标题数量") + parser.add_argument("--batch-size", type=int, default=20, help="分批分析每批条数") + 
def run_cmd(cmd: list[str], title: str) -> None:
    """Print a titled banner and the command line, then run the command.

    Args:
        cmd: Program and arguments, passed to ``subprocess.run`` as a list.
        title: Text shown in the ``=== ... ===`` banner.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (``check=True``).
    """
    banner = f"\n=== {title} ==="
    echoed = " ".join(cmd)
    print(banner)
    print("$", echoed)
    subprocess.run(cmd, check=True)
if args.only_tag: + fetch_cmd += ["--only-tag", args.only_tag] + if args.max_ups > 0: + fetch_cmd += ["--max-ups", str(args.max_ups)] + if args.bili_cookie: + fetch_cmd += ["--bili-cookie", args.bili_cookie] + + run_cmd(fetch_cmd, "步骤1/6 抓取视频标题") + + if not args.skip_analyze: + analyze_cmd = [ + args.python, + "source/scripts/batch_ai_summary_from_report.py", + "--input-report", + args.titles_report, + "--output-report", + args.analysis_report, + "--batch-size", + str(max(1, args.batch_size)), + "--run-all-batches", + "--workers", + str(max(1, args.workers)), + "--max-retries", + str(max(1, args.max_retries)), + "--request-timeout", + str(max(1.0, args.request_timeout)), + "--sleep-seconds", + str(max(0.0, args.sleep_seconds)), + ] + run_cmd(analyze_cmd, "步骤2/6 分批AI分析") + + keep_cmd = [ + args.python, + "source/scripts/extract_keep_follow_doc.py", + "--input-report", + args.analysis_report, + "--output-report", + args.keep_report, + ] + run_cmd(keep_cmd, "步骤3/6 生成保留关注报告") + + uid_cmd = [ + args.python, + "source/scripts/extract_unfollow_list.py", + "--input-report", + args.analysis_report, + "--output-csv", + args.unfollow_uids, + "--format", + "mid-only", + "--split-size", + str(max(0, args.split_size)), + ] + run_cmd(uid_cmd, "步骤4/6 生成取关UID列表") + + sorted_report = "source/output/reports/5_sorted_up_analysis.md" + group_report = "source/output/reports/6_group_info.md" + + if not args.skip_sort: + sort_cmd = [ + args.python, + "source/scripts/sort_up_main.py", + "--input", + args.analysis_report, + "--output", + sorted_report, + ] + run_cmd(sort_cmd, "步骤5/6 按首字母排序") + + if not args.skip_group: + input_for_group = sorted_report if not args.skip_sort else args.analysis_report + group_cmd = [ + args.python, + "source/scripts/extract_group_info.py", + "--input", + input_for_group, + "--output", + group_report, + ] + run_cmd(group_cmd, "步骤6/6 提取分组信息") + + print("\n流水线完成。") + print(f"- 1 标题报告: {args.titles_report}") + print(f"- 2 分析报告: {args.analysis_report}") + 
def parse_args(argv=None) -> argparse.Namespace:
    """Parse the command-line options ``--input`` and ``--output``.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="对UP主按首字母排序")
    parser.add_argument(
        "--input",
        default="source/output/reports/2_up_analysis_full_auto.md",
        help="输入报告路径",
    )
    parser.add_argument(
        "--output",
        help="输出报告路径(默认覆盖输入)",
    )
    return parser.parse_args(argv)


def main(argv=None) -> int:
    """Reorder the UP sections of a report by the first character of the name.

    Sections are the ``## `` blocks of the input. Blocks headed
    ``## N. name (mid: M)`` are sorted: names starting with a digit first,
    then names starting with a letter, then everything else. The sort is
    stable, so entries sharing a first character keep their input order.
    Other ``## `` blocks (for example a statistics block) are kept, in their
    original order, between the header and the sorted entries.

    Args:
        argv: Argument list forwarded to ``parse_args``; ``None`` reads
            ``sys.argv[1:]``.

    Returns:
        0 on success, 1 when the input contains no UP section.
    """
    args = parse_args(argv)
    input_file = args.input
    output_file = args.output or input_file

    with open(input_file, 'r', encoding='utf-8') as f:
        lines = f.read().split('\n')

    section_starts = [i for i, line in enumerate(lines) if line.startswith('## ')]
    section_ends = section_starts[1:] + [len(lines)]

    # Classify sections by heading rather than dropping the first one
    # unconditionally: an input without a leading non-UP section would
    # otherwise lose its first entry.
    heading_re = re.compile(r'^## (\d+)\. (.+) \(mid: (\d+)\)')
    parsed = []
    other_sections = []
    for start, end in zip(section_starts, section_ends):
        sec = '\n'.join(lines[start:end])
        match = heading_re.match(sec)
        if match:
            parsed.append({
                'num': int(match.group(1)),
                'name': match.group(2),
                'mid': match.group(3),
                'content': sec,
            })
        else:
            other_sections.append(sec)

    if not parsed:
        print('No sections found')
        return 1

    # Everything before the first "## " heading is kept as the header.
    header = '\n'.join(lines[:section_starts[0]])

    def sort_key(item):
        # Rank prefix: '0' digits, '1' letters, '2' anything else.
        name = item['name']
        first_char = name[0].lower() if name else ''
        if first_char.isdigit():
            return '0' + first_char
        elif first_char.isalpha():
            return '1' + first_char
        else:
            return '2' + first_char

    parsed.sort(key=sort_key)

    # Each part is followed by a newline.
    parts = [header] + other_sections + [sec['content'] for sec in parsed]
    new_content = '\n'.join(parts) + '\n'

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(new_content)

    print(f'Sorted {len(parsed)} sections')
    print('First 10:')
    for s in parsed[:10]:
        print(f'  {s["name"]}')
    return 0