链表阻塞队列LinkedBlockingQueue的数据结构和使用场景

createh515小时前技术教程1

一、数据结构

1.底层链表节点

LinkedBlockingQueue 基于单向链表实现,链表节点由内部类 Node 表示,其代码定义如下:

static class Node {
    E item;
    Node next;
    Node(E x) { item = x; }
}
  • item:用于存储实际的元素。
  • next:指向下一个节点的引用,通过该引用将各个节点连接成一个单向链表。

2.核心属性

// 队列容量,默认是 Integer.MAX_VALUE
private final int capacity;
// 当前队列中元素的数量
private final AtomicInteger count = new AtomicInteger();
// 队列头节点,head.item 始终为 null
transient Node head;
// 队列尾节点,last.next 始终为 null
private transient Node last;
// 出队锁,用于控制出队操作的线程安全
private final ReentrantLock takeLock = new ReentrantLock();
// 当队列为空时,出队线程在该条件上等待
private final Condition notEmpty = takeLock.newCondition();
// 入队锁,用于控制入队操作的线程安全
private final ReentrantLock putLock = new ReentrantLock();
// 当队列满时,入队线程在该条件上等待
private final Condition notFull = putLock.newCondition();
  • capacity:队列的最大容量,在创建队列时可以指定,如果不指定则默认为 Integer.MAX_VALUE。
  • count:使用 AtomicInteger 来记录队列中元素的数量,保证在多线程环境下对元素数量的统计是线程安全的。
  • head 和 last:分别指向队列的头部和尾部节点。head.item 始终为 null,这是为了简化出队操作的逻辑;last.next 始终为 null。
  • takeLock 和 putLock:分别用于控制出队和入队操作的线程安全。使用两把锁可以使入队和出队操作可以并行进行,提高并发性能。
  • notEmpty 和 notFull:分别是出队锁和入队锁的条件变量,用于实现线程的等待和唤醒机制。

3.入队和出队操作

  • 入队操作:当调用 put 方法时,如果队列已满,当前线程会在 notFull 条件上等待;如果队列未满,会创建一个新的节点并将其添加到队列尾部,然后唤醒可能正在等待出队的线程。
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
  • 出队操作:当调用 take 方法时,如果队列为空,当前线程会在 notEmpty 条件上等待;如果队列不为空,会从队列头部取出一个节点并返回其元素,然后唤醒可能正在等待入队的线程。
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

二、使用场景

1.生产者 - 消费者模型

在生产者 - 消费者模型中,生产者负责生成数据并将其放入队列,消费者负责从队列中取出数据进行处理。使用 LinkedBlockingQueue 可以实现生产者和消费者之间的解耦,提高系统的并发处理能力。

import java.util.concurrent.LinkedBlockingQueue;

class Producer implements Runnable {
    private final LinkedBlockingQueue queue;

