# 基本概念
- MQ全称 Message Queue(消息队列),是消息传输过程中保存消息的容器
- 多用于分布式系统之间进行通信
- 发送方称为生产者,接收方称为消费者
# MQ的优势
- 应用解耦:解除系统之间的相互作用,提高系统容错性和可维护性
- 异步提速:提升用户体验和系统吞吐量(单位时间内处理请求的数目)
- 削峰填谷:限制消费消息的速度,提高系统稳定性
# MQ的劣势
- 系统可用性降低:系统引入的外部依赖越多,稳定性越差。一旦MQ宕机,就会对业务造成影响
- 系统复杂度提高:通过MQ进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?怎么保证消息传递的顺序性?
- 一致性问题:A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?
# 常见的MQ产品
# AMQP
即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计
基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP
# JMS
- JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是 Java 中关于面向消息中间件的API
- JMS 是 JavaEE 规范中的一种,类比JDBC
- 很多消息中间件都实现了JMS规范,如ActiveMQ,官方没有提供RabbitMQ的JMS实现包,开源社区有
# RabbitMQ
2007年,Rabbit 技术公司基于 AMQP 协议开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发
Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛
# RabbitMQ 基础架构
Producer:# 生产者
Consumer:# 消费者
Broker:# 接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:# 数据隔离的作用,默认为 /
# 多个用户使用同一个 RabbitMQ server 时,可以划分出多个vhost
# 每个用户在自己的 vhost 创建 exchange/queue
Exchange:# 交换机,匹配查询表中的 routing key,并根据分发规则将消息分发到queue中去。常用的类型有
+ fanout (multicast) # 扇形,广播模式
+ direct (point-to-point) # 点对点模式
+ topic (publish-subscribe) # 发布订阅模式
Queue:# 消息最终被送到这里等待 consumer 取走
Binding:# exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key
Connection:# publisher/consumer 和 broker 之间的 TCP 连接
Channel:# 是Connection内部建立的连接,若应用程序支持多线程,则每个线程创建单独的channel进行通讯
# RabbitMQ 安装配置
RabbitMQ 官网 (opens new window)
- Windows 安装
//安装erlang并配置环境变量
//新建系统变量名为:ERLANG_HOME 变量值为erlang安装地址
//双击系统变量path,点击“新建”,将%ERLANG_HOME%\bin加入到path中
erl //验证erlang是否安装成功
//安装RabbitMQ
//安装RabbitMQ-Plugins
//RabbitMQ的sbin目录
E:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4\sbin
//然后输入以下命令进行安装
rabbitmq-plugins enable rabbitmq_management
//验证rabbitmq是否安装成功
rabbitmqctl status
//打开浏览器,地址栏输入mq访问地址
http://127.0.0.1:15672 //用户名和密码,都为guest
//问题解决:TCP connection succeeded but Erlang distribution failed
//是Erlang新版本的cookie位置换了
C:\Windows\System32\config\systemprofile
//这里有一个.erlang.cookie,复制这个文件到C:\Users\你的用户名下
- Linux 安装
# 安装依赖环境
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
# 安装Erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
# 安装RabbitMQ
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
# 开启管理界面及配置
rabbitmq-plugins enable rabbitmq_management
cd /usr/share/doc/rabbitmq-server-3.6.5/
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
# 添加 [{rabbit, [{loopback_users, []}]}]
# 工作模式
# 简单模式
# 一个生产者 对一个 消费者,不需要设置交换机(使用默认的交换机)
# Work queues 工作队列模式
# 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
# 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度,解决消息堆积
注意
默认情况下,RabbitMQ会将消息依次投递给每个消费者,没有考虑消费者是否已经处理完消息,可能出现消息堆积。 因此需要设置preFetch为1,确保同一时刻最多投递给消费者1条消息:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
# Pub / Sub 订阅模式
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到
P:# 生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:# 消费者,消息的接收者,会一直等待消息到来
Queue:# 消息队列,接收消息、缓存消息
Exchange:# 交换机(X),接收生产者发送的消息,并根据分发规则将消息分发到queue中去。常用的类型有
+ Fanout:# 广播,将消息交给所有绑定到交换机的队列
+ Direct:# 定向,把消息交给符合指定routing key 的队列
+ Topic:# 通配符,把消息交给符合routing pattern(路由模式) 的队列
注意
Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
# Routing 路由模式
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
P:# 生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
X:# Exchange(交换机)接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
C1:# 消费者,其所在队列指定了需要 routing key 为 error 的消息
C2:# 消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
# Topic 通配符模式
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型中的 Exchange 可以让队列在绑定 Routing key 的时候使用通配符
# Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
# 通配符规则:# 匹配一个或多个词,例如:item.# 能够匹配 item.insert.abc或者 item.insert
# * 匹配1个词,例如:item.* 只能匹配 item.insert
# 整合实例
# 用@Bean的方式创建绑定关系
- 引入依赖坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 编写yml配置,基本信息配置
spring:
rabbitmq:
host: localhost
username: guest
password: guest
port: 5672
virtual-host: /
- 定义交换机,队列以及绑定关系的配置类
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME ="boot_queue";
//1、交换机 durable:是否持久化
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2、队列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3、队列和交换机绑定 noargs:没有参数
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,
@Qualifier("bootExchange") Exchange exchange){
//如果需要设置多个routingkey则需要添加多个bindQueueExchange方法
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
//或者
@Bean
public Binding bindQueueExchange(){
return BindingBuilder.bind(bootQueue()).to(bootExchange()).with("boot.#").noargs();
}
}
- 注入RabbitTemplate,调用方法,完成消息发送
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","hello world");
}
- 消费者:定义监听类,使用@RabbitListener注解完成队列监听
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQListener {
@RabbitListener(queues ={"boot_queue"})
public void listenerQueue(Message message){
System.out.println(message);
}
}
# 另一种方式:只配置消费者
//多个routingkey通过key={}数组配置
@RabbitListener(bindings = { @QueueBinding(
value = @Queue(value="queue",durable = "true"),
exchange = @Exchange(value = "exchange",durable = "true"),
key = {"red","blue"}
)})
public void handleMessage(byte[] message){
System.out.println("消费消息");
System.out.println(new String(message));
}
# 消息转换器
如果发送的消息类型为对象类型时,获取到的消息是转换后的可读性比较差
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
Map<String,Object> msg = new HashMap<>();
msg.put("name","jack");
msg.put("age",21);
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha",msg);
}
建议采用JSON序列化代替默认的JDK序列化,需要做两件事:
- 在producer和consumer中都要引入jackjson依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
- 在producer和consumer中都要配置MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
# 生产者可靠性
# 生产者重连
有时候可能出现客户端连接MQ失败的情况,通过配置失败重连机制解决:
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true #开启超时重连
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时常=initial-interval*multiplier
max-attempts: 3 #最大重试次数
注意
Spring AMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,后续业务代码不会执行,会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
# 生产者确认
RabbitMQ作为消息发送方为我们提供了两种方式用来控制消息的投递可靠性模式:
# RabbitMQ 整个消息投递的路径: producer—>rabbitmq broker—>exchange—>queue—>consumer
# 消息从 producer 到 exchange 则通过confirmCallback,成功返回ACK,失败返回NACK
confirm # 确认模式
# 消息从exchange–>queue投递失败会通过returnCallback返回路由异常原因,然后返回ACK,告知投递成功
return # 退回模式
spring:
rabbitmq:
# 开启确认模式
# none关闭确认模式、simple同步阻塞等待MQ回执消息、correlated异步调用返回回执消息
publisher-confirm-type: correlated
publisher-returns: true # 开启退回模式
注意
生产者确认需要额外的网络和系统资源开销,尽量不要使用。
如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题。
- 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置
@Slf4j
@Configuration
public class RabbitMQConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.debug("收到的return callback, exchange:{},key:{},msg:{},code:{},text:{}",
returnedMessage.getExchange(),
returnedMessage.getRoutingKey(),
returnedMessage.getMessage(),
returnedMessage.getReplyCode(),
returnedMessage.getReplyText());
}
});
}
}
- ConfirmCallback需要在每次发送消息时指定
@Test
public void testSend(){
//创建消息的唯一id,根据具体的消息进行回调
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
log.debug("消息回调失败:"+ ex.getMessage());
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
if(result.isAck()){
log.debug("消息回调成功,收到ACK");
}else{
log.debug("消息回调失败,收到NACK,原因:"+result.getReason());
}
}
});
Map<String,Object> msg = new HashMap<>();
msg.put("name","jack");
msg.put("age",21);
//需要增加cd参数
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha", msg, cd);
}
# 消息的可靠性
默认情况下,RabbitMQ会将消息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
- 一旦MQ宕机,内存中的消息会丢失
- 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞
# 数据持久化
RabbitMQ实现数据持久化包括3个方面:
- 交换机持久化(Durable属性,Spring默认设置为Durable)
- 队列持久化(Durable属性,Spring默认设置为Durable)
- 消息持久化(发送消息时设置delivery_mode=2persisent,Spring发送的消息默认是持久化的)
@Test
void testPageOut(){
Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
for (int i = 0; i < 1000000; i++) {
rabbitTemplate.convertAndSend("simple.queue",message);
}
}
注意
非持久化,会出现paged out,会阻塞IO,性能下降
# 惰性队列 Lazy Queue
惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
注意
在3.12版本后,所有队列都是Lazy Queue模式,无法更改
3.12之前需要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:
//创建惰性队列
@Bean
public Queue lazyQueue(){
return QueueBuilder.durable("lazy.queue").lazy().build();
}
//基于注解创建
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到lazy.queue的消息:{}",msg);
}
# 消费者可靠性
保证消费者的可靠性主要有三种手段:消费者确认机制、消费失败处理、业务幂等性
# 消费者确认机制
当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知消息处理状态,回执有三种可选值:
ack # 成功处理消息,RabbitMQ从队列中删除该消息
nack # 消息处理失败,RabbitMQ需要再次投递消息
reject # 消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息(一般是消息的参数不正确)
Spring AMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:
- 开启消费者确认其机制
spring:
rabbitmq:
listener:
direct:
# none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
# manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
# auto:自动模式。Spring AMQP利用AOP对我们的消息处理逻辑做了环绕增强
# 当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果
# 如果是业务异常,会自动返回nack
# 如果是消息处理或校验异常,自动返回reject
acknowledge-mode: auto
- 消费者业务模拟异常
@RabbitListener(queues = "simple.queue")
public void listSimpleQueue(String msg) {
System.out.println("消费者收到了simple.queue的消息:【" + msg + "】");
throw new RuntimeException("测试异常");
}
结果
auto模式下,消息未被处理会保留,并会一直尝试重新投递给消费者
# 消息失败处理策略
当消费者出现异常后,消息会不断重新入队,再重新发送给消费者,然后再次异常,无限循环,导致mg的消息处理飙升,带来不必要的压力
# 消费者配置文件
spring:
rabbitmq:
listener:
# 需要设置为simple
simple:
acknowledge-mode: auto #none:关闭ack;manual:手动ack;auto:自动ack
retry:
enabled: true #开启消费者失败重试
initial-interval: 1000ms #初始的失败等待时长为1秒
multiplier: 1 #下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 #最大重试次数
stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false
如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer # 重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer # 重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer # 重试耗尽后,将失败消息投递到指定的交换机
# 业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的: f(x)= f(f(X)。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。
保证业务幂等性的方案:
- 方案一:是给每个消息都设置一个唯一id,利用id区分是否是重复消息
@Bean
public MessageConverter jacksonMessageConvertor() {
//1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
//2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
- 方案二:结合业务逻辑,基于业务本身做判断
# 以我们的业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态
# 判断状态是否是未支付。只有未支付订单才需要修改,其它状态不做处理
- 方案三:使用Token令牌,生成一个token存储在redis中
# 请求的时候携带这个token一起请求,后端需要对这个Token作为 Key在redis中进行校验
# 如果 Key存在就执行删除命令,然后正常执行后面的业务逻辑
# 如果不存在对应的 Key 就返回重复执行的错误信息,这样来保证幂等操作
# 延迟的消息
生产者发送消息后,消费者不会立刻收到消息,而是在指定时间之后才收到,延迟消息的实现有两种:
- 死信交换机
- 延迟消息插件
# 死信交换机
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机 (Dead Letter Exchange,简称DLX)
# 原理:
# 创建一个过期时间为30s的消息,投递到队列,由于simple.queue没有消费者,30会过期,
# 就会投递到死信交换机,死信队列绑定的消费者30s后就会收到消息
# 需要定义较多的死信交换机和队列,比较繁琐 ,适合用作一个兜底方案处理死信
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue";
public static final String DEAD_EXCHANGE_NAME = "dead_topic_exchange";
public static final String DEAD_QUEUE_NAME = "dead_queue";
//1、交换机
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2、队列
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME)
//声明该队列的死信消息发送到的死信交换机
.withArgument("x-dead-letter-exchange",DEAD_EXCHANGE_NAME)
//声明该队列死信消息在交换机的路由键
.withArgument("x-dead-letter-routing-key", "dead.#")
// 设置过期时间
.withArgument("x-message-ttl", 2000L)
.build();
}
//3、队列和交换机绑定
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,
@Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
//1、死信交换机
@Bean("deadExchange")
public Exchange deadExchange() {
return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).durable(true).build();
}
//2、死信队列
@Bean("deadQueue")
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
}
//3、死信队列和交换机绑定
@Bean
public Binding bindDeadQueueExchange(@Qualifier("deadQueue") Queue queue,
@Qualifier("deadExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("dead.#").noargs();
}
//消费者
@RabbitListener(queues ={"dead_queue"})
public void listenerQueue(Message message){
System.out.println(message);
//throw new RuntimeException("测试异常");
}
# 延迟消息插件(推荐使用)
该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列
- 点击下载 (opens new window)与RabbitMQ版本对应的延迟消息插件
- 将下载的插件存放在 D:\Program Files\RabbitMQ Server\rabbitmq_server-3.12.6\plugins 目录下
- 打开RabbitMQ sbin控制台,输入
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 安装成功后打开 RabbitMQ Management,看到一下场景即为安装成功
- 创建提供消息延迟功能的交换机
//创建交换机和队列,以及消费者
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "delay.queue", durable = "true"),
exchange = @Exchange(value = "delay.direct", delayed = "ture"),
key = "hi"
))
public void listenDelayQueue(String msg) {
log.info("接收到delay.queue的消息:{}", msg);
}
//发送者发送消息
@Test
void testSendDelayMessage() {
rabbitTemplate.convertAndSend("delay.direct", "hi", "hello", new MessagePostProcessor(){
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10000);
return message;
}
});
log.info("消息发送成功!!");
}