大家好!我是小蔡!
CountDownLatch有何作用?
-
我们知道AQS是专属于构造锁和同步器的一个抽象工具类,基于它Java构造出了大量的常用同步工具,如
ReentrantLock
、Semaphore
、ReentrantReadWriteLock
、SynchronousQueue
等等,我们今天的主角CountDownLatch
同样如此。 -
CountDownLatch(倒时器)
允许N个线程阻塞在同一个地方,直至所有线程的任务都执行完毕 。CountDownLatch 有一个 计数器 ,可以通过countDown()
方法对计数器的数目进行减一
操作,也可以通过await()方法来阻塞当前线程
,直到计数器的值为0
。
CountDownLatch(闭锁/倒计时锁)用来进行线程同步协作,等待所有线程完成倒计时(一个或者多个线程,等待其他多个线程完成某件事情之后才能执行)
- 其中构造参数用来初始化等待计数值
- await() 用来等待计数归零
- countDown() 用来让计数减一
CountDownLatch的底层原理
【源码解析1】
//几乎所有基于AQS构造的同步类,内部都需要一个静态内部类去继承AQS
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
}
private final Sync sync;
//构造方法中初始化count值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
几乎所有基于AQS构造的同步类,内部都需要一个静态内部类
去继承AQS
,并 实现其提供的钩子方法 ,通过封装AQS中的state为count来确定多个线程的计时器
【源码解析2】(countDown()方法)
//核心方法,内部封装了共享模式下的线程释放
public void countDown() {
//内部类Sync,继承了AQS
sync.releaseShared(1);
}
//AQS内部的实现
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//唤醒后继节点
doReleaseShared();
return true;
}
return false;
}
在CountDownLatch中通过countDown来减少倒计时数
,这是最重要的一个方法,我们继续跟进源码看到它通过releaseShared()方法去释放锁
,这个方法是AQS
内部的默认实现方法,而在这个方法中再一次的调用了tryReleaseShared(arg)
,这是一个AQS的钩子方法
,方法内部仅有默认的异常处理,真正的实现由CountDownLatch内部类Sync完成
【源码解析3】
// 对 state 进行递减,直到 state 变成 0;
// 只有 count 递减到 0 时,countDown 才会返回 true
protected boolean tryReleaseShared(int releases) {
// 自选检查 state 是否为 0
for (;;) {
int c = getState();
// 如果 state 已经是 0 了,直接返回 false
if (c == 0)
return false;
// 对 state 进行递减
int nextc = c-1;
// CAS 操作更新 state 的值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
await()方法
除了countDown()
方法外,在CountDownLatch中还有一个重要方法就是 await ,在多线程环境下,线程的执行顺序并不一致,因此,对于一个倒时器也说,先开始的线程应该阻塞等待直至最后一个线程执行完成,而实现这一效果的就是await()
方法!
【源码解析4】
// 等待(也可以叫做加锁)
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 带有超时时间的等待
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
其中await()方法可以配置带有时间参数的,表示最大阻塞时间,当调用 await()
的时候,如果 state 不为 0
,那就证明任务还没有执行完毕,await()
就会一直阻塞
,也就是说 await() 之后的语句不会被执行。然后, CountDownLatch 会自旋 CAS 判断 state是否等于0,若是就会释放所有等待的线程,await() 方法之后的语句得到执行
CountDownLatch的小Demo
【代码样例1】
public class Test {
public static void main(String[] args) throws InterruptedException {
// 创建一个倒计数为 3 的 CountDownLatch
CountDownLatch latch = new CountDownLatch(3);
Thread service1 = new Thread(new Service("3", 1000, latch));
Thread service2 = new Thread(new Service("2", 2000, latch));
Thread service3 = new Thread(new Service("1", 3000, latch));
service1.start();
service2.start();
service3.start();
// 等待所有服务初始化完成
latch.await();
System.out.println("发射");
}
static class Service implements Runnable {
private final String name;
private final int timeToStart;
private final CountDownLatch latch;
public Service(String name, int timeToStart, CountDownLatch latch) {
this.name = name;
this.timeToStart = timeToStart;
this.latch = latch;
}
@Override
public void run() {
try {
Thread.sleep(timeToStart);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name);
// 减少倒计数
latch.countDown();
}
}
}
输出:
3
2
1
发射
- CountDownLatch还有另外一个使用场景:实现多个线程开始执行任务的最大并行性, 多个线程在某一时刻同时开始执行 。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。
- 具体做法是: 初始化一个共享的 CountDownLatch 对象,将其计数器初始化为
1 (new CountDownLatch(1))
,多个线程在开始执行任务前首先coundownlatch.await()
,当主线程调用countDown()
时, 计数器变为 0,多个线程同时被唤醒。
【代码样例2】
public class Test {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println("5位运动员就位!");
//等待发令枪响
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "起跑!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 裁判准备发令
Thread.sleep(2000);
//发令枪响
countDownLatch.countDown();
}
}
输出:
5位运动员就位!
5位运动员就位!
5位运动员就位!
5位运动员就位!
5位运动员就位!
Thread-0起跑!
Thread-3起跑!
Thread-4起跑!
Thread-1起跑!
Thread-2起跑!
CountDownLatch (结合CompleteableFuture并发计算)实战
场景:在我的项目一 Online微课中,用户使用优惠卷购买课程;这其中涉及到复杂的优惠算法,例如:先进行优惠卷全排列,然后依次匹配我们购物车中的课程价格,满足条件时预使用优惠卷,然后判断优惠后的价格是否符合剩下的课程(其中包括:满减、每满减、打几折)并且有些优惠卷还有对应的使用哪些的课程范围;每种排序用卷的先后都会导致最终的价格不一样。
可以发现,上节的优惠券算法还是比较复杂的。而且由于优惠方案很多,目前我们此案有的是for循环逐个方案串行计算,整体性能可想而知。
我们要做的就是计算出所有优惠卷的组合排列的优惠方案供用户使用;所以,为了提高计算效率,我们可以利用多线程并行计算。具体步骤如下:
- 定义一个线程池
- for循环将每个方案交给一个线程去任务执行
- 等待所有任务计算完毕,返回结果
这里的难点有两个:
1)线程任务是带返回值的任务
2)虽然是多线程运行,但是我们要等待所有线程都执行完毕后才返回结果
针对第二个点,我们可以利用JUC包提供的工具CountDownLatch
来实现。
针对第一个点,我们则需要利用一个JDK1.8的新工具:CompletableFuture
来实现
【代码1:定义线程池】
@Slf4j
@Configuration
public class PromotionConfig {
@Bean
public Executor discountSolutionExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 1.核心线程池大小
executor.setCorePoolSize(12);
// 2.最大线程池大小
executor.setMaxPoolSize(12);
// 3.队列大小
executor.setQueueCapacity(99999);
// 4.线程名称
executor.setThreadNamePrefix("discount-solution-calculator-");
// 5.拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.initialize();
return executor;
}
}
【代码2:countdownlatch】
@Slf4j
@Service
@RequiredArgsConstructor
public class DiscountServiceImpl implements IDiscountService {
private final UserCouponMapper userCouponMapper;
private final ICouponScopeService scopeService;
private final Executor discountSolutionExecutor;
@Override
public List<CouponDiscountDTO> findDiscountSolution(List<OrderCourseDTO> orderCourses) {
// 1.查询我的所有可用优惠券
List<Coupon> coupons = userCouponMapper.queryMyCoupons(UserContext.getUser());
if (CollUtils.isEmpty(coupons)) {
return CollUtils.emptyList();
}
// 2.初筛
// 2.1.计算订单总价
int totalAmount = orderCourses.stream().mapToInt(OrderCourseDTO::getPrice).sum();
// 2.2.筛选可用券
List<Coupon> availableCoupons = coupons.stream()
.filter(c -> DiscountStrategy.getDiscount(c.getDiscountType()).canUse(totalAmount, c))
.collect(Collectors.toList());
if (CollUtils.isEmpty(availableCoupons)) {
return CollUtils.emptyList();
}
// 3.排列组合出所有方案
// 3.1.细筛(找出每一个优惠券的可用的课程,判断课程总价是否达到优惠券的使用需求)
Map<Coupon, List<OrderCourseDTO>> availableCouponMap = findAvailableCoupon(availableCoupons, orderCourses);
if (CollUtils.isEmpty(availableCouponMap)) {
return CollUtils.emptyList();
}
// 3.2.排列组合
availableCoupons = new ArrayList<>(availableCouponMap.keySet());
List<List<Coupon>> solutions = PermuteUtil.permute(availableCoupons);
// 3.3.添加单券的方案
for (Coupon c : availableCoupons) {
solutions.add(List.of(c));
}
// 4.计算方案的优惠明细
List<CouponDiscountDTO> list = Collections.synchronizedList(new ArrayList<>(solutions.size()));
// 4.1.定义闭锁
CountDownLatch latch = new CountDownLatch(solutions.size());
for (List<Coupon> solution : solutions) {
// 4.2.异步计算
CompletableFuture
.supplyAsync(
() -> calculateSolutionDiscount(availableCouponMap, orderCourses, solution),
discountSolutionExecutor
).thenAccept(dto -> {
// 4.3.提交任务结果
list.add(dto);
latch.countDown();
});
}
// 4.4.等待运算结束
try {
latch.await(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("优惠方案计算被中断,{}", e.getMessage());
}**
// TODO 5.筛选最优解
return null;
}
// ... 略
}
定义一个countdownlatch
闭锁,大小为优惠卷方案的总数
CountDownLatch latch = new CountDownLatch(solutions.size());
提交任务到线程池,方案采用异步。计算出来(thenAccept
)后收集起来,调用countDown()
减少一个计数,减到0就全部完成了
for (List<Coupon> solution : solutions) {
// 4.2.异步计算
CompletableFuture
.supplyAsync(
() -> calculateSolutionDiscount(availableCouponMap, orderCourses, solution),
discountSolutionExecutor
).thenAccept(dto -> {
// 4.3.提交任务结果
list.add(dto);
latch.countDown();
});
}
优点
-
简化了线程间的通信和同步。在某些并发场景中,需要 等待其他线程完成任务后才能继续执行,使用 CountDownLatch 可以简化这种操作,而不需要复杂的锁和等待/通知机制。
-
提高性能。由于 CountDownLatch 可以让线程在完成任务后立即递减计数值,而不需要等待其他线程完成任务,因此可以 减少阻塞,提高程序运行性能。
-
支持灵活的计数。可以通过创建不同的 CountDownLatch 实例,实现对多个线程任务计数
缺点
-
单次使用。CountDownLatch 的计数值无法重置。一旦计数值到达零,它就不能再被使用了。在需要重复使用的场景中,可以选用 CyclicBarrier 或 Semaphore。
-
没有返回值。
CountDownLatch
无法获得执行任务的线程所返回的结果。如果需要收集线程执行结果,可以考虑使用 java.util.concurrent.Future 和 java.util.concurrent.ExecutorService
使用场景
-
启动多个线程执行并行任务,主线程等待所有并行任务完成后继续执行 。例如:在测试中,准备数据阶段,需要同时查询多个子系统的数据和处理,等待处理结束后再进行下一步操作
-
控制 线程的执行顺序 。一个线程需要等待其他线程的结果或者完成任务后才能继续执行。例如:一个文件解压缩程序,首先需要下载文件,下载完成后解压文件
-
实现一个计数器,允许一个或多个线程等待直到计数器为0。这对于在系统初始化时,需要等待 资源加载或者初始化的场景 十分有用。例如:等待加 载配置文件、启动连接池 等操作完成后才开始处理其他任务
大唐贞观 2024-08-01 21:01 回复 取消回复
跪求大佬总结,cyclebarrier samphere这些