java线程池实现原理以及应用场景说明

createh52周前 (12-16)技术教程19

java线程池实现原理

一、核心组件解析

1、ThreadPoolExecutor 类: 它是线程池管理器的核心类,负责创建、销毁和管理线程。

核心方法:

  • execute(Runnable command): 将任务提交给线程池执行。
  • shutdown(): 开始关闭线程池,不再接受新任务,但会继续执行已提交的任务。
  • shutdownNow(): 立即关闭线程池,并尝试停止所有正在执行的任务。

关键属性:

  • corePoolSize: 核心线程数,即使没有任务,也会保持这些线程存活。
  • maximumPoolSize: 最大线程数,当任务队列满且核心线程数不足时,会创建新的线程,但不超过最大线程数。
  • keepAliveTime: 空闲线程的存活时间,当线程数超过核心线程数时,空闲线程会存活一段时间,如果超过该时间,则会被回收。
  • workQueue: 任务队列,用于存放等待执行的任务。
  • threadFactory: 线程工厂,用于创建新的线程。
  • handler: 拒绝策略,当线程池无法处理新任务时,会调用拒绝策略处理该任务。

2、Worker 类: 实际执行任务的线程类,每个 Worker 线程都持有一个 Runnable 对象,用来执行任务。

关键方法:

  • run(): 线程启动后,会调用该方法,从任务队列中获取任务并执行。
  • isLocked(): 判断当前线程是否正在执行任务。
  • processWork(Runnable task): 执行任务,并处理任务执行完后的状态。

3、BlockingQueue 接口: 任务队列,用于存放等待执行的任务。

常用实现:

  • ArrayBlockingQueue: 基于数组实现的有界队列,FIFO 顺序。
  • LinkedBlockingQueue: 基于链表实现的无界队列,FIFO 顺序。
  • SynchronousQueue: 同步队列,每个插入操作都需要等待一个相应的移除操作。

4、ThreadFactory 接口: 线程工厂,用于创建新的线程。

常用实现:

  • Executors.defaultThreadFactory(): 默认的线程工厂,用于创建新的线程。
  • 自定义线程工厂: 可以自定义线程名、线程优先级等属性。

5、RejectedExecutionHandler 接口: 拒绝策略,当线程池无法处理新任务时,会调用拒绝策略处理该任务。

常用实现:

  • AbortPolicy: 直接抛出 RejectedExecutionException 异常。
  • CallerRunsPolicy: 由调用者线程执行任务。
  • DiscardPolicy: 丢弃任务。
  • DiscardOldestPolicy: 丢弃队列中最旧的任务。

二、线程池工作流程

1、提交任务: 当调用 execute(Runnable command) 方法提交任务时,线程池会先判断当前线程池的状态:

  • 线程数 < corePoolSize: 创建一个新的核心线程执行任务。
  • 线程数 >= corePoolSize: 将任务放入任务队列。
  • 任务队列已满 && 线程数 < maximumPoolSize: 创建一个新的非核心线程执行任务。
  • 任务队列已满 && 线程数 >= maximumPoolSize: 调用拒绝策略处理该任务。

2、执行任务: Worker 线程会从任务队列中获取任务并执行,执行完成后会再次尝试从队列中获取任务。

3、管理线程: 线程池会根据任务情况动态调整线程数量:

  • 空闲线程超过 keepAliveTime: 回收空闲线程。
  • 任务增加: 创建新的线程处理任务。

4、关闭线程池: 当调用 shutdown() 或 shutdownNow() 方法时,线程池会停止接受新的任务,并等待所有已提交的任务执行完毕后关闭线程池。


jdk中四种线程池使用场景说明

1、newFixedThreadPool(int nThreads)

场景: 适合执行需要固定数量线程的任务,例如处理网络请求、数据库连接池等。

优点:

  • 线程数量固定,不会出现创建过多的线程导致资源耗尽的情况。
  • 可以控制并发程度,避免资源竞争和死锁。

缺点:

  • 线程池大小固定,无法根据任务负载动态调整。
  • 当任务量大时,可能会造成任务积压。

注意事项:

  • 线程池大小固定,如果任务过多,会造成任务积压,导致响应缓慢。
  • 当所有线程都处于繁忙状态时,新任务会被放入队列等待。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        // 创建固定大小为 5 的线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // 提交 10 个任务
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " is executing task");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}

