线程池相关知识

ThreadLocal

解决让每个线程都有自己的专属本地变量的问题,类似于并行处理课程中的 Pthread 的本地内存

ThreadLocal 类允许每个线程绑定自己的值,可以将其形象地比喻为一个“存放数据的盒子”。每个线程都有自己独立的盒子,用于存储私有数据,确保不同线程之间的数据互不干扰。

我个人认为这个视频讲清楚了:

像是所谓的比较的完整的资料(作为看 javaguide 看不懂的备用资料即可,因为根本看不完的,时间为上)

看这个基本就能懂了。

可能会被问到的问题有:

ThreadLocal 的原理

原理类似于:
../../../../ZZZ-Misc/Z-Attachment/images/Pasted image 20250113203923.png|450

ThreadLocal 自动清理机制与扩容原理深度解析


一、自动清理触发点原理分析

ThreadLocalMap 通过两个核心方法 replaceStaleEntryexpungeStaleEntry 实现过期 Entry 的自动清理,避免内存泄漏。


1. replaceStaleEntry 方法

触发场景:在 set() 方法中发现当前槽位的 Entry 已过期(Entry.key == null),需要替换旧值并清理相邻的过期 Entry。

执行流程

1. 定位过期槽位(staleSlot)
2. 向前扫描:找到最前端的连续过期 Entry 的起始位置(slotToExpunge)
3. 向后扫描:找到第一个非空 Entry 或数组末尾
   - 若找到匹配的 key,直接替换 value
   - 若找到新的过期 Entry,更新 slotToExpunge
4. 清理起始位置到当前槽位的过期 Entry
5. 调用 expungeStaleEntry 进行深度清理

代码逻辑(简化版):

private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;

    // 向前扫描找到第一个过期 Entry 的位置
    int slotToExpunge = staleSlot;
    for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) {
        if (e.get() == null) slotToExpunge = i;
    }

    // 向后扫描查找可替换的 Entry
    for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();

        // 找到匹配的 key,替换 value 并交换槽位
        if (k == key) {
            e.value = value;
            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;

            // 若向前扫描未找到其他过期 Entry,更新清理起点
            if (slotToExpunge == staleSlot) slotToExpunge = i;
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }

        // 发现新的过期 Entry,更新清理起点
        if (k == null && slotToExpunge == staleSlot) {
            slotToExpunge = i;
        }
    }

    // 未找到匹配 key,直接替换当前槽位
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);

    // 触发深度清理
    if (slotToExpunge != staleSlot) {
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    }
}

2. expungeStaleEntry 方法

触发场景:在 get()remove() 中发现过期 Entry,需要清理并重新哈希有效 Entry。

执行流程

1. 清理指定槽位的过期 Entry
2. 向后扫描直到遇到空槽位:
   - 清理所有遇到的过期 Entry
   - 对有效 Entry 重新哈希定位
3. 返回下一个空槽位的位置

代码逻辑(简化版):

private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    // 清理当前槽位
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;

    Entry e;
    int i;
    // 向后扫描清理并重新哈希
    for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        if (k == null) { // 过期 Entry
            e.value = null;
            tab[i] = null;
            size--;
        } else { // 有效 Entry
            int h = k.threadLocalHashCode & (len - 1);
            if (h != i) { // 需要重新哈希
                tab[i] = null;
                while (tab[h] != null) h = nextIndex(h, len);
                tab[h] = e;
            }
        }
    }
    return i; // 返回下一个空槽位
}

二、扩容机制原理分析

ThreadLocalMap 的扩容策略是先清理后扩容,确保扩容时仅保留有效 Entry。


1. 扩容触发条件
初始容量 = 16
扩容阈值 = 初始容量 * 2/3 = 10
触发条件:size >= threshold
2. 扩容流程
1. 全量清理过期 Entry(调用 expungeStaleEntries)
2. 若清理后 size >= threshold * 3/4,执行扩容
3. 容量翻倍(newLen = oldLen * 2)
4. 重新哈希所有有效 Entry 到新数组

代码逻辑(简化版):

