Python + 淘宝 API 开发:自动化采集商品数据的完整流程​

在电商数据分析、竞品监控和市场调研等场景中,高效采集淘宝商品数据是关键环节。本文将详细介绍如何利用 Python 结合 API,构建一套自动化的商品数据采集系统,涵盖从 API 申请到数据存储的完整流程,并提供可直接运行的代码实现。​

淘宝API 基础​

淘宝提供了标准化的 API 接口,相比网页爬虫具有稳定性高、合规性强、数据结构统一等优势。常用的商品数据相关 API 包括:​

  • 商品详情 API:获取商品基本信息、价格、库存等​
  • 商品搜索 API:按关键词、分类等条件搜索商品​
  • 店铺商品 API:获取指定店铺的所有商品​
  • 商品评价 API:获取商品的用户评价数据​

使用淘宝 API 需遵守平台规范,注意调用频率限制和数据使用范围,避免违规操作导致账号封禁。​

开发前的准备工作​

1. 账号注册​

  1. 注册开发者账号​
  2. 完成认证(个人或企业认证)​
  3. 获取 Api Key 和 Api Secret​
  4. 申请所需的 API 权限(如商品详情、商品搜索等)​

2. 开发环境搭建​

推荐使用 Python 3.7 + 版本,需安装以下依赖库:

pip install top-api-sdk  # 淘宝API官方Python SDK
pip install pandas       # 数据处理
pip install pymysql      # 数据库连接
pip install python-dotenv # 环境变量管理

3. 核心参数配置​

创建.env文件存储敏感信息:

APP_KEY=你的AppKey
APP_SECRET=你的AppSecret
REDIRECT_URI=你的回调地址
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=你的密码
MYSQL_DB=taobao_data

自动化采集系统设计​

系统架构​

本系统采用模块化设计,主要包含以下组件:​

  • 认证模块:处理 API 授权与令牌管理​
  • 采集模块:调用 API 获取商品数据​
  • 存储模块:将数据保存到数据库​
  • 调度模块:实现定时自动采集​
  • 日志模块:记录系统运行状态​

数据采集流程​

  1. 获取 API 访问令牌(AccessToken)​
  2. 构造 API 请求参数(关键词、页码、排序方式等)​
  3. 调用 API 并处理返回结果​
  4. 数据清洗与格式转换​
  5. 存储到数据库​
  6. 实现增量采集与去重机制​

完整代码实现​

加载环境变量​

load_dotenv()​

配置日志​

logging.basicConfig(​

level=logging.INFO,​

format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',​

filename='taobao_collector.log'​

)​

logger = logging.getLogger('taobao_collector')​

class TaobaoAPICollector:​

def init(self):​

"""初始化淘宝 API 采集器"""​

self.app_key = os.getenv ('APP_KEY')​

self.app_secret = os.getenv ('APP_SECRET')​

self.redirect_uri = os.getenv ('REDIRECT_URI')​

数据库连接配置​

self.db_config = {​

'host': os.getenv('MYSQL_HOST'),​

'port': int(os.getenv('MYSQL_PORT')),​

'user': os.getenv('MYSQL_USER'),​

'password': os.getenv('MYSQL_PASSWORD'),​

'db': os.getenv('MYSQL_DB'),​

'charset': 'utf8mb4'​

}​

初始化数据库连接​

self.db_conn = self._get_db_connection()​

self._create_tables()​

API 调用配置​

self.page_size = 20 # 每页商品数量​

self.max_pages = 5 # 最大采集页数​

self.request_interval = 2 # API 请求间隔 (秒)​

def _get_db_connection (self):​

"""获取数据库连接"""​

try:​

conn = pymysql.connect (​

host=self.db_config ['host'],​

port=self.db_config ['port'],​

user=self.db_config ['user'],​

password=self.db_config ['password'],​

db=self.db_config ['db'],​

charset=self.db_config ['charset'],​

cursorclass=DictCursor​

)​

logger.info("数据库连接成功")​

return conn​

except Exception as e:​

logger.error (f"数据库连接失败: {str (e)}")​

raise​

def _create_tables (self):​

"""创建数据库表"""​

try:​

with self.db_conn.cursor () as cursor:​

商品表​

