小蔡学Java

RabbitMQ进阶知识

2024-06-29 20:29 975 0 消息中间件 RabbitMQ消息中间件消息可靠性消费

如何保证消息的可靠性

在生产环境中可能会有一些突发原因导致rabbitMQ重启,那么在RabbitMQ重启时生产者消息投递失败,导致消息丢失,这些就需要手动去处理和恢复。那么我们如何去保障消息可靠的传递?无法投递的消息又该如何处理?

从上图可以看出,消息投递需要经过以下三个对象参与:

  • 生产者
  • broker
  • 消费者
  • 生产者发送消息到broker时,需要保证消息的可靠性,那么主要采用以下两种方案:
  • confirm确认模式
  • return 退回模式

confirm确认模式

confirm确认模式是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答,如何生产者接收到应答后,用来确认这条消息是否正常发送到broker。一旦消息投递到队列,队列则会向生产者发送一个通知,如果设置了消息持久化到磁盘,则会等待消息持久化到磁盘之后再发送通知。下面我们用代码来演示一下confirm确认模式。

首先开启confirm机制在application.yml配置publisher-confirm-type

	spring:
	  rabbitmq: #rabbitmq相关配置
		host: 127.0.0.1
		port: 5672
		username: guest
		password: guest
		publisher-confirm-type: correlated   #开启confirm机制

设置rabbitTemplate的confirmCallback回调函数

	public String sendMsgConfirmCallback(String msg) {
			try {
				String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
				String sendTime = simpleDateFormat.format(new Date());
				Map<String, Object> map = new HashMap<>();
				map.put("msgId", msgId);
				map.put("sendTime", sendTime);
				map.put("msg", msg);
				rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
					@Override
					public void confirm(CorrelationData correlationData, boolean ack, String cause) {
						if(!ack){
							System.out.println("系统繁忙请重新发送");
						}
					}
				});
				//错误的交换机用来测试发送到交换机消息失败
				rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE_CONFIRM_CALLBACK, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map);
				return "ok";
			} catch (Exception e) {
				e.printStackTrace();
				return "error";
			}
		}

controller层代码

	/**
		 * 发送消息
		 */
		@PostMapping("/sendMsgConfirmCallback")
		public String sendMsgConfirmCallback(@RequestParam(name = "msg") String msg) {
			return rabbitMQService.sendMsgConfirmCallback(msg);
		}

然后我们用idea自带http测试工具执行下,可以看到打印日志

return回退模式

ReturnListener用于处理一些不可路由的消息,例如某些情况,我们发送消息,但是当前exchange不存在或者指定的RoutingKey路由不到,那么这个时候我们就需要监听这种路由不可达的消息,然后通知生产者。它只会通知失败,不会通知成功,如果消息正确路由到队列,则发布者不会受到任何通知,因此它无法确保消息一定成功,因为路由到队列的消息可能会丢失。下面我们用代码来演示一下return回退模式。

在application.yml配置publisher-returns

	spring:
	  rabbitmq: #rabbitmq相关配置
		host: 127.0.0.1
		port: 5672
		username: guest
		password: guest
		publisher-returns: true #开启return机制  用来捕捉虚拟主机向队列中传递信息错误

设置ReturnsCallback回调函数

	public String sendMsgReturnCallback(String msg) {
			try {
				String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
				String sendTime = simpleDateFormat.format(new Date());
				Map<String, Object> map = new HashMap<>();
				map.put("msgId", msgId);
				map.put("sendTime", sendTime);
				map.put("msg", msg);
				rabbitTemplate.setMandatory(true);
				rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
					@Override
					public void returnedMessage(ReturnedMessage returned) {
						//当交换机发送消息到队列过程中失败启动当前方法
						System.out.println(returned.getReplyCode());
					}
				});
				//错误的路由键用来测试发送到queue消息失败
				rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING_RETURN_CALLBACK, map);
				return "ok";
			} catch (Exception e) {
				e.printStackTrace();
				return "error";
			}
		}

	controller层代码
	java 代码解读复制代码   /**
		 * 发送消息
		 */
		@PostMapping("/sendMsgReturnCallback")
		public String sendMsgReturnCallback(@RequestParam(name = "msg") String msg) {
			return rabbitMQService.sendMsgReturnCallback(msg);
		}

然后我们用idea自带http测试工具执行下,可以看到打印日志

持久化

前面我们分析了生产者如何保障消息的可靠性传递,那么在broker中它是怎么保证消息的可靠性的呢? 假设生产者已经将消息传递到交换机,而交换机也成功将消息路由到对应的队列中,此时mq重启了,那么消息还在吗? 为了防止这种情况发生,我们可以开启RabbitMQ的持久化,即消息写入后会持久化到磁盘,此时即使MQ挂了,重启之后也会自动读取之前存储的数据。

1、持久化队列,durable设置为true

	@Bean
		public Queue rabbitmqDemoDirectQueue() {
			/**
			 * 1、name:    队列名称
			 * 2、durable: 是否持久化
			 * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
			 * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
			 * */
			return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
		}

