#!/usr/bin/env python3
"""Batch AI summary from existing UP markdown report.

Read an existing report (e.g. source/up_analysis_report.md),
extract each UP's title list, and generate AI summaries in batches.
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
import json
|
||
import math
|
||
import re
|
||
import sys
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Any
|
||
from urllib import request
|
||
|
||
# Volcengine Ark connection settings; fill in your own values here.
# NOTE(review): this looks like a live credential committed to source control;
# consider rotating it and supplying it out-of-band.
VOLCENGINE_API_KEY = "586d443c-5034-4810-9760-50ce77394e8a"
VOLCENGINE_MODEL = "deepseek-v3-1-terminus"
VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"

# Values of an item's "analysis" text that mean "no AI analysis yet".
SKIP_MARKERS = {"", "测试模式已跳过AI分析", "(待分析)"}

# Preset groups and their keyword rules (extend as needed).
# Insertion order matters: it is the tie-break order for keyword scoring and
# the display order of the statistics section in the generated report.
PRESET_GROUPS: dict[str, list[str]] = {
    "AAA_核心每日必读": ["编程", "算法", "工程", "干货", "新闻", "趋势"],
    "AA_编程信息干货必留": [
        "编程", "算法", "工程", "教程", "实战", "课程",
        "新技术", "开源", "工具", "效率", "技术", "架构",
    ],
    "A_硬核知识保留": ["科普", "数学", "物理", "编程", "算法", "工程", "历史", "新闻", "深度"],
    "B_技能学习保留": ["英语", "四六级", "考研", "面试", "教程", "实战", "学习", "课程", "写作"],
    "C_资讯快餐观察": ["热点", "速览", "信息差", "快报", "盘点", "吐槽", "观点", "趋势"],
    "D_娱乐消遣可取关": ["搞笑", "整活", "抽象", "乐子", "娱乐", "段子", "鬼畜", "日常", "情侣"],
    "E_营销带货谨慎": ["好物", "测评", "种草", "直播", "带货", "优惠", "开箱", "广告", "激活"],
}
|
||
|
||
|
||
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the batch summariser.

    Options are declared in a table and registered in order, so the
    generated ``--help`` output lists them in the same sequence.

    Returns:
        The namespace produced by ``argparse`` from ``sys.argv``.
    """
    option_table: list[tuple[str, dict[str, Any]]] = [
        (
            "--input-report",
            {
                "default": "source/output/reports/up_titles_report.md",
                "help": "已有标题报告路径,默认: source/output/reports/up_titles_report.md",
            },
        ),
        (
            "--output-report",
            {
                "default": "source/output/reports/up_analysis_full_auto.md",
                "help": "输出报告路径,默认: source/output/reports/up_analysis_full_auto.md",
            },
        ),
        ("--batch-size", {"type": int, "default": 20, "help": "每批处理数量,默认: 20"}),
        ("--batch-index", {"type": int, "default": 1, "help": "批次序号(从1开始),默认: 1"}),
        (
            "--sleep-seconds",
            {"type": float, "default": 0.0, "help": "提交任务间隔秒数,默认: 0(并发模式建议0)"},
        ),
        ("--workers", {"type": int, "default": 4, "help": "并发请求数,默认: 4"}),
        ("--max-retries", {"type": int, "default": 2, "help": "单个UP分析最大重试次数,默认: 2"}),
        (
            "--request-timeout",
            {"type": float, "default": 60.0, "help": "单次AI请求超时秒数,默认: 60"},
        ),
        ("--force", {"action": "store_true", "help": "强制覆盖已有AI分析(默认只处理待分析项)"}),
        ("--debug", {"action": "store_true", "help": "输出调试信息"}),
        (
            "--config-from",
            {
                "default": "source/analyze_up_content.py",
                "help": "自动读取API配置的脚本路径,默认: source/analyze_up_content.py",
            },
        ),
        (
            "--run-all-batches",
            {"action": "store_true", "help": "自动连续跑完所有批次(忽略batch-index)"},
        ),
    ]

    cli = argparse.ArgumentParser(description="基于现有报告分批做AI总结")
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
|
||
|
||
|
||
def load_api_config_from_script(path: Path) -> dict[str, str]:
    """Extract the Volcengine settings assigned at the top level of a script.

    The script is scanned as text (it is never imported or executed) for
    lines of the form ``NAME = "value"`` or ``NAME = 'value'`` that start at
    column 0, for the three ``VOLCENGINE_*`` setting names.

    Args:
        path: Script file to scan.

    Returns:
        A mapping containing only the settings that were found, with
        surrounding whitespace stripped from the values. Empty when ``path``
        is not an existing regular file.
    """
    # ``is_file`` (not ``exists``) so that a directory path yields ``{}``
    # instead of failing inside ``read_text``.
    if not path.is_file():
        return {}

    text = path.read_text(encoding="utf-8", errors="replace")
    result: dict[str, str] = {}
    for key in ("VOLCENGINE_API_KEY", "VOLCENGINE_MODEL", "VOLCENGINE_BASE_URL"):
        # Group 1 is the opening quote; the value runs up to the next
        # occurrence of that same quote character on the line.
        m = re.search(
            rf"^{key}\s*=\s*(['\"])((?:(?!\1).)*)\1",
            text,
            flags=re.MULTILINE,
        )
        if m:
            result[key] = m.group(2).strip()
    return result
