【多线程】阻塞队列&生产者消费者模型

发布于:2023-02-15 ⋅ 阅读:(550) ⋅ 点赞:(0)

1.阻塞队列的特点

我们在数据结构中学习过所谓的队列,知道队列的特点是先进先出只能在队列的一端进行添加元素,在队列的另一端删除元素,同样我们这里的阻塞队列也有这样的特性。

但是相比于普通的队列(Queue),阻塞队列又有一些其他方面的功能。

  1. 线程安全
  2. 产生阻塞效果
    • 如果队列为空,此时尝试出队列,那么就会发生阻塞,阻塞直到队列不为空为止
    • 如果队列为满,此时尝试进队列,那么就会发生阻塞,阻塞直到队列不为满为止

2.基于阻塞队列实现生产者消费者模型

2.1 生产者消费者模型简单概述

在日常开发中,处理多线程问题的一种典型的方式(生产者消费者模型)

这里举一个恰当的例子:

相信大家在家里都包过饺子吧,在我们包饺子的时候,往往需要多个人一起分工协作。

一个普通的做饺子的流程:和面,擀饺子皮,包饺子,煮饺子,其中和面和煮饺子,这两个操作不太好分工。我们现在就说擀饺子皮和包饺子,这两个任务比较好分工。

假设现在有 A , B ,C 三个人一起擀饺子和包饺子

方法一: A,B,C分别每个人都干一个饺子皮,然后在包一个饺子(但是事实上存在一个问题,我想问除了饭店的后厨,有多个擀面杖,哪家下的没事买那么多的擀面杖干啥。所以说我们假设现在只有一个擀面杖,那么A B C 三个人在擀饺子皮的时候,要么A擀饺子皮,B ,C都等着,等到A的一个饺子皮擀完之后,在交给B,然后B再擀饺子皮,C继续等着…此时我们不难发现,我们单个比方此时的擀面杖就是一个锁,此时就发生了锁冲突,并且锁冲突比较激烈)

方法二:A专门负责擀饺子皮,B和C专门负责包饺子(这是我们生活中的常见情况),那么此时A就是饺子皮的生产者,要不断的生产一些饺子皮。B C 就是饺子皮的消费者,要不断的使用/消耗饺子皮。

即A就是生产者,B C就是消费者,那么对于包饺子来说,用于方饺子皮的那个“盖帘”就是交易场所

在这里插入图片描述

然而此处的阻塞队列就可以生产者消费者模型中的交易场所。

2.2 生产者消费者模型应用到的场景

生产者消费者模型,是实际开发中非常有用的一种多线程开发手段,尤其是在服务器开发的场景中

假设现在有两个服务器A,B A:作为入口服务器,用于接收用户的网络请求,B:作为应用服务器,来给A提供一些数据。

2.2.1 优点一:能够让多个服务器之间根充分的解耦合

在这里插入图片描述
在这里插入图片描述

2.2.2 优点二:能够对于请求进行"削峰填谷"

在不使用阻塞队列的情况下

在这里插入图片描述

在使用阻塞队列的情况下
在这里插入图片描述
那么阻塞队列怎么会有削峰填谷的特点呢?

在这里插入图片描述
在这里插入图片描述

2.3 了解一下JAVA标准库中阻塞队列的用法

