java并发-概念

Future

📌 比喻理解
想象你去快餐店点餐,收银员给你一张取餐小票(这就是 Future)。你可以:

  1. 继续玩手机(主线程做其他事)
  2. 时不时问"餐好了吗?"(检查任务是否完成)
  3. 等餐好了凭小票取餐(获取结果)

🔧 核心概念

🛠️ 代码示例

import java.util.concurrent.*;

public class FutureDemo {
    public static void main(String[] args) throws Exception {
        // 创建线程池(后厨团队)
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 提交任务(下单)
        Future<String> future = executor.submit(new Callable<String>() {
            public String call() throws Exception {
                Thread.sleep(2000); // 模拟2秒烹饪时间
                return "汉堡套餐";
            }
        });

        System.out.println("等待餐点时,我去买饮料...");
        
        // 获取结果(凭小票取餐)
        String meal = future.get(); // 这里会阻塞直到结果就绪
        System.out.println("取到餐点:" + meal);

        executor.shutdown(); // 关闭线程池
    }
}

💡 关键方法解释
4. future.get():等待结果(无限等待)
5. future.get(1, TimeUnit.SECONDS):最多等待 1 秒
6. future.isDone():检查任务是否完成
7. future.cancel(true):尝试取消任务

🌟 同步 vs 异步对比

// 同步方式(自己做饭)
public String cookMeal() {
    // 耗时操作...
    return "餐点";
}

// 异步方式(让厨房做)
Future<String> future = executor.submitcookMeal;

⚠️ 注意事项
8. 调用 get() 会阻塞当前线程
9. 必须关闭线程池(shutdown()
10. 处理可能的异常(InterruptedException, ExecutionException

🔄 更现代的替代方案
对于复杂场景,可以使用 CompletableFuture(支持链式调用):

CompletableFuture.supplyAsync(() -> "汉堡套餐")
                .thenAccept(meal -> System.out.println("享受 " + meal));

📚 总结要点
11. Future 就像取餐小票,代表未来的结果
12. 主线程可以继续做其他事情
13. 适合处理耗时操作(网络请求/复杂计算)
14. 通过线程池管理执行线程

建议尝试修改示例中的等待时间(如改成 5 秒),观察程序行为变化,可以帮助更直观地理解异步执行机制。

CompletableFuture

📌 CompletableFuture 是什么?
CompletableFuture 是 Java 8 引入的增强版 Future,它解决了传统 Future 的三大痛点:
15. 无法手动完成任务(传统 Future 只能被动等待结果)
16. 缺乏链式调用能力
17. 难以组合多个异步任务

可以理解为异步编程的瑞士军刀,支持非阻塞的流水线式操作,完美融合函数式编程思想。


🧩 核心特性全景图

                     CompletableFuture
                            |
    ├── 异步任务创建  ─── 任务链式处理  ─── 多任务组合  ─── 异常处理
    |        |               |               |             |
    |        |               |               |             └─ exceptionally()
    |        |               |               ├─ allOf()  
    |        |               |               └─ anyOf()
    |        |               ├─ thenApply() 
    |        |               ├─ thenCompose()
    |        |               └─ thenAccept()
    ├─ supplyAsync() 
    └─ runAsync()

🛠️ 7 大核心用法详解

我们通过一个外卖订餐流程的案例贯穿所有示例(假设每个步骤都是耗时操作):

1️⃣ 基础异步任务创建

// 创建订单(返回订单号)
CompletableFuture<String> createOrder = CompletableFuture.supplyAsync(() -> {
    System.out.println("餐厅接单中...");
    sleep(1); // 模拟1秒延迟
    return "ORDER-9527";
});

// 获取订单号结果
String orderId = createOrder.get(); 
System.out.println("订单号: " + orderId); // 输出: ORDER-9527

2️⃣ 结果转换(thenApply)

// 制作餐品(需要订单号)
CompletableFuture<String> prepareMeal = createOrder.thenApply(order -> {
    System.out.println("制作餐品: " + order);
    sleep(2);
    return "🍔汉堡套餐";
});

String meal = prepareMeal.get();
System.out.println("制作完成: " + meal); // 输出: 🍔汉堡套餐

3️⃣ 结果消费(thenAccept)

// 通知骑手取餐(不需要返回值)
prepareMeal.thenAccept(meal -> {
    System.out.println("通知骑手取餐: " + meal);
    sleep(1);
});

4️⃣ 任务组合(thenCompose)

// 骑手配送(依赖餐品准备完成)
CompletableFuture<String> delivery = prepareMeal.thenCompose(meal -> 
    CompletableFuture.supplyAsync(() -> {
        System.out.println("骑手配送中: " + meal);
        sleep(3);
        return "已送达用户";
    })
);

System.out.println(delivery.get()); // 输出: 已送达用户

5️⃣ 并行任务组合(thenCombine)

// 同时进行开发票和备餐具(并行执行)
CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> {
    sleep(2);
    return "📃发票";
});

CompletableFuture<String> utensils = CompletableFuture.supplyAsync(() -> {
    sleep(1);
    return "🥢餐具";
});

// 合并两个结果
CompletableFuture<String> packageMeal = invoice.thenCombine(utensils, 
    (inv, uten) -> "打包完成: " + inv + " + " + uten
);

System.out.println(packageMeal.get()); // 输出: 打包完成: 📃发票 + 🥢餐具

6️⃣ 多任务协调(allOf / anyOf)

// 等所有骑手就绪(allOf)
CompletableFuture<Void> allRiders = CompletableFuture.allOf(
    CompletableFuture.runAsync(() -> sleep(2)), // 骑手A
    CompletableFuture.runAsync(() -> sleep(3)), // 骑手B
    CompletableFuture.runAsync(() -> sleep(1))  // 骑手C
);

allRiders.thenRun(() -> 
    System.out.println("所有骑手准备就绪!")
);

// 任意骑手接单(anyOf)
CompletableFuture<Object> anyRider = CompletableFuture.anyOf(
    CompletableFuture.supplyAsync(() -> {sleep(2); return "骑手A";}),
    CompletableFuture.supplyAsync(() -> {sleep(1); return "骑手B";})
);

System.out.println("接单骑手: " + anyRider.get()); // 输出: 接单骑手: 骑手B

7️⃣ 异常处理(exceptionally / handle)

CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("模拟厨房着火!");
    }
    return "正常餐品";
}).exceptionally(ex -> {
    System.out.println("异常处理: " + ex.getMessage());
    return "备用餐品";
}).thenAccept(result -> 
    System.out.println("最终结果: " + result)
);