|
||
|
||
|
||
def parse_report(path: Path) -> list[dict[str, Any]]:
    """Parse a markdown UP report into a list of per-UP dictionaries.

    An entry starts at a heading of the form ``## <n>. <name> (mid: <id>)``
    and runs until the next such heading. Lines before the first entry
    heading are ignored.

    Each returned dict has the keys ``mid``, ``name``, ``tag``, ``url``,
    ``titles``, ``analysis``, ``group``, ``action``, ``reason``,
    ``group_reason`` and ``error``. ``reason`` and ``group_reason`` always
    hold the same parsed text; both are provided because the rest of this
    file stores the reason under ``group_reason``.

    Args:
        path: Report file, read as UTF-8.

    Returns:
        The entries in file order.
    """
    header_re = re.compile(r"^##\s+\d+\.\s+(.*?)\s+\(mid:\s*(\d+)\)")
    # Accept both the full-width comma and the ASCII comma, because
    # build_report() in this file writes tags joined with ", ".
    tag_separator_re = re.compile(r"[,,]")
    known_sections = {
        "### 最近10条标题": "titles",
        "### AI分析": "analysis",
        "### 分组建议": "group",
        "### 异常": "error",
    }
    group_prefixes = (
        ("- 预设分组: ", "group"),
        ("- 建议动作: ", "action"),
        ("- 判断依据: ", "reason"),
    )

    items: list[dict[str, Any]] = []
    current: dict[str, Any] | None = None
    section = ""

    for line in path.read_text(encoding="utf-8").splitlines():
        m = header_re.match(line)
        if m:
            if current is not None:
                items.append(current)
            mid = int(m.group(2))
            current = {
                "mid": mid,
                "name": m.group(1).strip(),
                "tag": [],
                "url": f"https://space.bilibili.com/{mid}/video",
                "titles": [],
                "analysis": "",
                "group": "",
                "action": "",
                "reason": "",
                "group_reason": "",
                "error": "",
            }
            section = ""
            continue

        if current is None:
            continue

        # Metadata bullets are only recognised outside the known sections, so
        # a title such as "主页: ..." is kept as a title instead of
        # overwriting the entry's URL or tags.
        if not section:
            if line.startswith("- 主页: "):
                current["url"] = line[len("- 主页: "):].strip()
                continue
            if line.startswith("- 标签: "):
                raw = line[len("- 标签: "):].strip()
                if raw in ("", "无"):
                    current["tag"] = []
                else:
                    current["tag"] = [
                        part.strip() for part in tag_separator_re.split(raw) if part.strip()
                    ]
                continue

        if line.startswith("### "):
            # Unknown sub-headings close the current section.
            section = known_sections.get(line, "")
            continue

        if section == "titles":
            if line.startswith("- "):
                text = line[2:].strip()
                if text and text != "(未抓取到标题)":
                    current["titles"].append(text)
        elif section == "analysis":
            if line.strip():
                current["analysis"] = (current["analysis"] + "\n" + line.strip()).strip()
        elif section == "group":
            if line.strip() in ("(待分组)", "- (待分组)"):
                current["group"] = ""
                current["action"] = ""
                current["reason"] = ""
                current["group_reason"] = ""
                continue
            for prefix, field in group_prefixes:
                if line.startswith(prefix):
                    value = line[len(prefix):].strip()
                    current[field] = value
                    if field == "reason":
                        current["group_reason"] = value
                    break
        elif section == "error":
            if line.startswith("- "):
                current["error"] = line[2:].strip()

    if current is not None:
        items.append(current)

    return items
