小蔡学Java

项目二总结:(五)RabbitMQ是怎么处理消息丢失的及同步机构

2024-02-20 20:56 866 0 项目 RabbitMQ消息可靠性消费消息丢失

说明

为了将MQ的使用相关的代码进行统一,所以将发送消息的代码、消费者的配置抽取到sl-express-mq工程中。

主要功能:

  • 为RabbitTemplate设置了ReturnsCallback,如果消息发送到交换机但是没有到达队列,会进行日志的记录。
  • 统一了配置了消息的消费,消费者如果处理消息失败,会进行重试,如果依然是失败的话,会将错误消息发送到error.queue队列,后续需要人工进行处理。
  • 统一了发送消息代码,如果网络等异常情况导致发送消息失败会进行重试,如果依然失败的话将消息内容持久化到mysql数据库,后续通过xxl-job任务进行重新发送;如果其他情况导致失败,不会进行重试,直接存储消息到mysql数据库中。

错误消息记录表结构

编写配置

在springboot的配置文件中bootstrap-dev.yml修改配置:

spring:
  rabbitmq: #mq的配置
    host: ${rabbitmq.host}
    port: ${rabbitmq.port}
    username: ${rabbitmq.username}
    password: ${rabbitmq.password}
    virtual-host: ${rabbitmq.virtual-host}
    publisher-confirm-type: correlated #发送消息的异步回调,记录消息是否发送成功
    publisher-returns: true #开启publish-return功能,消息到达交换机,但是没有到达对列表
    template:
      mandatory: true #消息路由失败时的策略, true: 调用ReturnCallback, false:丢弃消息
    listener:
      simple:
        acknowledge-mode: auto #,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

发送消息失败场景解决

对于发送消息的场景,正常情况没有问题,直接发送即可:

如果是非正常情况就需要特殊处理了,一般会有三种非正常情况需要处理:

● 第一种情况,消息发送到交换机(exchange),但是没有队列与交换机绑定,消息会丢失。

● 第二种情况,在消息的发送后进行确认,如果发送失败需要将消息持久化,例如:发送的交换机不存在的情况。

● 第三种情况,由于网络、MQ服务宕机等原因导致消息没有发送到MQ服务器。

第一种情况:

对于消息只是到了交换机,并没有到达队列,这种情况记录日志即可,因为我们也不确定哪个队列需要这个消息。 配置如下(nacos中的shared-spring-rabbitmq.yml文件):

@Slf4j
@Configuration
public class MessageConfig implements ApplicationContextAware {

    /**
     * 发送者回执 没有路由到队列的情况
     *
     * @param applicationContext 应用上下文
     * @throws BeansException 异常
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnsCallback(message -> {
            if (StrUtil.contains(message.getExchange(), Constants.MQ.DELAYED_KEYWORD)) {
                //延迟消息没有发到队列是正常情况,无需记录日志
                return;
            }
            // 投递失败,记录日志
            log.error("消息没有投递到队列,应答码:{},原因:{},交换机:{},路由键:{},消息:{}",
                    message.getReplyCode(), message.getReplyText(), message.getExchange(), message.getRoutingKey(), message.getMessage());
        });
    }

}

第二种情况:

在配文件中开启配置publisher-confirm-type,即可在发送消息时添加回调方法:

在代码中进行处理,将消息数据持久化到数据库中,后续通过xxl-job进行处理,将消息进行重新发送。

同样,如果出现异常情况也是将消息持久化:

第三种情况:

将发送消息的代码进行try{}catch{}处理,如果出现异常会通过Spring-retry机制进重试,最多重试3次,如果依然失败就将消息数据进行持久化:

设置重试:

最终的落库操作:

xxl-job任务,主要负责从数据库中查询出错误消息数据然后进行重试:


/**
 * 失败消息的处理任务
 */
@Slf4j
@Component
@ConditionalOnBean({MQService.class, FailMsgService.class})
public class FailMsgJob {

    @Resource
    private FailMsgService failMsgService;
    @Resource
    private MQService mqService;

    @XxlJob("failMsgJob")
    public void execute() {
        //查询失败的数据,每次最多处理100条错误消息
        LambdaQueryWrapper<FailMsgEntity> queryWrapper = new LambdaQueryWrapper<FailMsgEntity>()
                .orderByAsc(FailMsgEntity::getCreated)
                .last("limit 100");
        List<FailMsgEntity> failMsgEntityList = this.failMsgService.list(queryWrapper);
        if (CollUtil.isEmpty(failMsgEntityList)) {
            return;
        }

        for (FailMsgEntity failMsgEntity : failMsgEntityList) {
            try {
                //发送消息
                this.mqService.sendMsg(failMsgEntity.getExchange(), failMsgEntity.getRoutingKey(), failMsgEntity.getMsg());
                //删除数据
                this.failMsgService.removeById(failMsgEntity.getId());
            } catch (Exception e) {
                log.error("处理错误消息失败, failMsgEntity = {}", failMsgEntity);
            }
        }
    }
}

xxl-job中的任务调度:

消费消息

对于消息的消费,首先采用的自动确认策略:

如果出现消费错误,会进行重试,最多重试3次:

如果3次后依然失败,需要将消息发送到指定的队列,为了区分不同的微服务,所以会针对不同微服务创建不同的队列,但是交换机是同一个:

@Configuration
public class ErrorMessageConfig {

    @Value("${spring.application.name}") //获取微服务的名称
    private String appName;

    @Bean
    public TopicExchange errorMessageExchange() {
        //定义错误消息的交换机,类型为:topic
        return new TopicExchange(Constants.MQ.Exchanges.ERROR, true, false);
    }

    @Bean
    public Queue errorQueue() {
        //【前缀+微服务】名作为错误消息存放的队列名称,并且开启了持久化
        return new Queue(Constants.MQ.Queues.ERROR_PREFIX + appName, true);
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, TopicExchange errorMessageExchange) {
        //完成绑定关系
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(appName);
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        //设置全部重试失败后进行重新发送消息,指定了交换机以及路由key
        //需要注意的是,路由key是应用名称,与上述的绑定关系中的路由key一致
        return new RepublishMessageRecoverer(rabbitTemplate, Constants.MQ.Exchanges.ERROR, appName);
    }
}

最终会以微服务名称创建队列:

其绑定关系如下:

统一封装

为了在各个微服务中方便发送消息,所以在sl-express-ms-base微服务中进行了封装,使用时com.sl.ms.base.api.common.MQFeign调用即可。

在base微服务中添加了配置以及启用Spring-retry机制:

使用示例:

发送时指定交换机、路由key、消息内容、延时时间(毫秒)即可。

封装后的MQ的使用

机构同步

机构的新增、更新、删除是在权限管家中完成的,需要是操作后同步到路线规划微服务中,这里采用的是MQ消息通知的方式。

业务流程

权限管家配置

权限管家的MQ配置是在 /itcast/itcast-auth-server/application-test.properties文件中,如下:

可以看出,消息发往的交换机为:itcast-auth,交换机的类型为:topic 发送消息的规则如下:

● 消息为json字符串 ○ 如:

{
  "type": "ORG",
  "content": [
      {
          "managerId": "1",
          "parentId": "0",
          "name": "测试组织",
          "id": "973902113476182273",
          "status": true
      }
  ],
  "operation": "UPDATE"
}

● type表示变更的对象,比如组织:ORG ● content为更改对象列表 ● operation类型列表 ○ 新增-ADD ○ 修改-UPDATE ○ 删除-DEL

业务规范

上图是在权限管家中新增组织的界面,可以从界面中看出,添加的组织并没有标识是【网点】还是【转运中心】,所以,在这里我们做一下约定,按照机构名称的后缀进行区分,具体规则如下: ● xxx转运中心 → 一级转运中心(OLT) ● xxx分拣中心 → 二级转运中心 (TLT) ● xxx营业部 → 网点(AGENCY)

代码实现


/**
 * 对于权限管家系统消息的处理
 */
@Slf4j
@Component
public class AuthMQListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT),
            exchange = @Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC),
            key = "#"
    ))
    public void listenAgencyMsg(String msg) {
        //{"type":"ORG","operation":"ADD","content":[{"id":"977263044792942657","name":"55","parentId":"0","managerId":null,"status":true}]}
        log.info("接收到消息 -> {}", msg);
        JSONObject jsonObject = JSONUtil.parseObj(msg);
        String type = jsonObject.getStr("type");
        if (!StrUtil.equalsIgnoreCase(type, "ORG")) {
            //非机构消息
            return;
        }
        String operation = jsonObject.getStr("operation");
        JSONObject content = (JSONObject) jsonObject.getJSONArray("content").getObj(0);
        String name = content.getStr("name");
        Long parentId = content.getLong("parentId");

        IService iService;
        BaseEntity entity;
        if (StrUtil.endWith(name, "转运中心")) {
            //一级转运中心
            iService = OrganServiceFactory.getBean(OrganTypeEnum.OLT.getCode());
            entity = new OLTEntity();
            entity.setParentId(0L);
        } else if (StrUtil.endWith(name, "分拣中心")) {
            //二级转运中心
            iService = OrganServiceFactory.getBean(OrganTypeEnum.TLT.getCode());
            entity = new TLTEntity();
            entity.setParentId(parentId);
        } else if (StrUtil.endWith(name, "营业部")) {
            //网点
            iService = OrganServiceFactory.getBean(OrganTypeEnum.AGENCY.getCode());
            entity = new AgencyEntity();
            entity.setParentId(parentId);
        } else {
            return;
        }

        //设置参数
        entity.setBid(content.getLong("id"));
        entity.setName(name);
        entity.setStatus(content.getBool("status"));

        switch (operation) {
            case "ADD": {
                iService.create(entity);
                break;
            }
            case "UPDATE": {
                iService.update(entity);
                break;
            }
            case "DEL": {
                iService.deleteByBid(entity.getBid());
                break;
            }
        }

    }

}

