RabbitMQ安装
笔者是在CentOS环境下使用Docker安装RabbitMQ,简单快捷。
查询镜像
docker search rabbitmq:management
➜ ~ docker search rabbitmq:management
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
macintoshplus/rabbitmq-management Based on rabbitmq:management whit python and… 10 [OK]
transmitsms/rabbitmq-sharded Fork of rabbitmq:management with sharded_exc… 0
yunyan2140/rabbitmq docker pull rabbitmq:management 0
➜ ~
获取镜像
docker pull rabbitmq:management
运行镜像
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
访问管理界面
运行成功后,访问http://[宿主机IP]:15672, 如何通过默认账号密码登录访问,账号和密码都是guest。
使用guest用户登录,可以看到RabbitMQ的管理界面,到这里就完成安装部署了。
RabbitMQ代码示例
服务端搭建完后,我们就要开始客户端的操作, 接下来用一个简单示例来演示一下,首先我们引入maven依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
创建一个公共配置类,用来共享一些配置,比如队列主题,交换机名称,路由匹配键名称等等。
public class RabbitMQConfig {
/**
* RabbitMQ的队列主题名称
*/
public static final String RABBITMQ_DEMO_TOPIC = "rabbitmq.demo.topic";
/**
* RabbitMQ的DIRECT交换机名称
*/
public static final String RABBITMQ_DEMO_DIRECT_EXCHANGE = "rabbitmq.demo.direct.exchange";
/**
* RabbitMQ的DIRECT交换机和队列绑定的匹配键 DirectRouting
*/
public static final String RABBITMQ_DEMO_DIRECT_ROUTING = "rabbitmq.demo.direct.routing";
/**
* RabbitMQ的DIRECT交换机��队列绑定的匹配键 DirectRouting(用于测试ReturnCallback)
*/
public static final String RABBITMQ_DEMO_DIRECT_ROUTING_RETURN_CALLBACK = "rabbitmq.demo.direct.routing.return";
/**
* RabbitMQ的DIRECT交换机名称(用于测试ConfirmCallback)
*/
public static final String RABBITMQ_DEMO_DIRECT_EXCHANGE_CONFIRM_CALLBACK = "rabbitmq.demo.direct.exchange.confirm";
/**
* RabbitMQ的死信交换机名称
*/
public static final String RABBITMQ_DEAD_DIRECT_EXCHANGE = "rabbitmq.dead.direct.exchange";
/**
* RabbitMQ的死信交换机和队列绑定的匹配键 DirectRouting
*/
public static final String RABBITMQ_DEAD_DIRECT_ROUTING = "rabbitmq.dead.direct.routing";
}
然后在application.yml上配置RabbitMQ相应的配置信息,端口15672为网页管理,5672为AMQP端口
spring:
rabbitmq: #rabbitmq相关配置
host: 127.0.0.1
port: 5672
username: guest
password: guest
创建一个Direct交换机以及队列的配置
@Component
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue rabbitmqDemoDirectQueue() {
/**
* 1、name: 队列名称
* 2、durable: 是否持久化
* 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
* 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
* */
return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
}
@Bean
public DirectExchange rabbitmqDemoDirectExchange() {
//Direct交换机
return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
}
@Bean
public Binding bindDirect() {
//链式写法,绑定交换机和队列,并设置匹配键
return BindingBuilder
//绑定队列
.bind(rabbitmqDemoDirectQueue())
//到交换机
.to(rabbitmqDemoDirectExchange())
//并设置匹配键
.with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
}
}
然后创建一个发送消息的Service类
public interface RabbitMQService {
public String sendMsg(String msg);
}
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
//日期格式化
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH
ss");
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public String sendMsg(String msg) {
try {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
String sendTime = simpleDateFormat.format(new Date());
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("sendTime", sendTime);
map.put("msg", msg);
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
}
然后我们写一个Controller来模拟业务发送消息。
@RestController
@RequestMapping("/test/rabbitmq")
public class RabbitMQController {
@Resource
private RabbitMQService rabbitMQService;
/**
* 发送消息
*/
@PostMapping("/sendMsg")
public String sendMsg(@RequestParam(name = "msg") String msg) {
return rabbitMQService.sendMsg(msg);
}
}
这时我们在写消费者代码,创建消费者类,然后@RabbitListener注解写上监听队列的名称。
@Component
public class RabbitMQConsumer {
enum Action {
//处理成功
SUCCESS,
//可以重试的错误,消息重回队列
RETRY,
//无需重试的错误,拒绝消息,并从队列中删除
REJECT
}
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public void process(Map msg, Message message, Channel channel) {
System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + JSON.toJSONString(msg));
}
}
然后我们启动程序,通过idea自带的测试工具访问接口
然后我们在RabbitMQ管理界面可以看到刚创建的队列,和一条等待消费的消息。
然后我们转向idea看一下Consumer消费消息,至此我们的小demo演示就到这里了。
评论( 0 )