2、newCachedThreadPool()

场景: 适合执行大量短时间任务,例如处理 HTTP 请求、文件上传下载等。

优点

  • 可以根据需要动态创建线程,避免资源浪费。
  • 能够快速响应大量短时间任务。

缺点

  • 可能创建过多的线程,导致系统资源耗尽。
  • 频繁创建销毁线程,会造成性能损耗。

注意事项:

  • 可能会创建大量线程,导致系统资源耗尽。
  • 线程空闲超过 60 秒就会被回收,频繁创建销毁线程,会造成性能损耗。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        // 创建可缓存线程池
        ExecutorService executor = Executors.newCachedThreadPool();

        // 提交 10 个任务
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " is executing task");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}

3、newSingleThreadExecutor

场景: 适合执行串行任务,例如处理数据库操作、文件读写等。

优点

  • 保证任务按顺序执行。
  • 避免多线程竞争资源。

缺点

  • 无法利用多核 CPU 的优势,效率较低。
  • 当单个任务执行时间过长时,会影响其他任务的执行。

注意事项:

  • 仅创建单个线程,所有任务都由该线程串行执行。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        // 创建单线程线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 提交 10 个任务
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " is executing task");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}

4、newScheduledThreadPool(int corePoolSize)

场景: 适合执行定时任务和周期性任务,例如定期数据备份、定时清理缓存等。

优点

  • 支持定时任务和周期性任务执行。
  • 可以灵活控制任务执行时间。

缺点

  • 需要手动管理任务时间间隔。
  • 当任务执行时间过长时,可能会影响其他任务的执行。

注意事项:

  • 需要设置核心线程数和任务执行的时间间隔。
  • 建议使用 scheduleAtFixedRate 或 scheduleWithFixedDelay 方法来执行定时任务。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        // 创建带定时功能的线程池
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

        // 定时任务:每隔 1 秒执行一次
        executor.scheduleAtFixedRate(() -> {
            System.out.println(Thread.currentThread().getName() + " is executing scheduled task");
        }, 1, 1, TimeUnit.SECONDS);

        executor.shutdown();
    }
}


线程池运行情况监控

一、开源项目

1、Zimug-Monitor-ThreadPool

实现原理:

  • 使用 Java Agent 技术在程序启动时修改 ThreadPoolExecutor 类的字节码。
  • 在 ThreadPoolExecutor 的构造函数中添加代码,将新建的线程池添加到一个全局集合中。
  • 通过定时任务监控全局集合中的所有线程池,获取状态信息并输出到日志或图形界面。

优势:

  • 专注于线程池监控,提供丰富的线程池状态信息,包括活动线程数、队列大小、已完成任务数量、核心线程数、最大线程数、保持活动时间等。
  • 可以根据监控信息进行综合分析,判断线程池是否处于过载、队列是否满等情况,并输出警告信息。

劣势:

  • 需要使用 Java Agent 技术,需要修改程序的字节码,可能需要重新编译程序。
  • 监控线程池的性能影响可能会比较大,因为需要不断地获取线程池的状态信息

2、JavaMelody

实现原理:

  • 使用 Java Agent 技术,在程序启动时修改部分核心类,例如 ClassLoader、Thread 等。
  • 通过 AOP 技术拦截方法调用,获取程序运行的性能指标,例如 CPU 使用率、内存占用、方法执行时间等。
  • 提供 Web 界面展示性能指标,并提供一些诊断和分析工具

优势:

  • 功能强大,可以监控程序的各个方面,包括线程池、内存、CPU、网络等。
  • 提供丰富的诊断工具,可以帮助用户分析程序性能问题。
  • 提供 Web 界面,方便用户查看和分析监控数据。

劣势:

  • 监控范围比较广,可能导致监控数据比较多,增加分析难度。
  • 性能影响比较大,因为需要拦截方法调用,获取性能指标

3、Spring Boot Actuator

原理:

Spring Boot Actuator 提供了 Endpoint 机制,可以暴露程序的运行状态信息,包括线程池信息。Actuator 使用 Micrometer 库收集性能指标,并通过 HTTP 协议暴露指标数据。

