目录
线程简介
线程的概念
说起线程,不可避免的我们要提一句进程。那么什么是进程呢?操作系统在运行一个程序的时候,就会为其创建一个进程。例如,我们启动一个java程序,那就会产生一个java进程。
进程:是系统进行分配和管理资源的基本单位。
有了进程,线程就出现了。在一个进程里可以创建多个线程,这些线程拥有各自的堆栈和局部变量的属性,并且可以访问共享的内存变量。
线程:进程的一个执行单元,是进程内调度的实体,是cpu调度和分派的基本单位,是比进程更小的独立运行的基本单位。
一个程序至少有一个进程,一个进程至少有一个线程。
一个java程序从main方法开始执行,然后按照既定的代码逻辑执行,看似没有其他线程的参与,但实际上java程序天生就是多线程程序,因为执行main方法的是一个名称为main的线程。我们可以使用JMX来查看一个java程序包含哪些线程。
public static void main(String[] args) {
//获取java线程管理MXBean
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
//不需要获取同步的monitor和synchronizer信息,仅获取线程和线程堆栈信息
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false,false);
//遍历线程信息,仅打印线程id和线程名称信息
for (ThreadInfo threadInfo: threadInfos) {
System.out.println("["+threadInfo.getThreadId()+"]"+ threadInfo.getThreadName());
}
}
输出如下图所示内容(输出的内容可能不同)
由此可以看出,一个java程序的运行不仅仅是main方法的运行,而是main线程和多个其他线程的同时运行。
线程的状态
状态名称 | 说明 |
初始(NEW) | 新创建了一个线程对象,但还没有调用start()方法 |
运行(RUNNABLE) | 处于可运行状态的线程正在JVM中执行,但它可能正在等待来自操作系统的其他资源,例如处理器。 |
阻塞(BLOCKED) | 线程阻塞于synchronized锁,等待获取synchronized锁的状态。 |
等待(WAITING) | Object.wait()、join()、 LockSupport.park(),进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。 |
超时等待(TIME_WAITING) | Object.wait(long)、Thread.join()、LockSupport.parkNanos()、LockSupport.parkUntil,该状态不同于WAITING,它可以在指定的时间内自行返回。 |
终止(TERMINATED) | 表示该线程已经执行完毕。 |
下面就用我们上一篇提到的jstack工具来查看代码运行时的线程信息,更加深入理解线程状态。
package com.demo.controller;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.TimeUnit;
public class Demo {
public static void main(String[] args) {
new Thread(new TimeWaiting(),"超时等待线程").start();
new Thread(new Waiting(),"等待线程").start();
new Thread(new Blocked(),"阻塞线程1").start();
new Thread(new Blocked(),"阻塞线程2").start();
}
/**
* 该线程不断进行睡眠
*/
static class TimeWaiting implements Runnable {
@Override
public void run() {
while (true){
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 该线程在waiting.class实例上等待
*/
static class Waiting implements Runnable {
@Override
public void run() {
while (true){
synchronized (Waiting.class){
try {
Waiting.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
/**
* 该线程在Blocked.class实例上加锁后,不会释放该锁
*/
static class Blocked implements Runnable {
@Override
public void run() {
synchronized (Blocked.class){
while (true){
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
运行该程序,打开终端输入jps,输出如下
可以看到运行代码对应的进程id为11976,于是我们输入jatack 11976,输出如下
可以看到超时等待线程的状态为TIMED_WAITING
等待线程在java.lang.Class@6275b223实例上的WAITING
阻塞线程1的状态TIMED_WAITING
阻塞线程2是在java.lang.Class@437fc1b0实例上的BLOCKED, 拥有者: 阻塞线程1
由图可知,线程创建之后,调用start()方法开始运行。当线程执行wait()方法之后,线程进入等待状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态,而超时等待状态相当于在等待状态的基础上增加了超时限制,也就是超时时间到达时会返回到运行状态。当线程调用同步方法时,在没有获取到锁的情况下,线程将会进入到阻塞状态。线程在执行Runnable的run方法之后将会进入到终止状态。
创建线程方式
- 继承Thread,并重写父类的run方法
- 实现Runable接口,并实现run方法
- 使用匿名内部类
- Lambda表达式
- 线程池
下面针对以上几种方式用代码实现
1.继承Thread,输出为“线程demo”
public class ThreadDemo extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
public static void main(String[] args) {
ThreadDemo threadDemo = new ThreadDemo();
threadDemo.setName("线程demo");
threadDemo.start();
}
}
2. 实现Runable接口,输出为“线程demo”
public class ThreadDemo1 implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
public static void main(String[] args) {
Thread thread = new Thread(new ThreadDemo1());
thread.setName("线程demo");
thread.start();
}
}
3. 使用匿名内部类,输出为“Thread-0”
/**
* 匿名内部类的方式
*/
public class MyThread {
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
thread.start();
}
}
4. Lambda表达式,输出为“Thread-0”
public class LambadaThread {
public static void main(String[] args) {
new Thread(()->{
System.out.println(Thread.currentThread().getName());
}).start();
}
}
5.线程池,输出为“pool-1-thread-1”
public class ThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
}
线程的优先级
线程的优先级告诉程序该线程的重要程度有多大。如果有大量线程都被堵塞,都在等候运行,程序会尽可能地先运行优先级的那个线程。 但是,这并不表示优先级较低的线程不会运行。若线程的优先级较低,只不过表示它被准许运行的机会小一些而已。
线程分配到的时间片多少也决定了线程使用处理器资源的多少,而线程优先级就是决定线程需要多或者少分配一些处理器资源的线程属性。
线程的优先级设置可以为1-10的任一数值,在线程构建的时候可以通过setPriority(int)方法来修改优先级,Thread类中定义了三个线程优先级,分别是:MIN_PRIORITY(1)、NORM_PRIORITY(5)、MAX_PRIORITY(10),默认优先级是5。
package com.demo.controller;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class Priority {
private static volatile boolean notStart = true;
private static volatile boolean notEnd = true;
public static void main(String[] args) throws InterruptedException {
List<Job> jobs = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int priority = i < 5 ? Thread.MIN_PRIORITY : Thread.MAX_PRIORITY;
Job job = new Job(priority);
jobs.add(job);
Thread thread = new Thread(job,"Thread:"+i);
thread.setPriority(priority);
thread.start();
}
notStart = false;
TimeUnit.SECONDS.sleep(10);
notEnd = false;
for (Job job:jobs) {
System.out.println("Job priority:"+job.priority+", Count:"+job.jobCount);
}
}
static class Job implements Runnable {
private int priority;
private long jobCount;
public Job(int priority){
this.priority = priority;
}
@Override
public void run() {
while (notStart){
Thread.yield();
}
while (notEnd){
Thread.yield();
jobCount++;
}
}
}
}
运行该程序,可以得到输出如下:
由此可以看出,线程优先级没有生效,优先级1和优先级10的job计数的结果很接近,没有明显差距。这表示程序正确性不能依赖线程的优先级高低。
线程的启动和终止
线程初始化
在运行线程之前首先要构造一个线程对象,并提供构造所需要的属性,以下代码为Thread中的init方法源码
/**
* Allocates a new {@code Thread} object. This constructor has the same
* effect as {@linkplain #Thread(ThreadGroup,Runnable,String) Thread}
* {@code (null, null, gname)}, where {@code gname} is a newly generated
* name. Automatically generated names are of the form
* {@code "Thread-"+}<i>n</i>, where <i>n</i> is an integer.
*/
public Thread() {
init(null, null, "Thread-" + nextThreadNum(), 0);
}
/**
* Initializes a Thread.
*
* @param g the Thread group
* @param target the object whose run() method gets called
* @param name the name of the new Thread
* @param stackSize the desired stack size for the new thread, or
* zero to indicate that this parameter is to be ignored.
* @param acc the AccessControlContext to inherit, or
* AccessController.getContext() if null
* @param inheritThreadLocals if {@code true}, inherit initial values for
* inheritable thread-locals from the constructing thread
*/
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}
this.name = name;
Thread parent = currentThread(); //当前线程就是该线程的父线程
SecurityManager security = System.getSecurityManager();
if (g == null) {
/* Determine if it's an applet or not */
/* If there is a security manager, ask the security manager
what to do. */
if (security != null) {
g = security.getThreadGroup();
}
/* If the security doesn't have a strong opinion of the matter
use the parent thread group. */
if (g == null) {
g = parent.getThreadGroup();
}
}
/* checkAccess regardless of whether or not threadgroup is
explicitly passed in. */
g.checkAccess();
/*
* Do we have the required permissions?
*/
if (security != null) {
if (isCCLOverridden(getClass())) {
security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
}
}
g.addUnstarted();
//将daemon、priority属性设置为父线程对应属性
this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext =
acc != null ? acc : AccessController.getContext();
this.target = target;
setPriority(priority);
//将父线程的inheritableThreadLocals复制过来
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;
/* Set thread ID */
tid = nextThreadID();
}
在上述代码中,我们可以知道一个新构造的线程对象是由其父线程来进行空间分配的,子线程会继承父线程的属性和加载资源的contextClassLoader以及可继承的ThreadLocal,同时还会分配一个唯一的id来标识这个子线程。至此,一个线程对象就创建完成了,并在对内存中等待运行。
启动线程
线程对象完成初始化后,调用start()方法就可以启动这个线程。通过调用Thread类的start()方法来启动一个线程,通过源码注释也可以清晰的知道。“Causes this thread to begin execution; the Java Virtual Machine calls the <code>run</code> method of this thread.”
被废弃的suspend()、resume()、stop()
线程的暂停、恢复、停止操作对应在线程Thread的API就是suspend()、resume()和stop()。
被废弃的方法 thread.suspend() 该方法不会释放线程所占用的资源。如果使用该方法将某个线程挂起,则可能会使其他等待资源的线程死锁。
thread.resume() 方法本身并无问题,但是不能独立于suspend()方法存在。可以使用的方法 wait() 暂停执行、放弃已经获得的锁、进入等待状态。
stop() 废弃方法,开发中建议不要使用。因为一调用,线程就立刻停止,此时有可能引发相应的线程安全性问题。
用一个代码来更加清晰的理解这三个方法。
package thread;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new RunnableThread(),"打印线程");
thread.setDaemon(true);
thread.start();
TimeUnit.SECONDS.sleep(3);
thread.suspend();
System.out.println("main方法暂停线程,输出停止");
TimeUnit.SECONDS.sleep(3);
thread.resume();
System.out.println("main方法恢复线程,输出继续");
TimeUnit.SECONDS.sleep(3);
thread.stop();
System.out.println("main方法终止线程,输出停止");
}
static class RunnableThread implements Runnable {
@Override
public void run() {
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
while (true){
System.out.println(Thread.currentThread().getName() + ", 执行时间:" + dateFormat.format(new Date()));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
在执行的过程中,thread运行了3秒,随后被暂停,3秒后恢复,经过3秒后被终止。虽然suspend()、resume()和stop()方法实现了线程的暂停、恢复和终止工作,但是这些API是被废弃的方法,因此也不建议使用,这里只做了解就好,暂停和恢复操作可以使用等待/通知机制来代替。
不建议使用的原因:以suspend()方法为例,在调用后,线程不会释放已经占有的资源,而是占有资源进入睡眠状态,容易引发死锁问题。同样,stop()方法在终止一个线程时不会保证线程的资源正常释放,因此会导致程序问题可能工作在不确定状态下。正是由于这些方法带来的问题,因此才不建议使用。
终止线程
中断可以理解为线程的一个标识位属性,它表示一个运行中的线程是否被其他线程进行了中断操作,其他线程通过调用该线程的interrupt()方法对其进行中断操作。
线程通过检查自身是否被中断来进行响应,通过isInterrupted()来判断。因此,我们采用中断操作来终止线程执行。
package thread;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new RunnableThread(),"计数线程");
thread.start();
//睡眠3秒,main线程对thread进行中断,使thread能够感知中断而结束
TimeUnit.SECONDS.sleep(3);
thread.interrupt();
System.out.println("main方法暂停线程,输出停止");
RunnableThread runnableThread = new RunnableThread();
Thread thread2 = new Thread(runnableThread,"计数线程2");
thread2.start();
//睡眠3秒,main线程对runnableThread进行取消,是runnableThread感知到flag为false而结束
TimeUnit.SECONDS.sleep(3);
runnableThread.cancel();
System.out.println("main方法取消线程,输出停止");
}
static class RunnableThread implements Runnable {
private long i;
private volatile boolean flag = true;
@Override
public void run() {
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
while (flag && !Thread.currentThread().isInterrupted()){
i ++;
}
System.out.println("i=" + i + ", 执行时间:" + dateFormat.format(new Date()));
}
public void cancel(){
flag = false;
}
}
}
输出结果如下所示
由代码可知,main方法使用中断操作Thread.interrupt()方法和cancel()均可以使thread终止,这种通过标识位或者中断操作的方式能够使线程在终止的时候释放资源,更加安全。
线程间的通信
等待/通知机制
方法名称 | 说明 |
notify() | 通知一个在对象上等待的线程,使其从wati()方法返回。即随机唤醒一个等待的线程。 |
notifyAll() | 唤醒所有在该对象上等待的线程。 |
wait() | 调用该方法的现场进行WAITING状态,只有等待其他线程的通知或被中断才会返回,wait()会释放锁资源 |
wait(long) | 超时等待一段时间,单位毫秒,即等待n毫秒,如果没有通知就超时返回 |
等待/通知机制,就是指一个线程A调用了对象o的wait()方法进入等待状态,另一个线程B调用了对象o的notify()或者notifyAll()方法,线程A收到通知后从对象o的wait()方法返回,进行后续操作。
package thread;
public class Demo1 {
private static volatile boolean flag = false;
public static void main(String[] args) throws InterruptedException {
Object obj = new Object();
new Thread(()->{
while (!flag){
synchronized (obj){
try {
System.out.println("false");
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("true");
}).start();
new Thread(()->{
while (!flag){
synchronized (obj){
try {
System.out.println("false");
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("true");
}).start();
Thread.sleep(1000L);
new Thread(()->{
flag = true;
synchronized (obj){
obj.notifyAll();
}
}).start();
}
}
输出为一下内容:
这段代码主要是介绍了notifyAll()方法,两个线程在对象o上进入等待状态,notifyAll唤醒了所有的等待线程。
⚠️注意:
- 使用wait()、notify()、notifyAll()时需要先对调用对象加锁
- 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
- notify()、notifyAll()方法调用后,等待县城依旧不会从wait()返回,需要调用notify()或notifyAll()的线程释放锁之后,等待线程才有机会从wait()返回。
- notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
- 从wait()方法返回的前提是获得了调用对象的锁。
等待/通知经典范式
等待方(消费者)和通知方(生产者)
等待方遵循以下原则:
- 获取对象锁。
- 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
- 条件满足则执行对应逻辑。
对应的伪代码:
synchronized (obj){ while(条件){ obj.wait(); } 处理逻辑 }
通知方遵循原则:
- 获取对象锁
- 改变条件
- 通知所有等待在对象上的线程
对应的伪代码:
synchronized (obj){ 改变条件 obj.notifyAll(); }
结合等待/通知机制部分的实例代码,也可以看出代码遵循了以上原则。
使用管道流通信
管道输入/输出流和普通的输入/输出流的区别在于,它是以内存为媒介,用于线程之间的数据传输。主要有面向字节:【PipedOutputStream、PipedInputStream】、面向字符【PipedReader、PipedWriter】
package thread;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class Piped {
public static void main(String[] args) throws InterruptedException, IOException {
PipedInputStream pipedInputStream = new PipedInputStream();
PipedOutputStream pipedOutputStream = new PipedOutputStream();
//将输出流和输入流进行连接,否则在使用的时候会抛出IOException
pipedOutputStream.connect(pipedInputStream);
new Thread(new Reader(pipedInputStream)).start();
BufferedReader bufferedReader = null;
try{
bufferedReader = new BufferedReader(new InputStreamReader(System.in));
pipedOutputStream.write(bufferedReader.readLine().getBytes());
}finally {
pipedOutputStream.close();
if (bufferedReader != null){
bufferedReader.close();
}
}
}
static class Reader implements Runnable {
private PipedInputStream pipedInputStream;
public Reader(PipedInputStream pipedInputStream){
this.pipedInputStream = pipedInputStream;
}
@Override
public void run() {
if (pipedInputStream != null){
String collect = new BufferedReader(new InputStreamReader(pipedInputStream)).lines().collect(Collectors.joining("\n"));
System.out.println(Thread.currentThread().getName() + ", 输出:" + collect);
}
try {
pipedInputStream.close();
} catch (Exception e){
}
}
}
}
运行该代码,输入字符串,可以看到被thread进行了原样输出。输出内容如下:
Thread.join()的源码及使用
使用场景:线程A执行到一半,需要一个数据,这个数据需要线程B去执行修改,只有B修改完成之后,A才能继续操作线程A的run方法里面,调用线程B的join方法,这个时候,线程A会等待线程B运行完成之后,再接着运行。
package thread;
public class JoinDemo {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(()->{
System.out.println(Thread.currentThread().getName() +"开始运行");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +"结束运行");
},"线程1");
new Thread(()->{
System.out.println(Thread.currentThread().getName() +"开始运行");
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +"结束运行");
},"线程2").start();
}
}
输出结果如下:
从上述输出可以看到,线程2终止的前提是前驱线程也就是线程1的终止,线程2等待线程1终止后,才从join()方法返回,这里涉及了等待/通知机制(等待前驱线程结束,接收前驱线程结束通知)。
源码分析:
//加锁当前线程对象
public final synchronized void join(long millis)
throws InterruptedException {
//获取当前系统时间
long base = System.currentTimeMillis();
//定义now
long now = 0;
//进行判断,等待时间肯定不能小于0,否则抛出异常
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
//等于0表示无限制
if (millis == 0) {
//条件不满足,继续等待
while (isAlive()) {
wait(0);
}
} else {
//循环判断,条件符合,方法返回
while (isAlive()) {
//计算时间差
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
ThreadLocal的使用
ThreadLocal,即线程变量,是一个以ThreadLocal对象为键、任意对象为值的存储结构。为每个线程单独存放一份变量副本,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。
只要线程处于活动状态并且ThreadLocal实例可访问,那么每个线程都拥有对其本地线程副本的隐式引用变量。一个线程消失后,它的所有副本线程局部实例受垃圾回收(除非其他存在对这些副本的引用)
可以通过set(T)方法来设置一个值,在当前线程下再通过get()方法获取到原先设置的值。
package thread;
public class ThreadLocalDemo {
ThreadLocal<Integer> num = ThreadLocal.withInitial(()-> 0);
/**
* 自增并输出num值
*/
public void create(){
Integer myNum = num.get();
myNum++;
System.out.println(Thread.currentThread().getName() +",myNum:"+myNum);
num.set(myNum);
}
public static void main(String[] args) throws InterruptedException {
ThreadLocalDemo threadLocalDemo = new ThreadLocalDemo();
for (int i = 1; i < 3; i++) {
new Thread(()->{
while(true){
threadLocalDemo.create();
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
输出如下内容:
由上述输出可知,ThreadLocal为每一个线程都单独存放一份变量副本。
好啦,本篇的内容分享就到这里了,觉得不错的同学记得收藏点赞支持一下哦~~拜拜