强制停止跟踪 all_i_need 目录

This commit is contained in:
2026-04-26 23:47:06 +08:00
parent e79651eb04
commit 4f2d338101
26 changed files with 0 additions and 2258 deletions

116
readme.md
View File

@@ -1,116 +0,0 @@
# B站关注清理工具 - Scripts 版
> 一键命令运行全流程:`python source/scripts/run_pipeline.py`
> 指定输入文件运行：`python source/scripts/run_pipeline.py --input-json source/resources/export_uids_test5.json`
本工具包含7个步骤的完整流水线：
1. 抓取视频标题
2. 分批AI分析
3. 生成保留关注报告
4. 生成取关UID列表
5. 按首字母排序
6. 提取分组信息
7. 删除最近10条标题
## 快速开始
```powershell
# 完整流程(推荐)
python source/scripts/run_pipeline.py
# 速度优先
python source/scripts/run_pipeline.py --workers 8 --batch-size 30 --sleep-seconds 0
# 试跑30个UP
python source/scripts/run_pipeline.py --max-ups 30
# 跳过抓取,使用已有标题报告
python source/scripts/run_pipeline.py --skip-fetch
# 跳过分析,仅生成产物
python source/scripts/run_pipeline.py --skip-analyze
# 跳过排序/分组/删除
python source/scripts/run_pipeline.py --skip-sort --skip-group --skip-remove
```
## 输出文件
| 文件 | 说明 |
|------|------|
| `source/output/reports/1_up_titles_report.md` | 标题抓取报告 |
| `source/output/reports/2_up_analysis_full_auto.md` | AI分析报告（完整版） |
| `source/output/reports/3_up_keep_follow_only.md` | 保留关注报告 |
| `source/output/uids/4_unfollow_mids_list.txt` | 取关UID列表 |
| `source/output/reports/5_sorted_up_analysis.md` | 按首字母排序报告 |
| `source/output/reports/6_group_info.md` | 提取分组信息报告 |
| `source/output/reports/7_no_titles.md` | 最终报告（已删除最近10条标题） |
## 常用参数
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `--workers` | 6 | 并发请求数 |
| `--batch-size` | 20 | 每批分析条数 |
| `--max-ups` | 0（全部） | 限制处理UP数量 |
| `--split-size` | 100 | UID拆分大小 |
| `--sleep-seconds` | 0 | 任务间隔秒数 |
### 跳过参数
| 参数 | 说明 |
|------|------|
| `--skip-fetch` | 跳过抓取阶段 |
| `--skip-analyze` | 跳过分析阶段 |
| `--skip-sort` | 跳过排序阶段 |
| `--skip-group` | 跳过提取分组阶段 |
| `--skip-remove` | 跳过删除最近10条阶段 |
## 分步执行
### 步骤1：抓取标题
```powershell
python source/scripts/analyze_up_content.py --skip-ai
```
### 步骤2：分批AI分析
```powershell
python source/scripts/batch_ai_summary_from_report.py --run-all-batches
```
### 步骤3：生成保留关注报告
```powershell
python source/scripts/extract_keep_follow_doc.py
```
### 步骤4：生成取关UID
```powershell
python source/scripts/extract_unfollow_list.py --format mid-only --split-size 100
```
### 步骤5：按首字母排序
```powershell
python source/scripts/sort_up_main.py
```
### 步骤6：提取分组信息
```powershell
python source/scripts/extract_group_info.py
```
### 步骤7：删除最近10条标题
```powershell
python source/scripts/remove_10content.py
```
## 先配置API
编辑 [source/scripts/analyze_up_content.py](source/scripts/analyze_up_content.py) 顶部配置:
```python
VOLCENGINE_API_KEY = "你的火山引擎API Key"
VOLCENGINE_MODEL = "deepseek-v3-1-terminus"
VOLCENGINE_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"
```

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1 +0,0 @@
1044673687,1481344732,1858861103,444728505,23947287,35807625,111714204,1587138171,440798355,33291981,11914415,436700803,3493282273299102,612593877,2125857107,2000819931,507448807,505935166,14524124,385200931,1769820463,1562896062,3493285194632125,3493264275540254,479424216,604710494,1016523517,1428318343,700817047,543931674,1590538073,1574721168,432752294,3494376355400290,1795221360,4848323,495224316,3493258518858434,379247856,32360194,381653678,274928598,475656605,365212208,3546378525477862,35339643,1747335,1263990139,3493263038220393,251642119,387412319,1212367465,589747109,1025542770,23770618,3494350482836026,54091976,599449178,1715594148,3493127266503448,1767282898,487505057,630874464,1264711195,3537118481615036,319358609,518742534,385172962,4401694,474803476,525382468,3546595513600180,295993972,476819048,21435789,1725223092,2114928296,174471602,1480366563,17095888,295100453,1305776725,25694274,14797570,166828,385126080,3461582166166488,3537120815745590,489302782,73674032,1500074803,68134500,1047158092,3546571071293861,124806013,26055664,441631812,243680430,601300995,108526737,2100151539,3546603229023143,1749224369,3493133887211865,56300844,255139870,23244398,3493291869866324,3494354444355822,3546593938639500,1098004826,94577838,21849780,35105301,423319981,535023713,224560702,3546637651675315,3494361759221832,1640934198,1710911403,14342271,2031277323,603430640,3546568640694467,1741962246,1304346514,283389925,3461575868418125,3546622413768823,3494364269513335,185549749,502539494,73528331,510767506,3461579156752681,238171381,3546627212052911,448165099,1975692083,542824499,16243913,3494354016537425,316627722,1944667205,1433031509,3546387566299549,496787581,3546643550963789,382423121,600428973,430426421,325848853,735958,35162124,668794433,3546390949005555,478548163,3546672034482563,250584301,485234598,1555665460,6776617,108709998,437840703,28378491,67079745,1606682745,629101318,452161580,3493089637305282,374377163,213845897,323713206,272107494,622986240,1773278179,3
546656899336980,67141499,318331,285027361,114366178,203983793,1283676771,1965933018,470624011,3546583482239276,3493281239402498,1475977561,2016676980,1209319826,1335124945,416206486,129860965,1780480185,1809567655,245645656,1937416537,1060544882,1335713025,3546617688886097,3546752326044595,3546613148551357,652060948,2116071253,97407861,3546908731639909,3546693165386233,278761367,323588182,486989780,3494353494345852,96609715,264869770,478849208,1679822121,19414347,3493127314737312,702915816,482867012,3546969421122388,3546590214097572,501642082,458165375,3546662484052067,481153145,1159873315,3546857594685834,1508100119,111900,1732848825,3546606469123022,106685726,490494088,1511660367

View File

@@ -1 +0,0 @@
1044673687,1481344732,1858861103,444728505,23947287,35807625,111714204,1587138171,440798355,33291981,11914415,436700803,3493282273299102,612593877,2125857107,2000819931,507448807,505935166,14524124,385200931,1769820463,1562896062,3493285194632125,3493264275540254,479424216,604710494,1016523517,1428318343,700817047,543931674,1590538073,1574721168,432752294,3494376355400290,1795221360,4848323,495224316,3493258518858434,379247856,32360194,381653678,274928598,475656605,365212208,3546378525477862,35339643,1747335,1263990139,3493263038220393,251642119,387412319,1212367465,589747109,1025542770,23770618,3494350482836026,54091976,599449178,1715594148,3493127266503448,1767282898,487505057,630874464,1264711195,3537118481615036,319358609,518742534,385172962,4401694,474803476,525382468,3546595513600180,295993972,476819048,21435789,1725223092,2114928296,174471602,1480366563,17095888,295100453,1305776725,25694274,14797570,166828,385126080,3461582166166488,3537120815745590,489302782,73674032,1500074803,68134500,1047158092,3546571071293861,124806013,26055664,441631812,243680430,601300995,108526737

View File

@@ -1 +0,0 @@
2100151539,3546603229023143,1749224369,3493133887211865,56300844,255139870,23244398,3493291869866324,3494354444355822,3546593938639500,1098004826,94577838,21849780,35105301,423319981,535023713,224560702,3546637651675315,3494361759221832,1640934198,1710911403,14342271,2031277323,603430640,3546568640694467,1741962246,1304346514,283389925,3461575868418125,3546622413768823,3494364269513335,185549749,502539494,73528331,510767506,3461579156752681,238171381,3546627212052911,448165099,1975692083,542824499,16243913,3494354016537425,316627722,1944667205,1433031509,3546387566299549,496787581,3546643550963789,382423121,600428973,430426421,325848853,735958,35162124,668794433,3546390949005555,478548163,3546672034482563,250584301,485234598,1555665460,6776617,108709998,437840703,28378491,67079745,1606682745,629101318,452161580,3493089637305282,374377163,213845897,323713206,272107494,622986240,1773278179,3546656899336980,67141499,318331,285027361,114366178,203983793,1283676771,1965933018,470624011,3546583482239276,3493281239402498,1475977561,2016676980,1209319826,1335124945,416206486,129860965,1780480185,1809567655,245645656,1937416537,1060544882,1335713025

View File

@@ -1 +0,0 @@
3546617688886097,3546752326044595,3546613148551357,652060948,2116071253,97407861,3546908731639909,3546693165386233,278761367,323588182,486989780,3494353494345852,96609715,264869770,478849208,1679822121,19414347,3493127314737312,702915816,482867012,3546969421122388,3546590214097572,501642082,458165375,3546662484052067,481153145,1159873315,3546857594685834,1508100119,111900,1732848825,3546606469123022,106685726,490494088,1511660367

91
source/.gitignore vendored
View File

@@ -1,91 +0,0 @@
# 1. 忽略操作系统自动生成的文件
.DS_Store
Thumbs.db
*.lnk
# 2. 忽略编译/构建产物
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# 3. 忽略IDE配置
.vscode/
.idea/
*.swp
*.swo
*~
# 4. 忽略日志文件
*.log
*.tmp
*.temp
*.md
source/all_i_need/
# 5. 忽略敏感数据
*.env
*.key
*.pem
*.cert
config.yaml
secrets/
# 6. 忽略大型媒体文件
*.mp4
*.mov
*.avi
*.wav
*.mp3
*.zip
*.tar
*.gz
*.7z
*.rar
# 7. 忽略数据分析/机器学习特有
*.model
*.h5
*.pkl
*.joblib
.ipynb_checkpoints/
# 8. 忽略你项目中的自动生成目录
# 根据你的目录结构，忽略 source/output/ 和 source/reports/ 下的所有文件，
# 但保留目录结构本身（可以添加空的 .gitkeep 文件来保持空目录）
source/output/**/*
!source/output/.gitkeep
source/reports/**/*
!source/reports/.gitkeep
source/.note
source/.test_output/
source/.note
source/.test_output/
source/all_i_need/**/*
source/.*/
source/all_i_need/**/*.txt
source/all_i_need/**/*.json
source/.all_i_need/export_uids.json
source/.all_i_need/export_uids.txt
source/.all_i_need/unfollow_mids_list_1.txt
source/.all_i_need/unfollow_mids_list_2.txt
source/.all_i_need/unfollow_mids_list_3.txt
source/.all_i_need/unfollow_mids_list.txt

View File

@@ -1 +0,0 @@
321583894,439478093,1044673687,1031543543,1481344732

View File

@@ -1 +0,0 @@
321583894,439478093,1044673687,1031543543,1481344732

View File

