Files
bili_follow_group/source/analyze_up_content.py
digouyou b34239f5ea first_test
Co-authored-by: Copilot <copilot@github.com>
2026-04-26 19:26:17 +08:00

691 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Fetch recent Bilibili video titles for UIDs and analyze with Volcengine API.
Input JSON format (list of objects):
[
{"mid": 12345, "name": "UP Name", "tag": ["准备取关"]}
]
"""
from __future__ import annotations

import argparse
import hashlib
import html
import json
import os
import random
import re
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib import error, parse, request
# Bilibili endpoints: the unsigned video search, the WBI-signed video search,
# and the nav endpoint that get_wbi_mixin_key() reads the WBI key URLs from.
BILIBILI_API = "https://api.bilibili.com/x/space/arc/search"
BILIBILI_WBI_API = "https://api.bilibili.com/x/space/wbi/arc/search"
BILIBILI_NAV_API = "https://api.bilibili.com/x/web-interface/nav"

# Optional: browser Cookie string, useful when requests keep triggering 412.
# SECURITY: a logged-in Cookie (SESSDATA / bili_jct) is an account credential
# and must never be committed to source control. It is read from the
# BILIBILI_COOKIE environment variable instead. Any Cookie that was previously
# hard-coded in this file should be considered leaked and be invalidated.
BILIBILI_COOKIE = os.environ.get("BILIBILI_COOKIE", "")
# Set by main() from --bili-cookie; when non-empty it is used instead of
# BILIBILI_COOKIE by the HTTP helpers.
RUNTIME_BILIBILI_COOKIE = ""

DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/124.0.0.0 Safari/537.36"
)

# Index table used by get_mixin_key() to reorder the characters of
# img_key + sub_key before truncating to the 32-character mixin key.
MIXIN_KEY_ENC_TAB = [
    46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35,
    27, 43, 5, 49, 33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13,
    37, 48, 7, 16, 24, 55, 40, 61, 26, 17, 0, 1, 60, 51, 30, 4,
    22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11, 36, 20, 34, 44, 52,
]

# Volcengine (Ark) configuration.
# SECURITY: the API key is a credential and is read from the
# VOLCENGINE_API_KEY environment variable rather than stored in the file.
# Any key that was previously hard-coded here should be revoked and rotated.
VOLCENGINE_API_KEY = os.environ.get("VOLCENGINE_API_KEY", "")
VOLCENGINE_MODEL = "deepseek-v3-1-terminus"
VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"
@dataclass
class UpItem:
    """One followed uploader (UP) as read from the input JSON list."""

    # Numeric Bilibili user id; used to build the space URL and API queries.
    mid: int
    # Display name of the uploader.
    name: str
    # Follow-group tags attached to this uploader.
    tag: list[str]
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Build the command-line parser and parse the arguments.

    Args:
        argv: Argument strings to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, which is how the script entry point calls
            this function; an explicit list allows programmatic use.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(
        description="抓取 UP 前10个视频标题并调用火山引擎 API 生成分析报告"
    )
    parser.add_argument(
        "--input",
        default="source/resources/export_uids.json",
        help="输入 JSON 文件路径,默认: source/resources/export_uids.json",
    )
    parser.add_argument(
        "--output",
        default="source/output/reports/up_titles_report.md",
        help="输出 Markdown 报告路径,默认: source/output/reports/up_titles_report.md",
    )
    parser.add_argument(
        "--titles-per-up",
        type=int,
        default=10,
        help="每个 UP 抓取的视频标题数量,默认: 10",
    )
    parser.add_argument(
        "--max-ups",
        type=int,
        default=0,
        help="最多处理多少个 UP0 表示全部",
    )
    parser.add_argument(
        "--only-tag",
        default="",
        help="只处理包含该标签的 UP例如: 准备取关;留空表示不过滤",
    )
    parser.add_argument(
        "--sleep-seconds",
        type=float,
        default=0.8,
        help="每个 UP 抓取后的等待秒数,默认: 0.8",
    )
    parser.add_argument(
        "--retry-times",
        type=int,
        default=3,
        help="抓取重试次数遇到412/-799时默认: 3",
    )
    parser.add_argument(
        "--test-mid",
        type=int,
        default=0,
        help="测试模式只抓取这个mid不读取输入文件",
    )
    parser.add_argument(
        "--test-name",
        default="TEST_UP",
        help="测试模式下显示名称,默认: TEST_UP",
    )
    parser.add_argument(
        "--skip-ai",
        action="store_true",
        help="只测试抓取不调用AI分析",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="输出抓取调试信息",
    )
    parser.add_argument(
        "--bili-cookie",
        default="",
        help="可选运行时传入B站Cookie优先级高于脚本内BILIBILI_COOKIE",
    )
    parser.add_argument(
        "--fetch-mode",
        choices=["auto", "api", "html"],
        default="auto",
        help="抓取模式: auto(先API后HTML)/api/html默认: auto",
    )
    parser.add_argument(
        "--analyze-from-report",
        default="",
        help="从已有报告读取标题并仅执行AI分析例如: source/up_analysis_report.md",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=30,
        help="分批分析时每批数量,默认: 30",
    )
    parser.add_argument(
        "--batch-index",
        type=int,
        default=1,
        help="分批分析批次序号(从1开始),默认: 1",
    )
    return parser.parse_args(argv)
def parse_report_items(report_path: Path) -> list[dict[str, Any]]:
    """Parse a Markdown report back into a list of per-UP dictionaries.

    The expected layout is the one produced by ``build_report``: an item
    heading ``## N. name (mid: 123)``, ``- 主页: `` / ``- 标签: `` bullet lines,
    and ``###`` sub-sections for titles, AI analysis and errors.

    Args:
        report_path: Path of the UTF-8 Markdown report to read.

    Returns:
        One dict per item heading with the keys ``mid``, ``name``, ``tag``,
        ``url``, ``titles``, ``analysis`` and ``error``. Lines that appear
        before the first item heading are ignored.
    """
    item_heading = re.compile(r"^##\s+\d+\.\s+(.*?)\s+\(mid:\s*(\d+)\)")
    url_prefix = "- 主页: "
    tag_prefix = "- 标签: "
    # Known sub-section headings; any other "### " heading closes the section.
    section_by_heading = {
        "### 最近10条标题": "titles",
        "### AI分析": "analysis",
        "### 异常": "error",
    }

    parsed: list[dict[str, Any]] = []
    entry: dict[str, Any] | None = None
    section = ""

    for line in report_path.read_text(encoding="utf-8").splitlines():
        heading = item_heading.match(line)
        if heading:
            # A new item starts: flush the one collected so far.
            if entry is not None:
                parsed.append(entry)
            mid = int(heading.group(2))
            entry = {
                "mid": mid,
                "name": heading.group(1).strip(),
                "tag": [],
                "url": f"https://space.bilibili.com/{mid}/video",
                "titles": [],
                "analysis": "",
                "error": "",
            }
            section = ""
            continue

        if entry is None:
            continue

        # The URL / tag bullets are recognised in any section, before the
        # section-specific handling below.
        if line.startswith(url_prefix):
            entry["url"] = line[len(url_prefix):].strip()
            continue
        if line.startswith(tag_prefix):
            tag_text = line[len(tag_prefix):].strip()
            entry["tag"] = [part.strip() for part in tag_text.split(",") if part.strip()]
            continue
        if line.startswith("### "):
            section = section_by_heading.get(line, "")
            continue

        if section == "titles":
            if line.startswith("- "):
                title = line[2:].strip()
                # The placeholder bullet means "no titles were fetched".
                if title and title != "(未抓取到标题)":
                    entry["titles"].append(title)
        elif section == "analysis":
            text = line.strip()
            if text:
                # Blank lines are dropped; remaining lines are joined by "\n".
                entry["analysis"] = f"{entry['analysis']}\n{text}" if entry["analysis"] else text
        elif section == "error":
            if line.startswith("- "):
                entry["error"] = line[2:].strip()

    if entry is not None:
        parsed.append(entry)
    return parsed
def _write_batch_report(output_path: Path, items: list[dict[str, Any]]) -> None:
    """Render *items* with build_report and write them, creating parent dirs."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(build_report(items), encoding="utf-8")


