Docker下RabbitMQ延时队列实战两部曲之二:细说开发

网友投稿 215 2022-10-19

Docker下RabbitMQ延时队列实战两部曲之二:细说开发

欢迎访问我的GitHub

本章是《Docker下RabbitMQ延时队列实战两部曲》的终篇,上一章《Docker下RabbitMQ延时队列实战两部曲之一:极速体验》我们快速体验了延时队列的生产和消费,今天来实战整个开发过程;

本章涉及的脚本和源码下载

本章会开发一个yml脚本,三个基于SpringBoot的应用,功能如下:

docker-compose.yml:启动所有容器的docker-compose脚本; delayrabbitmqconsumer:SpringBoot框架的应用,连接RabbitMQ的两个队列,消费消息; messagettlproducer:SpringBoot框架的应用,收到web请求后向RabbitMQ发送消息,消息中带有过期时间(TTL); queuettlproducer:SpringBoot框架的应用,收到web请求后向RabbitMQ发送消息,消息中不带过期时间(TTL),但是对应的消息队列已经设置了过期时间;

上述脚本和工程的源码都可以在github下载,地址和链接信息如下表所示:

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

环境信息

操作系统:Ubuntu 16.04.3 LTS Docker:1.12.6 RabbitMQ:3.7.5-rc.1 JDK:1.8.0_111 SpringBoot:1.4.1.RELEASE Maven:3.5.0

开发步骤

本次开发实战的步骤如下: 开发messagettlproducer应用,制作镜像; 开发queuettlproducer应用,制作镜像; 开发delayrabbitmqconsumer应用,制作镜像; 开发docker-compose.yml脚本;

messagettlproducer应用