// 可能输出:
// 异常处理: java.lang.RuntimeException: 模拟厨房着火!
// 最终结果: 备用餐品
// 或
// 最终结果: 正常餐品

⚙️ 底层机制解析

  1. 默认线程池:使用 ForkJoinPool.commonPool(),可通过第二个参数指定自定义线程池
  2. 完成状态:内部维护 resultstack(回调链)
  3. 非阻塞实现:通过 CAS(Compare-And-Swap)操作保证线程安全

🆚 与传统 Future 对比

CompletableFuture Future
任务组合 支持链式/并行组合 仅支持单个任务
完成控制 可手动完成(complete()) 只能被动等待
异常处理 内置异常处理链 需在 get()时捕获异常
回调机制 支持完成回调
线程管理 自动使用线程池 需手动管理线程池

💡 最佳实践建议

  1. 避免阻塞:尽量使用回调而非 get()
  2. 指定线程池:重要任务使用独立线程池,防止公共池被占满
  3. 异常处理前置:在链式开头使用 handle() 统一处理
  4. 资源释放:在 whenComplete() 中关闭数据库连接等资源
  5. 超时控制:使用 orTimeout() 防止无限等待
CompletableFuture.supplyAsync(() -> longTimeTask())
                .orTimeout(3, TimeUnit.SECONDS)
                .exceptionally(ex -> "超时处理结果");

🌟 完整外卖流程示例

public class FoodDelivery {
    public static void main(String[] args) {
        // 1. 创建订单
        CompletableFuture<String> orderFlow = CompletableFuture.supplyAsync(() -> {
            sleep(1);
            return "ORDER-007";
        })
        // 2. 并行准备餐品和发票
        .thenApplyAsync(orderId -> {
            System.out.println("制作餐品: " + orderId);
            sleep(2);
            return new Meal(orderId, "🍱豪华套餐");
        })
        .thenCombine(CompletableFuture.supplyAsync(() -> {
            sleep(1);
            return "📃电子发票";
        }), (meal, invoice) -> {
            meal.addInvoice(invoice);
            return meal;
        })
        // 3. 调度骑手
        .thenComposeAsync(meal -> 
            CompletableFuture.supplyAsync(() -> {
                System.out.println("骑手接单: " + meal);
                sleep(3);
                return "✅送达完成";
            })
        )
        // 4. 异常处理
        .handle((result, ex) -> {
            if (ex != null) {
                System.out.println("配送异常: " + ex.getMessage());
                return "✉️补偿优惠券已发放";
            }
            return result;
        });

        System.out.println("最终状态: " + orderFlow.join());
    }

    static class Meal {
        String orderId;
        String food;
        String invoice;

        // 构造方法、addInvoice() 略...
    }

    static void sleep(int seconds) {
        try { TimeUnit.SECONDS.sleep(seconds); } 
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}

关键总结

AQS

AbstractQueuedSynchronizer

📌 用现实世界理解 AQS(AbstractQueuedSynchronizer)
想象你正在银行办理业务,AQS 就是那个管理排队叫号系统的核心机制:


🧩 AQS 核心三要素

组成部分 现实比喻 技术实现
volatile state 当前可用窗口数 同步状态(如锁的持有次数)
FIFO 队列 客户排队序列 CLH 变种队列(双向链表)
CAS 机制 叫号系统的原子性更新 通过 Unsafe 类实现原子操作

🔧 AQS 工作原理流程图

          +-------------------+
          |   Acquire Lock    |
          +-------------------+
                   |
                   v
+---------------------------------+
| 尝试获取锁 (tryAcquire)         |
| if (state == 0 && CAS成功)      |--成功--> 获得锁
+---------------------------------+
                   | 失败
                   v
+---------------------------------+
| 创建 Node 加入队列尾部           |
| (addWaiter)                     |
+---------------------------------+
                   |
                   v
+---------------------------------+
| 进入自旋循环 (acquireQueued)     |
| 1. 检查前驱节点是否为头节点        |
| 2. 再次尝试获取锁                 |--成功--> 成为头节点
| 3. 失败则挂起线程 (LockSupport)   |
+---------------------------------+

🌰 手写简易锁示例(基于 AQS)

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MyLock {
    private final Sync sync = new Sync();

    // 自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 尝试获取锁
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) { // CAS 操作
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 尝试释放锁
        protected boolean tryRelease(int arg) {
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // 不需要 CAS,只有持有者能释放
            return true;
        }

        // 是否独占持有
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }
}

使用示例

MyLock lock = new MyLock();
new Thread(() -> {
    lock.lock();
    try {
        System.out.println("线程1获得锁");
        Thread.sleep(1000);
    } finally {
        lock.unlock();
        System.out.println("线程1释放锁");
    }
}).start();

new Thread(() -> {
    lock.lock();
    try {
        System.out.println("线程2获得锁");
    } finally {
        lock.unlock();
    }
}).start();

// 输出:
// 线程1获得锁
// (1秒后)
// 线程1释放锁
// 线程2获得锁

🔍 关键方法解析

方法 作用说明 类比场景
acquire(int arg) 获取资源的入口方法 客户点击取号机按钮
tryAcquire() 尝试直接获取资源(需子类实现) 查看是否有空闲窗口
addWaiter() 创建节点加入队列尾部 生成排队号码
acquireQueued() 队列中的自旋等待逻辑 等待叫号并实时关注窗口状态
release(int arg) 释放资源的入口方法 客户办完业务离开窗口

⚙️ AQS 在 JDK 中的应用

  1. ReentrantLock

    • 通过 Sync 内部类继承 AQS
    • state 表示重入次数(0=未锁定,≥1=锁定次数)
  2. Semaphore

    • state 表示可用许可证数量
    • 获取信号量 = 减少 state
    • 释放信号量 = 增加 state
  3. CountDownLatch

    • state 初始化设置为计数器的值
    • countDown() 递减 state
    • await() 等待 state 变为 0

🆚 AQS 两种模式对比

独占模式 (Exclusive) 共享模式 (Shared)
资源分配 同一时刻只有一个线程能访问 允许多个线程同时访问
典型实现 ReentrantLock Semaphore/CountDownLatch
方法区别 tryAcquire/tryRelease tryAcquireShared/tryReleaseShared
唤醒策略 只唤醒下一个节点 传播唤醒后续多个节点

💡 深入理解技巧

  1. 画队列变化图:在纸上画出节点入队/出队过程
  2. 调试 AQS 源码:在 IDEA 中跟踪 ReentrantLock.lock() 调用链
  3. 对比不同同步器:比较 Semaphore 和 ReentrantLock 的 AQS 实现差异
  4. 观察 state 变化:在代码中打印 state 值的变化过程

📊 AQS 类结构图

AbstractQueuedSynchronizer
    ├── state: int (volatile)
    ├── head: Node
    ├── tail: Node
    │
    ├── acquire()
    ├── release()
    │
    └── Node (内部类)
        ├── waitStatus: int
        ├── prev: Node
        ├── next: Node
        └── thread: Thread

常见误区澄清

  1. 误区一:AQS 队列是严格先进先出

    • 事实:存在 Condition 队列实现优先级机制
  2. 误区二:state 只能表示二进制状态

    • 事实:state 是 int 类型,可表示复杂状态(如读写锁的读锁计数)
  3. 误区三:AQS 只能用于锁实现


终极理解秘诀:把 AQS 想象成一个智能的排队管理系统,它通过:
11. 状态管理(state 变量)
12. 排队机制(CLH 队列)
13. 原子操作(CAS)
这三个核心要素,实现了高效、公平的线程调度。下次看到 ReentrantLock.lock() 时,就想象自己在银行取号排队的过程吧!

Semaphore

🔍 Semaphore(信号量)的核心作用
Semaphore 是 Java 并发包中用于控制并发访问资源数量的通行证系统。其核心能力可以用一个现实比喻来理解:

想象一个停车场:


🧩 核心特性全景图

特性 说明 代码示例
许可证总量控制 初始化时设定最大并发量 new Semaphore(5)
公平/非公平模式 决定获取许可证的顺序 new Semaphore(5, true)(公平)
可伸缩许可证数 动态调整可用许可证数量 semaphore.reducePermits(2)
尝试非阻塞获取 立即返回是否获取成功 tryAcquire()
超时获取 在指定时间内尝试获取 tryAcquire(3, TimeUnit.SEC)
共享模式 允许多个线程同时访问资源 与独占锁形成对比

⚙️ 工作原理详解

内部数据结构

Semaphore
├── Sync extends AbstractQueuedSynchronizer
│   ├── nonfairTryAcquireShared() // 非公平模式尝试获取
│   └── tryReleaseShared()        // 释放许可证
├── FairSync                      // 公平模式实现
└── NonfairSync                   // 非公平模式实现

获取许可证流程

  1. 检查可用许可证数量(state 变量)
  2. 若剩余许可证 ≥ 请求数 → 通过 CAS 减少许可证数量 → 获取成功
  3. 若不足 → 进入 AQS 队列等待

🛠️ 六大应用场景与代码示例

1️⃣ 数据库连接池限流

public class ConnectionPool {
    private final Semaphore semaphore;
    private final BlockingQueue<Connection> pool;

    public ConnectionPool(int size) {
        semaphore = new Semaphore(size);
        pool = new ArrayBlockingQueue<>(size);
        for (int i=0; i<size; i++) {
            pool.add(createConnection());
        }
    }

    public Connection getConnection() throws InterruptedException {
        semaphore.acquire(); // 获取许可证
        return pool.take();
    }

    public void releaseConnection(Connection conn) {
        pool.offer(conn);
        semaphore.release(); // 释放许可证
    }
}

