小蔡学Java

项目一总结:(四)合并写请求提高并发量

2024-01-11 18:49 1708 0 项目 高并发RedisRedis优化合并写请求

前言

在Online学习平台中,当用户观看视频时,用户离开再次观看时我们如何对上次的播放进度进行保存完成续播呢?接下来就由我来揭开这神秘的面纱

我参与了整个学习中心的功能开发,其中有很多的学习辅助功能都很有特色。比如视频播放的进度记录。我们网站的课程是以录播视频为主,为了提高用户的学习体验,需要实现视频续播功能。这个功能本身并不复杂,只不过有如下要求:

  • 首先续播时间误差要控制在30秒以内。
  • 而且要做到用户突然断开,甚至切换设备后,都可以继续上一次播放

要达成这个目的,使用传统的手段显然是不行的。

首先,要做到切换设备后还能续播,用户的播放进度必须保存在服务端,而不是客户端。 其次,用户突然断开或者切换设备,续播的时间误差不能超过30秒,那播放进度的记录频率就需要比较高。我们会在前端每隔15秒就发起一次心跳请求,提交最新的播放进度,记录到服务端。这样用户下一次续播时直接读取服务端的播放进度,就可以将时间误差控制在15秒左右。

业务

提交学习记录

学习记录就是用户当前学了哪些小节,以及学习到该小节的进度如何。而小节类型分为考试、视频两种。

  • 考试比较简单,只要提交了就说明这一节学完了。

  • 视频比较麻烦,需要记录用户的播放进度,进度超过50%才算学完。因此视频播放的过程中需要不断提交播放进度到服务端,而服务端则需要保存学习记录到数据库

只要记录了用户学过的每一个小节,以及小节对应的学习进度、是否学完。无论是视频续播、还是统计学习计划进度,都可以轻松实现了。

因此,提交学习记录就是提交小节的信息和小节的学习进度信息。考试提交一次即可,视频则是播放中频繁提交。提交的信息包括两大部分:

  • 小节的基本信息
    • 小节id
    • lessonId
    • 小节类型:可能是视频,也可能是考试。考试无需提供播放进度信息
    • 提交时间
  • 播放进度信息
    • 视频时长:时长结合播放进度可以判断有没有超过50%
    • 视频播放进度:也就是第几秒

数据库设计

learning_lesson表

按照之前的分析,用户学习的课程包含多个小节,小节的类型包含两种:

  • 视频:视频播放进度超过50%就算当节学完
  • 考试:考完就算一节学完 学习进度除了要记录哪些小节学完,还要记录学过的小节、每小节的播放的进度(方便续播)。因此,需要记录的数据就包含以下部分:
  • 学过的小节的基础信息
    • 小节id
    • 小节对应的lessonId
    • 用户id:学习课程的人
  • 小节的播放进度信息
    • 视频播放进度:也就是播放到了第几秒
    • 是否已经学完:播放进度有没有超过50%
    • 第一次学完的时间:用户可能重复学习,第一次从未学完到学完的时间要记录下来

再加上一些表基础字段,整张表结构就出来了: learning_record表

这些字段是整个课程的进度统计(易错点):

  • learned_sections:已学习小节数量

  • latest_section_id:最近一次学习的小节id

  • latest_learn_time:最近一次学习时间

  • 每当有一个小节被学习,都应该更新latest_section_idlatest_learn_time;每当有一个小节学习完后,learned_sections都应该累加1。不过这里有一点容易出错的地方:

  • 考试只会被参加一次,考试提交则小节学完,learned_sections累加1

  • 视频可以被重复播放,只有在第一次学完一个视频时,learned_sections才需要累加1

  • 随着learned_sections字段不断累加,最终会到达课程的最大小节数,这就意味着当前课程被全部学完了。那么课程状态需要从“学习中”变更为“已学完”

那么问题来了,如何判断视频是否是第一次学完?我认为应该同时满足两个条件:

  • 视频播放进度超过50%
  • 之前学习记录的状态为未学完

优化前面对的问题

综合上述设计出如下流程

核心代码

/**
 * <p>
 * 学习记录表 服务实现类
 * </p>
 */
