Java面试实战系列【并发篇】- Semaphore深度解析与实战

发布于:2025-08-17 ⋅ 阅读:(13) ⋅ 点赞:(0)

一、引言

1.1 什么是Semaphore信号量

Semaphore(信号量)是Java并发包java.util.concurrent中的一个重要同步工具类,它用于控制同时访问某个资源的线程数量。Semaphore维护了一组许可证(permits),线程在访问资源前需要获取许可证,访问完成后释放许可证。

核心概念

  • 许可证(Permits):代表可以同时访问资源的线程数量
  • 获取(Acquire):线程请求获得一个或多个许可证
  • 释放(Release):线程释放持有的许可证,供其他线程使用
acquire()
acquire()
acquire()
等待
release()
release()
release()
Semaphore
许可证池
许可证1
许可证2
许可证3
...
线程1
线程2
线程3
线程4

图解说明:上图展示了Semaphore的基本工作原理。许可证池中有固定数量的许可证,线程获取许可证后才能访问资源,当许可证用完时,新来的线程需要等待。

1.2 Semaphore的核心概念:许可证机制

许可证机制是Semaphore的核心设计思想,它类似于现实生活中的停车场管理:

线程1 线程2 线程3 Semaphore(2) 资源 初始许可证数量: 2 acquire() 获得许可证1 访问资源 acquire() 获得许可证2 访问资源 acquire() 等待(无可用许可证) release() 释放许可证1 分配许可证给T3 访问资源 release() 释放许可证2 release() 释放许可证 线程1 线程2 线程3 Semaphore(2) 资源

时序图说明

  1. Semaphore初始化时设定许可证数量(本例为2)
  2. 线程1和线程2成功获取许可证并访问资源
  3. 线程3因为无可用许可证而等待
  4. 当线程1释放许可证后,线程3立即获得许可证
  5. 所有线程使用完资源后都会释放许可证

1.3 为什么需要Semaphore

在实际开发中,我们经常遇到需要限制并发访问数量的场景:

典型应用场景

  • 数据库连接池:限制同时连接数据库的连接数
  • HTTP连接池:控制同时发起的HTTP请求数量
  • 线程池控制:限制同时执行的任务数量
  • 限流保护:保护系统不被过多请求压垮
  • 资源池管理:管理有限的系统资源(如文件句柄、内存等)

不使用Semaphore的问题

// 问题示例:无控制的并发访问
public class UncontrolledAccess {
    private final ExpensiveResource resource = new ExpensiveResource();
    
    public void accessResource() {
        // 所有线程都可以同时访问,可能导致:
        // 1. 系统资源耗尽
        // 2. 性能急剧下降
        // 3. 系统崩溃
        resource.doWork();
    }
}

使用Semaphore的解决方案

// 解决方案:使用Semaphore控制并发
public class ControlledAccess {
    private final Semaphore semaphore = new Semaphore(5); // 最多5个线程同时访问
    private final ExpensiveResource resource = new ExpensiveResource();
    
    public void accessResource() throws InterruptedException {
        semaphore.acquire(); // 获取许可证
        try {
            resource.doWork(); // 安全访问资源
        } finally {
            semaphore.release(); // 确保释放许可证
        }
    }
}

二、Semaphore核心原理

2.1 信号量的理论基础

信号量(Semaphore)概念最初由计算机科学家Edsger Dijkstra在1965年提出,是解决并发编程中资源竞争问题的重要工具。

信号量的数学模型

P操作(获取):
if (S > 0) {
    S = S - 1;  // 获取成功,减少可用资源
} else {
    block();    // 获取失败,线程阻塞等待
}

V操作(释放):
S = S + 1;      // 增加可用资源
wakeup();       // 唤醒等待的线程

Java中的对应关系

  • P操作acquire() 方法
  • V操作release() 方法
  • S值 ↔ 可用许可证数量

2.2 基于AQS的底层实现机制

Semaphore基于AbstractQueuedSynchronizer(AQS)框架实现,AQS提供了同步状态管理和线程阻塞/唤醒的基础设施。

Semaphore
-Sync sync
+acquire() : void
+release() : void
+tryAcquire() : boolean
+availablePermits() : int
«abstract»
Sync
#tryAcquireShared(int) : int
#tryReleaseShared(int) : boolean
FairSync
#tryAcquireShared(int) : int
NonfairSync
#tryAcquireShared(int) : int
AQS
-int state
-Node head
-Node tail
+acquireShared(int) : void
+releaseShared(int) : boolean

