691 lines
24 KiB
Python
691 lines
24 KiB
Python
#!/usr/bin/env python3
|
||
"""Fetch recent Bilibili video titles for UIDs and analyze with Volcengine API.
|
||
|
||
Input JSON format (list of objects):
[
  {"mid": 12345, "name": "UP Name", "tag": ["准备取关"]}
]
|
||
"""
|
||
|
||
from __future__ import annotations

import argparse
import hashlib
import html
import json
import os
import random
import re
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib import error, parse, request
|
||
|
||
|
||
# Bilibili endpoints: the unsigned space-video search, its WBI-signed variant,
# and the nav endpoint that get_wbi_mixin_key() queries for WBI key material.
BILIBILI_API = "https://api.bilibili.com/x/space/arc/search"
BILIBILI_WBI_API = "https://api.bilibili.com/x/space/wbi/arc/search"
BILIBILI_NAV_API = "https://api.bilibili.com/x/web-interface/nav"

# Optional: a Cookie string copied from the browser, useful when requests keep
# hitting HTTP 412.
# SECURITY: a logged-in cookie is a credential and must never be committed to
# source control, so it is read from the BILIBILI_COOKIE environment variable.
# Any cookie that was previously hard-coded here should be treated as leaked
# and invalidated by logging out of that session.
BILIBILI_COOKIE = os.environ.get("BILIBILI_COOKIE", "")
# Filled in by main() from --bili-cookie; the HTTP helpers prefer it over
# BILIBILI_COOKIE when it is non-empty.
RUNTIME_BILIBILI_COOKIE = ""
DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/124.0.0.0 Safari/537.36"
)
# Index permutation applied by get_mixin_key() to img_key + sub_key.
MIXIN_KEY_ENC_TAB = [
    46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35,
    27, 43, 5, 49, 33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13,
    37, 48, 7, 16, 24, 55, 40, 61, 26, 17, 0, 1, 60, 51, 30, 4,
    22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11, 36, 20, 34, 44, 52,
]

# Volcengine configuration.
# SECURITY: the API key is a secret; it is read from the VOLCENGINE_API_KEY
# environment variable instead of being stored in the script. A key that was
# previously hard-coded here should be revoked and re-issued.
VOLCENGINE_API_KEY = os.environ.get("VOLCENGINE_API_KEY", "")
VOLCENGINE_MODEL = "deepseek-v3-1-terminus"
VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"
|
||
|
||
|
||
@dataclass
class UpItem:
    """A single uploader (UP) entry, as loaded from the input JSON list."""

    # Numeric Bilibili user id; used to build the space URL and API queries.
    mid: int
    # Display name of the uploader (may be an empty string).
    name: str
    # Free-form labels attached to the uploader, e.g. for --only-tag filtering.
    tag: list[str]
|
||
|
||
|
||
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Build the command-line parser and parse the arguments.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script entry point uses;
            passing an explicit list allows programmatic use and testing.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(
        description="抓取 UP 前10个视频标题,并调用火山引擎 API 生成分析报告"
    )

    # Input / output locations.
    parser.add_argument(
        "--input",
        default="./source/resources/export_uids.json",
        help="输入 JSON 文件路径,默认: ./source/resources/export_uids.json",
    )
    parser.add_argument(
        "--output",
        default="./source/output/reports/up_titles_report.md",
        help="输出 Markdown 报告路径,默认: ./source/output/reports/up_titles_report.md",
    )

    # Selection and pacing of the crawl.
    parser.add_argument(
        "--titles-per-up",
        type=int,
        default=10,
        help="每个 UP 抓取的视频标题数量,默认: 10",
    )
    parser.add_argument(
        "--max-ups",
        type=int,
        default=0,
        help="最多处理多少个 UP,0 表示全部",
    )
    parser.add_argument(
        "--only-tag",
        default="",
        help="只处理包含该标签的 UP,例如: 准备取关;留空表示不过滤",
    )
    parser.add_argument(
        "--sleep-seconds",
        type=float,
        default=0.8,
        help="每个 UP 抓取后的等待秒数,默认: 0.8",
    )
    parser.add_argument(
        "--retry-times",
        type=int,
        default=3,
        help="抓取重试次数(遇到412/-799时),默认: 3",
    )

    # Test / debug switches.
    parser.add_argument(
        "--test-mid",
        type=int,
        default=0,
        help="测试模式:只抓取这个mid,不读取输入文件",
    )
    parser.add_argument(
        "--test-name",
        default="TEST_UP",
        help="测试模式下显示名称,默认: TEST_UP",
    )
    parser.add_argument(
        "--skip-ai",
        action="store_true",
        help="只测试抓取,不调用AI分析",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="输出抓取调试信息",
    )

    # Fetch configuration.
    parser.add_argument(
        "--bili-cookie",
        default="",
        help="可选:运行时传入B站Cookie,优先级高于脚本内BILIBILI_COOKIE",
    )
    parser.add_argument(
        "--fetch-mode",
        choices=["auto", "api", "html"],
        default="auto",
        help="抓取模式: auto(先API后HTML)/api/html,默认: auto",
    )

    # Batch re-analysis of an existing report.
    parser.add_argument(
        "--analyze-from-report",
        default="",
        help="从已有报告读取标题并仅执行AI分析,例如: source/up_analysis_report.md",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=30,
        help="分批分析时每批数量,默认: 30",
    )
    parser.add_argument(
        "--batch-index",
        type=int,
        default=1,
        help="分批分析批次序号(从1开始),默认: 1",
    )
    return parser.parse_args(argv)
|
||
|
||
|
||
def parse_report_items(report_path: Path) -> list[dict[str, Any]]:
    """Parse a Markdown report produced by this script back into item dicts.

    Each ``## N. name (mid: ID)`` heading starts a new item. Within an item,
    the ``- 主页: `` / ``- 标签: `` lines are metadata, and the three known
    ``### `` sub-sections hold titles, the AI analysis and the error text.

    Metadata lines are only recognised before the first ``### `` sub-section
    of an item. Inside a sub-section they are ordinary content, so a video
    title or analysis bullet that happens to start with ``- 主页: `` or
    ``- 标签: `` no longer overwrites the item's url/tags or gets dropped.

    Args:
        report_path: Path of the UTF-8 encoded Markdown report.

    Returns:
        One dict per item with keys ``mid``, ``name``, ``tag``, ``url``,
        ``titles``, ``analysis`` and ``error``.
    """
    heading_re = re.compile(r"^##\s+\d+\.\s+(.*?)\s+\(mid:\s*(\d+)\)")
    # Any other "### " header maps to "" (an ignored section).
    section_by_header = {
        "### 最近10条标题": "titles",
        "### AI分析": "analysis",
        "### 异常": "error",
    }

    items: list[dict[str, Any]] = []
    current: dict[str, Any] | None = None
    section = ""

    for line in report_path.read_text(encoding="utf-8").splitlines():
        heading = heading_re.match(line)
        if heading:
            mid = int(heading.group(2))
            current = {
                "mid": mid,
                "name": heading.group(1).strip(),
                "tag": [],
                "url": f"https://space.bilibili.com/{mid}/video",
                "titles": [],
                "analysis": "",
                "error": "",
            }
            # Appended right away; the dict is filled in place by later lines.
            items.append(current)
            section = ""
            continue

        if current is None:
            # Report preamble before the first item heading.
            continue

        if line.startswith("### "):
            section = section_by_header.get(line, "")
            continue

        if section == "":
            # Item metadata block (between the heading and the first section).
            if line.startswith("- 主页: "):
                current["url"] = line.replace("- 主页: ", "", 1).strip()
            elif line.startswith("- 标签: "):
                raw_tag = line.replace("- 标签: ", "", 1).strip()
                if raw_tag in ("", "无"):
                    current["tag"] = []
                else:
                    current["tag"] = [x.strip() for x in raw_tag.split(",") if x.strip()]
            continue

        if section == "titles":
            if line.startswith("- "):
                title = line[2:].strip()
                # Skip the placeholder written when no titles were fetched.
                if title and title != "(未抓取到标题)":
                    current["titles"].append(title)
        elif section == "analysis":
            text = line.strip()
            if text:
                # Non-empty analysis lines are re-joined with single newlines.
                current["analysis"] = (
                    f"{current['analysis']}\n{text}" if current["analysis"] else text
                )
        elif section == "error" and line.startswith("- "):
            current["error"] = line[2:].strip()

    return items
