Docker部署kafka|Go操作实践

网友投稿 267 2022-11-27

Docker部署kafka|Go操作实践

前言

写作本文的背景是由于字节的暑期青训营中,某个项目要求编写一个简易的流处理引擎(flink),开发语言不限,推荐Java,本着好奇心的驱使,我打算使用Go语言进行部分尝试。

既然是流处理引擎,那么首先需要有流式的数据源,一般而言,flink会配合从kafka中获取数据流,先不考虑后续编写引擎的部分,本文将着重于kafka的部署,并且后半段将给出使用Go语言编写kafka的生产者和消费者。

如果你只是希望完成kafka的部署,而不想局限于Go语言,只需要着重阅读文章的前半部分,后文的Go语言操作部分可以给你提供一些思路,你只需要找寻适合语言如Java的kafka client库去完成生产者和消费者的编写即可。

部署kafka

docker前置知识

下文的实践需要你拥有基本的docker操作能力,如果未曾掌握docker知识点

docker-compose

编写docker-compose.yml,通过docker容器部署单节点kafka

version: '3'services: zookeeper: image: wurstmeister/zookeeper:3.4.6 volumes: - ./zookeeper_data:/opt/zookeeper-3.4.6/data container_name: zookeeper ports: - "10002:2181" - "10003:2182" restart: always kafka: image: wurstmeister/kafka container_name: kafka_01 depends_on: - zookeeper ports: - "10004:9092" volumes: - ./kafka_log:/kafka environment: - KAFKA_BROKER_NO=0 - KAFKA_BROKER_ID=0 - KAFKA_LISTENERS=PLAINTEXT://kafka_01:9092 # kafka tcp 侦听的ip - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://服务器ip:10004 # kafka broker侦听的ip - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_HEAP_OPTS=-Xmx512M -Xms16M restart: always # kafka集群管理面板 kafka_manager: image: sheepkiller/kafka-manager ports: - "10005:9000" environment: - ZK_HOSTS=zookeeper:2181 depends_on: - zookeeper - kafka restart: always

后台运行

docker-compose up -d

​​docker ps​​命令查看容器是否启动成功

通过上述docker-compose.yml部署会运行三个容器,选择进入kafka容器

docker exec -it kafka容器id /bin/bash# 进入kafka目录cd

在容器内创建​​topic​​​,​​topic​​​是kafka中数据管理的基本单位,或者说集合,每一个​​topic​​​可以管理多个​​partition​​​,编码操作时:你可以往对应kafka服务器​​ip+port+topic+partition​​去发送和读取数据。

bin/kafka-topics.sh --create --zookeeper 服务器ip:2181 --replication-factor 1 -partitions 1 --topic test

业务编写

Go语言中连接kafka使用第三方库: github.com/Shopify/sarama

go get github.com/segmentio/kafka-go

sarama库的简易操作可以参照文档(消费者的编写文档中有坑):​​文档地址​​

如下使用kafka client库进行编码所涉及的API操作比较简单,流程上或许不够规范,请酌情参考。

producer

​​文档中​​生产者只发送了一条数据后就会关闭,这里我改成了每秒钟发送一次。

func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认 config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回 // 构造一个消息 msg := &sarama.ProducerMessage{} msg.Topic = "test" // 连接kafka client, err := sarama.NewSyncProducer([]string{"82.156.171.8:10004"}, config) if err != nil { fmt.Println("producer closed, err:", err) return } defer client.Close() // 发送消息 for { time.Sleep(time.Second * 1) msg.Value = sarama.StringEncoder("this is a test log") pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send msg failed, err:", err) return } fmt.Printf("pid:%v offset:%v\n", pid, offset) }}

consumer

​​文档中​​消费者虽然开启了Go协程(类比于Java的线程)去读取kafka的数据,但是由于主程序执行顺序执行完毕后,子协程也会终止,导致子协程还没有读取成功/打印数据,整个程序就已经关闭运行了。

因此我做了一些改动,在子协程退出之前,保持主程序不会退出(使用Go语言的WaitGroup),如果简单粗暴在main函数末尾设置一个很长的程序sleep时间,也是可以实现打印输出的。

func main() { consumer, err := sarama.NewConsumer([]string{"82.156.171.8:10004"}, nil) if err != nil { fmt.Printf("fail to start consumer, err:%v\n", err) return } partitionList, err := consumer.Partitions("test") // 根据topic取到所有的分区 if err != nil { fmt.Printf("fail to get list of partition:err%v\n", err) return } fmt.Println("list = ", partitionList, len(partitionList)) var wg sync.WaitGroup for partition := range partitionList { // 遍历所有的分区 wg.Add(1) // 针对每个分区创建一个对应的分区消费者 pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest) if err != nil { fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err) return } defer pc.AsyncClose() go func(sarama.PartitionConsumer, *sync.WaitGroup) { for msg := range pc.Messages() { //fmt.Println("打印信息") fmt.Println("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value)) } wg.Done() }(pc, &wg) } wg.Wait()}

生产&消费

确保kafka容器正常运行,kafka服务器防火墙端口正常开放,运行消费者程序,运行生产者程序。这个生产者每秒向kafka发送一条测试数据:​​this is a test log​​,你也可以添加上程序运行时间进行测试。

事实上被客户端消费后的数据并没有马上从kafka删除,这里不多做介绍,各位自行了解~

小结

本文讲解了使用​​docker-compose​​​部署单节点kafka的流程,后续通过修改​​docker-compose.yml​​的内容也可以实现kafka集群的部署,并且,在较新版本的kafka中,集群的部署可以脱离zookeeper,但是经过了解,由于功能并不完善,这里还是选择了基于zookeeper的部署。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:流式计算中的 Window 计算|青训营笔记
下一篇:Java Spring事务的隔离级别详解
相关文章

 发表评论

暂时没有评论,来抢沙发吧~