def run_batch_analysis_from_report(args: argparse.Namespace, output_path: Path) -> int:
    """Run AI analysis for one batch of not-yet-analysed items of a report.

    Items are read from ``args.analyze_from_report``. An item is pending when
    it has titles and its analysis is empty or equals the skip-AI placeholder.
    The batch is the slice of the pending list selected by ``args.batch_index``
    (1-based) and ``args.batch_size``. The full item list, updated with the
    batch results, is always written to *output_path*.

    NOTE(review): the pending list is recomputed from the report on every
    run, so if the output report is fed back in as the next input, a given
    batch index selects different items than before — confirm intended usage.

    Args:
        args: Parsed CLI namespace; reads ``analyze_from_report``,
            ``batch_size``, ``batch_index`` and ``sleep_seconds``.
        output_path: Destination of the rewritten Markdown report.

    Returns:
        1 when the report is missing or contains no items, otherwise 0.
    """
    report_path = Path(args.analyze_from_report)
    if not report_path.exists():
        print(f"报告文件不存在: {report_path}", file=sys.stderr)
        return 1

    items = parse_report_items(report_path)
    if not items:
        print("报告中未解析到可分析条目", file=sys.stderr)
        return 1

    pending = [
        it for it in items
        if it.get("titles") and (not it.get("analysis") or it.get("analysis") == "测试模式已跳过AI分析")
    ]
    if not pending:
        print("报告中没有待分析条目(可能已全部分析完成)")
        # Previously this path wrote without creating the parent directory.
        _write_batch_report(output_path, items)
        return 0

    batch_size = max(args.batch_size, 1)
    batch_index = max(args.batch_index, 1)
    start = (batch_index - 1) * batch_size
    batch = pending[start:start + batch_size]
    if not batch:
        print(f"批次为空: batch-index={batch_index}, batch-size={batch_size}, 待分析总数={len(pending)}")
        _write_batch_report(output_path, items)
        return 0

    print(
        f"开始分批分析: 第{batch_index}批, 每批{batch_size}条, "
        f"本批{len(batch)}条, 待分析总数{len(pending)}"
    )
    for idx, it in enumerate(batch, start=1):
        print(f"[batch {idx}/{len(batch)}] AI分析: {it['name']} ({it['mid']})")
        # `it` is the very dict object stored in `items` (the filter and the
        # slice above copy references only), so it is updated in place. This
        # replaces a "mid::name" index lookup that pointed at the wrong entry
        # when two items shared the same mid and name.
        try:
            analysis = analyze_titles(it["name"], it["url"], it["titles"])
        except Exception as exc:  # noqa: BLE001
            it["error"] = str(exc)
        else:
            it["analysis"] = analysis
            it["error"] = ""
        time.sleep(max(args.sleep_seconds, 0.0))

    _write_batch_report(output_path, items)
    print(f"分批分析报告已生成: {output_path}")
    return 0