监控线程池的原理:

  1. 集成 Micrometer 库: Spring Boot Actuator 默认集成 Micrometer 库,用于收集程序的性能指标。
  2. 创建 ThreadPoolMetrics 对象: Micrometer 库提供了 ThreadPoolMetrics 类,用于收集线程池的性能指标。
  3. 注册 ThreadPoolMetrics 对象: 将 ThreadPoolMetrics 对象注册到 MeterRegistry 中,并关联到相应的线程池。
  4. 使用 Endpoint 暴露指标数据: Actuator 提供了 /actuator/metrics 端点,可以获取程序的性能指标数据,包括线程池的指标数据。

优点:

  • 简单易用: Spring Boot Actuator 提供了开箱即用的线程池监控功能,无需额外配置
  • 集成方便: Actuator 与 Micrometer 库集成,可以方便地收集其他性能指标。
  • 功能强大: Actuator 提供了丰富的端点,可以获取程序的各种运行状态信息,包括线程池、内存、CPU 等。
  • 可扩展性强: Actuator 可以自定义 Endpoint,满足各种监控需求。

缺点:

  • 性能影响: Actuator 的监控机制会对程序性能造成一定影响,特别是高性能程序
  • 依赖 Spring Boot: Actuator 只能在 Spring Boot 项目中使用。
  • 监控范围有限: Actuator 主要监控 Spring Boot 应用程序,对其他程序的监控功能有限。
=====1、开启监控
management.endpoint.metrics.enabled=true

=====2、启动 Spring Boot 应用程序后,访问 http://localhost:8080/actuator/metrics,
可以获取应用程序的性能指标数据,包括线程池的指标数据。
{
  "names": [
    "system.cpu.usage",
    "jvm.memory.used",
    "jvm.memory.max",
    "jvm.threads.count",
    "http.server.requests",
    "threadpool.executor.active",
    "threadpool.executor.queue.size",
    "threadpool.executor.completed",
    "threadpool.executor.rejected"
  ]
}

4、Prometheus with Grafana


二、自定义实现

  1. ThreadPoolMonitorAgent 类: 与之前代码基本一致,使用 Java Agent 技术监控所有创建的 ThreadPoolExecutor 线程池。
  2. monitorThreadPool() 方法:
  • 添加了更多监控指标的获取,包括 corePoolSize、maximumPoolSize、keepAliveTime。
  • 添加了线程池状态判断,包括 isRunning(判断线程池是否处于运行状态)和 isSaturated(判断线程池是否处于饱和状态)。
  • 打印或记录监控信息,包括线程池状态、监控指标和综合分析结果。
  • 综合分析线程池运行情况,根据监控指标判断线程池是否处于过载、队列是否满等情况,并输出警告信息。
import java.lang.instrument.Instrumentation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadPoolMonitorAgent {

    private static final Set<ThreadPoolExecutor> monitoredPools = new HashSet<>();

    public static void premain(String agentArgs, Instrumentation inst) {
        inst.addTransformer((loader, className, classfileBuffer, protectionDomain) -> {
            if (className.equals("java/util/concurrent/ThreadPoolExecutor")) {
                return new ClassFileTransformer() {
                    @Override
                    public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) {
                        try {
                            return instrumentClass(classfileBuffer);
                        } catch (Exception e) {
                            e.printStackTrace();
                            return classfileBuffer;
                        }
                    }
                }.transform(loader, className, null, protectionDomain, classfileBuffer);
            }
            return classfileBuffer;
        });
    }

    private static byte[] instrumentClass(byte[] classfileBuffer) throws Exception {
        // 使用 ASM 或其他字节码操作库修改 ThreadPoolExecutor 类
        // 在构造函数中添加代码来监控线程池
        // 例如,将线程池添加到 monitoredPools 集合中
        return classfileBuffer; // 这里需要实际修改字节码
    }

    // 监控线程池状态的代码
    public static void monitorThreadPool() {
        for (ThreadPoolExecutor pool : monitoredPools) {
            // 获取线程池状态信息
            int activeCount = pool.getActiveCount();
            int queueSize = pool.getQueue().size();
            long completedTaskCount = pool.getCompletedTaskCount();
            int corePoolSize = pool.getCorePoolSize();
            int maximumPoolSize = pool.getMaximumPoolSize();
            long keepAliveTime = pool.getKeepAliveTime(TimeUnit.SECONDS);

            // 判断线程池状态
            boolean isRunning = !pool.isShutdown() && !pool.isTerminated();
            boolean isSaturated = queueSize >= maximumPoolSize;

            // 打印或记录监控信息
            System.out.println("ThreadPool: " + pool);
            System.out.println("Status: " + (isRunning ? "RUNNING" : "SHUTDOWN"));
            System.out.println("Active Count: " + activeCount);
            System.out.println("Queue Size: " + queueSize);
            System.out.println("Completed Task Count: " + completedTaskCount);
            System.out.println("Core Pool Size: " + corePoolSize);
            System.out.println("Maximum Pool Size: " + maximumPoolSize);
            System.out.println("Keep Alive Time: " + keepAliveTime + " seconds");
            System.out.println("Saturated: " + isSaturated);
            System.out.println("--------------------------------------");

            // 综合分析线程池运行情况
            if (activeCount >= maximumPoolSize) {
                System.out.println("WARNING: ThreadPool is overloaded. Consider increasing maximumPoolSize.");
            }
            if (queueSize > 0 && isSaturated) {
                System.out.println("WARNING: ThreadPool queue is full. Tasks may be rejected.");
            }
        }
    }
}

