首页

使用CountDownLatch进行多线程同步示例及源码解读分享

标签:CountDownLatch,源码,解读,实例,多线程,同步,并发,AbstractQueuedSynchronizer,Synchronization     发布时间:2017-06-13   

一、使用场景

1. 从Java的java.util.concurrent.CountDownLatch源码的说明可以了解“当一个事务需要一个和多个线程在执行完另外一个和多个线程执行完成后才开始执行操作”的场景下考虑使用,源码英文如下

/**  A synchronization aid that allows one or more threads to wait until   a set of operations being performed in other threads completes... */

2.一个CountDownLatch对象初始化后为带有指定数字count的同步等待步骤数,线程没执行一次,等待同步次数减一,直到这个CountDownLatch对象等待数减为0后,主线程才开始继续执行后续任务,需要回退其中某一个步骤可以考虑使用java.util.concurrent.CyclicBarrier来实现,源码对于英文如下

/**@b@ * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.@b@ * The {@link #await await} methods block until the current count reaches@b@ * zero due to invocations of the {@link #countDown} method, after which@b@ * all waiting threads are released and any subsequent invocations of@b@ * {@link #await await} return immediately.  This is a one-shot phenomenon@b@ * -- the count cannot be reset.  If you need a version that resets the@b@ * count, consider using a {@link CyclicBarrier}. */

3. 还有其他如下,大概也是上面的意思

 /** @b@ * <p>A {@code CountDownLatch} is a versatile synchronization tool@b@ * and can be used for a number of purposes.  A@b@ * {@code CountDownLatch} initialized with a count of one serves as a@b@ * simple on/off latch, or gate: all threads invoking {@link #await await}@b@ * wait at the gate until it is opened by a thread invoking {@link@b@ * #countDown}.  A {@code CountDownLatch} initialized to <em>N</em>@b@ * can be used to make one thread wait until <em>N</em> threads have@b@ * completed some action, or some action has been completed N times.@b@ *@b@ * <p>A useful property of a {@code CountDownLatch} is that it@b@ * doesn't require that threads calling {@code countDown} wait for@b@ * the count to reach zero before proceeding, it simply prevents any@b@ * thread from proceeding past an {@link #await await} until all@b@ * threads could pass.@b@ *@b@ * <p><b>Sample usage:</b> Here is a pair of classes in which a group@b@ * of worker threads use two countdown latches:@b@ * <ul>@b@ * <li>The first is a start signal that prevents any worker from proceeding@b@ * until the driver is ready for them to proceed;@b@ * <li>The second is a completion signal that allows the driver to wait@b@ * until all workers have completed.@b@ * </ul> */

4.源码中给出Driver和Worker的示例如下,二中会用运动员来代替Worker角色,用实际运动会组织者代替Driver来具体说明

 /** <ul>@b@ * <li>The first is a start signal that prevents any worker from proceeding@b@ * until the driver is ready for them to proceed;@b@ * <li>The second is a completion signal that allows the driver to wait@b@ * until all workers have completed.@b@ * </ul>@b@ *@b@ * <pre>@b@ * class Driver { // ...@b@ *   void main() throws InterruptedException {@b@ *     CountDownLatch startSignal = new CountDownLatch(1);@b@ *     CountDownLatch doneSignal = new CountDownLatch(N);@b@ *@b@ *     for (int i = 0; i < N; ++i) // create and start threads@b@ *       new Thread(new Worker(startSignal, doneSignal)).start();@b@ *@b@ *     doSomethingElse();            // don't let run yet@b@ *     startSignal.countDown();      // let all threads proceed@b@ *     doSomethingElse();@b@ *     doneSignal.await();           // wait for all to finish@b@ *   }@b@ * }@b@ *@b@ * class Worker implements Runnable {@b@ *   private final CountDownLatch startSignal;@b@ *   private final CountDownLatch doneSignal;@b@ *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {@b@ *      this.startSignal = startSignal;@b@ *      this.doneSignal = doneSignal;@b@ *   }@b@ *   public void run() {@b@ *      try {@b@ *        startSignal.await();@b@ *        doWork();@b@ *        doneSignal.countDown();@b@ *      } catch (InterruptedException ex) {} // return;@b@ *   }@b@ *@b@ *   void doWork() { ... }@b@ * }@b@ *@b@ * </pre>@b@ *@b@ * <p>Another typical usage would be to divide a problem into N parts,@b@ * describe each part with a Runnable that executes that portion and@b@ * counts down on the latch, and queue all the Runnables to an@b@ * Executor.  When all sub-parts are complete, the coordinating thread@b@ * will be able to pass through await. (When threads must repeatedly@b@ * count down in this way, instead use a {@link CyclicBarrier}.)@b@ *@b@ * <pre>@b@ * class Driver2 { // ...@b@ *   void main() throws InterruptedException {@b@ *     CountDownLatch doneSignal = new CountDownLatch(N);@b@ *     Executor e = ...@b@ *@b@ *     for (int i = 0; i < N; ++i) // create and start threads@b@ *       e.execute(new WorkerRunnable(doneSignal, i));@b@ *@b@ *     doneSignal.await();           // wait for all to finish@b@ *   }@b@ * }@b@ *@b@ * class WorkerRunnable implements Runnable {@b@ *   private final CountDownLatch doneSignal;@b@ *   private final int i;@b@ *   WorkerRunnable(CountDownLatch doneSignal, int i) {@b@ *      this.doneSignal = doneSignal;@b@ *      this.i = i;@b@ *   }@b@ *   public void run() {@b@ *      try {@b@ *        doWork(i);@b@ *        doneSignal.countDown();@b@ *      } catch (InterruptedException ex) {} // return;@b@ *   }@b@ *@b@ *   void doWork() { ... }@b@ * }@b@ *@b@ * </pre>@b@ *@b@ * <p>Memory consistency effects: Actions in a thread prior to calling@b@ * {@code countDown()}@b@ * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>@b@ * actions following a successful return from a corresponding@b@ * {@code await()} in another thread.@b@ *@b@ * @since 1.5@b@ * @author Doug Lea@b@ */