cursor.execute ('''​

CREATE TABLE IF NOT EXISTS products (​

id BIGINT PRIMARY KEY COMMENT ' 商品 ID',​

title VARCHAR (255) COMMENT ' 商品标题 ',​

cat_name VARCHAR (100) COMMENT ' 商品分类 ',​

price DECIMAL (10,2) COMMENT ' 商品价格 ',​

sales INT COMMENT ' 销量 ',​

stock INT COMMENT ' 库存 ',​

nick VARCHAR (100) COMMENT ' 卖家昵称 ',​

shop_title VARCHAR (100) COMMENT ' 店铺名称 ',​

pic_url VARCHAR (255) COMMENT ' 商品主图 URL',​

detail_url VARCHAR (255) COMMENT ' 商品详情页 URL',​

created_time DATETIME COMMENT ' 商品创建时间 ',​

collected_time DATETIME COMMENT ' 采集时间 ',​

updated_time DATETIME COMMENT ' 更新时间 '​

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=' 淘宝商品表 ';​

''')​

商品详情表​

cursor.execute ('''​

CREATE TABLE IF NOT EXISTS product_details (​

product_id BIGINT PRIMARY KEY COMMENT ' 商品 ID',​

desc TEXT COMMENT ' 商品描述 ',​

props TEXT COMMENT ' 商品属性 ',​

sku TEXT COMMENT 'SKU 信息 ',​

collected_time DATETIME COMMENT ' 采集时间 ',​

updated_time DATETIME COMMENT ' 更新时间 '​

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=' 淘宝商品详情表 ';​

''')​

采集任务表​

cursor.execute ('''​

CREATE TABLE IF NOT EXISTS collection_tasks (​

id INT AUTO_INCREMENT PRIMARY KEY,​

keyword VARCHAR (100) COMMENT ' 搜索关键词 ',​

page INT COMMENT ' 采集页数 ',​

status ENUM ('pending', 'running', 'completed', 'failed') DEFAULT 'pending' COMMENT ' 任务状态 ',​

start_time DATETIME COMMENT ' 开始时间 ',​

end_time DATETIME COMMENT ' 结束时间 ',​

total_products INT COMMENT ' 采集商品总数 ',​

comment VARCHAR (255) COMMENT ' 备注 '​

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=' 采集任务表 ';​

''')​

self.db_conn.commit()​

logger.info("数据库表创建 / 检查完成")​

except Exception as e:​

logger.error (f"创建数据库表失败: {str (e)}")​

self.db_conn.rollback ()​

def _search_products (self, keyword, page=1):​

"""​

搜索商品​

:param keyword: 搜索关键词​

:param page: 页码​

:return: 商品列表​

"""​

try:​

request = TbkItemGetRequest ()​

request.set_app_info (appinfo (self.app_key, self.app_secret))​

设置请求参数​

request.keyword = keyword​

request.page_no = page​

request.page_size = self.page_size​

request.platform = 2 # 1:PC,2: 无线​

request.sort = "sale_desc" # 按销量降序​

调用 API​

response = request.getResponse()​

处理返回结果​

if "tbk_item_get_response" in response and "results" in response ["tbk_item_get_response"]:​

return response ["tbk_item_get_response"]["results"]["n_tbk_item"]​

return []​

except Exception as e:​

logger.error (f"搜索商品失败 (keyword: {keyword}, page: {page}): {str (e)}")​

return []​

def _get_product_detail (self, product_id):​

"""​

获取商品详情​

:param product_id: 商品 ID​

:return: 商品详情​

"""​

try:​

request = TbkItemDetailGetRequest ()​

request.set_app_info (appinfo (self.app_key, self.app_secret))​

request.num_iid = product_id​

调用 API​

response = request.getResponse()​

if "tbk_item_detail_get_response" in response and "data" in response ["tbk_item_detail_get_response"]:​

return response ["tbk_item_detail_get_response"]["data"]​

return None​

except Exception as e:​

logger.error (f"获取商品详情失败 (id: {product_id}): {str (e)}")​

return None​

def _save_products (self, products):​

"""保存商品数据到数据库"""​

if not products:​

return 0​

try:​

with self.db_conn.cursor() as cursor:​

count = 0​

now = datetime.now()​

for item in products:​

检查商品是否已存在​

cursor.execute("SELECT id FROM products WHERE id = %s", (item['num_iid'],))​

exists = cursor.fetchone() is not None​

if exists:​

更新现有商品​

sql = '''​

UPDATE products SET​

title = %s, price = %s, sales = %s, stock = %s,​

pic_url = %s, updated_time = %s​

WHERE id = %s​

'''​

