#!/usr/bin/env python3
"""Batch AI summary from an existing UP markdown report.

Read an existing report (e.g. source/up_analysis_report.md), extract each
UP's title list, and generate AI summaries in batches.
"""

from __future__ import annotations

import argparse
import json
import math
import os
import re
import sys
import time
from collections.abc import Collection
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any
from urllib import request
from urllib.error import HTTPError

# Volcengine Ark settings. Environment variables with the same names
# (VOLCENGINE_API_KEY / VOLCENGINE_MODEL / VOLCENGINE_BASE_URL) override these
# in-file defaults at runtime (see _build_config).
# SECURITY: an API key committed to source control must be treated as leaked.
# Rotate it and supply the replacement through the environment instead.
VOLCENGINE_API_KEY = "586d443c-5034-4810-9760-50ce77394e8a"
VOLCENGINE_MODEL = "deepseek-v3-1-terminus"
VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"

# Names of the three API settings, shared by the env override and the
# --config-from fallback.
_CONFIG_KEYS = ("VOLCENGINE_API_KEY", "VOLCENGINE_MODEL", "VOLCENGINE_BASE_URL")

# Substring that marks an unfilled placeholder value in the settings above.
_PLACEHOLDER = "在这里填"

# Existing "AI分析" values that mean the entry has not been analysed yet.
SKIP_MARKERS = {
    "",
    "测试模式已跳过AI分析",
    "(待分析)",
}

# Preset groups and their keyword rules (extend as needed). The AI must pick
# one of these keys; the keyword lists drive heuristic_group_hint().
PRESET_GROUPS: dict[str, list[str]] = {
    "AAA_核心每日必读": [
        "编程", "算法", "工程", "干货", "新闻", "趋势",
    ],
    "AA_编程信息干货必留": [
        "编程", "算法", "工程", "教程", "实战", "课程",
        "新技术", "开源", "工具", "效率", "技术", "架构",
    ],
    "A_硬核知识保留": [
        "科普", "数学", "物理", "编程", "算法", "工程", "历史", "新闻", "深度",
    ],
    "B_技能学习保留": [
        "英语", "四六级", "考研", "面试", "教程", "实战", "学习", "课程", "写作",
    ],
    "C_资讯快餐观察": [
        "热点", "速览", "信息差", "快报", "盘点", "吐槽", "观点", "趋势",
    ],
    "D_娱乐消遣可取关": [
        "搞笑", "整活", "抽象", "乐子", "娱乐", "段子", "鬼畜", "日常", "情侣",
    ],
    "E_营销带货谨慎": [
        "好物", "测评", "种草", "直播", "带货", "优惠", "开箱", "广告", "激活",
    ],
}

# The only actions the AI may recommend.
_ACTIONS = ("保留关注", "可以取关")

# "## <n>. <name> (mid: <digits>)" -- the heading that starts one UP entry.
_UP_HEADER_RE = re.compile(r"^##\s+\d+\.\s+(.*?)\s+\(mid:\s*(\d+)\)")

_URL_PREFIX = "- 主页: "
_TAG_PREFIX = "- 标签: "

# Level-3 headings of an UP entry mapped to the parser's section names.
_SECTION_HEADERS = {
    "### 最近10条标题": "titles",
    "### AI分析": "analysis",
    "### 分组建议": "group",
    "### 异常": "error",
}