@Service
@RequiredArgsConstructor
public class LearningRecordServiceImpl extends ServiceImpl<LearningRecordMapper, LearningRecord> implements ILearningRecordService {

    private final ILearningLessonService lessonService;

    private final CourseClient courseClient;

    // 。。。略

    @Override
    @Transactional
    public void addLearningRecord(LearningRecordFormDTO recordDTO) {
        // 1.获取登录用户
        Long userId = UserContext.getUser();
        // 2.处理学习记录
        boolean finished = false;
        if (recordDTO.getSectionType() == SectionType.VIDEO) {
            // 2.1.处理视频
            finished = handleVideoRecord(userId, recordDTO);
        }else{
            // 2.2.处理考试
            finished = handleExamRecord(userId, recordDTO);
        }

        // 3.处理课表数据
        handleLearningLessonsChanges(recordDTO, finished);
    }

    private void handleLearningLessonsChanges(LearningRecordFormDTO recordDTO, boolean finished) {
        // 1.查询课表
        LearningLesson lesson = lessonService.getById(recordDTO.getLessonId());
        if (lesson == null) {
            throw new BizIllegalException("课程不存在,无法更新数据!");
        }
        // 2.判断是否有新的完成小节
        boolean allLearned = false;
        if(finished){
            // 3.如果有新完成的小节,则需要查询课程数据
            CourseFullInfoDTO cInfo = courseClient.getCourseInfoById(lesson.getCourseId(), false, false);
            if (cInfo == null) {
                throw new BizIllegalException("课程不存在,无法更新数据!");
            }
            // 4.比较课程是否全部学完:已学习小节 >= 课程总小节
            allLearned = lesson.getLearnedSections() + 1 >= cInfo.getSectionNum();     
        }
        // 5.更新课表
        lessonService.lambdaUpdate()
                .set(lesson.getLearnedSections() == 0, LearningLesson::getStatus, LessonStatus.LEARNING.getValue())
                .set(allLearned, LearningLesson::getStatus, LessonStatus.FINISHED.getValue())
                .set(!finished, LearningLesson::getLatestSectionId, recordDTO.getSectionId())
                .set(!finished, LearningLesson::getLatestLearnTime, recordDTO.getCommitTime())
                .setSql(finished, "learned_sections = learned_sections + 1")
                .eq(LearningLesson::getId, lesson.getId())
                .update();
    }

    private boolean handleVideoRecord(Long userId, LearningRecordFormDTO recordDTO) {
        // 1.查询旧的学习记录
        LearningRecord old = queryOldRecord(recordDTO.getLessonId(), recordDTO.getSectionId());
        // 2.判断是否存在
        if (old == null) {
            // 3.不存在,则新增
            // 3.1.转换PO
            LearningRecord record = BeanUtils.copyBean(recordDTO, LearningRecord.class);
            // 3.2.填充数据
            record.setUserId(userId);
            // 3.3.写入数据库
            boolean success = save(record);
            if (!success) {
                throw new DbException("新增学习记录失败!");
            }
            return false;
        }
        // 4.存在,则更新
        // 4.1.判断是否是第一次完成
        boolean finished = !old.getFinished() && recordDTO.getMoment() * 2 >= recordDTO.getDuration();
        // 4.2.更新数据
        boolean success = lambdaUpdate()
                .set(LearningRecord::getMoment, recordDTO.getMoment())
                .set(finished, LearningRecord::getFinished, true)
                .set(finished, LearningRecord::getFinishTime, recordDTO.getCommitTime())
                .eq(LearningRecord::getId, old.getId())
                .update();
        if(!success){
            throw new DbException("更新学习记录失败!");
        }
        return finished ;
    }

    private LearningRecord queryOldRecord(Long lessonId, Long sectionId) {
        return lambdaQuery()
                .eq(LearningRecord::getLessonId, lessonId)
                .eq(LearningRecord::getSectionId, sectionId)
                .one();
    }

    private boolean handleExamRecord(Long userId, LearningRecordFormDTO recordDTO) {
        // 1.转换DTO为PO
        LearningRecord record = BeanUtils.copyBean(recordDTO, LearningRecord.class);
        // 2.填充数据
        record.setUserId(userId);
        record.setFinished(true);
        record.setFinishTime(recordDTO.getCommitTime());
        // 3.写入数据库
        boolean success = save(record);
        if (!success) {
            throw new DbException("新增考试记录失败!");
        }
        return true;
    }
}

