小蔡学Java

项目二总结:(十一)MongoDB与多级缓存的引入

2024-03-11 08:07 769 0 项目 高并发缓存一致性Redis优化优化RT(响应时间)

背景说明

快递在途中时,客户关心物流实时状态;

并发量是有一定要求的,在电商大促期间,快件数量、查询人量大;此处必须是缓存应用,方可顶住并发!

需求分析

用户寄件后,是需要查看运单的运输详情,也就是需要查看整个转运节点,类似这样

可以看出,物流信息中有状态、时间、具体信息、快递员姓名、快递员联系方式等信息。

实现分析

基于上面的需求分析,我们该如何实现呢?首先要分析一下物流信息功能的特点: ● 数据量大 ● 查询频率高(签收后查询频率低)

  • 数据量大,这个挑战是在存储方面,如果我们做技术选型的话,无非就是两种选择,一种是关系型数据库,另一种是非关系型数据库,选择MongoDB存储要比MySQL更合适一些。

  • 运单在签收之前,查询的频率是非常高的,用户可能会不断的刷物流信息,一般解决查询并发高的解决方案是通过缓存解决,我们也将对查询数据进行缓存。

MySQL实现

假如使用MySQL

查询运单号【SL920733749248】的物流信息:

SELECT
	* 
FROM
	sl_transport_info 
WHERE
	transport_order_id = 'SL920733749248' 
ORDER BY
	created ASC

MongoDB实现

基于MongoDB的实现,可以充分利用MongoDB数据结构的特点,可以这样存储:

{
    "_id": ObjectId("62c6c679a1222549d64ba01e"),
    "transportOrderId": "SL1000000000585",
    "infoList": [
        {
            "created": NumberLong("1657192271195"),
            "info": "神领快递员已取件, 取件人【快递员,电话 18810966207}】",
            "status": "已取件"
        },
        {
            "created": NumberLong("1657192328518"),
            "info": "神领快递员已取件, 取件人【快递员,电话 18810966207}】",
            "status": "已取件"
        }
    ],
    "created": NumberLong("1657194104987"),
    "updated": NumberLong("1657194105064"),
    "_class": "com.sl.transport.info.entity.TransportInfoEntity"
}

如果有新的信息加入的话,只需要向【infoList】中插入元素即可,查询的话按照【transportOrderId】条件查询。

db.sl_transport_info.find({"transportOrderId":"SL1000000000585"})

分析

MySQL存储在一张表中,每条物流信息就是一条行数据,数据条数将是运单数量的数倍,查询时需要通过运单id作为条件,按照时间正序排序得到所有的结果;

MongoDB存储基于其自身特点可以将物流信息列表存储到属性中,数据量等于运单量,查询时只需要按照运单id查询即可

毫无疑问,我们将基于MongoDB进行实现

功能实现

记录物流信息

分析

通过前面的需求分析,可以发现新增物流信息的节点比较多,在取件、派件、物流转运环节都有记录物流信息,站在整体架构的方面的考虑,该如何在众多的业务点钟记录物流信息呢?

一般而言,会有两种方式,一种是微服务直接调用,另一种是通过消息的方式调用,也就是同步和异步的方式。选择哪种方式比较好呢?

在这里,我们选择通过消息的方式,主要原因有两个:

  • 物流信息数据的更新的实时性并不高,例如,运单到达某个转运中心,晚几分种记录信息也是可以的。
  • 更新数据时,并发量比较大,例如,一辆车装了几千或几万个包裹,到达某个转运中心后,司机入库时,需要一下记录几千或几万个运单的物流数据,在这一时刻并发量是比较大的,通过消息的方式,可以进行对流量削峰,从而保障系统的稳定性。

消息体

{
    "info": "您的快件已到达【$organId】",
    "status": "运输中",
    "organId": 1012479939628238305,
    "transportOrderId": "SL920733749248",
    "created": 1653133234913
}

可以看出,在消息中,有具体信息、状态、机构id、运单号、时间,其中在info字段中,约定通过$organId占位符表示机构,也就是,需要通过传入的organId查询机构名称替换到info中,当然了,如果没有机构,无需替换。

/**
 * 物流信息消息
 */
@Component
public class TransportInfoMQListener {

    @Resource
    private OrganFeign organFeign;
    @Resource
    private TransportInfoService transportInfoService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = Constants.MQ.Queues.TRANSPORT_INFO_APPEND),
            exchange = @Exchange(name = Constants.MQ.Exchanges.TRANSPORT_INFO, type = ExchangeTypes.TOPIC),
            key = Constants.MQ.RoutingKeys.TRANSPORT_INFO_APPEND
    ))
    public void listenTransportInfoMsg(String msg) {
        //{"info":"您的快件已到达【$organId】", "status":"运输中", "organId":90001, "transportOrderId":920733749248 , "created":1653133234913}
        TransportInfoMsg transportInfoMsg = JSONUtil.toBean(msg, TransportInfoMsg.class);
        Long organId = transportInfoMsg.getOrganId();
        String transportOrderId = Convert.toStr(transportInfoMsg.getTransportOrderId());
        String info = transportInfoMsg.getInfo();

        //查询机构信息
        if (StrUtil.contains(info, "$organId")) {
            OrganDTO organDTO = this.organFeign.queryById(organId);
            if (organDTO == null) {
                return;
            }
            info = StrUtil.replace(info, "$organId", organDTO.getName());
        }

        //封装Detail对象
        TransportInfoDetail infoDetail = TransportInfoDetail.builder()
                .info(info)
                .status(transportInfoMsg.getStatus())
                .created(transportInfoMsg.getCreated()).build();

        //存储到MongoDB
        this.transportInfoService.saveOrUpdate(transportOrderId, infoDetail);
    }
}

多级缓存解决方案

目前我们已经实现了物流信息的保存、更新操作,基本功能已经了ok了,但是有个问题我们还没解决,就是前面提到的并发大的问题,一般而言,解决查询并发大的问题,常见的手段是为查询接口增加缓存,从而可以减轻持久层的压力。

按照我们以往的经验,在查询接口中增加Redis缓存即可,将查询的结果数据存储到Redis中,执行查询时首先从Redis中命中,如果命中直接返回即可,没有命中查询MongoDB,将解决写入到Redis中。 这样就解决问题了吗?其实并不是,试想一下,如果Redis宕机了或者是Redis中的数据大范围的失效,这样大量的并发压力就会进入持久层,会对持久层有较大的影响,甚至可能直接崩溃。 如何解决该问题呢,可以通过多级缓存的解决方案来进行解决。

由上图可以看出,在用户的一次请求中,可以设置多个缓存以提升查询的性能,能够快速响应。

  • 浏览器的本地缓存
  • 使用Nginx作为反向代理的架构时,可以启用Nginx的本地缓存,对于代理数据进行缓存
  • 如果Nginx的本地缓存未命中,可以在Nginx中编写Lua脚本从Redis中命中数据
  • 如果Redis依然没有命中的话,请求就会进入到Tomcat,也就是执行我们写的程序,在程序中可以设置进程级的缓存,如果命中直接返回即可。
  • 如果进程级的缓存依然没有命中的话,请求才会进入到持久层查询数据。

以上就是多级缓存的基本的设计思路,其核心思想就是让每一个请求节点尽可能的进行缓存操作。

一级缓存

/**
 * Caffeine缓存配置
 */
@Configuration
public class CaffeineConfig {

    @Value("${caffeine.init}")
    private Integer init;
    @Value("${caffeine.max}")
    private Integer max;

    @Bean
    public Cache<String, TransportInfoDTO> transportInfoCache() {
        return Caffeine.newBuilder()
                .initialCapacity(init)
                .maximumSize(max).build();
    }

}

具体的配置配置在Nacos

实现缓存逻辑

    /**
     * 根据运单id查询运单信息
     *
     * @param transportOrderId 运单号
     * @return 运单信息
     */
    @ApiImplicitParams({
            @ApiImplicitParam(name = "transportOrderId", value = "运单id")
    })
    @ApiOperation(value = "查询", notes = "根据运单id查询物流信息")
    @GetMapping("{transportOrderId}")
    public TransportInfoDTO queryByTransportOrderId(@PathVariable("transportOrderId") String transportOrderId) {
        TransportInfoDTO transportInfoDTO = this.transportInfoCache.get(transportOrderId, s -> {
            TransportInfoEntity transportInfoEntity = this.transportInfoService.queryByTransportOrderId(transportOrderId);
            return BeanUtil.toBean(transportInfoEntity, TransportInfoDTO.class);
        });

        if (ObjectUtil.isNotEmpty(transportInfoDTO)) {
            return transportInfoDTO;
        }
        throw new SLException(ExceptionEnum.NOT_FOUND);
    }

测试

未命中场景:

已命中:

结果

二级缓存

Redis配置

Spring Cache默认是采用jdk的对象序列化方式,这种方式比较占用空间而且性能差,所以往往会将值以json的方式存储,此时就需要对RedisCacheManager进行自定义的配置。

/**
 * Redis相关的配置
 */
@Configuration
public class RedisConfig {

    /**
     * 存储的默认有效期时间,单位:小时
     */
    @Value("${redis.ttl:1}")
    private Integer redisTtl;

    @Bean
    public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) {
        // 默认配置
        RedisCacheConfiguration defaultCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                // 设置key的序列化方式为字符串
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
                // 设置value的序列化方式为json格式
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()))
                .disableCachingNullValues() // 不缓存null
                .entryTtl(Duration.ofHours(redisTtl));  // 默认缓存数据保存1小时

        // 构redis缓存管理器
        RedisCacheManager redisCacheManager = RedisCacheManager.RedisCacheManagerBuilder
                .fromConnectionFactory(redisTemplate.getConnectionFactory())
                .cacheDefaults(defaultCacheConfiguration)
                .transactionAware() // 只在事务成功提交后才会进行缓存的put/evict操作
                .build();
        return redisCacheManager;
    }
}

缓存注解

  • 接下来需要在Service中增加SpringCache的注解,确保数据可以保存、更新数据到Redis
    @Override
    @CachePut(value = "transport-info", key = "#p0") //更新缓存数据
    public TransportInfoEntity saveOrUpdate(String transportOrderId, TransportInfoDetail infoDetail) {
        //省略代码
    }

    @Override
    @Cacheable(value = "transport-info", key = "#p0") //新增缓存数据
    public TransportInfoEntity queryByTransportOrderId(String transportOrderId) {
       //省略代码
    }

测试

重启服务,进行功能测试,发现数据可以正常写入到Redis中,并且查询时二级缓存已经生效

一级缓存更新的问题

更新物流信息时,只是更新了Redis中的数据,并没有更新Caffeine中的数据,需要在更新数据时将Caffeine中相应的数据删除

    @Resource
    private Cache<String, TransportInfoDTO> transportInfoCache;

	@Override
    @CachePut(value = "transport-info", key = "#p0") //更新缓存数据
    public TransportInfoEntity saveOrUpdate(String transportOrderId, TransportInfoDetail infoDetail) {
        //省略代码

        //清除缓存中的数据
        this.transportInfoCache.invalidate(transportOrderId);

        //保存/更新到MongoDB
        return this.mongoTemplate.save(transportInfoEntity);
    }

分布式场景下的问题

通过前面的解决,视乎可以完成一级、二级缓存中数据的同步,如果在单节点项目中是没有问题的,但是,在分布式场景下是有问题的,看下图:

说明:

  • 部署了2个transport-info微服务节点,每个微服务都有自己进程级的一级缓存,都共享同一个Redis作为二级缓存
  • 假设,所有节点的一级和二级缓存都是空的,此时,用户通过节点1查询运单物流信息,在完成后,节点1的caffeine和Redis中都会有数据
  • 接着,系统通过节点2更新了物流数据,此时节点2中的caffeine和Redis都是更新后的数据
  • 用户还是进行查询动作,依然是通过节点1查询,此时查询到的将是旧的数据,也就是出现了一级缓存与二级缓存之间的数据不一致的问题

问题解决

如何解决该问题呢?可以通过消息的方式解决,就是任意一个节点数据更新了数据,发个消息出来,通知其他节点,其他节点接收到消息后,将自己caffeine中相应的数据删除即可。

关于消息的实现,可以采用RabbitMQ,也可以采用Redis的消息订阅发布来实现,在这里为了应用技术的多样化,所以采用Redis的订阅发布来实现。

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

当有新消息通过 publish 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端。 Redis的订阅发布功能与传统的消息中间件(如:RabbitMQ)相比,相对轻量一些,针对数据准确和安全性要求没有那么高的场景可以直接使用。

    public static final String CHANNEL_TOPIC = "sl-express-ms-transport-info-caffeine";

    /**
     * 配置订阅,用于解决Caffeine一致性的问题
     *
     * @param connectionFactory 链接工厂
     * @param listenerAdapter 消息监听器
     * @return 消息监听容器
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new ChannelTopic(CHANNEL_TOPIC));
        return container;
    }

编写RedisMessageListener用于监听消息,删除caffeine中的数据。

/**
 * redis消息监听,解决Caffeine一致性的问题
 */
@Component
public class RedisMessageListener extends MessageListenerAdapter {

    @Resource
    private Cache<String, TransportInfoDTO> transportInfoCache;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        //获取到消息中的运单id
        String transportOrderId = Convert.toStr(message);
        //将本jvm中的缓存删除掉
        this.transportInfoCache.invalidate(transportOrderId);
    }
}

更新数据后发送消息:

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    @CachePut(value = "transport-info", key = "#p0")
    public TransportInfoEntity saveOrUpdate(String transportOrderId, TransportInfoDetail infoDetail) {
		//省略代码

        //清除缓存中的数据
        // this.transportInfoCache.invalidate(transportOrderId);
        //发布订阅消息到redis
        this.stringRedisTemplate.convertAndSend(RedisConfig.CHANNEL_TOPIC, transportOrderId);

        //保存/更新到MongoDB
        return this.mongoTemplate.save(transportInfoEntity);
    }

评论( 0 )

  • 博主 Mr Cai
  • 坐标 河南 信阳
  • 标签 Java、SpringBoot、消息中间件、Web、Code爱好者

文章目录