数组阻塞队列ArrayBlockingQueue底层数据结构和使用场景

createh511小时前技术教程1

1. 数组

  • 存储元素:ArrayBlockingQueue 使用一个固定大小的数组来存储队列中的元素。在创建 ArrayBlockingQueue 实例时,需要指定数组的容量,例如:
BlockingQueue queue = new ArrayBlockingQueue<>(10);

这里创建了一个容量为 10 的 ArrayBlockingQueue,意味着该队列最多能存储 10 个元素。

  • 循环数组的实现:为了实现队列的先进先出(FIFO)特性,ArrayBlockingQueue 采用了循环数组的方式。通过两个索引变量 takeIndex 和 putIndex 分别记录队首元素和下一个可插入元素的位置。当 putIndex 到达数组末尾时,如果数组头部有空余位置,会将 putIndex 重置为 0,继续从数组头部插入元素;同理,当 takeIndex 到达数组末尾时,也会重置为 0。这种循环使用数组的方式避免了频繁的数据移动,提高了插入和删除操作的效率。

2. ReentrantLock 锁

  • 线程安全保障:ReentrantLock 是一个可重入的互斥锁,ArrayBlockingQueue 使用它来保证在多线程环境下对队列的操作是线程安全的。在进行插入、删除、获取队列大小等操作时,都会先获取锁,操作完成后再释放锁。例如,在 put 方法中:
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

这里通过 lock.lockInterruptibly() 获取锁,确保同一时刻只有一个线程能够执行插入操作,避免了多个线程同时修改队列导致的数据不一致问题。

3. Condition 条件变量

  • 阻塞和唤醒机制:ArrayBlockingQueue 内部使用了两个 Condition 对象 notEmpty 和 notFull 来实现阻塞和唤醒机制。notEmpty:当队列为空时,调用 take 方法的线程会调用 notEmpty.await() 进入等待状态,直到队列中有新元素加入。当有元素插入队列时,会调用 notEmpty.signal() 唤醒一个等待在 notEmpty 上的线程。notFull:当队列已满时,调用 put 方法的线程会调用 notFull.await() 进入等待状态,直到队列中有元素被取出。当有元素从队列中取出时,会调用 notFull.signal() 唤醒一个等待在 notFull 上的线程。

使用场景详细分析

1. 生产者 - 消费者模型

  • 解耦生产者和消费者:在生产者 - 消费者模型中,生产者和消费者的处理速度可能不一致。使用 ArrayBlockingQueue 可以将生产者和消费者解耦,生产者只需要将数据放入队列,而消费者只需要从队列中取出数据进行处理。例如,在一个订单处理系统中,订单生成线程作为生产者将新订单放入队列,订单处理线程作为消费者从队列中取出订单进行处理。这样,即使订单生成速度过快,也不会影响订单处理线程的正常运行,反之亦然。
  • 流量控制:通过设置队列的容量,可以对生产者的生产速度进行控制。当队列已满时,生产者线程会被阻塞,直到队列中有空间可用。这可以避免生产者生产过多的数据导致系统资源耗尽。

2. 线程池任务队列

  • 任务缓冲:线程池中的任务队列用于存储待执行的任务。当有新任务提交时,如果线程池中的线程都在忙碌,任务会被放入队列中等待执行。ArrayBlockingQueue 可以作为线程池的任务队列,确保任务按照提交的顺序依次执行。例如,在一个 Web 服务器中,处理客户端请求的线程池可以使用 ArrayBlockingQueue 来存储待处理的请求,避免请求过多导致服务器崩溃。
  • 控制并发度:通过设置队列的容量,可以控制线程池中的任务并发度。当队列已满时,新提交的任务会被阻塞,直到队列中有任务执行完毕。

3. 数据缓冲和异步处理

  • 提高系统性能:在数据处理系统中,数据的生产和处理可能需要不同的时间。使用 ArrayBlockingQueue 可以将数据生产和处理过程解耦,实现异步处理。例如,在一个日志收集系统中,日志收集线程将日志信息放入队列,日志处理线程从队列中取出日志进行分析和存储。这样,日志收集线程可以快速地将日志信息放入队列,而不必等待日志处理线程完成处理,提高了系统的整体性能。

