JAVA中多线程的经典案例

发布于:2025-04-19 ⋅ 阅读:(16) ⋅ 点赞:(0)


一、线程安全的单例模式

单例模式即代码中的某个类只能有一个实例,不能有多个 。单例模式有两种主要实现方式,饿汉模式和懒汉模式

(一)饿汉模式

 在类加载阶段就会直接创建唯一实例,步骤如下

  1. 首先使用static创建一个实例,并立即进行实例化
  • 经过static修饰的成员,准确来说应该称为“类成员”,修饰为对应的类属性或类方法
    一个JAVA程序中,一个类对象存在一份(类名.class文件被加载到JVM内存中生成的一个对象),那么类成员(static修饰的成员)也存在一份。
  1. 为了防止再重新new一个实例,需要把构造方法设置成private

  2. 提供能拿到唯一实例的方法

class Singleton{
    private static Singleton instance = new Singleton();
    private Singleton(){
    }
    public static Singleton getInstance(){
        return instance;
    }
}
public class Demo {
    public static void main(String[] args) {
        //Singleton singleton = new Singleton();//error
        Singleton singleton = Singleton.getInstance();
    }
}

线程安全看多个线程同时调用getInstance()方法时是否会出现错误,饿汉模式中的getInstance仅读取变量内容,如果多个线程同时读一个变量,此时线程是安全的

(二)懒汉模式

 懒汉模式不会立即初始化实例,而是等到使用的时候再创建。在饿汉模式的基础上进行了修改

class Singleton2{
    private static Singleton2 instance = null;
    private Singleton2(){
    }
    public static Singleton2 getInstance(){
        if(instance == null){
            instance = new Singleton2();
        }
        return instance;
    }
}
public class Demo {
    public static void main(String[] args) {
        Singleton2 singleton2 = Singleton2.getInstance();
    }
}

懒汉模式中的getInstance()方法既包含了读操作,又包含了修改操作,是非原子性的,可能导致实例被创建出多份,存在线程不安全问题

可以通过加锁将操作打包成原子的来保障线程安全,这里的类对象作为锁对象

class Singleton2{
    private static Singleton2 instance = null;
    private Singleton2(){
    }
    public static Singleton2 getInstance(){
        synchronized (Singleton2.class){
            if(instance == null){
                instance = new Singleton2();
            }
        }
        return instance;
    }
}

 线程不安全是发生在instance被初始化前,未初始化时多线程调用getInstance可能同时涉及到读和修改,一旦instance初始化之后,仅存在读操作,线程也就安全了。这样初始化后及时线程安全了,每次调用getInstance方法都需要进行加锁,从而产生锁竞争的问题
 对应的改进方案,再添加一个判定条件,让instance初始化之前进行加锁,初始化后就不进行加锁了。里层的 if 条件不能进行省略,在两个 if 条件判断的时间差内,可能存在instance的修改操作,若去掉里层的 if 那么就没有将读写操作进行原子性打包

    public static Singleton2 getInstance(){
        if(instance == null){
            synchronized (Singleton2.class){
                if(instance == null){
                    instance = new Singleton2();
                }
            }
        }
        return instance;
    }

 如果多个线程块都去调用 getInstance 方法,大量的读操作会产生编译器优化,读内存的操作变成了读寄存器的操作,这样如果第一个线程完成了对 instance 的修改,后续的线程中第一个 if 读取并不会感知到这个变化,会引起多余的加锁操作,但是内层的 if 不会引起到误判,不会因此创建多个实例,因此给instance假设volatile修饰防止多加锁操作
 最终的代码如下

class Singleton2{
    private static volatile Singleton2 instance = null;
    private Singleton2(){

    }
    public static Singleton2 getInstance(){
        if(instance == null){
            synchronized (Singleton2.class){
                if(instance == null){
                    instance = new Singleton2();
                }
            }
        }
        return instance;
    }
}