// 在程序启动时添加 agent 参数:
// -javaagent:path/to/ThreadPoolMonitorAgent.jar


第三方线程池框架

  • Apache Commons Pool2: 提供了对象池管理机制,可以用于线程池管理。
  • Google Guava: 包含 ListeningExecutorService 等类,提供更丰富的线程池监控和控制功能。
  • Spring Framework: 提供了 TaskExecutor 接口,可以使用 Spring 的 ThreadPoolTaskExecutor 实现线程池管理,并与 Spring 容器集成。
  • Hutool

java程序堆栈信息线程池分析

1、使用 jstack 命令

  • 导出堆栈信息: 在命令行中使用 jstack 命令,并指定 Java 进程的 PID,例如:
  • jstack 12345 > thread_dump.txt

2、使用 JProfiler 或 VisualVM 等性能分析工具

  • 获取线程堆栈信息: 使用这些工具连接到 Java 进程,并选择 "Threads" 或 "Thread Dump" 视图,可以查看所有线程的堆栈信息。
  • 分析线程池状态: 这些工具通常提供更直观的视图,可以更方便地识别线程池线程、分析线程状态、查看任务执行逻辑以及寻找死锁。
  • 生成报告: 这些工具可以生成包含线程堆栈信息和其他性能数据的报告,方便分享给其他团队成员。

3、在线工具

https://gceasy.io/index.jsp

https://github.com/PerfMa/XPocket

https://arthas.aliyun.com/doc/

相关文章

java程序、小程序和应用程序三者为什么可以共同存在

今天跟大家总结一下java程序、小程序和应用程序三者之间的应用关系,如果你有更好的想法,可以写在评论区。一个程序中可以有多个类,但只能有一个类是主类。在Java应用程序中,这个主类是指包含main()...

深入探究hprof文件:如何分析Java应用程序性能瓶颈

Java应用程序的性能问题是开发人员面临的一个常见挑战。为了解决这些问题,开发人员需要了解应用程序的性能瓶颈所在,并采取相应的措施来优化它们。在此过程中,hprof文件是一种非常有用的工具,可以帮助开...

JSTAT命令-对Java应用程序的资源和性能进行实时的监控

对于监控JVM,jdk也提供了很多工具,供我们来使用,其中命令jstat,是JDK自带的一个轻量级小工具,可以查看堆内存各部分的使用量,以及加载类的数量,对Java应用程序的资源和性能进行实时的监控...

Netty 框架学习——基于 Netty 的 HTTP/HTTPS 应用程序

2|0 通过SSL/TLS保护应用程序SSL 和 TLS 安全协议层叠在其他协议之上,用以实现数据安全。为了支持 SSL/TLS,Java 提供了 javax.net.ssl 包,它的 SSLCont...

对比 java,go,c# 生成的本地可执行程序

现在java,go和csharp都可以编译成多个平台的本地可执行程序了,我们今天就来对比测试下他们生成本地程序后对系统资源的占用差别。java的我用quarkus框架来做个测试,虽然springboo...

Java窗体应用程序人事管理系统web人资招聘员工劳资jsp源码mysql

本项目为前几天收费帮学妹做的一个项目,Java EE JSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目描述Java窗体应用程序人事管...