类图说明

  • Semaphore:对外提供的API接口
  • Sync:内部抽象同步器,继承自AQS
  • FairSync/NonfairSync:公平和非公平模式的具体实现
  • AQS:提供同步状态管理和线程队列管理

AQS状态表示

// AQS中的state字段表示可用许可证数量
// state > 0  : 有可用许可证
// state = 0  : 无可用许可证,新线程需要等待
// state < 0  : 理论上不会出现(Semaphore确保state >= 0)

protected int getState() { return state; }
protected boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

2.3 公平与非公平模式原理

Semaphore支持两种获取许可证的策略:公平模式和非公平模式。

公平模式(FairSync)

protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 关键:检查是否有前驱节点在等待
        if (hasQueuedPredecessors())
            return -1;  // 有线程在排队,当前线程不能插队
            
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

非公平模式(NonfairSync)

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
        // 注意:没有检查hasQueuedPredecessors(),允许插队
    }
}

两种模式的对比

非公平模式
公平模式
直接尝试获取许可证
新线程到达
获取成功?
执行任务
加入队列等待
队列中有等待线程?
新线程到达
加入队列末尾等待
尝试获取许可证
获取成功,执行任务

性能特性分析

  • 公平模式:保证先到先得,但性能较低(需要检查队列状态)
  • 非公平模式:性能更高,但可能导致线程饥饿

2.4 许可证的获取与释放流程

获取许可证的完整流程

线程调用acquire
tryAcquireShared
许可证足够?
CAS更新state
CAS成功?
获取成功返回
加入AQS等待队列
park阻塞线程
被唤醒

释放许可证的完整流程

线程调用release
tryReleaseShared
CAS增加state
CAS成功?
检查等待队列
有等待线程?
唤醒队列头部线程
释放完成

2.5 源码关键片段深度解析

核心获取逻辑

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// AQS中的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

// 非公平模式的tryAcquireShared实现
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

源码解析要点

1. 中断检查机制

if (Thread.interrupted())
    throw new InterruptedException();

在获取许可证前首先检查线程是否被中断,体现了可中断设计原则。

2. 快速路径优化

if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);

首先尝试快速获取,只有在失败时才进入复杂的队列等待逻辑,这是一种重要的性能优化策略。

3. CAS无锁更新

if (remaining < 0 ||
    compareAndSetState(available, remaining))
    return remaining;

这里的逻辑很巧妙:

  • 如果remaining < 0,说明许可证不足,直接返回负数表示失败
  • 如果许可证足够,尝试CAS更新;如果CAS失败,会在下一次循环中重试
  • 这种设计在高并发下既保证了正确性,又提供了较好的性能

4. 释放逻辑的实现

