深入理解Java并发包(JUC)中的锁与同步器
|字数总计:4.6k|阅读时长:18分钟|阅读量:|
前言
在现代多核CPU架构下,并发编程已成为提升应用程序性能的关键技术。Java的java.util.concurrent
(JUC)包提供了丰富的工具类来帮助开发者更安全、更高效地处理并发问题。本文将深入探讨JUC中几个常用的锁和同步器:ReentrantReadWriteLock
、StampedLock
、Semaphore
、CountDownLatch
和CyclicBarrier
,理解它们的设计理念、使用场景及注意事项。
ReentrantReadWriteLock:读写分离锁
ReentrantReadWriteLock
是一种读写分离锁,它允许多个线程同时读取共享资源,但在写入时则需要独占访问。这在“读多写少”的场景下能显著提高性能,因为它避免了不必要的阻塞。
![读写锁原理图-Illustration showing multiple reader threads accessing a shared resource concurrently, while a single writer thread waits, conceptual lock diagram]()
核心特性:
- 读锁与写锁互斥:当一个线程持有写锁时,其他线程(无论是读线程还是写线程)都必须等待。当一个线程持有读锁时,写线程必须等待。
- 写锁与写锁互斥:同一时间只允许一个线程持有写锁。
- 读锁与读锁不互斥:多个线程可以同时持有读锁,并发读取。
锁升级与降级:
- 锁升级(不支持):持有读锁的线程尝试获取写锁,会导致死锁或永久等待。这是因为获取写锁需要等待所有读锁释放,而当前线程持有的读锁永远无法释放(因为它在等待写锁)。
- 锁降级(支持):持有写锁的线程可以继续获取读锁,然后释放写锁。这样就从写锁降级为了读锁,允许其他读线程进入,但仍阻止其他写线程。
示例代码:
下面是一个简单的示例,演示了ReentrantReadWriteLock
的基本用法。DataContainer
类使用读写锁来保护共享数据data
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
| import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestReadWriteLock { private static final Logger log = LoggerFactory.getLogger(TestReadWriteLock.class);
public static void main(String[] args) throws InterruptedException { DataContainer dataContainer = new DataContainer();
new Thread(() -> { dataContainer.read(); }, "t1-Reader").start();
new Thread(() -> { dataContainer.read(); }, "t2-Reader").start();
TimeUnit.MILLISECONDS.sleep(100); new Thread(() -> { dataContainer.write("New Data"); }, "t3-Writer").start();
TimeUnit.MILLISECONDS.sleep(100); new Thread(() -> { dataContainer.read(); }, "t4-Reader").start(); }
public static void sleep(double seconds) { try { TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } }
class DataContainer { private static final Logger log = LoggerFactory.getLogger(DataContainer.class); private Object data = "Initial Data"; private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock.ReadLock r = rw.readLock(); private final ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() { log.debug("尝试获取读锁..."); r.lock(); try { log.debug("成功获取读锁,开始读取..."); sleep(1); log.debug("读取完成, data: {}", data); return data; } finally { log.debug("准备释放读锁..."); r.unlock(); } }
public void write(Object newData) { log.debug("尝试获取写锁..."); w.lock(); try { log.debug("成功获取写锁,开始写入..."); this.data = newData; sleep(1); log.debug("写入完成, data: {}", data); } finally { log.debug("准备释放写锁..."); w.unlock(); } }
private static void sleep(double seconds) { try { TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } }
|
运行结果分析:
你会观察到 t1-Reader
和 t2-Reader
可以并发执行读取操作。当 t3-Writer
尝试获取写锁时,它会等待 t1
和 t2
释放读锁。一旦 t3
持有写锁,后续的 t4-Reader
会被阻塞,直到 t3
释放写锁。
StampedLock:带版本戳的乐观读写锁
StampedLock
是 JDK 8 引入的一种更高级的锁,它在ReentrantReadWriteLock
的基础上进行了优化,特别是在读操作远多于写操作的场景下。StampedLock
提供了乐观读(Optimistic Reading)的模式。
核心概念:
戳(Stamp):每次获取锁(读锁、写锁或乐观读)都会返回一个非零的版本戳。释放锁或转换锁状态时需要传入相应的戳。
乐观读 (tryOptimisticRead
):尝试获取一个非独占的读锁,但实际上不加锁,只是获取当前的版本戳。之后,线程可以读取共享资源。读取完成后,需要调用validate(stamp)
方法检查在读取期间是否有写操作介入(即版本戳是否发生变化)。
如果validate
返回true
,表示没有写操作干扰,读取的数据有效,性能极高(无锁操作)。
- 如果
validate
返回false
,表示有写操作干扰,数据可能已失效,此时需要升级为悲观读锁(调用readLock()
)重新读取数据。
悲观读锁 (readLock
):类似ReentrantReadWriteLock
的读锁,会阻塞写操作,但不阻塞其他读操作。
写锁 (writeLock
):类似ReentrantReadWriteLock
的写锁,独占访问。

优点:
- 在读多写少且写操作时间短的场景下,乐观读能大幅减少锁竞争,提升吞吐量。
缺点:
StampedLock
是不可重入的。如果一个线程已经持有写锁,再尝试获取写锁会导致死锁;持有读锁再尝试获取读锁也可能产生问题。
- 不支持条件变量(Condition)。
- 使用相对复杂,需要正确处理乐观读失败后的锁升级逻辑。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
| import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.StampedLock;
public class TestStampedLock { private static final Logger log = LoggerFactory.getLogger(TestStampedLock.class);
public static void main(String[] args) { DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> { dataContainer.read(1); }, "t1-optimistic-success").start();
sleep(0.1);
new Thread(() -> { log.debug("Writer thread starting write operation..."); dataContainer.write(99); }, "t-writer").start();
sleep(0.5);
new Thread(() -> { dataContainer.read(2); }, "t2-optimistic-fail").start(); }
public static void sleep(double seconds) { try { TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } }
class DataContainerStamped { private static final Logger log = LoggerFactory.getLogger(DataContainerStamped.class); private int data; private final StampedLock lock = new StampedLock();
public DataContainerStamped(int data) { this.data = data; }
public int read(int readTime) { long stamp = lock.tryOptimisticRead(); log.debug("Optimistic read attempt, stamp: {}", stamp); sleep(readTime);
if (lock.validate(stamp)) { log.debug("Optimistic read validation successful (stamp: {}), data: {}", stamp, data); return data; }
log.debug("Optimistic read validation failed (stamp: {}), upgrading to read lock...", stamp); try { stamp = lock.readLock(); log.debug("Acquired read lock, stamp: {}", stamp); sleep(readTime); log.debug("Read finished under read lock (stamp: {}), data: {}", stamp, data); return data; } finally { log.debug("Releasing read lock, stamp: {}", stamp); lock.unlockRead(stamp); } }
public void write(int newData) { long stamp = lock.writeLock(); log.debug("Acquired write lock, stamp: {}", stamp); try { log.debug("Writing data..."); sleep(2); this.data = newData; log.debug("Write finished, new data: {}", data); } finally { log.debug("Releasing write lock, stamp: {}", stamp); lock.unlockWrite(stamp); } }
private static void sleep(double seconds) { try { TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } }
|
运行结果分析:
t1-optimistic-success
线程尝试乐观读,由于读取期间没有写操作,validate
成功,快速返回。
t-writer
获取写锁并修改数据。
t2-optimistic-fail
线程尝试乐观读,但在其读取期间(sleep(2)
),t-writer
修改了数据,导致validate
失败。随后 t2
升级为悲观读锁,等待 t-writer
释放写锁后,再重新获取数据。
Semaphore:信号量
Semaphore
(信号量)用于控制同时访问特定资源的线程数量。它维护了一个许可(permit)集,线程必须先获得许可才能访问资源,使用完毕后释放许可。如果许可数量为0,则尝试获取许可的线程将被阻塞,直到有其他线程释放许可。

常用场景:
- 限流:控制对某个服务的并发调用量。
- 资源池:管理有限的资源,如数据库连接池。
核心方法:
Semaphore(int permits)
:构造函数,初始化许可数量。
acquire()
:获取一个许可,如果无可用许可则阻塞。
acquire(int permits)
:获取指定数量的许可。
release()
:释放一个许可。
release(int permits)
:释放指定数量的许可。
tryAcquire()
:尝试获取一个许可,立即返回成功或失败,不阻塞。
示例代码:
下面的示例模拟了10个线程尝试访问一个只有3个许可的资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;
public class TestSemaphore { private static final Logger log = LoggerFactory.getLogger(TestSemaphore.class);
public static void main(String[] args) { Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) { new Thread(() -> { try { semaphore.acquire(); log.debug(" acquired permit, running..."); sleep(1); log.debug(" task finished, releasing permit..."); } catch (InterruptedException e) { log.error("Thread interrupted", e); Thread.currentThread().interrupt(); } finally { semaphore.release(); } }, "Thread-" + i).start(); } }
public static void sleep(double seconds) { try { TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } }
|
运行结果分析:
同一时间内,最多只有3个线程能够获得许可并打印 “running…”。其他线程会阻塞在 acquire()
方法上,直到有线程调用 release()
释放许可。
CountDownLatch:倒计时门闩
CountDownLatch
是一种同步辅助类,它允许一个或多个线程等待其他一组线程完成操作。它像一个倒计时器,主线程(或任何需要等待的线程)调用 await()
方法阻塞,直到计数器减到零。其他线程则通过调用 countDown()
方法来使计数器减一。

核心特性:
- 一次性:计数器的值只能被初始化一次,并且一旦减到零,就不能再重置或增加。
- 等待机制:
await()
方法阻塞调用线程,直到计数器为零。
- 计数递减:
countDown()
方法将计数器减一。
常用场景:
- 等待多个初始化任务完成:主线程等待所有初始化子线程完成后再继续执行。
- 模拟并发:确保多个线程同时开始执行某个任务。
示例代码:
下面的示例模拟了一个游戏加载场景,主线程需要等待10个加载任务(每个任务在一个线程中执行)全部完成后才能开始游戏。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;
public class TestCountDownLatch { private static final Logger log = LoggerFactory.getLogger(TestCountDownLatch.class);
public static void main(String[] args) throws InterruptedException { testLoadingGame(); }
private static void testLoadingGame() throws InterruptedException { AtomicInteger threadCounter = new AtomicInteger(0); ExecutorService service = Executors.newFixedThreadPool(10, (r) -> { return new Thread(r, "loader-" + threadCounter.getAndIncrement()); });
CountDownLatch latch = new CountDownLatch(10); String[] loadingProgress = new String[10]; Arrays.fill(loadingProgress, "0%");
Random random = new Random(); log.info("Starting game loading process...");
for (int j = 0; j < 10; j++) { int taskIndex = j; service.submit(() -> { try { log.debug("{} starting loading...", Thread.currentThread().getName()); for (int i = 0; i <= 100; i++) { try { TimeUnit.MILLISECONDS.sleep(random.nextInt(50)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } synchronized (loadingProgress) { loadingProgress[taskIndex] = Thread.currentThread().getName() + "(" + i + "%)"; System.out.print("\rLoading progress: " + Arrays.toString(loadingProgress)); } } log.debug("{} finished loading.", Thread.currentThread().getName()); } finally { latch.countDown(); } }); }
log.info("Main thread waiting for all loading tasks to complete..."); latch.await();
System.out.println(); log.info("All loading tasks completed. Game starting..."); service.shutdown(); } }
|
运行结果分析:
主线程会打印 “Main thread waiting…” 并阻塞在 latch.await()
。同时,10个加载线程并发执行,每个线程完成加载后调用 latch.countDown()
。当第10个线程完成并调用 countDown()
后,计数器变为0,await()
方法返回,主线程继续执行,打印 “Game starting…”。
CyclicBarrier:循环栅栏
CyclicBarrier
(循环栅栏)与CountDownLatch
类似,也是用于线程协作,但它更像一个可重用的屏障。它允许一组线程相互等待,直到所有线程都到达某个公共屏障点(barrier point),然后所有线程再同时继续执行。

核心特性:
- 计数等待:构造时指定参与的线程数(parties)。线程调用
await()
方法到达屏障,并阻塞等待,直到指定数量的线程都调用了await()
。
- 屏障动作(可选):可以在构造时提供一个
Runnable
任务,当所有线程都到达屏障后,在任何线程被释放之前,会由最后一个到达屏障的线程执行这个任务。
- 可重用:当所有等待线程被释放后,
CyclicBarrier
可以被重置(自动或手动),用于下一轮的同步。这就像“人满发车”,发车后可以等待下一批人。
常用场景:
- 多线程计算:将一个大任务分解给多个线程,等待所有线程计算完成中间结果后,再进行汇总或下一步计算。
- 并行测试:确保所有测试线程都准备好后再开始执行测试。
CyclicBarrier
vs CountDownLatch
:
CountDownLatch
是一次性的,计数减到0后不能重置;CyclicBarrier
是可重用的。
CountDownLatch
主要用于一个或多个线程等待其他线程完成某事;CyclicBarrier
用于一组线程相互等待,直到所有成员都到达某个点。
示例代码:
下面的示例模拟了两个任务(task1, task2)需要协同工作,每一轮都需要双方都完成后才能进入下一轮。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.*;
public class TestCyclicBarrier { private static final Logger log = LoggerFactory.getLogger(TestCyclicBarrier.class);
public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(2);
CyclicBarrier barrier = new CyclicBarrier(2, () -> { log.info("--- Both task1 and task2 reached the barrier. Barrier action executed. Proceeding to next round... ---"); });
for (int i = 0; i < 3; i++) { log.info("Starting round {}", i + 1); service.submit(() -> { try { log.debug("task1 starting work for round..."); sleep(1); log.debug("task1 finished work, waiting at barrier..."); barrier.await(); log.debug("task1 passed barrier."); } catch (InterruptedException | BrokenBarrierException e) { log.error("Task1 interrupted or barrier broken", e); Thread.currentThread().interrupt(); } });
service.submit(() -> { try { log.debug("task2 starting work for round..."); sleep(2); log.debug("task2 finished work, waiting at barrier..."); barrier.await(); log.debug("task2 passed barrier."); } catch (InterruptedException | BrokenBarrierException e) { log.error("Task2 interrupted or barrier broken", e); Thread.currentThread().interrupt(); } });
sleep(3); }
service.shutdown(); }
public static void sleep(double seconds) { try { TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } }
|
运行结果分析:
在每一轮中,task1
先完成工作并调用 await()
等待。task2
稍后完成工作并调用 await()
。当 task2
调用 await()
时,满足了屏障所需的2个线程数,此时:
- 屏障动作(打印 “— Both task1 and task2 reached…”)被执行。
task1
和 task2
都从 await()
调用中返回,继续执行后续代码(打印 “passed barrier.”)。
CyclicBarrier
自动重置,可以开始下一轮的等待。 这个过程会重复3次。
总结
Java并发包(JUC)提供了强大的工具来应对并发编程的挑战。
ReentrantReadWriteLock
:适用于读多写少的场景,通过读写分离提高性能。注意锁升级不支持,锁降级支持。
StampedLock
:JDK 8 引入的优化读写锁,提供乐观读模式,在特定场景下性能优于ReentrantReadWriteLock
,但不可重入且使用更复杂。
Semaphore
:控制对有限资源的并发访问数量,常用于限流和资源池管理。
CountDownLatch
:一次性的同步工具,用于等待一组操作完成。
CyclicBarrier
:可重用的同步工具,用于让一组线程在某个点相互等待,达到同步执行的效果。
理解这些工具的特性和适用场景,是编写高效、健壮的并发程序的关键。根据具体的业务需求选择合适的并发工具,能够有效提升系统性能和可靠性。