c语言sscanf函数的用法是什么
324
2022-11-25
使用 MapReduce 将文件数据写入 MySQL 数据库
自定义类（同时实现 DBWritable 和 Writable 的输出记录类）
package DBOutFormat;
import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapred.lib.db.DBWritable;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;
/**
 * Four-column string record ({@code address}, {@code type}, {@code name}, {@code divce})
 * that implements both {@link DBWritable} (JDBC statement / result-set binding) and
 * {@link Writable} (binary {@code DataOutput} / {@code DataInput} serialization).
 *
 * <p>Every read and write handles the fields in the same fixed order:
 * address, type, name, divce. Field values are passed through as-is; no null
 * handling is performed here.
 */
public class MysqlDBOutPutFormat implements DBWritable, Writable {

    private String address;
    private String type;
    private String name;
    private String divce;

    /** Creates a record whose four fields are all left unset ({@code null}). */
    public MysqlDBOutPutFormat() {
    }

    /**
     * Creates a fully populated record.
     *
     * @param address value for the first column
     * @param type    value for the second column
     * @param name    value for the third column
     * @param divce   value for the fourth column
     */
    public MysqlDBOutPutFormat(String address, String type, String name, String divce) {
        this.address = address;
        this.type = type;
        this.name = name;
        this.divce = divce;
    }

    /** Returns the current field values in column order (address, type, name, divce). */
    private String[] orderedValues() {
        return new String[] {address, type, name, divce};
    }

    /**
     * Binds the four fields to statement parameters 1 through 4, in column order.
     *
     * @param statement the prepared statement to populate
     * @throws SQLException if the statement rejects a parameter
     */
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        final String[] values = orderedValues();
        for (int i = 0; i < values.length; i++) {
            // JDBC parameter indexes are 1-based.
            statement.setString(i + 1, values[i]);
        }
    }

    /**
     * Loads the four fields from result-set columns 1 through 4, in column order.
     *
     * @param resultSet the result set positioned on the row to read
     * @throws SQLException if a column cannot be read
     */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        address = resultSet.getString(1);
        type = resultSet.getString(2);
        name = resultSet.getString(3);
        divce = resultSet.getString(4);
    }

    /**
     * Serializes the four fields with {@code writeUTF}, in column order.
     *
     * @param out the sink to write to
     * @throws IOException if the sink fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        for (final String value : orderedValues()) {
            out.writeUTF(value);
        }
    }

    /**
     * Restores the four fields with {@code readUTF}, in the same order they are written.
     *
     * @param in the source to read from
     * @throws IOException if the source fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        address = in.readUTF();
        type = in.readUTF();
        name = in.readUTF();
        divce = in.readUTF();
    }
}
MapReduce 示例代码
package DBOutFormat;
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class reduce {
    /**
     * Configures and runs a MapReduce job that reads {@code data1/mysql.txt} and sends
     * its output through {@link DBOutputFormat} to the table {@code zyplc}
     * (columns {@code address}, {@code type}, {@code name}, {@code divce}).
     *
     * <p>The JDBC driver, URL, user and password are hard-coded below. The process
     * exits with status 0 when the job completes successfully and 1 otherwise.
     *
     * @param args command-line arguments (not used)
     * @throws IOException            propagated from job setup or execution
     * @throws ClassNotFoundException propagated from {@code waitForCompletion}
     * @throws InterruptedException   propagated from {@code waitForCompletion}
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        final String inputPath = "data1/mysql.txt";

        // The database settings are stored in the Configuration before the Job is created from it.
        final Configuration conf = new Configuration();
        DBConfiguration.configureDB(
                conf,
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://11.11.11.2:3306/su?characterEncoding=UTF-8",
                "root",
                "root");

        // Obtain the Job object.
        final Job job = Job.getInstance(conf);

        // Class used to locate the job jar.
        job.setJarByClass(reduce.class);

        // Mapper and Reducer implementations.
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // Key/value types emitted by the map phase.
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types emitted by the reduce phase.
        job.setOutputKeyClass(MysqlDBOutPutFormat.class);
        job.setOutputValueClass(NullWritable.class);

        // Input path.
        FileInputFormat.setInputPaths(job, new Path(inputPath));

        // Output goes to the database, so the default file output format is replaced.
        job.setOutputFormatClass(DBOutputFormat.class);
        final String[] columns = {"address", "type", "name", "divce"};
        DBOutputFormat.setOutput(job, "zyplc", columns);

        // Submit the job and wait for it to finish.
        final boolean succeeded = job.waitForCompletion(true);
        if (succeeded) {
            System.exit(0);
        } else {
            System.exit(1);
        }
    }
public static class MyMapper extends Mapper
}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~