OUT了吧,Kafka能实现消息延时了

网友投稿 259 2022-10-06

OUT了吧,Kafka能实现消息延时了

摘要:本文讲述如何在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。

本文分享自华为云社区《​​Kafka也能实现消息延时了?​​》,作者:HuaweiCloudDeveloper 。

1、背景

Kafka是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用,Kafka它虽有以上这么多的应用场景和优点,但也具备其缺陷,比如在延时消息场景下,Kafka就不具备这种能力,因此希望能在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。

2、开发环境

3、云服务介绍

​​分布式消息服务Kafka版:​​ 华为云分布式消息服务Kafka版是一款基于开源社区版Kafka提供的消息队列服务,向用户提供计算、存储和带宽资源独占式的Kafka专享实例。使用华为云分布式消息服务Kafka版,资源按需申请,按需配置Topic的分区与副本数量,即买即用,您将有更多精力专注于业务快速开发,不用考虑部署和运维。

4、方案设计

i、方案简述

此方案实现,需要借助两个Topic来进行实现,一个Topic用于及时接收生产者们所产生的消息,另一个Topic则用于消费者拉取消息进行消费。另外在这两个Topic之间加上一个队列用于做延时的逻辑判断,如果消息满足了延时的条件,则将队列中的消息生产至我们的消费者需要拉取的Topic中。

ii、方案架构图

Kafka消息延时方案架构图

Kafka消息延时实现思路

生产者将生产消息存入topic_delay主题中进行存储。将topic_delay主题中的所有消息拉取至ConcurrentLinkedQueue队列中。取值判断是否满足延时要求。a. 如果满足延时要求,则将消息生产至topic_out主题中,并将queue队列中的值移除。b. 如果不满足延时要求,则等待自定义时间后重试判断。消费者最终从topic_out主题中拉取消息进行消费。

iii、方案时序图

Kafka消息延时方案时序图

5、代码参数指南

本项目中起到延时作用的类Delay.java其余类为官方提供用于测试生产和消费消息, 如需使用官方测试的使用的生产消费代码相关配置介绍可以参考。 如需使用自己配置的生产者消费者,只配置Delay.java中的参数即可。

Delay.java参数详情

delay:自定义延时时间,单位ms。topic_delay变量:用于临时存储消息的topic名称。topic_out变量:用于消费者拉取消息消费的topic名称。关于消费者和生产者配置可按需配置,​​可参考Kafka官方文档:​​com.dms.delay;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import java.time.Duration;import java.util.Arrays;import java.util.Date;import java.util.Properties;import java.util.concurrent.ConcurrentLinkedQueue;/** * Hello world! * */public class Delay { //缓存队列 public static ConcurrentLinkedQueue> link = new ConcurrentLinkedQueue(); //延迟时间(20秒),可根据需要设置延迟大小 public static long delay = 20000L; /** *入口 * @param args */ public static void main( String[] args ) { //延时主题(用于控制延时缓冲) String topic_delay = "topic_delay"; //输出主题(直接供消费者消费) String topic_out = "topic_out"; /* 消费线程 */ new Thread(new Runnable() { @Override public void run() { //消费者配置。请根据需要自行设置Kafka配置 Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //创建消费者 KafkaConsumer consumer = new KafkaConsumer<>(props); //指定消费主题 consumer.subscribe(Arrays.asList(topic_delay)); while (true) { //轮询消费 ConsumerRecords records = consumer.poll(Duration.ofMillis(10)); //遍历当前轮询批次拉取到的消息 for (ConsumerRecord record : records){ System.out.println(record); //将消息添加到缓存队列 link.add(record); } } } }).start(); /* 生产线程 */ new Thread(new Runnable() { @Override public void run() { //生产者配置(请根据需求自行配置) Properties props = new Properties(); props.put("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092"); props.put("linger.ms", 1); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //创建生产者 Producer producer = new KafkaProducer<>(props); //持续从缓存队列中获取消息 while(true){ //如果缓存队列为空则放缓取值速度 if(link.isEmpty()){ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } //获取缓存队列栈顶消息 ConsumerRecord record = link.peek(); //获取该消息时间戳 long timestamp = record.timestamp(); Date now = new Date(); long nowTime = now.getTime(); if(timestamp+ Delay.delay (topic_out, "",value)); //从缓存队列中移除该消息 link.poll(); }else { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }).start(); }}

7、结果反馈

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:还在为建第二灾备中心而发愁?为您介绍可以按需快速搭建的方案
下一篇:利用Java编写一个属于自己的日历
相关文章

 发表评论

暂时没有评论,来抢沙发吧~