Java四种线程池和参数详解 java线程池有几种线程

createh52周前 (12-17)技术教程20

一、四种线程池

Java通过Executors提供四种静态方法来创建线程池

例如:

//创建一个可缓存线程池

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

//执行任务

cachedThreadPool.execute(Runnable command);

如下:

1.newSingleThreadPool 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO、LIFO、优先级)执行。

2.newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

3.newScheduledThreadPool 创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。

4.newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需求,可灵活回收空闲线程,若无可回收线程,则新建线程。

二、核心参数

四种线程池本质都是创建ThreadPoolExecutor类,ThreadPoolExecutor构造参数如下

1.int corePoolSize 核心线程数量

2.int maximumPoolSize 最大线程数量

3.long keepAliveTime 超过corePoolSize的线程多久不活动别销毁时间

4.TimeUnit unit 时间单位

5.BlockingQueue<Runnable> workQueue 任务队列

6.ThreadFactory threadFactory 线程池工厂

7.RejectedExecutionHandler handler 拒绝策略

三、线程池工厂 ThreadFactory

所在包位置:java.util.concurrent.ThreadFactory

ThreadFactory接口。这个接口是Java自身提供的,用户可以实现它自定义自己的线程启动方式,可以设置线程名称、类型以及优先级等属性。

ThreadFactory vs Default ThreadFactory:

在一个典型的Java ExecutorService程序中,其线程都需要被指定以何种形式运行,如果程序初始化ExecutorService时没有指定ThreadFactory,程序会采用一个默认的ThreadFactory来生成提交线程,但是对于一个严谨对程序来说,定义自己的ThreadFactory永远是个最佳选择。Why??

1.设置更有描述意义的线程名称。如果使用默认的ThreadFactory,它给线程起名字大概规律就是pool-m-thread-n这个样子,如pool-1-thread-1。但是当你分析一个thread dump时,看着这样的名字就很难知道线程的目的。所以使用一个有描述意义的线程名称是分析追踪问题的clue NO.1。

2.设置线程是否是守护线程,默认的ThreadFactory总是提交非守护线程

3.设置线程优先级,默认ThreadFactory总是提交的一般优先级线程

例子:

CustomThreadFactoryBuilder类实现了一种优雅的Builder Mechanism方式去得到一个自定义ThreadFactory实例。ThreadFactory接口中有一个接受Runnable类型参数的方法newThread(Runnable r),你自己的factory逻辑就应该写在这个方法中,去配置线程名称、优先级、守护线程状态等属性。

public class CustomThreadFactoryBuilder {

    private String namePrefix = null;

    private boolean daemon = false;

    private int priority = Thread.NORM_PRIORITY;

    public CustomThreadFactoryBuilder setNamePrefix(String namePrefix) {

        if (namePrefix == null) {

            throw new NullPointerException();

        }

        this.namePrefix = namePrefix;

        return this;

    }

    public CustomThreadFactoryBuilder setDaemon(boolean daemon) {

        this.daemon = daemon;

        return this;

    }

    public CustomThreadFactoryBuilder setPriority(int priority) {

        if (priority < Thread.MIN_PRIORITY){

            throw new IllegalArgumentException(String.format(

                    "Thread priority (%s) must be >= %s", priority, Thread.MIN_PRIORITY));

        }

        if (priority > Thread.MAX_PRIORITY) {

            throw new IllegalArgumentException(String.format(

                    "Thread priority (%s) must be <= %s", priority, Thread.MAX_PRIORITY));

        }

        this.priority = priority;

        return this;

    }

    public ThreadFactory build() {

        return build(this);

    }

    private static ThreadFactory build(CustomThreadFactoryBuilder builder) {

        final String namePrefix = builder.namePrefix;

        final Boolean daemon = builder.daemon;

        final Integer priority = builder.priority;

        final AtomicLong count = new AtomicLong(0);

         /*

        return new ThreadFactory() {

            @Override

            public Thread newThread(Runnable runnable) {

                Thread thread = new Thread(runnable);

                if (namePrefix != null) {

                    thread.setName(namePrefix + "-" + count.getAndIncrement());

                }

                if (daemon != null) {

                    thread.setDaemon(daemon);

                }

                if (priority != null) {

                    thread.setPriority(priority);

                }

                return thread;

            }

        };*/

        //jdk8中还是优先使用lamb表达式

        return (Runnable runnable) -> {

            Thread thread = new Thread(runnable);

            if (namePrefix != null) {

                thread.setName(namePrefix + "-" + count.getAndIncrement());

            }

            if (daemon != null) {

                thread.setDaemon(daemon);

            }

                /*

                    thread.setPriority(priority);

                */

            return thread;

        };

    }

}

