Queue
Queue(队列)是一种
先进先出(FIFO)
的数据结构。它继承自 Collection接口,因此具有集合的基本操作,同时还提供了额外的 插入、提取和检查
操作,每种操作都有两种形式: 一种在操作失败时抛出异常,另一种返回一个特殊值(null或false,取决于操作)
,专门用于处理需要按顺序访问元素的场景(如任务调度、缓冲管理等)
Queue 接口
public interface Queue<E> extends Collection<E> {}
Queue 核心特性
先进先出(FIFO)
最先添加的元素最先被移除
Queue<String> queue = new LinkedList<>();
// 插入元素
queue.offer("A");
queue.offer("B");
queue.offer("C");
System.out.println(queue);//[A, B, C]
// 移除元素 从头开始
queue.poll();
System.out.println(queue); //[B, C]
两种工作操作模式
1.抛出异常
操作失败时抛出异常(如队列为空时删除)
Queue<String> queue = new LinkedList<>();
try {
System.out.println("remove (空队列): " + queue.remove());
} catch (NoSuchElementException e) {
System.out.println("remove 抛出异常: " + e.getClass().getSimpleName());
}
// 打印输出:remove 抛出异常: NoSuchElementException
2.返回特殊值
操作失败时返回 null 或 false(推荐使用
)
返回null
Queue<String> queue = new LinkedList<>();
//移除队首
System.out.println("poll(空队列): " + queue.poll());//poll (空队列): null
返回false
Queue<Integer> boundedQueue = new ArrayBlockingQueue<>(2);
System.out.println("offer 结果: " + boundedQueue.offer(1)); //offer结果: true
System.out.println("offer 结果: " + boundedQueue.offer(2)); //offer结果: true
System.out.println("offer (队列满): " + boundedQueue.offer(3)); //offer(队列满):false
Queue 核心方法
插入元素
add(E e) 插入元素,成功返回 true;队列满时抛出 IllegalStateException
offer(E e) 插入元素,成功返回 true;队列满时返回 false(推荐)
移除元素
remove() 移除并返回头部元素;队列为空时抛出 NoSuchElementException
poll() 移除并返回头部元素;队列为空时返回 null(推荐)
检查头部元素(不移除)
element() 返回头部元素;队列为空时抛出 NoSuchElementException
peek() 返回头部元素;队列为空时返回 null(推荐)
设计原则:
- "抛出异常"方法在操作失败时抛出异常(如队列满/空)
- "返回特殊值"方法在操作失败时返回
false
或null
- 最佳实践:优先使用
offer()/poll()/peek()
方法
抽象类AbstractQueue
是为实现 Queue 接口而设计的一个基础骨架
,
public abstract class AbstractQueue<E>
extends AbstractCollection<E>
implements Queue<E> {}
1.继承AbstractCollection:表明拥有Collection集合通用默认实现功能,如 addAll
2.实现 Queue接口:简化自定义队列实现或标准队列实现,提供默认通用方法的实现逻辑
AbstractQueue 关键方法及其实现策略
1.提供通用实现方法
提供了 add, remove, element, clear, addAll 等方法的符合 Queue 接口规范的默认实现
add(E e): 调用 offer(e)
如果 offer 返回 true,则 add 也返回 true
如果 offer 返回 false(通常表示队列满且有容量限制),则add抛出 IllegalStateException
remove(): 调用 poll()
如果 poll() 返回非 null 元素,则 remove() 返回该元素
如果 poll() 返回 null(队列为空),则 remove() 抛出 NoSuchElementException
element(): 调用 peek()
如果 peek() 返回非 null 元素,则 element() 返回该元素
如果 peek() 返回 null(队列为空),则 element() 抛出 NoSuchElementException
clear(): 通过反复调用 poll() 直到返回 null 来清空队列
addAll(Collection<? extends E> c):
遍历传入的集合 c,对每个元素调用 add(e)(即 offer(e),失败则抛异常)
如果集合c中的元素全部成功添加,则返回 true;
如果在添加过程中任何元素导致 add 失败(抛出异常)
2.子类必须实现的方法
Queue接口有offer, poll, peek 这三个最核心的操作,而AbstractQueue里面没有这三个方法,要求子类仅实现 offer, poll, peek 这三个最核心的操作,
AbstractQueue 重要注意事项
1.抽象类: 不能直接实例化 AbstractQueue。必须创建它的子类。
2.非线程安全: AbstractQueue 本身不提供任何同步机制。具体子类需要自行线程安全方式
3.核心方法依赖: 所有默认实现都依赖于子类正确实现的 offer, poll, peek。如果这三个方法实现有误,整个队列的行为都会出错。
4.addAll 的原子性: 默认的 addAll 实现不是原子的。如果添加过程中某个元素导致 add 失败(抛出异常),队列可能只包含部分集合 c 中的元素。子类如需要可以覆盖此方法以实现原子性添加
5.阻塞与非阻塞: AbstractQueue 及其默认实现都是非阻塞的(offer 失败返回 false, poll 空时返回 null)。Java 并发包中实现了 BlockingQueue 接口的队列(如 ArrayBlockingQueue, LinkedBlockingQueue)也继承自 AbstractQueue,但它们通过覆盖方法并添加等待机制提供了阻塞操作(put, take)
Queue 子接口 双端队列Deque接口
Queue 子接口 阻塞队列BlockingDeque 接口
Queue是非安全的,阻塞队列BlockingDeque 接口及其实现类为其提供了线程安全的阻塞操作
实现类 | 特性 | 容量 |
---|---|---|
ArrayBlockingQueue |
数组实现的有界阻塞队列,公平锁可选 | 有界 |
LinkedBlockingQueue |
链表实现的可选有界阻塞队列(默认 Integer.MAX_VALUE ) |
可选有界 |
PriorityBlockingQueue |
无界优先级阻塞队列(基于 PriorityQueue ) |
无界 |
DelayQueue |
存放 Delayed 元素的无界队列,元素到期才能取出 |
无界 |
SynchronousQueue |
不存储元素的阻塞队列,每个插入操作必须等待对应的移除操作 | 容量0 |
Queue 实现类 LinkedList
整个集合框架中最灵活的类之一LinkedList详情介绍链接
Queue 实现类 PriorityQueue
//默认容量11 依赖元素的自然顺序(元素类需实现 Comparable 接口)
PriorityQueue<E> pq = new PriorityQueue<>();
//指定初始容量
PriorityQueue<E> pq = new PriorityQueue<>(int initialCapacity);
//自定义比较器comparator
PriorityQueue<E> pq = new PriorityQueue<>(Comparator<? super E> comparator);
//指定容量 + 比较器
PriorityQueue<E> pq = new PriorityQueue<>(int initialCapacity,
Comparator<? super E> comparator);
// 从集合初始化,按迭代出的顺序添加元素到队列中
PriorityQueue<E> pq = new PriorityQueue<>(Collection<? extends E> c);
// 从有序集合初始化 直接继承 SortedSet 的比较规则
PriorityQueue<E> pq = new PriorityQueue<>(SortedSet<? extends E> ss);
从有序集合初始化 示例
SortedSet<Integer> sortedSet = new TreeSet<>(Comparator.reverseOrder());
sortedSet.addAll(Arrays.asList(5, 1, 8));
PriorityQueue<Integer> pq = new PriorityQueue<>(sortedSet); // 使用逆序比较器
System.out.println(pq);//[8, 5, 1]
PriorityQueue 底层数据及实现原理
二叉堆结构(以最小堆为例)
// 数组存储结构示例 (下标从0开始)
[2, 4, 10, 8, 7, 15, 20]
// 对应二叉堆:
// 2
// / \
// 4 10
// / \ / \
// 8 7 15 20
二叉堆的结构及操作实现逻辑的详情介绍
插入元素 上浮 (Sift Up)操作
删除元素 下沉 (Sift Down)操作
PriorityQueue 方法
1. 添加元素
boolean add(E e)
或 boolean offer(E e)
add()
在队列满时抛出异常(但 PriorityQueue
是无界的,不会发生)
时间复杂度:O(log n)
PriorityQueue<Integer> pq = new PriorityQueue<>();
pq.add(5); // 添加元素
pq.offer(3); // 添加元素(推荐)
2. 获取队首元素
E peek()
返回队首元素(优先级最高的元素)但不移除,队列为空时返回 null
时间复杂度:O(1)
Integer head = pq.peek(); // 查看最小元素(不删除)
3. 移除队首元素
E poll()
移除并返回队首元素,队列为空时返回 null
时间复杂度:O(log n)
Integer removed = pq.poll(); // 移除并返回最小元素
4. 移除特定元素
boolean remove(Object o)
从队列中移除指定元素(如果存在)
时间复杂度:O(n)(需要遍历查找)
boolean isRemoved = pq.remove(5); // 移除元素5(如果存在)
5. 队列信息查询
int size()
int size = pq.size(); // 返回元素数量
boolean isEmpty()
boolean empty = pq.isEmpty(); // 检查队列是否为空
boolean contains(Object o)
boolean hasElement = pq.contains(3); // 检查元素是否存在
6. 批量操作
void clear()
pq.clear(); // 清空队列
boolean addAll(Collection<? extends E> c)
pq.addAll(Arrays.asList(8, 2, 6)); // 添加多个元素
7. 迭代器
Iterator<E> iterator()
注意:迭代器不保证按优先级顺序
返回元素
Iterator<Integer> it = pq.iterator();
while (it.hasNext()) {
System.out.print(it.next() + " ");
}
PriorityQueue 核心特性
1.PriorityQueue是优先级队列,基于堆(通常是最小堆或最大堆
)实现。
2.元素不是按FIFO顺序处理,而是按优先级
(通过自然顺序或比较器决定)。
3.常用操作的时间复杂度:插入O(log n),删除O(log n),获取队头元素O(1)。
4.非线程安全
,多线程环境下应使用PriorityBlockingQueue。
5.不允许null元素
PriorityQueue 注意事项
1.迭代顺序不等于处理顺序,而是按照 优先级
PriorityQueue<Integer> pq = new PriorityQueue<>();
pq.addAll(List.of(5, 1, 3));
System.out.println(pq); // 可能输出 [1, 5, 3] (堆结构)
while (!pq.isEmpty()) {
System.out.print(pq.poll() + " "); // 正确输出: 1 3 5
}
2.对象必须实现比较规则
- 未实现
Comparable
且未提供Comparator
会抛出ClassCastException
3.线程安全替代方案
PriorityQueue 非线程安全的
// 多线程环境使用阻塞版本
BlockingQueue<Task> safeQueue = new PriorityBlockingQueue<>();
4.扩容开销
频繁插入海量数据时,预初始化容量可减少扩容次数
new PriorityQueue<>(initialCapacity);
5.空值限制:PriorityQueue 不允许插入 null(抛出 NullPointerException)
PriorityQueue 使用示例代码
// 自定义比较器:按学生成绩降序
PriorityQueue<Student> pq = new PriorityQueue<>(
Comparator.comparingInt(Student::getScore).reversed()
);
pq.add(new Student("Alice", 85));
pq.add(new Student("Bob", 92));
pq.add(new Student("Charlie", 78));
while (!pq.isEmpty()) {
System.out.println(pq.poll()); // 按成绩从高到低输出
}
Queue 实现类 ConcurrentLinkedQueue(非阻塞线程安全类)
ConcurrentLinkedQueue
是 Java 并发包 (java.util.concurrent
) 中提供的一个线程安全的无界非阻塞队列,采用 FIFO(先进先出)原则排序元素。它是专为高并发环境设计的队列实现
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
ConcurrentLinkedQueue 数据结构及实现原理
节点结构
// 简化版内部节点结构
private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {UNSAFE.putObject(this, itemOffset, item);}
// CAS 操作
boolean casItem(E cmp, E val) { /*...*/ }
void lazySetNext(Node<E> val) { /*...*/ }
boolean casNext(Node<E> cmp, Node<E> val) { /*...*/ }
// ...
}
//ConcurrentLinkedQueue 队列头尾指针分离
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
队列由节点(Node)构成,每个节点包含一个元素(item)和指向下一个节点的引用(next)。
队列有两个指针:head和tail,分别指向头节点和尾节点。注意,tail并不总是尾节点(为了减少CAS次数,采用了一种松弛策略),但通过算法保证从tail开始一定能找到真正的尾节点
实现原理
基于 Michael & Scott 非阻塞队列算法 实现
1.使用 CAS (Compare and Swap) 原子操作
2.基于链表结构(单向链表):"头尾分离"设计:head
指向队列头部,tail
指向队列尾部(可能滞后,非实时更新)
已入队offer方法介绍其步骤逻辑
a.创建一个新节点,新节点的item为待添加的元素e,next为null。
b.从当前tail开始向后查找真正的尾节点(因为tail可能滞后,即tail.next可能不为null)。
c.通过CAS操作将新节点链接到尾节点的next域(即把尾节点的next指向新节点)。
d.如果CAS成功,则尝试更新tail指针(这里不要求一定成功,因为失败的话后续操作会帮助推进tail)。
e.如果CAS失败(有其他线程已经修改了尾节点的next),则重新开始步骤b
关键点:
1.入队操作永远在队列的尾部添加节点。
2.使用CAS操作来保证原子性更新。
3.更新tail指针的策略:并不是每次入队都更新tail,而是间隔更新(每两次入队更新一次tail),这样可以减少CAS竞争,提高性能
流程代码查看
//1.初始状态 队列:10 → 20
head → Node1(item=10, next=Node2)
tail → Node2(item=20, next=null)
// 线程A添加元素42
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
queue.add(10); // 初始元素
queue.add(20); // 初始元素
queue.add(42); // 添加新元素
步骤1:创建新节点
Node<Integer> newNode = new Node<>(42);
步骤2:获取当前尾指针
Node<E> t = tail; // t → Node2
Node<E> p = t; // p → Node2
步骤3:定位实际尾节点
Node<E> q = p.next; // q = null (实际尾节点)
步骤4:尝试 CAS 设置
// 尝试将Node2的next从null设置为newNode
if (p.casNext(null, newNode)) {
// CAS 成功!
}
步骤5:条件性更新 tail
// 检查tail是否落后(p != t)
if (p != t) {
// 尝试将tail从Node2更新到Node3(newNode)
casTail(t, newNode);
}
最终状态:
head → Node1(item=10, next=Node2)
tail → Node3(item=42, next=null) // 可能更新
↑
Node2(item=20, next=Node3)
// 队列:10 → 20 → 42
CAS 操作的关键实现
// 原子性更新next指针
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// 条件性更新tail指针
boolean casTail(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
ConcurrentLinkedQueue 核心特性
- 线程安全:通过无锁算法(
基于CAS 操作
)实现,高并发下性能优于LinkedBlockingQueue
- 非阻塞:操作不会导致线程阻塞
- 无界队列:理论上可以无限扩展(受内存限制)
- FIFO 顺序:先进先出的元素处理顺序
- 高性能:在高并发场景下优于阻塞队列
LinkedBlockingQueue
- 不允许 null 元素:插入 null 会抛出
NullPointerException
- 内存可见性: 遵循 happens-before 原则线程 A 添加元素 → 线程 B 获取该元素保证可见性
- 弱一致性方法:
size()
和contains()
结果可能不精确,迭代器是"弱一致性"的,可能反映创建后的修改,也可能不反应
ConcurrentLinkedQueue 操作方法
入队操作
boolean add(E e)
:添加元素(调用 offer
)
boolean offer(E e)
:添加元素(推荐使用)
出队操作
E poll()
:检索并移除队头元素(队列为空返回 null)
E remove()
:检索并移除队头元素(队列为空抛出异常)
查看操作
E peek()
:查看队头元素但不移除(队列为空返回 null)
E element()
:查看队头元素但不移除(队列为空抛出异常)
其他方法
boolean isEmpty()
:检查队列是否为空
int size()
:返回队列大小(注意:高并发下可能不精确)
boolean contains(Object o)
:检查元素是否存在(注意:可能因并发修改失效)
Iterator<E> iterator()
:获取弱一致性的迭代器
ConcurrentLinkedQueue 总结
注意事项
1.避免使用 size()
判断空队列:
// 错误方式(size()可能过时)
if (queue.size() > 0) { ... }
// 正确方式
if (!queue.isEmpty()) { ... } // 但仍有瞬时状态问题
// 最佳实践:直接处理poll结果
E element = queue.poll();
if (element != null) { ... }
2.迭代器弱一致性:
Iterator<String> it = queue.iterator();
while (it.hasNext()) {
// 可能遗漏或包含迭代开始后的修改
System.out.println(it.next());
}
3.批量操作非原子性
// 以下操作不是原子的
if (!queue.isEmpty()) {
String item = queue.poll(); // 其他线程可能在此修改队列
}
适用场景
1.高并发生产者-消费者模型
2.任务调度系统
3.事件处理系统
4.需要非阻塞数据结构的场景
5.当队列大小不重要或不需要精确控制时## 与阻塞队列对比
特性 | ConcurrentLinkedQueue | LinkedBlockingQueue |
---|---|---|
阻塞行为 | 非阻塞 | 阻塞(可指定超时) |
边界 | 无界 | 可选有界/无界 |
锁机制 | 无锁(CAS) | 双锁(putLock/takeLock) |
高并发性能 | 更高 | 良好 |
size() 精确性 | 弱一致性(不精确) | 精确 |
内存消耗 | 每个节点更多对象开销 | 较少对象开销 |
性能总结
操作 | 时间复杂度 | 备注 |
---|---|---|
offer() |
O(1) | 平均常数时间 |
poll() |
O(1) | 平均常数时间 |
peek() |
O(1) | 常数时间 |
size() |
O(n) | 需要遍历整个队列 |
contains() |
O(n) | 需要遍历整个队列 |
整体总结
- ✅ 优点:高并发性能好、非阻塞、线程安全
- ⚠️ 局限:size() 和 contains() 不精确、无界队列可能内存溢出
- 💡 最佳实践:适合高吞吐场景,避免依赖队列大小判断
在需要高吞吐、非阻塞队列的场景下,ConcurrentLinkedQueue
通常是比阻塞队列更好的选择,特别适合在生产者-消费者模型中使用
ConcurrentLinkedQueue 使用示例示例
public class CLQExample {
public static void main(String[] args) {
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// 生产者线程
new Thread(() -> {
for (int i = 0; i < 100; i++) {
queue.offer(i);
System.out.println("Produced: " + i);
}
}).start();
// 消费者线程
new Thread(() -> {
while (true) {
Integer item = queue.poll();
if (item != null) {
System.out.println("Consumed: " + item);
}
// 添加适当休眠避免CPU空转
try { Thread.sleep(10); } catch (InterruptedException e) {}
}
}).start();
}
}
Queue及其实现类总结
实现类性能对比
特性 | ArrayDeque |
LinkedList |
PriorityQueue |
ArrayBlockingQueue |
---|---|---|---|---|
入队/出队时间复杂度 | O(1) | O(1) | O(log n) | O(1) |
随机访问 | ❌ | ⚠️ O(n) | ❌ | ❌ |
内存占用 | 低 | 高 | 中 | 固定 |
扩容机制 | 动态扩容×2 | 无 | 动态扩容 | 固定大小 |
线程安全 | ❌ | ❌ | ❌ | ✅ |
null 支持 | ❌ | ✅ | ❌ | ❌ |
最佳实践指南
1 单线程环境:
- 首选
ArrayDeque
(高性能,低内存) - 需要列表功能时选
LinkedList
- 优先级处理用
PriorityQueue
2.多线程环境:
- 生产者-消费者模型 →
BlockingQueue
实现类 - 高并发非阻塞 →
ConcurrentLinkedQueue
- 延迟任务 →
DelayQueue
3.设计注意事项:
// 错误用法:LinkedList 作为队列时的随机访问
Queue<Integer> q = new LinkedList<>();
q.add(1);
q.add(2);
int first = q.get(0); // 违反队列设计原则!
// 正确用法:仅使用队列标准操作
int head = q.poll(); // 正确:移除并返回头元素
4.阻塞队列选择原则:
- 需要公平性 →
ArrayBlockingQueue(true)
- 大容量缓冲 →
LinkedBlockingQueue(capacity)
- 直接传递 →
SynchronousQueue
- 延迟任务 →
DelayQueue