一、kafka安装部署

网友投稿 242 2022-09-25

一、kafka安装部署

Step 1: 下载代码

Step 2: 启动服务

运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper(PS:在kafka包里)。

//这是前台启动,启动以后,当前就无法进行其他操作(不推荐)./zookeeper-server-start.sh ../config/zookeeper.properties//后台启动(推荐)./zookeeper-server-start.sh ../config/zookeeper.properties 1>/dev/null 2>&1 &

现在启动​​kafka​​

config/server1.properties: broker.id=0 listeners=PLAINTEXT://192.168.10.130:9092 log.dirs=kafka-logs zookeeper.connect=localhost:2181

//后台启动kafka./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &

Step 3:创建一个主题

创建一个名为“test”的Topic,只有一个分区和备份(2181是zookeeper的默认端口)

./kafka-topics.sh --create --zookeeper localhost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic test

命令解析:--create: 指定创建topic动作--topic:指定新建topic的名称--zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样--config:指定当前topic上有效的参数值,参数列表参考文档为: --list --zookeeper localhost:2181test

或者,除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时自动创建topic。

补充:   (1)查看对应topic的描述信息

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test0

命令解析:--describe: 指定是展示详细信息命令--zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样--topic:指定需要展示数据的topic名称

(2)Topic信息修改

bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --config max.message.bytes=128000bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --delete-config max.message.bytesbin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --partitions 10 bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --partitions 3 ## Kafka分区数量只允许增加,不允许减少

(3)Topic删除   默认情况下Kafka的Topic是没法直接删除的,需要进行相关参数配置

bin/kafka-topics.sh --delete --topic test0 --zookeeper 192.168.187.146:2181

加粗样式​​​Note: This will have no impact if delete.topic.enable is not set to true​​​.## 默认情况下,删除是标记删除,没有实际删除这个Topic;如果运行删除Topic,两种方式:   方式一:通过delete命令删除后,手动将本地磁盘以及zk上的相关topic的信息删除即可   方式二:配置server.properties文件,给定参数delete.topic.enable=true,重启kafka服务,此时执行delete命令表示允许进行Topic的删除

Step 4: 发送消息

​​Kafka​​​提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给​​Kafka​​集群。每一行是一条消息。

运行producer(生产者),然后在控制台输入几条消息到服务器。  备注:这里的localhost:9092不是固定的,需要根据server.properties中配置的地址来写这里的地址!