|
||
|
||
|
||
def call_volcengine_chat(
    system_prompt: str,
    user_prompt: str,
    cfg: dict[str, str],
    timeout: float,
) -> str:
    """Send one chat-completion request and return the reply text.

    A JSON body with a system and a user message is POSTed to
    ``<VOLCENGINE_BASE_URL>/chat/completions`` with a bearer token, and the
    text at ``choices[0].message.content`` of the JSON response is returned.

    Args:
        system_prompt: Content of the system message.
        user_prompt: Content of the user message.
        cfg: Settings mapping with ``VOLCENGINE_API_KEY``,
            ``VOLCENGINE_MODEL`` and ``VOLCENGINE_BASE_URL``.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The reply content with surrounding whitespace removed.

    Raises:
        RuntimeError: If a setting is missing or still a placeholder, or if
            the response is not JSON, has no usable first choice, or its
            content is empty. The message carries the first 500 characters
            of the response for diagnosis.
        Exception: Whatever ``urlopen`` raises for transport or HTTP errors
            is propagated unchanged.
    """
    api_key = cfg.get("VOLCENGINE_API_KEY", "").strip()
    model = cfg.get("VOLCENGINE_MODEL", "").strip()
    base_url = cfg.get("VOLCENGINE_BASE_URL", "").strip()

    # "在这里填" is the placeholder marker of an unfilled template value.
    if (not api_key) or ("在这里填" in api_key):
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_API_KEY")
    if (not model) or ("在这里填" in model):
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_MODEL")
    if not base_url:
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_BASE_URL")

    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.4,
    }

    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = request.Request(
        f"{base_url.rstrip('/')}/chat/completions",
        data=body,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )

    with request.urlopen(req, timeout=timeout) as resp:
        text = resp.read().decode("utf-8", errors="replace")

    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"AI响应异常: {text[:500]}") from exc

    # Walk the expected structure defensively: an empty "choices" list, a
    # null message or a non-object body must surface as the same
    # descriptive error rather than an IndexError/AttributeError.
    content: Any = None
    if isinstance(data, dict):
        choices = data.get("choices")
        if isinstance(choices, list) and choices and isinstance(choices[0], dict):
            message = choices[0].get("message")
            if isinstance(message, dict):
                content = message.get("content")

    if not isinstance(content, str) or not content.strip():
        raise RuntimeError(f"AI响应异常: {text[:500]}")
    return content.strip()
