java线程池实现原理以及应用场景说明
java线程池实现原理
一、核心组件解析
1、ThreadPoolExecutor 类: 它是线程池管理器的核心类,负责创建、销毁和管理线程。
核心方法:
- execute(Runnable command): 将任务提交给线程池执行。
- shutdown(): 开始关闭线程池,不再接受新任务,但会继续执行已提交的任务。
- shutdownNow(): 立即关闭线程池,并尝试停止所有正在执行的任务。
关键属性:
- corePoolSize: 核心线程数,即使没有任务,也会保持这些线程存活。
- maximumPoolSize: 最大线程数,当任务队列满且核心线程数不足时,会创建新的线程,但不超过最大线程数。
- keepAliveTime: 空闲线程的存活时间,当线程数超过核心线程数时,空闲线程会存活一段时间,如果超过该时间,则会被回收。
- workQueue: 任务队列,用于存放等待执行的任务。
- threadFactory: 线程工厂,用于创建新的线程。
- handler: 拒绝策略,当线程池无法处理新任务时,会调用拒绝策略处理该任务。
2、Worker 类: 实际执行任务的线程类,每个 Worker 线程都持有一个 Runnable 对象,用来执行任务。
关键方法:
- run(): 线程启动后,会调用该方法,从任务队列中获取任务并执行。
- isLocked(): 判断当前线程是否正在执行任务。
- processWork(Runnable task): 执行任务,并处理任务执行完后的状态。
3、BlockingQueue 接口: 任务队列,用于存放等待执行的任务。
常用实现:
- ArrayBlockingQueue: 基于数组实现的有界队列,FIFO 顺序。
- LinkedBlockingQueue: 基于链表实现的无界队列,FIFO 顺序。
- SynchronousQueue: 同步队列,每个插入操作都需要等待一个相应的移除操作。
4、ThreadFactory 接口: 线程工厂,用于创建新的线程。
常用实现:
- Executors.defaultThreadFactory(): 默认的线程工厂,用于创建新的线程。
- 自定义线程工厂: 可以自定义线程名、线程优先级等属性。
5、RejectedExecutionHandler 接口: 拒绝策略,当线程池无法处理新任务时,会调用拒绝策略处理该任务。
常用实现:
- AbortPolicy: 直接抛出 RejectedExecutionException 异常。
- CallerRunsPolicy: 由调用者线程执行任务。
- DiscardPolicy: 丢弃任务。
- DiscardOldestPolicy: 丢弃队列中最旧的任务。
二、线程池工作流程
1、提交任务: 当调用 execute(Runnable command) 方法提交任务时,线程池会先判断当前线程池的状态:
- 线程数 < corePoolSize: 创建一个新的核心线程执行任务。
- 线程数 >= corePoolSize: 将任务放入任务队列。
- 任务队列已满 && 线程数 < maximumPoolSize: 创建一个新的非核心线程执行任务。
- 任务队列已满 && 线程数 >= maximumPoolSize: 调用拒绝策略处理该任务。
2、执行任务: Worker 线程会从任务队列中获取任务并执行,执行完成后会再次尝试从队列中获取任务。
3、管理线程: 线程池会根据任务情况动态调整线程数量:
- 空闲线程超过 keepAliveTime: 回收空闲线程。
- 任务增加: 创建新的线程处理任务。
4、关闭线程池: 当调用 shutdown() 或 shutdownNow() 方法时,线程池会停止接受新的任务,并等待所有已提交的任务执行完毕后关闭线程池。
jdk中四种线程池使用场景说明
1、newFixedThreadPool(int nThreads)
场景: 适合执行需要固定数量线程的任务,例如处理网络请求、数据库连接池等。
优点:
- 线程数量固定,不会出现创建过多的线程导致资源耗尽的情况。
- 可以控制并发程度,避免资源竞争和死锁。
缺点:
- 线程池大小固定,无法根据任务负载动态调整。
- 当任务量大时,可能会造成任务积压。
注意事项:
- 线程池大小固定,如果任务过多,会造成任务积压,导致响应缓慢。
- 当所有线程都处于繁忙状态时,新任务会被放入队列等待。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolExample {
public static void main(String[] args) {
// 创建固定大小为 5 的线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交 10 个任务
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is executing task");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
2、newCachedThreadPool()
场景: 适合执行大量短时间任务,例如处理 HTTP 请求、文件上传下载等。
优点:
- 可以根据需要动态创建线程,避免资源浪费。
- 能够快速响应大量短时间任务。
缺点:
- 可能创建过多的线程,导致系统资源耗尽。
- 频繁创建销毁线程,会造成性能损耗。
注意事项:
- 可能会创建大量线程,导致系统资源耗尽。
- 线程空闲超过 60 秒就会被回收,频繁创建销毁线程,会造成性能损耗。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolExample {
public static void main(String[] args) {
// 创建可缓存线程池
ExecutorService executor = Executors.newCachedThreadPool();
// 提交 10 个任务
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is executing task");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
3、newSingleThreadExecutor
场景: 适合执行串行任务,例如处理数据库操作、文件读写等。
优点:
- 保证任务按顺序执行。
- 避免多线程竞争资源。
缺点:
- 无法利用多核 CPU 的优势,效率较低。
- 当单个任务执行时间过长时,会影响其他任务的执行。
注意事项:
- 仅创建单个线程,所有任务都由该线程串行执行。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadExecutorExample {
public static void main(String[] args) {
// 创建单线程线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交 10 个任务
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is executing task");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
4、newScheduledThreadPool(int corePoolSize)
场景: 适合执行定时任务和周期性任务,例如定期数据备份、定时清理缓存等。
优点:
- 支持定时任务和周期性任务执行。
- 可以灵活控制任务执行时间。
缺点:
- 需要手动管理任务时间间隔。
- 当任务执行时间过长时,可能会影响其他任务的执行。
注意事项:
- 需要设置核心线程数和任务执行的时间间隔。
- 建议使用 scheduleAtFixedRate 或 scheduleWithFixedDelay 方法来执行定时任务。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledThreadPoolExample {
public static void main(String[] args) {
// 创建带定时功能的线程池
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 定时任务:每隔 1 秒执行一次
executor.scheduleAtFixedRate(() -> {
System.out.println(Thread.currentThread().getName() + " is executing scheduled task");
}, 1, 1, TimeUnit.SECONDS);
executor.shutdown();
}
}
线程池运行情况监控
一、开源项目
1、Zimug-Monitor-ThreadPool
实现原理:
- 使用 Java Agent 技术在程序启动时修改 ThreadPoolExecutor 类的字节码。
- 在 ThreadPoolExecutor 的构造函数中添加代码,将新建的线程池添加到一个全局集合中。
- 通过定时任务监控全局集合中的所有线程池,获取状态信息并输出到日志或图形界面。
优势:
- 专注于线程池监控,提供丰富的线程池状态信息,包括活动线程数、队列大小、已完成任务数量、核心线程数、最大线程数、保持活动时间等。
- 可以根据监控信息进行综合分析,判断线程池是否处于过载、队列是否满等情况,并输出警告信息。
劣势:
- 需要使用 Java Agent 技术,需要修改程序的字节码,可能需要重新编译程序。
- 监控线程池的性能影响可能会比较大,因为需要不断地获取线程池的状态信息。
2、JavaMelody
实现原理:
- 使用 Java Agent 技术,在程序启动时修改部分核心类,例如 ClassLoader、Thread 等。
- 通过 AOP 技术拦截方法调用,获取程序运行的性能指标,例如 CPU 使用率、内存占用、方法执行时间等。
- 提供 Web 界面展示性能指标,并提供一些诊断和分析工具。
优势:
- 功能强大,可以监控程序的各个方面,包括线程池、内存、CPU、网络等。
- 提供丰富的诊断工具,可以帮助用户分析程序性能问题。
- 提供 Web 界面,方便用户查看和分析监控数据。
劣势:
- 监控范围比较广,可能导致监控数据比较多,增加分析难度。
- 性能影响比较大,因为需要拦截方法调用,获取性能指标。
3、Spring Boot Actuator
原理:
Spring Boot Actuator 提供了 Endpoint 机制,可以暴露程序的运行状态信息,包括线程池信息。Actuator 使用 Micrometer 库收集性能指标,并通过 HTTP 协议暴露指标数据。
监控线程池的原理:
- 集成 Micrometer 库: Spring Boot Actuator 默认集成 Micrometer 库,用于收集程序的性能指标。
- 创建 ThreadPoolMetrics 对象: Micrometer 库提供了 ThreadPoolMetrics 类,用于收集线程池的性能指标。
- 注册 ThreadPoolMetrics 对象: 将 ThreadPoolMetrics 对象注册到 MeterRegistry 中,并关联到相应的线程池。
- 使用 Endpoint 暴露指标数据: Actuator 提供了 /actuator/metrics 端点,可以获取程序的性能指标数据,包括线程池的指标数据。
优点:
- 简单易用: Spring Boot Actuator 提供了开箱即用的线程池监控功能,无需额外配置。
- 集成方便: Actuator 与 Micrometer 库集成,可以方便地收集其他性能指标。
- 功能强大: Actuator 提供了丰富的端点,可以获取程序的各种运行状态信息,包括线程池、内存、CPU 等。
- 可扩展性强: Actuator 可以自定义 Endpoint,满足各种监控需求。
缺点:
- 性能影响: Actuator 的监控机制会对程序性能造成一定影响,特别是高性能程序。
- 依赖 Spring Boot: Actuator 只能在 Spring Boot 项目中使用。
- 监控范围有限: Actuator 主要监控 Spring Boot 应用程序,对其他程序的监控功能有限。
=====1、开启监控
management.endpoint.metrics.enabled=true
=====2、启动 Spring Boot 应用程序后,访问 http://localhost:8080/actuator/metrics,
可以获取应用程序的性能指标数据,包括线程池的指标数据。
{
"names": [
"system.cpu.usage",
"jvm.memory.used",
"jvm.memory.max",
"jvm.threads.count",
"http.server.requests",
"threadpool.executor.active",
"threadpool.executor.queue.size",
"threadpool.executor.completed",
"threadpool.executor.rejected"
]
}
4、Prometheus with Grafana
二、自定义实现
- ThreadPoolMonitorAgent 类: 与之前代码基本一致,使用 Java Agent 技术监控所有创建的 ThreadPoolExecutor 线程池。
- monitorThreadPool() 方法:
- 添加了更多监控指标的获取,包括 corePoolSize、maximumPoolSize、keepAliveTime。
- 添加了线程池状态判断,包括 isRunning(判断线程池是否处于运行状态)和 isSaturated(判断线程池是否处于饱和状态)。
- 打印或记录监控信息,包括线程池状态、监控指标和综合分析结果。
- 综合分析线程池运行情况,根据监控指标判断线程池是否处于过载、队列是否满等情况,并输出警告信息。
import java.lang.instrument.Instrumentation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class ThreadPoolMonitorAgent {
private static final Set<ThreadPoolExecutor> monitoredPools = new HashSet<>();
public static void premain(String agentArgs, Instrumentation inst) {
inst.addTransformer((loader, className, classfileBuffer, protectionDomain) -> {
if (className.equals("java/util/concurrent/ThreadPoolExecutor")) {
return new ClassFileTransformer() {
@Override
public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) {
try {
return instrumentClass(classfileBuffer);
} catch (Exception e) {
e.printStackTrace();
return classfileBuffer;
}
}
}.transform(loader, className, null, protectionDomain, classfileBuffer);
}
return classfileBuffer;
});
}
private static byte[] instrumentClass(byte[] classfileBuffer) throws Exception {
// 使用 ASM 或其他字节码操作库修改 ThreadPoolExecutor 类
// 在构造函数中添加代码来监控线程池
// 例如,将线程池添加到 monitoredPools 集合中
return classfileBuffer; // 这里需要实际修改字节码
}
// 监控线程池状态的代码
public static void monitorThreadPool() {
for (ThreadPoolExecutor pool : monitoredPools) {
// 获取线程池状态信息
int activeCount = pool.getActiveCount();
int queueSize = pool.getQueue().size();
long completedTaskCount = pool.getCompletedTaskCount();
int corePoolSize = pool.getCorePoolSize();
int maximumPoolSize = pool.getMaximumPoolSize();
long keepAliveTime = pool.getKeepAliveTime(TimeUnit.SECONDS);
// 判断线程池状态
boolean isRunning = !pool.isShutdown() && !pool.isTerminated();
boolean isSaturated = queueSize >= maximumPoolSize;
// 打印或记录监控信息
System.out.println("ThreadPool: " + pool);
System.out.println("Status: " + (isRunning ? "RUNNING" : "SHUTDOWN"));
System.out.println("Active Count: " + activeCount);
System.out.println("Queue Size: " + queueSize);
System.out.println("Completed Task Count: " + completedTaskCount);
System.out.println("Core Pool Size: " + corePoolSize);
System.out.println("Maximum Pool Size: " + maximumPoolSize);
System.out.println("Keep Alive Time: " + keepAliveTime + " seconds");
System.out.println("Saturated: " + isSaturated);
System.out.println("--------------------------------------");
// 综合分析线程池运行情况
if (activeCount >= maximumPoolSize) {
System.out.println("WARNING: ThreadPool is overloaded. Consider increasing maximumPoolSize.");
}
if (queueSize > 0 && isSaturated) {
System.out.println("WARNING: ThreadPool queue is full. Tasks may be rejected.");
}
}
}
}
// 在程序启动时添加 agent 参数:
// -javaagent:path/to/ThreadPoolMonitorAgent.jar
第三方线程池框架
- Apache Commons Pool2: 提供了对象池管理机制,可以用于线程池管理。
- Google Guava: 包含 ListeningExecutorService 等类,提供更丰富的线程池监控和控制功能。
- Spring Framework: 提供了 TaskExecutor 接口,可以使用 Spring 的 ThreadPoolTaskExecutor 实现线程池管理,并与 Spring 容器集成。
- Hutool
java程序堆栈信息线程池分析
1、使用 jstack 命令
- 导出堆栈信息: 在命令行中使用 jstack 命令,并指定 Java 进程的 PID,例如:
- jstack 12345 > thread_dump.txt
2、使用 JProfiler 或 VisualVM 等性能分析工具
- 获取线程堆栈信息: 使用这些工具连接到 Java 进程,并选择 "Threads" 或 "Thread Dump" 视图,可以查看所有线程的堆栈信息。
- 分析线程池状态: 这些工具通常提供更直观的视图,可以更方便地识别线程池线程、分析线程状态、查看任务执行逻辑以及寻找死锁。
- 生成报告: 这些工具可以生成包含线程堆栈信息和其他性能数据的报告,方便分享给其他团队成员。
3、在线工具
https://gceasy.io/index.jsp
https://github.com/PerfMa/XPocket
https://arthas.aliyun.com/doc/