JAVA接口调用限速器

发布于:2025-03-31 ⋅ 阅读:(27) ⋅ 点赞:(0)

目录

1、并发限速

2、串行限速  


 

需求:批量调用第三方ERP接口,对方接口限流时,减缓调用速率。

1、并发限速


@Slf4j
@RestController
public class ApiCallTask {
    //第三方接口
    @Resource
    private ErpService erpService;
    //异步线程池
    @Resource
    private ThreadPoolTaskExecutor taskExecutor;
    //定时调度器
    @Resource
    private ThreadPoolTaskScheduler taskScheduler;

    private static final BlockingQueue<Seller> sellerQueue = new LinkedBlockingQueue<>(1000);
    private static final RateLimiter rateLimiter = RateLimiter.create(200.0 / 60.0);
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final int MAX_RETRY_COUNT = 5;
    private static final int BATCH_SIZE = 10;

    @Scheduled(cron = "0 0 2 * * ?")
    @RequestMapping(value = "/jobAfterSalesSync")
    public ResponseEntity<String> jobAfterSalesSync() {
        log.info("开始同步商家售后数据...");
        Map<String, String> queryMap = Maps.newHashMap();
        queryMap.put("status", "2");
        List<商家seller> sellerList = erpService.getSellerList(queryMap);
        List<商家seller> sellerList = sellerList  != null ? sellerList  : new ArrayList<>();

        log.info("共 {} 个商家待处理", sellerList.size());
        for (Seller seller : sellerList) {
            if (!sellerQueue.offer(seller)) {
                log.warn("队列已满,商家 {} 未加入队列", seller.getSellerName());
            } else {
                log.debug("商家 {} 已加入队列", seller.getSellerName());
            }
        }

        processBatch(); // 启动分批处理
        log.info("任务已提交,线程池活跃线程数: {}", taskExecutor.getActiveCount());
        return ResponseEntity.ok("任务已触发");
    }

