id} exceeded max retries") class WorkerNode: def __init__(self, node_id: str): self.node_id = node_id self.active_connections = 0 self.task_queue = asyncio.Queue() self.session = aiohttp.ClientSession() async def execute_task(self, task: Task): self.active_connections += 1 try: # 模拟耗时操作 await asyncio.sleep(0.5) # 调用外部API async with self.session.post( 'https://api.example.com/process', json=task.payload ) as response: result = await response.json() logging.info(f"Task {task.id} completed: {result}") finally: self.active_connections -= 1 # -------------------------- # API 服务 # -------------------------- app = FastAPI() scheduler = TaskScheduler() @app.post("/tasks") async def create_task(task: Task): await scheduler.add_task(task) return {"message": "Task queued", "task_id": task.id} @app.get("/tasks/{task_id}") async def get_task_status(task_id: str): # 从Redis获取最新状态 result = scheduler.redis.get(f"task:{task_id}") return json.loads(result) if result else {"error": "Not found"} # -------------------------- # 监控系统 # -------------------------- class Monitor: def __init__(self): self.metrics = { 'tasks_processed': 0, 'tasks_failed': 0, 'worker_stats': {} } def update_metrics(self, task: Task): self.metrics['tasks_processed'] += 1 if task.status == "failed": self.metrics['tasks_failed'] += 1 def get_prometheus_metrics(self): return f""" http_requests_total {self.metrics['tasks_processed']} http_errors_total {self.metrics['tasks_failed']} """ # -------------------------- # 启动系统 # -------------------------- async def main(): # 初始化工作节点 workers = [WorkerNode(f"worker-{i}") for i in range(3)] scheduler.workers = workers # 启动任务分发 asyncio.create_task(scheduler.dispatch_tasks()) # 启动监控 monitor = Monitor() # 启动FastAPI import uvicorn config = uvicorn.Config(app, host="0.0.0.0", port=8000) server = uvicorn.Server(config) await server.serve() if __name__ == "__main__": logging.basicConfig(level=logging.INFO) asyncio.run(main()) ``` --- ### 系统功能说明 1. **优先级队列**:使用堆结构实现的任务优先级队列 2. **负载均衡**:支持多种策略(轮询/最少连接) 3. **容错机制**:自动重试失败任务(最多3次) 4. **实时监控**:通过Redis Pub/Sub实现状态更新 5. **异步处理**:使用asyncio实现高性能I/O 6. **REST API**:提供任务提交和状态查询接口 7. **工作节点**:模拟分布式工作节点处理任务 --- ### 运行方式 1. 安装依赖: ```bash pip install fastapi uvicorn redis aiohttp pydantic ``` 2. 启动Redis服务: ```bash docker run -p 6379:6379 redis ``` 3. 运行程序: ```bash python main.py ``` 4. 测试API: ```bash curl -X POST -H "Content-Type: application/json" -d '{ "id": "task1", "priority": 5, "payload": {"data": "test
写一个复杂的代码
- 作者:China-Hubei-Ezhou
- 日期:2025年3月11日 17:20
- 浏览:16
评论区: