本章是Reactor核心-前置知识(第四期),主要讲解线程池。本文章只适合有基础或从业人员进行学习。如果觉得文章有用,就点赞加藏关注支持一下吧。
filter、map、flatMap等等中间操作:流的每一个元素都完整走完一个流水线,才会得到最终结果。当前中间操作执行完,才会执行下一个中间操作。基于事件机制回调
复习上一章讲解了StreamAPI:
流的三大部分:1、创建流 2、0-N个中间操作 3、一个终止操作
流的特性:流式Lazy的,没用最终操作,中间操作的方法不会执行。
一、线程池定义
线程池是一种管理线程的机制,它预先创建一定数量的线程,并将这些线程存储在一个 “池” 中。当有任务提交时,线程池会从池中取出一个空闲线程来执行该任务;任务执行完毕后,线程不会被销毁,而是返回到线程池中,等待下一个任务,实现线程的复用。
二、工作原理
任务提交:当有新的任务到达时,会将任务提交给线程池。
线程分配:线程池会根据自身的状态和配置来决定如何处理这个任务。
如果线程池中的线程数量小于核心线程数,会创建一个新的线程来执行该任务。
如果线程数量已经达到核心线程数,任务会被放入任务队列中等待执行。
如果任务队列已满,且线程数量小于最大线程数,会创建新的线程来执行任务。
如果线程数量已经达到最大线程数,任务队列也已满,此时会根据线程池的拒绝策略来处理新任务。
任务执行:线程从任务队列中取出任务并执行。
线程回收:任务执行完毕后,线程不会立即销毁,而是返回线程池等待下一个任务。
三、线程池的优势
降低资源消耗:通过复用线程,避免了频繁创建和销毁线程带来的开销,减少了系统资源的消耗。
提高响应速度:由于线程已经预先创建,当有任务提交时,无需等待线程的创建,能够立即开始执行任务,提高了系统的响应速度。
便于线程管理:线程池可以对线程进行统一的管理和监控,例如设置线程的最大数量、控制任务队列的大小等,有助于提高系统的稳定性和可维护性。
四、创建方式
在 Java 中,可以使用 java.util.concurrent
包下的 Executors
类来创建不同类型的线程池,也可以直接使用 ThreadPoolExecutor
类进行自定义创建。
1.固定大小线程池:Executors.newFixedThreadPool(int nThreads)
创建一个固定大小的线程池,线程数量始终保持不变。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolExample {
public static void main(String[] args) {
// 创建一个固定大小为 3 的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskId + " is completed.");
});
}
executor.shutdown();
}
}
2.单线程线程池:Executors.newSingleThreadExecutor()
创建一个只有一个线程的线程池,任务会按照提交的顺序依次执行。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadExecutorExample {
public static void main(String[] args) {
// 创建一个单线程线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 3; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskId + " is completed.");
});
}
executor.shutdown();
}
}
3.缓存线程池:Executors.newCachedThreadPool()
创建一个可缓存的线程池,线程数量会根据任务的多少自动调整。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolExample {
public static void main(String[] args) {
// 创建一个缓存线程池
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskId + " is completed.");
});
}
executor.shutdown();
}
}
4.定时任务线程池:Executors.newScheduledThreadPool(int corePoolSize)
创建一个可以执行定时任务和周期性任务的线程池。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledThreadPoolExample {
public static void main(String[] args) {
// 创建一个定时任务线程池,核心线程数为 2
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 延迟 2 秒后执行任务
executor.schedule(() -> {
System.out.println("Task is running on thread " + Thread.currentThread().getName());
}, 2, TimeUnit.SECONDS);
// 延迟 1 秒后开始执行任务,之后每隔 3 秒执行一次
executor.scheduleAtFixedRate(() -> {
System.out.println("Periodic task is running on thread " + Thread.currentThread().getName());
}, 1, 3, TimeUnit.SECONDS);
}
}
5.使用 ThreadPoolExecutor
自定义创建
import java.util.concurrent.*;
public class CustomThreadPoolExample {
public static void main(String[] args) {
// 创建一个自定义的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
60, // 线程空闲时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(10) // 任务队列
);
for (int i = 0; i < 15; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskId + " is completed.");
});
}
executor.shutdown();
}
}
注意事项:
避免使用
Executors
创建线程池:Executors
提供的一些创建线程池的方法可能会导致内存溢出等问题,例如newFixedThreadPool
和newSingleThreadExecutor
使用的是无界队列,可能会导致任务堆积;newCachedThreadPool
允许创建的线程数量为Integer.MAX_VALUE
,可能会创建大量的线程,耗尽系统资源。建议使用ThreadPoolExecutor
进行自定义创建。合理配置线程池参数:根据系统的实际情况,合理设置核心线程数、最大线程数、任务队列大小等参数,以充分发挥线程池的性能。
正确关闭线程池:使用完线程池后,需要调用
shutdown()
或shutdownNow()
方法来关闭线程池,避免资源泄漏。
四、流与线程
1.流是并发还是不并发?和for循环有什么区别?
答:流默认不并发,和for循环一样挨个处理数据。也可以并发,使用paraller创建并发流,不能保证执行顺序,但执行结果一致。并发以后自行解决多线程安全问题,加锁等等。有状态数据会产生并发安全问题,千万不要这么写。流的所有操作都是无状态的,数据状态仅在此函数内有效,不溢出至函数外。流的数据受函数外变量影响,则会引起线程安全问题,流的所有操作都应该是独立的。
2.将集合转换为流进行处理,会改变原集合吗?
答:拿到集合流,其实就是拿到集合深拷贝的值,流的所有操作都是流元素的引用,不影响原有集合的值。
3.线程是越多越好,还是越少越好?
答:和CPU核心一样多最好,线程多了就会去抢时间片、资源,线程少的话,处理任务可能就不够。100个线程:一个CPU核心排了很多线程,线程就要切换,切换保留线程(浪费内存,浪费时间)。线程越多,线程之间的竞争越激烈。
思路:让少量的线程一直忙,而不是大量的线程一直切换等待。在工作中不会一个员工分配一件事,100件事分配给100个员工,开销是很大的。
什么问题都可以评论区留言,看见都会回复的
如果你觉得本篇文章对你有所帮助的,把“文章有帮助的”打在评论区
多多支持吧!!!
点赞加藏评论,是对小编莫大的肯定。抱拳了!