2️⃣ 接口限流(QPS 控制)

class RateLimiter {
    private final Semaphore semaphore;
    
    public RateLimiter(int qps) {
        semaphore = new Semaphore(qps);
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> 
            semaphore.release(semaphore.drainPermits()), 1, 1, TimeUnit.SECONDS);
    }

    public void callApi() throws InterruptedException {
        semaphore.acquire();
        // 执行API调用...
    }
}

3️⃣ 生产者-消费者模式

Semaphore full = new Semaphore(0);    // 初始产品数量为0
Semaphore empty = new Semaphore(10); // 缓冲区容量10
Semaphore mutex = new Semaphore(1); // 互斥锁

// 生产者
void produce() throws InterruptedException {
    empty.acquire();  // 等待空位
    mutex.acquire();
    // 生产物品...
    mutex.release();
    full.release();   // 增加产品计数
}

// 消费者
void consume() throws InterruptedException {
    full.acquire();   // 等待产品
    mutex.acquire();
    // 消费物品...
    mutex.release();
    empty.release();  // 释放空位
}

4️⃣ 并行任务限流

ExecutorService executor = Executors.newCachedThreadPool();
Semaphore semaphore = new Semaphore(5); // 最大5个并行

for (int i=0; i<100; i++) {
    semaphore.acquire(); // 前5个立即获取,后续等待
    executor.submit(() -> {
        try {
            // 执行任务...
        } finally {
            semaphore.release();
        }
    });
}

5️⃣ 资源访问控制

class RestrictedResource {
    private final Semaphore semaphore = new Semaphore(3);

    public void access() throws InterruptedException {
        semaphore.acquire();
        try {
            // 最多3个线程同时访问
            // 操作共享资源...
        } finally {
            semaphore.release();
        }
    }
}

6️⃣ 流量削峰

Semaphore semaphore = new Semaphore(1000); // 每秒最大处理量

void handleRequest(Request request) {
    if (!semaphore.tryAcquire()) {
        // 返回"系统繁忙"提示
        return;
    }
    
    try {
        // 处理请求...
    } finally {
        semaphore.release();
    }
}

⚖️ 公平模式 vs 非公平模式对比

特性 公平模式 非公平模式
获取顺序 严格遵循 FIFO 顺序 允许插队
吞吐量 较低 较高
适用场景 要求绝对公平的资源分配 高并发场景,优先保证吞吐量
实现方式 检查是否有前驱节点在等待 直接尝试获取许可证
代码示例 new Semaphore(5, true) new Semaphore(5, false)(默认)

💡 最佳实践指南

  1. 总是使用 try-finally 释放

    semaphore.acquire();
    try {
        // 临界区操作...
    } finally {
        semaphore.release();
    }
    
  2. 避免许可证泄漏

    • 确保每次 acquire() 都有对应的 release()
    • 使用 semaphore.availablePermits() 监控状态
  3. 动态调整容量

    // JDK 17+
    semaphore.release(5);       // 增加5个许可证
    semaphore.reducePermits(3); // 减少3个许可证
    
  4. 结合其他同步工具

    // 使用Semaphore + ReentrantLock实现复杂控制
    Semaphore sem = new Semaphore(5);
    Lock lock = new ReentrantLock();
    
    void complexOperation() {
        lock.lock();
        try {
            sem.acquire();
            // 组合操作...
        } finally {
            lock.unlock();
            sem.release();
        }
    }
    
  5. 预防死锁

    • 设置获取超时时间
    if (!semaphore.tryAcquire(3, TimeUnit.SECONDS)) {
        throw new TimeoutException();
    }
    

常见问题与解决方案

问题 1:许可证数量超过初始值
原因:多次调用 release()
解决:使用 drainPermits() 重置

int overRelease = semaphore.drainPermits();
semaphore.release(overRelease - maxPermits);

问题 2:线程被意外中断
处理方式:使用可中断方法

try {
    semaphore.acquireUninterruptibly();
} catch (InterruptedException e) {
    // 处理中断逻辑
}

问题 3:系统吞吐量下降
优化方案:结合非公平模式+批量处理

Semaphore sem = new Semaphore(100, false); // 非公平模式

void batchProcess() {
    sem.acquire(10); // 批量获取10个许可证
    try {
        // 批量处理数据...
    } finally {
        sem.release(10);
    }
}

🆚 与其他同步工具对比

工具 特点 与 Semaphore 的区别
ReentrantLock 独占锁,互斥访问 Semaphore 允许多线程并发访问
CountDownLatch 一次性屏障 Semaphore 可重复使用
CyclicBarrier 线程相互等待的屏障 Semaphore 不要求线程间协同
ThreadPool 管理线程生命周期 Semaphore 只控制并发数量

总结:Semaphore 是控制资源并发访问的瑞士军刀,通过许可证机制实现精细化的流量控制,适用于从简单限流到复杂系统设计的各种场景。理解其底层基于 AQS 的实现机制,结合具体业务需求灵活运用不同模式,能显著提升系统稳定性和性能。

CountDownLatch 和 CyclicBarrier 对比

📌 CountDownLatch 与 CyclicBarrier 深度对比
让我们用团队协作场景来理解这两个并发工具的核心差异:


🧩 核心概念对比表

