clean_cpa.py
· 4.3 KiB · Python
Raw
import argparse
import requests
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from rich.panel import Panel
from rich.table import Table
token = ''
base_url = 'http://127.0.0.1:8374'
check_list = {
'codex': {
'free': [
'usage_limit_reached',
'invalid_request_error',
'token_revoked',
'"status": 401',
"'status': 401",
'"status": 403',
"'status': 403",
"authorization token has been invalidated",
]
}
}
console = Console()
def get_all():
return requests.get(
url=base_url + "/v0/management/auth-files",
headers={
"Authorization": "Bearer " + token
},
timeout=60,
).json().get('files', [])
def clean(name: str):
return requests.delete(
url=base_url + "/v0/management/auth-files",
params={
'name': name,
},
headers={
"Authorization": "Bearer " + token
},
timeout=60,
).json()
def parse_args():
parser = argparse.ArgumentParser(description="清理命中规则的 auth-files")
parser.add_argument("--token", required=True, help="管理接口 Bearer Token")
parser.add_argument(
"--base_url",
default="http://127.0.0.1:8374",
help="管理接口地址,默认: http://127.0.0.1:8374",
)
parser.add_argument(
"-dry-run",
"--dry-run",
dest="dry_run",
action="store_true",
help="仅预览命中结果,不执行删除",
)
return parser.parse_args()
def main():
args = parse_args()
global token, base_url
token = args.token
base_url = args.base_url
mode_text = "(Dry Run,仅预览)" if args.dry_run else ""
console.print(Panel(f"[bold cyan]开始清理文件...[/bold cyan]", title="[bold green]清理工具[/bold green]"))
files = get_all()
total_files = len(files)
console.print(Panel(f"[bold cyan]开始扫描文件...[/bold cyan]{mode_text}\n共发现 [bold yellow]{total_files}[/bold yellow] 个文件", title="[bold green]清理工具[/bold green]"))
matched_files = []
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console,
) as progress:
task = progress.add_task("[cyan]扫描文件中...", total=total_files)
for file in files:
progress.update(task, description=f"[cyan]检查: {file.get('name', 'unknown')[:30]}")
if file.get('status', '') == 'active':
progress.advance(task)
continue
if file.get('provider', '') not in check_list:
progress.advance(task)
continue
keywords = check_list[file.get('provider', '')]
if file.get('id_token', {}).get('plan_type', '') not in keywords:
progress.advance(task)
continue
keywords = keywords[file.get('id_token', {}).get('plan_type', '')]
if 'status_message' not in file:
progress.advance(task)
continue
status_message = file.get('status_message', '')
if status_message == '':
progress.advance(task)
continue
for keyword in keywords:
if status_message.find(keyword) >= 0:
matched_files.append({
'name': file.get('name'),
'keyword': keyword,
'status_message': status_message
})
if not args.dry_run:
clean(file.get('name'))
progress.advance(task)
if matched_files:
table = Table(title="[bold red]命中删除规则的文件[/bold red]", show_lines=True)
table.add_column("[bold]序号[/bold]", style="cyan", justify="center", width=6)
table.add_column("[bold]文件名[/bold]", style="white", width=40)
table.add_column("[bold]命中规则[/bold]", style="yellow", width=25)
table.add_column("[bold]错误信息[/bold]", style="red", width=50)
for idx, item in enumerate(matched_files, 1):
table.add_row(
str(idx),
item['name'][:40] if len(item['name']) > 40 else item['name'],
item['keyword'],
item['status_message'][:50] + "..." if len(item['status_message']) > 50 else item['status_message']
)
console.print()
console.print(table)
console.print()
if args.dry_run:
console.print(Panel(f"[bold yellow]Dry Run: 共发现 {len(matched_files)} 个文件命中删除规则(未删除)[/bold yellow]", border_style="yellow"))
else:
console.print(Panel(f"[bold red]共发现 {len(matched_files)} 个文件命中删除规则[/bold red]", border_style="red"))
else:
console.print()
console.print(Panel("[bold green]✓ 没有发现命中删除规则的文件[/bold green]", border_style="green"))
if __name__ == "__main__":
main()
| 1 | import argparse |
| 2 | |
| 3 | import requests |
| 4 | from rich.console import Console |
| 5 | from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn |
| 6 | from rich.panel import Panel |
| 7 | from rich.table import Table |
| 8 | |
| 9 | token = '' |
| 10 | base_url = 'http://127.0.0.1:8374' |
| 11 | |
| 12 | check_list = { |
| 13 | 'codex': { |
| 14 | 'free': [ |
| 15 | 'usage_limit_reached', |
| 16 | 'invalid_request_error', |
| 17 | 'token_revoked', |
| 18 | '"status": 401', |
| 19 | "'status': 401", |
| 20 | '"status": 403', |
| 21 | "'status': 403", |
| 22 | "authorization token has been invalidated", |
| 23 | ] |
| 24 | } |
| 25 | } |
| 26 | |
| 27 | console = Console() |
| 28 | |
| 29 | |
| 30 | def get_all(): |
| 31 | return requests.get( |
| 32 | url=base_url + "/v0/management/auth-files", |
| 33 | headers={ |
| 34 | "Authorization": "Bearer " + token |
| 35 | }, |
| 36 | timeout=60, |
| 37 | ).json().get('files', []) |
| 38 | |
| 39 | |
| 40 | def clean(name: str): |
| 41 | return requests.delete( |
| 42 | url=base_url + "/v0/management/auth-files", |
| 43 | params={ |
| 44 | 'name': name, |
| 45 | }, |
| 46 | headers={ |
| 47 | "Authorization": "Bearer " + token |
| 48 | }, |
| 49 | timeout=60, |
| 50 | ).json() |
| 51 | |
| 52 | |
| 53 | def parse_args(): |
| 54 | parser = argparse.ArgumentParser(description="清理命中规则的 auth-files") |
| 55 | parser.add_argument("--token", required=True, help="管理接口 Bearer Token") |
| 56 | parser.add_argument( |
| 57 | "--base_url", |
| 58 | default="http://127.0.0.1:8374", |
| 59 | help="管理接口地址,默认: http://127.0.0.1:8374", |
| 60 | ) |
| 61 | parser.add_argument( |
| 62 | "-dry-run", |
| 63 | "--dry-run", |
| 64 | dest="dry_run", |
| 65 | action="store_true", |
| 66 | help="仅预览命中结果,不执行删除", |
| 67 | ) |
| 68 | return parser.parse_args() |
| 69 | |
| 70 | |
| 71 | def main(): |
| 72 | args = parse_args() |
| 73 | global token, base_url |
| 74 | token = args.token |
| 75 | base_url = args.base_url |
| 76 | |
| 77 | mode_text = "(Dry Run,仅预览)" if args.dry_run else "" |
| 78 | console.print(Panel(f"[bold cyan]开始清理文件...[/bold cyan]", title="[bold green]清理工具[/bold green]")) |
| 79 | files = get_all() |
| 80 | total_files = len(files) |
| 81 | |
| 82 | console.print(Panel(f"[bold cyan]开始扫描文件...[/bold cyan]{mode_text}\n共发现 [bold yellow]{total_files}[/bold yellow] 个文件", title="[bold green]清理工具[/bold green]")) |
| 83 | |
| 84 | matched_files = [] |
| 85 | |
| 86 | with Progress( |
| 87 | SpinnerColumn(), |
| 88 | TextColumn("[progress.description]{task.description}"), |
| 89 | BarColumn(), |
| 90 | TaskProgressColumn(), |
| 91 | console=console, |
| 92 | ) as progress: |
| 93 | task = progress.add_task("[cyan]扫描文件中...", total=total_files) |
| 94 | |
| 95 | for file in files: |
| 96 | progress.update(task, description=f"[cyan]检查: {file.get('name', 'unknown')[:30]}") |
| 97 | |
| 98 | if file.get('status', '') == 'active': |
| 99 | progress.advance(task) |
| 100 | continue |
| 101 | |
| 102 | if file.get('provider', '') not in check_list: |
| 103 | progress.advance(task) |
| 104 | continue |
| 105 | |
| 106 | keywords = check_list[file.get('provider', '')] |
| 107 | |
| 108 | if file.get('id_token', {}).get('plan_type', '') not in keywords: |
| 109 | progress.advance(task) |
| 110 | continue |
| 111 | |
| 112 | keywords = keywords[file.get('id_token', {}).get('plan_type', '')] |
| 113 | |
| 114 | if 'status_message' not in file: |
| 115 | progress.advance(task) |
| 116 | continue |
| 117 | |
| 118 | status_message = file.get('status_message', '') |
| 119 | if status_message == '': |
| 120 | progress.advance(task) |
| 121 | continue |
| 122 | |
| 123 | for keyword in keywords: |
| 124 | if status_message.find(keyword) >= 0: |
| 125 | matched_files.append({ |
| 126 | 'name': file.get('name'), |
| 127 | 'keyword': keyword, |
| 128 | 'status_message': status_message |
| 129 | }) |
| 130 | if not args.dry_run: |
| 131 | clean(file.get('name')) |
| 132 | |
| 133 | progress.advance(task) |
| 134 | |
| 135 | if matched_files: |
| 136 | table = Table(title="[bold red]命中删除规则的文件[/bold red]", show_lines=True) |
| 137 | table.add_column("[bold]序号[/bold]", style="cyan", justify="center", width=6) |
| 138 | table.add_column("[bold]文件名[/bold]", style="white", width=40) |
| 139 | table.add_column("[bold]命中规则[/bold]", style="yellow", width=25) |
| 140 | table.add_column("[bold]错误信息[/bold]", style="red", width=50) |
| 141 | |
| 142 | for idx, item in enumerate(matched_files, 1): |
| 143 | table.add_row( |
| 144 | str(idx), |
| 145 | item['name'][:40] if len(item['name']) > 40 else item['name'], |
| 146 | item['keyword'], |
| 147 | item['status_message'][:50] + "..." if len(item['status_message']) > 50 else item['status_message'] |
| 148 | ) |
| 149 | |
| 150 | console.print() |
| 151 | console.print(table) |
| 152 | console.print() |
| 153 | if args.dry_run: |
| 154 | console.print(Panel(f"[bold yellow]Dry Run: 共发现 {len(matched_files)} 个文件命中删除规则(未删除)[/bold yellow]", border_style="yellow")) |
| 155 | else: |
| 156 | console.print(Panel(f"[bold red]共发现 {len(matched_files)} 个文件命中删除规则[/bold red]", border_style="red")) |
| 157 | else: |
| 158 | console.print() |
| 159 | console.print(Panel("[bold green]✓ 没有发现命中删除规则的文件[/bold green]", border_style="green")) |
| 160 | |
| 161 | |
| 162 | if __name__ == "__main__": |
| 163 | main() |
| 164 |