def load_up_items(input_path: Path) -> list[UpItem]:
    """Load the UP list from a UTF-8 JSON file.

    Entries without a ``mid``, or whose ``mid`` cannot be converted with
    ``int()``, are skipped. A non-string ``name`` is converted with ``str()``
    and a non-list ``tag`` is replaced by an empty list.

    Args:
        input_path: Path of the JSON file; its top level must be an array.

    Returns:
        The accepted entries as ``UpItem`` objects, in file order.

    Raises:
        ValueError: If the top level is not an array or an element is not an
            object (``json`` decoding errors are ``ValueError`` subclasses too).
    """
    payload = json.loads(input_path.read_text(encoding="utf-8"))
    if not isinstance(payload, list):
        raise ValueError("输入 JSON 必须是数组")

    loaded: list[UpItem] = []
    for position, entry in enumerate(payload, start=1):
        if not isinstance(entry, dict):
            raise ValueError(f"{position} 项不是对象")

        raw_mid = entry.get("mid")
        if raw_mid is None:
            continue
        try:
            mid_value = int(raw_mid)
        except (TypeError, ValueError):
            continue

        raw_name = entry.get("name", "")
        display_name = raw_name if isinstance(raw_name, str) else str(raw_name)

        raw_tags = entry.get("tag", [])
        tag_list = [str(t) for t in raw_tags] if isinstance(raw_tags, list) else []

        loaded.append(UpItem(mid=mid_value, name=display_name.strip(), tag=tag_list))
    return loaded