private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2; // 容量翻倍
    Entry[] newTab = new Entry[newLen];
    int count = 0;

    // 重新哈希有效 Entry
    for (int j = 0; j < oldLen; j++) {
        Entry e = oldTab[j];
        if (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == null) { // 跳过过期 Entry
                e.value = null; 
            } else {
                // 计算新位置
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null) h = nextIndex(h, newLen);
                newTab[h] = e;
                count++;
            }
        }
    }

    // 更新阈值
    setThreshold(newLen);
    size = count;
    table = newTab;
}

3. 重新哈希冲突解决

三、设计意义与性能权衡

机制 优点 代价
弱引用键 + 自动清理 减少内存泄漏风险 增加 set / get 的时间复杂度
惰性清理(Lazy) 避免全局扫描的开销 可能残留部分过期 Entry
先清理后扩容 确保扩容仅针对有效数据 扩容成本较高
线性探测法 实现简单,缓存友好 哈希冲突时性能下降

四、完整清理流程图解

         [set()/get() 触发]
                 |
                 v
        发现过期 Entry(key=null)
                 |
         +-------+-------+
         |               |
         v               v
 replaceStaleEntry   expungeStaleEntry
 (替换并局部清理)      (深度清理)
         |               |
         +-------+-------+
                 |
                 v
           cleanSomeSlots
        (启发式清理后续槽位)
                 |
                 v
            [清理完成]

五、关键结论

  1. 内存泄漏防护

    • 自动清理机制可回收弱引用键已失效的 Entry,但 value 的强引用仍需依赖 remove() 显式清除。
  2. 性能优化

    • 惰性清理避免了全表扫描,但极端情况下可能残留过期 Entry。
    • 2 倍扩容平衡了空间利用率与重新哈希成本。
  3. 开发者责任

    • 必须显式调用 remove()(尤其是线程池场景),这是自动清理机制无法替代的。

导致的内存泄露问题

内存泄漏问题调用 remove 方法即可,必要时,使用 try... finally 的方式防止内存泄漏

跨线程传递 ThreadLocal 的值

使用代码来说明:

对于 InheritableThreadLocal :允许子线程继承父线程的 ThreadLocal 值。当创建子线程时,父线程的 InheritableThreadLocal 值会被复制到子线程中。

public class InheritableThreadLocalExample {
    private static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();