# Bullet prefixes inside the "分组建议" section and the item key each fills.
_GROUP_FIELDS = (
    ("- 预设分组: ", "group"),
    ("- 建议动作: ", "action"),
    ("- 判断依据: ", "reason"),
)


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of this script."""
    parser = argparse.ArgumentParser(description="基于现有报告分批做AI总结")
    parser.add_argument(
        "--input-report",
        default="source/output/reports/up_titles_report.md",
        help="已有标题报告路径,默认: source/output/reports/up_titles_report.md",
    )
    parser.add_argument(
        "--output-report",
        default="source/output/reports/up_analysis_full_auto.md",
        help="输出报告路径,默认: source/output/reports/up_analysis_full_auto.md",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=20,
        help="每批处理数量,默认: 20",
    )
    parser.add_argument(
        "--batch-index",
        type=int,
        default=1,
        help="批次序号(从1开始),默认: 1",
    )
    parser.add_argument(
        "--sleep-seconds",
        type=float,
        default=0.0,
        help="提交任务间隔秒数,默认: 0(并发模式建议0)",
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=4,
        help="并发请求数,默认: 4",
    )
    # The value is the total number of attempts (first request included).
    parser.add_argument(
        "--max-retries",
        type=int,
        default=2,
        help="单个UP分析最大尝试次数(含首次请求),默认: 2",
    )
    parser.add_argument(
        "--request-timeout",
        type=float,
        default=60.0,
        help="单次AI请求超时秒数,默认: 60",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="强制覆盖已有AI分析(默认只处理待分析项)",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="输出调试信息",
    )
    parser.add_argument(
        "--config-from",
        default="source/analyze_up_content.py",
        help="自动读取API配置的脚本路径,默认: source/analyze_up_content.py",
    )
    parser.add_argument(
        "--run-all-batches",
        action="store_true",
        help="自动连续跑完所有批次(忽略batch-index)",
    )
    return parser.parse_args()


def load_api_config_from_script(path: Path) -> dict[str, str]:
    """Read the Volcengine settings from another script's top-level assignments.

    Looks for lines such as ``VOLCENGINE_API_KEY = "..."`` (single or double
    quotes) at the start of a line.

    Args:
        path: Script to scan. A missing file yields an empty dict.

    Returns:
        Mapping of the setting names that were found to their stripped values.
    """
    if not path.exists():
        return {}
    text = path.read_text(encoding="utf-8", errors="replace")
    result: dict[str, str] = {}
    for key in _CONFIG_KEYS:
        m = re.search(rf"^{key}\s*=\s*([\"'])(.*?)\1", text, flags=re.MULTILINE)
        if m:
            result[key] = m.group(2).strip()
    return result


def _new_item(mid: int, name: str) -> dict[str, Any]:
    """Return a fresh UP entry with default values for every field."""
    return {
        "mid": mid,
        "name": name,
        "tag": [],
        "url": f"https://space.bilibili.com/{mid}/video",
        "titles": [],
        "analysis": "",
        "group": "",
        "action": "",
        "reason": "",
        "error": "",
    }


def _parse_group_line(item: dict[str, Any], line: str) -> None:
    """Apply one line of the "分组建议" section to ``item``."""
    for prefix, key in _GROUP_FIELDS:
        if line.startswith(prefix):
            item[key] = line[len(prefix):].strip()
            return
    # build_report writes the placeholder as a bullet; accept both forms.
    if line.strip() in ("(待分组)", "- (待分组)"):
        item["group"] = ""
        item["action"] = ""
        item["reason"] = ""


def parse_report(path: Path) -> list[dict[str, Any]]:
    """Parse a markdown UP report into a list of entry dicts.

    Each entry starts at a ``## <n>. <name> (mid: <mid>)`` heading. Its
    ``### ...`` sub-sections fill ``titles``, ``analysis``, ``group`` /
    ``action`` / ``reason`` and ``error``. Text before the first entry
    heading is ignored.

    Args:
        path: UTF-8 markdown file; a leading BOM is tolerated.

    Returns:
        Entries in file order, each shaped like ``_new_item``.
    """
    lines = path.read_text(encoding="utf-8-sig").splitlines()
    items: list[dict[str, Any]] = []
    current: dict[str, Any] | None = None
    section = ""

    for line in lines:
        header = _UP_HEADER_RE.match(line)
        if header:
            if current is not None:
                items.append(current)
            current = _new_item(int(header.group(2)), header.group(1).strip())
            section = ""
            continue
        if current is None:
            continue

        if line.startswith(_URL_PREFIX):
            current["url"] = line[len(_URL_PREFIX):].strip()
            continue
        if line.startswith(_TAG_PREFIX):
            raw = line[len(_TAG_PREFIX):].strip()
            current["tag"] = (
                [] if raw in ("", "无") else [x.strip() for x in raw.split(",") if x.strip()]
            )
            continue
        if line in _SECTION_HEADERS:
            section = _SECTION_HEADERS[line]
            continue
        if line.startswith("### "):
            # Unknown sub-section: ignore its content.
            section = ""
            continue

        if section == "titles":
            if line.startswith("- "):
                text = line[2:].strip()
                if text and text != "(未抓取到标题)":
                    current["titles"].append(text)
        elif section == "analysis":
            if line.strip():
                current["analysis"] = (current["analysis"] + "\n" + line.strip()).strip()
        elif section == "group":
            _parse_group_line(current, line)
        elif section == "error":
            if line.startswith("- "):
                current["error"] = line[2:].strip()

    if current is not None:
        items.append(current)
    return items


def call_volcengine_chat(
    system_prompt: str,
    user_prompt: str,
    cfg: dict[str, str],
    timeout: float,
) -> str:
    """Send one chat-completion request to Volcengine Ark and return the reply text.

    Args:
        system_prompt: System message content.
        user_prompt: User message content.
        cfg: Settings dict keyed by VOLCENGINE_API_KEY / _MODEL / _BASE_URL.
        timeout: Socket timeout in seconds for the HTTP request.

    Returns:
        The stripped ``choices[0].message.content`` of the response.

    Raises:
        RuntimeError: Missing or placeholder settings, an HTTP error status
            (the server's error body is included), or a response with no
            usable content.
    """
    api_key = cfg.get("VOLCENGINE_API_KEY", "").strip()
    model = cfg.get("VOLCENGINE_MODEL", "").strip()
    base_url = cfg.get("VOLCENGINE_BASE_URL", "").strip()
    if (not api_key) or (_PLACEHOLDER in api_key):
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_API_KEY")
    if (not model) or (_PLACEHOLDER in model):
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_MODEL")
    if not base_url:
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_BASE_URL")

    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.4,
    }
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = request.Request(
        f"{base_url.rstrip('/')}/chat/completions",
        data=body,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )
    try:
        with request.urlopen(req, timeout=timeout) as resp:
            text = resp.read().decode("utf-8", errors="replace")
    except HTTPError as exc:
        # The error body carries the API's explanation (bad key, quota, ...).
        detail = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"AI请求失败 HTTP {exc.code}: {detail[:500]}") from exc

    data = json.loads(text)
    # Walk the response defensively: an empty "choices" list or unexpected
    # types must produce the descriptive error below, not an IndexError.
    choices = data.get("choices") if isinstance(data, dict) else None
    first = choices[0] if isinstance(choices, list) and choices else {}
    message = first.get("message") if isinstance(first, dict) else None
    content = message.get("content", "") if isinstance(message, dict) else ""
    if not isinstance(content, str) or not content.strip():
        raise RuntimeError(f"AI响应异常: {text[:500]}")
    return content.strip()


def summarize_one_up(
    name: str,
    mid: int,
    titles: list[str],
    tags: list[str],
    cfg: dict[str, str],
    timeout: float,
) -> dict[str, str]:
    """Ask the model to summarise and group one UP.

    Returns:
        The validated dict produced by ``parse_ai_json``, with keys
        summary / group / action / reason.
    """
    system_prompt = (
        "你是内容定位与订阅决策助手。"
        "你必须输出合法JSON,不要输出其它文本。"
    )
    joined_titles = "\n".join(f"- {t}" for t in titles)
    joined_tags = "、".join(tags) if tags else "无"
    rule_hint = heuristic_group_hint(titles, tags)
    groups_desc = "\n".join(f"- {k}" for k in PRESET_GROUPS)
    user_prompt = f"""
请基于以下信息完成分组与总结。
UP主: {name}
mid: {mid}
标签: {joined_tags}
最近标题:
{joined_titles}
预设分组:
{groups_desc}
代码规则初判:
{rule_hint}
要求:
1) 输出JSON对象,字段严格为: summary, group, action, reason。
2) summary: 一段中文总结,50-100字。
3) group: 必须从预设分组里选一个。给出详细的分组类别和命中分组中的规则词。
4) action: 只能是"保留关注"或"可以取关"。敏感一点,只保留真正核心优质的up,其他都建议取关。
5) reason: 30-60字,解释为什么分到该组并给出该动作。
""".strip()
    content = call_volcengine_chat(system_prompt, user_prompt, cfg, timeout=timeout)
    return parse_ai_json(content)


def _str_field(data: dict[str, Any], key: str) -> str:
    """Return ``data[key]`` as a stripped string; missing keys and JSON null become ""."""
    value = data.get(key)
    return "" if value is None else str(value).strip()


def _match_choice(value: str, choices: Collection[str]) -> str:
    """Map ``value`` onto one of ``choices``.

    An exact match is returned as is. Otherwise, if exactly one choice occurs
    inside ``value`` (e.g. the model appended its hit keywords to the group
    name), that choice is returned. In every other case ``value`` is returned
    unchanged so the caller's validation rejects it.
    """
    if value in choices:
        return value
    hits = [c for c in choices if c in value]
    return hits[0] if len(hits) == 1 else value


def parse_ai_json(content: str) -> dict[str, str]:
    """Extract and validate the JSON object in a model reply.

    Markdown code fences are stripped and the outermost ``{...}`` span is
    parsed. ``group`` must name one preset group and ``action`` one allowed
    action; a value that embeds exactly one valid name is accepted and
    normalised to it. An empty ``reason`` is filled with a default.

    Raises:
        json.JSONDecodeError: The reply contains no parseable JSON.
        RuntimeError: The JSON is not an object, the summary is missing or
            null, or group/action are invalid.
    """
    text = content.strip()
    if text.startswith("```"):
        text = re.sub(r"^```[a-zA-Z]*\n?", "", text)
        text = re.sub(r"\n?```$", "", text).strip()
    m = re.search(r"\{.*\}", text, flags=re.DOTALL)
    if m:
        text = m.group(0)
    data = json.loads(text)
    if not isinstance(data, dict):
        raise RuntimeError("AI返回不是JSON对象")

    summary = _str_field(data, "summary")
    group = _match_choice(_str_field(data, "group"), PRESET_GROUPS)
    action = _match_choice(_str_field(data, "action"), _ACTIONS)
    reason = _str_field(data, "reason")
    if not summary:
        raise RuntimeError("AI返回缺少summary")
    if group not in PRESET_GROUPS:
        raise RuntimeError(f"AI返回未知group: {group}")
    if action not in _ACTIONS:
        raise RuntimeError(f"AI返回未知action: {action}")
    if not reason:
        reason = "基于标题内容与更新风格综合判断。"
    return {
        "summary": summary,
        "group": group,
        "action": action,
        "reason": reason,
    }


def heuristic_group_hint(titles: list[str], tags: list[str]) -> str:
    """Score each preset group by keyword hits and describe the result for the prompt.

    A group's score is the number of its keywords that appear as
    case-insensitive substrings of the titles and tags. Ties keep
    PRESET_GROUPS order.
    """
    lower_text = ("\n".join(titles) + "\n" + " ".join(tags)).lower()
    score = {
        group: sum(1 for w in words if w.lower() in lower_text)
        for group, words in PRESET_GROUPS.items()
    }
    ranked = sorted(score.items(), key=lambda x: x[1], reverse=True)
    best_group, best_score = ranked[0]
    if best_score <= 0:
        return "未命中关键词,倾向按内容专业度与稳定性判断。"
    top3 = ", ".join(f"{g}:{s}" for g, s in ranked[:3])
    return f"关键词命中最高组={best_group}(score={best_score}),参考分布: {top3}"


def summarize_one_up_with_retry(
    item: dict[str, Any],
    cfg: dict[str, str],
    max_retries: int,
    timeout: float,
    debug: bool,
) -> dict[str, str]:
    """Run ``summarize_one_up`` for one entry, retrying on any failure.

    Args:
        item: Entry dict with at least ``name`` and ``mid``.
        cfg: API settings passed through to the request.
        max_retries: Total number of attempts, first try included; values
            below 1 are treated as 1.
        timeout: Per-request timeout in seconds.
        debug: Print each failed attempt.

    Raises:
        RuntimeError: All attempts failed; chained to the last error.
    """
    last_exc: Exception | None = None
    total_try = max(1, max_retries)
    for attempt in range(1, total_try + 1):
        try:
            return summarize_one_up(
                item["name"],
                item["mid"],
                item.get("titles", []),
                item.get("tag", []),
                cfg,
                timeout=timeout,
            )
        except Exception as exc:  # noqa: BLE001
            last_exc = exc
            if debug:
                print(f"[debug] {item['name']} 第{attempt}次失败: {exc}")
            if attempt < total_try:
                # Short linear back-off, capped at 2 seconds.
                time.sleep(min(2.0, 0.5 * attempt))
    raise RuntimeError(str(last_exc) if last_exc else "未知错误") from last_exc


def build_report(items: list[dict[str, Any]], batch_note: str) -> str:
    """Render entries back into the markdown report format read by parse_report.

    The group reason is taken from ``group_reason`` if present, else from
    ``reason`` (the key parse_report fills), so reasons survive a
    parse/build round trip.

    Args:
        items: Entry dicts as produced by parse_report.
        batch_note: Free-text note written into the report header.

    Returns:
        The report text, ending with exactly one newline.
    """
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    lines: list[str] = [
        "# UP主内容分析报告(分批AI总结)",
        "",
        f"- 生成时间: {now}",
        f"- 分析数量: {len(items)}",
        f"- 处理说明: {batch_note}",
        "",
    ]

    group_stats = dict.fromkeys(PRESET_GROUPS, 0)
    action_stats = dict.fromkeys(_ACTIONS, 0)
    for item in items:
        g = item.get("group", "")
        a = item.get("action", "")
        if g in group_stats:
            group_stats[g] += 1
        if a in action_stats:
            action_stats[a] += 1

    lines.append("## 分组统计")
    lines.append("")
    lines.extend(f"- {g}: {c}" for g, c in group_stats.items())
    lines.append(f"- 保留关注: {action_stats['保留关注']}")
    lines.append(f"- 可以取关: {action_stats['可以取关']}")
    lines.append("")

    for idx, item in enumerate(items, start=1):
        lines.append(f"## {idx}. {item['name']} (mid: {item['mid']})")
        lines.append("")
        lines.append(f"- 主页: {item['url']}")
        tags = item.get("tag", [])
        lines.append(f"- 标签: {', '.join(tags) if tags else '无'}")
        lines.append("")

        lines.append("### 最近10条标题")
        lines.append("")
        titles = item.get("titles", [])
        if titles:
            lines.extend(f"- {t}" for t in titles)
        else:
            lines.append("- (未抓取到标题)")
        lines.append("")

        lines.append("### AI分析")
        lines.append("")
        analysis = item.get("analysis", "")
        lines.append(analysis if analysis else "(待分析)")
        lines.append("")

        lines.append("### 分组建议")
        lines.append("")
        group = item.get("group", "")
        action = item.get("action", "")
        reason = item.get("group_reason") or item.get("reason", "")
        if group and action:
            lines.append(f"- 预设分组: {group}")
            lines.append(f"- 建议动作: {action}")
            lines.append(f"- 判断依据: {reason if reason else '基于标题与更新风格综合判断。'}")
        else:
            lines.append("- (待分组)")
        lines.append("")

        error = item.get("error", "")
        if error:
            lines.append("### 异常")
            lines.append("")
            lines.append(f"- {error}")
            lines.append("")

    return "\n".join(lines).rstrip() + "\n"


def _build_config(config_from: Path) -> dict[str, str]:
    """Resolve API settings.

    Precedence: environment variables, then the in-file constants, then
    ``config_from``. The ``config_from`` script is consulted only while the
    key or model is still a placeholder, and only its non-empty values are used.
    """
    config = {
        "VOLCENGINE_API_KEY": VOLCENGINE_API_KEY,
        "VOLCENGINE_MODEL": VOLCENGINE_MODEL,
        "VOLCENGINE_BASE_URL": VOLCENGINE_BASE_URL,
    }
    for key in _CONFIG_KEYS:
        env_value = os.environ.get(key, "").strip()
        if env_value:
            config[key] = env_value
    if (_PLACEHOLDER in config["VOLCENGINE_API_KEY"]) or (_PLACEHOLDER in config["VOLCENGINE_MODEL"]):
        inherited = load_api_config_from_script(config_from)
        config.update({k: v for k, v in inherited.items() if v})
    return config


def _select_pending(items: list[dict[str, Any]], *, force: bool) -> list[dict[str, Any]]:
    """Return the entries (same dict objects) that need an AI pass.

    Entries without titles are never selected. With ``force``, every other
    entry is selected; otherwise only those with no real analysis or no group.
    """
    if force:
        return [it for it in items if it.get("titles")]
    return [
        it
        for it in items
        if it.get("titles")
        and (
            it.get("analysis", "").strip() in SKIP_MARKERS
            or not it.get("group")  # entries without a group are re-run too
        )
    ]


def _write_report(path: Path, items: list[dict[str, Any]], note: str) -> None:
    """Render ``items`` and write them to ``path``, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(build_report(items, note), encoding="utf-8")


