以下是一个更复杂的 Python 代码示例,结合了面向对象编程、多线程、异常处理、装饰器、生成器等高级特性,并模拟了一个简单的异步任务调度系统: ```python import threading import time import random from functools import wraps from datetime import datetime from queue import Queue, Empty # 装饰器:记录函数执行时间 def log_execution_time(func): @wraps(func) def wrapper(*args, **kwargs): start_time = time.perf_counter() result = func(*args, **kwargs) end_time = time.perf_counter() print(f"{func.__name__} executed in {end_time - start_time:.4f} seconds") return result return wrapper # 自定义异常 class TaskError(Exception): pass # 任务基类 class Task: def __init__(self, name, priority=1): self.name = name self.priority = priority self._result = None self._status = "pending" def execute(self): raise NotImplementedError @property def result(self): if self._status != "completed": raise TaskError("Task not completed yet") return self._result @property def status(self): return self._status # 具体任务实现 class CalculationTask(Task): def __init__(self, name, data, priority=1): super().__init__(name, priority) self.data = data @log_execution_time def execute(self): self._status = "running" try: # 模拟复杂计算 time.sleep(random.uniform(0.1, 0.5)) self._result = sum(x ** 2 for x in self.data) self._status = "completed" except Exception as e: self._status = "failed" raise TaskError(f"Task failed: {str(e)}") # 任务生成器 def task_generator(n): for i in range(n): data = [random.randint(1, 100) for _ in range(random.randint(5, 10))] yield CalculationTask(f"Task-{i}", data, random.randint(1, 3)) # 任务调度器 class TaskScheduler: def __init__(self, max_workers=4): self.task_queue = Queue() self.workers = [] self.max_workers = max_workers self.lock = threading.Lock() self.results = [] def add_task(self, task): self.task_queue.put(task) def _worker(self): while True: try: task = self.task_queue.get(timeout=1) print(f"Processing {task.name} (priority: {task.priority}) at {datetime.now().strftime('%H:%M:%S')}") task.execute() with self.lock: self.results.append((task.name, task.result)) self.task_queue.task_done() except Empty: break except Exception as e: print(f"Error processing task: {str(e)}") def run(self): # 启动工作线程 for _ in range(self.max_workers): worker = threading.Thread(target=self._worker) worker.daemon = True worker.start() self.workers.append(worker) # 等待所有任务完成 self.task_queue.join() # 返回结果 return self.results # 主程序 if __name__ == "__main__": # 创建调度器 scheduler = TaskScheduler(max_workers=3) # 添加任务 for task in task_generator(10): scheduler.add_task(task) # 运行调度器 start_time = time.time() results = scheduler.run() duration = time.time() - start_time # 打印结果 print("\nTask Results:") for name, result in sorted(results, key=lambda x: int(x[0].split('-')[1])): print(f"{name}: {result}") print(f"\nTotal execution time: {duration:.2f} seconds") print(f"Tasks completed: {len(results)}") ``` 这个代码示例包含以下复杂特性: 1. 面向对象编程:使用类来组织代码(Task, CalculationTask, TaskScheduler) 2. 装饰器:`log_execution_time` 用于记录函数执行时间 3. 多线程:使用 `threading` 模块实现并发任务处理 4. 生成器:`task_generator` 按需生成任务 5. 异常处理:自定义 `TaskError` 和异常处理机制 6. 线程同步:使用 `Lock` 保护共享资源 7. 队列:使用 `Queue` 实现任务调度 8. 上下文管理:`with` 语句管理锁资源 9. 属性装饰器:`@property` 定义只读属性 这个程序模拟了一个任务调度系统,可以并发执行多个计算任务,记录执行时间,并处理各种异常情况。
在复杂一些
- 作者:China-Hubei-Wuhan
- 日期:2025年3月30日 22:36
- 浏览:10
评论区: