原创

第十一篇 : 线程池


简介

  • 第四种获取线程的方法:线程池,一个 ExecutorService,它使用可能的几个池线程之 一执行每个提交的任务,通常使用 Executors 工厂方法配置。
  • 线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在 执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行 任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数 据,如完成的任务数。
  • 为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。但 是,强烈建议程序员使用较为方便的 Executors 工厂方法 :
    • Executors.newSingleThreadExecutor() :该方法返回一个只有一个线程的线程池,若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
    • Executors.newFixedThreadPool(int) :该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个新的任务队列中,待有空闲时,便处理在任务队列中的任务。
    • Executors.newCachedThreadPool() :该方法返回一个可以根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可以复用的线程。若所有的线程均在工作,又有新的任务提交,则会创建新的线程处理任务,所有线程在当前任务执行完毕后,将返回线程池进行复用。
    • Executors.newSingleThreadScheduledExecutor():该方法返回一个ScheduledExecutorService对象,线程池大小为 1。ScheduleExectorService 接口在ExecutorService 接口之上扩展了给定时间执行某任务的功能,如果某个固定的延时之后,或者周期性执行某个任务。
    • newScheduleThreadPool():该方法也返回一个ScheduledExecutorService 对象,但该线程池可以指定线程数量。

一、实例

1. 固定大小的线程池(1)

package com.gf.demo;


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestThreadPool {

    public static void main(String args[]){
        //1. 创建线程池
        ExecutorService pool = Executors.newFixedThreadPool( 5 );

        ThreadPoolDemo td = new ThreadPoolDemo();
        //2. 为线程池中的线程分配任务
        for (int i = 0 ; i < 10 ; i++) {
            pool.submit( td );
        }

        //3. 关闭线程池
        pool.shutdown();

    }

}

class ThreadPoolDemo implements Runnable {

    @Override
    public void run() {
        for (int i = 1 ; i <= 100 ; i++) {
            System.out.println(Thread.currentThread().getName()+ " : " + i);
        }
    }
}

2. 固定大小线程池(2)

package com.gf.demo;


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestThreadPool {

    public static void main(String args[]){
        //1. 创建线程池
        ExecutorService pool = Executors.newFixedThreadPool( 5 );

        ThreadPoolDemo td = new ThreadPoolDemo();
        //2. 为线程池中的线程分配任务
        List<Future> list = new ArrayList<>();
        for (int i = 0 ; i < 10 ; i++) {
            Future<Integer> submit = pool.submit( td );
            list.add( submit );
        }

        //3. 关闭线程池
        pool.shutdown();

        for (Future future : list) {
            try {
                System.out.println(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

    }

}

class ThreadPoolDemo implements Callable<Integer> {


    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 1 ; i <= 100 ; i++) {
            sum += i;
        }
        return sum;
    }
}

3. 计划任务

package com.gf.demo;


import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestScheduledThreadPool {

    public static void main(String args[]) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool( 5 );

        for (int i = 1; i <= 5; i++) {
            //i 秒后执行一次
            pool.schedule( new Runnable() {
                @Override
                public void run() {
                    int num = new Random().nextInt( 1000 );
                    System.out.println( Thread.currentThread().getName() + " : " + num );
                }
            }, i, TimeUnit.SECONDS );
        }

        //延迟6秒执行,周后2秒执行1次
        pool.scheduleAtFixedRate( new Runnable() {
            int i = 0;

            @Override
            public void run() {
                System.out.println( "周期任务,执行了 " + ++i + " 次" );
            }
        }, 6, 2, TimeUnit.SECONDS );
    }
}

4. 自定义线程池

线程池不建议使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式写的同学更加明确线程池的运行规则,避免资源耗尽的风险。Executors各个方法的弊端:

    1. newFixedThreadPoolnewSingelThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
    1. newCahedThreadPoolnewScheduleThreadPool:主要问题是线程最大数是Integer.MAX_VALUE,可能会创建数量非常对的线程,甚至OOM。

ThreadPoolExecutor 构造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

参数含义如下:

  • corePoolSize:指定了线程池中的线程数量。
  • maximumPoolSize:指定了线程池中最大线程数量。
  • keepAliveTime:当前线程池线程数量超过 corePoolSize 时,多余的空闲线程的存活时间。
  • unit:keepAliveTime 的单位。
  • workQueue:任务队列,被提交但尚未执行的任务。
  • threadFactory:线程工厂,用于创建线程,一般默认的即可。
  • handler:拒绝策略。但任务太多来不及处理,如何拒绝任务。

自定义线程池比较好的方式:

//方式1
public void example1() {

    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(
            1,
            new BasicThreadFactory.Builder().namingPattern( "example-schedule-pool-%d" ).daemon( true ).build()
    );
}
//方式2
public void example2() {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

    ExecutorService pool = new ThreadPoolExecutor(
            5 ,
            200 ,
            0L ,
            TimeUnit.MICROSECONDS,
            new LinkedBlockingQueue<>( 1024 ),
            new ThreadPoolExecutor.AbortPolicy());
    pool.execute( () -> System.out.println( Thread.currentThread().getName() ) );
    pool.shutdown();

}
<!-- 方式3 -->
<bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="10" />
    <property name="maxPoolSize" value="100" />
    <property name="queueCapacity" value="2000" />
    <property name="threadFactory" value="threadFactory" />
    <property name="rejectedExecutionHandler">
        <ref local="rejectedExecutionHandler" />
    </property>
</bean>

JDK内置的拒绝策略如下:

  • AbortPolicy策略:该策略会抛出异常,阻止系统正常工作。
  • CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做,不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
  • DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并再次尝试提交当前任务。
  • DiscardPolicy策略:该策略默默丢弃无法处理任务,不予任何处理。如果允许任务丢弃,我觉得这可能是最好的一种方案。

自定义拒绝策略:

package com.gf.demo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


public class TestUseThreadPool {

    public static void main(String args[]){
        UseTask ut = new UseTask();
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>( 5 );
        ExecutorService executor = new ThreadPoolExecutor(
                5,
                10,
                120,
                TimeUnit.SECONDS,
                queue,
                new UseRejected()
        );

        for (int i = 1 ; i <= 20 ; i++) {
            executor.execute( ut );
        }

        try {
            Thread.sleep( 1000 );
            System.out.println("queue size : " + queue.size());
            Thread.sleep( 2000 );
            executor.shutdown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

class UseTask implements Runnable {

    private static AtomicInteger count = new AtomicInteger(0);

    @Override
    public void run() {
        try {
            int temp = count.incrementAndGet();
            System.out.println("任务" + temp);
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

class UseRejected implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("自定义拒绝策略...");
        System.out.println("当前被拒绝的任务为 : " + r.toString());
    }
}

juc
  • 作者:程序员果果
  • 发表时间:2018-11-08 09:33
  • 版权声明:自由转载-非商用-非衍生-保持署名 (创意共享4.0许可证)
  • 公众号转载:请在文末添加作者公众号二维码
  • 评论