    /**
     * 异步处理任务
     * taskScheduler与rateLimiter的分工
     *      processBatch 中每批完成后等待 3 秒再调度下一批,这是批次之间的宏观控制。Instant.now().plusSeconds(3)
     *      rateLimiter.acquire():在每批内部的 10 个任务中,控制每个 API 调用的微观速率。
     * 队列作用
     *      processBatch 在每次批次完成后检查 sellerQueue.isEmpty()。如果队列非空,通过 taskScheduler.schedule 调度下一次 processBatch,形成递归调用。保证所有seller都被处理
     *      限流(801)时,handleRetry 确保 Seller 被重新加入 sellerQueue。即使队列满,也通过延迟重试保证任务不丢失。
     * CompletableFuture作用
     *      CompletableFuture 是对传统 Future 的增强,支持链式调用、异常处理和任务组合,用于异步执行 callErpApi,实现每批 10 个 Seller 的并发处理。
     *      将 callErpApi 的执行从主线程中分离出来,提交给线程池(如 taskExecutor)异步运行。
     *      submit 方法返回一个 Future 对象(这里未使用返回值),表示任务已交给线程池处理。
     *      CompletableFuture.runAsync 创建异步任务,执行 callErpApi。在批处理中,每个 Seller 的 API 调用是独立的异步任务。
     *      futures 收集所有任务的 CompletableFuture 实例。
     *      使用 CompletableFuture.allOf 等待一批任务全部完成,然后触发后续操作(如调度下一批)。
     *      通过 .exceptionally 或 .whenComplete 处理异步任务中的异常,确保任务链不会因错误中断。
     * taskExecutor 的整体作用
     *      异步执行:
     *      将 processBatch 和 callErpApi 从主线程(定时任务或 HTTP 请求线程)中分离出来,避免阻塞主线程。
     *      例如,HTTP 请求可以快速响应,而实际处理在后台进行。
     *      并发处理:
     *      在 processBatch 中,10 个 callErpApi 任务可以并行执行(取决于线程池大小),提高处理效率。
     *      例如,如果线程池核心线程数为 10,则每批 10 个任务可以同时运行。
     *      与 taskScheduler 的分工:
     *      taskExecutor:负责执行具体的任务(processBatch 和 callErpApi)。
     *      taskScheduler:负责调度任务的执行时间(例如批次间隔 3 秒或限流重试延迟)。
     *      与代码目标的关系
     *      分批执行:taskExecutor 使每批 10 个任务并发运行。
     *      持续执行:与 taskScheduler 配合,确保队列非空时任务持续调度。
     *      限流控制:rateLimiter 限制速率,taskExecutor 提供并发支持,二者结合实现高效且受控的处理。
     */
    private void processBatch() {
        if (sellerQueue.isEmpty()) {
            log.info("队列处理完成,剩余大小: {}", sellerQueue.size());
            return;
        }

        List<Seller> batch = new ArrayList<>();
        int drained = sellerQueue.drainTo(batch, BATCH_SIZE); // 取出最多 10 个,出队
        log.info("从队列中取出 {} 个元素,开始处理", drained);
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        /**
         *
         * 对于每批的 10 个 Seller,使用 CompletableFuture.runAsync 将 callErpApi(seller) 提交到 taskExecutor 执行。
         * runAsync 的第二个参数指定了执行器(taskExecutor),确保这些任务在 taskExecutor 的线程池中并行运行。
         */
        for (Seller seller : batch) {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> callErpApi(seller), taskExecutor);
            futures.add(future);
        }
        // 等待当前批次所有任务完成。在批次完成后执行回调,检查队列并调度下一次 processBatch(延迟 3 秒)。
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .whenComplete((result, throwable) -> {
                    if (throwable != null) {
                        log.error("批处理异常: {}", throwable.getMessage());
                    }
                    log.info("完成一批处理,剩余队列大小: {}", sellerQueue.size());
                    if (!sellerQueue.isEmpty()) {
                        // 只要 sellerQueue 中还有元素,processBatch 会在每次批次完成后通过 taskScheduler.schedule 重新调用自己。
                        //限流(801 错误)时,handleRetry 会将 Seller 重新加入 sellerQueue,保持队列非空。
                        //每次批次完成后,只要队列非空,就延迟 3 秒调度下一批,直到队列为空。
                        taskScheduler.schedule(this::processBatch, Instant.now().plusSeconds(3));
                    } else {
                        log.info("队列已空,任务结束");
                    }
                });
    }
    //调用ERP接口
    private void callErpApi(Seller seller) {
        rateLimiter.acquire();
        try {
            String response = request(seller);
            if (StringUtils.isBlank(response)) {
                log.info("商家 {} 处理成功(队列剩余: {})", seller.getSellerName(), sellerQueue.size());
            } else {
                JsonNode jsonResponse = objectMapper.readTree(response);
                if (jsonResponse.has("code") && jsonResponse.get("code").asInt() == 801) {
                    int waitTime = extractWaitTime(jsonResponse.get("message").asText());
                    log.warn("限流,商家 {} 暂停 {} 秒后重试", seller.getSellerName(), waitTime);
                    //限流时,sellerQueue.offer(seller) 尝试入队。如果队列满,延迟 waitTime 秒后重试。
                    handleRetry(seller, waitTime);
                } else {
                    log.warn("其他错误,商家: {}, 接口返回: {},视为成功", seller.getSellerName(), response);
                }
            }
        } catch (Exception e) {
            log.error("API 调用异常,商家: {},视为成功", seller.getSellerName(), e);
        }
    }
    //重试处理
    private void handleRetry(Seller seller, int waitTime) {
        if (seller.getRetryCount() < MAX_RETRY_COUNT) {
            seller.incrementRetry();
            boolean requeued = sellerQueue.offer(seller);//入队
            if (requeued) {
                log.info("商家 {} 重试次数: {},已重新入队,等待下次批处理", seller.getSellerName(), seller.getRetryCount());
            } else {
                log.warn("队列已满,商家 {} 延迟 {} 秒后重试", seller.getSellerName(), waitTime);
                taskScheduler.schedule(() -> handleRetry(seller, waitTime), Instant.now().plusSeconds(waitTime));
            }
        } else {
            log.error("商家 {} 达到最大重试次数 {},丢弃", seller.getSellerName(), MAX_RETRY_COUNT);
        }
    }
    //获取限速接口中等待时间
    private static int extractWaitTime(String message) {
        Pattern pattern = Pattern.compile("(\\d+)\\s*秒");
        Matcher matcher = pattern.matcher(message);
        return matcher.find() ? Integer.parseInt(matcher.group(1)) : 30;
    }
    //请求接口
    public String request(Seller seller) {
        Map<String, Object> params = Maps.newHashMap();
        String[] range = DateUtil.getDateRange(14);
        params.put("limit", 200);
        params.put("page", 1);
        params.put("start_time", range[0]);
        params.put("end_time", range[1]);
        params.put("shop_nick", seller.getSellerName());

        try {
            String result = erpService.afterSalesData(params);
            return  result != null ? result  : "";
        } catch (Exception e) {
            log.error("请求 API 失败,商家: {}", seller.getSellerName(), e);
            return "{}";
        }
    }


}