SimpleTask类实现类Runnable接口,打印出了线程的运行属性(名称,优先级等)。

public class SimpleTask implements Runnable {

    private long sleepTime;

    public SimpleTask(long sleepTime) {

        super();

        this.sleepTime = sleepTime;

    }

    @Override

    public void run() {

        while (true) {

            try {

                System.out.println("Simple task is running on " 

                        + Thread.currentThread().getName() + " with priority " + Thread.currentThread().getPriority());

                Thread.sleep(sleepTime);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

    }

}


CustomThreadFactoryDemo类使用我们上面的CustomThreadFactoryBuilder类创建类一个ThreadFactory实例,又使用这个实例获得类一个ExecutoryService,这样所有这个线程池中的线程都会按我们定义好的属性被生成,下面代码中执行类三个SimpleTask。

public class CustomThreadFactoryDemo {

    public static void main(String[] args) {

        ThreadFactory customThreadfactory = new CustomThreadFactoryBuilder()

        .setNamePrefix("DemoPool-Thread").setDaemon(false)

        .setPriority(Thread.MAX_PRIORITY).build();

        ExecutorService executorService = Executors.newFixedThreadPool(3,

        customThreadfactory);

        // Create three simple tasks with 1000 ms sleep time

        SimpleTask simpleTask1 = new SimpleTask(1000);

        SimpleTask simpleTask2 = new SimpleTask(1000);

        SimpleTask simpleTask3 = new SimpleTask(1000);

        // Execute three simple tasks with 1000 ms sleep time

        executorService.execute(simpleTask1);

        executorService.execute(simpleTask2);

        executorService.execute(simpleTask3);

    }

}


输出结果:

Simple task is running on DemoPool-Thread-0 with priority 10

Simple task is running on DemoPool-Thread-1 with priority 10

Simple task is running on DemoPool-Thread-2 with priority 10

Simple task is running on DemoPool-Thread-0 with priority 10

Simple task is running on DemoPool-Thread-1 with priority 10

Simple task is running on DemoPool-Thread-2 with priority 10

Simple task is running on DemoPool-Thread-0 with priority 10

Simple task is running on DemoPool-Thread-1 with priority 10

Simple task is running on DemoPool-Thread-2 with priority 10

Simple task is running on DemoPool-Thread-0 with priority 10

Simple task is running on DemoPool-Thread-1 with priority 10

Simple task is running on DemoPool-Thread-2 with priority 10

四、任务队列 BlockingQueue

Tops:BlockingQueue不能够添加null对象,否则会抛出空指针异常。

接口抽象方法

boolean add(E e);

添加元素,添加成功返回true ,添加失败抛出异常 IllegalStateException。

boolean offer(E e);

true:添加元素成功 ; false : 添加元素失败。

void put(E e) throws InterruptedException;

添加元素,直到有空间添加成功才会返回,阻塞方法。

boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

true:添加数据成功,false:超时时间到。

E take() throws InterruptedException;

获取队头的元素,阻塞方法,会一直等到有元素获取到才会返回,获取到元素时并将队列中的该元素删除。

E poll(long timeout, TimeUnit unit) throws InterruptedException;

获取队头的元素,阻塞方法,超时时间到则返回null,获取到元素时并将队列中的该元素删除。

int remainingCapacity();

返回理想情况下此队列可以添加的其他元素的数量.

boolean remove(Object o);

移除指定的元素。

boolean contains(Object o);

检查是否包含该元素

int drainTo(Collection<? super E> c);

移除队列中的所有元素并添加到集合c,返回被移除元素的数量。

int drainTo(Collection<? super E> c, int maxElements);

移除队列中maxElements个元素并添加到集合c,返回被移除元素的数量。

实现类

Tops: 所有的实现类都是并发安全的。

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。

SynchronousQueue

它是一个特殊的队列,它的名字其实就蕴含了它的特征 – - 同步的队列。为什么说是同步的呢?这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。这里的 Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。

LinkedBlockingDeque

LinkedBlockingDeque就是一个双向队列,任何一端都可以进行元素的出入。底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。

LinkedBlockingQueue

LinkedBlockingQueue是一个单向队列,只能一端出一端入的单向队列结构,是有FIFO特性的,并且是通过两个ReentrantLock和两个Condition来实现的。底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。

DelayQueue

是一个支持延时获取元素的无界阻塞队列。内部用 PriorityQueue 实现。

LinkedTransferQueue

PriorityBlockingQueue

PriorityBlockingQueue是带排序的 BlockingQueue 实现,其并发控制采用的是 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

五、拒绝策略 RejectedExecutionHandle

在使用线程池并且使用有界队列的时候,如果队列满了,任务添加到线程池的时候就会有问题,针对这些问题java线程池提供了以下几种策略:

1. AbortPolicy

2. DiscardPolicy

3. DiscardOldestPolicy

4. CallerRunsPolicy

5. 自定义策略

AbortPolicy

该策略是线程池的默认策略。使用该策略时,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。

源码如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            //不做任何处理,直接抛出异常

            throw new RejectedExecutionException("Task " + r.toString() +

                                                 " rejected from " +

                                                 e.toString());

        }

DiscardPolicy

这个策略和AbortPolicy的slient版本,如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。

源码如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