public void release() {
    sync.releaseShared(1);
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

释放逻辑的关键点

  • 溢出检查if (next < current)检查整数溢出,防止许可证数量超过最大值
  • 无条件释放:release操作不检查当前线程是否持有许可证,这是Semaphore的设计特点
  • CAS重试:使用无锁的CAS操作更新状态,失败时自动重试

5. 等待队列的管理

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这段代码展示了AQS队列的精妙设计:

  • 共享模式节点:使用Node.SHARED标记,与独占模式不同
  • 传播机制setHeadAndPropagate确保在有多个许可证时能唤醒多个等待线程
  • 中断响应parkAndCheckInterrupt()检查中断状态,实现可中断等待
  • 异常安全:finally块确保在异常情况下正确清理节点

三、Semaphore核心API详解

3.1 构造方法详解

Semaphore提供了两个构造方法,允许配置许可证数量和公平性策略:

// 构造方法1:指定许可证数量,默认非公平模式
public Semaphore(int permits)

// 构造方法2:指定许可证数量和公平性策略
public Semaphore(int permits, boolean fair)

参数说明

  • permits:初始许可证数量,必须 >= 0
  • fair:公平性策略,true为公平模式,false为非公平模式

使用示例

// 创建一个有5个许可证的非公平信号量
Semaphore semaphore1 = new Semaphore(5);

// 创建一个有3个许可证的公平信号量
Semaphore semaphore2 = new Semaphore(3, true);

// 创建一个许可证数量为0的信号量(常用于一次性事件)
Semaphore semaphore3 = new Semaphore(0);

构造方法源码解析

public Semaphore(int permits) {
    sync = new NonfairSync(permits);  // 默认使用非公平模式
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

// Sync构造方法
Sync(int permits) {
    setState(permits);  // 设置AQS的state为许可证数量
}

构造方法设计要点

  • 默认选择非公平模式是因为性能更好,大多数场景下也是合适的
  • 许可证数量直接设置为AQS的state值,利用AQS的原子操作保证线程安全
  • 通过不同的Sync实现类来支持公平和非公平两种策略

3.2 acquire()系列方法深度剖析

acquire()系列方法是获取许可证的核心API,提供了多种获取策略:

// 基本获取方法
public void acquire() throws InterruptedException
public void acquire(int permits) throws InterruptedException

// 不可中断获取方法
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)

3.2.1 基本acquire()方法

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

方法特性分析

  • 可中断:线程在等待过程中可以被中断
  • 阻塞等待:如果许可证不足,线程会阻塞直到有可用许可证
  • 异常安全:被中断时抛出InterruptedException

使用示例

public class ResourceManager {
    private final Semaphore semaphore = new Semaphore(3);
    
    public void accessResource() throws InterruptedException {
        // 获取一个许可证
        semaphore.acquire();
        try {
            System.out.println(Thread.currentThread().getName() + " 正在访问资源");
            Thread.sleep(2000); // 模拟资源使用
        } finally {
            semaphore.release(); // 确保释放许可证
        }
    }
    
    public void batchAccess(int count) throws InterruptedException {
        // 批量获取多个许可证
        semaphore.acquire(count);
        try {
            System.out.println("批量获取了 " + count + " 个许可证");
            // 执行需要多个许可证的操作
        } finally {
            semaphore.release(count); // 批量释放
        }
    }
}

3.2.2 不可中断acquire方法

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

不可中断方法的特点

  • 线程在等待过程中不响应中断
  • 适用于必须获得资源才能继续的场景
  • 即使线程被中断,也会继续等待直到获得许可证

使用场景对比

// 场景1:可中断获取 - 适用于可以取消的任务
public void cancellableTask() {
    try {
        semaphore.acquire(); // 可以被中断取消
        doWork();
    } catch (InterruptedException e) {
        System.out.println("任务被取消");
        Thread.currentThread().interrupt(); // 恢复中断状态
    } finally {
        semaphore.release();
    }
}

// 场景2:不可中断获取 - 适用于关键资源访问
public void criticalTask() {
    semaphore.acquireUninterruptibly(); // 不可被中断
    try {
        doCriticalWork(); // 关键任务必须完成
    } finally {
        semaphore.release();
    }
}

3.3 release()系列方法详解

release()方法用于释放许可证,增加可用许可证的数量:

// 释放一个许可证
public void release()

// 释放多个许可证
public void release(int permits)

3.3.1 基本release()方法源码

public void release() {
    sync.releaseShared(1);
}

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

// AQS中的releaseShared方法
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();  // 唤醒等待的线程
        return true;
    }
    return false;
}

release方法的重要特性

1. 无所有权检查

// Semaphore不检查释放许可证的线程是否曾经获取过许可证
public class PermitDemo {
    private static final Semaphore semaphore = new Semaphore(1);
    
