多线程实操&&阻塞队列

发布于:2022-12-22 ⋅ 阅读:(422) ⋅ 点赞:(0)

一、什么是阻塞队列

阻塞队列是一种特殊的队列 . 也遵守 " 先进先出 " 的原则 .
阻塞队列能是一种线程安全的数据结构 , 并且具有以下特性 :
  • 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
  • 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.
        阻塞队列的一个典型应用场景就是 " 生产者消费者模型 ". 这是一种非常典型的开发模型 .

二、生产者消费者模型

        生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题
        生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.

1.解耦

         无阻塞队列的生产者消费者模型:

         像这样的"生产者"和"消费者"都是直接通信,这样二者的代码就相互依存(高耦合),“生产者”这边就需要知道“消费者”是如何接收的,相反,“消费者”也需要知道“生产者”是如何发送数据的,一方改动可能就直接导致程序奔溃。

        所以我们需要将二者解耦,才能更好的管理代码(追求高内聚,低耦合) 

         使用阻塞队列作为中间商

         使用这一模型,”生产者“只要生产了对象就直接往阻塞队列里面丢,而”消费者“只要发现阻塞队列里面有东西就直接往里面拿,”生产者“和”消费者“根本不用知道对方是谁,它们只关心”中间商“是否需要货或者是否有货,所以”生产者“这边发生了变化(代码修改)也不会影响到”消费者“的正常运行

2.削峰填谷

         阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 不会因为”生产者“线程请求突然增大而导致”消费者“线程处理不过来而导致程序奔溃。所以就算请求突然增大,也会被加进阻塞队列,”消费者“线程任然以正常的速度进行解决。
【案例】
        在 " 秒杀 " 场景下 , 服务器同一时刻可能会收到大量的支付请求 . 如果直接处理这些支付请求 , 服务器可能扛不住( 每个支付请求的处理都需要比较复杂的流程 ). 这个时候就可以把这些请求都放 到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求 . 这样做可以有效进行 " 削峰 ", 防止服务器被突然到来的一波请求直接冲垮

三、标准库中的阻塞队列

        在 Java 标准库中内置了阻塞队列 . 如果我们需要在一些程序中使用阻塞队列 , 直接使用标准库中的即可 .
  • BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
  • put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
  • BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞. 
String elem = queue.take();

生产者消费者模型

public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
    Thread customer = new Thread(() -> {
        while (true) {
            try {
                int value = blockingQueue.take();
                System.out.println("消费元素: " + value);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
       }
   }, "消费者");
    customer.start();

    Thread producer = new Thread(() -> {
        Random random = new Random();
        while (true) {
            try {
                int num = random.nextInt(1000);
                System.out.println("生产元素: " + num);
                blockingQueue.put(num);
                Thread.sleep(1000);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
       }
   }, "生产者");

    producer.start();
    customer.join();
    producer.join();
}

四、手动实现阻塞队列

阻塞队列实现
  • 通过 "循环队列" 的方式来实现.
  • 使用 synchronized 进行加锁控制.
  • put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一定队列就不满了, 因为同时可能是唤醒了多个线程).
  • take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)
public class BlockingQueueDemo {
    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        Thread producter = new Thread(() -> {
            int num = 0;
            while (true) {
                try {
                    num++;
                    queue.put(num);
                    System.out.println("生产了" + num + "个数");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "生产者");
        producter.start();
        try {
            sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Thread customer = new Thread(() -> {
            while (true) {
                try {
                    int value = queue.take();
                    Thread.sleep(1000);
                    System.out.println("消费了" + value + "个数");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }, "消费者");
        customer.start();
        try {
            producter.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            customer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class MyBlockingQueue {
    private int[] container = new int[1000];
    private volatile int size = 0;
    private int first = 0;
    private int rear = 0;

    public void put(int value) throws InterruptedException {
        synchronized (this) {
            //判断阻塞队列是否为满,为满则需要等take将数据取走,才能添加新的数据
            while (size == container.length) {
                // 此处最好使用 while.
                // 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
                // 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能又已经队列满了
                // 就只能继续等待
                wait();
            }
            container[rear] = value;
            rear++;
            size++;
            //此处也可以使用求余来确定rear的值(rear=((rear +1)%length)),但是相比之下,求余数需要用到除法,效率没有直接判断是否为length的高
            if (rear == container.length) {
                rear = 0;
            }
            //由于先前阻塞队列可能是没有数据的状态,所以到这一步后,阻塞队列又重新获得了数据
            //这个时候就需要将之前试图获取元素但是发送队列内没有数据的线程唤醒
            //之后线程开始争夺锁
            notifyAll();
        }
    }

    public int take() throws InterruptedException {
        int temp = 0;
        synchronized (this) {
            //判断阻塞队列是否为空,为空则拿不到数据,需要等待put将数据加入队列
            while (size == 0) {
                // 此处最好使用 while.
                // 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,
                // 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能又已经队列满了
                // 就只能继续等待
                wait();
            }
            temp = container[first];
            first++;
            size--;
            //此处也可以使用求余来确定first的值(first=((first+1)%length)),但是相比之下,求余数需要用到除法,效率没有直接判断是否为length的高
            if (first == container.length) {
                first = 0;
            }
            //由于先前的队列可能是满的,导致数据放不进阻塞队列,到这一步后,数据又有了空位,
            //所以需要将直接试图加入元素但是因为队满而失败的线程唤醒
            //之后,线程开始争夺锁
            notifyAll();
        }
        return temp;
    }

    public synchronized int getSize() {
        return size;
    }
}

 

本文含有隐藏内容,请 开通VIP 后查看