大数据学习笔记-------------------(17_3)

网友投稿 241 2022-11-17

大数据学习笔记-------------------(17_3)

17.3 消费者群例子

消费群是多线程或多机器接收KafkaTopic。

17.3.1 消费者群

Ø  消费者可以通过使用相同的“group.id”来加入组。

Ø  组的最大并行数目是组中消费者数<=分区数。

Ø  Kafka将Topic分区分配给组中的消费者,以便每个分区都由组中的一个消费者使用。

Ø  Kafka保证消息只能被组中的一个消费者读取。

Ø  消费者可以按照消息存储在日志中的顺序查看消息。

17.3.2消费者重现平衡

添加更多进程/线程将导致Kafka重新平衡。如果任何消费者或broker无法向ZooKeeper发送心跳,则可以通过Kafka集群重新配置。在重新平衡期间,Kafka将分配可用分区到可用线程,可能将分区移动到另一个进程:

import java.util.Properties;import java.util.Arrays;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.ConsumerRecord;public class ConsumerGroup { public static void main(String[] args) throws Exception { if(args.length < 2) { System.out.println("Usage: consumer "); return; } String topic = args[0].toString(); String group = args[1].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList(topic)); System.out.println("Subscribed to topic " + topic); int i = 0; while (true) { ConsumerRecords records = con-sumer.poll(100); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } }}

编译:应用程序用下面的命令进行编译。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” ConsumerGroup.java

执行:用下面的命令进行执行。

java -cp“/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup my-groupjava -cp"/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.ConsumerGroup my-group

输入:打开生产者CLI,发送像下面的信息

Test consumer group 01Test consumer group 02

第一个进程输出:

Subscribed to topic Hello-kafkaoffset = 3, key = null, value = Test consumergroup 01

第二个进程输出:

Subscribed to topic Hello-kafkaoffset = 3, key = null, value = Test consumergroup 02

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:大数据学习笔记-------------------(17_2)
下一篇:@valid&nbsp;无法触发BindingResult的解决
相关文章

 发表评论

暂时没有评论,来抢沙发吧~