SpringBoot默认并发处理
在 Spring Boot 项目中,默认情况下,同一时间能处理的请求数由内嵌的 Tomcat 服务器的线程池配置决定。
默认并发处理能力
请求处理流程
- 请求到达:新请求首先进入 TCP 连接队列(最大 max-connections)。
- 线程分配:
如果有空闲线程,立即处理请求。 - 如果所有线程繁忙,请求进入等待队列(最多 accept-count 个)。
队列满时:若等待队列已满,直接返回 Connection refused 错误(HTTP 503)。
配置调整
在 application.properties 或 application.yml 中修改默认值:
# 调整 Tomcat 线程池参数
server.tomcat.threads.max=500 # 提高最大工作线程数
server.tomcat.accept-count=200 # 增大等待队列长度
server.tomcat.max-connections=10000 # 增加 TCP 连接队列容量
限流业务场景
在短时间内,接口承载成千上万的请求,首先要考虑程序的并发性。大流量会直接将系统打垮,无法对外提供服务。那为了防止出现这种情况最常见的解决方案之一就是限流,当请求达到一定的并发数或速率,就进行等待、排队、降级、拒绝服务等。在 Spring Boot 中,常见的限流算法包括 计数器算法、漏桶算法、令牌桶算法,以及 Resilience4j 和 Spring Cloud Gateway 的集成方案。
常见限流算法
固定窗口计数器限流
原理
在固定时间窗口(如1秒)内统计请求数,超过阈值则拒绝请求。
缺点:存在窗口临界突发问题(如窗口切换时请求翻倍)。
实现示例
@Component
public class FixedWindowCounter {
private final AtomicInteger counter = new AtomicInteger(0);
private final int limit = 100; // 每秒100个请求
@Scheduled(fixedRate = 1000) // 每秒重置计数器
public void resetCounter() {
counter.set(0);
}
public boolean allowRequest() {
return counter.incrementAndGet() <= limit;
}
}
// 在Controller中使用
@RestController
public class ApiController {
@Autowired
private FixedWindowCounter counter;
@GetMapping("/api")
public ResponseEntity<String> handleRequest() {
if (counter.allowRequest()) {
return ResponseEntity.ok("Success");
} else {
return ResponseEntity.status(429).body("Too Many Requests");
}
}
}
滑动窗口计数器限流
原理
将时间窗口划分为多个小窗口(如1秒分为10个100ms的窗口),统计最近N个小窗口的总请求数。
优点:平滑过渡,减少临界问题。
实现示例
@Component
public class SlidingWindowCounter {
private final LinkedList<Long> timestamps = new LinkedList<>();
private final int windowSize = 1000; // 时间窗口1秒
private final int limit = 100; // 窗口内最多100个请求
public synchronized boolean allowRequest() {
long now = System.currentTimeMillis();
// 移除超出时间窗口的旧记录
while (!timestamps.isEmpty() && now - timestamps.getFirst() > windowSize) {
timestamps.removeFirst();
}
if (timestamps.size() < limit) {
timestamps.addLast(now);
return true;
}
return false;
}
}
漏桶算法(Leaky Bucket)
漏桶算法原理
漏桶算法是一种流量整形技术,用于控制请求的处理速率,确保系统平稳处理流量。其核心思想如下:
桶容量(Capacity):桶的最大请求容量,超过则拒绝新请求。
恒定漏出速率(Leak Rate):以固定速率处理请求(如每秒处理10个请求)。
请求处理流程:
请求到达时,若桶未满,加入队列等待处理。
桶满时,新请求被拒绝。
漏桶以恒定速率从队列取出请求处理。
漏桶算法示例
- 漏桶的容量和漏出速率
// application.yml
leaky-bucket:
capacity: 10 # 桶容量(最多容纳10个请求)
leak-rate: 1000 # 漏出速率(每1000毫秒处理1个请求)
// LeakyBucketConfig
@Configuration
@ConfigurationProperties(prefix = "leaky-bucket")
@Data
public class LeakyBucketConfig {
private int capacity; // 桶容量
private long leakRate; // 漏出速率(毫秒)
}
- 实现漏桶算法
@Component
public class LeakyBucket {
private final Queue<Request> bucket = new ConcurrentLinkedQueue<>();
private final int capacity;
private final long leakRate;
public LeakyBucket(LeakyBucketConfig config) {
this.capacity = config.getCapacity();
this.leakRate = config.getLeakRate();
this.initLeakTask();
}
// 初始化漏出任务
private void initLeakTask() {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this::leak, 0, leakRate, TimeUnit.MILLISECONDS);
}
// 处理漏出请求
private void leak() {
if (!bucket.isEmpty()) {
Request request = bucket.poll();
request.process();
}
}
// 尝试加入请求
public boolean tryAddRequest(Request request) {
if (bucket.size() < capacity) {
return bucket.offer(request);
}
return false;
}
}
- 请求处理类
public class Request {
private final String data;
public Request(String data) {
this.data = data;
}
public void process() {
System.out.println("处理请求: " + data + ",时间: " + new Date());
}
}
- controller
@RestController
public class ApiController {
private final LeakyBucket leakyBucket;
public ApiController(LeakyBucket leakyBucket) {
this.leakyBucket = leakyBucket;
}
@PostMapping("/api")
public ResponseEntity<String> handleRequest(@RequestBody String data) {
Request request = new Request(data);
if (leakyBucket.tryAddRequest(request)) {
return ResponseEntity.ok("请求已接受");
} else {
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body("请求过多,请稍后重试");
}
}
}
漏桶算法适用性总结
异步限流:漏桶算法的设计初衷是异步处理,适合需要流量整形的场景(如API限流、消息队列消费)。
同步适配:通过阻塞或Future可强制同步,但会牺牲性能,不推荐在高并发场景使用。
替代方案:若需同步限流,可考虑 计数器算法 或 信号量(如Semaphore)。
结合Future模拟同步示例:
public class SyncLeakyBucket {
private final BlockingQueue<CompletableFuture<String>> queue = new LinkedBlockingQueue<>(10);
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public SyncLeakyBucket() {
// 每秒处理1个请求
scheduler.scheduleAtFixedRate(() -> {
CompletableFuture<String> future = queue.poll();
if (future != null) {
future.complete("处理结果");
}
}, 0, 1, TimeUnit.SECONDS);
}
// 同步调用:阻塞等待结果
public String processSync(String data) throws InterruptedException, ExecutionException {
CompletableFuture<String> future = new CompletableFuture<>();
if (queue.offer(future)) {
return future.get(); // 阻塞直到结果返回
} else {
throw new RuntimeException("请求被拒绝");
}
}
}
// 使用示例
SyncLeakyBucket bucket = new SyncLeakyBucket();
String result = bucket.processSync("data"); // 同步阻塞
令牌桶算法(Token Bucket)
令牌桶算法原理
令牌桶算法通过以下机制控制请求速率:
令牌生成:以固定速率(如每秒10个)向桶中添加令牌。
桶容量:桶中最多存储的令牌数(如100个),超出部分丢弃。
请求处理:请求到达时尝试获取令牌,获取成功则处理,否则拒绝。
突发处理:桶内积累的令牌允许一次性消耗,支持突发流量。
实现示例
- 创建令牌桶组件
@Component
public class TokenBucket {
private final int capacity; // 桶容量
private final int refillRate; // 每秒填充的令牌数
private AtomicInteger tokens; // 当前令牌数
private long lastRefillTime; // 上次填充时间
public TokenBucket(@Value("${token-bucket.capacity:100}") int capacity,
@Value("${token-bucket.refill-rate:10}") int refillRate) {
this.capacity = capacity;
this.refillRate = refillRate;
this.tokens = new AtomicInteger(capacity);
this.lastRefillTime = System.currentTimeMillis();
}
// 尝试获取令牌
public boolean tryAcquire() {
refillTokens(); // 先补充令牌
while (true) {
int current = tokens.get();
if (current <= 0) {
return false;
}
if (tokens.compareAndSet(current, current - 1)) {
return true;
}
}
}
// 补充令牌(线程安全)
private synchronized void refillTokens() {
long now = System.currentTimeMillis();
long elapsedTime = now - lastRefillTime;
int tokensToAdd = (int) (elapsedTime * refillRate / 1000);
if (tokensToAdd > 0) {
tokens.set(Math.min(tokens.get() + tokensToAdd, capacity));
lastRefillTime = now;
}
}
}
- controller
@RestController
public class ApiController {
private final TokenBucket tokenBucket;
public ApiController(TokenBucket tokenBucket) {
this.tokenBucket = tokenBucket;
}
@GetMapping("/api")
public ResponseEntity<String> handleRequest() {
if (tokenBucket.tryAcquire()) {
return ResponseEntity.ok("请求处理成功");
} else {
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
.body("请求过多,请稍后重试");
}
}
}
令牌桶算法 vs 漏桶算法
令牌桶通过以下方式支持突发流量:
令牌积累:当请求较少时,未使用的令牌会在桶中积累(最多达到桶容量)。
突发消耗:当突发请求到达时,可一次性消耗所有积累的令牌,处理多个请求。
常见限流实现
Guava 的 RateLimiter(单机限流)
基于令牌桶算法,适合单机限流。
优点:简单易用,适合单机场景。
缺点:不支持分布式环境。
- 添加依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
- 在Controller中使用RateLimiter:
import com.google.common.util.concurrent.RateLimiter;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DemoController {
// 每秒允许2个请求
private final RateLimiter rateLimiter = RateLimiter.create(2.0);
@GetMapping("/api/resource")
public String getResource() {
if (rateLimiter.tryAcquire()) { // 尝试获取令牌
return "Success";
} else {
return "Too many requests!";
}
}
}
基于 AOP + 自定义注解(单机限流)
通过AOP拦截方法调用,实现限流逻辑。
优点:灵活,可自定义限流规则。
缺点:单机限流,计数器未持久化。
- 定义限流注解:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RateLimit {
int value(); // 允许的请求数
int timeWindow() default 60; // 时间窗口(秒)
}
- aop切面
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
@Aspect
@Component
public class RateLimitAspect {
private final ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Long> timestamps = new ConcurrentHashMap<>();
@Around("@annotation(rateLimit)")
public Object limit(ProceedingJoinPoint pjp, RateLimit rateLimit) throws Throwable {
String key = pjp.getSignature().toLongString();
int limit = rateLimit.value();
long window = rateLimit.timeWindow() * 1000L;
long now = System.currentTimeMillis();
synchronized (this) {
if (timestamps.getOrDefault(key, 0L) < now - window) {
counters.put(key, new AtomicInteger(0));
timestamps.put(key, now);
}
AtomicInteger count = counters.get(key);
if (count != null && count.incrementAndGet() > limit) {
throw new RuntimeException("Rate limit exceeded");
}
}
return pjp.proceed();
}
}
- Controller
@RateLimit(value = 10, timeWindow = 60) // 60秒内允许10次请求
@GetMapping("/limited-api")
public String limitedApi() {
return "Processed";
}
Redis + Lua 脚本(分布式限流)
利用Redis的原子性操作实现分布式限流。
优点:支持分布式环境,高并发安全。
缺点:依赖Redis,需要维护脚本。
固定窗口计数器示例:
- redis依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- Lua脚本 rate_limiter.lua
-- KEYS[1] = 限流键, ARGV[1] = 窗口时间(秒), ARGV[2] = 最大请求数
local key = KEYS[1]
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local current = redis.call('GET', key)
if current and tonumber(current) >= limit then
return 0 -- 超过阈值,拒绝请求
else
redis.call('INCR', key)
if tonumber(current) == 0 then -- 首次设置过期时间
redis.call('EXPIRE', key, window)
end
return 1 -- 允许请求
end
- 在Service中调用Lua脚本
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Service;
import java.util.Collections;
@Service
public class RateLimitService {
private final RedisTemplate<String, String> redisTemplate;
private final RedisScript<Long> rateLimiterScript;
public RateLimitService(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
this.rateLimiterScript = RedisScript.of(
new ClassPathResource("rate_limiter.lua"), Long.class
);
}
public boolean allowRequest(String key, int limit, int windowSec) {
Long result = redisTemplate.execute(
rateLimiterScript,
Collections.singletonList(key), // 限流键 列表
String.valueOf(windowSec) // ARGV 参数
String.valueOf(limit) // ARGV 参数
);
return result != null && result == 1;
}
}
- Controller
@GetMapping("/distributed-api")
public String distributedApi() {
String key = "user:123:api"; // 根据用户或接口生成唯一key
if (rateLimitService.allowRequest(key, 60, 100)) {
return "Success";
} else {
return "Too many requests!";
}
}
使用 Sentinel(生产级方案)
通过阿里开源的Sentinel实现流量控制、熔断降级。
优点:功能强大,支持动态规则配置。
缺点:需要额外部署控制台。
- 添加依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
<version>2021.0.5.0</version>
</dependency>
- 配置资源点
@GetMapping("/sentinel-api")
@SentinelResource(value = "sentinelApi", blockHandler = "blockHandler")
public String sentinelApi() {
return "Success";
}
public String blockHandler(BlockException ex) {
return "Blocked by Sentinel";
}
- 在Sentinel控制台配置规则(QPS=2)
// 代码初始化规则(可选)
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("sentinelApi");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setCount(2);
rules.add(rule);
FlowRuleManager.loadRules(rules);
Spring Cloud Gateway 限流(网关层限流)
适用于微服务架构,在网关层统一限流。
优点:网关层统一管控,适合微服务。
缺点:需部署网关和Redis。
- 添加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
- 配置路由规则(application.yml)
spring:
cloud:
gateway:
routes:
- id: my_route
uri: http://localhost:8080
predicates:
- Path=/api/**
filters:
- name: RequestRateLimiter
args:
redis-rate-limiter.replenishRate: 1 # 每秒1个令牌
redis-rate-limiter.burstCapacity: 2 # 桶容量
key-resolver: "#{@userKeyResolver}"
- 定义KeyResolver(按用户限流)
@Bean
public KeyResolver userKeyResolver() {
return exchange -> Mono.just(
exchange.getRequest().getRemoteAddress().getAddress().getHostAddress()
);
}