def _build_bilibili_headers(accept: str, referer: str) -> dict[str, str]:
    """Return browser-like request headers, adding a Cookie when one is set."""
    headers = {
        "User-Agent": DEFAULT_USER_AGENT,
        "Referer": referer,
        "Origin": "https://www.bilibili.com",
        "Accept": accept,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    # The runtime (--bili-cookie) value wins over the module-level constant.
    cookie = RUNTIME_BILIBILI_COOKIE.strip() or BILIBILI_COOKIE.strip()
    if cookie:
        headers["Cookie"] = cookie
    return headers


def _http_get_body(url: str, timeout: float, referer: str, accept: str) -> str:
    """GET *url* and return the response body decoded as UTF-8 (lossy)."""
    req = request.Request(url, headers=_build_bilibili_headers(accept, referer), method="GET")
    with request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8", errors="replace")


def http_get_json(
    url: str,
    timeout: float = 20.0,
    referer: str = "https://space.bilibili.com/",
) -> dict[str, Any]:
    """GET *url* and return the response parsed as a JSON object.

    Args:
        url: Full request URL, including any query string.
        timeout: Timeout in seconds passed to ``urlopen``.
        referer: Value of the ``Referer`` request header.

    Returns:
        The decoded JSON object.

    Raises:
        urllib.error.URLError: On network/HTTP failures (``HTTPError`` is a
            subclass).
        ValueError: If the body is not valid JSON, or is valid JSON but not
            an object (callers immediately use ``.get`` on the result).
    """
    body = _http_get_body(url, timeout, referer, "application/json, text/plain, */*")
    parsed = json.loads(body)
    if not isinstance(parsed, dict):
        raise ValueError(f"接口返回的JSON不是对象: {type(parsed).__name__}")
    return parsed


def http_get_text(
    url: str,
    timeout: float = 20.0,
    referer: str = "https://space.bilibili.com/",
) -> str:
    """GET *url* and return the response body as text.

    Args:
        url: Full request URL.
        timeout: Timeout in seconds passed to ``urlopen``.
        referer: Value of the ``Referer`` request header.

    Returns:
        The body decoded as UTF-8, with undecodable bytes replaced.

    Raises:
        urllib.error.URLError: On network/HTTP failures.
    """
    return _http_get_body(
        url,
        timeout,
        referer,
        "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    )
def get_mixin_key(img_key: str, sub_key: str) -> str:
    """Derive the 32-character WBI mixin key from the two nav keys.

    The concatenation ``img_key + sub_key`` is reordered character by
    character according to ``MIXIN_KEY_ENC_TAB`` and the first 32 characters
    of the result are returned.

    Raises:
        IndexError: If the concatenated key is shorter than the largest index
            in ``MIXIN_KEY_ENC_TAB`` requires.
    """
    combined = img_key + sub_key
    shuffled = "".join(map(combined.__getitem__, MIXIN_KEY_ENC_TAB))
    return shuffled[:32]
def build_wbi_params(base_params: dict[str, Any], mixin_key: str) -> dict[str, Any]:
    """Return *base_params* extended with the ``wts`` and ``w_rid`` fields.

    All values are converted to strings, ``wts`` is set to the current Unix
    time in seconds, the parameters are ordered by key, and the characters
    ``!'()*`` are removed from every value. ``w_rid`` is the MD5 hex digest of
    the URL-encoded parameters followed by *mixin_key*.

    Args:
        base_params: Query parameters to sign; not modified.
        mixin_key: Key appended to the query string before hashing.

    Returns:
        A new dict: the cleaned parameters in key order, then ``w_rid``.
    """
    stringified = {name: str(value) for name, value in base_params.items()}
    stringified["wts"] = str(int(time.time()))

    # Deleting these characters is equivalent to re.sub(r"[!'()*]", "", v).
    drop_reserved = str.maketrans("", "", "!'()*")
    signed = {name: stringified[name].translate(drop_reserved) for name in sorted(stringified)}

    digest_input = parse.urlencode(signed) + mixin_key
    signed["w_rid"] = hashlib.md5(digest_input.encode("utf-8")).hexdigest()
    return signed
def get_wbi_mixin_key() -> str:
    """Fetch the WBI key URLs from the nav endpoint and derive the mixin key.

    The key material is the file-name stem of ``data.wbi_img.img_url`` and
    ``data.wbi_img.sub_url``; both stems are passed to ``get_mixin_key``.

    A non-zero response ``code`` is only treated as fatal when the key URLs
    are actually missing. NOTE(review): the nav endpoint is reported to answer
    with a "not logged in" code for anonymous sessions while still including
    ``wbi_img`` — confirm against current API behaviour.

    Returns:
        The mixin key produced by ``get_mixin_key``.

    Raises:
        RuntimeError: If the response carries no usable ``img_url``/``sub_url``.
    """
    data = http_get_json(BILIBILI_NAV_API, referer="https://www.bilibili.com/")

    # Tolerate `"data": null` / `"wbi_img": null` instead of failing with an
    # AttributeError on a chained .get().
    payload = data.get("data")
    wbi_img = payload.get("wbi_img") if isinstance(payload, dict) else None
    if not isinstance(wbi_img, dict):
        wbi_img = {}
    img_url = wbi_img.get("img_url") or ""
    sub_url = wbi_img.get("sub_url") or ""

    if not img_url or not sub_url:
        if data.get("code") != 0:
            raise RuntimeError(
                f"获取wbi密钥失败 code={data.get('code')}, message={data.get('message')}"
            )
        raise RuntimeError("获取wbi密钥失败: nav接口缺少img_url/sub_url")

    # Key = last path segment of the URL with everything from the first "." removed.
    img_key = img_url.rsplit("/", 1)[-1].split(".")[0]
    sub_key = sub_url.rsplit("/", 1)[-1].split(".")[0]
    return get_mixin_key(img_key, sub_key)
def parse_titles_from_data(data: dict[str, Any]) -> list[str]:
    """Extract cleaned video titles from a space video-search response.

    Titles are read from ``data["data"]["list"]["vlist"][*]["title"]``. Every
    level is type-checked, so a missing key, a ``null`` value or an unexpected
    type yields an empty list instead of an ``AttributeError``.

    Args:
        data: Decoded JSON response.

    Returns:
        Non-blank string titles, stripped and passed through ``clean_html``,
        in response order.
    """
    node: Any = data
    for key in ("data", "list", "vlist"):
        if not isinstance(node, dict):
            return []
        node = node.get(key)
    if not isinstance(node, list):
        return []

    titles: list[str] = []
    for item in node:
        if not isinstance(item, dict):
            continue
        title = item.get("title", "")
        if isinstance(title, str) and title.strip():
            titles.append(clean_html(title.strip()))
    return titles
def fetch_titles_from_space_html(mid: int, titles_per_up: int, debug: bool = False) -> list[str]:
    """Scrape video titles from the HTML of an UP's space video page.

    Args:
        mid: Bilibili user id; the page fetched is
            ``https://space.bilibili.com/{mid}/video``.
        titles_per_up: Collection stops once this many distinct titles exist.
        debug: When true, print how many titles were extracted.

    Returns:
        Distinct, non-empty titles in page order (possibly an empty list).
    """
    page_url = f"https://space.bilibili.com/{mid}/video"
    page_source = http_get_text(page_url, referer="https://www.bilibili.com/")

    # Video cover <img> tags on the page usually carry the title in their alt
    # attribute, so titles are extracted from there.
    cover_alt = re.compile(
        r'<img[^>]*class="[^"]*b-img__inner[^"]*"[^>]*alt="([^"]+)"',
        flags=re.IGNORECASE,
    )

    collected: list[str] = []
    already_seen: set[str] = set()
    for found in cover_alt.finditer(page_source):
        title = clean_html(html.unescape(found.group(1))).strip()
        if title and title not in already_seen:
            already_seen.add(title)
            collected.append(title)
            if len(collected) >= titles_per_up:
                break

    if debug:
        print(f"[debug] HTML模式提取到 {len(collected)} 条标题")
    return collected
def fetch_titles(
    mid: int,
    titles_per_up: int,
    retry_times: int = 3,
    debug: bool = False,
    fetch_mode: str = "auto",
) -> list[str]:
    """Fetch recent video titles of one UP via the API and/or the HTML page.

    In ``"auto"`` and ``"api"`` mode the search API is queried up to
    ``max(retry_times, 1)`` times, signed with WBI when a mixin key could be
    obtained and unsigned otherwise. In ``"auto"`` and ``"html"`` mode the
    space page is scraped afterwards if no titles were returned yet.

    Args:
        mid: Bilibili user id.
        titles_per_up: Page size requested from the API and title limit of
            the HTML scraper.
        retry_times: Number of API attempts (at least one is made).
        debug: When true, print diagnostic lines.
        fetch_mode: ``"auto"``, ``"api"`` or ``"html"``.

    Returns:
        A non-empty list of titles.

    Raises:
        RuntimeError: If no source produced titles. The message joins the
            last three recorded errors and adds a Cookie hint when they
            mention ``412`` or ``-799``.
    """
    base_params = {
        "mid": str(mid),
        "pn": "1",
        "ps": str(titles_per_up),
        "order": "pubdate",
        "index": "1",
        "jsonp": "json",
    }
    errors: list[str] = []

    if fetch_mode in ("auto", "api"):
        # Prefer the WBI endpoint (the original author notes it is usually
        # more stable); without a mixin key the unsigned endpoint is used.
        mixin_key = ""
        try:
            mixin_key = get_wbi_mixin_key()
        except Exception as exc:  # noqa: BLE001
            if debug:
                print(f"[debug] 获取wbi密钥失败: {exc}")

        max_attempts = max(retry_times, 1)
        for attempt in range(1, max_attempts + 1):
            try:
                if mixin_key:
                    # Re-signed on every attempt so that wts stays current.
                    signed = build_wbi_params(base_params, mixin_key)
                    url = f"{BILIBILI_WBI_API}?{parse.urlencode(signed)}"
                else:
                    url = f"{BILIBILI_API}?{parse.urlencode(base_params)}"
                data = http_get_json(url, referer=f"https://space.bilibili.com/{mid}/video")
                code = data.get("code", -1)
                if code == 0:
                    titles = parse_titles_from_data(data)
                    if titles:
                        return titles
                    errors.append("接口返回成功但标题为空")
                else:
                    errors.append(f"code={code}, message={data.get('message', 'unknown')} ")
            except error.HTTPError as exc:
                errors.append(f"HTTP {exc.code} {exc.reason}")
            except Exception as exc:  # noqa: BLE001
                errors.append(str(exc))

            if attempt >= max_attempts:
                # No retry follows the final attempt, so do not back off here;
                # previously the function slept before giving up.
                if debug:
                    print(f"[debug] mid={mid} API第{attempt}次失败: {errors[-1]}")
                break

            # Exponential backoff with jitter, capped at 12 seconds.
            sleep_for = min(12.0, (1.8 ** attempt) + random.uniform(0.2, 1.0))
            if debug:
                print(f"[debug] mid={mid} API第{attempt}次失败: {errors[-1]}{sleep_for:.1f}s后重试")
            time.sleep(sleep_for)

    if fetch_mode in ("auto", "html"):
        try:
            html_titles = fetch_titles_from_space_html(mid, titles_per_up, debug=debug)
            if html_titles:
                return html_titles
            errors.append("HTML模式未提取到标题")
        except Exception as exc:  # noqa: BLE001
            errors.append(f"HTML模式失败: {exc}")

    joined = "; ".join(errors[-3:])
    if ("412" in joined) or ("-799" in joined):
        hint = "提示: 请在脚本里填写BILIBILI_COOKIE或运行时加 --bili-cookie \"SESSDATA=...; buvid3=...\""
        raise RuntimeError(f"{joined}; {hint}")
    raise RuntimeError(joined)
def clean_html(text: str) -> str:
    """Return *text* with every ``<...>`` tag-like span removed.

    A span is ``<``, one or more characters other than ``>``, then ``>``.
    HTML entities are left untouched.
    """
    tag_like = re.compile(r"<[^>]+>")
    return tag_like.sub("", text)
def call_volcengine_chat(system_prompt: str, user_prompt: str) -> str:
    """Send one chat-completion request and return the assistant's reply.

    The request is POSTed to ``{VOLCENGINE_BASE_URL}/chat/completions`` with a
    bearer token, a system and a user message, and temperature 0.4.

    Args:
        system_prompt: Content of the ``system`` message.
        user_prompt: Content of the ``user`` message.

    Returns:
        ``choices[0].message.content`` of the response, stripped.

    Raises:
        RuntimeError: If the API key, model or base URL is not configured, or
            if the response is not JSON or lacks a non-empty string content.
        urllib.error.URLError: On network/HTTP failures.
    """
    api_key = VOLCENGINE_API_KEY.strip()
    base_url = VOLCENGINE_BASE_URL.strip()
    model = VOLCENGINE_MODEL.strip()
    # "在这里填" is matched as a marker of an unfilled placeholder value.
    if (not api_key) or ("在这里填" in api_key):
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_API_KEY")
    if (not model) or ("在这里填" in model):
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_MODEL")
    if not base_url:
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_BASE_URL")

    url = f"{base_url.rstrip('/')}/chat/completions"
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.4,
    }
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = request.Request(
        url,
        data=data,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )
    with request.urlopen(req, timeout=60) as resp:
        body = resp.read().decode("utf-8", errors="replace")

    try:
        result = json.loads(body)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"火山引擎返回结构异常: {body[:500]}") from exc

    # Walk choices[0].message.content defensively: an empty or null "choices"
    # used to raise IndexError/AttributeError instead of the error below.
    content = None
    if isinstance(result, dict):
        choices = result.get("choices")
        if isinstance(choices, list) and choices and isinstance(choices[0], dict):
            message = choices[0].get("message")
            if isinstance(message, dict):
                content = message.get("content")

    if not isinstance(content, str) or not content.strip():
        raise RuntimeError(f"火山引擎返回结构异常: {body[:500]}")
    return content.strip()
