hadoop mapreduce结果存放的绝对路径是什么 ?

2020-05-27 12:00发布

1条回答
小红
2楼 · 2020-07-16 14:44






MapReduce是用于数据处理的一种编程模型,简单但足够强大,专门为并行处理大数据而设计。




  1. 通俗理解MapReduce

MapReduce的处理过程分为两个步骤:map和reduce。每个阶段的输入输出都是key-value的形式,key和value的类型可以自行指定。map阶段对切分好的数据进行并行处理,处理结果传输给reduce,由reduce函数完成最后的汇总。


例如从大量历史数据中找出往年最高气温,NCDC公开了过去每一年的所有气温等天气数据的检测,每一行记录一条观测记录,格式如下:




为了使用MapReduce找出历史上每年的最高温度,我们将行数作为map输入的key,每一行的文本作为map输入的value:




上图中粗体部分分别表示年份和温度。map函数对每一行记录进行处理,提取出(年份,温度)形式的键值对,作为map的输出:




(1950,0)

(1950,22)

(1950,-11)

(1949,111)

(1947,78)


很明显,有些数据是脏的,因此map也是进行脏数据处理和过滤的好地方。在map输出被传输到reduce之前,MapReduce框架会对键值对进行排序,根据key进行分组,甚至在key相同的一组内先统计出最高气温,所以reduce收到的数据格式像这样:




(1949,[111,78]

(1950,[0,22,-11]


如果有多个map任务同时运行(通常都是这样),那么每个map任务完成后,都会向reduce发送上面格式的数据,发送数据的过程叫shuffle。


map的输出会作为reduce的输入,reduce收到的是key加上一个列表,然后对这个列表进行处理,天气数据的例子中,就是找出最大值作为最高气温。最后reduce输出即为每年最高气温:




(1949,111)

(1950,22)


整个MapReduce数据流如下图:




其中的3个黑圈圈分别为map,shuffle和reduce过程。在Hadoop中,map和reduce的操作可以由多种语言来编写,例如Java、Python、Ruby等。


在实际的分布式计算中,上述过程由整个集群协调完成,我们假设现在有5年(2011-2015)的天气数据,分布存放在3个文件中:weather1.txt,weather2.txt,weather3.txt。再假设我们现在有一个3台机器的集群,b并且map任务实例数量为3,reduce实例数量2。那么实际运行MapReduce做作业时,整个流程类似于这样:




注意到2014年的数据分布在两个不同的文件中,黄色的粗线部分,代表2014年的2个map作业的输出都统一传输到一个reduce,因为他们的key相同(2014)。其实这个过程非常好理解,现实生活中,比如期末考试完了,那考卷由不同的老师批改,完成后如果想知道全年级最高分,那么可以这么做:


1)各个老师根据自己批改过的所有试卷分数整理出来(map):




=>(course,[score1,score2,...])


2)各个老师把最高分汇报给系主任(shuffle)

3)系主任统计最高分(reduce)




=>(courese,highest_score)


当然,如果要多门课程混在一起,系主任工作量太大,于是副主任也上(相当于2个reduce),则老师在汇报最高分的时候,相同课程要汇报给同一个人(相同key传输给同一个reduce),例如数学英语汇报给主任,政治汇报给副主任。




2.实例及代码实现



lifeisshort,showmethecode



MapReduce的概念框架有Google提出,Hadoop提供了经典的开源实现。但是并不是Hadoop特有的,例如在文档型数据库MongoDB中,可以通过JS来编写Map-Reduce,对数据库中的数据进行处理。我们这里以Hadoop为例说明。




数据准备


首先将本地的文件上传到HDFS:




hadoopfs-copyFromLocal/home/data/hadoop_book_input/hdfs://master:9000/input


可以查管理界面查看是否成功上传:




查看一下数据内容:




hadoopfs-texthdfs://master:9000/input/ncdc/sample.txt






编写Java代码


首先实现Mapper类,Mapper在新版本Hadoop中改变为类(旧版为接口)定义如下:




//支持泛型,泛型定义map输入输出的键值类型

