一、基础概念
1.1 进程与线程
进程是指运行中的程序。 比如我们使用浏览器,需要启动这个程序,操作系统会给这个程序分配一定的资源(占用内存资源)。
线程是CPU调度的基本单位,每个线程执行的都是某一个进程的代码的某个片段。
1.2 多线程
多线程是指:单个进程中同时运行多个线程。
多线程的不低是为了提高CPU的利用率。
可以通过避免一些网络IO或者磁盘IO等需要等待的操作,让CPU去调度其他线程。
多线程的局限
- 如果线程数量特别多,CPU在切换线程上下文时,会额外造成很大的消耗。
- 任务的拆分需要依赖业务场景,有一些异构化的任务,很难对任务拆分,还有很多业务并不是多线程处理更好。
- 线程安全问题:虽然多线程带来了一定的性能提升,但是再做一些操作时,多线程如果操作临界资源,可能会发生一些数据不一致的安全问题,甚至涉及到锁操作时,会造成死锁问题。
1.3 串行、并行、并发
串行就是一个一个排队,第一个做完,第二个才能上。
并行就是同时处理。(一起上!!!)
多线程中的并发概念(CPU调度线程的概念)。CPU在极短的时间内,反复切换执行不同的线程,看似好像是并行,但是只是CPU高速的切换。
并行囊括并发。
并行就是多核CPU同时调度多个线程,是真正的多个线程同时执行。
单核CPU无法实现并行效果,单核CPU是并发。
二、线程的创建
2.1 继承Thread类 重写run方法
启动线程是调用start方法,这样会创建一个新的线程,并执行线程的任务。
如果直接调用run方法,这样会让当前线程执行run方法中的业务逻辑。
public class MiTest {
public static void main(String[] args) {
MyJob t1 = new MyJob();
t1.start();
for (int i = 0; i < 100; i++) {
System.out.println("main:" + i);
}
}
}
class MyJob extends Thread{
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("MyJob:" + i);
}
}
}
2.2 实现Runnable接口 重写run方法
public class MiTest {
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
Thread t1 = new Thread(myRunnable);
t1.start();
for (int i = 0; i < 1000; i++) {
System.out.println("main:" + i);
}
}
}
class MyRunnable implements Runnable{
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
System.out.println("MyRunnable:" + i);
}
}
}
匿名内部类方式:
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
System.out.println("匿名内部类:" + i);
}
}
});
lambda方式:
Thread t2 = new Thread(() -> {
for (int i = 0; i < 100; i++) {
System.out.println("lambda:" + i);
}
});
2.3 实现Callable 重写call方法,配合FutureTask
Callable一般用于有返回结果的非阻塞的执行方法。
public class MiTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1. 创建MyCallable
MyCallable myCallable = new MyCallable();
//2. 创建FutureTask,传入Callable
FutureTask futureTask = new FutureTask(myCallable);
//3. 创建Thread线程
Thread t1 = new Thread(futureTask);
//4. 启动线程
t1.start();
//5. 做一些操作
//6. 要结果
Object count = futureTask.get();
System.out.println("总和为:" + count);
}
}
class MyCallable implements Callable{
@Override
public Object call() throws Exception {
int count = 0;
for (int i = 0; i < 100; i++) {
count += i;
}
return count;
}
}
三、线程的使用
3.1 线程的状态
NEW:Thread对象被创建出来了,但是还没有执行start方法。
RUNNABLE:Thread对象调用了start方法,就为RUNNABLE状态(CPU调度/没有调度)
WAITING:可以理解为是阻塞、等待状态,因为处在这三种状态下,CPU不会调度当前线程
WAITING:调用wait方法就会处于WAITING状态,需要被手动唤醒
TERMINATED:run方法执行完毕,线程生命周期到头了
3.2 线程的常用方法
3.2.1 获取当前线程
Thread的静态方法获取当前线程对象:
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 获取当前线程的方法
Thread main = Thread.currentThread();
System.out.println(main);
// "Thread[" + getName() + "," + getPriority() + "," + group.getName() + "]";
// Thread[main,5,main]
}
3.2.2 线程的名字
public static void main(String[] args) throws ExecutionException, InterruptedException {
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName());
});
t1.setName("模块-功能-计数器");
t1.start();
}
3.2.3 线程的优先级
低到高:1-10
public static void main(String[] args) throws ExecutionException, InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
System.out.println("t1:" + i);
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
System.out.println("t2:" + i);
}
});
t1.setPriority(1);
t2.setPriority(10);
t2.start();
t1.start();
}
3.2.4 线程的让步
可以通过Thread的静态方法yield,让当前线程从运行状态转变为就绪状态。
public static void main(String[] args) throws ExecutionException, InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 100; i++) {
if(i == 50){
Thread.yield();
}
System.out.println("t1:" + i);
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 100; i++) {
System.out.println("t2:" + i);
}
});
t2.start();
t1.start();
}
3.2.5 线程的休眠
Thread的静态方法,让线程从运行状态转变为等待状态
sleep有两个方法重载:
- 第一个就是native修饰的,让线程转为等待状态的效果
public static native void sleep(long millis) throws InterruptedException;
- 第二个是可以传入毫秒和一个纳秒的方法(如果纳秒值大于等于0.5毫秒,就给休眠的毫秒值+1。如果传入的毫秒值是0,纳秒值不为0,就休眠1毫秒)
public static void sleep(long millis, int nanos)
throws InterruptedException {
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (nanos < 0 || nanos > 999999) {
throw new IllegalArgumentException(
"nanosecond timeout value out of range");
}
if (nanos >= 500000 || (nanos != 0 && millis == 0)) {
millis++;
}
sleep(millis);
}
public static void main(String[] args) throws InterruptedException {
System.out.println(System.currentTimeMillis());
Thread.sleep(1000);
System.out.println(System.currentTimeMillis());
}
3.2.6 线程的强占
某一个线程下去调用Thread的非静态方法join方法。
如果在main线程中调用了t1.join(),那么main线程会进入到等待状态,需要等待t1线程全部执行完毕,在恢复到就绪状态等待CPU调度。
如果在main线程中调用了t1.join(2000),那么main线程会进入到等待状态,需要等待t1执行2s后,在恢复到就绪状态等待CPU调度。如果在等待期间,t1已经结束了,那么main线程自动变为就绪状态等待CPU调度。
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("t1:" + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
for (int i = 0; i < 10; i++) {
System.out.println("main:" + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (i == 1){
try {
t1.join(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
3.2.7 线程的等待和唤醒
获取synchronized锁资源的线程通过wait方法进去到锁的等待池,并且会释放锁资源。
可以让获取synchronized锁资源的线程,通过notify或者notifyAll方法,将等待池中的线程唤醒,添加到锁池中。
notify随机的唤醒等待池中的一个线程到锁池。
notifyAll将等待池中的全部线程都唤醒,并且添加到锁池。
在调用wait方法和notify以及norifyAll方法时,必须在synchronized修饰的代码块或者方法内部才可以,因为要操作基于某个对象的锁的信息维护。
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
sync();
},"t1");
Thread t2 = new Thread(() -> {
sync();
},"t2");
t1.start();
t2.start();
Thread.sleep(12000);
synchronized (MiTest.class) {
MiTest.class.notifyAll();
}
}
public static synchronized void sync() {
try {
for (int i = 0; i < 10; i++) {
if(i == 5) {
MiTest.class.wait();
}
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
四、锁
4.1锁的分类
1 可重入锁、不可重入锁
Java中提供的synchronized,ReentrantLock,ReentrantReadWriteLock都是可重入锁。
重入:当前线程获取到A锁,在获取之后尝试再次获取A锁是可以直接拿到的。
不可重入:当前线程获取到A锁,在获取之后尝试再次获取A锁,无法获取到的,因为A锁被当前线程占用着,需要等待自己释放锁再获取锁。
2 乐观锁、悲观锁
Java中提供的synchronized,ReentrantLock,ReentrantReadWriteLock都是悲观锁。
Java中提供的CAS操作,就是乐观锁的一种实现。
悲观锁:获取不到锁资源时,会将当前线程挂起(进入BLOCKED、WAITING),线程挂起会涉及到用户态和内核的太的切换,而这种切换是比较消耗资源的。
- 用户态:JVM可以自行执行的指令,不需要借助操作系统执行。
- 内核态:JVM不可以自行执行,需要操作系统才可以执行。
乐观锁:获取不到锁资源,可以再次让CPU调度,重新尝试获取锁资源。
Atomic原子性类中,就是基于CAS乐观锁实现的。
3 公平锁、非公平锁
Java中提供的synchronized只能是非公平锁。
Java中提供的ReentrantLock,ReentrantReadWriteLock可以实现公平锁和非公平锁
公平锁:线程A获取到了锁资源,线程B没有拿到,线程B去排队,线程C来了,锁被A持有,同时线程B在排队。直接排到B的后面,等待B拿到锁资源或者是B取消后,才可以尝试去竞争锁资源。
非公平锁:线程A获取到了锁资源,线程B没有拿到,线程B去排队,线程C来了,先尝试竞争一波
- 拿到锁资源:开心,插队成功。
- 没有拿到锁资源:依然要排到B的后面,等待B拿到锁资源或者是B取消后,才可以尝试去竞争锁资源。
4 互斥锁、共享锁
Java中提供的synchronized、ReentrantLock是互斥锁。
Java中提供的ReentrantReadWriteLock,有互斥锁也有共享锁。
互斥锁:同一时间点,只会有一个线程持有者当前互斥锁。
共享锁:同一时间点,当前共享锁可以被多个线程同时持有。
4.2synchronized
synchronized的锁是基于对象实现的,使用一般就是同步方法和同步代码块。
synchronized的优化:
锁消除:在synchronized修饰的代码中,如果不存在操作临界资源的情况,会触发锁消除,你即便写了synchronized,他也不会触发。
锁膨胀:如果在一个循环中,频繁的获取和释放做资源,这样带来的消耗很大,锁膨胀就是将锁的范围扩大,避免频繁的竞争和获取锁资源带来不必要的消耗。
锁升级:ReentrantLock的实现,是先基于乐观锁的CAS尝试获取锁资源,如果拿不到锁资源,才会挂起线程。
- 无锁、匿名偏向:当前对象没有作为锁存在。
- 偏向锁:如果当前锁资源,只有一个线程在频繁的获取和释放,那么这个线程过来,只需要判断,当前指向的线程是否是当前线程 。
- 如果是,直接拿着锁资源走。
- 如果当前线程不是,基于CAS的方式,尝试将偏向锁指向当前线程。如果获取不到,触发锁升级,升级为轻量级锁。(偏向锁状态出现了锁竞争的情况)
- 轻量级锁:会采用自旋锁的方式去频繁的以CAS的形式获取锁资源(采用的是自适应自旋锁)
- 如果成功获取到,拿着锁资源走
- 如果自旋了一定次数,没拿到锁资源,锁升级。
- 重量级锁:就是最传统的synchronized方式,拿不到锁资源,就挂起当前线程。(用户态&内核态)
偏向锁在升级为轻量级锁时,会涉及到偏向锁撤销,需要等到一个安全点(STW),才可以做偏向锁撤销,在明知道有并发情况,就可以选择不开启偏向锁,或者是设置偏向锁延迟开启。
因为JVM在启动时,需要加载大量的.class文件到内存中,这个操作会涉及到synchronized的使用,为了避免出现偏向锁撤销操作,JVM启动初期,有一个延迟4s开启偏向锁的操作
如果正常开启偏向锁了,那么不会出现无锁状态,对象会直接变为匿名偏向。
4.3ReentrantLock
- ReentrantLock是个类,synchronized是关键字,当然都是在JVM层面实现互斥锁的方式
效率区别:
- 如果竞争比较激烈,推荐ReentrantLock去实现,不存在锁升级概念。而synchronized是存在锁升级概念的,如果升级到重量级锁,是不存在锁降级的。
底层实现区别:
- 实现原理是不一样,ReentrantLock基于AQS实现的,synchronized是基于ObjectMonitor
功能向的区别:
- ReentrantLock的功能比synchronized更全面。
- ReentrantLock支持公平锁和非公平锁
- ReentrantLock可以指定等待锁资源的时间。
非公平锁的流程
释放锁
五、阻塞队列
生产者 消费者彼此之间不会直接通讯的,而是通过一个容器(队列)进行通讯。
所以生产者生产完数据后扔到容器中,不通用等待消费者来处理。
消费者不需要去找生产者要数据,直接从容器中获取即可。
而这种容器最常用的结构就是队列。
5.1ArrayBlockingQueue
ArrayBlockingQueue在初始化的时候,必须指定当前队列的长度。
因为ArrayBlockingQueue是基于数组实现的队列结构,数组长度不可变,必须提前设置数组长度信息。
public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
// 必须设置队列的长度
ArrayBlockingQueue queue = new ArrayBlockingQueue(4);
// 生产者扔数据
queue.add("1");
queue.offer("2");
queue.offer("3",2,TimeUnit.SECONDS);
queue.put("2");
// 消费者取数据
System.out.println(queue.remove());
System.out.println(queue.poll());
System.out.println(queue.poll(2,TimeUnit.SECONDS));
System.out.println(queue. Take());
}
生产者方法实现原理
add方法实现
add方法本身就是调用了offer方法,如果offer方法返回false,直接抛出异常
public boolean add(E e) {
if (offer(e))
return true;
else
// 抛出的异常
throw new IllegalStateException("Queue full");
}
offer方法实现
public boolean offer(E e) {
// 要求存储的数据不允许为null,为null就抛出空指针
checkNotNull(e);
// 当前阻塞队列的lock锁
final ReentrantLock lock = this.lock;
// 为了保证线程安全,加锁
lock.lock();
try {
// 如果队列中的元素已经存满了,
if (count == items.length)
// 返回false
return false;
else {
// 队列没满,执行enqueue将元素添加到队列中
enqueue(e);
// 返回true
return true;
}
} finally {
// 操作完释放锁
lock.unlock();
}
}
//==========================================================
private void enqueue(E x) {
// 拿到数组的引用
final Object[] items = this.items;
// 将元素放到指定位置
items[putIndex] = x;
// 对inputIndex进行++操作,并且判断是否已经等于数组长度,需要归位
if (++putIndex == items.length)
// 将索引设置为0
putIndex = 0;
// 元素添加成功,进行++操作。
count++;
// 将一个Condition中阻塞的线程唤醒。
notEmpty.signal();
}
offer(time,unit)方法
生产者在添加数据时,如果队列已经满了,阻塞一会。
- 阻塞到消费者消费了消息,然后唤醒当前阻塞线程
- 阻塞到了time时间,再次判断是否可以添加,不能,不添加。
// 如果线程在挂起的时候,如果对当前阻塞线程的中断标记位进行设置,此时会抛出异常直接结束
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 非空检验
checkNotNull(e);
// 将时间单位转换为纳秒
long nanos = unit.toNanos(timeout);
// 加锁
final ReentrantLock lock = this.lock;
// 允许线程中断并排除异常的加锁方式
lock.lockInterruptibly();
try {
// 为什么是while(虚假唤醒)
// 如果元素个数和数组长度一致,队列慢了
while (count == items.length) {
// 判断等待的时间是否还充裕
if (nanos <= 0)
// 不充裕,直接添加失败
return false;
// 挂起等待,会同时释放锁资源(对标sync的wait方法)
// awaitNanos会挂起线程,并且返回剩余的阻塞时间
// 恢复执行时,需要重新获取锁资源
nanos = notFull.awaitNanos(nanos);
}
// 说明队列有空间了,enqueue将数据扔到阻塞队列中
enqueue(e);
return true;
} finally {
// 释放锁资源
lock.unlock();
}
}
put方法
如果队列是满的, 就一直挂起,直到被唤醒,或者被中断。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// await方法一直阻塞,直到被唤醒或者中断标记位
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
消费者方法实现原理
remove方法
// remove方法就是调用了poll
public E remove() {
E x = poll();
// 如果有数据,直接返回
if (x != null)
return x;
// 没数据抛出异常
else
throw new NoSuchElementException();
}
poll方法
// 拉取数据
public E poll() {
// 加锁操作
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果没有数据,直接返回null,如果有数据,执行dequeue,取出数据并返回
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//==========================================================
// 取出数据
private E dequeue() {
// 将成员变量引用到局部变量
final Object[] items = this.items;
// 直接获取指定索引位置的数据
E x = (E) items[takeIndex];
// 将数组上指定索引位置设置为null
items[takeIndex] = null;
// 设置下次取数据时的索引位置
if (++takeIndex == items.length)
takeIndex = 0;
// 对count进行--操作
count--;
// 迭代器内容,先跳过
if (itrs != null)
itrs.elementDequeued();
// signal方法,会唤醒当前Condition中排队的一个Node。
// signalAll方法,会将Condition中所有的Node,全都唤醒
notFull.signal();
// 返回数据。
return x;
}
poll(time, unit)方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 转换时间单位
long nanos = unit.toNanos(timeout);
// 竞争锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果没有数据
while (count == 0) {
if (nanos <= 0)
// 没数据,也无法阻塞了,返回null
return null;
// 没数据,挂起消费者线程
nanos = notEmpty.awaitNanos(nanos);
}
// 取数据
return dequeue();
} finally {
lock.unlock();
}
}
take()方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 虚假唤醒
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
六、线程池
6.1构建线程池
6.1.1newFixedThreadPool
这个线程池的线程数是固定的,在创建时指定。构建方法:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
构建好当前线程池后,线程个数已经固定好(线程是懒加载,在构建之初,线程并没有构建出来,而是随着人任务的提交才会将线程在线程池中国构建出来)。如果线程没构建,线程会待着任务执行被创建和执行。如果线程都已经构建好了,此时任务会被放到LinkedBlockingQueue无界队列中存放,等待线程从LinkedBlockingQueue中去take出任务,然后执行。
6.1.2newSingleThreadExecutor
单例线程池,线程池中只有一个工作线程在处理任务,业务涉及到顺序消费可以使用:
// 当前这里就是构建单例线程池的方式
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
// 在内部依然是构建了ThreadPoolExecutor,设置的线程个数为1
// 当任务投递过来后,第一个任务会被工作线程处理,后续的任务会被扔到阻塞队列中
// 投递到阻塞队列中任务的顺序,就是工作线程处理的顺序
// 当前这种线程池可以用作顺序处理的一些业务中
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
// 线程池的使用没有区别,跟正常的ThreadPoolExecutor没区别
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
// finalize是当前对象被GC干掉之前要执行的方法
// 当前FinalizableDelegatedExecutorService的目的是为了在当前线程池被GC回收之前
// 可以执行shutdown,shutdown方法是将当前线程池停止,并且干掉工作线程
// 但是不能基于这种方式保证线程池一定会执行shutdown
// finalize在执行时,是守护线程,这种线程无法保证一定可以执行完毕。
// 在使用线程池时,如果线程池是基于一个业务构建的,在使用完毕之后,一定要手动执行shutdown,
// 否则会造成JVM中一堆线程
protected void finalize() {
super.shutdown();
}
}
6.1.3newCachedThreadPool
构建方式:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
当第一次提交任务到线程池时,会直接构建一个工作线程。
这个工作线程带执行完人后,60秒没有任务可以执行后,会结束。
如果在等待60秒期间有任务进来,他会再次拿到这个任务去执行。
如果后续提升任务时,没有线程是空闲的,那么就构建工作线程去执行。
最大的一个特点,任务只要提交到当前的newCachedThreadPool中,就必然有工作线程可以处理。
6.1.4newScheduleThreadPool
是一个定时任务的线程,可以以一个周期去执行一个任务,或者延迟多久执行一个任务一次。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
在原来的线程池基础上实现了定时任务的功能,原理是基于DelayQueue实现的延迟执行。周期性执行是任务执行完毕后,再次扔回到阻塞队列。
6.2ThreadPoolExecutor应用
ThreadPoolExecutor提供的七个核心参数:
public ThreadPoolExecutor(
int corePoolSize, // 核心工作线程(当前任务执行结束后,不会被销毁)
int maximumPoolSize, // 最大工作线程(代表当前线程池中,一共可以有多少个工作线程)
long keepAliveTime, // 非核心工作线程在阻塞队列位置等待的时间
TimeUnit unit, // 非核心工作线程在阻塞队列位置等待时间的单位
BlockingQueue<Runnable> workQueue, // 任务在没有核心工作线程处理时,任务先扔到阻塞队列中
ThreadFactory threadFactory, // 构建线程的线程工作,可以设置thread的一些信息
RejectedExecutionHandler handler) { // 当线程池无法处理投递过来的任务时,执行当前的拒绝策略
// 初始化线程池的操作
}
拒绝策略:
AbortPolicy:当前拒绝策略会在无法处理任务时,直接抛出一个异常:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
//默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
CallerRunsPolicy:当前拒绝策略会在线程池无法处理任务时,将任务交给调用者处理
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
DiscardPolicy:当前拒绝策略会在线程池无法处理任务时,直接将任务丢弃掉
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
DiscardOldestPolicy:当前拒绝策略会在线程池无法处理任务时,将队列中最早的任务丢弃掉,将当前任务再次尝试交给线程池处理
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
自定义Policy:根据自己的业务,可以将任务扔到数据库,也可以做其他操作
private static class MyRejectedExecution implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("根据自己的业务情况,决定编写的代码!");
}
}
代码构建线程池,并处理有无返回结果的任务:
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1. 构建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("test-ThreadPoolExecutor");
return thread;
}
},
new MyRejectedExecution()
);
//2. 让线程池处理任务,没返回结果
threadPool.execute(() -> {
System.out.println("没有返回结果的任务");
});
//3. 让线程池处理有返回结果的任务
Future<Object> future = threadPool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
System.out.println("我有返回结果!");
return "返回结果";
}
});
Object result = future.get();
System.out.println(result);
//4. 如果是局部变量的线程池,记得用完要shutdown
threadPool.shutdown();
}
private static class MyRejectedExecution implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("根据自己的业务情况,决定编写的代码!");
}
}
核心属性:
// 当前是线程池的核心属性
// 当前的ctl其实就是一个int类型的数值,内部是基于AtomicInteger套了一层,进行运算时,是原子性的。
// ctl表示着线程池中的2个核心状态:
// 线程池的状态:ctl的高3位,表示线程池状态
// 工作线程的数量:ctl的低29位,表示工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.SIZE:在获取Integer的bit位个数
// 声明了一个常量:COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
00000000 00000000 00000000 00000001
00100000 00000000 00000000 00000000
00011111 11111111 11111111 11111111
// CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态的表示
// 当前五个状态中,只有RUNNING状态代表线程池没问题,可以正常接收任务处理
// 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务。
private static final int RUNNING = -1 << COUNT_BITS;
// 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。
private static final int STOP = 1 << COUNT_BITS;
// 010:代表TIDYING状态,这个状态是否SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态。
private static final int TIDYING = 2 << COUNT_BITS;
// 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。
private static final int TERMINATED = 3 << COUNT_BITS;
// 在使用下面这几个方法时,需要传递ctl进来
// 基于&运算的特点,保证只会拿到ctl高三位的值。
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 基于&运算的特点,保证只会拿到ctl低29位的值。
private static int workerCountOf(int c) { return c & CAPACITY; }
线程池状态和切换方式:
ThreadPoolExecutor的execute方法
// 提交任务到线程池的核心方法
// command就是提交过来的任务
public void execute(Runnable command) {
// 提交的任务不能为null
if (command == null)
throw new NullPointerException();
// 获取核心属性ctl,用于后面的判断
int c = ctl.get();
// 如果工作线程个数,小于核心线程数。
// 满足要求,添加核心工作线程
if (workerCountOf(c) < corePoolSize) {
// addWorker(任务,是核心线程吗)
// addWorker返回true:代表添加工作线程成功
// addWorker返回false:代表添加工作线程失败
// addWorker中会基于线程池状态,以及工作线程个数做判断,查看能否添加工作线程
if (addWorker(command, true))
// 工作线程构建出来了,任务也交给command去处理了。
return;
// 说明线程池状态或者是工作线程个数发生了变化,导致添加失败,重新获取一次ctl
c = ctl.get();
}
// 添加核心工作线程失败,往这走
// 判断线程池状态是否是RUNNING,如果是,正常基于阻塞队列的offer方法,将任务添加到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
// 如果任务添加到阻塞队列成功,走if内部
// 如果任务在扔到阻塞队列之前,线程池状态突然改变了。
// 重新获取ctl
int recheck = ctl.get();
// 如果线程池的状态不是RUNNING,将任务从阻塞队列移除,
if (!isRunning(recheck) && remove(command))
// 并且直接拒绝策略
reject(command);
// 在这,说明阻塞队列有我刚刚放进去的任务
// 查看一下工作线程数是不是0个
// 如果工作线程为0个,需要添加一个非核心工作线程去处理阻塞队列中的任务
// 发生这种情况有两种:
// 1. 构建线程池时,核心线程数是0个。
// 2. 即便有核心线程,可以设置核心线程也允许超时,设置allowCoreThreadTimeOut为true,代表核心线程也可以超时
else if (workerCountOf(recheck) == 0)
// 为了避免阻塞队列中的任务饥饿,添加一个非核心工作线程去处理
addWorker(null, false);
}
// 任务添加到阻塞队列失败
// 构建一个非核心工作线程
// 如果添加非核心工作线程成功,直接完事,告辞
else if (!addWorker(command, false))
// 添加失败,执行决绝策略
reject(command);
}
execute方法的流程图:
ThreadPoolExecutor的addWorker方法:
- 一、校验线程池的状态以及工作线程个数
- 二、添加工作线程并且启动工作线程
校验线程池的状态以及工作线程个数
// 添加工作线程之校验源码
private boolean addWorker(Runnable firstTask, boolean core) {
// 外层for循环在校验线程池的状态
// 内层for循环是在校验工作线程的个数
// retry是给外层for循环添加一个标记,是为了方便在内层for循坏跳出外层for循环
retry:
for (;;) {
// 获取ctl
int c = ctl.get();
// 拿到ctl的高3位的值
int rs = runStateOf(c);
//==========================线程池状态判断==================================================
// 如果线程池状态是SHUTDOWN,并且此时阻塞队列有任务,工作线程个数为0,添加一个工作线程去处理阻塞队列的任务
// 判断线程池的状态是否大于等于SHUTDOWN,如果满足,说明线程池不是RUNNING
if (rs >= SHUTDOWN &&
// 如果这三个条件都满足,就代表是要添加非核心工作线程去处理阻塞队列任务
// 如果三个条件有一个没满足,返回false,配合!,就代表不需要添加
!(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
// 不需要添加工作线程
return false;
for (;;) {
//==========================工作线程个数判断==================================================
// 基于ctl拿到低29位的值,代表当前工作线程个数
int wc = workerCountOf(c);
// 如果工作线程个数大于最大值了,不可以添加了,返回false
if (wc >= CAPACITY ||
// 基于core来判断添加的是否是核心工作线程
// 如果是核心:基于corePoolSize去判断
// 如果是非核心:基于maximumPoolSize去判断
wc >= (core ? corePoolSize : maximumPoolSize))
// 代表不能添加,工作线程个数不满足要求
return false;
// 针对ctl进行 + 1,采用CAS的方式
if (compareAndIncrementWorkerCount(c))
// CAS成功后,直接退出外层循环,代表可以执行添加工作线程操作了。
break retry;
// 重新获取一次ctl的值
c = ctl.get();
// 判断重新获取到的ctl中,表示的线程池状态跟之前的是否有区别
// 如果状态不一样,说明有变化,重新的去判断线程池状态
if (runStateOf(c) != rs)
// 跳出一次外层for循环
continue retry;
}
}
// 省略添加工作线程以及启动的过程
}
添加工作线程并且启动工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
// 省略校验部分的代码
// 添加工作线程以及启动工作线程~~~
// 声明了三个变量
// 工作线程启动了没,默认false
boolean workerStarted = false;
// 工作线程添加了没,默认false
boolean workerAdded = false;
// 工作线程,默认为null
Worker w = null;
try {
// 构建工作线程,并且将任务传递进去
w = new Worker(firstTask);
// 获取了Worker中的Thread对象
final Thread t = w.thread;
// 判断Thread是否不为null,在new Worker时,内部会通过给予的ThreadFactory去构建Thread交给Worker
// 一般如果为null,代表ThreadFactory有问题。
if (t != null) {
// 加锁,保证使用workers成员变量以及对largestPoolSize赋值时,保证线程安全
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次获取线程池状态。
int rs = runStateOf(ctl.get());
// 再次判断
// 如果满足 rs < SHUTDOWN 说明线程池是RUNNING,状态正常,执行if代码块
// 如果线程池状态为SHUTDOWN,并且firstTask为null,添加非核心工作处理阻塞队列任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 到这,可以添加工作线程。
// 校验ThreadFactory构建线程后,不能自己启动线程,如果启动了,抛出异常
if (t.isAlive())
throw new IllegalThreadStateException();
// private final HashSet<Worker> workers = new HashSet<Worker>();
// 将new好的Worker添加到HashSet中。
workers.add(w);
// 获取了HashSet的size,拿到工作线程个数
int s = workers.size();
// largestPoolSize在记录最大线程个数的记录
// 如果当前工作线程个数,大于最大线程个数的记录,就赋值
if (s > largestPoolSize)
largestPoolSize = s;
// 添加工作线程成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果工作线程添加成功,
if (workerAdded) {
// 直接启动Worker中的线程
t.start();
// 启动工作线程成功
workerStarted = true;
}
}
} finally {
// 做补偿的操作,如果工作线程启动失败,将这个添加失败的工作线程处理掉
if (!workerStarted)
addWorkerFailed(w);
}
// 返回工作线程是否启动成功
return workerStarted;
}
// 工作线程启动失败,需要不的步长操作
private void addWorkerFailed(Worker w) {
// 因为操作了workers,需要加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 如果w不为null,之前Worker已经new出来了。
if (w != null)
// 从HashSet中移除
workers.remove(w);
// 同时对ctl进行 - 1,代表去掉了一个工作线程个数
decrementWorkerCount();
// 因为工作线程启动失败,判断一下状态的问题,是不是可以走TIDYING状态最终到TERMINATED状态了。
tryTerminate();
} finally {
// 释放锁
mainLock.unlock();
}
}