如何排查kafkaProducer.flush,引发的死锁阻塞问题

createh54个月前 (03-12)技术教程43

Kafka-client 版本 2.2.2

这里用一个demo来解释这个问题的原因和排查思路

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MessageQueueProducer {

    KafkaProducer kafkaProducer;

    public MessageQueueProducer(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    public synchronized void send(final String topic, final String message) {
        ProducerRecord kafkaMessage = new ProducerRecord(topic, message);

        kafkaProducer.send(kafkaMessage, (recordMetadata, e) -> {
                    if (e == null) {
                        //success
                    } else {
                        //send "retry-topic" to retry
                        send("retry-topic", message);
                    }
                }
        );
    }

    public synchronized void close() {
        kafkaProducer.flush();
        kafkaProducer.close();
    }

}

原因分析

  1. 假设我们对KafkaProducer进行了一个简单的封装(如上)
  2. 假设有两个线程 threadA 和 threadB
  3. threadA 调用 MessageQueueProducer.close方法,close方法中的 flush本意是想,在Producer被close之前把buffer中数据一次性发送到Broker来保障数据的完整。
  4. 所有方法都有synchronized修饰,所以threadA拿到了MessageQueueProducer对象锁
  5. flush方法要等待所有的数据发送完成并收到Broker的确认

就在此刻,问题来了
假设threadB 调用了 MessageQueueProducer.send时遇到了异常,需要在Kafka的回调函数中向重试队列发送消息,进行异步重试

参看Kafka的文档,回掉函数是由Producer的ioThread进行调用的,所以此刻,ioThread开始请求 MessageQueueProducer 这个对象的对象锁,但是这个对象的锁已经被threadA占有,且threadA在等待ioThread执行所有回调,来保证消息发送完成。
所以MessageQueueProducer对象就一直被锁住了,send也send不了,close也close不了

flush方法的源码解析

RecordAccumulator.java :690

/**
 * Mark all partitions as ready to send and block until the send is complete
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (ProducerBatch batch : this.incomplete.copyAll())
        //所有未完成的消息,挨个等待完成确认
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}



produceFuture :ProduceRequestResult

    /**
     * Mark this request as complete and unblock any threads waiting on its completion.
     */
    public void done() {
        if (baseOffset == null)
            throw new IllegalStateException("The method `set` must be invoked before this method.");
        this.latch.countDown();
    }

    /**
     * Await the completion of this request
     */
    public void await() throws InterruptedException {
        latch.await();
    }
可以发现CountDownLatch 只有在done方法中会执行countDown,看一下done方法是不是被ioThread调用的

Sender.completeBatch --> ProducerBatch.done --> ProducerBatch.completeFutureAndFireCallbacks  -->
produceFuture.done

 
复制代码

问题排查的思路

现象

很明显,就发现调用MessageQueueProducer.send方法的线程都HANG住了,没法继续执行。
上面例子中就是threadA 和 threadB都处于 BLOCKED状态。

告警

告警对于程序的健壮太重要了,上面的情况如果没有告警,可能难于发现,告警指标可以根据业务需要来配置,比如定时produce消息的线程十分钟还不发一条消息就告警。

排查

1.到机器上启动Arthas attch当前应用

2.执行thread -all,查看所有线程的状态

3.可以看到threadA 和 threadB都是阻塞状态

4.执行thread -b

5.Arthas找到 阻塞其他线程的罪魁祸首,也就是threadA。打印threadA的堆栈

6.jstack -l pid 拿到所有线程的堆栈

7.对比被threadA 阻塞的线程数量和 Arthas输出的信息是否一致。

原文链接:
https://juejin.cn/post/6994349871960424462

相关文章

一篇文章搞懂同步与异步、阻塞与非阻塞

要想掌握好Java NIO需要涉及了解同步与异步、阻塞与非阻塞,本文通过相关例子让你深入理解其本质@mikechen阻塞阻塞与非阻塞是对同一个线程来说的,在某个时刻,线程要么处于阻塞,要么处于非阻塞。...

技术干货分享:RabbitMQ消息积压的几种解决思路

在日常工作中使用RabbitMQ偶尔会遇不可预料的情况导致的消息积压,一般出现消息积压基本上分为几种情况:消费者消费消息的速度赶不上生产速度,这种问题主要是业务逻辑没设计好消费者和生产者之间的平衡,需...

链表阻塞队列LinkedBlockingQueue的数据结构和使用场景

一、数据结构1.底层链表节点LinkedBlockingQueue 基于单向链表实现,链表节点由内部类 Node 表示,其代码定义如下:static class Node { E item;...

Java进程突然失去响应的原因排查

Java进程突然失去响应,可能是真崩溃了,也有可能是假死。我们先要确定是不是假死。假死Java进程的假死是指Java应用程序看似仍在运行,但实际上没有任何实际的工作进行,也不响应用户请求或其他外部输入...

面试官:NIO非阻塞网络编程原理了解吗?一文深度讲解避坑

NIO非阻塞网络编程原理1、NIO基本介绍Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输...

奇安信一面:咱们聊一聊什么叫做阻塞队列的有界和无界?

面试官:什么叫做阻塞队列的有界和无界?能否结合源码分析一下?并站在架构设计的角度谈谈它们的作用。我:好的,这个问题很有意思,我来为您详细解释一下。1. 阻塞队列的有界和无界(1) 有界阻塞队列定义:有...