2021-04-02

线程池底层原理你真的懂吗?

与线程池相关的接口实现类关系
Exectutor是接口 Executors 是辅助工具类 就好像是Collection 接口 Collections 工具类

 

使用场景:
对比new Thread() 创建和销毁线程都非常耗时,使用线程池可以达到线程复用/重用。

池化:享元模式,如连接池,常量池

线程池的底层原理及七大参数的意义
七大参数
public ThreadPoolExecutor(
int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,
TimeUnit unit, //临时线程空闲时间
ThreadFactory threadFactory ,//线程工厂
BlockingQueue<Runnable> workQueue,//阻塞队列
RejectedExecutionHandler handler //拒绝策略
)

参数 作用
corePoolSize 核心线程数(银行今日当值网窗口)
maximumPoolSize 最大线程数(银行最多的窗口也就是所有的,最多容纳的,物理窗口上限)
keepAliveTime 空闲线程最大存活时间 当前线程池数量超过corePoolSize并且时间达到keepAliveTime,多余的空闲线程就会被销毁直到线程的数量只剩下corePoolSize
TimeUnitkeepAliveTime 时间单位
workQueue 阻塞任务队列(corePoolSize用完了,客户先去银行里面的候客区)
threadFactory 新建线程工厂(默认,银行网点的logo/工作人员的制服等等都是默认的,都是标配的)
RejectedExecutionHandler 拒绝策略 当提交任务数超过 maxmumPoolSize+workQueue 之和时(所有窗口都用上了并且候客区也满了),任务会交给RejectedExecutionHandler 来处理
3-4 空闲线程的存活时间 (加班窗口如果没有业务量的最多的等待时间),没有业务那么线程数量会慢慢回退,直到线程的数量只剩下corePoolSize

工作原理 很重要!


1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要
获取全局锁)。
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行
这一步骤需要获取全局锁)。
4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法

线程池中的线程执行任务分两种情况,如下。

在execute()方法中创建一个线程时,会让这个线程执行当前任务。
这个线程执行完当前任务后,会反复从BlockingQueue获取任务来执行。
特科
线程池的拒绝策略你谈谈?
是什么

等待队列已经满了,再也塞不下新的任务,同时线程池中的线程数达到了最大线程数,无法继续为新任务服务。
均实现了RejectedExecutionHandler接口

拒绝策略
jdk内置的拒绝策略

AbortPolicy:直接抛出异常。(默认采用此策略),处理程序遭到拒绝将抛出运行时 RejectedExecutionException
CallerRunsPolicy:只用调用者所在线程来运行任务,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。也就是说如果线程池执行不了任务了,就由main线程(main线程调用的线程池)来执行线程池执行不了的任务
DiscardPolicy:不处理,丢弃掉。,不能执行的任务将被删除
DiscardOldestPolicy::丢弃队列里最近的一个任务,并执行当前任务。,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
自定义线程池的使用
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。

submit 不仅可以用于多线程的提交还有Callable,是带返回值的

public static void main(String[] args) {


//手写参数
ExecutorService threadPool = new ThreadPoolExecutor(
2,//核心
5,//最大
1L, //空闲时间 单位是long
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3),//任务队列
Executors.defaultThreadFactory(),//线程工厂
//直接抛出异常
new ThreadPoolExecutor.AbortPolicy()

//main线程调用的是这个线程 谁调用我我回退给谁
//你从哪个银行来我们银行办理业务的,你就回哪去
/*new ThreadPoolExecutor.CallerRunsPolicy()*/

//这下面两个都最多执行的maxmunPoolSize 也就是8 剩下的直接抛弃
/*new ThreadPoolExecutor.DiscardOldestPolicy()*/
/* new ThreadPoolExecutor.DiscardPolicy()*/
);
try {
//模拟十个线程变量业务
for (int i = 0; i < 9; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
}catch (Exception e) {
e.printStackTrace();
}finally {
threadPool.shutdown();//关闭线程池
}
}

submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,也就是虽然你使用了拒绝策略,但是在提交任务这里,是由main线程来做的,所以main线程如果获取不到返回值,就会阻塞不会再继续提交新的任务。

public static void main(String[] args) {
sumbit();
}
private static void sumbit() {
//手写参数
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),
//直接抛出异常
new ThreadPoolExecutor.AbortPolicy()

//main线程调用的是这个线程 谁调用我我回退给谁
//你从哪个银行来我们银行办理业务的,你就回哪去
/*new ThreadPoolExecutor.CallerRunsPolicy()*/

