[MapReduce] US COVID-19 Case Studies (MapReduce and Database Interaction)
Table of Contents

US COVID-19 Case Count Statistics
1. Data Field Description
2. Case 1: Cumulative Case Counts by State
    Step 1: Analysis
    Step 2: Preparation --- Back Up the Data to a Remote Database
        Part 1: Create the Remote Database and Table
        Part 2: Bean Class Wrapping a Database Record (matching the table)
        Part 3: Mapper Class to Process the Input Data
        Part 4: Reducer Class to Write the Output
        Part 5: Driver Class
        Part 6: Checking the Result
    Step 3: Bean Class Wrapping a Database Record (used when reading)
    Step 4: Bean Class Wrapping a Database Record (used when storing results)
    Step 5: Mapper Class Emitting <state, record (Bean object)>
    Step 6: Reducer Class Aggregating Cases per State
    Step 7: Driver Class
    Step 8: Checking the Result
3. Case 2: States Ranked by Cumulative Case Count (Descending)
    Step 1: Analysis ---- Sorting
    Step 2: Bean Class Wrapping a Database Record (used for both reading and storing)
    Step 3: Mapper Class Transferring the Data
    Step 4: Reducer Class Writing Records Straight Through
    Step 5: Driver Class
    Step 6: Checking the Result
4. Case 3: Cumulative Cases Partitioned by State
    Step 1: Analysis ---- Partitioning
    Step 2: Bean Class Wrapping a Database Record (for reading)
    Step 3: Mapper Class Fetching the Data
    Step 4: Reducer Class Writing the Data
    Step 5: StatePartition Partitioner Class
    Step 6: Driver Class
    Step 7: Checking the Result
5. Case 4: The County with the Most Cumulative Cases (Top 1) in Each State
    Step 1: Analysis ---- Grouping
    Step 2: Bean Class Wrapping a Database Record (for reading)
    Step 3: Bean Class Wrapping a Database Record (for storing results)
    Step 4: Mapper Class Emitting the Data
    Step 5: Comparator Class Grouping the Data
    Step 6: Reducer Class Emitting Only the First Record
    Step 7: Driver Class
    Step 8: Checking the Result
6. Case 5: The Top-N Counties by Cumulative Cases in Each State
    Step 1: Modify the Previous Case's Reducer to Output the First N Records
    Step 2: Explanation
US COVID-19 Case Count Statistics

1. Data Field Description

A sample of the dataset:
date,county,state,fips,cases,deaths
2020-01-21,Snohomish,Washington,53061.0,1,0.0
2020-01-22,Snohomish,Washington,53061.0,1,0.0
2020-01-23,Snohomish,Washington,53061.0,1,0.0
2020-01-24,Cook,Illinois,17031.0,1,0.0
2020-01-24,Snohomish,Washington,53061.0,1,0.0
2020-01-25,Orange,California,6059.0,1,0.0
2020-01-25,Cook,Illinois,17031.0,1,0.0
2020-01-25,Snohomish,Washington,53061.0,1,0.0
2020-01-26,Maricopa,Arizona,4013.0,1,0.0
2020-01-26,Los Angeles,California,6037.0,1,0.0
2020-01-26,Orange,California,6059.0,1,0.0
2020-01-26,Cook,Illinois,17031.0,1,0.0
.............................................
2020-04-03,Nash,North Carolina,37127.0,14,0.0
2020-04-03,New Hano
2. Case 1: Cumulative Case Counts by State

Step 1: Analysis

Group the records by state, then sum the cases and deaths fields of all records in a group to obtain each state's totals.
Step 2: Preparation --- Back Up the Data to a Remote Database

The cases that follow all use a remote database for data access and result output, so the dataset first needs to be imported into the remote database (the database here runs on a Linux system).

Part 1: Create the Remote Database and Table
CREATE TABLE `usa` (
  `date`    varchar(255) DEFAULT NULL,
  `country` varchar(255) DEFAULT NULL,
  `state`   varchar(255) DEFAULT NULL,
  `fips`    varchar(255) DEFAULT NULL,
  `cases`   int(255)     DEFAULT NULL,
  `deaths`  int(255)     DEFAULT NULL
);

Note that the second CSV field is actually the county name, but the column (and the matching Bean field below) is named `country`; that naming is kept as-is throughout the project.
Part 2: Bean Class Wrapping a Database Record (matching the table)
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class BeanSQL implements Writable, DBWritable {

    // Record fields (names and types must match the columns of the database table)
    private String date;
    private String country;
    private String state;
    private String fips;
    private int cases;
    private int deaths;

    // toString() override
    @Override
    public String toString() {
        return "BeanSQL{" +
                "date='" + date + '\'' +
                ", country='" + country + '\'' +
                ", state='" + state + '\'' +
                ", fips='" + fips + '\'' +
                ", cases=" + cases +
                ", deaths=" + deaths +
                '}';
    }

    // Convenience setter for all fields
    public void set(String date, String country, String state, String fips, int cases, int deaths) {
        this.date = date;
        this.country = country;
        this.state = state;
        this.fips = fips;
        this.cases = cases;
        this.deaths = deaths;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeUTF(country);
        out.writeUTF(state);
        out.writeUTF(fips);
        out.writeInt(cases);
        out.writeInt(deaths);
    }

    // Deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        this.date = in.readUTF();
        this.country = in.readUTF();
        this.state = in.readUTF();
        this.fips = in.readUTF();
        this.cases = in.readInt();
        this.deaths = in.readInt();
    }

    // Write a row to the database
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.date);
        statement.setString(2, this.country);
        statement.setString(3, this.state);
        statement.setString(4, this.fips);
        statement.setInt(5, this.cases);
        statement.setInt(6, this.deaths);
    }

    // Read a row from the database
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.date = resultSet.getString(1);
        this.country = resultSet.getString(2);
        this.state = resultSet.getString(3);
        this.fips = resultSet.getString(4);
        this.cases = resultSet.getInt(5);
        this.deaths = resultSet.getInt(6);
    }

    // Getters and setters
    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }
    public String getState() { return state; }
    public void setState(String state) { this.state = state; }
    public String getFips() { return fips; }
    public void setFips(String fips) { this.fips = fips; }
    public int getCases() { return cases; }
    public void setCases(int cases) { this.cases = cases; }
    public int getDeaths() { return deaths; }
    public void setDeaths(int deaths) { this.deaths = deaths; }
}
Part 3: Mapper Class to Process the Input Data
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MapSQL extends Mapper
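Only the import list and the class declaration of MapSQL survived extraction above. Judging from the Driver below (CSV text input, map output key NullWritable and value BeanSQL), the mapper most likely parses each line into a BeanSQL object. The following is a minimal sketch reusing the imports above; the header check and the field parsing are assumptions, not the author's original code.

public class MapSQL extends Mapper<LongWritable, Text, NullWritable, BeanSQL> {

    private final BeanSQL bean = new BeanSQL();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Skip the CSV header line (assumption: the header starts with "date")
        if (line.startsWith("date")) {
            return;
        }
        // date, county, state, fips, cases, deaths
        String[] fields = line.split(",");
        bean.set(fields[0], fields[1], fields[2], fields[3],
                Integer.parseInt(fields[4]),
                (int) Double.parseDouble(fields[5]));
        context.write(NullWritable.get(), bean);
    }
}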
Part 4: Reducer Class to Write the Output
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ReduceSQL extends Reducer
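The body of ReduceSQL is likewise missing. Since the job's output key is BeanSQL and its value is NullWritable, the reducer presumably just forwards every bean to DBOutputFormat; a sketch under that assumption, reusing the imports above:

public class ReduceSQL extends Reducer<NullWritable, BeanSQL, BeanSQL, NullWritable> {

    @Override
    protected void reduce(NullWritable key, Iterable<BeanSQL> values, Context context)
            throws IOException, InterruptedException {
        // Write every record into the usa table, one row per bean
        for (BeanSQL bean : values) {
            context.write(bean, NullWritable.get());
        }
    }
}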
Part 5: Driver Class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class DriverSQL {
    public static void main(String[] args) {
        try {
            // Create the job configuration
            Configuration conf = new Configuration();
            // Configure the database connection
            DBConfiguration.configureDB(
                    conf,
                    "com.mysql.jdbc.Driver",                // JDBC driver
                    "jdbc:mysql://127.0.0.1:3306/school",   // remote database URL
                    "root", "123456"                        // user, password
            );
            Job job = Job.getInstance(conf);

            // Wire up the Mapper, Reducer and Driver classes
            job.setMapperClass(MapSQL.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(BeanSQL.class);

            job.setReducerClass(ReduceSQL.class);
            job.setOutputKeyClass(BeanSQL.class);
            job.setOutputValueClass(NullWritable.class);

            job.setJarByClass(DriverSQL.class);

            // Input path (local CSV file)
            Path input = new Path(
                    "G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\美国疫情\\data\\us_counties_covid19_daily.csv"
            );
            FileInputFormat.setInputPaths(job, input);

            // Output goes to the remote database
            job.setOutputFormatClass(DBOutputFormat.class);
            // String[] fields = {"date","country","state","fips","cases","deaths"};
            // job, table name, field names (a fields array could be passed instead)
            DBOutputFormat.setOutput(job, "usa", "date", "country", "state", "fips", "cases", "deaths");

            // Submit the job
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Part 6: Checking the Result

After the job finishes successfully, check the table in the database on the virtual machine: the records have been imported.
Step 3: Bean Class Wrapping a Database Record (used when reading)
This is the same BeanSQL class shown in Part 2 above; it is reused unchanged here to read the records back out of the `usa` table.
Step 4: Bean Class Wrapping a Database Record (used when storing results)

Create the table that will hold the results:

CREATE TABLE `state_count` (
  `state`   varchar(255) DEFAULT NULL,
  `total_c` int(255)     DEFAULT NULL,
  `total_d` int(255)     DEFAULT NULL
);
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class Bean1 implements WritableComparable
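The body of Bean1 was lost. It wraps a row of the state_count table and serves as the Reducer output key, so a plausible sketch is the one below (reusing the imports above); the compareTo ordering by state name is an assumption.

public class Bean1 implements WritableComparable<Bean1>, DBWritable {

    private String state;
    private int total_c;  // cumulative cases
    private int total_d;  // cumulative deaths

    public void set(String state, int total_c, int total_d) {
        this.state = state;
        this.total_c = total_c;
        this.total_d = total_d;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(state);
        out.writeInt(total_c);
        out.writeInt(total_d);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.state = in.readUTF();
        this.total_c = in.readInt();
        this.total_d = in.readInt();
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, state);
        statement.setInt(2, total_c);
        statement.setInt(3, total_d);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.state = resultSet.getString(1);
        this.total_c = resultSet.getInt(2);
        this.total_d = resultSet.getInt(3);
    }

    // Required by WritableComparable; ordering by state name is an assumption here
    @Override
    public int compareTo(Bean1 o) {
        return this.state.compareTo(o.state);
    }
}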
Step 5: Mapper Class Emitting <state, record (Bean object)>
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Map1 extends Mapper
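The body of Map1 is missing. With DBInputFormat supplying BeanSQL records and the map output declared as <Text, BeanSQL> in the Driver, the mapper presumably keys each record by its state; a sketch under those assumptions, reusing the imports above:

public class Map1 extends Mapper<LongWritable, BeanSQL, Text, BeanSQL> {

    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, BeanSQL value, Context context)
            throws IOException, InterruptedException {
        // Key each database record by its state so the Reducer can aggregate per state
        outKey.set(value.getState());
        context.write(outKey, value);
    }
}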
Step 6: Reducer Class Aggregating Cases per State
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce1 extends Reducer
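The body of Reduce1 is missing. Given the output types <Bean1, NullWritable> and the state_count table, it presumably sums cases and deaths per state; a sketch under those assumptions, reusing the imports above:

public class Reduce1 extends Reducer<Text, BeanSQL, Bean1, NullWritable> {

    private final Bean1 result = new Bean1();

    @Override
    protected void reduce(Text key, Iterable<BeanSQL> values, Context context)
            throws IOException, InterruptedException {
        int totalCases = 0;
        int totalDeaths = 0;
        // Sum cases and deaths across every record of this state
        for (BeanSQL bean : values) {
            totalCases += bean.getCases();
            totalDeaths += bean.getDeaths();
        }
        result.set(key.toString(), totalCases, totalDeaths);
        // One row per state goes into the state_count table
        context.write(result, NullWritable.get());
    }
}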
Step 7: Driver Class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver1 {
    public static void main(String[] args) {
        try {
            // Job configuration
            Configuration conf = new Configuration();
            // Configure the database connection
            DBConfiguration.configureDB(
                    conf,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://192.168.64.178:3306/school",
                    "root", "123456"
            );
            // Get the job
            Job job = Job.getInstance(conf);

            // todo: set this when running the jar on the cluster (reading data from HDFS)
            // conf.set("fs.defaultFS","hdfs://192.168.64.178:9000");

            // Wire up the classes
            job.setMapperClass(Map1.class);
            job.setReducerClass(Reduce1.class);
            job.setJarByClass(Driver1.class);

            // Input/output types of each stage
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BeanSQL.class);
            job.setOutputKeyClass(Bean1.class);
            job.setOutputValueClass(NullWritable.class);

            // Input/output paths
            // todo: option 1 --- read from the local file system and write the result locally
            // Path in = new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\美国疫情\\data\\us_counties_covid19_daily.csv");
            // Path out = new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\美国疫情\\output");
            // Delete the output directory if it already exists
            // FileSystem fs = FileSystem.get(conf);
            // if (fs.exists(out)){
            //     fs.delete(out,true);
            // }

            // todo: option 2 --- read from HDFS and write the result back to HDFS
            // Path in = new Path("/MR/usayiqing/us_counties_covid19_daily.csv");
            // Path out = new Path("/MR/usayiqing/output");
            // FileInputFormat.setInputPaths(job,in);
            // FileOutputFormat.setOutputPath(job,out);

            // todo: option 3 --- read from the remote database, process, then write back
            job.setInputFormatClass(DBInputFormat.class);
            String[] fields1 = {"date", "country", "state", "fips", "cases", "deaths"};
            String[] fields2 = {"state", "total_c", "total_d"};
            DBInputFormat.setInput(job, BeanSQL.class, "usa", null, "", fields1);
            DBOutputFormat.setOutput(job, "state_count", fields2);

            // Submit the job
            boolean result = job.waitForCompletion(true);
            System.out.println(result ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Step 8: Checking the Result
3. Case 2: States Ranked by Cumulative Case Count (Descending)

Step 1: Analysis ---- Sorting

The per-state totals computed in Case 1 are read back from the state_count table; the Bean used as the map output key compares on the cumulative case count in descending order, so the shuffle itself produces the ranking.
Step 2: Bean Class Wrapping a Database Record (used for both reading and storing)

Note: although the same Bean class is used for reading and for storing, the tables actually read from and written to are different; here we directly sort the per-state totals computed above.

Create the table that will store the sorted result:

CREATE TABLE `state_count_sort` (
  `state`   varchar(255) DEFAULT NULL,
  `total_c` int(255)     DEFAULT NULL,
  `total_d` int(255)     DEFAULT NULL
);
package 美国疫情.州排序;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class Bean2 implements WritableComparable
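The body of Bean2 was lost. It has to carry the three state_count columns and, as the map output key, drive the descending sort; the sketch below (reusing the imports above) follows that reasoning, and the exact compareTo implementation is an assumption.

public class Bean2 implements WritableComparable<Bean2>, DBWritable {

    private String state;
    private int total_c;
    private int total_d;

    public void set(String state, int total_c, int total_d) {
        this.state = state;
        this.total_c = total_c;
        this.total_d = total_d;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(state);
        out.writeInt(total_c);
        out.writeInt(total_d);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.state = in.readUTF();
        this.total_c = in.readInt();
        this.total_d = in.readInt();
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, state);
        statement.setInt(2, total_c);
        statement.setInt(3, total_d);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.state = resultSet.getString(1);
        this.total_c = resultSet.getInt(2);
        this.total_d = resultSet.getInt(3);
    }

    // Descending order by cumulative cases: when Bean2 is the map output key,
    // this is what makes the shuffle produce the reverse ranking
    @Override
    public int compareTo(Bean2 o) {
        return Integer.compare(o.total_c, this.total_c);
    }
}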
Step 3: Mapper Class Transferring the Data
package 美国疫情.州排序;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// The two Bean2 type parameters are the same class, but they map to different database tables
public class Map2 extends Mapper
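The body of Map2 is missing. Since Bean2 is both the input record and the map output key, the mapper presumably just re-emits each record; a sketch under that assumption, reusing the imports above:

public class Map2 extends Mapper<LongWritable, Bean2, Bean2, NullWritable> {

    @Override
    protected void map(LongWritable key, Bean2 value, Context context)
            throws IOException, InterruptedException {
        // Emit each state_count row as the key; Bean2.compareTo then drives the sort
        context.write(value, NullWritable.get());
    }
}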
Step 4: Reducer Class Writing Records Straight Through
package 美国疫情.州排序;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce2 extends Reducer
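The body of Reduce2 is missing. It only needs to write the already-sorted keys through to state_count_sort; a sketch under that assumption, reusing the imports above:

public class Reduce2 extends Reducer<Bean2, NullWritable, Bean2, NullWritable> {

    @Override
    protected void reduce(Bean2 key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Keys arrive already sorted by cumulative cases (descending); just write them out
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}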
Step 5: Driver Class
package 美国疫情.州排序;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class Driver2 {
    public static void main(String[] args) {
        try {
            // Job configuration
            Configuration conf = new Configuration();
            // Configure the database connection
            DBConfiguration.configureDB(
                    conf,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://192.168.64.178:3306/school",
                    "root", "123456"
            );
            Job job = Job.getInstance(conf);

            // Wire up the Mapper, Reducer and Driver classes
            job.setMapperClass(Map2.class);
            job.setMapOutputKeyClass(Bean2.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setReducerClass(Reduce2.class);
            job.setOutputKeyClass(Bean2.class);
            job.setOutputValueClass(NullWritable.class);

            job.setJarByClass(Driver2.class);

            // Input and output both go through the database
            job.setInputFormatClass(DBInputFormat.class);
            job.setOutputFormatClass(DBOutputFormat.class);
            String[] field1 = {"state", "total_c", "total_d"};
            String[] field2 = {"state", "total_c", "total_d"};
            DBInputFormat.setInput(job, Bean2.class, "state_count", null, "", field1);
            DBOutputFormat.setOutput(job, "state_count_sort", field2);

            // Submit the job
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Step 6: Checking the Result
4. Case 3: Cumulative Cases Partitioned by State

Step 1: Analysis ---- Partitioning

A custom Partitioner routes records to different ReduceTasks by state, so each reduce task (and therefore each output file) only holds the data assigned to it.

Step 2: Bean Class Wrapping a Database Record (for reading)
package 美国疫情.州分区;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class BeanSQL implements Writable, DBWritable {

    private String date;
    private String country;
    private String state;
    private String fips;
    private int cases;
    private int deaths;

    public BeanSQL() {
    }

    @Override
    public String toString() {
        return date + '\t' + country + '\t' + state + '\t' + fips + '\t' + cases + '\t' + deaths;
    }

    public void set(String date, String country, String state, String fips, int cases, int deaths) {
        this.date = date;
        this.country = country;
        this.state = state;
        this.fips = fips;
        this.cases = cases;
        this.deaths = deaths;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeUTF(country);
        out.writeUTF(state);
        out.writeUTF(fips);
        out.writeInt(cases);
        out.writeInt(deaths);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.date = in.readUTF();
        this.country = in.readUTF();
        this.state = in.readUTF();
        this.fips = in.readUTF();
        this.cases = in.readInt();
        this.deaths = in.readInt();
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.date);
        statement.setString(2, this.country);
        statement.setString(3, this.state);
        statement.setString(4, this.fips);
        statement.setInt(5, this.cases);
        statement.setInt(6, this.deaths);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.date = resultSet.getString(1);
        this.country = resultSet.getString(2);
        this.state = resultSet.getString(3);
        this.fips = resultSet.getString(4);
        this.cases = resultSet.getInt(5);
        this.deaths = resultSet.getInt(6);
    }

    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }
    public String getState() { return state; }
    public void setState(String state) { this.state = state; }
    public String getFips() { return fips; }
    public void setFips(String fips) { this.fips = fips; }
    public int getCases() { return cases; }
    public void setCases(int cases) { this.cases = cases; }
    public int getDeaths() { return deaths; }
    public void setDeaths(int deaths) { this.deaths = deaths; }
}
Step 3: Mapper Class Fetching the Data
package 美国疫情.州分区;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Map3 extends Mapper
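The body of Map3 is missing. As in Case 1, it presumably keys each BeanSQL record by its state so the Partitioner can route it; a sketch under that assumption, reusing the imports above:

public class Map3 extends Mapper<LongWritable, BeanSQL, Text, BeanSQL> {

    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, BeanSQL value, Context context)
            throws IOException, InterruptedException {
        // Key each record by its state so StatePartition can route it to a reduce task
        outKey.set(value.getState());
        context.write(outKey, value);
    }
}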
Step 4: Reducer Class Writing the Data
package 美国疫情.州分区;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce3 extends Reducer
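The body of Reduce3 is missing. Each reduce task only needs to write out the records routed to it; a sketch under that assumption, reusing the imports above:

public class Reduce3 extends Reducer<Text, BeanSQL, BeanSQL, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<BeanSQL> values, Context context)
            throws IOException, InterruptedException {
        // Each reduce task only sees the states routed to it; write the records straight out
        for (BeanSQL bean : values) {
            context.write(bean, NullWritable.get());
        }
    }
}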
Step 5: StatePartition Partitioner Class
package 美国疫情.州分区;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

public class StatePartition extends Partitioner
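The body of StatePartition is missing. The Driver configures seven reduce tasks, so the Partitioner presumably maps a handful of states to partitions 0 to 5 via a HashMap and sends everything else to partition 6; the concrete state list below is purely illustrative, not the author's original choice.

public class StatePartition extends Partitioner<Text, BeanSQL> {

    // Map a few chosen states to their own partition; this list is an assumption
    private static final HashMap<String, Integer> STATE_MAP = new HashMap<>();

    static {
        STATE_MAP.put("Washington", 0);
        STATE_MAP.put("California", 1);
        STATE_MAP.put("Illinois", 2);
        STATE_MAP.put("Arizona", 3);
        STATE_MAP.put("New York", 4);
        STATE_MAP.put("Texas", 5);
    }

    @Override
    public int getPartition(Text key, BeanSQL value, int numPartitions) {
        // States in the map get their own reduce task; everything else falls into partition 6
        Integer partition = STATE_MAP.get(key.toString());
        return partition == null ? 6 : partition;
    }
}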
Step 6: Driver Class
package 美国疫情.州分区;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Driver3 {
    public static void main(String[] args) {
        try {
            // Job configuration
            Configuration conf = new Configuration();
            // Configure the database connection
            DBConfiguration.configureDB(
                    conf,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://192.168.64.178:3306/school",
                    "root", "123456"
            );
            Job job = Job.getInstance(conf);

            // Wire up the Mapper, Reducer and Driver classes
            job.setMapperClass(Map3.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BeanSQL.class);

            job.setReducerClass(Reduce3.class);
            job.setOutputKeyClass(BeanSQL.class);
            job.setOutputValueClass(NullWritable.class);

            job.setJarByClass(Driver3.class);

            // Configure the custom partitioner and the matching number of reduce tasks
            job.setPartitionerClass(StatePartition.class);
            job.setNumReduceTasks(7);

            // Input comes from the database, output goes to local text files
            job.setInputFormatClass(DBInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            String[] fields = {"date", "country", "state", "fips", "cases", "deaths"};
            DBInputFormat.setInput(job, BeanSQL.class, "usa", null, "", fields);
            FileOutputFormat.setOutputPath(job, new Path("G:/Cache/MR/output_usa"));

            // Submit the job
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Step 7: Checking the Result

Here the output goes straight to the local file system; writing it to the database instead would require creating a separate table for every partition.

Note: with seven reduce tasks configured, the job produces seven output files, one per partition.
5. Case 4: The County with the Most Cumulative Cases (Top 1) in Each State

Step 1: Analysis ---- Grouping

Records are sorted by state and, within a state, by cases in descending order; a custom grouping comparator then treats all records of one state as a single group, so the first record the Reducer sees in each group is that state's hardest-hit county.
Step 2: Bean Class Wrapping a Database Record (for reading)
The BeanSQL class from Case 3 (package 美国疫情.州分区) is reused unchanged here to read the records from the `usa` table.
Step 3: Bean Class Wrapping a Database Record (for storing results)

Create the table used to store the result set:

mysql> create table state_top
    -> (
    ->   state varchar(255),
    ->   country varchar(255),
    ->   cases int(255)
    -> );
Query OK, 0 rows affected (0.01 sec)
package 美国疫情.州Top;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class TopBean implements DBWritable, WritableComparable
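The body of TopBean was lost. It wraps a row of the state_top table and, as the map output key, has to sort by state and then by cases in descending order for the Top-1 logic to work; the sketch below (reusing the imports above) follows that reasoning, and its details are assumptions.

public class TopBean implements DBWritable, WritableComparable<TopBean> {

    private String state;
    private String country;  // county name; the column is called "country" throughout this project
    private int cases;

    public void set(String state, String country, int cases) {
        this.state = state;
        this.country = country;
        this.cases = cases;
    }

    public String getState() { return state; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(state);
        out.writeUTF(country);
        out.writeInt(cases);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.state = in.readUTF();
        this.country = in.readUTF();
        this.cases = in.readInt();
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, state);
        statement.setString(2, country);
        statement.setInt(3, cases);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.state = resultSet.getString(1);
        this.country = resultSet.getString(2);
        this.cases = resultSet.getInt(3);
    }

    // Sort by state first, then by cases in descending order, so that within each
    // state group the county with the most cases comes first (ordering is an assumption)
    @Override
    public int compareTo(TopBean o) {
        int byState = this.state.compareTo(o.state);
        if (byState != 0) {
            return byState;
        }
        return Integer.compare(o.cases, this.cases);
    }
}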
Step 4: Mapper Class Emitting the Data
package 美国疫情.州Top;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Map4 extends Mapper
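The body of Map4 is missing. It presumably repackages each BeanSQL record (reused from Case 3 and assumed to be imported or copied into this package) into a TopBean key; a sketch under those assumptions, reusing the imports above:

public class Map4 extends Mapper<LongWritable, BeanSQL, TopBean, NullWritable> {

    // BeanSQL is the input record bean reused from Case 3 (assumed visible in this package)
    private final TopBean outKey = new TopBean();

    @Override
    protected void map(LongWritable key, BeanSQL value, Context context)
            throws IOException, InterruptedException {
        // Repackage each usa record as a TopBean key; sorting and grouping happen in the shuffle
        outKey.set(value.getState(), value.getCountry(), value.getCases());
        context.write(outKey, NullWritable.get());
    }
}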
Step 5: Comparator Class Grouping the Data
package 美国疫情.州Top;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * todo Custom grouping comparator
 */
public class Comparator extends WritableComparator {

    protected Comparator() {
        super(TopBean.class, true);
    }

    /**
     * todo Override compare
     * @param a
     * @param b
     * @return
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Cast to the concrete type
        TopBean abean = (TopBean) a;
        TopBean bbean = (TopBean) b;
        // todo Grouping rule:
        // records with the same state go into the same group, i.e. the same reduce() call.
        // compareTo on the state string returns 0 for equal states, which counts as "same key".
        return abean.getState().compareTo(bbean.getState());
    }
}
Step 6: Reducer Class Emitting Only the First Record
package 美国疫情.州Top;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce4 extends Reducer
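The body of Reduce4 is missing. Thanks to the grouping comparator, each reduce() call sees one state with the highest case count first (assuming the TopBean ordering sketched above), so writing only the first key is enough:

public class Reduce4 extends Reducer<TopBean, NullWritable, TopBean, NullWritable> {

    @Override
    protected void reduce(TopBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Within each state group the records arrive sorted by cases (descending),
        // so the first key is the county with the most cumulative cases; write it and stop
        context.write(key, NullWritable.get());
    }
}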
Step 7: Driver Class
package 美国疫情.州Top;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class Driver4 {
    public static void main(String[] args) {
        try {
            // Job configuration
            Configuration conf = new Configuration();
            // Configure the database connection
            DBConfiguration.configureDB(
                    conf,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://192.168.64.178:3306/school",
                    "root", "123456"
            );
            Job job = Job.getInstance(conf);

            // Wire up the Mapper, Reducer, Driver and Comparator classes
            job.setMapperClass(Map4.class);
            job.setMapOutputKeyClass(TopBean.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setReducerClass(Reduce4.class);
            job.setOutputKeyClass(TopBean.class);
            job.setOutputValueClass(NullWritable.class);

            job.setJarByClass(Driver4.class);

            // Configure the grouping comparator
            job.setGroupingComparatorClass(Comparator.class);

            // Input and output both go through the database
            job.setInputFormatClass(DBInputFormat.class);
            job.setOutputFormatClass(DBOutputFormat.class);
            String[] fields1 = {"date", "country", "state", "fips", "cases", "deaths"};
            String[] fields2 = {"state", "country", "cases"};
            DBInputFormat.setInput(job, BeanSQL.class, "usa", null, "", fields1);
            DBOutputFormat.setOutput(job, "state_top", fields2);

            // Submit the job
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Step 8: Checking the Result
6. Case 5: The Top-N Counties by Cumulative Cases in Each State

Step 1: Modify the Previous Case's Reducer to Output the First N Records

Create a table to store the result:
mysql> create table state_topn
    -> (
    ->   state varchar(255),
    ->   country varchar(255),
    ->   cases int(255)
    -> );
Query OK, 0 rows affected (0.07 sec)
The modified Reducer class:
package 美国疫情.州Top;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce5 extends Reducer
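The body of Reduce5 is missing. The only change from Reduce4 is that the first N keys of each group are written instead of just the first; the sketch below reuses the imports above and picks N = 3 purely as an illustrative value.

public class Reduce5 extends Reducer<TopBean, NullWritable, TopBean, NullWritable> {

    // How many counties to keep per state; 3 is just an example value
    private static final int N = 3;

    @Override
    protected void reduce(TopBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        // Records within a state group arrive sorted by cases (descending);
        // as the iterator advances, the key object is updated to the current record
        for (NullWritable value : values) {
            if (count < N) {
                context.write(key, NullWritable.get());
                count++;
            } else {
                break;
            }
        }
    }
}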
Check the result:
Step 2: Explanation

Although reduce() receives what looks like a single key object, Hadoop reuses that object and deserializes the next record into it each time the value iterator advances. That is why writing the key inside the loop in Reduce5 emits N different counties for each state rather than the same record N times.
Reference for the related concepts: https://bilibili.com/video/BV1Tf4y167U8?p=30