问题定位

我们实现了学习计划和学习进度的统计功能。,为了更精确的记录用户上一次播放的进度,我们采用的方案是:前端每隔15秒就发起一次请求,将播放记录写入数据库。

但问题是,提交播放记录的业务太复杂了,其中涉及到大量的数据库操作

综上所述:面对严重的问题如下 频繁的进行写数据库的操作,用户量逐渐增大会导致,写库频率过高影响其他业务对数据库的操作;进而导致整个系统运行缓慢

优化方案

宏观角度来说有3个方向

其中,水平扩展和服务保护侧重的是运维层面的处理。而提高单机并发能力侧重的则是业务层面的处理,也就是我们程序员在开发时可以做到的。

因此,我们本章重点讨论如何通过编码来提供业务的单机并发能力。

单机并发能力

在机器性能一定的情况下,提高单机并发能力就是要尽可能缩短业务的响应时间(ResponseTime),而对响应时间影响最大的往往是对数据库的操作。而从数据库角度来说,我们的业务无非就是读或写两种类型。

对于读多写少的业务,其优化手段大家都比较熟悉了,主要包括两方面:

  • 优化代码和SQL
  • 添加缓存 对于写多读少的业务,大家可能较少碰到,优化的手段可能也不太熟悉,这也是我们要讲解的重点。

对于高并发写的优化方案有:

  • 优化代码及SQL
  • 变同步写为异步写
  • 合并写请求

变同步为异步

假如一个业务比较复杂,需要有多次数据库的写业务,如图所示:

由于各个业务之间是同步串行执行,因此整个业务的响应时间就是每一次数据库写业务的响应时间之和,并发能力肯定不会太好。

优化的思路很简单,我们之前讲解MQ的时候就说过,利用MQ可以把同步业务变成异步,从而提高效率。

  • 当我们接收到用户请求后,可以先不处理业务,而是发送MQ消息并返回给用户结果
  • 而后通过消息监听器监听MQ消息,处理后续业务。

这样一来,用户请求处理和后续数据库写就从同步变为异步,用户无需等待后续的数据库写操作,响应时间自然会大大缩短。并发能力自然大大提高。

优点:
- 无需等待复杂业务处理,大大减少响应时间
- 利用MQ暂存消息,起到流量削峰整形作用
- 降低写数据库频率,减轻数据库并发压力
缺点:
- 依赖于MQ的可靠性
- 降低了些频率,但是没有减少数据库写次数
应用场景:
- 比较适合应用于业务复杂, 业务链较长,有多次数据库写操作的业务。

合并写请求

合并写请求方案其实是参考高并发读的优化思路:当读数据库并发较高时,我们可以把数据缓存到Redis,这样就无需访问数据库,大大减少数据库压力,减少响应时间。

既然读数据可以建立缓存,那么写数据可以不可以也缓存到Redis呢?

答案是肯定的,合并写请求就是指当写数据库并发较高时,不再直接写到数据库而是先将数据缓存到Redis,然后定期将缓存中的数据批量写入数据库。 如图:

由于Redis是内存操作,写的效率也非常高,这样每次请求的处理速度大大提高,响应时间大大缩短,并发能力肯定有很大的提升。

而且由于数据都缓存到Redis了,积累一些数据后再批量写入数据库,这样数据库的写频率、写次数都大大减少,对数据库压力小了非常多!

Redis数据结构设计

我们先讨论下Redis缓存中需要记录哪些数据。

我们的优化方案要处理的不是所有的提交学习记录请求。仅仅是视频播放时的高频更新播放进度的请求,对应的业务分支如图:

这条业务支线的流程如下:

  • 查询播放记录,判断是否存在
    • 如果不存在,新增一条记录
    • 如果存在,则更新学习记录
  • 判断当前进度是否是第一次学完
    • 播放进度要超过50%
    • 原本的记录状态是未学完
  • 更新课表中最近学习小节id、学习时间

