1.yield返回数据的原理?
为什么要用yield返回数据给管道?
- 遍历这个函数的返回值的时候,挨个把数据读到内存,不会造成内存的瞬间占用过高,
Python3
中的range
和python2
中的xrange
同理。 scrapy
是异步爬取,所以通过yield
能够将运行权限教给其他的协程任务去执行,这样整个程序运行效果会更高。
注意点:解析函数中的yield
能够传递的对象只能是:BaseItem
、Request
、dict
、None
前置知识,关于数据的存储
可以参考我另一篇文章python爬虫之数据存储_爬虫代码w是什么意思-CSDN博客
2.Mysql存储管道封装
第一步创建一个管道文件夹
咱们要用多管道,所以咱们不用自带的pipelines.py文件,虽然可以写在一个文件,但是如果写多个导致代码冗余。不好查看。
类名修改如下:
第二步在setting.py设置开启管道文件
按你的路径修改
数据越小,优先级越大
第四步设置管道
先下载 pip install pymsql
以豆瓣为例,这三个字段
初始数据
def __init__(self):# 数据库连接信息self.host = '127.0.0.1'self.user = 'root'self.passwd = '12345'self.db = 'spider2'self.charset = 'utf8mb4'
链接数据库
def open_spider(self, spider):# 连接数据库self.conn = pymysql.connect(host=self.host,user=self.user,passwd=self.passwd,db=self.db,charset=self.charset)self.cursor = self.conn.cursor()
注意open_spider是爬虫开启时自动运行,当前启动的爬虫对象,可以通过它获取爬虫的名称、设置等信息。
建表,以爬虫名称为表名
self.cursor = self.conn.cursor()# 根据爬虫名字创建表table_name = spider.namecreate_table_sql = f"""CREATE TABLE IF NOT EXISTS {table_name} (id INT AUTO_INCREMENT PRIMARY KEY,title VARCHAR(255),rating VARCHAR(50),url VARCHAR(255))"""self.cursor.execute(create_table_sql)self.conn.commit()
插入数据
def process_item(self, item, spider):# 获取爬虫名字作为表名table_name = spider.name# 插入数据insert_sql = f"""INSERT INTO {table_name} (title, rating, url)VALUES (%s, %s, %s)"""self.cursor.execute(insert_sql, (item['title'], item['rating'], item['url']))self.conn.commit()return item
process_item
方法
作用:每当爬虫抓取到一个数据项(item
)时,该方法会被调用来处理这个数据项。这是管道的核心方法,用于对数据项进行清洗、验证、存储等操作。
return item 作用:返回处理后的 item
,以便后续的管道可以继续处理。如果抛出 DropItem
异常
,则丢弃该数据项,不再传递给后续的管道。
def close_spider(self, spider):# 关闭数据库连接self.cursor.close()self.conn.close()
close_spider()在爬虫关闭时被调用,主要用于清理资源,如关闭数据库连接、文件等操作。
完整代码如下:
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html# useful for handling different item types with a single interface
from itemadapter import ItemAdapterimport pymysql
class Mysql_Pipeline:def __init__(self):# 数据库连接信息self.host = '127.0.0.1'self.user = 'root'self.passwd = '12345'self.db = 'spider2'self.charset = 'utf8mb4'def open_spider(self, spider):# 连接数据库self.conn = pymysql.connect(host=self.host,user=self.user,passwd=self.passwd,db=self.db,charset=self.charset)self.cursor = self.conn.cursor()# 根据爬虫名字创建表table_name = spider.namecreate_table_sql = f"""CREATE TABLE IF NOT EXISTS {table_name} (id INT AUTO_INCREMENT PRIMARY KEY,title VARCHAR(255),rating VARCHAR(50),url VARCHAR(255))"""self.cursor.execute(create_table_sql)self.conn.commit()def process_item(self, item, spider):# 获取爬虫名字作为表名table_name = spider.name# 插入数据insert_sql = f"""INSERT INTO {table_name} (title, rating, url)VALUES (%s, %s, %s)"""self.cursor.execute(insert_sql, (item['title'], item['rating'], item['url']))self.conn.commit()print('mysql插入成功')return itemdef close_spider(self, spider):# 关闭数据库连接self.cursor.close()self.conn.close()
结果如下:
3.
MongoDB存储管道封装
直接创建管道
开启管道
ITEM_PIPELINES = {'myspider.pipelines.Mysql_pipelines.Mysql_Pipeline': 300,'myspider.pipelines.MongoDB_piplines.MongoDBPipeline': 200,
}
还是以豆瓣三个字段为例
pip insatll pymongo
import pymongo
from scrapy import signalsclass MongoDBPipeline:def __init__(self):self.mongo_uri = 'mongodb://localhost:27017/' # MongoDB 的 URI 地址self.mongo_db = 'spider' # MongoDB 的数据库名称def open_spider(self, spider):# 在爬虫启动时执行,用于初始化操作,如建立 MongoDB 连接self.client = pymongo.MongoClient(self.mongo_uri)#自动创建数据库self.db = self.client[self.mongo_db]def close_spider(self, spider):# 在爬虫关闭时执行,用于清理操作,如关闭 MongoDB 连接self.client.close()def process_item(self, item, spider):# 处理每个数据项,将数据存储到 MongoDB 中collection_name = spider.name # 使用爬虫名字作为集合名#自动创建表self.db[collection_name].insert_one(item) # 将数据插入到集合中print('MongoDB插入成功!')#字典字典插入return item
原理差不多一样,链接数据库,插入数据库,关闭数据库。
双数据库插入
结果如下:
mongodb
mysql
但如果只想插入其中一个数据库
要么注释return item,优先级设置高,这样会抛出错误
要么注释管道开启
最好的还是在爬虫里重写配置
custom_settings = {'ITEM_PIPELINES': {'myspider.pipelines.MongoDB_piplines.MongoDBPipeline': 300,}}
这样管道设置以爬虫类为主,只插入mongodb
4.文件管道封装
图片存储
class FilePipeline:def process_item(self, item, spider):# getcwd(): 用于获取当前工作目录(Current Working Directory)的路径#图片下载if item.get("f_type"):if item.get("f_type") == 'img':download_path = os.getcwd() + f'/img/'if not os.path.exists(download_path):os.mkdir(download_path)# 图片保存image_name = item.get("title")image_content = item.get("content")if image_content:with open(download_path+f'{image_name}.jpg', "wb") as f:f.write(image_content)print("图片保存成功: ", image_name)
一般用item.get("f_type")区分文件下载用os生成文件夹
图片存储一共就两个参数,响应体和名字,这样配置,
在爬虫里重写设置
custom_settings = {'ITEM_PIPELINES': {'myspider.pipelines.File_piplines.FilePipeline': 300,}}
二次请求图片url
以上有个小错误,把 item_img['title'] =item['title']改为item_img['title'] =item
结果如下:
利用item.get("f_type")区分文件下载,文件管道可以封装如下:
import os
import picklefrom itemadapter import ItemAdapterimport json
class FilePipeline:def process_item(self, item, spider):# getcwd(): 用于获取当前工作目录(Current Working Directory)的路径#图片下载if item.get("f_type"):if item.get("f_type") == 'img':download_path = os.getcwd() + f'/img/'if not os.path.exists(download_path):os.mkdir(download_path)# 图片保存image_name = item.get("title")image_content = item.get("content")if image_content:with open(download_path+f'{image_name}.jpg', "wb") as f:f.write(image_content)print("图片保存成功: ", image_name)elif item.get("f_type") == 'txt':download_path = os.getcwd() + '/txt/'if not os.path.exists(download_path):os.mkdir(download_path)# 文本保存txt_name = item.get("title")txt_content = item.get("content")with open(download_path+f'{txt_name}.txt', 'a', encoding='utf-8') as f:f.write(txt_content + '\n')print('文本存储成功')elif item.get("f_type") == 'json':download_path = os.getcwd() + '/json/'if not os.path.exists(download_path):os.mkdir(download_path)# 文本保存json_name = item.get("title")json_obj = itemwith open(download_path+f'{json_name}.json', 'a', encoding='utf-8') as file:file.write(json.dumps(json_obj, indent=2, ensure_ascii=False), )print('json存储成功')elif item.get("f_type") == 'music':download_path = os.getcwd() + '/music/'if not os.path.exists(download_path):os.mkdir(download_path)# 文本保存music_name = item.get("title")music_content = item.get("content")with open(download_path+f'{music_name}.mp3', 'a', encoding='utf-8') as f:f.write(music_content + '\n')print('MP3存储成功')else:print('无事发生')
包括mp3,文本,json,图片等文件下载。
但是事实上,一个文件可以进行多次存储,这也是用yield返回给管道的主要原因
def parse(self, response, **kwargs):# scrapy的response对象可以直接进行xpathol_list = response.xpath('//ol[@class="grid_view"]/li')for ol in ol_list:# 创建一个数据字典item = {}# 利用scrapy封装好的xpath选择器定位元素,并通过extract()或extract_first()来获取结果item['title'] = ol.xpath('.//div[@class="hd"]/a/span[1]/text()').extract_first()#标题item['content'] = ol.xpath('.//div[@class="bd"]/div/span[2]/text()').extract_first()#评分item['f_type'] = 'txt'yield itemitem['url'] = ol.xpath('.//a/@href').extract_first()#链接item['img_url'] = ol.xpath('.//img/@src').extract_first()item['f_type'] = 'json'yield item# yield itemyield scrapy.Request(url= item['img_url'] , headers=self.headers, callback=self.img_parse,cb_kwargs={"item":item['title']},meta={"meta":item})#自定义一个回调方法def img_parse(self, response,item):item_img = {'f_type':"img"}item_img['content'] = response.bodyitem_img['title'] =itemyield item_imgif __name__ == '__main__':cmdline.execute('scrapy crawl douban'.split())
以上三个yield,返回三次 ,分别是文本保存,json保存,图片保存。
运行结果如下:
文本存储评分
管道封装到此一段落了。