@@ -1 +0,0 @@
1044673687,1481344732,1858861103,444728505,23947287,35807625,111714204,1587138171,440798355,33291981,11914415,436700803,3493282273299102,612593877,2125857107,2000819931,507448807,505935166,14524124,385200931,1769820463,1562896062,3493285194632125,3493264275540254,479424216,604710494,1016523517,1428318343,700817047,543931674,1590538073,1574721168,432752294,3494376355400290,1795221360,4848323,495224316,3493258518858434,379247856,32360194,381653678,274928598,475656605,365212208,3546378525477862,35339643,1747335,1263990139,3493263038220393,251642119,387412319,1212367465,589747109,1025542770,23770618,3494350482836026,54091976,599449178,1715594148,3493127266503448,1767282898,487505057,630874464,1264711195,3537118481615036,319358609,518742534,385172962,4401694,474803476,525382468,3546595513600180,295993972,476819048,21435789,1725223092,2114928296,174471602,1480366563,17095888,295100453,1305776725,25694274,14797570,166828,385126080,3461582166166488,3537120815745590,489302782,73674032,1500074803,68134500,1047158092,3546571071293861,124806013,26055664,441631812,243680430,601300995,108526737,2100151539,3546603229023143,1749224369,3493133887211865,56300844,255139870,23244398,3493291869866324,3494354444355822,3546593938639500,1098004826,94577838,21849780,35105301,423319981,535023713,224560702,3546637651675315,3494361759221832,1640934198,1710911403,14342271,2031277323,603430640,3546568640694467,1741962246,1304346514,283389925,3461575868418125,3546622413768823,3494364269513335,185549749,502539494,73528331,510767506,3461579156752681,238171381,3546627212052911,448165099,1975692083,542824499,16243913,3494354016537425,316627722,1944667205,1433031509,3546387566299549,496787581,3546643550963789,382423121,600428973,430426421,325848853,735958,35162124,668794433,3546390949005555,478548163,3546672034482563,250584301,485234598,1555665460,6776617,108709998,437840703,28378491,67079745,1606682745,629101318,452161580,3493089637305282,374377163,213845897,323713206,272107494,622986240,1773278179,3
546656899336980,67141499,318331,285027361,114366178,203983793,1283676771,1965933018,470624011,3546583482239276,3493281239402498,1475977561,2016676980,1209319826,1335124945,416206486,129860965,1780480185,1809567655,245645656,1937416537,1060544882,1335713025,3546617688886097,3546752326044595,3546613148551357,652060948,2116071253,97407861,3546908731639909,3546693165386233,278761367,323588182,486989780,3494353494345852,96609715,264869770,478849208,1679822121,19414347,3493127314737312,702915816,482867012,3546969421122388,3546590214097572,501642082,458165375,3546662484052067,481153145,1159873315,3546857594685834,1508100119,111900,1732848825,3546606469123022,106685726,490494088,1511660367

View File

@@ -1 +0,0 @@
1044673687,1481344732,1858861103,444728505,23947287,35807625,111714204,1587138171,440798355,33291981,11914415,436700803,3493282273299102,612593877,2125857107,2000819931,507448807,505935166,14524124,385200931,1769820463,1562896062,3493285194632125,3493264275540254,479424216,604710494,1016523517,1428318343,700817047,543931674,1590538073,1574721168,432752294,3494376355400290,1795221360,4848323,495224316,3493258518858434,379247856,32360194,381653678,274928598,475656605,365212208,3546378525477862,35339643,1747335,1263990139,3493263038220393,251642119,387412319,1212367465,589747109,1025542770,23770618,3494350482836026,54091976,599449178,1715594148,3493127266503448,1767282898,487505057,630874464,1264711195,3537118481615036,319358609,518742534,385172962,4401694,474803476,525382468,3546595513600180,295993972,476819048,21435789,1725223092,2114928296,174471602,1480366563,17095888,295100453,1305776725,25694274,14797570,166828,385126080,3461582166166488,3537120815745590,489302782,73674032,1500074803,68134500,1047158092,3546571071293861,124806013,26055664,441631812,243680430,601300995,108526737

View File

@@ -1 +0,0 @@
2100151539,3546603229023143,1749224369,3493133887211865,56300844,255139870,23244398,3493291869866324,3494354444355822,3546593938639500,1098004826,94577838,21849780,35105301,423319981,535023713,224560702,3546637651675315,3494361759221832,1640934198,1710911403,14342271,2031277323,603430640,3546568640694467,1741962246,1304346514,283389925,3461575868418125,3546622413768823,3494364269513335,185549749,502539494,73528331,510767506,3461579156752681,238171381,3546627212052911,448165099,1975692083,542824499,16243913,3494354016537425,316627722,1944667205,1433031509,3546387566299549,496787581,3546643550963789,382423121,600428973,430426421,325848853,735958,35162124,668794433,3546390949005555,478548163,3546672034482563,250584301,485234598,1555665460,6776617,108709998,437840703,28378491,67079745,1606682745,629101318,452161580,3493089637305282,374377163,213845897,323713206,272107494,622986240,1773278179,3546656899336980,67141499,318331,285027361,114366178,203983793,1283676771,1965933018,470624011,3546583482239276,3493281239402498,1475977561,2016676980,1209319826,1335124945,416206486,129860965,1780480185,1809567655,245645656,1937416537,1060544882,1335713025

View File

@@ -1 +0,0 @@
3546617688886097,3546752326044595,3546613148551357,652060948,2116071253,97407861,3546908731639909,3546693165386233,278761367,323588182,486989780,3494353494345852,96609715,264869770,478849208,1679822121,19414347,3493127314737312,702915816,482867012,3546969421122388,3546590214097572,501642082,458165375,3546662484052067,481153145,1159873315,3546857594685834,1508100119,111900,1732848825,3546606469123022,106685726,490494088,1511660367

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1 +0,0 @@
[{"mid":1357612844,"name":"考研英语马天艺老师","tag":[]},{"mid":321583894,"name":"我是蓝同学啊","tag":["实力派"]},{"mid":439478093,"name":"中国食品报融媒体","tag":["纪录片","新闻"]},{"mid":1044673687,"name":"心理述","tag":[]},{"mid":62224043,"name":"栗之from一直夫妇","tag":[]},{"mid":1031543543,"name":"Java面试突击-Mic","tag":[]},{"mid":1481344732,"name":"我们都是社畜","tag":["准备取关"]},{"mid":475443398,"name":"黑毛羊驼","tag":["准备取关"]}]

View File

@@ -1 +0,0 @@
1357612844,321583894,439478093,1044673687,62224043

View File

@@ -1,690 +0,0 @@
#!/usr/bin/env python3
"""Fetch recent Bilibili video titles for UIDs and analyze with Volcengine API.
Input JSON format (list of objects):
[
{"mid": 12345, "name": "UP Name", "tag": ["准备取关"]}
]
"""
from __future__ import annotations
import argparse
import hashlib
import html
import json
import os
import random
import re
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib import error, parse, request
# Bilibili endpoints: the legacy video-list search, its WBI-signed variant, and
# the nav endpoint from which the WBI key material is read.
BILIBILI_API = "https://api.bilibili.com/x/space/arc/search"
BILIBILI_WBI_API = "https://api.bilibili.com/x/space/wbi/arc/search"
BILIBILI_NAV_API = "https://api.bilibili.com/x/web-interface/nav"

# Optional: browser Cookie string to send when requests keep triggering HTTP 412.
# SECURITY: a session cookie (SESSDATA / bili_jct) grants account access and must
# never live in tracked source. It is read from the environment instead; any
# value that was previously committed should be treated as leaked and revoked.
BILIBILI_COOKIE = os.environ.get("BILIBILI_COOKIE", "")
# Takes precedence over BILIBILI_COOKIE when non-empty; assigned at runtime
# from the --bili-cookie command-line option.
RUNTIME_BILIBILI_COOKIE = ""

DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/124.0.0.0 Safari/537.36"
)

# Index permutation applied to img_key + sub_key to derive the WBI mixin key.
MIXIN_KEY_ENC_TAB = [
    46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35,
    27, 43, 5, 49, 33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13,
    37, 48, 7, 16, 24, 55, 40, 61, 26, 17, 0, 1, 60, 51, 30, 4,
    22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11, 36, 20, 34, 44, 52,
]

# Volcengine (Ark) chat-completions configuration.
# SECURITY: the API key is a credential, so it is taken from the environment
# rather than hard-coded; a key that was ever committed should be rotated.
VOLCENGINE_API_KEY = os.environ.get("VOLCENGINE_API_KEY", "")
VOLCENGINE_MODEL = os.environ.get("VOLCENGINE_MODEL", "deepseek-v3-1-terminus")
VOLCENGINE_BASE_URL = os.environ.get(
    "VOLCENGINE_BASE_URL", "https://ark.cn-beijing.volces.com/api/v3"
)
@dataclass
class UpItem:
    """One uploader (UP) record as read from the input JSON list."""

    # Numeric uploader id; used to build space.bilibili.com URLs and API queries.
    mid: int
    # Display name of the uploader (may be empty).
    name: str
    # Tag labels attached to the uploader, e.g. for --only-tag filtering.
    tag: list[str]
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Build the command-line parser and parse the given arguments.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what existing callers rely on;
            passing an explicit list allows calling the parser from tests or
            other scripts without mutating ``sys.argv``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(
        description="抓取 UP 前10个视频标题并调用火山引擎 API 生成分析报告"
    )

    # Input / output locations.
    parser.add_argument(
        "--input",
        default="./source/resources/export_uids.json",
        help="输入 JSON 文件路径,默认: ./source/resources/export_uids.json",
    )
    parser.add_argument(
        "--output",
        default="./source/output/reports/up_titles_report.md",
        help="输出 Markdown 报告路径,默认: ./source/output/reports/up_titles_report.md",
    )

    # Selection and pacing of the fetch stage.
    parser.add_argument(
        "--titles-per-up",
        type=int,
        default=10,
        help="每个 UP 抓取的视频标题数量,默认: 10",
    )
    parser.add_argument(
        "--max-ups",
        type=int,
        default=0,
        help="最多处理多少个 UP0 表示全部",
    )
    parser.add_argument(
        "--only-tag",
        default="",
        help="只处理包含该标签的 UP例如: 准备取关;留空表示不过滤",
    )
    parser.add_argument(
        "--sleep-seconds",
        type=float,
        default=0.8,
        help="每个 UP 抓取后的等待秒数,默认: 0.8",
    )
    parser.add_argument(
        "--retry-times",
        type=int,
        default=3,
        help="抓取重试次数遇到412/-799时默认: 3",
    )

    # Single-uploader test mode.
    parser.add_argument(
        "--test-mid",
        type=int,
        default=0,
        help="测试模式只抓取这个mid不读取输入文件",
    )
    parser.add_argument(
        "--test-name",
        default="TEST_UP",
        help="测试模式下显示名称,默认: TEST_UP",
    )

    # Behaviour switches.
    parser.add_argument(
        "--skip-ai",
        action="store_true",
        help="只测试抓取不调用AI分析",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="输出抓取调试信息",
    )
    parser.add_argument(
        "--bili-cookie",
        default="",
        help="可选运行时传入B站Cookie优先级高于脚本内BILIBILI_COOKIE",
    )
    parser.add_argument(
        "--fetch-mode",
        choices=["auto", "api", "html"],
        default="auto",
        help="抓取模式: auto(先API后HTML)/api/html默认: auto",
    )

    # Batch AI analysis of an already generated report.
    parser.add_argument(
        "--analyze-from-report",
        default="",
        help="从已有报告读取标题并仅执行AI分析例如: source/up_analysis_report.md",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=30,
        help="分批分析时每批数量,默认: 30",
    )
    parser.add_argument(
        "--batch-index",
        type=int,
        default=1,
        help="分批分析批次序号(从1开始),默认: 1",
    )
    return parser.parse_args(argv)
def parse_report_items(report_path: Path) -> list[dict[str, Any]]:
    """Parse a previously generated Markdown report back into item dicts.

    Each ``## <n>. <name> (mid: <id>)`` heading starts a new item. Within an
    item, the ``- 主页: `` and ``- 标签: `` bullets and the three known ``###``
    sections (titles, AI analysis, error) are read; every other line is ignored.

    Args:
        report_path: Path of the UTF-8 Markdown report to read.

    Returns:
        One dict per item with the keys ``mid``, ``name``, ``tag``, ``url``,
        ``titles``, ``analysis`` and ``error``, in report order.
    """
    heading_re = re.compile(r"^##\s+\d+\.\s+(.*?)\s+\(mid:\s*(\d+)\)")
    url_prefix = "- 主页: "
    tag_prefix = "- 标签: "
    section_by_heading = {
        "### 最近10条标题": "titles",
        "### AI分析": "analysis",
        "### 异常": "error",
    }

    parsed: list[dict[str, Any]] = []
    entry: dict[str, Any] | None = None
    section = ""

    for line in report_path.read_text(encoding="utf-8").splitlines():
        heading = heading_re.match(line)
        if heading:
            # A new item heading closes the previous item, if any.
            if entry is not None:
                parsed.append(entry)
            mid = int(heading.group(2))
            entry = {
                "mid": mid,
                "name": heading.group(1).strip(),
                "tag": [],
                "url": f"https://space.bilibili.com/{mid}/video",
                "titles": [],
                "analysis": "",
                "error": "",
            }
            section = ""
            continue

        # Lines before the first item heading carry nothing we need.
        if entry is None:
            continue

        if line.startswith(url_prefix):
            entry["url"] = line[len(url_prefix):].strip()
            continue

        if line.startswith(tag_prefix):
            raw_tag = line[len(tag_prefix):].strip()
            # NOTE(review): both sentinel values are empty strings as seen here;
            # confirm whether one was meant to be a "no tags" placeholder.
            if raw_tag in ("", ""):
                entry["tag"] = []
            else:
                entry["tag"] = [piece.strip() for piece in raw_tag.split(",") if piece.strip()]
            continue

        if line in section_by_heading:
            section = section_by_heading[line]
            continue

        # Any other level-3 heading ends the current section.
        if line.startswith("### "):
            section = ""
            continue

        if section == "analysis":
            text = line.strip()
            if text:
                # Non-blank analysis lines are re-joined with single newlines.
                entry["analysis"] = f"{entry['analysis']}\n{text}" if entry["analysis"] else text
        elif line.startswith("- "):
            bullet = line[2:].strip()
            if section == "titles":
                # Skip the placeholder bullet written when no titles were fetched.
                if bullet and bullet != "(未抓取到标题)":
                    entry["titles"].append(bullet)
            elif section == "error":
                entry["error"] = bullet

    if entry is not None:
        parsed.append(entry)
    return parsed
