
消息中间件-RabbitMQ
RabbitMQ轻巧,易于在内部和云中部署。它支持多种消息传递协议。 RabbitMQ可以部署在分布式和联合配置中,以满足大规模,高可用性的要求。 RabbitMQ可在许多操作系统和云环境上运行,并为大多数流行语言提供了广泛的开发人员工具。
RabbitMQ
RabbitMQ 是一个开源的消息代理软件(亦称面向消息的中间件),它支持多种消息协议。RabbitMQ服务器是用Erlang语言编写的,而客户端可以用任何语言来实现,包括Java、.NET、Ruby、PHP、Python、JavaScript等。
| 特性 | RabbitMQ | ActiveMQ | RocketMQ | --- |
|---|---|---|---|---|
| 公司/社区 | Rabbit | Apache | 阿里 | Apache |
| 开发语言 | Erlang | Java | Java | Scala&java |
| 协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP0perRESTXMPP,AMOP | 自定义协议 | 自定义协议 |
| 可用性 | 高 | 一般 | 高 | 高 |
| 单机吞吐量 | 一般 | 差 | 高 | 非常高 |
| 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒级以内 |
| 消息可靠性 | 高 | 一般 | 高 | 一般 |
几个主要的概念
- channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
- exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
- queue:消息队列,消息最终被送到这里等待消费者消费。
- virtual host:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
- binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
- Broker:简单来说就是消息队列服务器实体。
安装
下载地址:https://www.rabbitmq.com/download.html Erlang安装:https://www.erlang-solutions.com/downloads/
wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm
rpm -Uvh erlang-solutions-2.0-1.noarch.rpm
yum install -y erlang
erl -v
# 安装socat
yum install -y socat
# 下载rabbitmq
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.13/rabbitmq-server-3.8.13-1.el8.noarch.rpm
rpm -Uvh rabbitmq-server-3.8.13-1.el8.noarch.rpm
# 启动rabbitmq服务
# 启动服务
systemctl start rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
# 开机启动服务
systemctl enable rabbitmq-server
RabbitMQ 在安装完毕以后,会绑定一些端口,如果你购买的是阿里云或者腾讯云相关的服务器一定要在安全组中把对应的端口添加到防火墙。
RabbitMQ管理界面
默认情况下,rabbitmq是没有安装web端的客户端插件,需要安装才可以生效
rabbitmq-plugins enable rabbitmq_management
# 说明:rabbitmq有一个默认账号和密码是:guest 默认情况只能在localhost本机下访问,所以需要添加一个远程登录的用户。
systemctl restart rabbitmq-server
账户授权 可以在可视化界面的Admin下操作相关账户信息
# 添加用户
rabbitmqctl add_user admin admin
# 设置用户角色
rabbitmqctl set_user_tags admin administrator
# 设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# 查看用户
rabbitmqctl list_users
# administrator 可以登录控制台、查看所有信息、可以对rabbitmq进行管理
# monitoring 监控者 登录控制台,查看所有信息,不可以对rabbitmq进行管理
# policymaker 策略制定者 登录控制台,查看所有信息,可以对策略进行管理
# managment 普通管理员 登录控制台,无法看到节点信息,只能看到自己相关节点信息
使用
依赖
<!-- Java原生依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
<!-- spring依赖 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<!-- springboot依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
示例代码
定义生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("your_ip");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare("queue1", false, false, false, null);
// 6: 准备发送消息的内容
String message = "你好,优秀靓仔!!!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue1", null, message.getBytes());
// 分发多个消息
// channel.basicPublish("", "queue2", null, message.getBytes());
// channel.basicPublish("", "queue3", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}

定义消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("47.104.141.27");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare("queue1", false, false, false, null);
// 6: 准备发送消息的内容
String message = "你好,优秀靓仔!!!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println("消息发送成功!");
// 订阅消息
channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("收到消息:" + message);
}
},new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
})
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
使用场景
以下订单为例MQ的使用方式
- 同步:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
- 并行:异步线程,同时执行任务,任务完成后,通知用户。减少了用户等待的时间,提高了系统的吞吐量
- 问题1:耦合度高
- 问题2:需要自己写线程池自己维护成本太高
- 问题3:如果其中一个任务执行失败,需要手动处理异常,增加了代码的复杂度
- 问题4:如何保证消息的可靠性你自己写,如果服务器承载不了,你需要自己去写高可用
- 异步消息队列:
- 好处1:完全解耦,用MQ建立桥接
- 好处2:有独立的线程池和运行模型
- 好处3:出现了消息可能会丢失,MQ有持久化功能
- 好处4:如何保证消息的可靠性,死信队列和消息转移的等
- 好处5:如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用。
public void makeOrder(){
// 1 :保存订单
orderService.saveOrder();
rabbitTemplate.convertSend("ex","2","消息内容");
}
逻辑流程
基本消息队列的消息发送流程: 1.建立connection 2.创建channel 3.利用channel声明队列: 4.利用channel向队列发送消息 5.关闭channel和connection
基本消息队列的消息接收消费流程: 1.建立connection 2.创建channel 3.利用channel声明队列 4.定义consumer的消费行为handleDelivery() 5.利用channel将消费者与队列绑定 6.关闭channel和connection 
SpringAMQP
Spring AMQP是基于AHQP协议定义的一套API规范(与语言和平台无关),提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现,
流程如下: 1.在父工程中引入spring-amqp的依赖:spring-boot-starter-amqp 2.在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列 3.在consumer服务中编写消费逻辑,绑定simple.queue这个队列
生产者和消费者的配置文件基本一致
# application.yml
spring:
rabbitmq:
host: 47.104.141.27 # rabbitmq的ip地址 # 消费者账户调整
port: 5672 # 消费者账户调整
virtual-host: / # rabbitmq的虚拟host
username: admin # 消费者账户调整
password: admin # 消费者账户调整
virtual-host: /
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
workQueue
多个消费者绑定到同一个队列,队列中的消息以轮询的方式发送给消费者。Work Queues(工作队列)模式分为两种,一种是轮询分发,另一种是公平分发。
- 轮询模式的分发:(默认方式)一个消费者一条,按均分配;
- 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;
要解决平均分配的问题要在配置中设置
# application.yml
spring:
rabbitmq:
# ....
listener:
simple:
prefetch: manual # 设置每次只能接收一条消息,处理完成再接收下一条
// 生产者
// .....
public class RabbitWorkQueue {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertSend("work.queue", "work queue message " + i);
}
}
}
// 消息消费者
// .....
public class RabbitWorkQueueConsumer {
@Autowired
private RabbitTemplate rabbitTemplate;
public static final String WORK_QUEUE = "work.queue";
@RabbitListener(queues = WORK_QUEUE)
public Queue workQueue(String message) {
System.out.println("消费者1接收到消息:" + message);
}
// 定义多个消费者
@RabbitListener(queues = WORK_QUEUE)
public Queue workQueue2(String message) {
System.out.println("消费者2接收到消息:" + message);
}
}
发布订阅
Rabbitmq 消息队列默认是一个消息只能给一个消费者消费,消费之后会在队列中移除,如果想要实现一个消息被多个消费者消费,就需要使用到发布订阅模式。 发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)
- Fanout Exchange: 广播,将消息发送给所有绑定队列,无routingkey的概念
- Direct Exchange: 按照routingkey分发到指定队列
- Topic Exchange: 多关键字匹配
- Headers Exchange: 按照消息中的headers属性进行匹配
注意:exchange负责消息路由,而不是存储,路由失败则消息丢失
Fanout Exchange
队列消息通过交换机发送给消费者,交换机下的消费者都能接收到消息

// 生产者
public class FanoutProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
// 定义交换机名称
String exchangeName = "fanout exchange";
rabbitTemplate.convertSend(exchangeName, "", "fanout message");
}
}
// 消费者
@Configration
public class FanoutConfig {
// 声明交换机
@Bean
public Exchange fanoutExchange() {
return new FanoutExchange("fanout.exchange");
}
// 声明队列
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
// 绑定队列和交换机
@Bean
public Binding binding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
}
// 消费者接收的代码不变
Direct Exchange
队列消息设置绑定的值通过交换机发送给对应消费者,交换机下的消费者通过指定的绑定值接收到消息,若是消费者绑定的多个值,则都能接收到消息,例如下面的绑定值red

// 生产者
public class FanoutProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
// 定义交换机名称
String exchangeName = "fanout exchange";
String routingKey = "red";
rabbitTemplate.convertSend(exchangeName, routingKey, "fanout message");
}
}
// 消费者
public class DirectConfig {
//
@RabbitListener(bingdings = @QueueBinding(
// 声明消息队列
value = @Queue(name = "direct.queue1"),
// 声明交换机
exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
// 绑定的值
key = {"red", "blue"}
))
public void listenQueue1() {
System.out.println("消费者1接收到消息");
}
//
@RabbitListener(bingdings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenQueue2() {
System.out.println("消费者2接收到消息");
}
}
// 消费者接收的代码不变
Topic Exchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割。 例如:order.cancel、order.sucess、order.fail表示订单的类型。common.product、activity.product、free.product表示不同类型的商品。
order.#:表示所有的订单类型#.product:表示所有的商品

// 生产者
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
// 定义交换机名称
String exchangeName = "topic exchange";
String routingKey = "order.sucess";
rabbitTemplate.convertSend(exchangeName, routingKey, "topic message");
}
}
// 消费者
public class TopicConfig {
//
@RabbitListener(bingdings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
key = "order.#"
))
public void listenTopicQueue1() {
System.out.println("Topic消费者1接收到消息");
}
//
@RabbitListener(bingdings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
key = "#.product"
))
public void listenTopicQueue2() {
System.out.println("Topic消费者2接收到消息");
}
}
// 消费者接收的代码不变
RabbitMQ面试
Rabbitmq 为什么需要信道,为什么不是TCP直接通信
1、TCP的创建和销毁,开销大,创建要三次握手,销毁要4次分手。
2、如果不用信道,那应用程序就会TCP连接到Rabbit服务器,高峰时每秒成千上万连接就会造成资源的巨大浪费,
而且==底层操作系统每秒处理tcp连接数也是有限制的,==必定造成性能瓶颈。
3、信道的原理是一条线程一条信道,多条线程多条信道同用一条TCP连接,一条TCP连接可以容纳无限的信道,
即使每秒成千上万的请求也不会成为性能瓶颈。