二、阻塞队列

 阻塞队列同样是一个先进先出的队列,相对于普通队列来说,阻塞队列线程安全,且产生阻塞效果
 1.如果队列为空,尝试出队列,就会出现阻塞,阻塞到队列不为空为止
 2.如果队列为满,尝试入队列,也会出现阻塞,阻塞到队列不满为止

(一)生产者消费者模型

 假设有两个服务器A、B。A 作为入口服务器直接接收用户的网络请求,B 作为应用服务器来给A提供一些数据,A B之间直接进行交互,那么就需要A B互相知道对应的接口,这样耦合性较高,因此使用生产者消费者模型就来降低耦合性
 A只需要和阻塞队列进行交互,不需要关注B,B也只需要和阻塞队列进行交易,若B发生异常或进行了替换,

在这里插入图片描述
 使用生产者消费者模型的优点:

  • 优点1:让多个服务器之间充分的解耦

  • 优点2:能够对请求进行“削峰填谷”

  • 如果未使用生产者消费者模型时,请求如果突然暴涨,A作为入口服务器计算量很轻,B作为应用服务器计算量可能很大,需要的系统资源也很多,请求增多后需要的资源进一步增加,会出现程序崩溃。
    使用生产者消费者模型后,阻塞队列的请求暴涨,B仍然按照原来的速度消费数据,不会因为A的暴涨产生影响

  • 当没有数据请求的时候,B也会继续处理阻塞队列中积压的数据

 在实际开发过程中的阻塞队列并不是一个简单的数据结构,而是一个/一组专门的服务器程序,不仅又阻塞队列的功能,而且还有数据化持久存储、支持多个数据通道、支持多节点 冗余备份、支持面板管理方便参数配置…此时的队列称为“消息队列”

(二)阻塞队列

 下面是JAVA标准库中的阻塞队列BlockingDeque

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        BlockingDeque<String> queue = new LinkedBlockingDeque<>();
        queue.put("hello");//入队列
        String s = queue.take();//出队列
    }
}

(三)自定义阻塞队列

 使用数组的方式实现阻塞队列,判断队列空还是满一般采用浪费一个额外空间,或者另创建一个size变量计数的方法,本例选择另创建一个size变量计数的方法。

  • put的阻塞条件是队列为满,put中的wait要由take来唤醒,只要take成功一个元素,队列就不满
  • take的阻塞条件是队列为空,队列为空后由put来唤醒,只要put成功后,队列为非空
  • notify在唤醒的时候是随机唤醒一个线程的
  • 生产者较慢的时候,消费者就等待着生产者进行消费;消费者较慢的时候,生产者快速填满队列出现阻塞,生产者等待消费者消费后进行生产
class MyBlockQueue{
    //保存数据本体
    private int[] data = new int[1000];
    private int size = 0;
    private int head = 0;//队首
    private int tail = 0;//队尾