public class demo1 {
    public static void main(String[] args) {
        //这里的BlockingQueue是一个接口 <String>表示的是在阻塞队列中添加的元素类型,在这里实现阻塞队列有两种方式
        //第一种:使用链表LinkedBolckingQueue<String>()
        //第二种:使用顺序表ArrayBlockingQueue<String>()
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        try {
            queue.put("hello");
            queue.put("world");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这里插入图片描述

BlockingQueue中的take()方法和put()方法 在这里插入图片描述

在这里插入图片描述

2.4基于JAVA标准库中阻塞队列的特点,我们自己实现一个阻塞队列

首先我们要自己实现一个阻塞队列,那么最基本的就要先实现一个基本的队列。实现队列的方法有很多种,在这里我们使用数组的方式进行实现。博主在以前的博客中讲到过队列,在那篇文章中同样是利用数组实现的,但是是一个循环队列。
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

class MyBlockingQueue{
    public int size; //数组的有效程度
    public int head; //数组下标的头
    public int tail; //数组下标尾
    public int []data = new int[10000]; //在队列中使用的数组
    //创建锁对象
    public Object locker = new Object();
    public void put(int val) throws InterruptedException {
        synchronized(locker) {
            if (size == data.length) {
                 //如果此时的队列是满的,就要进入到阻塞状态,直到队列不为满为止
                locker.wait();
            }
            //在队列中添加元素
            data[tail] = val;
            tail++;
            size++;
            if(tail >= data.length){
                tail = 0;
            }
            //这个唤醒操作值针对当队列中的元素,为空时,上面的几行代码,让队列中有了元素之后,唤醒这个线程
            locker.notify();
        }
    }
    public int take() throws InterruptedException {
        synchronized (locker) {//我们此时可以看到在take()和put方法中每一行代码都是在操作公共的变量,既然如此,直接给整个方法加锁即可,加上synchronized就就线程安全了
            //在队列中删除元素
            if (size == 0) {
                //如果队列中没有元素的时候,此时就处于阻塞等待状态,直到队列中有元素为止
                locker.wait();
            }
            int ret = data[head];
            head++;
            size--;
            //这个唤醒操作是是针对当队列为满时,上面的几行代码,让队列不未满,唤醒这个线程
            locker.notify();
            return ret;
        }
    }
}
public class demo2 {
    //自己实现一个阻塞队列
    public static void main(String[] args) throws InterruptedException {
        MyBlockingQueue queue = new MyBlockingQueue();
        queue.put(1);
        queue.put(2);
        queue.put(3);
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
    }
}

代码解析:
在这里插入图片描述

同时如果locker.wait()在等待,locker.notify()就能唤醒,但是如果没有线程此时处于阻塞状态,那么此时notify()也没有任何副作用。

这里有一个问题:如果这里有三个线程都是locker所,那么notify怎么样是精准唤醒?

notify()只能唤醒随机的一个等待线程,不能做到精准,如果要想精准,就必须使用不同的锁对象,

想唤醒t1,就要o1.notify(),想要阻塞t1,就要o1.wait()

想要唤醒t2,就要o2.notify(),想要阻塞t2就要o2.wait()

我们就下来实现一个简单的生产者消费者模型:

public class demo2 {
    //自己实现一个阻塞队列
    public static MyBlockingQueue queue = new MyBlockingQueue();
    public static void main(String[] args) throws InterruptedException {

        //实现生产者消费者模型
        Thread producer = new Thread(()->{
            int num= 0;
            while(true){
                try {
                    queue.put(num); //在阻塞队列中添加元素
                    System.out.println("生产者" + num);
                    num++;
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        producer.start();
        Thread customer = new Thread(()->{
            while(true){
                //消费者
                try {
                    System.out.println("消费者" + queue.take());
                    //Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        customer.start();
    }
}

在上述的代码中,因为在线程代码中如果没有限制阻塞时间,那么线程代码运行就很快,所以在生产者模块中,Thread.sleep(500),但是没有在消费者代码中植入阻塞时间,这是为什么呢》其实因为在生产者消费者模型中,生产者生产了几个元素,消费者才能消费几个元素,就比如说此时我生产者生产了5个元素,但是你消费者不可能消费6个元素呀,我没有那么多,即使你的消费者线程运行的多快,我生产者没有造出产品,你就要在这给我等着。

如下图:

请添加图片描述

那么如果此时给消费者线程代码中植入Thread.sleep(500)阻塞时间,又会是怎么样的呢?

其实如果在消费者线程中有了阻塞时间,那么生产者生产的元素就很快,在把整个队列占满的时候,就会进入阻塞状态,直到队列中元素不为满时,那么此时当消费者线程中的阻塞时间一到,线程处于就绪状态,就会消费数据,但是此时消费一个数据,生产者就会生产一个数据。

class MyBlockingQueue{
    public int size; //数组的有效程度
    public int head; //数组下标的头
    public int tail; //数组下标尾
    public int []data = new int[1000]; //在队列中使用的数组
    //创建锁对象
    public Object locker = new Object();
    public void put(int val) throws InterruptedException {
        synchronized(locker) {
            //如果此时的队列是满的
            if (size == data.length) {
                locker.wait();
            }
            //在队列中添加元素
            data[tail] = val;
            tail++;
            if(tail == data.length){
                tail = 0;
            }
            size++;
            locker.notify();
        }
    }
    public int take() throws InterruptedException {
        synchronized (locker) {
            //在队列中删除元素
            //如果此时队列中的元素为空
            if (size == 0) {
                locker.wait();
            }
            int ret = data[head];
            head++;
            if (head == data.length) {
                head = 0;
            }
            size--;
            locker.notify();
            return ret;
        }
    }
}
public class demo2 {
    //自己实现一个阻塞队列
    public static MyBlockingQueue queue = new MyBlockingQueue();
    public static void main(String[] args) throws InterruptedException {

        //实现生产者消费者模型
        Thread producer = new Thread(()->{
            int num= 0;
            while(true){
                try {
                    queue.put(num); //在阻塞队列中添加元素
                    System.out.println("生产者" + num);
                    num++;
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        producer.start();
        Thread customer = new Thread(()->{
            while(true){
                //消费者
                try {
                    System.out.println("消费者" + queue.take());
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        customer.start();
    }
}

请添加图片描述