Writing MapReduce output to MySQL

Reader submission 313 2022-11-18


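After the reduce phase, a MapReduce program writes its results to the HDFS file system by default (through context.write()). To write the results to MySQL instead, the output record class has to implement the methods of two interfaces, Writable and DBWritable.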

Taking the WordCount program as an example, the steps below show how to write each word and its count to MySQL.

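1. First create the table test in MySQL with the following structure: test (id bigint(20), name varchar(200), w_count int(4)). The job never sets id, so that column has to be generated by the database (for example, as an AUTO_INCREMENT primary key).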

2. Put the MySQL JDBC driver jar on the classpath.

3. Create an entity class that mirrors the table structure and implements the Writable and DBWritable interfaces:

package bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class Word implements Writable, DBWritable {
    private Long id;
    private String name;
    private int wCount;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getwCount() { return wCount; }
    public void setwCount(int wCount) { this.wCount = wCount; }

    // DBWritable: fill the bean from a row, in column order (id, name, w_count).
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        this.id = rs.getLong(1);
        this.name = rs.getString(2);
        this.wCount = rs.getInt(3);
    }

    // DBWritable: bind the INSERT parameters. id is generated by the database, so it is not set.
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setString(1, this.name);
        ps.setInt(2, this.wCount);
    }

    // Writable: read the fields in the same order write(DataOutput) emits them.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readLong();
        this.name = Text.readString(in);
        this.wCount = in.readInt();
    }

    // Writable: id may still be null here, so write 0 rather than fail on unboxing.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.id == null ? 0L : this.id);
        Text.writeString(out, this.name);
        out.writeInt(this.wCount);
    }
}

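Note: id is generated automatically, so ps.setLong() is not needed here. In fact, only the first two methods, readFields(ResultSet rs) and write(PreparedStatement ps), need a real implementation for this job.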
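The two Writable methods must read and write the fields in the same order. The sketch below illustrates that symmetry using only java.io, so it runs without Hadoop. WordRecord and roundTrip are hypothetical names introduced for illustration, and writeUTF/readUTF stand in for Text.writeString/Text.readString. It also assumes a primitive long id, so an id that was never set is written as 0. With a boxed Long left null, writeLong would throw a NullPointerException when unboxing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class WordRoundTrip {
    // Hypothetical stand-in for the Word bean; plain java.io, no Hadoop types.
    static final class WordRecord {
        long id;      // primitive: an unset id serializes as 0
        String name;
        int wCount;

        // Same field order as readFields below.
        void write(DataOutput out) throws IOException {
            out.writeLong(id);
            out.writeUTF(name);   // assumption: replaces Text.writeString
            out.writeInt(wCount);
        }

        void readFields(DataInput in) throws IOException {
            id = in.readLong();
            name = in.readUTF();  // assumption: replaces Text.readString
            wCount = in.readInt();
        }
    }

    // Serializes a record to bytes and reads it back into a fresh instance.
    static WordRecord roundTrip(WordRecord src) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            src.write(out);
            out.flush();
            WordRecord copy = new WordRecord();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WordRecord w = new WordRecord();
        w.name = "hadoop";
        w.wCount = 3;
        WordRecord copy = roundTrip(w);
        System.out.println(copy.id + " " + copy.name + " " + copy.wCount); // prints: 0 hadoop 3
    }
}
```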

4. The Mapper class (the same as in any other WordCount job):

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // Emits (token, 1) for every whitespace-separated token in the input line.
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

5. The Reducer class:

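public static class SumReducer extends Reducer<Text, IntWritable, Word, NullWritable> {
    // Output record, reused across reduce() calls.
    private final Word word = new Word();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        word.setName(key.toString());
        word.setwCount(sum);
        // Only the key is written to the database; the value carries no data.
        context.write(word, NullWritable.get());
    }
}

This differs slightly from an ordinary reducer. Reducer takes four type parameters. The first is the reduce input key type (the word). The second is the reduce input value type (the counts for that word). The third is the output key type, and this is the one that changes: it is Word, the POJO for the database table, which implements the interfaces Hadoop requires and carries the database write logic. The fourth is the output value type; the value can be null (NullWritable is used here) because the whole insert is done through the key.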
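To check which (name, w_count) rows the job should produce for a given input, the map and reduce logic can be tried locally without a cluster. The following is a minimal sketch in plain Java, not the Hadoop job itself. LocalWordCount and count are hypothetical names, and a TreeMap stands in for the shuffle that groups values by key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class LocalWordCount {
    // Map step: one token at a time, as TokenizerMapper does.
    // Reduce step: sum the 1s per token, as SumReducer does.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> sums = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                sums.merge(itr.nextToken(), 1, Integer::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello hadoop", "hello mysql");
        // Each entry corresponds to one row: name, w_count.
        count(lines).forEach((name, wCount) -> System.out.println(name + "\t" + wCount));
    }
}
```

Each map entry corresponds to one Word instance that the reducer would pass to context.write().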

6. The main method:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    /*
    conf.set("fs.default.name", "**************");
    conf.set("mapred.textoutputformat.separator", ",");
    conf.set("mapred.compress.map.output", "true");
    conf.set("date", yesterday);
    */
    // JDBC driver class, connection URL, user name, password.
    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
            "jdbc:mysql://127.0.0.1:3306/test", "root", "root");
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    /*
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount ");
        System.exit(2);
    }
    */
    Job job = Job.getInstance(conf, "word count 2 mysql");
    job.setJarByClass(WordCount2DB.class);
    job.setMapperClass(TokenizerMapper.class);
    //job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(SumReducer.class);
    // The mapper emits (Text, IntWritable); the reducer emits Word as the key.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Word.class);
    job.setOutputValueClass(NullWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(DBOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    //FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    // Target table, then the columns in the order Word.write(PreparedStatement) binds them.
    DBOutputFormat.setOutput(job, "test", "name", "w_count");
    /*
    job.setNumReduceTasks(8);
    job.waitForCompletion(true);
    */
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

Note: job.setOutputFormatClass(DBOutputFormat.class) is required here so that the job's output is directed to the database.
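It may help to picture how the table and column names passed to DBOutputFormat.setOutput(job, "test", "name", "w_count") relate to the parameter indexes used in Word.write(PreparedStatement). As far as I understand it, the output format builds a parameterized INSERT from those names, and each record fills in the placeholders. The helper below is a hypothetical illustration of that statement shape, not Hadoop's own code, and the exact text Hadoop generates may differ.

```java
public class InsertStatementSketch {
    // Hypothetical helper: builds a parameterized INSERT for a table and its columns.
    static String buildInsert(String table, String... fieldNames) {
        StringBuilder sql = new StringBuilder("INSERT INTO ").append(table);
        sql.append(" (").append(String.join(",", fieldNames)).append(") VALUES (");
        for (int i = 0; i < fieldNames.length; i++) {
            sql.append(i == 0 ? "?" : ",?");   // one placeholder per column
        }
        return sql.append(")").toString();
    }

    public static void main(String[] args) {
        // Placeholder 1 is name, placeholder 2 is w_count; id is left to the database.
        System.out.println(buildInsert("test", "name", "w_count"));
        // prints: INSERT INTO test (name,w_count) VALUES (?,?)
    }
}
```

Under this reading, ps.setString(1, name) and ps.setInt(2, wCount) line up with the column order given to setOutput, which is why the two must be kept in sync.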