2、串行限速  

以上代码仍然有限速问题,调用接口限速频率太高,改造并优化。


@Slf4j
@RestController
public class ErpApiCallTask {

    @Resource
    private ErpService erpService;

    @Resource
    private ThreadPoolTaskExecutor taskExecutor;

    @Resource
    private ThreadPoolTaskScheduler taskScheduler;

    private static final BlockingQueue<Seller> sellerQueue = new LinkedBlockingQueue<>(1000);
    private static final RateLimiter rateLimiter = RateLimiter.create(1.0 / 5.0); // 每 5 秒 1 次
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final int MAX_RETRY_COUNT = 5;
    private static final long WAIT_INTERVAL = 30000; // 等待 30 秒检查新入队元素

    @Scheduled(cron = "0 0 2 * * ?")
    @RequestMapping(value = "/jobAfterSalesSync")
    public ResponseEntity<String> jobAfterSalesSync() {
        log.info("开始从ERP系统同步商家售后数据...");
        Map<String, String> queryMap = Maps.newHashMap();
        queryMap.put("status", "2");
        List<商家seller> sellerList = erpService.getSellerList(queryMap);
        log.info("共 {} 个商家待处理", sellerList.size());
        for (Seller seller : sellerList) {
            if (!sellerQueue.offer(seller)) {
                log.warn("队列已满,商家 {} 未加入队列,当前队列大小: {}", seller.getSellerName(), sellerQueue.size());
            } else {
                log.debug("商家 {} 已加入队列,当前队列大小: {}", seller.getSellerName(), sellerQueue.size());
            }
        }
        log.info("队列初始化完成,当前队列大小: {}", sellerQueue.size());

        // 异步启动处理
        taskExecutor.submit(this::processQueue);
        log.info("任务已提交,线程池活跃线程数: {}", taskExecutor.getActiveCount());
        return ResponseEntity.ok("任务已触发");
    }

    /**
     * 串行处理队列,使用 RateLimiter 控制每 5 秒 1 次调用,队列为空时等待新入队元素
     * 移除批处理和并发:原代码按批次处理(每次 10 个),并通过 CompletableFuture 并发执行。现在改为 processQueue,串行处理队列中的每个 Seller。
     * 移除 CompletableFuture:不需要并发,直接在单线程中顺序调用 callErpApi。
     * 串行执行:使用 orderQueue.poll() 逐个取出 Seller,每次处理一个后等待 5 秒。
     * 保留 taskExecutor.submit:异步启动处理,避免阻塞主线程(定时任务或 HTTP 请求)。处理逻辑在后台线程中串行执行。
     * 使用 RateLimiter 控制速率,在每次调用 callErpApi 前获取令牌,确保5秒最多 1 次调用。
     * 相比 Thread.sleep(1000),RateLimiter 更灵活,能动态调整速率并处理突发请求。
     * 限流处理,限流(801)时,延迟 waitTime 秒后重新入队。队列满时递归重试,确保任务不丢失。
     * 新增等待机制:当 orderQueue.isEmpty() 时,不直接退出,而是等待 WAIT_INTERVAL(秒),然后再次检查队列。如果等待后队列仍为空,设置 hasMoreTasks = false,结束循环;否则继续处理。
     * 新增标志变量 hasMoreTasks:用布尔变量控制外层循环,避免无限等待。
     */
    private void processQueue() {
        boolean hasMoreTasks = true;
        while (hasMoreTasks) {
            if (!sellerQueue.isEmpty()) {
                log.info("开始处理队列,当前队列大小: {}", sellerQueue.size());
                Seller seller = sellerQueue.poll(); // 取出队列头部元素,
                if (seller != null) {
                    log.info("从队列中取出商家: {},剩余队列大小: {}", seller.getSellerName(), sellerQueue.size());
                    rateLimiter.acquire(); // 获取令牌,控制速率
                    callErpApi(seller);
                }
            } else {
                log.info("队列当前为空,等待 {} 毫秒检查新入队元素", WAIT_INTERVAL);
                try {
                    Thread.sleep(WAIT_INTERVAL); // 等待一段时间,检查是否有新元素
                } catch (InterruptedException e) {
                    log.error("等待被中断", e);
                    Thread.currentThread().interrupt();
                }
                if (sellerQueue.isEmpty()) {
                    log.info("等待后队列仍为空,任务结束");
                    hasMoreTasks = false; // 队列仍为空,结束循环
                } else {
                    log.info("检测到新入队元素,继续处理,当前队列大小: {}", sellerQueue.size());
                }
            }
        }
        log.info("队列处理完成,剩余大小: {}", sellerQueue.size());
    }

