SpringBoot整合kafka

网友投稿 265 2022-09-26

SpringBoot整合kafka

SpringBoot整合Kafka

​​一、背景​​​​二、实现步骤​​

​​1、引入jar包​​​​2、编写生产者和消费者的配置​​

​​3、生产者配置​​​​4、消费者配置​​​​5、消费者手动提交 ack​​

​​3、编写生产者代码​​​​4、编写消费者代码​​

​​三、运行结果​​​​四、参考文档​​​​五、代码路径​​

一、背景

此处简单记录一下 ​​SpringBoot​​​ 和 ​​Kafka​​ 的整合。

二、实现步骤

1、引入jar包

org.springframework.kafka spring-kafka

2、编写生产者和消费者的配置

3、生产者配置

spring.application.name=kafka-springboot# 配置 kafka 服务器的地址,多个以逗号隔开spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094# 生产者配置spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.acks=1spring.kafka.producer.retries=0spring.kafka.producer.batch-size=16384spring.kafka.producer.buffer-memory=33554432

4、消费者配置

# 消费者配置# 关闭自动提交 ackspring.kafka.consumer.enable-auto-commit=falsespring.kafka.consumer.auto-commit-interval=100spring.kafka.consumer.auto-offset-reset=earliestspring.kafka.consumer.max-poll-records=500spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer# 配置监听手动提交 ack ,消费一条数据完后,立即提交spring.kafka.listener.ack-mode=manual_immediate# 经测试也是批量提交的ack , 当消费完 spring.kafka.consumer.max-poll-records 这么多的数据时候,提交#spring.kafka.listener.ack-mode=manualspring.kafka.listener.poll-timeout=500S

5、消费者手动提交 ack

1、​​spring.kafka.consumer.enable-auto-commit​​​ 修改成 ​​false​​​ 2、​​spring.kafka.listener.ack-mode​​ 修改成             |- ​​manual​​: 表示手动提交,但是测试下来发现是批量提交             |- ​​manual_immediate​​: 表示手动提交,当调用 ​​Acknowledgment#acknowledge​​之后立马提交。

3、编写生产者代码

@Componentpublic class KafkaProducer implements CommandLineRunner { @Autowired private KafkaTemplate kafkaTemplate; @Override public void run(String... args) { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { kafkaTemplate.send(KafkaConstant.TOPIC, String.valueOf(System.currentTimeMillis())) .addCallback(new SuccessCallback>() { @Override public void onSuccess(SendResult result) { if (null != result.getRecordMetadata()) { System.out.println("消费发送成功 offset:" + result.getRecordMetadata().offset()); return; } System.out.println("消息发送成功"); } }, new FailureCallback() { @Override public void onFailure(Throwable throwable) { System.out.println("消费发送失败:" + throwable.getMessage()); } }); }, 0, 1, TimeUnit.SECONDS); }}

1、消费的发送使用​​KafkaTemplate​​​ 。 2、根据发送的结果知道,消息发送成功还是失败。

4、编写消费者代码

@Componentpublic class KafkaConsumer { @KafkaListener(topics = KafkaConstant.TOPIC, groupId = "kafka-springboot-001") public void consumer(ConsumerRecord record, Acknowledgment ack) throws InterruptedException { System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "接收到kafka消息,partition:" + record.partition() + ",offset:" + record.offset() + "value:" + record.value()); TimeUnit.SECONDS.sleep(1); ack.acknowledge(); }}

​​KafkaListener​​​:       ​​​topic​​​: 表示需要监听的队列名称       ​​​groupId​​: 表示消费者组的id

三、运行结果

四、参考文档

1、​​https://docs.spring.io/spring-boot/docs/2.4.2/reference/htmlsingle/#boot-features-kafka​​

五、代码路径

​​https://gitee.com/huan1993/rabbitmq/tree/master/kafka-springboot/src/main/java/com/huan/study/kafka​​

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Spring Cloud Gateway GatewayFilter的使用
下一篇:苹果或10月6日发布iPhone 12:富士康已开始小范围生产中!
相关文章

 发表评论

暂时没有评论,来抢沙发吧~