publicclassMapper{

publicMapper(){


//map任务开始的时候调用一次,用于做准备工作

protectedvoidsetup(Contextcontext)throwsIOException,InterruptedException{

//空实现

}


//map逻辑默认直接将输入进行类型转换后输出

protectedvoidmap(KEYINkey,VALUEINvalue,

Contextcontext)throwsIOException,InterruptedException{

context.write((KEYOUT)key,(VALUEOUT)value);

}

//任务结束后调用一次,清理工作,与setup对应

protectedvoidcleanup(Contextcontext

)throwsIOException,InterruptedException{

//空实现

}


//map的实际运行过程就是调用run方法,一般用于高级实现,更精细地控制任务的执行过程,一般情况不需要覆盖这个方法

publicvoidrun(Contextcontext)throwsIOException,InterruptedException{

//准备工作

setup(context);

try{

//遍历分配给该任务的数据,循环调用map

while(context.nextKeyValue()){

map(context.getCurrentKey(),context.getCurrentValue(),context);

}

}finally{

//清理工作

cleanup(context);

}

}


}



实现中我们只覆盖map方法,其他保留不变。具体实现如下:




publicclassMaxTemperatureMapper

extendsMapper{


//9999代表数据丢失

privatestaticfinalintMISSING=9999;


@Override

publicvoidmap(LongWritablekey,Textvalue,Contextcontext)

throwsIOException,InterruptedException{


//行作为输入值key在这里暂时不需要使用

Stringline=value.toString();

//提取年份

Stringyear=line.substring(15,19);

//提取气温

intairTemperature=parseTemperature(line);

Stringquality=line.substring(92,93);


//过滤脏数据

booleanisRecordClean=airTemperature!=MISSING&&quality.matches("[01459]");

if(isRecordClean){

//输出(年份,温度)对

context.write(newText(year),newIntWritable(airTemperature));

}

}


privateintparseTemperature(Stringline){

intairTemperature;

if(line.charAt(87)=='+'){//parseIntdoesn'tlikeleadingplussigns

airTemperature=Integer.parseInt(line.substring(88,92));

}else{

airTemperature=Integer.parseInt(line.substring(87,92));

}


returnairTemperature;

}

}


接着实现Reducer,看看定义:




publicclassReducer{


//Reducer上下文类定义

publicabstractclassContext

implementsReduceContext{

}


//初始化在Reduce任务开始时调用一次

protectedvoidsetup(Contextcontext

)throwsIOException,InterruptedException{

//空实现

}


/**

*mapshuffle过来的数据中,每一个key调用一次这个方法

*/

@SuppressWarnings("unchecked")

protectedvoidreduce(KEYINkey,Iterablevalues,Contextcontext

)throwsIOException,InterruptedException{

//默认将所有的值一一输出

for(VALUEINvalue:values){

context.write((KEYOUT)key,(VALUEOUT)value);

}

}



protectedvoidcleanup(Contextcontext

)throwsIOException,InterruptedException{

//空实现收尾工作

}


//Reducer的运行逻辑供更高级的定制

publicvoidrun(Contextcontext)throwsIOException,InterruptedException{

setup(context);

try{

//遍历输入key

while(context.nextKey()){

reduce(context.getCurrentKey(),context.getValues(),context);

//一个key处理完要转向下一个key时,重置值遍历器

Iteratoriter=context.getValues().iterator();

if(iterinstanceofReduceContext.ValueIterator){

((ReduceContext.ValueIterator)iter).resetBackupStore();

}

}

}finally{

cleanup(context);

}

}

}



我们的Reducer实现主要是找出最高气温:




publicclassMaxTemperatureReducer

extendsReducer{


@Override

publicvoidreduce(Textkey,Iterablevalues,

Contextcontext)

throwsIOException,InterruptedException{

intmaxValue=findMax(values);

context.write(key,newIntWritable(maxValue));

}


privatestaticintfindMax(Iterablevalues){

intmaxValue=Integer.MIN_VALUE;

for(IntWritablevalue:values){

maxValue=Math.max(maxValue,value.get());

}


returnmaxValue;

}

}


Mapper和Reducer实现后,需要一个入口提交作业到Hadoop集群,在新版本中,使用YARN框架来运行MapReduce作业。作业配置如下:





publicclassMaxTemperature{


publicstaticvoidmain(String[]args)throwsException{

if(args.length!=2){

System.err.println("Usage:MaxTemperature");

System.exit(-1);

}


//设置jar包及作业名称

Jobjob=newJob();

job.setJarByClass(MaxTemperature.class);

job.setJobName("Maxtemperature");


//输入输出路径

FileInputFormat.addInputPath(job,newPath(args[0]));

FileOutputFormat.setOutputPath(job,newPath(args[1]));


//设置Mapper和Reducer实现

job.setMapperClass(MaxTemperatureMapper.class);

job.setReducerClass(MaxTemperatureReducer.class);


//设置输出格式

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);


//等待作业完成后退出

System.exit(job.waitForCompletion(true)?0:1);

}

}


