【文档说明】Hadoop大数据开发实战-第07章-认识MapReduce编程模型课件.pptx,共(26)页,2.321 MB,由我爱分享上传
转载请保留链接:https://www.ichengzhen.cn/view-10457.html
以下为本文档部分文字说明:
认识MapReduce编程模型主要内容⚫MapReduce编程模型简介⚫WordCount编程实例⚫HadoopMapReduce架构⚫MapReduce实战开发MapReduce编程模型简介⚫MapReduce是一种可用于数据处理的编程模型。该模型比较简单,但用于编写有用的程序
并不简单。Hadoop可以运行由各种语言编写的MapReduce程序。例如:Java、Ruby、Python和C++语言等。最重要的是,MapReduce程序本质上是并行运行的,因此可以将大规模的数据分析任务交给任何一个拥有足够多
机器的运行商。MapReduce的优势在于处理大规模数据集。MapReduce编程模型简介⚫1、从MapReduce自身的命名特点可以看出,MapReduce由两个阶段组成:Map和Reduce。用户只需map()和reduce()两个函数,即可完成简单的分布
式程序设计。⚫2、map()函数以key/value对作为输入,产生另外一系列key/value对作为中间输出写入本地磁盘。MapReduce框架会自动将这些中间数据按照key值进行聚合,且key值相同的数据被统一交给re
duce()函数处理。⚫3、reduce()函数以key及对应的value列表作为输入,经合并key相同的value值后,产生另外一系列key/value对作为最终输出写入HDFS。MapReduce编程模型简介⚫MapReduce设计目的:➢易于编程➢良好的扩展性➢高容错性WordCount编
程实例⚫Mapper类:publicclassWordMapperextendsMapper<Object,Text,Text,IntWritable>{publicstaticfinalIntWritableval=newIn
tWritable(1);publicstaticfinalTextword=newText();publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsInterruptedException,IOException{Strin
gline=value.toString();String[]arr=line.split("\t");for(Stringwd:arr){word.set(wd);context.write(word,val);}}}WordCount编程实例⚫Reduc
er类publicclassWordReducerextendsReducer<Text,IntWritable,Text,IntWritable>{publicIntWritableval=newIntWrit
able();publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsInterruptedException,IOException{intsum=0
;for(IntWritablevalue:values){sum+=value.get();}val.set(sum);context.write(key,val);}}WordCount编程实例⚫main类:publicclassWordCount{pub
licstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{Stringintput=null;Stringou
tput=null;if(null!=args&&args.length==2){intput=args[0];output=args[1];Jobjob=newJob(newConfiguration(),"wordcount");//创建一个job//以jar包的形式运行job.
setJarByClass(WordCount.class);//设置Mapper类和Reducer类job.setMapperClass(Mapper.class);job.setReducerClass(Redu
cer.class);WordCount编程实例//设置输出的key/value的输出数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//设置输入输出的格式FileInp
utFormat.addInputPath(job,newPath(intput));FileOutputFormat.setOutputPath(job,newPath(output));System.exit(job.waitForCompletion(true)?0:1);}else
{System.err.println("<Urage>wordcount<intput><output>");}}}运行结果WordCount编程实例⚫用户编写完MapReduce程序后,按照一定的规则
指定程序的输入和输出目录,并提交到Hadoop集群中,作业在Hadoop中的执行过程如图所示。Hadoop将输入数据切分成若干个输入分片(inputsplit),并将每个split交给一个MapTask处理;M
apTask不断的从对应的split中解析出一个个key/value,并调用map()函数处理,处理完之后根据ReduceTask个数将结果分成若干个分区(partition)写到本地磁盘;同时,每个ReduceTask从每个MapTask上读取属于自己的那个parti
tion,然后基于排序的方法将key相同的数据聚集在一起,调用reduce()函数处理,并将结果输出到文件中。WordCount编程实例⚫流程图如下:HadoopMapReduce架构HadoopMapReduce架构⚫1)C
lient➢用户编写的MapReduce程序通过Client提交到JobTracker端;同时,用户可通过Client提供的一些接口查看作业的运行状态。在Hadoop内部用“作业”(Job)表示MapReduce程序。一个MapRed
uce程序可对应若干个作业,而每个作业会被分解成若干个Map/Reduce任务(Task)。⚫2)JobTracker➢JobTracke负责资源监控和作业调度。JobTracker监控所有TaskTracker与job的健康状况,一旦发现失败,就将相应的任务转移到其他节点;同时
,JobTracker会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop中,任务调度器是一个可插拔的模块,用户可以根据自己的需要
设计相应的调度器。HadoopMapReduce架构⚫3)TaskTracker➢TaskTracker会周期性地通过Heartbeat将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTra
cker发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等)。一个Task获取到一个slot后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slo
t分配给Task使用。slot分为Mapslot和Reduceslot两种,分别供MapTask和ReduceTask使用。TaskTracker通过slot数目(可配置参数)限定Task的并发度。HadoopMapReduce架构⚫4)Task➢Task分为MapTask和R
educeTask两种,均由TaskTracker启动。HDFS以固定大小的block为基本单位存储数据,而对于MapReduce而言,其处理单位是split。split是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节
点等。它的划分方法完全由用户自己决定。但需要注意的是,split的多少决定了MapTask的数目,因为每个split只会交给一个MapTask处理。HadoopMapReduce架构⚫MapTask执行过程如下图所示。由该图可知,MapTask先将对应的s
plit迭代解析成一个个key/value对,依次调用用户自定义的map()函数进行处理,最终将临时结果存放到本地磁盘上,其中临时数据被分成若干个partition,每个partition将被一个ReduceTask处理。HadoopMap
Reduce架构⚫ReduceTask执行过程下图所示。该过程分为三个阶段:➢①从远程节点上读取MapTask中间结果(称为“Shuffle阶段”);➢②按照key对key/value对进行排序(称为“Sort阶段”);➢③依次读取<key,valuelist>,调用用户自定义的reduce
()函数处理,并将最终结果存到HDFS上(称为“Reduce阶段”)。MapReduce实战开发⚫数据源➢sogou500w数据或sogou4000w数据⚫数据字段描述➢Time:用户访问时间➢Uid:用户的id➢Keyword:访问的关键字➢Ran
k:点击排名➢Order:页数➢Url:网址条件过滤⚫统计出搜索过包含有“仙剑奇侠传”内容的UID及搜索关键字记录➢详细代码见XjUid.java⚫rank<3并且order>2的所有UID及数量➢代码见UidByRank.j
ava搜索过‘仙剑奇侠传’内容的UID、搜索记录staticclassUidMapextendsMapper<LongWritable,Text,Text,Text>{Textuid=newText();protectedvoidmap(LongWritablek
ey,Textvalue,org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,Text>.Contextcontext)throwsjava.io.IOException,InterruptedException{String[]l
ines=value.toString().split("\t");if(lines!=null&&lines.length==6){Stringkw=lines[2];if(kw.indexOf(“仙剑奇侠传")>=0){uid.set(lines[1]);co
ntext.write(uid,newText(kw));}}};}搜索过‘仙剑奇侠传’内容的UID、搜索记录publicstaticvoidmain(String[]args)throwsIOException,ClassN
otFoundException,InterruptedException{if(args.length!=2&&args==null){System.err.println("PleaseIputRightPath!");Sys
tem.exit(0);}Configurationconfiguration=newConfiguration();Jobjob=newJob(configuration,BaiduUid.class.
getSimpleName());job.setJarByClass(BaiduUid.class);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutput
Format.class);FileInputFormat.setInputPaths(job,newPath(args[0]));FileOutputFormat.setOutputPath(job
,newPath(args[1]));job.setMapperClass(UidMap.class);job.setNumReduceTasks(0);job.setOutputKeyClass(Text.class);job.setOutputVa
lueClass(Text.class);job.waitForCompletion(true);}条件查询⚫上午7-9点之间,搜索过“赶集网”的用户➢详细代码请见GjTest.java本章小结⚫MapReduce主要分为input、split
ting、Mapping、Shuffling、Reducing、Finalreduce这几个阶段。⚫HadoopMapReduce处理的数据一般位于底层分布式文件系统中。该系统往往将用户的文件切分为若干个固定大小的block存储到不同的节点上。默认情况下,MapReduce的每个T
ask处理一个block。⚫MapReduce主要由四个组件构成,分别是Client,JobTracker,TaskTracker和Task,它们共同保障一个作业的成功运行。作业⚫用MapReduce实现WordCount谢谢