Java四种线程池和参数详解 java线程池有几种线程
一、四种线程池
Java通过Executors提供四种静态方法来创建线程池
例如:
//创建一个可缓存线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
//执行任务
cachedThreadPool.execute(Runnable command);
如下:
1.newSingleThreadPool 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO、LIFO、优先级)执行。
2.newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3.newScheduledThreadPool 创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。
4.newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需求,可灵活回收空闲线程,若无可回收线程,则新建线程。
二、核心参数
四种线程池本质都是创建ThreadPoolExecutor类,ThreadPoolExecutor构造参数如下
1.int corePoolSize 核心线程数量
2.int maximumPoolSize 最大线程数量
3.long keepAliveTime 超过corePoolSize的线程多久不活动别销毁时间
4.TimeUnit unit 时间单位
5.BlockingQueue<Runnable> workQueue 任务队列
6.ThreadFactory threadFactory 线程池工厂
7.RejectedExecutionHandler handler 拒绝策略
三、线程池工厂 ThreadFactory
所在包位置:java.util.concurrent.ThreadFactory
ThreadFactory接口。这个接口是Java自身提供的,用户可以实现它自定义自己的线程启动方式,可以设置线程名称、类型以及优先级等属性。
ThreadFactory vs Default ThreadFactory:
在一个典型的Java ExecutorService程序中,其线程都需要被指定以何种形式运行,如果程序初始化ExecutorService时没有指定ThreadFactory,程序会采用一个默认的ThreadFactory来生成提交线程,但是对于一个严谨对程序来说,定义自己的ThreadFactory永远是个最佳选择。Why??
1.设置更有描述意义的线程名称。如果使用默认的ThreadFactory,它给线程起名字大概规律就是pool-m-thread-n这个样子,如pool-1-thread-1。但是当你分析一个thread dump时,看着这样的名字就很难知道线程的目的。所以使用一个有描述意义的线程名称是分析追踪问题的clue NO.1。
2.设置线程是否是守护线程,默认的ThreadFactory总是提交非守护线程
3.设置线程优先级,默认ThreadFactory总是提交的一般优先级线程
例子:
CustomThreadFactoryBuilder类实现了一种优雅的Builder Mechanism方式去得到一个自定义ThreadFactory实例。ThreadFactory接口中有一个接受Runnable类型参数的方法newThread(Runnable r),你自己的factory逻辑就应该写在这个方法中,去配置线程名称、优先级、守护线程状态等属性。
public class CustomThreadFactoryBuilder {
private String namePrefix = null;
private boolean daemon = false;
private int priority = Thread.NORM_PRIORITY;
public CustomThreadFactoryBuilder setNamePrefix(String namePrefix) {
if (namePrefix == null) {
throw new NullPointerException();
}
this.namePrefix = namePrefix;
return this;
}
public CustomThreadFactoryBuilder setDaemon(boolean daemon) {
this.daemon = daemon;
return this;
}
public CustomThreadFactoryBuilder setPriority(int priority) {
if (priority < Thread.MIN_PRIORITY){
throw new IllegalArgumentException(String.format(
"Thread priority (%s) must be >= %s", priority, Thread.MIN_PRIORITY));
}
if (priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(String.format(
"Thread priority (%s) must be <= %s", priority, Thread.MAX_PRIORITY));
}
this.priority = priority;
return this;
}
public ThreadFactory build() {
return build(this);
}
private static ThreadFactory build(CustomThreadFactoryBuilder builder) {
final String namePrefix = builder.namePrefix;
final Boolean daemon = builder.daemon;
final Integer priority = builder.priority;
final AtomicLong count = new AtomicLong(0);
/*
return new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
if (namePrefix != null) {
thread.setName(namePrefix + "-" + count.getAndIncrement());
}
if (daemon != null) {
thread.setDaemon(daemon);
}
if (priority != null) {
thread.setPriority(priority);
}
return thread;
}
};*/
//jdk8中还是优先使用lamb表达式
return (Runnable runnable) -> {
Thread thread = new Thread(runnable);
if (namePrefix != null) {
thread.setName(namePrefix + "-" + count.getAndIncrement());
}
if (daemon != null) {
thread.setDaemon(daemon);
}
/*
thread.setPriority(priority);
*/
return thread;
};
}
}
SimpleTask类实现类Runnable接口,打印出了线程的运行属性(名称,优先级等)。
public class SimpleTask implements Runnable {
private long sleepTime;
public SimpleTask(long sleepTime) {
super();
this.sleepTime = sleepTime;
}
@Override
public void run() {
while (true) {
try {
System.out.println("Simple task is running on "
+ Thread.currentThread().getName() + " with priority " + Thread.currentThread().getPriority());
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
CustomThreadFactoryDemo类使用我们上面的CustomThreadFactoryBuilder类创建类一个ThreadFactory实例,又使用这个实例获得类一个ExecutoryService,这样所有这个线程池中的线程都会按我们定义好的属性被生成,下面代码中执行类三个SimpleTask。
public class CustomThreadFactoryDemo {
public static void main(String[] args) {
ThreadFactory customThreadfactory = new CustomThreadFactoryBuilder()
.setNamePrefix("DemoPool-Thread").setDaemon(false)
.setPriority(Thread.MAX_PRIORITY).build();
ExecutorService executorService = Executors.newFixedThreadPool(3,
customThreadfactory);
// Create three simple tasks with 1000 ms sleep time
SimpleTask simpleTask1 = new SimpleTask(1000);
SimpleTask simpleTask2 = new SimpleTask(1000);
SimpleTask simpleTask3 = new SimpleTask(1000);
// Execute three simple tasks with 1000 ms sleep time
executorService.execute(simpleTask1);
executorService.execute(simpleTask2);
executorService.execute(simpleTask3);
}
}
输出结果:
Simple task is running on DemoPool-Thread-0 with priority 10
Simple task is running on DemoPool-Thread-1 with priority 10
Simple task is running on DemoPool-Thread-2 with priority 10
Simple task is running on DemoPool-Thread-0 with priority 10
Simple task is running on DemoPool-Thread-1 with priority 10
Simple task is running on DemoPool-Thread-2 with priority 10
Simple task is running on DemoPool-Thread-0 with priority 10
Simple task is running on DemoPool-Thread-1 with priority 10
Simple task is running on DemoPool-Thread-2 with priority 10
Simple task is running on DemoPool-Thread-0 with priority 10
Simple task is running on DemoPool-Thread-1 with priority 10
Simple task is running on DemoPool-Thread-2 with priority 10
四、任务队列 BlockingQueue
Tops:BlockingQueue不能够添加null对象,否则会抛出空指针异常。
接口抽象方法
boolean add(E e);
添加元素,添加成功返回true ,添加失败抛出异常 IllegalStateException。
boolean offer(E e);
true:添加元素成功 ; false : 添加元素失败。
void put(E e) throws InterruptedException;
添加元素,直到有空间添加成功才会返回,阻塞方法。
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
true:添加数据成功,false:超时时间到。
E take() throws InterruptedException;
获取队头的元素,阻塞方法,会一直等到有元素获取到才会返回,获取到元素时并将队列中的该元素删除。
E poll(long timeout, TimeUnit unit) throws InterruptedException;
获取队头的元素,阻塞方法,超时时间到则返回null,获取到元素时并将队列中的该元素删除。
int remainingCapacity();
返回理想情况下此队列可以添加的其他元素的数量.
boolean remove(Object o);
移除指定的元素。
boolean contains(Object o);
检查是否包含该元素
int drainTo(Collection<? super E> c);
移除队列中的所有元素并添加到集合c,返回被移除元素的数量。
int drainTo(Collection<? super E> c, int maxElements);
移除队列中maxElements个元素并添加到集合c,返回被移除元素的数量。
实现类
Tops: 所有的实现类都是并发安全的。
ArrayBlockingQueue
ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。
SynchronousQueue
它是一个特殊的队列,它的名字其实就蕴含了它的特征 – - 同步的队列。为什么说是同步的呢?这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。这里的 Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。
LinkedBlockingDeque
LinkedBlockingDeque就是一个双向队列,任何一端都可以进行元素的出入。底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。
LinkedBlockingQueue
LinkedBlockingQueue是一个单向队列,只能一端出一端入的单向队列结构,是有FIFO特性的,并且是通过两个ReentrantLock和两个Condition来实现的。底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。
DelayQueue
是一个支持延时获取元素的无界阻塞队列。内部用 PriorityQueue 实现。
LinkedTransferQueue
PriorityBlockingQueue
PriorityBlockingQueue是带排序的 BlockingQueue 实现,其并发控制采用的是 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。
简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。
五、拒绝策略 RejectedExecutionHandle
在使用线程池并且使用有界队列的时候,如果队列满了,任务添加到线程池的时候就会有问题,针对这些问题java线程池提供了以下几种策略:
1. AbortPolicy
2. DiscardPolicy
3. DiscardOldestPolicy
4. CallerRunsPolicy
5. 自定义策略
AbortPolicy
该策略是线程池的默认策略。使用该策略时,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
源码如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//不做任何处理,直接抛出异常
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
DiscardPolicy
这个策略和AbortPolicy的slient版本,如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。
源码如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//就是一个空的方法
}
DiscardOldestPolicy
这个策略从字面上也很好理解,丢弃最老的。也就是说如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。
因为队列是队尾进,队头出,所以队头元素是最老的,因此每次都是移除对头元素后再尝试入队。
源码如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//移除队头元素
e.getQueue().poll();
//再尝试入队
e.execute(r);
}
}
CallerRunsPolicy
使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。就像是个急脾气的人,我等不到别人来做这件事就干脆自己干。
源码如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//直接执行run方法
r.run();
}
}
自定义策略
如果以上策略都不符合业务场景,那么可以自己定义一个拒绝策略,只要实现RejectedExecutionHandler接口,并且实现rejectedExecution方法就可以了。具体的逻辑就在rejectedExecution方法里去定义就OK了。
例如:我定义了我的一个拒绝策略,叫做MyRejectPolicy,里面的逻辑就是打印处理被拒绝的任务内容
public class MyRejectPolicy implements RejectedExecutionHandler{
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//Sender是我的Runnable类,里面有message字段
if (r instanceof Sender) {
Sender sender = (Sender) r;
//直接打印
System.out.println(sender.getMessage());
}
}
}
Tops:这几种策略没有好坏之分,只是适用不同场景,具体哪种合适根据具体场景和业务需要选择,如果需要特殊处理就自己定义好了。