如何排查kafkaProducer.flush,引发的死锁阻塞问题

Kafka-client 版本 2.2.2

这里用一个demo来解释这个问题的原因和排查思路

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MessageQueueProducer {

    KafkaProducer kafkaProducer;

    public MessageQueueProducer(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    public synchronized void send(final String topic, final String message) {
        ProducerRecord kafkaMessage = new ProducerRecord(topic, message);

        kafkaProducer.send(kafkaMessage, (recordMetadata, e) -> {
                    if (e == null) {
                        //success
                    } else {
                        //send "retry-topic" to retry
                        send("retry-topic", message);
                    }
                }
        );
    }

    public synchronized void close() {
        kafkaProducer.flush();
        kafkaProducer.close();
    }

}

原因分析

  1. 假设我们对KafkaProducer进行了一个简单的封装(如上)
  2. 假设有两个线程 threadA 和 threadB
  3. threadA 调用 MessageQueueProducer.close方法,close方法中的 flush本意是想,在Producer被close之前把buffer中数据一次性发送到Broker来保障数据的完整。
  4. 所有方法都有synchronized修饰,所以threadA拿到了MessageQueueProducer对象锁
  5. flush方法要等待所有的数据发送完成并收到Broker的确认

就在此刻,问题来了
假设threadB 调用了 MessageQueueProducer.send时遇到了异常,需要在Kafka的回调函数中向重试队列发送消息,进行异步重试

参看Kafka的文档,回掉函数是由Producer的ioThread进行调用的,所以此刻,ioThread开始请求 MessageQueueProducer 这个对象的对象锁,但是这个对象的锁已经被threadA占有,且threadA在等待ioThread执行所有回调,来保证消息发送完成。
所以MessageQueueProducer对象就一直被锁住了,send也send不了,close也close不了

flush方法的源码解析

RecordAccumulator.java :690

/**
 * Mark all partitions as ready to send and block until the send is complete
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (ProducerBatch batch : this.incomplete.copyAll())
        //所有未完成的消息,挨个等待完成确认
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}



produceFuture :ProduceRequestResult

    /**
     * Mark this request as complete and unblock any threads waiting on its completion.
     */
    public void done() {
        if (baseOffset == null)
            throw new IllegalStateException("The method `set` must be invoked before this method.");
        this.latch.countDown();
    }

    /**
     * Await the completion of this request
     */
    public void await() throws InterruptedException {
        latch.await();
    }
可以发现CountDownLatch 只有在done方法中会执行countDown,看一下done方法是不是被ioThread调用的

Sender.completeBatch --> ProducerBatch.done --> ProducerBatch.completeFutureAndFireCallbacks  -->
produceFuture.done

 
复制代码

问题排查的思路

现象

很明显,就发现调用MessageQueueProducer.send方法的线程都HANG住了,没法继续执行。
上面例子中就是threadA 和 threadB都处于 BLOCKED状态。

告警

告警对于程序的健壮太重要了,上面的情况如果没有告警,可能难于发现,告警指标可以根据业务需要来配置,比如定时produce消息的线程十分钟还不发一条消息就告警。

排查

1.到机器上启动Arthas attch当前应用

2.执行thread -all,查看所有线程的状态

3.可以看到threadA 和 threadB都是阻塞状态

4.执行thread -b

5.Arthas找到 阻塞其他线程的罪魁祸首,也就是threadA。打印threadA的堆栈

6.jstack -l pid 拿到所有线程的堆栈

7.对比被threadA 阻塞的线程数量和 Arthas输出的信息是否一致。

原文链接:
https://juejin.cn/post/6994349871960424462

相关文章

一篇文章搞懂同步与异步、阻塞与非阻塞

要想掌握好Java NIO需要涉及了解同步与异步、阻塞与非阻塞,本文通过相关例子让你深入理解其本质@mikechen阻塞阻塞与非阻塞是对同一个线程来说的,在某个时刻,线程要么处于阻塞,要么处于非阻塞。...

数组阻塞队列ArrayBlockingQueue底层数据结构和使用场景

1. 数组存储元素:ArrayBlockingQueue 使用一个固定大小的数组来存储队列中的元素。在创建 ArrayBlockingQueue 实例时,需要指定数组的容量,例如:BlockingQu...

终于有人把Java程序员都要学的知识点整理了,令人茅塞顿开

JVM无论处于何种层级的 Java 从业者,JVM 皆为其在进阶之途上必然需要跨越的一道难关。不论是在日常的工作情境之中,还是在至关重要的面试环节里,JVM 均是不可或缺的必考之题。倘若对 JVM 缺...

阻塞模型将会使线程休眠,为什么 Java 线程状态却是 RUNNABLE?

使用 Java 阻塞 I/O 模型读取数据,将会导致线程阻塞,线程将会进入休眠,从而让出 CPU 的执行权,直到数据读取完成。这个期间如果使用 jstack 查看线程状态,却可以发现Java 线程状态...

JAVA并发之BlockingQueue(阻塞队列)

Java从JDK5开始在并发包内引入了BlockingQueue(阻塞队列),它除了提供队列的FIFO功能之外,还提供了额外的功能,例如:当获取队列内容时发现队列为空,则等待其变为非空。当往队列存储内...

技术干货分享:RabbitMQ消息积压的几种解决思路

在日常工作中使用RabbitMQ偶尔会遇不可预料的情况导致的消息积压,一般出现消息积压基本上分为几种情况:消费者消费消息的速度赶不上生产速度,这种问题主要是业务逻辑没设计好消费者和生产者之间的平衡,需...