构建分布式京东商品数据采集系统:基于 API 的微服务实现方案

发布于:2025-09-11 ⋅ 阅读:(19) ⋅ 点赞:(0)

随着电子商务的快速发展,商品数据的采集与分析成为企业制定营销策略、优化产品布局的重要依据。京东作为国内领先的电商平台,其商品数据具有极高的商业价值。本文将介绍如何构建一个分布式的京东商品数据采集系统,采用基于 API 的微服务架构,实现高效、稳定、可扩展的数据采集能力。

系统架构设计

分布式京东商品数据采集系统采用微服务架构,主要包含以下几个核心服务:

  1. API 网关服务:统一入口,负责请求路由、负载均衡、认证授权
  2. 任务调度服务:管理采集任务的分发与状态跟踪
  3. 数据采集服务:实际执行商品数据采集的工作节点
  4. 数据存储服务:负责采集数据的持久化存储
  5. 数据清洗服务:对采集的原始数据进行清洗和标准化
  6. 监控告警服务:监控各服务状态,异常时触发告警

系统架构图如下:

plaintext

[客户端] → [API网关] → [任务调度服务]
                        ↓
[监控告警] ← [数据清洗] ← [数据采集集群] → [京东API/网页]
                        ↓
                   [数据存储服务]

核心服务实现

1. API 网关服务

采用 Spring Cloud Gateway 实现 API 网关,负责请求路由和负载均衡。