    public static void main(String[] args) {
        // 在主线程中设置 InheritableThreadLocal 的值
        inheritableThreadLocal.set("Hello from parent thread");

        // 创建子线程
        Thread childThread = new Thread(() -> {
            // 在子线程中获取 InheritableThreadLocal 的值
            String value = inheritableThreadLocal.get();
            System.out.println("Child thread value: " + value);
        });

        // 启动子线程
        childThread.start();

        // 等待子线程执行完毕
        try {
            childThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

对于 TransmittableThreadLocal (阿里巴巴通过装饰器模式增强了 InheritableThreadLocal,确保在线程池中也能正确传递 ThreadLocal 值。)

TODO:这里的代码涉及到了线程池,之后再看

import com.alibaba.ttl.TransmittableThreadLocal;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TransmittableThreadLocalExample {
    private static TransmittableThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<>();

    public static void main(String[] args) {
        // 在主线程中设置 TransmittableThreadLocal 的值
        transmittableThreadLocal.set("Hello from parent thread");

        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 提交任务到线程池
        executorService.submit(() -> {
            // 在子线程中获取 TransmittableThreadLocal 的值
            String value = transmittableThreadLocal.get();
            System.out.println("Child thread value: " + value);
        });

        // 关闭线程池
        executorService.shutdown();
    }
}

线程池

管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务

创建线程池的方式 :

线程池常见参数有哪些 ?如何解释?

public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                              int maximumPoolSize,//线程池的最大线程数
                              long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                              TimeUnit unit,//时间单位
                              BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
                              ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                              RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务

若使用了 allowCoreThreadTimeOut(boolean value),将设置为 true 之后,则就可以回收核心线程了,时间间隔仍然由 keepAliveTime 决定

对于拒绝策略而言 :

当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时

对于 callerRunsPolicy,不会丢弃掉,可以让所有任务都得到执行。则这种拒绝策略有什么风险呢?如何解决呢?

风险:对于被放入主线程的运行任务,有可能会让主线程等待很久才能执行完,这会导致后面的线程无法及时执行。
解决方法:调整阻塞队列的大小和最大线程数的大小。

出现这个问题的本质就是我们不希望任何一个任务被丢弃,如果服务器资源达到了极限,则需要更换调度策略(使得保证任务不被丢弃且在服务器有余力时及时处理),那么采用什么调度策略呢?

任务持久化:几种方式:

对于第一个方式:

对于 Netty 的处理方法:直接创建一个线程池以外的线程处理这些任务,为了保证任务的实时处理,这种做法可能需要良好的硬件设备且临时创建的线程无法做到准确的监控
对于 ActiveMQ 的处理方法:尝试在指定的时效内尽可能的争取将任务入队,以保证最大交付:

对于常用的阻塞队列:

线程池处理任务的流程

../../../../ZZZ-Misc/Z-Attachment/images/Pasted image 20250117213541.png

线程池在提交任务前,可以提前创建线程吗?

线程池中线程异常后,销毁还是复用?

这种设计允许 submit()提供更灵活的错误处理机制,因为它允许调用者决定如何处理异常,而 execute()则适用于那些不需要关注执行结果的场景。

如何给线程命名

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(String name) {
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }
}

动态修改线程池参数

使用:

对于修改线程参数的示例:

import java.util.concurrent.*;

public class DynamicThreadPoolExample {
    public static void main(String[] args) throws InterruptedException {
        // 初始参数
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 10;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();

        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,
                threadFactory,
                handler
        );

        // 提交初始任务
        for (int i = 1; i <= 6; i++) {
            final int taskId = i;
            try {
                executor.submit(() -> {
                    System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000); // 模拟任务执行时间
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            } catch (RejectedExecutionException e) {
                System.out.println("Task " + taskId + " was rejected");
            }
        }

        // 动态调整线程池参数
        Thread.sleep(3000); // 等待一段时间
        System.out.println("Adjusting thread pool parameters...");

        // 增加核心线程数和最大线程数
        executor.setCorePoolSize(4);
        executor.setMaximumPoolSize(6);

        // 提交更多任务
        for (int i = 7; i <= 12; i++) {
            final int taskId = i;
            try {
                executor.submit(() -> {
                    System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000); // 模拟任务执行时间
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            } catch (RejectedExecutionException e) {
                System.out.println("Task " + taskId + " was rejected");
            }
        }

        // 关闭线程池
        executor.shutdown();
    }
}

修改后会将修改后的参数生效,进行后续的任务

如何设计一个动态线程池?

那么如何设计一个动态线程池呢?

(面试中可能会被问到)

deepseek 的代码:

代码:ThreadPool/DynamicThreadPool

需要自定义 ResizableCapacityLinkedBlockingQueue

class ResizableCapacityLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
    private final AtomicInteger capacity; // 使用 AtomicInteger 来支持动态调整容量

    public ResizableCapacityLinkedBlockingQueue(int capacity) {
        super(capacity); // 初始化队列容量
        this.capacity = new AtomicInteger(capacity);
    }

    public synchronized void setCapacity(int newCapacity) {
        if (newCapacity < size()) {
            throw new IllegalArgumentException("New capacity cannot be less than current size");
        }
        this.capacity.set(newCapacity);
    }

    public int getCapacity() {
        return capacity.get();
    }

    @Override
    public boolean offer(E e) {
        // 检查当前队列大小是否超过容量
        if (size() >= capacity.get()) {
            return false; // 队列已满,拒绝添加
        }
        return super.offer(e);
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        // 检查当前队列大小是否超过容量
        if (size() >= capacity.get()) {
            return false; // 队列已满,拒绝添加
        }
        return super.offer(e, timeout, unit);
    }

    @Override
    public void put(E e) throws InterruptedException {
        // 检查当前队列大小是否超过容量
        while (size() >= capacity.get()) {
            Thread.yield(); // 等待队列有空闲空间
        }
        super.put(e);
    }
}

如何设计一个根据任务的优先级执行的线程池?

有两种方式:

潜在的问题:

解决办法:(对应上面的问题)

Future

Future,AQS 的内容参考 java并发-概念,其他的主要参考 java相关