前言

在现代多核CPU架构下,并发编程已成为提升应用程序性能的关键技术。Java的java.util.concurrent(JUC)包提供了丰富的工具类来帮助开发者更安全、更高效地处理并发问题。本文将深入探讨JUC中几个常用的锁和同步器:ReentrantReadWriteLockStampedLockSemaphoreCountDownLatchCyclicBarrier,理解它们的设计理念、使用场景及注意事项。

ReentrantReadWriteLock:读写分离锁

ReentrantReadWriteLock是一种读写分离锁,它允许多个线程同时读取共享资源,但在写入时则需要独占访问。这在“读多写少”的场景下能显著提高性能,因为它避免了不必要的阻塞。

读写锁原理图-Illustration showing multiple reader threads accessing a shared resource concurrently, while a single writer thread waits, conceptual lock diagram

核心特性:

  1. 读锁与写锁互斥:当一个线程持有写锁时,其他线程(无论是读线程还是写线程)都必须等待。当一个线程持有读锁时,写线程必须等待。
  2. 写锁与写锁互斥:同一时间只允许一个线程持有写锁。
  3. 读锁与读锁不互斥:多个线程可以同时持有读锁,并发读取。

锁升级与降级:

  • 锁升级(不支持):持有读锁的线程尝试获取写锁,会导致死锁或永久等待。这是因为获取写锁需要等待所有读锁释放,而当前线程持有的读锁永远无法释放(因为它在等待写锁)。
  • 锁降级(支持):持有写锁的线程可以继续获取读锁,然后释放写锁。这样就从写锁降级为了读锁,允许其他读线程进入,但仍阻止其他写线程。

示例代码:

下面是一个简单的示例,演示了ReentrantReadWriteLock的基本用法。DataContainer类使用读写锁来保护共享数据data

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// 使用Slf4j或其他日志框架
// import lombok.extern.slf4j.Slf4j;
// @Slf4j(topic = "c.TestReadWriteLock")
public class TestReadWriteLock {
private static final Logger log = LoggerFactory.getLogger(TestReadWriteLock.class);

public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer();

// 启动两个读线程
new Thread(() -> {
dataContainer.read();
}, "t1-Reader").start();

new Thread(() -> {
dataContainer.read();
}, "t2-Reader").start();

// 稍等片刻,启动一个写线程
TimeUnit.MILLISECONDS.sleep(100);
new Thread(() -> {
dataContainer.write("New Data");
}, "t3-Writer").start();

// 再启动一个读线程,观察其是否被写锁阻塞
TimeUnit.MILLISECONDS.sleep(100);
new Thread(() -> {
dataContainer.read();
}, "t4-Reader").start();
}