messagettlproducer是个基于SpringBoot的web工程,有一个Controller可以响应web请求,收到请求后发送一条带有过期时间的消息到RabbitMQ的message.ttl.queue.source队列; pom.xml内容如下: 4.0.0 com.bolingcavalry messagettlproducer 0.0.1-SNAPSHOT jar messagettlproducer Demo project for Spring Boot org.springframework.boot spring-boot-starter-parent 1.4.1.RELEASE UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-maven-plugin com.spotify docker-maven-plugin 0.4.12 bolingcavalry/${project.artifactId} ${project.version} java:8u111-jdk ["java", "-jar", "/${project.build.finalName}.jar"] / ${project.build.directory} ${project.build.finalName}.jar 上面的内容中有以下两点需要注意:a. 添加对spring-boot-starter-amqp的依赖,这里面是操作RabbitMQ所需的库;b. 添加docker-maven-plugin插件,可以将当前工程直接制作成Docker镜像; src/main/resources文件夹下面创建application.properties文件,内容如下,只配置了应用名称和RabbitMQ的virtualHost路径: spring.application.name=messagettlproducer mq.rabbit.virtualHost=/ RabbitTemplateConfig.java文件中是应用连接RabbitMQ的配置信息: @Configuration public class RabbitTemplateConfig { @Value("${mq.rabbit.address}") String address; @Value("${mq.rabbit.username}") String username; @Value("${mq.rabbit.password}") String password; @Value("${mq.rabbit.virtualHost}") String mqRabbitVirtualHost; //创建mq连接 @Bean(name = "connectionFactory") public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(mqRabbitVirtualHost); connectionFactory.setPublisherConfirms(true); //该方法配置多个host,在当前连接host down掉的时候会自动去重连后面的host connectionFactory.setAddresses(address); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } } 上面的代码有以下几点要注意:a. address、username、password这些变量的值,是从操作系统的环境变量中获取的,我们在启动Docker容器的时候将这些值配置到容器的环境变量中,程序运行的时候就能取到了;b. connectionFactory()方法根据上述配置参数和RabbitMQ建立连接;c. rabbitTemplate()创建RabbitTemplate对象,我们可以在其他Bean中通过Autowire使用; MessageTtlRabbitConfig.java类中是和消息队列相关的配置: /** * 成为死信后,重新发送到的交换机的名称 */ @Value("${message.ttl.exchange}") private String MESSAGE_TTL_EXCHANGE_NAME; /** * 不会被消费的队列,投递到此队列的消息会成为死信 */ @Value("${message.ttl.queue.source}") private String MESSAGE_TTL_QUEUE_SOURCE; /** * 该队列被绑定到接收死信的交换机 */ @Value("${message.ttl.queue.process}") private String MESSAGE_TTL_QUEUE_PROCESS; /** * 配置一个队列,该队列的消息如果没有被消费,就会投递到死信交换机中,并且带上指定的routekey * @return */ @Bean Queue messageTtlQueueSource() { return QueueBuilder.durable(MESSAGE_TTL_QUEUE_SOURCE) .withArgument("x-dead-letter-exchange", MESSAGE_TTL_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", MESSAGE_TTL_QUEUE_PROCESS) .build(); } @Bean("messageTtlQueueProcess") Queue messageTtlQueueProcess() { return QueueBuilder.durable(MESSAGE_TTL_QUEUE_PROCESS) .build(); } @Bean("messageTtlExchange") DirectExchange messageTtlExchange() { return new DirectExchange(MESSAGE_TTL_EXCHANGE_NAME); } /** * 绑定指定的队列到死信交换机上 * @param messageTtlQueueProcess * @param messageTtlExchange * @return */ @Bean Binding bindingExchangeMessage(@Qualifier("messageTtlQueueProcess") Queue messageTtlQueueProcess, @Qualifier("messageTtlExchange") DirectExchange messageTtlExchange) { System.out.println("11111111111111111111111111111111111111111111111111"); System.out.println("11111111111111111111111111111111111111111111111111"); System.out.println("11111111111111111111111111111111111111111111111111"); System.out.println("11111111111111111111111111111111111111111111111111"); return BindingBuilder.bind(messageTtlQueueProcess) .to(messageTtlExchange) .with(MESSAGE_TTL_QUEUE_PROCESS); } 上面的代码有以下几点要注意:a. MESSAGE_TTL_EXCHANGE_NAME、MESSAGE_TTL_QUEUE_SOURCE、MESSAGE_TTL_QUEUE_PROCESS这些变量的值,是从操作系统的环境变量中获取的,我们在启动Docker容器的时候将这些值配置到容器的环境变量中,程序运行的时候就能取到了;b. connectionFactory()方法根据上述配置参数和RabbitMQ建立连接;c. rabbitTemplate()创建RabbitTemplate对象,我们可以在其他Bean中通过Autowire使用;d. messageTtlQueueSource()方法创建了一个队列用于投递消息,通过x-dead-letter-exchange和x-dead-letter-routing-key这两个参数,设置了队列消息过期后转发的交换机名称,以及携带的routing key; 为了设置消息过期,我们还要定制一个ExpirationMessagePostProcessor类,作用是将给消息类设置过期时间,后面发送消息时会用到这个类: package com.bolingcavalry.messagettlproducer;

import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;

/**

@Description : @Author : zq2599@gmail.com @Date : 2018-06-02 23:33*/public class ExpirationMessagePostProcessor implements MessagePostProcessor {private final Long ttl; // 毫秒 public ExpirationMessagePostProcessor(Long ttl) {this.ttl = ttl;} @Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties() .setExpiration(ttl.toString()); // 设置per-message的失效时间return message;}}

用于处理web请求的SendMessageController 类,源码如下: /** * @Description : 用于生产消息的web接口类 * @Author : zq2599@gmail.com * @Date : 2018-06-02 23:00 */ @RestController public class SendMessageController { @Autowired private RabbitTemplate rabbitTemplate; @Value("${message.ttl.queue.source}") private String MESSAGE_TTL_QUEUE_SOURCE; /** * 生产一条消息,消息中带有过期时间 * @param name * @param message * @param delaytime * @return */ @RequestMapping(value = "/messagettl/{name}/{message}/{delaytime}", method = RequestMethod.GET) public @ResponseBody String messagettl(@PathVariable("name") final String name, @PathVariable("message") final String message, @PathVariable("delaytime") final int delaytime) { SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String timeStr = simpleDateFormat.format(new Date()); String queueName = MESSAGE_TTL_QUEUE_SOURCE; String sendMessage = String.format("hello, %s , %s, from queue [%s], delay %d's, %s", name, message, MESSAGE_TTL_QUEUE_SOURCE, delaytime, timeStr); rabbitTemplate.convertAndSend(MESSAGE_TTL_QUEUE_SOURCE, (Object)sendMessage, new ExpirationMessagePostProcessor(delaytime*1000L)); return "send message to [" + name + "] success , queue is : " + queueName + " (" + timeStr + ")"; } } 如上所示,发送消息的代码很简单,调用rabbitTemplate的convertAndSend就能发送消息到message.ttl.queue.source队列(指定路由键的Direct方式),再传入ExpirationMessagePostProcessor作为处理消息的工具; 以上就是messagettlproducer应用的主要代码介绍,编码完毕后,在pom.xml文件所在目录执行mvn clean package -U -DskipTests docker:build,即可编译、构建、制作Docker镜像;

queuettlproducer应用

queuettlproducer和messagettlproducer极为相似,都是接受web请求后向RabbitMQ发送消息,不同之处有以下两点: queuettlproducer在绑定队列的时候,会设置队列上所有消息的过期时间,messagettlproducer没做这个设置; queuettlproducer在发送消息的时候,没有设置该消息的过期时间,messagettlproducer会对每条消息都设置过期时间; 因此,queuettlproducer和messagettlproducer这两个应用的代码大部分是相同的,这里只要关注不同的部分即可; 队列和交换机的配置类,QueueTtlRabbitConfig: @Configuration public class QueueTtlRabbitConfig { /** * 成为死信后,重新发送到的交换机的名称 */ @Value("${queue.ttl.exchange}") private String QUEUE_TTL_EXCHANGE_NAME; /** * 不会被消费的队列,投递到此队列的消息会成为死信 */ @Value("${queue.ttl.queue.source}") private String QUEUE_TTL_QUEUE_SOURCE; /** * 该队列被绑定到接收死信的交换机 */ @Value("${queue.ttl.queue.process}") private String QUEUE_TTL_QUEUE_PROCESS; @Value("${queue.ttl.value}") private long QUEUE_TTL_VALUE; /** * 配置一个队列,该队列有消息过期时间,消息如果没有被消费,就会投递到死信交换机中,并且带上指定的routekey * @return */ @Bean Queue queueTtlQueueSource() { return QueueBuilder.durable(QUEUE_TTL_QUEUE_SOURCE) .withArgument("x-dead-letter-exchange", QUEUE_TTL_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", QUEUE_TTL_QUEUE_PROCESS) .withArgument("x-message-ttl", QUEUE_TTL_VALUE) .build(); } @Bean("queueTtlQueueProcess") Queue queueTtlQueueProcess() { return QueueBuilder.durable(QUEUE_TTL_QUEUE_PROCESS) .build(); } @Bean("queueTtlExchange") DirectExchange queueTtlExchange() { return new DirectExchange(QUEUE_TTL_EXCHANGE_NAME); } /** * 绑定 * @param queueTtlQueueProcess * @param queueTtlExchange * @return */ @Bean Binding bindingExchangeMessage(@Qualifier("queueTtlQueueProcess") Queue queueTtlQueueProcess, @Qualifier("queueTtlExchange") DirectExchange queueTtlExchange) { System.out.println("22222222222222222222222222222222222222222222222222"); System.out.println("22222222222222222222222222222222222222222222222222"); System.out.println("22222222222222222222222222222222222222222222222222"); System.out.println("22222222222222222222222222222222222222222222222222"); return BindingBuilder.bind(queueTtlQueueProcess) .to(queueTtlExchange) .with(QUEUE_TTL_QUEUE_PROCESS); } } 上述代码请注意以下两点:a. queueTtlQueueSource()方法用来设置队列,除了x-dead-letter-exchange和x-dead-letter-routing-key这两个参数,还多了x-message-ttl,此参数对应的值就是进入该队列的每一条消息的过期时间;b. bindingExchangeMessage()方法将队列queue.ttl.queue.source绑定到Direct模式的交换机; 处理web请求的SendMessageController类: @RestController public class SendMessageController { @Autowired private RabbitTemplate rabbitTemplate; @Value("${queue.ttl.queue.source}") private String QUEUE_TTL_QUEUE_SOURCE; /** * 生产一条消息,消息中不带过期时间,但是对应的队列中已经配置了过期时间 * @param name * @param message * @return */ @RequestMapping(value = "/queuettl/{name}/{message}", method = RequestMethod.GET) public @ResponseBody String queuettl(@PathVariable("name") final String name, @PathVariable("message") final String message) { SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String timeStr = simpleDateFormat.format(new Date()); String queueName = QUEUE_TTL_QUEUE_SOURCE; String sendMessage = String.format("hello, %s , %s, from queue [%s], %s", name, message, queueName, timeStr); rabbitTemplate.convertAndSend(queueName, sendMessage); return "send message to [" + name + "] success , queue is : " + queueName + " (" + timeStr + ")"; } } 如上所示,发送消息时只有routing key和消息对象这两个参数; 以上就是发送消息到队列的应用源码,编码完毕后,在pom.xml文件所在目录执行mvn clean package -U -DskipTests docker:build,即可编译、构建、制作Docker镜像; 接下来我们看看消息消费者工程delayrabbitmqconsumer的源码;

delayrabbitmqconsumer应用

delayrabbitmqconsumer应用连接到消息队列,消费收到的每条消息; RabbitTemplateConfig.java是连接到RabbitMQ的配置信息,和前面两个应用一样,不再赘述; 消费message.ttl.queue.process这个队列发出的消息,对应实现类是MessageTtlReceiver: /** * @Description : 消息接受类,接收第一类延时消息(在每条消息中指定过期时间)的转发结果 * @Author : zq2599@gmail.com * @Date : 2018-06-03 9:52 */ @Component @RabbitListener(queues = "${message.ttl.queue.process}") public class MessageTtlReceiver { private static final Logger logger = LoggerFactory.getLogger(MessageTtlReceiver.class); @RabbitHandler public void process(String message) { logger.info("receive message : " + message); } } 如上所示,只要用注解RabbitListener配置好队列的名称即可,编码完毕后,在pom.xml文件所在目录执行mvn clean package -U -DskipTests docker:build,即可编译、构建、制作Docker镜像;

docker-compose.yml配置

最后我们看一下所有容器的配置文件docker-compose.yml: version: '2' services: rabbit1: image: bolingcavalry/rabbitmq-server:0.0.3 hostname: rabbit1 ports: - "15672:15672" environment: - RABBITMQ_DEFAULT_USER=admin - RABBITMQ_DEFAULT_PASS=888888 rabbit2: image: bolingcavalry/rabbitmq-server:0.0.3 hostname: rabbit2 depends_on: - rabbit1 links: - rabbit1 environment: - CLUSTERED=true - CLUSTER_WITH=rabbit1 - RAM_NODE=true - HA_ENABLE=true ports: - "15673:15672" rabbit3: image: bolingcavalry/rabbitmq-server:0.0.3 hostname: rabbit3 depends_on: - rabbit2 links: - rabbit1 - rabbit2 environment: - CLUSTERED=true - CLUSTER_WITH=rabbit1 ports: - "15675:15672" messagettlproducer: image: bolingcavalry/messagettlproducer:0.0.1-SNAPSHOT hostname: messagettlproducer depends_on: - rabbit3 links: - rabbit1:rabbitmqhost1 - rabbit2:rabbitmqhost2 - rabbit3:rabbitmqhost3 ports: - "18080:8080" environment: - mq.rabbit.address=rabbitmqhost1:5672,rabbitmqhost2:5672,rabbitmqhost3:5672 - mq.rabbit.username=admin - mq.rabbit.password=888888 - message.ttl.exchange=message.ttl.exchange - message.ttl.queue.source=message.ttl.queue.source - message.ttl.queue.process=message.ttl.queue.process queuettlproducer: image: bolingcavalry/queuettlproducer:0.0.1-SNAPSHOT hostname: queuettlproducer depends_on: - messagettlproducer links: - rabbit1:rabbitmqhost1 - rabbit2:rabbitmqhost2 - rabbit3:rabbitmqhost3 ports: - "18081:8080" environment: - mq.rabbit.address=rabbitmqhost1:5672,rabbitmqhost2:5672,rabbitmqhost3:5672 - mq.rabbit.username=admin - mq.rabbit.password=888888 - queue.ttl.exchange=queue.ttl.exchange - queue.ttl.queue.source=queue.ttl.queue.source - queue.ttl.queue.process=queue.ttl.queue.process - queue.ttl.value=5000 delayrabbitmqconsumer: image: bolingcavalry/delayrabbitmqconsumer:0.0.1-SNAPSHOT hostname: delayrabbitmqconsumer depends_on: - queuettlproducer links: - rabbit1:rabbitmqhost1 - rabbit2:rabbitmqhost2 - rabbit3:rabbitmqhost3 environment: - mq.rabbit.address=rabbitmqhost1:5672,rabbitmqhost2:5672,rabbitmqhost3:5672 - mq.rabbit.username=admin - mq.rabbit.password=888888 - message.ttl.queue.process=message.ttl.queue.process - queue.ttl.queue.process=queue.ttl.queue.process 上述配置文件有以下几点需要注意: rabbit1、rabbit2、rabbit3是RabbitMQ高可用集群,如果您对RabbitMQ高可用集群感兴趣,推荐您请看《Docker下RabbitMQ四部曲》系列文章; 三个SpringBoot应用都配置了mq.rabbit.address参数,值是三个RabbitMQ server的IP加端口,这样如果RabbitMQ集群中有一台机器故障了也不会影响正常的消息收发; 使用了link参数后,容器内就能通过link的参数取代对应的IP; 至此,Docker下的RabbitMQ延时队列实战就完成了,实战中Docker发挥的作用并不大,只是用来快速搭建环境,关键还是三个工程中对队列的各种操作,希望本系列能帮助您快速构建延时队列相关服务;

欢迎关注51CTO博客:程序员欣宸

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java实现超简单抖音去水印的示例详解
下一篇:G022-CON-CKA-DOC-02 Docker 架构及与虚拟机区别
相关文章

 发表评论

暂时没有评论,来抢沙发吧~