整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

第六章 MapReduce的工作机制 6.1. 剖析MapReduce作业运行机制

hadoop 小红牛 10℃ 0评论

MapReduce的工作机制

在本章中,我们将深入学习Hadoop中的MapReduce工作机制。这些知识 将为我们随后两章学习编写MapReduce高级编程奠定基础

6.1.1. 经典的 MapReduce (MapReduce 1)

6-1解释了作业在经典的MapReduce中运行的工作原理。最顶层包含4个独立的实体:

•客户端,提交MapReduce作业

jobtracker,协调作业的运行。jobtracker是一个Java应用程序,它 的主类是]obTracker

tasktracker,运行作业划分后的任务。tasktrackerJava应用程 序,它的主类是TaskTracker

•分布式文件系统(一般为HDFS,参见第3),用来在其他实体间 共享作业文件

 

1. 作业的提交

Jobsubmit()方法创建一个内部的JobSummiter实例,并且调用其 submitDobInternal()方法(参见图6-1的步骤1)。提交作业后, waitForCompletion()每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告到控制台。作业完成后,如果成功就显示作业计数器; 如果失败则导致作业失败的错误被记录到控制台。

blob.png 

6-1. Hadoop运行MapReduce作业的工作原理

]obSummiter所实现的作业提交过程如下所述。

• 向jobtracker请求一个新的作业ID(通过调用]obTrackergetNew]obId()方法获取)。参见步骤2

•检査作业的输出说明。例如,如果没有指定输出目录或输出目录已 经存在,作业就不提交,错误抛回给MapReduce程序。

•计算作业的输入分片。jP果分片无法计算,比如因为输入路径不存 在,作业就不提交,错误返回给MapReduce程序。

•将运行作业所需要的资源(包括作业JAR文件、配置文件和计算所 得的输入分片)复制到一个以作业ID命名的目录下jobtracker的文 件系统中。作业JAR的副本较多(由mapred. submit, replication属性控制,默认值为10),因此在运行作业的任务 时,集群中有很多个副本可供tasktracker访问。参见步骤3

• 告知jobtracker作业准备执行(通过调用]obTrackersubmit]ob()方法实现)。参见步骤4

 

2. 作业的初始化

]obTracker接收到对其submitJob()方法的调用后,会把此调用放入一个内部队列中,交由作业调度器(job scheduler)进行调度,并对其进行初 始化。初始化包栝创建一个表示正在运行作业的对象,用于封装任务和记录 信息,以便跟踪任务的状态和进程(步骤5)

为了创建任务运行列表,作业调度程器首先从共享文件系统中获取客户端 已经计算好的输入分片(步骤6)。然后为每个分片创建一个map任务。创建 的reduce任务的数量由]obmapred.reduce.tasks属性决定,它是用 setNumReduceTasks()方法来设置的,然后调度器创建相应数量的要运行的reduce任务。任务在此时被指定ID

除了Map任务和Reudce任务,还会创建两个任务:作业创建和作业清理。 这两个任务在TaskTracker中执行,在map任务运行之前运行代码来创建作 业,并且在所有reduce任务完成之后完成清理工作。配置项 OutputCommitter属性能设置运行的代码,默认值是FileOutputCommitter。 作业创建伤务为作业创建输出路径和临时工作空间。作业清理清除作业运 行过程中的临时目录。提交机制在6.5.3节有详细描述。

3. 任务的分配

tasktracker运行一个简单的循环来定期发送“心跳”(heartbeat)jobtracker。 “心跳”向jobtracker表明tasktracker是否还存活,同时也充当 两者之间的消息通道。作为“心跳”的一部分,taslctracke;•会指明它是否已 经准备好运行新的任务,如果是,jobtracker会为它分配一个任务,并使用 “心跳”的返回值与tasktracker进行通信(步骤7)

jobtrackertasktracker选择任务之前,jobtracker必须先选定任务所在 的作业。本章后面将介绍各种调度算法(详见6.3),但是默认方法是简单 维护一个作业优先级列表。一旦选择好作业,jobtracker就可以为该作业选定一个任务。

对于map任务和reduce任务,tasktracker有固定数量的任务槽,两者是独立设置的。例如,一个tasktracker可能可以同时运行两个map任务和两个 reduce任务。准确数量由tasktracker核的数量和内存大小来决定,在讨论内存时对此有所涉及。默认调度器在处理reduce任务槽之前,会填满空闲的map任务槽,因此,如果tasktracker至少有一个闲置的map任务 槽,jobtracker会为它选择一个map任务,否则选择一个reduce任务。

为了选择reduce任务,jobtracker从待运行的reduce任务列表中选取下一个 来执行,用不着考虑数据的本地化。然而,对干map任务,jobtracker会考 虑tasktracker的网络位置,并选取一个距离其输入分片文件最近的 tasktracker。在最理想的情况下,任务是数据本地化的(data-local),也就是 任务运行在输入分片所在的节点上。同样,任务也可能是机架本地化的 (rack-local):任务和输入分片在同一个机架,但不在同一节点上。一些任务 既不是数据本地化的,也不是机架本地化的,而是从与它们自身运行的不 同机架上检索数据。可以通过査看作业的计数器得知每类任务的比例。

4. 任务的执行

现在,tasktracker已经被分配了一个任务,下一步是运行该任务。第一步, 通过从共享文件系统把作业的JAR文件复制到tasktracker所在的文件系 统,从而实现作业的JAR文件本地化。同时,tasktracker将应用程序所需 要的全部文件从分布式缓存(详见8.4.2)复制到本地磁盘(步骤8)。第二 步,tasktracker为任务新建一个本地工作目录,并把JAR文件中的内容解 压到这个文件夹下。第三步,tasktracker新建一个TaskRunner实例来运行 该任务。

