c语言sscanf函数的用法是什么
290
2022-11-24
Hadoop yarn工作流程详解
yarn是什么?1、它是一个资源调度及提供作业运行的系统环境平台 资源:cpu、mem等 作业:map task、reduce Task
yarn产生背景?它是从hadoop2.x版本才引入1、hadoop1.x版本它是如何资源调度及作业运行机制原理a、JobTracker(主节点) (a):接受客户端的作业提交 (b):交给任务调度器安排任务的执行 (c):通知空闲的TaskTracker去处理 (d): 与TaskTracker保持心跳机制
b、TaskTracker(从节点) (a):执行map task和reduce task (b): 与JobTracker保持心跳机制
缺点:1、单点故障问题2、负载压力3、只能运行mapreduce的程序
引入了yarn机制1、减少负载压力2、主备机制3、支持不同的程序运行
yarn整体的架构?
resourcemanager
作用: (1)接受客户端提交作业 (2)启动一个app master去处理 资源分配 (3)监控nodemanager
nodemanager
作用: (1)管理单个节点上的资源 (2)接受resourcemanager发送过来的指令 (3)接受app master发送过来的指令 (4) 启动Container
app master
(1)运行作业的主控者 (2)获取切片数据 (3)从resourcemanager审请运行作业资源 (4)监控作业运行的状态
Container:
它其实就是一个虚拟主机的抽象,分配cpu和内存,主要运行作业
app masterContainerClient
yarn的工作机制(重点)1、连接运行器平台 根据mapreduce.framework.name变量配置 如果等于yarn:则创建YARNRunner对象 如果等于Local:则创建LocalJobRunner对象
2、如果是yarn平台,对resoucemanager提交作业审请3、resourcemanager返回一个jobid和数据保存目录(hdfs://xxx/staging/xxx)4、客户端根据返回数据保存目录路径,将job.split、job.xml、jar文件提交到hdfs://xxx/staging/xxx目录5、提交数据资源之后,客户端对resouremanager提交任务运行6、resourcemanager将任务存储任务队列7、resourcemanager发送命令nodemanager处理从任务取出的任务8、nodemanager往resourcemanageer审请我要创建一个app master a、在nodemanager创建一个container,再启动app master9、app master读取数据切片处理方案10、app master往resourcemanager审请运行资源11、resourcemanager往空闲的nodemanager主机发送指令,要创建Container12、app master往nodemanger发送运行指令,container运行任务。
如下图:
是否可以直接从本地idea直接将程序运行到yarn平台?
以wordcount为例:
代码如下:
package com.gec.demo;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** 作用:体现mapreduce的map阶段的实现 * KEYIN:输入参数key的数据类型 * VALUEIN:输入参数value的数据类型 * KEYOUT,输出key的数据类型 * VALUEOUT:输出value的数据类型 * * 输入: * map(key,value)=偏移量,行内容 * * 输出: * map(key,value)=单词,1 * * 数据类型: * java数据类型: * int-------------->IntWritable * long------------->LongWritable * String----------->Text * 它都实现序列化处理 * * */public class WcMapTask extends Mapper{ /* *根据拆分输入数据的键值对,调用此方法,有多少个键,就触发多少次map方法 * 参数一:输入数据的键值:行的偏移量 * 参数二:输入数据的键对应的value值:偏移量对应行内容 * */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line=value.toString(); String words[]=line.split(" "); for (String word : words) { context.write(new Text(word),new IntWritable(1)); } } }
package com.gec.demo;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/** 此类:处理reducer阶段 * 汇总单词次数 * KEYIN:输入数据key的数据类型 * VALUEIN:输入数据value的数据类型 * KEYOUT:输出数据key的数据类型 * VALUEOUT:输出数据value的数据类型 * * * */public class WcReduceTask extends Reducer{ /* * 第一个参数:单词数据 * 第二个参数:集合数据类型汇总:单词的次数 * * */ @Override protected void reduce(Text key, Iterablevalues, Context context) throws IOException, InterruptedException { int count=0; for (IntWritable value : values) { count+=value.get(); } context.write(key,new IntWritable(count)); } }
package com.gec.demo;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WcCombiner extends Reducer { private IntWritable sum=new IntWritable(); @Override protected void reduce(Text key, Iterablevalues, Context context) throws IOException, InterruptedException { int count=0; for (IntWritable value : values) { count+=value.get(); } sum.set(count); context.write(key,sum); } }
package com.gec.demo;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/** * Hello world! * */public class App { public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf=new Configuration();// conf.set("fs.defaultFS","hdfs://hadoop-001:9000");// conf.set("mapreduce.framework.name","yarn");// conf.set("yarn.resourcemanager.hostname","hadoop-002"); conf.set("mapred.jar","D:\\JAVA\\projectsIDEA\\BigdataStudy\\mrwordcountbyyarn\\target\\wordcountbyyarn-1.0-SNAPSHOT.jar"); Job job=Job.getInstance(conf); //设置Driver类 job.setJarByClass(App.class); //设置运行那个map task job.setMapperClass(WcMapTask.class); //设置运行那个reducer task job.setReducerClass(WcReduceTask.class); job.setCombinerClass(WcCombiner.class); //设置map task的输出key的数据类型 job.setMapOutputKeyClass(Text.class); //设置map task的输出value的数据类型 job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定要处理的数据所在的位置 FileInputFormat.setInputPaths(job, "/wordcount/input/big.txt"); //指定处理完成之后的结果所保存的位置 FileOutputFormat.setOutputPath(job, new Path("/wordcount/output7")); //向yarn集群提交这个job boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
其中
是因为在resource文件夹中直接添加配置文件
配置文件分别如下:
注意:这里的配置文件要和虚拟机中的配置文件一样,否则可能会出错,最好的做法是从虚拟机中直接copy出来
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~