Python异步爬虫编程技巧:从入门到高级实战指南

Python异步爬虫编程技巧:从入门到高级实战指南 🚀

📚 目录

  1. 前言:为什么要学异步爬虫
  2. 异步编程基础概念
  3. 异步爬虫核心技术栈
  4. 入门实战:第一个异步爬虫
  5. 进阶技巧:并发控制与资源管理
  6. 高级实战:分布式异步爬虫架构
  7. 反爬虫对抗策略
  8. 性能优化与监控
  9. 常见问题与解决方案
  10. 最佳实践总结

前言:为什么要学异步爬虫? 🤔

在我近几年的爬虫开发经验中,见证了从最初的单线程顺序爬取,到多线程并发,再到如今的异步编程范式的演进。记得在做新闻数据采集的时候,传统的同步爬虫需要2小时才能完成10万个新闻的数据采集,而改用异步方案后,同样的任务仅需15分钟就能完成。

异步爬虫的核心优势在于:

  • 高并发能力:单进程可处理数千个并发请求
  • 资源利用率高:避免了线程切换开销
  • 响应速度快:非阻塞IO让程序更高效
  • 扩展性强:更容易构建大规模爬虫系统

异步编程基础概念 📖

什么是异步编程?

异步编程是一种编程范式,允许程序在等待某个操作完成时继续执行其他任务,而不是阻塞等待。在爬虫场景中,当我们发送HTTP请求时,不需要傻等服务器响应,而是可以继续发送其他请求。

核心概念解析

协程(Coroutine):可以暂停和恢复执行的函数,是异步编程的基本单元。

事件循环(Event Loop):负责调度和执行协程的核心机制。

awaitable对象:可以被await关键字等待的对象,包括协程、Task和Future。

异步爬虫核心技术栈 🛠️

在实际项目中,我通常会使用以下技术组合:

组件推荐库用途
HTTP客户端aiohttp异步HTTP请求
HTML解析BeautifulSoup4页面内容提取
并发控制asyncio.Semaphore限制并发数量
数据存储aiofiles异步文件操作
代理管理aiohttp-proxy代理池管理

入门实战:第一个异步爬虫 🎯

让我们从一个简单但实用的例子开始,爬取多个网页的标题:

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import timeclass AsyncSpider:def __init__(self, max_concurrent=10):"""初始化异步爬虫:param max_concurrent: 最大并发数"""self.max_concurrent = max_concurrentself.session = Noneself.semaphore = asyncio.Semaphore(max_concurrent)async def __aenter__(self):"""异步上下文管理器入口"""# 创建aiohttp会话,设置连接池和超时参数connector = aiohttp.TCPConnector(limit=100,  # 总连接池大小limit_per_host=20,  # 单个host的连接数限制ttl_dns_cache=300,  # DNS缓存时间)timeout = aiohttp.ClientTimeout(total=30, connect=10)self.session = aiohttp.ClientSession(connector=connector,timeout=timeout,headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'})return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):"""异步上下文管理器退出"""if self.session:await self.session.close()async def fetch_page(self, url):"""获取单个页面内容:param url: 目标URL:return: 页面标题和URL的元组"""async with self.semaphore:  # 控制并发数try:async with self.session.get(url) as response:if response.status == 200:html = await response.text()soup = BeautifulSoup(html, 'html.parser')title = soup.find('title')title_text = title.get_text().strip() if title else "无标题"print(f"✅ 成功获取: {url} - {title_text}")return url, title_textelse:print(f"❌ HTTP错误 {response.status}: {url}")return url, Noneexcept asyncio.TimeoutError:print(f"⏰ 超时: {url}")return url, Noneexcept Exception as e:print(f"🚫 异常: {url} - {str(e)}")return url, Noneasync def crawl_urls(self, urls):"""批量爬取URL列表:param urls: URL列表:return: 结果列表"""print(f"🚀 开始爬取 {len(urls)} 个URL,最大并发数: {self.max_concurrent}")start_time = time.time()# 创建所有任务tasks = [self.fetch_page(url) for url in urls]# 并发执行所有任务results = await asyncio.gather(*tasks, return_exceptions=True)end_time = time.time()print(f"🎉 爬取完成,耗时: {end_time - start_time:.2f}秒")# 过滤掉异常结果valid_results = [r for r in results if isinstance(r, tuple) and r[1] is not None]print(f"📊 成功率: {len(valid_results)}/{len(urls)} ({len(valid_results)/len(urls)*100:.1f}%)")return valid_results# 使用示例
async def main():# 测试URL列表urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/2', 'https://httpbin.org/delay/1','https://httpbin.org/status/200','https://httpbin.org/status/404','https://httpbin.org/json',]# 使用异步上下文管理器确保资源正确释放async with AsyncSpider(max_concurrent=5) as spider:results = await spider.crawl_urls(urls)# 输出结果print("\n📋 爬取结果:")for url, title in results:print(f"  {url} -> {title}")if __name__ == "__main__":asyncio.run(main())

这个基础版本展示了异步爬虫的核心要素:

  • 使用aiohttp进行异步HTTP请求
  • 通过Semaphore控制并发数量
  • 使用异步上下文管理器管理资源
  • 异常处理和超时控制

进阶技巧:并发控制与资源管理 ⚡

在实际项目中,合理的并发控制和资源管理至关重要。以下是一个更高级的实现:

import asyncio
import aiohttp
import aiofiles
import json
import random
from dataclasses import dataclass, asdict
from typing import List, Optional, Dict, Any
from urllib.parse import urljoin, urlparse
import logging# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)@dataclass
class CrawlResult:"""爬取结果数据类"""url: strstatus_code: inttitle: Optional[str] = Nonecontent_length: Optional[int] = Noneresponse_time: Optional[float] = Noneerror: Optional[str] = Noneclass AdvancedAsyncSpider:def __init__(self, config: Dict[str, Any]):"""高级异步爬虫初始化:param config: 配置字典,包含各种爬虫参数"""self.max_concurrent = config.get('max_concurrent', 10)self.retry_times = config.get('retry_times', 3)self.retry_delay = config.get('retry_delay', 1)self.request_delay = config.get('request_delay', 0)self.timeout = config.get('timeout', 30)self.session = Noneself.semaphore = asyncio.Semaphore(self.max_concurrent)self.results: List[CrawlResult] = []self.failed_urls: List[str] = []# 用户代理池self.user_agents = ['Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36','Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36','Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',]async def __aenter__(self):"""创建会话"""connector = aiohttp.TCPConnector(limit=200,limit_per_host=50,ttl_dns_cache=300,use_dns_cache=True,keepalive_timeout=60,enable_cleanup_closed=True)timeout = aiohttp.ClientTimeout(total=self.timeout)self.session = aiohttp.ClientSession(connector=connector,timeout=timeout,trust_env=True  # 支持代理环境变量)return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):"""关闭会话"""if self.session:await self.session.close()# 等待连接完全关闭await asyncio.sleep(0.1)def get_random_headers(self) -> Dict[str, str]:"""生成随机请求头"""return {'User-Agent': random.choice(self.user_agents),'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8','Accept-Language': 'en-US,en;q=0.5','Accept-Encoding': 'gzip, deflate','Connection': 'keep-alive','Upgrade-Insecure-Requests': '1',}async def fetch_with_retry(self, url: str) -> CrawlResult:"""带重试机制的页面获取:param url: 目标URL:return: 爬取结果对象"""async with self.semaphore:for attempt in range(self.retry_times + 1):start_time = asyncio.get_event_loop().time()try:# 添加随机延迟,避免被反爬虫检测if self.request_delay > 0:await asyncio.sleep(random.uniform(0, self.request_delay))headers = self.get_random_headers()async with self.session.get(url, headers=headers) as response:response_time = asyncio.get_event_loop().time() - start_timecontent = await response.text()# 解析标题title = Noneif 'text/html' in response.headers.get('content-type', ''):from bs4 import BeautifulSoupsoup = BeautifulSoup(content, 'html.parser')title_tag = soup.find('title')if title_tag:title = title_tag.get_text().strip()result = CrawlResult(url=url,status_code=response.status,title=title,content_length=len(content),response_time=response_time)logger.info(f"✅ 成功: {url} (状态码: {response.status}, 耗时: {response_time:.2f}s)")return resultexcept asyncio.TimeoutError:error_msg = f"超时 (尝试 {attempt + 1}/{self.retry_times + 1})"logger.warning(f"⏰ {url} - {error_msg}")except aiohttp.ClientError as e:error_msg = f"客户端错误: {str(e)} (尝试 {attempt + 1}/{self.retry_times + 1})"logger.warning(f"🚫 {url} - {error_msg}")except Exception as e:error_msg = f"未知错误: {str(e)} (尝试 {attempt + 1}/{self.retry_times + 1})"logger.error(f"💥 {url} - {error_msg}")# 如果不是最后一次尝试,等待后重试if attempt < self.retry_times:await asyncio.sleep(self.retry_delay * (2 ** attempt))  # 指数退避# 所有重试都失败了result = CrawlResult(url=url,status_code=0,error=f"重试 {self.retry_times} 次后仍然失败")self.failed_urls.append(url)logger.error(f"❌ 最终失败: {url}")return resultasync def crawl_batch(self, urls: List[str], callback=None) -> List[CrawlResult]:"""批量爬取URL:param urls: URL列表:param callback: 可选的回调函数,处理每个结果:return: 爬取结果列表"""logger.info(f"🚀 开始批量爬取 {len(urls)} 个URL")start_time = asyncio.get_event_loop().time()# 创建所有任务tasks = [self.fetch_with_retry(url) for url in urls]# 使用as_completed获取完成的任务,可以实时处理结果results = []completed = 0for coro in asyncio.as_completed(tasks):result = await cororesults.append(result)completed += 1# 调用回调函数if callback:await callback(result, completed, len(urls))# 显示进度if completed % 10 == 0 or completed == len(urls):logger.info(f"📊 进度: {completed}/{len(urls)} ({completed/len(urls)*100:.1f}%)")total_time = asyncio.get_event_loop().time() - start_timesuccess_count = len([r for r in results if r.status_code > 0])logger.info(f"🎉 批量爬取完成")logger.info(f"📈 总耗时: {total_time:.2f}秒")logger.info(f"📊 成功率: {success_count}/{len(urls)} ({success_count/len(urls)*100:.1f}%)")self.results.extend(results)return resultsasync def save_results(self, filename: str):"""异步保存结果到JSON文件"""data = {'total_urls': len(self.results),'successful': len([r for r in self.results if r.status_code > 0]),'failed': len(self.failed_urls),'results': [asdict(result) for result in self.results],'failed_urls': self.failed_urls}async with aiofiles.open(filename, 'w', encoding='utf-8') as f:await f.write(json.dumps(data, ensure_ascii=False, indent=2))logger.info(f"💾 结果已保存到: {filename}")# 使用示例
async def progress_callback(result: CrawlResult, completed: int, total: int):"""进度回调函数"""if result.status_code > 0:print(f"✅ [{completed}/{total}] {result.url} - {result.title}")else:print(f"❌ [{completed}/{total}] {result.url} - {result.error}")async def advanced_demo():"""高级爬虫演示"""config = {'max_concurrent': 20,  # 并发数'retry_times': 2,      # 重试次数'retry_delay': 1,      # 重试延迟'request_delay': 0.5,  # 请求间隔'timeout': 15,         # 超时时间}# 测试URL列表urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/2','https://httpbin.org/status/200','https://httpbin.org/status/404','https://httpbin.org/json','https://httpbin.org/xml','https://httpbin.org/html','https://httpbin.org/robots.txt',] * 3  # 重复3次,模拟更多URLasync with AdvancedAsyncSpider(config) as spider:# 批量爬取results = await spider.crawl_batch(urls, callback=progress_callback)# 保存结果await spider.save_results('crawl_results.json')# 统计信息successful = [r for r in results if r.status_code > 0]avg_response_time = sum(r.response_time or 0 for r in successful) / len(successful)print(f"\n📊 统计信息:")print(f"  成功: {len(successful)}")print(f"  失败: {len(spider.failed_urls)}")print(f"  平均响应时间: {avg_response_time:.2f}秒")if __name__ == "__main__":asyncio.run(advanced_demo())

这个进阶版本包含了:

  • 完整的重试机制与指数退避
  • 随机请求头和延迟,避免反爬虫检测
  • 实时进度回调和详细日志
  • 结构化的结果存储
  • 更好的资源管理和异常处理

高级实战:分布式异步爬虫架构 🏗️

对于大规模爬虫项目,我们需要考虑分布式架构。以下是一个基于Redis的分布式爬虫实现:

import asyncio
import aiohttp
import aioredis
import json
import hashlib
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from urllib.parse import urljoin, urlparse
import logginglogger = logging.getLogger(__name__)@dataclass
class Task:"""爬取任务数据结构"""url: strpriority: int = 0retry_count: int = 0max_retries: int = 3metadata: Dict[str, Any] = Nonedef __post_init__(self):if self.metadata is None:self.metadata = {}class DistributedSpider:"""分布式异步爬虫"""def __init__(self, worker_id: str, redis_config: Dict[str, Any], spider_config: Dict[str, Any]):self.worker_id = worker_idself.redis_config = redis_configself.spider_config = spider_config# Redis连接self.redis = None# 队列名称self.task_queue = "spider:tasks"self.result_queue = "spider:results"self.failed_queue = "spider:failed"self.duplicate_set = "spider:duplicates"# HTTP会话self.session = Noneself.semaphore = asyncio.Semaphore(spider_config.get('max_concurrent', 10))# 统计信息self.stats = {'processed': 0,'success': 0,'failed': 0,'start_time': time.time()}async def __aenter__(self):"""初始化连接"""# 连接Redisself.redis = aioredis.from_url(f"redis://{self.redis_config['host']}:{self.redis_config['port']}",password=self.redis_config.get('password'),db=self.redis_config.get('db', 0),encoding='utf-8',decode_responses=True)# 创建HTTP会话connector = aiohttp.TCPConnector(limit=100, limit_per_host=20)timeout = aiohttp.ClientTimeout(total=30)self.session = aiohttp.ClientSession(connector=connector, timeout=timeout)logger.info(f"🚀 Worker {self.worker_id} 已启动")return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):"""清理资源"""if self.session:await self.session.close()if self.redis:await self.redis.close()logger.info(f"🛑 Worker {self.worker_id} 已停止")def generate_task_id(self, url: str) -> str:"""生成任务唯一ID"""return hashlib.md5(url.encode()).hexdigest()async def add_task(self, task: Task) -> bool:"""添加任务到队列"""task_id = self.generate_task_id(task.url)# 检查是否重复is_duplicate = await self.redis.sismember(self.duplicate_set, task_id)if is_duplicate:logger.debug(f"⚠️ 重复任务: {task.url}")return False# 添加到去重集合await self.redis.sadd(self.duplicate_set, task_id)# 添加到任务队列(使用优先级队列)task_data = json.dumps(asdict(task))await self.redis.zadd(self.task_queue, {task_data: task.priority})logger.info(f"➕ 任务已添加: {task.url} (优先级: {task.priority})")return Trueasync def get_task(self) -> Optional[Task]:"""从队列获取任务"""# 使用BZPOPMAX获取最高优先级任务(阻塞式)result = await self.redis.bzpopmax(self.task_queue, timeout=5)if not result:return Nonetask_data = json.loads(result[1])return Task(**task_data)async def process_task(self, task: Task) -> Dict[str, Any]:"""处理单个任务"""async with self.semaphore:start_time = time.time()try:# 发送HTTP请求headers = {'User-Agent': 'DistributedSpider/1.0','Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'}async with self.session.get(task.url, headers=headers) as response:content = await response.text()response_time = time.time() - start_timeresult = {'task_id': self.generate_task_id(task.url),'url': task.url,'status_code': response.status,'content_length': len(content),'response_time': response_time,'worker_id': self.worker_id,'timestamp': time.time(),'content': content[:1000],  # 只保存前1000字符'metadata': task.metadata}# 这里可以添加内容解析逻辑if response.status == 200:result['success'] = True# 解析页面,提取新的URL(示例)new_urls = await self.extract_urls(content, task.url)result['extracted_urls'] = new_urlselse:result['success'] = Falseresult['error'] = f"HTTP {response.status}"return resultexcept Exception as e:response_time = time.time() - start_timereturn {'task_id': self.generate_task_id(task.url),'url': task.url,'status_code': 0,'response_time': response_time,'worker_id': self.worker_id,'timestamp': time.time(),'success': False,'error': str(e),'metadata': task.metadata}async def extract_urls(self, content: str, base_url: str) -> List[str]:"""从页面内容中提取URL(示例实现)"""try:from bs4 import BeautifulSoupsoup = BeautifulSoup(content, 'html.parser')urls = []for link in soup.find_all('a', href=True):url = urljoin(base_url, link['href'])# 简单过滤if url.startswith('http') and len(urls) < 10:urls.append(url)return urlsexcept Exception:return []async def save_result(self, result: Dict[str, Any]):"""保存处理结果"""if result['success']:# 保存成功结果await self.redis.lpush(self.result_queue, json.dumps(result))self.stats['success'] += 1# 如果有提取的URL,添加为新任务if 'extracted_urls' in result:for url in result['extracted_urls']:new_task = Task(url=url, priority=0, metadata={'parent_url': result['url']})await self.add_task(new_task)else:# 处理失败的任务task_id = result['task_id']# 重试逻辑original_task = Task(url=result['url'],retry_count=result.get('retry_count', 0) + 1,metadata=result.get('metadata', {}))if original_task.retry_count <= original_task.max_retries:# 重新加入队列,降低优先级original_task.priority = -original_task.retry_countawait self.add_task(original_task)logger.info(f"🔄 重试任务: {original_task.url} (第{original_task.retry_count}次)")else:# 超过重试次数,保存到失败队列await self.redis.lpush(self.failed_queue, json.dumps(result))logger.error(f"💀 任务最终失败: {original_task.url}")self.stats['failed'] += 1async def run_worker(self, max_tasks: Optional[int] = None):"""运行工作进程"""logger.info(f"🏃 Worker {self.worker_id} 开始工作")processed = 0while True:if max_tasks and processed >= max_tasks:logger.info(f"🎯 达到最大任务数量: {max_tasks}")break# 获取任务task = await self.get_task()if not task:logger.debug("⏳ 暂无任务,等待中...")continue# 处理任务logger.info(f"🔧 处理任务: {task.url}")result = await self.process_task(task)# 保存结果await self.save_result(result)# 更新统计processed += 1self.stats['processed'] = processed# 定期输出统计信息if processed % 10 == 0:await self.print_stats()async def print_stats(self):"""打印统计信息"""elapsed = time.time() - self.stats['start_time']rate = self.stats['processed'] / elapsed if elapsed > 0 else 0logger.info(f"📊 Worker {self.worker_id} 统计:")logger.info(f"  已处理: {self.stats['processed']}")logger.info(f"  成功: {self.stats['success']}")logger.info(f"  失败: {self.stats['failed']}")logger.info(f"  速率: {rate:.2f} tasks/sec")logger.info(f"  运行时间: {elapsed:.2f}秒")# 使用示例
async def distributed_demo():"""分布式爬虫演示"""redis_config = {'host': 'localhost','port': 6379,'password': None,'db': 0}spider_config = {'max_concurrent': 5,'request_delay': 1,}worker_id = f"worker-{int(time.time())}"async with DistributedSpider(worker_id, redis_config, spider_config) as spider:# 添加一些初始任务initial_urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/2','https://httpbin.org/json','https://httpbin.org/html',]for url in initial_urls:task = Task(url=url, priority=10)  # 高优先级await spider.add_task(task)# 运行工作进程await spider.run_worker(max_tasks=20)if __name__ == "__main__":logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')asyncio.run(distributed_demo())

这个分布式版本实现了:

  • 基于Redis的任务队列和结果存储
  • 优先级任务调度
  • 自动去重机制
  • 失败重试和降级处理
  • 多工作进程协作
  • 实时统计和监控

反爬虫对抗策略 🛡️

在实际爬虫开发中,反爬虫对抗是必须面对的挑战。以下是一些常用的策略:

import asyncio
import aiohttp
import random
import time
from typing import List, Dict, Optional
from urllib.parse import urlparse
import jsonclass AntiAntiSpider:"""反反爬虫工具类"""def __init__(self):# 用户代理池self.user_agents = ['Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36','Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36','Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36','Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0','Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',]# 代理池self.proxy_pool = [# 'http://proxy1:port',# 'http://proxy2:port',# 可以从代理服务商API动态获取]# 请求频率控制self.domain_delays = {}  # 每个域名的延迟配置self.last_request_times = {}  # 每个域名的最后请求时间def get_random_user_agent(self) -> str:"""获取随机User-Agent"""return random.choice(self.user_agents)def get_proxy(self) -> Optional[str]:"""获取代理(如果有的话)"""if self.proxy_pool:return random.choice(self.proxy_pool)return Noneasync def respect_robots_txt(self, session: aiohttp.ClientSession, url: str) -> bool:"""检查robots.txt(可选实现)"""try:parsed_url = urlparse(url)robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"async with session.get(robots_url) as response:if response.status == 200:robots_content = await response.text()# 这里可以实现robots.txt解析逻辑# 简化版本:检查是否包含Disallowreturn 'Disallow: /' not in robots_contentexcept:passreturn True  # 默认允许async def rate_limit(self, url: str):"""频率限制"""domain = urlparse(url).netloc# 获取该域名的延迟配置(默认1-3秒)if domain not in self.domain_delays:self.domain_delays[domain] = random.uniform(1, 3)# 检查上次请求时间if domain in self.last_request_times:elapsed = time.time() - self.last_request_times[domain]required_delay = self.domain_delays[domain]if elapsed < required_delay:sleep_time = required_delay - elapsedawait asyncio.sleep(sleep_time)# 更新最后请求时间self.last_request_times[domain] = time.time()def get_headers(self, url: str, referer: Optional[str] = None) -> Dict[str, str]:"""生成请求头"""headers = {'User-Agent': self.get_random_user_agent(),'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8','Accept-Language': 'en-US,en;q=0.5','Accept-Encoding': 'gzip, deflate','Connection': 'keep-alive','Upgrade-Insecure-Requests': '1','Sec-Fetch-Dest': 'document','Sec-Fetch-Mode': 'navigate','Sec-Fetch-Site': 'none','Cache-Control': 'max-age=0',}# 添加Referer(如果提供)if referer:headers['Referer'] = referer# 根据域名添加特定头部domain = urlparse(url).netlocif 'github' in domain:headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'return headersclass StealthSpider:"""隐蔽爬虫实现"""def __init__(self, max_concurrent: int = 3):self.max_concurrent = max_concurrentself.session = Noneself.semaphore = asyncio.Semaphore(max_concurrent)self.anti_anti = AntiAntiSpider()# 会话状态管理self.cookies = {}self.session_data = {}async def __aenter__(self):# 创建更真实的连接器配置connector = aiohttp.TCPConnector(limit=50,limit_per_host=10,ttl_dns_cache=300,use_dns_cache=True,keepalive_timeout=30,enable_cleanup_closed=True,# 模拟真实浏览器的连接行为family=0,  # 支持IPv4和IPv6)# 设置合理的超时timeout = aiohttp.ClientTimeout(total=30,connect=10,sock_read=20)# 创建会话,支持自动重定向和cookieself.session = aiohttp.ClientSession(connector=connector,timeout=timeout,cookie_jar=aiohttp.CookieJar(),# 支持自动解压auto_decompress=True,# 信任环境变量中的代理设置trust_env=True)return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):if self.session:await self.session.close()await asyncio.sleep(0.1)  # 确保连接完全关闭async def fetch_with_stealth(self, url: str, referer: Optional[str] = None) -> Dict:"""隐蔽模式获取页面"""async with self.semaphore:# 频率限制await self.anti_anti.rate_limit(url)# 生成请求头headers = self.anti_anti.get_headers(url, referer)# 获取代理proxy = self.anti_anti.get_proxy()try:# 发送请求async with self.session.get(url, headers=headers, proxy=proxy,allow_redirects=True,max_redirects=5) as response:content = await response.text()return {'url': str(response.url),'status': response.status,'headers': dict(response.headers),'content': content,'cookies': {cookie.key: cookie.value for cookie in response.cookies.values()},'final_url': str(response.url),'redirects': len(response.history),'success': True}except Exception as e:return {'url': url,'status': 0,'error': str(e),'success': False}async def crawl_with_session_management(self, urls: List[str]) -> List[Dict]:"""带会话管理的爬取"""results = []for i, url in enumerate(urls):# 使用前一个URL作为referer(模拟用户行为)referer = urls[i-1] if i > 0 else Noneresult = await self.fetch_with_stealth(url, referer)results.append(result)# 模拟用户阅读时间if result['success']:reading_time = random.uniform(2, 8)print(f"📖 模拟阅读时间: {reading_time:.1f}秒")await asyncio.sleep(reading_time)return results# 验证码处理示例(需要OCR服务)
class CaptchaHandler:"""验证码处理器"""async def solve_captcha(self, captcha_image_url: str, session: aiohttp.ClientSession) -> Optional[str]:"""解决验证码(示例实现)实际项目中可能需要:1. 第三方OCR服务2. 机器学习模型3. 人工打码平台"""try:# 下载验证码图片async with session.get(captcha_image_url) as response:if response.status == 200:image_data = await response.read()# 这里应该调用OCR服务# 示例:使用第三方服务# result = await self.call_ocr_service(image_data)# return result# 临时返回None,表示无法处理return Noneexcept Exception as e:print(f"验证码处理失败: {e}")return Noneasync def handle_captcha_page(self, session: aiohttp.ClientSession, response_text: str, current_url: str):"""处理包含验证码的页面"""# 检测是否包含验证码if 'captcha' in response_text.lower() or '验证码' in response_text:print("🤖 检测到验证码页面")# 提取验证码图片URL(需要根据具体网站实现)# 这里只是示例from bs4 import BeautifulSoupsoup = BeautifulSoup(response_text, 'html.parser')captcha_img = soup.find('img', {'id': 'captcha'})if captcha_img:captcha_url = captcha_img.get('src')if captcha_url:# 解决验证码captcha_result = await self.solve_captcha(captcha_url, session)if captcha_result:print(f"✅ 验证码识别结果: {captcha_result}")return captcha_resultprint("❌ 验证码处理失败")return None# 使用示例
async def stealth_demo():"""隐蔽爬虫演示"""urls = ['https://httpbin.org/user-agent','https://httpbin.org/headers','https://httpbin.org/cookies','https://httpbin.org/redirect/2',]async with StealthSpider(max_concurrent=2) as spider:results = await spider.crawl_with_session_management(urls)for result in results:if result['success']:print(f"✅ {result['url']} (状态: {result['status']})")if result['redirects'] > 0:print(f"   重定向次数: {result['redirects']}")else:print(f"❌ {result['url']} - {result['error']}")if __name__ == "__main__":asyncio.run(stealth_demo())

性能优化与监控 📊

性能优化是异步爬虫的关键环节,以下是一个完整的监控和优化方案:

import asyncio
import aiohttp
import time
import psutil
import gc
from dataclasses import dataclass
from typing import Dict, List, Optional
import logging
from collections import deque, defaultdict
import json@dataclass
class PerformanceMetrics:"""性能指标数据类"""timestamp: floatrequests_per_second: floataverage_response_time: floatmemory_usage_mb: floatcpu_usage_percent: floatactive_connections: intqueue_size: intsuccess_rate: floatclass PerformanceMonitor:"""性能监控器"""def __init__(self, window_size: int = 60):self.window_size = window_size  # 监控窗口大小(秒)self.metrics_history = deque(maxlen=window_size)self.request_times = deque()self.response_times = deque()self.success_count = 0self.total_count = 0self.start_time = time.time()# 连接池监控self.active_connections = 0self.queue_size = 0def record_request(self, response_time: float, success: bool):"""记录请求指标"""current_time = time.time()self.request_times.append(current_time)self.response_times.append(response_time)if success:self.success_count += 1self.total_count += 1# 清理过期数据cutoff_time = current_time - self.window_sizewhile self.request_times and self.request_times[0] < cutoff_time:self.request_times.popleft()self.response_times.popleft()def get_current_metrics(self) -> PerformanceMetrics:"""获取当前性能指标"""current_time = time.time()# 计算RPSrps = len(self.request_times) / self.window_size if self.request_times else 0# 计算平均响应时间avg_response_time = sum(self.response_times) / len(self.response_times) if self.response_times else 0# 系统资源使用memory_usage = psutil.Process().memory_info().rss / 1024 / 1024  # MBcpu_usage = psutil.Process().cpu_percent()# 成功率success_rate = self.success_count / self.total_count if self.total_count > 0 else 0metrics = PerformanceMetrics(timestamp=current_time,requests_per_second=rps,average_response_time=avg_response_time,memory_usage_mb=memory_usage,cpu_usage_percent=cpu_usage,active_connections=self.active_connections,queue_size=self.queue_size,success_rate=success_rate)self.metrics_history.append(metrics)return metricsdef print_stats(self):"""打印统计信息"""metrics = self.get_current_metrics()uptime = time.time() - self.start_timeprint(f"\n📊 性能监控报告 (运行时间: {uptime:.1f}s)")print(f"  RPS: {metrics.requests_per_second:.2f}")print(f"  平均响应时间: {metrics.average_response_time:.2f}s")print(f"  内存使用: {metrics.memory_usage_mb:.1f}MB")print(f"  CPU使用: {metrics.cpu_usage_percent:.1f}%")print(f"  活跃连接: {metrics.active_connections}")print(f"  队列大小: {metrics.queue_size}")print(f"  成功率: {metrics.success_rate:.1%}")print(f"  总请求数: {self.total_count}")class OptimizedAsyncSpider:"""优化版异步爬虫"""def __init__(self, config: Dict):self.config = configself.session = Noneself.semaphore = Noneself.monitor = PerformanceMonitor()# 连接池优化配置self.connector_config = {'limit': config.get('max_connections', 100),'limit_per_host': config.get('max_connections_per_host', 30),'ttl_dns_cache': config.get('dns_cache_ttl', 300),'use_dns_cache': True,'keepalive_timeout': config.get('keepalive_timeout', 60),'enable_cleanup_closed': True,# 优化TCP socket选项'socket_options': [(1, 6, 1),  # TCP_NODELAY] if hasattr(1, '__index__') else None}# 自适应并发控制self.adaptive_concurrency = config.get('adaptive_concurrency', True)self.min_concurrent = config.get('min_concurrent', 5)self.max_concurrent = config.get('max_concurrent', 50)self.current_concurrent = self.min_concurrent# 请求池self.request_pool = asyncio.Queue(maxsize=config.get('request_queue_size', 1000))# 响应时间统计(用于自适应调整)self.response_time_window = deque(maxlen=100)async def __aenter__(self):# 创建优化的连接器connector = aiohttp.TCPConnector(**self.connector_config)# 设置超时timeout = aiohttp.ClientTimeout(total=self.config.get('total_timeout', 30),connect=self.config.get('connect_timeout', 10),sock_read=self.config.get('read_timeout', 20))# 创建会话self.session = aiohttp.ClientSession(connector=connector,timeout=timeout,# 启用压缩auto_decompress=True,# 设置最大响应大小read_bufsize=self.config.get('read_bufsize', 64 * 1024),)# 初始化信号量self.semaphore = asyncio.Semaphore(self.current_concurrent)return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):if self.session:await self.session.close()# 强制垃圾回收gc.collect()async def adjust_concurrency(self):"""自适应并发调整"""if not self.adaptive_concurrency or len(self.response_time_window) < 10:return# 计算最近的平均响应时间recent_avg = sum(list(self.response_time_window)[-10:]) / 10overall_avg = sum(self.response_time_window) / len(self.response_time_window)# 如果最近响应时间明显增加,降低并发if recent_avg > overall_avg * 1.5 and self.current_concurrent > self.min_concurrent:self.current_concurrent = max(self.min_concurrent, self.current_concurrent - 2)print(f"📉 降低并发数至: {self.current_concurrent}")# 如果响应时间稳定且较快,增加并发elif recent_avg < overall_avg * 0.8 and self.current_concurrent < self.max_concurrent:self.current_concurrent = min(self.max_concurrent, self.current_concurrent + 1)print(f"📈 提高并发数至: {self.current_concurrent}")# 更新信号量self.semaphore = asyncio.Semaphore(self.current_concurrent)async def fetch_optimized(self, url: str) -> Dict:"""优化的请求方法"""async with self.semaphore:start_time = time.time()try:# 更新连接数self.monitor.active_connections += 1async with self.session.get(url) as response:content = await response.text()response_time = time.time() - start_time# 记录响应时间self.response_time_window.append(response_time)self.monitor.record_request(response_time, response.status == 200)return {'url': url,'status': response.status,'content': content,'response_time': response_time,'content_length': len(content),'success': response.status == 200}except Exception as e:response_time = time.time() - start_timeself.monitor.record_request(response_time, False)return {'url': url,'status': 0,'error': str(e),'response_time': response_time,'success': False}finally:self.monitor.active_connections -= 1async def batch_crawl_optimized(self, urls: List[str]) -> List[Dict]:"""优化的批量爬取"""print(f"🚀 开始优化爬取 {len(urls)} 个URL")# 分批处理,避免内存过载batch_size = self.config.get('batch_size', 100)all_results = []for i in range(0, len(urls), batch_size):batch_urls = urls[i:i + batch_size]print(f"📦 处理批次 {i//batch_size + 1}/{(len(urls)-1)//batch_size + 1}")# 创建任务tasks = [self.fetch_optimized(url) for url in batch_urls]# 执行任务batch_results = await asyncio.gather(*tasks, return_exceptions=True)# 过滤异常valid_results = [r for r in batch_results if isinstance(r, dict)]all_results.extend(valid_results)# 自适应调整并发await self.adjust_concurrency()# 打印监控信息if i % (batch_size * 2) == 0:  # 每2个批次打印一次self.monitor.print_stats()# 批次间短暂休息,避免过载if i + batch_size < len(urls):await asyncio.sleep(0.1)return all_resultsasync def memory_cleanup(self):"""内存清理"""# 手动触发垃圾回收gc.collect()# 清理过期的监控数据current_time = time.time()cutoff_time = current_time - 300  # 保留5分钟的数据while (self.monitor.metrics_history and self.monitor.metrics_history[0].timestamp < cutoff_time):self.monitor.metrics_history.popleft()# 使用示例
async def performance_demo():"""性能优化演示"""config = {'max_connections': 50,'max_connections_per_host': 15,'max_concurrent': 20,'min_concurrent': 5,'adaptive_concurrency': True,'batch_size': 50,'total_timeout': 15,'connect_timeout': 5,}# 生成大量测试URLtest_urls = []for i in range(200):delay = i % 5 + 1  # 1-5秒延迟test_urls.append(f'https://httpbin.org/delay/{delay}')async with OptimizedAsyncSpider(config) as spider:# 启动监控任务monitor_task = asyncio.create_task(periodic_monitoring(spider.monitor))try:# 执行爬取results = await spider.batch_crawl_optimized(test_urls)# 最终统计print(f"\n🎉 爬取完成!")print(f"📊 总URL数: {len(test_urls)}")print(f"📊 成功数: {len([r for r in results if r.get('success')])}")print(f"📊 失败数: {len([r for r in results if not r.get('success')])}")spider.monitor.print_stats()finally:monitor_task.cancel()async def periodic_monitoring(monitor: PerformanceMonitor):"""定期监控任务"""while True:await asyncio.sleep(10)  # 每10秒监控一次try:metrics = monitor.get_current_metrics()# 检查异常情况if metrics.memory_usage_mb > 500:  # 内存超过500MBprint("⚠️ 内存使用过高!")if metrics.requests_per_second < 1 and monitor.total_count > 0:print("⚠️ RPS过低,可能存在瓶颈!")if metrics.success_rate < 0.8 and monitor.total_count > 10:print("⚠️ 成功率过低!")except Exception as e:print(f"监控异常: {e}")if __name__ == "__main__":asyncio.run(performance_demo())

常见问题与解决方案 ❓

1. 内存泄露问题

问题:长时间运行后内存持续增长
解决方案

# 正确的资源管理
async with aiohttp.ClientSession() as session:# 使用完自动关闭pass# 定期垃圾回收
import gc
gc.collect()# 限制并发数量
semaphore = asyncio.Semaphore(10)

2. 连接池耗尽

问题:大量并发请求导致连接池耗尽
解决方案

connector = aiohttp.TCPConnector(limit=100,  # 增加连接池大小limit_per_host=20,  # 单host连接数限制ttl_dns_cache=300,  # DNS缓存enable_cleanup_closed=True  # 自动清理关闭的连接
)

3. 超时处理不当

问题:请求超时导致任务堆积
解决方案

timeout = aiohttp.ClientTimeout(total=30,      # 总超时connect=10,    # 连接超时sock_read=20   # 读取超时
)

4. 异常传播

问题:单个任务异常影响整体爬取
解决方案

# 使用return_exceptions=True
results = await asyncio.gather(*tasks, return_exceptions=True)# 过滤异常结果
valid_results = [r for r in results if not isinstance(r, Exception)]

最佳实践总结 🏆

经过近几年的异步爬虫开发经验,我总结出以下最佳实践:

1. 架构设计原则

  • 单一职责:每个组件只负责特定功能
  • 可扩展性:支持水平扩展和配置调整
  • 容错性:优雅处理各种异常情况
  • 监控性:完整的性能监控和日志记录

2. 性能优化要点

  • 合理的并发数:根据目标网站和网络环境调整
  • 连接复用:使用连接池减少握手开销
  • 内存管理:及时清理资源,避免内存泄露
  • 批处理:分批处理大量URL,避免过载

3. 反爬虫对抗策略

  • 请求头伪装:模拟真实浏览器行为
  • 频率控制:合理的请求间隔
  • IP轮换:使用代理池分散请求
  • 会话管理:维护登录状态和cookie

4. 错误处理机制

  • 重试策略:指数退避重试
  • 降级处理:失败任务的备用方案
  • 监控告警:及时发现和处理问题
  • 日志记录:详细的操作日志

5. 数据质量保证

  • 去重机制:避免重复抓取
  • 数据验证:确保抓取数据的完整性
  • 增量更新:只抓取变化的数据
  • 备份恢复:重要数据的备份策略

通过遵循这些最佳实践,你可以构建出高效、稳定、可维护的异步爬虫系统。记住,爬虫开发不仅仅是技术问题,还需要考虑法律合规、网站负载、数据质量等多个方面。

异步爬虫是一门实践性很强的技术,建议在实际项目中不断优化和完善。随着Python异步生态的不断发展,相信会有更多优秀的工具和库出现,让我们的爬虫开发更加高效和便捷。

希望这份指南能够帮助你在异步爬虫的道路上走得更远!🚀

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/88144.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/88144.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JMeter-SSE响应数据自动化3.0

背景 此次因为多了一些需要过滤排除的错误(数量很少)&#xff0c;还需要修改下JMeter的jtl文件输出数据&#xff08;后续统计数据需要&#xff09; 所以只涉及到JSR脚本的一些改动(此部分改动并不会影响到JMeter的HTML报告) 改动 主要通过设置JMeter中prev输出数据变量threadN…

012 进程状态和优先级

&#x1f984; 个人主页: 小米里的大麦-CSDN博客 &#x1f38f; 所属专栏: Linux_小米里的大麦的博客-CSDN博客 &#x1f381; GitHub主页: 小米里的大麦的 GitHub ⚙️ 操作环境: Visual Studio 2022 文章目录 进程状态和优先级一、进程状态分类特殊状态说明 二、如何查看进程…

React JSX原理

JSX本质 实质上是React.createElement()的语法糖

Java-51 深入浅出 Tomcat 手写 Tomcat 类加载机制 双亲委派机制 生命周期 插件化

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月13日更新到&#xff1a; AI炼丹日志-28 - Aud…

从C++编程入手设计模式——责任链模式

从C编程入手设计模式——责任链模式 ​ 当我们的一个请求需要多个对象去处理&#xff0c;但具体由谁来处理&#xff0c;是根据情况动态决定的。例如&#xff0c;一个日志系统中&#xff0c;可能希望把错误信息写入文件&#xff0c;把提示信息输出到控制台&#xff0c;而不是每…

泛型方法调用需要显示指定泛型类型的场景

泛型类型的推断确定 一般来说&#xff0c;泛型类型的推断可以由以下几个场景确定&#xff1a; 变量定义指定类型 List<String> strList new ArrayList<>();ArrayList的泛型类型是依据变量的类型确定的。 方法返回值确定 Overridepublic Function<List<I…

Deep Research:开启深度研究的智能新时代

在当今信息爆炸的时代&#xff0c;人们面临着海量的信息&#xff0c;无论是专业人士还是普通消费者&#xff0c;都迫切需要一种高效、精准的方式来获取和分析信息。OpenAI 推出的 Deep Research&#xff0c;宛如一颗璀璨的新星&#xff0c;在知识的海洋中为我们导航&#xff0c…

曼昆《经济学原理》第九版 宏观经济学 第二十四章失业与自然失业率

以下是曼昆《经济学原理》第九版宏观经济学第二十四章**“失业与自然失业率”**的详细讲解&#xff0c;从零基础开始构建知识框架&#xff0c;结合中国实际案例与生活化比喻&#xff0c;帮助小白系统理解核心概念&#xff1a; 一、知识框架&#xff1a;失业的“全景图” 1. 核…

【软考高级系统架构论文】论软件系统架构风格

论文真题 请以“软件系统架构风格”为论题,依次从以下三个方面进行论述: 1、概要叙述你参与分析和开发的软件系统开发项目以及你所担任的主要工作。 2、分析软件系统开发中常用的软件系统架构风格有哪些?详细阐述每种风格的具体含义。 3、详细说明在你所参与的软件系统开发项…

LeetCode--35.搜索插入位置

解题思路&#xff1a; 1.获取信息&#xff1a; 给定一个升序排列的数组和一个整数&#xff0c;要求查找该整数应该在数组中插入的位置 限制条件是&#xff0c;要求时间复杂度为O(log N) 2.分析题目&#xff1a; 时间复杂度要求O(log N)&#xff0c;那么就使用二分查找法&#x…

Unix、Linux、POSIX、Minix 区别与联系

一、Unix&#xff1a;现代操作系统的技术原型 诞生&#xff1a;1969年贝尔实验室&#xff0c;用C语言重写后实现跨平台&#xff08;1973年&#xff09;。核心设计&#xff1a; 一切皆文件&#xff08;设备/进程均抽象为文件&#xff09;。管道&#xff08;|&#xff09;和文本…

python计算长方形的周长 2025年3月青少年电子学会等级考试 中小学生python编程等级考试一级真题答案解析

python计算长方形的周长 2025年3月 python编程等级考试一级编程题 博主推荐 所有考级比赛学习相关资料合集【推荐收藏】 1、Python比赛 信息素养大赛Python编程挑战赛 蓝桥杯python选拔赛真题详解 蓝桥杯python省赛真题详解 蓝桥杯python国赛真题详解 2、Python考级 p…

使用 RedisVL 进行复杂查询

一、前置条件 在开始之前&#xff0c;请确保&#xff1a; 已安装 redisvl 并激活相应的 Python 环境。运行 Redis 实例&#xff0c;且 RediSearch 版本 > 2.4。 二、初始化与数据加载 我们将使用一个包含用户信息的数据集&#xff0c;字段包括 user、age、job、credit_s…

「Linux文件及目录管理」vi、vim编辑器

知识点解析 vi/vim编辑器简介 vi:Linux默认的文本编辑器,基于命令行操作,功能强大。vim:vi的增强版,支持语法高亮、多窗口编辑、插件扩展等功能。vi/vim基本模式 命令模式:默认模式,用于移动光标、复制、粘贴、删除等操作。插入模式:按i进入,用于输入文本。末行模式:…

电容器保护测控装置如何选型?

在电力系统的无功补偿环节&#xff0c;​电容器保护测控装置是保障并联电容器组安全稳定运行的核心设备。其选型需综合考量保护需求、系统环境及扩展功能。以下是关键选型要素分析&#xff1a; ​一、明确核心功能需求​ 电容器保护测控装置&#xff0c;选型时需匹配电容器组实…

最近小峰一直在忙国际化项目,确实有点分身乏术... [特殊字符] 不过! 我正紧锣密鼓准备一系列干货文章/深度解析

本人详解 大家晚上好呀&#xff01;&#x1f319; 最近小峰一直在忙国际化项目&#xff0c;确实有点分身乏术... &#x1f605; 不过&#xff01; 我正紧锣密鼓准备一系列干货文章/深度解析&#xff08;选一个更符合你内容的词&#xff09;&#xff0c;很快就会和大家见面啦&am…

OpenCV CUDA模块设备层-----设备端(GPU)线程块级别的一个内存拷贝工具函数blockCopy()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在同一个线程块&#xff08;thread block内&#xff0c;将 [beg, end) 范围内的数据并行地复制到 out 开始的位置。 它使用了 CUDA 线程协作机制…

https没有证书可以访问吗?外网怎么访问内网?

没有SSL证书的网站无法正常通过HTTPS协议访问‌。HTTPS的实现必须依赖有效的SSL证书完成加密握手&#xff0c;否则浏览器会直接阻断连接或显示严重的安全警告。‌‌ 一、技术实现层面‌ ‌HTTPS协议强制要求证书‌。 HTTPS基于SSL/TLS协议实现加密通信&#xff0c;而SSL证书是…

Python pytesseract【OCR引擎库】 简介

想全面了解DeepSeek的看过来 【包邮】DeepSeek全攻略 人人需要的AI通识课 零基础掌握DeepSeek的实用操作手册指南【限量作者亲笔签名版售完即止】 玩转DeepSeek这本就够了 【自营包邮】DeepSeek实战指南 deepseek从入门到精通实用操作指南现代科技科普读物AI普及知识读物人工智…

ubuntu安装postman教程并中文汉化详细教程

一、下载postman安装包 通过网盘分享的文件:Postman-linux-x64-8.7.0.tar.gz 链接: https://pan.baidu.com/s/10WYeguDJlK85cKJ6ptX01w?pwd=xqkh 提取码: xqkh 二、解压到/opt目录 tar -zxvf Postman-linux-x64-8.7.0.tar.gz如果子用户没有/opt权限,可以给子用户赋予/opt的…