    public static void main(String[] args) throws InterruptedException {
        // 线程A获取许可证
        Thread threadA = new Thread(() -> {
            try {
                semaphore.acquire();
                System.out.println("Thread A 获取许可证");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 注意:Thread A 不释放许可证
        });
        
        // 线程B释放许可证(即使它没有获取过)
        Thread threadB = new Thread(() -> {
            try {
                Thread.sleep(1000);
                semaphore.release(); // 这是允许的!
                System.out.println("Thread B 释放许可证");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        
        threadA.start();
        threadB.start();
        threadA.join();
        threadB.join();
    }
}

这种设计的优缺点

  • 优点:灵活性高,可以实现复杂的许可证管理策略
  • 缺点:容易出现许可证泄漏或错误释放的问题

2. 许可证数量可以超过初始值

public class PermitOverflow {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2); // 初始2个许可证
        
        System.out.println("初始许可证: " + semaphore.availablePermits()); // 输出: 2
        
        semaphore.release(3); // 释放3个许可证
        System.out.println("释放后许可证: " + semaphore.availablePermits()); // 输出: 5
        
        // 现在有5个许可证可用,超过了初始值2
    }
}

3. 批量释放的原子性

// 批量释放许可证的示例
public class BatchRelease {
    private final Semaphore semaphore = new Semaphore(10);
    
    public void batchOperation() throws InterruptedException {
        int permits = 5;
        semaphore.acquire(permits); // 批量获取5个许可证
        
        try {
            // 执行需要5个许可证的操作
            processWithMultiplePermits();
        } finally {
            semaphore.release(permits); // 原子性地释放5个许可证
        }
    }
    
    private void processWithMultiplePermits() {
        // 模拟需要多个许可证的操作
        System.out.println("使用5个许可证执行批量操作");
    }
}

3.4 tryAcquire()尝试获取许可证

tryAcquire()系列方法提供非阻塞的许可证获取方式:

// 立即尝试获取,不等待
public boolean tryAcquire()
public boolean tryAcquire(int permits)

// 带超时的尝试获取
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException

3.4.1 立即尝试获取

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

重要特性

  • 非阻塞:立即返回,不会等待
  • 非公平:即使是公平模式的Semaphore,tryAcquire也是非公平的
  • 返回值:true表示获取成功,false表示获取失败

使用示例

public class NonBlockingAccess {
    private final Semaphore semaphore = new Semaphore(3);
    private final AtomicInteger successCount = new AtomicInteger(0);
    private final AtomicInteger failureCount = new AtomicInteger(0);
    
    public void attemptAccess() {
        if (semaphore.tryAcquire()) {
            successCount.incrementAndGet();
            try {
                System.out.println(Thread.currentThread().getName() + " 获取许可证成功");
                Thread.sleep(1000); // 模拟工作
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                semaphore.release();
            }
        } else {
            failureCount.incrementAndGet();
            System.out.println(Thread.currentThread().getName() + " 获取许可证失败,执行降级逻辑");
            fallbackLogic(); // 执行备用逻辑
        }
    }
    
    private void fallbackLogic() {
        // 获取许可证失败时的备用处理逻辑
        System.out.println("执行备用处理方案");
    }
    
    public void printStatistics() {
        System.out.println("成功获取: " + successCount.get());
        System.out.println("获取失败: " + failureCount.get());
    }
}

3.4.2 带超时的尝试获取

public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

超时获取的特性

  • 有限等待:在指定时间内等待,超时则返回false
  • 可中断:等待过程中可以被中断
  • 精确控制:可以精确控制等待时间

超时获取的使用场景

public class TimeoutAccess {
    private final Semaphore semaphore = new Semaphore(2);
    
    // 场景1:用户请求处理,不能无限等待
    public boolean processUserRequest() {
        try {
            // 最多等待5秒
            if (semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
                try {
                    return doProcessing();
                } finally {
                    semaphore.release();
                }
            } else {
                System.out.println("系统繁忙,请稍后重试");
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
    
    // 场景2:批处理任务,设置合理超时
    public boolean processBatch(List<Task> tasks) {
        int requiredPermits = Math.min(tasks.size(), semaphore.availablePermits());
        
        try {
            // 根据任务数量动态设置超时时间
            long timeout = tasks.size() * 100; // 每个任务预期100ms
            
            if (semaphore.tryAcquire(requiredPermits, timeout, TimeUnit.MILLISECONDS)) {
                try {
                    return processTasks(tasks.subList(0, requiredPermits));
                } finally {
                    semaphore.release(requiredPermits);
                }
            } else {
                // 部分处理策略
                return processPartially(tasks);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
    
    private boolean doProcessing() {
        // 模拟处理逻辑
        return true;
    }
    
    private boolean processTasks(List<Task> tasks) {
        // 批量处理任务
        return true;
    }
    
    private boolean processPartially(List<Task> tasks) {
        // 部分处理逻辑
        return true;
    }
    
    static class Task {
        // 任务定义
    }
}

3.5 其他重要方法介绍

除了核心的获取和释放方法,Semaphore还提供了一些实用的辅助方法:

3.5.1 状态查询方法

// 获取当前可用许可证数量
public int availablePermits()

// 获取正在等待许可证的线程数量(估算值)
public final int getQueueLength()

// 检查是否有线程正在等待许可证
public final boolean hasQueuedThreads()

// 返回等待线程的集合(用于调试)
protected Collection<Thread> getQueuedThreads()

// 检查是否使用公平策略
public boolean isFair()

状态查询方法的使用示例

public class SemaphoreMonitor {
    private final Semaphore semaphore;
    private final int totalPermits;
    
    public SemaphoreMonitor(int permits, boolean fair) {
        this.semaphore = new Semaphore(permits, fair);
        this.totalPermits = permits;
    }
    
    public void printStatus() {
        System.out.println("=== Semaphore 状态监控 ===");
        System.out.println("总许可证数量: " + totalPermits);
        System.out.println("可用许可证: " + semaphore.availablePermits());
        System.out.println("使用中许可证: " + (totalPermits - semaphore.availablePermits()));
        System.out.println("等待队列长度: " + semaphore.getQueueLength());
        System.out.println("是否有等待线程: " + semaphore.hasQueuedThreads());
        System.out.println("公平模式: " + semaphore.isFair());
        System.out.println("使用率: " + String.format("%.1f%%", 
            (totalPermits - semaphore.availablePermits()) * 100.0 / totalPermits));
    }
    
    // 监控线程,定期输出状态
    public void startMonitoring() {
        Thread monitor = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    printStatus();
                    Thread.sleep(5000); // 每5秒监控一次
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        monitor.setDaemon(true);
        monitor.start();
    }
}

3.5.2 批量操作方法

// 排空所有可用许可证
public int drainPermits()

// 减少可用许可证数量
protected void reducePermits(int reduction)

批量操作的源码实现

public int drainPermits() {
    return sync.drainPermits();
}

final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}

drainPermits的使用场景

public class SystemMaintenance {
    private final Semaphore semaphore = new Semaphore(10);
    
    // 系统维护时,停止新的请求处理
    public void startMaintenance() {
        System.out.println("开始系统维护...");
        
        // 排空所有许可证,阻止新请求
        int drained = semaphore.drainPermits();
        System.out.println("已排空 " + drained + " 个许可证");
        
        // 等待当前正在处理的请求完成
        waitForActiveRequestsComplete();
        
        // 执行维护操作
        performMaintenance();
    }
    
    public void endMaintenance() {
        System.out.println("维护完成,恢复服务...");
        
        // 恢复许可证,重新允许请求处理
        semaphore.release(10);
    }
    
    private void waitForActiveRequestsComplete() {
        // 等待活跃请求完成的逻辑
    }
    
    private void performMaintenance() {
        // 维护操作逻辑
    }
}

方法使用注意事项

  1. availablePermits():返回的是瞬时值,在高并发环境下可能立即过期
  2. getQueueLength():返回的是估算值,主要用于监控和调试
  3. drainPermits():会影响所有等待的线程,使用时需要谨慎
  4. reducePermits():受保护的方法,主要用于子类扩展

四、Semaphore典型使用场景

4.1 连接池资源管理

连接池是Semaphore最经典的应用场景之一。在数据库连接池、HTTP连接池等场景中,需要限制同时建立的连接数量以避免资源耗尽。

许可证状态
连接池架构
有可用许可
无可用许可
许可证池
容量: 5
已使用: 3
可用: 2
连接池管理器
客户端请求
Semaphore检查
分配连接
等待或失败
执行数据库操作
归还连接
释放许可证
唤醒等待线程

连接池的基本实现框架

public class DatabaseConnectionPool {
    private final Semaphore connectionSemaphore;
    private final Queue<Connection> availableConnections;
    private final Set<Connection> allConnections;
    private final int maxConnections;
    
    public DatabaseConnectionPool(int maxConnections) {
        this.maxConnections = maxConnections;
        this.connectionSemaphore = new Semaphore(maxConnections);
        this.availableConnections = new ConcurrentLinkedQueue<>();
        this.allConnections = ConcurrentHashMap.newKeySet();
        
        // 初始化连接池
        initializeConnections();
    }
    
    public Connection getConnection() throws InterruptedException {
        // 获取许可证
        connectionSemaphore.acquire();
        
        try {
            Connection connection = availableConnections.poll();
            if (connection == null || !isValidConnection(connection)) {
                connection = createNewConnection();
            }
            return connection;
        } catch (Exception e) {
            // 如果获取连接失败,释放许可证
            connectionSemaphore.release();
            throw e;
        }
    }
    
    public void returnConnection(Connection connection) {
        if (connection != null && allConnections.contains(connection)) {
            if (isValidConnection(connection)) {
                availableConnections.offer(connection);
            } else {
                // 连接已失效,创建新连接补充
                try {
                    Connection newConnection = createNewConnection();
                    availableConnections.offer(newConnection);
                } catch (Exception e) {
                    // 创建失败时记录日志,但仍要释放许可证
                    System.err.println("Failed to create replacement connection: " + e.getMessage());
                }
            }
            // 释放许可证,允许其他线程获取连接
            connectionSemaphore.release();
        }
    }
    
    private void initializeConnections() {
        for (int i = 0; i < maxConnections; i++) {
            try {
                Connection connection = createNewConnection();
                availableConnections.offer(connection);
            } catch (Exception e) {
                System.err.println("Failed to initialize connection: " + e.getMessage());
            }
        }
    }
    
    private Connection createNewConnection() {
        // 模拟创建数据库连接
        Connection connection = new MockConnection();
        allConnections.add(connection);
        return connection;
    }
    
    private boolean isValidConnection(Connection connection) {
        // 检查连接是否有效
        try {
            return connection != null && !connection.isClosed();
        } catch (Exception e) {
            return false;
        }
    }
    
    // 模拟Connection类
    private static class MockConnection implements Connection {
        private boolean closed = false;
        
        @Override
        public boolean isClosed() { return closed; }
        
        @Override
        public void close() { closed = true; }
        
        // 其他Connection方法的模拟实现...
    }
}

连接池使用示例

public class DatabaseService {
    private final DatabaseConnectionPool connectionPool;
    
    public DatabaseService() {
        this.connectionPool = new DatabaseConnectionPool(10); // 最大10个连接
    }
    
    public void executeQuery(String sql) {
        Connection connection = null;
        try {
            // 获取连接(可能需要等待)
            connection = connectionPool.getConnection();
            
            // 执行数据库操作
            executeSQL(connection, sql);
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("获取数据库连接被中断");
        } catch (Exception e) {
            System.err.println("数据库操作失败: " + e.getMessage());
        } finally {
            // 确保连接被归还
            if (connection != null) {
                connectionPool.returnConnection(connection);
            }
        }
    }
    
    private void executeSQL(Connection connection, String sql) {
        // 模拟SQL执行
        System.out.println("执行SQL: " + sql);
        try {
            Thread.sleep(1000); // 模拟数据库操作耗时
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

4.2 限流与流量控制

在微服务架构中,限流是保护系统稳定性的重要手段。Semaphore可以有效控制接口的并发访问量。

客户端1 客户端2 客户端3 限流器(Semaphore) API服务 许可证数量: 2 请求许可证 获得许可证1 调用API 请求许可证 获得许可证2 调用API 请求许可证 等待(无可用许可证) 返回结果 释放许可证1 分配许可证给C3 调用API 返回结果 释放许可证2 返回结果 释放许可证 客户端1 客户端2 客户端3 限流器(Semaphore) API服务

接口限流器的实现

public class RateLimiter {
    private final Semaphore semaphore;
    private final int maxConcurrency;
    private final long timeoutMs;
    
    // 监控统计
    private final AtomicLong totalRequests = new AtomicLong(0);
    private final AtomicLong rejectedRequests = new AtomicLong(0);
    private final AtomicLong timeoutRequests = new AtomicLong(0);
    
    public RateLimiter(int maxConcurrency, long timeoutMs) {
        this.maxConcurrency = maxConcurrency;
        this.timeoutMs = timeoutMs;
        this.semaphore = new Semaphore(maxConcurrency);
    }
    
    /**
     * 执行限流保护的操作
     */
    public <T> T execute(Supplier<T> operation) throws RateLimitException {
        totalRequests.incrementAndGet();
        
        try {
            // 尝试在指定时间内获取许可证
            if (semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
                try {
                    return operation.get();
                } finally {
                    semaphore.release();
                }
            } else {
                // 获取许可证超时
                timeoutRequests.incrementAndGet();
                throw new RateLimitException("Request timeout: too many concurrent requests");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            rejectedRequests.incrementAndGet();
            throw new RateLimitException("Request interrupted", e);
        }
    }
    
    /**
     * 立即执行,不等待
     */
    public <T> Optional<T> tryExecute(Supplier<T> operation) {
        totalRequests.incrementAndGet();
        
        if (semaphore.tryAcquire()) {
            try {
                return Optional.of(operation.get());
            } finally {
                semaphore.release();
            }
        } else {
            rejectedRequests.incrementAndGet();
            return Optional.empty();
        }
    }
    
    /**
     * 获取限流统计信息
     */
    public RateLimitStats getStats() {
        return new RateLimitStats(
            maxConcurrency,
            semaphore.availablePermits(),
            totalRequests.get(),
            rejectedRequests.get(),
            timeoutRequests.get(),
            semaphore.getQueueLength()
        );
    }
    
    /**
     * 动态调整并发限制
     */
    public void adjustLimit(int newLimit) {
        int currentLimit = maxConcurrency;
        int difference = newLimit - currentLimit;
        
        if (difference > 0) {
            // 增加许可证
            semaphore.release(difference);
        } else if (difference < 0) {
            // 减少许可证
            try {
                semaphore.acquire(-difference);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    // 自定义异常
    public static class RateLimitException extends Exception {
        public RateLimitException(String message) {
            super(message);
        }
        
        public RateLimitException(String message, Throwable cause) {
            super(message, cause);
        }
    }
    
    // 统计信息类
    public static class RateLimitStats {
        private final int maxConcurrency;
        private final int availablePermits;
        private final long totalRequests;
        private final long rejectedRequests;
        private final long timeoutRequests;
        private final int waitingThreads;
        
        public RateLimitStats(int maxConcurrency, int availablePermits, 
                            long totalRequests, long rejectedRequests, 
                            long timeoutRequests, int waitingThreads) {
            this.maxConcurrency = maxConcurrency;
            this.availablePermits = availablePermits;
            this.totalRequests = totalRequests;
            this.rejectedRequests = rejectedRequests;
            this.timeoutRequests = timeoutRequests;
            this.waitingThreads = waitingThreads;
        }
        
        public double getSuccessRate() {
            return totalRequests == 0 ? 1.0 : 
                (totalRequests - rejectedRequests - timeoutRequests) * 1.0 / totalRequests;
        }
        
        public double getCurrentUtilization() {
            return (maxConcurrency - availablePermits) * 1.0 / maxConcurrency;
        }
        
        @Override
        public String toString() {
            return String.format(
                "RateLimitStats{maxConcurrency=%d, available=%d, " +
                "total=%d, rejected=%d, timeout=%d, waiting=%d, " +
                "successRate=%.2f%%, utilization=%.2f%%}",
                maxConcurrency, availablePermits, totalRequests, 
                rejectedRequests, timeoutRequests, waitingThreads,
                getSuccessRate() * 100, getCurrentUtilization() * 100
            );
        }
    }
}

限流器的使用示例

public class APIController {
    private final RateLimiter rateLimiter = new RateLimiter(10, 5000); // 最大10并发,超时5秒
    
    // 使用限流保护的API
    public ResponseEntity<String> protectedAPI() {
        try {
            String result = rateLimiter.execute(() -> {
                // 执行实际的业务逻辑
                return performBusinessLogic();
            });
            return ResponseEntity.ok(result);
            
        } catch (RateLimiter.RateLimitException e) {
            return ResponseEntity.status(429) // Too Many Requests
                .body("Service is busy, please try again later");
        }
    }
    
    // 立即响应的API(不等待)
    public ResponseEntity<String> immediateAPI() {
        Optional<String> result = rateLimiter.tryExecute(() -> {
            return performBusinessLogic();
        });
        
        if (result.isPresent()) {
            return ResponseEntity.ok(result.get());
        } else {
            return ResponseEntity.status(503) // Service Unavailable
                .body("Service is temporarily unavailable");
        }
    }
    
    // 监控接口
    public ResponseEntity<RateLimiter.RateLimitStats> getStats() {
        return ResponseEntity.ok(rateLimiter.getStats());
    }
    
    private String performBusinessLogic() {
        // 模拟业务处理
        try {
            Thread.sleep(1000); // 模拟1秒处理时间
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Business logic executed successfully";
    }
}

4.3 生产者-消费者模式

在生产者-消费者模式中,Semaphore可以用来控制缓冲区的容量,确保生产者不会生产过多数据,消费者也不会消费空缓冲区。

信号量控制
生产者-消费者模型
put
put
take
take
take
empty信号量
空槽位数量
full信号量
满槽位数量
mutex信号量
互斥访问
缓冲区
容量:5
生产者1
生产者2
消费者1
消费者2
消费者3

基于Semaphore的生产者-消费者实现

public class ProducerConsumerBuffer<T> {
    private final Queue<T> buffer;
    private final Semaphore emptySlots;  // 空槽位信号量
    private final Semaphore fullSlots;   // 满槽位信号量
    private final Semaphore mutex;       // 互斥访问信号量
    private final int capacity;
    
    // 统计信息
    private final AtomicLong producedCount = new AtomicLong(0);
    private final AtomicLong consumedCount = new AtomicLong(0);
    
    public ProducerConsumerBuffer(int capacity) {
        this.capacity = capacity;
        this.buffer = new LinkedList<>();
        this.emptySlots = new Semaphore(capacity);    // 初始时所有槽位都为空
        this.fullSlots = new Semaphore(0);            // 初始时没有满槽位
        this.mutex = new Semaphore(1);                // 互斥访问
    }
    
    /**
     * 生产者放入数据
     */
    public void put(T item) throws InterruptedException {
        emptySlots.acquire();   // 等待空槽位
        mutex.acquire();        // 获取互斥锁
        
        try {
            buffer.offer(item);
            producedCount.incrementAndGet();
            System.out.println("Produced: " + item + ", Buffer size: " + buffer.size());
        } finally {
            mutex.release();    // 释放互斥锁
            fullSlots.release(); // 增加满槽位计数
        }
    }
    
    /**
     * 生产者尝试放入数据(非阻塞)
     */
    public boolean tryPut(T item, long timeout, TimeUnit unit) throws InterruptedException {
        if (emptySlots.tryAcquire(timeout, unit)) {
            if (mutex.tryAcquire(timeout, unit)) {
                try {
                    buffer.offer(item);
                    producedCount.incrementAndGet();
                    System.out.println("Produced (timeout): " + item + ", Buffer size: " + buffer.size());
                    return true;
                } finally {
                    mutex.release();
                    fullSlots.release();
                }
            } else {
                emptySlots.release(); // 获取mutex失败,释放emptySlots
                return false;
            }
        }
        return false;
    }
    
    /**
     * 消费者取出数据
     */
    public T take() throws InterruptedException {
        fullSlots.acquire();    // 等待满槽位
        mutex.acquire();        // 获取互斥锁
        
        try {
            T item = buffer.poll();
            consumedCount.incrementAndGet();
            System.out.println("Consumed: " + item + ", Buffer size: " + buffer.size());
            return item;
        } finally {
            mutex.release();     // 释放互斥锁
            emptySlots.release(); // 增加空槽位计数
        }
    }
    
    /**
     * 消费者尝试取出数据(非阻塞)
     */
    public T tryTake(long timeout, TimeUnit unit) throws InterruptedException {
        if (fullSlots.tryAcquire(timeout, unit)) {
            if (mutex.tryAcquire(timeout, unit)) {
                try {
                    T item = buffer.poll();
                    consumedCount.incrementAndGet();
                    System.out.println("Consumed (timeout): " + item + ", Buffer size: " + buffer.size());
                    return item;
                } finally {
                    mutex.release();
                    emptySlots.release();
                }
            } else {
                fullSlots.release(); // 获取mutex失败,释放fullSlots
                return null;
            }
        }
        return null;
    }
    
    /**
     * 获取缓冲区状态
     */
    public BufferStats getStats() {
        return new BufferStats(
            capacity,
            buffer.size(),
            emptySlots.availablePermits(),
            fullSlots.availablePermits(),
            producedCount.get(),
            consumedCount.get()
        );
    }
    
    /**
     * 清空缓冲区
     */
    public void clear() throws InterruptedException {
        mutex.acquire();
        try {
            int size = buffer.size();
            buffer.clear();
            
            // 重置信号量状态
            fullSlots.drainPermits();
            emptySlots.drainPermits();
            emptySlots.release(capacity);
            
            System.out.println("Buffer cleared, removed " + size + " items");
        } finally {
            mutex.release();
        }
    }
    
    // 缓冲区统计信息
    public static class BufferStats {
        private final int capacity;
        private final int currentSize;
        private final int emptySlots;
        private final int fullSlots;
        private final long producedCount;
        private final long consumedCount;
        
        public BufferStats(int capacity, int currentSize, int emptySlots, 
                          int fullSlots, long producedCount, long consumedCount) {
            this.capacity = capacity;
            this.currentSize = currentSize;
            this.emptySlots = emptySlots;
            this.fullSlots = fullSlots;
            this.producedCount = producedCount;
            this.consumedCount = consumedCount;
        }
        
        public double getUtilization() {
            return currentSize * 1.0 / capacity;
        }
        
        @Override
        public String toString() {
            return String.format(
                "BufferStats{capacity=%d, size=%d, empty=%d, full=%d, " +
                "produced=%d, consumed=%d, utilization=%.1f%%}",
                capacity, currentSize, emptySlots, fullSlots,
                producedCount, consumedCount, getUtilization() * 100
            );
        }
    }
}

生产者-消费者的使用示例

public class ProducerConsumerDemo {
    private static final ProducerConsumerBuffer<String> buffer = 
        new ProducerConsumerBuffer<>(5);
    
    public static void main(String[] args) throws InterruptedException {
        // 启动多个生产者
        for (int i = 0; i < 3; i++) {
            final int producerId = i;
            new Thread(() -> {
                try {
                    for (int j = 0; j < 10; j++) {
                        String item = "Producer-" + producerId + "-Item-" + j;
                        buffer.put(item);
                        Thread.sleep(1000); // 生产间隔
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Producer-" + i).start();
        }
        
        // 启动多个消费者
        for (int i = 0; i < 2; i++) {
            final int consumerId = i;
            new Thread(() -> {
                try {
                    for (int j = 0; j < 15; j++) {
                        String item = buffer.take();
                        // 模拟消费处理时间
                        Thread.sleep(1500);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer-" + i).start();
        }
        
        // 监控线程
        new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(3000);
                    System.out.println("=== " + buffer.getStats() + " ===");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Monitor").start();
        
        // 主线程等待
        Thread.sleep(30000);
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到