def analyze_titles(up_name: str, up_url: str, titles: list[str]) -> str:
    """Ask the chat model for an analysis of one UP based on video titles.

    Args:
        up_name: Display name inserted into the prompt.
        up_url: Space URL inserted into the prompt.
        titles: Video titles; each becomes one ``- `` bullet in the prompt.

    Returns:
        The text returned by ``call_volcengine_chat``.
    """
    system_prompt = (
        "你是一个内容分析助手。根据视频标题判断UP主内容方向并给出是否建议取关。"
        "输出必须是简体中文且严格按照用户给定的Markdown格式。"
    )
    bullet_list = "\n".join(f"- {title}" for title in titles)
    # The prompt is assembled line by line; the text of every line is fixed.
    prompt_lines = [
        "请分析以下UP主最近视频标题",
        f"UP主{up_name}",
        f"主页:{up_url}",
        "标题:",
        bullet_list,
        "请按以下格式输出(不要增加其它段落):",
        "1) 内容定位:一句话",
        "2) 受众画像:一句话",
        "3) 近期内容倾向2-3点使用-开头",
        "4) 质量评价80-120字",
        "5) 取关建议:保留关注/可以取关(二选一)",
        "6) 建议理由50-100字",
    ]
    user_prompt = "\n".join(prompt_lines).strip()
    return call_volcengine_chat(system_prompt, user_prompt)
