在跨境电商领域,实时获取商品数据对商家优化定价策略、监控竞品动态及把握市场趋势至关重要。本文将详细介绍如何通过速卖通API 高效获取商品实时数据,涵盖从注册到数据处理的完整流程,并提供可复用的代码示例。
1. 速卖通 API 接入基础
1.1 平台概述
速卖通平台提供 RESTful API 接口,支持商品信息、订单管理、物流查询等核心功能。需通过认证获取访问权限,接口返回 JSON 格式数据。
1.2 接入准备
- 注册账号
- 获取
ApiKey
和ApiSecret
- 申请接口权限(如商品详情、列表查询)
- 了解 API 调用频率限制(通常为 5-10 次 / 秒)
2. 认证与授权流程
速卖通 API 采用授权码模式,需完成以下步骤:
import requests
import json
import time
from urllib.parse import urlencode
# 配置信息
APP_KEY = "your_app_key"
APP_SECRET = "your_app_secret"
REDIRECT_URI = "https://your-callback-url.com"
AUTHORIZATION_CODE = "" # 授权后获取
ACCESS_TOKEN = "" # 访问令牌
REFRESH_TOKEN = "" # 刷新令牌(有效期1年)
# 1. 获取授权URL
def get_authorization_url():
params = {
"client_id": APP_KEY,
"redirect_uri": REDIRECT_URI,
"response_type": "code",
"scope": "all",
"state": "init"
}
return f"https://gw.api.alibaba.com/auth/authorize.htm?{urlencode(params)}"
# 2. 通过授权码换取访问令牌
def get_access_token(auth_code):
url = "https://gw.api.alibaba.com/openapi/http/1/system.oauth2/getToken"
payload = {
"grant_type": "authorization_code",
"client_id": APP_KEY,
"client_secret": APP_SECRET,
"code": auth_code,
"redirect_uri": REDIRECT_URI
}
response = requests.post(url, data=payload)
return response.json()
# 3. 刷新访问令牌(避免频繁授权)
def refresh_token(refresh_token):
url = "https://gw.api.alibaba.com/openapi/http/1/system.oauth2/getToken"
payload = {
"grant_type": "refresh_token",
"client_id": APP_KEY,
"client_secret": APP_SECRET,
"refresh_token": refresh_token
}
response = requests.post(url, data=payload)
return response.json()
3. 商品数据 API 调用实现
3.1 签名生成算法
速卖通 API 要求对请求参数进行 HMAC-SHA1 签名:
import hmac
import hashlib
def generate_signature(params, app_secret):
"""生成API请求签名"""
# 按参数名排序
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 拼接参数名和值
string_to_sign = app_secret
for key, value in sorted_params:
string_to_sign += f"{key}{value}"
# HMAC-SHA1加密
signature = hmac.new(
app_secret.encode("utf-8"),
string_to_sign.encode("utf-8"),
hashlib.sha1
).hexdigest().upper()
return signature
3.2 商品详情 API 调用
def get_product_detail(product_id, access_token):
"""获取单个商品详情"""
api_url = f"https://gw.api.alibaba.com/openapi/param2/2/aliexpress.affiliate.productdetail.get/{APP_KEY}"
# 请求参数
params = {
"app_key": APP_KEY,
"session": access_token,
"timestamp": int(time.time() * 1000), # 毫秒级时间戳
"format": "json",
"v": "2",
"sign_method": "hmac-sha1",
"product_id": product_id,
"fields": "productId,productTitle,originalPrice,salePrice,imageUrl,categoryId,rating,orderCount,storeName,description"
}
# 生成签名
params["sign"] = generate_signature(params, APP_SECRET)
# 发送请求
response = requests.post(api_url, data=params)
return response.json()
3.3 商品列表批量查询
def get_product_list(keywords, page=1, page_size=20, access_token=None):
"""根据关键词搜索商品列表"""
api_url = f"https://gw.api.alibaba.com/openapi/param2/2/aliexpress.affiliate.products.search/{APP_KEY}"
params = {
"app_key": APP_KEY,
"session": access_token,
"timestamp": int(time.time() * 1000),
"format": "json",
"v": "2",
"sign_method": "hmac-sha1",
"keywords": keywords,
"page_no": page,
"page_size": page_size,
"fields": "productId,productTitle,originalPrice,salePrice,imageUrl,categoryId,rating,orderCount,storeName"
}
params["sign"] = generate_signature(params, APP_SECRET)
response = requests.post(api_url, data=params)
return response.json()
4. 数据处理与存储
4.1 数据解析与清洗
def parse_product_data(api_response):
"""解析API返回的商品数据"""
if "errorCode" in api_response:
print(f"API错误: {api_response['errorCode']} - {api_response['errorMessage']}")
return None
if "result" not in api_response:
print("无有效数据返回")
return None
# 提取商品信息
product = api_response["result"]
parsed_data = {
"product_id": product.get("productId"),
"title": product.get("productTitle"),
"original_price": float(product.get("originalPrice", 0)),
"sale_price": float(product.get("salePrice", 0)),
"discount_rate": round(1 - (float(product.get("salePrice", 0)) / float(product.get("originalPrice", 1))), 2),
"image_url": product.get("imageUrl"),
"category_id": product.get("categoryId"),
"rating": float(product.get("rating", 0)),
"order_count": int(product.get("orderCount", 0)),
"store_name": product.get("storeName"),
"crawl_time": time.strftime("%Y-%m-%d %H:%M:%S")
}
return parsed_data
4.2 数据存储(MySQL 示例)
import pymysql
from pymysql.cursors import DictCursor
def save_to_mysql(product_data, table_name="aliexpress_products"):
"""将商品数据存入MySQL数据库"""
connection = pymysql.connect(
host="localhost",
user="your_username",
password="your_password",
database="aliexpress_data",
charset="utf8mb4",
cursorclass=DictCursor
)
try:
with connection.cursor() as cursor:
# 检查表格是否存在,不存在则创建
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS `{table_name}` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`product_id` VARCHAR(255) NOT NULL,
`title` TEXT,
`original_price` DECIMAL(10, 2),
`sale_price` DECIMAL(10, 2),
`discount_rate` DECIMAL(3, 2),
`image_url` TEXT,
`category_id` VARCHAR(255),
`rating` DECIMAL(3, 2),
`order_count` INT,
`store_name` VARCHAR(255),
`crawl_time` DATETIME
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
cursor.execute(create_table_sql)
# 插入数据
insert_sql = f"""
INSERT INTO `{table_name}`
(`product_id`, `title`, `original_price`, `sale_price`, `discount_rate`,
`image_url`, `category_id`, `rating`, `order_count`, `store_name`, `crawl_time`)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
"""
cursor.execute(insert_sql, (
product_data["product_id"],
product_data["title"],
product_data["original_price"],
product_data["sale_price"],
product_data["discount_rate"],
product_data["image_url"],
product_data["category_id"],
product_data["rating"],
product_data["order_count"],
product_data["store_name"],
product_data["crawl_time"]
))
# 提交事务
connection.commit()
print(f"成功存储商品: {product_data['product_id']}")
except Exception as e:
print(f"存储失败: {str(e)}")
connection.rollback()
finally:
connection.close()
5.2 智能限流与重试
import time
from functools import wraps
def rate_limit(max_calls=5, period=1):
"""API调用限流装饰器"""
calls = []
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 移除时间窗口外的调用记录
now = time.time()
calls[:] = [t for t in calls if t > now - period]
# 检查是否超过限制
if len(calls) >= max_calls:
wait_time = period - (now - calls[0])
time.sleep(wait_time)
# 记录本次调用
calls.append(now)
# 执行函数
return func(*args, **kwargs)
return wrapper
return decorator
def retry(max_attempts=3, delay=1):
"""API调用失败重试装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
attempts = 0
while attempts < max_attempts:
try:
return func(*args, **kwargs)
except Exception as e:
attempts += 1
if attempts == max_attempts:
raise e
print(f"尝试 {attempts}/{max_attempts} 失败: {str(e)},{delay}秒后重试")
time.sleep(delay)
delay *= 2 # 指数退避
return wrapper
return decorator
6. 数据监控与分析
6.1 价格波动监控
def monitor_price_changes(product_id, threshold=0.1):
"""监控商品价格变化(与历史数据对比)"""
connection = pymysql.connect(
host="localhost",
user="your_username",
password="your_password",
database="aliexpress_data",
charset="utf8mb4",
cursorclass=DictCursor
)
try:
with connection.cursor() as cursor:
# 查询最近两次价格
query = """
SELECT sale_price
FROM aliexpress_products
WHERE product_id = %s
ORDER BY crawl_time DESC
LIMIT 2;
"""
cursor.execute(query, (product_id,))
results = cursor.fetchall()
if len(results) < 2:
print("历史数据不足,无法判断价格变化")
return None
old_price = float(results[1]["sale_price"])
new_price = float(results[0]["sale_price"])
change_rate = (new_price - old_price) / old_price
if abs(change_rate) >= threshold:
print(f"价格变动超过{threshold*100}%: 原价{old_price} → 现价{new_price}")
return {
"product_id": product_id,
"old_price": old_price,
"new_price": new_price,
"change_rate": change_rate
}
return None
finally:
connection.close()
7. 部署与运维建议
定时任务配置:
- 使用
APScheduler
或 Linuxcrontab
设置定时采集 - 避免高峰期集中请求,分散 API 调用时间
- 使用
异常处理机制:
- 记录详细日志,包括请求参数、响应内容和错误信息
- 设置告警系统(邮件 / 短信),监控 API 调用成功率
数据安全:
- 敏感信息(如 AppSecret)使用环境变量管理
- 定期备份数据库,防止数据丢失
性能优化:
- 使用缓存(如 Redis)存储高频访问数据
- 优化数据库查询,添加合适索引
8. 完整示例:商品监控系统
# 完整示例:商品监控系统
if __name__ == "__main__":
# 1. 获取授权(首次运行)
# auth_url = get_authorization_url()
# print(f"请访问以下URL授权: {auth_url}")
# # 授权后获取AUTHORIZATION_CODE,然后:
# token_data = get_access_token(AUTHORIZATION_CODE)
# ACCESS_TOKEN = token_data["access_token"]
# REFRESH_TOKEN = token_data["refresh_token"]
# 2. 刷新令牌(后续运行)
# token_data = refresh_token(REFRESH_TOKEN)
# ACCESS_TOKEN = token_data["access_token"]
# 3. 监控商品列表
products_to_monitor = ["3256804740043007", "3256804740043008", "3256804740043009"]
for product_id in products_to_monitor:
# 获取商品详情
result = get_product_detail(product_id, ACCESS_TOKEN)
# 解析数据
product_data = parse_product_data(result)
if product_data:
# 存储数据
save_to_mysql(product_data)
# 检查价格变化
price_change = monitor_price_changes(product_id)
if price_change:
print(f"⚠️ 价格变动警报: {price_change}")
总结
通过本文介绍的方法,开发者可以高效接入速卖通 API,实现商品数据的实时采集与监控。关键技术要点包括:
- 认证流程
- API 签名生成算法
- 并发请求与限流控制
- 数据解析与持久化
- 价格波动监控机制
实际应用中,建议根据业务需求调整采集频率和数据字段,同时注意遵守速卖通 API 使用规范,避免因过度请求导致 IP 封禁。通过合理设计架构,可以构建出稳定、高效的跨境电商数据采集系统。