SpringBoot整合RabbitMQ 手动应答(简单demo)

网友投稿 308 2023-02-09

SpringBoot整合RabbitMQ 手动应答(简单demo)

版本说明

JDK 1.8

RabbitMQ 3.7.15 Erlang 22.0

SpringBoot 2.3.3.RELEASE

// TODO 2021年1月8日 整理CentOS安装RabbitMQ流程

1. 在RabbitMQ的Web管理界面,创建test队列

参数的含义

durability:是否持久化(重启或宕机后消息依然保存)

durable 持久

transient 暂时

新建maven项目。

2. pom.xml

xmlns:xsi="http://wwwMSgNJVsjLL.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.springframework.boot

spring-boot-starter-parent

2.3.3.RELEASE

com.demo

rabbitmq-demo

1.0.0

1.18.12

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-amqp

org.projectlombok

lombok

<optional>true

provided

${lombok.version}

org.springframework.boot

spring-boot-maven-plugin

xmlns:xsi="http://wwwMSgNJVsjLL.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.springframework.boot

spring-boot-starter-parent

2.3.3.RELEASE

com.demo

rabbitmq-demo

1.0.0

1.18.12

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-amqp

org.projectlombok

lombok

<optional>true

provided

${lombok.version}

org.springframework.boot

spring-boot-maven-plugin

3. application.yaml

server:

port: 20002

spring:

rabbitmq:

# 这里我改了本地的hosts,实际地址是192.168.0.121

host: vm.com

port: 5672

virtual-host: /

username: admin

password: admin

# 开启消息确认模式

# 消息发送到交换机确认机制,是否确认回调

# publisher-confirms: true

# 是否返回回调

publisher-returns: true

template:

#开启mandatory: true, basic.return方法将消息返还给生产者

mandatory: true

listener:

simple:

# 手动应答

acknowledge-mode: manual

# 最少消费者数量

concurrency: 1

# 最多消费者数量

max-concurrency: 10

# 支持重试

retry:

enabled: true

端口

5672:RabbitMQ的通信端口

15672:Web管理界面端口

4. RabbitmqDemo.java

@SpringBootApplication

@EnableRabbit

public class RabbitmqDemoApplication {

public static void main(String[] args) {

SpringApplication.run(RabbitmqDemoApplication.class, args);

}

}

5. RabbitConfig.java

@Configuration

@Slf4j

public class RabbitConfig {

private RabbitTemplate rabbitTemplate;

@Bean

public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {

rabbitTemplate = new RabbitTemplate(connectionFactory);

return rabbitTemplate;

}

}

配置RabbitMQ的消息模板。

6. 消息生产者 produce.java

@Component

public class Producer {

// @Qualifier("rabbitTemplate")

@Autowired

private RabbitTemplate rabbitTemplate;

public void send() {

for (int i = 0; i < 5; i++) {

System.http://out.println("生产者发送消息,序号为: " + i);

rabbitTemplate.convertAndSend("test", String.valueOf(i));

}

}

}

初始化消息发送模板RabbitTemplate,@Qualifier注解用于限定具体的实现类,这里可以不指定。

7. 消息消费者 consumer.java

消费者1和消费者2均监听test队列。

不同的是,消费者1收到消息后返回确认应答basicAck。

而消费者2收到消息后返回拒绝应答basicRegect,消息被消费者拒绝后重新回到test队列中,等待下次发送给消费者。

@Component

@Slf4j

public class Consumer {

/**

* 消费者1 模拟正常处理消息的情况,消息处理完毕发送确认应答

* @param message

* @param channel

* @throws IOException

*/

@RabbitListener(queues = "test")

public void process1(Message message, Channel channel) throws IOException {

log.info("消费者1 接收消息: " + new String(message.getBody()));

log.info("消费者1 确认应答消息:" + new String(message.getBody()));

channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

}

/**

* 消费者2 模拟处理消息出错的情况,消费者2向rabbitmq发送拒绝应答。

* 处理失败的消息会被重新放入ready中,再次发送给消费者,直至收到确认应答

* @param message

* @param channel

* @throws IOException

*/

@RabbitListener(queues = "test")

public void process2(Message message, Channel channel) throws IOException {

log.info("消费者2 接收消息:" + new String(message.getBody()));

log.info("消费者2 拒绝应答消息:" + new String(message.getBody()));

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

}

}

8. 测试Rabhttp://bitMqController.java

@RestController

@RequestMapping("")

public class RabbitMqController {

@Autowired

private Producer producer;

@GetMapping("/send")

public String send() {

producer.send();

return "发送完成";

}

}

9. 测试

使用postman或浏览器使用Get方法请求http://localhost:20001/send,生产者会向RabbitMQ的test队列发送5条消息:

生产者发送消息,序号为: 0

生产者发送消息,序号为: 1

生产者发送消息,序号为: 2

生产者发送消息,序号为: 3

生产者发送消息,序号为: 4

可以看出序号为2的消息3次被消费者2接收,消费者2也3次发送拒绝应答,直到第4次才被消费者1接收,并返回确认应答。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:swagger添加权限验证保证API(接口)安全性(两种方法)
下一篇:MyBatis 实现批量插入和删除中双层循环的写法案例
相关文章

 发表评论

暂时没有评论,来抢沙发吧~