kafka系列--消费

网友投稿 225 2022-11-22

kafka系列--消费

public String title;     public  ConsumerRecords records;          public KafkaConsumerSimple(String title, ConsumerRecords records) {         this.title = title;         this.records = records;     }     @Override     public void run() {         System.out.println("开始运行 " + title);         for (ConsumerRecord record : records) {             if(record!=null){                 String topic = record.topic();                 int partition = record.partition();                                  long offset = record.offset();                 String msg = new String(record.value());                 String key=new String(record.key());                 //System.out.println(String.format(                         "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s],key:[%s]",                         title, topic, partition, offset, msg,key));             }         }         //System.out.println(String.format("Consumer: [%s] exiting ...", title));     }     public static void main(String[] args) {         Properties properties = new Properties();           properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroupname");         //默认自动提交           //properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG , "false");         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,                 "ip:port,ip:port");         /**          * earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。          * latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。          * none 当该topic下所有分区中存在未提交的offset时,抛出异常。          */         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");         /**          * consumer向zookeeper提交offset的频率,单位是秒          */         properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");         /**          * RoundRobin策略有两个前提条件必须满足:          * 同一个Consumer Group里面的所有消费者的num.streams必须相等;          * 每个消费者订阅的主题必须相同          *          * Range 均分          */         //properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range");         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");         String topic = "test";         TopicPartition partition0 = new TopicPartition(topic, 0);         TopicPartition partition1 = new TopicPartition(topic, 1);         KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);           kafkaConsumer.subscribe(Arrays.asList(topic));         //指定分区消费         //kafkaConsumer.assign(Arrays.asList(partition0, partition1));         boolean isRunning = true;         //创建一个容量3的线程池         ExecutorService executor = Executors.newFixedThreadPool(3);         int index=0;         while(isRunning) {             ++index;             ConsumerRecords records = kafkaConsumer.poll(Long.MAX_VALUE);             executor.execute(new KafkaConsumerSimple("消费者" + (index), records));         }         kafkaConsumer.close();     }

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java使用Graphics2D绘图/画图方式
下一篇:大话数据--商业
相关文章

 发表评论

暂时没有评论,来抢沙发吧~