CountDownLatch,CyclicBarrier和Semaphore都是在多线程编程中,对线程进行管理的对象。
CountDownLatch
使用new CountDownLatch(n)来创建,然后会创建一个值为n的计数器,当子线程完成的时候,调用countDown来减少计数器数值,主线程调用countDownLatch.await()方法,在n不为0 的时候会一直阻塞,直到n=0才会继续进行。阻塞的原理是await的时候会创建一个for循环,直到满足条件,然后会return出来。
private static ExecutorService executorService;
private int count = 0;
static int totalCount = 100000;
private Lock lock = new ReentrantLock();
final CountDownLatch countDownLatch = new CountDownLatch(50); //建立长度为50的countDownLatch
void addCount() {
while (count < totalCount) {
count += 1;
System.out.println(count + "-------" + Thread.currentThread().getName());
}
countDownLatch.countDown(); //countDown减一
}
public Testsynchronized() {
Long startTime = System.currentTimeMillis();
executorService = Executors.newFixedThreadPool(50);
for (int i = 0; i < 50; i++) {
executorService.execute(() -> addCount());
}
try {
countDownLatch.await(); // 等待计数器的结果变为0
Long stopTime = System.currentTimeMillis();
System.out.println("耗时为" + (stopTime - startTime));
} catch (InterruptedException e) {
System.out.println("捕获到异常");
}
}
上面的代码建立一个计数器,然后等各个线程完成计算之后,主线程再去计算总共的耗时。
源码
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
这是await部分的代码,能够看出通过创建一个for循环,然后通过return跳出。
cyclicBarrier
cyclicBarrier跟countDownLatch的作用相反,countDownLatch相当于是一F1赛车进入检修区一样,等到各种检修都结束了,才能够开走。而cyclicBarrier相当于平常一堆人去吃饭,有的人吃得快有的人吃得慢,吃得快的就会在哪等着,直到所有人都吃完了,然后一起回去。 cyclicBarrier跟countDownLatch都是通过计数器来实现的,当子线程准备好了之后,调用cyclicBarrier.await()进入等待状态,当所有的子线程都进入await()状态之后,栅栏会释放,才会往下进行。这里面也是使用for循环来进行线程的阻塞
final CyclicBarrier cyclicBarrier = new CyclicBarrier(50);
void addCount() {
try {
int waitTime = (int) (Math.random() * 1000);
Thread.sleep(waitTime);
System.out.println("已经就绪" + Thread.currentThread().getName());
cyclicBarrier.await();
System.out.println("开始工作" + Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println("sleep异常");
} catch (BrokenBarrierException e) {
System.out.println("遇到异常");
}
}
public Testsynchronized() {
Long startTime = System.currentTimeMillis();
executorService = Executors.newFixedThreadPool(50);
for (int i = 0; i < 50; i++) {
executorService.execute(() -> addCount());
}
Long stopTime = System.currentTimeMillis();
System.out.println("耗时为" + (stopTime - startTime));
System.out.println("捕获到异常");
}
可以看出等到所有的线程都就绪之后,才开始进行工作。
Semaphore
Semaphore能够限制访问资源的数量,通过新建一个Semaphore对象的时候传入需要限制的数量,然后在线程里面调用countSemaphore.acquire()去获取资源,如果访问资源的数量超过了设定的数量,就会一直被阻塞,直到有线程执行了countSemaphore.release(),释放资源。Semaphore实现了类似的锁的功能,能够起到限流的作用
final static Semaphore countSemaphore = new Semaphore(5);
void addCount() {
try {
int waitTime = (int) (Math.random() * 1000);
countSemaphore.acquire();
System.out.println("获取到资源了" + Thread.currentThread().getName());
Thread.sleep(waitTime);
countSemaphore.release();
System.out.println("释放资源了" + Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println("sleep异常");
}
}
可以看到能够对线程的运行进行了限制,同时只要5个线程在运行addCount的内容,在线程处理结束,执行了release操作之后,其它线程进行acquire,然后开始运行。就像高峰时候的马路一样,我们限制了这段路同时只能承载5辆车,当有车开出去之后,又有新的车开进来。