高效获取速卖通商品实时数据:API 接口开发与接入全流程

发布于:2025-06-22 ⋅ 阅读:(21) ⋅ 点赞:(0)

在跨境电商领域,实时获取商品数据对商家优化定价策略、监控竞品动态及把握市场趋势至关重要。本文将详细介绍如何通过速卖通API 高效获取商品实时数据,涵盖从注册到数据处理的完整流程,并提供可复用的代码示例。

1. 速卖通 API 接入基础
1.1 平台概述

速卖通平台提供 RESTful API 接口,支持商品信息、订单管理、物流查询等核心功能。需通过认证获取访问权限,接口返回 JSON 格式数据。

1.2 接入准备
  • 注册账号
  • 获取ApiKeyApiSecret
  • 申请接口权限(如商品详情、列表查询)
  • 了解 API 调用频率限制(通常为 5-10 次 / 秒)
2. 认证与授权流程

速卖通 API 采用授权码模式,需完成以下步骤:

 

import requests
import json
import time
from urllib.parse import urlencode

# 配置信息
APP_KEY = "your_app_key"
APP_SECRET = "your_app_secret"
REDIRECT_URI = "https://your-callback-url.com"
AUTHORIZATION_CODE = ""  # 授权后获取
ACCESS_TOKEN = ""        # 访问令牌
REFRESH_TOKEN = ""       # 刷新令牌(有效期1年)

# 1. 获取授权URL
def get_authorization_url():
    params = {
        "client_id": APP_KEY,
        "redirect_uri": REDIRECT_URI,
        "response_type": "code",
        "scope": "all",
        "state": "init"
    }
    return f"https://gw.api.alibaba.com/auth/authorize.htm?{urlencode(params)}"

# 2. 通过授权码换取访问令牌
def get_access_token(auth_code):
    url = "https://gw.api.alibaba.com/openapi/http/1/system.oauth2/getToken"
    payload = {
        "grant_type": "authorization_code",
        "client_id": APP_KEY,
        "client_secret": APP_SECRET,
        "code": auth_code,
        "redirect_uri": REDIRECT_URI
    }
    response = requests.post(url, data=payload)
    return response.json()

# 3. 刷新访问令牌(避免频繁授权)
def refresh_token(refresh_token):
    url = "https://gw.api.alibaba.com/openapi/http/1/system.oauth2/getToken"
    payload = {
        "grant_type": "refresh_token",
        "client_id": APP_KEY,
        "client_secret": APP_SECRET,
        "refresh_token": refresh_token
    }
    response = requests.post(url, data=payload)
    return response.json()
3. 商品数据 API 调用实现
3.1 签名生成算法

速卖通 API 要求对请求参数进行 HMAC-SHA1 签名:

import hmac
import hashlib

def generate_signature(params, app_secret):
    """生成API请求签名"""
    # 按参数名排序
    sorted_params = sorted(params.items(), key=lambda x: x[0])
    
    # 拼接参数名和值
    string_to_sign = app_secret
    for key, value in sorted_params:
        string_to_sign += f"{key}{value}"
    
    # HMAC-SHA1加密
    signature = hmac.new(
        app_secret.encode("utf-8"),
        string_to_sign.encode("utf-8"),
        hashlib.sha1
    ).hexdigest().upper()
    
    return signature

 

3.2 商品详情 API 调用
def get_product_detail(product_id, access_token):
    """获取单个商品详情"""
    api_url = f"https://gw.api.alibaba.com/openapi/param2/2/aliexpress.affiliate.productdetail.get/{APP_KEY}"
    
    # 请求参数
    params = {
        "app_key": APP_KEY,
        "session": access_token,
        "timestamp": int(time.time() * 1000),  # 毫秒级时间戳
        "format": "json",
        "v": "2",
        "sign_method": "hmac-sha1",
        "product_id": product_id,
        "fields": "productId,productTitle,originalPrice,salePrice,imageUrl,categoryId,rating,orderCount,storeName,description"
    }
    
    # 生成签名
    params["sign"] = generate_signature(params, APP_SECRET)
    
    # 发送请求
    response = requests.post(api_url, data=params)
    return response.json()

 3.3 商品列表批量查询