TaskRunner启动一个新的JVM(步骤9)来运行每个任务(步骤10),以便用 户定义的mapreduce函数的任何软件问题都不会影响到tasktracker(例如 导致崩溃或挂起等)。但在不同的任务之间重用JVM还是可能的,详 见 6.5.4 )

子进程通过umbliical接口与父进程进行通信。任务的子进程每隔几秒便告 知父进程它的进度,直到任务完成。

每个任务都能够执行搭建(setup)和清理(cleanup)动作,它们和任务本身在同 一个JVM中运行,并由作业的OutputCommitter确定。清理动作用于提交任务,这在基于文件的作业中意味着它的输出写到该任务的 最终位置。提交协议确保当推理执行(speculative execution)可用时(参阅 6.5.2),只有一个任务副本被提交,其他的都将取消。

至乎StreamingPipes,它们都运行特殊的map任务和reduce任务,目的是运行用户提供的可执行程序,并与之通信(参见图6-2)

blob.png 

6-2执行的StreamingPipestasktracker及其子进程的关系

Streaming中,Streaming任务使用标准输入和输出流与进程(可以用任何语言编写)进行通信。另一方面,Pipes任务则监听套接字(socket),发送
其环境中的一个端口号给C++进程,如此一来,在开始时,C++进程即可建 立一个与其父Java
Pipes任务的持久化套接字连接(persistent socket connection)。

在这两种情况下,在任务执行过程中,Java进程都会把输入键/值对传给外 部的进程,后者通过用户定义的map或reduce函数来执行它并把输出的键/ 值对传回Java进程。从tasktracker的角度看,就像tasktracker的子进程自 己在处理map或reduce代码一样。

5. 进度和状态的更新

MapReduce作业是长时间运行的批量作业,运行时间范围从数分钟到数小
时。这是一个很长的时间段,所以对于用户而言,能够得知作业进展是很 重要的。一个作业和它的每个任务都有一个状态(status),包括:作业或任
务的状态(比如,运行状态,成功完成,失畋状态)、map和reduce的进度、
作业计数器的值、状态消息或描述(可以由用户代码来设置)。这些状态信息 在作业期间不断改变,它们是如何与客户端通信的呢?

任务在运行时,对其进度(progress,即任务完成百分比)保持追踪。对map
任务,任务进度是已处理输入所占的比例。对reduce任务,情况稍微有点 复杂,但系统仍然会估计已处理reduce输人的比例。整个过程分成三部
分,与shuffle的三个阶段相对应(详情参见6.4节)。比如,如果任务已经执 行reducer
—半的输入,那么任务的进度便是5/6。因为已经完成复制和排 序阶段(每个占1/3),并且已经完成reduce阶段的一半(1/6)。

MapReduce中进度的组成

进度并不总是可测量的,但是无论如何,它能告诉Hadoop有个任务正 在运行。比如,写输出记录的任务也可以表示成进度,尽管它不能用总的需要写的百分比这样的数字来表示,因为即使通过任务来产生输出, 也无法知道后面的情况。

进度报告很重要。构成进度的所有操作如下:

•读入一条输入记录(在mapper或reducer中)

•写入一条输出记录(在mapper或reducer中)

•在一个Reporter中设爲状态描述(使用Reporter的setStatus()方法)

•增加计数器的值(使用Reporter的incrCounter()方法)

•调用 Reporter 的 progress()任务

 

任务也有一组计数器,负责对任务运行过程中各个事件进行计数(详情参 见2.3.1节),这些计数器要么内置于框架中,例如已写入的map输出记录 数,要么由用户自己定义。

如果任务报告了进度,就设置一个标志以表明状态变化将被发送到tasktracker。有一个独立的线程每隔3秒钟就检査一次此标志,如果已设置,则告知tasktracker当前任务状态。同时,tasktracker每隔5秒钟就发送 “心跳”到jobtraCker(5秒钟这个间隔是最小值,因为“心跳”间隔是实际
上由集群的大小来决定的:对于一个更大的集群,间隔会更长一些),#且
由tasktracker运行的所有任务的状态都会在调用中被发送至jobtracker。计
数器的发送间隔通常大于5秒,因为计数器占用的带宽相对较高。

jobtracker将这些更新合并起来,产生一个表明所有运行作业及其所含任务 状态的全局视图。最后,正如前面提到的,JobClient通过每秒査询 jobtracker来接收最新状态。客户端也可以使用]ob的getStatus()方法来得到一个JobStatus的实例,后者包含作业的所有状态信息。

6-3对方法调用进行了图解。

图片.png
 

6-3.状态更新在MapReduce 1系统中的传递流程

 

6. 作业的完成

jobtracker收到作业最后一个任务已完成的通知后(这是一个特定的作业清理任务),便把作业的状态设置为“成功”。然后,在]ob査询状态时,便知道任务已成功完成,于是]ob打印一条消息告知用户,然后从waitForCompletion()方法返回。Job的统计信息和计数值也在这时输出到控制台。

如果jobtracker有相应的设置,也会发送一个HTTP作业通知。希望收到 回调指令的客户端可以通过job.end.notification.url属性来进行这项设置。

最后,jobtracker清空作业的工作状态,指示tasktracker也清空作业的工作状态(如删除中间输出)。

 

转载请注明:全栈大数据 » 第六章 MapReduce的工作机制 6.1. 剖析MapReduce作业运行机制

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址