Simple example

    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    class CountDownLatchTest {
        // Opened once by main so that all workers start together.
        private static final CountDownLatch startSignal = new CountDownLatch(1);
        // Counted down once per worker; main waits until it reaches zero.
        private static final CountDownLatch doneSignal = new CountDownLatch(10);
        private static final Random random = new Random();

        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 10; i++) {
                new Thread(new Worker(i + 1)).start();
            }
            // Give the workers time to reach startSignal.await().
            TimeUnit.SECONDS.sleep(1);
            // Release all workers at once.
            startSignal.countDown();
            // Block until every worker has called doneSignal.countDown().
            doneSignal.await();
            System.out.println("Done");
        }

        private static class Worker implements Runnable {
            private final int i;

            public Worker(int i) {
                this.i = i;
            }

            @Override
            public void run() {
                // Wait for the start signal.
                try {
                    startSignal.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                long start = System.currentTimeMillis();
                System.out.println("Worker-" + i + " started");
                // Simulate 1 to 5 seconds of work.
                try {
                    TimeUnit.SECONDS.sleep(random.nextInt(5) + 1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Worker-" + i + " done! cost " + (System.currentTimeMillis() - start) + "ms");
                // Report completion to main.
                doneSignal.countDown();
            }
        }
    }
 
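Two CountDownLatch instances are used here: startSignal makes all worker threads start at the same moment, and doneSignal lets the main thread wait until every worker has finished.
Note once more: do not pass Worker::new when creating the thread.
Assuming Worker had a no-argument constructor, new Thread(Worker::new).start() would be equivalent to the following, so the new thread only constructs a Worker and never calls its run() method:

    new Thread(new Runnable() {
        @Override
        public void run() {
            new Worker();
        }
    }).start();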
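
The pitfall can be made visible with a small experiment. This is only a sketch: MethodRefPitfall, its runs counter, and the no-argument Worker are hypothetical names introduced for illustration (the Worker in the example above takes an int, so Worker::new would not even compile there).

```java
import java.util.concurrent.atomic.AtomicInteger;

class MethodRefPitfall {
    // Counts how many times Worker.run() is actually executed.
    static final AtomicInteger runs = new AtomicInteger();

    // Hypothetical worker with a no-argument constructor.
    static class Worker implements Runnable {
        @Override
        public void run() {
            runs.incrementAndGet();
        }
    }

    // Starts one thread with the given task, waits for it to finish,
    // and returns how many times Worker.run() was executed.
    static int runsAfter(Runnable task) {
        runs.set(0);
        Thread t = new Thread(task);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return runs.get();
    }

    public static void main(String[] args) {
        // Worker::new becomes a Runnable whose run() only constructs a Worker.
        System.out.println("Worker::new  -> " + runsAfter(Worker::new));   // 0
        // Passing an instance makes the thread call Worker.run().
        System.out.println("new Worker() -> " + runsAfter(new Worker())); // 1
    }
}
```

With the constructor reference the counter stays at 0, because the only thing the thread does is create an object and discard it.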
 
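Source code analysis
Compared with ReentrantLock and ReentrantReadWriteLock, the implementation of CountDownLatch is very simple.
CountDownLatch has only one field, sync.
As in ReentrantLock and its relatives, sync is an instance of an inner class that extends AQS (AbstractQueuedSynchronizer).
The constructor initializes sync:

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }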
 
First, look at the methods that CountDownLatch exposes:

    // Blocks until the count reaches zero, or the thread is interrupted.
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // Same as await(), but gives up after the timeout and returns false.
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // Decrements the count; releases all waiters when it reaches zero.
    public void countDown() {
        sync.releaseShared(1);
    }

    // Returns the current count.
    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
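
The timed await is the only method above that returns a result, so a short usage sketch may help. TimedAwaitDemo and awaitAfter are hypothetical names, and the 100 ms timeout is an arbitrary choice for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimedAwaitDemo {
    // Creates a latch with the given count, calls countDown() `downs` times,
    // then waits at most 100 ms. Returns what await(timeout, unit) returned.
    static boolean awaitAfter(int count, int downs) {
        CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < downs; i++) {
            latch.countDown();
        }
        try {
            return latch.await(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitAfter(2, 2)); // true: the count reached zero
        System.out.println(awaitAfter(2, 1)); // false: timed out with count == 1
    }
}
```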
 
Every one of these methods delegates to Sync, so it is enough to read Sync directly:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            // The AQS state holds the latch count.
            setState(count);
        }

        int getCount() {
            return getState();
        }

        // Acquiring succeeds (>= 0) only when the count is zero.
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        // Decrements the count; returns true only on the transition to zero.
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                // Already zero: nothing to release.
                if (c == 0)
                    return false;
                int nextc = c - 1;
                // Retry if another thread changed the state in the meantime.
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
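
To see what the two hooks return for a given sequence of calls, here is a sketch that copies them into a hypothetical MiniSync class. It is not part of the JDK; it exists only because the real Sync is private and cannot be called from outside.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SyncReturnValues {
    // Hypothetical copy of the two overridden hooks, kept only for inspection.
    static final class MiniSync extends AbstractQueuedSynchronizer {
        MiniSync(int count) {
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    // Calls tryReleaseShared `releases` times, then tryAcquireShared once,
    // and records every return value in order.
    static String trace(int count, int releases) {
        MiniSync sync = new MiniSync(count);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < releases; i++) {
            sb.append(sync.tryReleaseShared(1)).append(' ');
        }
        return sb.append("acquire=").append(sync.tryAcquireShared(1)).toString();
    }

    public static void main(String[] args) {
        System.out.println(trace(2, 3)); // false true false acquire=1
        System.out.println(trace(2, 1)); // false acquire=-1
    }
}
```

Only the call that moves the state from 1 to 0 returns true; a release on an already open latch returns false and leaves the state at 0.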
 
The logic of the Sync inner class is simple as well, but to understand CountDownLatch as a whole it has to be read together with AQS.
Initialization
First, the CountDownLatch constructor creates the Sync:

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
 
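Then the constructor of the inner class Sync initializes the AQS state:

    Sync(int count) {
        setState(count);
    }

Setting state stores the latch counter in AQS. It plays the role that the hold (reentrancy) count plays in a lock, but here it is the number of countDown() calls still required before the shared acquire can succeed.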
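
A quick check of the initialization behavior through the public API. LatchInitDemo and its two helper methods are hypothetical names used only for this sketch.

```java
import java.util.concurrent.CountDownLatch;

class LatchInitDemo {
    // The count passed to the constructor is what getCount() reports at first.
    static long initialCount(int count) {
        return new CountDownLatch(count).getCount();
    }

    // Returns true if the constructor rejects the given count.
    static boolean rejects(int count) {
        try {
            new CountDownLatch(count);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(initialCount(3)); // 3
        System.out.println(rejects(0));      // false: zero is a valid count
        System.out.println(rejects(-1));     // true: "count < 0"
    }
}
```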
await
CountDownLatch.await calls:

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

acquireSharedInterruptibly is not overridden in CountDownLatch.Sync; the AQS implementation is used directly:

    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // Fail immediately if the thread is already interrupted.
        if (Thread.interrupted())
            throw new InterruptedException();
        // A negative result means the count is not zero yet.
        if (tryAcquireShared(arg) < 0)
            // Enqueue the thread and park it until released or interrupted.
            doAcquireSharedInterruptibly(arg);
    }
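
The order of the two checks has a visible consequence: an already interrupted thread gets an InterruptedException even if the count is zero. The sketch below relies only on the interrupt check shown in the listing above; AwaitInterruptDemo and its methods are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;

class AwaitInterruptDemo {
    // Calls await() on an open latch (count 0) with the interrupt flag set.
    // Returns true if await() threw InterruptedException.
    static boolean awaitThrowsWhenInterrupted() {
        CountDownLatch open = new CountDownLatch(0);
        Thread.currentThread().interrupt();
        try {
            open.await();
            return false;
        } catch (InterruptedException e) {
            return true;
        } finally {
            // Make sure no interrupt flag leaks out of this method.
            Thread.interrupted();
        }
    }

    // Calls await() on an open latch without an interrupt: returns at once.
    static boolean awaitReturnsWhenOpen() {
        CountDownLatch open = new CountDownLatch(0);
        try {
            open.await();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitThrowsWhenInterrupted()); // true
        System.out.println(awaitReturnsWhenOpen());       // true
    }
}
```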
 
countDown
CountDownLatch.countDown calls:

    public void countDown() {
        // Decrement the count by one.
        sync.releaseShared(1);
    }

This goes into the AQS releaseShared method:

    public final boolean releaseShared(int arg) {
        // True only when the count has just reached zero.
        if (tryReleaseShared(arg)) {
            // Wake up the threads waiting in await().
            doReleaseShared();
            return true;
        }
        return false;
    }
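
The effect of this path can be observed from outside: a waiting thread is released only by the countDown() call that brings the count to zero. This is a sketch under assumptions: CountDownReleaseDemo, the helper latch released, and the 200 ms and 5 s timeouts are all illustrative choices.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class CountDownReleaseDemo {
    // Returns "<released after 1st countDown> <released after 2nd countDown>".
    static String observe() {
        CountDownLatch latch = new CountDownLatch(2);
        // Helper latch: opened by the waiter once it gets past latch.await().
        CountDownLatch released = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            try {
                latch.await();
                released.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.setDaemon(true);
        waiter.start();

        try {
            // 2 -> 1: tryReleaseShared returns false, nobody is woken up.
            latch.countDown();
            boolean afterFirst = released.await(200, TimeUnit.MILLISECONDS);
            // 1 -> 0: tryReleaseShared returns true, waiters are released.
            latch.countDown();
            boolean afterSecond = released.await(5, TimeUnit.SECONDS);
            return afterFirst + " " + afterSecond;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(observe()); // false true
    }
}
```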