    private void callErpApi(Seller seller) {
        try {
            String response = request(seller);
            if (StringUtils.isBlank(response)) {
                log.info("商家 {} 处理成功,当前队列大小: {}", seller.getSellerName(), sellerQueue.size());
            } else {
                JsonNode jsonResponse = objectMapper.readTree(response);
                if (jsonResponse.has("code") && jsonResponse.get("code").asInt() == 801) {
                    int waitTime = extractWaitTime(jsonResponse.get("message").asText());
                    log.warn("限流,商家 {} 暂停 {} 秒后重试,当前队列大小: {}",
                            seller.getSellerName(), waitTime, sellerQueue.size());
                    handleRetry(seller, waitTime);
                } else {
                    log.warn("其他错误,商家: {},接口返回: {},视为成功,当前队列大小: {}",
                            seller.getSellerName(), response, sellerQueue.size());
                }
            }
        } catch (Exception e) {
            log.error("API 调用异常,商家: {},视为成功,当前队列大小: {}",
                    seller.getSellerName(), sellerQueue.size(), e);
        }
    }

    private void handleRetry(Seller seller, int waitTime) {
        if (seller.getRetryCount() < MAX_RETRY_COUNT) {
            seller.incrementRetry();
            // 延迟 waitTime 秒后重新入队
            taskScheduler.schedule(() -> {
                boolean requeued = sellerQueue.offer(seller);
                if (requeued) {
                    log.info("商家 {} 重试次数: {},已重新入队,当前队列大小: {}",
                            seller.getSellerName(), seller.getRetryCount(), sellerQueue.size());
                } else {
                    log.warn("队列已满,商家 {} 延迟 {} 秒后重试,当前队列大小: {}",
                            seller.getSellerName(), waitTime, sellerQueue.size());
                    handleRetry(seller, waitTime); // 递归重试
                }
            }, Instant.now().plusSeconds(waitTime));
        } else {
            log.error("商家 {} 达到最大重试次数 {},丢弃,当前队列大小: {}",
                    seller.getSellerName(), MAX_RETRY_COUNT, sellerQueue.size());
        }
    }

    private static int extractWaitTime(String message) {
        Pattern pattern = Pattern.compile("(\\d+)\\s*秒");
        Matcher matcher = pattern.matcher(message);
        return matcher.find() ? Integer.parseInt(matcher.group(1)) : 30;
    }

    public String request(Seller seller) {
        Map<String, Object> params = Maps.newHashMap();
        String[] range = DateUtil.getDateRange(14);
        params.put("limit", 200);
        params.put("page", 1);
        params.put("start_time", range[0]);
        params.put("end_time", range[1]);
        params.put("shop_nick", seller.getSellerName());
        try {
            String result = erpService.afterSalesSyncHandler(params);
            return result  != null ? result  : "";
        } catch (Exception e) {
            log.error("请求 API 失败,商家: {}", seller.getSellerName(), e);
            return "{}";
        }
    }


}


网站公告

今日签到

点亮在社区的每一天
去签到