        //就是一个空的方法

        }

DiscardOldestPolicy

这个策略从字面上也很好理解,丢弃最老的。也就是说如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。

因为队列是队尾进,队头出,所以队头元素是最老的,因此每次都是移除对头元素后再尝试入队。

源码如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

            	//移除队头元素

                e.getQueue().poll();

                //再尝试入队

                e.execute(r);

            }

        }

CallerRunsPolicy

使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。就像是个急脾气的人,我等不到别人来做这件事就干脆自己干。

源码如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

                //直接执行run方法

                r.run();

            }

        }


自定义策略

如果以上策略都不符合业务场景,那么可以自己定义一个拒绝策略,只要实现RejectedExecutionHandler接口,并且实现rejectedExecution方法就可以了。具体的逻辑就在rejectedExecution方法里去定义就OK了。

例如:我定义了我的一个拒绝策略,叫做MyRejectPolicy,里面的逻辑就是打印处理被拒绝的任务内容

public class MyRejectPolicy implements RejectedExecutionHandler{

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        //Sender是我的Runnable类,里面有message字段

        if (r instanceof Sender) {

            Sender sender = (Sender) r;

            //直接打印

            System.out.println(sender.getMessage());

        }

    }

}

Tops:这几种策略没有好坏之分,只是适用不同场景,具体哪种合适根据具体场景和业务需要选择,如果需要特殊处理就自己定义好了。

相关文章

java -jar 启动参数 java -jar 启动参数 内存

/usr/local/java/jdk1.8.0_131/bin/java -jar -server -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=...

腾讯大佬详细讲解Java 启动exe程序,传递参数和获取参数

这篇文章主要介绍了java 启动exe程序,传递参数和获取参数操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧1、java中启动exe程序 ,并添加传参String[] cmd =...

JVM诊断之查看运行参数 jvm运行参数说明

问题描述为了分析和定位一个Java线上系统问题,我们需要查看JVM启动时的一些参数设置,例如:垃圾回收算法、堆大小等等。这些参数可能在启动脚本中明确指明,也可能采用默认值。在系统运行过程中其他人也许动...

基于容器的Java内存参数解析 java内置容器

在基于物理的服务器(此处主要与容器平台进行区分,故此描述)上运行Java应用程序时,我们通常会使用Java虚拟机参数"-Xms、-Xmx"来指定Java堆内存的初始值和最大值。如果要将...

Tomcat、JVM 参数如何调到性能最好?

Tomcat性能调优找到Tomcat根目录下的conf目录,修改server.xml文件的内容。对于这部分的调优,我所了解到的就是无非设置一下Tomcat服务器的最大并发数和Tomcat初始化时创建的...

Java面试常见问题:JVM内存异常及内存参数设置

前文《Java面试必考问题:JVM内存区域如何划分? 》介绍了Java虚拟机的内存区域划分。内存异常问题是程序开发过程中经常遇到的问题,也是面试中常问到的,本文重点介绍一下JVM的内存异常以及相关的内...