def get_product_list(keywords, page=1, page_size=20, access_token=None):
    """根据关键词搜索商品列表"""
    api_url = f"https://gw.api.alibaba.com/openapi/param2/2/aliexpress.affiliate.products.search/{APP_KEY}"
    
    params = {
        "app_key": APP_KEY,
        "session": access_token,
        "timestamp": int(time.time() * 1000),
        "format": "json",
        "v": "2",
        "sign_method": "hmac-sha1",
        "keywords": keywords,
        "page_no": page,
        "page_size": page_size,
        "fields": "productId,productTitle,originalPrice,salePrice,imageUrl,categoryId,rating,orderCount,storeName"
    }
    
    params["sign"] = generate_signature(params, APP_SECRET)
    response = requests.post(api_url, data=params)
    return response.json()

 

4. 数据处理与存储
4.1 数据解析与清洗
def parse_product_data(api_response):
    """解析API返回的商品数据"""
    if "errorCode" in api_response:
        print(f"API错误: {api_response['errorCode']} - {api_response['errorMessage']}")
        return None
    
    if "result" not in api_response:
        print("无有效数据返回")
        return None
    
    # 提取商品信息
    product = api_response["result"]
    parsed_data = {
        "product_id": product.get("productId"),
        "title": product.get("productTitle"),
        "original_price": float(product.get("originalPrice", 0)),
        "sale_price": float(product.get("salePrice", 0)),
        "discount_rate": round(1 - (float(product.get("salePrice", 0)) / float(product.get("originalPrice", 1))), 2),
        "image_url": product.get("imageUrl"),
        "category_id": product.get("categoryId"),
        "rating": float(product.get("rating", 0)),
        "order_count": int(product.get("orderCount", 0)),
        "store_name": product.get("storeName"),
        "crawl_time": time.strftime("%Y-%m-%d %H:%M:%S")
    }
    return parsed_data

 4.2 数据存储(MySQL 示例)

import pymysql
from pymysql.cursors import DictCursor

def save_to_mysql(product_data, table_name="aliexpress_products"):
    """将商品数据存入MySQL数据库"""
    connection = pymysql.connect(
        host="localhost",
        user="your_username",
        password="your_password",
        database="aliexpress_data",
        charset="utf8mb4",
        cursorclass=DictCursor
    )
    
    try:
        with connection.cursor() as cursor:
            # 检查表格是否存在,不存在则创建
            create_table_sql = f"""
            CREATE TABLE IF NOT EXISTS `{table_name}` (
                `id` INT AUTO_INCREMENT PRIMARY KEY,
                `product_id` VARCHAR(255) NOT NULL,
                `title` TEXT,
                `original_price` DECIMAL(10, 2),
                `sale_price` DECIMAL(10, 2),
                `discount_rate` DECIMAL(3, 2),
                `image_url` TEXT,
                `category_id` VARCHAR(255),
                `rating` DECIMAL(3, 2),
                `order_count` INT,
                `store_name` VARCHAR(255),
                `crawl_time` DATETIME
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
            """
            cursor.execute(create_table_sql)
            
            # 插入数据
            insert_sql = f"""
            INSERT INTO `{table_name}` 
            (`product_id`, `title`, `original_price`, `sale_price`, `discount_rate`, 
            `image_url`, `category_id`, `rating`, `order_count`, `store_name`, `crawl_time`)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
            """
            
            cursor.execute(insert_sql, (
                product_data["product_id"],
                product_data["title"],
                product_data["original_price"],
                product_data["sale_price"],
                product_data["discount_rate"],
                product_data["image_url"],
                product_data["category_id"],
                product_data["rating"],
                product_data["order_count"],
                product_data["store_name"],
                product_data["crawl_time"]
            ))
        
        # 提交事务
        connection.commit()
        print(f"成功存储商品: {product_data['product_id']}")
        
    except Exception as e:
        print(f"存储失败: {str(e)}")
        connection.rollback()
        
    finally:
        connection.close()

 5.2 智能限流与重试

import time
from functools import wraps

def rate_limit(max_calls=5, period=1):
    """API调用限流装饰器"""
    calls = []
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 移除时间窗口外的调用记录
            now = time.time()
            calls[:] = [t for t in calls if t > now - period]
            
            # 检查是否超过限制
            if len(calls) >= max_calls:
                wait_time = period - (now - calls[0])
                time.sleep(wait_time)
            
            # 记录本次调用
            calls.append(now)
            
            # 执行函数
            return func(*args, **kwargs)
        
        return wrapper
    
    return decorator