这里有多次数据库操作,例如:

  • 查询播放记录:需要知道播放记录是否存在播放记录当前的完成状态
  • 更新播放记录:更新播放进度
  • 更新最近学习小节id、时间

一方面我们要缓存写数据,减少写数据库频率;另一方面我们要缓存播放记录,减少查询数据库。因此,缓存中至少要包含3个字段:

  • 记录id :id,用于根据id更新数据库
  • 播放进度:moment,用于缓存播放进度
  • 播放状态(是否学完):finished,用于判断是否是第一次学完

既然一个小节要保存多个字段,是不是可以考虑使用Hash结构来保存这些数据,如图:

不过,这样设计有一个问题。课程有很多,每个课程的小节也非常多。每个小节都是一个独立的KEY,需要创建的KEY也会非常多,浪费大量内存。

而且,用户学习视频的过程中,可能会在多个视频之间来回跳转,这就会导致频繁的创建缓存、缓存过期,影响到最终的业务性能。该如何解决呢?

既然一个课程包含多个小节,我们完全可以把一个课程的多个小节作为一个KEY来缓存,如图:

这样做有两个好处:

  • 可以大大减少需要创建的KEY的数量,减少内存占用。
  • 一个课程创建一个缓存,当用户在多个视频间跳转时,整个缓存的有效期都会被延续,不会频繁的创建和销毁缓存数据

添加缓存以后,学习记录提交的业务流程就需要发生一些变化了,如图:

变化最大的有两点:

  • 提交播放进度后,如果是更新播放进度则不写数据库,而是写缓存
  • 需要一个定时任务,定期将缓存数据写入数据库

持久化思路

对于合并写请求方案,一定有一个步骤就是持久化缓存数据到数据库。一般采用的是定时任务持久化:

但是定时任务的持久化方式在播放进度记录业务中存在一些问题,主要就是时效性问题。我们要求视频续播的时间误差不能超过30秒。

  • 假如定时任务间隔较短,例如20秒一次,对数据库的更新频率太高,压力太大
  • 假如定时任务间隔较长,例如2分钟一次,更新频率较低,续播误差可能超过2分钟,不满足需求

那么问题来了,有什么办法能够在不增加数据库压力的情况下,保证时间误差较低吗?

假如一个视频时长为20分钟,我们从头播放至15分钟关闭,每隔15秒提交一次播放进度,大概需要提交60次请求。

但是下一次我们再次打开该视频续播的时候,肯定是从最后一次提交的播放进度来续播。也就是说续播进度之前的N次播放进度都是没有意义的,都会被覆盖

既然如此,我们完全没有必要定期把这些播放进度写到数据库,只需要将用户最后一次提交的播放进度写入数据库即可。

但问题来了,我们怎么知道哪一次提交是最后一次提交呢?

只要用户一直在提交记录,Redis中的播放进度就会一直变化。如果Redis中的播放进度不变,肯定是停止了播放,是最后一次提交。

因此,我们只要能判断Redis中的播放进度是否变化即可。怎么判断呢?

每当前端提交播放记录时,我们可以设置一个延迟任务并保存这次提交的进度。等待20秒后(因为前端每15秒提交一次,20秒就是等待下一次提交),检查Redis中的缓存的进度与任务中的进度是否一致。

  • 不一致:说明持续在提交,无需处理
  • 一致:说明是最后一次提交,更新学习记录、更新课表最近学习小节和时间到数据库中

优化实现

延迟队列的引入

为了确定用户提交的播放记录是否变化,我们需要将播放记录保存为一个延迟任务,等待超过一个提交周期(20s)后检查播放进度。

那么延迟任务该如何实现呢?

以上四种方案都可以解决问题,不过本例中我们会使用DelayQueue方案。因为这种方案使用成本最低,而且不依赖任何第三方服务,减少了网络交互。

毕竟DelayQueue是JVM层面的,一旦服务宕机或者重启就什么都没了,在分布式情况下也会有很多漏洞;后续我会优化使用RabbitMQ延时插件来实现延时队列

但缺点也很明显,就是需要占用JVM内存,在数据量非常大的情况下可能会有问题。但考虑到任务存储时间比较短(只有20秒),因此也可以接收。

如果你们的数据量非常大,DelayQueue不能满足业务需求,大家也可以替换为其它延迟队列方式,例如RedissonMQ

