Crawler 03 - Data Storage for Crawlers
Table of Contents
- Crawler 03 - Data Storage for Crawlers
  - I: CSV Data Storage
    - 1: Basic Introduction
    - 2: Basic Usage
    - 3: Advanced Usage
    - 4: A Worked Example
  - II: JSON Data Storage
    - 1: Basic JSON Reading and Writing
    - 2: Converting Between Strings and Objects
    - 3: Special Handling for Dates and Times
    - 4: Custom Serialization
    - 5: Advanced Usage
  - III: Database Data Storage
    - 1: The Basics
    - 2: Additional Notes
I: CSV Data Storage
1: Basic Introduction
CSV is a lightweight, cross-platform file format widely used for data exchange, logging, and small-to-medium-scale data storage.
- No dependencies: handled directly with the csv module from the Python standard library.
- Human-readable: plain text that can be opened directly in Excel or any text editor.
- Efficient and flexible: well suited to quickly exporting and importing tabular data.
The CSV format works as follows (a small sample file follows the list):
- Each line represents one record; fields are separated by a delimiter (a comma by default).
- Fields containing special characters can be wrapped in quote characters (typically double quotes).
- The first line may define the column names (the header row).
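For illustration only, a file that follows these rules might look like this (a header row plus two records; the second name is quoted because it contains a comma, and both names are made up for the example):
id,name,email
1,张三,zhangsan@example.com
2,"Smith, Anna",anna@example.com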
| Use case | Description |
| --- | --- |
| Data export and backup | Bulk-export structured data from a database or an API |
| Preprocessing for data analysis | Feed into pandas for statistics and visualization |
| Cross-system data exchange | Compatible with tools such as Excel, R, and MATLAB |
2: Basic Usage
Python ships with the csv module, so nothing extra needs to be installed.
Basic CSV write and read example
import csv

headers = ["id", "name", "email"]
data = [
    [1, "张三", "zhangsan@example.com"],
    [2, "李四", "lisi@test.org"],
    [3, "王五", "wangwu@demo.net"],
]

# open() takes the file name, the mode (r -> read, w -> write), and the encoding (encoding="utf-8")
with open("output.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(headers)   # write the header row
    writer.writerows(data)     # write all data rows in one call

import csv

with open("output.csv", "r", newline="", encoding="utf-8") as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)
Reading and writing with dictionaries
# Write rows as dictionaries
with open("dict_output.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["id", "name", "email"])
    writer.writeheader()
    writer.writerow({"id": 1, "name": "张三", "email": "zhangsan@example.com"})

# Read rows back as dictionaries
with open("dict_output.csv", "r", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row)
        print(row["id"], row["name"], row["email"])
Custom delimiters and quoting rules
# Use a semicolon as the delimiter and wrap every field in double quotes
with open("custom.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f, delimiter=";", quoting=csv.QUOTE_ALL)
    writer.writerow(["id", "name"])
    writer.writerow([1, "张三"])
3: Advanced Usage
Fields containing special characters
Fields containing commas, newlines, or other characters that would break the CSV structure -> wrap the fields in quotes by configuring csv.QUOTE_MINIMAL or csv.QUOTE_NONNUMERIC
data = [
    [4, "Alice, Smith", "alice@example.com"],
    [5, "Bob\nJohnson", "bob@test.org"],
]

with open("special_chars.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)
    writer.writerows(data)
Nested data
import json

data = [
    {"id": 1, "info": '{"age": 30, "city": "北京"}'},
    {"id": 2, "info": '{"age": 25, "city": "上海"}'},
]

# Write the nested JSON strings as an ordinary column
with open("nested_data.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["id", "info"])
    writer.writeheader()
    writer.writerows(data)

# Read the file back and parse the JSON column
with open("nested_data.csv", "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        info = json.loads(row["info"])
        print(f"ID: {row['id']}, city: {info['city']}")
Handling more complex cases
Large files -> read line by line or in batches (see the sketch below)
Complex data processing -> pandas
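A minimal sketch of both approaches, assuming the output.csv file written earlier (the 10,000-row chunk size is an arbitrary choice):
import csv
import pandas as pd

# Stream a large CSV one row at a time: csv.reader is lazy, so only the current row is in memory
with open("output.csv", "r", newline="", encoding="utf-8") as f:
    reader = csv.reader(f)
    header = next(reader)          # consume the header row
    for row in reader:
        pass                       # handle one record here

# Read the same file in batches with pandas: chunksize yields DataFrames of up to 10,000 rows each
for chunk in pd.read_csv("output.csv", chunksize=10_000):
    print(len(chunk), "rows in this batch")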
4: A Worked Example
# Scrape the Douban Books Top 250 page and write the results to a CSV file,
# so they can later be analyzed with pandas
import csv
import requests
from bs4 import BeautifulSoup

url = "https://book.douban.com/top250"
headers = {"User-Agent": "Mozilla/5.0"}
response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, "html.parser")

books = []
for item in soup.select("tr.item"):
    title = item.select_one(".pl2 a")["title"]
    score = item.select_one(".rating_nums").text
    books.append({"title": title, "score": score})

# Write the scraped records to CSV
with open("douban_books.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["title", "score"])
    writer.writeheader()
    writer.writerows(books)

print("Data saved to douban_books.csv")
II: JSON Data Storage
| Use case | Description |
| --- | --- |
| Configuration files | Program parameters, paths, and so on (e.g. config.json) |
| API data exchange | Front end and back end pass requests and responses as JSON |
| Structured logging | Operation logs with metadata, convenient for later analysis |
1: Basic JSON Reading and Writing
dump -> write to a JSON file, load -> read from a JSON file
import json

data = {
    "project": "数据分析平台",
    "version": 2.1,
    "authors": ["张三", "李四"],
    "tags": {"python": 5, "database": 3},
}

with open("data.json", "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=2)  # keep non-ASCII characters, indent by 2 spaces

with open("data.json", "r", encoding="utf-8") as f:
    loaded_data = json.load(f)

print(loaded_data["tags"]["python"])  # prints: 5
2: Converting Between Strings and Objects
dumps -> serialize an object to a JSON string, loads -> parse a JSON string back into an object
data_str = json.dumps(data, ensure_ascii=False)
print(type(data_str))  # <class 'str'>

json_str = '{"name": "王五", "age": 30}'
obj = json.loads(json_str)
print(obj["age"])  # prints: 30
3: Special Handling for Dates and Times
JSON has no native support for Python's datetime objects, so you need to supply custom conversion logic.
from datetime import datetime

def datetime_encoder(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()  # convert to an ISO-format string
    raise TypeError("Object is not serializable")

data = {"event": "发布会", "time": datetime.now()}

# Pass the custom encoder via the default parameter
json_str = json.dumps(data, default=datetime_encoder, ensure_ascii=False)
print(json_str)  # {"event": "发布会", "time": "2024-07-20T15:30:45.123456"}
4: Custom Serialization
class User:
    def __init__(self, name, level):
        self.name = name
        self.level = level

user = User("赵六", 3)

# Option 1: build a dictionary by hand
user_dict = {"name": user.name, "level": user.level}
json.dumps(user_dict)

# Option 2: use __dict__ (all attributes must themselves be JSON-serializable)
json.dumps(user.__dict__)
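A third option (my own addition, not in the original snippet) is to subclass json.JSONEncoder, which keeps the conversion logic in one reusable place; a minimal sketch using the User class above:
class UserEncoder(json.JSONEncoder):
    def default(self, obj):
        # default() is only called for objects the standard encoder cannot handle
        if isinstance(obj, User):
            return {"name": obj.name, "level": obj.level}
        return super().default(obj)

print(json.dumps(user, cls=UserEncoder, ensure_ascii=False))  # {"name": "赵六", "level": 3}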
5: Advanced Usage
Skipping specific fields
# Note: json.dumps only calls the default hook for objects it cannot serialize on its own,
# so the hook never fires for a plain dict. Strip the field before serializing instead.
def strip_password(obj):
    return {k: v for k, v in obj.items() if k != "password"}

user_data = {"name": "张三", "password": "123456"}
json.dumps(strip_password(user_data), ensure_ascii=False)  # {"name": "张三"}
Dropping indentation and spaces (compact output)
json.dumps(data, separators=(",", ":"))  # most compact form
Swapping in ujson (a very fast C implementation of JSON with a largely drop-in-compatible API)
import ujson as json  # replace the standard library

json.dumps(data)  # commonly reported as roughly 3-10x faster
Reading large files
# Read a JSON Lines file (one independent JSON object per line) line by line
with open("large_data.json", "r", encoding="utf-8") as f:
    for line in f:
        item = json.loads(line)
        process(item)  # process() is a placeholder for your own handling logic
III: Database Data Storage
1: The Basics
For the basics of storing data in the various databases, see here.
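As a minimal self-contained sketch (my own illustration using the standard-library sqlite3 module; the books table and its columns are made up for the example), saving scraped records to a local database looks roughly like this:
import sqlite3

conn = sqlite3.connect("books.db")
cur = conn.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS books (title TEXT, score TEXT)")

rows = [("Book A", "9.1"), ("Book B", "8.7")]
cur.executemany("INSERT INTO books (title, score) VALUES (?, ?)", rows)  # parameterized insert
conn.commit()

for row in cur.execute("SELECT title, score FROM books"):
    print(row)

conn.close()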
2: Additional Notes
You can use DBUtils to manage a connection pool for MySQL.
import pymysql
from dbutils.pooled_db import PooledDB

pool = PooledDB(
    creator=pymysql,     # module used to create database connections
    maxconnections=5,    # maximum connections allowed in the pool (0 or None = unlimited)
    mincached=1,         # idle connections created when the pool is initialized (0 = none)
    maxcached=2,         # maximum idle connections kept in the pool (0 or None = unlimited)
    maxshared=3,         # maximum number of shared connections (0 or None = share all)
    blocking=True,       # block and wait when the pool has no free connection
    ping=0,              # when to ping the MySQL server to check availability (0 = never)
    host='127.0.0.1',    # database server address
    port=3306,           # database server port
    user='root',         # database user
    password='314159',   # database password
    database='test',     # database name
    charset='utf8'       # connection character set
)

# Get a connection and create a cursor
conn = pool.connection()
cursor = conn.cursor()

# With the cursor you can run queries: cursor.execute(sql, args)
sql = "select * from user where id = %s"
cursor.execute(sql, [1])  # executes: select * from user where id = 1

# Fetch the result
print(cursor.fetchone())
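One point worth adding (based on how DBUtils pooled connections behave): closing a connection obtained from the pool does not tear it down but hands it back to the pool, so close the cursor and connection when you are finished:
cursor.close()
conn.close()  # returns the connection to the pool rather than closing the socket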
MongoDB pipeline (aggregation) operations
from datetime import datetime

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

# Open a connection (the default connection pool size is 100)
client = MongoClient(
    host="localhost",
    port=27017,
)

try:
    # Heartbeat check
    client.admin.command('ping')
    print("Successfully connected to MongoDB!")
except ConnectionFailure:
    print("Server not available")

# Select the database and collection (both are created lazily)
db = client["ecommerce"]
products_col = db["products"]

# Insert a document (_id is generated automatically)
product_data = {
    "name": "Wireless Mouse",
    "price": 49.99,
    "tags": ["electronics", "computer"],
    "stock": {"warehouse_A": 100, "warehouse_B": 50},
    "last_modified": datetime.now()
}
insert_result = products_col.insert_one(product_data)
print(f"Inserted ID: {insert_result.inserted_id}")

# Query documents (nested queries are supported)
query = {"price": {"$lt": 60}, "tags": "electronics"}  # price below 60 AND tags contains "electronics"
projection = {"name": 1, "price": 1}  # like a SQL SELECT list: return only name and price
cursor = products_col.find(query, projection).limit(5)
for doc in cursor:
    print(doc)

# Update a document (atomic operators)
update_filter = {"name": "Wireless Mouse"}
update_data = {"$inc": {"stock.warehouse_A": -10}, "$set": {"last_modified": datetime.now()}}
update_result = products_col.update_one(update_filter, update_data)
print(f"Modified count: {update_result.modified_count}")

# Delete documents
delete_result = products_col.delete_many({"price": {"$gt": 200}})
print(f"Deleted count: {delete_result.deleted_count}")

# Aggregation pipeline: total stock per warehouse
# Note: this pipeline assumes stock is stored as an array of
# {"warehouse": ..., "quantity": ...} subdocuments, which differs from the
# embedded-document shape used in the insert above.
pipeline = [
    {"$unwind": "$stock"},          # stage 1: unwind the nested array
    {"$group": {                    # stage 2: group and aggregate
        # group key: stock.warehouse, stored in _id
        # aggregate: sum stock.quantity within each group
        "_id": "$stock.warehouse",
        "total_stock": {"$sum": "$stock.quantity"}
    }},
    # stage 3: sort by the aggregated total, descending
    {"$sort": {"total_stock": -1}}
]
results = products_col.aggregate(pipeline)
for res in results:
    print(f"Warehouse {res['_id']}: {res['total_stock']} units")
Handling large volumes of data in MongoDB
from pymongo import MongoClient
from faker import Faker
import time

client = MongoClient('mongodb://localhost:27017/')
db = client['bigdata']
collection = db['user_profiles']

fake = Faker()
batch_size = 5000  # insert in batches to keep memory usage down

def generate_batch(batch_size):
    return [{
        "name": fake.name(),
        "email": fake.email(),
        "last_login": fake.date_time_this_year()
    } for _ in range(batch_size)]

start_time = time.time()
for i in range(200):  # 200 batches of 5000 = 1,000,000 documents in total
    batch_data = generate_batch(batch_size)
    collection.insert_many(batch_data, ordered=False)  # unordered inserts are faster
    print(f"Inserted {(i + 1) * batch_size} documents so far")
print(f"Total time: {time.time() - start_time:.2f} s")

# Analyze e-commerce order data (with nested structures)
pipeline = [
    {"$unwind": "$items"},                  # unwind the array of items in each order
    {"$match": {"status": "completed"}},    # keep only completed orders
    {"$group": {
        "_id": "$items.category",
        "total_sales": {"$sum": "$items.price"},
        "avg_quantity": {"$avg": "$items.quantity"},
        "top_product": {"$max": "$items.name"}
    }},
    {"$sort": {"total_sales": -1}},
    {"$limit": 10}
]

orders_col = db["orders"]
results = orders_col.aggregate(pipeline)
for res in results:
    print(f"Category {res['_id']}: total sales {res['total_sales']} yuan")
Key performance optimizations -> add indexes & use bulk operations
import pymongo

# Create indexes (speeds up queries)
products_col.create_index([("name", pymongo.ASCENDING)], unique=True)
products_col.create_index([("price", pymongo.ASCENDING), ("tags", pymongo.ASCENDING)])

# Bulk writes increase throughput
bulk_ops = [
    pymongo.InsertOne({"name": "Keyboard", "price": 89.99}),
    pymongo.UpdateOne({"name": "Mouse"}, {"$set": {"price": 59.99}}),
    pymongo.DeleteOne({"name": "Earphones"})
]
results = products_col.bulk_write(bulk_ops)
Configuring for high availability
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

# MongoDB replica set connection string
# replicaSet=rs0 is the name of the replica set
uri = "mongodb://192.127.1.1:27017,192.127.1.2:27017,192.127.1.3:27017/mydb?replicaSet=rs0"

# Create the MongoClient instance
client = MongoClient(uri)

# Test the connection
try:
    # Run a trivial command to verify that the connection works
    client.admin.command('ping')
    print("Successfully connected to the MongoDB replica set!")
except ConnectionFailure as e:
    print(f"Could not connect to the MongoDB replica set: {e}")