    private Object locker = new Object();
    //入队列
    public void put(int value) throws InterruptedException {
        synchronized(locker){
            if(size == data.length){
                //return;
                locker.wait();
            }
            data[tail] = value;
            tail++;
            if(tail >= data.length){
                tail = 0;
            }
            size++;
            locker.notify();
        }
    }
    //出队列
    public Integer take() throws InterruptedException {
        synchronized(locker){
            if(size == 0){
                //return null;
                locker.wait();
            }
            //取出head位置的元素
            int ret = data[head];
            head++;
            if(head >=data.length){
                head = 0;
            }
            size--;
            //take 成功之后唤醒 put 中的等待
            locker.notify();
            return ret;
        }

    }
}
public class Demo {
    private static MyBlockQueue queue = new MyBlockQueue();
    public static void main(String[] args) {
        Thread producer = new Thread(()->{
           int num = 0;
           while(true){
               try {
                   System.out.println("生产者:"+num);
                   queue.put(num);
                   num++;

               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
        });
        producer.start();

        Thread customer = new Thread(()->{
            while(true){
                try {
                    int num = queue.take();
                    System.out.println("消费着:"+num);
                    //Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        customer.start();
    }
}

三、定时器

 定时器是用于安排任务在将来的某个时间执行的一种工具,即等待一定时间后,唤醒并执行某个之前设定好的任务,系统拥有自带的定时器例如sleep(指定休眠是新建),join(休眠指定时间)

(一)定时器的使用

 自己设定定时器使用到了java.util.Timer类,包含一个主要的方法 schedule(),涉及两个参数,第一个参数描述需要定时的任务,第二个参数设定要等待的时间,TimerTask类实现了Runnable接口

    public void schedule(TimerTask task, long delay) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        sched(task, System.currentTimeMillis()+delay, 0);
    }

(二)自定义定时器

  1. 创建一个专门的类来表示定时器中的任务(类比TimeTask类)

  2. 使用优先队列组织任务,即使用一定的数据结构将任务放到一起,在执行任务的时候,需要优先执行剩余时间最小的任务,涉及到的数据结构为堆,标准库中的数据结构PriorityQueue。
    此处的队列要考虑线程安全问题,可能要在多个线程里面注册任务,同时还有一个专门的线程来取任务执行,此处的队列需要注意线程安全问题,因此采用PriorityBlockingQueue类,既带有优先级,又带有阻塞队列

  3. 实现schedule方法来注册任务到队列中

  4. 设置一个扫描线程,不断检查当前优先队列的队首元素,如果队首元素的 时间到了就执行任务,若没有到就将该任务返回队列

import java.util.PriorityQueue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.PriorityBlockingQueue;

class MyTask{
    //任务具体要干什么
    private Runnable runnable;
    //任务具体干什么,保存任务要执行的毫秒级时间戳
    private long time;
    ///after是一个时间间隔,保存任务要执行的毫秒级时间戳
    public MyTask(Runnable runnable,long after){
        this.runnable = runnable;
        this.time = System.currentTimeMillis()+after;
    }
    public void run(){
        runnable.run();
    }
    public long getTime() {
        return time;
    }
}

class MyTimer{
    private PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();
    public void schedule(Runnable  runnable,long delay){
        MyTask task = new MyTask(runnable,delay);
        queue.put(task);
    }
    public MyTimer(){
        Thread t = new Thread(()->{
           while (true){
               try {
                   //首先取出队首元素
                   MyTask task = queue.take();
                   long curTime = System.currentTimeMillis();
                   if(curTime < task.getTime()){
                       //时间没到,将任务返回至队列
                       queue.put(task);
                   }
                   else {
                       //时间到了,执行这个任务
                       task.run();
                   }
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }

           }
        });
        t.start();
    }
}
public class Demo {
    public static void main(String[] args) {
        MyTimer timer = new MyTimer();
        timer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("hello timer");
            }
        },3000);
        System.out.println("main");
    }
}

(三)自定义定时器缺陷分析

  • 缺陷一:MyTask没有指定比较规则

  • 在上述的代码中,需要注意,设置优先队列的的类型为自定义MyTask类,并没有比较规则,因此需要实现这个Comparable接口,需要手动指定按照时间大小来比较

class MyTask implements Comparable<MyTask>{
    //任务具体要干什么
    private Runnable runnable;
    //任务具体干什么,保存任务要执行的毫秒级时间戳
    private long time;
    ///after是一个时间间隔,保存任务要执行的毫秒级时间戳
    public MyTask(Runnable runnable,long after){
        this.runnable = runnable;
        this.time = System.currentTimeMillis()+after;
    }
    public void run(){
        runnable.run();
    }
    public long getTime() {
        return time;
    }
    @Override
    public int compareTo(MyTask o) {
        return (int) (this.time- o.time);
    }
}

  • 缺陷二:忙等现象

  • 扫描线程不断检查当前优先队列的队首元素,直到达到可运行的时间,线程处于忙等状态,浪费CPU的资源。这里采用 wait(等待的时间) 的方式指定等待时间,不需要notify唤醒。
    这里使用wait,不使用sleep是因为sleep中途不能被唤醒,如果在sleep中途插入了一个时间更近的任务,就没有办法执行该任务,因此使用wait,每次插入新的任务时唤醒扫描线程

class MyTimer{
    private Object locker = new Object();
    private PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();
    public void schedule(Runnable  runnable,long delay){
        MyTask task = new MyTask(runnable,delay);
        queue.put(task);
        //每次插入成功后都唤醒以下扫描线程,让线程重新检查队首的任务时间到了
        synchronized (locker){
            locker.notify();
        }
    }
    public MyTimer(){
        Thread t = new Thread(()->{
           while (true){
               try {
                   //首先取出队首元素
                   MyTask task = queue.take();
                   long curTime = System.currentTimeMillis();
                   if(curTime < task.getTime()){
                       //时间没到,将任务返回至队列
                       queue.put(task);
                       //指定等待时间
                       synchronized (locker){
                           locker.wait(task.getTime()-curTime);
                       }
                   }
                   else {
                       //时间到了,执行这个任务
                       task.run();
                   }
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }

           }
        });
        t.start();
    }
}

完整的代码如下

import java.util.PriorityQueue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.PriorityBlockingQueue;

class MyTask implements Comparable<MyTask>{
    //任务具体要干什么
    private Runnable runnable;
    //任务具体干什么,保存任务要执行的毫秒级时间戳
    private long time;
    ///after是一个时间间隔,保存任务要执行的毫秒级时间戳
    public MyTask(Runnable runnable,long after){
        this.runnable = runnable;
        this.time = System.currentTimeMillis()+after;
    }
    public void run(){
        runnable.run();
    }
    public long getTime() {
        return time;
    }

    @Override
    public int compareTo(MyTask o) {
        return (int) (this.time- o.time);
    }
}

class MyTimer{
    private Object locker = new Object();
    private PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();
    public void schedule(Runnable  runnable,long delay){
        MyTask task = new MyTask(runnable,delay);
        queue.put(task);
        //每次插入成功后都唤醒以下扫描线程,让线程重新检查队首的任务时间到了
        synchronized (locker){
            locker.notify();
        }
    }
    public MyTimer(){
        Thread t = new Thread(()->{
           while (true){
               try {
                   //首先取出队首元素
                   MyTask task = queue.take();
                   long curTime = System.currentTimeMillis();
                   if(curTime < task.getTime()){
                       //时间没到,将任务返回至队列
                       queue.put(task);
                       //指定等待时间
                       synchronized (locker){
                           locker.wait(task.getTime()-curTime);
                       }
                   }
                   else {
                       //时间到了,执行这个任务
                       task.run();
                   }
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }

           }
        });
        t.start();
    }
}
public class Demo {
    public static void main(String[] args) {
        MyTimer timer = new MyTimer();
        timer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("hello timer");
            }
        },3000);
        System.out.println("main");
    }
}