def run_batch_analysis_from_report(args: argparse.Namespace, output_path: Path) -> int:
    """Run AI analysis for one batch of not-yet-analyzed items of a report.

    Items are read from ``args.analyze_from_report``. Those that have titles
    but no analysis (or only the skip-AI placeholder) are pending; the slice
    selected by ``args.batch_index`` / ``args.batch_size`` is analyzed and the
    full report is rewritten to ``output_path``.

    Args:
        args: Parsed options; reads ``analyze_from_report``, ``batch_size``,
            ``batch_index`` and ``sleep_seconds``.
        output_path: Where the updated Markdown report is written.

    Returns:
        0 on success (including "nothing to do"), 1 when the report is missing
        or contains no parsable items.
    """
    report_path = Path(args.analyze_from_report)
    if not report_path.exists():
        print(f"报告文件不存在: {report_path}", file=sys.stderr)
        return 1

    items = parse_report_items(report_path)
    if not items:
        print("报告中未解析到可分析条目", file=sys.stderr)
        return 1

    def write_report() -> None:
        # Every exit path creates the target directory first, so a fresh
        # output location no longer fails on the early-return writes.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(build_report(items), encoding="utf-8")

    pending = [
        it for it in items
        if it.get("titles") and (not it.get("analysis") or it.get("analysis") == "测试模式已跳过AI分析")
    ]
    if not pending:
        print("报告中没有待分析条目(可能已全部分析完成)")
        write_report()
        return 0

    batch_size = max(args.batch_size, 1)
    batch_index = max(args.batch_index, 1)
    start = (batch_index - 1) * batch_size
    batch = pending[start:start + batch_size]
    if not batch:
        print(f"批次为空: batch-index={batch_index}, batch-size={batch_size}, 待分析总数={len(pending)}")
        write_report()
        return 0

    print(
        f"开始分批分析: 第{batch_index}批, 每批{batch_size}条, "
        f"本批{len(batch)}条, 待分析总数{len(pending)}"
    )

    pause = max(args.sleep_seconds, 0.0)
    for idx, it in enumerate(batch, start=1):
        print(f"[batch {idx}/{len(batch)}] AI分析: {it['name']} ({it['mid']})")
        # `batch` holds the very same dict objects as `items`, so updating `it`
        # in place updates the report. This avoids the former mid::name lookup,
        # which hit the wrong entry when two items shared mid and name.
        try:
            analysis = analyze_titles(it["name"], it["url"], it["titles"])
        except Exception as exc:  # noqa: BLE001 - one failure must not stop the batch
            it["error"] = str(exc)
        else:
            it["analysis"] = analysis
            it["error"] = ""
        # Throttle between requests only; no need to wait after the last one.
        if idx < len(batch):
            time.sleep(pause)

    write_report()
    print(f"分批分析报告已生成: {output_path}")
    return 0
def load_up_items(input_path: Path) -> list[UpItem]:
    """Load uploader records from a JSON file.

    The file must contain a JSON array of objects with ``mid`` and optionally
    ``name`` and ``tag``. Entries whose ``mid`` is missing or not convertible
    to an integer are skipped silently.

    Args:
        input_path: Path of the UTF-8 JSON file.

    Returns:
        The valid entries as ``UpItem`` objects, in file order.

    Raises:
        ValueError: If the top-level value is not an array or an element is
            not an object (this includes malformed JSON, since
            ``json.JSONDecodeError`` is a ``ValueError``).
    """
    raw = json.loads(input_path.read_text(encoding="utf-8"))
    if not isinstance(raw, list):
        raise ValueError("输入 JSON 必须是数组")

    items: list[UpItem] = []
    for idx, obj in enumerate(raw):
        if not isinstance(obj, dict):
            raise ValueError(f"{idx + 1} 项不是对象")

        try:
            # int(None) raises TypeError, so a missing/null mid is skipped here
            # too; OverflowError covers non-finite floats such as Infinity.
            mid_int = int(obj.get("mid"))
        except (TypeError, ValueError, OverflowError):
            continue

        name = obj.get("name")
        if name is None:
            # A JSON null name used to become the literal text "None"; keep it
            # empty so callers can apply their own fallback for unnamed entries.
            name = ""
        elif not isinstance(name, str):
            name = str(name)

        tags = obj.get("tag")
        if not isinstance(tags, list):
            tags = []

        items.append(UpItem(mid=mid_int, name=name.strip(), tag=[str(t) for t in tags]))
    return items
def http_get_json(
    url: str,
    timeout: float = 20.0,
    referer: str = "https://space.bilibili.com/",
) -> dict[str, Any]:
    """GET ``url`` with browser-like headers and decode the body as JSON.

    The runtime cookie (``RUNTIME_BILIBILI_COOKIE``) is preferred over the
    module-level ``BILIBILI_COOKIE``; no Cookie header is sent when both are
    blank.

    Args:
        url: Full request URL.
        timeout: Timeout in seconds passed to ``urlopen``.
        referer: Value of the Referer header.

    Returns:
        The parsed JSON document.
    """
    cookie = RUNTIME_BILIBILI_COOKIE.strip() or BILIBILI_COOKIE.strip()
    request_headers = {
        "User-Agent": DEFAULT_USER_AGENT,
        "Referer": referer,
        "Origin": "https://www.bilibili.com",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    if cookie:
        request_headers["Cookie"] = cookie

    outgoing = request.Request(url, headers=request_headers, method="GET")
    with request.urlopen(outgoing, timeout=timeout) as response:
        # Undecodable bytes are replaced rather than raising.
        payload = response.read().decode("utf-8", errors="replace")
    return json.loads(payload)
def http_get_text(
    url: str,
    timeout: float = 20.0,
    referer: str = "https://space.bilibili.com/",
) -> str:
    """GET ``url`` with browser-like headers and return the body as text.

    Sends an HTML-oriented Accept header. The runtime cookie
    (``RUNTIME_BILIBILI_COOKIE``) is preferred over ``BILIBILI_COOKIE``; no
    Cookie header is sent when both are blank.

    Args:
        url: Full request URL.
        timeout: Timeout in seconds passed to ``urlopen``.
        referer: Value of the Referer header.

    Returns:
        The response body decoded as UTF-8, with undecodable bytes replaced.
    """
    cookie = RUNTIME_BILIBILI_COOKIE.strip() or BILIBILI_COOKIE.strip()
    request_headers = {
        "User-Agent": DEFAULT_USER_AGENT,
        "Referer": referer,
        "Origin": "https://www.bilibili.com",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    if cookie:
        request_headers["Cookie"] = cookie

    outgoing = request.Request(url, headers=request_headers, method="GET")
    with request.urlopen(outgoing, timeout=timeout) as response:
        raw_bytes = response.read()
        return raw_bytes.decode("utf-8", errors="replace")
def get_mixin_key(img_key: str, sub_key: str) -> str:
    """Derive the 32-character mixin key from the two WBI key halves.

    The halves are concatenated, re-ordered by ``MIXIN_KEY_ENC_TAB`` and the
    result is cut to its first 32 characters.
    """
    combined = img_key + sub_key
    # Each table entry is an index into the concatenated key.
    shuffled = [combined[position] for position in MIXIN_KEY_ENC_TAB]
    return "".join(shuffled)[:32]
def build_wbi_params(base_params: dict[str, Any], mixin_key: str) -> dict[str, Any]:
    """Return ``base_params`` extended with the ``wts`` / ``w_rid`` signature.

    Args:
        base_params: Query parameters to sign; values are converted with ``str``.
        mixin_key: Key appended to the encoded query before hashing.

    Returns:
        A new dict: the parameters sorted by key (including the ``wts``
        timestamp), with the characters ``!'()*`` removed from every value,
        followed by ``w_rid`` — the MD5 hex digest of the URL-encoded query
        plus ``mixin_key``.
    """
    stringified = {key: str(value) for key, value in base_params.items()}
    stringified["wts"] = str(int(time.time()))

    forbidden = re.compile(r"[!'()*]")
    signed: dict[str, Any] = {}
    for key, value in sorted(stringified.items()):
        signed[key] = forbidden.sub("", value)

    # The digest covers the encoded query *before* w_rid is added.
    digest_input = parse.urlencode(signed) + mixin_key
    signed["w_rid"] = hashlib.md5(digest_input.encode("utf-8")).hexdigest()
    return signed
def get_wbi_mixin_key() -> str:
    """Fetch the current WBI key pair from the nav endpoint and mix it.

    Returns:
        The mixin key produced by ``get_mixin_key`` from the file stems of
        ``wbi_img.img_url`` and ``wbi_img.sub_url``.

    Raises:
        RuntimeError: If the response carries no usable ``img_url`` /
            ``sub_url``. When the API also reported a non-zero ``code``, the
            code and message are included in the error text.
    """
    data = http_get_json(BILIBILI_NAV_API, referer="https://www.bilibili.com/")
    if not isinstance(data, dict):
        raise RuntimeError("获取wbi密钥失败: nav接口缺少img_url/sub_url")

    # Tolerate `"data": null` and similar shapes instead of crashing with
    # AttributeError on a chained .get().
    payload = data.get("data")
    wbi_img = payload.get("wbi_img") if isinstance(payload, dict) else None
    if not isinstance(wbi_img, dict):
        wbi_img = {}
    img_url = wbi_img.get("img_url") or ""
    sub_url = wbi_img.get("sub_url") or ""

    if not img_url or not sub_url:
        if data.get("code") != 0:
            raise RuntimeError(
                f"获取wbi密钥失败 code={data.get('code')}, message={data.get('message')}"
            )
        raise RuntimeError("获取wbi密钥失败: nav接口缺少img_url/sub_url")

    # NOTE(review): the nav endpoint is documented to answer anonymous sessions
    # with a non-zero code (not logged in) while still including wbi_img, so a
    # non-zero code is only an error when the keys are missing. Confirm against
    # the WBI signing reference.
    # The key is the file name of each URL without its extension.
    img_key = img_url.rsplit("/", 1)[-1].split(".")[0]
    sub_key = sub_url.rsplit("/", 1)[-1].split(".")[0]
    return get_mixin_key(img_key, sub_key)
def parse_titles_from_data(data: dict[str, Any]) -> list[str]:
    """Extract video titles from a video-search API response.

    Reads ``data["data"]["list"]["vlist"]`` and collects the ``title`` of every
    entry, with HTML tags removed via ``clean_html``.

    Args:
        data: Decoded JSON response.

    Returns:
        Non-empty titles in response order; an empty list when the expected
        structure is absent (including ``null`` at any level).
    """
    # Walk the nesting defensively: a level may be missing or JSON null, and
    # chained .get() calls would raise AttributeError on None.
    node: Any = data
    for key in ("data", "list", "vlist"):
        if not isinstance(node, dict):
            return []
        node = node.get(key)
    if not isinstance(node, list):
        return []

    titles: list[str] = []
    for item in node:
        if not isinstance(item, dict):
            continue
        title = item.get("title", "")
        if not isinstance(title, str) or not title.strip():
            continue
        cleaned = clean_html(title).strip()
        # A title made only of markup is empty after cleaning; drop it.
        if cleaned:
            titles.append(cleaned)
    return titles
def fetch_titles_from_space_html(mid: int, titles_per_up: int, debug: bool = False) -> list[str]:
    """Scrape video titles from an uploader's space page HTML.

    Titles are taken from the ``alt`` attribute of ``<img>`` tags whose class
    contains ``b-img__inner``; duplicates are dropped while keeping first-seen
    order.

    Args:
        mid: Uploader id used to build the page URL.
        titles_per_up: Collection stops once this many titles were gathered.
        debug: When true, print how many titles were extracted.

    Returns:
        The de-duplicated titles (possibly empty).
    """
    page = http_get_text(
        f"https://space.bilibili.com/{mid}/video",
        referer="https://www.bilibili.com/",
    )

    # Cover images carry the video title in their alt attribute.
    cover_alt = re.compile(
        r'<img[^>]*class="[^"]*b-img__inner[^"]*"[^>]*alt="([^"]+)"',
        flags=re.IGNORECASE,
    )

    collected: list[str] = []
    already_seen: set[str] = set()
    for found in cover_alt.finditer(page):
        candidate = clean_html(html.unescape(found.group(1))).strip()
        if candidate and candidate not in already_seen:
            already_seen.add(candidate)
            collected.append(candidate)
            if len(collected) >= titles_per_up:
                break

    if debug:
        print(f"[debug] HTML模式提取到 {len(collected)} 条标题")
    return collected
def fetch_titles(
    mid: int,
    titles_per_up: int,
    retry_times: int = 3,
    debug: bool = False,
    fetch_mode: str = "auto",
) -> list[str]:
    """Fetch the most recent video titles of one uploader.

    In ``"api"`` / ``"auto"`` mode the WBI-signed search endpoint is tried
    (falling back to the unsigned endpoint when no mixin key could be
    obtained), with randomized exponential back-off between attempts. In
    ``"html"`` / ``"auto"`` mode the space page is scraped afterwards.

    Args:
        mid: Uploader id.
        titles_per_up: Number of titles requested.
        retry_times: Number of API attempts; values below 1 count as 1.
        debug: Print diagnostic messages.
        fetch_mode: ``"auto"``, ``"api"`` or ``"html"``.

    Returns:
        A non-empty list of titles.

    Raises:
        RuntimeError: When no strategy produced titles. The message joins the
            last three recorded errors and adds a cookie hint when they
            mention 412 or -799.
    """
    base_params = {
        "mid": str(mid),
        "pn": "1",
        "ps": str(titles_per_up),
        "order": "pubdate",
        "index": "1",
        "jsonp": "json",
    }
    errors: list[str] = []

    if fetch_mode in ("auto", "api"):
        # Prefer the WBI-signed endpoint; an empty key selects the unsigned one.
        mixin_key = ""
        try:
            mixin_key = get_wbi_mixin_key()
        except Exception as exc:  # noqa: BLE001 - falling back is intended
            if debug:
                print(f"[debug] 获取wbi密钥失败: {exc}")

        attempts = max(retry_times, 1)
        referer = f"https://space.bilibili.com/{mid}/video"
        for attempt in range(1, attempts + 1):
            try:
                if mixin_key:
                    # Re-sign on every attempt because the signature embeds a timestamp.
                    signed = build_wbi_params(base_params, mixin_key)
                    url = f"{BILIBILI_WBI_API}?{parse.urlencode(signed)}"
                else:
                    url = f"{BILIBILI_API}?{parse.urlencode(base_params)}"
                data = http_get_json(url, referer=referer)
                code = data.get("code", -1)
                if code == 0:
                    titles = parse_titles_from_data(data)
                    if titles:
                        return titles
                    errors.append("接口返回成功但标题为空")
                else:
                    errors.append(f"code={code}, message={data.get('message', 'unknown')} ")
            except error.HTTPError as exc:
                errors.append(f"HTTP {exc.code} {exc.reason}")
            except Exception as exc:  # noqa: BLE001 - recorded and retried
                errors.append(str(exc))

            if attempt == attempts:
                # Nothing follows the last attempt, so do not back off (this
                # used to waste up to 12 seconds per failing uploader).
                if debug:
                    print(f"[debug] mid={mid} API第{attempt}次失败: {errors[-1]}")
                break

            sleep_for = min(12.0, (1.8 ** attempt) + random.uniform(0.2, 1.0))
            if debug:
                print(f"[debug] mid={mid} API第{attempt}次失败: {errors[-1]}{sleep_for:.1f}s后重试")
            time.sleep(sleep_for)

    if fetch_mode in ("auto", "html"):
        try:
            html_titles = fetch_titles_from_space_html(mid, titles_per_up, debug=debug)
            if html_titles:
                return html_titles
            errors.append("HTML模式未提取到标题")
        except Exception as exc:  # noqa: BLE001 - reported through the final error
            errors.append(f"HTML模式失败: {exc}")

    joined = "; ".join(errors[-3:])
    if ("412" in joined) or ("-799" in joined):
        hint = "提示: 请在脚本里填写BILIBILI_COOKIE或运行时加 --bili-cookie \"SESSDATA=...; buvid3=...\""
        raise RuntimeError(f"{joined}; {hint}")
    raise RuntimeError(joined)
def clean_html(text: str) -> str:
    """Return ``text`` with every ``<...>`` tag-like span removed."""
    tag_like = re.compile(r"<[^>]+>")
    return tag_like.sub("", text)
def call_volcengine_chat(system_prompt: str, user_prompt: str) -> str:
    """Send one chat-completions request and return the reply text.

    Args:
        system_prompt: Content of the system message.
        user_prompt: Content of the user message.

    Returns:
        The stripped ``choices[0].message.content`` of the response.

    Raises:
        RuntimeError: If the API key, model or base URL is not configured, or
            if the response is not JSON or lacks non-empty reply content.
        urllib.error.HTTPError / URLError: Propagated from ``urlopen``.
    """
    api_key = VOLCENGINE_API_KEY.strip()
    base_url = VOLCENGINE_BASE_URL.strip()
    model = VOLCENGINE_MODEL.strip()

    # Reject both empty values and untouched placeholder text.
    if (not api_key) or ("在这里填" in api_key):
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_API_KEY")
    if (not model) or ("在这里填" in model):
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_MODEL")
    if not base_url:
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_BASE_URL")

    url = f"{base_url.rstrip('/')}/chat/completions"
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.4,
    }
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = request.Request(
        url,
        data=data,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )
    with request.urlopen(req, timeout=60) as resp:
        body = resp.read().decode("utf-8", errors="replace")

    try:
        result = json.loads(body)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"火山引擎返回结构异常: {body[:500]}") from exc

    # Walk the structure defensively: an empty `choices` list or a null
    # `message` must produce the structured error below, not IndexError /
    # AttributeError.
    choices = result.get("choices") if isinstance(result, dict) else None
    first = choices[0] if isinstance(choices, list) and choices else None
    message = first.get("message") if isinstance(first, dict) else None
    content = message.get("content", "") if isinstance(message, dict) else ""
    if not isinstance(content, str) or not content.strip():
        raise RuntimeError(f"火山引擎返回结构异常: {body[:500]}")
    return content.strip()