从源码中可以看出,Delayed类型必须具备两个方法:

  • getDelay():获取延迟任务的剩余延迟时间
  • compareTo(T t):比较两个延迟任务的延迟时间,判断执行顺序

可见,Delayed类型的延迟任务具备两个功能:获取剩余延迟时间、比较执行顺序。当然,我们可以对Delayed做实现和功能扩展,比如添加延迟任务的数据。

将来每一次提交播放记录,就可以将播放记录保存在这样的一个Delayed类型的延迟任务里并设定20秒的延迟时间。然后交给DelayQueue队列。DelayQueue会调用compareTo方法,根据剩余延迟时间对任务排序。剩余延迟时间越短的越靠近队首,这样就会被优先执行

@Data
public class DelayTask<D> implements Delayed {
    private D data;
    private long deadlineNanos;

    public DelayTask(D data, Duration delayTime) {
        this.data = data;
        this.deadlineNanos = System.nanoTime() + delayTime.toNanos();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(Math.max(0, deadlineNanos - System.nanoTime()), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        long l = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        if(l > 0){
            return 1;
        }else if(l < 0){
            return -1;
        }else {
            return 0;
        }
    }
}

代码改造(定义延迟任务工具类)

因此,我们的工具类就应该具备上述4个方法:

  • ① 添加播放记录到Redis并添加一个延迟检测任务DelayQueue
  • 查询Redis缓存中的指定小节的播放记录
  • 删除Redis缓存中的指定小节的播放记录
  • ④ 异步执行DelayQueue中的延迟检测任务,检测播放进度是否变化,如果无变化则写入数据库
@Slf4j
@Component
@RequiredArgsConstructor
public class LearningRecordDelayTaskHandler {

    private final StringRedisTemplate redisTemplate;
    private final LearningRecordMapper recordMapper;
    private final ILearningLessonService lessonService;
    private final DelayQueue<DelayTask<RecordTaskData>> queue = new DelayQueue<>();
    private final static String RECORD_KEY_TEMPLATE = "learning:frowning:{}";
    private static volatile boolean begin = true;

    @PostConstruct
    public void init(){
        CompletableFuture.runAsync(this::handleDelayTask);
    }
    @PreDestroy
    public void destroy(){
        begin = false;
        log.debug("延迟任务停止执行!");
    }

    public void handleDelayTask(){
        while (begin) {
            try {
                // 1.获取到期的延迟任务
                DelayTask<RecordTaskData> task = queue.take();
                RecordTaskData data = task.getData();
                // 2.查询Redis缓存
                LearningRecord record = readRecordCache(data.getLessonId(), data.getSectionId());
                if (record == null) {
                    continue;
                }
                // 3.比较数据,moment值
                if(!Objects.equals(data.getMoment(), record.getMoment())) {
                    // 不一致,说明用户还在持续提交播放进度,放弃旧数据
                    continue;
                }

                // 4.一致,持久化播放进度数据到数据库
                // 4.1.更新学习记录的moment
                record.setFinished(null);
                recordMapper.updateById(record);
                // 4.2.更新课表最近学习信息
                LearningLesson lesson = new LearningLesson();
                lesson.setId(data.getLessonId());
                lesson.setLatestSectionId(data.getSectionId());
                lesson.setLatestLearnTime(LocalDateTime.now());
                lessonService.updateById(lesson);
            } catch (Exception e) {
                log.error("处理延迟任务发生异常", e);
            }
        }
    }

    public void addLearningRecordTask(LearningRecord record){
        // 1.添加数据到Redis缓存
        writeRecordCache(record);
        // 2.提交延迟任务到延迟队列 DelayQueue
        queue.add(new DelayTask<>(new RecordTaskData(record), Duration.ofSeconds(20)));
    }

    public void writeRecordCache(LearningRecord record) {
        log.debug("更新学习记录的缓存数据");
        try {
            // 1.数据转换
            String json = JsonUtils.toJsonStr(new RecordCacheData(record));
            // 2.写入Redis
            String key = StringUtils.format(RECORD_KEY_TEMPLATE, record.getLessonId());
            redisTemplate.opsForHash().put(key, record.getSectionId().toString(), json);
            // 3.添加缓存过期时间
            redisTemplate.expire(key, Duration.ofMinutes(1));
        } catch (Exception e) {
            log.error("更新学习记录缓存异常", e);
        }
    }

    public LearningRecord readRecordCache(Long lessonId, Long sectionId){
        try {
            // 1.读取Redis数据
            String key = StringUtils.format(RECORD_KEY_TEMPLATE, lessonId);
            Object cacheData = redisTemplate.opsForHash().get(key, sectionId.toString());
            if (cacheData == null) {
                return null;
            }
            // 2.数据检查和转换
            return JsonUtils.toBean(cacheData.toString(), LearningRecord.class);
        } catch (Exception e) {
            log.error("缓存读取异常", e);
            return null;
        }
    }

    public void cleanRecordCache(Long lessonId, Long sectionId){
        // 删除数据
        String key = StringUtils.format(RECORD_KEY_TEMPLATE, lessonId);
        redisTemplate.opsForHash().delete(key, sectionId.toString());
    }

    @Data
    @NoArgsConstructor
    private static class RecordCacheData{
        private Long id;
        private Integer moment;
        private Boolean finished;

        public RecordCacheData(LearningRecord record) {
            this.id = record.getId();
            this.moment = record.getMoment();
            this.finished = record.getFinished();
        }
    }
    @Data
    @NoArgsConstructor
    private static class RecordTaskData{
        private Long lessonId;
        private Long sectionId;
        private Integer moment;

        public RecordTaskData(LearningRecord record) {
            this.lessonId = record.getLessonId();
            this.sectionId = record.getSectionId();
            this.moment = record.getMoment();
        }
    }
}

优化后

@Service
@RequiredArgsConstructor
public class LearningRecordServiceImpl extends ServiceImpl<LearningRecordMapper, LearningRecord> implements ILearningRecordService {

    private final ILearningLessonService lessonService;

    private final CourseClient courseClient;

    private final LearningRecordDelayTaskHandler taskHandler;

    @Override
    public LearningLessonDTO queryLearningRecordByCourse(Long courseId) {
        // 1.获取登录用户
        Long userId = UserContext.getUser();
        // 2.查询课表
        LearningLesson lesson = lessonService.queryByUserAndCourseId(userId, courseId);
        // 3.查询学习记录
        // select * from xx where lesson_id = #{lessonId}
        List<LearningRecord> records = lambdaQuery().eq(LearningRecord::getLessonId, lesson.getId()).list();
        // 4.封装结果
        LearningLessonDTO dto = new LearningLessonDTO();
        dto.setId(lesson.getId());
        dto.setLatestSectionId(lesson.getLatestSectionId());
        dto.setRecords(BeanUtils.copyList(records, LearningRecordDTO.class));
        return dto;
    }

    @Override
    @Transactional
    public void addLearningRecord(LearningRecordFormDTO recordDTO) {
        // 1.获取登录用户
        Long userId = UserContext.getUser();
        // 2.处理学习记录
        boolean finished = false;
        if (recordDTO.getSectionType() == SectionType.VIDEO) {
            // 2.1.处理视频
            finished = handleVideoRecord(userId, recordDTO);
        } else {
            // 2.2.处理考试
            finished = handleExamRecord(userId, recordDTO);
        }
		//优化地方
        if (!finished) {
            // 没有新学完的小节,无需更新课表中的学习进度
            return;
        }
		//优化地方
        // 3.处理课表数据
        handleLearningLessonsChanges(recordDTO);
    }

