From 62fa9d292b033a50a09367629375b588c788e78b Mon Sep 17 00:00:00 2001
From: digouyou <2074920584@qq.com>
Date: Mon, 27 Apr 2026 00:04:53 +0800
Subject: [PATCH] add .gitignore to root, remove .all_i_need output from
tracking
---
.gitignore | 80 ++
readme.md | 116 +++
source/.gitignore | 80 ++
source/scripts/analyze_up_content.py | 690 ++++++++++++++++++
.../scripts/batch_ai_summary_from_report.py | 598 +++++++++++++++
source/scripts/extract_group_info.py | 101 +++
source/scripts/extract_keep_follow_doc.py | 104 +++
source/scripts/extract_unfollow_list.py | 174 +++++
source/scripts/remove_10content.py | 67 ++
source/scripts/run_pipeline.py | 208 ++++++
source/scripts/sort_up_main.py | 93 +++
11 files changed, 2311 insertions(+)
create mode 100644 .gitignore
create mode 100644 readme.md
create mode 100644 source/.gitignore
create mode 100644 source/scripts/analyze_up_content.py
create mode 100644 source/scripts/batch_ai_summary_from_report.py
create mode 100644 source/scripts/extract_group_info.py
create mode 100644 source/scripts/extract_keep_follow_doc.py
create mode 100644 source/scripts/extract_unfollow_list.py
create mode 100644 source/scripts/remove_10content.py
create mode 100644 source/scripts/run_pipeline.py
create mode 100644 source/scripts/sort_up_main.py
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..12717ad
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,80 @@
+# 1. 忽略操作系统自动生成的文件
+.DS_Store
+Thumbs.db
+*.lnk
+
+# 2. 忽略编译/构建产物
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# 3. 忽略IDE配置
+.vscode/
+.idea/
+*.swp
+*.swo
+*~
+
+# 4. 忽略日志文件
+*.log
+*.tmp
+*.temp
+*.md
+
+
+# 5. 忽略敏感数据
+*.env
+*.key
+*.pem
+*.cert
+config.yaml
+secrets/
+
+# 6. 忽略大型媒体文件
+*.mp4
+*.mov
+*.avi
+*.wav
+*.mp3
+*.zip
+*.tar
+*.gz
+*.7z
+*.rar
+
+# 7. 忽略数据分析/机器学习特有
+*.model
+*.h5
+*.pkl
+*.joblib
+.ipynb_checkpoints/
+
+# 8. 忽略你项目中的自动生成目录
+# 根据你的目录结构,忽略source/output/和source/reports/下的所有文件
+# 但保留目录结构本身(可以添加空的.gitkeep文件来保持空目录)
+source/output/**/*
+!source/output/.gitkeep
+source/reports/**/*
+!source/reports/.gitkeep
+source/.note
+source/.test_output
+source/.all_i_need
+source/.all_i_need/
diff --git a/readme.md b/readme.md
new file mode 100644
index 0000000..60f7544
--- /dev/null
+++ b/readme.md
@@ -0,0 +1,116 @@
+# B站关注清理工具 - Scripts 版
+
+> 一键命令运行全流程:`python source/scripts/run_pipeline.py`
+
+python source/scripts/run_pipeline.py --input-json source/resources/export_uids_test5.json
+
+本工具包含7个步骤的完整流水线:
+
+1. 抓取视频标题
+2. 分批AI分析
+3. 生成保留关注报告
+4. 生成取关UID列表
+5. 按首字母排序
+6. 提取分组信息
+7. 删除最近10条标题
+
+## 快速开始
+
+```powershell
+# 完整流程(推荐)
+python source/scripts/run_pipeline.py
+
+# 速度优先
+python source/scripts/run_pipeline.py --workers 8 --batch-size 30 --sleep-seconds 0
+
+# 试跑30个UP
+python source/scripts/run_pipeline.py --max-ups 30
+
+# 跳过抓取,使用已有标题报告
+python source/scripts/run_pipeline.py --skip-fetch
+
+# 跳过分析,仅生成产物
+python source/scripts/run_pipeline.py --skip-analyze
+
+# 跳过排序/分组/删除
+python source/scripts/run_pipeline.py --skip-sort --skip-group --skip-remove
+```
+
+## 输出文件
+
+| 文件 | 说明 |
+|------|------|
+| `source/output/reports/1_up_titles_report.md` | 标题抓取报告 |
+| `source/output/reports/2_up_analysis_full_auto.md` | AI分析报告(完整) |
+| `source/output/reports/3_up_keep_follow_only.md` | 保留关注报告 |
+| `source/output/uids/4_unfollow_mids_list.txt` | 取关UID列表 |
+| `source/output/reports/5_sorted_up_analysis.md` | 按首字母排序报告 |
+| `source/output/reports/6_group_info.md` | 提取分组信息报告 |
+| `source/output/reports/7_no_titles.md` | 最终报告(删除最近10条) |
+
+## 常用参数
+
+| 参数 | 默认值 | 说明 |
+|------|--------|------|
+| `--workers` | 6 | 并发请求数 |
+| `--batch-size` | 20 | 每批分析条数 |
+| `--max-ups` | 0(全部) | 限制处理UP数量 |
+| `--split-size` | 100 | UID拆分大小 |
+| `--sleep-seconds` | 0 | 任务间隔秒数 |
+
+### 跳过参数
+
+| 参数 | 说明 |
+|------|------|
+| `--skip-fetch` | 跳过抓取阶段 |
+| `--skip-analyze` | 跳过分析阶段 |
+| `--skip-sort` | 跳过排序阶段 |
+| `--skip-group` | 跳过提取分组阶段 |
+| `--skip-remove` | 跳过删除最近10条阶段 |
+
+## 分步执行
+
+### 步骤1:抓取标题
+```powershell
+python source/scripts/analyze_up_content.py --skip-ai
+```
+
+### 步骤2:分批AI分析
+```powershell
+python source/scripts/batch_ai_summary_from_report.py --run-all-batches
+```
+
+### 步骤3:生成保留关注报告
+```powershell
+python source/scripts/extract_keep_follow_doc.py
+```
+
+### 步骤4:生成取关UID
+```powershell
+python source/scripts/extract_unfollow_list.py --format mid-only --split-size 100
+```
+
+### 步骤5:按首字母排序
+```powershell
+python source/scripts/sort_up_main.py
+```
+
+### 步骤6:提取分组信息
+```powershell
+python source/scripts/extract_group_info.py
+```
+
+### 步骤7:删除最近10条标题
+```powershell
+python source/scripts/remove_10content.py
+```
+
+## 先配置API
+
+编辑 [source/scripts/analyze_up_content.py](source/scripts/analyze_up_content.py) 顶部配置:
+
+```python
+VOLCENGINE_API_KEY = "你的火山引擎API Key"
+VOLCENGINE_MODEL = "deepseek-v3-1-terminus"
+VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"
+```
\ No newline at end of file
diff --git a/source/.gitignore b/source/.gitignore
new file mode 100644
index 0000000..12717ad
--- /dev/null
+++ b/source/.gitignore
@@ -0,0 +1,80 @@
+# 1. 忽略操作系统自动生成的文件
+.DS_Store
+Thumbs.db
+*.lnk
+
+# 2. 忽略编译/构建产物
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# 3. 忽略IDE配置
+.vscode/
+.idea/
+*.swp
+*.swo
+*~
+
+# 4. 忽略日志文件
+*.log
+*.tmp
+*.temp
+*.md
+
+
+# 5. 忽略敏感数据
+*.env
+*.key
+*.pem
+*.cert
+config.yaml
+secrets/
+
+# 6. 忽略大型媒体文件
+*.mp4
+*.mov
+*.avi
+*.wav
+*.mp3
+*.zip
+*.tar
+*.gz
+*.7z
+*.rar
+
+# 7. 忽略数据分析/机器学习特有
+*.model
+*.h5
+*.pkl
+*.joblib
+.ipynb_checkpoints/
+
+# 8. 忽略你项目中的自动生成目录
+# 根据你的目录结构,忽略source/output/和source/reports/下的所有文件
+# 但保留目录结构本身(可以添加空的.gitkeep文件来保持空目录)
+source/output/**/*
+!source/output/.gitkeep
+source/reports/**/*
+!source/reports/.gitkeep
+source/.note
+source/.test_output
+source/.all_i_need
+source/.all_i_need/
diff --git a/source/scripts/analyze_up_content.py b/source/scripts/analyze_up_content.py
new file mode 100644
index 0000000..fb7c973
--- /dev/null
+++ b/source/scripts/analyze_up_content.py
@@ -0,0 +1,690 @@
+#!/usr/bin/env python3
+"""Fetch recent Bilibili video titles for UIDs and analyze with Volcengine API.
+
+Input JSON format (list of objects):
+[
+ {"mid": 12345, "name": "UP Name", "tag": ["准备取关"]}
+]
+"""
+
+from __future__ import annotations
+
+import argparse
+import hashlib
+import html
+import json
+import random
+import re
+import sys
+import time
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+from urllib import error, parse, request
+
+
+BILIBILI_API = "https://api.bilibili.com/x/space/arc/search"
+BILIBILI_WBI_API = "https://api.bilibili.com/x/space/wbi/arc/search"
+BILIBILI_NAV_API = "https://api.bilibili.com/x/web-interface/nav"
+# 可选:如果仍频繁触发412,可填浏览器里复制的Cookie字符串。
+BILIBILI_COOKIE = "buvid3=5D02D792-070F-79D0-4243-4F75C6277EC022345infoc; b_nut=1765807422; _uuid=1796ECEE-451E-E1B7-1D9A-5D7F5CCCDA5822634infoc; buvid_fp=993faeece85f3e3119d8331a4e5bf683; buvid4=785EC013-0E2C-BC9F-5CBD-B8B00C76D13024715-025121522-ba1d0oh5R0Q47E2dVDisZg%3D%3D; SESSDATA=875331b4%2C1781359476%2C70459%2Ac1CjAXAQicR89csAHVVl-X8yAIy0-eko5ey69tJAyAXIbHhSU5HaUgth-E2fW1e9ij0MESVll2anVrYXVOYkc3VzZ2RmtFQlZzUnNoR0JOdUNZYldWSXh4Y3NZVlVWc1lOaC04M2JRQ3VKZ0x5b2RMbXl1MWpCSE1XMjd2UjVDTUJoUko1bU96aE9BIIEC; bili_jct=2e6b55fe6837ee753c69cd477c1b1ac6; DedeUserID=440102691; DedeUserID__ckMd5=42ab71f1395d8071; theme-tip-show=SHOWED; rpdid=|(u~RklkYm)u0J'u~Yl)|~YuR; hit-dyn-v2=1; theme-avatar-tip-show=SHOWED; LIVE_BUVID=AUTO5117758855687732; PVID=3; CURRENT_QUALITY=64; theme-switch-show=SHOWED; home_feed_column=4; browser_resolution=1359-871; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3NzcyODE5NjAsImlhdCI6MTc3NzAyMjcwMCwicGx0IjotMX0.euCIXefcvPlg1SwKKQh2HLfYStrTdG8dN-qnKCeUBFU; bili_ticket_expires=1777281900; sid=7beimq93; CURRENT_FNVAL=2000; bp_t_offset_440102691=1195139899255160832; b_lsid=52AAA640_19DC3A11696"
+RUNTIME_BILIBILI_COOKIE = ""
+DEFAULT_USER_AGENT = (
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
+ "AppleWebKit/537.36 (KHTML, like Gecko) "
+ "Chrome/124.0.0.0 Safari/537.36"
+)
+MIXIN_KEY_ENC_TAB = [
+ 46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35,
+ 27, 43, 5, 49, 33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13,
+ 37, 48, 7, 16, 24, 55, 40, 61, 26, 17, 0, 1, 60, 51, 30, 4,
+ 22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11, 36, 20, 34, 44, 52,
+]
+
+# 在这里直接填写火山引擎配置。
+VOLCENGINE_API_KEY = "586d443c-5034-4810-9760-50ce77394e8a"
+VOLCENGINE_MODEL = "deepseek-v3-1-terminus"
+VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"
+
+
+@dataclass
+class UpItem:
+ mid: int
+ name: str
+ tag: list[str]
+
+
+def parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(
+ description="抓取 UP 前10个视频标题,并调用火山引擎 API 生成分析报告"
+ )
+ parser.add_argument(
+ "--input",
+ default="./source/resources/export_uids.json",
+ help="输入 JSON 文件路径,默认: ./source/resources/export_uids.json",
+ )
+ parser.add_argument(
+ "--output",
+ default="./source/output/reports/up_titles_report.md",
+ help="输出 Markdown 报告路径,默认: ./source/output/reports/up_titles_report.md",
+ )
+ parser.add_argument(
+ "--titles-per-up",
+ type=int,
+ default=10,
+ help="每个 UP 抓取的视频标题数量,默认: 10",
+ )
+ parser.add_argument(
+ "--max-ups",
+ type=int,
+ default=0,
+ help="最多处理多少个 UP,0 表示全部",
+ )
+ parser.add_argument(
+ "--only-tag",
+ default="",
+ help="只处理包含该标签的 UP,例如: 准备取关;留空表示不过滤",
+ )
+ parser.add_argument(
+ "--sleep-seconds",
+ type=float,
+ default=0.8,
+ help="每个 UP 抓取后的等待秒数,默认: 0.8",
+ )
+ parser.add_argument(
+ "--retry-times",
+ type=int,
+ default=3,
+ help="抓取重试次数(遇到412/-799时),默认: 3",
+ )
+ parser.add_argument(
+ "--test-mid",
+ type=int,
+ default=0,
+ help="测试模式:只抓取这个mid,不读取输入文件",
+ )
+ parser.add_argument(
+ "--test-name",
+ default="TEST_UP",
+ help="测试模式下显示名称,默认: TEST_UP",
+ )
+ parser.add_argument(
+ "--skip-ai",
+ action="store_true",
+ help="只测试抓取,不调用AI分析",
+ )
+ parser.add_argument(
+ "--debug",
+ action="store_true",
+ help="输出抓取调试信息",
+ )
+ parser.add_argument(
+ "--bili-cookie",
+ default="",
+ help="可选:运行时传入B站Cookie,优先级高于脚本内BILIBILI_COOKIE",
+ )
+ parser.add_argument(
+ "--fetch-mode",
+ choices=["auto", "api", "html"],
+ default="auto",
+ help="抓取模式: auto(先API后HTML)/api/html,默认: auto",
+ )
+ parser.add_argument(
+ "--analyze-from-report",
+ default="",
+ help="从已有报告读取标题并仅执行AI分析,例如: source/up_analysis_report.md",
+ )
+ parser.add_argument(
+ "--batch-size",
+ type=int,
+ default=30,
+ help="分批分析时每批数量,默认: 30",
+ )
+ parser.add_argument(
+ "--batch-index",
+ type=int,
+ default=1,
+ help="分批分析批次序号(从1开始),默认: 1",
+ )
+ return parser.parse_args()
+
+
+def parse_report_items(report_path: Path) -> list[dict[str, Any]]:
+ lines = report_path.read_text(encoding="utf-8").splitlines()
+ items: list[dict[str, Any]] = []
+ current: dict[str, Any] | None = None
+ section = ""
+
+ for line in lines:
+ m = re.match(r"^##\s+\d+\.\s+(.*?)\s+\(mid:\s*(\d+)\)", line)
+ if m:
+ if current is not None:
+ items.append(current)
+ current = {
+ "mid": int(m.group(2)),
+ "name": m.group(1).strip(),
+ "tag": [],
+ "url": f"https://space.bilibili.com/{int(m.group(2))}/video",
+ "titles": [],
+ "analysis": "",
+ "error": "",
+ }
+ section = ""
+ continue
+
+ if current is None:
+ continue
+
+ if line.startswith("- 主页: "):
+ current["url"] = line.replace("- 主页: ", "", 1).strip()
+ continue
+ if line.startswith("- 标签: "):
+ raw_tag = line.replace("- 标签: ", "", 1).strip()
+ current["tag"] = [] if raw_tag in ("", "无") else [x.strip() for x in raw_tag.split(",") if x.strip()]
+ continue
+ if line == "### 最近10条标题":
+ section = "titles"
+ continue
+ if line == "### AI分析":
+ section = "analysis"
+ continue
+ if line == "### 异常":
+ section = "error"
+ continue
+ if line.startswith("### "):
+ section = ""
+ continue
+
+ if section == "titles" and line.startswith("- "):
+ t = line[2:].strip()
+ if t and t != "(未抓取到标题)":
+ current["titles"].append(t)
+ elif section == "analysis":
+ if line.strip():
+ if current["analysis"]:
+ current["analysis"] += "\n" + line.strip()
+ else:
+ current["analysis"] = line.strip()
+ elif section == "error" and line.startswith("- "):
+ current["error"] = line[2:].strip()
+
+ if current is not None:
+ items.append(current)
+ return items
+
+
+def run_batch_analysis_from_report(args: argparse.Namespace, output_path: Path) -> int:
+ report_path = Path(args.analyze_from_report)
+ if not report_path.exists():
+ print(f"报告文件不存在: {report_path}", file=sys.stderr)
+ return 1
+
+ items = parse_report_items(report_path)
+ if not items:
+ print("报告中未解析到可分析条目", file=sys.stderr)
+ return 1
+
+ pending = [
+ it for it in items
+ if it.get("titles") and (not it.get("analysis") or it.get("analysis") == "测试模式已跳过AI分析")
+ ]
+ if not pending:
+ print("报告中没有待分析条目(可能已全部分析完成)")
+ output_path.write_text(build_report(items), encoding="utf-8")
+ return 0
+
+ batch_size = max(args.batch_size, 1)
+ batch_index = max(args.batch_index, 1)
+ start = (batch_index - 1) * batch_size
+ end = start + batch_size
+ batch = pending[start:end]
+ if not batch:
+ print(f"批次为空: batch-index={batch_index}, batch-size={batch_size}, 待分析总数={len(pending)}")
+ output_path.write_text(build_report(items), encoding="utf-8")
+ return 0
+
+ print(
+ f"开始分批分析: 第{batch_index}批, 每批{batch_size}条, "
+ f"本批{len(batch)}条, 待分析总数{len(pending)}"
+ )
+
+ key_to_index = {f"{it['mid']}::{it['name']}": idx for idx, it in enumerate(items)}
+ for idx, it in enumerate(batch, start=1):
+ print(f"[batch {idx}/{len(batch)}] AI分析: {it['name']} ({it['mid']})")
+ try:
+ analysis = analyze_titles(it["name"], it["url"], it["titles"])
+ origin_idx = key_to_index.get(f"{it['mid']}::{it['name']}")
+ if origin_idx is not None:
+ items[origin_idx]["analysis"] = analysis
+ items[origin_idx]["error"] = ""
+ except Exception as exc: # noqa: BLE001
+ origin_idx = key_to_index.get(f"{it['mid']}::{it['name']}")
+ if origin_idx is not None:
+ items[origin_idx]["error"] = str(exc)
+ time.sleep(max(args.sleep_seconds, 0.0))
+
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+ output_path.write_text(build_report(items), encoding="utf-8")
+ print(f"分批分析报告已生成: {output_path}")
+ return 0
+
+
+def load_up_items(input_path: Path) -> list[UpItem]:
+ raw = json.loads(input_path.read_text(encoding="utf-8"))
+ if not isinstance(raw, list):
+ raise ValueError("输入 JSON 必须是数组")
+
+ items: list[UpItem] = []
+ for idx, obj in enumerate(raw):
+ if not isinstance(obj, dict):
+ raise ValueError(f"第 {idx + 1} 项不是对象")
+ mid = obj.get("mid")
+ name = obj.get("name", "")
+ tags = obj.get("tag", [])
+ if mid is None:
+ continue
+ try:
+ mid_int = int(mid)
+ except (TypeError, ValueError):
+ continue
+ if not isinstance(name, str):
+ name = str(name)
+ if not isinstance(tags, list):
+ tags = []
+ tags = [str(t) for t in tags]
+ items.append(UpItem(mid=mid_int, name=name.strip(), tag=tags))
+ return items
+
+
+def http_get_json(
+ url: str,
+ timeout: float = 20.0,
+ referer: str = "https://space.bilibili.com/",
+) -> dict[str, Any]:
+ headers = {
+ "User-Agent": DEFAULT_USER_AGENT,
+ "Referer": referer,
+ "Origin": "https://www.bilibili.com",
+ "Accept": "application/json, text/plain, */*",
+ "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
+ }
+ cookie = RUNTIME_BILIBILI_COOKIE.strip() or BILIBILI_COOKIE.strip()
+ if cookie:
+ headers["Cookie"] = cookie
+ req = request.Request(url, headers=headers, method="GET")
+ with request.urlopen(req, timeout=timeout) as resp:
+ body = resp.read().decode("utf-8", errors="replace")
+ return json.loads(body)
+
+
+def http_get_text(
+ url: str,
+ timeout: float = 20.0,
+ referer: str = "https://space.bilibili.com/",
+) -> str:
+ headers = {
+ "User-Agent": DEFAULT_USER_AGENT,
+ "Referer": referer,
+ "Origin": "https://www.bilibili.com",
+ "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
+ "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
+ }
+ cookie = RUNTIME_BILIBILI_COOKIE.strip() or BILIBILI_COOKIE.strip()
+ if cookie:
+ headers["Cookie"] = cookie
+ req = request.Request(url, headers=headers, method="GET")
+ with request.urlopen(req, timeout=timeout) as resp:
+ return resp.read().decode("utf-8", errors="replace")
+
+
+def get_mixin_key(img_key: str, sub_key: str) -> str:
+ origin = img_key + sub_key
+ mixed = "".join(origin[i] for i in MIXIN_KEY_ENC_TAB)
+ return mixed[:32]
+
+
+def build_wbi_params(base_params: dict[str, Any], mixin_key: str) -> dict[str, Any]:
+ params = {k: str(v) for k, v in base_params.items()}
+ params["wts"] = str(int(time.time()))
+ params = dict(sorted(params.items()))
+ filtered = {
+ k: re.sub(r"[!'()*]", "", v)
+ for k, v in params.items()
+ }
+ query = parse.urlencode(filtered)
+ w_rid = hashlib.md5((query + mixin_key).encode("utf-8")).hexdigest()
+ filtered["w_rid"] = w_rid
+ return filtered
+
+
+def get_wbi_mixin_key() -> str:
+ data = http_get_json(BILIBILI_NAV_API, referer="https://www.bilibili.com/")
+ if data.get("code") != 0:
+ raise RuntimeError(
+ f"获取wbi密钥失败 code={data.get('code')}, message={data.get('message')}"
+ )
+ wbi_img = data.get("data", {}).get("wbi_img", {})
+ img_url = wbi_img.get("img_url", "")
+ sub_url = wbi_img.get("sub_url", "")
+ if not img_url or not sub_url:
+ raise RuntimeError("获取wbi密钥失败: nav接口缺少img_url/sub_url")
+ img_key = img_url.rsplit("/", 1)[-1].split(".")[0]
+ sub_key = sub_url.rsplit("/", 1)[-1].split(".")[0]
+ return get_mixin_key(img_key, sub_key)
+
+
+def parse_titles_from_data(data: dict[str, Any]) -> list[str]:
+ vlist = data.get("data", {}).get("list", {}).get("vlist", [])
+ if not isinstance(vlist, list):
+ return []
+ titles: list[str] = []
+ for item in vlist:
+ if not isinstance(item, dict):
+ continue
+ title = item.get("title", "")
+ if isinstance(title, str) and title.strip():
+ titles.append(clean_html(title.strip()))
+ return titles
+
+
+def fetch_titles_from_space_html(mid: int, titles_per_up: int, debug: bool = False) -> list[str]:
+ url = f"https://space.bilibili.com/{mid}/video"
+ html_text = http_get_text(url, referer="https://www.bilibili.com/")
+
+ # 页面中视频封面
常携带标题到alt字段,优先从这里提取。
+ alt_candidates = re.findall(
+ r'
]*class="[^"]*b-img__inner[^"]*"[^>]*alt="([^"]+)"',
+ html_text,
+ flags=re.IGNORECASE,
+ )
+
+ titles: list[str] = []
+ seen: set[str] = set()
+ for raw in alt_candidates:
+ t = clean_html(html.unescape(raw)).strip()
+ if not t or t in seen:
+ continue
+ seen.add(t)
+ titles.append(t)
+ if len(titles) >= titles_per_up:
+ break
+
+ if debug:
+ print(f"[debug] HTML模式提取到 {len(titles)} 条标题")
+ return titles
+
+
+def fetch_titles(
+ mid: int,
+ titles_per_up: int,
+ retry_times: int = 3,
+ debug: bool = False,
+ fetch_mode: str = "auto",
+) -> list[str]:
+ base_params = {
+ "mid": str(mid),
+ "pn": "1",
+ "ps": str(titles_per_up),
+ "order": "pubdate",
+ "index": "1",
+ "jsonp": "json",
+ }
+
+ errors: list[str] = []
+ if fetch_mode in ("auto", "api"):
+ # 优先使用wbi接口,稳定性通常更好。
+ mixin_key = ""
+ try:
+ mixin_key = get_wbi_mixin_key()
+ except Exception as exc: # noqa: BLE001
+ if debug:
+ print(f"[debug] 获取wbi密钥失败: {exc}")
+
+ for attempt in range(1, max(retry_times, 1) + 1):
+ try:
+ if mixin_key:
+ signed = build_wbi_params(base_params, mixin_key)
+ url = f"{BILIBILI_WBI_API}?{parse.urlencode(signed)}"
+ else:
+ url = f"{BILIBILI_API}?{parse.urlencode(base_params)}"
+ data = http_get_json(url, referer=f"https://space.bilibili.com/{mid}/video")
+ code = data.get("code", -1)
+ if code == 0:
+ titles = parse_titles_from_data(data)
+ if titles:
+ return titles
+ errors.append("接口返回成功但标题为空")
+ else:
+ errors.append(f"code={code}, message={data.get('message', 'unknown')} ")
+ except error.HTTPError as exc:
+ errors.append(f"HTTP {exc.code} {exc.reason}")
+ except Exception as exc: # noqa: BLE001
+ errors.append(str(exc))
+
+ sleep_for = min(12.0, (1.8 ** attempt) + random.uniform(0.2, 1.0))
+ if debug:
+ print(f"[debug] mid={mid} API第{attempt}次失败: {errors[-1]},{sleep_for:.1f}s后重试")
+ time.sleep(sleep_for)
+
+ if fetch_mode in ("auto", "html"):
+ try:
+ html_titles = fetch_titles_from_space_html(mid, titles_per_up, debug=debug)
+ if html_titles:
+ return html_titles
+ errors.append("HTML模式未提取到标题")
+ except Exception as exc: # noqa: BLE001
+ errors.append(f"HTML模式失败: {exc}")
+
+ joined = "; ".join(errors[-3:])
+ if ("412" in joined) or ("-799" in joined):
+ hint = "提示: 请在脚本里填写BILIBILI_COOKIE,或运行时加 --bili-cookie \"SESSDATA=...; buvid3=...\""
+ raise RuntimeError(f"{joined}; {hint}")
+ raise RuntimeError(joined)
+
+
+def clean_html(text: str) -> str:
+ return re.sub(r"<[^>]+>", "", text)
+
+
+def call_volcengine_chat(system_prompt: str, user_prompt: str) -> str:
+ api_key = VOLCENGINE_API_KEY.strip()
+ base_url = VOLCENGINE_BASE_URL.strip()
+ model = VOLCENGINE_MODEL.strip()
+
+ if (not api_key) or ("在这里填" in api_key):
+ raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_API_KEY")
+ if (not model) or ("在这里填" in model):
+ raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_MODEL")
+ if not base_url:
+ raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_BASE_URL")
+
+ url = f"{base_url.rstrip('/')}/chat/completions"
+ payload = {
+ "model": model,
+ "messages": [
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": user_prompt},
+ ],
+ "temperature": 0.4,
+ }
+ data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
+
+ req = request.Request(
+ url,
+ data=data,
+ headers={
+ "Content-Type": "application/json",
+ "Authorization": f"Bearer {api_key}",
+ },
+ method="POST",
+ )
+
+ with request.urlopen(req, timeout=60) as resp:
+ body = resp.read().decode("utf-8", errors="replace")
+ result = json.loads(body)
+ content = result.get("choices", [{}])[0].get("message", {}).get("content", "")
+ if not isinstance(content, str) or not content.strip():
+ raise RuntimeError(f"火山引擎返回结构异常: {body[:500]}")
+ return content.strip()
+
+
+def analyze_titles(up_name: str, up_url: str, titles: list[str]) -> str:
+ system_prompt = (
+ "你是一个内容分析助手。根据视频标题判断UP主内容方向,并给出是否建议取关。"
+ "输出必须是简体中文,且严格按照用户给定的Markdown格式。"
+ )
+ joined_titles = "\n".join(f"- {t}" for t in titles)
+ user_prompt = f"""
+请分析以下UP主最近视频标题:
+
+UP主:{up_name}
+主页:{up_url}
+标题:
+{joined_titles}
+
+请按以下格式输出(不要增加其它段落):
+1) 内容定位:一句话
+2) 受众画像:一句话
+3) 近期内容倾向:2-3点,使用-开头
+4) 质量评价:80-120字
+5) 取关建议:保留关注/可以取关(二选一)
+6) 建议理由:50-100字
+""".strip()
+ return call_volcengine_chat(system_prompt, user_prompt)
+
+
+def build_report(results: list[dict[str, Any]]) -> str:
+ now = time.strftime("%Y-%m-%d %H:%M:%S")
+ lines: list[str] = []
+ lines.append("# UP主内容分析报告")
+ lines.append("")
+ lines.append(f"- 生成时间: {now}")
+ lines.append(f"- 分析数量: {len(results)}")
+ lines.append("")
+
+ for idx, item in enumerate(results, start=1):
+ lines.append(f"## {idx}. {item['name']} (mid: {item['mid']})")
+ lines.append("")
+ lines.append(f"- 主页: {item['url']}")
+ tags = item.get("tag", [])
+ lines.append(f"- 标签: {', '.join(tags) if tags else '无'}")
+ lines.append("")
+ lines.append("### 最近10条标题")
+ lines.append("")
+ titles = item.get("titles", [])
+ if titles:
+ for t in titles:
+ lines.append(f"- {t}")
+ else:
+ lines.append("- (未抓取到标题)")
+ lines.append("")
+
+ analysis = item.get("analysis", "")
+ if analysis:
+ lines.append("### AI分析")
+ lines.append("")
+ lines.append(analysis)
+ lines.append("")
+
+ error_msg = item.get("error", "")
+ if error_msg:
+ lines.append("### 异常")
+ lines.append("")
+ lines.append(f"- {error_msg}")
+ lines.append("")
+
+ return "\n".join(lines).rstrip() + "\n"
+
+
+def main() -> int:
+ global RUNTIME_BILIBILI_COOKIE
+ args = parse_args()
+ RUNTIME_BILIBILI_COOKIE = (args.bili_cookie or "").strip()
+ input_path = Path(args.input)
+ output_path = Path(args.output)
+
+ if args.analyze_from_report:
+ return run_batch_analysis_from_report(args, output_path)
+
+ if args.test_mid > 0:
+ items = [UpItem(mid=args.test_mid, name=args.test_name, tag=["测试模式"]) ]
+ print(f"测试模式: 仅处理 mid={args.test_mid}")
+ else:
+ if not input_path.exists():
+ print(f"输入文件不存在: {input_path}", file=sys.stderr)
+ return 1
+
+ try:
+ items = load_up_items(input_path)
+ except Exception as exc:
+ print(f"加载输入文件失败: {exc}", file=sys.stderr)
+ return 1
+
+ if args.only_tag:
+ items = [it for it in items if args.only_tag in it.tag]
+
+ if args.max_ups and args.max_ups > 0:
+ items = items[: args.max_ups]
+
+ if not items:
+ print("没有可处理的 UP 数据", file=sys.stderr)
+ return 1
+
+ print(f"开始处理 {len(items)} 个 UP...")
+ if args.skip_ai:
+ print("已启用 --skip-ai,仅测试抓取标题")
+ if args.debug:
+ print(f"[debug] 当前抓取模式: {args.fetch_mode}")
+
+ results: list[dict[str, Any]] = []
+ for idx, item in enumerate(items, start=1):
+ up_url = f"https://space.bilibili.com/{item.mid}/video"
+ row: dict[str, Any] = {
+ "mid": item.mid,
+ "name": item.name or f"mid_{item.mid}",
+ "tag": item.tag,
+ "url": up_url,
+ "titles": [],
+ "analysis": "",
+ "error": "",
+ }
+
+ print(f"[{idx}/{len(items)}] 抓取: {row['name']} ({item.mid})")
+ try:
+ titles = fetch_titles(
+ item.mid,
+ args.titles_per_up,
+ retry_times=args.retry_times,
+ debug=args.debug,
+ fetch_mode=args.fetch_mode,
+ )
+ row["titles"] = titles
+ if not titles:
+ row["error"] = "未抓取到标题,可能是接口限制或UP无公开视频"
+ elif args.skip_ai:
+ row["analysis"] = "测试模式已跳过AI分析"
+ else:
+ row["analysis"] = analyze_titles(row["name"], up_url, titles)
+ except error.HTTPError as exc:
+ row["error"] = f"HTTP错误: {exc.code} {exc.reason}"
+ except error.URLError as exc:
+ row["error"] = f"网络错误: {exc.reason}"
+ except Exception as exc: # noqa: BLE001
+ row["error"] = str(exc)
+
+ if args.debug and row["titles"]:
+ sample = row["titles"][: min(3, len(row["titles"]))]
+ print(f"[debug] mid={item.mid} 成功抓取 {len(row['titles'])} 条,样例: {sample}")
+
+ results.append(row)
+ time.sleep(max(args.sleep_seconds, 0))
+
+ report = build_report(results)
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+ output_path.write_text(report, encoding="utf-8")
+ print(f"报告已生成: {output_path}")
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/source/scripts/batch_ai_summary_from_report.py b/source/scripts/batch_ai_summary_from_report.py
new file mode 100644
index 0000000..9308608
--- /dev/null
+++ b/source/scripts/batch_ai_summary_from_report.py
@@ -0,0 +1,598 @@
+#!/usr/bin/env python3
+"""Batch AI summary from existing UP markdown report.
+
+Read an existing report (e.g. source/up_analysis_report.md),
+extract each UP's title list, and generate AI summaries in batches.
+"""
+
+from __future__ import annotations
+
+import argparse
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import json
+import math
+import re
+import sys
+import time
+from pathlib import Path
+from typing import Any
+from urllib import request
+
+# Fill your Volcengine Ark settings here.
+VOLCENGINE_API_KEY = "586d443c-5034-4810-9760-50ce77394e8a"
+VOLCENGINE_MODEL = "deepseek-v3-1-terminus"
+VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"
+
+SKIP_MARKERS = {
+ "",
+ "测试模式已跳过AI分析",
+ "(待分析)",
+}
+
+# 预设分组及关键词规则(可自行扩展)。
+PRESET_GROUPS: dict[str, list[str]] = {
+ "AAA_核心每日必读":[
+ "编程", "算法", "工程", "干货", "新闻", "趋势",
+ ],
+ "AA_编程信息干货必留": [
+ "编程", "算法", "工程", "教程", "实战", "课程", "新技术", "开源", "工具", "效率", "技术", "架构",
+ ],
+ "A_硬核知识保留": [
+ "科普", "数学", "物理", "编程", "算法", "工程", "历史", "新闻", "深度",
+ ],
+ "B_技能学习保留": [
+ "英语", "四六级", "考研", "面试", "教程", "实战", "学习", "课程", "写作",
+ ],
+ "C_资讯快餐观察": [
+ "热点", "速览", "信息差", "快报", "盘点", "吐槽", "观点", "趋势",
+ ],
+ "D_娱乐消遣可取关": [
+ "搞笑", "整活", "抽象", "乐子", "娱乐", "段子", "鬼畜", "日常", "情侣",
+ ],
+ "E_营销带货谨慎": [
+ "好物", "测评", "种草", "直播", "带货", "优惠", "开箱", "广告", "激活",
+ ],
+}
+
+
+def parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(description="基于现有报告分批做AI总结")
+ parser.add_argument(
+ "--input-report",
+ default="source/output/reports/1_up_titles_report.md",
+ help="已有标题报告路径",
+ )
+ parser.add_argument(
+ "--output-report",
+ default="source/output/reports/2_up_analysis_full_auto.md",
+ help="输出报告路径",
+ )
+ parser.add_argument(
+ "--batch-size",
+ type=int,
+ default=20,
+ help="每批处理数量,默认: 20",
+ )
+ parser.add_argument(
+ "--batch-index",
+ type=int,
+ default=1,
+ help="批次序号(从1开始),默认: 1",
+ )
+ parser.add_argument(
+ "--sleep-seconds",
+ type=float,
+ default=0.0,
+ help="提交任务间隔秒数,默认: 0(并发模式建议0)",
+ )
+ parser.add_argument(
+ "--workers",
+ type=int,
+ default=4,
+ help="并发请求数,默认: 4",
+ )
+ parser.add_argument(
+ "--max-retries",
+ type=int,
+ default=2,
+ help="单个UP分析最大重试次数,默认: 2",
+ )
+ parser.add_argument(
+ "--request-timeout",
+ type=float,
+ default=60.0,
+ help="单次AI请求超时秒数,默认: 60",
+ )
+ parser.add_argument(
+ "--force",
+ action="store_true",
+ help="强制覆盖已有AI分析(默认只处理待分析项)",
+ )
+ parser.add_argument(
+ "--debug",
+ action="store_true",
+ help="输出调试信息",
+ )
+ parser.add_argument(
+ "--config-from",
+ default="source/scripts/analyze_up_content.py",
+ help="自动读取API配置的脚本路径",
+ )
+ parser.add_argument(
+ "--run-all-batches",
+ action="store_true",
+ help="自动连续跑完所有批次(忽略batch-index)",
+ )
+ return parser.parse_args()
+
+
+def load_api_config_from_script(path: Path) -> dict[str, str]:
+ if not path.exists():
+ return {}
+ text = path.read_text(encoding="utf-8", errors="replace")
+ result: dict[str, str] = {}
+ for key in ("VOLCENGINE_API_KEY", "VOLCENGINE_MODEL", "VOLCENGINE_BASE_URL"):
+ m = re.search(rf"^{key}\s*=\s*\"([^\"]*)\"", text, flags=re.MULTILINE)
+ if m:
+ result[key] = m.group(1).strip()
+ return result
+
+
+def parse_report(path: Path) -> list[dict[str, Any]]:
+ lines = path.read_text(encoding="utf-8").splitlines()
+
+ items: list[dict[str, Any]] = []
+ current: dict[str, Any] | None = None
+ section = ""
+
+ for line in lines:
+ m = re.match(r"^##\s+\d+\.\s+(.*?)\s+\(mid:\s*(\d+)\)", line)
+ if m:
+ if current is not None:
+ items.append(current)
+ mid = int(m.group(2))
+ current = {
+ "mid": mid,
+ "name": m.group(1).strip(),
+ "tag": [],
+ "url": f"https://space.bilibili.com/{mid}/video",
+ "titles": [],
+ "analysis": "",
+ "group": "",
+ "action": "",
+ "reason": "",
+ "error": "",
+ }
+ section = ""
+ continue
+
+ if current is None:
+ continue
+
+ if line.startswith("- 主页: "):
+ current["url"] = line.replace("- 主页: ", "", 1).strip()
+ continue
+ if line.startswith("- 标签: "):
+ raw = line.replace("- 标签: ", "", 1).strip()
+ current["tag"] = [] if raw in ("", "无") else [x.strip() for x in raw.split(",") if x.strip()]
+ continue
+
+ if line == "### 最近10条标题":
+ section = "titles"
+ continue
+ if line == "### AI分析":
+ section = "analysis"
+ continue
+ if line == "### 分组建议":
+ section = "group"
+ continue
+ if line == "### 异常":
+ section = "error"
+ continue
+ if line.startswith("### "):
+ section = ""
+ continue
+
+ if section == "titles" and line.startswith("- "):
+ text = line[2:].strip()
+ if text and text != "(未抓取到标题)":
+ current["titles"].append(text)
+ elif section == "analysis" and line.strip():
+ current["analysis"] = (current["analysis"] + "\n" + line.strip()).strip()
+ elif section == "group":
+ if line.startswith("- 预设分组: "):
+ current["group"] = line.replace("- 预设分组: ", "", 1).strip()
+ elif line.startswith("- 建议动作: "):
+ current["action"] = line.replace("- 建议动作: ", "", 1).strip()
+ elif line.startswith("- 判断依据: "):
+ current["reason"] = line.replace("- 判断依据: ", "", 1).strip()
+ elif line.strip() == "(待分组)":
+ current["group"] = ""
+ current["action"] = ""
+ current["reason"] = ""
+ elif section == "error" and line.startswith("- "):
+ current["error"] = line[2:].strip()
+
+ if current is not None:
+ items.append(current)
+
+ return items
+
+
+def call_volcengine_chat(
+ system_prompt: str,
+ user_prompt: str,
+ cfg: dict[str, str],
+ timeout: float,
+) -> str:
+ api_key = cfg.get("VOLCENGINE_API_KEY", "").strip()
+ model = cfg.get("VOLCENGINE_MODEL", "").strip()
+ base_url = cfg.get("VOLCENGINE_BASE_URL", "").strip()
+
+ if (not api_key) or ("在这里填" in api_key):
+ raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_API_KEY")
+ if (not model) or ("在这里填" in model):
+ raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_MODEL")
+ if not base_url:
+ raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_BASE_URL")
+
+ payload = {
+ "model": model,
+ "messages": [
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": user_prompt},
+ ],
+ "temperature": 0.4,
+ }
+
+ body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
+ req = request.Request(
+ f"{base_url.rstrip('/')}/chat/completions",
+ data=body,
+ headers={
+ "Content-Type": "application/json",
+ "Authorization": f"Bearer {api_key}",
+ },
+ method="POST",
+ )
+
+ with request.urlopen(req, timeout=timeout) as resp:
+ text = resp.read().decode("utf-8", errors="replace")
+
+ data = json.loads(text)
+ content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
+ if not isinstance(content, str) or not content.strip():
+ raise RuntimeError(f"AI响应异常: {text[:500]}")
+ return content.strip()
+
+
+def summarize_one_up(
+ name: str,
+ mid: int,
+ titles: list[str],
+ tags: list[str],
+ cfg: dict[str, str],
+ timeout: float,
+) -> dict[str, str]:
+ system_prompt = (
+ "你是内容定位与订阅决策助手。"
+ "你必须输出合法JSON,不要输出其它文本。"
+ )
+ joined_titles = "\n".join(f"- {t}" for t in titles)
+ joined_tags = "、".join(tags) if tags else "无"
+ rule_hint = heuristic_group_hint(titles, tags)
+ groups_desc = "\n".join(f"- {k}" for k in PRESET_GROUPS)
+
+ user_prompt = f"""
+请基于以下信息完成分组与总结。
+
+UP主: {name}
+mid: {mid}
+标签: {joined_tags}
+最近标题:
+{joined_titles}
+
+预设分组:
+{groups_desc}
+
+代码规则初判:
+{rule_hint}
+
+要求:
+1) 输出JSON对象,字段严格为: summary, group, action, reason。
+2) summary: 一段中文总结,50-100字。
+3) group: 必须从预设分组里选一个。给出详细的分组类别和命中分组中的规则词。
+4) action: 只能是"保留关注"或"可以取关"。敏感一点,只保留真正核心优质的up,其他都建议取关。
+5) reason: 30-60字,解释为什么分到该组并给出该动作。
+""".strip()
+
+ content = call_volcengine_chat(system_prompt, user_prompt, cfg, timeout=timeout)
+ return parse_ai_json(content)
+
+
+def parse_ai_json(content: str) -> dict[str, str]:
+ text = content.strip()
+ if text.startswith("```"):
+ text = re.sub(r"^```[a-zA-Z]*\n?", "", text)
+ text = re.sub(r"\n?```$", "", text).strip()
+ m = re.search(r"\{.*\}", text, flags=re.DOTALL)
+ if m:
+ text = m.group(0)
+ data = json.loads(text)
+ summary = str(data.get("summary", "")).strip()
+ group = str(data.get("group", "")).strip()
+ action = str(data.get("action", "")).strip()
+ reason = str(data.get("reason", "")).strip()
+ if not summary:
+ raise RuntimeError("AI返回缺少summary")
+ if group not in PRESET_GROUPS:
+ raise RuntimeError(f"AI返回未知group: {group}")
+ if action not in ("保留关注", "可以取关"):
+ raise RuntimeError(f"AI返回未知action: {action}")
+ if not reason:
+ reason = "基于标题内容与更新风格综合判断。"
+ return {
+ "summary": summary,
+ "group": group,
+ "action": action,
+ "reason": reason,
+ }
+
+
+def heuristic_group_hint(titles: list[str], tags: list[str]) -> str:
+ text = "\n".join(titles) + "\n" + " ".join(tags)
+ score: dict[str, int] = {k: 0 for k in PRESET_GROUPS}
+ lower_text = text.lower()
+ for group, words in PRESET_GROUPS.items():
+ for w in words:
+ w_lower = w.lower()
+ if w_lower in lower_text:
+ score[group] += 1
+ ranked = sorted(score.items(), key=lambda x: x[1], reverse=True)
+ best_group, best_score = ranked[0]
+ if best_score <= 0:
+ return "未命中关键词,倾向按内容专业度与稳定性判断。"
+ top3 = ", ".join(f"{g}:{s}" for g, s in ranked[:3])
+ return f"关键词命中最高组={best_group}(score={best_score}),参考分布: {top3}"
+
+
+def summarize_one_up_with_retry(
+ item: dict[str, Any],
+ cfg: dict[str, str],
+ max_retries: int,
+ timeout: float,
+ debug: bool,
+) -> dict[str, str]:
+ last_exc: Exception | None = None
+ total_try = max(1, max_retries)
+ for attempt in range(1, total_try + 1):
+ try:
+ return summarize_one_up(
+ item["name"],
+ item["mid"],
+ item.get("titles", []),
+ item.get("tag", []),
+ cfg,
+ timeout=timeout,
+ )
+ except Exception as exc: # noqa: BLE001
+ last_exc = exc
+ if debug:
+ print(f"[debug] {item['name']} 第{attempt}次失败: {exc}")
+ if attempt < total_try:
+ time.sleep(min(2.0, 0.5 * attempt))
+ raise RuntimeError(str(last_exc) if last_exc else "未知错误")
+
+
+def build_report(items: list[dict[str, Any]], batch_note: str) -> str:
+ now = time.strftime("%Y-%m-%d %H:%M:%S")
+ lines: list[str] = [
+ "# UP主内容分析报告(分批AI总结)",
+ "",
+ f"- 生成时间: {now}",
+ f"- 分析数量: {len(items)}",
+ f"- 处理说明: {batch_note}",
+ "",
+ ]
+
+ group_stats: dict[str, int] = {k: 0 for k in PRESET_GROUPS}
+ action_stats: dict[str, int] = {"保留关注": 0, "可以取关": 0}
+ for item in items:
+ g = item.get("group", "")
+ a = item.get("action", "")
+ if g in group_stats:
+ group_stats[g] += 1
+ if a in action_stats:
+ action_stats[a] += 1
+
+ lines.append("## 分组统计")
+ lines.append("")
+ for g, c in group_stats.items():
+ lines.append(f"- {g}: {c}")
+ lines.append(f"- 保留关注: {action_stats['保留关注']}")
+ lines.append(f"- 可以取关: {action_stats['可以取关']}")
+ lines.append("")
+
+ for idx, item in enumerate(items, start=1):
+ lines.append(f"## {idx}. {item['name']} (mid: {item['mid']})")
+ lines.append("")
+ lines.append(f"- 主页: {item['url']}")
+ tags = item.get("tag", [])
+ lines.append(f"- 标签: {', '.join(tags) if tags else '无'}")
+ lines.append("")
+ lines.append("### 最近10条标题")
+ lines.append("")
+ titles = item.get("titles", [])
+ if titles:
+ for t in titles:
+ lines.append(f"- {t}")
+ else:
+ lines.append("- (未抓取到标题)")
+ lines.append("")
+
+ lines.append("### AI分析")
+ lines.append("")
+ analysis = item.get("analysis", "")
+ lines.append(analysis if analysis else "(待分析)")
+ lines.append("")
+
+ lines.append("### 分组建议")
+ lines.append("")
+ group = item.get("group", "")
+ action = item.get("action", "")
+ reason = item.get("group_reason", "")
+ if group and action:
+ lines.append(f"- 预设分组: {group}")
+ lines.append(f"- 建议动作: {action}")
+ lines.append(f"- 判断依据: {reason if reason else '基于标题与更新风格综合判断。'}")
+ else:
+ lines.append("- (待分组)")
+ lines.append("")
+
+ error = item.get("error", "")
+ if error:
+ lines.append("### 异常")
+ lines.append("")
+ lines.append(f"- {error}")
+ lines.append("")
+
+ return "\n".join(lines).rstrip() + "\n"
+
+
+def main() -> int:
+ args = parse_args()
+ input_report = Path(args.input_report)
+ output_report = Path(args.output_report)
+
+ if not input_report.exists():
+ print(f"输入报告不存在: {input_report}", file=sys.stderr)
+ return 1
+
+ items = parse_report(input_report)
+ if not items:
+ print("输入报告未解析出任何UP条目", file=sys.stderr)
+ return 1
+
+ config = {
+ "VOLCENGINE_API_KEY": VOLCENGINE_API_KEY,
+ "VOLCENGINE_MODEL": VOLCENGINE_MODEL,
+ "VOLCENGINE_BASE_URL": VOLCENGINE_BASE_URL,
+ }
+ if ("在这里填" in config["VOLCENGINE_API_KEY"]) or ("在这里填" in config["VOLCENGINE_MODEL"]):
+ inherited = load_api_config_from_script(Path(args.config_from))
+ if inherited:
+ config.update(inherited)
+
+ if args.force:
+ pending = [it for it in items if it.get("titles")]
+ # else:
+ # pending = [
+ # it for it in items
+ # if it.get("titles") and it.get("analysis", "").strip() in SKIP_MARKERS
+ # ]
+ else:
+ pending = [
+ it for it in items
+ if it.get("titles") and (
+ it.get("analysis", "").strip() in SKIP_MARKERS
+ or not it.get("group") # 没有分组也要重跑
+ )
+ ]
+
+ if not pending:
+ print("没有待分析条目,直接输出当前报告")
+ output_report.write_text(build_report(items, "无待分析条目"), encoding="utf-8")
+ return 0
+
+ index_map = {f"{it['mid']}::{it['name']}": idx for idx, it in enumerate(items)}
+ success_total = 0
+ failed_total = 0
+
+ batch_size = max(1, args.batch_size)
+ if args.run_all_batches:
+ total_batches = math.ceil(len(pending) / batch_size)
+ batch_indexes = list(range(1, total_batches + 1))
+ print(f"自动连续模式: 共{total_batches}批, 待分析总数{len(pending)}")
+ else:
+ batch_indexes = [max(1, args.batch_index)]
+
+ workers = max(1, args.workers)
+ print(f"并发配置: workers={workers}, retries={max(1, args.max_retries)}, timeout={args.request_timeout}s")
+
+ for batch_index in batch_indexes:
+ start = (batch_index - 1) * batch_size
+ end = start + batch_size
+ batch = pending[start:end]
+ if not batch:
+ continue
+
+ print(
+ f"开始分批AI总结: 第{batch_index}批, 每批{batch_size}条, "
+ f"本批{len(batch)}条, 待分析总数{len(pending)}"
+ )
+
+ success = 0
+ failed = 0
+ future_to_item: dict[Any, dict[str, Any]] = {}
+ with ThreadPoolExecutor(max_workers=workers) as executor:
+ for i, it in enumerate(batch, start=1):
+ print(f"[submit {i}/{len(batch)}] {it['name']} ({it['mid']})")
+ future = executor.submit(
+ summarize_one_up_with_retry,
+ it,
+ config,
+ max(1, args.max_retries),
+ float(args.request_timeout),
+ args.debug,
+ )
+ future_to_item[future] = it
+ if args.sleep_seconds > 0:
+ time.sleep(args.sleep_seconds)
+
+ done_count = 0
+ for future in as_completed(future_to_item):
+ done_count += 1
+ it = future_to_item[future]
+ idx = index_map.get(f"{it['mid']}::{it['name']}")
+ try:
+ ai_res = future.result()
+ if idx is not None:
+ items[idx]["analysis"] = ai_res["summary"]
+ items[idx]["group"] = ai_res["group"]
+ items[idx]["action"] = ai_res["action"]
+ items[idx]["group_reason"] = ai_res["reason"]
+ items[idx]["error"] = ""
+ success += 1
+ print(f"[done {done_count}/{len(batch)}] 成功: {it['name']} ({it['mid']})")
+ except Exception as exc: # noqa: BLE001
+ if idx is not None:
+ items[idx]["error"] = str(exc)
+ failed += 1
+ print(f"[done {done_count}/{len(batch)}] 失败: {it['name']} ({it['mid']})")
+ if args.debug:
+ print(f"[debug] 失败详情: {exc}")
+
+ success_total += success
+ failed_total += failed
+
+ step_note = (
+ f"第{batch_index}批完成: 成功{success}, 失败{failed}, "
+ f"本批{len(batch)}, 待分析总数{len(pending)}"
+ )
+ output_report.parent.mkdir(parents=True, exist_ok=True)
+ output_report.write_text(build_report(items, step_note), encoding="utf-8")
+ print(f"第{batch_index}批写入完成: {output_report}")
+
+ mode_text = "自动连续" if args.run_all_batches else "单批"
+ note = (
+ f"{mode_text}模式完成: 成功{success_total}, 失败{failed_total}, "
+ f"处理批次数={len(batch_indexes)}, 待分析总数={len(pending)}"
+ )
+ output_report.parent.mkdir(parents=True, exist_ok=True)
+ output_report.write_text(build_report(items, note), encoding="utf-8")
+ print(f"输出完成: {output_report}")
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/source/scripts/extract_group_info.py b/source/scripts/extract_group_info.py
new file mode 100644
index 0000000..9b81fc1
--- /dev/null
+++ b/source/scripts/extract_group_info.py
@@ -0,0 +1,101 @@
+import argparse
+import re
+
+def parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(description="提取UP分组信息")
+ parser.add_argument(
+ "--input",
+ default="./source/19_53_no_titles.md",
+ help="输入报告路径",
+ )
+ parser.add_argument(
+ "--output",
+ help="输出报告路径(默认覆盖输入)",
+ )
+ return parser.parse_args()
+
+def main():
+ args = parse_args()
+ input_file = args.input
+ output_file = args.output or input_file
+
+ with open(input_file, 'r', encoding='utf-8') as f:
+ content = f.read()
+
+ lines = content.split('\n')
+ section_starts = []
+ for i, line in enumerate(lines):
+ if line.startswith('## '):
+ section_starts.append(i)
+
+ if len(section_starts) < 2:
+ print('No sections found')
+ return 1
+
+ header = '\n'.join(lines[:section_starts[0]])
+ sections = []
+
+ for idx in range(len(section_starts)):
+ start = section_starts[idx]
+ end = section_starts[idx + 1] if idx + 1 < len(section_starts) else len(lines)
+ section = '\n'.join(lines[start:end])
+ sections.append(section)
+
+ sections = sections[1:]
+
+ parsed = []
+ for sec in sections:
+ match = re.match(r'^## (\d+)\. (.+) \(mid: (\d+)\)', sec)
+ if match:
+ num = int(match.group(1))
+ name = match.group(2)
+ mid = match.group(3)
+
+ group_m = re.search(r'- 预设分组: (.+)', sec)
+ action_m = re.search(r'- 建议动作: (.+)', sec)
+ reason_m = re.search(r'- 判断依据: (.+)', sec)
+ error_m = re.search(r'AI返回未知group: (.+)', sec)
+
+ group = group_m.group(1).strip() if group_m else ""
+ action = action_m.group(1).strip() if action_m else ""
+ reason = reason_m.group(1).strip() if reason_m else ""
+ error = error_m.group(1).strip() if error_m else ""
+
+ parsed.append({
+ 'num': num,
+ 'name': name,
+ 'mid': mid,
+ 'group': group,
+ 'action': action,
+ 'reason': reason,
+ 'error': error
+ })
+
+ parsed.sort(key=lambda x: (x['name'].casefold(), int(x['mid'])))
+
+ lines_out = [header, ""]
+
+ for p in parsed:
+ lines_out.append(f"## {p['num']}. {p['name']} (mid: {p['mid']})")
+ lines_out.append("")
+ if p['group']:
+ lines_out.append(f"- 预设分组: {p['group']}")
+ if p['action']:
+ lines_out.append(f"- 建议动作: {p['action']}")
+ if p['reason']:
+ lines_out.append(f"- 判断依据: {p['reason']}")
+ if p['error']:
+ lines_out.append(f"- 异常: {p['error']}")
+ lines_out.append("")
+
+ result = '\n'.join(lines_out)
+ result = re.sub(r'\n{3,}', '\n\n', result)
+
+ with open(output_file, 'w', encoding='utf-8') as f:
+ f.write(result)
+
+ print(f'Extracted {len(parsed)} sections')
+ return 0
+
+if __name__ == "__main__":
+ raise SystemExit(main())
\ No newline at end of file
diff --git a/source/scripts/extract_keep_follow_doc.py b/source/scripts/extract_keep_follow_doc.py
new file mode 100644
index 0000000..afe96f7
--- /dev/null
+++ b/source/scripts/extract_keep_follow_doc.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import argparse
+import re
+import time
+from pathlib import Path
+
+
+def parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(description="提取非取关UP的AI分析与分组建议")
+ parser.add_argument(
+ "--input-report",
+ default="source/output/reports/2_up_analysis_full_auto.md",
+ help="输入分析报告路径",
+ )
+ parser.add_argument(
+ "--output-report",
+ default="source/output/reports/3_up_keep_follow_only.md",
+ help="输出保留关注报告路径",
+ )
+ return parser.parse_args()
+
+
+def main() -> int:
+ args = parse_args()
+ src = Path(args.input_report)
+ dst = Path(args.output_report)
+
+ if not src.exists():
+ print(f"来源文件不存在: {src}")
+ return 1
+
+ text = src.read_text(encoding="utf-8")
+ pattern = r"^##\s+\d+\.\s+(.+?)\s+\(mid:\s*(\d+)\)\s*$"
+ matches = list(re.finditer(pattern, text, re.MULTILINE))
+
+ items: list[tuple[str, str, str, str, str, str]] = []
+ for i, m in enumerate(matches):
+ start = m.start()
+ end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
+ section = text[start:end]
+
+ name = m.group(1).strip()
+ mid = m.group(2).strip()
+
+ action_m = re.search(r"-\s*建议动作:\s*(.+)", section)
+ action = action_m.group(1).strip() if action_m else ""
+ # 反逻辑:没有"建议动作: 可以取关"就保留
+ if action == "可以取关":
+ continue
+
+ ai_m = re.search(r"###\s*AI分析\s*\n([\s\S]*?)(?=\n###\s|\Z)", section)
+ ai_text = ai_m.group(1).strip() if ai_m else ""
+
+ group_m = re.search(r"###\s*分组建议\s*\n([\s\S]*?)(?=\n###\s|\Z)", section)
+ group_text = group_m.group(1).strip() if group_m else ""
+
+ error_m = re.search(r"###\s*异常\s*\n([\s\S]*?)(?=\n###\s|\Z)", section)
+ error_text = error_m.group(1).strip() if error_m else ""
+
+ items.append((name, mid, ai_text, group_text, action, error_text))
+
+ # 按昵称首字母A-Z排序(同名时按mid升序)
+ items.sort(key=lambda x: (x[0].casefold(), int(x[1])))
+
+ lines = [
+ "# 保留关注UP主分析与分组建议",
+ "",
+ f"- 生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}",
+ f"- 来源文件: {src.name}",
+ f"- 条目数: {len(items)}",
+ "",
+ ]
+
+ for idx, (name, mid, ai_text, group_text, action, error_text) in enumerate(items, 1):
+ lines.append(f"## {idx}. {name} (mid: {mid})")
+ lines.append("")
+
+ lines.append("### AI分析")
+ lines.append("")
+ lines.append(ai_text if ai_text else "(无)")
+ lines.append("")
+
+ lines.append("### 分组建议")
+ lines.append("")
+ lines.append(group_text if group_text else f"- 建议动作: {action if action else '(无)'}")
+ lines.append("")
+
+ if error_text:
+ lines.append("### 异常")
+ lines.append("")
+ lines.append(error_text)
+ lines.append("")
+
+ dst.parent.mkdir(parents=True, exist_ok=True)
+ dst.write_text("\n".join(lines), encoding="utf-8")
+ print(f"已生成: {dst}")
+ print(f"保留条目: {len(items)}")
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
\ No newline at end of file
diff --git a/source/scripts/extract_unfollow_list.py b/source/scripts/extract_unfollow_list.py
new file mode 100644
index 0000000..62fc57e
--- /dev/null
+++ b/source/scripts/extract_unfollow_list.py
@@ -0,0 +1,174 @@
+#!/usr/bin/env python3
+"""Extract UPs marked as "可以取关" and output their mids to CSV.
+
+Read an UP analysis report and extract all UPs with action "可以取关",
+then output their mids to a CSV file.
+"""
+
+from __future__ import annotations
+
+import argparse
+import csv
+import re
+import sys
+from pathlib import Path
+from typing import Any
+
+
+def parse_report(report_path: Path) -> list[dict[str, Any]]:
+ """解析Markdown格式的UP分析报告,返回UP列表"""
+ if not report_path.exists():
+ return []
+
+ text = report_path.read_text(encoding="utf-8")
+ items = []
+
+ # 按UP项分割(每个UP项以"## N. 名字 (mid: ...)"开头)
+ pattern = r"^## \d+\. (.+?)\s+\(mid:\s*(\d+)\)"
+ matches = list(re.finditer(pattern, text, re.MULTILINE))
+
+ for i, match in enumerate(matches):
+ start = match.start()
+ end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
+ section = text[start:end]
+
+ name = match.group(1).strip()
+ mid = int(match.group(2))
+
+ # 提取建议动作
+ action_match = re.search(r"- 建议动作: (.+?)(?:\n|$)", section)
+ action = action_match.group(1).strip() if action_match else ""
+
+ items.append({
+ "mid": mid,
+ "name": name,
+ "action": action,
+ })
+
+ return items
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description="从UP分析报告中提取可以取关的UP")
+ parser.add_argument(
+ "--input-report",
+ default="source/output/reports/2_up_analysis_full_auto.md",
+ help="输入报告路径",
+ )
+ parser.add_argument(
+ "--output-csv",
+ default="source/output/uids/4_unfollow_mids_list.txt",
+ help="输出文件路径",
+ )
+ parser.add_argument(
+ "--format",
+ choices=["csv", "mid-only", "json"],
+ default="mid-only",
+ help="输出格式:csv(mid,name), mid-only(仅mid逗号分隔), json(JSON格式)",
+ )
+ parser.add_argument(
+ "--with-names",
+ action="store_true",
+ help="在mid后添加UP名称(仅mid-only格式生效)",
+ )
+ parser.add_argument(
+ "--split-size",
+ type=int,
+ default=0,
+ help="可选:将mid-only结果按N个一组拆分多个文件,例如100",
+ )
+
+ args = parser.parse_args()
+
+ input_report = Path(args.input_report)
+ output_csv = Path(args.output_csv)
+
+ if not input_report.exists():
+ print(f"错误: 输入报告不存在: {input_report}", file=sys.stderr)
+ return 1
+
+ print(f"读取报告: {input_report}")
+ items = parse_report(input_report)
+
+ if not items:
+ print("未能从报告中解析任何UP", file=sys.stderr)
+ return 1
+
+ # 筛选可以取关的UP
+ unfollow_items = [it for it in items if it.get("action") == "可以取关"]
+
+ print(f"总 UP 数: {len(items)}")
+ print(f"可以取关: {len(unfollow_items)}")
+
+ if not unfollow_items:
+ print("没有可以取关的UP")
+ return 0
+
+ # 输出格式
+ if args.format == "csv":
+ # 标准CSV格式:mid, name
+ output_csv.parent.mkdir(parents=True, exist_ok=True)
+ with open(output_csv, "w", newline="", encoding="utf-8") as f:
+ writer = csv.DictWriter(f, fieldnames=["mid", "name"])
+ writer.writeheader()
+ for item in unfollow_items:
+ writer.writerow({"mid": item["mid"], "name": item["name"]})
+
+ print(f"\n✓ 已输出CSV格式到: {output_csv}")
+ print(f" 格式: mid,name")
+ print(f" 行数: {len(unfollow_items)}")
+
+ elif args.format == "mid-only":
+ # 仅mid,逗号分隔
+ mids = [str(it["mid"]) for it in unfollow_items]
+
+ if args.with_names:
+ # mid:name 格式
+ content = ",".join([f"{it['mid']}:{it['name']}" for it in unfollow_items])
+ print(f"\n✓ 已输出mid:name列表到: {output_csv}")
+ print(f" 格式: mid1:name1,mid2:name2,...")
+ else:
+ # 仅mid
+ content = ",".join(mids)
+ print(f"\n✓ 已输出mid列表到: {output_csv}")
+ print(f" 格式: mid1,mid2,mid3,...")
+
+ output_csv.parent.mkdir(parents=True, exist_ok=True)
+ output_csv.write_text(content, encoding="utf-8")
+ print(f" 数量: {len(mids)}")
+
+ split_size = max(0, int(args.split_size))
+ if split_size > 0:
+ groups = [mids[i:i + split_size] for i in range(0, len(mids), split_size)]
+ stem = output_csv.stem
+ suffix = output_csv.suffix or ".txt"
+ for i, group in enumerate(groups, start=1):
+ part_path = output_csv.with_name(f"{stem}_{i}{suffix}")
+ part_path.write_text(",".join(group), encoding="utf-8")
+ print(f" 已按每组{split_size}个拆分为{len(groups)}个文件")
+
+ elif args.format == "json":
+ # JSON格式
+ import json
+
+ data = [{"mid": it["mid"], "name": it["name"]} for it in unfollow_items]
+ output_csv.parent.mkdir(parents=True, exist_ok=True)
+ output_csv.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
+
+ print(f"\n✓ 已输出JSON格式到: {output_csv}")
+ print(f" 数量: {len(data)}")
+
+ # 显示前10个示例
+ if len(unfollow_items) > 0:
+ print(f"\n📋 示例(前10个):")
+ for item in unfollow_items[:10]:
+ print(f" - {item['mid']}: {item['name']}")
+
+ if len(unfollow_items) > 10:
+ print(f" ... 还有 {len(unfollow_items) - 10} 个")
+
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/source/scripts/remove_10content.py b/source/scripts/remove_10content.py
new file mode 100644
index 0000000..aae1058
--- /dev/null
+++ b/source/scripts/remove_10content.py
@@ -0,0 +1,67 @@
+import argparse
+import re
+
+def parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(description="删除最近10条标题内容")
+ parser.add_argument(
+ "--input",
+ default="source/output/reports/2_up_analysis_full_auto.md",
+ help="输入报告路径",
+ )
+ parser.add_argument(
+ "--output",
+ help="输出报告路径(默认覆盖输入)",
+ )
+ return parser.parse_args()
+
+def main():
+ args = parse_args()
+ input_file = args.input
+ output_file = args.output or input_file
+
+ with open(input_file, 'r', encoding='utf-8') as f:
+ content = f.read()
+
+ lines = content.split('\n')
+ new_lines = []
+ i = 0
+ while i < len(lines):
+ line = lines[i]
+ new_lines.append(line)
+
+ if line.startswith('## '):
+ i += 1
+ while i < len(lines):
+ curr = lines[i]
+ if curr.startswith('## '):
+ break
+ if curr.startswith('### '):
+ if '最近10条标题' in curr:
+ i += 1
+ while i < len(lines) and lines[i].startswith(' - '):
+ i += 1
+ continue
+ else:
+ break
+ if curr.startswith('- ') and not curr.startswith(' - '):
+ i += 1
+ continue
+ if curr.startswith(' - '):
+ i += 1
+ continue
+ new_lines.append(curr)
+ i += 1
+ else:
+ i += 1
+
+ result = '\n'.join(new_lines)
+ result = re.sub(r'\n{3,}', '\n\n', result)
+
+ with open(output_file, 'w', encoding='utf-8') as f:
+ f.write(result)
+
+ print(f'Done: {output_file}')
+ return 0
+
+if __name__ == "__main__":
+ raise SystemExit(main())
\ No newline at end of file
diff --git a/source/scripts/run_pipeline.py b/source/scripts/run_pipeline.py
new file mode 100644
index 0000000..688bff5
--- /dev/null
+++ b/source/scripts/run_pipeline.py
@@ -0,0 +1,208 @@
+#!/usr/bin/env python3
+"""One-command pipeline: fetch titles -> batch analyze -> outputs.
+
+Pipeline outputs:
+1) source/output/reports/1_up_titles_report.md
+2) source/output/reports/2_up_analysis_full_auto.md
+3) source/output/reports/3_up_keep_follow_only.md
+4) source/output/uids/4_unfollow_mids_list.txt (+ split files)
+
+Pipeline steps:
+1) 抓取视频标题 (analyze_up_content.py)
+2) 分批AI分析 (batch_ai_summary_from_report.py)
+3) 生成保留关注报告 (extract_keep_follow_doc.py)
+4) 生成取关UID列表 (extract_unfollow_list.py)
+5) 按首字母排序 (sort_up_main.py)
+6) 提取分组信息 (extract_group_info.py)
+"""
+
+from __future__ import annotations
+
+import argparse
+import subprocess
+import sys
+from pathlib import Path
+
+
+def parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(description="一键运行完整功能链")
+ parser.add_argument(
+ "--input-json",
+ default="source/resources/export_uids.json",
+ help="UP资源文件路径,默认: source/resources/export_uids.json",
+ )
+ parser.add_argument(
+ "--titles-report",
+ default="source/output/reports/1_up_titles_report.md",
+ help="标题抓取报告输出路径",
+ )
+ parser.add_argument(
+ "--analysis-report",
+ default="source/output/reports/2_up_analysis_full_auto.md",
+ help="分批分析报告输出路径",
+ )
+ parser.add_argument(
+ "--keep-report",
+ default="source/output/reports/3_up_keep_follow_only.md",
+ help="保留关注报告输出路径",
+ )
+ parser.add_argument(
+ "--unfollow-uids",
+ default="source/output/uids/4_unfollow_mids_list.txt",
+ help="取关UID输出路径",
+ )
+ parser.add_argument(
+ "--group_info",
+ default="source/output/uids/only_group_info.md",
+ help="分组信息输出路径",
+ )
+ parser.add_argument("--titles-per-up", type=int, default=10, help="每个UP抓取标题数量")
+ parser.add_argument("--batch-size", type=int, default=20, help="分批分析每批条数")
+ parser.add_argument("--workers", type=int, default=6, help="并发请求数")
+ parser.add_argument("--max-retries", type=int, default=2, help="单条分析重试次数")
+ parser.add_argument("--request-timeout", type=float, default=60.0, help="单次请求超时")
+ parser.add_argument("--split-size", type=int, default=100, help="取关UID拆分分组大小")
+ parser.add_argument("--sleep-seconds", type=float, default=0.0, help="任务间隔秒数")
+ parser.add_argument("--retry-times", type=int, default=3, help="抓取重试次数")
+ parser.add_argument("--fetch-mode", choices=["auto", "api", "html"], default="auto", help="标题抓取模式")
+ parser.add_argument("--only-tag", default="", help="可选:仅处理包含该标签的UP")
+ parser.add_argument("--max-ups", type=int, default=0, help="可选:限制处理UP数量")
+ parser.add_argument("--bili-cookie", default="", help="可选:运行时传入B站Cookie")
+ parser.add_argument("--skip-fetch", action="store_true", help="跳过抓取阶段,直接使用已有标题报告")
+ parser.add_argument("--skip-analyze", action="store_true", help="跳过分析阶段,直接做产物提取")
+ parser.add_argument("--skip-sort", action="store_true", help="跳过排序阶段")
+ parser.add_argument("--skip-group", action="store_true", help="跳过提取分组阶段")
+ parser.add_argument("--python", default=sys.executable, help="指定Python解释器")
+ return parser.parse_args()
+
+
+def run_cmd(cmd: list[str], title: str) -> None:
+ print(f"\n=== {title} ===")
+ print("$", " ".join(cmd))
+ subprocess.run(cmd, check=True)
+
+
+def main() -> int:
+ args = parse_args()
+
+ for p in [
+ Path(args.titles_report).parent,
+ Path(args.analysis_report).parent,
+ Path(args.keep_report).parent,
+ Path(args.unfollow_uids).parent,
+ ]:
+ p.mkdir(parents=True, exist_ok=True)
+
+ if not args.skip_fetch:
+ fetch_cmd = [
+ args.python,
+ "source/scripts/analyze_up_content.py",
+ "--input",
+ args.input_json,
+ "--output",
+ args.titles_report,
+ "--titles-per-up",
+ str(max(1, args.titles_per_up)),
+ "--retry-times",
+ str(max(1, args.retry_times)),
+ "--fetch-mode",
+ args.fetch_mode,
+ "--sleep-seconds",
+ str(max(0.0, args.sleep_seconds)),
+ "--skip-ai",
+ ]
+ if args.only_tag:
+ fetch_cmd += ["--only-tag", args.only_tag]
+ if args.max_ups > 0:
+ fetch_cmd += ["--max-ups", str(args.max_ups)]
+ if args.bili_cookie:
+ fetch_cmd += ["--bili-cookie", args.bili_cookie]
+
+ run_cmd(fetch_cmd, "步骤1/6 抓取视频标题")
+
+ if not args.skip_analyze:
+ analyze_cmd = [
+ args.python,
+ "source/scripts/batch_ai_summary_from_report.py",
+ "--input-report",
+ args.titles_report,
+ "--output-report",
+ args.analysis_report,
+ "--batch-size",
+ str(max(1, args.batch_size)),
+ "--run-all-batches",
+ "--workers",
+ str(max(1, args.workers)),
+ "--max-retries",
+ str(max(1, args.max_retries)),
+ "--request-timeout",
+ str(max(1.0, args.request_timeout)),
+ "--sleep-seconds",
+ str(max(0.0, args.sleep_seconds)),
+ ]
+ run_cmd(analyze_cmd, "步骤2/6 分批AI分析")
+
+ keep_cmd = [
+ args.python,
+ "source/scripts/extract_keep_follow_doc.py",
+ "--input-report",
+ args.analysis_report,
+ "--output-report",
+ args.keep_report,
+ ]
+ run_cmd(keep_cmd, "步骤3/6 生成保留关注报告")
+
+ uid_cmd = [
+ args.python,
+ "source/scripts/extract_unfollow_list.py",
+ "--input-report",
+ args.analysis_report,
+ "--output-csv",
+ args.unfollow_uids,
+ "--format",
+ "mid-only",
+ "--split-size",
+ str(max(0, args.split_size)),
+ ]
+ run_cmd(uid_cmd, "步骤4/6 生成取关UID列表")
+
+ sorted_report = "source/output/reports/5_sorted_up_analysis.md"
+ group_report = "source/output/reports/6_group_info.md"
+
+ if not args.skip_sort:
+ sort_cmd = [
+ args.python,
+ "source/scripts/sort_up_main.py",
+ "--input",
+ args.analysis_report,
+ "--output",
+ sorted_report,
+ ]
+ run_cmd(sort_cmd, "步骤5/6 按首字母排序")
+
+ if not args.skip_group:
+ input_for_group = sorted_report if not args.skip_sort else args.analysis_report
+ group_cmd = [
+ args.python,
+ "source/scripts/extract_group_info.py",
+ "--input",
+ input_for_group,
+ "--output",
+ group_report,
+ ]
+ run_cmd(group_cmd, "步骤6/6 提取分组信息")
+
+ print("\n流水线完成。")
+ print(f"- 1 标题报告: {args.titles_report}")
+ print(f"- 2 分析报告: {args.analysis_report}")
+ print(f"- 3 保留报告: {args.keep_report}")
+ print(f"- 4 取关UID: {args.unfollow_uids}")
+ if not args.skip_sort:
+ print(f"- 5 排序报告: {sorted_report}")
+ if not args.skip_group:
+ print(f"- 6 分组报告: {group_report}")
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/source/scripts/sort_up_main.py b/source/scripts/sort_up_main.py
new file mode 100644
index 0000000..d3ecc4e
--- /dev/null
+++ b/source/scripts/sort_up_main.py
@@ -0,0 +1,93 @@
+import argparse
+import re
+
+def parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(description="对UP主按首字母排序")
+ parser.add_argument(
+ "--input",
+ default="source/output/reports/2_up_analysis_full_auto.md",
+ help="输入报告路径",
+ )
+ parser.add_argument(
+ "--output",
+ help="输出报告路径(默认覆盖输入)",
+ )
+ return parser.parse_args()
+
+def main():
+ args = parse_args()
+ input_file = args.input
+ output_file = args.output or input_file
+
+ with open(input_file, 'r', encoding='utf-8') as f:
+ content = f.read()
+
+ lines = content.split('\n')
+
+ header_lines = []
+ section_starts = []
+ for i, line in enumerate(lines):
+ if line.startswith('## '):
+ section_starts.append(i)
+
+ if len(section_starts) < 2:
+ print('No sections found')
+ return 1
+
+ header = '\n'.join(lines[:section_starts[0]])
+ sections_data = []
+
+ for idx in range(len(section_starts)):
+ start = section_starts[idx]
+ if idx + 1 < len(section_starts):
+ end = section_starts[idx + 1]
+ else:
+ end = len(lines)
+
+ section_lines = lines[start:end]
+ section_text = '\n'.join(section_lines)
+ sections_data.append(section_text)
+
+ sections_data = sections_data[1:]
+
+ parsed = []
+ for sec in sections_data:
+ match = re.match(r'^## (\d+)\. (.+) \(mid: (\d+)\)', sec)
+ if match:
+ num = int(match.group(1))
+ name = match.group(2)
+ mid = match.group(3)
+ parsed.append({
+ 'num': num,
+ 'name': name,
+ 'mid': mid,
+ 'content': sec
+ })
+
+ def sort_key(item):
+ name = item['name']
+ first_char = name[0].lower() if name else ''
+ if first_char.isdigit():
+ return '0' + first_char
+ elif first_char.isalpha():
+ return '1' + first_char
+ else:
+ return '2' + first_char
+
+ parsed.sort(key=sort_key)
+
+ new_content = header + '\n'
+ for i, sec in enumerate(parsed):
+ new_content += sec['content'] + '\n'
+
+ with open(output_file, 'w', encoding='utf-8') as f:
+ f.write(new_content)
+
+ print(f'Sorted {len(parsed)} sections')
+ print('First 10:')
+ for s in parsed[:10]:
+ print(f' {s["name"]}')
+ return 0
+
+if __name__ == "__main__":
+ raise SystemExit(main())
\ No newline at end of file