java并发-概念
Future
📌 比喻理解:
想象你去快餐店点餐,收银员给你一张取餐小票(这就是 Future)。你可以:
- 继续玩手机(主线程做其他事)
- 时不时问"餐好了吗?"(检查任务是否完成)
- 等餐好了凭小票取餐(获取结果)
🔧 核心概念:
Future
是表示异步计算结果的票据Callable
是能返回结果的任务(类似 Runnable 但能返回值)ExecutorService
是管理线程的"后厨团队"
🛠️ 代码示例:
import java.util.concurrent.*;
public class FutureDemo {
public static void main(String[] args) throws Exception {
// 创建线程池(后厨团队)
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交任务(下单)
Future<String> future = executor.submit(new Callable<String>() {
public String call() throws Exception {
Thread.sleep(2000); // 模拟2秒烹饪时间
return "汉堡套餐";
}
});
System.out.println("等待餐点时,我去买饮料...");
// 获取结果(凭小票取餐)
String meal = future.get(); // 这里会阻塞直到结果就绪
System.out.println("取到餐点:" + meal);
executor.shutdown(); // 关闭线程池
}
}
💡 关键方法解释:
4. future.get()
:等待结果(无限等待)
5. future.get(1, TimeUnit.SECONDS)
:最多等待 1 秒
6. future.isDone()
:检查任务是否完成
7. future.cancel(true)
:尝试取消任务
🌟 同步 vs 异步对比:
// 同步方式(自己做饭)
public String cookMeal() {
// 耗时操作...
return "餐点";
}
// 异步方式(让厨房做)
Future<String> future = executor.submitcookMeal;
⚠️ 注意事项:
8. 调用 get()
会阻塞当前线程
9. 必须关闭线程池(shutdown()
)
10. 处理可能的异常(InterruptedException
, ExecutionException
)
🔄 更现代的替代方案:
对于复杂场景,可以使用 CompletableFuture
(支持链式调用):
CompletableFuture.supplyAsync(() -> "汉堡套餐")
.thenAccept(meal -> System.out.println("享受 " + meal));
📚 总结要点:
11. Future 就像取餐小票,代表未来的结果
12. 主线程可以继续做其他事情
13. 适合处理耗时操作(网络请求/复杂计算)
14. 通过线程池管理执行线程
建议尝试修改示例中的等待时间(如改成 5 秒),观察程序行为变化,可以帮助更直观地理解异步执行机制。
CompletableFuture
📌 CompletableFuture 是什么?
CompletableFuture
是 Java 8 引入的增强版 Future
,它解决了传统 Future 的三大痛点:
15. 无法手动完成任务(传统 Future 只能被动等待结果)
16. 缺乏链式调用能力
17. 难以组合多个异步任务
可以理解为异步编程的瑞士军刀,支持非阻塞的流水线式操作,完美融合函数式编程思想。
🧩 核心特性全景图
CompletableFuture
|
├── 异步任务创建 ─── 任务链式处理 ─── 多任务组合 ─── 异常处理
| | | | |
| | | | └─ exceptionally()
| | | ├─ allOf()
| | | └─ anyOf()
| | ├─ thenApply()
| | ├─ thenCompose()
| | └─ thenAccept()
├─ supplyAsync()
└─ runAsync()
🛠️ 7 大核心用法详解
我们通过一个外卖订餐流程的案例贯穿所有示例(假设每个步骤都是耗时操作):
1️⃣ 基础异步任务创建
// 创建订单(返回订单号)
CompletableFuture<String> createOrder = CompletableFuture.supplyAsync(() -> {
System.out.println("餐厅接单中...");
sleep(1); // 模拟1秒延迟
return "ORDER-9527";
});
// 获取订单号结果
String orderId = createOrder.get();
System.out.println("订单号: " + orderId); // 输出: ORDER-9527
2️⃣ 结果转换(thenApply)
// 制作餐品(需要订单号)
CompletableFuture<String> prepareMeal = createOrder.thenApply(order -> {
System.out.println("制作餐品: " + order);
sleep(2);
return "🍔汉堡套餐";
});
String meal = prepareMeal.get();
System.out.println("制作完成: " + meal); // 输出: 🍔汉堡套餐
3️⃣ 结果消费(thenAccept)
// 通知骑手取餐(不需要返回值)
prepareMeal.thenAccept(meal -> {
System.out.println("通知骑手取餐: " + meal);
sleep(1);
});
4️⃣ 任务组合(thenCompose)
// 骑手配送(依赖餐品准备完成)
CompletableFuture<String> delivery = prepareMeal.thenCompose(meal ->
CompletableFuture.supplyAsync(() -> {
System.out.println("骑手配送中: " + meal);
sleep(3);
return "已送达用户";
})
);
System.out.println(delivery.get()); // 输出: 已送达用户
5️⃣ 并行任务组合(thenCombine)
// 同时进行开发票和备餐具(并行执行)
CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> {
sleep(2);
return "📃发票";
});
CompletableFuture<String> utensils = CompletableFuture.supplyAsync(() -> {
sleep(1);
return "🥢餐具";
});
// 合并两个结果
CompletableFuture<String> packageMeal = invoice.thenCombine(utensils,
(inv, uten) -> "打包完成: " + inv + " + " + uten
);
System.out.println(packageMeal.get()); // 输出: 打包完成: 📃发票 + 🥢餐具
6️⃣ 多任务协调(allOf / anyOf)
// 等所有骑手就绪(allOf)
CompletableFuture<Void> allRiders = CompletableFuture.allOf(
CompletableFuture.runAsync(() -> sleep(2)), // 骑手A
CompletableFuture.runAsync(() -> sleep(3)), // 骑手B
CompletableFuture.runAsync(() -> sleep(1)) // 骑手C
);
allRiders.thenRun(() ->
System.out.println("所有骑手准备就绪!")
);
// 任意骑手接单(anyOf)
CompletableFuture<Object> anyRider = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> {sleep(2); return "骑手A";}),
CompletableFuture.supplyAsync(() -> {sleep(1); return "骑手B";})
);
System.out.println("接单骑手: " + anyRider.get()); // 输出: 接单骑手: 骑手B
7️⃣ 异常处理(exceptionally / handle)
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("模拟厨房着火!");
}
return "正常餐品";
}).exceptionally(ex -> {
System.out.println("异常处理: " + ex.getMessage());
return "备用餐品";
}).thenAccept(result ->
System.out.println("最终结果: " + result)
);
// 可能输出:
// 异常处理: java.lang.RuntimeException: 模拟厨房着火!
// 最终结果: 备用餐品
// 或
// 最终结果: 正常餐品
⚙️ 底层机制解析
- 默认线程池:使用
ForkJoinPool.commonPool()
,可通过第二个参数指定自定义线程池 - 完成状态:内部维护
result
和stack
(回调链) - 非阻塞实现:通过 CAS(Compare-And-Swap)操作保证线程安全
🆚 与传统 Future 对比
CompletableFuture | Future | |
---|---|---|
任务组合 | 支持链式/并行组合 | 仅支持单个任务 |
完成控制 | 可手动完成(complete() ) |
只能被动等待 |
异常处理 | 内置异常处理链 | 需在 get()时捕获异常 |
回调机制 | 支持完成回调 | 无 |
线程管理 | 自动使用线程池 | 需手动管理线程池 |
💡 最佳实践建议
- 避免阻塞:尽量使用回调而非
get()
- 指定线程池:重要任务使用独立线程池,防止公共池被占满
- 异常处理前置:在链式开头使用
handle()
统一处理 - 资源释放:在
whenComplete()
中关闭数据库连接等资源 - 超时控制:使用
orTimeout()
防止无限等待
CompletableFuture.supplyAsync(() -> longTimeTask())
.orTimeout(3, TimeUnit.SECONDS)
.exceptionally(ex -> "超时处理结果");
🌟 完整外卖流程示例
public class FoodDelivery {
public static void main(String[] args) {
// 1. 创建订单
CompletableFuture<String> orderFlow = CompletableFuture.supplyAsync(() -> {
sleep(1);
return "ORDER-007";
})
// 2. 并行准备餐品和发票
.thenApplyAsync(orderId -> {
System.out.println("制作餐品: " + orderId);
sleep(2);
return new Meal(orderId, "🍱豪华套餐");
})
.thenCombine(CompletableFuture.supplyAsync(() -> {
sleep(1);
return "📃电子发票";
}), (meal, invoice) -> {
meal.addInvoice(invoice);
return meal;
})
// 3. 调度骑手
.thenComposeAsync(meal ->
CompletableFuture.supplyAsync(() -> {
System.out.println("骑手接单: " + meal);
sleep(3);
return "✅送达完成";
})
)
// 4. 异常处理
.handle((result, ex) -> {
if (ex != null) {
System.out.println("配送异常: " + ex.getMessage());
return "✉️补偿优惠券已发放";
}
return result;
});
System.out.println("最终状态: " + orderFlow.join());
}
static class Meal {
String orderId;
String food;
String invoice;
// 构造方法、addInvoice() 略...
}
static void sleep(int seconds) {
try { TimeUnit.SECONDS.sleep(seconds); }
catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}
关键总结:
CompletableFuture
是构建异步流水线的神器- 通过链式调用取代回调地狱
- 组合操作(
thenCombine
/allOf
)实现复杂工作流 - 异常处理贯穿整个生命周期
- 合理使用可使异步代码如同步代码般直观
AQS
AbstractQueuedSynchronizer
📌 用现实世界理解 AQS(AbstractQueuedSynchronizer)
想象你正在银行办理业务,AQS 就是那个管理排队叫号系统的核心机制:
- state 变量 → 当前可用的窗口数量(比如 3 个窗口)
- CLH 队列 → 客户在取号机拿到的排队小票组成的队列
- CAS 操作 → 叫号系统的自动更新逻辑(保证不会重复叫号)
🧩 AQS 核心三要素
组成部分 | 现实比喻 | 技术实现 |
---|---|---|
volatile state | 当前可用窗口数 | 同步状态(如锁的持有次数) |
FIFO 队列 | 客户排队序列 | CLH 变种队列(双向链表) |
CAS 机制 | 叫号系统的原子性更新 | 通过 Unsafe 类实现原子操作 |
🔧 AQS 工作原理流程图
+-------------------+
| Acquire Lock |
+-------------------+
|
v
+---------------------------------+
| 尝试获取锁 (tryAcquire) |
| if (state == 0 && CAS成功) |--成功--> 获得锁
+---------------------------------+
| 失败
v
+---------------------------------+
| 创建 Node 加入队列尾部 |
| (addWaiter) |
+---------------------------------+
|
v
+---------------------------------+
| 进入自旋循环 (acquireQueued) |
| 1. 检查前驱节点是否为头节点 |
| 2. 再次尝试获取锁 |--成功--> 成为头节点
| 3. 失败则挂起线程 (LockSupport) |
+---------------------------------+
🌰 手写简易锁示例(基于 AQS)
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class MyLock {
private final Sync sync = new Sync();
// 自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 尝试获取锁
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) { // CAS 操作
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试释放锁
protected boolean tryRelease(int arg) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0); // 不需要 CAS,只有持有者能释放
return true;
}
// 是否独占持有
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
}
使用示例:
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
System.out.println("线程1获得锁");
Thread.sleep(1000);
} finally {
lock.unlock();
System.out.println("线程1释放锁");
}
}).start();
new Thread(() -> {
lock.lock();
try {
System.out.println("线程2获得锁");
} finally {
lock.unlock();
}
}).start();
// 输出:
// 线程1获得锁
// (1秒后)
// 线程1释放锁
// 线程2获得锁
🔍 关键方法解析
方法 | 作用说明 | 类比场景 |
---|---|---|
acquire(int arg) |
获取资源的入口方法 | 客户点击取号机按钮 |
tryAcquire() |
尝试直接获取资源(需子类实现) | 查看是否有空闲窗口 |
addWaiter() |
创建节点加入队列尾部 | 生成排队号码 |
acquireQueued() |
队列中的自旋等待逻辑 | 等待叫号并实时关注窗口状态 |
release(int arg) |
释放资源的入口方法 | 客户办完业务离开窗口 |
⚙️ AQS 在 JDK 中的应用
-
ReentrantLock
- 通过
Sync
内部类继承 AQS state
表示重入次数(0=未锁定,≥1=锁定次数)
- 通过
-
Semaphore
state
表示可用许可证数量- 获取信号量 = 减少 state
- 释放信号量 = 增加 state
-
CountDownLatch
state
初始化设置为计数器的值countDown()
递减 stateawait()
等待 state 变为 0
🆚 AQS 两种模式对比
独占模式 (Exclusive) | 共享模式 (Shared) | |
---|---|---|
资源分配 | 同一时刻只有一个线程能访问 | 允许多个线程同时访问 |
典型实现 | ReentrantLock | Semaphore/CountDownLatch |
方法区别 | tryAcquire/tryRelease | tryAcquireShared/tryReleaseShared |
唤醒策略 | 只唤醒下一个节点 | 传播唤醒后续多个节点 |
💡 深入理解技巧
- 画队列变化图:在纸上画出节点入队/出队过程
- 调试 AQS 源码:在 IDEA 中跟踪
ReentrantLock.lock()
调用链 - 对比不同同步器:比较 Semaphore 和 ReentrantLock 的 AQS 实现差异
- 观察 state 变化:在代码中打印 state 值的变化过程
📊 AQS 类结构图
AbstractQueuedSynchronizer
├── state: int (volatile)
├── head: Node
├── tail: Node
│
├── acquire()
├── release()
│
└── Node (内部类)
├── waitStatus: int
├── prev: Node
├── next: Node
└── thread: Thread
❗ 常见误区澄清
-
误区一:AQS 队列是严格先进先出
- 事实:存在 Condition 队列实现优先级机制
-
误区二:state 只能表示二进制状态
- 事实:state 是 int 类型,可表示复杂状态(如读写锁的读锁计数)
-
误区三:AQS 只能用于锁实现
- 事实:任何需要同步控制的场景都可使用(如自定义屏障)
终极理解秘诀:把 AQS 想象成一个智能的排队管理系统,它通过:
11. 状态管理(state 变量)
12. 排队机制(CLH 队列)
13. 原子操作(CAS)
这三个核心要素,实现了高效、公平的线程调度。下次看到 ReentrantLock.lock()
时,就想象自己在银行取号排队的过程吧!
Semaphore
🔍 Semaphore(信号量)的核心作用
Semaphore 是 Java 并发包中用于控制并发访问资源数量的通行证系统。其核心能力可以用一个现实比喻来理解:
想象一个停车场:
- 总共有 10 个车位(
Semaphore semaphore = new Semaphore(10)
)- 车辆进入需获取停车卡(
semaphore.acquire()
)- 离开时归还停车卡(
semaphore.release()
)- 当车位满时,新车辆必须等待空位
🧩 核心特性全景图
特性 | 说明 | 代码示例 |
---|---|---|
许可证总量控制 | 初始化时设定最大并发量 | new Semaphore(5) |
公平/非公平模式 | 决定获取许可证的顺序 | new Semaphore(5, true) (公平) |
可伸缩许可证数 | 动态调整可用许可证数量 | semaphore.reducePermits(2) |
尝试非阻塞获取 | 立即返回是否获取成功 | tryAcquire() |
超时获取 | 在指定时间内尝试获取 | tryAcquire(3, TimeUnit.SEC) |
共享模式 | 允许多个线程同时访问资源 | 与独占锁形成对比 |
⚙️ 工作原理详解
内部数据结构:
Semaphore
├── Sync extends AbstractQueuedSynchronizer
│ ├── nonfairTryAcquireShared() // 非公平模式尝试获取
│ └── tryReleaseShared() // 释放许可证
├── FairSync // 公平模式实现
└── NonfairSync // 非公平模式实现
获取许可证流程:
- 检查可用许可证数量(
state
变量) - 若剩余许可证 ≥ 请求数 → 通过 CAS 减少许可证数量 → 获取成功
- 若不足 → 进入 AQS 队列等待
🛠️ 六大应用场景与代码示例
1️⃣ 数据库连接池限流
public class ConnectionPool {
private final Semaphore semaphore;
private final BlockingQueue<Connection> pool;
public ConnectionPool(int size) {
semaphore = new Semaphore(size);
pool = new ArrayBlockingQueue<>(size);
for (int i=0; i<size; i++) {
pool.add(createConnection());
}
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire(); // 获取许可证
return pool.take();
}
public void releaseConnection(Connection conn) {
pool.offer(conn);
semaphore.release(); // 释放许可证
}
}
2️⃣ 接口限流(QPS 控制)
class RateLimiter {
private final Semaphore semaphore;
public RateLimiter(int qps) {
semaphore = new Semaphore(qps);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() ->
semaphore.release(semaphore.drainPermits()), 1, 1, TimeUnit.SECONDS);
}
public void callApi() throws InterruptedException {
semaphore.acquire();
// 执行API调用...
}
}
3️⃣ 生产者-消费者模式
Semaphore full = new Semaphore(0); // 初始产品数量为0
Semaphore empty = new Semaphore(10); // 缓冲区容量10
Semaphore mutex = new Semaphore(1); // 互斥锁
// 生产者
void produce() throws InterruptedException {
empty.acquire(); // 等待空位
mutex.acquire();
// 生产物品...
mutex.release();
full.release(); // 增加产品计数
}
// 消费者
void consume() throws InterruptedException {
full.acquire(); // 等待产品
mutex.acquire();
// 消费物品...
mutex.release();
empty.release(); // 释放空位
}
4️⃣ 并行任务限流
ExecutorService executor = Executors.newCachedThreadPool();
Semaphore semaphore = new Semaphore(5); // 最大5个并行
for (int i=0; i<100; i++) {
semaphore.acquire(); // 前5个立即获取,后续等待
executor.submit(() -> {
try {
// 执行任务...
} finally {
semaphore.release();
}
});
}
5️⃣ 资源访问控制
class RestrictedResource {
private final Semaphore semaphore = new Semaphore(3);
public void access() throws InterruptedException {
semaphore.acquire();
try {
// 最多3个线程同时访问
// 操作共享资源...
} finally {
semaphore.release();
}
}
}
6️⃣ 流量削峰
Semaphore semaphore = new Semaphore(1000); // 每秒最大处理量
void handleRequest(Request request) {
if (!semaphore.tryAcquire()) {
// 返回"系统繁忙"提示
return;
}
try {
// 处理请求...
} finally {
semaphore.release();
}
}
⚖️ 公平模式 vs 非公平模式对比
特性 | 公平模式 | 非公平模式 |
---|---|---|
获取顺序 | 严格遵循 FIFO 顺序 | 允许插队 |
吞吐量 | 较低 | 较高 |
适用场景 | 要求绝对公平的资源分配 | 高并发场景,优先保证吞吐量 |
实现方式 | 检查是否有前驱节点在等待 | 直接尝试获取许可证 |
代码示例 | new Semaphore(5, true) |
new Semaphore(5, false) (默认) |
💡 最佳实践指南
-
总是使用 try-finally 释放
semaphore.acquire(); try { // 临界区操作... } finally { semaphore.release(); }
-
避免许可证泄漏
- 确保每次 acquire() 都有对应的 release()
- 使用
semaphore.availablePermits()
监控状态
-
动态调整容量
// JDK 17+ semaphore.release(5); // 增加5个许可证 semaphore.reducePermits(3); // 减少3个许可证
-
结合其他同步工具
// 使用Semaphore + ReentrantLock实现复杂控制 Semaphore sem = new Semaphore(5); Lock lock = new ReentrantLock(); void complexOperation() { lock.lock(); try { sem.acquire(); // 组合操作... } finally { lock.unlock(); sem.release(); } }
-
预防死锁
- 设置获取超时时间
if (!semaphore.tryAcquire(3, TimeUnit.SECONDS)) { throw new TimeoutException(); }
❗ 常见问题与解决方案
问题 1:许可证数量超过初始值
原因:多次调用 release()
解决:使用 drainPermits()
重置
int overRelease = semaphore.drainPermits();
semaphore.release(overRelease - maxPermits);
问题 2:线程被意外中断
处理方式:使用可中断方法
try {
semaphore.acquireUninterruptibly();
} catch (InterruptedException e) {
// 处理中断逻辑
}
问题 3:系统吞吐量下降
优化方案:结合非公平模式+批量处理
Semaphore sem = new Semaphore(100, false); // 非公平模式
void batchProcess() {
sem.acquire(10); // 批量获取10个许可证
try {
// 批量处理数据...
} finally {
sem.release(10);
}
}
🆚 与其他同步工具对比
工具 | 特点 | 与 Semaphore 的区别 |
---|---|---|
ReentrantLock | 独占锁,互斥访问 | Semaphore 允许多线程并发访问 |
CountDownLatch | 一次性屏障 | Semaphore 可重复使用 |
CyclicBarrier | 线程相互等待的屏障 | Semaphore 不要求线程间协同 |
ThreadPool | 管理线程生命周期 | Semaphore 只控制并发数量 |
总结:Semaphore 是控制资源并发访问的瑞士军刀,通过许可证机制实现精细化的流量控制,适用于从简单限流到复杂系统设计的各种场景。理解其底层基于 AQS 的实现机制,结合具体业务需求灵活运用不同模式,能显著提升系统稳定性和性能。
CountDownLatch 和 CyclicBarrier 对比
📌 CountDownLatch 与 CyclicBarrier 深度对比
让我们用团队协作场景来理解这两个并发工具的核心差异:
🧩 核心概念对比表
特性 | CountDownLatch(倒计时门闩) | CyclicBarrier(循环屏障) |
---|---|---|
核心用途 | 等待其他线程完成任务 | 协调多个线程同步到某个阶段 |
重用性 | 一次性(计数器归零后失效) | 可重复使用(自动重置计数器) |
计数器方向 | 递减(countDown()) | 递增(await() 增加到达数) |
触发动作 | 无 | 可设置屏障触发时的回调动作 |
异常处理 | 不影响其他线程 | 若线程被中断,所有等待线程抛出异常 |
典型场景 | 主线程等待多个子线程初始化完成 | 多阶段并行计算(如 MapReduce) |
🛠️ 工作原理图解
1. CountDownLatch 流程
主线程启动
↓
初始化 CountDownLatch(3)
↓
启动子线程A → 执行任务 → countDown() [计数器2]
启动子线程B → 执行任务 → countDown() [计数器1]
启动子线程C → 执行任务 → countDown() [计数器0]
↓
主线程 await() 被唤醒
2. CyclicBarrier 流程
线程A到达屏障 → 等待
线程B到达屏障 → 等待
线程C到达屏障 → 触发回调 → 所有线程继续执行
↓
(可重复使用,进入下一轮屏障)
💻 代码示例对比
1. CountDownLatch 示例:主厨等备料完成
public class KitchenDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
// 三个备菜工
new Thread(() -> { prepare("蔬菜"); latch.countDown(); }).start();
new Thread(() -> { prepare("肉类"); latch.countDown(); }).start();
new Thread(() -> { prepare("调料"); latch.countDown(); }).start();
latch.await(); // 主厨等待备料
System.out.println("开始烹饪!");
}
static void prepare(String material) {
try {
Thread.sleep((long)(Math.random() * 1000);
System.out.println(material + " 准备完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
2. CyclicBarrier 示例:多阶段游戏加载
public class GameLoading {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(4,
() -> System.out.println("\n所有玩家准备就绪,开始下一阶段!\n"));
for (int i=0; i<4; i++) {
new Player(barrier).start();
}
}
}
class Player extends Thread {
private final CyclicBarrier barrier;
public Player(CyclicBarrier barrier) {
this.barrier = barrier;
}
public void run() {
try {
for (int stage=1; stage<=3; stage++) {
System.out.println(getName() + " 完成阶段" + stage + "加载");
barrier.await(2, TimeUnit.SECONDS); // 等待其他玩家
}
} catch (Exception e) {
handleException(e);
}
}
void handleException(Exception e) {
// 异常处理逻辑
}
}
⚖️ 关键差异深度解析
1. 状态重置机制
-
CountDownLatch:
// 无法重置,只能新建实例 if (latch.getCount() == 0) { latch = new CountDownLatch(3); }
-
CyclicBarrier:
barrier.reset(); // 自动重置到初始状态
2. 异常传播
- CountDownLatch:某个线程异常不影响其他线程
- CyclicBarrier:若一个线程中断或超时,其他等待线程会抛出:
java.util.concurrent.BrokenBarrierException
3. 超时处理
// CountDownLatch
boolean success = latch.await(1, TimeUnit.SECONDS);
// CyclicBarrier
int index = barrier.await(500, TimeUnit.MILLISECONDS);
🔄 混合使用场景示例
电商订单处理流程:
1. 使用 CountDownLatch 等待所有商品库存校验完成
2. 使用 CyclicBarrier 同步支付服务和物流服务的准备
3. 再次使用 CountDownLatch 确认所有子系统完成最终提交
代码片段:
CountDownLatch stockLatch = new CountDownLatch(items.size());
CyclicBarrier paymentBarrier = new CyclicBarrier(2);
CountDownLatch commitLatch = new CountDownLatch(2);
// 库存校验线程
executor.submit(() -> {
checkStock();
stockLatch.countDown();
});
// 支付服务
executor.submit(() -> {
stockLatch.await();
paymentBarrier.await(); // 与物流服务同步
processPayment();
commitLatch.countDown();
});
// 物流服务
executor.submit(() -> {
stockLatch.await();
paymentBarrier.await(); // 与支付服务同步
arrangeDelivery();
commitLatch.countDown();
});
commitLatch.await();
System.out.println("订单处理完成!");
🚀 最佳实践指南
场景 | 推荐工具 | 理由 |
---|---|---|
主线程等待多个初始化任务完成 | CountDownLatch | 天然的一次性等待机制 |
多阶段并行计算 | CyclicBarrier | 自动重置适合迭代过程 |
需要执行完成回调 | CyclicBarrier | 内置回调函数支持 |
不确定等待线程数 | 都不适用,考虑 Phaser | 动态注册参与者机制更灵活 |
⚠️ 常见陷阱与规避
-
CountDownLatch 的幽灵计数
- 问题:多次调用 countDown()
- 解决:使用 getCount() 检查状态
-
CyclicBarrier 的死锁风险
- 问题:等待线程数超过初始化数量
- 解决:使用
isBroken()
检测屏障状态
-
资源泄漏
- 问题:未正确关闭线程池导致屏障未触发
- 解决:配合使用
awaitTermination()
🌟 总结选择策略
-
选择 CountDownLatch 当:
✅ 需要单次等待机制
✅ 主从线程协作模式
✅ 不关心各个子线程完成顺序 -
选择 CyclicBarrier 当:
✅ 需要多阶段协同
✅ 线程之间是平等协作关系
✅ 需要自动重置或完成回调
两者都基于 AQS 实现,但设计目标不同。理解它们的核心差异,就能像搭积木一样灵活构建并发流程控制!
线程池总结
📚 Java 线程池深度解析
从原理到实战,全面掌握线程池的核心机制与最佳实践。
🧩 线程池核心参数
线程池通过 ThreadPoolExecutor
构造函数配置,以下是核心参数:
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数(长期存活的线程)
int maximumPoolSize, // 最大线程数(临时线程 = max - core)
long keepAliveTime, // 临时线程空闲存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
RejectedExecutionHandler handler // 拒绝策略
)
⚙️ 线程池工作流程
任务提交 → 核心线程是否已满?
├─ 未满 → 创建新线程执行
├─ 已满 → 任务入队列
│ ├─ 队列未满 → 等待执行
│ └─ 队列已满 → 创建临时线程(直到max)
│ ├─ 成功 → 执行任务
│ └─ 失败 → 触发拒绝策略
🔧 线程池类型与代码示例
1. FixedThreadPool(固定线程池)
ExecutorService fixedPool = Executors.newFixedThreadPool(4);
// 适用场景:长期稳定的并发任务
// 特点:无界队列(LinkedBlockingQueue),可能OOM
2. CachedThreadPool(缓存线程池)
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 适用场景:大量短期异步任务
// 特点:SynchronousQueue,max=Integer.MAX_VALUE,可能创建过多线程
3. ScheduledThreadPool(定时线程池)
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
// 延迟执行
scheduledPool.schedule(() -> System.out.println("5秒后执行"), 5, TimeUnit.SECONDS);
// 固定频率执行
scheduledPool.scheduleAtFixedRate(() ->
System.out.println("每隔3秒执行"), 0, 3, TimeUnit.SECONDS);
4. SingleThreadExecutor(单线程池)
ExecutorService singlePool = Executors.newSingleThreadExecutor();
// 适用场景:任务顺序执行
// 特点:保证任务顺序性,无界队列
⚡ 自定义线程池(推荐方式)
ThreadPoolExecutor customPool = new ThreadPoolExecutor(
4, // corePoolSize
8, // maximumPoolSize
30, TimeUnit.SECONDS, // keepAliveTime
new ArrayBlockingQueue<>(100), // 有界队列(推荐!)
new ThreadFactory() { // 自定义线程工厂
private AtomicInteger count = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r, "Worker-" + count.incrementAndGet());
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
🛡️ 拒绝策略详解
策略类 | 行为 | 适用场景 |
---|---|---|
AbortPolicy(默认) | 抛出 RejectedExecutionException | 严格要求任务必须被处理 |
CallerRunsPolicy | 由提交任务的线程直接执行 | 保证任务不丢失,但可能阻塞主线程 |
DiscardPolicy | 静默丢弃新任务 | 允许丢弃新任务的场景 |
DiscardOldestPolicy | 丢弃队列中最旧的任务,尝试重新提交 | 优先处理新任务 |
自定义拒绝策略示例:
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录日志并降级处理
logger.warn("任务被拒绝: {}", r);
sendToBackupQueue(r);
}
}
📊 线程池监控方法
// 获取活跃线程数
int activeCount = customPool.getActiveCount();
// 获取已完成任务数
long completedCount = customPool.getCompletedTaskCount();
// 获取队列中等待任务数
int queueSize = customPool.getQueue().size();
// 扩展钩子方法(监控任务执行时间)
customPool.beforeExecute((t, r) -> startTime.set(System.currentTimeMillis()));
customPool.afterExecute((r, t) ->
logger.info("任务耗时: {}ms", System.currentTimeMillis() - startTime.get()));
💡 线程池调优指南
线程数设置公式
- CPU 密集型任务:
int coreSize = Runtime.getRuntime().availableProcessors() + 1;
- IO 密集型任务:
int maxSize = Runtime.getRuntime().availableProcessors() * 2;
队列选择策略
队列类型 | 特点 | 适用场景 |
---|---|---|
SynchronousQueue | 直接传递任务,无容量 | CachedThreadPool |
LinkedBlockingQueue | 无界队列(默认) | Fixed/SingleThreadPool |
ArrayBlockingQueue | 有界队列(推荐) | 需要防止资源耗尽 |
PriorityBlockingQueue | 优先级队列 | 任务需要优先级调度 |
⚠️ 常见陷阱与解决方案
问题 | 现象 | 解决方案 |
---|---|---|
任务堆积导致 OOM | 队列使用无界队列 | 改用有界队列 + 合理拒绝策略 |
线程泄漏 | 线程未正确关闭 | 调用 shutdown() + awaitTermination() |
死锁 | 线程互相等待资源 | 避免嵌套提交任务,使用超时机制 |
上下文切换过多 | CPU 利用率低但吞吐量不高 | 减少线程数,合并任务 |
🌰 实战案例:Web 服务器请求处理
// 自定义线程池配置
ThreadPoolExecutor serverPool = new ThreadPoolExecutor(
10, 50, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new NamedThreadFactory("WebServer-Worker"),
new ThreadPoolExecutor.AbortPolicy()
);
// 处理请求
void handleRequest(HttpRequest request) {
serverPool.execute(() -> {
try {
processRequest(request); // 实际业务处理
} catch (Exception e) {
logger.error("处理失败", e);
}
});
}
// 优雅关闭
serverPool.shutdown();
if (!serverPool.awaitTermination(30, TimeUnit.SECONDS)) {
List<Runnable> unfinished = serverPool.shutdownNow();
logger.warn("强制终止{}个任务", unfinished.size());
}
🔄 线程池生命周期管理
+----------------+
| Running |
| (接受新任务) |
+---+------------+
| shutdown()
v
+----------------+------------+
| Shutdown |
| (不接收新任务,继续处理队列) |
+---+------------------------+
| shutdownNow()
v
+----------------+------------+
| Terminated |
| (所有线程终止) |
+----------------------------+
🌟 最佳实践总结
- 使用有界队列:防止 OOM
- 合理设置线程数:根据任务类型调整
- 明确线程命名:便于问题排查
- 监控任务队列:设置阈值报警
- 优雅关闭:shutdown() + awaitTermination()
- 异常处理:使用 try-catch 包裹任务代码
- 避免任务嵌套提交:防止死锁
终极建议:在项目中避免直接使用 Executors
快捷方法,而是通过 ThreadPoolExecutor
构造函数显式配置参数,这是《阿里巴巴 Java 开发手册》中的强制要求,可以有效规避资源耗尽风险。
线程池底层原理
📚 Java 线程池底层原理深度解析
从 JVM 层面对线程池的工作机制进行拆解,揭示其高效调度与资源管理的核心设计。
🧩 核心数据结构与状态管理
1. 控制变量 ctl
线程池通过原子整型 ctl
同时管理线程池状态和工作线程数:
// 高3位表示状态,低29位表示线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 状态定义
private static final int RUNNING = -1 << COUNT_BITS; // 接受新任务
private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接收新任务,处理队列任务
private static final int STOP = 1 << COUNT_BITS; // 不处理任何任务,中断进行中的任务
private static final int TIDYING = 2 << COUNT_BITS; // 所有任务终止,准备执行terminated()
private static final int TERMINATED = 3 << COUNT_BITS; // terminated()执行完成
- 状态转换流程:
RUNNING → SHUTDOWN → STOP → TIDYING → TERMINATED
2. Worker 内部类
每个工作线程被封装为 Worker
对象,继承 AQS(AbstractQueuedSynchronizer):
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread; // 实际执行线程
Runnable firstTask; // 初始任务(可为null)
Worker(Runnable firstTask) {
setState(-1); // 禁止中断直到runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this); // 核心执行循环
}
// 实现简单不可重入锁
protected boolean isHeldExclusively() { return getState() != 0; }
protected boolean tryAcquire(int unused) { return compareAndSetState(0, 1); }
protected boolean tryRelease(int unused) { setState(0); return true; }
}
⚙️ 核心执行流程剖析
1. 任务提交入口 execute()
public void execute(Runnable command) {
if (command == null) throw new NPE();
int c = ctl.get();
// 阶段1:尝试创建核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return;
c = ctl.get();
}
// 阶段2:尝试入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 创建非核心线程处理队列
}
// 阶段3:尝试创建非核心线程
else if (!addWorker(command, false))
reject(command); // 触发拒绝策略
}
2. 线程创建 addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查状态是否允许创建线程
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS 失败重试
}
}
// 创建 Worker 对象并启动线程
Worker w = new Worker(firstTask);
Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
} finally {
mainLock.unlock();
}
t.start(); // 调用 Worker.run() → runWorker()
}
return true;
}
3. 线程执行循环 runWorker()
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock(); // 表示线程正在工作
// 处理线程中断状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 钩子方法
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException | Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 钩子方法
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 线程退出处理
}
}
🔄 任务获取与线程回收
1. getTask()
方法
private Runnable getTask() {
boolean timedOut = false; // 上次poll是否超时
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查状态是否允许获取任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 线程数超过max或超时 → 减少线程数
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- 关键点:
- 核心线程通过
take()
阻塞等待新任务 - 非核心线程通过
poll(keepAliveTime)
超时等待 - 超时后返回
null
触发线程回收
- 核心线程通过
2. 线程退出 processWorkerExit()
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 异常退出需补偿线程数
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
// 补充新线程(若需要)
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
🛠️ 关键设计思想
1. 无锁化设计
ctl
原子操作:通过 CAS 更新线程数和状态,避免全局锁竞争- Worker 状态管理:每个 Worker 使用独立锁(非重入锁),减少同步范围
2. 资源弹性管理
- 动态扩缩容:根据任务负载自动调整活跃线程数
- 线程复用:通过
runWorker
循环避免频繁创建/销毁线程
3. 优雅退化机制
- 队列缓冲:平衡任务提交速度与处理能力
- 拒绝策略:在系统过载时提供可控的失败处理
⚡ 性能优化细节
1. 避免伪共享(False Sharing)
Worker
对象通过 缓存行填充 优化内存布局:
// JDK 内部实现(简化版)
@sun.misc.Contended
class Worker extends AbstractQueuedSynchronizer {
// 字段布局优化
}
2. 线程创建优化
- 线程工厂(ThreadFactory):允许自定义线程属性(名称、优先级、守护状态)
- 预热核心线程:通过
prestartAllCoreThreads()
提前创建核心线程
3. 队列选择策略
不同队列实现的性能差异:
队列类型 | 吞吐量 | 公平性 | 适用场景 |
---|---|---|---|
SynchronousQueue |
高 | 可选 | 短任务、高并发 |
LinkedBlockingQueue |
中 | 无 | 任务执行时间差异大 |
ArrayBlockingQueue |
低 | 可选 | 需要严格控制队列大小 |
🌰 底层源码级示例
Worker 锁的应用
// 在 runWorker 中通过 lock 标记工作状态
w.lock(); // 获取 Worker 锁
try {
beforeExecute(wt, task);
task.run();
} finally {
w.unlock(); // 释放锁
}
- 作用:防止线程被意外中断时任务状态不一致
状态转换原子性
// tryTerminate() 中的状态转换
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // 中断空闲线程
interruptIdleWorkers(ONLY_ONE);
return;
}
// 转换为 TIDYING 状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
}
}
🚀 总结
Java 线程池通过精妙的状态管理、弹性线程调度和高效的任务处理机制,实现了高并发环境下的资源优化利用。其底层设计体现了以下核心思想:
- 原子操作优先:通过 CAS 减少锁竞争
- 分层管理:Worker 对象封装线程生命周期
- 弹性伸缩:动态调整线程数与任务队列的平衡
- 优雅降级:拒绝策略防止系统过载崩溃
理解这些底层原理,有助于开发者编写更高效可靠的并发代码,并在性能调优时做出精准决策。
小结
以下是对 Java 并发编程核心概念的结构化总结,我们将从底层到高层,用最清晰的方式梳理所有关键概念及其相互关系:
🌐 Java 并发知识体系全景图
Java并发体系
|
+------------------+------------------+
| | |
【线程基础】 【同步机制】 【并发工具】
| | |
├─ 线程生命周期 ├─ 互斥锁 ├─ 线程池
├─ Thread/Runnable ├─ synchronized ├─ Executor框架
├─ 线程状态切换 ├─ ReentrantLock ├─ Fork/Join
├─ 线程优先级 ├─ ReadWriteLock ├─ CompletableFuture
└─ Daemon线程 ├─ 原子类 └─ 并发集合
├─ volatile
└─ 内存模型 (JMM)
一、线程基础
概念 | 核心要点 | 代码示例 |
---|---|---|
线程创建 | 继承 Thread 或实现 Runnable / Callable |
new Thread(() -> {...}).start() |
线程状态 | NEW → RUNNABLE → BLOCKED → WAITING → TIMED_WAITING → TERMINATED | Thread.getState() |
Daemon 线程 | 守护线程(随主线程结束) | thread.setDaemon(true) |
线程优先级 | 1~10 的优先级设置(不保证严格执行) | thread.setPriority(8) |
二、同步机制
1. 互斥锁
工具 | 特点 | 适用场景 |
---|---|---|
synchronized |
JVM 内置锁,自动释放,不可中断 | 简单的代码块同步 |
ReentrantLock |
可重入、可中断、公平锁,需手动释放 | 需要高级功能的复杂同步 |
StampedLock |
乐观读锁,适用于读多写少场景 | 高并发读取操作 |
对比示例:
// synchronized
synchronized(lock) { /* 临界区 */ }
// ReentrantLock
Lock lock = new ReentrantLock();
lock.lock();
try { /* 临界区 */ }
finally { lock.unlock(); }
2. 原子操作
类 | 功能 | 示例方法 |
---|---|---|
AtomicInteger |
原子整型操作 | incrementAndGet() |
AtomicReference |
原子对象引用 | compareAndSet() |
LongAdder |
高并发下性能更好的计数器 | add() , sum() |
原理:基于 CAS (Compare-And-Swap) 实现无锁并发
三、线程通信
机制 | 说明 | 关键方法 |
---|---|---|
wait()/notify() |
对象监视器方法 | 需在同步块内使用 |
Condition |
更灵活的等待/通知机制 | await() , signal() |
BlockingQueue |
线程安全的队列实现 | put() , take() |
典型模式:
// 生产者-消费者模式
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 生产者
queue.put(item);
// 消费者
String item = queue.take();
四、线程管理
1. 线程池体系
ExecutorService
├─ ThreadPoolExecutor(标准线程池)
├─ ScheduledThreadPoolExecutor(定时任务)
└─ ForkJoinPool(分治任务)
核心参数:
corePoolSize
: 核心线程数maximumPoolSize
: 最大线程数keepAliveTime
: 空闲线程存活时间workQueue
: 任务队列(ArrayBlockingQueue/LinkedBlockingQueue)handler
: 拒绝策略(AbortPolicy/CallerRunsPolicy 等)
创建方式:
ExecutorService pool = Executors.newFixedThreadPool(4);
2. Future 机制
类 | 特点 |
---|---|
Future |
基础异步结果获取 |
CompletableFuture |
支持链式调用、组合异步任务(Java8+) |
示例:
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenAcceptprintln;
五、并发工具类
工具 | 用途 | 核心方法 |
---|---|---|
CountDownLatch |
等待多个任务完成 | await() , countDown() |
CyclicBarrier |
多线程到达屏障后统一行动 | await() |
Semaphore |
控制并发访问数量 | acquire() , release() |
Phaser |
更灵活的阶段同步(Java7+) | arriveAndAwaitAdvance() |
对比:
CountDownLatch:一次性使用,等待N个任务完成
CyclicBarrier:可重复使用,等待N个线程到达
Semaphore:控制资源访问并发数
六、并发集合
集合类 | 特点 | 实现原理 |
---|---|---|
ConcurrentHashMap |
高并发安全的哈希表 | 分段锁 + CAS |
CopyOnWriteArrayList |
写时复制列表 | 读无锁,写复制整个数组 |
BlockingQueue |
阻塞队列(Array/Linked/Priority) | 锁 + Condition |
ConcurrentLinkedQueue |
无界非阻塞队列 | CAS 实现 |
选型指南:
- 读多写少 →
CopyOnWriteArrayList
- 高并发写入 →
ConcurrentHashMap
- 任务传递 →
LinkedBlockingQueue
七、内存模型(JMM)
核心原则
- 可见性:volatile 保证变量修改的可见性
- 有序性:防止指令重排序(happens-before 原则)
- 原子性:锁和原子类保证操作的原子性
volatile 关键字:
volatile boolean flag = false; // 保证所有线程看到最新值
happens-before 规则:
7. 程序顺序规则
8. 锁规则
9. volatile 变量规则
10. 线程启动规则
11. 传递性规则
八、并发设计模式
模式 | 实现方式 | 应用场景 |
---|---|---|
Thread-Per-Message | 为每个任务创建新线程 | 简单任务处理 |
Worker Thread | 使用线程池管理线程 | 高并发任务处理 |
Producer-Consumer | 通过阻塞队列解耦生产消费 | 数据管道处理 |
Two-Phase Termination | 通过标志位安全终止线程 | 优雅关闭线程 |
🚀 学习路线建议
- 基础阶段
- 掌握线程创建/启动/停止
- 理解 synchronized 和 volatile
- 学会使用基本的并发集合
- 进阶阶段
- 深入线程池配置与调优
- 掌握 Lock/Condition 高级用法
- 学习 AQS 实现原理
- 高阶阶段
- 研究 JMM 内存模型
- 分析 JDK 并发工具源码
- 掌握分布式并发控制
⚠️ 常见陷阱与解决方案
问题 | 解决方案 | |
---|---|---|
死锁 | 按固定顺序获取锁,使用 tryLock 超时机制 | |
线程泄漏 | 正确关闭线程池(shutdown + awaitTermination) | |
上下文切换开销 | 减少同步代码块范围,使用无锁数据结构 | |
伪共享(False Sharing) | 使用 @Contended 注解填充缓存行 |
最终总结:Java 并发体系的核心是安全地管理状态共享与高效利用计算资源。理解这个体系需要从三个维度入手:
15. 线程管理(创建、调度、协作)
16. 共享数据保护(锁、原子类、内存可见性)
17. 并发模式设计(工具类、线程池、队列)
建议通过实际项目中的场景(如接口限流、批量处理、异步任务等)逐步实践这些概念,遇到问题时再回来看这张知识地图,会有更深刻的理解。