超时版(工作线程获取任务超时):自定义线程池——超时版
拒绝策略版(生产者添加任务超时) :自定义线程池——拒绝策略版
实现逻辑
实现
ThreadPool
/**
* 自定义线程池
*/
@Slf4j(topic = "c.eight-Demo1")
class ThreadPool{
//线程集合:用来存放工作线程
private HashSet<Wroking> threadsSet;
//集合最大值:用户指定线程池的线程数量
private final int threadsSetSize;
//阻塞队列:工作线程繁忙时存放任务
private BlockingQueue<Runnable> blockingQueue;
ThreadPool(int threadsSetSize,int blockingQueueSize) {
threadsSet=new HashSet<>();
this.threadsSetSize=threadsSetSize;
blockingQueue=new BlockingQueue(blockingQueueSize);
}
//1.调用线程池的execute(Runnable r)
public synchronized void execute(Runnable r){
//2.判断线程集合的大小是否>=最大线程数?
if(threadsSet.size()>=threadsSetSize){
//2.2 是,将任务添加到阻塞队列中
blockingQueue.put(r);
}else {
// 2.1 否,创建Worker对象并添加到线程集合中并启动线程.Worker对象继承Thread
Wroking wroking=new Wroking(r);
threadsSet.add(wroking);
wroking.start();
}
}
//自定义工作线程
class Wroking extends Thread{
//该线程执行的任务内容
Runnable task;
public Wroking(Runnable r) {
task=r;
}
@Override
public void run() {
//在run()中执行Runnable任务,执行完成后从阻塞队列中获取任务继续执行
while(task!=null || (task=blockingQueue.get())!=null){
task.run();
task=null;
}
//任务全部执行完毕,在线程集合中移除线程。
}
}
}
BlockingQueue
/**
* 自定义阻塞队列
*/
@Slf4j(topic = "c.eight-Demo1")
class BlockingQueue<T>{
//任务队列
private Deque<T> tasksQueue=new ArrayDeque<>();
//队列最大值
private int blockingQueueSize;
//锁
private ReentrantLock lock=new ReentrantLock();
//消费者条件变量
private Condition Consumer=lock.newCondition();
//生产者条件变量
private Condition Producer=lock.newCondition();
public BlockingQueue(int blockingQueueSize) {
this.blockingQueueSize=blockingQueueSize;
}
//将任务添加到阻塞队列中
public void put(T r) {
lock.lock();
try{
// 3.阻塞队列判断队列大小是否==限制大小
while(tasksQueue.size()==blockingQueueSize){
// 3.2 是,将执行任务的线程阻塞在生产者的条件变量中
try {
Producer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 3.1 否,将任务添加到队列队尾并唤醒消费者条件变量中的阻塞线程
tasksQueue.addLast(r);
Consumer.signal();
}finally {
lock.unlock();
}
}
//线程任务执行完毕,取任务
public T get(){
lock.lock();
try{
while(tasksQueue.size()==0){
// 2.1.1 队列为空,进入消费者条件变量中进行阻塞
try {
Consumer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 2.1.2 队列不为空,获取任务执行并唤醒生产条件变量中的生产线程
T r=tasksQueue.pollFirst();
Producer.signal();
return r;
}finally {
lock.unlock();
}
}
}
测试
代码
public static void main(String[] args) {
ThreadPool threadPool=new ThreadPool(2,1);
for (int i = 0; i < 5; i++) {
int j=i;
threadPool.execute(()->{
log.debug("执行任务"+j);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("任务"+j+"执行完毕");
});
}
}
结果
17:48:26 [Thread-1] c.eight-Demo1 - 执行任务1
17:48:26 [Thread-0] c.eight-Demo1 - 执行任务0
17:48:28 [Thread-1] c.eight-Demo1 - 任务1执行完毕
17:48:28 [Thread-0] c.eight-Demo1 - 任务0执行完毕
17:48:28 [Thread-1] c.eight-Demo1 - 执行任务2
17:48:28 [Thread-0] c.eight-Demo1 - 执行任务3
17:48:30 [Thread-1] c.eight-Demo1 - 任务2执行完毕
17:48:30 [Thread-0] c.eight-Demo1 - 任务3执行完毕
17:48:30 [Thread-1] c.eight-Demo1 - 执行任务4
17:48:32 [Thread-1] c.eight-Demo1 - 任务4执行完毕