|
||
|
||
|
||
def run_batch_analysis_from_report(args: argparse.Namespace, output_path: Path) -> int:
    """Run AI analysis for one batch of not-yet-analysed items of a report.

    The report named by ``args.analyze_from_report`` is parsed, items that
    have titles but no real analysis are collected as "pending", and the
    slice selected by ``args.batch_index`` / ``args.batch_size`` is analysed.
    The full item list (analysed or not) is then written to ``output_path``.

    Args:
        args: Parsed command-line arguments; reads ``analyze_from_report``,
            ``batch_size``, ``batch_index`` and ``sleep_seconds``.
        output_path: Where the regenerated Markdown report is written.

    Returns:
        Process exit code: 1 if the report is missing or has no items,
        otherwise 0.
    """
    report_path = Path(args.analyze_from_report)
    if not report_path.exists():
        print(f"报告文件不存在: {report_path}", file=sys.stderr)
        return 1

    items = parse_report_items(report_path)
    if not items:
        print("报告中未解析到可分析条目", file=sys.stderr)
        return 1

    def write_report() -> None:
        # Every exit path writes the report, so every path must make sure the
        # output directory exists first.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(build_report(items), encoding="utf-8")

    # An item is pending when it has titles and either no analysis or only the
    # placeholder text left behind by a --skip-ai run.
    pending = [
        it for it in items
        if it.get("titles") and (not it.get("analysis") or it.get("analysis") == "测试模式已跳过AI分析")
    ]
    if not pending:
        print("报告中没有待分析条目(可能已全部分析完成)")
        write_report()
        return 0

    batch_size = max(args.batch_size, 1)
    batch_index = max(args.batch_index, 1)
    start = (batch_index - 1) * batch_size
    batch = pending[start:start + batch_size]
    if not batch:
        print(f"批次为空: batch-index={batch_index}, batch-size={batch_size}, 待分析总数={len(pending)}")
        write_report()
        return 0

    print(
        f"开始分批分析: 第{batch_index}批, 每批{batch_size}条, "
        f"本批{len(batch)}条, 待分析总数{len(pending)}"
    )

    for idx, it in enumerate(batch, start=1):
        print(f"[batch {idx}/{len(batch)}] AI分析: {it['name']} ({it['mid']})")
        # `it` is the very dict stored in `items`, so updating it in place is
        # enough; no lookup by "mid::name" is needed, which also keeps entries
        # with duplicate mid/name from receiving each other's results.
        try:
            it["analysis"] = analyze_titles(it["name"], it["url"], it["titles"])
            it["error"] = ""
        except Exception as exc:  # noqa: BLE001
            # Best effort: record the failure in the report and carry on.
            it["error"] = str(exc)
        time.sleep(max(args.sleep_seconds, 0.0))

    write_report()
    print(f"分批分析报告已生成: {output_path}")
    return 0
|
||
|
||
|
||
def load_up_items(input_path: Path) -> list[UpItem]:
    """Load the uploader list from a JSON file.

    Entries whose ``mid`` is missing or cannot be converted to ``int`` are
    skipped. A missing or ``null`` name becomes an empty string (rather than
    the literal text "None"); a non-list ``tag`` becomes an empty list.

    Args:
        input_path: Path of the UTF-8 encoded JSON file.

    Returns:
        The valid entries, in file order.

    Raises:
        ValueError: If the top-level JSON value is not a list, or an element
            is not an object.
    """
    raw = json.loads(input_path.read_text(encoding="utf-8"))
    if not isinstance(raw, list):
        raise ValueError("输入 JSON 必须是数组")

    items: list[UpItem] = []
    for position, obj in enumerate(raw, start=1):
        if not isinstance(obj, dict):
            raise ValueError(f"第 {position} 项不是对象")

        try:
            # int(None) raises TypeError, so a missing mid is skipped here too.
            # OverflowError covers a JSON Infinity value.
            mid_int = int(obj.get("mid"))
        except (TypeError, ValueError, OverflowError):
            continue

        raw_name = obj.get("name")
        name = "" if raw_name is None else str(raw_name).strip()

        raw_tags = obj.get("tag")
        tags = [str(t) for t in raw_tags] if isinstance(raw_tags, list) else []

        items.append(UpItem(mid=mid_int, name=name, tag=tags))
    return items