这里的设计是采用的Mybatis-plus的设计

IService

在Service中一些方法是通用的,比如新增、更新、删除等,这个通用的方法可以写到一个Service中,其他的Service继承该Service即可。

/**
 * 基础服务实现
 */
public interface IService<T extends BaseEntity> {

    /**
     * 根据业务id查询数据
     *
     * @param bid 业务id
     * @return 节点数据
     */
    T queryByBid(Long bid);

    /**
     * 新增节点
     *
     * @param t 节点数据
     * @return 新增的节点数据
     */
    T create(T t);

    /**
     * 更新节点
     *
     * @param t 节点数据
     * @return 更新的节点数据
     */
    T update(T t);

    /**
     * 根据业务id删除数据
     *
     * @param bid 业务id
     * @return 是否删除成功
     */
    Boolean deleteByBid(Long bid);

}

ServiceImpl

下面编写具体的实现类:

/**
 * 基础服务的实现
 */
public class ServiceImpl<R extends BaseRepository, T extends BaseEntity> implements IService<T> {

    @Autowired
    private R repository;

    @Override
    public T queryByBid(Long bid) {
        return (T) this.repository.findByBid(bid).orElse(null);
    }

    @Override
    public T create(T t) {
        t.setId(null);//id由neo4j自动生成
        return (T) this.repository.save(t);
    }

    @Override
    public T update(T t) {
        //先查询,再更新
        T tData = this.queryByBid(t.getBid());
        if (ObjectUtil.isEmpty(tData)) {
            return null;
        }
        BeanUtil.copyProperties(t, tData, CopyOptions.create().ignoreNullValue().setIgnoreProperties("id", "bid"));
        return (T) this.repository.save(tData);
    }

    @Override
    public Boolean deleteByBid(Long bid) {
        return this.repository.deleteByBid(bid) > 0;
    }
}

然后将各个节点分别对应的继承 ServiceImpl<TRepository, TEntity> 实现 TService 如下

@Service
public class AgencyServiceImpl extends ServiceImpl<AgencyRepository, AgencyEntity> implements AgencyService {

}

树形结构的使用

在后台系统中,对于机构数据的展现需要通过树形结构展现,如下:

所以在com.sl.transport.service.OrganService中findAllTree()方法中封装了树形结构。 具体的封装逻辑采用hutool工具包中的TreeUtil,参考文档:点击查看 代码实现如下:

    @Override
    public String findAllTree() {
        List<OrganDTO> organList = this.findAll(null);
        if (CollUtil.isEmpty(organList)) {
            return "";
        }

        //构造树结构
        List<Tree<Long>> treeNodes = TreeUtil.build(organList, 0L,
                (organDTO, tree) -> {
                    tree.setId(organDTO.getId());
                    tree.setParentId(organDTO.getParentId());
                    tree.putAll(BeanUtil.beanToMap(organDTO));
                    tree.remove("bid");
                });

        try {
            return this.objectMapper.writeValueAsString(treeNodes);
        } catch (JsonProcessingException e) {
            throw new SLException("序列化json出错!", e);
        }
    }

数据类似这样:

[
    {
        "id": "1012438698496623009",
        "parentId": "0",
        "name": "上海市转运中心",
        "type": 1,
        "phone": null,
        "address": null,
        "latitude": null,
        "longitude": null,
        "managerName": null,
        "extra": null,
        "status": true,
        "children": [
            {
                "id": "1012479939628238305",
                "parentId": "1012438698496623009",
                "name": "浦东区分拣中心",
                "type": 2,
                "phone": null,
                "address": null,
                "latitude": null,
                "longitude": null,
                "managerName": null,
                "extra": null,
                "status": true
            }
        ]
    },
    {
        "id": "1012479716659037537",
        "parentId": "0",
        "name": "北京市转运中心",
        "type": 1,
        "phone": null,
        "address": null,
        "latitude": null,
        "longitude": null,
        "managerName": null,
        "extra": null,
        "status": true
    }
]

评论( 0 )

  • 博主 Mr Cai
  • 坐标 河南 信阳
  • 标签 Java、SpringBoot、消息中间件、Web、Code爱好者

文章目录