Java 并发包 - Semaphore类

发布于:2025-07-20 ⋅ 阅读:(16) ⋅ 点赞:(0)

一、Semaphore 解析

1. 核心概念

Semaphore(信号量)是 Java 并发包 (java.util.concurrent) 中的一种计数信号量,用于控制同时访问特定资源的线程数量。其核心是一个许可证计数器(permits):

  • 当线程访问资源时,需要获取许可证(计数器减1)
  • 使用完毕后释放许可证(计数器加1)
  • 当计数器为0时,新线程必须等待许可证释放
2. 核心特性
特性 说明
许可证管理 通过 acquire()/release() 控制许可证
公平性选择 支持公平/非公平两种模式(构造参数控制)
批量操作 支持一次获取/释放多个许可证
非阻塞操作 提供 tryAcquire() 立即返回结果
超时机制 tryAcquire(long timeout, TimeUnit unit)
3. 底层实现

基于 AQS(AbstractQueuedSynchronizer)实现:

  • 共享锁模式:允许多个线程同时获取锁
  • 状态值:AQS 的 state 字段存储可用许可证数量
  • 同步队列:管理等待许可证的线程
// (非公平模式)
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || compareAndSetState(available, remaining)) {
            return remaining;
        }
    }
}
4. 与相关工具对比
工具 特性 适用场景
Semaphore 控制并发数量 资源池、限流
ReentrantLock 互斥访问 单一资源保护
CountDownLatch 一次性等待 启动/停止协调
CyclicBarrier 多线程同步点 分阶段计算

二、简易使用案例

案例1:数据库连接池模拟
import java.util.concurrent.*;

class ConnectionPool {
    private final Semaphore semaphore;
    private final BlockingQueue<Connection> pool;

    public ConnectionPool(int poolSize) {
        semaphore = new Semaphore(poolSize, true); // 公平模式
        pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.offer(new Connection("Conn-" + (i+1)));
        }
    }

    public Connection getConnection() throws InterruptedException {
        semaphore.acquire(); // 获取许可证
        return pool.take();
    }

    public void releaseConnection(Connection conn) {
        pool.offer(conn);
        semaphore.release(); // 释放许可证
    }

    static class Connection {
        private final String id;
        public Connection(String id) { this.id = id; }
        public void execute() throws InterruptedException {
            System.out.println(Thread.currentThread().getName() 
                + " 使用连接: " + id);
            Thread.sleep(1000); // 模拟数据库操作
        }
    }
}

public class SemaphoreDemo {
    public static void main(String[] args) {
        final ConnectionPool pool = new ConnectionPool(3);
        ExecutorService executor = Executors.newFixedThreadPool(10);
        
        for (int i = 1; i <= 10; i++) {
            executor.execute(() -> {
                try {
                    Connection conn = pool.getConnection();
                    conn.execute();
                    pool.releaseConnection(conn);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
    }
}

输出示例:

pool-1-thread-1 使用连接: Conn-1
pool-1-thread-3 使用连接: Conn-3
pool-1-thread-2 使用连接: Conn-2
(1秒后)
pool-1-thread-4 使用连接: Conn-1
pool-1-thread-5 使用连接: Conn-2
pool-1-thread-6 使用连接: Conn-3
...
案例2:限流系统(每秒最多5个请求)
import java.util.concurrent.*;

class RateLimiter {
    private final Semaphore semaphore;
    private final ScheduledExecutorService scheduler;

    public RateLimiter(int permitsPerSecond) {
        semaphore = new Semaphore(permitsPerSecond);
        scheduler = Executors.newScheduledThreadPool(1);
        // 每秒释放所有许可证
        scheduler.scheduleAtFixedRate(
            () -> semaphore.release(permitsPerSecond - semaphore.availablePermits()),
            1, 1, TimeUnit.SECONDS
        );
    }

    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}

public class RateLimitDemo {
    public static void main(String[] args) {
        RateLimiter limiter = new RateLimiter(5); // 每秒5个请求
        
        for (int i = 1; i <= 20; i++) {
            new Thread(() -> {
                if (limiter.tryAcquire()) {
                    System.out.println(Thread.currentThread().getName() 
                        + " 处理请求");
                } else {
                    System.out.println(Thread.currentThread().getName() 
                        + " 请求被限流");
                }
            }, "Thread-"+i).start();
            try { Thread.sleep(100); } catch (Exception e) {}
        }
        limiter.shutdown();
    }
}

输出特点:

  • 前5个线程成功获取许可
  • 后续请求被限流
  • 1秒后新一轮请求可获取许可

三、最佳实践

  1. 释放许可证

    try {
        semaphore.acquire();
        // 访问共享资源
    } finally {
        semaphore.release(); // 确保总是释放
    }
    
  2. 避免死锁

    • 使用 tryAcquire() 替代阻塞方法
    • 设置合理的超时时间
  3. 动态调整(Java 17+):

    semaphore.reducePermits(2); // 永久减少许可证数量
    
  4. 资源清理

    // 关闭限流器中的定时任务
    scheduler.shutdown(); 
    

四、适用场景

  1. 资源池管理(数据库连接、线程池)
  2. API 限流(控制 QPS)
  3. 生产者-消费者模型(有界缓冲区)
  4. 并行任务控制(限制最大并发数)
  5. 系统保护(防止资源耗尽)

网站公告

今日签到

点亮在社区的每一天
去签到