特性 CountDownLatch(倒计时门闩) CyclicBarrier(循环屏障)
核心用途 等待其他线程完成任务 协调多个线程同步到某个阶段
重用性 一次性(计数器归零后失效) 可重复使用(自动重置计数器)
计数器方向 递减(countDown()) 递增(await() 增加到达数)
触发动作 可设置屏障触发时的回调动作
异常处理 不影响其他线程 若线程被中断,所有等待线程抛出异常
典型场景 主线程等待多个子线程初始化完成 多阶段并行计算(如 MapReduce)

🛠️ 工作原理图解

1. CountDownLatch 流程

主线程启动
   ↓
初始化 CountDownLatch(3)
   ↓
启动子线程A → 执行任务 → countDown() [计数器2]
启动子线程B → 执行任务 → countDown() [计数器1]
启动子线程C → 执行任务 → countDown() [计数器0]
   ↓
主线程 await() 被唤醒

2. CyclicBarrier 流程

线程A到达屏障 → 等待
线程B到达屏障 → 等待
线程C到达屏障 → 触发回调 → 所有线程继续执行
   ↓
(可重复使用,进入下一轮屏障)

💻 代码示例对比

1. CountDownLatch 示例:主厨等备料完成

public class KitchenDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        
        // 三个备菜工
        new Thread(() -> { prepare("蔬菜"); latch.countDown(); }).start();
        new Thread(() -> { prepare("肉类"); latch.countDown(); }).start();
        new Thread(() -> { prepare("调料"); latch.countDown(); }).start();

        latch.await(); // 主厨等待备料
        System.out.println("开始烹饪!");
    }

    static void prepare(String material) {
        try {
            Thread.sleep((long)(Math.random() * 1000);
            System.out.println(material + " 准备完成");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

2. CyclicBarrier 示例:多阶段游戏加载

public class GameLoading {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(4, 
            () -> System.out.println("\n所有玩家准备就绪,开始下一阶段!\n"));

        for (int i=0; i<4; i++) {
            new Player(barrier).start();
        }
    }
}

class Player extends Thread {
    private final CyclicBarrier barrier;
    
    public Player(CyclicBarrier barrier) {
        this.barrier = barrier;
    }
    
    public void run() {
        try {
            for (int stage=1; stage<=3; stage++) {
                System.out.println(getName() + " 完成阶段" + stage + "加载");
                barrier.await(2, TimeUnit.SECONDS); // 等待其他玩家
            }
        } catch (Exception e) {
            handleException(e);
        }
    }
    
    void handleException(Exception e) {
        // 异常处理逻辑
    }
}

⚖️ 关键差异深度解析

1. 状态重置机制

2. 异常传播

3. 超时处理

// CountDownLatch
boolean success = latch.await(1, TimeUnit.SECONDS);

// CyclicBarrier
int index = barrier.await(500, TimeUnit.MILLISECONDS);

🔄 混合使用场景示例

电商订单处理流程

1. 使用 CountDownLatch 等待所有商品库存校验完成
2. 使用 CyclicBarrier 同步支付服务和物流服务的准备
3. 再次使用 CountDownLatch 确认所有子系统完成最终提交

代码片段

CountDownLatch stockLatch = new CountDownLatch(items.size());
CyclicBarrier paymentBarrier = new CyclicBarrier(2);
CountDownLatch commitLatch = new CountDownLatch(2);

// 库存校验线程
executor.submit(() -> {
    checkStock();
    stockLatch.countDown();
});

// 支付服务
executor.submit(() -> {
    stockLatch.await();
    paymentBarrier.await(); // 与物流服务同步
    processPayment();
    commitLatch.countDown();
});

// 物流服务
executor.submit(() -> {
    stockLatch.await();
    paymentBarrier.await(); // 与支付服务同步
    arrangeDelivery();
    commitLatch.countDown();
});

commitLatch.await();
System.out.println("订单处理完成!");

🚀 最佳实践指南

场景 推荐工具 理由
主线程等待多个初始化任务完成 CountDownLatch 天然的一次性等待机制
多阶段并行计算 CyclicBarrier 自动重置适合迭代过程
需要执行完成回调 CyclicBarrier 内置回调函数支持
不确定等待线程数 都不适用,考虑 Phaser 动态注册参与者机制更灵活

⚠️ 常见陷阱与规避

  1. CountDownLatch 的幽灵计数

    • 问题:多次调用 countDown()
    • 解决:使用 getCount() 检查状态
  2. CyclicBarrier 的死锁风险

    • 问题:等待线程数超过初始化数量
    • 解决:使用 isBroken() 检测屏障状态
  3. 资源泄漏

    • 问题:未正确关闭线程池导致屏障未触发
    • 解决:配合使用 awaitTermination()

🌟 总结选择策略

两者都基于 AQS 实现,但设计目标不同。理解它们的核心差异,就能像搭积木一样灵活构建并发流程控制!

线程池总结

📚 Java 线程池深度解析

从原理到实战,全面掌握线程池的核心机制与最佳实践。


🧩 线程池核心参数

线程池通过 ThreadPoolExecutor 构造函数配置,以下是核心参数:

public ThreadPoolExecutor(
    int corePoolSize,          // 核心线程数(长期存活的线程)
    int maximumPoolSize,       // 最大线程数(临时线程 = max - core)
    long keepAliveTime,        // 临时线程空闲存活时间
    TimeUnit unit,             // 时间单位
    BlockingQueue<Runnable> workQueue, // 任务队列
    RejectedExecutionHandler handler   // 拒绝策略
) 

⚙️ 线程池工作流程

任务提交 → 核心线程是否已满?  
├─ 未满 → 创建新线程执行  
├─ 已满 → 任务入队列  
│   ├─ 队列未满 → 等待执行  
│   └─ 队列已满 → 创建临时线程(直到max)  
│       ├─ 成功 → 执行任务  
│       └─ 失败 → 触发拒绝策略  

🔧 线程池类型与代码示例

1. FixedThreadPool(固定线程池)

ExecutorService fixedPool = Executors.newFixedThreadPool(4);

// 适用场景:长期稳定的并发任务
// 特点:无界队列(LinkedBlockingQueue),可能OOM

2. CachedThreadPool(缓存线程池)

ExecutorService cachedPool = Executors.newCachedThreadPool();

// 适用场景:大量短期异步任务
// 特点:SynchronousQueue,max=Integer.MAX_VALUE,可能创建过多线程

3. ScheduledThreadPool(定时线程池)

ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);

// 延迟执行
scheduledPool.schedule(() -> System.out.println("5秒后执行"), 5, TimeUnit.SECONDS);

// 固定频率执行
scheduledPool.scheduleAtFixedRate(() -> 
    System.out.println("每隔3秒执行"), 0, 3, TimeUnit.SECONDS);

4. SingleThreadExecutor(单线程池)

ExecutorService singlePool = Executors.newSingleThreadExecutor();

// 适用场景:任务顺序执行
// 特点:保证任务顺序性,无界队列

自定义线程池(推荐方式)

ThreadPoolExecutor customPool = new ThreadPoolExecutor(
    4,                              // corePoolSize
    8,                              // maximumPoolSize
    30, TimeUnit.SECONDS,           // keepAliveTime
    new ArrayBlockingQueue<>(100),   // 有界队列(推荐!)
    new ThreadFactory() {           // 自定义线程工厂
        private AtomicInteger count = new AtomicInteger(0);
        public Thread newThread(Runnable r) {
            return new Thread(r, "Worker-" + count.incrementAndGet());
        }
    },
    new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);

🛡️ 拒绝策略详解

策略类 行为 适用场景
AbortPolicy(默认) 抛出 RejectedExecutionException 严格要求任务必须被处理
CallerRunsPolicy 由提交任务的线程直接执行 保证任务不丢失,但可能阻塞主线程
DiscardPolicy 静默丢弃新任务 允许丢弃新任务的场景
DiscardOldestPolicy 丢弃队列中最旧的任务,尝试重新提交 优先处理新任务

自定义拒绝策略示例

new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 记录日志并降级处理
        logger.warn("任务被拒绝: {}", r);
        sendToBackupQueue(r);
    }
}

