[MapReduce] US COVID-19 Case Studies (MapReduce with Database Interaction)

Reader submission · 341 · 2022-11-28


Table of Contents

US COVID-19 Case Count Statistics
  1. Data Field Description
  2. Case 1: Cumulative Case Counts per State
    Step 1: Analysis
    Step 2: Preparation - load the dataset into a remote database
      Part 1: Create the database and table on the remote server
      Part 2: Bean class that wraps a database record for this table
      Part 3: Write the Mapper class to process the input records
      Part 4: Write the Reducer class to emit the records
      Part 5: Write the Driver class
      Part 6: Check the result
    Step 3: Bean class wrapping a database record (used for reading)
    Step 4: Bean class wrapping a database record (used for storing the result)
    Step 5: Write the Mapper class to emit <state, record (Bean object)> pairs
    Step 6: Write the Reducer class to aggregate cases per state
    Step 7: Write the Driver class
    Step 8: Check the result
  3. Case 2: Cumulative Case Counts per State, Sorted in Descending Order
    Step 1: Analysis ---- sorting
    Step 2: Bean class wrapping a database record (used for reading and storing)
    Step 3: Write the Mapper class to transfer the data
    Step 4: Write the Reducer class to emit records directly
    Step 5: Write the Driver class
    Step 6: Check the result
  4. Case 3: Cumulative Case Records Partitioned by State
    Step 1: Analysis ---- partitioning
    Step 2: Bean class wrapping a database record (for reading)
    Step 3: Write the Mapper class to fetch the data
    Step 4: Write the Reducer class to output the data
    Step 5: Write the StatePartition partitioner class
    Step 6: Write the Driver class
    Step 7: Check the result
  5. Case 4: The Top 1 County by Cumulative Cases in Each State
    Step 1: Analysis ---- grouping
    Step 2: Bean class wrapping a database record (for reading)
    Step 3: Bean class wrapping a database record (for storing the result)
    Step 4: Write the Mapper class to emit the data
    Step 5: Write the Comparator class to group the data
    Step 6: Write the Reducer class to output only the first record
    Step 7: Write the Driver class
    Step 8: Check the result
  6. Case 5: The Top N Counties by Cumulative Cases in Each State
    Step 1: Modify the previous case's Reducer class to output the first N records
    Step 2: Explanation

US COVID-19 Case Count Statistics

1. Data Field Description

A sample of the dataset:

date,county,state,fips,cases,deaths
2020-01-21,Snohomish,Washington,53061.0,1,0.0
2020-01-22,Snohomish,Washington,53061.0,1,0.0
2020-01-23,Snohomish,Washington,53061.0,1,0.0
2020-01-24,Cook,Illinois,17031.0,1,0.0
2020-01-24,Snohomish,Washington,53061.0,1,0.0
2020-01-25,Orange,California,6059.0,1,0.0
2020-01-25,Cook,Illinois,17031.0,1,0.0
2020-01-25,Snohomish,Washington,53061.0,1,0.0
2020-01-26,Maricopa,Arizona,4013.0,1,0.0
2020-01-26,Los Angeles,California,6037.0,1,0.0
2020-01-26,Orange,California,6059.0,1,0.0
2020-01-26,Cook,Illinois,17031.0,1,0.0
...
2020-04-03,Nash,North Carolina,37127.0,14,0.0
2020-04-03,New Hano


2. Case 1: Cumulative Case Counts per State

Step 1: Analysis

Every input record carries a state name, a case count and a death count. The Mapper keys each record by state, and the Reducer sums the cases and deaths of all records that share a state key, as sketched in the query below.
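
For reference, the aggregation this job performs corresponds to the following SQL over the usa table created in Step 2 (a sketch for intuition, not part of the MapReduce workflow itself):

SELECT state, SUM(cases) AS total_c, SUM(deaths) AS total_d
FROM usa
GROUP BY state;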


Step 2: Preparation - load the dataset into a remote database

From here on, all of the cases read their input from and write their results to a remote database, so the dataset first has to be loaded into that remote database (the remote server here runs on Linux).
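
A minimal sketch of the server-side setup, assuming MySQL 5.x and the school database, root user and 123456 password that the driver classes below use; adapt it to your own environment:

-- create the database that the JDBC URLs point at
CREATE DATABASE IF NOT EXISTS school DEFAULT CHARACTER SET utf8;
-- allow the machine running the MapReduce job to connect remotely (assumed credentials)
GRANT ALL PRIVILEGES ON school.* TO 'root'@'%' IDENTIFIED BY '123456';
FLUSH PRIVILEGES;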

Part 1: Create the database and table on the remote server

CREATE TABLE `usa` (
  `date`    varchar(255) DEFAULT NULL,
  `country` varchar(255) DEFAULT NULL,
  `state`   varchar(255) DEFAULT NULL,
  `fips`    varchar(255) DEFAULT NULL,
  `cases`   int(255) DEFAULT NULL,
  `deaths`  int(255) DEFAULT NULL
);

Part 2: Bean class that wraps a database record for this table

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class BeanSQL implements Writable, DBWritable {
    // Record fields (must match the column names and types of the database table)
    private String date;
    private String country;
    private String state;
    private String fips;
    private int cases;
    private int deaths;

    // toString() override
    @Override
    public String toString() {
        return "BeanSQL{" +
                "date='" + date + '\'' +
                ", country='" + country + '\'' +
                ", state='" + state + '\'' +
                ", fips='" + fips + '\'' +
                ", cases=" + cases +
                ", deaths=" + deaths +
                '}';
    }

    // Convenience setter for all fields
    public void set(String date, String country, String state, String fips, int cases, int deaths) {
        this.date = date;
        this.country = country;
        this.state = state;
        this.fips = fips;
        this.cases = cases;
        this.deaths = deaths;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeUTF(country);
        out.writeUTF(state);
        out.writeUTF(fips);
        out.writeInt(cases);
        out.writeInt(deaths);
    }

    // Deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        this.date = in.readUTF();
        this.country = in.readUTF();
        this.state = in.readUTF();
        this.fips = in.readUTF();
        this.cases = in.readInt();
        this.deaths = in.readInt();
    }

    // Write a record to the database
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.date);
        statement.setString(2, this.country);
        statement.setString(3, this.state);
        statement.setString(4, this.fips);
        statement.setInt(5, this.cases);
        statement.setInt(6, this.deaths);
    }

    // Read a record from the database
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.date = resultSet.getString(1);
        this.country = resultSet.getString(2);
        this.state = resultSet.getString(3);
        this.fips = resultSet.getString(4);
        this.cases = resultSet.getInt(5);
        this.deaths = resultSet.getInt(6);
    }

    // Getters and setters
    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }
    public String getState() { return state; }
    public void setState(String state) { this.state = state; }
    public String getFips() { return fips; }
    public void setFips(String fips) { this.fips = fips; }
    public int getCases() { return cases; }
    public void setCases(int cases) { this.cases = cases; }
    public int getDeaths() { return deaths; }
    public void setDeaths(int deaths) { this.deaths = deaths; }
}


Part 3: Write the Mapper class to process the input records

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MapSQL extends Mapper<LongWritable, Text, NullWritable, BeanSQL> {
    BeanSQL v = new BeanSQL();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split each input line into its fields
        String[] fields = value.toString().split(",");
        // Skip the header line and malformed records
        if (fields[0].equals("date") || fields.length < 6) return;
        // Populate the bean
        v.set(
                fields[0], fields[1], fields[2], fields[3],
                (int) Double.parseDouble(fields[4]),
                (int) Double.parseDouble(fields[5])
        );
        // Emit
        context.write(NullWritable.get(), v);
        // System.out.println(v.toString());  // inspect intermediate output
    }
}


Part 4: Write the Reducer class to emit the records

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ReduceSQL extends Reducer<NullWritable, BeanSQL, BeanSQL, NullWritable> {
    @Override
    protected void reduce(NullWritable key, Iterable<BeanSQL> values, Context context) throws IOException, InterruptedException {
        // Simply iterate over the values received from the Map side and write them out
        for (BeanSQL bean : values) {
            context.write(bean, key);
            // System.out.println(bean);
        }
    }
}


Part 5: Write the Driver class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class DriverSQL {
    public static void main(String[] args) {
        try {
            // Create the job configuration
            Configuration conf = new Configuration();
            // Configure the database connection
            DBConfiguration.configureDB(
                    conf,
                    "com.mysql.jdbc.Driver",               // JDBC driver
                    "jdbc:mysql://127.0.0.1:3306/school",  // remote database URL
                    "root", "123456"                       // user, password
            );
            Job job = Job.getInstance(conf);

            // Wire up the Mapper, Reducer and Driver classes
            job.setMapperClass(MapSQL.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(BeanSQL.class);

            job.setReducerClass(ReduceSQL.class);
            job.setOutputKeyClass(BeanSQL.class);
            job.setOutputValueClass(NullWritable.class);

            job.setJarByClass(DriverSQL.class);

            // Input path (local file)
            Path input = new Path(
                    "G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\美国疫情\\data\\us_counties_covid19_daily.csv"
            );
            FileInputFormat.setInputPaths(job, input);

            // Output target: the remote database
            job.setOutputFormatClass(DBOutputFormat.class);
            // String[] fields = {"date","country","state","fips","cases","deaths"};
            // job, table name, field names (the fields array could be passed instead)
            DBOutputFormat.setOutput(job, "usa", "date", "country", "state", "fips", "cases", "deaths");

            // Submit the job
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Part 6: Check the result

After the job has run successfully, check the table in the database on the virtual machine; the records have been transferred into the usa table.
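
A quick way to confirm the import from the MySQL shell (a sketch; the exact row count depends on the dataset snapshot used):

SELECT COUNT(*) FROM usa;
SELECT * FROM usa LIMIT 5;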


Step 3: Bean class wrapping a database record (used for reading)

The Bean class used when reading is the BeanSQL class from Part 2 above, reused unchanged: it wraps one row of the usa table and serves as the Mapper's input value type.


Step 4: Bean class wrapping a database record (used for storing the result)

Create the table that stores the result:

CREATE TABLE `state_count` (
  `state`   varchar(255) DEFAULT NULL,
  `total_c` int(255) DEFAULT NULL,
  `total_d` int(255) DEFAULT NULL
);

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class Bean1 implements WritableComparable<Bean1>, DBWritable {
    // Fields of the result table
    private String state;
    private int total_c;
    private int total_d;

    // Comparison (not used for ordering in this job)
    @Override
    public int compareTo(Bean1 o) {
        return 0;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(state);
        out.writeInt(total_c);
        out.writeInt(total_d);
    }

    // Deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        state = in.readUTF();
        total_c = in.readInt();
        total_d = in.readInt();
    }

    // Write a record to the database
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.state);
        statement.setInt(2, this.total_c);
        statement.setInt(3, this.total_d);
    }

    // Read a record from the database
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.state = resultSet.getString(1);
        this.total_c = resultSet.getInt(2);
        this.total_d = resultSet.getInt(3);
    }

    // toString() override
    @Override
    public String toString() {
        return this.state + "州累计确诊" + this.total_c + "例,累计死亡" + this.total_d + "例";
    }

    // Getters and setters
    public String getState() { return state; }
    public void setState(String state) { this.state = state; }
    public int getTotal_c() { return total_c; }
    public void setTotal_c(int total_c) { this.total_c = total_c; }
    public int getTotal_d() { return total_d; }
    public void setTotal_d(int total_d) { this.total_d = total_d; }
}


Step 5: Write the Mapper class to emit <state, record (Bean object)> pairs

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Map1 extends Mapper<LongWritable, BeanSQL, Text, BeanSQL> {
    Text k = new Text();

    @Override
    protected void map(LongWritable key, BeanSQL value, Context context) throws IOException, InterruptedException {
        // Use the state of the current record as the output key
        k.set(value.getState());
        // Emit
        context.write(k, value);
    }
}


Step 6: Write the Reducer class to aggregate cases per state

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce1 extends Reducer<Text, BeanSQL, Bean1, NullWritable> {
    Bean1 k = new Bean1();

    @Override
    protected void reduce(Text key, Iterable<BeanSQL> values, Context context) throws IOException, InterruptedException {
        int total_c = 0;
        int total_d = 0;
        // Records with the same key are handled by the same reduce call,
        // so iterating over the values sums the cases and deaths of one state
        for (BeanSQL b : values) {
            total_c += b.getCases();
            total_d += b.getDeaths();
        }
        // Populate the output bean
        k.setState(key.toString());
        k.setTotal_c(total_c);
        k.setTotal_d(total_d);
        // Emit
        context.write(k, NullWritable.get());
    }
}


Step 7: Write the Driver class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver1 {
    public static void main(String[] args) {
        try {
            // Job configuration
            Configuration conf = new Configuration();
            // Configure the database connection
            DBConfiguration.configureDB(
                    conf,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://192.168.64.178:3306/school",
                    "root", "123456"
            );
            // Get the job
            Job job = Job.getInstance(conf);
            // todo: needed when running the jar on the cluster (reading data from HDFS)
            // conf.set("fs.defaultFS","hdfs://192.168.64.178:9000");

            // Wire up the classes
            job.setMapperClass(Map1.class);
            job.setReducerClass(Reduce1.class);
            job.setJarByClass(Driver1.class);

            // Input/output types of each stage
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BeanSQL.class);
            job.setOutputKeyClass(Bean1.class);
            job.setOutputValueClass(NullWritable.class);

            // Input/output paths
            // todo: option 1: read from the local file system and write the result locally
            // Path in = new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\美国疫情\\data\\us_counties_covid19_daily.csv");
            // Path out = new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\美国疫情\\output");
            // delete the output directory if it already exists
            // FileSystem fs = FileSystem.get(conf);
            // if (fs.exists(out)){
            //     fs.delete(out,true);
            // }

            // todo: option 2: read from HDFS and write the result back to HDFS
            // Path in = new Path("/MR/usayiqing/us_counties_covid19_daily.csv");
            // Path out = new Path("/MR/usayiqing/output");
            // FileInputFormat.setInputPaths(job,in);
            // FileOutputFormat.setOutputPath(job,out);

            // todo: option 3: read from the remote database, process, and write back to it
            job.setInputFormatClass(DBInputFormat.class);
            String[] fields1 = {"date", "country", "state", "fips", "cases", "deaths"};
            String[] fields2 = {"state", "total_c", "total_d"};
            DBInputFormat.setInput(job, BeanSQL.class, "usa", null, "", fields1);
            DBOutputFormat.setOutput(job, "state_count", fields2);

            // Submit the job
            boolean result = job.waitForCompletion(true);
            System.out.println(result ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


Step 8: Check the result
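
The aggregated table can be inspected with a query like the following (sketch):

SELECT * FROM state_count ORDER BY total_c DESC LIMIT 10;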


3. Case 2: Cumulative Case Counts per State, Sorted in Descending Order

Step 1: Analysis ---- sorting

MapReduce sorts map output by key, so the per-state totals from Case 1 are wrapped in a key type (Bean2) whose compareTo orders records by total_c in descending order; the framework then performs the sort for us, as sketched below.
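
Conceptually the job reproduces this query over the Case 1 result table (sketch):

SELECT state, total_c, total_d
FROM state_count
ORDER BY total_c DESC;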


Step 2: Bean class wrapping a database record (used for reading and storing)

Note: although the same Bean class is used for both reading and storing, the table that is read from and the table that is written to are different. Here we simply sort the per-state totals computed above.

Create the table that stores the result:

CREATE TABLE `state_count_sort` (
  `state`   varchar(255) DEFAULT NULL,
  `total_c` int(255) DEFAULT NULL,
  `total_d` int(255) DEFAULT NULL
);

package 美国疫情.州排序;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class Bean2 implements WritableComparable<Bean2>, DBWritable {
    // Fields of the result table
    private String state;
    private int total_c;
    private int total_d;

    // Comparison: order by total_c in descending order
    @Override
    public int compareTo(Bean2 o) {
        if (this.total_c > o.total_c) {
            return -1;
        } else if (this.total_c < o.total_c) {
            return 1;
        } else {
            return 0;
        }
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(state);
        out.writeInt(total_c);
        out.writeInt(total_d);
    }

    // Deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        state = in.readUTF();
        total_c = in.readInt();
        total_d = in.readInt();
    }

    // Write a record to the database
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.state);
        statement.setInt(2, this.total_c);
        statement.setInt(3, this.total_d);
    }

    // Read a record from the database
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.state = resultSet.getString(1);
        this.total_c = resultSet.getInt(2);
        this.total_d = resultSet.getInt(3);
    }

    // toString() override
    @Override
    public String toString() {
        return this.state + "州累计确诊" + this.total_c + "例,累计死亡" + this.total_d + "例";
    }

    // Getters and setters
    public String getState() { return state; }
    public void setState(String state) { this.state = state; }
    public int getTotal_c() { return total_c; }
    public void setTotal_c(int total_c) { this.total_c = total_c; }
    public int getTotal_d() { return total_d; }
    public void setTotal_d(int total_d) { this.total_d = total_d; }
}


Step 3: Write the Mapper class to transfer the data

package 美国疫情.州排序;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// The two Bean2 types here are the same Java class, but they correspond to different database tables
public class Map2 extends Mapper<LongWritable, Bean2, Bean2, NullWritable> {
    Bean2 k = new Bean2();

    @Override
    protected void map(LongWritable key, Bean2 value, Context context) throws IOException, InterruptedException {
        // Copy the field values
        k.setState(value.getState());
        k.setTotal_c(value.getTotal_c());
        k.setTotal_d(value.getTotal_d());
        // Emit
        context.write(k, NullWritable.get());
    }
}


Step 4: Write the Reducer class to emit records directly

package 美国疫情.州排序;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce2 extends Reducer<Bean2, NullWritable, Bean2, NullWritable> {
    @Override
    protected void reduce(Bean2 key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Write the keys out directly
        for (NullWritable n : values) {
            context.write(key, NullWritable.get());
        }
    }
}


Step 5: Write the Driver class

package 美国疫情.州排序;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class Driver2 {
    public static void main(String[] args) {
        try {
            // Job configuration
            Configuration conf = new Configuration();
            // Configure the database connection
            DBConfiguration.configureDB(
                    conf,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://192.168.64.178:3306/school",
                    "root", "123456"
            );
            Job job = Job.getInstance(conf);

            // Wire up the Mapper, Reducer and Driver classes
            job.setMapperClass(Map2.class);
            job.setMapOutputKeyClass(Bean2.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setReducerClass(Reduce2.class);
            job.setOutputKeyClass(Bean2.class);
            job.setOutputValueClass(NullWritable.class);

            job.setJarByClass(Driver2.class);

            // Input and output both go through the database
            job.setInputFormatClass(DBInputFormat.class);
            job.setOutputFormatClass(DBOutputFormat.class);
            String[] field1 = {"state", "total_c", "total_d"};
            String[] field2 = {"state", "total_c", "total_d"};
            DBInputFormat.setInput(job, Bean2.class, "state_count", null, "", field1);
            DBOutputFormat.setOutput(job, "state_count_sort", field2);

            // Submit the job
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


Step 6: Check the result
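
The sorted table can be checked with a query like this (an explicit ORDER BY is added, since a plain SELECT does not guarantee row order):

SELECT * FROM state_count_sort ORDER BY total_c DESC LIMIT 10;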


4. Case 3: Cumulative Case Records Partitioned by State

Step 1: Analysis ---- partitioning

A custom Partitioner routes the records of six chosen states to their own partitions (and therefore their own output files), with everything else going to a seventh partition; the number of reduce tasks is set to match, as the illustrative query below suggests.
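
Each dedicated partition ends up holding the rows that a per-state filter would return, for example for the state assigned to partition 0 (illustrative sketch only):

SELECT * FROM usa WHERE state = 'Washington';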

Step 2: Bean class wrapping a database record (for reading)

package 美国疫情.州分区;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class BeanSQL implements Writable, DBWritable {
    private String date;
    private String country;
    private String state;
    private String fips;
    private int cases;
    private int deaths;

    public BeanSQL() {
    }

    @Override
    public String toString() {
        return date + '\t' + country + '\t' + state + '\t' + fips + '\t' + cases + '\t' + deaths;
    }

    public void set(String date, String country, String state, String fips, int cases, int deaths) {
        this.date = date;
        this.country = country;
        this.state = state;
        this.fips = fips;
        this.cases = cases;
        this.deaths = deaths;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeUTF(country);
        out.writeUTF(state);
        out.writeUTF(fips);
        out.writeInt(cases);
        out.writeInt(deaths);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.date = in.readUTF();
        this.country = in.readUTF();
        this.state = in.readUTF();
        this.fips = in.readUTF();
        this.cases = in.readInt();
        this.deaths = in.readInt();
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.date);
        statement.setString(2, this.country);
        statement.setString(3, this.state);
        statement.setString(4, this.fips);
        statement.setInt(5, this.cases);
        statement.setInt(6, this.deaths);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.date = resultSet.getString(1);
        this.country = resultSet.getString(2);
        this.state = resultSet.getString(3);
        this.fips = resultSet.getString(4);
        this.cases = resultSet.getInt(5);
        this.deaths = resultSet.getInt(6);
    }

    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }
    public String getState() { return state; }
    public void setState(String state) { this.state = state; }
    public String getFips() { return fips; }
    public void setFips(String fips) { this.fips = fips; }
    public int getCases() { return cases; }
    public void setCases(int cases) { this.cases = cases; }
    public int getDeaths() { return deaths; }
    public void setDeaths(int deaths) { this.deaths = deaths; }
}


Step 3: Write the Mapper class to fetch the data

package 美国疫情.州分区;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Map3 extends Mapper<LongWritable, BeanSQL, Text, BeanSQL> {
    BeanSQL v = new BeanSQL();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, BeanSQL value, Context context) throws IOException, InterruptedException {
        // Build the output key and value
        k.set(value.getState());
        v.set(value.getDate(), value.getCountry(), value.getState(), value.getFips(), value.getCases(), value.getDeaths());
        // Emit
        context.write(k, v);
        System.out.println(v);
    }
}


Step 4: Write the Reducer class to output the data

package 美国疫情.州分区;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce3 extends Reducer<Text, BeanSQL, BeanSQL, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<BeanSQL> values, Context context) throws IOException, InterruptedException {
        // Simply iterate over the values and write them out
        for (BeanSQL bean : values) {
            context.write(bean, NullWritable.get());
            System.out.println(bean);
        }
    }
}


Step 5: Write the StatePartition partitioner class

package 美国疫情.州分区;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

public class StatePartition extends Partitioner<Text, BeanSQL> {
    // Dictionary mapping US states to partition numbers
    public static HashMap<String, Integer> stateMap = new HashMap<>();

    static {
        stateMap.put("Washington", 0);
        stateMap.put("California", 1);
        stateMap.put("Michigan", 2);
        stateMap.put("Louisiana", 3);
        stateMap.put("Kansas", 4);
        stateMap.put("Florida", 5);
    }

    /**
     * todo Partitioning rule of the custom partitioner:
     * records for which getPartition returns the same int end up in the same partition.
     * @param text          state name used as the map output key
     * @param beanSQL2      the record
     * @param numPartitions number of partitions
     * @return partition number
     */
    @Override
    public int getPartition(Text text, BeanSQL beanSQL2, int numPartitions) {
        // Look up the state carried in the key
        Integer code = stateMap.get(text.toString());
        // Does this state have a dedicated partition?
        if (code != null) {
            // Yes: return its partition number
            return code;
        } else {
            // No: put all remaining states into one extra partition
            return 6;
        }
    }
}


Step 6: Write the Driver class

package 美国疫情.州分区;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Driver3 {
    public static void main(String[] args) {
        try {
            // Job configuration
            Configuration conf = new Configuration();
            // Configure the database connection
            DBConfiguration.configureDB(
                    conf,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://192.168.64.178:3306/school",
                    "root", "123456"
            );
            Job job = Job.getInstance(conf);

            // Wire up the Mapper, Reducer and Driver classes
            job.setMapperClass(Map3.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BeanSQL.class);

            job.setReducerClass(Reduce3.class);
            job.setOutputKeyClass(BeanSQL.class);
            job.setOutputValueClass(NullWritable.class);

            job.setJarByClass(Driver3.class);

            // Configure the partitioner and a matching number of reduce tasks
            job.setPartitionerClass(StatePartition.class);
            job.setNumReduceTasks(7);

            // Configure input and output
            job.setInputFormatClass(DBInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            String[] fields = {"date", "country", "state", "fips", "cases", "deaths"};
            DBInputFormat.setInput(job, BeanSQL.class, "usa", null, "", fields);
            FileOutputFormat.setOutputPath(job, new Path("G:/Cache/MR/output_usa"));

            // Submit the job
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


Step 7: Check the result

Here the output is written directly to the local file system; writing to the database instead would require creating a separate table for every partition.

Note: the number of reduce tasks (7 here) has to cover every partition number the partitioner can return (0 to 6). If it is smaller, the job fails with an illegal-partition error; any extra reduce tasks simply produce empty output files.


5. Case 4: The Top 1 County by Cumulative Cases in Each State

Step 1: Analysis ---- grouping

Every record is wrapped in a composite key (TopBean) that sorts first by state and then by case count in descending order. A custom grouping comparator then treats all keys with the same state as one group, so the first key each reduce call sees is the county with the most cases in that state; a rough SQL equivalent is sketched below.
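
For reference, a roughly equivalent query over the raw table (a sketch; note that the country column of the usa table actually holds the county name, and ties would return more than one row per state):

SELECT u.state, u.country, u.cases
FROM usa u
WHERE u.cases = (SELECT MAX(cases) FROM usa WHERE state = u.state);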


Step 2: Bean class wrapping a database record (for reading)

The Bean class for reading is the same BeanSQL class as in Case 3, Step 2; place it in (or import it into) the 美国疫情.州Top package so that Map4 and Driver4 below can reference it.


Step 3: Bean class wrapping a database record (for storing the result)

Create the table that stores the result set:

mysql> create table state_top
    -> (
    -> state varchar(255),
    -> country varchar(255),
    -> cases int(255)
    -> );
Query OK, 0 rows affected (0.01 sec)

package 美国疫情.州Top;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class TopBean implements DBWritable, WritableComparable<TopBean> {
    // Fields of the output table
    private String state;
    private String country;
    private int cases;

    // No-arg constructor
    public TopBean() {
    }

    // Convenience setter
    public void set(String state, String country, int cases) {
        this.state = state;
        this.country = country;
        this.cases = cases;
    }

    // toString() override
    @Override
    public String toString() {
        return "TopBean{" +
                "state='" + state + '\'' +
                ", country='" + country + '\'' +
                ", cases=" + cases +
                '}';
    }

    // Write a record to the database
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.state);
        statement.setString(2, this.country);
        statement.setInt(3, this.cases);
    }

    // Read a record from the database
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.state = resultSet.getString(1);
        this.country = resultSet.getString(2);
        this.cases = resultSet.getInt(3);
    }

    // todo Sorting: first by state, then by case count within the state
    @Override
    public int compareTo(TopBean o) {
//        int result;
//        // compare the states first
//        int i = this.state.compareTo(o.getState());
//        if (i > 0){
//            result = 1;
//        }else if (i < 0){
//            result = -1;
//        }else {
//            // same state: compare cases
//            if ( this.cases > o.cases){
//                result = -1;
//            } else if (this.cases < o.cases){
//                result = 1;
//            } else {
//                result = 0;
//            }
//        }
//        return result;

        // Same state: compare the case counts in descending order
        if (o.state.compareTo(this.state) == 0) {
            return o.cases - this.cases;
            // return this.cases - o.cases;  // would pick the county with the fewest cases instead
        }
        // Different states: order by state name
        return -(o.state.compareTo(this.state));
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.state);
        out.writeUTF(this.country);
        out.writeInt(this.cases);
    }

    // Deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        this.state = in.readUTF();
        this.country = in.readUTF();
        this.cases = in.readInt();
    }

    // Getters and setters
    public String getState() { return state; }
    public void setState(String state) { this.state = state; }
    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }
    public int getCases() { return cases; }
    public void setCases(int cases) { this.cases = cases; }
}


Step 4: Write the Mapper class to emit the data

package 美国疫情.州Top;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Map4 extends Mapper<LongWritable, BeanSQL, TopBean, NullWritable> {
    TopBean k = new TopBean();

    @Override
    protected void map(LongWritable key, BeanSQL value, Context context) throws IOException, InterruptedException {
        // Copy the field values
        k.set(value.getState(), value.getCountry(), value.getCases());
        // Emit
        context.write(k, NullWritable.get());
    }
}


Step 5: Write the Comparator class to group the data

package 美国疫情.州Top;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * todo Custom grouping comparator
 */
public class Comparator extends WritableComparator {
    protected Comparator() {
        super(TopBean.class, true);
    }

    /**
     * todo Override the compare method
     * @param a first key
     * @param b second key
     * @return comparison result
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Cast to the key type
        TopBean abean = (TopBean) a;
        TopBean bbean = (TopBean) b;
        // todo Grouping rule:
        // keys with the same state value fall into the same group ---- handled by the same reduce call.
        // compareTo on the state is enough: whenever it returns 0, the keys are considered equal
        return abean.getState().compareTo(bbean.getState());
    }
}


Step 6: Write the Reducer class to output only the first record

package 美国疫情.州Top;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce4 extends Reducer<TopBean, NullWritable, TopBean, NullWritable> {
    @Override
    protected void reduce(TopBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // todo After grouping, keys that compare equal arrive in the same reduce call;
        // the key parameter holds the first record of the group.
        // For Top 1, writing that single record is enough.
        context.write(key, NullWritable.get());
    }
}


Step 7: Write the Driver class

package 美国疫情.州Top;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class Driver4 {
    public static void main(String[] args) {
        try {
            // Job configuration
            Configuration conf = new Configuration();
            // Configure the database connection
            DBConfiguration.configureDB(
                    conf,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://192.168.64.178:3306/school",
                    "root", "123456"
            );
            Job job = Job.getInstance(conf);

            // Wire up the Mapper, Reducer, Driver and Comparator classes
            job.setMapperClass(Map4.class);
            job.setMapOutputKeyClass(TopBean.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setReducerClass(Reduce4.class);
            job.setOutputKeyClass(TopBean.class);
            job.setOutputValueClass(NullWritable.class);

            job.setJarByClass(Driver4.class);

            // Configure the grouping comparator
            job.setGroupingComparatorClass(Comparator.class);

            // Configure input and output
            job.setInputFormatClass(DBInputFormat.class);
            job.setOutputFormatClass(DBOutputFormat.class);
            String[] fields1 = {"date", "country", "state", "fips", "cases", "deaths"};
            String[] fields2 = {"state", "country", "cases"};
            DBInputFormat.setInput(job, BeanSQL.class, "usa", null, "", fields1);
            DBOutputFormat.setOutput(job, "state_top", fields2);

            // Submit the job
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


Step 8: Check the result
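
The Top 1 result can be inspected with a query like this (sketch):

SELECT * FROM state_top ORDER BY cases DESC;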


6. Case 5: The Top N Counties by Cumulative Cases in Each State

Step 1: Modify the previous case's Reducer class to output the first N records

Create the table that stores the result:

mysql> create table state_topn
    -> (
    -> state varchar(255),
    -> country varchar(255),
    -> cases int(255)
    -> );
Query OK, 0 rows affected (0.07 sec)

Modify the Reducer class:

package 美国疫情.州Top;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce5 extends Reducer<TopBean, NullWritable, TopBean, NullWritable> {
    @Override
    protected void reduce(TopBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // todo Loop over the group and emit the first N records
        int count = 1;
        for (NullWritable n : values) {
            context.write(key, NullWritable.get());
            count++;
            if (count >= 4) {
                // stop once three records have been written
                return;
            }
        }
    }
}

Check the result:
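
And a query to inspect the Top N result (sketch):

SELECT * FROM state_topn ORDER BY state, cases DESC;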


Step 2: Explanation

Because the composite keys are sorted by state and then by cases in descending order, while grouping considers only the state, the records inside one reduce group already arrive ordered from most to fewest cases; emitting the first N of them therefore yields the Top N counties for each state.


Reference for the related background material: https://bilibili.com/video/BV1Tf4y167U8?p=30
