package com.gf.demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool {
public static void main(String args[]){
//1. 创建线程池
ExecutorService pool = Executors.newFixedThreadPool( 5 );
ThreadPoolDemo td = new ThreadPoolDemo();
//2. 为线程池中的线程分配任务
for (int i = 0 ; i < 10 ; i++) {
pool.submit( td );
}
//3. 关闭线程池
pool.shutdown();
}
}
class ThreadPoolDemo implements Runnable {
@Override
public void run() {
for (int i = 1 ; i <= 100 ; i++) {
System.out.println(Thread.currentThread().getName()+ " : " + i);
}
}
}
package com.gf.demo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestThreadPool {
public static void main(String args[]){
//1. 创建线程池
ExecutorService pool = Executors.newFixedThreadPool( 5 );
ThreadPoolDemo td = new ThreadPoolDemo();
//2. 为线程池中的线程分配任务
List<Future> list = new ArrayList<>();
for (int i = 0 ; i < 10 ; i++) {
Future<Integer> submit = pool.submit( td );
list.add( submit );
}
//3. 关闭线程池
pool.shutdown();
for (Future future : list) {
try {
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
}
class ThreadPoolDemo implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1 ; i <= 100 ; i++) {
sum += i;
}
return sum;
}
}
package com.gf.demo;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TestScheduledThreadPool {
public static void main(String args[]) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool( 5 );
for (int i = 1; i <= 5; i++) {
//i 秒后执行一次
pool.schedule( new Runnable() {
@Override
public void run() {
int num = new Random().nextInt( 1000 );
System.out.println( Thread.currentThread().getName() + " : " + num );
}
}, i, TimeUnit.SECONDS );
}
//延迟6秒执行,周后2秒执行1次
pool.scheduleAtFixedRate( new Runnable() {
int i = 0;
@Override
public void run() {
System.out.println( "周期任务,执行了 " + ++i + " 次" );
}
}, 6, 2, TimeUnit.SECONDS );
}
}
线程池不建议使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式写的同学更加明确线程池的运行规则,避免资源耗尽的风险。Executors各个方法的弊端:
ThreadPoolExecutor 构造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数含义如下:
自定义线程池比较好的方式:
//方式1
public void example1() {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(
1,
new BasicThreadFactory.Builder().namingPattern( "example-schedule-pool-%d" ).daemon( true ).build()
);
}
//方式2
public void example2() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
ExecutorService pool = new ThreadPoolExecutor(
5 ,
200 ,
0L ,
TimeUnit.MICROSECONDS,
new LinkedBlockingQueue<>( 1024 ),
new ThreadPoolExecutor.AbortPolicy());
pool.execute( () -> System.out.println( Thread.currentThread().getName() ) );
pool.shutdown();
}
<!-- 方式3 -->
<bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />
<property name="threadFactory" value="threadFactory" />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler" />
</property>
</bean>
JDK内置的拒绝策略如下:
自定义拒绝策略:
package com.gf.demo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class TestUseThreadPool {
public static void main(String args[]){
UseTask ut = new UseTask();
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>( 5 );
ExecutorService executor = new ThreadPoolExecutor(
5,
10,
120,
TimeUnit.SECONDS,
queue,
new UseRejected()
);
for (int i = 1 ; i <= 20 ; i++) {
executor.execute( ut );
}
try {
Thread.sleep( 1000 );
System.out.println("queue size : " + queue.size());
Thread.sleep( 2000 );
executor.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class UseTask implements Runnable {
private static AtomicInteger count = new AtomicInteger(0);
@Override
public void run() {
try {
int temp = count.incrementAndGet();
System.out.println("任务" + temp);
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class UseRejected implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("自定义拒绝策略...");
System.out.println("当前被拒绝的任务为 : " + r.toString());
}
}
评论