线程池相关知识
ThreadLocal
解决让每个线程都有自己的专属本地变量的问题,类似于并行处理课程中的 Pthread 的本地内存
ThreadLocal
类允许每个线程绑定自己的值,可以将其形象地比喻为一个“存放数据的盒子”。每个线程都有自己独立的盒子,用于存储私有数据,确保不同线程之间的数据互不干扰。
我个人认为这个视频讲清楚了:
- ThreadLocal实现原理与内存泄漏问题_哔哩哔哩_bilibili
- 有一个人写了相关的笔记,在电脑的:
E:\Download_copy\IDM_Download\JavaNote-main
文件夹中,可以参考 - 黑马的笔记:
E:\Download_copy\IDM_Download\并发编程笔记
像是所谓的比较的完整的资料(作为看 javaguide 看不懂的备用资料即可,因为根本看不完的,时间为上)
看这个基本就能懂了。
可能会被问到的问题有:
- ThreadLocal 的原理
- 导致的内存泄露问题
- 如何跨线程传递 ThreadLocal 的值?
ThreadLocal 的原理
原理类似于:
ThreadLocal 自动清理机制与扩容原理深度解析
一、自动清理触发点原理分析
ThreadLocalMap 通过两个核心方法 replaceStaleEntry
和 expungeStaleEntry
实现过期 Entry 的自动清理,避免内存泄漏。
1. replaceStaleEntry
方法
触发场景:在 set()
方法中发现当前槽位的 Entry 已过期(Entry.key == null
),需要替换旧值并清理相邻的过期 Entry。
执行流程:
1. 定位过期槽位(staleSlot)
2. 向前扫描:找到最前端的连续过期 Entry 的起始位置(slotToExpunge)
3. 向后扫描:找到第一个非空 Entry 或数组末尾
- 若找到匹配的 key,直接替换 value
- 若找到新的过期 Entry,更新 slotToExpunge
4. 清理起始位置到当前槽位的过期 Entry
5. 调用 expungeStaleEntry 进行深度清理
代码逻辑(简化版):
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
// 向前扫描找到第一个过期 Entry 的位置
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) {
if (e.get() == null) slotToExpunge = i;
}
// 向后扫描查找可替换的 Entry
for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 找到匹配的 key,替换 value 并交换槽位
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// 若向前扫描未找到其他过期 Entry,更新清理起点
if (slotToExpunge == staleSlot) slotToExpunge = i;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// 发现新的过期 Entry,更新清理起点
if (k == null && slotToExpunge == staleSlot) {
slotToExpunge = i;
}
}
// 未找到匹配 key,直接替换当前槽位
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// 触发深度清理
if (slotToExpunge != staleSlot) {
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
}
2. expungeStaleEntry
方法
触发场景:在 get()
或 remove()
中发现过期 Entry,需要清理并重新哈希有效 Entry。
执行流程:
1. 清理指定槽位的过期 Entry
2. 向后扫描直到遇到空槽位:
- 清理所有遇到的过期 Entry
- 对有效 Entry 重新哈希定位
3. 返回下一个空槽位的位置
代码逻辑(简化版):
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// 清理当前槽位
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
Entry e;
int i;
// 向后扫描清理并重新哈希
for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) { // 过期 Entry
e.value = null;
tab[i] = null;
size--;
} else { // 有效 Entry
int h = k.threadLocalHashCode & (len - 1);
if (h != i) { // 需要重新哈希
tab[i] = null;
while (tab[h] != null) h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i; // 返回下一个空槽位
}
二、扩容机制原理分析
ThreadLocalMap 的扩容策略是先清理后扩容,确保扩容时仅保留有效 Entry。
1. 扩容触发条件
初始容量 = 16
扩容阈值 = 初始容量 * 2/3 = 10
触发条件:size >= threshold
2. 扩容流程
1. 全量清理过期 Entry(调用 expungeStaleEntries)
2. 若清理后 size >= threshold * 3/4,执行扩容
3. 容量翻倍(newLen = oldLen * 2)
4. 重新哈希所有有效 Entry 到新数组
代码逻辑(简化版):
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2; // 容量翻倍
Entry[] newTab = new Entry[newLen];
int count = 0;
// 重新哈希有效 Entry
for (int j = 0; j < oldLen; j++) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) { // 跳过过期 Entry
e.value = null;
} else {
// 计算新位置
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null) h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
// 更新阈值
setThreshold(newLen);
size = count;
table = newTab;
}
3. 重新哈希冲突解决
- 线性探测法:新位置冲突时,依次查找下一个空槽位。
- 探测步长:固定为 1(
nextIndex = (i + 1) % len
)。
三、设计意义与性能权衡
机制 | 优点 | 代价 |
---|---|---|
弱引用键 + 自动清理 | 减少内存泄漏风险 | 增加 set / get 的时间复杂度 |
惰性清理(Lazy) | 避免全局扫描的开销 | 可能残留部分过期 Entry |
先清理后扩容 | 确保扩容仅针对有效数据 | 扩容成本较高 |
线性探测法 | 实现简单,缓存友好 | 哈希冲突时性能下降 |
四、完整清理流程图解
[set()/get() 触发]
|
v
发现过期 Entry(key=null)
|
+-------+-------+
| |
v v
replaceStaleEntry expungeStaleEntry
(替换并局部清理) (深度清理)
| |
+-------+-------+
|
v
cleanSomeSlots
(启发式清理后续槽位)
|
v
[清理完成]
五、关键结论
-
内存泄漏防护:
- 自动清理机制可回收弱引用键已失效的 Entry,但 value 的强引用仍需依赖
remove()
显式清除。
- 自动清理机制可回收弱引用键已失效的 Entry,但 value 的强引用仍需依赖
-
性能优化:
- 惰性清理避免了全表扫描,但极端情况下可能残留过期 Entry。
- 2 倍扩容平衡了空间利用率与重新哈希成本。
-
开发者责任:
- 必须显式调用
remove()
(尤其是线程池场景),这是自动清理机制无法替代的。
- 必须显式调用
导致的内存泄露问题
内存泄漏问题调用 remove
方法即可,必要时,使用 try... finally
的方式防止内存泄漏
跨线程传递 ThreadLocal 的值
使用代码来说明:
对于 InheritableThreadLocal
:允许子线程继承父线程的 ThreadLocal
值。当创建子线程时,父线程的 InheritableThreadLocal
值会被复制到子线程中。
- 每个线程内部维护一个
ThreadLocalMap
,用于存储ThreadLocal
的值。InheritableThreadLocal
会在创建子线程时,将父线程的InheritableThreadLocal
值复制到子线程的ThreadLocalMap
中。
public class InheritableThreadLocalExample {
private static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();
public static void main(String[] args) {
// 在主线程中设置 InheritableThreadLocal 的值
inheritableThreadLocal.set("Hello from parent thread");
// 创建子线程
Thread childThread = new Thread(() -> {
// 在子线程中获取 InheritableThreadLocal 的值
String value = inheritableThreadLocal.get();
System.out.println("Child thread value: " + value);
});
// 启动子线程
childThread.start();
// 等待子线程执行完毕
try {
childThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
对于 TransmittableThreadLocal
(阿里巴巴通过装饰器模式增强了 InheritableThreadLocal
,确保在线程池中也能正确传递 ThreadLocal
值。)
TODO:这里的代码涉及到了线程池,之后再看
import com.alibaba.ttl.TransmittableThreadLocal;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TransmittableThreadLocalExample {
private static TransmittableThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<>();
public static void main(String[] args) {
// 在主线程中设置 TransmittableThreadLocal 的值
transmittableThreadLocal.set("Hello from parent thread");
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 提交任务到线程池
executorService.submit(() -> {
// 在子线程中获取 TransmittableThreadLocal 的值
String value = transmittableThreadLocal.get();
System.out.println("Child thread value: " + value);
});
// 关闭线程池
executorService.shutdown();
}
}
线程池
管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。如何创建线程池?
创建线程池的方式 :
- 通过
ThreadPoolExecutor
构造函数来创建(推荐) - 通过
Executor
框架的工具类Executors
来创建。(可以创建不同类型的线程池)FixedThreadPool
,线程数量确定,若有多余的任务且没空闲的线程,则会被暂存在任务队列中等到空闲时再处理其中的任务。- 有界阻塞队列是
LinkedBlockingQueue
- 有界阻塞队列是
SingleThreadExecutor
:和上面的区别是这个线程池的线程数量只有一个,其他的一样CachedThreadPool
:线程数量不确定,但有可以复用的线程时优先使用可以复用的线程,若满了,则创建新的线程处理任务- 同步队列
SynchronousQueue
- 同步队列
ScheduledThreadPool
:给定的延迟后运行任务或者定期执行任务的线程池。- 无界的延迟阻塞队列
DelayedWorkQueue
- 无界的延迟阻塞队列
线程池常见参数有哪些 ?如何解释?
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
若使用了
allowCoreThreadTimeOut(boolean value)
,将设置为true
之后,则就可以回收核心线程了,时间间隔仍然由keepAliveTime
决定
对于拒绝策略而言 :
当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时
ThreadPoolExecutor.AbortPolicy
:抛出RejectedExecutionException
来拒绝新任务的处理。ThreadPoolExecutor.callerRunsPolicy
:调用执行者自己的线程运行任务,也就是直接在调用execute
方法的线程中运行( run )被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果你的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。ThreadPoolExecutor.DiscardPolicy
:不处理新任务,直接丢弃掉。ThreadPoo1Executor.DiscardoldestPolicy
:此策略将丢弃最早的未处理的任务请求,
对于 callerRunsPolicy
,不会丢弃掉,可以让所有任务都得到执行。则这种拒绝策略有什么风险呢?如何解决呢?
风险:对于被放入主线程的运行任务,有可能会让主线程等待很久才能执行完,这会导致后面的线程无法及时执行。
解决方法:调整阻塞队列的大小和最大线程数的大小。
出现这个问题的本质就是我们不希望任何一个任务被丢弃,如果服务器资源达到了极限,则需要更换调度策略(使得保证任务不被丢弃且在服务器有余力时及时处理),那么采用什么调度策略呢?
任务持久化:几种方式:
- 设计一张任务表将任务存储到 MySQL 数据库中。
- Redis 缓存任务。
- 将任务提交到消息队列中。
对于第一个方式:
- 实现
RejectedExecutionHandler
接口自定义拒绝策略,自定义拒绝策略负责将线程池暂时无法处理(此时阻塞队列已满)的任务入库(保存到 MySQL 中) - 继承
BlockingQueue
实现一个混合式阻塞队列,该队列包含 JDK 自带的 ArrayBlockingQueue。另外,该混合式阻塞队列需要修改取任务处理的逻辑,也就是重写take()
方法,取任务时优先从数据库中读取最早的任务,数据库中无任务时再从ArrayBlockingQueue
中去取任务。
对于 Netty 的处理方法:直接创建一个线程池以外的线程处理这些任务,为了保证任务的实时处理,这种做法可能需要良好的硬件设备且临时创建的线程无法做到准确的监控
对于 ActiveMQ 的处理方法:尝试在指定的时效内尽可能的争取将任务入队,以保证最大交付:
对于常用的阻塞队列:
- 容量为 Integer.MAX_VALUE 的
LinkedBlockingQueue
(有界阻塞队列):FixedThreadPool
和singleThreadExecutor
。FixedThreadPoo1 最多只能创建核心线程数的线程(核心线程数和最大线程数相等),singleThreadExecutor
只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。 synchronousqueue
(同步队列):cachedThreadPool
。synchronousqueue
没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,cachedThreadPool
的最大线程数是 Integer.MAX_VALUE,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。DelayedWorkQueue
(延队列):scheduledThreadPool
和singleThreadscheduledExecutor
Delayedworkqueue
的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。Delayedworkqueue
添加元素满了之后会自动扩容,增加原来容量的 50%,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。ArrayBlockingqueue
(有界阻塞队列):底层由数组实现,容量一旦创建,就不能修改
线程池处理任务的流程
线程池在提交任务前,可以提前创建线程吗?
prestartCoreThread()
:启动一个线程,等待任务,如果已达到核心线程数,这个方法返回 false,否则返回 true;prestartAllCoreThreads()
:启动所有的核心线程,并返回启动成功的核心线程数。
线程池中线程异常后,销毁还是复用?
- 使用
execute()
提交任务:当任务通过execute()
提交到线程池并在执行过程中抛出异常时,如果这个异常没有在任务内被捕获,那么该异常会导致当前线程终止,并且异常会被打印到控制台或日志文件中。线程池会检测到这种线程终止,并创建一个新线程来替换它,从而保持配置的线程数不变。 - 使用
submit()
提交任务:对于通过submit()
提交的任务,如果在任务执行中发生异常,这个异常不会直接打印出来。相反,异常会被封装在由submit()
返回的Future
对象中。当调用Future.get()
方法时,可以捕获到一个ExecutionException
。在这种情况下,线程不会因为异常而终止,它会继续存在于线程池中,准备执行后续的任务。
这种设计允许 submit()
提供更灵活的错误处理机制,因为它允许调用者决定如何处理异常,而 execute()
则适用于那些不需要关注执行结果的场景。
如何给线程命名
- 利用
guava
的ThreadFactoryBuilder
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "-%d")
.setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
- 自定义
ThreadFactory
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程工厂,它设置线程名称,有利于我们定位问题。
*/
public final class NamingThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger();
private final String name;
/**
* 创建一个带名字的线程池生产工厂
*/
public NamingThreadFactory(String name) {
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
return t;
}
}
动态修改线程池参数
使用:
setCorePoolSize(int corePoolSize)
:设置核心线程数。setMaximumPoolSize(int maximumPoolSize)
:设置最大线程数。setKeepAliveTime(long time, TimeUnit unit)
:设置非核心线程的空闲存活时间。allowCoreThreadTimeOut(boolean value)
:设置是否允许核心线程在空闲时被回收。
对于修改线程参数的示例:
import java.util.concurrent.*;
public class DynamicThreadPoolExample {
public static void main(String[] args) throws InterruptedException {
// 初始参数
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler
);
// 提交初始任务
for (int i = 1; i <= 6; i++) {
final int taskId = i;
try {
executor.submit(() -> {
System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000); // 模拟任务执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
});
} catch (RejectedExecutionException e) {
System.out.println("Task " + taskId + " was rejected");
}
}
// 动态调整线程池参数
Thread.sleep(3000); // 等待一段时间
System.out.println("Adjusting thread pool parameters...");
// 增加核心线程数和最大线程数
executor.setCorePoolSize(4);
executor.setMaximumPoolSize(6);
// 提交更多任务
for (int i = 7; i <= 12; i++) {
final int taskId = i;
try {
executor.submit(() -> {
System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000); // 模拟任务执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
});
} catch (RejectedExecutionException e) {
System.out.println("Task " + taskId + " was rejected");
}
}
// 关闭线程池
executor.shutdown();
}
}
修改后会将修改后的参数生效,进行后续的任务
如何设计一个动态线程池?
那么如何设计一个动态线程池呢?
(面试中可能会被问到)
deepseek 的代码:
代码:ThreadPool/DynamicThreadPool
需要自定义 ResizableCapacityLinkedBlockingQueue
class ResizableCapacityLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
private final AtomicInteger capacity; // 使用 AtomicInteger 来支持动态调整容量
public ResizableCapacityLinkedBlockingQueue(int capacity) {
super(capacity); // 初始化队列容量
this.capacity = new AtomicInteger(capacity);
}
public synchronized void setCapacity(int newCapacity) {
if (newCapacity < size()) {
throw new IllegalArgumentException("New capacity cannot be less than current size");
}
this.capacity.set(newCapacity);
}
public int getCapacity() {
return capacity.get();
}
@Override
public boolean offer(E e) {
// 检查当前队列大小是否超过容量
if (size() >= capacity.get()) {
return false; // 队列已满,拒绝添加
}
return super.offer(e);
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 检查当前队列大小是否超过容量
if (size() >= capacity.get()) {
return false; // 队列已满,拒绝添加
}
return super.offer(e, timeout, unit);
}
@Override
public void put(E e) throws InterruptedException {
// 检查当前队列大小是否超过容量
while (size() >= capacity.get()) {
Thread.yield(); // 等待队列有空闲空间
}
super.put(e);
}
}
如何设计一个根据任务的优先级执行的线程池?
有两种方式:
- 提交到线程池的任务实现
Comparable
接口,并重写compareTo
方法来指定任务之间的优先级比较规则。 - 创建
PriorityBlockingQueue
时传入一个Comparator
对象来指定任务之间的排序规则(推荐)。
潜在的问题:
PriorityBlockingQueue
是无界的,可能堆积大量的请求,从而导致 OOM。- 可能会导致饥饿问题,即低优先级的任务长时间得不到执行。
- 由于需要对队列中的元素进行排序操作以及保证线程安全(并发控制采用的是可重入锁
ReentrantLock
),因此会降低性能。
解决办法:(对应上面的问题)
- 继承 PriorityBlockingQueue 并重写一下 offer 方法(入队)的逻辑,当插入的元素数量超过指定值就返回 false 。
- 通过优化设计来解决(比较麻烦),比如等待时间过长的任务会被移除并重新添加到队列中,但是优先级会被提升。
- ()对于性能方面的影响,是没办法避免的,毕竟需要对任务进行排序操作。并且,对于大部分业务场景来说,这点性能影响是可以接受的。