
RabbitMQ 消息队列
RabbitMQ 消息队列
RabbitMQ是一种开源的消息中间件(Message-Oriented Middleware, MOM),它遵循高级消息队列协议(AMQP, Advanced Message Queuing Protocol)标准,用Erlang语言编写而成。RabbitMQ的核心功能是接受、存储和转发消息,作为消息系统,它允许不同的应用程序组件通过异步的方式进行通信,从而实现解耦、可靠传输、流量控制、负载均衡和错误恢复等目的。
什么是消息队列?
我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。由于队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。
参与消息传递的双方称为 生产者 和 消费者 ,生产者负责发送消息,消费者负责处理消息。
消息队列有什么用?
通常情况下使用消息队列能为我们的系统带来下面三点好处:
通过异步处理提高系统性能(减少响应所需时间)
削峰/限流
降低系统耦合性
在项目中可能结合业务逻辑来使用
使用消息队列会带来哪些问题?
系统可用性降低:系统可用性在某种程度上降低,为什么这样说呢?在加入 MQ 之前,你不用考虑消息丢失或者说 MQ 挂掉等等的情况,但是,引入 MQ 之后你就需要去考虑了!
系统复杂性提高:加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
一致性问题:我上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了!
Producer(生产者) 和 Consumer(消费者)
Producer(生产者)
生产消息,投递消息的一方。
消息一般由消息头(Label)和 消息体(payLoad)组成。
消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
Consumer(消费者)
消费消息,接收消息的一方。
消费者连接到 RabbitMQ 服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
Exchange(交换器)
在 RabbitMQ 中,消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中。
RoutingKey(路由键)
Producer(生产者)发送消息给Exchange(交换器)一般会指定消息的RoutingKey(路由键),用来配合交换器的路由规则,来决定这个消息发送给那些队列。
BindingKey(绑定建)
Exchange(交换器) 与 Queue(消息队列) 是通过Binding(绑定)来关联起来的,在绑定时,一般会指定一个BindingKey(绑定建),一个交换器可以绑定多个队列,消息被投递到那些队列是由路由规则来决定的。
RoutingKey(路由键)与BindingKey(绑定建)的使用依赖于交换器的类型。
比如direct类型中 RoutingKey = BindingKey 消息才会被交换器投递到队列中。
而fanout类型中会无视 RoutingKey 直接将消息路由到所有与该交换器绑定的队列中。
Exchange Types(交换器类型)
RabbitMQ 常用的 Exchange Type 有 fanout、direct、topic、headers 这四种。
fanout
fanout 类型的 Exchange 路由规则是把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。
在下图中,因为交换器是fanout类型,所以消息会直接被投递到 queue1与queue2中。
fanout简单图例
direct(默认)
direct 类型的 Exchange 路由规则是把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。
在下图中如果消息的 RoutingKey 等于 BindingKey1,不等于BindingKey2,那么消息会被投递到 Queue1中。
direct简单图例
topic
topic 类型的交换器在direct 匹配规则上进行了扩展,Bindingkey 与 RoutingKey 的匹配更加灵活。
BindingKey 和 RoutingKey 可以是点号“.”分隔的字符串。
BindingKey 中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配。
“*”用于匹配一个单词
“#”用于匹配多个单词(可以是零个)
如 RoutingKey 是a.b.c,BindingKey 为 *.*.c 或 a.# 都可以匹配上。
topic简单图例
headers(不推荐)
headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。headers 类型的交换器性能会比较差,基本上不会使用它。
Queue(消息队列)
Queue(消息队列) :RabbitMQ中消息队列用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走,队列遵循的是先进入出的原则。
订阅:消费者通过订阅队列,来对队列中的消息进行消费。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。
Broker(消息中间件的服务节点)
对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
RabbitMQ 的工作模式
简单模式
又称为一对一模式,即一个生产者发送消息到一个特定的队列,然后一个消费者从该队列中消费消息。
Work Queues(工作队列)
在这种模式下,多个消费者可以从同一个队列中获取消息,但每条消息只会被一个消费者消费,通常用于负载均衡和任务分发,确保消息按照顺序且仅被执行一次。
Publish/Subscribe(发布与订阅模式)
生产者发布消息到一个交换机,多个消费者可以订阅这个交换机,但不同于工作队列,同一消息会被所有感兴趣的消费者接收。
Routing(路由模式)
消息发送时会附带一个路由键,交换机会根据这个路由键决定消息应该投递到哪个或哪些队列中,消费者则监听这些队列来接收消息。
Topics(主题模式)
类似于路由模式,但是更灵活,消费者和交换机之间的绑定使用的是模式匹配,其中“#”可以匹配任意数量的单词,“*”可以匹配一个单词。
AMQP协议
RabbitMQ 就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 ) AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。
RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。
AMQP 协议的三层:
Module Layer(协议最高层):主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
Session Layer(中间层):主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
Transport Layer(最底层):主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。
AMQP 模型的三大组件:
Exchange (交换器):消息代理服务器中用于把消息路由到队列的组件。
Queue(队列 ):用来存储消息的数据结构,位于硬盘或内存中。
Binding(绑定):一套规则,告知交换器消息应该将消息投递给哪个队列。
消息怎么传输?
由于 TCP 链接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈,所以 RabbitMQ 使用信道的方式来传输数据。
信道(Channel)是生产者、消费者与 RabbitMQ 通信的渠道,信道是建立在 TCP 链接上的虚拟链接,且每条 TCP 链接上的信道数量没有限制。
就是说 RabbitMQ 在一条 TCP 链接上建立成百上千个信道来达到多个线程处理,这个 TCP 被多个线程共享,每个信道在 RabbitMQ 都有唯一的 ID,保证了信道私有性,每个信道对应一个线程使用。
如何保证消息的可靠性?
了解可靠性之前先要了解为什么不可靠。
消息丢失有三种情况,生产者到MQ丢失,MQ自己丢失,MQ到消费者丢失。
生产者到MQ解决方案:
发布者确认(Publisher Confirms):
生产者可以开启确认模式,当消息发送到 RabbitMQ 后,RabbitMQ 将会发送一个确认给生产者,表明消息已成功存储。若未收到确认,生产者可选择重新发送消息以确保消息送达。
事务(Transactions):
虽然较少使用,因为会影响性能,但在某些场景下,可以使用事务来确保一组消息要么全部成功发布到队列,要么全部不发布。
注意:confirms 与事务不可同时开启,会导致 RabbitMQ 报错。
MQ自己丢失解决方案
消息持久化:
生产者可以设置消息属性使消息持久化,这样即使 RabbitMQ 服务器发生崩溃或重启,消息也会被存储在磁盘上而不会丢失。
高可用性集群(Mirrored Queues and Clustering):
使用镜像队列或多节点集群,可以在多台机器之间复制消息,提高消息的持久性和系统整体的可用性。
MQ到消费者丢失
死信队列(Dead Letter Exchanges):
设置死信交换机和路由键,当消息在一个队列中达到最大重试次数仍然无法被正确消费时,可以转发到死信队列中,以便后续分析或特殊处理。
消费者确认(Consumer Acknowledgments):
消费者在接收到消息后,必须手动确认消息已经被正确处理。如果不进行确认,RabbitMQ 会在一定时间内认为消息未被正确处理,并可能重新将消息放回队列中让消费者再次尝试消费,防止消息丢失。这个机制也被称为 basicAck 机制。
有时候,可能需要人工介入,根据业务查日志,查询数据进行重导 MQ 等方法进行消息补偿。
如何保证消息的顺序性?
单个队列和单个消费者
交换器只绑定一个队列,这个队列也只有一个消费者,利用队列进行先出的原则,消费者将按照队列的顺序依次处理消息。
多个队列和单个消费者
交换器绑定多个队列,每个队列只有一个消费者,消息可以根据某种业务关联性或顺序标识符分布到对应的队列,然后各个消费者按照预先定义好的顺序依次处理各自队列中的消息。
如何保证高可用?
单机模式
Demo 级别。
普通集群模式
多个MQ节点组成普通集群,创建队列时,队列只存储在某一个节点上,其他节点会同步队列的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。
消费时,如果连接到了存储队列元素据的节点,那么这个节点会根据元数据找到队列实例将数据拉取过来。
这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个队列的读写操作。
镜像集群模式
镜像集群才是所谓的 RabbitMQ 的高可用模式。
在这个模式下,创建队列无论是元数据还是队列种的消息都会存储在每个节点中。
每次发送消息到队列时,所有节点都会进行同步。
优点:任何节点宕机了,其他节点包含完整数据,所以消费者可以从其他节点进行消费。
缺点:因为消息会同步到所有节点中,对网络带宽压力大,性能消耗也大。
如何解决消息过期失效问题?
RabbtiMQ 是可以设置过期时间的,也就是 TTL。
设置TTL后,如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,数据不会积压在MQ里,而是丢失。
合理设置TTL(Time to Live)
根据业务需求设置合适的消息过期时间(TTL),确保消息在有效期内被消费。如果消息的有效期设置得不合适,可能会导致消息过期后被自动删除,从而影响业务处理。调整消息或队列的TTL值,使其符合业务的实际需求。
死信队列(Dead Letter Queue, DLQ)
配置死信队列来捕获过期的消息。当消息在原始队列中过期时,可以配置RabbitMQ将其转发至死信队列,而不是直接丢弃。死信队列中的消息可以用于后续分析、备份或重试处理。
消息过期后的处理策略
根据业务进行补偿操作,比如查询数据后批量重导入MQ中。