def _run_batch(
    batch: list[dict[str, Any]],
    config: dict[str, str],
    *,
    workers: int,
    attempts: int,
    timeout: float,
    sleep_seconds: float,
    debug: bool,
) -> tuple[int, int]:
    """Analyse one batch concurrently, updating each entry dict in place.

    Returns:
        ``(success, failed)`` counts for the batch.
    """
    success = 0
    failed = 0
    future_to_item: dict[Future[dict[str, str]], dict[str, Any]] = {}
    with ThreadPoolExecutor(max_workers=workers) as executor:
        for i, it in enumerate(batch, start=1):
            print(f"[submit {i}/{len(batch)}] {it['name']} ({it['mid']})")
            future = executor.submit(
                summarize_one_up_with_retry,
                it,
                config,
                attempts,
                timeout,
                debug,
            )
            future_to_item[future] = it
            if sleep_seconds > 0:
                time.sleep(sleep_seconds)

        for done_count, future in enumerate(as_completed(future_to_item), start=1):
            # `it` is the same dict object held in the full items list, so
            # updating it here updates the report directly.
            it = future_to_item[future]
            try:
                ai_res = future.result()
            except Exception as exc:  # noqa: BLE001
                it["error"] = str(exc)
                failed += 1
                print(f"[done {done_count}/{len(batch)}] 失败: {it['name']} ({it['mid']})")
                if debug:
                    print(f"[debug] 失败详情: {exc}")
                continue
            it["analysis"] = ai_res["summary"]
            it["group"] = ai_res["group"]
            it["action"] = ai_res["action"]
            it["reason"] = ai_res["reason"]
            it["error"] = ""
            success += 1
            print(f"[done {done_count}/{len(batch)}] 成功: {it['name']} ({it['mid']})")
    return success, failed