[root@administrator bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic test>this is a message>this is another message//按`Ctrl+C`终止输入

Step 5: 消费消息

Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来。  备注:这里的localhost:9092不是固定的,需要根据server.properties中配置的地址来写这里的地址!

[root@administrator bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginningthis is a messagethis is another message//按`Ctrl+C`终止读取消息

如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。

Step 6: 设置多个broker集群(单机伪集群的配置)

到目前,我们只是单一的运行一个broker,没什么意思。对于Kafka,一个broker仅仅只是一个集群的大小,所有让我们多设几个broker。

首先为每个broker创建一个配置文件:

cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties

现在编辑这些新建的文件,设置以下属性:

vim config/server.properties

config/server1.properties: broker.id=0 listeners=PLAINTEXT://192.168.10.130:9092 log.dirs=kafka-logs zookeeper.connect=localhost:2181 config/server-1.properties: broker.id=1 listeners=PLAINTEXT://192.168.10.130:9093 log.dirs=kafka-logs-1 zookeeper.connect=localhost:2181config/server-2.properties: broker.id=2 listeners=PLAINTEXT://192.168.10.130:9094 log.dirs=kafka-logs-2 zookeeper.connect=localhost:2181

备注1:​​listeners​​​一定要配置成为IP地址;如果配置为​​localhost​​​或服务器的​​hostname​​​,在使用​​java​​​发送数据时就会抛出异 常:​​org.apache.kafka.common.errors.TimeoutException: Batch Expired 。​​​因为在没有配置​​advertised.host.name​​​ 的情况下,​​Kafka​​​并没有像官方文档宣称的那样改为广播我们配置的​​host.name​​​,而是广播了主机配置的​​hostname​​​。远端的客户端并没有配置 ​​hosts​​​,所以自然是连接不上这个​​hostname​​的。

备注2:当使用java客户端访问远程的kafka时,一定要把集群中所有的端口打开,否则会连接超时

/sbin/iptables -I INPUT -p tcp --dport 9092 -j ACCEPT/sbin/iptables -I INPUT -p tcp --dport 9093 -j ACCEPT/sbin/iptables -I INPUT -p tcp --dport 9094 -j ACCEPT/etc/rc.d/init.d/iptables save

​​broker.id​​​是集群中每个节点的唯一且永久的名称,我们修改端口和日志目录是因为我们现在在同一台机器上运行,我们要防止​​broker​​在同一端口上注册和覆盖对方的数据。

我们已经运行了​​zookeeper​​​和刚才的一个​​kafka​​​节点,所有我们只需要在启动2个新的​​kafka​​节点。

./kafka-server-start.sh ../config/server-1.properties 1>/dev/null 2>&1 &./kafka-server-start.sh ../config/server-2.properties 1>/dev/null 2>&1 &

现在,我们创建一个新topic,把备份设置为:3

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好了,现在我们已经有了一个集群了,我们怎么知道每个集群在做什么呢?运行命令“describe topics”

> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic//所有分区的摘要Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs://提供一个分区信息,因为我们只有一个分区,所以只有一行。Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

“leader”:该节点负责该分区的所有的读和写,每个节点的leader都是随机选择的。“replicas”:备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。“isr”:“同步备份”的节点列表,也就是活着的节点并且正在同步leader

其中​​Replicas​​​和​​Isr​​​中的​​1,2,0​​​就对应着3个​​broker​​​他们的​​broker.id​​属性!

我们运行这个命令,看看一开始我们创建的那个节点:

> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic:test PartitionCount:1 ReplicationFactor:1 Configs:Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

这并不奇怪,刚才创建的主题没有Replicas,并且在服务器“0”上,我们创建它的时候,集群中只有一个服务器,所以是“0”。

Step 7: 测试集群的容错能力

7.1发布消息到集群

[root@administrator bin]# ./kafka-console-producer.sh --broker-list 192.168.10.130:9092 --topic my-replicated-topic>cluster message 1>cluster message 2//Ctrl+C终止产生消息

7.2消费消息

[root@administrator bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.130:9093 --from-beginning --topic my-replicated-topiccluster message 1cluster message 2//Ctrl+C终止消费消息

7.3干掉leader,测试集群容错   首先查询谁是leader

> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic//所有分区的摘要Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs://提供一个分区信息,因为我们只有一个分区,所以只有一行。Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

可以看到​​Leader​​​的​​broker.id​​​为​​1​​​,找到对应的​​Broker​​

[root@administrator bin]# jps -m5130 Kafka ../config/server.properties4861 QuorumPeerMain ../config/zookeeper.properties1231 Bootstrap start start7420 Kafka ../config/server-2.properties7111 Kafka ../config/server-1.properties9139 Jps -m

通过以上查询到​​Leader​​​的​​PID​​​(​​Kafka ../config/server-1.properties​​​)为​​7111​​,杀掉该进程

//杀掉该进程kill -9 7111//再查询一下,确认新的Leader已经产生,新的Leader为broker.id=0[root@administrator bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topicTopic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs://备份节点之一成为新的leader,而broker1已经不在同步备份集合里了Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 0,2

7.4再次消费消息,确认消息没有丢失

[root@administrator bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topiccluster message 1cluster message 2

消息依然存在,故障转移成功!!

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:二十、Centos 7 Linux创建项目启动脚本以及开机启动项目
下一篇:丁道师:嘀嗒出行的“护身符”和“紧箍咒”!
相关文章

 发表评论

暂时没有评论,来抢沙发吧~