📊 线程池监控方法

// 获取活跃线程数
int activeCount = customPool.getActiveCount();

// 获取已完成任务数
long completedCount = customPool.getCompletedTaskCount();

// 获取队列中等待任务数
int queueSize = customPool.getQueue().size();

// 扩展钩子方法(监控任务执行时间)
customPool.beforeExecute((t, r) -> startTime.set(System.currentTimeMillis()));
customPool.afterExecute((r, t) -> 
    logger.info("任务耗时: {}ms", System.currentTimeMillis() - startTime.get()));

💡 线程池调优指南

线程数设置公式

队列选择策略

队列类型 特点 适用场景
SynchronousQueue 直接传递任务,无容量 CachedThreadPool
LinkedBlockingQueue 无界队列(默认) Fixed/SingleThreadPool
ArrayBlockingQueue 有界队列(推荐) 需要防止资源耗尽
PriorityBlockingQueue 优先级队列 任务需要优先级调度

⚠️ 常见陷阱与解决方案

问题 现象 解决方案
任务堆积导致 OOM 队列使用无界队列 改用有界队列 + 合理拒绝策略
线程泄漏 线程未正确关闭 调用 shutdown() + awaitTermination()
死锁 线程互相等待资源 避免嵌套提交任务,使用超时机制
上下文切换过多 CPU 利用率低但吞吐量不高 减少线程数,合并任务

🌰 实战案例:Web 服务器请求处理

// 自定义线程池配置
ThreadPoolExecutor serverPool = new ThreadPoolExecutor(
    10, 50, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000),
    new NamedThreadFactory("WebServer-Worker"),
    new ThreadPoolExecutor.AbortPolicy()
);

// 处理请求
void handleRequest(HttpRequest request) {
    serverPool.execute(() -> {
        try {
            processRequest(request); // 实际业务处理
        } catch (Exception e) {
            logger.error("处理失败", e);
        }
    });
}

// 优雅关闭
serverPool.shutdown();
if (!serverPool.awaitTermination(30, TimeUnit.SECONDS)) {
    List<Runnable> unfinished = serverPool.shutdownNow();
    logger.warn("强制终止{}个任务", unfinished.size());
}

🔄 线程池生命周期管理

            +----------------+
            |  Running       |
            |  (接受新任务)   |
            +---+------------+
                | shutdown()
                v
+----------------+------------+
|  Shutdown                  |
|  (不接收新任务,继续处理队列) |
+---+------------------------+
    | shutdownNow()
    v
+----------------+------------+
|  Terminated                 |
|  (所有线程终止)              |
+----------------------------+

🌟 最佳实践总结

  1. 使用有界队列:防止 OOM
  2. 合理设置线程数:根据任务类型调整
  3. 明确线程命名:便于问题排查
  4. 监控任务队列:设置阈值报警
  5. 优雅关闭:shutdown() + awaitTermination()
  6. 异常处理:使用 try-catch 包裹任务代码
  7. 避免任务嵌套提交:防止死锁

