mysql连接测试不成功的原因有哪些
257
2022-10-09
Springboot 2.x集成kafka 2.2.0的示例代码
目录引言基本环境代码编写1、基本引用pom2、基本配置3、实体类4、生产者端5、消费者6、测试效果展示遇到的问题
引言
kafka近几年更新非常快,也可以看出kafka在企业中是用的频率越来越高,在springboot中集成kafka还是比较简单的,但是应该注意使用的版本和kafka中基本配置,这个地方需要信心,防止进入坑中。
版本对应地址:https://spring.io/projects/spring-kafka
基本环境
springboot版本2.1.4
kafka版本2.2.0
jdk 1.8
代码编写
1、基本引用pom
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2、基本配置
spring.kafka.bootstrap-servers=2.1.1.1:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#logging.level.root=debug
3、实体类
package com.example.demo.model;
import java.util.Date;
public class Messages {
private Long id;
private String msg;
private Date sendTime;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Date getSendTime() {
return sendTime;
}
public void setSendTime(Date sendTime) {
this.sendTime = sendTime;
}
}
4、生产者端
package com.example.demo.service;
import com.example.demo.model.Messages;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.Date;
import java.util.UUID;
@Service
public class KafkaSender {
@Autowired
private KafkEzpCHVtvwEaTemplate
private Gson gson = new GsonBuilder().create();
public void send() {
Messages message = new Messages();
message.setId(System.currentTimeMillis());
message.setMsg("123");
message.setSendTime(new Date());
ListenableFuture
}
}
5、消费者
package com.example.demo.service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.Optional;
@Service
public class KafkaReceiver {
@KafkaListener(topics = {"newtopic"})
public void listen(ConsumerRecord, ?> record) {
Optional> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("record =" + record);
System.out.println("message =" + message);
}
}
}
6、测试
在启动方法中模拟消息生产者,向kafka中发送消息
package com.example.demo;
import com.example.demo.service.KafkaSender;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class KafkademoApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args);
KafkaSender sender = context.getBean(KafkaSender.class);
for (int i = 0; i <1000; i++) {
sender.send();
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
效果展示
命令行直接消费消息
遇到的问题
生产端连接kafka超时
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
解决方案:
修改kafka中的server.properties中的下面配置,将原来的默认配置替换成下面ip+端口的形式,重启kafka
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~