def build_report(results: list[dict[str, Any]]) -> str:
    """Render the per-UP results as a Markdown report.

    Args:
        results: One dict per UP. ``name``, ``mid`` and ``url`` are required;
            ``tag``, ``titles``, ``analysis`` and ``error`` are optional.

    Returns:
        The report text: a header with the local generation time and item
        count, then one numbered ``##`` section per item. Trailing whitespace
        is removed and the text ends with exactly one newline.
    """
    generated_at = time.strftime("%Y-%m-%d %H:%M:%S")
    out: list[str] = [
        "# UP主内容分析报告",
        "",
        f"- 生成时间: {generated_at}",
        f"- 分析数量: {len(results)}",
        "",
    ]

    for number, entry in enumerate(results, start=1):
        tags = entry.get("tag", [])
        out += [
            f"## {number}. {entry['name']} (mid: {entry['mid']})",
            "",
            f"- 主页: {entry['url']}",
            f"- 标签: {', '.join(tags) if tags else ''}",
            "",
            "### 最近10条标题",
            "",
        ]

        titles = entry.get("titles", [])
        # A placeholder bullet marks items for which no title was fetched.
        out += [f"- {title}" for title in titles] if titles else ["- (未抓取到标题)"]
        out.append("")

        # The analysis and error sections are emitted only when non-empty.
        analysis = entry.get("analysis", "")
        if analysis:
            out += ["### AI分析", "", analysis, ""]
        error_msg = entry.get("error", "")
        if error_msg:
            out += ["### 异常", "", f"- {error_msg}", ""]

    return "\n".join(out).rstrip() + "\n"
