`
uule
  • 浏览: 6310237 次
  • 性别: Icon_minigender_1
  • 来自: 一片神奇的土地
社区版块
存档分类
最新评论

RabbitMQ总结

 
阅读更多

基础:

RabbitMQ使用总结(JAVA)

消息队列探秘-RabbitMQ消息队列(详细)

三分钟带你快速了解RabbitMQ,掌握RabbitMQ入门秘籍

 

使用配置:

RabbitMQ 配置文件详解(生产者和消费者)

RabbitMQ 配置,生产者和消费者

 

安装:

RabbitMQ集群之安装+各种命令

(一)CentOS7 安装 RabbitMQ v3.5.6

RabbitMQ 普通集群 & 镜像集群 搭建

 

应用:

SpingBoot中集成RabbitMQ  

 

RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。



 

 

 

broker,原话是RabbitMQ isn’t a food truck, it’s a delivery service,其实说白了,就是一种传输服务。

 

Exchange: 【接受生产者发送的消息】,并根据Binding规则【将消息路由给服务器中的队列】。

ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的有direct、Fanout和Topic三种。

 

Message Queue: 消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到相应的queue则数据会丢失),等待消费者来取。

 

Binding Key:它表示的是【Exchange与Message Queue是通过binding key进行联系的】,这个关系是固定的,初始化的时候,我们就会建立该队列。

 

Routing Key:【生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则】。这个routing key需要与Exchange Type及binding key联合使用才能生,我们的生产者只需要通过指定routing key来决定消息流向哪里。

简单来说就是:路由关键字,exchange根据这个关键字进行消息投递。

 

routing key为一个句点号“.”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” binding key与routing key一样也是句点号“. ”分隔的字符串

 binding key中有两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

 

Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。

 

Channel:信道,多路复用连接中的一条独立的双向数据流通道。【它建立在上述的TCP连接中。数据流动都是在Channel中进行的】。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是【发布消息、订阅队列还是接收消息,这些动作都是通过信道完成】。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

 

ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。 

Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

 

vhost: 虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

 

初始化的时候,exchange与各个队列的绑定关系是通过binding key进行绑定的;

发送消息的时候,使用的routing key就是binding key的某一个(实质,两者是一个含义,角度不同,名称含义不同)

 

对于消费端来说,只用知道MQ的virtual host 和queue的名称就可以了。

而对于发送端,则需要知道exchange和routing key的名称,相对而言queue的名称就不那么重要了(不过也要依Exchange Type而定)。

 

 

三种ExchangeType

http://www.rabbitmq.com/tutorials/amqp-concepts.html

http://www.365yg.com/i6598820210227020296/#mid=1586721753620493

 

这里介绍三种最主要的类型的exchange:direct、fanout和topic。

 

1、direct交换器

Direct交换器很简单,如果是Direct类型,就会将【消息中的RoutingKey】与【该Exchange关联的所有Binding中的BindingKey】进行比较,如果相等,则发送到该Binding对应的Queue中。

有一个需要注意的地方:【如果找不到指定的exchange,就会报错】。但【routing key找不到的话,不会报错,这条消息会直接丢失】,所以此处要小心。

 

2、fanout交换器

Fanout 扇出,顾名思义,就是像风扇吹面粉一样,吹得到处都是。如果使用fanout类型的exchange,那么routing key就不重要了。因为【凡是绑定到这个exchange的queue,都会受到消息】。

 

3、topic交换器

direct是将消息放到exchange绑定的一个queue里(一对一);

fanout是将消息放到exchange绑定的所有queue里(一对所有)

 

那可不可以把消息放到exchange绑定的一部分queue里,或者多个routing key可以路由到一个queue里呢?

 

topic类型的exchange就可以实现(一对部分)。

topic应用场景:打印不同级别的错误日志

例如,我们的系统出错后会根据不同的错误级别生成error_levelX.log日志,我们在后台首先要把所有的error保存在一个总的queue(绑定了一个*.error的路由键)里,然后再按level分别存放在不同的queue。

 

 

RabbitMQ 消息分发轮询

(1)消费者consumer采用轮询机制;把消息依次分发

(2)假如消费者处理消息需要15秒,如果当机了,那这个消息处理明显还没处理完,怎么处理?(可以模拟消费端断了,分别注释和不注释 no_ack=True 看一下)你没给我回复确认,就代表消息没处理完。

 

(3)消费端断了就转到另外一个消费端去了,但是生产者怎么知道消费端断了呢?因为生产者和消费者是通过socket连接的,socket断了,就说明消费端断开了。

 

Prefetch count

上面的模式只是依次分发,实际情况是机器配置不一样。怎么设置类似权重的操作?

 

前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时【如果每个消息的处理时间不同】,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过【设置prefetchCount来限制Queue每次发送给每个消费者的消息数】,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。

 

只需要在消费者端,channel.basic_consume前加上就可以了。

channel.basic_qos(prefetch_count=1)  # 类似权重,按能力分发,如果有一个消息,就不在给你发

 

 

Message acknowledgment

当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load。

 

【默认情况下,RabbitMQ会顺序的分发每个Message。当每个【收到ack后,会将该Message删除】,然后将下一个Message分发到下一个Consumer】这种分发方式叫做round-robin。这种分发还有问题。

每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,【每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了】。

 

如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了。

为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。【为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack】。而应该是在【处理完数据后发送ack】。(在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了,【如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer】。这样就保证了在Consumer异常退出的情况下数据也不会丢失)

 

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;【如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理】。【这里不存在timeout概念】,【一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开】。

这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑。

 

RPC

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 

但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。

 

 

RabbitMQ 消息持久化(durable、properties)

rabbitmqctl list_queues # 查看当前queue数量及queue里消息数量

 

如果队列里还有消息,RabbitMQ服务端宕机了呢?消息还在不在?

下面看看如何把消息持久化。【每次声明队列的时候,都加上durable】,注意每个队列都得写,客户端、服务端声明的时候都得写。

 

# 在管道里声明

queuechannel.queue_declare(queue='hello2',durable=True)

 

测试结果发现,只是把队列持久化了,但是队列里的消息没了。【durable的作用只是把队列持久化】。离消息持久话还差一步:

 

发送端发送消息时,加上properties

properties=pika.BasicProperties(delivery_mode=2,# 消息持久化)

 

发送端 

channel.queue_declare(queue='hello2', durable=True)# 若声明过,则换一个名字

channel.basic_publish(exchange='',routing_key='hello2',body='Hello World!',properties=pika.BasicProperties(delivery_mode=2))

 

// 消息内容
String message = "Hello World!";
/*
 * 向server发布一条消息
 * 参数1:exchange名字,若为空则使用默认的exchange
 * 参数2:routing key
 * 参数3:其他的属性
 * 参数4:消息体

 * 【RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,
 * 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃】
 */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

 

 

问题:

1、为什么使用Channel,而不是直接使用TCP连接?

对于OS来说,【建立和关闭TCP连接是有代价的】,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且【TCP的连接数也有限制】,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。有实验表明,1s的数据可以Publish10K的数据包。当然对于不同的硬件环境,不同的数据包大小这个数据肯定不一样,但是我只想说明,对于普通的Consumer或者Producer来说,这已经足够了。如果不够用,你考虑的应该是如何细化split你的设计。

 

2、如何选择RabbitMQ的消息保存方式?

RabbitMQ对queue中message的保存有两种方式:disc(磁盘模式)和ram(内存模式)。

 

如果采用disc,则需要对exchange/queue/delivery mode都要设置成durable(持久化)模式。

disc方式的好处是当RabbitMQ失效了,message仍然可以在重启之后恢复;而使用ram方式,RabbitMQ处理message的效率要高很多,ram和disc两种方式的效率比大概是3:1。

 

所以如果在有其它HA手段保障的情况下,选用ram方式是可以提高消息队列的工作效率的。

 

3、当消息发送的速率超过了RabbitMQ的处理能力时该怎么办?

【RabbitMQ会自动减慢这个连接的速率,让client端以为网络带宽变小了,发送消息的速率会受限,从而达到流控的目的】。 使用“rabbitmqctl list_connections”查看连接,如果状态为“flow”,则说明这个连接处于flow-control 状态。

//////////////////////////////////////////////////////////////////////////

 

加载WEB插件

[root@xiaobao ~]# rabbitmq-plugins enable rabbitmq_management

该命令,仅在首次运行RMQ时使用!!!目的就是加载Web插件!!!

 

开始访问你的rabbitmq了。http://localhost:15672 

【5672是rabbitmq的默认端口,15672是后台管理的端口。 】

连接mq是使用IP 端口为5672.

 

配置文件

一般配置文件在/usr/local/etc/rabbitmq/rabbitmq-env.conf或者/etc/rabbitmq/rabbitmq-env.conf 

注意mq有两个配置文件一个rabbitmq.conf参数变量配置 ,rabbitmq-env.conf全局环境变量配置。具体没怎么用过,我就用的后面这个配置一下端口啊,日志输出路径啊,rabbitmq的名称之类的。

 

添加用户

 

//添加用户

rabbitmqctl add_user userName Password

 

//给用户添加权限:userName为用户名, Tag为角色名(对应于administrator,monitoring,policymaker,management,或其他自定义名称)。

rabbitmqctl set_user_tags userName tags

 

//设置用户的接管队列的权限,这一步可以登陆后台管理页面Admin-Users-点击用户名,然后设置。

//如果不设置这一步,是无法看到队列的,因为创建的队列默认在guest名下。

rabbitmqctl set_permissions -p "/" userName ".*" ".*" ".*"

 

way1 使用这个假如关闭窗口就会退出

 

# 需要开起来,有些会使用后台运行的rabbitmq-service -detached,但是这个我在关闭的时候使用way2方式关闭的时候不能关闭,所以很不方便。

rabbitmq-server start

# 停止rabbitmq

rabbitmq-server stop

# 查看rabbitmq的状态

rabbitmq-server status

 

way2 推荐   也可以使用下面的命令

# 需要开起来

service rabbitmq-server start

# 停止rabbitmq

service rabbitmq-server stop

# 查看rabbitmq的状态

service rabbitmq-server status

 

关于角色的描述: 

用户角色可分为五类,超级管理员, 监控者, 策略制定者, 普通管理者以及其他。

 

超级管理员(administrator):可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。

监控者(monitoring) 可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

策略制定者(policymaker) 可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息。

普通管理者(management) 仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。

 

 

磁盘与内存模式说明

rabbitmq存储消息有两种模式,一种是【磁盘模式】,一种是【内存模式】。听名字就知道,磁盘模式就是把消息存储到磁盘中,可以达到持久化的目标;内存模式就是把消息存储到内存中,读存速度上要快很多,但是一旦服务器重启就会有可能导致没有消费的消息丢失。

 

了解了两种模式后,我们知道,单机的情况基本使用的是磁盘模式,这样安全性更高。当两台做集群时,就可以一台磁盘模式,一台内存模式了。这样既可以做到保存消息持久化,另外一台速度也有保障。 

[root@xiaobao ~]# rabbitmqctl join_cluster rabbit@homename1 -arm 

在搭建集群时在后面加入-arm模式,即为内存模式,如果不加,默认为磁盘模式。

 

单机集群配置

由于rabbitMq是服务,而不是容器,所以在同一台服务上启动多个时,不需要像tomcat一样,需要拷贝多个tomcat安装文件出来。对于服务来说,我们只需要改变端口就可以启动多个服务】。 

所以单机集群就很简单了,只需要改变端口就好了。

 

 

集群之镜像模式

 

RabbitMQ的Cluster集群模式一般分为两种,普通模式和镜像模式。

 

普通模式:

默认的集群模式,以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。如果做了消息持久化,那么得等rabbit01节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。

 

镜像模式:

将需要消费的队列变为镜像队列,存在于多个节点,这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在consumer消费数据时临时读取。缺点就是,集群内部的同步通讯会占用大量的网络带宽。

普通模式说明:队列只能存在某个节点下,node1或者node2中,在创建节点时就决定了在某个节点下。 

p1–> node1—> node2 –>c1;//队列q在node1中,p生产者连接了node1,往队列q里面发送消息,c消费者连接了node2,接收q队列消息,他们通信是node1发送给node2,node2再发送给c消费着的。 

node1挂了,p1连接node2,c1也连接node2。由于队列q是在node1中的, 此时p1,发送消息失败,提示没有该队列。 

p1–> node1—> c2; //p,c都连接了node1,他们消息直接由node1发送就OK了。 

p1–> node2—> c2;

 

 

  • 大小: 21.9 KB
  • 大小: 34.6 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics