#!/usr/bin/env python3
"""Fetch recent Bilibili video titles for UIDs and analyze with Volcengine API.

Input JSON format (list of objects):
[
  {"mid": 12345, "name": "UP Name", "tag": ["准备取关"]}
]

Credentials are read from environment variables instead of being stored in
this file:

* ``BILIBILI_COOKIE``    - optional browser cookie string for Bilibili requests
* ``VOLCENGINE_API_KEY`` - API key used for the chat-completions call
* ``VOLCENGINE_MODEL`` / ``VOLCENGINE_BASE_URL`` - optional overrides
"""

from __future__ import annotations

import argparse
import hashlib
import html
import json
import os
import random
import re
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib import error, parse, request

BILIBILI_API = "https://api.bilibili.com/x/space/arc/search"
BILIBILI_WBI_API = "https://api.bilibili.com/x/space/wbi/arc/search"
BILIBILI_NAV_API = "https://api.bilibili.com/x/web-interface/nav"

# Optional: if HTTP 412 is still triggered frequently, provide a cookie string
# copied from a logged-in browser session.
# SECURITY: a real session cookie used to be hard-coded on this line. It is now
# read from the environment; the previously committed cookie should be treated
# as leaked and invalidated (log out of that session).
BILIBILI_COOKIE = os.environ.get("BILIBILI_COOKIE", "")
# Cookie passed on the command line (--bili-cookie); set by main() and takes
# precedence over BILIBILI_COOKIE.
RUNTIME_BILIBILI_COOKIE = ""

DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/124.0.0.0 Safari/537.36"
)

# Index permutation applied to img_key + sub_key to derive the WBI mixin key.
MIXIN_KEY_ENC_TAB = [
    46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35,
    27, 43, 5, 49, 33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13,
    37, 48, 7, 16, 24, 55, 40, 61, 26, 17, 0, 1, 60, 51, 30, 4,
    22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11, 36, 20, 34, 44, 52,
]

# Volcengine configuration.
# SECURITY: the API key used to be hard-coded here. It is now read from the
# environment; the previously committed key should be revoked and re-issued.
VOLCENGINE_API_KEY = os.environ.get("VOLCENGINE_API_KEY", "")
VOLCENGINE_MODEL = os.environ.get("VOLCENGINE_MODEL", "deepseek-v3-1-terminus")
VOLCENGINE_BASE_URL = os.environ.get(
    "VOLCENGINE_BASE_URL", "https://ark.cn-beijing.volces.com/api/v3"
)

# Matches the per-UP heading written by build_report(): "## 3. Name (mid: 123)".
_REPORT_HEADING_RE = re.compile(r"^##\s+\d+\.\s+(.*?)\s+\(mid:\s*(\d+)\)")
_URL_PREFIX = "- 主页: "
_TAG_PREFIX = "- 标签: "

# The WBI key is reused across UPs for this many seconds so that every
# fetch_titles() call does not trigger an extra request to the nav endpoint.
_WBI_KEY_TTL_SECONDS = 600.0
_wbi_key_cache: tuple[float, str] | None = None


@dataclass
class UpItem:
    """One uploader ("UP") entry loaded from the input JSON."""

    mid: int  # Bilibili user id
    name: str  # display name (may be empty)
    tag: list[str]  # follow-group tags attached to the uploader


def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse ``sys.argv``."""
    parser = argparse.ArgumentParser(
        description="抓取 UP 前10个视频标题,并调用火山引擎 API 生成分析报告"
    )
    parser.add_argument(
        "--input",
        default="./source/resources/export_uids.json",
        help="输入 JSON 文件路径,默认: ./source/resources/export_uids.json",
    )
    parser.add_argument(
        "--output",
        default="./source/output/reports/up_titles_report.md",
        help="输出 Markdown 报告路径,默认: ./source/output/reports/up_titles_report.md",
    )
    parser.add_argument(
        "--titles-per-up",
        type=int,
        default=10,
        help="每个 UP 抓取的视频标题数量,默认: 10",
    )
    parser.add_argument(
        "--max-ups",
        type=int,
        default=0,
        help="最多处理多少个 UP,0 表示全部",
    )
    parser.add_argument(
        "--only-tag",
        default="",
        help="只处理包含该标签的 UP,例如: 准备取关;留空表示不过滤",
    )
    parser.add_argument(
        "--sleep-seconds",
        type=float,
        default=0.8,
        help="每个 UP 抓取后的等待秒数,默认: 0.8",
    )
    parser.add_argument(
        "--retry-times",
        type=int,
        default=3,
        help="抓取重试次数(遇到412/-799时),默认: 3",
    )
    parser.add_argument(
        "--test-mid",
        type=int,
        default=0,
        help="测试模式:只抓取这个mid,不读取输入文件",
    )
    parser.add_argument(
        "--test-name",
        default="TEST_UP",
        help="测试模式下显示名称,默认: TEST_UP",
    )
    parser.add_argument(
        "--skip-ai",
        action="store_true",
        help="只测试抓取,不调用AI分析",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="输出抓取调试信息",
    )
    parser.add_argument(
        "--bili-cookie",
        default="",
        help="可选:运行时传入B站Cookie,优先级高于脚本内BILIBILI_COOKIE",
    )
    parser.add_argument(
        "--fetch-mode",
        choices=["auto", "api", "html"],
        default="auto",
        help="抓取模式: auto(先API后HTML)/api/html,默认: auto",
    )
    parser.add_argument(
        "--analyze-from-report",
        default="",
        help="从已有报告读取标题并仅执行AI分析,例如: source/up_analysis_report.md",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=30,
        help="分批分析时每批数量,默认: 30",
    )
    parser.add_argument(
        "--batch-index",
        type=int,
        default=1,
        help="分批分析批次序号(从1开始),默认: 1",
    )
    return parser.parse_args()


def parse_report_items(report_path: Path) -> list[dict[str, Any]]:
    """Parse a Markdown report written by :func:`build_report` back into rows.

    Each returned dict has the keys ``mid``, ``name``, ``tag``, ``url``,
    ``titles``, ``analysis`` and ``error``. Lines that appear before the first
    per-UP heading are ignored.
    """
    items: list[dict[str, Any]] = []
    current: dict[str, Any] | None = None
    section = ""

    for line in report_path.read_text(encoding="utf-8").splitlines():
        heading = _REPORT_HEADING_RE.match(line)
        if heading:
            if current is not None:
                items.append(current)
            mid = int(heading.group(2))
            current = {
                "mid": mid,
                "name": heading.group(1).strip(),
                "tag": [],
                "url": f"https://space.bilibili.com/{mid}/video",
                "titles": [],
                "analysis": "",
                "error": "",
            }
            section = ""
            continue

        if current is None:
            continue

        if line.startswith(_URL_PREFIX):
            current["url"] = line[len(_URL_PREFIX):].strip()
            continue
        if line.startswith(_TAG_PREFIX):
            raw_tag = line[len(_TAG_PREFIX):].strip()
            if raw_tag in ("", "无"):
                current["tag"] = []
            else:
                current["tag"] = [x.strip() for x in raw_tag.split(",") if x.strip()]
            continue

        if line == "### 最近10条标题":
            section = "titles"
            continue
        if line == "### AI分析":
            section = "analysis"
            continue
        if line == "### 异常":
            section = "error"
            continue
        if line.startswith("### "):
            # Any other level-3 heading ends the section being collected.
            section = ""
            continue

        if section == "titles" and line.startswith("- "):
            title = line[2:].strip()
            # Skip the placeholder that build_report() emits for empty lists.
            if title and title != "(未抓取到标题)":
                current["titles"].append(title)
        elif section == "analysis":
            stripped = line.strip()
            if stripped:
                if current["analysis"]:
                    current["analysis"] += "\n" + stripped
                else:
                    current["analysis"] = stripped
        elif section == "error" and line.startswith("- "):
            current["error"] = line[2:].strip()

    if current is not None:
        items.append(current)
    return items


def _write_report(output_path: Path, items: list[dict[str, Any]]) -> None:
    """Render *items* and write them to *output_path*, creating parent dirs."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(build_report(items), encoding="utf-8")


def run_batch_analysis_from_report(args: argparse.Namespace, output_path: Path) -> int:
    """Run AI analysis on one batch of not-yet-analyzed entries of a report.

    Entries are "pending" when they have titles and either no analysis or only
    the ``--skip-ai`` placeholder text. The batch is the slice of the pending
    list selected by ``--batch-index`` / ``--batch-size``. Note that the
    pending list is recomputed from the report on every run.

    Returns the process exit code (0 on success, 1 when the report is missing
    or contains no entries).
    """
    report_path = Path(args.analyze_from_report)
    if not report_path.exists():
        print(f"报告文件不存在: {report_path}", file=sys.stderr)
        return 1

    items = parse_report_items(report_path)
    if not items:
        print("报告中未解析到可分析条目", file=sys.stderr)
        return 1

    pending = [
        it
        for it in items
        if it.get("titles")
        and (not it.get("analysis") or it.get("analysis") == "测试模式已跳过AI分析")
    ]
    if not pending:
        print("报告中没有待分析条目(可能已全部分析完成)")
        _write_report(output_path, items)
        return 0

    batch_size = max(args.batch_size, 1)
    batch_index = max(args.batch_index, 1)
    start = (batch_index - 1) * batch_size
    batch = pending[start:start + batch_size]
    if not batch:
        print(f"批次为空: batch-index={batch_index}, batch-size={batch_size}, 待分析总数={len(pending)}")
        _write_report(output_path, items)
        return 0

    print(
        f"开始分批分析: 第{batch_index}批, 每批{batch_size}条, "
        f"本批{len(batch)}条, 待分析总数{len(pending)}"
    )

    pause = max(args.sleep_seconds, 0.0)
    for idx, it in enumerate(batch, start=1):
        print(f"[batch {idx}/{len(batch)}] AI分析: {it['name']} ({it['mid']})")
        # `it` is the very same dict object that lives in `items`, so updating
        # it in place updates the report rows (and is correct even when two
        # entries share the same mid and name).
        try:
            it["analysis"] = analyze_titles(it["name"], it["url"], it["titles"])
            it["error"] = ""
        except Exception as exc:  # noqa: BLE001
            it["error"] = str(exc)
        if idx < len(batch):
            time.sleep(pause)

    _write_report(output_path, items)
    print(f"分批分析报告已生成: {output_path}")
    return 0


def load_up_items(input_path: Path) -> list[UpItem]:
    """Load uploader entries from the input JSON file.

    Entries without a ``mid`` or with a ``mid`` that cannot be converted to
    ``int`` are skipped silently.

    Raises:
        ValueError: if the top-level JSON value is not a list or an element is
            not an object.
    """
    raw = json.loads(input_path.read_text(encoding="utf-8"))
    if not isinstance(raw, list):
        raise ValueError("输入 JSON 必须是数组")

    items: list[UpItem] = []
    for idx, obj in enumerate(raw):
        if not isinstance(obj, dict):
            raise ValueError(f"第 {idx + 1} 项不是对象")

        mid = obj.get("mid")
        if mid is None:
            continue
        try:
            mid_int = int(mid)
        except (TypeError, ValueError):
            continue

        name = obj.get("name", "")
        if not isinstance(name, str):
            name = str(name)

        tags = obj.get("tag", [])
        if not isinstance(tags, list):
            tags = []

        items.append(UpItem(mid=mid_int, name=name.strip(), tag=[str(t) for t in tags]))
    return items


