c语言sscanf函数的用法是什么
237
2022-11-14
(6)FlinkSQL将kafka数据写入到mysql方式一
这里不展开zookeeper、kafka安装配置
(1)首先需要启动zookeeper和kafka
(2)定义一个kafka生产者
package com.producers;import com.alibaba.fastjson.JSONObject;import com.pojo.Event;import com.pojo.WaterSensor;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;import java.util.Random;/** * Created by lj on 2022-07-09. */public class Kafaka_Producer { public final static String bootstrapServers = "127.0.0.1:9092"; public static void main(String[] args) { Properties props = new Properties(); //设置Kafka服务器地址 props.put("bootstrap.servers", bootstrapServers); //设置数据key的序列化处理类 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //设置数据value的序列化处理类 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer
(3)定义一个消息对象
package com.pojo;import java.io.Serializable;/** * Created by lj on 2022-07-05. */public class WaterSensor implements Serializable { private String id; private long ts; private int vc; public WaterSensor(){ } public WaterSensor(String id,long ts,int vc){ this.id = id; this.ts = ts; this.vc = vc; } public int getVc() { return vc; } public void setVc(int vc) { this.vc = vc; } public String getId() { return id; } public void setId(String id) { this.id = id; } public long getTs() { return ts; } public void setTs(long ts) { this.ts = ts; }}
(4)从kafka接入数据,并写入到mysql
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //读取kafka的数据 Properties properties = new Properties(); properties.setProperty("bootstrap.servers","127.0.0.1:9092"); properties.setProperty("group.id", "consumer-group"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("auto.offset.reset", "latest"); DataStreamSource
(5)效果演示
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~