mapreduce的job流程

2020-08-25 20:33发布

3条回答
D滴滴
2楼 · 2020-08-25 20:38

1、 client提交任务,运行一个job

2、 client向jobtracker索取一个新的jobid

3、 client将所有信息上传到分布式系统中。

4、 client通知jobtracker提交任务完毕。

5、 jobtracker将任务初始化,其实就是将任务加入到任务队列中排队。直到运行到这个任务。

6、 jobtracker查看client提交的任务处理的输入是否合法,并作分割。

7、 通知含有数据的tasktracker进行任务。

8、 tasktracker接扫任务后,将client提交的资料从分布式系统中拿出来,启动虚拟机执行任务,其中可以是map任务,也可以是reduce任务,执行map任务的是map task,执行reduce任务的是reduce task。

9、 map任务执行完的中间结果存到内存和硬盘两部分,reduce会通过查看map的执行地点,主动来拿数据,并且做reduce任务。

10、 上述流程结束后,就是真正map和reduce代码的执行流程了。


无需指教
3楼 · 2020-08-26 08:36

MapReduce是我们再进行离线大数据处理的时候经常要使用的计算模型,MapReduce的计算过程被封装的很好,我们只用使用Map和Reduce函数,所以对其整体的计算过程不是太清楚,同时MapReduce1.0和MapReduce2.0在网上有很多人混淆。

MapReduce1.0运行模型20170730014216035.png Input

Input但是输入文件的存储位置,

但是注意这里并一定是一些博客说的当然是HDFS似的分布式文件系统位置,默认是HDFS文件系统,当然也可以修改。

,它也可以是本机上的文件位置。

我们来仔细分析下input8aab5880-d171-30f7-91d6-aaacba2d03ce.jpg

首先我们知道要和JobTracker打交道是离不开JobClient这个接口的,就如上图所示,

然后JobClient中的Run方法 会让 JobClient 把所有 Hadoop Job 的信息,比如 mapper reducer jar path, mapper / reducer class name, 输入文件的路径等等,告诉给 JobTracker,如下面的代码所示:

public int run(String[] args) throws Exception { //create job Job job = Job.getInstance(getConf(), this.getClass().getSimpleName()); // set run jar class job.setJarByClass(this.getClass()); // set input . output FileInputFormat.addInputPath(job, new Path(PropReader.Reader("arg1"))); FileOutputFormat.setOutputPath(job, new Path(PropReader.Reader("arg2"))); // set map job.setMapperClass(HFile2TabMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); // set reduce job.setReducerClass(PutSortReducer.class); return 0; }

除此以外,JobClient.runJob() 还会做一件事,使用 InputFormat类去计算如何把 input 文件 分割成一份一份,然后交给 mapper 处理。inputformat.getSplit() 函数返回一个 InputSplit 的 List, 每一个 InputSplit 就是一个 mapper 需要处理的数据。

一个 Hadoop Job的 input 既可以是一个很大的 file, 也可以是多个 file; 无论怎样,getSplit() 都会计算如何分割 input.

如果是HDFS文件系统,我们都知道其可以通过将文件分割为block的形式存放在很多台电脑上,使其可以存放很大的文件。那么Mapper是如何确定一个HDFS文件中的block存放哪几台电脑,有什么数据?

inputFormat它实际上是个 interface, 需要 类 来继承,提供分割 input 的逻辑。

Jobclient 有一个方法叫 setInputFormat(), 通过它,我们可以告诉 JobTracker 想要使用的 InputFormat 类 是什么。如果我们不设置,Hadoop默认的是 TextInputFormat, 它默认为文件在 HDFS上的每一个 Block 生成一个对应的 InputSplit. 所以大家使用 Hadoop 时,也可以编写自己的 input format, 这样可以自由的选择分割 input 的算法,甚至处理存储在 HDFS 之外的数据。

JobTracker 尽量把 mapper 安排在离它要处理的数据比较近的机器上,以便 mapper 从本机读取数据,节省网络传输时间。具体实现是如何实现?

对于每个 map任务, 我们知道它的 split 包含的数据所在的主机位置,我们就把 mapper 安排在那个相应的主机上好了,至少是比较近的host. 你可能会问:split 里存储的 主机位置是 HDFS 存数据的主机,和 MapReduce 的主机 有什么相关呢?为了达到数据本地性,其实通常把MapReduce 和 HDFS 部署在同一组主机上。

既然一个 InputSplit 对应一个 map任务, 那么当 map 任务收到它所处理数据的位置信息,它就可以从 HDFS 读取这些数据了。

接下来我们再从map函数看Input

map函数接受的是一个 key value 对。

实际上,Hadoop 会把每个 mapper 的输入数据再次分割,分割成一个个 key-value对, 然后为每一个 key-value对,调用Map函数一次. 为了这一步分割,Hadoop 使用到另一个类: RecordReader. 它主要的方法是 next(), 作用就是从 InputSplit 读出一条 key-value对.

RecordReader 可以被定义在每个 InputFormat 类中。当我们通过 JobClient.setInputFormat() 告诉 Hadoop inputFormat 类名称的时候, RecordReader 的定义也一并被传递过来。

所以整个Input,

1.JobClient输入输入文件的存储位置

2.JobClient通过InputFormat接口可以设置分割的逻辑,默认是按HDFS文件分割。

3.Hadoop把文件再次分割为key-value对。

4.JobTracker负责分配对应的分割块由对应的maper处理,同时 RecordReader负责读取key-value对值。

Mapper

JobClient运行后获得所需的配置文件和客户端计算所得的输入划分信息。并将这些信息都存放在JobTracker专门为该作业创建的文件夹中。文件夹名为该作业的Job ID。JAR文件默认会有10个副本(mapred.submit.replication属性控制);

然后输入划分信息告诉了JobTracker应该为这个作业启动多少个map任务等信息。

JobTracker通过TaskTracker 向其汇报的心跳情况和slot(情况),每一个slot可以接受一个map任务,这样为了每一台机器map任务的平均分配,JobTracker会接受每一个TaskTracker所监控的slot情况。

JobTracker接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度,当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息为每个划分创建一个map任务,并将map任务分配给TaskTracker执行,分配时根据slot的情况作为标准。

TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉JobTracker它依然在运行,同时心跳中还携带着很多的信息,比如当前map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息时,便把该作业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户。

Map通过 RecordReader 读取Input的key/value对,map根据用户自定义的任务,运行完毕后,产生另外一系列 key/value,并将其写入到Hadoop的内存缓冲取中,在内存缓冲区中的key/value对按key排序,此时会按照reduce partition进行,分到不同partition中,一旦内存满就会被写入到本地磁盘的文件里,这个文件叫spill file。shuffle

Shuffle是我们不需要编写的模块,但却是十分关键的模块。

4df193f5-e56e-308f-9689-eac035dd8a2b.png

在map中,每个 map 函数会输出一组 key/value对, Shuffle 阶段需要从所有 map主机上把相同的 key 的 key value对组合在一起,(也就是这里省去的Combiner阶段)组合后传给 reduce主机, 作为输入进入 reduce函数里。

Partitioner组件 负责计算哪些 key 应当被放到同一个 reduce 里

HashPartitioner类,它会把 key 放进一个 hash函数里,然后得到结果。如果两个 key 的哈希值 一样,他们的 key/value对 就被放到同一个 reduce 函数里。我们也把分配到同一个 reduce函数里的 key /value对 叫做一个reduce partition.

我们看到 hash 函数最终产生多少不同的结果, 这个 Hadoop job 就会有多少个 reduce partition/reduce 函数,这些 reduce函数最终被JobTracker 分配到负责 reduce 的主机上,进行处理。

我们知道map阶段可能会产生多个spill file 当 Map 结束时,这些 spill file 会被 merge 起来,不是 merge 成一个 file,而是也会按 reduce partition 分成多个。

当 Map tasks 成功结束时,他们会通知负责的 tasktracker, 然后消息通过 jobtracker 的 heartbeat 传给 jobtracker. 这样,对于每一个 job, jobtracker 知道 map output 和 map tasks 的关联。Reducer 内部有一个 thread 负责定期向 jobtracker 询问 map output 的位置,直到 reducer 得到所有它需要处理的 map output 的位置。

Reducer 的另一个 thread 会把拷贝过来的 map output file merge 成更大的 file. 如果 map task 被 configure 成需要对 map output 进行压缩,那 reduce 还要对 map 结果进行解压缩。当一个 reduce task 所有的 map output 都被拷贝到一个它的 host上时,reduce 就要开始对他们排序了。

排序并不是一次把所有 file 都排序,而是分几轮。每轮过后产生一个结果,然后再对结果排序。最后一轮就不用产生排序结果了,而是直接向 reduce 提供输入。这时,用户提供的 reduce函数 就可以被调用了。输入就是 map 任务 产生的 key value对.

同时reduce任务并不是在map任务完全结束后才开始的,Map 任务有可能在不同时间结束,所以 reduce 任务没必要等所有 map任务 都结束才开始。事实上,每个 reduce任务有一些 threads 专门负责从 map主机复制 map 输出(默认是5个)。Reduce e1090dee-ee98-30d1-ad55-2f88f774fa73.jpg

reduce() 函数以 key 及对应的 value 列表作为输入,按照用户自己的程序逻辑,经合并 key 相同的 value 值后,产 生另外一系列 key/value 对作为最终输出写入 HDFS。

一定要注意以上为MapReduce1.0的过程,而且现在MapReduce已经升级到了2.0版本,具体2.0的工作流程可参考:

Yarn框架深入理解

但是并不意味着MapReduce1.0被淘汰,在Yarn中的MRYarnClild模块中基本上是是采用MapReduce1.0的解决思路,MRv2 具有与 MRv1 相同的编程模型和数据处理引擎,唯一不同的是运行时环境。MRv2 是在 MRv1 基础上经加工之后,运行于资源管理框架 YARN 之上的计算框架 MapReduce。 它的运行时环境不再由 JobTracker 和 TaskTracker 等服务组成,而是变为通用资源管理 系统 YARN 和作业控制进程 ApplicationMaster,其中,YARN 负责资源管理和调度,而 ApplicationMaster 仅负责一个作业的管理。简言之,MRv1 仅是一个独立的离线计算框架, 而 MRv2 则是运行于 YARN 之上的 MapReduce。


十一郎
4楼 · 2020-08-26 09:08

 我们用文字概括如下:

1、  客户端提交job到resourcemanager(rm)

2、 rm将其放到等待队列,返回jobid和文件路径信息

3、  客户端将所需要计算的资源,上传到hdfs上(包括job信息和分片信息)的存储路径

4、  客户端给rm返回一个资源准备好的信息,job放入等待队列,告诉他可以启动job,等待rm进行调度

5、 rm在调度之前,申请一个资源nodemanager(nm),nm启动container,它接收到任务到hdfs上将资源获取到container,然后跟客户端交互已经得到需要计算的资源,客户端向其发送启动applicationmaster(am)的命令

6、 am启动起来后,通过解析分片信息向rm申请运算资源(maptask)

7、 rm收到信息查看nm资源情况,通过负载均衡分配所需要的机器,nm每一次心跳都会从job的描述信息查询自己所分配到的任务,接收到任务消息的机器会从hdfs上拿取计算资源,然后跟am交互,am发送启动maptask的命令。

8、 Maptask结束后,通知am,然后释放maptask资源,am向rm发出信息,申请reducetask的资源

9、 rm分配资源,am启动reducetask

10、 reducetask收集maptask完成的数据,启动reduce逻辑。执行完成后,通知am,然后释放reducetask的资源。am通知rm。am释放资源


相关问题推荐

  • 什么是大数据时代?2021-01-13 21:23
    回答 100

    大数据(big data)一词越来越多地被提及,人们用它来描述和定义信息爆炸时代产生的海量数据,而这个海量数据的时代则被称为大数据时代。随着云时代的来临,大数据(Big data)也吸引了越来越多的关注。大数据(Big data)通常用来形容一个公司创造的大量非结...

  • 回答 84

    Java和大数据的关系:Java是计算机的一门编程语言;可以用来做很多工作,大数据开发属于其中一种;大数据属于互联网方向,就像现在建立在大数据基础上的AI方向一样,他两不是一个同类,但是属于包含和被包含的关系;Java可以用来做大数据工作,大数据开发或者...

  • 回答 52
    已采纳

    学完大数据可以从事很多工作,比如说:hadoop 研发工程师、大数据研发工程师、大数据分析工程师、数据库工程师、hadoop运维工程师、大数据运维工程师、java大数据工程师、spark工程师等等都是我们可以从事的工作岗位!不同的岗位,所具备的技术知识也是不一样...

  • 回答 29

    简言之,大数据是指大数据集,这些数据集经过计算分析可以用于揭示某个方面相关的模式和趋势。大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。大数据的特点:数据量大、数据种类多、 要求实时性强、数据所蕴藏的...

  • 回答 14

    tail -f的时候,发现一个奇怪的现象,首先 我在一个窗口中 tail -f test.txt 然后在另一个窗口中用vim编辑这个文件,增加了几行字符,并保存,这个时候发现第一个窗口中并没有变化,没有将最新的内容显示出来。tail -F,重复上面的实验过程, 发现这次有变化了...

  • 回答 18

    您好针对您的问题,做出以下回答,希望有所帮助!1、大数据行业还是有非常大的人才需求的,对于就业也有不同的岗位可选,比如大数据工程师,大数据运维,大数据架构师,大数据分析师等等,就业难就难在能否找到适合的工作,能否与你的能力和就业预期匹配。2、...

  • 回答 33

    大数据的定义。大数据,又称巨量资料,指的是所涉及的数据资料量规模巨大到无法通过人脑甚至主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。大数据是对大量、动态、能持续的数据,通过运用新系统、新工具、新...

  • 回答 17

    最小的基本单位是Byte应该没多少人不知道吧,下面先按顺序给出所有单位:Byte、KB、MB、GB、TB、PB、EB、ZB、YB、DB、NB,按照进率1024(2的十次方)计算:1Byte = 8 Bit1 KB = 1,024 Bytes 1 MB = 1,024 KB = 1,048,576 Bytes 1 GB = 1,024 MB = 1,048,576...

  • 回答 5

    MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。MySQL的版本:针对不同的用户,MySQL分为两种不同的版本:MySQL Community Server社区版本,免费,但是Mysql不提供...

  • mysql安装步骤mysql 2022-05-07 18:01
    回答 2

    mysql安装需要先使用yum安装mysql数据库的软件包 ;然后启动数据库服务并运行mysql_secure_installation去除安全隐患,最后登录数据库,便可完成安装

  • 回答 5

    1.查看所有数据库showdatabases;2.查看当前使用的数据库selectdatabase();3.查看数据库使用端口showvariableslike'port';4.查看数据库编码showvariableslike‘%char%’;character_set_client 为客户端编码方式; character_set_connection 为建立连接...

  • 回答 5

    CREATE TABLE IF NOT EXISTS `runoob_tbl`(    `runoob_id` INT UNSIGNED AUTO_INCREMENT,    `runoob_title` VARCHAR(100) NOT NULL,    `runoob_author` VARCHAR(40) NOT NULL,    `submission_date` DATE,    PRI...

  • 回答 9

    学习多久,我觉得看你基础情况。1、如果原来什么语言也没有学过,也没有基础,那我觉得最基础的要先选择一种语言来学习,是VB,C..,pascal,看个人的喜好,一般情况下,选择C语言来学习。2、如果是有过语言的学习,我看应该一个星期差不多,因为语言的理念互通...

  • 回答 7

    添加语句 INSERT插入语句:INSERT INTO 表名 VALUES (‘xx’,‘xx’)不指定插入的列INSERT INTO table_name VALUES (值1, 值2,…)指定插入的列INSERT INTO table_name (列1, 列2,…) VALUES (值1, 值2,…)查询插入语句: INSERT INTO 插入表 SELECT * FROM 查...

  • 回答 5

    看你什么岗位吧。如果是后端,只会CRUD。应该是可以找到实习的,不过公司应该不会太好。如果是数据库开发岗位,那这应该是不会找到的。

  • 回答 7

    查找数据列 SELECT column1, column2, … FROM table_name; SELECT column_name(s) FROM table_name 

没有解决我的问题,去提问