    public Producer(LinkedBlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                queue.put(i);
                System.out.println("Produced: " + i);
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final LinkedBlockingQueue queue;

    public Consumer(LinkedBlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer item = queue.take();
                System.out.println("Consumed: " + item);
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class ProducerConsumerExample {
    public static void main(String[] args) {
        LinkedBlockingQueue queue = new LinkedBlockingQueue<>(5);
        Thread producerThread = new Thread(new Producer(queue));
        Thread consumerThread = new Thread(new Consumer(queue));
        producerThread.start();
        consumerThread.start();
    }
}

2.消息队列

在分布式系统中,不同的服务之间需要进行异步通信。LinkedBlockingQueue 可以作为一个简单的消息队列,用于在不同的服务或模块之间传递消息。例如,一个日志收集系统中,日志生成模块作为生产者将日志消息放入队列,日志存储模块作为消费者从队列中取出日志消息并存储到磁盘或数据库中。

3.线程池任务队列

线程池通常会使用一个任务队列来存储待执行的任务。LinkedBlockingQueue 可以作为线程池的任务队列,当提交的任务数量超过线程池的核心线程数时,多余的任务会被放入队列中等待执行。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        LinkedBlockingQueue taskQueue = new LinkedBlockingQueue<>(10);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // 核心线程数
                5, // 最大线程数
                60, // 线程空闲时间
                TimeUnit.SECONDS,
                taskQueue
        );
        for (int i = 0; i < 20 i final int taskid='i;' executor.submit -> {
                System.out.println("Executing task: " + taskId);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
    }
}

三、最佳实践

合理设置队列容量

  • 如果队列容量设置过小,当生产者速度较快时,队列会频繁满员,导致生产者线程频繁阻塞,影响系统性能。
  • 如果队列容量设置过大,会占用过多的内存资源,可能导致系统内存不足。因此,需要根据实际的业务场景和系统资源情况,合理设置队列的容量。

异常处理

在使用 put 和 take 方法时,这些方法会抛出 InterruptedException 异常,需要在代码中正确处理该异常。通常的做法是在捕获异常后,恢复线程的中断状态,避免丢失中断信息。

try {
    queue.put(item);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

避免锁竞争

虽然 LinkedBlockingQueue 使用了两把锁来提高并发性能,但在高并发场景下,仍然可能会出现锁竞争的问题。可以通过以下方式来减少锁竞争:

  • 调整生产者和消费者的数量,避免过多的线程同时竞争锁。
  • 优化业务逻辑,减少入队和出队操作的频率。

监控队列状态

在实际应用中,需要对队列的状态进行监控,例如队列的大小、是否已满、是否为空等。可以通过定时打印队列状态信息或者使用监控工具来实现。

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueMonitoringExample {
    public static void main(String[] args) {
        LinkedBlockingQueue queue = new LinkedBlockingQueue<>(10);
        new Thread(() -> {
            try {
                while (true) {
                    System.out.println("Queue size: " + queue.size());
                    TimeUnit.SECONDS.sleep(1);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

资源管理

在使用完 LinkedBlockingQueue 后,不需要手动释放资源,但如果使用了与队列相关的线程池等资源,需要在合适的时机调用 shutdown 方法来关闭线程池,避免资源泄漏。

executor.shutdown();
try {
    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
        executor.shutdownNow();
    }
} catch (InterruptedException e) {
    executor.shutdownNow();
}

相关文章

一篇文章搞懂同步与异步、阻塞与非阻塞

要想掌握好Java NIO需要涉及了解同步与异步、阻塞与非阻塞,本文通过相关例子让你深入理解其本质@mikechen阻塞阻塞与非阻塞是对同一个线程来说的,在某个时刻,线程要么处于阻塞,要么处于非阻塞。...

数组阻塞队列ArrayBlockingQueue底层数据结构和使用场景

1. 数组存储元素:ArrayBlockingQueue 使用一个固定大小的数组来存储队列中的元素。在创建 ArrayBlockingQueue 实例时,需要指定数组的容量,例如:BlockingQu...

终于有人把Java程序员都要学的知识点整理了,令人茅塞顿开

JVM无论处于何种层级的 Java 从业者,JVM 皆为其在进阶之途上必然需要跨越的一道难关。不论是在日常的工作情境之中,还是在至关重要的面试环节里,JVM 均是不可或缺的必考之题。倘若对 JVM 缺...

阻塞模型将会使线程休眠,为什么 Java 线程状态却是 RUNNABLE?

使用 Java 阻塞 I/O 模型读取数据,将会导致线程阻塞,线程将会进入休眠,从而让出 CPU 的执行权,直到数据读取完成。这个期间如果使用 jstack 查看线程状态,却可以发现Java 线程状态...

JAVA并发之BlockingQueue(阻塞队列)

Java从JDK5开始在并发包内引入了BlockingQueue(阻塞队列),它除了提供队列的FIFO功能之外,还提供了额外的功能,例如:当获取队列内容时发现队列为空,则等待其变为非空。当往队列存储内...

技术干货分享:RabbitMQ消息积压的几种解决思路

在日常工作中使用RabbitMQ偶尔会遇不可预料的情况导致的消息积压,一般出现消息积压基本上分为几种情况:消费者消费消息的速度赶不上生产速度,这种问题主要是业务逻辑没设计好消费者和生产者之间的平衡,需...