cursor.execute(sql, (​

item['title'], item['zk_final_price'], item.get('sales', 0),​

item.get('stock', 0), item['pict_url'], now, item['num_iid']​

))​

else:​

插入新商品​

sql = '''​

INSERT INTO products (​

id, title, cat_name, price, sales, stock, nick, shop_title,​

pic_url, detail_url, created_time, collected_time, updated_time​

) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)​

'''​

cursor.execute(sql, (​

item['num_iid'], item['title'], item.get('cat_name', ''),​

item['zk_final_price'], item.get('sales', 0), item.get('stock', 0),​

item['nick'], item.get('shop_title', ''), item['pict_url'],​

item['item_url'], datetime.now(), now, now​

))​

count += 1​

self.db_conn.commit()​

logger.info(f"成功保存 / 更新 {count} 个商品")​

return count​

except Exception as e:​

logger.error (f"保存商品失败: {str (e)}")​

self.db_conn.rollback ()​

return 0​

def _save_product_details (self, product_id, detail):​

"""保存商品详情到数据库"""​

if not detail:​

return False​

try:​

with self.db_conn.cursor() as cursor:​

now = datetime.now()​

提取 SKU 信息​

sku_info = []​

if "sku" in detail and "sku_map" in detail["sku"]:​

for key, value in detail["sku"]["sku_map"].items():​

sku_info.append({​

"key": key,​

"price": value.get("price", ""),​

"stock": value.get("stock", 0),​

"props": value.get("props", "")​

})​

检查详情是否已存在​

cursor.execute("SELECT product_id FROM product_details WHERE product_id = %s", (product_id,))​

exists = cursor.fetchone() is not None​

if exists:​

更新现有详情​

sql = '''​

UPDATE product_details SET​

desc = %s, props = %s, sku = %s, updated_time = %s​

WHERE product_id = %s​

'''​

cursor.execute(sql, (​

str(detail.get("desc", "")), str(detail.get("props", "")),​

str(sku_info), now, product_id​

))​

else:​

插入新详情​

sql = '''​

INSERT INTO product_details (​

product_id, desc, props, sku, collected_time, updated_time​

) VALUES (%s, %s, %s, %s, %s, %s)​

'''​

cursor.execute(sql, (​

product_id, str(detail.get("desc", "")), str(detail.get("props", "")),​

str(sku_info), now, now​

))​

self.db_conn.commit ()​

return True​

except Exception as e:​

logger.error (f"保存商品详情失败 (id: {product_id}): {str (e)}")​

self.db_conn.rollback ()​

return False​

def collect_by_keyword (self, keyword, collect_details=True):​

"""​

按关键词采集商品数据​

:param keyword: 搜索关键词​

:param collect_details: 是否采集商品详情​

:return: 采集总数​

"""​

logger.info(f"开始按关键词采集: {keyword}")​

创建采集任务记录​

task_id = self._create_task (keyword)​

if not task_id:​

logger.error ("创建采集任务失败")​

return 0​

total_count = 0​

try:​

分页采集​

for page in range(1, self.max_pages + 1):​

logger.info(f"采集第 {page} 页,关键词: {keyword}")​

搜索商品​

products = self._search_products(keyword, page)​

if not products:​

logger.info(f"第 {page} 页没有获取到商品数据,停止采集")​

break​

保存商品数据​

saved_count = self._save_products(products)​

total_count += saved_count​

采集商品详情​

if collect_details:​

for product in products:​

product_id = product['num_iid']​

logger.info(f"采集商品详情: {product_id}")​

detail = self._get_product_detail(product_id)​

if detail:​

self._save_product_details(product_id, detail)​

控制请求频率​

time.sleep(self.request_interval)​

控制请求频率​

time.sleep(self.request_interval)​

更新任务状态​

self._update_task(task_id, "completed", total_count)​

logger.info(f"关键词 {keyword} 采集完成,共采集 {total_count} 个商品")​

return total_count​

except Exception as e:​

logger.error (f"采集过程出错: {str (e)}")​

self._update_task (task_id, "failed", total_count, str (e))​

return total_count​

def _create_task (self, keyword):​

"""创建采集任务记录"""​

try:​

with self.db_conn.cursor () as cursor:​

sql = '''​

INSERT INTO collection_tasks (​

keyword, page, status, start_time​

) VALUES (% s, % s, % s, % s)​

'''​