|
||
|
||
|
||
def _bilibili_get(url: str, timeout: float, referer: str, accept: str) -> str:
    """Send a browser-like GET request and return the body decoded as UTF-8.

    The Cookie header is taken from RUNTIME_BILIBILI_COOKIE when it is
    non-empty, otherwise from BILIBILI_COOKIE; it is omitted when both are
    empty. Undecodable bytes are replaced rather than raising.
    """
    headers = {
        "User-Agent": DEFAULT_USER_AGENT,
        "Referer": referer,
        "Origin": "https://www.bilibili.com",
        "Accept": accept,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    cookie = RUNTIME_BILIBILI_COOKIE.strip() or BILIBILI_COOKIE.strip()
    if cookie:
        headers["Cookie"] = cookie
    req = request.Request(url, headers=headers, method="GET")
    with request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8", errors="replace")


def http_get_json(
    url: str,
    timeout: float = 20.0,
    referer: str = "https://space.bilibili.com/",
) -> dict[str, Any]:
    """GET ``url`` and return the response parsed as a JSON object.

    Args:
        url: Full request URL.
        timeout: Timeout in seconds passed to ``urlopen``.
        referer: Value of the Referer header.

    Returns:
        The decoded JSON object.

    Raises:
        urllib.error.URLError: On network / HTTP failures (from ``urlopen``).
        ValueError: If the body is not valid JSON, or is valid JSON but not an
            object. Callers use ``.get`` on the result, so a non-object
            payload is rejected here with a readable message.
    """
    body = _bilibili_get(url, timeout, referer, "application/json, text/plain, */*")
    payload = json.loads(body)
    if not isinstance(payload, dict):
        raise ValueError(f"接口返回的JSON不是对象: {body[:200]}")
    return payload


def http_get_text(
    url: str,
    timeout: float = 20.0,
    referer: str = "https://space.bilibili.com/",
) -> str:
    """GET ``url`` and return the response body as text.

    Args:
        url: Full request URL.
        timeout: Timeout in seconds passed to ``urlopen``.
        referer: Value of the Referer header.

    Returns:
        The body decoded as UTF-8 (undecodable bytes replaced).

    Raises:
        urllib.error.URLError: On network / HTTP failures (from ``urlopen``).
    """
    return _bilibili_get(
        url,
        timeout,
        referer,
        "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    )
|
||
|
||
|
||
def get_mixin_key(img_key: str, sub_key: str) -> str:
    """Derive the 32-character WBI mixin key from the two raw keys.

    The concatenation ``img_key + sub_key`` is re-ordered through
    MIXIN_KEY_ENC_TAB and the first 32 characters are kept.

    Args:
        img_key: Key extracted from the nav endpoint's ``img_url``.
        sub_key: Key extracted from the nav endpoint's ``sub_url``.

    Returns:
        The 32-character mixin key.

    Raises:
        ValueError: If the concatenated keys are too short for the table to
            index (previously surfaced as a context-free IndexError).
    """
    origin = img_key + sub_key
    required = max(MIXIN_KEY_ENC_TAB) + 1
    if len(origin) < required:
        raise ValueError(
            f"wbi密钥长度不足: 需要至少{required}个字符, 实际{len(origin)}个"
        )
    return "".join(origin[i] for i in MIXIN_KEY_ENC_TAB)[:32]
|
||
|
||
|
||
def build_wbi_params(base_params: dict[str, Any], mixin_key: str) -> dict[str, Any]:
    """Return a signed copy of ``base_params`` for a WBI request.

    All values are stringified, a ``wts`` entry with the current Unix time is
    added, keys are sorted, the characters ``!'()*`` are removed from every
    value, and ``w_rid`` is set to the MD5 hex digest of the URL-encoded query
    followed by ``mixin_key``.

    Args:
        base_params: Request parameters before signing (not modified).
        mixin_key: Key produced by ``get_mixin_key``.

    Returns:
        A new dict with the cleaned parameters in key order, then ``w_rid``.
    """
    stringified = {key: str(value) for key, value in base_params.items()}
    stringified["wts"] = str(int(time.time()))

    forbidden = re.compile(r"[!'()*]")
    signed = {key: forbidden.sub("", stringified[key]) for key in sorted(stringified)}

    # The signature covers the query exactly as urlencode renders it.
    digest_input = parse.urlencode(signed) + mixin_key
    signed["w_rid"] = hashlib.md5(digest_input.encode("utf-8")).hexdigest()
    return signed
|
||
|
||
|
||
def get_wbi_mixin_key() -> str:
    """Fetch the current WBI key material from the nav endpoint.

    The key material is read from ``data.wbi_img`` whenever it is present,
    regardless of the response ``code``. According to the community API
    documentation, nav answers anonymous sessions with a non-zero code
    (-101, not logged in) while still returning ``wbi_img``; rejecting every
    non-zero code therefore disabled WBI signing for cookie-less runs.

    Returns:
        The 32-character mixin key from ``get_mixin_key``.

    Raises:
        RuntimeError: If the response carries no usable ``img_url`` /
            ``sub_url``; the message includes the response code and message.
    """
    data = http_get_json(BILIBILI_NAV_API, referer="https://www.bilibili.com/")

    # "data" may be null or missing on some error responses; treat as empty.
    payload = data.get("data")
    wbi_img = payload.get("wbi_img") if isinstance(payload, dict) else None
    if not isinstance(wbi_img, dict):
        wbi_img = {}

    img_url = wbi_img.get("img_url", "")
    sub_url = wbi_img.get("sub_url", "")
    if not isinstance(img_url, str) or not isinstance(sub_url, str) or not img_url or not sub_url:
        raise RuntimeError(
            "获取wbi密钥失败: nav接口缺少img_url/sub_url "
            f"(code={data.get('code')}, message={data.get('message')})"
        )

    # The key is the file name of each URL without its extension.
    img_key = img_url.rsplit("/", 1)[-1].split(".")[0]
    sub_key = sub_url.rsplit("/", 1)[-1].split(".")[0]
    return get_mixin_key(img_key, sub_key)
|
||
|
||
|
||
def parse_titles_from_data(data: dict[str, Any]) -> list[str]:
    """Extract video titles from a space-search API response.

    Reads ``data["data"]["list"]["vlist"]`` and returns the non-blank string
    ``title`` of each entry with HTML tags removed. Any missing, ``null`` or
    wrongly-typed level yields an empty list instead of an AttributeError.

    Args:
        data: Decoded JSON response.

    Returns:
        Cleaned titles in response order; empty if none could be read.
    """
    node: Any = data
    for key in ("data", "list", "vlist"):
        if not isinstance(node, dict):
            return []
        node = node.get(key)
    if not isinstance(node, list):
        return []

    titles: list[str] = []
    for item in node:
        if not isinstance(item, dict):
            continue
        title = item.get("title", "")
        if isinstance(title, str) and title.strip():
            titles.append(clean_html(title.strip()))
    return titles
|
||
|
||
|
||
def fetch_titles_from_space_html(mid: int, titles_per_up: int, debug: bool = False) -> list[str]:
    """Scrape video titles from the uploader's space page HTML.

    Titles are taken from the ``alt`` attribute of ``<img>`` tags whose class
    contains ``b-img__inner``. They are HTML-unescaped, stripped of tags,
    de-duplicated in page order, and collection stops once ``titles_per_up``
    titles have been gathered.

    Args:
        mid: Uploader id used to build the space URL.
        titles_per_up: Number of titles after which collection stops.
        debug: When true, print how many titles were extracted.

    Returns:
        The collected titles (possibly empty).
    """
    page = http_get_text(
        f"https://space.bilibili.com/{mid}/video",
        referer="https://www.bilibili.com/",
    )

    # Cover images usually carry the video title in their alt attribute.
    cover_alt_re = re.compile(
        r'<img[^>]*class="[^"]*b-img__inner[^"]*"[^>]*alt="([^"]+)"',
        flags=re.IGNORECASE,
    )

    collected: list[str] = []
    known: set[str] = set()
    for match in cover_alt_re.finditer(page):
        candidate = clean_html(html.unescape(match.group(1))).strip()
        if candidate and candidate not in known:
            known.add(candidate)
            collected.append(candidate)
            if len(collected) >= titles_per_up:
                break

    if debug:
        print(f"[debug] HTML模式提取到 {len(collected)} 条标题")
    return collected
|
||
|
||
|
||
def fetch_titles(
    mid: int,
    titles_per_up: int,
    retry_times: int = 3,
    debug: bool = False,
    fetch_mode: str = "auto",
) -> list[str]:
    """Fetch the most recent video titles of one uploader.

    In ``api`` / ``auto`` mode the search API is queried up to
    ``max(retry_times, 1)`` times, WBI-signed when a mixin key could be
    obtained and unsigned otherwise. In ``html`` / ``auto`` mode the space
    page is scraped afterwards if the API produced nothing.

    The back-off sleep only happens between attempts; after the final failed
    attempt the function moves on immediately instead of waiting for a retry
    that will never come.

    Args:
        mid: Uploader id.
        titles_per_up: Page size requested from the API / HTML title limit.
        retry_times: Number of API attempts (values below 1 count as 1).
        debug: When true, print diagnostic messages.
        fetch_mode: ``"auto"``, ``"api"`` or ``"html"``.

    Returns:
        A non-empty list of titles.

    Raises:
        RuntimeError: If no titles were obtained; the message joins the last
            three recorded errors, plus a cookie hint for 412 / -799 failures.
    """
    base_params = {
        "mid": str(mid),
        "pn": "1",
        "ps": str(titles_per_up),
        "order": "pubdate",
        "index": "1",
        "jsonp": "json",
    }

    errors: list[str] = []
    if fetch_mode in ("auto", "api"):
        # Prefer the WBI endpoint; fall back to the unsigned one without a key.
        mixin_key = ""
        try:
            mixin_key = get_wbi_mixin_key()
        except Exception as exc:  # noqa: BLE001
            if debug:
                print(f"[debug] 获取wbi密钥失败: {exc}")

        attempts = max(retry_times, 1)
        for attempt in range(1, attempts + 1):
            try:
                if mixin_key:
                    # Re-signed on every attempt so that wts stays current.
                    signed = build_wbi_params(base_params, mixin_key)
                    url = f"{BILIBILI_WBI_API}?{parse.urlencode(signed)}"
                else:
                    url = f"{BILIBILI_API}?{parse.urlencode(base_params)}"
                data = http_get_json(url, referer=f"https://space.bilibili.com/{mid}/video")
                code = data.get("code", -1)
                if code == 0:
                    titles = parse_titles_from_data(data)
                    if titles:
                        return titles
                    errors.append("接口返回成功但标题为空")
                else:
                    errors.append(f"code={code}, message={data.get('message', 'unknown')} ")
            except error.HTTPError as exc:
                errors.append(f"HTTP {exc.code} {exc.reason}")
            except Exception as exc:  # noqa: BLE001
                errors.append(str(exc))

            if attempt == attempts:
                # No retry follows, so there is nothing to wait for.
                if debug:
                    print(f"[debug] mid={mid} API第{attempt}次失败: {errors[-1]}")
                break

            # Exponential back-off with jitter, capped at 12 seconds.
            sleep_for = min(12.0, (1.8 ** attempt) + random.uniform(0.2, 1.0))
            if debug:
                print(f"[debug] mid={mid} API第{attempt}次失败: {errors[-1]},{sleep_for:.1f}s后重试")
            time.sleep(sleep_for)

    if fetch_mode in ("auto", "html"):
        try:
            html_titles = fetch_titles_from_space_html(mid, titles_per_up, debug=debug)
            if html_titles:
                return html_titles
            errors.append("HTML模式未提取到标题")
        except Exception as exc:  # noqa: BLE001
            errors.append(f"HTML模式失败: {exc}")

    joined = "; ".join(errors[-3:])
    if ("412" in joined) or ("-799" in joined):
        hint = "提示: 请在脚本里填写BILIBILI_COOKIE,或运行时加 --bili-cookie \"SESSDATA=...; buvid3=...\""
        raise RuntimeError(f"{joined}; {hint}")
    raise RuntimeError(joined)
|
||
|
||
|
||
# Anything that looks like a tag: "<", one or more non-">" characters, ">".
_HTML_TAG_RE = re.compile(r"<[^>]+>")


def clean_html(text: str) -> str:
    """Return ``text`` with every ``<...>`` tag-like span removed."""
    return _HTML_TAG_RE.sub("", text)
|
||
|
||
|
||
def call_volcengine_chat(system_prompt: str, user_prompt: str) -> str:
    """Send one chat-completion request to Volcengine and return the reply.

    Args:
        system_prompt: Content of the ``system`` message.
        user_prompt: Content of the ``user`` message.

    Returns:
        The stripped text of the first choice's message content.

    Raises:
        RuntimeError: If the API key, model or base URL is not configured, or
            if the response holds no non-empty text content (including an
            empty or missing ``choices`` list, which previously escaped as
            IndexError / TypeError / AttributeError).
        urllib.error.URLError: On network / HTTP failures (from ``urlopen``).
        ValueError: If the response body is not valid JSON.
    """
    api_key = VOLCENGINE_API_KEY.strip()
    base_url = VOLCENGINE_BASE_URL.strip()
    model = VOLCENGINE_MODEL.strip()

    # "在这里填" is the placeholder text of an unconfigured value.
    if (not api_key) or ("在这里填" in api_key):
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_API_KEY")
    if (not model) or ("在这里填" in model):
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_MODEL")
    if not base_url:
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_BASE_URL")

    url = f"{base_url.rstrip('/')}/chat/completions"
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.4,
    }
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")

    req = request.Request(
        url,
        data=data,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )

    with request.urlopen(req, timeout=60) as resp:
        body = resp.read().decode("utf-8", errors="replace")
    result = json.loads(body)

    # Walk result["choices"][0]["message"]["content"] defensively: any missing
    # or wrongly-typed level ends in the RuntimeError below.
    choices = result.get("choices") if isinstance(result, dict) else None
    first = choices[0] if isinstance(choices, list) and choices else None
    message = first.get("message") if isinstance(first, dict) else None
    content = message.get("content", "") if isinstance(message, dict) else ""

    if not isinstance(content, str) or not content.strip():
        raise RuntimeError(f"火山引擎返回结构异常: {body[:500]}")
    return content.strip()
|
||
|
||
|
||
def analyze_titles(up_name: str, up_url: str, titles: list[str]) -> str:
    """Ask the chat model for a content analysis of one uploader.

    Builds a fixed system prompt and a user prompt that lists the uploader's
    name, space URL and titles (one ``- `` bullet each) followed by the
    required six-point output format, then returns the model's reply.

    Args:
        up_name: Uploader display name.
        up_url: Uploader space URL.
        titles: Recent video titles.

    Returns:
        The text returned by ``call_volcengine_chat``.
    """
    system_prompt = (
        "你是一个内容分析助手。根据视频标题判断UP主内容方向,并给出是否建议取关。"
        "输出必须是简体中文,且严格按照用户给定的Markdown格式。"
    )
    bullet_list = "\n".join(f"- {t}" for t in titles)
    prompt_lines = [
        "请分析以下UP主最近视频标题:",
        "",
        f"UP主:{up_name}",
        f"主页:{up_url}",
        "标题:",
        bullet_list,
        "",
        "请按以下格式输出(不要增加其它段落):",
        "1) 内容定位:一句话",
        "2) 受众画像:一句话",
        "3) 近期内容倾向:2-3点,使用-开头",
        "4) 质量评价:80-120字",
        "5) 取关建议:保留关注/可以取关(二选一)",
        "6) 建议理由:50-100字",
    ]
    user_prompt = "\n".join(prompt_lines).strip()
    return call_volcengine_chat(system_prompt, user_prompt)
|
||
|
||
|
||
def build_report(results: list[dict[str, Any]]) -> str:
    """Render the result rows as a Markdown report.

    The report starts with a title, the generation time and the item count.
    Each item gets a numbered ``## `` heading, its URL and tags, a titles
    section, and - only when non-empty - an analysis and an error section.

    Args:
        results: Rows with required keys ``name``, ``mid``, ``url`` and
            optional ``tag``, ``titles``, ``analysis``, ``error``.

    Returns:
        The Markdown text, ending with exactly one newline.
    """
    out: list[str] = [
        "# UP主内容分析报告",
        "",
        f"- 生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}",
        f"- 分析数量: {len(results)}",
        "",
    ]

    for number, entry in enumerate(results, start=1):
        out += [
            f"## {number}. {entry['name']} (mid: {entry['mid']})",
            "",
            f"- 主页: {entry['url']}",
        ]

        labels = entry.get("tag", [])
        label_text = ", ".join(labels) if labels else "无"
        out += [f"- 标签: {label_text}", "", "### 最近10条标题", ""]

        video_titles = entry.get("titles", [])
        if video_titles:
            out.extend(f"- {t}" for t in video_titles)
        else:
            # Placeholder bullet; parse_report_items skips it when re-reading.
            out.append("- (未抓取到标题)")
        out.append("")

        # Optional sections, emitted in a fixed order.
        analysis_text = entry.get("analysis", "")
        if analysis_text:
            out += ["### AI分析", "", analysis_text, ""]

        failure = entry.get("error", "")
        if failure:
            out += ["### 异常", "", f"- {failure}", ""]

    return "\n".join(out).rstrip() + "\n"
|
||
|
||
|
||
def main() -> int:
    """Script entry point: fetch titles, optionally analyse, write the report.

    With ``--analyze-from-report`` the work is delegated to
    ``run_batch_analysis_from_report``. Otherwise the uploader list comes from
    ``--test-mid`` or the input JSON, is filtered by ``--only-tag`` and
    ``--max-ups``, and each uploader is processed in turn; per-uploader
    failures are recorded in the report rather than aborting the run.

    Returns:
        Process exit code: 1 when the input is missing, unreadable or yields
        no uploaders; otherwise 0 (or the batch-analysis result).
    """
    global RUNTIME_BILIBILI_COOKIE
    args = parse_args()
    RUNTIME_BILIBILI_COOKIE = (args.bili_cookie or "").strip()
    report_target = Path(args.output)

    if args.analyze_from_report:
        return run_batch_analysis_from_report(args, report_target)

    # Work list: either the single test uploader or the input file contents.
    if args.test_mid > 0:
        ups = [UpItem(mid=args.test_mid, name=args.test_name, tag=["测试模式"])]
        print(f"测试模式: 仅处理 mid={args.test_mid}")
    else:
        source = Path(args.input)
        if not source.exists():
            print(f"输入文件不存在: {source}", file=sys.stderr)
            return 1
        try:
            ups = load_up_items(source)
        except Exception as exc:
            print(f"加载输入文件失败: {exc}", file=sys.stderr)
            return 1

    if args.only_tag:
        ups = [up for up in ups if args.only_tag in up.tag]
    if args.max_ups > 0:
        ups = ups[: args.max_ups]
    if not ups:
        print("没有可处理的 UP 数据", file=sys.stderr)
        return 1

    total = len(ups)
    print(f"开始处理 {total} 个 UP...")
    if args.skip_ai:
        print("已启用 --skip-ai,仅测试抓取标题")
    if args.debug:
        print(f"[debug] 当前抓取模式: {args.fetch_mode}")

    pause = max(args.sleep_seconds, 0)
    rows: list[dict[str, Any]] = []
    for position, up in enumerate(ups, start=1):
        space_url = f"https://space.bilibili.com/{up.mid}/video"
        display_name = up.name or f"mid_{up.mid}"
        record: dict[str, Any] = {
            "mid": up.mid,
            "name": display_name,
            "tag": up.tag,
            "url": space_url,
            "titles": [],
            "analysis": "",
            "error": "",
        }

        print(f"[{position}/{total}] 抓取: {display_name} ({up.mid})")
        try:
            fetched = fetch_titles(
                up.mid,
                args.titles_per_up,
                retry_times=args.retry_times,
                debug=args.debug,
                fetch_mode=args.fetch_mode,
            )
            record["titles"] = fetched
            if not fetched:
                record["error"] = "未抓取到标题,可能是接口限制或UP无公开视频"
            elif args.skip_ai:
                record["analysis"] = "测试模式已跳过AI分析"
            else:
                record["analysis"] = analyze_titles(display_name, space_url, fetched)
        except error.HTTPError as exc:
            record["error"] = f"HTTP错误: {exc.code} {exc.reason}"
        except error.URLError as exc:
            record["error"] = f"网络错误: {exc.reason}"
        except Exception as exc:  # noqa: BLE001
            # Any other failure is recorded for this uploader only.
            record["error"] = str(exc)

        if args.debug and record["titles"]:
            preview = record["titles"][:3]
            print(f"[debug] mid={up.mid} 成功抓取 {len(record['titles'])} 条,样例: {preview}")

        rows.append(record)
        time.sleep(pause)

    markdown = build_report(rows)
    report_target.parent.mkdir(parents=True, exist_ok=True)
    report_target.write_text(markdown, encoding="utf-8")
    print(f"报告已生成: {report_target}")
    return 0
|
||
|
||
|
||
# Run main() only when executed as a script; its return value becomes the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
|