小蔡学Java

项目一总结:(八)排行榜功能及海量数据存储

2024-01-24 15:21 1129 0 项目 RedisRedis优化分库分表定时任务

排行榜

在我的Online学习平台项目中,实现了积分功能,并且也将用户的积分明细保存到了数据库?积分的高低排名就是排行榜,那么排行榜该如何实现呢?

基于数据库实现

排行榜是基于积分实现的,积分明细表如下

产生的数据如下:

要想形成排行榜,我们在查询数据库时,需要先对用户分组,再对积分求和,最终按照积分和排序,Sql语句是这样:

SELECT user_id, SUM(points) FROM points_record GROUP BY user_id ORDER BY SUM(points)

查询结果如下:

面对的问题

每个用户都可能会有数十甚至上百条积分记录,当用户规模达到百万规模,可能产生的积分记录就是数以亿计。 要在每次查询排行榜时,在内存中对这么多数据做分组、求和、排序,对内存和CPU的占用会非常恐怖,不太靠谱。 那该怎么办呢?

解决方案

  • 方案一:基于MySQL的离线排序
  • 方案二:基于Redis的SortedSet

首先说方案一:

简单来说,就是将数据库中的数据查询出来,在内存中自己利用算法实现排序,而后将排序得到的榜单保存到数据库中。但由于这个排序比较复杂,我们无法实时更新排行榜,而是每隔几分钟计算一次排行榜。这种方案实现起来比较复杂,而且实时性较差。不过优点是不会一直占用系统资源

再说方案二:

RedisSortedSet底层采用了跳表的数据结构,因此可以非常高效的实现排序功能,百万用户排序轻松搞定。而且每当用户积分发生变更时,我们可以实时更新Redis中的用户积分,而SortedSet也会实时更新排名。实现起来简单、高效,实时性也非常好。缺点就是需要一直占用Redis的内存,当用户量达到数千万万时,性能有一定的下降。

当系统用户量规模达到数千万,乃至数亿时,我们可以采用分治的思想,将用户数据按照积分范围划分为多个桶,例如: 0~100分、101~200分、201~300分、301~500分、501~800分、801~1200分、1201~1500分、1501~2000分

在Redis内为每个桶创建一个SortedSet类型的key,这样就可以将数据分散减少单个KEY的数据规模了。而要计算排名时,只需要按照范围查询出用户积分所在的桶,再累加分值比他高的桶的用户数量即可。依然非常简单、高效。

综上,我们推荐基于Redis的SortedSet来实现排行榜功能。

基于Redis的实现

用户每次积分变更时,累加积分到Redis的SortedSet中

对之前改造如下:

在Redis中,使用SortedSet结构,以赛季的日期为key,以用户id为member,以积分和为score. 每当用户新增积分,就累加到score中,SortedSet排名就会实时更新。这样一个实时的当前赛季榜单就出现了。

代码实现

@Override
public void addPointsRecord(Long userId, int points, PointsRecordType type) {
    LocalDateTime now = LocalDateTime.now();
    int maxPoints = type.getMaxPoints();
    // 1.判断当前方式有没有积分上限
    int realPoints = points;
    if(maxPoints > 0) {
        // 2.有,则需要判断是否超过上限
        LocalDateTime begin = DateUtils.getDayStartTime(now);
        LocalDateTime end = DateUtils.getDayEndTime(now);
        // 2.1.查询今日已得积分
        int currentPoints = queryUserPointsByTypeAndDate(userId, type, begin, end);
        // 2.2.判断是否超过上限
        if(currentPoints >= maxPoints) {
            // 2.3.超过,直接结束
            return;
        }
        // 2.4.没超过,保存积分记录
        if(currentPoints + points > maxPoints){
            realPoints = maxPoints - currentPoints;
        }
    }
    // 3.没有,直接保存积分记录
    PointsRecord p = new PointsRecord();
    p.setPoints(realPoints);
    p.setUserId(userId);
    p.setType(type);
    save(p);
    // 4.更新总积分到Redis
    String key = RedisConstants.POINTS_BOARD_KEY_PREFIX + now.format(DateUtils.POINTS_BOARD_SUFFIX_FORMATTER);
    redisTemplate.opsForZSet().incrementScore(key, userId.toString(), realPoints);
}

在我的项目中,当前榜单是存储在Redis历史榜单的话我们就存储在MySQL

查看当前赛季榜单

@Override
    public List<PointsBoard> queryCurrentBoardList(String key, Integer pageNo, Integer pageSize) {
        // 1.计算分页
        int from = (pageNo - 1) * pageSize;
        // 2.查询
        Set<ZSetOperations.TypedTuple<String>> tuples = redisTemplate.opsForZSet()
                .reverseRangeWithScores(key, from, from + pageSize - 1);
        if (CollUtils.isEmpty(tuples)) {
            return CollUtils.emptyList();
        }
        // 3.封装
        int rank = from + 1;
        List<PointsBoard> list = new ArrayList<>(tuples.size());
        for (ZSetOperations.TypedTuple<String> tuple : tuples) {
            String userId = tuple.getValue();
            Double points = tuple.getScore();
            if (userId == null || points == null) {
                continue;
            }
            PointsBoard p = new PointsBoard();
            p.setUserId(Long.valueOf(userId));
            p.setPoints(points.intValue());
            p.setRank(rank++);
            list.add(p);
        }
        return list;
    }

查询当前用户在某一个赛季的排名

 private PointsBoard queryMyHistoryBoard(Long season) {
        // 1.获取登录用户
        Long userId = UserContext.getUser();
        // 2.计算表名
        TableInfoContext.setInfo(POINTS_BOARD_TABLE_PREFIX + season);
        // 3.查询数据
        Optional<PointsBoard> opt = lambdaQuery().eq(PointsBoard::getUserId, userId).oneOpt();
        if (opt.isEmpty()) {
            return null;
        }
        // 4.转换数据
        PointsBoard pointsBoard = opt.get();
        pointsBoard.setRank(pointsBoard.getId().intValue());
        return pointsBoard;
    }

历史榜单的存储

在天Online学习平台中, 积分排行榜是分赛季的 ,每一个月是一个赛季。因此每到每个月的月初,就会进入一个新的赛季。所有用户的积分应该清零,重新累积。

但是,我们能把Redis中的榜单数据直接清空吗?显然不行!Redis中的榜单数据是上个月的数据,属于历史榜单了,直接清空就丢失了一个赛季的数据。

因此,我们 必须将Redis中的历史数据持久化到数据库中 ,然后再清零。如图:

不过,这里就有一个问题需要解决:

假如有数百万用户,这就意味着每个赛季榜单都有数百万数据。随着时间推移,历史赛季越来越多,如果全部保存到一张表中,数据量会非常恐怖!(虽然我的项目咩有哈,但是我们要从学习的角度去看待嘛?)

分库分表

该怎么办呢?那就是分库分表:如果有不了解的伙伴可以看我这篇文章: 关于分库分表

Online学习平台项目是一个教育类项目,用户规模并不会很高,一般在十多万到百万级别。因此最终的数据规模也并不会非常庞大。

综合之前的分析,结合的项目情况,我们可以对 榜单数据做分表,但是 暂时不需要做分库和集群

由于我们要解决的是数据过多问题,因此分表的方式选择 水平分表 。具体来说,就是按照赛季拆分,每一个赛季是一个独立的表,如图:

由于我们上表中的rank字段可以使用主键id来替代,又是水平分表可以把reason字段放到表名称中 优化如下:

定时任务创建表

每个赛季要有不同的表,这些表什么时候创建呢?

应该在每个赛季刚开始的时候(月初)来创建新的赛季榜单表 。每个月的月初执行一个创建表的任务,我们可以利用 定时任务 来实现。

由于表的名称中包含赛季id,因此在定时任务中我们还要先查询赛季信息,获取赛季id拼接得到表名,最后创建表

SpringTask实现

定时任务

@Scheduled(cron = "0 0 3 1 * ?") // 每月1号,凌晨3点执行
    public void createPointsBoardTableOfLastSeason(){
        // 1.获取上月时间
        LocalDateTime time = LocalDateTime.now().minusMonths(1);
        // 2.查询赛季id
        Integer season = seasonService.querySeasonByTime(time);
        if (season == null) {
            // 赛季不存在
            return;
        }
        // 3.创建表
        pointsBoardService.createPointsBoardTableBySeason(season);
    }

createPointsBoardTableBySeason(season)建表

service接口

public interface IPointsBoardService extends IService<PointsBoard> {
    PointsBoardVO queryPointsBoardBySeason(PointsBoardQuery query);

    void createPointsBoardTableBySeason(Integer season);
}

实现方法

@Override
public void createPointsBoardTableBySeason(Integer season) {
    getBaseMapper().createPointsBoardTable(POINTS_BOARD_TABLE_PREFIX + season);
}

Mapper文件

<mapper namespace="com.online.learning.mapper.PointsBoardMapper">

    <insert id="createPointsBoardTable" parameterType="java.lang.String">
        CREATE TABLE `${tableName}`
        (
            `id`      BIGINT NOT NULL AUTO_INCREMENT COMMENT '榜单id',
            `user_id` BIGINT NOT NULL COMMENT '学生id',
            `points`  INT    NOT NULL COMMENT '积分值',
            PRIMARY KEY (`id`) USING BTREE,
            INDEX `idx_user_id` (`user_id`) USING BTREE
        )
            COMMENT ='学霸天梯榜'
            COLLATE = 'utf8mb4_0900_ai_ci'
            ENGINE = InnoDB
            ROW_FORMAT = DYNAMIC
    </insert>
</mapper>

SpringTask面临的问题

目前,我们的定时任务都是基于SpringTask来实现的。但是SpringTask存在一些问题:

  • 微服务多实例部署时,定时任务会被执行多次 。而事实上我们只需要 这个任务被执行一 次即可。
  • 我们除了要定时创建表,还要 定时持久化Redis数据到数据 库,我们希望这多个定时任务能够 按照顺序依次执行 ,SpringTask无法控制任务顺序

不仅仅是SpringTask,其它单机使用的定时任务工具,都无法实现像这种任务执行者的调度、任务执行顺序的编排、任务监控等功能。这些功能必须要用到分布式任务调度组件。

这里我们使用的是Xxl-Job定时任务框架:不了解的伙伴请看:xxl-job定时任务

榜单持久化

榜单持久化的基本流程是这样的:

  • 创建表
  • 持久化Redis数据到数据库
  • 清理Redis数据

现在,创建表的动作已经完成,接下来就轮到Redis数据的持久化了。持久化的步骤如下:

  • 读取Redis数据
  • 判断数据是否存在
    • 不存在,直接结束
    • 存在,则继续
  • 保存数据到数据库

KEY 中包含一个上赛季对应的 日期 ,因此要读取Redis数据,我们必须先得到上赛季的日期。 另外,我们采用了水平分表的策略,每一个赛季都是一个独立表。那么在写数据到数据库时,必须先知道 表名称

持久化流程:

动态表名

持久化的流程中存在一个问题,我们的数据库持久化采用的是MybatisPlus来实现的。而MybatisPlus读取表名的方式是通过实体类上的@Table注解,而注解往往是写死的:

如何让MybatisPlus在执行的时候改变数据写入的表名称呢?

MybatisPlus中提供了一个动态表名的插件:点击跳转

可见表名称动态获取就是依赖于tableNameHandlerMapping中的具体的TableNameHandler,这个Map如图:

TableNameHandler部分源码:

public interface TableNameHandler {

    /**
     * 生成动态表名
     *
     * @param sql       当前执行 SQL
     * @param tableName 表名
     * @return String
     */
    String dynamicTableName(String sql, String tableName);
}

OK,因此我们要做的事情就很简单了,定义DynamicTableNameInnterInterceptor,向其中添加一个TableNameHandler,将points_board这个表名,替换为points_board_赛季id的名称

当我们批量的写数据到数据库时,如果每次插入都计算一次表名,那性能也太差了。因此,我们肯定是希望一次计算,在TableNameHandler中可以随时获取。这里我们使用的是ThreadLocal

如何传递表名称呢?

虽然无法传参,但是从计算表名,到动态表名插件执行,调用TableNameHandler,都是在一个 线程内完成的。要在一个线程内实现数据共享,该用什么呢?

大家应该很容易想到,就是ThreadLocal.

我们可以在定时任务中计算完动态表名后,将表名存入ThreadLocal,然后在插件中从ThreadLocal中读取即可:


public class TableInfoContext {
    private static final ThreadLocal<String> TL = new ThreadLocal<>();

    public static void setInfo(String info) {
        TL.set(info);
    }

    public static String getInfo() {
        return TL.get();
    }

    public static void remove() {
        TL.remove();
    }
}

定义一个配置类,用于定义DynamicTableNameInnterInterceptor插件:

@Configuration
public class MybatisConfiguration {

    @Bean
    public DynamicTableNameInnerInterceptor dynamicTableNameInnerInterceptor() {
        // 准备一个Map,用于存储TableNameHandler
        Map<String, TableNameHandler> map = new HashMap<>(1);
        // 存入一个TableNameHandler,用来替换points_board表名称
        // 替换方式,就是从TableInfoContext中读取保存好的动态表名
        map.put("points_board", (sql, tableName) -> TableInfoContext.getInfo() == null ? tableName : TableInfoContext.getInfo());
        return new DynamicTableNameInnerInterceptor(map);
    }
}

DynamicTableNameInnerInterceptor配置进去

修改其中的拦截器配置:

@Configuration
@ConditionalOnClass({MybatisPlusInterceptor.class, BaseMapper.class})
public class MybatisConfig {

    /**
     * @see MyBatisAutoFillInterceptor 通过自定义拦截器来实现自动注入creater和updater
     * @deprecated 存在任务更新数据导致updater写入0或null的问题,暂时废弃
     */
    // @Bean
    // @ConditionalOnMissingBean
    public BaseMetaObjectHandler baseMetaObjectHandler() {
        return new BaseMetaObjectHandler();
    }

    @Bean
    @ConditionalOnMissingBean
    public MybatisPlusInterceptor mybatisPlusInterceptor(@Autowired(required = false) DynamicTableNameInnerInterceptor innerInterceptor) {
        // 1.定义插件主体,注意顺序:表名 > 多租户 > 分页 > 乐观锁 > 字段填充
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        // 2.表名插件
        if (innerInterceptor != null) {
            interceptor.addInnerInterceptor(innerInterceptor);
        }
        // 3.分页插件
        PaginationInnerInterceptor paginationInnerInterceptor = new PaginationInnerInterceptor(DbType.MYSQL);
        paginationInnerInterceptor.setMaxLimit(200L);
        interceptor.addInnerInterceptor(paginationInnerInterceptor);
        // 4.字段填充插件
        interceptor.addInnerInterceptor(new MyBatisAutoFillInterceptor());
        return interceptor;
    }
}
  • 由于DynamicTableNameInnerInterceptor并不是每一个微服务都用了,所以这里加入了@Autowired(required= false),避免未定义该拦截器的微服务报错。
  • MybatisPlus的插件定义顺序非常重要,必须按照一定的顺序来定义:参考链接

定时持久化任务实现

@XxlJob("savePointsBoard2DB")
public void savePointsBoard2DB(){
    // 1.获取上月时间
    LocalDateTime time = LocalDateTime.now().minusMonths(1);

    // 2.计算动态表名
    // 2.1.查询赛季信息
    Integer season = seasonService.querySeasonByTime(time);
    // 2.2.存入ThreadLocal
    TableInfoContext.setInfo(POINTS_BOARD_TABLE_PREFIX + season);

    // 3.查询榜单数据
    // 3.1.拼接KEY
    String key = RedisConstants.POINTS_BOARD_KEY_PREFIX + time.format(DateUtils.POINTS_BOARD_SUFFIX_FORMATTER);
    // 3.2.查询数据
    int index = XxlJobHelper.getShardIndex();
    int total = XxlJobHelper.getShardTotal();
    int pageNo = index + 1; // 起始页,就是分片序号+1
    int pageSize = 10;
    while (true) {
        List<PointsBoard> boardList = pointsBoardService.queryCurrentBoardList(key, pageNo, pageSize);
        if (CollUtils.isEmpty(boardList)) {
            break;
        }
        // 4.持久化到数据库
        // 4.1.把排名信息写入id
        boardList.forEach(b -> {
            b.setId(b.getRank().longValue());
            b.setRank(null);
        });
        // 4.2.持久化
        pointsBoardService.saveBatch(boardList);
        // 5.翻页,跳过N个页,N就是分片数量
        pageNo+=total;
    }

    TableInfoContext.remove();
}

清理Redis缓存任务

    @XxlJob("clearPointsBoardFromRedis")
    public void clearPointsBoardFromRedis(){
        // 1.获取上月时间
        LocalDateTime time = LocalDateTime.now().minusMonths(1);
        // 2.计算key
        String key = RedisConstants.POINTS_BOARD_KEY_PREFIX + time.format(DateUtils.POINTS_BOARD_SUFFIX_FORMATTER);
        // 3.删除
        redisTemplate.unlink(key);
    }

经过上诉我们发现会有如下步骤:

他们是有顺序的,我们可以使用XXL-JOB中的子任务功能

要想让任务A、B依次执行,其实就是配置任务B作为任务A的子任务。因此,我们按照下面方式配置:

  • 定期执行创建赛季榜单(14)的子任务是持久化榜单数据任务(15)
  • 持久化榜单数据任务(15)的子任务是清理Redis中的历史榜单(16)

也就是说:14的子任务是15, 15的子任务是16

总结:

面试模拟 你在项目中负责积分排行榜功能,说说看你们排行榜怎么设计实现的?

我们的排行榜功能分为两部分:一个是当前赛季排行榜,一个是历史排行榜。
因为我们的产品设计是每个月为一个赛季,月初清零积分记录,这样学员就有持续的动力去学习。这就有了赛季的概念,因此也就有了当前赛季榜单和历史榜单的区分,其实现思路也不一样。
首先说当前赛季榜单,我们采用了Redis的SortedSet来实现。member是用户id,score就是当月积分总值。每当用户产生积分行为的时候,获取积分时,就会更新score值。这样Redis就会自动形成榜单了。非常方便且高效。
然后再说历史榜单,历史榜单肯定是保存到数据库了。不过由于数据过多,所以需要对数据做水平拆分,我们目前的思路是按照赛季来拆分,也就是每一个赛季的榜单单独一张表。这样做有几个好处:
- 拆分数据时比较自然,无需做额外处理
- 查询数据时往往都是按照赛季来查询,这样一次只需要查一张表,不存在跨表查询问题
因此我们就不需要用到分库分表的插件了,直接在业务层利用MybatisPlus就可以实现动态表名,动态插入了。简单高效。
我们会利用一个定时任务在每月初生成上赛季的榜单表,然后再用一个定时任务读取Redis中的上赛季榜单数据,持久化到数据库中。最后再有一个定时任务清理Redis中的历史数据。
这里要说明一下,这里三个任务是有关联的,之所以让任务分开定义,是为了避免任务耦合。这样在部分任务失败时,可以单独重试,无需所有任务从头重试。
当然,最终我们肯定要确保这三个任务的执行顺序,一定是依次执行的。

你们使用Redis的SortedSet来保存榜单数据,如果用户量非常多怎么办?

首先Redis的SortedSet底层利用了跳表机制,性能还是非常不错的。即便有百万级别的用户量,利用SortedSet也没什么问题,性能上也能得到保证。在我们的项目用户量下,完全足够。
当系统用户量规模达到数千万,乃至数亿时,我们可以采用分治的思想,将用户数据按照积分范围划分为多个桶。
然后为每个桶创建一个SortedSet类型的key,这样就可以将数据分散,减少单个KEY的数据规模了。
而要计算排名时,只需要按照范围查询出用户积分所在的桶,再累加分值范围比他高的桶的用户数量即可。依然非常简单、高效。

你们使用历史榜单采用的定时任务框架是哪个?处理数百万的榜单数据时任务是如何分片的?你们是如何确保多个任务依次执行的呢?

我们采用的是XXL-JOB框架。
XXL-JOB自带任务分片广播机制,每一个任务执行器都能通过API得到自己的分片编号、总分片数量。在做榜单数据批处理时,我们是按照分页查询的方式:
- 每个执行器的读取的起始页都是自己的分片编号+1,例如第一个执行器,其起始页就是1,第二个执行器,其起始页就是2,以此类推
- 然后不是逐页查询,而是有一个页的跨度,跨度值就是分片总数量。例如分了3片,那么跨度就是3
此时,第一个分片处理的数据就是第1、4、7、10、13等几页数据,第二个分片处理的就是第2、5、8、11、14等页的数据,第三个分片处理的就是第3、6、9、12、15等页的数据。
这样就能确保所有数据都会被处理,而且每一个执行器都执行的是不同的数据了。

最后,要确保多个任务的执行顺序,可以利用XXL-JOB中的子任务功能。比如有任务A、B、C,要按照字母顺序依次执行,我们就可以将C设置为B的子任务,再将B设置为A的子任务。然后给A设置一个触发器。

这样,当A触发时,就会依次执行这三个任务了。

评论( 0 )

  • 博主 Mr Cai
  • 坐标 河南 信阳
  • 标签 Java、SpringBoot、消息中间件、Web、Code爱好者

文章目录