#!/usr/bin/env python3
"""Extract UPs marked as "可以取关" and write their mids to a file.

Reads a Markdown UP analysis report, keeps every UP whose suggested action is
exactly "可以取关" ("can unfollow"), and writes the selection as CSV, as a
comma-separated mid list, or as JSON.
"""

from __future__ import annotations

import argparse
import csv
import json
import re
import sys
from pathlib import Path
from typing import Any

# Heading that opens each UP section: "## N. name (mid: 12345)".
_HEADING_RE = re.compile(r"^## \d+\. (.+?)\s+\(mid:\s*(\d+)\)", re.MULTILINE)
# Bullet line that carries the suggested action inside a section.
_ACTION_RE = re.compile(r"- 建议动作: (.+?)(?:\n|$)")
# Action value that selects an UP for output.
_UNFOLLOW_ACTION = "可以取关"
# Number of selected UPs echoed to stdout as a preview.
_PREVIEW_LIMIT = 10


def parse_report(report_path: Path) -> list[dict[str, Any]]:
    """Parse a Markdown UP analysis report into a list of UP records.

    Every section starts with a heading of the form ``## N. name (mid: ID)``
    and extends to the next such heading (or to the end of the file).

    Args:
        report_path: Path of the UTF-8 encoded Markdown report.

    Returns:
        One dict per section, in file order, with the keys ``mid`` (int),
        ``name`` (str) and ``action`` (str; empty when the section has no
        ``- 建议动作: ...`` line). Returns an empty list when the file does
        not exist or contains no matching heading.
    """
    if not report_path.exists():
        return []

    text = report_path.read_text(encoding="utf-8")
    headings = list(_HEADING_RE.finditer(text))
    # A section ends where the next heading starts; the last one ends at EOF.
    section_ends = [m.start() for m in headings[1:]] + [len(text)]

    items: list[dict[str, Any]] = []
    for heading, end in zip(headings, section_ends):
        section = text[heading.start():end]
        action_match = _ACTION_RE.search(section)
        items.append(
            {
                "mid": int(heading.group(2)),
                "name": heading.group(1).strip(),
                "action": action_match.group(1).strip() if action_match else "",
            }
        )
    return items


def _write_csv(items: list[dict[str, Any]], output_path: Path) -> None:
    """Write *items* as a ``mid,name`` CSV file with a header row."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["mid", "name"])
        writer.writeheader()
        writer.writerows({"mid": it["mid"], "name": it["name"]} for it in items)

    print(f"\n✓ 已输出CSV格式到: {output_path}")
    print(" 格式: mid,name")
    print(f" 行数: {len(items)}")


def _write_mid_list(
    items: list[dict[str, Any]],
    output_path: Path,
    *,
    with_names: bool,
    split_size: int,
) -> None:
    """Write a comma-separated mid list, optionally split into part files.

    Args:
        items: Selected UP records.
        output_path: Destination of the full list.
        with_names: Emit ``mid:name`` pairs instead of bare mids in the
            main file.
        split_size: When positive, additionally write ``<stem>_<i><suffix>``
            files next to *output_path*, each holding at most this many mids.
    """
    mids = [str(it["mid"]) for it in items]
    if with_names:
        content = ",".join(f"{it['mid']}:{it['name']}" for it in items)
        written_msg = f"\n✓ 已输出mid:name列表到: {output_path}"
        format_msg = " 格式: mid1:name1,mid2:name2,..."
    else:
        content = ",".join(mids)
        written_msg = f"\n✓ 已输出mid列表到: {output_path}"
        format_msg = " 格式: mid1,mid2,mid3,..."

    # Write first so that success is only reported once the file exists.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(content, encoding="utf-8")
    print(written_msg)
    print(format_msg)
    print(f" 数量: {len(mids)}")

    if split_size <= 0:
        return

    # Part files always hold bare mids, regardless of with_names.
    groups = [mids[i:i + split_size] for i in range(0, len(mids), split_size)]
    stem = output_path.stem
    suffix = output_path.suffix or ".txt"
    for index, group in enumerate(groups, start=1):
        part_path = output_path.with_name(f"{stem}_{index}{suffix}")
        part_path.write_text(",".join(group), encoding="utf-8")
    print(f" 已按每组{split_size}个拆分为{len(groups)}个文件")


def _write_json(items: list[dict[str, Any]], output_path: Path) -> None:
    """Write *items* as a JSON array of ``{"mid": ..., "name": ...}`` objects."""
    data = [{"mid": it["mid"], "name": it["name"]} for it in items]
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(
        json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    print(f"\n✓ 已输出JSON格式到: {output_path}")
    print(f" 数量: {len(data)}")


def main(argv: list[str] | None = None) -> int:
    """Run the command-line tool.

    Args:
        argv: Argument list without the program name. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``.

    Returns:
        Process exit code: 1 when the input report is missing or no UP could
        be parsed from it, 0 otherwise (including when no UP is marked
        "可以取关", in which case nothing is written).
    """
    parser = argparse.ArgumentParser(description="从UP分析报告中提取可以取关的UP")
    parser.add_argument(
        "--input-report",
        default="source/output/reports/up_analysis_full_auto.md",
        help="输入报告路径,默认: source/output/reports/up_analysis_full_auto.md",
    )
    parser.add_argument(
        "--output-csv",
        default="source/output/uids/unfollow_mids_list.txt",
        help="输出文件路径,默认: source/output/uids/unfollow_mids_list.txt",
    )
    parser.add_argument(
        "--format",
        choices=["csv", "mid-only", "json"],
        default="mid-only",
        help="输出格式:csv(mid,name), mid-only(仅mid逗号分隔), json(JSON格式)",
    )
    parser.add_argument(
        "--with-names",
        action="store_true",
        help="在mid后添加UP名称(仅mid-only格式生效)",
    )
    parser.add_argument(
        "--split-size",
        type=int,
        default=0,
        help="可选:将mid-only结果按N个一组拆分多个文件,例如100",
    )
    args = parser.parse_args(argv)

    input_report = Path(args.input_report)
    output_path = Path(args.output_csv)

    if not input_report.exists():
        print(f"错误: 输入报告不存在: {input_report}", file=sys.stderr)
        return 1

    print(f"读取报告: {input_report}")
    items = parse_report(input_report)
    if not items:
        print("未能从报告中解析任何UP", file=sys.stderr)
        return 1

    # Keep only UPs whose suggested action is exactly the unfollow marker.
    unfollow_items = [it for it in items if it.get("action") == _UNFOLLOW_ACTION]
    print(f"总 UP 数: {len(items)}")
    print(f"可以取关: {len(unfollow_items)}")

    if not unfollow_items:
        print("没有可以取关的UP")
        return 0

    if args.format == "csv":
        _write_csv(unfollow_items, output_path)
    elif args.format == "mid-only":
        _write_mid_list(
            unfollow_items,
            output_path,
            with_names=args.with_names,
            # Negative values are treated like 0, i.e. no splitting.
            split_size=max(0, args.split_size),
        )
    elif args.format == "json":
        _write_json(unfollow_items, output_path)

    # Echo a short preview of what was selected.
    print("\n📋 示例(前10个):")
    for item in unfollow_items[:_PREVIEW_LIMIT]:
        print(f" - {item['mid']}: {item['name']}")
    if len(unfollow_items) > _PREVIEW_LIMIT:
        print(f" ... 还有 {len(unfollow_items) - _PREVIEW_LIMIT} 个")

    return 0


if __name__ == "__main__":
    sys.exit(main())