RabbitMQ(rabbitmq默认端口)

网友投稿 301 2022-07-30

1.介绍

RabbitMQ是一个消息代理 - 一个消息系统的媒介。它的工作就是接收和转发消息。你可以把他想象成一个邮局,你把信件都放到这个邮箱中,邮递员叔叔就会把信件投递到你的收件人处。只是邮箱中放的是你的信件,而我们要使用的RabbitMQ中存放的是我们的二进制数据。

下面是RabbitMQ和消息所涉及到的一些术语。

生产(Producing)的意思就是发送。发送消息的程序就是生产者(producer)。我们一般使用"P"表示。

队列(queue)就是存在于RabbitMQ中邮箱的名称。实质上队列就是一个巨大的消息缓冲区,我们同一时刻能够处理的数据有限,所以就将这些数据按照先后顺序存在这个消息队列中,我们一点点的进行处理。

消费(Consuming)和接收(receiving)是同一个意思。一个消费者(consumer)就是一个等待获取消息的程序。我们把它绘制为"C":

2. 作用

1)程序解耦

允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

2)冗余:

消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。

许多消息队列所采用的"插入-获取-删除"方式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

3)峰值处理能力:

使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

4)可恢复性:

系统的一部分组件失效时,不会影响到整个系统。

消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

5)顺序保证:

在大多使用场景下,数据处理的顺序都很重要。

大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。

6)缓冲:

有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

7)异步通信:

消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

3.RabbitMQ的安装

Windows系统

1.RabbitMQ 它依赖于Erlang,需要先安装Erlang。

https://erlang.org/downloads

2.运行行Erlang/OTP(otp_win64_17.5.exe)的安装步骤,安装完成后

设置ERLANG_HOME 环境变量

在开始菜单查找Erlang,点击启动 ,如下所示证明安装成功。

​ 注意:如果之前安装了Erlang的其他版本,需要卸载后在进行重新安装和设置。

3.进行RabbitMQ Server的下载与安装,直接运行rabbitmq-server-3.5.2.exe,选择要安装的目录,进行安装

​ https://rabbitmq.com/download.html

4.为了能够在任意Windows命令窗口上操控RabbitMQ服务需要在系统里加一个环境变量并且配置在系统的PHTH环境变量中。

5.检查RabbitMQ是否运行正常,打开终端,进入RabbitMQ的安装目录rabbitmq_server-3.5.2\sbin,输入rabbitmqctl status,如果出现以下的图,说明安装是成功的,并且说明现在RabbitMQ Server已经启动了,运行正常。

6.安装rabbitmq_management插件,这款插件是可以可视化的方式查看RabbitMQ 服务器实例的状态,以及操控RabbitMQ服务器。

rabbitmq-plugins enable rabbitmq_management

现在我们在浏览器中输入:http://localhost:15672 可以看到一个登录界面:

这里可以使用默认账号guest/guest登录

linux系统下安装

rabbitmq相关命令

RabbitMQ使用的是AMQP协议。这是一个用于消息传递的开放、通用的协议。针对不同编程语言有大量的RabbitMQ客户端可用。

4. 什么是AMQP?

5. 消息代理和他们所扮演的角色?

消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers)。

由于AMQP是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以存在于不同的设备上。

6.python中使用RabbitMQ

需要先进行模块安装

# rabbitmq官方推荐的python客户端pika模块

pip3 install pika

简单的收发程序,即单发送单接收

发送方send.py

接收方receive.py

你也许要问: 为什么要在接收方和发送方重复声明队列呢 —— 我们已经在前面的代码中声明过它了。如果我们确定了队列是已经存在的,那么我们可以不这么做,比如此前预先运行了send.py程序。可是我们并不确定哪个程序会首先运行。这种情况下,在程序中重复将队列重复声明一下是种值得推荐的做法。

工作队列,即单发送多接收

发送方sender.py

接收方receiver1.py和receiver2.py

我们将receiver1.py和receiver2.py开启,会夯住等待接收消息。

sender.py用来发布新任务,终端执行。

默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。如果添加三个消费者则会出现丢失数据的问题。

7.M消息确认ack

为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。

生产者,发布任务。sender.py

消费者,receiver.py

将多个receiver.py启动,然后按如下方式在终端启动sender.py

python sender.py First message.

python sender.py Second message..

python sender.py Third message...

python sender.py Fourth message....

python sender.py Fifth message.....

我们会发现现在没有消息丢失,全部被消费者接收。

忘记确认

一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ就会占用越来越多的内存。

为了排除这种错误,你可以使用rabbitmqctl命令,输出messages_unacknowledged字段:

8.消息持久化

如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。

队列持久化:如果两个队列,a1队列没有做队列持久化,a2队列做了队列持久化,那么重启后a1队列消失,a2队列依然存在,可通过命令rabbitmqctl list_queues查看

消息持久化:还是两个队列,a2队列做了队列持久化,a3做了队列持久化和消息持久化,那么重启后a2和a3队列都存在,但是a2中的数据丢失,a3中的数据依然存在。

首先,为了不让队列消失,需要把队列声明为持久化(durable):

尽管这行代码本身是正确的,但是仍然不会正确运行。因为我们已经定义过一个叫q1的非持久化队列。RabbitMq不允许你使用不同的参数重新定义一个队列,它会返回一个错误。但我们现在使用一个快捷的解决方法——用不同的名字,例如task_queue。

channel.queue_declare(queue='task_queue', durable=True)

这时候,我们就可以确保在RabbitMq重启之后queue_declare队列不会丢失。

另外,我们需要把我们的消息也要设为持久化——将delivery_mode的属性设为2。

注意:消息持久化

将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)(同步)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,你需要改写你的代码来支持事务(transaction)。

9.公平调度

这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。

channel.basic_qos(prefetch_count=1)

关于队列大小

10.工作队列代码整合

sender.py

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Django—版本和环境的搭建(怎么配置django环境)
下一篇:python 模块import(26)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~