c语言sscanf函数的用法是什么
253
2022-11-17
大数据学习笔记-------------------(17_1)
第17章 KAFKA 生产者与消费者实例
17.1 生产者实例
用Java客服端来创建一个发布和订阅消息的应用程序。Kafka生产者客服端由如下API组成。
17.1.1KafkaProducer API
KafkaProducerAPI的核心组成部分是"KafkaProducer"类。KafkaProducer类提供一个选项去连接一个kafka中间件,在这个结构内带有如下方法。
KafkaProducer类提供send方法异步发送信息到一个Topic。send()的特征如下:
producer.send(new ProducerRecord
ProducerRecord: 生产者管理等待发送记录的缓冲器。
Callback:当记录已由服务器确认时,提供用户回调执行(null表示无回调)。
KafkaProducer类提供flush方法来确保所有先前发送的消息都已实际完成。flush方法的语法如下:
public void flush()
KafkaProducer类提供partitionFor方法,该方法有助于获取给定Topic分区的元数据。也可用于用户自定义分区。该方法的特征如下:
public partitionsFor(string topic)
该方法返回topic的元数据。
KafkaProducer类提metrics方法,该方法被用于返回一个由生产者维护metrics的Map。该方法的特征如下:
public Map metrics()
该方法返回由生产者维护内部metrics的map
KafkaProducer类提供close方法,直到完成所有先前发送的请求:
public void close()
17.1.2 ProducerAPI
ProducerAPI核心部分是"Producer"类。Producer类提供一个选项去连接Kafka中间件,这个类提供方法如下。
生产者类
生产者类提供send方法发送信息到一个或多个topics。使用方法如下:
public void send(KeyedMessage
生产者有两种类型:
Sync(同步)、
Async(异步)。
相同的API配置也适用于"Sync"生产者。Sync和Async两者的不同如下:一个Sync生产者直接发送信息,但在后台发送信息;当想要一个更高的吞吐量时,Async生产者是首选。在之前的版本像0.8,一个async生产者不能调用send()方法来寄存错误的handler。这种只能在当前0.9版本使用。
public void close()
Producer类提供close方法关闭所有brokers的producer 连接池。
17.1.3 配置设置
下表列出ProducerAPI的主要配置设置如:
17.1.4ProducerRecord API
ProducerRecord是被发送到Kafka集群的键值对(key/value)。ProducerRecord类是用于创建一个带有分区、键值对的record结构,结构类型如下:
public ProducerRecord (string topic, int partition, k key, v value)
Topic:用户定义的topic名称被追加到record中。
Partition:分区数量
Key:包含在record中的key
Value:Record内容
public ProducerRecord (string topic, k key, v value)
ProducerRecord类结构被用于创建一个带有键值对、不带有分区的record:
Topic:创建个Topic分配给record
Key:record的key
Value:record内容
public ProducerRecord (string topic, v value)
ProducerRecord类创建一个不带有分区和键的record
Topic:创建一个Topic
Value:record 内容
ProducerRecord类方法如下表所示:
17.1.5 简单生产者应用程序
在创建应用程序之前,首先启动zookeeper和Kafka中间件,然后用创建Topic命令在kafka broker中创建的Topic。创建一个名为"SimpleProducer.java"的Java类之后,键入如下代码:
import java.util.Properties;//import util.properties packagesimport org.apache.kafka.clients.producer.Producer;//import simple producer packagesimport org.apache.kafka.clients.producer.KafkaProducer;//import KafkaProducer packagesimport org.apache.kafka.clients.producer.ProducerRecord;//import ProducerRecord packagespublic class SimpleProducer { //Create java class named “SimpleProducer” public static void main(String[] args) throws Exception { if(args.length == 0) // Check arguments length value { System.out.println("Enter topic name”); return; } String topicName = args[0].toString(); //Assign topicName to string variable Properties props = new Properties(); // create instance for properties to access producer configs props.put("bootstrap.servers", “localhost:9092"); //Assign localhost id props.put("acks", “all"); //Set acknowledgements for producer requests. props.put("retries", 0); //If the request fails, the producer can automatically retry, props.put("batch.size", 16384); //Specify buffer size in config props.put("linger.ms", 1); //Reduce the no of requests less than 0 props.put("buffer.memory", 33554432); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("key.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); Producer
编译:应用程序用下面的命令进行编译。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”*.java
执行:用下面的命令进行执行。
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:.SimpleProducer
输出:
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~