cursor.execute (sql, (keyword, self.max_pages, 'running', datetime.now ()))​

self.db_conn.commit ()​

return cursor.lastrowid​

except Exception as e:​

logger.error (f"创建任务记录失败: {str (e)}")​

self.db_conn.rollback ()​

return None​

def _update_task (self, task_id, status, total_products=0, comment=""):​

"""更新任务状态"""​

try:​

with self.db_conn.cursor () as cursor:​

sql = '''​

UPDATE collection_tasks SET​

status = % s, end_time = % s,​

total_products = % s, comment = % s​

WHERE id = % s​

'''​

cursor.execute (sql, (status, datetime.now (), total_products, comment, task_id))​

self.db_conn.commit ()​

except Exception as e:​

logger.error (f"更新任务状态失败: {str (e)}")​

self.db_conn.rollback ()​

def export_to_excel (self, keyword, filename=None):​

"""将采集的商品数据导出为 Excel"""​

try:​

with self.db_conn.cursor () as cursor:​

搜索包含关键词的商品​

sql = "SELECT * FROM products WHERE title LIKE %s LIMIT 1000"​

cursor.execute(sql, (f'%{keyword}%',))​

products = cursor.fetchall()​

if not products:​

logger.info(f"没有找到包含关键词 {keyword} 的商品数据")​

return False​

转换为 DataFrame​

df = pd.DataFrame(products)​

生成文件名​

if not filename:​

filename = f"taobao_products_{keyword}_{datetime.now().strftime

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/96332.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/96332.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025.8.21总结

工作一年多了,在这期间,确实也有不少压力,但每当工作有压力的时候,最后面都会解决。好像每次遇到解决不了的事情,都有同事给我兜底。这种压力,确实会加速一个人的成长。这种狼性文化,这种环境&a…

VS2022 - C#程序简单打包操作

文章目录VS2022 - C#程序简单打包操作概述笔记实验过程新建工程让依赖的运行时程序安装包在安装时运行(如果发现运行时不能每次都安装程序,就不要做这步)关于”运行时安装程序无法每次都安装成功“的应对知识点尝试打包旧工程bug修复从需求属性中,可以原…

在JAVA中如何给Main方法传参?

一、在IDEA中进行传参:先创建一个类:MainTestimport java.util.Arrays;public class MainTest {public static void main(String[] args) {System.out.println(args.length);System.out.println(Arrays.toString(args));} }1.IDEA ---> 在运行的按钮上…

ORACLE中如何批量重置序列

背景:数据库所有序列都重置为1了,所以要将所有的序列都更新为对应的表主键(这里是id)的最大值1。我这里序列的规则是SEQ_表名。BEGINENHANCED_SYNC_SEQUENCES(WJ_CPP); -- 替换为你的模式名 END; / CREATE OR REPLACE PROCEDURE E…

公号文章排版教程:图文双排、添加图片超链接、往期推荐、推文采集(2025-08-21)

文章目录 排版的基本原则 I 图片超链接 方式1: 利用公号原生编辑器 方式2:在CSDN平台使用markdown编辑器, 利用标签实现图片链接。 II 排版小技巧 自定义页面模版教程 使用壹伴进行文章素材的采集 美编助手的往期推荐还不错 利用365编辑器创建图文双排效果 排版的基本原则 亲…

计算两幅图像在特定交点位置的置信度评分。置信度评分反映了该位置特征匹配的可靠性,通常用于图像处理任务(如特征匹配、立体视觉等)

这段代码定义了一个名为compute_confidence的函数,用于计算两幅图像在特定交点位置的置信度评分。置信度评分反映了该位置特征匹配的可靠性,通常用于图像处理任务(如特征匹配、立体视觉等)。以下是逐部分解析: 3. 结果…

计算机视觉第一课opencv(三)保姆级教学

简介 计算机视觉第一课opencv(一)保姆级教学 计算机视觉第一课opencv(二)保姆级教学 今天继续学习opencv。 一、 图像形态学 什么是形态学:图像形态学是一种处理图像形状特征的图像处理技术,主要用于描…

24.早期目标检测

早期目标检测 第一步,计算机图形学做初步大量候选框,把物体圈出来 第二步,依次将所有的候选框图片,输入到分类模型进行判断 选择性搜索 选择搜索算法(Selective Search),是一种熟知的计算机图像…

Java基础知识点汇总(三)

