Linux中怎么用cat命令创建文件并写入数据
308
2023-02-09
SpringBoot整合RabbitMQ 手动应答(简单demo)
版本说明
JDK 1.8
RabbitMQ 3.7.15 Erlang 22.0
SpringBoot 2.3.3.RELEASE
// TODO 2021年1月8日 整理CentOS安装RabbitMQ流程
1. 在RabbitMQ的Web管理界面,创建test队列
参数的含义
durability:是否持久化(重启或宕机后消息依然保存)
durable 持久
transient 暂时
新建maven项目。
2. pom.xml
xmlns:xsi="http://wwwMSgNJVsjLL.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <optional>true
xmlns:xsi="http://wwwMSgNJVsjLL.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<optional>true
3. application.yaml
server:
port: 20002
spring:
rabbitmq:
# 这里我改了本地的hosts,实际地址是192.168.0.121
host: vm.com
port: 5672
virtual-host: /
username: admin
password: admin
# 开启消息确认模式
# 消息发送到交换机确认机制,是否确认回调
# publisher-confirms: true
# 是否返回回调
publisher-returns: true
template:
#开启mandatory: true, basic.return方法将消息返还给生产者
mandatory: true
listener:
simple:
# 手动应答
acknowledge-mode: manual
# 最少消费者数量
concurrency: 1
# 最多消费者数量
max-concurrency: 10
# 支持重试
retry:
enabled: true
端口
5672:RabbitMQ的通信端口
15672:Web管理界面端口
4. RabbitmqDemo.java
@SpringBootApplication
@EnableRabbit
public class RabbitmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqDemoApplication.class, args);
}
}
5. RabbitConfig.java
@Configuration
@Slf4j
public class RabbitConfig {
private RabbitTemplate rabbitTemplate;
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
配置RabbitMQ的消息模板。
6. 消息生产者 produce.java
@Component
public class Producer {
// @Qualifier("rabbitTemplate")
@Autowired
private RabbitTemplate rabbitTemplate;
public void send() {
for (int i = 0; i < 5; i++) {
System.http://out.println("生产者发送消息,序号为: " + i);
rabbitTemplate.convertAndSend("test", String.valueOf(i));
}
}
}
初始化消息发送模板RabbitTemplate,@Qualifier注解用于限定具体的实现类,这里可以不指定。
7. 消息消费者 consumer.java
消费者1和消费者2均监听test队列。
不同的是,消费者1收到消息后返回确认应答basicAck。
而消费者2收到消息后返回拒绝应答basicRegect,消息被消费者拒绝后重新回到test队列中,等待下次发送给消费者。
@Component
@Slf4j
public class Consumer {
/**
* 消费者1 模拟正常处理消息的情况,消息处理完毕发送确认应答
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "test")
public void process1(Message message, Channel channel) throws IOException {
log.info("消费者1 接收消息: " + new String(message.getBody()));
log.info("消费者1 确认应答消息:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
/**
* 消费者2 模拟处理消息出错的情况,消费者2向rabbitmq发送拒绝应答。
* 处理失败的消息会被重新放入ready中,再次发送给消费者,直至收到确认应答
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "test")
public void process2(Message message, Channel channel) throws IOException {
log.info("消费者2 接收消息:" + new String(message.getBody()));
log.info("消费者2 拒绝应答消息:" + new String(message.getBody()));
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
8. 测试Rabhttp://bitMqController.java
@RestController
@RequestMapping("")
public class RabbitMqController {
@Autowired
private Producer producer;
@GetMapping("/send")
public String send() {
producer.send();
return "发送完成";
}
}
9. 测试
使用postman或浏览器使用Get方法请求http://localhost:20001/send,生产者会向RabbitMQ的test队列发送5条消息:
生产者发送消息,序号为: 0
生产者发送消息,序号为: 1
生产者发送消息,序号为: 2
生产者发送消息,序号为: 3
生产者发送消息,序号为: 4
可以看出序号为2的消息3次被消费者2接收,消费者2也3次发送拒绝应答,直到第4次才被消费者1接收,并返回确认应答。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~