spring:
  cloud:
    gateway:
      routes:
        - id: task-service
          uri: lb://task-service
          predicates:
            - Path=/api/tasks/**filters:
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 10
                redis-rate-limiter.burstCapacity: 20
        - id: crawler-service
          uri: lb://crawler-service
          predicates:
            - Path=/api/crawlers/**
        - id: data-service
          uri: lb://data-service
          predicates:
            - Path=/api/data/**eureka:
  client:
    serviceUrl:
      defaultZone: http://eureka-server1:8761/eureka/,http://eureka-server2:8762/eureka/
server:
  port: 8080

2. 任务调度服务

任务调度服务负责管理采集任务的创建、分发和状态跟踪,采用 Spring Boot 实现。

@Service
public class TaskService {
    
    @Autowired
    private TaskRepository taskRepository;
    
    @Autowired
    private RestTemplate restTemplate;
    
    @Autowired
    private DiscoveryClient discoveryClient;
    
    // 创建新任务
    public Task createTask(TaskRequest request) {
        Task task = new Task();
        task.setKeywords(request.getKeywords());
        task.setStatus(TaskStatus.PENDING);
        task.setCreatedTime(new Date());
        task.setPriority(request.getPriority());
        task.setTotalPages(calculateTotalPages(request));
        
        return taskRepository.save(task);
    }
    
    // 分配任务给可用的采集节点
    @Scheduled(fixedRate = 5000)
    public void assignTasks() {
        List<Task> pendingTasks = taskRepository.findByStatus(TaskStatus.PENDING);
        if (pendingTasks.isEmpty()) {
            return;
        }
        
        List<String> crawlerInstances = discoveryClient.getInstances("crawler-service")
                .stream()
                .map(instance -> instance.getHost() + ":" + instance.getPort())
                .collect(Collectors.toList());
        
        if (crawlerInstances.isEmpty()) {
            log.warn("No available crawler instances");
            return;
        }
        
        // 简单的轮询分配策略
        for (int i = 0; i < pendingTasks.size(); i++) {
            Task task = pendingTasks.get(i);
            String crawlerUrl = "http://" + crawlerInstances.get(i % crawlerInstances.size()) + "/api/crawlers/execute";
            
            try {
                restTemplate.postForObject(crawlerUrl, task, Task.class);
                task.setStatus(TaskStatus.ASSIGNED);
                taskRepository.save(task);
                log.info("Task {} assigned to crawler", task.getId());
            } catch (Exception e) {
                log.error("Failed to assign task {} to crawler", task.getId(), e);
            }
        }
    }
    
    // 计算总页数
    private int calculateTotalPages(TaskRequest request) {
        // 根据关键词预估总页数,实际应用中可根据京东API返回结果动态调整
        return 10; // 简化示例
    }
    
    // 更新任务状态
    public void updateTaskStatus(Long taskId, TaskStatus status) {
        Task task = taskRepository.findById(taskId)
                .orElseThrow(() -> new TaskNotFoundException("Task not found: " + taskId));
        
        task.setStatus(status);
        if (status == TaskStatus.COMPLETED) {
            task.setCompletedTime(new Date());
        }
        
        taskRepository.save(task);
    }
}

3. 数据采集服务

数据采集服务是实际执行商品数据采集的工作节点,采用分布式部署以提高采集效率和系统可用性。

@Service
public class JdCrawlerService {
    
    private static final Logger log = LoggerFactory.getLogger(JdCrawlerService.class);
    
    @Autowired
    private RestTemplate restTemplate;
    
    @Value("${jd.api.url}")
    private String jdApiUrl;
    
    @Value("${jd.api.appkey}")
    private String appKey;
    
    @Value("${jd.api.secret}")
    private String secret;
    
    @Autowired
    private DataServiceClient dataServiceClient;
    
    // 执行采集任务
    public void executeTask(Task task) {
        log.info("Starting to crawl task: {}", task.getId());
        
        try {
            // 更新任务状态为运行中
            updateTaskStatus(task.getId(), TaskStatus.RUNNING);
            
            // 分页采集商品数据
            for (int page = 1; page <= task.getTotalPages(); page++) {
                List<Product> products = crawlPage(task.getKeywords(), page);
                
                if (products.isEmpty()) {
                    log.info("No more products found for task {}, stopping early", task.getId());
                    break;
                }
                
                // 保存采集的数据
                dataServiceClient.saveProducts(products, task.getId());
                log.info("Crawled page {} for task {}, {} products", 
                        page, task.getId(), products.size());
                
                // 随机休眠,避免触发反爬机制
                Thread.sleep(new Random().nextInt(3000) + 2000);
            }
            
            // 采集完成,更新任务状态
            updateTaskStatus(task.getId(), TaskStatus.COMPLETED);
            log.info("Task {} completed successfully", task.getId());
            
        } catch (Exception e) {
            log.error("Error crawling task {}", task.getId(), e);
            updateTaskStatus(task.getId(), TaskStatus.FAILED);
        }
    }
    
    // 采集单页商品数据
    private List<Product> crawlPage(String keywords, int page) {
        try {
            // 构建API请求参数
            String timestamp = String.valueOf(System.currentTimeMillis());
            String sign = generateSign(keywords, page, timestamp);
            
            Map<String, String> params = new HashMap<>();
            params.put("keyword", keywords);
            params.put("page", String.valueOf(page));
            params.put("appkey", appKey);
            params.put("timestamp", timestamp);
            params.put("sign", sign);
            
            // 调用京东API获取商品数据
            String url = buildUrlWithParams(jdApiUrl, params);
            JdApiResponse response = restTemplate.getForObject(url, JdApiResponse.class);
            
            if (response.isSuccess()) {
                return convertToProducts(response.getData());
            } else {
                log.error("JD API request failed: {}", response.getMsg());
                return Collections.emptyList();
            }
        } catch (Exception e) {
            log.error("Error crawling page {} for keyword {}", page, keywords, e);
            return Collections.emptyList();
        }
    }
    
    // 生成签名,防止API滥用
    private String generateSign(String keyword, int page, String timestamp) {
        String baseString = appKey + keyword + page + timestamp + secret;
        return DigestUtils.md5DigestAsHex(baseString.getBytes(StandardCharsets.UTF_8));
    }
    
    // 构建带参数的URL
    private String buildUrlWithParams(String baseUrl, Map<String, String> params) {
        UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(baseUrl);
        params.forEach(builder::queryParam);
        return builder.toUriString();
    }
    
    // 转换API响应数据为Product对象
    private List<Product> convertToProducts(List<Map<String, Object>> dataList) {
        List<Product> products = new ArrayList<>();
        
        for (Map<String, Object> data : dataList) {
            Product product = new Product();
            product.setSkuId(data.get("sku_id").toString());
            product.setName(data.get("name").toString());
            product.setPrice(new BigDecimal(data.get("price").toString()));
            product.setShopName(data.get("shop_name").toString());
            product.setCommentCount(Integer.parseInt(data.get("comment_count").toString()));
            product.setGoodRate(new BigDecimal(data.get("good_rate").toString()));
            product.setCrawlTime(new Date());
            
            products.add(product);
        }
        
        return products;
    }
    
    // 更新任务状态
    private void updateTaskStatus(Long taskId, TaskStatus status) {
        // 调用任务服务更新状态
        restTemplate.put("http://task-service/api/tasks/" + taskId + "/status", status);
    }
}

4. 数据存储服务

数据存储服务负责将采集到的商品数据进行持久化存储,采用 MySQL 作为主数据库,Redis 作为缓存。

@Service
public class ProductService {
    
    @Autowired
    private ProductRepository productRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 批量保存商品数据
    @Transactional
    public void saveProducts(List<Product> products, Long taskId) {
        if (CollectionUtils.isEmpty(products)) {
            return;
        }
        
        // 批量保存到数据库
        List<Product> savedProducts = productRepository.saveAll(products);
        
        // 缓存热门商品
        saveHotProductsToCache(savedProducts);
        
        // 记录任务关联的商品ID
        saveProductTaskRelation(savedProducts, taskId);
    }
    
    // 缓存热门商品
    private void saveHotProductsToCache(List<Product> products) {
        // 筛选评论数多的商品作为热门商品
        List<Product> hotProducts = products.stream()
                .sorted((p1, p2) -> Integer.compare(p2.getCommentCount(), p1.getCommentCount()))
                .limit(10)
                .collect(Collectors.toList());
        
        // 存入Redis缓存,过期时间1小时
        for (Product product : hotProducts) {
            redisTemplate.opsForValue().set(
                    "product:hot:" + product.getSkuId(), 
                    product, 
                    1, 
                    TimeUnit.HOURS
            );
        }
    }
    
    // 记录任务与商品的关联关系
    private void saveProductTaskRelation(List<Product> products, Long taskId) {
        List<ProductTaskRelation> relations = products.stream()
                .map(product -> {
                    ProductTaskRelation relation = new ProductTaskRelation();
                    relation.setProductId(product.getId());
                    relation.setTaskId(taskId);
                    return relation;
                })
                .collect(Collectors.toList());
        
        // 保存关联关系
        // productTaskRelationRepository.saveAll(relations);
    }
    
    // 根据关键词查询商品
    public Page<Product> findProductsByKeyword(String keyword, Pageable pageable) {
        // 先查缓存
        String cacheKey = "products:keyword:" + keyword + ":" + pageable.getPageNumber() + ":" + pageable.getPageSize();
        Page<Product> cachedPage = (Page<Product>) redisTemplate.opsForValue().get(cacheKey);
        
        if (cachedPage != null) {
            return cachedPage;
        }
        
        // 缓存未命中,查询数据库
        Page<Product> productPage = productRepository.findByNameContaining(keyword, pageable);
        
        // 存入缓存,过期时间30分钟
        redisTemplate.opsForValue().set(cacheKey, productPage, 30, TimeUnit.MINUTES);
        
        return productPage;
    }
    
    // 根据SKU ID查询商品详情
    public Product findBySkuId(String skuId) {
        // 先查缓存
        Product cachedProduct = (Product) redisTemplate.opsForValue().get("product:sku:" + skuId);
        if (cachedProduct != null) {
            return cachedProduct;
        }
        
        // 缓存未命中,查询数据库
        Product product = productRepository.findBySkuId(skuId)
                .orElseThrow(() -> new ProductNotFoundException("Product not found: " + skuId));
        
        // 存入缓存,过期时间1小时
        redisTemplate.opsForValue().set("product:sku:" + skuId, product, 1, TimeUnit.HOURS);
        
        return product;
    }
}

系统扩展性与可靠性设计

1. 水平扩展能力

系统设计支持水平扩展,通过增加服务实例即可提高系统的处理能力:

  • 采集服务可根据任务量动态扩缩容
  • 采用服务注册与发现机制(如 Eureka),新增节点自动加入集群
  • 任务调度服务采用分布式锁避免任务重复分配
@Component
public class RedisDistributedLock {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    private static final String LOCK_PREFIX = "distributed:lock:";
    private static final int LOCK_EXPIRE = 30000; // 锁默认过期时间30秒
    
    // 获取锁
    public boolean lock(String key) {
        return lock(key, LOCK_EXPIRE);
    }
    
    // 获取锁,指定过期时间
    public boolean lock(String key, int expireMillis) {
        String lockKey = LOCK_PREFIX + key;
        String value = UUID.randomUUID().toString();
        
        // 使用SET NX EX命令获取锁
        Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, value, expireMillis, TimeUnit.MILLISECONDS);
        
        return Boolean.TRUE.equals(success);
    }
    
    // 释放锁
    public void unlock(String key) {
        String lockKey = LOCK_PREFIX + key;
        String value = redisTemplate.opsForValue().get(lockKey);
        
        if (value != null) {
            // 使用Lua脚本保证删除操作的原子性
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            redisTemplate.execute(new DefaultRedisScript<>(script, Integer.class), 
                    Collections.singletonList(lockKey), value);
        }
    }
    
    // 尝试获取锁,带超时时间
    public boolean tryLock(String key, long waitTime, int leaseTime) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        String lockKey = LOCK_PREFIX + key;
        String value = UUID.randomUUID().toString();
        
        while (true) {
            // 尝试获取锁
            Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, value, leaseTime, TimeUnit.MILLISECONDS);
            if (Boolean.TRUE.equals(success)) {
                return true;
            }
            
            // 判断是否超时
            if (System.currentTimeMillis() - startTime > waitTime) {
                return false;
            }
            
            // 短暂休眠后重试
            Thread.sleep(100);
        }
    }
}

2. 容错与重试机制

为提高系统可靠性,设计了多层次的容错机制:

  • 服务间调用采用熔断机制(如 Resilience4j)
  • 采集任务失败自动重试
  • 节点故障时任务自动转移到其他节点
@Configuration
public class RetryConfig {
    
    @Bean
    public RetryRegistry retryRegistry() {
        RetryConfig config = RetryConfig.custom()
                .maxAttempts(3) // 最大重试次数
                .waitDuration(Duration.ofSeconds(2)) // 重试间隔
                .retryExceptions(IOException.class, RestClientException.class) // 需要重试的异常
                .ignoreExceptions(IllegalArgumentException.class) // 不需要重试的异常
                .build();
        
        return RetryRegistry.of(config);
    }
    
    @Bean
    public Retry taskRetry(RetryRegistry retryRegistry) {
        return retryRegistry.retry("taskRetry");
    }
    
    @Bean
    public CircuitBreakerRegistry circuitBreakerRegistry() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
                .failureRateThreshold(50) // 失败率阈值,超过此时打开断路器
                .waitDurationInOpenState(Duration.ofSeconds(60)) // 断路器打开状态持续时间
                .slidingWindowSize(10) // 滑动窗口大小
                .build();
        
        return CircuitBreakerRegistry.of(config);
    }
    
    @Bean
    public CircuitBreaker jdApiCircuitBreaker(CircuitBreakerRegistry registry) {
        return registry.circuitBreaker("jdApi");
    }
}

监控与告警

系统集成了 Spring Boot Actuator 和 Prometheus、Grafana 实现监控功能,主要监控指标包括:

  • 各服务节点的 CPU、内存、磁盘使用率
  • 任务执行成功率、平均执行时间
  • API 调用频率、响应时间
  • 数据采集量、存储量
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
  metrics:
    export:
      prometheus:
        enabled: true
  endpoint:
    health:
      show-details: always
      probes:
        enabled: true

# 自定义监控指标
custom:
  metrics:
    task:
      enabled: true
    crawler:
      enabled: true

总结与展望

本文介绍了基于微服务架构的分布式京东商品数据采集系统的设计与实现方案。该系统具有以下特点:

  1. 高可扩展性:采用微服务架构,支持各组件独立扩展
  2. 高可靠性:通过熔断、重试、分布式锁等机制保证系统稳定运行
  3. 高效采集:分布式部署的采集节点可并行处理大量采集任务
  4. 易于维护:各服务职责单一,便于开发和维护

未来可以从以下几个方面进行优化:

  1. 引入机器学习算法优化任务分配策略,提高系统整体效率
  2. 增加数据挖掘和分析功能,提供更有价值的商业洞察
  3. 优化反爬机制的应对策略,提高数据采集的稳定性
  4. 引入流处理技术,实现实时数据采集与分析

通过这个分布式商品数据采集系统,企业可以高效、稳定地获取京东平台的商品数据,为业务决策提供有力支持。


网站公告

今日签到

点亮在社区的每一天
去签到