四、线程池

 线程虽然比进程更轻量级,但是如果创建/销毁线程的频率进一步增加,所带来的开销也会比较大,采用线程池的方法,线程创建好放进线程池中,后续需要线程的时候直接从线程池中取,不需要向系统申请,且线程用完后不会还给系统,而是再次放入线程池以备下次使用。

(一)用户态vs内核态

在这里插入图片描述

程序代码在最上面的应用程序层来运行,这里的代码被称为“用户态”运行的代码,然而有些代码需要调用操作系统的API,进一步的逻辑就会在内核中执行,例如调用一个Aystem.out.println,本质上要经过write系统调用,进入到内核中,内核执行对应的逻辑,再控制台输出字符串。

在内核中运行的代码,称为"内核态",例如创建线程操作需要内核的支持,即在内核中创建PCB加到就绪链表中,Thread.start本质需要进入到内核态来运行,池子是用户态实现的,而把创建好的线程放到线程池中,整个过程就不需要涉及到内核态,使用用户态代码就能完成。
一般来说认为纯用户态的操作效率比经过内核态处理的操作要效率更高,代码进入了内核态后存在不可控的现象,

(二)标准库线程池

 JAVA提供的标准库线程池 ThreadPoolExecutor 包含在并发相关的包java.util.concurrent内部,其创建线程池的构造方法如下

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • int corePoolSize 表示核心线程数,int maximumPoolSize 表示最大线程数,包括核心线程和非核心线程。
  • long keepAliveTime 表述了非核心线程的存活时间
  • TimeUnit unit 表示keepAliveTime的时间单位
  • BlockingQueue workQueue 任务队列,线程池会提供一个submit方法将任务放入到线程池任务队列中
  • ThreadFactory threadFactory 线程工厂,描述线程是怎么创建出来的
  • RejectedExecutionHandler handler,拒绝策略,当任务队列满了设定的 具体策略,包括忽略最新任务、阻塞等待、丢弃最旧任务等方式

 对于设定的核心线程数量,需要根据实际的业务场景进行设定,根据不同的线程池的线程数,来观察程序处理任务的速度,和程序持有的CPU的占用率。在实际编程中,因为不同类型的程序,单个任务里CPU上的计算时间和阻塞时间是不相同的,因此需要寻求一个程序执行速度和CPU占用合理的平衡点

  • 当线程数多时,整体的速度会变快,但是CPU的占用率也会高;当线程数少时,整体的速度会变慢,但是CPU的占用率也会下降。
  • 需要考虑CPU是占用率不能太高,对于线上服务器来说需要设定一定的CPU冗余,以应对随时可能突发的情况,例如需求量暴涨等。

 标准库中提供了一个简化版本的线程池Executors,本质上是对ThreadPoolExecutor进行了一个封装,Executors类创建线程池的时候实际就是调用ThreadPoolExecutor类的构造方法来创建,Executors提供了一些默认的参数,如下所示

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo {
    public static void main(String[] args) {
        //创建一个固定线程数目的线程池,参数指定了线程个数
        ExecutorService pool = Executors.newFixedThreadPool(10);
        //创建一个自动扩容的线程池,会根据任务来自动进行扩容
        //Executors.newCachedThreadPool();
        //创建一个只有一个线程的线程池
        //Executors.newSingleThreadExecutor();
        //创建一个带有时器功能的线程池,类似于Timer
        //Executors.newScheduledThreadPool();

        for(int i =0;i<100;i++){
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("hello threadpool");
                }
            });
        }
    }
}

