RocketMQ简单使用(一)简单消息、顺序消息、延迟消息

网友投稿 250 2022-11-19

RocketMQ简单使用(一)简单消息、顺序消息、延迟消息

研究下其简单使用。

1. 简单消息

这里使用三种消息的发送方式: 同步发送、异步发送、单向发送,以及消息的消费。

同步发送

package com.zd.bx.rocketmq.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;/** * 同步发送消息: 可靠同步传输应用场景广泛,如重要通知消息、短信通知、短信营销系统等 */public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("syncProducer"); // Specify name server addresses. producer.setNamesrvAddr("192.168.13.111:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("syncTopic" /* Topic */, "TagA" /* Tag */, "keys" + i, /* Keys */ ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); }}

结果:

SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224D30000, offsetMsgId=C0A80D6F00002A9F000000000010B9F4, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=0], queueOffset=50]SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E00001, offsetMsgId=C0A80D6F00002A9F000000000010BABD, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=1], queueOffset=50]SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E40002, offsetMsgId=C0A80D6F00002A9F000000000010BB86, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=2], queueOffset=50]SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E80003, offsetMsgId=C0A80D6F00002A9F000000000010BC4F, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=3], queueOffset=50]...

异步发送消息

package com.zd.bx.rocketmq.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/** * 异步发送消息: 一般用于响应时间敏感的业务场景。 */public class AsyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("asyncProducer"); // Specify name server addresses. producer.setNamesrvAddr("192.168.13.111:9876"); //Launch the instance. producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { try { final int index = i; Message msg = new Message("asyncTopic", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); }}

结果:

0 OK 7F000001631C18B4AAC25BF3A67E0000 6 OK 7F000001631C18B4AAC25BF3A67E0002 3 OK 7F000001631C18B4AAC25BF3A67E0005 1 OK 7F000001631C18B4AAC25BF3A67E0007 4 OK 7F000001631C18B4AAC25BF3A67E0004 7 OK 7F000001631C18B4AAC25BF3A67E0001 ...

单向发送

package com.zd.bx.rocketmq.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;/** * 单向发送消息: 单向传输用于需要中等可靠性的情况,例如日志收集 */public class OnewayProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("oneWayProducer"); // Specify name server addresses. producer.setNamesrvAddr("192.168.13.111:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("oneWayTopic" /* Topic */, "TagB" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. producer.sendOneway(msg); } //Wait for sending to complete Thread.sleep(5000); producer.shutdown(); }}

消费者

package com.zd.bx.rocketmq.simple;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 定义一个pull 消费者// DefaultLitePullConsumer consumer2 = new DefaultLitePullConsumer("myTestConsumerGroup"); // 定义一个push消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myTestConsumerGroup"); // 指定从第一条消息开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 指定nameserver consumer.setNamesrvAddr("192.168.13.111:9876"); // 指定消费的topic与tag consumer.subscribe("syncTopic", "*"); // 指定使用 广播模式进行消费,默认为集群模式 consumer.setMessageModel(MessageModel.BROADCASTING); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.printf("%s Receive New Messages: %s body: %s %n", Thread.currentThread().getName(), msgs, new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }}

2. 顺序消息

1. 什么是顺序消息

顺序消息是指,严格按照消息的发送顺序进行消费的消息。

默认情况下,生产者会把以 RoundRobin 轮询方式发送到不同的Queue 分区队列; 而消费消息时会从多个Queue 上拉取消息,这种情况下的发送和消费是不能保证顺序的。 如果将消息仅发送到同一个Queue 中,消费时也就从这个Queue 上拉取消息,就保证了消息的顺序性。

举个例子:订单状态队列(ORDER_STATUS), 其下有四个Queue 队列。我们发送一个订单的状态时:001未支付-》001已支付-》001发货中-》001发货成功; 这四个状态必须严格按照顺序进行消费,所以就引入了顺序消息。

2. 有序性分类

全局有序\分区有序。 全局有序是指该topic只有一个队列;分区有序是指在有多个Queue 的情况下,在定义Producer时我们指定消息队列选择器,将相关的消息发送到相同的队列,来保证在同一个队列。

3. 代码演示

全局有序很简单,只需要将生产者队列数量设置为1即可。

分区有序可以在调send 的时候传递MessageQueueSelector 进行队列的负载均衡, 负载均衡肯定会选择一个key 作为路由的值:可以是msg的ke, 也可以是调用send 的时候传递第三个参数。