def _http_get(url: str, timeout: float, referer: str, accept: str) -> str:
    """GET *url* with browser-like headers and return the decoded body."""
    headers = {
        "User-Agent": DEFAULT_USER_AGENT,
        "Referer": referer,
        "Origin": "https://www.bilibili.com",
        "Accept": accept,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    # The command-line cookie wins over the module-level one.
    cookie = RUNTIME_BILIBILI_COOKIE.strip() or BILIBILI_COOKIE.strip()
    if cookie:
        headers["Cookie"] = cookie

    req = request.Request(url, headers=headers, method="GET")
    with request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8", errors="replace")


def http_get_json(
    url: str,
    timeout: float = 20.0,
    referer: str = "https://space.bilibili.com/",
) -> dict[str, Any]:
    """GET *url* and return the response body parsed as JSON."""
    body = _http_get(url, timeout, referer, "application/json, text/plain, */*")
    return json.loads(body)


def http_get_text(
    url: str,
    timeout: float = 20.0,
    referer: str = "https://space.bilibili.com/",
) -> str:
    """GET *url* and return the response body as text."""
    return _http_get(
        url,
        timeout,
        referer,
        "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    )


def get_mixin_key(img_key: str, sub_key: str) -> str:
    """Derive the 32-character WBI mixin key from *img_key* and *sub_key*.

    Raises:
        ValueError: if the concatenated keys are too short for the index table.
    """
    origin = img_key + sub_key
    if len(origin) <= max(MIXIN_KEY_ENC_TAB):
        raise ValueError(
            f"wbi keys too short: need more than {max(MIXIN_KEY_ENC_TAB)} chars, got {len(origin)}"
        )
    return "".join(origin[i] for i in MIXIN_KEY_ENC_TAB)[:32]


def build_wbi_params(base_params: dict[str, Any], mixin_key: str) -> dict[str, Any]:
    """Return *base_params* plus ``wts`` and the ``w_rid`` signature.

    Values are stringified, the characters ``!'()*`` are removed from them, the
    keys are sorted, and ``w_rid`` is the MD5 hex digest of the URL-encoded
    query followed by *mixin_key*.
    """
    params = {k: str(v) for k, v in base_params.items()}
    params["wts"] = str(int(time.time()))

    filtered = {k: re.sub(r"[!'()*]", "", v) for k, v in sorted(params.items())}
    query = parse.urlencode(filtered)
    filtered["w_rid"] = hashlib.md5((query + mixin_key).encode("utf-8")).hexdigest()
    return filtered


def get_wbi_mixin_key() -> str:
    """Fetch the current WBI image keys from the nav endpoint and mix them.

    Raises:
        RuntimeError: if the response does not contain both key URLs.
    """
    data = http_get_json(BILIBILI_NAV_API, referer="https://www.bilibili.com/")
    payload = data.get("data") if isinstance(data, dict) else None
    wbi_img = payload.get("wbi_img") if isinstance(payload, dict) else None
    if not isinstance(wbi_img, dict):
        wbi_img = {}

    img_url = wbi_img.get("img_url") or ""
    sub_url = wbi_img.get("sub_url") or ""
    if not img_url or not sub_url:
        # Only treat a non-zero code as fatal when the keys are really absent:
        # the nav endpoint is documented to return wbi_img together with
        # code=-101 for requests that are not logged in.
        code = data.get("code") if isinstance(data, dict) else None
        if code != 0:
            message = data.get("message") if isinstance(data, dict) else None
            raise RuntimeError(f"获取wbi密钥失败 code={code}, message={message}")
        raise RuntimeError("获取wbi密钥失败: nav接口缺少img_url/sub_url")

    # The keys are the file names (without extension) of the two URLs.
    img_key = img_url.rsplit("/", 1)[-1].split(".")[0]
    sub_key = sub_url.rsplit("/", 1)[-1].split(".")[0]
    return get_mixin_key(img_key, sub_key)


def _cached_wbi_mixin_key() -> str:
    """Return the WBI mixin key, refreshing it at most once per TTL window."""
    global _wbi_key_cache
    now = time.monotonic()
    if _wbi_key_cache is not None and now - _wbi_key_cache[0] < _WBI_KEY_TTL_SECONDS:
        return _wbi_key_cache[1]
    key = get_wbi_mixin_key()  # failures propagate and are not cached
    _wbi_key_cache = (now, key)
    return key


def parse_titles_from_data(data: dict[str, Any]) -> list[str]:
    """Extract cleaned, non-empty titles from ``data.list.vlist`` of a response.

    Returns an empty list when any level of that path is missing, ``null`` or
    of an unexpected type.
    """
    payload = data.get("data") if isinstance(data, dict) else None
    listing = payload.get("list") if isinstance(payload, dict) else None
    vlist = listing.get("vlist") if isinstance(listing, dict) else None
    if not isinstance(vlist, list):
        return []

    titles: list[str] = []
    for item in vlist:
        if not isinstance(item, dict):
            continue
        title = item.get("title", "")
        if isinstance(title, str) and title.strip():
            titles.append(clean_html(title.strip()))
    return titles


def fetch_titles_from_space_html(mid: int, titles_per_up: int, debug: bool = False) -> list[str]:
    """Scrape up to *titles_per_up* distinct titles from the UP's video page.

    Titles are taken from ``alt`` attributes of elements whose ``class``
    contains ``b-img__inner``; duplicates are dropped, order is preserved.
    """
    url = f"https://space.bilibili.com/{mid}/video"
    html_text = http_get_text(url, referer="https://www.bilibili.com/")

    # Cover images on the page usually carry the title in their alt attribute,
    # so extract from there first.
    # NOTE(review): the pattern starts with a literal "]*", which looks like a
    # truncated "<img[^>]*" prefix. It is kept unchanged because it still
    # matches; confirm the intended pattern against the upstream script.
    alt_candidates = re.findall(
        r']*class="[^"]*b-img__inner[^"]*"[^>]*alt="([^"]+)"',
        html_text,
        flags=re.IGNORECASE,
    )

    titles: list[str] = []
    seen: set[str] = set()
    for raw in alt_candidates:
        title = clean_html(html.unescape(raw)).strip()
        if not title or title in seen:
            continue
        seen.add(title)
        titles.append(title)
        if len(titles) >= titles_per_up:
            break

    if debug:
        print(f"[debug] HTML模式提取到 {len(titles)} 条标题")
    return titles


def fetch_titles(
    mid: int,
    titles_per_up: int,
    retry_times: int = 3,
    debug: bool = False,
    fetch_mode: str = "auto",
) -> list[str]:
    """Fetch the most recent video titles of one UP.

    Args:
        mid: Bilibili user id.
        titles_per_up: number of titles to request (values below 1 are
            treated as 1).
        retry_times: number of API attempts (at least one is always made).
        debug: print diagnostic messages.
        fetch_mode: ``"api"``, ``"html"`` or ``"auto"`` (API first, then HTML).

    Returns:
        A non-empty list of titles.

    Raises:
        RuntimeError: when no title could be obtained; the message joins the
            last three recorded errors.
    """
    titles_per_up = max(titles_per_up, 1)
    base_params = {
        "mid": str(mid),
        "pn": "1",
        "ps": str(titles_per_up),
        "order": "pubdate",
        "index": "1",
        "jsonp": "json",
    }
    errors: list[str] = []

    if fetch_mode in ("auto", "api"):
        # Prefer the WBI-signed endpoint; fall back to the unsigned endpoint
        # when the signing key cannot be obtained.
        mixin_key = ""
        try:
            mixin_key = _cached_wbi_mixin_key()
        except Exception as exc:  # noqa: BLE001
            if debug:
                print(f"[debug] 获取wbi密钥失败: {exc}")

        attempts = max(retry_times, 1)
        for attempt in range(1, attempts + 1):
            try:
                if mixin_key:
                    signed = build_wbi_params(base_params, mixin_key)
                    url = f"{BILIBILI_WBI_API}?{parse.urlencode(signed)}"
                else:
                    url = f"{BILIBILI_API}?{parse.urlencode(base_params)}"

                data = http_get_json(url, referer=f"https://space.bilibili.com/{mid}/video")
                code = data.get("code", -1)
                if code == 0:
                    titles = parse_titles_from_data(data)
                    if titles:
                        return titles
                    errors.append("接口返回成功但标题为空")
                else:
                    errors.append(f"code={code}, message={data.get('message', 'unknown')}")
            except error.HTTPError as exc:
                errors.append(f"HTTP {exc.code} {exc.reason}")
            except Exception as exc:  # noqa: BLE001
                errors.append(str(exc))

            if attempt >= attempts:
                # No retry follows the last attempt, so do not wait.
                if debug:
                    print(f"[debug] mid={mid} API第{attempt}次失败: {errors[-1]}")
                break

            # Exponential back-off with jitter, capped at 12 seconds.
            sleep_for = min(12.0, (1.8 ** attempt) + random.uniform(0.2, 1.0))
            if debug:
                print(f"[debug] mid={mid} API第{attempt}次失败: {errors[-1]},{sleep_for:.1f}s后重试")
            time.sleep(sleep_for)

    if fetch_mode in ("auto", "html"):
        try:
            html_titles = fetch_titles_from_space_html(mid, titles_per_up, debug=debug)
            if html_titles:
                return html_titles
            errors.append("HTML模式未提取到标题")
        except Exception as exc:  # noqa: BLE001
            errors.append(f"HTML模式失败: {exc}")

    joined = "; ".join(errors[-3:])
    if ("412" in joined) or ("-799" in joined):
        hint = "提示: 请设置环境变量BILIBILI_COOKIE,或运行时加 --bili-cookie \"SESSDATA=...; buvid3=...\""
        raise RuntimeError(f"{joined}; {hint}")
    raise RuntimeError(joined)


def clean_html(text: str) -> str:
    """Remove anything that looks like an HTML tag from *text*."""
    return re.sub(r"<[^>]+>", "", text)


def call_volcengine_chat(system_prompt: str, user_prompt: str) -> str:
    """Send one system+user exchange to the chat-completions endpoint.

    Returns:
        The stripped text content of the first choice.

    Raises:
        RuntimeError: if configuration is missing or the response contains no
            usable text content.
        urllib.error.URLError: on transport or HTTP failures.
    """
    api_key = VOLCENGINE_API_KEY.strip()
    base_url = VOLCENGINE_BASE_URL.strip()
    model = VOLCENGINE_MODEL.strip()

    if (not api_key) or ("在这里填" in api_key):
        raise RuntimeError("请先设置环境变量 VOLCENGINE_API_KEY")
    if (not model) or ("在这里填" in model):
        raise RuntimeError("请先设置环境变量 VOLCENGINE_MODEL")
    if not base_url:
        raise RuntimeError("请先设置环境变量 VOLCENGINE_BASE_URL")

    url = f"{base_url.rstrip('/')}/chat/completions"
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.4,
    }
    req = request.Request(
        url,
        data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )

    with request.urlopen(req, timeout=60) as resp:
        body = resp.read().decode("utf-8", errors="replace")

    result = json.loads(body)
    # Walk the structure defensively: an empty "choices" list or null fields
    # must end in the descriptive error below, not an IndexError/AttributeError.
    choices = result.get("choices") if isinstance(result, dict) else None
    first = choices[0] if isinstance(choices, list) and choices else None
    message = first.get("message") if isinstance(first, dict) else None
    content = message.get("content", "") if isinstance(message, dict) else ""
    if not isinstance(content, str) or not content.strip():
        raise RuntimeError(f"火山引擎返回结构异常: {body[:500]}")
    return content.strip()


def analyze_titles(up_name: str, up_url: str, titles: list[str]) -> str:
    """Ask the model for a structured assessment of an UP based on titles."""
    system_prompt = (
        "你是一个内容分析助手。根据视频标题判断UP主内容方向,并给出是否建议取关。"
        "输出必须是简体中文,且严格按照用户给定的Markdown格式。"
    )
    joined_titles = "\n".join(f"- {t}" for t in titles)
    user_prompt = f"""
请分析以下UP主最近视频标题:

UP主:{up_name}
主页:{up_url}
标题:
{joined_titles}

请按以下格式输出(不要增加其它段落):
1) 内容定位:一句话
2) 受众画像:一句话
3) 近期内容倾向:2-3点,使用-开头
4) 质量评价:80-120字
5) 取关建议:保留关注/可以取关(二选一)
6) 建议理由:50-100字
""".strip()
    return call_volcengine_chat(system_prompt, user_prompt)


def build_report(results: list[dict[str, Any]]) -> str:
    """Render result rows as the Markdown report text.

    Every row needs ``name``, ``mid`` and ``url``; ``tag``, ``titles``,
    ``analysis`` and ``error`` are optional. The layout is what
    :func:`parse_report_items` reads back.
    """
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    lines: list[str] = [
        "# UP主内容分析报告",
        "",
        f"- 生成时间: {now}",
        f"- 分析数量: {len(results)}",
        "",
    ]

    for idx, item in enumerate(results, start=1):
        tags = item.get("tag", [])
        lines.extend(
            [
                f"## {idx}. {item['name']} (mid: {item['mid']})",
                "",
                f"- 主页: {item['url']}",
                f"- 标签: {', '.join(tags) if tags else '无'}",
                "",
                "### 最近10条标题",
                "",
            ]
        )

        titles = item.get("titles", [])
        if titles:
            lines.extend(f"- {t}" for t in titles)
        else:
            lines.append("- (未抓取到标题)")
        lines.append("")

        analysis = item.get("analysis", "")
        if analysis:
            lines.extend(["### AI分析", "", analysis, ""])

        error_msg = item.get("error", "")
        if error_msg:
            lines.extend(["### 异常", "", f"- {error_msg}", ""])

    return "\n".join(lines).rstrip() + "\n"


def main() -> int:
    """Script entry point; returns the process exit code."""
    global RUNTIME_BILIBILI_COOKIE
    args = parse_args()
    RUNTIME_BILIBILI_COOKIE = (args.bili_cookie or "").strip()

    input_path = Path(args.input)
    output_path = Path(args.output)

    if args.analyze_from_report:
        return run_batch_analysis_from_report(args, output_path)

    if args.test_mid > 0:
        items = [UpItem(mid=args.test_mid, name=args.test_name, tag=["测试模式"])]
        print(f"测试模式: 仅处理 mid={args.test_mid}")
    else:
        if not input_path.exists():
            print(f"输入文件不存在: {input_path}", file=sys.stderr)
            return 1
        try:
            items = load_up_items(input_path)
        except Exception as exc:  # noqa: BLE001
            print(f"加载输入文件失败: {exc}", file=sys.stderr)
            return 1

        # Tag and count filters apply to file input only.
        if args.only_tag:
            items = [it for it in items if args.only_tag in it.tag]
        if args.max_ups and args.max_ups > 0:
            items = items[: args.max_ups]

    if not items:
        print("没有可处理的 UP 数据", file=sys.stderr)
        return 1

    print(f"开始处理 {len(items)} 个 UP...")
    if args.skip_ai:
        print("已启用 --skip-ai,仅测试抓取标题")
    if args.debug:
        print(f"[debug] 当前抓取模式: {args.fetch_mode}")

    pause = max(args.sleep_seconds, 0)
    results: list[dict[str, Any]] = []
    for idx, item in enumerate(items, start=1):
        up_url = f"https://space.bilibili.com/{item.mid}/video"
        row: dict[str, Any] = {
            "mid": item.mid,
            "name": item.name or f"mid_{item.mid}",
            "tag": item.tag,
            "url": up_url,
            "titles": [],
            "analysis": "",
            "error": "",
        }

        print(f"[{idx}/{len(items)}] 抓取: {row['name']} ({item.mid})")
        try:
            titles = fetch_titles(
                item.mid,
                args.titles_per_up,
                retry_times=args.retry_times,
                debug=args.debug,
                fetch_mode=args.fetch_mode,
            )
            row["titles"] = titles
            if not titles:
                row["error"] = "未抓取到标题,可能是接口限制或UP无公开视频"
            elif args.skip_ai:
                row["analysis"] = "测试模式已跳过AI分析"
            else:
                row["analysis"] = analyze_titles(row["name"], up_url, titles)
        except error.HTTPError as exc:
            row["error"] = f"HTTP错误: {exc.code} {exc.reason}"
        except error.URLError as exc:
            row["error"] = f"网络错误: {exc.reason}"
        except Exception as exc:  # noqa: BLE001
            row["error"] = str(exc)

        if args.debug and row["titles"]:
            sample = row["titles"][:3]
            print(f"[debug] mid={item.mid} 成功抓取 {len(row['titles'])} 条,样例: {sample}")

        results.append(row)
        if idx < len(items):
            time.sleep(pause)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(build_report(results), encoding="utf-8")
    print(f"报告已生成: {output_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())