使用 MapReduce 将文件写入 MySQL 数据库

网友投稿 324 2022-11-25

使用 MapReduce 将文件写入 MySQL 数据库

自定义类

package DBOutFormat;

import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapred.lib.db.DBWritable;

import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;

/**
 * Value object carrying four string fields: address, type, name and divce.
 *
 * <p>It implements {@link DBWritable} so the fields can be bound to a
 * {@link PreparedStatement} or populated from a {@link ResultSet}, and
 * {@link Writable} so the fields can be serialized to a {@link DataOutput}
 * and restored from a {@link DataInput}. In every direction the fields are
 * handled in the fixed order address, type, name, divce.
 */
public class MysqlDBOutPutFormat implements DBWritable, Writable {

    private String address;
    private String type;
    private String name;
    private String divce;

    /** Creates an instance with all four fields left unset. */
    public MysqlDBOutPutFormat() {
    }

    /**
     * Creates an instance with every field populated.
     *
     * @param address value stored in the address field
     * @param type    value stored in the type field
     * @param name    value stored in the name field
     * @param divce   value stored in the divce field
     */
    public MysqlDBOutPutFormat(String address, String type, String name, String divce) {
        this.address = address;
        this.type = type;
        this.name = name;
        this.divce = divce;
    }

    /**
     * Binds the four fields to statement parameters 1 through 4.
     *
     * @param statement the statement whose parameters are set
     * @throws SQLException if setting a parameter fails
     */
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        int column = 0;
        statement.setString(++column, this.address);
        statement.setString(++column, this.type);
        statement.setString(++column, this.name);
        statement.setString(++column, this.divce);
    }

    /**
     * Loads the four fields from result-set columns 1 through 4.
     *
     * @param resultSet the result set positioned on the row to read
     * @throws SQLException if reading a column fails
     */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        int column = 0;
        this.address = resultSet.getString(++column);
        this.type = resultSet.getString(++column);
        this.name = resultSet.getString(++column);
        this.divce = resultSet.getString(++column);
    }

    /**
     * Serializes the four fields with {@link DataOutput#writeUTF(String)}.
     *
     * @param out the sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.address);
        out.writeUTF(this.type);
        out.writeUTF(this.name);
        out.writeUTF(this.divce);
    }

    /**
     * Restores the four fields with {@link DataInput#readUTF()}, in the same
     * order in which {@link #write(DataOutput)} emits them.
     *
     * @param in the source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.address = in.readUTF();
        this.type = in.readUTF();
        this.name = in.readUTF();
        this.divce = in.readUTF();
    }

}

MapReduce 示例代码

package DBOutFormat;

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class reduce {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

String input = "data1/mysql.txt" ; final Configuration co = new Configuration() ; DBConfiguration.configureDB(co, "com.mysql.jdbc.Driver", "jdbc:mysql://11.11.11.2:3306/su?characterEncoding=UTF-8", "root", "root" ); //获取 Job 对象 final Job job = Job.getInstance(co); //设置class job.setJarByClass(reduce.class); //设置mapper 和 Reduce job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); //设置 Mapper 阶段输出数据的key 和value job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); //设置Reducer 阶段输出数据的key 和value job.setOutputKeyClass(MysqlDBOutPutFormat.class); job.setOutputValueClass(NullWritable.class); //设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(input)); //job输出发生变化 ,不能使用默认的 Fileoutputformat job.setOutputFormatClass(DBOutputFormat.class); String[] fields = {"address","type","name","divce"}; DBOutputFormat.setOutput(job,"zyplc",fields); //提交 job final boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } public static class MyMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(key,value); } } public static class MyReducer extends Reducer{ @Override protected void reduce(LongWritable key, Iterable values, Context context) throws IOException, InterruptedException { for (Text value : values) { String[] info = value.toString().split(",") ; if(info.length==4){ context.write(new MysqlDBOutPutFormat(info[0].trim(),info[1].trim(),info[2].trim(),info[3].trim()),NullWritable.get()); } } } }

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:深入理解Java设计模式之适配器模式
下一篇:MapReducer 中MapJoin示例
相关文章

 发表评论

暂时没有评论,来抢沙发吧~