|
||
|
||
|
||
def summarize_one_up(
    name: str,
    mid: int,
    titles: list[str],
    tags: list[str],
    cfg: dict[str, str],
    timeout: float,
) -> dict[str, str]:
    """Ask the model to summarise and classify a single UP.

    The user prompt lists the UP's name, mid, tags and recent titles, the
    names of the preset groups, the keyword-based hint from
    ``heuristic_group_hint`` and the output requirements. The raw reply is
    handed to ``parse_ai_json``.

    Args:
        name: Display name of the UP.
        mid: Numeric id of the UP.
        titles: Recent video titles.
        tags: Tags attached to the UP; may be empty.
        cfg: API settings forwarded to ``call_volcengine_chat``.
        timeout: Request timeout in seconds.

    Returns:
        The dict produced by ``parse_ai_json``.
    """
    title_block = "\n".join(f"- {title}" for title in titles)
    tag_text = "、".join(tags) if tags else "无"
    hint = heuristic_group_hint(titles, tags)
    group_block = "\n".join(f"- {group_name}" for group_name in PRESET_GROUPS)

    system_prompt = "你是内容定位与订阅决策助手。" + "你必须输出合法JSON,不要输出其它文本。"

    prompt_lines = [
        "请基于以下信息完成分组与总结。",
        "",
        f"UP主: {name}",
        f"mid: {mid}",
        f"标签: {tag_text}",
        "最近标题:",
        title_block,
        "",
        "预设分组:",
        group_block,
        "",
        "代码规则初判:",
        hint,
        "",
        "要求:",
        "1) 输出JSON对象,字段严格为: summary, group, action, reason。",
        "2) summary: 一段中文总结,50-100字。",
        "3) group: 必须从预设分组里选一个。给出详细的分组类别和命中分组中的规则词。",
        '4) action: 只能是"保留关注"或"可以取关"。敏感一点,只保留真正核心优质的up,其他都建议取关。',
        "5) reason: 30-60字,解释为什么分到该组并给出该动作。",
    ]
    user_prompt = "\n".join(prompt_lines).strip()

    reply = call_volcengine_chat(system_prompt, user_prompt, cfg, timeout=timeout)
    return parse_ai_json(reply)
|
||
|
||
|
||
def parse_ai_json(content: str) -> dict[str, str]:
    """Validate the model reply and normalise it to four string fields.

    A surrounding markdown code fence is removed, the outermost ``{...}``
    span is parsed as JSON, and the ``summary``, ``group``, ``action`` and
    ``reason`` fields are checked.

    The ``group`` value may be an exact preset group name or a longer text
    that mentions exactly one preset group name (the prompt asks the model
    to elaborate on the group, so decorated answers are expected); in the
    latter case it is normalised to the bare group name.

    Args:
        content: Raw text returned by the model.

    Returns:
        A dict with the keys ``summary``, ``group``, ``action``, ``reason``.
        A missing reason is replaced by a generic sentence.

    Raises:
        RuntimeError: If the JSON is not an object, the summary is missing,
            the group cannot be resolved to one preset group, or the action
            is not one of the two allowed values.
        json.JSONDecodeError: If the text is not valid JSON.
    """
    text = content.strip()
    if text.startswith("```"):
        text = re.sub(r"^```[a-zA-Z]*\n?", "", text)
        text = re.sub(r"\n?```$", "", text).strip()
    m = re.search(r"\{.*\}", text, flags=re.DOTALL)
    if m:
        text = m.group(0)

    data = json.loads(text)
    if not isinstance(data, dict):
        raise RuntimeError(f"AI返回不是JSON对象: {text[:200]}")

    def field(key: str) -> str:
        # JSON null must count as "missing", not as the literal text "None".
        value = data.get(key)
        return "" if value is None else str(value).strip()

    summary = field("summary")
    group = field("group")
    action = field("action")
    reason = field("reason")

    if not summary:
        raise RuntimeError("AI返回缺少summary")

    if group not in PRESET_GROUPS:
        # Accept a decorated value only when it names one group unambiguously.
        mentioned = [name for name in PRESET_GROUPS if name in group]
        if len(mentioned) != 1:
            raise RuntimeError(f"AI返回未知group: {group}")
        group = mentioned[0]

    if action not in ("保留关注", "可以取关"):
        raise RuntimeError(f"AI返回未知action: {action}")
    if not reason:
        reason = "基于标题内容与更新风格综合判断。"

    return {
        "summary": summary,
        "group": group,
        "action": action,
        "reason": reason,
    }
|
||
|
||
|
||
def heuristic_group_hint(titles: list[str], tags: list[str]) -> str:
    """Build a keyword-based hint about which preset group fits best.

    Every preset group scores one point for each of its keywords that
    occurs (case-insensitively, as a substring) in the titles or tags.

    Args:
        titles: Recent video titles.
        tags: Tags attached to the UP.

    Returns:
        A sentence naming the best-scoring group and the top three scores,
        or a fallback sentence when no keyword matched at all.
    """
    haystack = ("\n".join(titles) + "\n" + " ".join(tags)).lower()

    hits: dict[str, int] = {
        name: sum(1 for keyword in keywords if keyword.lower() in haystack)
        for name, keywords in PRESET_GROUPS.items()
    }

    # The sort is stable, so groups with equal scores keep PRESET_GROUPS order.
    ordering = sorted(hits.items(), key=lambda pair: pair[1], reverse=True)
    top_name, top_hits = ordering[0]
    if top_hits <= 0:
        return "未命中关键词,倾向按内容专业度与稳定性判断。"

    distribution = ", ".join(f"{name}:{count}" for name, count in ordering[:3])
    return f"关键词命中最高组={top_name}(score={top_hits}),参考分布: {distribution}"
|
||
|
||
|
||
def summarize_one_up_with_retry(
    item: dict[str, Any],
    cfg: dict[str, str],
    max_retries: int,
    timeout: float,
    debug: bool,
) -> dict[str, str]:
    """Run ``summarize_one_up`` for one entry, retrying on any exception.

    Args:
        item: Parsed entry; ``name`` and ``mid`` are required, ``titles``
            and ``tag`` default to empty lists.
        cfg: API settings forwarded to ``summarize_one_up``.
        max_retries: Total number of attempts; values below 1 count as 1.
        timeout: Request timeout in seconds for every attempt.
        debug: When true, print each failed attempt.

    Returns:
        The result of the first successful attempt.

    Raises:
        RuntimeError: After the final failed attempt, carrying the text of
            the last exception.
    """
    attempts_allowed = max(1, max_retries)
    failure: Exception | None = None

    attempt = 0
    while attempt < attempts_allowed:
        attempt += 1
        try:
            return summarize_one_up(
                item["name"],
                item["mid"],
                item.get("titles", []),
                item.get("tag", []),
                cfg,
                timeout=timeout,
            )
        except Exception as exc:  # noqa: BLE001
            failure = exc
            if debug:
                print(f"[debug] {item['name']} 第{attempt}次失败: {exc}")
            if attempt < attempts_allowed:
                # Back off a little longer after each failure, capped at 2s.
                time.sleep(min(2.0, 0.5 * attempt))

    raise RuntimeError(str(failure) if failure else "未知错误")
|
||
|
||
|
||
def _render_up_section(idx: int, item: dict[str, Any]) -> list[str]:
    """Return the markdown lines for one UP entry of the report."""
    out: list[str] = [
        f"## {idx}. {item['name']} (mid: {item['mid']})",
        "",
        f"- 主页: {item['url']}",
    ]
    tags = item.get("tag", [])
    out.append(f"- 标签: {', '.join(tags) if tags else '无'}")
    out.append("")

    out.append("### 最近10条标题")
    out.append("")
    titles = item.get("titles", [])
    if titles:
        out.extend(f"- {t}" for t in titles)
    else:
        out.append("- (未抓取到标题)")
    out.append("")

    out.append("### AI分析")
    out.append("")
    analysis = item.get("analysis", "")
    out.append(analysis if analysis else "(待分析)")
    out.append("")

    out.append("### 分组建议")
    out.append("")
    group = item.get("group", "")
    action = item.get("action", "")
    # Fresh AI results are stored under "group_reason", while entries parsed
    # from an existing report carry the text under "reason"; honour both so
    # a previously written reason survives a rebuild.
    reason = item.get("group_reason") or item.get("reason", "")
    if group and action:
        out.append(f"- 预设分组: {group}")
        out.append(f"- 建议动作: {action}")
        out.append(f"- 判断依据: {reason if reason else '基于标题与更新风格综合判断。'}")
    else:
        out.append("- (待分组)")
    out.append("")

    error = item.get("error", "")
    if error:
        out.append("### 异常")
        out.append("")
        out.append(f"- {error}")
        out.append("")
    return out


def build_report(items: list[dict[str, Any]], batch_note: str) -> str:
    """Render all entries as a markdown report.

    The report consists of a header (generation time, entry count and
    ``batch_note``), a statistics section counting entries per preset group
    and per action, and one section per entry.

    Args:
        items: Entries to render; each needs ``name``, ``mid`` and ``url``,
            every other key is optional.
        batch_note: Free text shown in the header as the processing note.

    Returns:
        The report text, ending with exactly one newline.
    """
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    lines: list[str] = [
        "# UP主内容分析报告(分批AI总结)",
        "",
        f"- 生成时间: {now}",
        f"- 分析数量: {len(items)}",
        f"- 处理说明: {batch_note}",
        "",
    ]

    # Only known groups/actions are counted; anything else is ignored.
    group_stats: dict[str, int] = {k: 0 for k in PRESET_GROUPS}
    action_stats: dict[str, int] = {"保留关注": 0, "可以取关": 0}
    for item in items:
        g = item.get("group", "")
        a = item.get("action", "")
        if g in group_stats:
            group_stats[g] += 1
        if a in action_stats:
            action_stats[a] += 1

    lines.append("## 分组统计")
    lines.append("")
    lines.extend(f"- {g}: {c}" for g, c in group_stats.items())
    lines.append(f"- 保留关注: {action_stats['保留关注']}")
    lines.append(f"- 可以取关: {action_stats['可以取关']}")
    lines.append("")

    for idx, item in enumerate(items, start=1):
        lines.extend(_render_up_section(idx, item))

    return "\n".join(lines).rstrip() + "\n"
|
||
|
||
|
||
def _select_pending(items: list[dict[str, Any]], force: bool) -> list[dict[str, Any]]:
    """Return the entries (same dict objects as in ``items``) that need an AI run."""
    if force:
        return [it for it in items if it.get("titles")]
    return [
        it
        for it in items
        if it.get("titles")
        and (
            it.get("analysis", "").strip() in SKIP_MARKERS
            or not it.get("group")  # entries without a group are re-run too
        )
    ]


def _write_report(output_report: Path, items: list[dict[str, Any]], note: str) -> None:
    """Write the rendered report, creating the output directory if needed."""
    output_report.parent.mkdir(parents=True, exist_ok=True)
    output_report.write_text(build_report(items, note), encoding="utf-8")


def _run_batch(
    batch: list[dict[str, Any]],
    config: dict[str, str],
    *,
    workers: int,
    max_retries: int,
    timeout: float,
    sleep_seconds: float,
    debug: bool,
) -> tuple[int, int]:
    """Summarise one batch concurrently and return ``(success, failed)`` counts.

    Results are written straight into the entry dicts of ``batch``.
    """
    success = 0
    failed = 0
    total = len(batch)
    future_to_item: dict[Any, dict[str, Any]] = {}

    with ThreadPoolExecutor(max_workers=workers) as executor:
        for i, it in enumerate(batch, start=1):
            print(f"[submit {i}/{total}] {it['name']} ({it['mid']})")
            future = executor.submit(
                summarize_one_up_with_retry,
                it,
                config,
                max_retries,
                timeout,
                debug,
            )
            future_to_item[future] = it
            if sleep_seconds > 0:
                time.sleep(sleep_seconds)

        for done_count, future in enumerate(as_completed(future_to_item), start=1):
            it = future_to_item[future]
            try:
                ai_res = future.result()
                it["analysis"] = ai_res["summary"]
                it["group"] = ai_res["group"]
                it["action"] = ai_res["action"]
                it["group_reason"] = ai_res["reason"]
                it["reason"] = ai_res["reason"]
                it["error"] = ""
                success += 1
                print(f"[done {done_count}/{total}] 成功: {it['name']} ({it['mid']})")
            except Exception as exc:  # noqa: BLE001
                it["error"] = str(exc)
                failed += 1
                print(f"[done {done_count}/{total}] 失败: {it['name']} ({it['mid']})")
                if debug:
                    print(f"[debug] 失败详情: {exc}")

    return success, failed


def main() -> int:
    """Command-line entry point.

    Parses the input report, selects the entries that still need analysis,
    processes the requested batch (or every batch with
    ``--run-all-batches``) and rewrites the output report after each batch
    and once more at the end.

    Returns:
        ``1`` when the input report is missing or contains no entries,
        otherwise ``0`` (also when individual entries failed; failures are
        recorded per entry in the report).
    """
    args = parse_args()
    input_report = Path(args.input_report)
    output_report = Path(args.output_report)

    if not input_report.exists():
        print(f"输入报告不存在: {input_report}", file=sys.stderr)
        return 1

    items = parse_report(input_report)
    if not items:
        print("输入报告未解析出任何UP条目", file=sys.stderr)
        return 1

    config = {
        "VOLCENGINE_API_KEY": VOLCENGINE_API_KEY,
        "VOLCENGINE_MODEL": VOLCENGINE_MODEL,
        "VOLCENGINE_BASE_URL": VOLCENGINE_BASE_URL,
    }
    # Placeholder values in this script mean: inherit from the sibling script.
    if ("在这里填" in config["VOLCENGINE_API_KEY"]) or ("在这里填" in config["VOLCENGINE_MODEL"]):
        inherited = load_api_config_from_script(Path(args.config_from))
        if inherited:
            config.update(inherited)

    pending = _select_pending(items, args.force)
    if not pending:
        print("没有待分析条目,直接输出当前报告")
        _write_report(output_report, items, "无待分析条目")
        return 0

    batch_size = max(1, args.batch_size)
    if args.run_all_batches:
        total_batches = math.ceil(len(pending) / batch_size)
        batch_indexes = list(range(1, total_batches + 1))
        print(f"自动连续模式: 共{total_batches}批, 待分析总数{len(pending)}")
    else:
        batch_indexes = [max(1, args.batch_index)]

    workers = max(1, args.workers)
    max_retries = max(1, args.max_retries)
    print(f"并发配置: workers={workers}, retries={max_retries}, timeout={args.request_timeout}s")

    success_total = 0
    failed_total = 0
    for batch_index in batch_indexes:
        start = (batch_index - 1) * batch_size
        batch = pending[start:start + batch_size]
        if not batch:
            continue

        print(
            f"开始分批AI总结: 第{batch_index}批, 每批{batch_size}条, "
            f"本批{len(batch)}条, 待分析总数{len(pending)}"
        )

        success, failed = _run_batch(
            batch,
            config,
            workers=workers,
            max_retries=max_retries,
            timeout=float(args.request_timeout),
            sleep_seconds=args.sleep_seconds,
            debug=args.debug,
        )
        success_total += success
        failed_total += failed

        # Persist after every batch so finished work survives an interruption.
        step_note = (
            f"第{batch_index}批完成: 成功{success}, 失败{failed}, "
            f"本批{len(batch)}, 待分析总数{len(pending)}"
        )
        _write_report(output_report, items, step_note)
        print(f"第{batch_index}批写入完成: {output_report}")

    mode_text = "自动连续" if args.run_all_batches else "单批"
    note = (
        f"{mode_text}模式完成: 成功{success_total}, 失败{failed_total}, "
        f"处理批次数={len(batch_indexes)}, 待分析总数={len(pending)}"
    )
    _write_report(output_report, items, note)
    print(f"输出完成: {output_report}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|