输入输出路径使用FileInputFormat/FileOutputFormat的静态方法来设置,在运行作业之前,输出目录不能存在,这是为了避免覆盖数据导致数据丢失。运行之前如果检测到目录已经存在,作业将无法运行。OK,把项目打包,如果使用Eclipse,使用Export功能。如果使用Maven开发,则直接运行package命令。假设我们最后的jar包为max-temp.jar.把jar包上传到你的集群机器上,或者放在安装了Hadoop的客户端机器上,这里假设jar包放在/opt/job目录下。


运行


首先把作业jar包放到CLASSPATH:




cd/opt/job

exportHADOOP_CLASSPATH=max-temp.jar


运行:




hadoopMaxTemperature/input/ncdc/sample.txt/output


hadoop会自动把HADOOP_CLASSPAT设置的路径加入到CLASSPATH中,同时把HADOOP相关的依赖包也加入CLASSPATH,然后启动一个JVM运行MaxTemperature这个带有main方法的类。

结果如下:






日志中可以看到作业的一些运行情况,例如map任务数量,reduce任务数量,以及输入输出的记录数,可以看到跟实际情况完全吻合。


我们看一下输出目录/output:




hadoopfs-ls/output




可以看到该目录下有个成功标识文件_SUCCESS和结果输出文件part-r-0000,每个reducer会输出一个文件。查看一下这个输出文件的内容:




hadoopfs-texthdfs://master:9000/output/part-r-00000


如上图所示,我们成功得到了1949和1950年的最高温度,无需管结果是否合理,只要按照我们想要的逻辑运行即可。


YARN管理界面也可以看到该作业的情况:






3.进一步理解MapReduce


一个MapReduce作业通常包括输入数据、MapReduce程序以及一些配置信息。Hadoop把作业分解为task运行,task分为map任务和reduce任务,在新版本的Hadoop中,这些Task通过资源管理框架进行调度,如果任务失败,MapReduce应用框架会重新运行任务。


作业的输入被华为为固定大小的分片,叫inputsplits,简称splits。然后为每一个split分块创建一个map任务,map任务对每一条记录运行用户定义的map函数。划分为split之后,不同配置的机器就可以根据自己的资源及运算能力运行适当的任务,即使是相同配置的机器,最后运行的任务数也往往不等,这样能有效利用整个集群的计算能力。但是split也不已太多,否则会耗费很多时间在创建map任务上,通常而言,按集群Block大小(默认为128M)来划分split是合理的。


Hadoop会把map任务运行在里数据最近的节点上,最好的情况是直接在数据(split)所在的节点上运行map任务,这样不需要占用带宽,这一优化叫做数据本地优化(datalocalityoptimization)。下图的map选址方案从最优到最次为a,b,c:




关于Hadoop如何衡量两个集群节点的距离,参考我的另一批博客深入理解HDFS:Hadoop分布式文件系统。但是节点距离不是分配task考虑的唯一因素,还会考虑节点当前负载等因素。


Reduce任务通常无法利用本地数据的优化,大多数情况下,reduce的输入都来自集群的其他节点。reduce针对每一个key运行reduce函数之后,输出结果通常保存在HDFS中,并且存储一定的副本数,第一个副本存在运行reduce任务的本地机器,其他副本根据HDFS写入的管道分别写入节点,关于更多HDFS的数据写入流程,参考这里。


下图是一个单reduce的数据流示例:




如果有多个reduce任务,那么map任务的输出到底该传输到哪一个reduce任务呢?决定某个key的数据(key,[value1,value2,...])该发送给那个reduce的过程叫partition。默认情况下,MapReduce使用key的哈希函数进行分桶,这通常工作的很好。如果需要自行指定分区函数,可以自己实现一个Partitioner并配置到作业中。key相同的map任务输出一定会发送到同一个reduce任务。map任务的输出数据传输到reduce任务所在节点的过程,叫做shuffle。下面是一个更通用的MapReduce数据流图:




当然,有些作业中我们可能根本不需要有reduce任务,所有工作在map任务并行执行完之后就完毕了,例如Hadoop提供的并行复制工作distcp,其内部实现就是采用一个只有Mapper,没有Reducer的MapReduce作业,在map完成文件复制之后作业就完成了,如下图所示:




在上面计算最高天气的例子中,每个map将每一条记录所产生的(年份,温度)记录都shuffle到reduce节点,当数据量较大时,将占用很多带宽,耗费很长时间。事实上,可以在map任务所在的节点上做更多工作。map任务运行完之后,可以把所有结果按年份分组,并统计出每一年的最高温度(类似于sql中的selectmax(temperature)fromtablegroupbyyear),这个最高温度是局部的,只在本任务重产生的数据做比较。做完局部统计之后,将结果发送给reduce做最终的汇总,找出全局最高温度。过程示意图如下:




这么做之所以符合逻辑,是基于以下的事实:




max(0,20,10,25,15)=max(max(0,20,10),max(25,15))


符合上述性质的函数称为是commutative和associative,有时候也成为是distributive。如果是计算平均温度,则不能使用这一的方式。


上述的局部计算在Hadoop中使用Combiner来表示。为了在作业中使用Combiner,我们需要明确指定,在前面的例子中,可以直接使用Reducer作为Combiner,因为两者逻辑是一样的:




//设置Mapper和Reducer实现

job.setMapperClass(MaxTemperatureMapper.class);

job.setCombinerClass(MaxTemperatureReducer.class);

job.setReducerClass(MaxTemperatureReducer.class);




4.HadoopStreaming


Hadoop完全允许我们使用Java以外的语言来编写map和reduce函数。HadoopStreaming使用Unix标准流作为Hadoop和其他应用程序的接口。数据流的大致示意图如下:




整个数据在HadoopMapReduce与Ruby应用、标准输入输出之间流转,因此叫Streaming。我们继续使用前面气温的例子来说明,先使用ruby来编写map和reduce,然后使用unix的管道来模拟整个过程,最后迁移到Hadoop上运行。


Ruby版本的map函数从标准流中读取数据,运算后将结果输出到标准输出流:




#!/usr/bin/ruby

STDIN.each_linedo|line|

val=line

year,temp,q=val[15,4],val[87,5],val[92,1]

puts"#{year}\t#{temp}"if(temp!="+9999"&&q=~/[01459]/)

end


逻辑与Java版本完全一样,STDIN是ruby的标准输入,each_line针对每一行进行操作,逻辑封装在do和end之间。puts是ruby标准输出函数,打印tab分割的记录到标准输出流。


因为这个脚本与标准输入输出交互,所以很容易结合linux的管道来测试:




catinput/ncdc/sample.txt|rubymax_temp_map.rb




一样用ruby脚本来完成reduce的功能:




last_key,max_val=nil,-1000000

STDIN.each_linedo|line|

key,val=line.split("\t")

iflast_key&&last_key!=key

puts"#{last_key}\t#{max_val}"

last_key,max_val=key,val.to_i

else

last_key,max_val=key,[max_val,val.to_i].max

end

end

#处理最后一个key的输出

put"#{last_key}\t#{max_val}"iflast_key


map处理完之后,同一个key的一组键值对中,value是排序的,所以当前读到的key如果不同于上一个key,表示这个key的所有值都处理完了(前文提到会在切换key之前reset输入)。我们使用sort命令来替代MapReduce中的排序过程,把map的标准输出作为sort的输入,sort通过管道连接到map:




cat/home/data/hadoop_book_input/ncdc/sample.txt|rubymax_temp_map.rb|sort|rubymax_temp_reduce.rb


输出结果如下图,与前文完全一致。




很好,我们在Hadoop上运行这个作业。非Java语言的MapReduce作业,需要使用HadoopStreaming来运行。HadoopStreaming会负责作业的Task分解,把输入数据作为标准输入流传递给Ruby写的map脚本,并接受来自map脚本的标准输出,排序后shuffle到reduce节点上,并以标准输入传递给reduce,最后把reduce的标准输出保存到HDFS文件中。


我们使用hadoopjar命令,同时指定输入输出目录,脚本位置等。




hadoopjar/home/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar-filesmax_temp_map.rb,max_temp_reduce.rb-input/input/ncdc/sample.txt-output/output/max-tem-ruby-mappermax_temp_map.rb-reducermax_temp_reduce.rb


-file参数把这些文件上传到集群中。注意map和reduce脚本需要在CLASSPATH下,我是在当前目录下运行的,默认加入到类路径中。另外请确保集群中的所有机器都安装了ruby,否则可能出现类似subprocessfailedwithcode127。这里的输出文件是/outp/max-tem-ruby,MapReduce不允许多个作业输出到同一个目录。


