4 同步工具类
4 同步工具类
4.1 Semaphore
信号量控制,提供了资源数量的并发访问控制,使用代码如下。
// 初始有 10 个共享资源。第 2 个参数是公平或非公平选项,最多只有 10 个线程能获取到,其他线程都会阻塞,指导有线程释放了资源
Semaphore available = new Semaphore(10, true);
// 每次获取一个,如果获取不到,则线程就会阻塞。
available.acquire();
// 用完释放
available.release();
当初始的资源个数为 1 时,Semaphore 会退化为排他锁。
Semaphore 和锁的实现原理基本相同,源码略。
public class Semaphore{
public void acquire() throws InterruptedException {
// 调用 AQS 的方法
sync.acquireSharedInterruptibly(1);
}
public void release() {
// 调用 AQS 的方法
sync.releaseShared(1);
}
}
4.2 CountDownLatch
4.2.1 CountDownLatch 使用场景
使一个线程等待其他线程各自执行完毕后再执行,使用代码如下。
// 主线程等 5 个线程都执行完后开始退出
CountDownLatch doneSignal = new CountDownLatch(5);
// 主线程阻塞住
doneSignal.await();
// 每个子线程执行完减 1,为 0 后,主线程唤醒
doneSignal.countDown();
4.2.2 await() 实现分析
源码如下。
public abstract class AbstractQueuedSynchronizer{
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
}
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int acquires) {
// state 不为 0,就会阻塞
return (getState() == 0) ? 1 : -1;
}
}
public void await() throws InterruptedException {
// 调用 AQS 的方法
sync.acquireSharedInterruptibly(1);
}
}
4.2.3 countDown() 实现分析
源码如下。
public class CountDownLatch {
public void countDown() {
// 调用 AQS 的方法,前面讲过
sync.releaseShared(1);
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
4.3 CyclicBarrier
4.3.1 CyclicBarrier 使用场景
让一组线程到达一个屏障(也可以叫同步点 Barrier)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。使用代码如下。
// 5 个人面试都到了才能一起面试
CyclicBarrier cb = new CyclicBarrier(5, Runable r);
// 调用 5 次后,线程唤醒
cb.await()
使用示例见 com/study/java/concurrent/chapter04/CyclicBarrierTest.java
。
4.3.2 CyclicBarrier 实现原理
CyclicBarrier 基于 ReentrantLock + Condition 实现。
public class CyclicBarrier {
private final ReentrantLock lock = new ReentrantLock();
// 用于线程之间互相唤醒
private final Condition trip = lock.newCondition();
// 总线程数
private final int parties;
private int count;
private Generation generation = new Generation();
// 当所有的线程被唤醒时,barrierAction 被执行
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
// 略
public int await() throws InterruptedException, BrokenBarrierException {
}
}
4.4 Exchanger
4.4.1 Exchanger 使用场景
线程之间交换数据。使用示例见 com/study/java/concurrent/chapter04/ExchangerTest.java
。
注意两两交换数据不是固定匹配的,按实际执行顺序交换的,但是执行顺序可能是无序的。
4.4.2 Exchanger 实现原理
JDK7 和 JDK8 的实现源码有所不同,这里不例举详细源码了。
4.4.3 exchanger(V x) 实现分析
源码如下。
public class Exchanger<V> {
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
// 2 个线程之间交换数据
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
// 多个线程之间交换数据
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
private final Object slotExchange(Object item, boolean timed, long ns) {
}
private final Object arenaExchange(Object item, boolean timed, long ns) {
}
}
4.5 Phaser
这里不一一例举原理了。
4.5.1 用 Phaser 替代 CyclicBarrier 和 CountDownLatch
使用示例见 com/study/java/concurrent/chapter04/PhaserTest.java
。
awaitAdvance 可以替代 CyclicBarrier,arriveAndAwaitAdvance 可以替代 CyclicBarrier。
4.5.2 Phaser 新特性
- 动态调整线程个数。
- 多个层次 Phaser
4.5.3 state 变量解析
略。
4.5.4 阻塞与唤醒(Treiber Stack)
略。
4.5.5 arrive() 函数分析
略。
4.5.6 awaitAdvance() 函数分析
略。