"""Scan the management API's auth-files and delete those whose status hits a rule.

A file is a deletion candidate when it is not ``active``, its provider and plan
type have an entry in ``check_list``, and its ``status_message`` contains one of
the configured keywords. With ``--dry-run`` the candidates are only listed.
"""
import argparse
from typing import Optional

import requests
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from rich.panel import Panel
from rich.table import Table

# Bearer token and base URL used by get_all()/clean(); main() overwrites both
# from the command-line arguments.
token = ''
base_url = 'http://127.0.0.1:8374'

# provider -> plan_type -> substrings of status_message that mark a file for deletion.
check_list = {
    'codex': {
        'free': [
            'usage_limit_reached',
            'invalid_request_error',
            'token_revoked',
            '"status": 401',
            "'status': 401",
            '"status": 403',
            "'status': 403",
            "authorization token has been invalidated",
        ]
    }
}

console = Console()


def get_all():
    """Fetch the auth-file list from the management API.

    Returns:
        The value of the ``files`` key of the JSON response, or an empty list
        when the key is absent.
    """
    return requests.get(
        url=base_url + "/v0/management/auth-files",
        headers={
            "Authorization": "Bearer " + token
        },
        timeout=60,
    ).json().get('files', [])


def clean(name: str):
    """Send a DELETE for the auth-file called *name* and return the decoded JSON reply."""
    return requests.delete(
        url=base_url + "/v0/management/auth-files",
        params={
            'name': name,
        },
        headers={
            "Authorization": "Bearer " + token
        },
        timeout=60,
    ).json()


def parse_args():
    """Parse the command line: ``--token`` (required), ``--base_url``, ``--dry-run``."""
    parser = argparse.ArgumentParser(description="清理命中规则的 auth-files")
    parser.add_argument("--token", required=True, help="管理接口 Bearer Token")
    parser.add_argument(
        "--base_url",
        default="http://127.0.0.1:8374",
        help="管理接口地址,默认: http://127.0.0.1:8374",
    )
    parser.add_argument(
        "-dry-run",
        "--dry-run",
        dest="dry_run",
        action="store_true",
        help="仅预览命中结果,不执行删除",
    )
    return parser.parse_args()


def _find_matching_keyword(file: dict) -> Optional[str]:
    """Return the first ``check_list`` keyword found in the file's status message.

    Returns ``None`` when the file is active, its provider or plan type has no
    rule set, or its status message is missing, empty, not a string, or
    contains none of the keywords.
    """
    if file.get('status', '') == 'active':
        return None

    plans = check_list.get(file.get('provider', ''))
    if plans is None:
        return None

    # The API may send "id_token": null, so do not chain .get() on it blindly.
    id_token = file.get('id_token')
    plan_type = id_token.get('plan_type', '') if isinstance(id_token, dict) else ''
    if plan_type not in plans:
        return None

    status_message = file.get('status_message')
    if not isinstance(status_message, str) or not status_message:
        return None

    # Stop at the first hit: one match is enough to condemn the file.
    return next((kw for kw in plans[plan_type] if kw in status_message), None)


def _print_summary(matched_files: list, dry_run: bool) -> None:
    """Print the table of matched files and the closing summary panel."""
    if not matched_files:
        console.print()
        console.print(Panel("[bold green]✓ 没有发现命中删除规则的文件[/bold green]", border_style="green"))
        return

    table = Table(title="[bold red]命中删除规则的文件[/bold red]", show_lines=True)
    table.add_column("[bold]序号[/bold]", style="cyan", justify="center", width=6)
    table.add_column("[bold]文件名[/bold]", style="white", width=40)
    table.add_column("[bold]命中规则[/bold]", style="yellow", width=25)
    table.add_column("[bold]错误信息[/bold]", style="red", width=50)

    for idx, item in enumerate(matched_files, 1):
        message = item['status_message']
        table.add_row(
            str(idx),
            str(item['name'] or '')[:40],
            item['keyword'],
            message[:50] + "..." if len(message) > 50 else message,
        )

    console.print()
    console.print(table)
    console.print()

    if dry_run:
        console.print(Panel(f"[bold yellow]Dry Run: 共发现 {len(matched_files)} 个文件命中删除规则(未删除)[/bold yellow]", border_style="yellow"))
    else:
        console.print(Panel(f"[bold red]共发现 {len(matched_files)} 个文件命中删除规则[/bold red]", border_style="red"))


def main():
    """Scan every auth-file, delete the ones that hit a rule, and print a report.

    Each file is recorded and deleted at most once, even when its status
    message contains several keywords. Nothing is deleted in dry-run mode.
    """
    args = parse_args()

    global token, base_url
    token = args.token
    base_url = args.base_url

    mode_text = "(Dry Run,仅预览)" if args.dry_run else ""

    console.print(Panel("[bold cyan]开始清理文件...[/bold cyan]", title="[bold green]清理工具[/bold green]"))

    files = get_all()
    total_files = len(files)

    console.print(Panel(f"[bold cyan]开始扫描文件...[/bold cyan]{mode_text}\n共发现 [bold yellow]{total_files}[/bold yellow] 个文件", title="[bold green]清理工具[/bold green]"))

    matched_files = []

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("[cyan]扫描文件中...", total=total_files)

        for file in files:
            name = file.get('name')
            label = name if isinstance(name, str) else 'unknown'
            progress.update(task, description=f"[cyan]检查: {label[:30]}")

            keyword = _find_matching_keyword(file)
            if keyword is not None:
                matched_files.append({
                    'name': name,
                    'keyword': keyword,
                    'status_message': file['status_message'],
                })
                # requests omits params whose value is None, so a nameless
                # entry would turn into a DELETE without any name filter.
                if not args.dry_run and name:
                    clean(name)

            progress.advance(task)

    _print_summary(matched_files, args.dry_run)


if __name__ == "__main__":
    main()