查看输出文件,与Java版本完全一致。OK,我们设置combiner,然后在大的数据集上感受一下:




hadoopjar/home/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar-filesmax_temp_map.rb,max_temp_reduce.rb-input/input/ncdc/all-output/output/max-tem-all-mappermax_temp_map.rb-combinerx_temp_reduce.rb-reducermax_temp_reduce.rb


计算结果:




map和reduce也一样可以用Python来实现,用与Ruby一样的方式来运行,这里不多介绍。




参考


本文主要内容来自《Hadoop权威指南》,感谢作者的优秀书籍。

相关问题推荐

  • 回答 3

    换行。比如,print hello\nworld效果就是helloworld\n就是一个换行符。\是转义的意思,'\n'是换行,'\t'是tab,'\\'是,\ 是在编写程序中句子太长百,人为换行后加上\但print出来是一整行。...

  • 回答 42

    十种常见排序算法一般分为以下几种:(1)非线性时间比较类排序:a. 交换类排序(快速排序、冒泡排序)b. 插入类排序(简单插入排序、希尔排序)c. 选择类排序(简单选择排序、堆排序)d. 归并排序(二路归并排序、多路归并排序)(2)线性时间非比较类排序:...

  • 回答 70
    已采纳

    前景很好,中国正在产业升级,工业机器人和人工智能方面都会是强烈的热点,而且正好是在3~5年以后的时间。难度,肯定高,要求你有创新的思维能力,高数中的微积分、数列等等必须得非常好,软件编程(基础的应用最广泛的语言:C/C++)必须得很好,微电子(数字电...

  • 回答 28

    迭代器与生成器的区别:(1)生成器:生成器本质上就是一个函数,它记住了上一次返回时在函数体中的位置。对生成器函数的第二次(或第n次)调用,跳转到函数上一次挂起的位置。而且记录了程序执行的上下文。生成器不仅记住了它的数据状态,生成器还记住了程序...

  • 回答 9

    python中title( )属于python中字符串函数,返回’标题化‘的字符串,就是单词的开头为大写,其余为小写

  • 回答 6

    第一种解释:代码中的cnt是count的简称,一种电脑计算机内部的数学函数的名字,在Excel办公软件中计算参数列表中的数字项的个数;在数据库( sq| server或者access )中可以用来统计符合条件的数据条数。函数COUNT在计数时,将把数值型的数字计算进去;但是...

  • 回答 1

    head是方法,所以需要取小括号,即dataset.head()显示的则是前5行。data[:, :-1]和data[:, -1]。另外,如果想通过位置取数据,请使用iloc,即dataset.iloc[:, :-1]和dataset.iloc[:, -1],前者表示的是取所有行,但不包括最后一列的数据,结果是个DataFrame。...

  • Python入门简单吗2021-09-23 13:21
    回答 45

    挺简单的,其实课程内容没有我们想象的那么难、像我之前同学,完全零基础,培训了半年,直接出来就工作了,人家还在北京大公司上班,一个月15k,实力老厉害了

  • 回答 4

    Python针对众多的类型,提供了众多的内建函数来处理(内建是相对于导入import来说的,后面学习到包package时,将会介绍),这些内建函数功用在于其往往可对多种类型对象进行类似的操作,即多种类型对象的共有的操作;如果某种操作只对特殊的某一类对象可行,Pyt...

  • 回答 8

     相当于 ... 这里不是注释

  • 回答 4

    还有FIXME

  • 回答 3

    python的两个库:xlrd和xlutils。 xlrd打开excel,但是打开的excel并不能直接写入数据,需要用xlutils主要是复制一份出来,实现后续的写入功能。

  • 回答 8

    单行注释:Python中的单行注释一般是以#开头的,#右边的文字都会被当做解释说明的内容,不会被当做执行的程序。为了保证代码的可读性,一般会在#后面加一两个空格然后在编写解释内容。示例:#  单行注释print(hello world)注释可以放在代码上面也可以放在代...

  • 回答 2

    主要是按行读取,然后就是写出判断逻辑来勘测行是否为注视行,空行,编码行其他的:import linecachefile=open('3_2.txt','r')linecount=len(file.readlines())linecache.getline('3_2.txt',linecount)这样做的过程中发现一个问题,...

  • 回答 4

    或许是里面有没被注释的代码

  • 回答 26

    自学的话要看个人情况,可以先在B站找一下视频看一下

没有解决我的问题,去提问