def analyze_titles(up_name: str, up_url: str, titles: list[str]) -> str:
    """Ask the chat model to assess an uploader from recent video titles.

    Args:
        up_name: Uploader display name inserted into the prompt.
        up_url: Uploader page URL inserted into the prompt.
        titles: Video titles, rendered as a Markdown bullet list.

    Returns:
        The model reply as returned by ``call_volcengine_chat``.
    """
    title_bullets = "\n".join([f"- {title}" for title in titles])
    # The prompt body is kept flush-left so no indentation leaks into the text.
    user_prompt = f"""
请分析以下UP主最近视频标题
UP主{up_name}
主页：{up_url}
标题：
{title_bullets}
请按以下格式输出（不要增加其它段落）：
1) 内容定位：一句话
2) 受众画像：一句话
3) 近期内容倾向2-3点使用-开头
4) 质量评价80-120字
5) 取关建议：保留关注/可以取关（二选一）
6) 建议理由50-100字
""".strip()
    system_prompt = (
        "你是一个内容分析助手。根据视频标题判断UP主内容方向并给出是否建议取关。"
        "输出必须是简体中文且严格按照用户给定的Markdown格式。"
    )
    return call_volcengine_chat(system_prompt, user_prompt)
def build_report(results: list[dict[str, Any]]) -> str:
    """Render analysis results as a Markdown report.

    Args:
        results: One dict per uploader. ``name``, ``mid`` and ``url`` are
            required; ``tag``, ``titles``, ``analysis`` and ``error`` are
            optional.

    Returns:
        The report text, ending with exactly one newline. The analysis and
        error sections are emitted only when their value is non-empty.
    """
    generated_at = time.strftime("%Y-%m-%d %H:%M:%S")
    out: list[str] = [
        "# UP主内容分析报告",
        "",
        f"- 生成时间: {generated_at}",
        f"- 分析数量: {len(results)}",
        "",
    ]

    for number, entry in enumerate(results, start=1):
        out += [
            f"## {number}. {entry['name']} (mid: {entry['mid']})",
            "",
            f"- 主页: {entry['url']}",
        ]
        tags = entry.get("tag", [])
        out.append(f"- 标签: {', '.join(tags) if tags else ''}")
        out += ["", "### 最近10条标题", ""]

        titles = entry.get("titles", [])
        if not titles:
            # Placeholder bullet so the section is never empty.
            out.append("- (未抓取到标题)")
        else:
            out.extend(f"- {title}" for title in titles)
        out.append("")

        # Optional sections, each followed by a blank separator line.
        for heading, text in (
            ("### AI分析", entry.get("analysis", "")),
            ("### 异常", entry.get("error", "")),
        ):
            if not text:
                continue
            body = text if heading == "### AI分析" else f"- {text}"
            out += [heading, "", body, ""]

    return "\n".join(out).rstrip() + "\n"
