链表阻塞队列LinkedBlockingQueue的数据结构和使用场景
一、数据结构
1.底层链表节点
LinkedBlockingQueue 基于单向链表实现,链表节点由内部类 Node 表示,其代码定义如下:
static class Node {
E item;
Node next;
Node(E x) { item = x; }
}
- item:用于存储实际的元素。
- next:指向下一个节点的引用,通过该引用将各个节点连接成一个单向链表。
2.核心属性
// 队列容量,默认是 Integer.MAX_VALUE
private final int capacity;
// 当前队列中元素的数量
private final AtomicInteger count = new AtomicInteger();
// 队列头节点,head.item 始终为 null
transient Node head;
// 队列尾节点,last.next 始终为 null
private transient Node last;
// 出队锁,用于控制出队操作的线程安全
private final ReentrantLock takeLock = new ReentrantLock();
// 当队列为空时,出队线程在该条件上等待
private final Condition notEmpty = takeLock.newCondition();
// 入队锁,用于控制入队操作的线程安全
private final ReentrantLock putLock = new ReentrantLock();
// 当队列满时,入队线程在该条件上等待
private final Condition notFull = putLock.newCondition();
- capacity:队列的最大容量,在创建队列时可以指定,如果不指定则默认为 Integer.MAX_VALUE。
- count:使用 AtomicInteger 来记录队列中元素的数量,保证在多线程环境下对元素数量的统计是线程安全的。
- head 和 last:分别指向队列的头部和尾部节点。head.item 始终为 null,这是为了简化出队操作的逻辑;last.next 始终为 null。
- takeLock 和 putLock:分别用于控制出队和入队操作的线程安全。使用两把锁可以使入队和出队操作可以并行进行,提高并发性能。
- notEmpty 和 notFull:分别是出队锁和入队锁的条件变量,用于实现线程的等待和唤醒机制。
3.入队和出队操作
- 入队操作:当调用 put 方法时,如果队列已满,当前线程会在 notFull 条件上等待;如果队列未满,会创建一个新的节点并将其添加到队列尾部,然后唤醒可能正在等待出队的线程。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
- 出队操作:当调用 take 方法时,如果队列为空,当前线程会在 notEmpty 条件上等待;如果队列不为空,会从队列头部取出一个节点并返回其元素,然后唤醒可能正在等待入队的线程。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
二、使用场景
1.生产者 - 消费者模型
在生产者 - 消费者模型中,生产者负责生成数据并将其放入队列,消费者负责从队列中取出数据进行处理。使用 LinkedBlockingQueue 可以实现生产者和消费者之间的解耦,提高系统的并发处理能力。
import java.util.concurrent.LinkedBlockingQueue;
class Producer implements Runnable {
private final LinkedBlockingQueue queue;
public Producer(LinkedBlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
queue.put(i);
System.out.println("Produced: " + i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumer implements Runnable {
private final LinkedBlockingQueue queue;
public Consumer(LinkedBlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer item = queue.take();
System.out.println("Consumed: " + item);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class ProducerConsumerExample {
public static void main(String[] args) {
LinkedBlockingQueue queue = new LinkedBlockingQueue<>(5);
Thread producerThread = new Thread(new Producer(queue));
Thread consumerThread = new Thread(new Consumer(queue));
producerThread.start();
consumerThread.start();
}
}
2.消息队列
在分布式系统中,不同的服务之间需要进行异步通信。LinkedBlockingQueue 可以作为一个简单的消息队列,用于在不同的服务或模块之间传递消息。例如,一个日志收集系统中,日志生成模块作为生产者将日志消息放入队列,日志存储模块作为消费者从队列中取出日志消息并存储到磁盘或数据库中。
3.线程池任务队列
线程池通常会使用一个任务队列来存储待执行的任务。LinkedBlockingQueue 可以作为线程池的任务队列,当提交的任务数量超过线程池的核心线程数时,多余的任务会被放入队列中等待执行。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExample {
public static void main(String[] args) {
LinkedBlockingQueue taskQueue = new LinkedBlockingQueue<>(10);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
60, // 线程空闲时间
TimeUnit.SECONDS,
taskQueue
);
for (int i = 0; i < 20 i final int taskid='i;' executor.submit -> {
System.out.println("Executing task: " + taskId);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
三、最佳实践
合理设置队列容量
- 如果队列容量设置过小,当生产者速度较快时,队列会频繁满员,导致生产者线程频繁阻塞,影响系统性能。
- 如果队列容量设置过大,会占用过多的内存资源,可能导致系统内存不足。因此,需要根据实际的业务场景和系统资源情况,合理设置队列的容量。
异常处理
在使用 put 和 take 方法时,这些方法会抛出 InterruptedException 异常,需要在代码中正确处理该异常。通常的做法是在捕获异常后,恢复线程的中断状态,避免丢失中断信息。
try {
queue.put(item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
避免锁竞争
虽然 LinkedBlockingQueue 使用了两把锁来提高并发性能,但在高并发场景下,仍然可能会出现锁竞争的问题。可以通过以下方式来减少锁竞争:
- 调整生产者和消费者的数量,避免过多的线程同时竞争锁。
- 优化业务逻辑,减少入队和出队操作的频率。
监控队列状态
在实际应用中,需要对队列的状态进行监控,例如队列的大小、是否已满、是否为空等。可以通过定时打印队列状态信息或者使用监控工具来实现。
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class QueueMonitoringExample {
public static void main(String[] args) {
LinkedBlockingQueue queue = new LinkedBlockingQueue<>(10);
new Thread(() -> {
try {
while (true) {
System.out.println("Queue size: " + queue.size());
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
资源管理
在使用完 LinkedBlockingQueue 后,不需要手动释放资源,但如果使用了与队列相关的线程池等资源,需要在合适的时机调用 shutdown 方法来关闭线程池,避免资源泄漏。
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}