大数据学习笔记-------------------(17_1)

网友投稿 253 2022-11-17

大数据学习笔记-------------------(17_1)

第17章 KAFKA 生产者与消费者实例

17.1 生产者实例

用Java客服端来创建一个发布和订阅消息的应用程序。Kafka生产者客服端由如下API组成。

17.1.1KafkaProducer API

KafkaProducerAPI的核心组成部分是"KafkaProducer"类。KafkaProducer类提供一个选项去连接一个kafka中间件,在这个结构内带有如下方法。

KafkaProducer类提供send方法异步发送信息到一个Topic。send()的特征如下:

producer.send(new ProducerRecord(topic, partition, key1, value1) , callback);

ProducerRecord: 生产者管理等待发送记录的缓冲器。

Callback:当记录已由服务器确认时,提供用户回调执行(null表示无回调)。​

KafkaProducer类提供flush方法来确保所有先前发送的消息都已实际完成。flush方法的语法如下:

public void flush()

KafkaProducer类提供partitionFor方法,该方法有助于获取给定Topic分区的元数据。也可用于用户自定义分区。该方法的特征如下:

public partitionsFor(string topic)

该方法返回topic的元数据。

KafkaProducer类提metrics方法,该方法被用于返回一个由生产者维护metrics的Map。该方法的特征如下:

public Map metrics()

该方法返回由生产者维护内部metrics的map

KafkaProducer类提供close方法,直到完成所有先前发送的请求:

public void close()

17.1.2 ProducerAPI

ProducerAPI核心部分是"Producer"类。Producer类提供一个选项去连接Kafka中间件,这个类提供方法如下。

生产者类

生产者类提供send方法发送信息到一个或多个topics。使用方法如下:

public void send(KeyedMessage message) - sends the data to a single topic,par-titioned by key using either sync or async producer.public void send(List> messages) - sends data to multiple topics.Properties prop = new Properties();prop.put(producer.type,”async”)ProducerConfig config = new ProducerConfig(prop);

生产者有两种类型:

Sync(同步)、

Async(异步)。

相同的API配置也适用于"Sync"生产者。Sync和Async两者的不同如下:一个Sync生产者直接发送信息,但在后台发送信息;当想要一个更高的吞吐量时,Async生产者是首选。在之前的版本像0.8,一个async生产者不能调用send()方法来寄存错误的handler。这种只能在当前0.9版本使用。

public void close()

Producer类提供close方法关闭所有brokers的producer 连接池。

17.1.3 配置设置

下表列出ProducerAPI的主要配置设置如:

17.1.4ProducerRecord API

ProducerRecord是被发送到Kafka集群的键值对(key/value)。ProducerRecord类是用于创建一个带有分区、键值对的record结构,结构类型如下:

public ProducerRecord (string topic, int partition, k key, v value)

Topic:用户定义的topic名称被追加到record中。

Partition:分区数量

Key:包含在record中的key

Value:Record内容

public ProducerRecord (string topic, k key, v value)

ProducerRecord类结构被用于创建一个带有键值对、不带有分区的record:

Topic:创建个Topic分配给record

Key:record的key

Value:record内容

public ProducerRecord (string topic, v value)

ProducerRecord类创建一个不带有分区和键的record

Topic:创建一个Topic

Value:record 内容

ProducerRecord类方法如下表所示:

17.1.5 简单生产者应用程序

在创建应用程序之前,首先启动zookeeper和Kafka中间件,然后用创建Topic命令在kafka broker中创建的Topic。创建一个名为"SimpleProducer.java"的Java类之后,键入如下代码:

import java.util.Properties;//import util.properties packagesimport org.apache.kafka.clients.producer.Producer;//import simple producer packagesimport org.apache.kafka.clients.producer.KafkaProducer;//import KafkaProducer packagesimport org.apache.kafka.clients.producer.ProducerRecord;//import ProducerRecord packagespublic class SimpleProducer { //Create java class named “SimpleProducer” public static void main(String[] args) throws Exception { if(args.length == 0) // Check arguments length value { System.out.println("Enter topic name”); return; } String topicName = args[0].toString(); //Assign topicName to string variable Properties props = new Properties(); // create instance for properties to access producer configs props.put("bootstrap.servers", “localhost:9092"); //Assign localhost id props.put("acks", “all"); //Set acknowledgements for producer requests. props.put("retries", 0); //If the request fails, the producer can automatically retry, props.put("batch.size", 16384); //Specify buffer size in config props.put("linger.ms", 1); //Reduce the no of requests less than 0 props.put("buffer.memory", 33554432); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("key.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); Producer producer = new KafkaProducer(props); for(int i = 0; i < 10; i++) producer.send(new ProducerRecord(topicName, Integer.toString(i), Integer.toString(i))); System.out.println(“Message sent successfully”); producer.close(); } }

编译:应用程序用下面的命令进行编译。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”*.java

执行:用下面的命令进行执行。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:.SimpleProducer

输出:

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:ZMC408SCAN轴控光纤激光器加工
下一篇:关于BindingResult的使用总结及注意事项
相关文章

 发表评论

暂时没有评论,来抢沙发吧~