def main() -> int:
    """Fetch recent titles for each uploader, optionally analyse them, write a report.

    Returns:
        0 on success (or the batch-analysis result when ``--analyze-from-report``
        is set); 1 when the input is missing, unreadable, or yields no uploaders.
    """
    global RUNTIME_BILIBILI_COOKIE

    args = parse_args()
    RUNTIME_BILIBILI_COOKIE = (args.bili_cookie or "").strip()
    source_path = Path(args.input)
    report_path = Path(args.output)

    if args.analyze_from_report:
        return run_batch_analysis_from_report(args, report_path)

    if args.test_mid > 0:
        # Test mode: a single synthetic entry replaces the input file.
        ups = [UpItem(mid=args.test_mid, name=args.test_name, tag=["测试模式"])]
        print(f"测试模式: 仅处理 mid={args.test_mid}")
    elif not source_path.exists():
        print(f"输入文件不存在: {source_path}", file=sys.stderr)
        return 1
    else:
        try:
            ups = load_up_items(source_path)
        except Exception as exc:
            print(f"加载输入文件失败: {exc}", file=sys.stderr)
            return 1

    if args.only_tag:
        ups = [up for up in ups if args.only_tag in up.tag]
    if args.max_ups and args.max_ups > 0:
        ups = ups[: args.max_ups]
    if not ups:
        print("没有可处理的 UP 数据", file=sys.stderr)
        return 1

    total = len(ups)
    print(f"开始处理 {total} 个 UP...")
    if args.skip_ai:
        print("已启用 --skip-ai仅测试抓取标题")
    if args.debug:
        print(f"[debug] 当前抓取模式: {args.fetch_mode}")

    collected: list[dict[str, Any]] = []
    for position, up in enumerate(ups, start=1):
        space_url = f"https://space.bilibili.com/{up.mid}/video"
        record: dict[str, Any] = {
            "mid": up.mid,
            "name": up.name or f"mid_{up.mid}",
            "tag": up.tag,
            "url": space_url,
            "titles": [],
            "analysis": "",
            "error": "",
        }
        print(f"[{position}/{total}] 抓取: {record['name']} ({up.mid})")
        try:
            fetched = fetch_titles(
                up.mid,
                args.titles_per_up,
                retry_times=args.retry_times,
                debug=args.debug,
                fetch_mode=args.fetch_mode,
            )
            record["titles"] = fetched
            if not fetched:
                record["error"] = "未抓取到标题可能是接口限制或UP无公开视频"
            elif args.skip_ai:
                record["analysis"] = "测试模式已跳过AI分析"
            else:
                record["analysis"] = analyze_titles(record["name"], space_url, fetched)
        except error.HTTPError as exc:
            record["error"] = f"HTTP错误: {exc.code} {exc.reason}"
        except error.URLError as exc:
            record["error"] = f"网络错误: {exc.reason}"
        except Exception as exc:  # noqa: BLE001
            # One failing uploader is recorded in the report, not fatal.
            record["error"] = str(exc)

        if args.debug and record["titles"]:
            preview = record["titles"][:3]
            print(f"[debug] mid={up.mid} 成功抓取 {len(record['titles'])} 条,样例: {preview}")
        collected.append(record)
        time.sleep(max(args.sleep_seconds, 0))

    rendered = build_report(collected)
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(rendered, encoding="utf-8")
    print(f"报告已生成: {report_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -1,598 +0,0 @@
#!/usr/bin/env python3
"""Batch AI summary from existing UP markdown report.
Read an existing report (e.g. source/up_analysis_report.md),
extract each UP's title list, and generate AI summaries in batches.
"""
from __future__ import annotations
import argparse
import json
import math
import os
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any
from urllib import request
# Volcengine Ark settings. Each value can be overridden with an environment
# variable of the same name; the literals below are only fallbacks so that
# existing setups keep working unchanged.
# SECURITY NOTE(review): an API key is committed in this file. It should be
# rotated and supplied through the VOLCENGINE_API_KEY environment variable
# instead of living in version control.
VOLCENGINE_API_KEY = os.environ.get("VOLCENGINE_API_KEY") or "586d443c-5034-4810-9760-50ce77394e8a"
VOLCENGINE_MODEL = os.environ.get("VOLCENGINE_MODEL") or "deepseek-v3-1-terminus"
VOLCENGINE_BASE_URL = os.environ.get("VOLCENGINE_BASE_URL") or "https://ark.cn-beijing.volces.com/api/v3"
# Values of an entry's "analysis" text that mean "no real AI summary yet".
# main() treats an entry whose stripped analysis is one of these as pending.
SKIP_MARKERS = {
"",
"测试模式已跳过AI分析",
"(待分析)",
}
# Preset groups and their keyword rules (extend as needed).
# Keys are the only group names parse_ai_json() accepts from the model;
# the keyword lists are matched case-insensitively by heuristic_group_hint().
PRESET_GROUPS: dict[str, list[str]] = {
"AAA_核心每日必读":[
"编程", "算法", "工程", "干货", "新闻", "趋势",
],
"AA_编程信息干货必留": [
"编程", "算法", "工程", "教程", "实战", "课程", "新技术", "开源", "工具", "效率", "技术", "架构",
],
"A_硬核知识保留": [
"科普", "数学", "物理", "编程", "算法", "工程", "历史", "新闻", "深度",
],
"B_技能学习保留": [
"英语", "四六级", "考研", "面试", "教程", "实战", "学习", "课程", "写作",
],
"C_资讯快餐观察": [
"热点", "速览", "信息差", "快报", "盘点", "吐槽", "观点", "趋势",
],
"D_娱乐消遣可取关": [
"搞笑", "整活", "抽象", "乐子", "娱乐", "段子", "鬼畜", "日常", "情侣",
],
"E_营销带货谨慎": [
"好物", "测评", "种草", "直播", "带货", "优惠", "开箱", "广告", "激活",
],
}
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the batch AI summary script.

    Returns:
        The namespace produced by ``argparse`` from ``sys.argv``.
    """
    # (flag, add_argument keyword arguments), kept in help-output order.
    option_table = [
        ("--input-report", {
            "default": "source/output/reports/1_up_titles_report.md",
            "help": "已有标题报告路径",
        }),
        ("--output-report", {
            "default": "source/output/reports/2_up_analysis_full_auto.md",
            "help": "输出报告路径",
        }),
        ("--batch-size", {"type": int, "default": 20, "help": "每批处理数量,默认: 20"}),
        ("--batch-index", {"type": int, "default": 1, "help": "批次序号从1开始默认: 1"}),
        ("--sleep-seconds", {
            "type": float,
            "default": 0.0,
            "help": "提交任务间隔秒数,默认: 0并发模式建议0",
        }),
        ("--workers", {"type": int, "default": 4, "help": "并发请求数,默认: 4"}),
        ("--max-retries", {"type": int, "default": 2, "help": "单个UP分析最大重试次数默认: 2"}),
        ("--request-timeout", {"type": float, "default": 60.0, "help": "单次AI请求超时秒数默认: 60"}),
        ("--force", {"action": "store_true", "help": "强制覆盖已有AI分析默认只处理待分析项"}),
        ("--debug", {"action": "store_true", "help": "输出调试信息"}),
        ("--config-from", {
            "default": "source/scripts/analyze_up_content.py",
            "help": "自动读取API配置的脚本路径",
        }),
        ("--run-all-batches", {"action": "store_true", "help": "自动连续跑完所有批次忽略batch-index"}),
    ]
    cli = argparse.ArgumentParser(description="基于现有报告分批做AI总结")
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def load_api_config_from_script(path: Path) -> dict[str, str]:
    """Read Volcengine settings from the source text of another script.

    Looks for line-initial assignments of the form ``NAME = "value"`` for
    the three ``VOLCENGINE_*`` names.

    Args:
        path: Script file to scan.

    Returns:
        The settings that were found (stripped); empty when the file does
        not exist or nothing matches.
    """
    if not path.exists():
        return {}
    source = path.read_text(encoding="utf-8", errors="replace")
    wanted = ("VOLCENGINE_API_KEY", "VOLCENGINE_MODEL", "VOLCENGINE_BASE_URL")
    lookups = (
        (key, re.search(rf"^{key}\s*=\s*\"([^\"]*)\"", source, flags=re.MULTILINE))
        for key in wanted
    )
    return {key: hit.group(1).strip() for key, hit in lookups if hit}
def parse_report(path: Path) -> list[dict[str, Any]]:
    """Parse a Markdown UP report back into a list of item dicts.

    Each ``## N. name (mid: X)`` heading starts a new item. Recognised
    ``###`` sections are the title list, the AI analysis, the group
    suggestion and the error section; any other ``###`` heading ends the
    current section.

    The "判断依据" line is stored under both ``reason`` and ``group_reason``.
    The rest of this script reads and writes ``group_reason``, so an
    existing reason now survives a parse -> rebuild round trip instead of
    being replaced by the default text.

    Args:
        path: Report file, read as UTF-8.

    Returns:
        One dict per uploader, in file order.
    """
    section_by_heading = {
        "### 最近10条标题": "titles",
        "### AI分析": "analysis",
        "### 分组建议": "group",
        "### 异常": "error",
    }
    # Prefix of a group-section bullet -> item keys it fills.
    group_fields = (
        ("- 预设分组: ", ("group",)),
        ("- 建议动作: ", ("action",)),
        ("- 判断依据: ", ("reason", "group_reason")),
    )
    url_prefix = "- 主页: "
    tag_prefix = "- 标签: "

    items: list[dict[str, Any]] = []
    current: dict[str, Any] | None = None
    section = ""
    for line in path.read_text(encoding="utf-8").splitlines():
        m = re.match(r"^##\s+\d+\.\s+(.*?)\s+\(mid:\s*(\d+)\)", line)
        if m:
            if current is not None:
                items.append(current)
            mid = int(m.group(2))
            current = {
                "mid": mid,
                "name": m.group(1).strip(),
                "tag": [],
                "url": f"https://space.bilibili.com/{mid}/video",
                "titles": [],
                "analysis": "",
                "group": "",
                "action": "",
                "reason": "",
                "group_reason": "",
                "error": "",
            }
            section = ""
            continue
        if current is None:
            # Preamble before the first uploader heading.
            continue
        if line.startswith(url_prefix):
            current["url"] = line[len(url_prefix):].strip()
            continue
        if line.startswith(tag_prefix):
            raw = line[len(tag_prefix):].strip()
            current["tag"] = [x.strip() for x in raw.split(",") if x.strip()]
            continue
        if line in section_by_heading:
            section = section_by_heading[line]
            continue
        if line.startswith("### "):
            section = ""
            continue

        if section == "titles":
            if line.startswith("- "):
                text = line[2:].strip()
                if text and text != "(未抓取到标题)":
                    current["titles"].append(text)
        elif section == "analysis":
            if line.strip():
                current["analysis"] = (current["analysis"] + "\n" + line.strip()).strip()
        elif section == "group":
            for prefix, keys in group_fields:
                if line.startswith(prefix):
                    value = line[len(prefix):].strip()
                    for key in keys:
                        current[key] = value
                    break
            else:
                if line.strip() == "(待分组)":
                    for key in ("group", "action", "reason", "group_reason"):
                        current[key] = ""
        elif section == "error":
            if line.startswith("- "):
                current["error"] = line[2:].strip()
    if current is not None:
        items.append(current)
    return items
def call_volcengine_chat(
    system_prompt: str,
    user_prompt: str,
    cfg: dict[str, str],
    timeout: float,
) -> str:
    """Send one system+user exchange to the chat-completions endpoint.

    Args:
        system_prompt: Content of the ``system`` message.
        user_prompt: Content of the ``user`` message.
        cfg: Settings dict with ``VOLCENGINE_API_KEY``, ``VOLCENGINE_MODEL``
            and ``VOLCENGINE_BASE_URL``.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The stripped ``choices[0].message.content`` of the response.

    Raises:
        RuntimeError: A setting is missing or still a placeholder, or the
            response carries no non-empty text content.
    """
    def setting(name: str) -> str:
        return cfg.get(name, "").strip()

    api_key = setting("VOLCENGINE_API_KEY")
    model = setting("VOLCENGINE_MODEL")
    base_url = setting("VOLCENGINE_BASE_URL")

    # Reject empty values and the "fill me in" placeholder before any I/O.
    if not api_key or "在这里填" in api_key:
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_API_KEY")
    if not model or "在这里填" in model:
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_MODEL")
    if not base_url:
        raise RuntimeError("请先在脚本顶部填写 VOLCENGINE_BASE_URL")

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    encoded = json.dumps(
        {"model": model, "messages": messages, "temperature": 0.4},
        ensure_ascii=False,
    ).encode("utf-8")
    http_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    endpoint = f"{base_url.rstrip('/')}/chat/completions"
    req = request.Request(endpoint, data=encoded, headers=http_headers, method="POST")
    with request.urlopen(req, timeout=timeout) as resp:
        raw = resp.read().decode("utf-8", errors="replace")

    parsed = json.loads(raw)
    first_choice = parsed.get("choices", [{}])[0]
    reply = first_choice.get("message", {}).get("content", "")
    if isinstance(reply, str) and reply.strip():
        return reply.strip()
    raise RuntimeError(f"AI响应异常: {raw[:500]}")
def summarize_one_up(
    name: str,
    mid: int,
    titles: list[str],
    tags: list[str],
    cfg: dict[str, str],
    timeout: float,
) -> dict[str, str]:
    """Ask the model to summarise and classify one uploader.

    Args:
        name: Uploader display name.
        mid: Uploader id.
        titles: Recent video titles.
        tags: Existing tags of the uploader.
        cfg: API settings forwarded to ``call_volcengine_chat``.
        timeout: Request timeout in seconds.

    Returns:
        The validated dict produced by ``parse_ai_json`` for the reply.
    """
    title_bullets = "\n".join(f"- {t}" for t in titles)
    tag_text = "".join(tags) if tags else ""
    keyword_hint = heuristic_group_hint(titles, tags)
    group_bullets = "\n".join(f"- {group_name}" for group_name in PRESET_GROUPS)

    # Joined with "\n" this is the same text as a stripped multi-line literal.
    prompt_lines = [
        "请基于以下信息完成分组与总结。",
        f"UP主: {name}",
        f"mid: {mid}",
        f"标签: {tag_text}",
        "最近标题:",
        title_bullets,
        "预设分组:",
        group_bullets,
        "代码规则初判:",
        keyword_hint,
        "要求:",
        "1) 输出JSON对象字段严格为: summary, group, action, reason。",
        "2) summary: 一段中文总结50-100字。",
        "3) group: 必须从预设分组里选一个。给出详细的分组类别和命中分组中的规则词。",
        '4) action: 只能是"保留关注""可以取关"。敏感一点只保留真正核心优质的up其他都建议取关。',
        "5) reason: 30-60字解释为什么分到该组并给出该动作。",
    ]
    role_description = (
        "你是内容定位与订阅决策助手。"
        "你必须输出合法JSON不要输出其它文本。"
    )
    reply = call_volcengine_chat(role_description, "\n".join(prompt_lines), cfg, timeout=timeout)
    return parse_ai_json(reply)
def parse_ai_json(content: str) -> dict[str, str]:
    """Extract and validate the JSON object contained in a model reply.

    A surrounding Markdown code fence is removed first, then the span from
    the first ``{`` to the last ``}`` is decoded.

    Args:
        content: Raw reply text.

    Returns:
        Dict with ``summary``, ``group``, ``action`` and ``reason``; a
        missing reason is replaced by a default sentence.

    Raises:
        RuntimeError: Empty summary, group not in ``PRESET_GROUPS``, or an
            action other than the two allowed values.
        json.JSONDecodeError: The reply is not valid JSON.
    """
    candidate = content.strip()
    if candidate.startswith("```"):
        without_open = re.sub(r"^```[a-zA-Z]*\n?", "", candidate)
        candidate = re.sub(r"\n?```$", "", without_open).strip()
    braces = re.search(r"\{.*\}", candidate, flags=re.DOTALL)
    if braces:
        candidate = braces.group(0)

    decoded = json.loads(candidate)
    fields = {
        key: str(decoded.get(key, "")).strip()
        for key in ("summary", "group", "action", "reason")
    }
    if not fields["summary"]:
        raise RuntimeError("AI返回缺少summary")
    if fields["group"] not in PRESET_GROUPS:
        raise RuntimeError(f"AI返回未知group: {fields['group']}")
    if fields["action"] not in ("保留关注", "可以取关"):
        raise RuntimeError(f"AI返回未知action: {fields['action']}")
    if not fields["reason"]:
        fields["reason"] = "基于标题内容与更新风格综合判断。"
    return fields
def heuristic_group_hint(titles: list[str], tags: list[str]) -> str:
    """Build a keyword-based hint about which preset group fits best.

    Each keyword of a group that occurs (case-insensitively) in the titles
    or tags scores one point for that group.

    Args:
        titles: Recent video titles.
        tags: Existing tags of the uploader.

    Returns:
        A sentence naming the best-scoring group and the top three scores,
        or a fallback sentence when no keyword occurs at all.
    """
    haystack = ("\n".join(titles) + "\n" + " ".join(tags)).lower()
    hits: dict[str, int] = {
        group: sum(1 for word in words if word.lower() in haystack)
        for group, words in PRESET_GROUPS.items()
    }
    # Stable sort: groups with equal scores keep their PRESET_GROUPS order.
    ranking = sorted(hits.items(), key=lambda pair: pair[1], reverse=True)
    best_group, best_score = ranking[0]
    if best_score <= 0:
        return "未命中关键词,倾向按内容专业度与稳定性判断。"
    top3 = ", ".join(f"{group}:{count}" for group, count in ranking[:3])
    return f"关键词命中最高组={best_group}score={best_score}),参考分布: {top3}"
def summarize_one_up_with_retry(
    item: dict[str, Any],
    cfg: dict[str, str],
    max_retries: int,
    timeout: float,
    debug: bool,
) -> dict[str, str]:
    """Call ``summarize_one_up`` for one item, retrying on any exception.

    Args:
        item: Report item with ``name``, ``mid`` and optional ``titles``/``tag``.
        cfg: API settings.
        max_retries: Total number of attempts; values below 1 count as 1.
        timeout: Per-request timeout in seconds.
        debug: When true, print each failed attempt.

    Returns:
        The result of the first successful attempt.

    Raises:
        RuntimeError: Every attempt failed; carries the last error message.
    """
    attempts_allowed = max(1, max_retries)
    failure: Exception | None = None
    attempt = 0
    while attempt < attempts_allowed:
        attempt += 1
        try:
            return summarize_one_up(
                item["name"],
                item["mid"],
                item.get("titles", []),
                item.get("tag", []),
                cfg,
                timeout=timeout,
            )
        except Exception as exc:  # noqa: BLE001
            failure = exc
            if debug:
                print(f"[debug] {item['name']}{attempt}次失败: {exc}")
            if attempt < attempts_allowed:
                # Linear back-off, capped at two seconds.
                time.sleep(min(2.0, 0.5 * attempt))
    raise RuntimeError(str(failure) if failure else "未知错误")
def build_report(items: list[dict[str, Any]], batch_note: str) -> str:
    """Render all items, plus group/action statistics, as a Markdown report.

    Args:
        items: Uploader dicts; ``name``, ``mid`` and ``url`` are required.
        batch_note: Free text written into the header as the processing note.

    Returns:
        The report text, ending with exactly one newline.
    """
    group_counts: dict[str, int] = dict.fromkeys(PRESET_GROUPS, 0)
    action_counts: dict[str, int] = {"保留关注": 0, "可以取关": 0}
    for entry in items:
        chosen_group = entry.get("group", "")
        chosen_action = entry.get("action", "")
        # Only known groups and actions are counted.
        if chosen_group in group_counts:
            group_counts[chosen_group] += 1
        if chosen_action in action_counts:
            action_counts[chosen_action] += 1

    out: list[str] = [
        "# UP主内容分析报告分批AI总结",
        "",
        f"- 生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}",
        f"- 分析数量: {len(items)}",
        f"- 处理说明: {batch_note}",
        "",
        "## 分组统计",
        "",
    ]
    out.extend(f"- {group_name}: {count}" for group_name, count in group_counts.items())
    out += [
        f"- 保留关注: {action_counts['保留关注']}",
        f"- 可以取关: {action_counts['可以取关']}",
        "",
    ]

    for number, entry in enumerate(items, start=1):
        heading = f"## {number}. {entry['name']} (mid: {entry['mid']})"
        home_line = f"- 主页: {entry['url']}"
        tag_list = entry.get("tag", [])
        tag_text = ", ".join(tag_list) if tag_list else ""
        out += [heading, "", home_line, f"- 标签: {tag_text}", "", "### 最近10条标题", ""]

        title_list = entry.get("titles", [])
        if title_list:
            out.extend(f"- {title}" for title in title_list)
        else:
            out.append("- (未抓取到标题)")

        summary_text = entry.get("analysis", "")
        out += ["", "### AI分析", "", summary_text if summary_text else "(待分析)", ""]

        out += ["### 分组建议", ""]
        group_name = entry.get("group", "")
        action_name = entry.get("action", "")
        rationale = entry.get("group_reason", "")
        if group_name and action_name:
            out.append(f"- 预设分组: {group_name}")
            out.append(f"- 建议动作: {action_name}")
            out.append(f"- 判断依据: {rationale if rationale else '基于标题与更新风格综合判断。'}")
        else:
            out.append("- (待分组)")
        out.append("")

        failure = entry.get("error", "")
        if failure:
            out += ["### 异常", "", f"- {failure}", ""]
    return "\n".join(out).rstrip() + "\n"
def main() -> int:
    """Run AI summaries over the pending items of an existing report.

    The input report is parsed, pending items are selected, and they are
    processed in batches with a thread pool. The full report is rewritten
    after every batch and once more at the end.

    Fix over the previous version: the "nothing pending" path now creates
    the output directory before writing, like the other write sites do.

    Returns:
        0 on success; 1 when the input report is missing or has no entries.
    """
    args = parse_args()
    input_report = Path(args.input_report)
    output_report = Path(args.output_report)
    if not input_report.exists():
        print(f"输入报告不存在: {input_report}", file=sys.stderr)
        return 1
    items = parse_report(input_report)
    if not items:
        print("输入报告未解析出任何UP条目", file=sys.stderr)
        return 1

    config = {
        "VOLCENGINE_API_KEY": VOLCENGINE_API_KEY,
        "VOLCENGINE_MODEL": VOLCENGINE_MODEL,
        "VOLCENGINE_BASE_URL": VOLCENGINE_BASE_URL,
    }
    # Placeholder values in this file mean: borrow settings from --config-from.
    if ("在这里填" in config["VOLCENGINE_API_KEY"]) or ("在这里填" in config["VOLCENGINE_MODEL"]):
        inherited = load_api_config_from_script(Path(args.config_from))
        if inherited:
            config.update(inherited)

    if args.force:
        pending = [it for it in items if it.get("titles")]
    else:
        pending = [
            it for it in items
            if it.get("titles") and (
                it.get("analysis", "").strip() in SKIP_MARKERS
                or not it.get("group")  # entries without a group are redone too
            )
        ]
    if not pending:
        print("没有待分析条目,直接输出当前报告")
        output_report.parent.mkdir(parents=True, exist_ok=True)
        output_report.write_text(build_report(items, "无待分析条目"), encoding="utf-8")
        return 0

    index_map = {f"{it['mid']}::{it['name']}": idx for idx, it in enumerate(items)}
    success_total = 0
    failed_total = 0
    batch_size = max(1, args.batch_size)
    if args.run_all_batches:
        total_batches = math.ceil(len(pending) / batch_size)
        batch_indexes = list(range(1, total_batches + 1))
        print(f"自动连续模式: 共{total_batches}批, 待分析总数{len(pending)}")
    else:
        batch_indexes = [max(1, args.batch_index)]
    workers = max(1, args.workers)
    print(f"并发配置: workers={workers}, retries={max(1, args.max_retries)}, timeout={args.request_timeout}s")

    for batch_index in batch_indexes:
        start = (batch_index - 1) * batch_size
        end = start + batch_size
        batch = pending[start:end]
        if not batch:
            continue
        print(
            f"开始分批AI总结: 第{batch_index}批, 每批{batch_size}条, "
            f"本批{len(batch)}条, 待分析总数{len(pending)}"
        )
        success = 0
        failed = 0
        future_to_item: dict[Any, dict[str, Any]] = {}
        with ThreadPoolExecutor(max_workers=workers) as executor:
            for i, it in enumerate(batch, start=1):
                print(f"[submit {i}/{len(batch)}] {it['name']} ({it['mid']})")
                future = executor.submit(
                    summarize_one_up_with_retry,
                    it,
                    config,
                    max(1, args.max_retries),
                    float(args.request_timeout),
                    args.debug,
                )
                future_to_item[future] = it
                if args.sleep_seconds > 0:
                    time.sleep(args.sleep_seconds)

            done_count = 0
            for future in as_completed(future_to_item):
                done_count += 1
                it = future_to_item[future]
                idx = index_map.get(f"{it['mid']}::{it['name']}")
                try:
                    ai_res = future.result()
                    if idx is not None:
                        items[idx]["analysis"] = ai_res["summary"]
                        items[idx]["group"] = ai_res["group"]
                        items[idx]["action"] = ai_res["action"]
                        items[idx]["group_reason"] = ai_res["reason"]
                        items[idx]["error"] = ""
                    success += 1
                    print(f"[done {done_count}/{len(batch)}] 成功: {it['name']} ({it['mid']})")
                except Exception as exc:  # noqa: BLE001
                    # A failed item is recorded in the report; the batch goes on.
                    if idx is not None:
                        items[idx]["error"] = str(exc)
                    failed += 1
                    print(f"[done {done_count}/{len(batch)}] 失败: {it['name']} ({it['mid']})")
                    if args.debug:
                        print(f"[debug] 失败详情: {exc}")

        success_total += success
        failed_total += failed
        step_note = (
            f"{batch_index}批完成: 成功{success}, 失败{failed}, "
            f"本批{len(batch)}, 待分析总数{len(pending)}"
        )
        # Written after every batch so progress survives an interruption.
        output_report.parent.mkdir(parents=True, exist_ok=True)
        output_report.write_text(build_report(items, step_note), encoding="utf-8")
        print(f"{batch_index}批写入完成: {output_report}")

    mode_text = "自动连续" if args.run_all_batches else "单批"
    note = (
        f"{mode_text}模式完成: 成功{success_total}, 失败{failed_total}, "
        f"处理批次数={len(batch_indexes)}, 待分析总数={len(pending)}"
    )
    output_report.parent.mkdir(parents=True, exist_ok=True)
    output_report.write_text(build_report(items, note), encoding="utf-8")
    print(f"输出完成: {output_report}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -1,101 +0,0 @@
import argparse
import re
def parse_args() -> argparse.Namespace:
    """Parse ``--input`` and ``--output`` for the group-info extractor.

    Returns:
        The namespace produced by ``argparse`` from ``sys.argv``;
        ``output`` is ``None`` when not given.
    """
    cli = argparse.ArgumentParser(description="提取UP分组信息")
    add_option = cli.add_argument
    add_option("--input", default="./source/19_53_no_titles.md", help="输入报告路径")
    add_option("--output", help="输出报告路径(默认覆盖输入)")
    return cli.parse_args()
def main():
    """Reduce a report to each uploader's group, action, reason and error.

    The input is split at ``## `` headings. Sections whose heading looks
    like ``## N. name (mid: X)`` are kept, sorted by name (case-insensitive)
    then mid, and written with only their group-suggestion bullets.

    Fix over the previous version: the first ``## `` section was dropped
    unconditionally on the assumption that it is a statistics section. For
    inputs without one, that silently lost the first uploader. Sections are
    now filtered by their heading only.

    Returns:
        0 on success; 1 when no uploader section is found (nothing is
        written in that case).
    """
    args = parse_args()
    input_file = args.input
    output_file = args.output or input_file
    with open(input_file, 'r', encoding='utf-8') as f:
        content = f.read()
    lines = content.split('\n')

    section_starts = [i for i, line in enumerate(lines) if line.startswith('## ')]
    if not section_starts:
        print('No sections found')
        return 1
    header = '\n'.join(lines[:section_starts[0]])
    bounds = section_starts + [len(lines)]
    sections = ['\n'.join(lines[start:end]) for start, end in zip(bounds, bounds[1:])]

    parsed = []
    for sec in sections:
        match = re.match(r'^## (\d+)\. (.+) \(mid: (\d+)\)', sec)
        if not match:
            # Not an uploader section (for example a statistics block).
            continue
        group_m = re.search(r'- 预设分组: (.+)', sec)
        action_m = re.search(r'- 建议动作: (.+)', sec)
        reason_m = re.search(r'- 判断依据: (.+)', sec)
        error_m = re.search(r'AI返回未知group: (.+)', sec)
        parsed.append({
            'num': int(match.group(1)),
            'name': match.group(2),
            'mid': match.group(3),
            'group': group_m.group(1).strip() if group_m else "",
            'action': action_m.group(1).strip() if action_m else "",
            'reason': reason_m.group(1).strip() if reason_m else "",
            'error': error_m.group(1).strip() if error_m else "",
        })
    if not parsed:
        # Never overwrite the input (the default output) with a bare header.
        print('No sections found')
        return 1

    parsed.sort(key=lambda x: (x['name'].casefold(), int(x['mid'])))
    lines_out = [header, ""]
    for p in parsed:
        lines_out.append(f"## {p['num']}. {p['name']} (mid: {p['mid']})")
        lines_out.append("")
        if p['group']:
            lines_out.append(f"- 预设分组: {p['group']}")
        if p['action']:
            lines_out.append(f"- 建议动作: {p['action']}")
        if p['reason']:
            lines_out.append(f"- 判断依据: {p['reason']}")
        if p['error']:
            lines_out.append(f"- 异常: {p['error']}")
        lines_out.append("")
    result = '\n'.join(lines_out)
    # Collapse runs of blank lines left by omitted bullets.
    result = re.sub(r'\n{3,}', '\n\n', result)
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(result)
    print(f'Extracted {len(parsed)} sections')
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -1,104 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import re
import time
from pathlib import Path
def parse_args() -> argparse.Namespace:
    """Parse the input/output report paths of the keep-follow extractor.

    Returns:
        The namespace produced by ``argparse`` from ``sys.argv``.
    """
    cli = argparse.ArgumentParser(description="提取非取关UP的AI分析与分组建议")
    path_options = (
        ("--input-report", "source/output/reports/2_up_analysis_full_auto.md", "输入分析报告路径"),
        ("--output-report", "source/output/reports/3_up_keep_follow_only.md", "输出保留关注报告路径"),
    )
    for flag, default_path, description in path_options:
        cli.add_argument(flag, default=default_path, help=description)
    return cli.parse_args()
def main() -> int:
    """Write a report containing every uploader not marked "可以取关".

    For each kept uploader the AI analysis, group suggestion and error
    sections are copied. Entries are sorted by name (case-insensitive) and
    then by mid.

    Returns:
        0 on success; 1 when the source report does not exist.
    """
    args = parse_args()
    src = Path(args.input_report)
    dst = Path(args.output_report)
    if not src.exists():
        print(f"来源文件不存在: {src}")
        return 1
    text = src.read_text(encoding="utf-8")

    def first_group(pattern: str, haystack: str) -> str:
        found = re.search(pattern, haystack)
        return found.group(1).strip() if found else ""

    headings = list(
        re.finditer(r"^##\s+\d+\.\s+(.+?)\s+\(mid:\s*(\d+)\)\s*$", text, re.MULTILINE)
    )
    edges = [heading.start() for heading in headings] + [len(text)]

    kept: list[tuple[str, str, str, str, str, str]] = []
    for heading, begin, finish in zip(headings, edges, edges[1:]):
        section = text[begin:finish]
        action = first_group(r"-\s*建议动作:\s*(.+)", section)
        # Inverted rule: anything not explicitly marked for unfollow is kept.
        if action == "可以取关":
            continue
        kept.append((
            heading.group(1).strip(),
            heading.group(2).strip(),
            first_group(r"###\s*AI分析\s*\n([\s\S]*?)(?=\n###\s|\Z)", section),
            first_group(r"###\s*分组建议\s*\n([\s\S]*?)(?=\n###\s|\Z)", section),
            action,
            first_group(r"###\s*异常\s*\n([\s\S]*?)(?=\n###\s|\Z)", section),
        ))

    # Order by name A-Z (case-insensitive); ties are broken by ascending mid.
    kept.sort(key=lambda row: (row[0].casefold(), int(row[1])))

    out = [
        "# 保留关注UP主分析与分组建议",
        "",
        f"- 生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}",
        f"- 来源文件: {src.name}",
        f"- 条目数: {len(kept)}",
        "",
    ]
    for number, (name, mid, ai_text, group_text, action, error_text) in enumerate(kept, 1):
        fallback_group = f"- 建议动作: {action if action else '(无)'}"
        out += [
            f"## {number}. {name} (mid: {mid})",
            "",
            "### AI分析",
            "",
            ai_text if ai_text else "(无)",
            "",
            "### 分组建议",
            "",
            group_text if group_text else fallback_group,
            "",
        ]
        if error_text:
            out += ["### 异常", "", error_text, ""]

    dst.parent.mkdir(parents=True, exist_ok=True)
    dst.write_text("\n".join(out), encoding="utf-8")
    print(f"已生成: {dst}")
    print(f"保留条目: {len(kept)}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -1,174 +0,0 @@
#!/usr/bin/env python3
"""Extract UPs marked as "可以取关" and output their mids to CSV.
Read an UP analysis report and extract all UPs with action "可以取关",
then output their mids to a CSV file.
"""
from __future__ import annotations
import argparse
import csv
import re
import sys
from pathlib import Path
from typing import Any
def parse_report(report_path: Path) -> list[dict[str, Any]]:
    """Parse a Markdown UP analysis report into mid/name/action records.

    Args:
        report_path: Report file, read as UTF-8.

    Returns:
        One dict per ``## N. name (mid: X)`` section, in file order, with
        ``action`` set to the section's "建议动作" value or ``""``. Empty
        when the file does not exist.
    """
    if not report_path.exists():
        return []
    text = report_path.read_text(encoding="utf-8")

    # Every uploader section starts with "## N. name (mid: ...)".
    headings = list(re.finditer(r"^## \d+\. (.+?)\s+\(mid:\s*(\d+)\)", text, re.MULTILINE))
    edges = [heading.start() for heading in headings] + [len(text)]

    records = []
    for heading, begin, finish in zip(headings, edges, edges[1:]):
        found = re.search(r"- 建议动作: (.+?)(?:\n|$)", text[begin:finish])
        records.append({
            "mid": int(heading.group(2)),
            "name": heading.group(1).strip(),
            "action": found.group(1).strip() if found else "",
        })
    return records
def main() -> int:
    """Extract the uploaders marked "可以取关" and write them out.

    Output is a CSV, a comma-separated mid list (optionally with names and
    optionally split into numbered part files), or JSON.

    Returns:
        0 on success, including when nothing is marked for unfollow;
        1 when the report is missing or contains no uploader.
    """
    cli = argparse.ArgumentParser(description="从UP分析报告中提取可以取关的UP")
    cli.add_argument(
        "--input-report",
        default="source/output/reports/2_up_analysis_full_auto.md",
        help="输入报告路径",
    )
    cli.add_argument(
        "--output-csv",
        default="source/output/uids/4_unfollow_mids_list.txt",
        help="输出文件路径",
    )
    cli.add_argument(
        "--format",
        choices=["csv", "mid-only", "json"],
        default="mid-only",
        help="输出格式csv(mid,name), mid-only(仅mid逗号分隔), json(JSON格式)",
    )
    cli.add_argument("--with-names", action="store_true", help="在mid后添加UP名称仅mid-only格式生效")
    cli.add_argument("--split-size", type=int, default=0, help="可选将mid-only结果按N个一组拆分多个文件例如100")
    args = cli.parse_args()

    report_file = Path(args.input_report)
    target = Path(args.output_csv)
    if not report_file.exists():
        print(f"错误: 输入报告不存在: {report_file}", file=sys.stderr)
        return 1
    print(f"读取报告: {report_file}")
    everyone = parse_report(report_file)
    if not everyone:
        print("未能从报告中解析任何UP", file=sys.stderr)
        return 1

    # Keep only the uploaders explicitly marked for unfollow.
    to_drop = [entry for entry in everyone if entry.get("action") == "可以取关"]
    print(f"总 UP 数: {len(everyone)}")
    print(f"可以取关: {len(to_drop)}")
    if not to_drop:
        print("没有可以取关的UP")
        return 0

    chosen_format = args.format
    if chosen_format == "csv":
        # Standard CSV with a header row: mid, name.
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(handle, fieldnames=["mid", "name"])
            writer.writeheader()
            writer.writerows({"mid": entry["mid"], "name": entry["name"]} for entry in to_drop)
        print(f"\n✓ 已输出CSV格式到: {target}")
        print(f" 格式: mid,name")
        print(f" 行数: {len(to_drop)}")
    elif chosen_format == "mid-only":
        mid_texts = [str(entry["mid"]) for entry in to_drop]
        if args.with_names:
            # mid:name pairs
            payload = ",".join(f"{entry['mid']}:{entry['name']}" for entry in to_drop)
            print(f"\n✓ 已输出mid:name列表到: {target}")
            print(f" 格式: mid1:name1,mid2:name2,...")
        else:
            # bare mids
            payload = ",".join(mid_texts)
            print(f"\n✓ 已输出mid列表到: {target}")
            print(f" 格式: mid1,mid2,mid3,...")
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(payload, encoding="utf-8")
        print(f" 数量: {len(mid_texts)}")

        chunk = max(0, int(args.split_size))
        if chunk > 0:
            # Part files always hold bare mids, even with --with-names.
            parts = [mid_texts[pos:pos + chunk] for pos in range(0, len(mid_texts), chunk)]
            stem = target.stem
            suffix = target.suffix or ".txt"
            for part_number, part in enumerate(parts, start=1):
                target.with_name(f"{stem}_{part_number}{suffix}").write_text(
                    ",".join(part), encoding="utf-8"
                )
            print(f" 已按每组{chunk}个拆分为{len(parts)}个文件")
    elif chosen_format == "json":
        import json
        rows = [{"mid": entry["mid"], "name": entry["name"]} for entry in to_drop]
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(json.dumps(rows, ensure_ascii=False, indent=2), encoding="utf-8")
        print(f"\n✓ 已输出JSON格式到: {target}")
        print(f" 数量: {len(rows)}")

    # Show up to ten examples; to_drop is known to be non-empty here.
    print(f"\n📋 示例前10个:")
    for entry in to_drop[:10]:
        print(f" - {entry['mid']}: {entry['name']}")
    remaining = len(to_drop) - 10
    if remaining > 0:
        print(f" ... 还有 {remaining}")
    return 0


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -1,67 +0,0 @@
import argparse
import re
def parse_args() -> argparse.Namespace:
    """Parse ``--input`` and ``--output`` for the title-removal script.

    Returns:
        The namespace produced by ``argparse`` from ``sys.argv``;
        ``output`` is ``None`` when not given.
    """
    cli = argparse.ArgumentParser(description="删除最近10条标题内容")
    add_option = cli.add_argument
    add_option("--input", default="source/output/reports/2_up_analysis_full_auto.md", help="输入报告路径")
    add_option("--output", help="输出报告路径(默认覆盖输入)")
    return cli.parse_args()
def main():
    """Copy a report while dropping bullet lines that follow each ``## `` heading.

    After a ``## `` heading, lines are scanned until the next ``## `` heading
    or the first ``### `` heading other than the "最近10条标题" one. In that
    span bullet lines are skipped and other lines are kept. Everything else
    is copied unchanged. Runs of three or more newlines are collapsed to two
    before writing.

    Returns:
        0 after the output file has been written.
    """
    args = parse_args()
    input_file = args.input
    # Without --output the input file is overwritten in place.
    output_file = args.output or input_file
    with open(input_file, 'r', encoding='utf-8') as f:
        content = f.read()
    lines = content.split('\n')
    new_lines = []
    i = 0
    while i < len(lines):
        line = lines[i]
        new_lines.append(line)
        if line.startswith('## '):
            i += 1
            # Scan the part of the section that directly follows the heading.
            while i < len(lines):
                curr = lines[i]
                if curr.startswith('## '):
                    # Next section: leave i here so the outer loop copies it.
                    break
                if curr.startswith('### '):
                    if '最近10条标题' in curr:
                        # Drop the titles heading itself.
                        i += 1
                        # NOTE(review): ' - ' (leading space) presumably targets
                        # indented bullets; confirm against the original file.
                        while i < len(lines) and lines[i].startswith(' - '):
                            i += 1
                        continue
                    else:
                        # Any other sub-heading ends the scan; the outer loop
                        # copies it and the lines after it.
                        break
                # Top-level bullets in this span are dropped.
                if curr.startswith('- ') and not curr.startswith(' - '):
                    i += 1
                    continue
                if curr.startswith(' - '):
                    i += 1
                    continue
                new_lines.append(curr)
                i += 1
        else:
            i += 1
    result = '\n'.join(new_lines)
    # Collapse the blank-line runs left behind by the removed lines.
    result = re.sub(r'\n{3,}', '\n\n', result)
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(result)
    print(f'Done: {output_file}')
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -1,208 +0,0 @@
#!/usr/bin/env python3
"""One-command pipeline: fetch titles -> batch analyze -> outputs.
Pipeline outputs:
1) source/output/reports/1_up_titles_report.md
2) source/output/reports/2_up_analysis_full_auto.md
3) source/output/reports/3_up_keep_follow_only.md
4) source/output/uids/4_unfollow_mids_list.txt (+ split files)
Pipeline steps:
1) 抓取视频标题 (analyze_up_content.py)
2) 分批AI分析 (batch_ai_summary_from_report.py)
3) 生成保留关注报告 (extract_keep_follow_doc.py)
4) 生成取关UID列表 (extract_unfollow_list.py)
5) 按首字母排序 (sort_up_main.py)
6) 提取分组信息 (extract_group_info.py)
"""
from __future__ import annotations
import argparse
import subprocess
import sys
from pathlib import Path
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the one-command pipeline.

    Returns:
        The namespace produced by ``argparse`` from ``sys.argv``.
    """
    def text(default: str, description: str) -> dict:
        return {"default": default, "help": description}

    def number(kind: type, default, description: str) -> dict:
        return {"type": kind, "default": default, "help": description}

    def switch(description: str) -> dict:
        return {"action": "store_true", "help": description}

    # (flag, add_argument keyword arguments), kept in help-output order.
    option_table = [
        ("--input-json", text(
            "source/resources/export_uids.json",
            "UP资源文件路径默认: source/resources/export_uids.json",
        )),
        ("--titles-report", text("source/output/reports/1_up_titles_report.md", "标题抓取报告输出路径")),
        ("--analysis-report", text("source/output/reports/2_up_analysis_full_auto.md", "分批分析报告输出路径")),
        ("--keep-report", text("source/output/reports/3_up_keep_follow_only.md", "保留关注报告输出路径")),
        ("--unfollow-uids", text("source/output/uids/4_unfollow_mids_list.txt", "取关UID输出路径")),
        ("--group_info", text("source/output/uids/only_group_info.md", "分组信息输出路径")),
        ("--titles-per-up", number(int, 10, "每个UP抓取标题数量")),
        ("--batch-size", number(int, 20, "分批分析每批条数")),
        ("--workers", number(int, 6, "并发请求数")),
        ("--max-retries", number(int, 2, "单条分析重试次数")),
        ("--request-timeout", number(float, 60.0, "单次请求超时")),
        ("--split-size", number(int, 100, "取关UID拆分分组大小")),
        ("--sleep-seconds", number(float, 0.0, "任务间隔秒数")),
        ("--retry-times", number(int, 3, "抓取重试次数")),
        ("--fetch-mode", {"choices": ["auto", "api", "html"], "default": "auto", "help": "标题抓取模式"}),
        ("--only-tag", text("", "可选仅处理包含该标签的UP")),
        ("--max-ups", number(int, 0, "可选限制处理UP数量")),
        ("--bili-cookie", text("", "可选运行时传入B站Cookie")),
        ("--skip-fetch", switch("跳过抓取阶段,直接使用已有标题报告")),
        ("--skip-analyze", switch("跳过分析阶段,直接做产物提取")),
        ("--skip-sort", switch("跳过排序阶段")),
        ("--skip-group", switch("跳过提取分组阶段")),
        ("--python", text(sys.executable, "指定Python解释器")),
    ]
    cli = argparse.ArgumentParser(description="一键运行完整功能链")
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def run_cmd(cmd: list[str], title: str) -> None:
    """Print a banner and the command line, then run the command.

    Args:
        cmd: Program and arguments, passed to ``subprocess.run`` as a list.
        title: Step title shown in the banner.

    Raises:
        subprocess.CalledProcessError: The command exited with a non-zero
            status (``check=True``).
    """
    banner = f"\n=== {title} ==="
    command_line = " ".join(cmd)
    print(banner)
    print("$", command_line)
    subprocess.run(cmd, check=True)
