说明
为了将MQ的使用相关的代码进行统一,所以将发送消息的代码、消费者的配置抽取到sl-express-mq工程中。
主要功能:
- 为RabbitTemplate设置了ReturnsCallback,如果消息发送到交换机但是没有到达队列,会进行日志的记录。
- 统一了配置了消息的消费,消费者如果处理消息失败,会进行重试,如果依然是失败的话,会将错误消息发送到error.queue队列,后续需要人工进行处理。
- 统一了发送消息代码,如果网络等异常情况导致发送消息失败会进行重试,如果依然失败的话将消息内容持久化到mysql数据库,后续通过xxl-job任务进行重新发送;如果其他情况导致失败,不会进行重试,直接存储消息到mysql数据库中。
错误消息记录表结构
编写配置
在springboot的配置文件中bootstrap-dev.yml
修改配置:
spring:
rabbitmq: #mq的配置
host: ${rabbitmq.host}
port: ${rabbitmq.port}
username: ${rabbitmq.username}
password: ${rabbitmq.password}
virtual-host: ${rabbitmq.virtual-host}
publisher-confirm-type: correlated #发送消息的异步回调,记录消息是否发送成功
publisher-returns: true #开启publish-return功能,消息到达交换机,但是没有到达对列表
template:
mandatory: true #消息路由失败时的策略, true: 调用ReturnCallback, false:丢弃消息
listener:
simple:
acknowledge-mode: auto #,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
发送消息失败场景解决
对于发送消息的场景,正常情况没有问题,直接发送即可:
如果是非正常情况就需要特殊处理了,一般会有三种非正常情况需要处理:
● 第一种情况,消息发送到交换机(exchange),但是没有队列与交换机绑定,消息会丢失。
● 第二种情况,在消息的发送后进行确认,如果发送失败需要将消息持久化,例如:发送的交换机不存在的情况。
● 第三种情况,由于网络、MQ服务宕机等原因导致消息没有发送到MQ服务器。
第一种情况:
对于消息只是到了交换机,并没有到达队列,这种情况记录日志即可,因为我们也不确定哪个队列需要这个消息。 配置如下(nacos中的shared-spring-rabbitmq.yml文件):
@Slf4j
@Configuration
public class MessageConfig implements ApplicationContextAware {
/**
* 发送者回执 没有路由到队列的情况
*
* @param applicationContext 应用上下文
* @throws BeansException 异常
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback
rabbitTemplate.setReturnsCallback(message -> {
if (StrUtil.contains(message.getExchange(), Constants.MQ.DELAYED_KEYWORD)) {
//延迟消息没有发到队列是正常情况,无需记录日志
return;
}
// 投递失败,记录日志
log.error("消息没有投递到队列,应答码:{},原因:{},交换机:{},路由键:{},消息:{}",
message.getReplyCode(), message.getReplyText(), message.getExchange(), message.getRoutingKey(), message.getMessage());
});
}
}
第二种情况:
在配文件中开启配置publisher-confirm-type,即可在发送消息时添加回调方法:
在代码中进行处理,将消息数据持久化到数据库中,后续通过xxl-job进行处理,将消息进行重新发送。
同样,如果出现异常情况也是将消息持久化:
第三种情况:
将发送消息的代码进行try{}catch{}处理,如果出现异常会通过Spring-retry机制进重试,最多重试3次,如果依然失败就将消息数据进行持久化:
设置重试:
最终的落库操作:
xxl-job任务,主要负责从数据库中查询出错误消息数据然后进行重试:
/**
* 失败消息的处理任务
*/
@Slf4j
@Component
@ConditionalOnBean({MQService.class, FailMsgService.class})
public class FailMsgJob {
@Resource
private FailMsgService failMsgService;
@Resource
private MQService mqService;
@XxlJob("failMsgJob")
public void execute() {
//查询失败的数据,每次最多处理100条错误消息
LambdaQueryWrapper<FailMsgEntity> queryWrapper = new LambdaQueryWrapper<FailMsgEntity>()
.orderByAsc(FailMsgEntity::getCreated)
.last("limit 100");
List<FailMsgEntity> failMsgEntityList = this.failMsgService.list(queryWrapper);
if (CollUtil.isEmpty(failMsgEntityList)) {
return;
}
for (FailMsgEntity failMsgEntity : failMsgEntityList) {
try {
//发送消息
this.mqService.sendMsg(failMsgEntity.getExchange(), failMsgEntity.getRoutingKey(), failMsgEntity.getMsg());
//删除数据
this.failMsgService.removeById(failMsgEntity.getId());
} catch (Exception e) {
log.error("处理错误消息失败, failMsgEntity = {}", failMsgEntity);
}
}
}
}
xxl-job中的任务调度:
消费消息
对于消息的消费,首先采用的自动确认策略:
如果出现消费错误,会进行重试,最多重试3次:
如果3次后依然失败,需要将消息发送到指定的队列,为了区分不同的微服务,所以会针对不同微服务创建不同的队列,但是交换机是同一个:
@Configuration
public class ErrorMessageConfig {
@Value("${spring.application.name}") //获取微服务的名称
private String appName;
@Bean
public TopicExchange errorMessageExchange() {
//定义错误消息的交换机,类型为:topic
return new TopicExchange(Constants.MQ.Exchanges.ERROR, true, false);
}
@Bean
public Queue errorQueue() {
//【前缀+微服务】名作为错误消息存放的队列名称,并且开启了持久化
return new Queue(Constants.MQ.Queues.ERROR_PREFIX + appName, true);
}
@Bean
public Binding errorBinding(Queue errorQueue, TopicExchange errorMessageExchange) {
//完成绑定关系
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(appName);
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
//设置全部重试失败后进行重新发送消息,指定了交换机以及路由key
//需要注意的是,路由key是应用名称,与上述的绑定关系中的路由key一致
return new RepublishMessageRecoverer(rabbitTemplate, Constants.MQ.Exchanges.ERROR, appName);
}
}
最终会以微服务名称创建队列:
其绑定关系如下:
统一封装
为了在各个微服务中方便发送消息,所以在sl-express-ms-base微服务中进行了封装,使用时com.sl.ms.base.api.common.MQFeign调用即可。
在base微服务中添加了配置以及启用Spring-retry机制:
使用示例:
发送时指定交换机、路由key、消息内容、延时时间(毫秒)即可。
封装后的MQ的使用
机构同步
机构的新增、更新、删除是在权限管家中完成的,需要是操作后同步到路线规划微服务中,这里采用的是MQ消息通知的方式。
业务流程
权限管家配置
权限管家的MQ配置是在 /itcast/itcast-auth-server/application-test.properties文件中,如下:
可以看出,消息发往的交换机为:itcast-auth,交换机的类型为:topic 发送消息的规则如下:
● 消息为json字符串 ○ 如:
{
"type": "ORG",
"content": [
{
"managerId": "1",
"parentId": "0",
"name": "测试组织",
"id": "973902113476182273",
"status": true
}
],
"operation": "UPDATE"
}
● type表示变更的对象,比如组织:ORG ● content为更改对象列表 ● operation类型列表 ○ 新增-ADD ○ 修改-UPDATE ○ 删除-DEL
业务规范
上图是在权限管家中新增组织的界面,可以从界面中看出,添加的组织并没有标识是【网点】还是【转运中心】,所以,在这里我们做一下约定,按照机构名称的后缀进行区分,具体规则如下: ● xxx转运中心 → 一级转运中心(OLT) ● xxx分拣中心 → 二级转运中心 (TLT) ● xxx营业部 → 网点(AGENCY)
代码实现
/**
* 对于权限管家系统消息的处理
*/
@Slf4j
@Component
public class AuthMQListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT),
exchange = @Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC),
key = "#"
))
public void listenAgencyMsg(String msg) {
//{"type":"ORG","operation":"ADD","content":[{"id":"977263044792942657","name":"55","parentId":"0","managerId":null,"status":true}]}
log.info("接收到消息 -> {}", msg);
JSONObject jsonObject = JSONUtil.parseObj(msg);
String type = jsonObject.getStr("type");
if (!StrUtil.equalsIgnoreCase(type, "ORG")) {
//非机构消息
return;
}
String operation = jsonObject.getStr("operation");
JSONObject content = (JSONObject) jsonObject.getJSONArray("content").getObj(0);
String name = content.getStr("name");
Long parentId = content.getLong("parentId");
IService iService;
BaseEntity entity;
if (StrUtil.endWith(name, "转运中心")) {
//一级转运中心
iService = OrganServiceFactory.getBean(OrganTypeEnum.OLT.getCode());
entity = new OLTEntity();
entity.setParentId(0L);
} else if (StrUtil.endWith(name, "分拣中心")) {
//二级转运中心
iService = OrganServiceFactory.getBean(OrganTypeEnum.TLT.getCode());
entity = new TLTEntity();
entity.setParentId(parentId);
} else if (StrUtil.endWith(name, "营业部")) {
//网点
iService = OrganServiceFactory.getBean(OrganTypeEnum.AGENCY.getCode());
entity = new AgencyEntity();
entity.setParentId(parentId);
} else {
return;
}
//设置参数
entity.setBid(content.getLong("id"));
entity.setName(name);
entity.setStatus(content.getBool("status"));
switch (operation) {
case "ADD": {
iService.create(entity);
break;
}
case "UPDATE": {
iService.update(entity);
break;
}
case "DEL": {
iService.deleteByBid(entity.getBid());
break;
}
}
}
}
这里的设计是采用的Mybatis-plus的设计
IService
在Service中一些方法是通用的,比如新增、更新、删除等,这个通用的方法可以写到一个Service中,其他的Service继承该Service即可。
/**
* 基础服务实现
*/
public interface IService<T extends BaseEntity> {
/**
* 根据业务id查询数据
*
* @param bid 业务id
* @return 节点数据
*/
T queryByBid(Long bid);
/**
* 新增节点
*
* @param t 节点数据
* @return 新增的节点数据
*/
T create(T t);
/**
* 更新节点
*
* @param t 节点数据
* @return 更新的节点数据
*/
T update(T t);
/**
* 根据业务id删除数据
*
* @param bid 业务id
* @return 是否删除成功
*/
Boolean deleteByBid(Long bid);
}
ServiceImpl
下面编写具体的实现类:
/**
* 基础服务的实现
*/
public class ServiceImpl<R extends BaseRepository, T extends BaseEntity> implements IService<T> {
@Autowired
private R repository;
@Override
public T queryByBid(Long bid) {
return (T) this.repository.findByBid(bid).orElse(null);
}
@Override
public T create(T t) {
t.setId(null);//id由neo4j自动生成
return (T) this.repository.save(t);
}
@Override
public T update(T t) {
//先查询,再更新
T tData = this.queryByBid(t.getBid());
if (ObjectUtil.isEmpty(tData)) {
return null;
}
BeanUtil.copyProperties(t, tData, CopyOptions.create().ignoreNullValue().setIgnoreProperties("id", "bid"));
return (T) this.repository.save(tData);
}
@Override
public Boolean deleteByBid(Long bid) {
return this.repository.deleteByBid(bid) > 0;
}
}
然后将各个节点分别对应的继承 ServiceImpl<TRepository, TEntity>
实现 TService
如下
@Service
public class AgencyServiceImpl extends ServiceImpl<AgencyRepository, AgencyEntity> implements AgencyService {
}
树形结构的使用
在后台系统中,对于机构数据的展现需要通过树形结构展现,如下:
所以在com.sl.transport.service.OrganService中findAllTree()方法中封装了树形结构。 具体的封装逻辑采用hutool工具包中的TreeUtil,参考文档:点击查看 代码实现如下:
@Override
public String findAllTree() {
List<OrganDTO> organList = this.findAll(null);
if (CollUtil.isEmpty(organList)) {
return "";
}
//构造树结构
List<Tree<Long>> treeNodes = TreeUtil.build(organList, 0L,
(organDTO, tree) -> {
tree.setId(organDTO.getId());
tree.setParentId(organDTO.getParentId());
tree.putAll(BeanUtil.beanToMap(organDTO));
tree.remove("bid");
});
try {
return this.objectMapper.writeValueAsString(treeNodes);
} catch (JsonProcessingException e) {
throw new SLException("序列化json出错!", e);
}
}
数据类似这样:
[
{
"id": "1012438698496623009",
"parentId": "0",
"name": "上海市转运中心",
"type": 1,
"phone": null,
"address": null,
"latitude": null,
"longitude": null,
"managerName": null,
"extra": null,
"status": true,
"children": [
{
"id": "1012479939628238305",
"parentId": "1012438698496623009",
"name": "浦东区分拣中心",
"type": 2,
"phone": null,
"address": null,
"latitude": null,
"longitude": null,
"managerName": null,
"extra": null,
"status": true
}
]
},
{
"id": "1012479716659037537",
"parentId": "0",
"name": "北京市转运中心",
"type": 1,
"phone": null,
"address": null,
"latitude": null,
"longitude": null,
"managerName": null,
"extra": null,
"status": true
}
]
评论( 0 )