实践方法详细示例

1. 基本操作示例

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个容量为 3 的 ArrayBlockingQueue
        BlockingQueue queue = new ArrayBlockingQueue<>(3);

        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5 i string data='Data-' i queue.putdata system.out.println: data thread.sleep500 catch interruptedexception e thread.currentthread.interrupt thread consumer='new' thread -> {
            try {
                for (int i = 0; i < 5; i++) {
                    String data = queue.take();
                    System.out.println("消费者取出数据: " + data);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();

        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,创建了一个容量为 3 的 ArrayBlockingQueue。生产者线程向队列中放入 5 个数据,消费者线程从队列中取出 5 个数据。由于队列容量为 3,当队列已满时,生产者线程会被阻塞,直到消费者线程从队列中取出元素,释放空间。

2. 异常处理和关闭操作

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueWithExceptionHandling {
    public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue<>(2);

        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                queue.put(1);
                queue.put(2);
                System.out.println("生产者放入两个元素");
                // 尝试放入第三个元素,会阻塞
                queue.put(3);
            } catch (InterruptedException e) {
                System.out.println("生产者线程被中断");
                Thread.currentThread().interrupt();
            }
        });

        // 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(2000);
                Integer element = queue.take();
                System.out.println("消费者取出元素: " + element);
                // 中断生产者线程
                producer.interrupt();
            } catch (InterruptedException e) {
                System.out.println("消费者线程被中断");
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();

        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,生产者线程尝试向容量为 2 的队列中放入 3 个元素,当放入第三个元素时会被阻塞。消费者线程在等待 2 秒后取出一个元素,然后中断生产者线程。在实际应用中,需要正确处理 InterruptedException 异常,避免线程在阻塞状态下被中断而导致的数据不一致问题。

注意事项

  • 容量设置:在创建 ArrayBlockingQueue 时,需要根据实际需求合理设置队列的容量。如果容量设置过小,可能会导致生产者频繁阻塞;如果容量设置过大,会占用过多的内存资源。
  • 线程安全:虽然 ArrayBlockingQueue 本身是线程安全的,但在使用时需要注意避免在队列操作过程中出现死锁等问题。例如,在持有锁的情况下调用其他可能会阻塞的方法,可能会导致死锁。
  • 异常处理:在使用 put 和 take 等可能会抛出 InterruptedException 异常的方法时,需要正确处理该异常,避免线程在阻塞状态下被中断而导致程序出现异常。

相关文章

终于有人把Java程序员都要学的知识点整理了,令人茅塞顿开

JVM无论处于何种层级的 Java 从业者,JVM 皆为其在进阶之途上必然需要跨越的一道难关。不论是在日常的工作情境之中,还是在至关重要的面试环节里,JVM 均是不可或缺的必考之题。倘若对 JVM 缺...

阻塞模型将会使线程休眠,为什么 Java 线程状态却是 RUNNABLE?

使用 Java 阻塞 I/O 模型读取数据,将会导致线程阻塞,线程将会进入休眠,从而让出 CPU 的执行权,直到数据读取完成。这个期间如果使用 jstack 查看线程状态,却可以发现Java 线程状态...

技术干货分享:RabbitMQ消息积压的几种解决思路

在日常工作中使用RabbitMQ偶尔会遇不可预料的情况导致的消息积压,一般出现消息积压基本上分为几种情况:消费者消费消息的速度赶不上生产速度,这种问题主要是业务逻辑没设计好消费者和生产者之间的平衡,需...

链表阻塞队列LinkedBlockingQueue的数据结构和使用场景

一、数据结构1.底层链表节点LinkedBlockingQueue 基于单向链表实现,链表节点由内部类 Node 表示,其代码定义如下:static class Node { E item;...

Java进程突然失去响应的原因排查

Java进程突然失去响应,可能是真崩溃了,也有可能是假死。我们先要确定是不是假死。假死Java进程的假死是指Java应用程序看似仍在运行,但实际上没有任何实际的工作进行,也不响应用户请求或其他外部输入...