IOTXING

记录技术学习之路

0%

Java 多线程(3)并发管理

CountDownLatch,CyclicBarrier和Semaphore都是在多线程编程中,对线程进行管理的对象。

CountDownLatch

使用new CountDownLatch(n)来创建,然后会创建一个值为n的计数器,当子线程完成的时候,调用countDown来减少计数器数值,主线程调用countDownLatch.await()方法,在n不为0 的时候会一直阻塞,直到n=0才会继续进行。阻塞的原理是await的时候会创建一个for循环,直到满足条件,然后会return出来。

private static ExecutorService executorService;
private int count = 0;
static int totalCount = 100000;
private Lock lock = new ReentrantLock();
final CountDownLatch countDownLatch = new CountDownLatch(50); //建立长度为50的countDownLatch

void addCount() {
    while (count < totalCount) {
        count += 1;
        System.out.println(count + "-------" + Thread.currentThread().getName());
    }
    countDownLatch.countDown(); //countDown减一
}


public Testsynchronized() {
    Long startTime = System.currentTimeMillis();
    executorService = Executors.newFixedThreadPool(50);
    for (int i = 0; i < 50; i++) {
        executorService.execute(() -> addCount());
    }

    try {
        countDownLatch.await(); // 等待计数器的结果变为0
        Long stopTime = System.currentTimeMillis();
        System.out.println("耗时为" + (stopTime - startTime));
    } catch (InterruptedException e) {
        System.out.println("捕获到异常");
    }

}

上面的代码建立一个计数器,然后等各个线程完成计算之后,主线程再去计算总共的耗时。

源码

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

这是await部分的代码,能够看出通过创建一个for循环,然后通过return跳出。

cyclicBarrier

cyclicBarrier跟countDownLatch的作用相反,countDownLatch相当于是一F1赛车进入检修区一样,等到各种检修都结束了,才能够开走。而cyclicBarrier相当于平常一堆人去吃饭,有的人吃得快有的人吃得慢,吃得快的就会在哪等着,直到所有人都吃完了,然后一起回去。 cyclicBarrier跟countDownLatch都是通过计数器来实现的,当子线程准备好了之后,调用cyclicBarrier.await()进入等待状态,当所有的子线程都进入await()状态之后,栅栏会释放,才会往下进行。这里面也是使用for循环来进行线程的阻塞

final CyclicBarrier cyclicBarrier = new CyclicBarrier(50);

void addCount() {
    try {
        int waitTime = (int) (Math.random() * 1000);
        Thread.sleep(waitTime);
        System.out.println("已经就绪" + Thread.currentThread().getName());
        cyclicBarrier.await();
        System.out.println("开始工作" + Thread.currentThread().getName());
    } catch (InterruptedException e) {
        System.out.println("sleep异常");
    } catch (BrokenBarrierException e) {
        System.out.println("遇到异常");
    }
}


public Testsynchronized() {
    Long startTime = System.currentTimeMillis();
    executorService = Executors.newFixedThreadPool(50);
    for (int i = 0; i < 50; i++) {
        executorService.execute(() -> addCount());
    }

    Long stopTime = System.currentTimeMillis();
    System.out.println("耗时为" + (stopTime - startTime));
    System.out.println("捕获到异常");

}

可以看出等到所有的线程都就绪之后,才开始进行工作。

Semaphore

Semaphore能够限制访问资源的数量,通过新建一个Semaphore对象的时候传入需要限制的数量,然后在线程里面调用countSemaphore.acquire()去获取资源,如果访问资源的数量超过了设定的数量,就会一直被阻塞,直到有线程执行了countSemaphore.release(),释放资源。Semaphore实现了类似的锁的功能,能够起到限流的作用

final static Semaphore countSemaphore = new Semaphore(5);

void addCount() {
    try {
        int waitTime = (int) (Math.random() * 1000);

        countSemaphore.acquire();
        System.out.println("获取到资源了" + Thread.currentThread().getName());
        Thread.sleep(waitTime);
        countSemaphore.release();
        System.out.println("释放资源了" + Thread.currentThread().getName());
    } catch (InterruptedException e) {
        System.out.println("sleep异常");
    }
}

可以看到能够对线程的运行进行了限制,同时只要5个线程在运行addCount的内容,在线程处理结束,执行了release操作之后,其它线程进行acquire,然后开始运行。就像高峰时候的马路一样,我们限制了这段路同时只能承载5辆车,当有车开出去之后,又有新的车开进来。