// 模拟休眠,简化代码
public static void sleep(double seconds) {
try {
TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}

// 使用Slf4j或其他日志框架
// import lombok.extern.slf4j.Slf4j;
// @Slf4j(topic = "c.DataContainer")
class DataContainer {
private static final Logger log = LoggerFactory.getLogger(DataContainer.class);
private Object data = "Initial Data";
// 1. 创建读写锁对象
private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
// 2. 获取读锁和写锁实例
private final ReentrantReadWriteLock.ReadLock r = rw.readLock();
private final ReentrantReadWriteLock.WriteLock w = rw.writeLock();

public Object read() {
log.debug("尝试获取读锁...");
r.lock(); // 3. 加读锁
try {
log.debug("成功获取读锁,开始读取...");
sleep(1); // 模拟读取耗时
log.debug("读取完成, data: {}", data);
return data;
} finally {
log.debug("准备释放读锁...");
r.unlock(); // 4. 释放读锁
}
}

public void write(Object newData) {
log.debug("尝试获取写锁...");
w.lock(); // 3. 加写锁
try {
log.debug("成功获取写锁,开始写入...");
this.data = newData;
sleep(1); // 模拟写入耗时
log.debug("写入完成, data: {}", data);
} finally {
log.debug("准备释放写锁...");
w.unlock(); // 4. 释放写锁
}
}

// 模拟休眠
private static void sleep(double seconds) {
try {
TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}

运行结果分析:

你会观察到 t1-Readert2-Reader 可以并发执行读取操作。当 t3-Writer 尝试获取写锁时,它会等待 t1t2 释放读锁。一旦 t3 持有写锁,后续的 t4-Reader 会被阻塞,直到 t3 释放写锁。

StampedLock:带版本戳的乐观读写锁

StampedLock 是 JDK 8 引入的一种更高级的锁,它在ReentrantReadWriteLock的基础上进行了优化,特别是在读操作远多于写操作的场景下。StampedLock 提供了乐观读(Optimistic Reading)的模式。

核心概念:

  • 戳(Stamp):每次获取锁(读锁、写锁或乐观读)都会返回一个非零的版本戳。释放锁或转换锁状态时需要传入相应的戳。

  • 乐观读 (tryOptimisticRead):尝试获取一个非独占的读锁,但实际上不加锁,只是获取当前的版本戳。之后,线程可以读取共享资源。读取完成后,需要调用validate(stamp)方法检查在读取期间是否有写操作介入(即版本戳是否发生变化)。

  • 如果validate返回true,表示没有写操作干扰,读取的数据有效,性能极高(无锁操作)。

    • 如果validate返回false,表示有写操作干扰,数据可能已失效,此时需要升级为悲观读锁(调用readLock())重新读取数据。
  • 悲观读锁 (readLock):类似ReentrantReadWriteLock的读锁,会阻塞写操作,但不阻塞其他读操作。

  • 写锁 (writeLock):类似ReentrantReadWriteLock的写锁,独占访问。

StampedLock乐观读流程图-Flowchart diagram illustrating the StampedLock optimistic read process: tryOptimisticRead, read data, validate stamp, if valid use data, if invalid acquire readLock and re-read

优点:

  • 在读多写少且写操作时间短的场景下,乐观读能大幅减少锁竞争,提升吞吐量。

缺点:

  • StampedLock不可重入的。如果一个线程已经持有写锁,再尝试获取写锁会导致死锁;持有读锁再尝试获取读锁也可能产生问题。
  • 不支持条件变量(Condition)。
  • 使用相对复杂,需要正确处理乐观读失败后的锁升级逻辑。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;

// 使用Slf4j或其他日志框架
// import lombok.extern.slf4j.Slf4j;
// @Slf4j(topic = "c.TestStampedLock")
public class TestStampedLock {
private static final Logger log = LoggerFactory.getLogger(TestStampedLock.class);

public static void main(String[] args) {
DataContainerStamped dataContainer = new DataContainerStamped(1);

// 线程t1进行一次快速读取(乐观读成功)
new Thread(() -> {
dataContainer.read(1); // 模拟读取时间1秒
}, "t1-optimistic-success").start();

sleep(0.1); // 确保t1开始

// 线程t-writer进行写操作
new Thread(() -> {
log.debug("Writer thread starting write operation...");
dataContainer.write(99);
}, "t-writer").start();

sleep(0.5); // 确保写操作有机会介入

// 线程t2进行一次较慢的读取(乐观读失败,升级为读锁)
new Thread(() -> {
dataContainer.read(2); // 模拟读取时间2秒,期间writer会修改数据
}, "t2-optimistic-fail").start();
}

// 模拟休眠,简化代码
public static void sleep(double seconds) {
try {
TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}

// 使用Slf4j或其他日志框架
// import lombok.extern.slf4j.Slf4j;
// @Slf4j(topic = "c.DataContainerStamped")
class DataContainerStamped {
private static final Logger log = LoggerFactory.getLogger(DataContainerStamped.class);
private int data;
private final StampedLock lock = new StampedLock();

public DataContainerStamped(int data) {
this.data = data;
}

public int read(int readTime) {
long stamp = lock.tryOptimisticRead(); // 1. 尝试乐观读,获取戳
log.debug("Optimistic read attempt, stamp: {}", stamp);
sleep(readTime); // 模拟读取操作耗时

// 2. 校验戳,检查读取期间是否有写操作介入
if (lock.validate(stamp)) {
log.debug("Optimistic read validation successful (stamp: {}), data: {}", stamp, data);
return data; // 校验成功,数据有效
}

// 3. 乐观读失败,升级为悲观读锁
log.debug("Optimistic read validation failed (stamp: {}), upgrading to read lock...", stamp);
try {
stamp = lock.readLock(); // 获取悲观读锁,阻塞写操作
log.debug("Acquired read lock, stamp: {}", stamp);
sleep(readTime); // 模拟重新读取耗时
log.debug("Read finished under read lock (stamp: {}), data: {}", stamp, data);
return data; // 返回在读锁保护下读取的数据
} finally {
log.debug("Releasing read lock, stamp: {}", stamp);
lock.unlockRead(stamp); // 4. 释放悲观读锁
}
}

public void write(int newData) {
long stamp = lock.writeLock(); // 1. 获取写锁,阻塞所有读写
log.debug("Acquired write lock, stamp: {}", stamp);
try {
log.debug("Writing data...");
sleep(2); // 模拟写入耗时
this.data = newData;
log.debug("Write finished, new data: {}", data);
} finally {
log.debug("Releasing write lock, stamp: {}", stamp);
lock.unlockWrite(stamp); // 2. 释放写锁
}
}

// 模拟休眠
private static void sleep(double seconds) {
try {
TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}

运行结果分析:

  • t1-optimistic-success 线程尝试乐观读,由于读取期间没有写操作,validate成功,快速返回。
  • t-writer 获取写锁并修改数据。
  • t2-optimistic-fail 线程尝试乐观读,但在其读取期间(sleep(2)),t-writer 修改了数据,导致validate失败。随后 t2 升级为悲观读锁,等待 t-writer 释放写锁后,再重新获取数据。

Semaphore:信号量

Semaphore(信号量)用于控制同时访问特定资源的线程数量。它维护了一个许可(permit)集,线程必须先获得许可才能访问资源,使用完毕后释放许可。如果许可数量为0,则尝试获取许可的线程将被阻塞,直到有其他线程释放许可。

信号量示意图-Illustration of a Semaphore controlling access to limited resources, showing threads waiting and acquiring permits, like a gatekeeper

常用场景:

  • 限流:控制对某个服务的并发调用量。
  • 资源池:管理有限的资源,如数据库连接池。

核心方法:

  • Semaphore(int permits):构造函数,初始化许可数量。
  • acquire():获取一个许可,如果无可用许可则阻塞。
  • acquire(int permits):获取指定数量的许可。
  • release():释放一个许可。
  • release(int permits):释放指定数量的许可。
  • tryAcquire():尝试获取一个许可,立即返回成功或失败,不阻塞。

示例代码:

下面的示例模拟了10个线程尝试访问一个只有3个许可的资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// 使用Slf4j或其他日志框架
// import lombok.extern.slf4j.Slf4j;
// @Slf4j(topic = "c.TestSemaphore")
public class TestSemaphore {
private static final Logger log = LoggerFactory.getLogger(TestSemaphore.class);

public static void main(String[] args) {
// 1. 创建 Semaphore 对象,设置许可数量为 3
Semaphore semaphore = new Semaphore(3);

// 2. 创建 10 个线程尝试获取许可并执行任务
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
// 3. 尝试获取许可,获取不到则阻塞
semaphore.acquire();
log.debug(" acquired permit, running...");
sleep(1); // 模拟执行任务
log.debug(" task finished, releasing permit...");
} catch (InterruptedException e) {
log.error("Thread interrupted", e);
Thread.currentThread().interrupt();
} finally {
// 4. 释放许可
semaphore.release();
}
}, "Thread-" + i).start();
}
}

// 模拟休眠,简化代码
public static void sleep(double seconds) {
try {
TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}

运行结果分析:

同一时间内,最多只有3个线程能够获得许可并打印 “running…”。其他线程会阻塞在 acquire() 方法上,直到有线程调用 release() 释放许可。

CountDownLatch:倒计时门闩

CountDownLatch 是一种同步辅助类,它允许一个或多个线程等待其他一组线程完成操作。它像一个倒计时器,主线程(或任何需要等待的线程)调用 await() 方法阻塞,直到计数器减到零。其他线程则通过调用 countDown() 方法来使计数器减一。

CountDownLatch示意图-Illustration showing multiple worker threads decrementing a counter (CountDownLatch), while a main thread waits (await) until the counter reaches zero, like a starting gate

核心特性:

  • 一次性:计数器的值只能被初始化一次,并且一旦减到零,就不能再重置或增加。
  • 等待机制await() 方法阻塞调用线程,直到计数器为零。
  • 计数递减countDown() 方法将计数器减一。

常用场景:

  • 等待多个初始化任务完成:主线程等待所有初始化子线程完成后再继续执行。
  • 模拟并发:确保多个线程同时开始执行某个任务。

示例代码:

下面的示例模拟了一个游戏加载场景,主线程需要等待10个加载任务(每个任务在一个线程中执行)全部完成后才能开始游戏。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TestCountDownLatch {
private static final Logger log = LoggerFactory.getLogger(TestCountDownLatch.class);

public static void main(String[] args) throws InterruptedException {
testLoadingGame();
}

private static void testLoadingGame() throws InterruptedException {
AtomicInteger threadCounter = new AtomicInteger(0);
ExecutorService service = Executors.newFixedThreadPool(10, (r) -> {
// 自定义线程名
return new Thread(r, "loader-" + threadCounter.getAndIncrement());
});

// 1. 初始化 CountDownLatch,计数为 10
CountDownLatch latch = new CountDownLatch(10);
String[] loadingProgress = new String[10]; // 用于显示加载进度
Arrays.fill(loadingProgress, "0%"); // 初始化进度

Random random = new Random();
log.info("Starting game loading process...");

// 2. 提交 10 个加载任务到线程池
for (int j = 0; j < 10; j++) {
int taskIndex = j; // lambda 中需要 final 或 effectively final 变量
service.submit(() -> {
try {
log.debug("{} starting loading...", Thread.currentThread().getName());
// 模拟加载过程
for (int i = 0; i <= 100; i++) {
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(50)); // 随机加载耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 更新进度(注意:这里的更新方式不是线程安全的,仅为演示)
// 在实际应用中,更新共享状态需要同步或使用并发集合
synchronized (loadingProgress) {
loadingProgress[taskIndex] = Thread.currentThread().getName() + "(" + i + "%)";
// 打印进度条(\r 实现原地更新)
System.out.print("\rLoading progress: " + Arrays.toString(loadingProgress));
}
}
log.debug("{} finished loading.", Thread.currentThread().getName());
} finally {
// 3. 任务完成,计数器减一
latch.countDown();
}
});
}

// 4. 主线程调用 await() 等待所有加载任务完成(计数器归零)
log.info("Main thread waiting for all loading tasks to complete...");
latch.await(); // 阻塞直到 latch 计数为 0

System.out.println(); // 换行
log.info("All loading tasks completed. Game starting...");
service.shutdown(); // 关闭线程池
}
}

运行结果分析:

主线程会打印 “Main thread waiting…” 并阻塞在 latch.await()。同时,10个加载线程并发执行,每个线程完成加载后调用 latch.countDown()。当第10个线程完成并调用 countDown() 后,计数器变为0,await() 方法返回,主线程继续执行,打印 “Game starting…”。

CyclicBarrier:循环栅栏

CyclicBarrier(循环栅栏)与CountDownLatch类似,也是用于线程协作,但它更像一个可重用的屏障。它允许一组线程相互等待,直到所有线程都到达某个公共屏障点(barrier point),然后所有线程再同时继续执行。

CyclicBarrier示意图-Illustration showing multiple threads arriving at a barrier (CyclicBarrier), waiting for each other, once all arrive the barrier breaks, and they proceed together. The barrier can reset.

核心特性:

  • 计数等待:构造时指定参与的线程数(parties)。线程调用 await() 方法到达屏障,并阻塞等待,直到指定数量的线程都调用了await()
  • 屏障动作(可选):可以在构造时提供一个 Runnable 任务,当所有线程都到达屏障后,在任何线程被释放之前,会由最后一个到达屏障的线程执行这个任务。
  • 可重用:当所有等待线程被释放后,CyclicBarrier 可以被重置(自动或手动),用于下一轮的同步。这就像“人满发车”,发车后可以等待下一批人。

常用场景:

  • 多线程计算:将一个大任务分解给多个线程,等待所有线程计算完成中间结果后,再进行汇总或下一步计算。
  • 并行测试:确保所有测试线程都准备好后再开始执行测试。

CyclicBarrier vs CountDownLatch

  • CountDownLatch 是一次性的,计数减到0后不能重置;CyclicBarrier 是可重用的。
  • CountDownLatch 主要用于一个或多个线程等待其他线程完成某事;CyclicBarrier 用于一组线程相互等待,直到所有成员都到达某个点。

示例代码:

下面的示例模拟了两个任务(task1, task2)需要协同工作,每一轮都需要双方都完成后才能进入下一轮。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;

// 使用Slf4j或其他日志框架
// import lombok.extern.slf4j.Slf4j;
// @Slf4j(topic = "c.TestCyclicBarrier")
public class TestCyclicBarrier {
private static final Logger log = LoggerFactory.getLogger(TestCyclicBarrier.class);

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(2);

// 1. 创建 CyclicBarrier,指定参与者数量为 2,并提供一个屏障动作
CyclicBarrier barrier = new CyclicBarrier(2, () -> {
// 这个 Runnable 会在两个线程都到达屏障后,由最后一个到达的线程执行
log.info("--- Both task1 and task2 reached the barrier. Barrier action executed. Proceeding to next round... ---");
});

// 模拟执行 3 轮任务
for (int i = 0; i < 3; i++) {
log.info("Starting round {}", i + 1);
// 提交 task1
service.submit(() -> {
try {
log.debug("task1 starting work for round...");
sleep(1); // 模拟 task1 工作耗时
log.debug("task1 finished work, waiting at barrier...");
// 2. task1 到达屏障,等待 task2
barrier.await(); // 如果是第一个到达,会阻塞;如果是第二个到达,会唤醒等待的线程,并触发屏障动作
log.debug("task1 passed barrier.");
} catch (InterruptedException | BrokenBarrierException e) {
log.error("Task1 interrupted or barrier broken", e);
Thread.currentThread().interrupt();
}
});

// 提交 task2
service.submit(() -> {
try {
log.debug("task2 starting work for round...");
sleep(2); // 模拟 task2 工作耗时 (比 task1 长)
log.debug("task2 finished work, waiting at barrier...");
// 2. task2 到达屏障,等待 task1
barrier.await();
log.debug("task2 passed barrier.");
} catch (InterruptedException | BrokenBarrierException e) {
log.error("Task2 interrupted or barrier broken", e);
Thread.currentThread().interrupt();
}
});

// 等待一轮结束,为了清晰观察日志
sleep(3);
}

service.shutdown();
}

// 模拟休眠,简化代码
public static void sleep(double seconds) {
try {
TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}

运行结果分析:

在每一轮中,task1 先完成工作并调用 await() 等待。task2 稍后完成工作并调用 await()。当 task2 调用 await() 时,满足了屏障所需的2个线程数,此时:

  1. 屏障动作(打印 “— Both task1 and task2 reached…”)被执行。
  2. task1task2 都从 await() 调用中返回,继续执行后续代码(打印 “passed barrier.”)。
  3. CyclicBarrier 自动重置,可以开始下一轮的等待。 这个过程会重复3次。

总结

Java并发包(JUC)提供了强大的工具来应对并发编程的挑战。

  • ReentrantReadWriteLock:适用于读多写少的场景,通过读写分离提高性能。注意锁升级不支持,锁降级支持。
  • StampedLock:JDK 8 引入的优化读写锁,提供乐观读模式,在特定场景下性能优于ReentrantReadWriteLock,但不可重入且使用更复杂。
  • Semaphore:控制对有限资源的并发访问数量,常用于限流和资源池管理。
  • CountDownLatch:一次性的同步工具,用于等待一组操作完成。
  • CyclicBarrier:可重用的同步工具,用于让一组线程在某个点相互等待,达到同步执行的效果。

理解这些工具的特性和适用场景,是编写高效、健壮的并发程序的关键。根据具体的业务需求选择合适的并发工具,能够有效提升系统性能和可靠性。