def retry(max_attempts=3, delay=1):
    """API调用失败重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    attempts += 1
                    if attempts == max_attempts:
                        raise e
                    print(f"尝试 {attempts}/{max_attempts} 失败: {str(e)},{delay}秒后重试")
                    time.sleep(delay)
                    delay *= 2  # 指数退避
        return wrapper
    return decorator

 

6. 数据监控与分析
6.1 价格波动监控

 

def monitor_price_changes(product_id, threshold=0.1):
    """监控商品价格变化(与历史数据对比)"""
    connection = pymysql.connect(
        host="localhost",
        user="your_username",
        password="your_password",
        database="aliexpress_data",
        charset="utf8mb4",
        cursorclass=DictCursor
    )
    
    try:
        with connection.cursor() as cursor:
            # 查询最近两次价格
            query = """
            SELECT sale_price 
            FROM aliexpress_products 
            WHERE product_id = %s 
            ORDER BY crawl_time DESC 
            LIMIT 2;
            """
            cursor.execute(query, (product_id,))
            results = cursor.fetchall()
            
            if len(results) < 2:
                print("历史数据不足,无法判断价格变化")
                return None
            
            old_price = float(results[1]["sale_price"])
            new_price = float(results[0]["sale_price"])
            change_rate = (new_price - old_price) / old_price
            
            if abs(change_rate) >= threshold:
                print(f"价格变动超过{threshold*100}%: 原价{old_price} → 现价{new_price}")
                return {
                    "product_id": product_id,
                    "old_price": old_price,
                    "new_price": new_price,
                    "change_rate": change_rate
                }
            return None
            
    finally:
        connection.close()
7. 部署与运维建议
  1. 定时任务配置

    • 使用APScheduler或 Linuxcrontab设置定时采集
    • 避免高峰期集中请求,分散 API 调用时间
  2. 异常处理机制

    • 记录详细日志,包括请求参数、响应内容和错误信息
    • 设置告警系统(邮件 / 短信),监控 API 调用成功率
  3. 数据安全

    • 敏感信息(如 AppSecret)使用环境变量管理
    • 定期备份数据库,防止数据丢失
  4. 性能优化

    • 使用缓存(如 Redis)存储高频访问数据
    • 优化数据库查询,添加合适索引
8. 完整示例:商品监控系统
# 完整示例:商品监控系统
if __name__ == "__main__":
    # 1. 获取授权(首次运行)
    # auth_url = get_authorization_url()
    # print(f"请访问以下URL授权: {auth_url}")
    # # 授权后获取AUTHORIZATION_CODE,然后:
    # token_data = get_access_token(AUTHORIZATION_CODE)
    # ACCESS_TOKEN = token_data["access_token"]
    # REFRESH_TOKEN = token_data["refresh_token"]
    
    # 2. 刷新令牌(后续运行)
    # token_data = refresh_token(REFRESH_TOKEN)
    # ACCESS_TOKEN = token_data["access_token"]
    
    # 3. 监控商品列表
    products_to_monitor = ["3256804740043007", "3256804740043008", "3256804740043009"]
    
    for product_id in products_to_monitor:
        # 获取商品详情
        result = get_product_detail(product_id, ACCESS_TOKEN)
        
        # 解析数据
        product_data = parse_product_data(result)
        if product_data:
            # 存储数据
            save_to_mysql(product_data)
            
            # 检查价格变化
            price_change = monitor_price_changes(product_id)
            if price_change:
                print(f"⚠️ 价格变动警报: {price_change}")

 

总结

通过本文介绍的方法,开发者可以高效接入速卖通 API,实现商品数据的实时采集与监控。关键技术要点包括:

  1. 认证流程
  2. API 签名生成算法
  3. 并发请求与限流控制
  4. 数据解析与持久化
  5. 价格波动监控机制

实际应用中,建议根据业务需求调整采集频率和数据字段,同时注意遵守速卖通 API 使用规范,避免因过度请求导致 IP 封禁。通过合理设计架构,可以构建出稳定、高效的跨境电商数据采集系统。


网站公告

今日签到

点亮在社区的每一天
去签到