二、代码示例

1. 运动会组织者Driver代码如下

import java.util.concurrent.CountDownLatch;@b@@b@public class Driver {@b@	@b@	private static final int startSignalStep=1;@b@	public static final int doneSignalStep=10;@b@ @b@	public static void main(String[] args)  throws InterruptedException{@b@		@b@		CountDownLatch startSignal = new CountDownLatch(startSignalStep);@b@		CountDownLatch doneSignal = new CountDownLatch(doneSignalStep);@b@		 @b@		for (int i = 0; i < doneSignalStep; ++i){ // create and start threads@b@			new Thread(new Worker(startSignal, doneSignal)).start();@b@		}@b@		 @b@		      System.out.println("10个运动员都已经上场了(10个线程对象注入)");           // don't let run yet@b@		      startSignal.countDown();      // let all threads proceed@b@		      System.out.println("各就各位,预备...");@b@		      doneSignal.await();           // wait for all to finish@b@		      System.out.println("开始给前三名颁奖...");   @b@@b@	}@b@@b@}

2. 运动员Worker代码如下

import java.util.concurrent.CountDownLatch;@b@@b@public class Worker implements Runnable {@b@	@b@	 private final CountDownLatch startSignal;@b@	 private final CountDownLatch doneSignal;@b@	 @b@	 Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {@b@		 this.startSignal = startSignal;@b@		 this.doneSignal = doneSignal;@b@	 }@b@	 @b@	 public void run() {@b@		 try {@b@	         startSignal.await();@b@	         doWork();@b@	         doneSignal.countDown();@b@	         System.out.println("第"+Thread.currentThread().getId()+"号运动员是第"+(Driver.doneSignalStep-doneSignal.getCount())+"名次");@b@	       } catch (InterruptedException ex) {} // return;@b@	 }@b@	 @b@	 void doWork() {@b@		 System.out.println("第"+Thread.currentThread().getId()+"号运动员开始使劲跑 ...");@b@	 }@b@@b@}

运行结果如下

10个运动员都已经上场了(10个线程对象注入)@b@各就各位,预备...@b@第9号运动员开始使劲跑 ...@b@第12号运动员开始使劲跑 ...@b@第13号运动员开始使劲跑 ...@b@第10号运动员开始使劲跑 ...@b@第11号运动员开始使劲跑 ...@b@第10号运动员是第4名次@b@第13号运动员是第3名次@b@第14号运动员开始使劲跑 ...@b@第12号运动员是第2名次@b@第9号运动员是第1名次@b@第15号运动员开始使劲跑 ...@b@第15号运动员是第7名次@b@第14号运动员是第6名次@b@第11号运动员是第5名次@b@第16号运动员开始使劲跑 ...@b@第16号运动员是第8名次@b@第18号运动员开始使劲跑 ...@b@第18号运动员是第9名次@b@第17号运动员开始使劲跑 ...@b@第17号运动员是第10名次@b@开始给前三名颁奖...