终极建议:在项目中避免直接使用 Executors 快捷方法,而是通过 ThreadPoolExecutor 构造函数显式配置参数,这是《阿里巴巴 Java 开发手册》中的强制要求,可以有效规避资源耗尽风险。

线程池底层原理

📚 Java 线程池底层原理深度解析

从 JVM 层面对线程池的工作机制进行拆解,揭示其高效调度与资源管理的核心设计。


🧩 核心数据结构与状态管理

1. 控制变量 ctl

线程池通过原子整型 ctl 同时管理线程池状态工作线程数

// 高3位表示状态,低29位表示线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 状态定义
private static final int RUNNING    = -1 << COUNT_BITS; // 接受新任务
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 不接收新任务,处理队列任务
private static final int STOP       =  1 << COUNT_BITS; // 不处理任何任务,中断进行中的任务
private static final int TIDYING    =  2 << COUNT_BITS; // 所有任务终止,准备执行terminated()
private static final int TERMINATED =  3 << COUNT_BITS; // terminated()执行完成

2. Worker 内部类

每个工作线程被封装为 Worker 对象,继承 AQS(AbstractQueuedSynchronizer)

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;   // 实际执行线程
    Runnable firstTask;    // 初始任务(可为null)

    Worker(Runnable firstTask) {
        setState(-1); // 禁止中断直到runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);   // 核心执行循环
    }

    // 实现简单不可重入锁
    protected boolean isHeldExclusively() { return getState() != 0; }
    protected boolean tryAcquire(int unused) { return compareAndSetState(0, 1); }
    protected boolean tryRelease(int unused) { setState(0); return true; }
}

⚙️ 核心执行流程剖析

1. 任务提交入口 execute()

public void execute(Runnable command) {
    if (command == null) throw new NPE();
    int c = ctl.get();
    
    // 阶段1:尝试创建核心线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) return;
        c = ctl.get();
    }

    // 阶段2:尝试入队
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); // 创建非核心线程处理队列
    }
    
    // 阶段3:尝试创建非核心线程
    else if (!addWorker(command, false))
        reject(command); // 触发拒绝策略
}

2. 线程创建 addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 检查状态是否允许创建线程
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= (core ? corePoolSize : maximumPoolSize)) 
                return false;
            if (compareAndIncrementWorkerCount(c)) 
                break retry;
            // CAS 失败重试
        }
    }

    // 创建 Worker 对象并启动线程
    Worker w = new Worker(firstTask);
    Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            workers.add(w);
            int s = workers.size();
            if (s > largestPoolSize)
                largestPoolSize = s;
        } finally {
            mainLock.unlock();
        }
        t.start(); // 调用 Worker.run() → runWorker()
    }
    return true;
}

3. 线程执行循环 runWorker()

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock(); // 表示线程正在工作
            // 处理线程中断状态
            if ((runStateAtLeast(ctl.get(), STOP) || 
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // 钩子方法
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException | Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // 钩子方法
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly); // 线程退出处理
    }
}

🔄 任务获取与线程回收

1. getTask() 方法

private Runnable getTask() {
    boolean timedOut = false; // 上次poll是否超时

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 检查状态是否允许获取任务
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 线程数超过max或超时 → 减少线程数
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

2. 线程退出 processWorkerExit()

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // 异常退出需补偿线程数
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 尝试终止线程池
    tryTerminate();

    // 补充新线程(若需要)
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return;
        }
        addWorker(null, false);
    }
}

🛠️ 关键设计思想

1. 无锁化设计

2. 资源弹性管理

3. 优雅退化机制


性能优化细节

1. 避免伪共享(False Sharing)

Worker 对象通过 缓存行填充 优化内存布局:

// JDK 内部实现(简化版)
@sun.misc.Contended
class Worker extends AbstractQueuedSynchronizer {
    // 字段布局优化
}

2. 线程创建优化

3. 队列选择策略

不同队列实现的性能差异:

队列类型 吞吐量 公平性 适用场景
SynchronousQueue 可选 短任务、高并发
LinkedBlockingQueue 任务执行时间差异大
ArrayBlockingQueue 可选 需要严格控制队列大小

🌰 底层源码级示例

Worker 锁的应用

// 在 runWorker 中通过 lock 标记工作状态
w.lock(); // 获取 Worker 锁
try {
    beforeExecute(wt, task);
    task.run();
} finally {
    w.unlock(); // 释放锁
}

状态转换原子性

// tryTerminate() 中的状态转换
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // 中断空闲线程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        // 转换为 TIDYING 状态
        if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
            try {
                terminated();
            } finally {
                ctl.set(ctlOf(TERMINATED, 0));
                termination.signalAll();
            }
            return;
        }
    }
}

🚀 总结

Java 线程池通过精妙的状态管理、弹性线程调度和高效的任务处理机制,实现了高并发环境下的资源优化利用。其底层设计体现了以下核心思想:

  1. 原子操作优先:通过 CAS 减少锁竞争
  2. 分层管理:Worker 对象封装线程生命周期
  3. 弹性伸缩:动态调整线程数与任务队列的平衡
  4. 优雅降级:拒绝策略防止系统过载崩溃

理解这些底层原理,有助于开发者编写更高效可靠的并发代码,并在性能调优时做出精准决策。

小结