一、面向对象的特征有哪些方面 Java中面向对象的特征主要包括以下四个核心方面:封装(Encapsulation) 封装是指将对象的属性(数据)和方法(操作)捆绑在一起,隐藏对象的内部实现细节&am…

GEO优化专家孟庆涛:让AI“聪明”选择,为企业“精准”生长

在生成式AI席卷全球的今天,企业最常遇到的困惑或许是:“为什么我的AI生成内容总像‘模板套娃’?”“用户明明想要A,AI却拼命输出B?”当生成式AI从“能用”迈向“好用”的关键阶段,如何让AI真正理解用户需求…

【交易系统系列04】交易所版《速度与激情》:如何为狂飙的BTC交易引擎上演“空中加油”?

交易所版《速度与激情》:如何为狂飙的BTC交易引擎上演“空中加油”? 想象一下这个场景:你正端着一杯热气腾腾的咖啡,看着窗外我家那只贪睡的橘猫趴在阳光下打着呼噜。突然,手机上的警报开始尖叫,交易系统监…

windows下jdk环境切换为jdk17后,临时需要jdk1.8的处理

近段时间,终于决定把开发环境全面转向jdk17,这不就遇到了问题。 windows主环境已经设置为jdk17了。 修改的JAVA_HOME D:\java\jdk-17CLASSPATH设置 .;D:\java\jdk-17\lib\dt.jar;D:\java\jdk-17\lib\tools.jar;PATH中增加 D:\java\jdk-17\bin但是有些程序…

Android URC 介绍及源码案例参考

1. URC 含义 URC 是 Unsolicited Result Code(非请求结果码)的缩写。 它是 modem(基带)在不需要 AP 主动请求的情况下向上层主动上报的消息。 典型例子:短信到达提示、网络状态变更、来电通知、信号质量变化等。 URC 一般以 AT 命令扩展的形式从 modem 发到 AP,例如串口…

VB.NET发送邮件给OUTLOOK.COM的用户,用OUTLOOK.COM邮箱账号登录给别人发邮件

在VB.NET中通过代码发送邮件时,确实会遇到邮箱服务的身份认证(Authentication)要求。特别是微软Outlook/Hotmail等服务,已经逐步禁用传统的“基本身份验证”(Basic Authentication),转而强制要求…

【网络运维】Shell:变量进阶知识

Shell 变量进阶知识 Shell 中的特殊变量 位置参数变量 Shell 脚本中常用的位置参数变量如下: $0:获取当前执行的 Shell 脚本文件名(包含路径时包括路径)$n:获取第 n 个参数值(n>9 时需使用 ${n}&#xf…

部署Qwen2.5-VL-7B-Instruct-GPTQ-Int3

模型下载 from modelscope import snapshot_download model_dir snapshot_download(ChineseAlpacaGroup/Qwen2.5-VL-7B-Instruct-GPTQ-Int3)相关包导入 import os import numpy as np import pandas as pd from tqdm import tqdm from datetime import datetime,timedelta fro…

sourcetree 拉取代码

提示:文章旨在于教授大家 sourcetree 拉取代码的方式,关于代码的提交合并等操作后续会补充。 文章目录前言一、sourcetree 安装二、http 与 ssh 拉取代码1.http 方式(1)生成 token(2)拼接项目的 url&#x…

epoll模型网络编程知识要领

1、程序初始化创建监听socket调用bind函数绑定ip地址、port端口号调用listen函数监听调用epoll_create函数创建epollfd调用epoll_ctrl函数将listenfd绑定到epollfd上,监测listenfd的读事件在一个无限循环中,调用epoll_wait函数等待事件发生2、处理客户端…

15-day12LLM结构变化、位置编码和投机采样

多头机制transformer结构归一化层选择 归一化层位置归一化层类型激活函数Llama2结构MoE架构 混合专家模型DeepSeek MLA为何需要位置编码目前的主流位置编码正余弦位置编码可学习位置编码ROPE旋转位置编码推导参考: https://spaces.ac.cn/archives/8265 https://zhua…

记录 docker容器打包成镜像 在其他服务器快速启动镜像和容器

我有个nginx服务器 需要在其他服务器直接部署使用 里面都是完整的 使用 docker ps 查看容器id 进行打包成镜像docker commit [容器ID或名称] 新镜像名:版本 docker commit 28f60e2206b2 my-nginx-custom:v1镜像保存成文件 docker save -o my-nginx-custom.tar my-nginx-custom:…