package com.zd.bx.rocketmq.order;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.MessageQueueSelector;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageQueue;import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.List;public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("orderProducer"); producer.setNamesrvAddr("192.168.13.111:9876"); // 设置queue 的数量为1, 则为全局有序// producer.setDefaultTopicQueueNums(1); producer.start(); String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { int orderId = i % 10; Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); msg.setKeys(i + ""); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List mqs, Message msg, Object arg) { // 第一种方法是根据key 来进行路由// Integer id = Integer.valueOf(msg.getKeys()); // 第二种就是传递参数来进行路由,send 方法的第三个参数会传递到arg参数上面 Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } producer.shutdown(); }}

3. 延迟消息

如果记得没错,rabbitMQ的延迟消息需要借助于消息的生存时间和死信队列实现延迟。

1. 延迟消息的等级

延迟消息的等级定义在服务端MessageStoreConfig.java 中,如下:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

比如,如果指定等级为3, 则延迟为10s, 延迟等级从1开始。如果想增加其他的配置,可以在rocketMQ安装目录的conf 目录中进行配置。

2. 延迟消息原理

如下图:

具体实现方案是:

Producer 将消息发送到Broker 之后,Broker 先将消息发送到commitLog 文件,然后将其分发到相应的consumerqueue。 不过,在分发之前,系统会判断消息中是否带有延迟等级,没有则直接正常分发;若有:

(1) 修改消息的topic 为SCHEDULE_TOPIC_XXXX 目录。

(2) 根据延时等级,在consumequeue 目录中SCHEDULE_TOPIC_XXXX 主题下创建出相应的queueId 与consumequeue 文件(如果没有这些目录与文件)。这里需要注意:延迟等级与queueId 的对应关系为 queueId = delayLevel -1

(3) 修改消息索引单元内容。索引单元中的MessagetagHashCode 部分原本存放的是消息的Tag的Hash 值。现在修改为消息的投递时间。投递时间=消息存储时间+延迟时间, 也就是投递时间存的是其真正需要被分发回原queue 的时间。

(4) 投递延迟消息:Broker 内部有一个延迟消息服务类ScheduleMessageService,负责将消息投递到目标Topic(内部用Timer 定时器实现)

(5) 将消息重新写入commitlog,形成新的索引条目分发到相应的queue 中

3. 代码演示

package com.zd.bx.rocketmq.delay;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;public class Producer { public static void main(String[] args) throws Exception { // Instantiate a producer to send scheduled messages DefaultMQProducer producer = new DefaultMQProducer("delayProducer"); producer.setNamesrvAddr("192.168.13.111:9876"); // Launch producer producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("delayTopicTest", ("Hello delay message " + i).getBytes()); // This message will be delivered to consumer 10 seconds later. message.setDelayTimeLevel(3); producer.send(message); } producer.shutdown(); }}

如果想测试效果,可以先启动消费者,然后启动生产者查看其效果。

启动后到服务器rocketMQ 存储目录查看主题目录其队列信息如下:(默认在当前用户的store 目录)

[root@redisnode1 consumequeue]# pwd/root/store/consumequeue[root@redisnode1 consumequeue]# lsasyncTopic delayTopicTest oneWayTopic SCHEDULE_TOPIC_XXXX syncTopic TopicTest[root@redisnode1 consumequeue]# ls -R ./delayTopicTest/./delayTopicTest/:0 1 2 3./delayTopicTest/0:00000000000000000000./delayTopicTest/1:00000000000000000000./delayTopicTest/2:00000000000000000000./delayTopicTest/3:00000000000000000000[root@redisnode1 consumequeue]# ls -R ./SCHEDULE_TOPIC_XXXX/./SCHEDULE_TOPIC_XXXX/:2./SCHEDULE_TOPIC_XXXX/2:00000000000000000000

可以看出第一次发出延迟消息后会多出一个SCHEDULE_TOPIC_XXXX目录,内部会根据消息的延迟等级创建相应的queue 目录(queueId = delayLevel -1 , 所以queue 的目录ID为2)。

参考: ​​https://rocketmq.apache.org/docs/simple-example/​​

【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:spring boot学习笔记之操作ActiveMQ指南
下一篇:解决图形化集线器中管理接口的方案
相关文章

 发表评论

暂时没有评论,来抢沙发吧~