- CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作。
CountDownLatch的构造函数接受一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。
当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N编程零。此外还有一个带指定时间的await方法,这个方法等待指定时间后,就不会再阻塞当前线程。
package test;public class ThreadJoinTest { public static void main(String[] args) throws InterruptedException { Thread parser1 = new Thread(new Runnable(){ public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("parser1 finish"); } }); Thread parser2 = new Thread(new Runnable(){ public void run() { System.out.println("parser2 finish"); } }); parser1.start(); parser2.start(); parser1.join(); parser2.join(); System.out.println("all finished"); }}---------parser2 finishparser1 finishall finishedpackage test;import java.util.concurrent.CountDownLatch;public class CountDownLatchTest { static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread(new Runnable(){ public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("parser1 finish"); c.countDown(); } }).start(); new Thread(new Runnable(){ public void run() { System.out.println("parser2 finish"); c.countDown(); } }).start(); c.await(); System.out.println("all finished"); }}---------parser2 finishparser1 finishall finished
- CyclicBarrier CyclicBarrier让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,所有被屏障拦截的线程才会继续运行。 CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier已经到达了屏障,然后当前线程被阻塞。 CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties , Runnable barrierAction),用于在线程到达屏障是,优先执行barrierAction,方便处理更复杂的业务。
package test;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String args[]) throws InterruptedException, BrokenBarrierException{ new Thread(new Runnable(){ public void run() { System.out.println("begin"); try { Thread.sleep(1000); c.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println("end"); } }).start(); System.out.println("current thread waiting.."); c.await(); System.out.println("all arrived"); }}-------current thread waiting..beginendall arrived
package test;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierTest2 { static CyclicBarrier c = new CyclicBarrier(2,new A()); public static void main(String args[]) throws InterruptedException, BrokenBarrierException{ new Thread(new Runnable(){ public void run() { System.out.println("begin"); try { Thread.sleep(1000); c.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println("end"); } }).start(); System.out.println("current thread waiting.."); c.await(); System.out.println("all arrived"); } static class A implements Runnable{ public void run() { System.out.println("Thread A Running.."); } }}----------current thread waiting..beginThread A Running..endall arrived
- CyclicBarrier和CountDownLatch的区别 CountDownLatch的计数只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,所以CyclicBarrier能处理更复杂的业务场景。 CyclicBarrier还提供其他的方法,如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断。
- Semaphore Semaphore(信号量)是用来控制同时访问特定资源的线程数量,他通过协调各个线程,以保证合理的使用公共资源。 Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接。
package test;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class SemaphoreTest { private static final int THREAD_COUNT = 10; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(5); public static void main(String[] args) throws InterruptedException { while(true){ threadPool.execute(new Runnable(){ public void run() { try { s.acquire(); System.out.println(Thread.currentThread().getName()+": save data..."); Thread.sleep(5000); s.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }); System.out.println("availablePermits: "+s.availablePermits()); Thread.sleep(500); } //threadPool.shutdown(); }}----------------------------availablePermits: 5pool-1-thread-1: save data...availablePermits: 4pool-1-thread-2: save data...availablePermits: 3pool-1-thread-3: save data...availablePermits: 2pool-1-thread-4: save data...availablePermits: 1pool-1-thread-5: save data...availablePermits: 0availablePermits: 0availablePermits: 0availablePermits: 0availablePermits: 0pool-1-thread-6: save data...availablePermits: 0pool-1-thread-7: save data...availablePermits: 0pool-1-thread-8: save data...availablePermits: 0pool-1-thread-9: save data...availablePermits: 0pool-1-thread-10: save data...availablePermits: 0availablePermits: 0availablePermits: 0availablePermits: 0availablePermits: 0availablePermits: 0pool-1-thread-1: save data...availablePermits: 0pool-1-thread-7: save data...availablePermits: 0pool-1-thread-2: save data...availablePermits: 0pool-1-thread-9: save data...availablePermits: 0pool-1-thread-3: save data...availablePermits: 0availablePermits: 0availablePermits: 0
在代码中,虽然有10个线程在执行,但是只允许5个并发执行。