1.阻塞队列的特点
我们在数据结构中学习过所谓的队列,知道队列的特点是先进先出
,只能在队列的一端进行添加元素,在队列的另一端删除元素,同样我们这里的阻塞队列也有这样的特性。
但是相比于普通的队列(Queue),阻塞队列又有一些其他方面的功能。
- 线程安全
- 产生阻塞效果
- 如果队列为空,此时尝试出队列,那么就会发生阻塞,阻塞直到队列不为空为止
- 如果队列为满,此时尝试进队列,那么就会发生阻塞,阻塞直到队列不为满为止
2.基于阻塞队列实现生产者消费者模型
2.1 生产者消费者模型简单概述
在日常开发中,处理多线程问题的一种典型的方式(生产者消费者模型)
这里举一个恰当的例子:
相信大家在家里都包过饺子吧,在我们包饺子的时候,往往需要多个人一起分工协作。
一个普通的做饺子的流程:和面,擀饺子皮,包饺子,煮饺子,其中和面和煮饺子,这两个操作不太好分工。我们现在就说擀饺子皮和包饺子,这两个任务比较好分工。
假设现在有 A , B ,C 三个人一起擀饺子和包饺子
方法一: A,B,C分别每个人都干一个饺子皮,然后在包一个饺子(但是事实上存在一个问题,我想问除了饭店的后厨,有多个擀面杖,哪家下的没事买那么多的擀面杖干啥。所以说我们假设现在只有一个擀面杖,那么A B C 三个人在擀饺子皮的时候,要么A擀饺子皮,B ,C都等着,等到A的一个饺子皮擀完之后,在交给B,然后B再擀饺子皮,C继续等着…此时我们不难发现,我们单个比方此时的擀面杖就是一个锁,此时就发生了锁冲突,并且锁冲突比较激烈)
方法二:A专门负责擀饺子皮,B和C专门负责包饺子(这是我们生活中的常见情况),那么此时A就是饺子皮的生产者,要不断的生产一些饺子皮。B C 就是饺子皮的消费者,要不断的使用/消耗饺子皮。
即A就是生产者,B C就是消费者,那么对于包饺子来说,用于方饺子皮的那个“盖帘”就是交易场所
然而此处的阻塞队列就可以生产者消费者模型中的交易场所。
2.2 生产者消费者模型应用到的场景
生产者消费者模型,是实际开发中非常有用的一种多线程开发手段,尤其是在服务器开发的场景中
假设现在有两个服务器A,B A:作为入口服务器,用于接收用户的网络请求,B:作为应用服务器,来给A提供一些数据。
2.2.1 优点一:能够让多个服务器之间根充分的解耦合
2.2.2 优点二:能够对于请求进行"削峰填谷"
在不使用阻塞队列的情况下
在使用阻塞队列的情况下
那么阻塞队列怎么会有削峰填谷的特点呢?
2.3 了解一下JAVA标准库中阻塞队列的用法
public class demo1 {
public static void main(String[] args) {
//这里的BlockingQueue是一个接口 <String>表示的是在阻塞队列中添加的元素类型,在这里实现阻塞队列有两种方式
//第一种:使用链表LinkedBolckingQueue<String>()
//第二种:使用顺序表ArrayBlockingQueue<String>()
BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
try {
queue.put("hello");
queue.put("world");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
BlockingQueue中的take()方法和put()方法
2.4基于JAVA标准库中阻塞队列的特点,我们自己实现一个阻塞队列
首先我们要自己实现一个阻塞队列,那么最基本的就要先实现一个基本的队列。实现队列的方法有很多种,在这里我们使用数组的方式进行实现。博主在以前的博客中讲到过队列,在那篇文章中同样是利用数组实现的,但是是一个循环队列。
class MyBlockingQueue{
public int size; //数组的有效程度
public int head; //数组下标的头
public int tail; //数组下标尾
public int []data = new int[10000]; //在队列中使用的数组
//创建锁对象
public Object locker = new Object();
public void put(int val) throws InterruptedException {
synchronized(locker) {
if (size == data.length) {
//如果此时的队列是满的,就要进入到阻塞状态,直到队列不为满为止
locker.wait();
}
//在队列中添加元素
data[tail] = val;
tail++;
size++;
if(tail >= data.length){
tail = 0;
}
//这个唤醒操作值针对当队列中的元素,为空时,上面的几行代码,让队列中有了元素之后,唤醒这个线程
locker.notify();
}
}
public int take() throws InterruptedException {
synchronized (locker) {//我们此时可以看到在take()和put方法中每一行代码都是在操作公共的变量,既然如此,直接给整个方法加锁即可,加上synchronized就就线程安全了
//在队列中删除元素
if (size == 0) {
//如果队列中没有元素的时候,此时就处于阻塞等待状态,直到队列中有元素为止
locker.wait();
}
int ret = data[head];
head++;
size--;
//这个唤醒操作是是针对当队列为满时,上面的几行代码,让队列不未满,唤醒这个线程
locker.notify();
return ret;
}
}
}
public class demo2 {
//自己实现一个阻塞队列
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue queue = new MyBlockingQueue();
queue.put(1);
queue.put(2);
queue.put(3);
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}
}
代码解析:
同时如果locker.wait()在等待,locker.notify()就能唤醒,但是如果没有线程此时处于阻塞状态,那么此时notify()也没有任何副作用。
这里有一个问题:如果这里有三个线程都是locker所,那么notify怎么样是精准唤醒?
notify()只能唤醒随机的一个等待线程,不能做到精准,如果要想精准,就必须使用不同的锁对象,
想唤醒t1,就要o1.notify(),想要阻塞t1,就要o1.wait()
想要唤醒t2,就要o2.notify(),想要阻塞t2就要o2.wait()
我们就下来实现一个简单的生产者消费者模型:
public class demo2 {
//自己实现一个阻塞队列
public static MyBlockingQueue queue = new MyBlockingQueue();
public static void main(String[] args) throws InterruptedException {
//实现生产者消费者模型
Thread producer = new Thread(()->{
int num= 0;
while(true){
try {
queue.put(num); //在阻塞队列中添加元素
System.out.println("生产者" + num);
num++;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
Thread customer = new Thread(()->{
while(true){
//消费者
try {
System.out.println("消费者" + queue.take());
//Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
}
}
在上述的代码中,因为在线程代码中如果没有限制阻塞时间,那么线程代码运行就很快,所以在生产者模块中,Thread.sleep(500),但是没有在消费者代码中植入阻塞时间,这是为什么呢》其实因为在生产者消费者模型中,生产者生产了几个元素,消费者才能消费几个元素,就比如说此时我生产者生产了5个元素,但是你消费者不可能消费6个元素呀,我没有那么多,即使你的消费者线程运行的多快,我生产者没有造出产品,你就要在这给我等着。
如下图:
那么如果此时给消费者线程代码中植入Thread.sleep(500)阻塞时间,又会是怎么样的呢?
其实如果在消费者线程中有了阻塞时间,那么生产者生产的元素就很快,在把整个队列占满的时候,就会进入阻塞状态,直到队列中元素不为满时,那么此时当消费者线程中的阻塞时间一到,线程处于就绪状态,就会消费数据,但是此时消费一个数据,生产者就会生产一个数据。
class MyBlockingQueue{
public int size; //数组的有效程度
public int head; //数组下标的头
public int tail; //数组下标尾
public int []data = new int[1000]; //在队列中使用的数组
//创建锁对象
public Object locker = new Object();
public void put(int val) throws InterruptedException {
synchronized(locker) {
//如果此时的队列是满的
if (size == data.length) {
locker.wait();
}
//在队列中添加元素
data[tail] = val;
tail++;
if(tail == data.length){
tail = 0;
}
size++;
locker.notify();
}
}
public int take() throws InterruptedException {
synchronized (locker) {
//在队列中删除元素
//如果此时队列中的元素为空
if (size == 0) {
locker.wait();
}
int ret = data[head];
head++;
if (head == data.length) {
head = 0;
}
size--;
locker.notify();
return ret;
}
}
}
public class demo2 {
//自己实现一个阻塞队列
public static MyBlockingQueue queue = new MyBlockingQueue();
public static void main(String[] args) throws InterruptedException {
//实现生产者消费者模型
Thread producer = new Thread(()->{
int num= 0;
while(true){
try {
queue.put(num); //在阻塞队列中添加元素
System.out.println("生产者" + num);
num++;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
Thread customer = new Thread(()->{
while(true){
//消费者
try {
System.out.println("消费者" + queue.take());
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
}
}