def main() -> int:
    """Script entry point: fetch titles, optionally analyse, write the report.

    With ``--analyze-from-report`` the work is delegated to
    ``run_batch_analysis_from_report``. Otherwise the UP list is either the
    single ``--test-mid`` item or the content of ``--input`` (filtered by
    ``--only-tag`` and truncated by ``--max-ups``). Titles are fetched for
    each UP, analysed unless ``--skip-ai`` is given, and the Markdown report
    is written to ``--output``.

    Returns:
        0 on success; 1 when the input file is missing or cannot be loaded,
        or when there is nothing to process.
    """
    global RUNTIME_BILIBILI_COOKIE
    args = parse_args()
    RUNTIME_BILIBILI_COOKIE = (args.bili_cookie or "").strip()
    input_path = Path(args.input)
    output_path = Path(args.output)

    if args.analyze_from_report:
        return run_batch_analysis_from_report(args, output_path)

    if args.test_mid > 0:
        items = [UpItem(mid=args.test_mid, name=args.test_name, tag=["测试模式"])]
        print(f"测试模式: 仅处理 mid={args.test_mid}")
    else:
        if not input_path.exists():
            print(f"输入文件不存在: {input_path}", file=sys.stderr)
            return 1
        try:
            items = load_up_items(input_path)
        except Exception as exc:  # noqa: BLE001
            print(f"加载输入文件失败: {exc}", file=sys.stderr)
            return 1
        # The tag filter and the limit apply to the input file only, so the
        # single test-mode item (tagged "测试模式") can never be filtered out.
        if args.only_tag:
            items = [it for it in items if args.only_tag in it.tag]
        if args.max_ups and args.max_ups > 0:
            items = items[: args.max_ups]

    if not items:
        print("没有可处理的 UP 数据", file=sys.stderr)
        return 1

    print(f"开始处理 {len(items)} 个 UP...")
    if args.skip_ai:
        print("已启用 --skip-ai仅测试抓取标题")
    if args.debug:
        print(f"[debug] 当前抓取模式: {args.fetch_mode}")

    results: list[dict[str, Any]] = []
    total = len(items)
    for idx, item in enumerate(items, start=1):
        up_url = f"https://space.bilibili.com/{item.mid}/video"
        row: dict[str, Any] = {
            "mid": item.mid,
            "name": item.name or f"mid_{item.mid}",
            "tag": item.tag,
            "url": up_url,
            "titles": [],
            "analysis": "",
            "error": "",
        }
        print(f"[{idx}/{total}] 抓取: {row['name']} ({item.mid})")
        try:
            titles = fetch_titles(
                item.mid,
                args.titles_per_up,
                retry_times=args.retry_times,
                debug=args.debug,
                fetch_mode=args.fetch_mode,
            )
            row["titles"] = titles
            if not titles:
                row["error"] = "未抓取到标题可能是接口限制或UP无公开视频"
            elif args.skip_ai:
                row["analysis"] = "测试模式已跳过AI分析"
            else:
                row["analysis"] = analyze_titles(row["name"], up_url, titles)
        except error.HTTPError as exc:
            row["error"] = f"HTTP错误: {exc.code} {exc.reason}"
        except error.URLError as exc:
            row["error"] = f"网络错误: {exc.reason}"
        except Exception as exc:  # noqa: BLE001
            # A failure for one UP is recorded in its row; the run continues.
            row["error"] = str(exc)

        if args.debug and row["titles"]:
            sample = row["titles"][:3]
            print(f"[debug] mid={item.mid} 成功抓取 {len(row['titles'])} 条,样例: {sample}")
        results.append(row)

        # Pause between UPs only; nothing follows the last one.
        if idx < total:
            time.sleep(max(args.sleep_seconds, 0))

    report = build_report(results)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(report, encoding="utf-8")
    print(f"报告已生成: {output_path}")
    return 0
# When run as a script, use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())