(三)自定义线程池

 创建自定义线程池的步骤如下

  1. 使用Runnable来描述任务
  2. 直接使用BlockingQueue阻塞队列来组织任务
  3. 能够描述工作线程Worker,线程循环从队列中取任务并执行
  4. 创建一个数据结构来组织若干个Worker线程
  5. 创建一个方法实现往线程池中添加任务
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;

class MyThreadPool{
    //1.描述一个任务,直接使用Runnable,不需要额外创建类了
    //2.使用一个数据结构来组织若干任务
    private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    //3.描述一个线程,工作线程的功能就是从任务队列中取任务并执行
    static class Worker extends Thread{
        private BlockingQueue<Runnable> queue = null;

        public Worker(BlockingQueue<Runnable> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            //需要能够拿到上面的队列
            while(true){
                try {
                    //循环获取任务队列中的任务
                    //如果队列为空就直接阻塞,如果队列不为空,就获取里面的内容
                    Runnable runnable = queue.take();
                    //获取到之后就执行任务
                    runnable.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    //4.创建一个数据结构来组织若干个线程
    private List<Thread> workers = new ArrayList<>();

    public MyThreadPool(int n){
        //在构造方法中创建若干个线程放入上面的数组中
        for (int i=0;i<n;i++){
            Worker worker = new Worker(queue);
            worker.start();
            workers.add(worker);
        }
    }

    //5.创建一个方法,使程序员能够将任务放到线程池中
    public void submit(Runnable runnable){
        try {
            queue.put(runnable);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
public class Demo {
    public static void main(String[] args) {
        MyThreadPool pool = new MyThreadPool(10);
        for(int i=0;i<100;i++){
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Hello ThreadPool");
                }
            });
        }
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到