java系统找不到指定文件怎么解决
260
2022-11-17
Flink常用API之Kafka的Source
基于 Kafka 的 Source 首 先 需 要 配 置 Kafka 连 接 器 的 依 赖 , 另 外 更 多 的 连 接 器 可 以 查 看 官 网 :
第一种:读取 Kafka 中的普通数据(String)
package sourceimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerimport org.apache.kafka.common.serialization.StringDeserializerimport java.util.Properties/** * @Author yqq * @Date 2021/12/25 15:35 * @Version 1.0 */object KafkaSource { def main(args: Array[String]): Unit = { val environment = StreamExecutionEnvironment.getExecutionEnvironment environment.setParallelism(1) import org.apache.flink.streaming.api.scala._ //连接kafka,且kafaka中数据为字符串 val properties = new Properties() properties.setProperty("bootstrap.servers","node1:9092,node2:9092,node3:9092") properties.setProperty("group.id","flink01") properties.setProperty("key.deserializer",classOf[StringDeserializer].getName) properties.setProperty("value.deserializer",classOf[StringDeserializer].getName) properties.setProperty("auto.offset.reset","latest") environment.addSource(new FlinkKafkaConsumer[String]("topic_01",new SimpleStringSchema(),properties)) .print() environment.execute() }}
package sourceimport org.apache.flink.api.common.typeinfo.TypeInformationimport org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}import org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.common.serialization.StringDeserializerimport java.util.Properties/** * @Author yqq * @Date 2021/12/25 16:37 * @Version 1.0 */import org.apache.flink.streaming.api.scala._object KafkaSourceKeyValue { //2、导入隐式转换 def main(args: Array[String]): Unit = { //1、初始化Flink流计算的环境 val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //修改并行度 streamEnv.setParallelism(1) //默认所有算子的并行度为1 //连接Kafka的属性 val props = new Properties() props.setProperty("bootstrap.servers","hadoop101:9092,hadoop102:9092,hadoop103:9092") props.setProperty("group.id","flink01") props.setProperty("key.deserializer",classOf[StringDeserializer].getName) props.setProperty("value.deserializer",classOf[StringDeserializer].getName) props.setProperty("auto.offset.reset","latest") //设置Kafka数据源 val stream: DataStream[(String, String)] = streamEnv.addSource(new FlinkKafkaConsumer[(String,String)]("t_bjsxt",new MyKafkaReader,props)) stream.print() streamEnv.execute() } //自定义一个类,从Kafka中读取键值对的数据 class MyKafkaReader extends KafkaDeserializationSchema[(String,String)]{ //是否流结束 override def isEndOfStream(nextElement: (String, String)): Boolean = { false } //反序列化 override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = { if(record!=null){ var key="null" var value="null" if(record.key()!=null){ key =new String(record.key(),"UTF-8") } if(record.value()!=null){ //从Kafka记录中得到Value value =new String(record.value(),"UTF-8") } (key,value) }else{//数据为空 ("null","nulll") } } //指定类型 override def getProducedType: TypeInformation[(String, String)] ={ createTuple2TypeInformation(createTypeInformation[String],createTypeInformation[String]) } }}
定义生产者
package sourceimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}import java.util.{Properties, Random}/** * @Author yqq * @Date 2021/12/25 17:12 * @Version 1.0 */object MykafkaProducer { def main(args: Array[String]): Unit = { //连接Kafka的属性 val props = new Properties() props.setProperty("bootstrap.servers","node1:9092,node2:9092,node3:9092") props.setProperty("key.serializer",classOf[StringSerializer].getName) props.setProperty("value.serializer",classOf[StringSerializer].getName) var producer =new KafkaProducer[String,String](props) var r =new Random() while(true){ //死循环生成键值对的数据 val data = new ProducerRecord[String,String]("topic_01","key"+r.nextInt(10),"value"+r.nextInt(100)) producer.send(data) Thread.sleep(1000) } producer.close() }}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~