Python异步编程深入:从协程到高性能并发
Python异步编程深入从协程到高性能并发引言异步编程是提高Python应用性能的关键技术之一。通过事件循环和协程我们可以在单线程中实现高并发处理。本文将深入探讨Python异步编程的核心概念包括协程、事件循环、任务管理和最佳实践。一、协程基础1.1 协程定义与创建import asyncio # 基本协程定义 async def hello(): print(Hello) await asyncio.sleep(1) print(World) # 运行协程 asyncio.run(hello()) # 协程作为函数 async def fetch_data(url): print(fFetching {url}) await asyncio.sleep(2) return {url: url, data: sample} async def main(): result await fetch_data(https://api.example.com) print(result) asyncio.run(main())1.2 协程的状态import asyncio async def my_coroutine(): await asyncio.sleep(1) return done # 获取协程对象 coro my_coroutine() print(f状态: {coro.send(None)}) # 启动协程 # 协程状态转换 # PENDING - RUNNING - DONE # - CANCELLED二、事件循环2.1 事件循环的作用import asyncio def main(): # 获取或创建事件循环 loop asyncio.get_event_loop() # 运行协程直到完成 loop.run_until_complete(hello()) # 关闭事件循环 loop.close() if __name__ __main__: main()2.2 自定义事件循环import asyncio import uvloop # 使用uvloop替代默认事件循环 uvloop.install() async def benchmark(): tasks [asyncio.sleep(0.1) for _ in range(1000)] await asyncio.gather(*tasks) # uvloop提供更好的性能 asyncio.run(benchmark())三、任务管理3.1 创建任务import asyncio async def task_function(name, delay): print(fTask {name} started) await asyncio.sleep(delay) print(fTask {name} completed) return fResult from {name} async def main(): # 创建任务 task1 asyncio.create_task(task_function(A, 2)) task2 asyncio.create_task(task_function(B, 1)) # 等待任务完成 result1 await task1 result2 await task2 print(fResults: {result1}, {result2}) asyncio.run(main())3.2 并发任务import asyncio async def fetch_url(url): print(fFetching {url}) await asyncio.sleep(1) return fData from {url} async def main(): urls [ https://api.example.com/1, https://api.example.com/2, https://api.example.com/3, ] # 并发执行所有任务 tasks [fetch_url(url) for url in urls] results await asyncio.gather(*tasks) for result in results: print(result) asyncio.run(main())3.3 任务取消import asyncio async def long_running_task(): try: print(Starting long task) for i in range(10): await asyncio.sleep(0.5) print(fProgress: {i1}/10) return Completed except asyncio.CancelledError: print(Task was cancelled) raise async def main(): task asyncio.create_task(long_running_task()) # 1秒后取消任务 await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print(Main caught cancellation) asyncio.run(main())四、异步模式4.1 生产者-消费者模式import asyncio from collections import deque async def producer(queue): for i in range(5): await asyncio.sleep(1) item fItem {i} await queue.put(item) print(fProduced: {item}) async def consumer(queue): while True: item await queue.get() print(fConsumed: {item}) queue.task_done() async def main(): queue asyncio.Queue() # 创建任务 producer_task asyncio.create_task(producer(queue)) consumer_task asyncio.create_task(consumer(queue)) # 等待生产者完成 await producer_task # 等待队列清空 await queue.join() # 取消消费者 consumer_task.cancel() asyncio.run(main())4.2 超时处理import asyncio async def slow_operation(): await asyncio.sleep(5) return Done async def main(): try: # 设置超时 result await asyncio.wait_for(slow_operation(), timeout2) print(result) except asyncio.TimeoutError: print(Operation timed out) asyncio.run(main())4.3 信号量控制并发import asyncio async def limited_task(semaphore, task_id): async with semaphore: print(fTask {task_id} started) await asyncio.sleep(1) print(fTask {task_id} completed) async def main(): # 限制并发数为3 semaphore asyncio.Semaphore(3) tasks [limited_task(semaphore, i) for i in range(10)] await asyncio.gather(*tasks) asyncio.run(main())五、异步IO操作5.1 异步文件操作import asyncio async def read_file_async(file_path): async with asyncio.open(file_path, r) as f: contents await f.read() return contents async def write_file_async(file_path, content): async with asyncio.open(file_path, w) as f: await f.write(content) async def main(): content await read_file_async(input.txt) await write_file_async(output.txt, content) asyncio.run(main())5.2 异步HTTP客户端import httpx async def fetch_data(url): async with httpx.AsyncClient() as client: response await client.get(url) return response.json() async def main(): urls [ https://api.github.com/users/octocat, https://api.github.com/repos/python/cpython ] tasks [fetch_data(url) for url in urls] results await asyncio.gather(*tasks) for result in results: print(result) asyncio.run(main())六、异步最佳实践6.1 避免阻塞调用import asyncio import time # 错误同步sleep阻塞事件循环 async def bad_example(): print(Start) time.sleep(1) # 阻塞 print(End) # 正确使用异步sleep async def good_example(): print(Start) await asyncio.sleep(1) # 非阻塞 print(End)6.2 使用asyncio.to_threadimport asyncio def blocking_io(): 阻塞IO操作 with open(large_file.txt, r) as f: return f.read() async def main(): # 将阻塞操作移到线程池 result await asyncio.to_thread(blocking_io) print(fRead {len(result)} characters) asyncio.run(main())6.3 异步上下文管理器import asyncio class AsyncResource: async def __aenter__(self): print(Acquiring resource) await asyncio.sleep(0.1) return self async def __aexit__(self, exc_type, exc, tb): print(Releasing resource) await asyncio.sleep(0.1) async def do_work(self): print(Doing work) async def main(): async with AsyncResource() as resource: await resource.do_work() asyncio.run(main())七、总结Python异步编程的核心概念协程轻量级的并发执行单元事件循环协调协程执行的核心任务协程的封装和管理并发模式生产者-消费者、信号量控制等在实际项目中建议使用asyncio进行异步编程避免在协程中调用阻塞函数使用httpx等异步库合理控制并发数量思考在你的项目中异步编程带来了哪些性能提升欢迎分享