再复杂一些

not image

好的,下面是一个更复杂的 Python 代码示例,结合了面向对象编程、多线程、异常处理、装饰器、生成器、上下文管理器、日志记录等高级特性,并模拟了一个异步任务调度系统。这个系统还包含了一个基于优先级的任务队列和任务依赖管理。 ```python import threading import time import random from functools import wraps from datetime import datetime from queue import Queue, Empty, PriorityQueue import logging # 配置日志记录 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 自定义异常 class TaskError(Exception): pass # 装饰器:记录函数执行时间 def log_execution_time(func): @wraps(func) def wrapper(*args, **kwargs): start_time = time.perf_counter() result = func(*args, **kwargs) end_time = time.perf_counter() logging.info(f"{func.__name__} executed in {end_time - start_time:.4f} seconds") return result return wrapper # 任务基类 class Task: def __init__(self, name, priority=1): self.name = name self.priority = priority self._result = None self._status = "pending" self.dependencies = [] def add_dependency(self, task): self.dependencies.append(task) def execute(self): raise NotImplementedError @property def result(self): if self._status != "completed": raise TaskError("Task not completed yet") return self._result @property def status(self): return self._status # 具体任务实现 class CalculationTask(Task): def __init__(self, name, data, priority=1): super().__init__(name, priority) self.data = data @log_execution_time def execute(self): self._status = "running" try: # 检查依赖任务是否完成 for dep in self.dependencies: if dep.status != "completed": raise TaskError(f"Dependency {dep.name} not completed") # 模拟复杂计算 time.sleep(random.uniform(0.1, 0.5)) self._result = sum(x ** 2 for x in self.data) self._status = "completed" except Exception as e: self._status = "failed" raise TaskError(f"Task failed: {str(e)}") # 任务生成器 def task_generator(n): for i in range(n): data = [random.randint(1, 100) for _ in range(random.randint(5, 10))] yield CalculationTask(f"Task-{i}", data, random.randint(1, 3)) # 任务调度器 class TaskScheduler: def __init__(self, max_workers=4): self.task_queue = PriorityQueue() self.workers = [] self.max_workers = max_workers self.lock = threading.Lock() self.results = [] def add_task(self, task): self.task_queue.put((task.priority, task)) def _worker(self): while True: try: priority, task = self.task_queue.get(timeout=1) logging.info(f"Processing {task.name} (priority: {task.priority}) at {datetime.now().strftime('%H:%M:%S')}") task.execute() with self.lock: self.results.append((task.name, task.result)) self.task_queue.task_done() except Empty: break except Exception as e: logging.error(f"Error processing task: {str(e)}") def run(self): # 启动工作线程 for _ in range(self.max_workers): worker = threading.Thread(target=self._worker) worker.daemon = True worker.start() self.workers.append(worker) # 等待所有任务完成 self.task_queue.join() # 返回结果 return self.results # 主程序 if __name__ == "__main__": # 创建调度器 scheduler = TaskScheduler(max_workers=3) # 添加任务及其依赖关系 tasks = list(task_generator(10)) for i in range(1, len(tasks)): tasks[i].add_dependency(tasks[i-1]) for task in tasks: scheduler.add_task(task) # 运行调度器 start_time = time.time() results = scheduler.run() duration = time.time() - start_time # 打印结果 print("\nTask Results:") for name, result in sorted(results, key=lambda x: int(x[0].split('-')[1])): print(f"{name}: {result}") print(f"\nTotal execution time: {duration:.2f} seconds") print(f"Tasks completed: {len(results)}") ``` 这个代码示例包含以下复杂特性: 1. **面向对象编程**:使用类来组织代码(Task, CalculationTask, TaskScheduler)。 2. **装饰器**:`log_execution_time` 用于记录函数执行时间。 3. **多线程**:使用 `threading` 模块实现并发任务处理。 4. **生成器**:`task_generator` 按需生成任务。 5. **异常处理**:自定义 `TaskError` 和异常处理机制。 6. **线程同步**:使用 `Lock` 保护共享资源。 7. **队列**:使用 `PriorityQueue` 实现基于优先级的任务队列。 8. **上下文管理**:`with` 语句管理锁资源。 9. **属性装饰器**:`@property` 定义只读属性。 10. **日志记录**:使用 `logging` 模块记录日志信息。 11. **任务依赖管理**:每个任务可以有依赖任务,只有当所有依赖任务完成后,该任务才会被执行。 12. **优先级队列**:任务根据优先级进行排序和处理。 这个程序模拟了一个复杂的异步任务调度系统,能够处理任务的优先级和依赖关系,并记录详细的日志信息。

评论区:

评论:

请输入内容: