背景说明
快递在途中时,客户关心物流实时状态;
并发量是有一定要求的,在电商大促期间,快件数量、查询人量大;此处必须是缓存应用,方可顶住并发!
需求分析
用户寄件后,是需要查看运单的运输详情,也就是需要查看整个转运节点,类似这样
可以看出,物流信息中有状态、时间、具体信息、快递员姓名、快递员联系方式等信息。
实现分析
基于上面的需求分析,我们该如何实现呢?首先要分析一下物流信息功能的特点: ● 数据量大 ● 查询频率高(签收后查询频率低)
-
数据量大,这个挑战是在存储方面,如果我们做技术选型的话,无非就是两种选择,一种是关系型数据库,另一种是非关系型数据库,选择MongoDB存储要比MySQL更合适一些。
-
运单在签收之前,查询的频率是非常高的,用户可能会不断的刷物流信息,一般解决查询并发高的解决方案是通过缓存解决,我们也将对查询数据进行缓存。
MySQL实现
假如使用MySQL
查询运单号【SL920733749248】的物流信息:
SELECT
*
FROM
sl_transport_info
WHERE
transport_order_id = 'SL920733749248'
ORDER BY
created ASC
MongoDB实现
基于MongoDB的实现,可以充分利用MongoDB数据结构的特点,可以这样存储:
{
"_id": ObjectId("62c6c679a1222549d64ba01e"),
"transportOrderId": "SL1000000000585",
"infoList": [
{
"created": NumberLong("1657192271195"),
"info": "神领快递员已取件, 取件人【快递员,电话 18810966207}】",
"status": "已取件"
},
{
"created": NumberLong("1657192328518"),
"info": "神领快递员已取件, 取件人【快递员,电话 18810966207}】",
"status": "已取件"
}
],
"created": NumberLong("1657194104987"),
"updated": NumberLong("1657194105064"),
"_class": "com.sl.transport.info.entity.TransportInfoEntity"
}
如果有新的信息加入的话,只需要向【infoList】中插入元素即可,查询的话按照【transportOrderId】条件查询。
db.sl_transport_info.find({"transportOrderId":"SL1000000000585"})
分析
MySQL存储在一张表中,每条物流信息就是一条行数据,数据条数将是运单数量的数倍
,查询时需要通过运单id作为条件,按照时间正序排序得到所有的结果;
MongoDB存储基于其自身特点可以将物流信息列表存储到属性中,数据量等于运单量
,查询时只需要按照运单id查询即可
毫无疑问,我们将基于MongoDB进行实现
功能实现
记录物流信息
分析
通过前面的需求分析,可以发现新增物流信息的节点比较多,在取件、派件、物流转运环节都有记录物流信息,站在整体架构的方面的考虑,该如何在众多的业务点钟记录物流信息呢?
一般而言,会有两种方式,一种是微服务直接调用,另一种是通过消息的方式调用,也就是同步和异步的方式。选择哪种方式比较好呢?
在这里,我们选择通过消息的方式,主要原因有两个:
- 物流信息数据的更新的实时性并不高,例如,运单到达某个转运中心,晚几分种记录信息也是可以的。
- 更新数据时,并发量比较大,例如,一辆车装了几千或几万个包裹,到达某个转运中心后,司机入库时,需要一下记录几千或几万个运单的物流数据,在这一时刻并发量是比较大的,通过消息的方式,可以进行对流量削峰,从而保障系统的稳定性。
消息体
{
"info": "您的快件已到达【$organId】",
"status": "运输中",
"organId": 1012479939628238305,
"transportOrderId": "SL920733749248",
"created": 1653133234913
}
可以看出,在消息中,有具体信息、状态、机构id、运单号、时间,其中在info字段中,约定通过$organId占位符表示机构,也就是,需要通过传入的organId查询机构名称替换到info中,当然了,如果没有机构,无需替换。
/**
* 物流信息消息
*/
@Component
public class TransportInfoMQListener {
@Resource
private OrganFeign organFeign;
@Resource
private TransportInfoService transportInfoService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = Constants.MQ.Queues.TRANSPORT_INFO_APPEND),
exchange = @Exchange(name = Constants.MQ.Exchanges.TRANSPORT_INFO, type = ExchangeTypes.TOPIC),
key = Constants.MQ.RoutingKeys.TRANSPORT_INFO_APPEND
))
public void listenTransportInfoMsg(String msg) {
//{"info":"您的快件已到达【$organId】", "status":"运输中", "organId":90001, "transportOrderId":920733749248 , "created":1653133234913}
TransportInfoMsg transportInfoMsg = JSONUtil.toBean(msg, TransportInfoMsg.class);
Long organId = transportInfoMsg.getOrganId();
String transportOrderId = Convert.toStr(transportInfoMsg.getTransportOrderId());
String info = transportInfoMsg.getInfo();
//查询机构信息
if (StrUtil.contains(info, "$organId")) {
OrganDTO organDTO = this.organFeign.queryById(organId);
if (organDTO == null) {
return;
}
info = StrUtil.replace(info, "$organId", organDTO.getName());
}
//封装Detail对象
TransportInfoDetail infoDetail = TransportInfoDetail.builder()
.info(info)
.status(transportInfoMsg.getStatus())
.created(transportInfoMsg.getCreated()).build();
//存储到MongoDB
this.transportInfoService.saveOrUpdate(transportOrderId, infoDetail);
}
}
多级缓存解决方案
目前我们已经实现了物流信息的保存、更新操作,基本功能已经了ok了,但是有个问题我们还没解决,就是前面提到的并发大的问题,一般而言,解决查询并发大的问题,常见的手段是为查询接口增加缓存,从而可以减轻持久层的压力。
按照我们以往的经验,在查询接口中增加Redis缓存即可,将查询的结果数据存储到Redis中,执行查询时首先从Redis中命中,如果命中直接返回即可,没有命中查询MongoDB,将解决写入到Redis中。 这样就解决问题了吗?其实并不是,试想一下,如果Redis宕机了或者是Redis中的数据大范围的失效,这样大量的并发压力就会进入持久层,会对持久层有较大的影响,甚至可能直接崩溃。 如何解决该问题呢,可以通过多级缓存的解决方案来进行解决。
由上图可以看出,在用户的一次请求中,可以设置多个缓存以提升查询的性能,能够快速响应。
- 浏览器的本地缓存
- 使用Nginx作为反向代理的架构时,可以启用Nginx的本地缓存,对于代理数据进行缓存
- 如果Nginx的本地缓存未命中,可以在Nginx中编写Lua脚本从Redis中命中数据
- 如果Redis依然没有命中的话,请求就会进入到Tomcat,也就是执行我们写的程序,在程序中可以设置进程级的缓存,如果命中直接返回即可。
- 如果进程级的缓存依然没有命中的话,请求才会进入到持久层查询数据。
以上就是多级缓存的基本的设计思路,其核心思想就是让每一个请求节点尽可能的进行缓存操作。
一级缓存
/**
* Caffeine缓存配置
*/
@Configuration
public class CaffeineConfig {
@Value("${caffeine.init}")
private Integer init;
@Value("${caffeine.max}")
private Integer max;
@Bean
public Cache<String, TransportInfoDTO> transportInfoCache() {
return Caffeine.newBuilder()
.initialCapacity(init)
.maximumSize(max).build();
}
}
具体的配置配置在Nacos
实现缓存逻辑
/**
* 根据运单id查询运单信息
*
* @param transportOrderId 运单号
* @return 运单信息
*/
@ApiImplicitParams({
@ApiImplicitParam(name = "transportOrderId", value = "运单id")
})
@ApiOperation(value = "查询", notes = "根据运单id查询物流信息")
@GetMapping("{transportOrderId}")
public TransportInfoDTO queryByTransportOrderId(@PathVariable("transportOrderId") String transportOrderId) {
TransportInfoDTO transportInfoDTO = this.transportInfoCache.get(transportOrderId, s -> {
TransportInfoEntity transportInfoEntity = this.transportInfoService.queryByTransportOrderId(transportOrderId);
return BeanUtil.toBean(transportInfoEntity, TransportInfoDTO.class);
});
if (ObjectUtil.isNotEmpty(transportInfoDTO)) {
return transportInfoDTO;
}
throw new SLException(ExceptionEnum.NOT_FOUND);
}
测试
未命中场景:
已命中:
结果
二级缓存
Redis配置
Spring Cache默认是采用jdk的对象序列化方式,这种方式比较占用空间而且性能差,所以往往会将值以json的方式存储,此时就需要对RedisCacheManager进行自定义的配置。
/**
* Redis相关的配置
*/
@Configuration
public class RedisConfig {
/**
* 存储的默认有效期时间,单位:小时
*/
@Value("${redis.ttl:1}")
private Integer redisTtl;
@Bean
public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) {
// 默认配置
RedisCacheConfiguration defaultCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
// 设置key的序列化方式为字符串
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
// 设置value的序列化方式为json格式
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()))
.disableCachingNullValues() // 不缓存null
.entryTtl(Duration.ofHours(redisTtl)); // 默认缓存数据保存1小时
// 构redis缓存管理器
RedisCacheManager redisCacheManager = RedisCacheManager.RedisCacheManagerBuilder
.fromConnectionFactory(redisTemplate.getConnectionFactory())
.cacheDefaults(defaultCacheConfiguration)
.transactionAware() // 只在事务成功提交后才会进行缓存的put/evict操作
.build();
return redisCacheManager;
}
}
缓存注解
- 接下来需要在Service中增加SpringCache的注解,确保数据可以保存、更新数据到Redis
@Override
@CachePut(value = "transport-info", key = "#p0") //更新缓存数据
public TransportInfoEntity saveOrUpdate(String transportOrderId, TransportInfoDetail infoDetail) {
//省略代码
}
@Override
@Cacheable(value = "transport-info", key = "#p0") //新增缓存数据
public TransportInfoEntity queryByTransportOrderId(String transportOrderId) {
//省略代码
}
测试
重启服务,进行功能测试,发现数据可以正常写入到Redis中,并且查询时二级缓存已经生效
一级缓存更新的问题
更新物流信息时,只是更新了Redis中的数据,并没有更新Caffeine中的数据,需要在更新数据时将Caffeine中相应的数据删除
@Resource
private Cache<String, TransportInfoDTO> transportInfoCache;
@Override
@CachePut(value = "transport-info", key = "#p0") //更新缓存数据
public TransportInfoEntity saveOrUpdate(String transportOrderId, TransportInfoDetail infoDetail) {
//省略代码
//清除缓存中的数据
this.transportInfoCache.invalidate(transportOrderId);
//保存/更新到MongoDB
return this.mongoTemplate.save(transportInfoEntity);
}
分布式场景下的问题
通过前面的解决,视乎可以完成一级、二级缓存中数据的同步,如果在单节点项目中是没有问题的,但是,在分布式场景下是有问题的,看下图:
说明:
- 部署了2个transport-info微服务节点,每个微服务都有自己进程级的一级缓存,都共享同一个Redis作为二级缓存
- 假设,所有节点的一级和二级缓存都是空的,此时,用户通过节点1查询运单物流信息,在完成后,节点1的caffeine和Redis中都会有数据
- 接着,系统通过节点2更新了物流数据,此时节点2中的caffeine和Redis都是更新后的数据
- 用户还是进行查询动作,依然是通过节点1查询,此时查询到的将是旧的数据,也就是出现了一级缓存与二级缓存之间的数据不一致的问题
问题解决
如何解决该问题呢?可以通过消息的方式解决,就是任意一个节点数据更新了数据,发个消息出来,通知其他节点,其他节点接收到消息后,将自己caffeine中相应的数据删除即可。
关于消息的实现,可以采用RabbitMQ,也可以采用Redis的消息订阅发布来实现,在这里为了应用技术的多样化,所以采用Redis的订阅发布来实现。
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
当有新消息通过 publish 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端。
Redis的订阅发布功能与传统的消息中间件(如:RabbitMQ)相比,相对轻量一些,针对数据准确和安全性要求没有那么高的场景可以直接使用。
public static final String CHANNEL_TOPIC = "sl-express-ms-transport-info-caffeine";
/**
* 配置订阅,用于解决Caffeine一致性的问题
*
* @param connectionFactory 链接工厂
* @param listenerAdapter 消息监听器
* @return 消息监听容器
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new ChannelTopic(CHANNEL_TOPIC));
return container;
}
编写RedisMessageListener用于监听消息,删除caffeine中的数据。
/**
* redis消息监听,解决Caffeine一致性的问题
*/
@Component
public class RedisMessageListener extends MessageListenerAdapter {
@Resource
private Cache<String, TransportInfoDTO> transportInfoCache;
@Override
public void onMessage(Message message, byte[] pattern) {
//获取到消息中的运单id
String transportOrderId = Convert.toStr(message);
//将本jvm中的缓存删除掉
this.transportInfoCache.invalidate(transportOrderId);
}
}
更新数据后发送消息:
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
@CachePut(value = "transport-info", key = "#p0")
public TransportInfoEntity saveOrUpdate(String transportOrderId, TransportInfoDetail infoDetail) {
//省略代码
//清除缓存中的数据
// this.transportInfoCache.invalidate(transportOrderId);
//发布订阅消息到redis
this.stringRedisTemplate.convertAndSend(RedisConfig.CHANNEL_TOPIC, transportOrderId);
//保存/更新到MongoDB
return this.mongoTemplate.save(transportInfoEntity);
}
评论( 0 )