4 同步工具类

felix.shao2025-02-16

4 同步工具类

4.1 Semaphore

 信号量控制,提供了资源数量的并发访问控制,使用代码如下。

// 初始有 10 个共享资源。第 2 个参数是公平或非公平选项,最多只有 10 个线程能获取到,其他线程都会阻塞,指导有线程释放了资源
Semaphore available = new Semaphore(10, true);

// 每次获取一个,如果获取不到,则线程就会阻塞。
available.acquire();

// 用完释放
available.release();

 当初始的资源个数为 1 时,Semaphore 会退化为排他锁。
 Semaphore 和锁的实现原理基本相同,源码略。

public class Semaphore{
    public void acquire() throws InterruptedException {
        // 调用 AQS 的方法
        sync.acquireSharedInterruptibly(1);
    }

    public void release() {
        // 调用 AQS 的方法
        sync.releaseShared(1);
    }
}

4.2 CountDownLatch

4.2.1 CountDownLatch 使用场景

 使一个线程等待其他线程各自执行完毕后再执行,使用代码如下。

// 主线程等 5 个线程都执行完后开始退出
CountDownLatch doneSignal = new CountDownLatch(5);
// 主线程阻塞住
doneSignal.await();
// 每个子线程执行完减 1,为 0 后,主线程唤醒
doneSignal.countDown();

4.2.2 await() 实现分析

 源码如下。

public abstract class AbstractQueuedSynchronizer{
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
}

public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        protected int tryAcquireShared(int acquires) {
            // state 不为 0,就会阻塞
            return (getState() == 0) ? 1 : -1;
        }
    }
    public void await() throws InterruptedException {
        // 调用 AQS 的方法
        sync.acquireSharedInterruptibly(1);
    }
}

4.2.3 countDown() 实现分析

 源码如下。

public class CountDownLatch {
    public void countDown() {
        // 调用 AQS 的方法,前面讲过
        sync.releaseShared(1);
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

4.3 CyclicBarrier

4.3.1 CyclicBarrier 使用场景

 让一组线程到达一个屏障(也可以叫同步点 Barrier)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。使用代码如下。

// 5 个人面试都到了才能一起面试
CyclicBarrier cb = new CyclicBarrier(5, Runable r);
// 调用 5 次后,线程唤醒
cb.await()

 使用示例见 com/study/java/concurrent/chapter04/CyclicBarrierTest.java

4.3.2 CyclicBarrier 实现原理

 CyclicBarrier 基于 ReentrantLock + Condition 实现。

public class CyclicBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    // 用于线程之间互相唤醒
    private final Condition trip = lock.newCondition();
    // 总线程数
    private final int parties;
    private int count;
    private Generation generation = new Generation();

    // 当所有的线程被唤醒时,barrierAction 被执行
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    // 略
    public int await() throws InterruptedException, BrokenBarrierException {
    }
}

4.4 Exchanger

4.4.1 Exchanger 使用场景

 线程之间交换数据。使用示例见 com/study/java/concurrent/chapter04/ExchangerTest.java
 注意两两交换数据不是固定匹配的,按实际执行顺序交换的,但是执行顺序可能是无序的。

4.4.2 Exchanger 实现原理

 JDK7 和 JDK8 的实现源码有所不同,这里不例举详细源码了。

4.4.3 exchanger(V x) 实现分析

 源码如下。

public class Exchanger<V> {
    public V exchange(V x) throws InterruptedException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        if ((arena != null ||
            // 2 个线程之间交换数据
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
            // 多个线程之间交换数据
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    private final Object slotExchange(Object item, boolean timed, long ns) {
    }

    private final Object arenaExchange(Object item, boolean timed, long ns) {

    }
}

4.5 Phaser

 这里不一一例举原理了。

4.5.1 用 Phaser 替代 CyclicBarrier 和 CountDownLatch

 使用示例见 com/study/java/concurrent/chapter04/PhaserTest.java
 awaitAdvance 可以替代 CyclicBarrier,arriveAndAwaitAdvance 可以替代 CyclicBarrier。

4.5.2 Phaser 新特性

  1. 动态调整线程个数。
  2. 多个层次 Phaser

4.5.3 state 变量解析

 略。

4.5.4 阻塞与唤醒(Treiber Stack)

 略。

4.5.5 arrive() 函数分析

 略。

4.5.6 awaitAdvance() 函数分析

 略。

Last Updated 2/16/2025, 4:13:06 PM