【2020Python修炼记】python并发编程(三)多进程-应用部分(python 并发编程)
315
2022-07-30
1.介绍
RabbitMQ是一个消息代理 - 一个消息系统的媒介。它的工作就是接收和转发消息。你可以把他想象成一个邮局,你把信件都放到这个邮箱中,邮递员叔叔就会把信件投递到你的收件人处。只是邮箱中放的是你的信件,而我们要使用的RabbitMQ中存放的是我们的二进制数据。
下面是RabbitMQ和消息所涉及到的一些术语。
生产(Producing)的意思就是发送。发送消息的程序就是生产者(producer)。我们一般使用"P"表示。
队列(queue)就是存在于RabbitMQ中邮箱的名称。实质上队列就是一个巨大的消息缓冲区,我们同一时刻能够处理的数据有限,所以就将这些数据按照先后顺序存在这个消息队列中,我们一点点的进行处理。
消费(Consuming)和接收(receiving)是同一个意思。一个消费者(consumer)就是一个等待获取消息的程序。我们把它绘制为"C":
2. 作用
1)程序解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2)冗余:
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。
许多消息队列所采用的"插入-获取-删除"方式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
3)峰值处理能力:
使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
4)可恢复性:
系统的一部分组件失效时,不会影响到整个系统。
消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
5)顺序保证:
在大多使用场景下,数据处理的顺序都很重要。
大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。
6)缓冲:
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
7)异步通信:
消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
3.RabbitMQ的安装
Windows系统
1.RabbitMQ 它依赖于Erlang,需要先安装Erlang。
https://erlang.org/downloads
2.运行行Erlang/OTP(otp_win64_17.5.exe)的安装步骤,安装完成后
设置ERLANG_HOME 环境变量
在开始菜单查找Erlang,点击启动 ,如下所示证明安装成功。
注意:如果之前安装了Erlang的其他版本,需要卸载后在进行重新安装和设置。
3.进行RabbitMQ Server的下载与安装,直接运行rabbitmq-server-3.5.2.exe,选择要安装的目录,进行安装
https://rabbitmq.com/download.html
4.为了能够在任意Windows命令窗口上操控RabbitMQ服务需要在系统里加一个环境变量并且配置在系统的PHTH环境变量中。
5.检查RabbitMQ是否运行正常,打开终端,进入RabbitMQ的安装目录rabbitmq_server-3.5.2\sbin,输入rabbitmqctl status,如果出现以下的图,说明安装是成功的,并且说明现在RabbitMQ Server已经启动了,运行正常。
6.安装rabbitmq_management插件,这款插件是可以可视化的方式查看RabbitMQ 服务器实例的状态,以及操控RabbitMQ服务器。
rabbitmq-plugins enable rabbitmq_management
现在我们在浏览器中输入:http://localhost:15672 可以看到一个登录界面:
这里可以使用默认账号guest/guest登录
linux系统下安装
rabbitmq相关命令
RabbitMQ使用的是AMQP协议。这是一个用于消息传递的开放、通用的协议。针对不同编程语言有大量的RabbitMQ客户端可用。
4. 什么是AMQP?
5. 消息代理和他们所扮演的角色?
消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers)。
由于AMQP是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以存在于不同的设备上。
6.python中使用RabbitMQ
需要先进行模块安装
# rabbitmq官方推荐的python客户端pika模块
pip3 install pika
简单的收发程序,即单发送单接收
发送方send.py
接收方receive.py
你也许要问: 为什么要在接收方和发送方重复声明队列呢 —— 我们已经在前面的代码中声明过它了。如果我们确定了队列是已经存在的,那么我们可以不这么做,比如此前预先运行了send.py程序。可是我们并不确定哪个程序会首先运行。这种情况下,在程序中重复将队列重复声明一下是种值得推荐的做法。
工作队列,即单发送多接收
发送方sender.py
接收方receiver1.py和receiver2.py
我们将receiver1.py和receiver2.py开启,会夯住等待接收消息。
sender.py用来发布新任务,终端执行。
默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。如果添加三个消费者则会出现丢失数据的问题。
7.M消息确认ack
为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。
生产者,发布任务。sender.py
消费者,receiver.py
将多个receiver.py启动,然后按如下方式在终端启动sender.py
python sender.py First message.
python sender.py Second message..
python sender.py Third message...
python sender.py Fourth message....
python sender.py Fifth message.....
我们会发现现在没有消息丢失,全部被消费者接收。
忘记确认
一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ就会占用越来越多的内存。
为了排除这种错误,你可以使用rabbitmqctl命令,输出messages_unacknowledged字段:
8.消息持久化
如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。
队列持久化:如果两个队列,a1队列没有做队列持久化,a2队列做了队列持久化,那么重启后a1队列消失,a2队列依然存在,可通过命令rabbitmqctl list_queues查看
消息持久化:还是两个队列,a2队列做了队列持久化,a3做了队列持久化和消息持久化,那么重启后a2和a3队列都存在,但是a2中的数据丢失,a3中的数据依然存在。
首先,为了不让队列消失,需要把队列声明为持久化(durable):
尽管这行代码本身是正确的,但是仍然不会正确运行。因为我们已经定义过一个叫q1的非持久化队列。RabbitMq不允许你使用不同的参数重新定义一个队列,它会返回一个错误。但我们现在使用一个快捷的解决方法——用不同的名字,例如task_queue。
channel.queue_declare(queue='task_queue', durable=True)
这时候,我们就可以确保在RabbitMq重启之后queue_declare队列不会丢失。
另外,我们需要把我们的消息也要设为持久化——将delivery_mode的属性设为2。
注意:消息持久化
将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)(同步)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,你需要改写你的代码来支持事务(transaction)。
9.公平调度
这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。
channel.basic_qos(prefetch_count=1)
关于队列大小
10.工作队列代码整合
sender.py
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~