Springboot 整合 RocketMQ 收发消息的配置过程

网友投稿 477 2022-11-11

Springboot 整合 RocketMQ 收发消息的配置过程

Springboot 整合 RocketMQ 收发消息

创建springboot项目

pom.xml添加rocketmq-spring-boot-starter依赖。

org.apache.rocketmq

rocketmq-spring-boot-starter

2.1.0

yml 配置

application.yml

rocketmq:

name-server: 192.168.64.141:9876

application-demo1.yml

使用 demo1 profile 指定生产者组组名

rocketmq:

producer:

group: producer-demo1

application-demo2.yml

使用 demo2 profile 指定生产者组组名

rocketmq:

producer:

group: producer-demo2

测试

demo 1

发送普通消息

发送 Spring 的通用 Message 对象

发送异步消息

发送顺序消息

生产者

package cn.tedu.demo2.m1;

import org.apache.rocketmq.client.producer.SendCallback;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.spring.core.RocketMQTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.Message;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.stereotype.Component;

@Component

public class Producer {

@Autowired

private RocketMQTemplate t ;

public void send(){

//发送同步消息

t.convertAndSend("Topic1:TagA", "Hello world! ");

//发送spring的Message

Message message = MessageBuilder.withPayload("Hello Spring message! ").build();

t.send("Topic1:TagA",message);

//发送异步消息

t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {

@Override

public void onSuccess(SendResult sendResult) {

System.out.println("发送成功");

}

@Override

public void onException(Throwable throwable) {

System.out.println("发送失败");

}

});

//发送顺序消息

t.syncSendOrderly("Topic1", "98456237,创建", "98456237");

t.syncSendOrderly("Topic1", "98456237,支付", "98456237");

t.syncSendOrderly("Topic1", "98456237,完成", "98456237");

}

}

消费者

package cn.tedu.demo2.m1;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import yxpDcDWorg.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Component;

@Component

@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")

public class Consumer implements RocketMQListener {

@Override

public void onMessage(String s) {

System.out.println("收到"+s);

}

}

主类

package cn.tedu.demo2.m1;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication

public class Main {

public static void main(String[] args) {

SpringApplication.run(Main.class, args);

}

}

测试类

需要放在 test 文件夹

激活 demo1 profile @ActiveProfiles("demo1")

package cn.tedu.demo2.m1;

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.ActiveProfiles;

@SpringBootTest

@ActiveProfiles("demo1")

public class Test1 {

@Autowired

private Producer producer;

@Test

public void test1(){

producer.send();

try {

Thread.sleep(5000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

demo 2

发送事务消息

生产者

package cn.tedu.demo2.m2;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;

import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;

import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;

import org.apache.rocketmq.spring.core.RocketMQTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.Message;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.stereotype.Component;

@Component

public class Producer {

@Autowired

private RocketMQTemplate t;

public void send(){

Message message = MessageBuilder.withPayload("Hello world").build();

//一旦发送消息,则执行监听器

t.sendMessageInTransaction("Topic2",message,null);

}

@RocketMQTransactionListener

class Lis implements RocketMQLocalTransactionListener {

@Override

public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {

System.out.println("执行本地事务");

return RocketMQLocalTransactionState.UNKNOWN;

}

@Override

public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

System.out.println("执行事务回查");

return RocketMQLocalTransactionState.COMMIT;

}

}

}

消费者

package cn.tedu.demo2.m2;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Component;

@Component

@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")

public class Consumer implements RocketMQListener {

@Override

public void onMessage(String s) {

System.out.println("收到"+s);

}

}

主类

package cn.tedu.demo2.m2;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication

public class Main {

public static void main(String[] args) {

SpringApplication.run(Main.class, args);

}

}

测试类

package cn.tedu.demo2.m2;

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.ActiveProfiles;

@SpringBootTest

@ActiveProfiles("demo2")

public class Test2 {

@Autowired

private Producer producer;

@Test

public void test1(){

producer.send();

//为了能够收到消费者消费的数据,这里通过休眠模拟等待时间

try {

Thread.sleep(30000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:varnish使用yum安装及不同域名站点
下一篇:斯威普科技展出为无人机提供续航的便携充式充电宝DronePower45K
相关文章

 发表评论

暂时没有评论,来抢沙发吧~