边写代码零食不停口 盼盼麦香鸡味块 、卡乐比(Calbee)薯条三兄弟 独立小包、好时kisses多口味巧克力糖、老金磨方【黑金系列】黑芝麻丸
边写代码边贴面膜 事业美丽两不误 DR. YS 野森博士+【AOUFSE/澳芙雪特证】377专研美白淡斑面膜组合 优惠劵
别光顾写代码更要多喝茶水,提神有营养 六安瓜片茶叶茶香二级200g 2025年新茶雨前盒装自己喝
让AI成为我们的得力助手:《用Cursor玩转AI辅助编程——不写代码也能做软件开发》
Python 图书推荐
书名 | 出版社 | 推荐 |
---|---|---|
Python编程 从入门到实践 第3版(图灵出品) | 人民邮电出版社 | ★★★★★ |
Python数据科学手册(第2版)(图灵出品) | 人民邮电出版社 | ★★★★★ |
图形引擎开发入门:基于Python语言 | 电子工业出版社 | ★★★★★ |
科研论文配图绘制指南 基于Python(异步图书出品) | 人民邮电出版社 | ★★★★★ |
Effective Python:编写好Python的90个有效方法(第2版 英文版) | 人民邮电出版社 | ★★★★★ |
Python人工智能与机器学习(套装全5册) | 清华大学出版社 | ★★★★★ |
JAVA 图书推荐
书名 | 出版社 | 推荐 |
---|---|---|
Java核心技术 第12版:卷Ⅰ+卷Ⅱ | 机械工业出版社 | ★★★★★ |
Java核心技术 第11版 套装共2册 | 机械工业出版社 | ★★★★★ |
Java语言程序设计基础篇+进阶篇 原书第12版 套装共2册 | 机械工业出版社 | ★★★★★ |
Java 11官方参考手册(第11版) | 清华大学出版社 | ★★★★★ |
Offer来了:Java面试核心知识点精讲(第2版)(博文视点出品) | 电子工业出版社 | ★★★★★ |
什么是 aiohttp?
aiohttp 是一个基于 Python asyncio 的异步 HTTP 客户端/服务器框架,专为高性能网络编程设计。它提供了:
- 异步 HTTP 客户端(类似异步版 requests)
- 异步 HTTP 服务器(类似异步版 Flask/Django)
- 完整的 WebSocket 支持
- 高效的连接池管理
核心优势
特性 | 描述 |
---|---|
异步非阻塞 | 单线程处理数千并发连接 |
高性能 | 远超同步框架(如 requests)的吞吐量 |
轻量级 | 简洁的API,无复杂依赖 |
全面协议支持 | HTTP/1.1, HTTP/2(客户端), WebSocket |
生态完善 | 良好文档和活跃社区 |
基础用法 - HTTP客户端
安装
pip install aiohttp
基本GET请求
import aiohttp
import asyncioasync def main():async with aiohttp.ClientSession() as session:async with session.get('https://api.example.com/data') as response:print("状态码:", response.status)print("响应内容:", await response.text())asyncio.run(main())
POST请求示例
async def post_example():async with aiohttp.ClientSession() as session:# 表单数据async with session.post('https://httpbin.org/post', data={'key': 'value'}) as response:print(await response.json())# JSON数据async with session.post('https://api.example.com/users',json={'name': 'Alice', 'age': 30}) as response:print(await response.json())
高级用法
并发请求
async def fetch(url):async with aiohttp.ClientSession() as session:async with session.get(url) as response:return await response.text()async def concurrent_requests():urls = ['https://api.example.com/item/1','https://api.example.com/item/2','https://api.example.com/item/3']tasks = [fetch(url) for url in urls]results = await asyncio.gather(*tasks)for url, content in zip(urls, results):print(f"{url}: {content[:50]}...")asyncio.run(concurrent_requests())
超时控制
async def timeout_example():timeout = aiohttp.ClientTimeout(total=5) # 5秒总超时async with aiohttp.ClientSession(timeout=timeout) as session:try:async with session.get('https://slow-api.example.com') as response:return await response.text()except asyncio.TimeoutError:print("请求超时!")
流式处理大响应
async def stream_response():async with aiohttp.ClientSession() as session:async with session.get('https://large-file.example.com') as response:with open('large_file.txt', 'wb') as f:async for chunk in response.content.iter_chunked(1024):f.write(chunk)print(f"已接收 {len(chunk)} 字节")
服务器端开发
基本HTTP服务器
from aiohttp import webasync def handle(request):name = request.match_info.get('name', "World")return web.Response(text=f"Hello, {name}!")app = web.Application()
app.add_routes([web.get('/', handle),web.get('/{name}', handle)
])if __name__ == '__main__':web.run_app(app, port=8080)
REST API示例
async def get_users(request):users = [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]return web.json_response(users)async def create_user(request):data = await request.json()# 实际应用中这里会保存到数据库return web.json_response({'id': 3, **data}, status=201)app = web.Application()
app.add_routes([web.get('/api/users', get_users),web.post('/api/users', create_user)
])
WebSocket服务器
async def websocket_handler(request):ws = web.WebSocketResponse()await ws.prepare(request)async for msg in ws:if msg.type == aiohttp.WSMsgType.TEXT:if msg.data == 'close':await ws.close()else:await ws.send_str(f"ECHO: {msg.data}")elif msg.type == aiohttp.WSMsgType.ERROR:print('WebSocket连接异常关闭')return wsapp.add_routes([web.get('/ws', websocket_handler)])
进阶扩展
中间件示例
async def auth_middleware(app, handler):async def middleware(request):# 验证API密钥if request.headers.get('X-API-Key') != 'SECRET_KEY':return web.json_response({'error': 'Unauthorized'}, status=401)return await handler(request)return middlewareapp = web.Application(middlewares=[auth_middleware])
HTTP/2客户端支持
async def http2_request():conn = aiohttp.TCPConnector(force_close=True, enable_cleanup_closed=True)async with aiohttp.ClientSession(connector=conn) as session:async with session.get('https://http2.akamai.com/',headers={'accept': 'text/html'}) as response:print("HTTP版本:", response.version)print("内容:", await response.text()[:200])
性能优化配置
# 自定义连接器配置
connector = aiohttp.TCPConnector(limit=100, # 最大并发连接数limit_per_host=20, # 单主机最大连接数ssl=False, # 禁用SSL验证(仅用于测试)force_close=True # 避免连接延迟关闭
)# 自定义会话配置
session = aiohttp.ClientSession(connector=connector,timeout=aiohttp.ClientTimeout(total=30),headers={'User-Agent': 'MyApp/1.0'},cookie_jar=aiohttp.CookieJar(unsafe=True)
)
最佳实践
- 重用ClientSession:避免为每个请求创建新会话
- 使用连接池:合理配置TCPConnector参数
- 超时设置:总是配置合理的超时时间
- 资源清理:使用async with确保资源释放
- 错误处理:捕获并处理常见网络异常
try:async with session.get(url) as response:response.raise_for_status()return await response.json() except aiohttp.ClientError as e:print(f"请求错误: {e}")
完整示例
import aiohttp
import asyncio
from aiohttp import web# 客户端示例
async def fetch_data():async with aiohttp.ClientSession() as session:# 并发请求多个APIurls = ['https://jsonplaceholder.typicode.com/posts/1','https://jsonplaceholder.typicode.com/comments/1','https://jsonplaceholder.typicode.com/albums/1']tasks = []for url in urls:tasks.append(session.get(url))responses = await asyncio.gather(*tasks)results = []for response in responses:results.append(await response.json())return results# 服务器示例
async def handle_index(request):return web.Response(text="Welcome to aiohttp server!")async def handle_api(request):data = await fetch_data()return web.json_response(data)# 创建应用
app = web.Application()
app.add_routes([web.get('/', handle_index),web.get('/api', handle_api)
])# 启动服务器
async def start_server():runner = web.AppRunner(app)await runner.setup()site = web.TCPSite(runner, 'localhost', 8080)await site.start()print("Server running at http://localhost:8080")# 保持运行while True:await asyncio.sleep(3600) # 每小时唤醒一次if __name__ == '__main__':asyncio.run(start_server())
总结
aiohttp 是 Python 异步生态中处理 HTTP 通信的首选工具,它提供了:
- 高效客户端:用于高性能爬虫、API调用
- 轻量级服务器:构建高性能Web服务和API
- WebSocket支持:实现实时双向通信
- 连接池管理:优化资源利用率
通过合理利用 aiohttp 的异步特性,开发者可以轻松构建出能够处理数万并发连接的高性能网络应用,同时保持代码的简洁性和可维护性。