def main() -> int:
    """Run the pipeline steps as subprocesses, in order.

    Steps: fetch titles, batch AI analysis, keep-follow report, unfollow
    UID list, sort, extract group info. Fetch, analyse, sort and group can
    each be skipped with their ``--skip-*`` flag. A failing step raises
    ``subprocess.CalledProcessError`` through ``run_cmd``.

    Fix over the previous version: the directories of the hard-coded step
    5/6 outputs are created before those steps run. They were only created
    when they happened to match one of the configurable report paths.

    Returns:
        0 when every executed step succeeded.
    """
    args = parse_args()
    for p in [
        Path(args.titles_report).parent,
        Path(args.analysis_report).parent,
        Path(args.keep_report).parent,
        Path(args.unfollow_uids).parent,
    ]:
        p.mkdir(parents=True, exist_ok=True)

    if not args.skip_fetch:
        fetch_cmd = [
            args.python,
            "source/scripts/analyze_up_content.py",
            "--input",
            args.input_json,
            "--output",
            args.titles_report,
            "--titles-per-up",
            str(max(1, args.titles_per_up)),
            "--retry-times",
            str(max(1, args.retry_times)),
            "--fetch-mode",
            args.fetch_mode,
            "--sleep-seconds",
            str(max(0.0, args.sleep_seconds)),
            # Step 1 only fetches titles; the analysis happens in step 2.
            "--skip-ai",
        ]
        if args.only_tag:
            fetch_cmd += ["--only-tag", args.only_tag]
        if args.max_ups > 0:
            fetch_cmd += ["--max-ups", str(args.max_ups)]
        if args.bili_cookie:
            fetch_cmd += ["--bili-cookie", args.bili_cookie]
        run_cmd(fetch_cmd, "步骤1/6 抓取视频标题")

    if not args.skip_analyze:
        analyze_cmd = [
            args.python,
            "source/scripts/batch_ai_summary_from_report.py",
            "--input-report",
            args.titles_report,
            "--output-report",
            args.analysis_report,
            "--batch-size",
            str(max(1, args.batch_size)),
            "--run-all-batches",
            "--workers",
            str(max(1, args.workers)),
            "--max-retries",
            str(max(1, args.max_retries)),
            "--request-timeout",
            str(max(1.0, args.request_timeout)),
            "--sleep-seconds",
            str(max(0.0, args.sleep_seconds)),
        ]
        run_cmd(analyze_cmd, "步骤2/6 分批AI分析")

    keep_cmd = [
        args.python,
        "source/scripts/extract_keep_follow_doc.py",
        "--input-report",
        args.analysis_report,
        "--output-report",
        args.keep_report,
    ]
    run_cmd(keep_cmd, "步骤3/6 生成保留关注报告")

    uid_cmd = [
        args.python,
        "source/scripts/extract_unfollow_list.py",
        "--input-report",
        args.analysis_report,
        "--output-csv",
        args.unfollow_uids,
        "--format",
        "mid-only",
        "--split-size",
        str(max(0, args.split_size)),
    ]
    run_cmd(uid_cmd, "步骤4/6 生成取关UID列表")

    sorted_report = "source/output/reports/5_sorted_up_analysis.md"
    group_report = "source/output/reports/6_group_info.md"
    if not args.skip_sort:
        # The sort script opens its output directly, so the directory must exist.
        Path(sorted_report).parent.mkdir(parents=True, exist_ok=True)
        sort_cmd = [
            args.python,
            "source/scripts/sort_up_main.py",
            "--input",
            args.analysis_report,
            "--output",
            sorted_report,
        ]
        run_cmd(sort_cmd, "步骤5/6 按首字母排序")
    if not args.skip_group:
        Path(group_report).parent.mkdir(parents=True, exist_ok=True)
        # Use the sorted report when step 5 ran, else the raw analysis report.
        input_for_group = sorted_report if not args.skip_sort else args.analysis_report
        group_cmd = [
            args.python,
            "source/scripts/extract_group_info.py",
            "--input",
            input_for_group,
            "--output",
            group_report,
        ]
        run_cmd(group_cmd, "步骤6/6 提取分组信息")

    print("\n流水线完成。")
    print(f"- 1 标题报告: {args.titles_report}")
    print(f"- 2 分析报告: {args.analysis_report}")
    print(f"- 3 保留报告: {args.keep_report}")
    print(f"- 4 取关UID: {args.unfollow_uids}")
    if not args.skip_sort:
        print(f"- 5 排序报告: {sorted_report}")
    if not args.skip_group:
        print(f"- 6 分组报告: {group_report}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -1,93 +0,0 @@
import argparse
import re
def parse_args() -> argparse.Namespace:
    """Parse ``--input`` and ``--output`` for the sort script.

    Returns:
        The namespace produced by ``argparse`` from ``sys.argv``;
        ``output`` is ``None`` when not given.
    """
    cli = argparse.ArgumentParser(description="对UP主按首字母排序")
    add_option = cli.add_argument
    add_option("--input", default="source/output/reports/2_up_analysis_full_auto.md", help="输入报告路径")
    add_option("--output", help="输出报告路径(默认覆盖输入)")
    return cli.parse_args()
def main():
    """Rewrite a report with its uploader sections ordered by first character.

    The input is split at ``## `` headings. Sections whose heading looks
    like ``## N. name (mid: X)`` are kept verbatim and ordered digits first,
    then letters, then everything else. Other ``## `` sections are not
    written to the output.

    Fix over the previous version: the first ``## `` section was dropped
    unconditionally on the assumption that it is a statistics section. For
    inputs without one, that silently lost the first uploader. Sections are
    now filtered by their heading only.

    Returns:
        0 on success; 1 when no uploader section is found (nothing is
        written in that case).
    """
    args = parse_args()
    input_file = args.input
    output_file = args.output or input_file
    with open(input_file, 'r', encoding='utf-8') as f:
        content = f.read()
    lines = content.split('\n')

    section_starts = [i for i, line in enumerate(lines) if line.startswith('## ')]
    if not section_starts:
        print('No sections found')
        return 1
    header = '\n'.join(lines[:section_starts[0]])
    bounds = section_starts + [len(lines)]
    sections_data = ['\n'.join(lines[start:end]) for start, end in zip(bounds, bounds[1:])]

    parsed = []
    for sec in sections_data:
        match = re.match(r'^## (\d+)\. (.+) \(mid: (\d+)\)', sec)
        if match:
            parsed.append({
                'num': int(match.group(1)),
                'name': match.group(2),
                'mid': match.group(3),
                'content': sec
            })
    if not parsed:
        # Never overwrite the input (the default output) with a bare header.
        print('No sections found')
        return 1

    def sort_key(item):
        # The leading '0'/'1'/'2' puts digits before letters before the rest.
        name = item['name']
        first_char = name[0].lower() if name else ''
        if first_char.isdigit():
            return '0' + first_char
        elif first_char.isalpha():
            return '1' + first_char
        else:
            return '2' + first_char

    parsed.sort(key=sort_key)
    new_content = header + '\n' + ''.join(sec['content'] + '\n' for sec in parsed)
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(new_content)
    print(f'Sorted {len(parsed)} sections')
    print('First 10:')
    for s in parsed[:10]:
        print(f' {s["name"]}')
    return 0


if __name__ == "__main__":
    raise SystemExit(main())