    private void handleLearningLessonsChanges(LearningRecordFormDTO recordDTO) {
        // 1.查询课表
        LearningLesson lesson = lessonService.getById(recordDTO.getLessonId());
        if (lesson == null) {
            throw new BizIllegalException("课程不存在,无法更新数据!");
        }
		//优化地方
        // 2.判断是否有新的完成小节
        boolean allLearned = false;

        // 3.如果有新完成的小节,则需要查询课程数据
        CourseFullInfoDTO cInfo = courseClient.getCourseInfoById(lesson.getCourseId(), false, false);
        if (cInfo == null) {
            throw new BizIllegalException("课程不存在,无法更新数据!");
        }
        // 4.比较课程是否全部学完:已学习小节 >= 课程总小节
        allLearned = lesson.getLearnedSections() + 1 >= cInfo.getSectionNum();

        // 5.更新课表
        lessonService.lambdaUpdate()
                .set(lesson.getLearnedSections() == 0, LearningLesson::getStatus, LessonStatus.LEARNING.getValue())
                .set(allLearned, LearningLesson::getStatus, LessonStatus.FINISHED.getValue())
                .set(allLearned, LearningLesson::getFinishTime, LocalDateTime.now())
                .setSql("learned_sections = learned_sections + 1")
                .eq(LearningLesson::getId, lesson.getId())
                .update();
				//优化地方
    }

    private boolean handleVideoRecord(Long userId, LearningRecordFormDTO recordDTO) {
        // 1.查询旧的学习记录
        LearningRecord old = queryOldRecord(recordDTO.getLessonId(), recordDTO.getSectionId());
        // 2.判断是否存在
        if (old == null) {
            // 3.不存在,则新增
            // 3.1.转换PO
            LearningRecord record = BeanUtils.copyBean(recordDTO, LearningRecord.class);
            // 3.2.填充数据
            record.setUserId(userId);
            // 3.3.写入数据库
            boolean success = save(record);
            if (!success) {
                throw new DbException("新增学习记录失败!");
            }
            return false;
        }
        // 4.存在,则更新
        // 4.1.判断是否是第一次完成
        boolean finished = !old.getFinished() && recordDTO.getMoment() * 2 >= recordDTO.getDuration();
		//优化地方
        if (!finished) {
            LearningRecord record = new LearningRecord();
            record.setLessonId(recordDTO.getLessonId());
            record.setSectionId(recordDTO.getSectionId());
            record.setMoment(recordDTO.getMoment());
            record.setId(old.getId());
            record.setFinished(old.getFinished());
            taskHandler.addLearningRecordTask(record);
            return false;
        }
		//优化地方
        // 4.2.更新数据
        boolean success = lambdaUpdate()
                .set(LearningRecord::getMoment, recordDTO.getMoment())
                .set(LearningRecord::getFinished, true)
                .set(LearningRecord::getFinishTime, recordDTO.getCommitTime())
                .eq(LearningRecord::getId, old.getId())
                .update();
        if (!success) {
            throw new DbException("更新学习记录失败!");
        }
		//优化地方
        // 4.3.清理缓存
        taskHandler.cleanRecordCache(recordDTO.getLessonId(),
		//优化地方recordDTO.getSectionId());
        return true;
    }

    private LearningRecord queryOldRecord(Long lessonId, Long sectionId) {
	//优化地方
        // 1.查询缓存
        LearningRecord record = taskHandler.readRecordCache(lessonId, sectionId);
        // 2.如果命中,直接返回
        if (record != null) {
            return record;
        }
        // 3.未命中,查询数据库
        record = lambdaQuery()
                .eq(LearningRecord::getLessonId, lessonId)
                .eq(LearningRecord::getSectionId, sectionId)
                .one();
        // 4.写入缓存
        taskHandler.writeRecordCache(record);
        return record;
		//优化地方
    }

    private boolean handleExamRecord(Long userId, LearningRecordFormDTO recordDTO) {
        // 1.转换DTO为PO
        LearningRecord record = BeanUtils.copyBean(recordDTO, LearningRecord.class);
        // 2.填充数据
        record.setUserId(userId);
        record.setFinished(true);
        record.setFinishTime(recordDTO.getCommitTime());
        // 3.写入数据库
        boolean success = save(record);
        if (!success) {
            throw new DbException("新增考试记录失败!");
        }
        return true;
    }
}

MQ优化方案

以上是基于JVM的延时队列优化方案,下面我来展示用MQ实现的延时队列实现视频播放的续播功能!

由于我的项目中对MQ进行的封装,请看 项目中对MQ的封装

延时任务缓存处理工具类

这个部分和之前基本一样主要还是对Redis中学习记录缓存的写入,读取,清除结合业务对功能的实现

@Slf4j
@Component
@RequiredArgsConstructor
public class VideoDelayTaskHandler {

