175 lines
5.7 KiB
Python
175 lines
5.7 KiB
Python
#!/usr/bin/env python3
|
||
"""Extract UPs marked as "可以取关" and output their mids to CSV.
|
||
|
||
Read an UP analysis report and extract all UPs with action "可以取关",
|
||
then output their mids to a CSV file.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import csv
|
||
import re
|
||
import sys
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
|
||
def parse_report(report_path: Path) -> list[dict[str, Any]]:
|
||
"""解析Markdown格式的UP分析报告,返回UP列表"""
|
||
if not report_path.exists():
|
||
return []
|
||
|
||
text = report_path.read_text(encoding="utf-8")
|
||
items = []
|
||
|
||
# 按UP项分割(每个UP项以"## N. 名字 (mid: ...)"开头)
|
||
pattern = r"^## \d+\. (.+?)\s+\(mid:\s*(\d+)\)"
|
||
matches = list(re.finditer(pattern, text, re.MULTILINE))
|
||
|
||
for i, match in enumerate(matches):
|
||
start = match.start()
|
||
end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
|
||
section = text[start:end]
|
||
|
||
name = match.group(1).strip()
|
||
mid = int(match.group(2))
|
||
|
||
# 提取建议动作
|
||
action_match = re.search(r"- 建议动作: (.+?)(?:\n|$)", section)
|
||
action = action_match.group(1).strip() if action_match else ""
|
||
|
||
items.append({
|
||
"mid": mid,
|
||
"name": name,
|
||
"action": action,
|
||
})
|
||
|
||
return items
|
||
|
||
|
||
def main() -> int:
|
||
parser = argparse.ArgumentParser(description="从UP分析报告中提取可以取关的UP")
|
||
parser.add_argument(
|
||
"--input-report",
|
||
default="source/output/reports/2_up_analysis_full_auto.md",
|
||
help="输入报告路径",
|
||
)
|
||
parser.add_argument(
|
||
"--output-csv",
|
||
default="source/output/uids/4_unfollow_mids_list.txt",
|
||
help="输出文件路径",
|
||
)
|
||
parser.add_argument(
|
||
"--format",
|
||
choices=["csv", "mid-only", "json"],
|
||
default="mid-only",
|
||
help="输出格式:csv(mid,name), mid-only(仅mid逗号分隔), json(JSON格式)",
|
||
)
|
||
parser.add_argument(
|
||
"--with-names",
|
||
action="store_true",
|
||
help="在mid后添加UP名称(仅mid-only格式生效)",
|
||
)
|
||
parser.add_argument(
|
||
"--split-size",
|
||
type=int,
|
||
default=0,
|
||
help="可选:将mid-only结果按N个一组拆分多个文件,例如100",
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
input_report = Path(args.input_report)
|
||
output_csv = Path(args.output_csv)
|
||
|
||
if not input_report.exists():
|
||
print(f"错误: 输入报告不存在: {input_report}", file=sys.stderr)
|
||
return 1
|
||
|
||
print(f"读取报告: {input_report}")
|
||
items = parse_report(input_report)
|
||
|
||
if not items:
|
||
print("未能从报告中解析任何UP", file=sys.stderr)
|
||
return 1
|
||
|
||
# 筛选可以取关的UP
|
||
unfollow_items = [it for it in items if it.get("action") == "可以取关"]
|
||
|
||
print(f"总 UP 数: {len(items)}")
|
||
print(f"可以取关: {len(unfollow_items)}")
|
||
|
||
if not unfollow_items:
|
||
print("没有可以取关的UP")
|
||
return 0
|
||
|
||
# 输出格式
|
||
if args.format == "csv":
|
||
# 标准CSV格式:mid, name
|
||
output_csv.parent.mkdir(parents=True, exist_ok=True)
|
||
with open(output_csv, "w", newline="", encoding="utf-8") as f:
|
||
writer = csv.DictWriter(f, fieldnames=["mid", "name"])
|
||
writer.writeheader()
|
||
for item in unfollow_items:
|
||
writer.writerow({"mid": item["mid"], "name": item["name"]})
|
||
|
||
print(f"\n✓ 已输出CSV格式到: {output_csv}")
|
||
print(f" 格式: mid,name")
|
||
print(f" 行数: {len(unfollow_items)}")
|
||
|
||
elif args.format == "mid-only":
|
||
# 仅mid,逗号分隔
|
||
mids = [str(it["mid"]) for it in unfollow_items]
|
||
|
||
if args.with_names:
|
||
# mid:name 格式
|
||
content = ",".join([f"{it['mid']}:{it['name']}" for it in unfollow_items])
|
||
print(f"\n✓ 已输出mid:name列表到: {output_csv}")
|
||
print(f" 格式: mid1:name1,mid2:name2,...")
|
||
else:
|
||
# 仅mid
|
||
content = ",".join(mids)
|
||
print(f"\n✓ 已输出mid列表到: {output_csv}")
|
||
print(f" 格式: mid1,mid2,mid3,...")
|
||
|
||
output_csv.parent.mkdir(parents=True, exist_ok=True)
|
||
output_csv.write_text(content, encoding="utf-8")
|
||
print(f" 数量: {len(mids)}")
|
||
|
||
split_size = max(0, int(args.split_size))
|
||
if split_size > 0:
|
||
groups = [mids[i:i + split_size] for i in range(0, len(mids), split_size)]
|
||
stem = output_csv.stem
|
||
suffix = output_csv.suffix or ".txt"
|
||
for i, group in enumerate(groups, start=1):
|
||
part_path = output_csv.with_name(f"{stem}_{i}{suffix}")
|
||
part_path.write_text(",".join(group), encoding="utf-8")
|
||
print(f" 已按每组{split_size}个拆分为{len(groups)}个文件")
|
||
|
||
elif args.format == "json":
|
||
# JSON格式
|
||
import json
|
||
|
||
data = [{"mid": it["mid"], "name": it["name"]} for it in unfollow_items]
|
||
output_csv.parent.mkdir(parents=True, exist_ok=True)
|
||
output_csv.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
|
||
|
||
print(f"\n✓ 已输出JSON格式到: {output_csv}")
|
||
print(f" 数量: {len(data)}")
|
||
|
||
# 显示前10个示例
|
||
if len(unfollow_items) > 0:
|
||
print(f"\n📋 示例(前10个):")
|
||
for item in unfollow_items[:10]:
|
||
print(f" - {item['mid']}: {item['name']}")
|
||
|
||
if len(unfollow_items) > 10:
|
||
print(f" ... 还有 {len(unfollow_items) - 10} 个")
|
||
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|