def main() -> int:
    """Entry point: parse the input report, analyse pending entries, write the output.

    The output report is rewritten after every batch, so partial progress
    survives an interruption.

    Returns:
        Process exit code: 0 on success, 1 if the input is missing or empty.
    """
    args = parse_args()
    input_report = Path(args.input_report)
    output_report = Path(args.output_report)
    if not input_report.exists():
        print(f"输入报告不存在: {input_report}", file=sys.stderr)
        return 1

    items = parse_report(input_report)
    if not items:
        print("输入报告未解析出任何UP条目", file=sys.stderr)
        return 1

    config = _build_config(Path(args.config_from))
    pending = _select_pending(items, force=args.force)
    if not pending:
        print("没有待分析条目,直接输出当前报告")
        _write_report(output_report, items, "无待分析条目")
        return 0

    batch_size = max(1, args.batch_size)
    workers = max(1, args.workers)
    attempts = max(1, args.max_retries)
    if args.run_all_batches:
        total_batches = math.ceil(len(pending) / batch_size)
        batch_indexes = list(range(1, total_batches + 1))
        print(f"自动连续模式: 共{total_batches}批, 待分析总数{len(pending)}")
    else:
        batch_indexes = [max(1, args.batch_index)]

    print(f"并发配置: workers={workers}, retries={attempts}, timeout={args.request_timeout}s")

    success_total = 0
    failed_total = 0
    for batch_index in batch_indexes:
        start = (batch_index - 1) * batch_size
        batch = pending[start:start + batch_size]
        if not batch:
            print(f"第{batch_index}批没有待分析条目(待分析总数{len(pending)}), 跳过")
            continue
        print(
            f"开始分批AI总结: 第{batch_index}批, 每批{batch_size}条, "
            f"本批{len(batch)}条, 待分析总数{len(pending)}"
        )
        success, failed = _run_batch(
            batch,
            config,
            workers=workers,
            attempts=attempts,
            timeout=float(args.request_timeout),
            sleep_seconds=args.sleep_seconds,
            debug=args.debug,
        )
        success_total += success
        failed_total += failed

        step_note = (
            f"第{batch_index}批完成: 成功{success}, 失败{failed}, "
            f"本批{len(batch)}, 待分析总数{len(pending)}"
        )
        _write_report(output_report, items, step_note)
        print(f"第{batch_index}批写入完成: {output_report}")

    mode_text = "自动连续" if args.run_all_batches else "单批"
    note = (
        f"{mode_text}模式完成: 成功{success_total}, 失败{failed_total}, "
        f"处理批次数={len(batch_indexes)}, 待分析总数={len(pending)}"
    )
    _write_report(output_report, items, note)
    print(f"输出完成: {output_report}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())