//这下面两个都最多执行的maxmunPoolSize 也就是8 剩下的直接抛弃
/*new ThreadPoolExecutor.DiscardOldestPolicy()*/
/* new ThreadPoolExecutor.DiscardPolicy()*/
);
try {
//模拟十个线程变量业务
for (int i = 0; i < 9; i++) {
//获取返回值
Future<String> submit = threadPool.submit(new CallableThread());
//get()方法会阻塞当前线程直到任务完成
//也就是说会阻塞main线程 不会去继续提交新的任务
String str = submit.get();
System.out.println(str);
}
}catch (Exception e) {
e.printStackTrace();
}finally {
threadPool.shutdown();//关闭线程池
}
}

线程池的关闭
它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止

shutdown 只是中断任务队列里面的线程,不中断正在执行的

shutdownnow 中断任务队列和正在运行的

JDK自带的线程池
Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类。通过Executor框架的工具类
Executors,可以创建3种类型的ThreadPoolExecutor。

创建固定大小的线程池:public static ExecutorService newFixedThreadPool(int
nThreads)
单线程池:public static ExecutorService newSingleThreadExecutor()
创建无大小限制的线程池:public static ExecutorService newCachedThreadPool()
或者定期执行任务:public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)


需要了解同步队列的相关知识同步队列前两个是linkedList,后面的是同步队列

 

对Java中的阻塞队列有所了解的话,看到这里或许就能够明白原因了。
Java中的BlockingQueue主要有两种实现,分别是ArrayBlockingQueue 和 LinkedBlockingQueue。
ArrayBlockingQueue是一个用数组实现的有界阻塞队列,必须设置容量。
LinkedBlockingQueue是一个用链表实现的有界阻塞队列,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。

这里的问题就出在:不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。也就是说,如果我们不设置LinkedBlockingQueue的容量的话,其默认容量将会是Integer.MAX_VALUE。

而newFixedThreadPool中创建LinkedBlockingQueue时,并未指定容量。此时,LinkedBlockingQueue就是一个无边界队列,对于一个无边界队列来说,是可以不断的向队列中加入任务的,这种情况下就有可能因为任务过多而导致内存溢出问题。

上面提到的问题主要体现在newFixedThreadPool和newSingleThreadExecutor两个工厂方法上,并不是说newCachedThreadPool和newScheduledThreadPool这两个方法就安全了,这两种方式创建的最大线程数可能是Integer.MAX_VALUE,而创建这么多线程,必然就有可能导致OOM。

为什么不用这些JDK自带的线程池
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式。阿里的编程规范中明确指出。

合理配置线程池你是否考虑过?
首先你需要知道cpu核心数

System.out.println(Runtime.getRuntime().availableProcessors());
1
然后根据不同的业务采用不同的策略

CPU 密集型
CPU 密集的意思是该任务需要大量的运算,而没有阻塞,CPU 一直全速运行。
CPU 密集型任务尽可能的少的线程数量,一般为 CPU 核数 + 1 个线程的线程池。
IO 密集型
由于 IO 密集型任务线程并不是一直在执行任务,可以多分配一点线程数,如 CPU * 2 。
也可以使用公式:CPU 核数 / (1 - 阻塞系数);其中阻塞系数在 0.8 ~ 0.9 之间。

本文首发于java黑洞网,博客园同步更新









原文转载:http://www.shaoqun.com/a/660723.html

跨境电商:https://www.ikjzd.com/

联动优势:https://www.ikjzd.com/w/1921

代购公司:https://www.ikjzd.com/w/1982


与线程池相关的接口实现类关系Exectutor是接口Executors是辅助工具类就好像是Collection接口Collections工具类使用场景:对比newThread()创建和销毁线程都非常耗时,使用线程池可以达到线程复用/重用。池化:享元模式,如连接池,常量池线程池的底层原理及七大参数的意义七大参数publicThreadPoolExecutor(intcorePoolSize,//核心
aeo:https://www.ikjzd.com/w/2356
55海淘:https://www.ikjzd.com/w/1723
haofang:https://www.ikjzd.com/w/1046
Amazon名词解读:A9算法:https://www.ikjzd.com/tl/3678
继德国和意大利之后,亚马逊西班牙站也要罢工?:https://www.ikjzd.com/home/333
口述:老婆说如果我出轨她就给饭菜下毒老婆出轨心理:http://lady.shaoqun.com/m/a/30948.html

No comments:

Post a Comment