    private final RabbitMqHelper mqHelper;
    private final StringRedisTemplate redisTemplate;
    private final static String RECORD_KEY_TEMPLATE = "learning:frowning:{}";

    public void addCacheAndSendDelayMsg(LearningRecord record){
        // 1.添加数据到Redis缓存
        writeRecordCache(record);
        //发送延时消息
        mqHelper.sendDelayMessage(
                VIDEO_DELAY_EXCHANGE,
                VIDEO_DELAY_KEY,
                new RecordTaskData(record),
                Duration.ofSeconds(20)
        );
    }

    public void writeRecordCache(LearningRecord record) {
        log.debug("更新学习记录的缓存数据");
        try {
            // 1.数据转换
            String json = JsonUtils.toJsonStr(new RecordCacheData(record));
            // 2.写入Redis
            String key = StringUtils.format(RECORD_KEY_TEMPLATE, record.getLessonId());
            redisTemplate.opsForHash().put(key, record.getSectionId().toString(), json);
            // 3.添加缓存过期时间
            redisTemplate.expire(key, Duration.ofMinutes(1));
        } catch (Exception e) {
            log.error("更新学习记录缓存异常", e);
        }
    }

    public LearningRecord readRecordCache(Long lessonId, Long sectionId){
        try {
            // 1.读取Redis数据
            String key = StringUtils.format(RECORD_KEY_TEMPLATE, lessonId);
            Object cacheData = redisTemplate.opsForHash().get(key, sectionId.toString());
            if (cacheData == null) {
                return null;
            }
            // 2.数据检查和转换
            return JsonUtils.toBean(cacheData.toString(), LearningRecord.class);
        } catch (Exception e) {
            log.error("缓存读取异常", e);
            return null;
        }
    }

    public void cleanRecordCache(Long lessonId, Long sectionId){
        // 删除数据
        String key = StringUtils.format(RECORD_KEY_TEMPLATE, lessonId);
        redisTemplate.opsForHash().delete(key, sectionId.toString());
    }

    @Data
    @NoArgsConstructor
    private static class RecordCacheData{
        private Long id;
        private Integer moment;
        private Boolean finished;

        public RecordCacheData(LearningRecord record) {
            this.id = record.getId();
            this.moment = record.getMoment();
            this.finished = record.getFinished();
        }
    }
}

主要优化地方

消息监听器

具体处理逻辑

public void handleVideoTask(RecordTaskData data){
        try {
            //查询Redis缓存
            LearningRecord record = videoTaskHandler.readRecordCache(data.getLessonId(), data.getSectionId());

            if (record == null) {
                return;
            }
            // 3.比较数据,moment值
            if(!Objects.equals(data.getMoment(), record.getMoment())) {
                // 不一致,说明用户还在持续提交播放进度,放弃旧数据
                return;
            }

            // 4.一致,持久化播放进度数据到数据库
            // 4.1.更新学习记录的moment
            record.setFinished(null);
            recordMapper.updateById(record);
            // 4.2.更新课表最近学习信息
            LearningLesson lesson = new LearningLesson();
            lesson.setId(data.getLessonId());
            lesson.setLatestSectionId(data.getSectionId());
            lesson.setLatestLearnTime(LocalDateTime.now());
            lessonService.updateById(lesson);
        } catch (Exception e) {
            log.error("处理延迟任务发生异常", e);
        }
    }

实现功能截图

  • 监听到的消息
  • Redis中的记录情况
  • 数据库只记录最终停止播放的moment

总结

提交播放记录最终肯定是要保存到数据库中的。因为我们不仅要做视频续播,还有用户学习计划、学习进度统计等功能,都需要用到用户的播放记录数据。

但确实如你所说,前端每隔15秒一次请求,如果在用户量较大时,直接全部写入数据库,对数据库压力会比较大。因此我们采用了合并写请求的方案,当用户提交播放进度时会先缓存在Redis中,后续再将数据保存到数据库即可。

由于播放进度会不断覆盖,只保留最后一次即可。这样就可以大大减少对于数据库的访问次数和访问频率了。

评论( 0 )

  • 博主 Mr Cai
  • 坐标 河南 信阳
  • 标签 Java、SpringBoot、消息中间件、Web、Code爱好者

文章目录