以下是对 Java 并发编程核心概念的结构化总结,我们将从底层到高层,用最清晰的方式梳理所有关键概念及其相互关系:


🌐 Java 并发知识体系全景图

                      Java并发体系
                           |
        +------------------+------------------+
        |                  |                  |
   【线程基础】        【同步机制】          【并发工具】
        |                  |                  |
   ├─ 线程生命周期    ├─ 互斥锁            ├─ 线程池
   ├─ Thread/Runnable ├─ synchronized      ├─ Executor框架
   ├─ 线程状态切换     ├─ ReentrantLock     ├─ Fork/Join
   ├─ 线程优先级       ├─ ReadWriteLock     ├─ CompletableFuture
   └─ Daemon线程       ├─ 原子类            └─ 并发集合
                      ├─ volatile
                      └─ 内存模型 (JMM)

一、线程基础

概念 核心要点 代码示例
线程创建 继承 Thread 或实现 Runnable / Callable new Thread(() -> {...}).start()
线程状态 NEW → RUNNABLE → BLOCKED → WAITING → TIMED_WAITING → TERMINATED Thread.getState()
Daemon 线程 守护线程(随主线程结束) thread.setDaemon(true)
线程优先级 1~10 的优先级设置(不保证严格执行) thread.setPriority(8)

二、同步机制

1. 互斥锁

工具 特点 适用场景
synchronized JVM 内置锁,自动释放,不可中断 简单的代码块同步
ReentrantLock 可重入、可中断、公平锁,需手动释放 需要高级功能的复杂同步
StampedLock 乐观读锁,适用于读多写少场景 高并发读取操作

对比示例

// synchronized
synchronized(lock) { /* 临界区 */ }

// ReentrantLock
Lock lock = new ReentrantLock();
lock.lock();
try { /* 临界区 */ } 
finally { lock.unlock(); }

2. 原子操作

功能 示例方法
AtomicInteger 原子整型操作 incrementAndGet()
AtomicReference 原子对象引用 compareAndSet()
LongAdder 高并发下性能更好的计数器 add(), sum()

原理:基于 CAS (Compare-And-Swap) 实现无锁并发


三、线程通信

机制 说明 关键方法
wait()/notify() 对象监视器方法 需在同步块内使用
Condition 更灵活的等待/通知机制 await(), signal()
BlockingQueue 线程安全的队列实现 put(), take()

典型模式

// 生产者-消费者模式
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

// 生产者
queue.put(item);

// 消费者
String item = queue.take();

四、线程管理

1. 线程池体系

ExecutorService
    ├─ ThreadPoolExecutor(标准线程池)
    ├─ ScheduledThreadPoolExecutor(定时任务)
    └─ ForkJoinPool(分治任务)

核心参数

创建方式

ExecutorService pool = Executors.newFixedThreadPool(4);

2. Future 机制

特点
Future 基础异步结果获取
CompletableFuture 支持链式调用、组合异步任务(Java8+)

示例

CompletableFuture.supplyAsync(() -> "Hello")
                 .thenApply(s -> s + " World")
                 .thenAcceptprintln;

五、并发工具类

工具 用途 核心方法
CountDownLatch 等待多个任务完成 await(), countDown()
CyclicBarrier 多线程到达屏障后统一行动 await()
Semaphore 控制并发访问数量 acquire(), release()
Phaser 更灵活的阶段同步(Java7+) arriveAndAwaitAdvance()

对比

CountDownLatch:一次性使用,等待N个任务完成
CyclicBarrier:可重复使用,等待N个线程到达
Semaphore:控制资源访问并发数

六、并发集合

集合类 特点 实现原理
ConcurrentHashMap 高并发安全的哈希表 分段锁 + CAS
CopyOnWriteArrayList 写时复制列表 读无锁,写复制整个数组
BlockingQueue 阻塞队列(Array/Linked/Priority) 锁 + Condition
ConcurrentLinkedQueue 无界非阻塞队列 CAS 实现

选型指南


七、内存模型(JMM)

核心原则

volatile 关键字

volatile boolean flag = false; // 保证所有线程看到最新值

happens-before 规则
7. 程序顺序规则
8. 锁规则
9. volatile 变量规则
10. 线程启动规则
11. 传递性规则


八、并发设计模式

模式 实现方式 应用场景
Thread-Per-Message 为每个任务创建新线程 简单任务处理
Worker Thread 使用线程池管理线程 高并发任务处理
Producer-Consumer 通过阻塞队列解耦生产消费 数据管道处理
Two-Phase Termination 通过标志位安全终止线程 优雅关闭线程

🚀 学习路线建议

  1. 基础阶段
  1. 进阶阶段
  1. 高阶阶段

⚠️ 常见陷阱与解决方案

问题 解决方案
死锁 按固定顺序获取锁,使用 tryLock 超时机制
线程泄漏 正确关闭线程池(shutdown + awaitTermination)
上下文切换开销 减少同步代码块范围,使用无锁数据结构
伪共享(False Sharing) 使用 @Contended 注解填充缓存行

最终总结:Java 并发体系的核心是安全地管理状态共享高效利用计算资源。理解这个体系需要从三个维度入手:
15. 线程管理(创建、调度、协作)
16. 共享数据保护(锁、原子类、内存可见性)
17. 并发模式设计(工具类、线程池、队列)

建议通过实际项目中的场景(如接口限流、批量处理、异步任务等)逐步实践这些概念,遇到问题时再回来看这张知识地图,会有更深刻的理解。