一、Semaphore 解析
1. 核心概念
Semaphore(信号量)是 Java 并发包 (java.util.concurrent
) 中的一种计数信号量,用于控制同时访问特定资源的线程数量。其核心是一个许可证计数器(permits):
- 当线程访问资源时,需要获取许可证(计数器减1)
- 使用完毕后释放许可证(计数器加1)
- 当计数器为0时,新线程必须等待许可证释放
2. 核心特性
特性 | 说明 |
---|---|
许可证管理 | 通过 acquire() /release() 控制许可证 |
公平性选择 | 支持公平/非公平两种模式(构造参数控制) |
批量操作 | 支持一次获取/释放多个许可证 |
非阻塞操作 | 提供 tryAcquire() 立即返回结果 |
超时机制 | tryAcquire(long timeout, TimeUnit unit) |
3. 底层实现
基于 AQS(AbstractQueuedSynchronizer)实现:
- 共享锁模式:允许多个线程同时获取锁
- 状态值:AQS 的 state 字段存储可用许可证数量
- 同步队列:管理等待许可证的线程
// (非公平模式)
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
4. 与相关工具对比
工具 | 特性 | 适用场景 |
---|---|---|
Semaphore | 控制并发数量 | 资源池、限流 |
ReentrantLock | 互斥访问 | 单一资源保护 |
CountDownLatch | 一次性等待 | 启动/停止协调 |
CyclicBarrier | 多线程同步点 | 分阶段计算 |
二、简易使用案例
案例1:数据库连接池模拟
import java.util.concurrent.*;
class ConnectionPool {
private final Semaphore semaphore;
private final BlockingQueue<Connection> pool;
public ConnectionPool(int poolSize) {
semaphore = new Semaphore(poolSize, true); // 公平模式
pool = new ArrayBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
pool.offer(new Connection("Conn-" + (i+1)));
}
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire(); // 获取许可证
return pool.take();
}
public void releaseConnection(Connection conn) {
pool.offer(conn);
semaphore.release(); // 释放许可证
}
static class Connection {
private final String id;
public Connection(String id) { this.id = id; }
public void execute() throws InterruptedException {
System.out.println(Thread.currentThread().getName()
+ " 使用连接: " + id);
Thread.sleep(1000); // 模拟数据库操作
}
}
}
public class SemaphoreDemo {
public static void main(String[] args) {
final ConnectionPool pool = new ConnectionPool(3);
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 1; i <= 10; i++) {
executor.execute(() -> {
try {
Connection conn = pool.getConnection();
conn.execute();
pool.releaseConnection(conn);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
输出示例:
pool-1-thread-1 使用连接: Conn-1
pool-1-thread-3 使用连接: Conn-3
pool-1-thread-2 使用连接: Conn-2
(1秒后)
pool-1-thread-4 使用连接: Conn-1
pool-1-thread-5 使用连接: Conn-2
pool-1-thread-6 使用连接: Conn-3
...
案例2:限流系统(每秒最多5个请求)
import java.util.concurrent.*;
class RateLimiter {
private final Semaphore semaphore;
private final ScheduledExecutorService scheduler;
public RateLimiter(int permitsPerSecond) {
semaphore = new Semaphore(permitsPerSecond);
scheduler = Executors.newScheduledThreadPool(1);
// 每秒释放所有许可证
scheduler.scheduleAtFixedRate(
() -> semaphore.release(permitsPerSecond - semaphore.availablePermits()),
1, 1, TimeUnit.SECONDS
);
}
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
public void shutdown() {
scheduler.shutdown();
}
}
public class RateLimitDemo {
public static void main(String[] args) {
RateLimiter limiter = new RateLimiter(5); // 每秒5个请求
for (int i = 1; i <= 20; i++) {
new Thread(() -> {
if (limiter.tryAcquire()) {
System.out.println(Thread.currentThread().getName()
+ " 处理请求");
} else {
System.out.println(Thread.currentThread().getName()
+ " 请求被限流");
}
}, "Thread-"+i).start();
try { Thread.sleep(100); } catch (Exception e) {}
}
limiter.shutdown();
}
}
输出特点:
- 前5个线程成功获取许可
- 后续请求被限流
- 1秒后新一轮请求可获取许可
三、最佳实践
释放许可证:
try { semaphore.acquire(); // 访问共享资源 } finally { semaphore.release(); // 确保总是释放 }
避免死锁:
- 使用
tryAcquire()
替代阻塞方法 - 设置合理的超时时间
- 使用
动态调整(Java 17+):
semaphore.reducePermits(2); // 永久减少许可证数量
资源清理:
// 关闭限流器中的定时任务 scheduler.shutdown();
四、适用场景
- 资源池管理(数据库连接、线程池)
- API 限流(控制 QPS)
- 生产者-消费者模型(有界缓冲区)
- 并行任务控制(限制最大并发数)
- 系统保护(防止资源耗尽)