线程池


线程池是什么

是一种基于池化思想管理线程的工具

线程池解决的是什么问题

  • 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
  • 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
  • 系统无法合理管理内部的资源分布,会降低系统的稳定性。

为什么要使用线程池

线程池提供了一种限制和管理资源。每个线程池还维护一些基本统计信息。

  • 降低资源消耗(重复利用已创建的线程降低线程的创建和销毁造成的消耗)
  • 提高响应速度(任务达到时不需要等到线程创建就能立即执行)
  • 提高线程的可管理性(使用线程池可以进行统一的分配、调优和监控)

实现原理

预先启动一些线程,线程无限循环从任务队列中获取一个任务进行执行,直到线程池被关闭。如果某个线程因为执行某个任务发生异常而终止,那么重新创建一个新的线程,如此反复

线程池业务中的实践

线程池业务中的实践

线程池的运行状态

生命周期的转换

Runnable借口与Callable接口区别

Runnable不会返回结果或者抛出异常

Runnable

public interface Runnable
{
	//被线程执行,没有返回值也无法抛出异常
	public abstract void run();
}

Callable

public interface Callable<V>
{
	//被线程执行,没有返回值也无法抛出异常
	v call() throw Exeption;
}

如何创建线程池

建议使用构造方法的方式实现,避免资源> 耗尽的风险 Executor的弊端如下:

  • FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

通过构造方法

ThreadPoolExecutor

通过Executor框架的工具类Executors实现

可以创建三种类型的ThreadPoolExecutor,实际上方法内部都是调用了ThreadPoolExecutor的构造方法

  • FixedThreadPool:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新任务提交时,如果有空闲线程,立即执行;如果没有,则新的任务会被暂存在一个任务队列中,直到有线程空闲,便处理任务队列中的线程任务。
  • SingleThreadExecutor:方法返回一个只有一个线程的线程池,如果多余一个任务被提交到该线程池,任务会被保留到一个任务队列中,直到有线程空闲,便处理任务队列中的线程任务。
  • CachedThreadPool:该方法返回一个可以根据实际情况调整的线程数量的线程池。线程池的线程数量不确定,如果有空闲的线程可以复用,则优先使用可复用的线程。如果所有线程都在工作,又有新的任务提交,则会创建新的线程处理任务,所有线程再当前任务执行完毕后,将返回线程池进行复用

ThreadPoolExecutor参数分析(重点)

ThreadPoolExecutor3个最重要的参数

  • corePoolSize:核心线程数定义了最小可以同时运行的线程数量
  • maxiumPoolSize:最大线程数,线程池中允许的最大线程数
  • workQueue:阻塞队列,用来存储等待执行的任务,线程安全,当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中

其他常见参数

  • keepAliveTime:当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,超过了keepAliveTime才会被回收销毁
  • unit:keepAliveTime参数的时间单位
  • threadFactory:线程工厂,用来创建线程,默认正常优先级
  • handler:拒绝策略

拒绝策略

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务

  • ThreadPoolExecutor.AbortPolicy:抛出异常拒绝任务
  • ThreadPoolExecutor.CallerRunsPolicy:重试
  • ThreadPoolExecutor.DiscardPolicy: 直接丢弃
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃最早

线程池的运行原理

  1. 提交任务
  2. 判断核心线程池的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创建),则创建一个新的工作流程来执行任务,如果核心线程都在执行任务,则进入下一个流程
  3. 线程池判断工作队列是否已满,如果工作队列没有满,则将新提交的任务存储在这个工作队列里,如果工作队列满了,进入下一个流程
  4. 判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果满了,则执行拒绝策略

源码解析

提交线程
public void execute(Runnable command) { 
	if (command == null)
		throw new NullPointerException(); 
	int c = ctl.get();
	if (workerCountOf(c) < corePoolSize)
 	{
		if (addWorker(command, true)) 
			return;
		c = ctl.get(); 
	}
	if (isRunning(c) && workQueue.offer(command)) 
	{ 
		int recheck = ctl.get();
		if (! isRunning(recheck) && remove(command))
			reject(command);
	else if (workerCountOf(recheck) == 0)
		addWorker(null, false);
 	}
	else if (!addWorker(command, false)) 
		reject(command);
	}
  • ctl.get()取的是记录线程状态和线程个数的值,最终需要使用方法workerCountOf(c),获取当前线程数量;
  • 根据当前线程池中线程数量,与核心线程数corePoolSize比较,小于则添加线程到任务执行队列
  • 如果此时线程数量已满,那么需要判断线程池是否为运行状态isRunning(c)。如果是运行状态,则把无法执行的线程放入线程队列中。
  • 放入线程队列后,还需要重新判断线程是否允许以及移除操作,如果非允许且移除,则进行拒绝策略。否则判断线程数量未0后添加新线程。
  • 最后再次尝试添加任务执行,此方法addWorker的第二个入参是false,最终会影响添加执行任务数量判断。如果添加失败进行拒绝策略。

Atomic原子类

主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升

AQS

介绍

抽象队列同步器

AQS可以说是一个给予实现同步锁、同步器的一个框架,很多实现类都在它的的基础上构建的

AQS原理概述

它的核心思想是,如果被请求的共享资源空闲,则当前请求资源线程设置为有效的工作线程,并且将共享资源设置为锁定的状态。如果被请求的共享资源被占用,就需要一套线程阻塞等待以及被唤醒时所分配的机制(这个机制AQS用的是CLH队列锁实现,即将暂时获取不到的锁的线程加入到队列中)

CLH队列是一个虚拟的双向队列,即不存在队列实例,仅存在节点之间的关联,AQS将每条请求共享资源的线程封装成一个CLH锁队列的一个节点来实现锁的分配

AQS相关组件

  • Semaphore(信号量)-允许多个线程同时访问:可以指定多个线程同时访问某个资源。
  • CountDownLatch (倒计时器):可以让某一个线程等到直到倒计时结束,才开始执行。允许多个线程阻塞到一个地方,直到所有线程都执行完毕。
  • CyclicBarrier(循环栅栏):让一组线程到达一个屏障被阻塞,直到最后一个线程到达屏障,屏障才会开门,所有被屏障拦截的线程才会继续执行。



Enjoy Reading This Article?

Here are some more articles you might like to read next:

  • 2379. Minimum Recolors to Get K Consecutive Black Blocks
  • 2471. Minimum Number of Operations to Sort a Binary Tree by Level
  • 1387. Sort Integers by The Power Value
  • 2090. K Radius Subarray Averages
  • 2545. Sort the Students by Their Kth Score