2、持久化交换机,durable设置为true

	@Bean
		public DirectExchange rabbitmqDemoDirectExchange() {
			//Direct交换机
			return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
		}

3、发送持久化消息,设置deliveryMode=2;SpringBoot的话,发送消息时自动设置deliveryMode=2。注意如果消息如果不设置过期时间默认为持久化。

设置好持久化参数后,可以在管理界面上查看到队列的Features为D。

消费方消息可靠性

1、ACK确认机制(手动确认)

消费者接收到消息,但是还未处理或者还未处理完消息,此时消费者挂了,此时mq会认为消费者已经完成消息消费,就会从队列中删除这些消息,从而导致消息丢失。

上诉情况要怎么解决呢?通过RabbitMQ提供的ACK确认机制,RabbitMQ默认是自动ack,此时需要改为手动ack确认(即自己的程序确定消息已经处理完成后,手动提交ack),此时遇到上述情况,由于没有提交ack,MQ就不会删除这条消息,而会将消息发送给其他消费者进行消费,避免消息丢失。

2、ACK的实现 在application.yml中添加acknowledge-mode: manual

	spring:
	  rabbitmq: #rabbitmq相关配置
		listener:
		  simple:
			acknowledge-mode: manual # 手动ack

消费者实现

	@RabbitHandler
		@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
		public void process(Map msg, Message message, Channel channel) throws IOException {

			long deliveryTag = message.getMessageProperties().getDeliveryTag();
			byte[] body = message.getBody();
			String s = new String(body);
			logger.info("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + JSON.toJSONString(msg));
			try {
				logger.info("消息的内容:" + s);
				/*
				 * basicAck:确认消息 -- rabbit服务端删除
				 * long deliveryTag,第一个参数为消息的标志
				 * boolean multiple 第二个参数为是否把该消费之前未确认的消息一起确认掉
				 * */
				channel.basicAck(deliveryTag, true);
			} catch (Exception e) {
				/*
				 * basicNack:服务继续发送消息
				 * long deliveryTag, boolean multiple, 前两个参数与上面的意义一样
				 * boolean requeue 是否要求rabbitmq服务器重新发送该消息
				 * */
				channel.basicNack(deliveryTag, true, true);
			}
		 }

延迟队列

延迟队列指消息进入队列中并不会马上被消费者消费而是到指定时间才会被消费者消费。 RabbitMQ本来并没有延迟队列,但是可以依靠TTL(消息过期时间)和死信队列来实现消息延迟。

TTL(Time To Live)即消息过期时间,是RabbitMQ中一个消息或者队列的属性,单位是毫秒。当消息设置了TTL或者进入设置TTL的队列中时,在TTL设置的时间内没被消费则变为‘死信’;如果不设置TTL,则表示消息永远不会过期;如果TTL为0,则表示此时可以马上投递该消息到消费者,否则直接丢弃。下面我们看一下TTL设置案例

设置消息TTL

	public String sendMsgExpiration() {
			try {
				Message message = new Message("zayton".getBytes());
				message.getMessageProperties().setExpiration("5000");
				rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message);
				return "ok";
			} catch (Exception e) {
				e.printStackTrace();
				return "error";
			}
		}

设置队列TTL

	@Bean
		public Queue rabbitmqDemoDirectQueue() {
			Map<String,Object> map = new HashMap<>();
			map.put("x-message-ttl",5000);
			/**
			 * 1、name:    队列名称
			 * 2、durable: 是否持久化
			 * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
			 * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
			 * */
			return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false,map);
		}

死信队列

说到延时队列就不得不说下死信队列。一般来说,生产者将消息投递到队列中,消费者会从队列中取出消息进行消费,但是有时候由于某些原因导致队列中的消息无法被消费,这样的消息如果没有被后续处理就会变成死信,当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX(Dead-Letter-Exchange),即死信交换器。 导致死信的几种原因

  • 消息 TTL 过期
  • 消息被拒绝( basicReject /basicNack),且requeue=false
  • 队列满了,无法添加消息

队列绑定死信交换机:

代码示例

	@Bean
		public Queue rabbitmqDemoDirectQueue() {
			Map<String, Object> map = new HashMap<>(2);
			// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
			map.put("x-dead-letter-exchange", RabbitMQConfig.RABBITMQ_DEAD_DIRECT_EXCHANGE);
			// x-dead-letter-routing-key 这里声明当前队列的死信路由key
			map.put("x-dead-letter-routing-key", RabbitMQConfig.RABBITMQ_DEAD_DIRECT_ROUTING);
			/**
			 * 1、name:    队列名称
			 * 2、durable: 是否持久化
			 * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
			 * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
			 * */
			return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false,map);

评论( 0 )

  • 博主 Mr Cai
  • 坐标 河南 信阳
  • 标签 Java、SpringBoot、消息中间件、Web、Code爱好者

文章目录