整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

第六章 MapReduce的工作机制

MapReduce的工作机制

在本章中,我们将深入学习Hadoop中的MapReduce工作机制。这些知识 将为我们随后两章学习编写MapReduce高级编程奠定基础。

1. 剖析MapReduce作业运行机制

可以通过一个简单的方法调用来运行MapRedece作业:]ob对象上的 submit()。注意,也可以调用waitFoKompletion(),它用于提交以前 没有提交过的作业,并等待它的完成®submit()方法调用封装了大量的 处理细节。本小节将揭示Hadoop运行作业时所采取的措施。

我们在第5章中看到Hadoop执行MapReduce程序的方法依赖干两个配置 设置。

在目前的版本以及0.20版本系列中,mapred. job.tracker决定了执行 MapReduce程序的方式。如果这个配置属性被设置为local(默认值),则使 用本地的作业运行器。运行器在单个JVM上运行整个作业。它被设计用来 在小的数据集上测试和运行MapReduce程序。

如果mapred.job.tracker被设置为用冒号分开的主机和端口对(主机:端 口),那么该配置属性就被解释为一个jobtmcker地址,运行器则将作业提 交给该地址的jobtracker。下一节将具体描述整个过程。

Hadoop 2.0引入了一种新的执行机制。这种新机制(称为MapReduce 2)建立 ①在老版本的 MapReduceAPI 中,可以调用]obClient.submit]ob(conf)]obClient.run]ob(conf)

在一个名为YARN®的系统上。目前,用于执行的 框架通过mapreduce.framework.name属性进行设置,值local表示本地的 作业运行器,“classic”表示经典的MapReduce框架(也称MapReduce 1,它使用一个jobtracker和多个tasktracker)yarn表示新的框架。新旧版本MapReduce API之间的区别不同于经典的MapReduce执行 框架和基干YARNMapReduce执行框架(分别是MapReduce 12)的区别。API具有面向用户的、客户端的特性,并且决定着用户怎 样写MapReduce程序,而执行机制只是运行MapReduce程序的不同 途径而已。新旧版本AP丨在MapReduce 12运行的四种组合都是 支持的。第1章的表1-2列出了 $同的Hadoop版本支持的组合 情况。

1.1. 经典的 MapReduce (MapReduce 1)

6-1解释了作业在经典的MapReduce中运行的工作原理。最顶层包含4个独立的实体:

•客户端,提交MapReduce作业

jobtracker,协调作业的运行。jobtracker是一个Java应用程序,它 的主类是]obTracker

tasktracker,运行作业划分后的任务。tasktrackerJava应用程 序,它的主类是TaskTracker

•分布式文件系统(一般为HDFS,参见第3),用来在其他实体间 共享作业文件

 

1.1.1. 作业的提交

Jobsubmit()方法创建一个内部的JobSummiter实例,并且调用其 submitDobInternal()方法(参见图6-1的步骤1)。提交作业后, waitForCompletion()每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告到控制台。作业完成后,如果成功就显示作业计数器; 如果失败则导致作业失败的错误被记录到控制台。

blob.png 

6-1. Hadoop运行MapReduce作业的工作原理

]obSummiter所实现的作业提交过程如下所述。

• 向jobtracker请求一个新的作业ID(通过调用]obTrackergetNew]obId()方法获取)。参见步骤2

•检査作业的输出说明。例如,如果没有指定输出目录或输出目录已 经存在,作业就不提交,错误抛回给MapReduce程序。

•计算作业的输入分片。jP果分片无法计算,比如因为输入路径不存 在,作业就不提交,错误返回给MapReduce程序。

•将运行作业所需要的资源(包括作业JAR文件、配置文件和计算所 得的输入分片)复制到一个以作业ID命名的目录下jobtracker的文 件系统中。作业JAR的副本较多(由mapred. submit, replication属性控制,默认值为10),因此在运行作业的任务 时,集群中有很多个副本可供tasktracker访问。参见步骤3

• 告知jobtracker作业准备执行(通过调用]obTrackersubmit]ob()方法实现)。参见步骤4

 

1.1.2. 作业的初始化

]obTracker接收到对其submitJob()方法的调用后,会把此调用放入一个内部队列中,交由作业调度器(job scheduler)进行调度,并对其进行初 始化。初始化包栝创建一个表示正在运行作业的对象,用于封装任务和记录 信息,以便跟踪任务的状态和进程(步骤5)

为了创建任务运行列表,作业调度程器首先从共享文件系统中获取客户端 已经计算好的输入分片(步骤6)。然后为每个分片创建一个map任务。创建 的reduce任务的数量由]obmapred.reduce.tasks属性决定,它是用 setNumReduceTasks()方法来设置的,然后调度器创建相应数量的要运行的reduce任务。任务在此时被指定ID

除了Map任务和Reudce任务,还会创建两个任务:作业创建和作业清理。 这两个任务在TaskTracker中执行,在map任务运行之前运行代码来创建作 业,并且在所有reduce任务完成之后完成清理工作。配置项 OutputCommitter属性能设置运行的代码,默认值是FileOutputCommitter。 作业创建伤务为作业创建输出路径和临时工作空间。作业清理清除作业运 行过程中的临时目录。提交机制在6.5.3节有详细描述。

1.1.3. 任务的分配

tasktracker运行一个简单的循环来定期发送“心跳”(heartbeat)jobtracker。 “心跳”向jobtracker表明tasktracker是否还存活,同时也充当 两者之间的消息通道。作为“心跳”的一部分,taslctracke;•会指明它是否已 经准备好运行新的任务,如果是,jobtracker会为它分配一个任务,并使用 “心跳”的返回值与tasktracker进行通信(步骤7)

jobtrackertasktracker选择任务之前,jobtracker必须先选定任务所在 的作业。本章后面将介绍各种调度算法(详见6.3),但是默认方法是简单 维护一个作业优先级列表。一旦选择好作业,jobtracker就可以为该作业选定一个任务。

对于map任务和reduce任务,tasktracker有固定数量的任务槽,两者是独立设置的。例如,一个tasktracker可能可以同时运行两个map任务和两个 reduce任务。准确数量由tasktracker核的数量和内存大小来决定,在讨论内存时对此有所涉及。默认调度器在处理reduce任务槽之前,会填满空闲的map任务槽,因此,如果tasktracker至少有一个闲置的map任务 槽,jobtracker会为它选择一个map任务,否则选择一个reduce任务。

为了选择reduce任务,jobtracker从待运行的reduce任务列表中选取下一个 来执行,用不着考虑数据的本地化。然而,对干map任务,jobtracker会考 虑tasktracker的网络位置,并选取一个距离其输入分片文件最近的 tasktracker。在最理想的情况下,任务是数据本地化的(data-local),也就是 任务运行在输入分片所在的节点上。同样,任务也可能是机架本地化的 (rack-local):任务和输入分片在同一个机架,但不在同一节点上。一些任务 既不是数据本地化的,也不是机架本地化的,而是从与它们自身运行的不 同机架上检索数据。可以通过査看作业的计数器得知每类任务的比例。

1.1.4. 任务的执行

现在,tasktracker已经被分配了一个任务,下一步是运行该任务。第一步, 通过从共享文件系统把作业的JAR文件复制到tasktracker所在的文件系 统,从而实现作业的JAR文件本地化。同时,tasktracker将应用程序所需 要的全部文件从分布式缓存(详见8.4.2)复制到本地磁盘(步骤8)。第二 步,tasktracker为任务新建一个本地工作目录,并把JAR文件中的内容解 压到这个文件夹下。第三步,tasktracker新建一个TaskRunner实例来运行 该任务。

TaskRunner启动一个新的JVM(步骤9)来运行每个任务(步骤10),以便用 户定义的mapreduce函数的任何软件问题都不会影响到tasktracker(例如 导致崩溃或挂起等)。但在不同的任务之间重用JVM还是可能的,详 见 6.5.4 )

子进程通过umbliical接口与父进程进行通信。任务的子进程每隔几秒便告 知父进程它的进度,直到任务完成。

每个任务都能够执行搭建(setup)和清理(cleanup)动作,它们和任务本身在同 一个JVM中运行,并由作业的OutputCommitter确定。清理动作用于提交任务,这在基于文件的作业中意味着它的输出写到该任务的 最终位置。提交协议确保当推理执行(speculative execution)可用时(参阅 6.5.2),只有一个任务副本被提交,其他的都将取消。

至乎StreamingPipes,它们都运行特殊的map任务和reduce任务,目的是运行用户提供的可执行程序,并与之通信(参见图6-2)

blob.png 

6-2执行的StreamingPipestasktracker及其子进程的关系

Streaming中,Streaming任务使用标准输入和输出流与进程(可以用任何语言编写)进行通信。另一方面,Pipes任务则监听套接字(socket),发送 其环境中的一个端口号给C++进程,如此一来,在开始时,C++进程即可建 立一个与其父Java Pipes任务的持久化套接字连接(persistent socket connection)。

在这两种情况下,在任务执行过程中,Java进程都会把输入键/值对传给外 部的进程,后者通过用户定义的map或reduce函数来执行它并把输出的键/ 值对传回Java进程。从tasktracker的角度看,就像tasktracker的子进程自 己在处理map或reduce代码一样。

1.1.5. 进度和状态的更新

MapReduce作业是长时间运行的批量作业,运行时间范围从数分钟到数小 时。这是一个很长的时间段,所以对于用户而言,能够得知作业进展是很 重要的。一个作业和它的每个任务都有一个状态(status),包括:作业或任 务的状态(比如,运行状态,成功完成,失畋状态)、map和reduce的进度、 作业计数器的值、状态消息或描述(可以由用户代码来设置)。这些状态信息 在作业期间不断改变,它们是如何与客户端通信的呢?

任务在运行时,对其进度(progress,即任务完成百分比)保持追踪。对map 任务,任务进度是已处理输入所占的比例。对reduce任务,情况稍微有点 复杂,但系统仍然会估计已处理reduce输人的比例。整个过程分成三部 分,与shuffle的三个阶段相对应(详情参见6.4节)。比如,如果任务已经执 行reducer —半的输入,那么任务的进度便是5/6。因为已经完成复制和排 序阶段(每个占1/3),并且已经完成reduce阶段的一半(1/6)。

MapReduce中进度的组成

进度并不总是可测量的,但是无论如何,它能告诉Hadoop有个任务正 在运行。比如,写输出记录的任务也可以表示成进度,尽管它不能用总的需要写的百分比这样的数字来表示,因为即使通过任务来产生输出, 也无法知道后面的情况。

进度报告很重要。构成进度的所有操作如下:

•读入一条输入记录(在mapper或reducer中)

•写入一条输出记录(在mapper或reducer中)

•在一个Reporter中设爲状态描述(使用Reporter的setStatus()方法)

•增加计数器的值(使用Reporter的incrCounter()方法)

•调用 Reporter 的 progress()任务

 

任务也有一组计数器,负责对任务运行过程中各个事件进行计数(详情参 见2.3.1节),这些计数器要么内置于框架中,例如已写入的map输出记录 数,要么由用户自己定义。

如果任务报告了进度,就设置一个标志以表明状态变化将被发送到tasktracker。有一个独立的线程每隔3秒钟就检査一次此标志,如果已设置,则告知tasktracker当前任务状态。同时,tasktracker每隔5秒钟就发送 “心跳”到jobtraCker(5秒钟这个间隔是最小值,因为“心跳”间隔是实际 上由集群的大小来决定的:对于一个更大的集群,间隔会更长一些),#且 由tasktracker运行的所有任务的状态都会在调用中被发送至jobtracker。计 数器的发送间隔通常大于5秒,因为计数器占用的带宽相对较高。

jobtracker将这些更新合并起来,产生一个表明所有运行作业及其所含任务 状态的全局视图。最后,正如前面提到的,JobClient通过每秒査询 jobtracker来接收最新状态。客户端也可以使用]ob的getStatus()方法来得到一个JobStatus的实例,后者包含作业的所有状态信息。

6-3对方法调用进行了图解。

blob.png 

6-3.状态更新在MapReduce 1系统中的传递流程

 

1.1.6. 作业的完成

jobtracker收到作业最后一个任务已完成的通知后(这是一个特定的作业清理任务),便把作业的状态设置为“成功”。然后,在]ob査询状态时,便知道任务已成功完成,于是]ob打印一条消息告知用户,然后从waitForCompletion()方法返回。Job的统计信息和计数值也在这时输出到控制台。

如果jobtracker有相应的设置,也会发送一个HTTP作业通知。希望收到 回调指令的客户端可以通过job.end.notification.url属性来进行这项设置。

最后,jobtracker清空作业的工作状态,指示tasktracker也清空作业的工作状态(如删除中间输出)。

 

1.2. YARN (MapReduce 2)

对于节点数超出4000的大型集群,前一节描述的MapReduce系统开始面临 着扩展性的瓶颈。在2010年雅虎的一个团队开始设计下一代的MapReduce。 由此,YARN (Yet Another Resource Negotiator 的缩写或者为 YARN Application Resource Nefotiator 的缩写)应运而生。

YARNJobtracker的职能划分为多个独立的实体,从而改善了“经典的” MapReduce面临的扩展瓶颈问题。Jobtracker负责作业调度和任务进度监视,追踪任务、重启失败或过慢的任务和进行任务登记,例如维护计数器总数。

YARN将这两种角色划分为两个独立的守护进程:管理集群上资源使用的资源管理器和管理集群上运行任务生命周期的应用管理器®。基本思路 是:应用服务器与资源管理器协商集群的计算资源:容器0(每个容器都有 特定的内存上限),在这些容器上运行特定应用程序的进程。容器由集群节 点上运行的节点管理器®监视,以确保应用程序使用的资源不会超过分配给它的资源。

jobtracker不同,应用的每个实例(这里指一个MapReduce作业)有一个专 用的应用master,它运行在应用的运行期间。这种方式实际上和最初 GoogleMapReduce论文里介绍的方法很相似,该论文描述了 master进程如何协调在一组worker上运行的map任务和reduce任务。

如前所述,YARNMapReduce更具一般性,实际上MapReduce只是 YARN应用的一种形式。有很多其他的YARN应用(例如能够在集群中的一 组节点上运行脚本的分布式shell)以及其他正在开发的程序。YARN 设计的精妙之处在于不同的YARN应用可以在同一个集群上共存。例如,一个 MapReduce应用可以同时作为MPI应用运行。这大大提高了可管理性和集群的利用率。

此外,用户甚至有可能在同一个YARN集群上运行多个不同版本的 MapReduce,这使得MapReduce升级过程更容易管理。注意,MapReduce 的某些部分(如作业历史服务器和shuffle处理器)以及YARN本身仍然需要在整个集群上升级。

YARN上的MapReduce比经典的MapReduce包括更多的实体:

提交MapReduce作业的客户端

YARN资源管理器,负责协调集群上计算资源的分配

YARN节点管理器,负责启动和监视集群中机器上的计算容器 (container)

MapReduce应用程序master负责协调运行MapReduce作业的任 务。它和MapReduce任务在容器中运行,这些容器由资源管理器 分配并由节点管理器进行管理

分布式文件系统(一般为HDFS,参见第3),用来与其他实体间 共享作业文件

 

作业的运行过程如图6-4所示,并在接下来的小节中具体描述。

blob.png 

6-4. Hadoop使用YARN运行MapPeduce的过程

1.2.1. 作业提交

MapReduce 2中的作业提交是使用与MapReduce 1相同的用户APT(步骤1)MapReduce 2 实现了ClientProtocol,当mapreduce.framework.name设置为yam时启动。提交的过程与经典的非常相似。从资源管理器(而不是 jobtracker)获取新的作业ID,YARN命名法中它是一个应用程序ID(步骤2)。作业客户端检査作业的输出说明,计算输人分片(虽然有选项yarn.app.mapreduce.am.compute-splits-in-cluster在集群上来产生分片,这可以使具有多个分片的作业从中受益)并将作业资源(包括作业JAR、配置和分片信息) 复制到HDFS(步骤3)。最后,通过调用资源管理器上的submitApplication() 方法提交作业(步骤4)

1.2.2. 作业初始化

资源管理器收到调用它的submitApplication()消息后,便将请求传递给 调度器(scheduler)。调度器分配一个容器,然后资源管理器在节点管理器的 管理下在容器中启动应用程序的master进程(步骤5a5b)

MapReduce作业的application master是一个Java应用程序,它的主类 是MRAppMaster。它对作业进行初始化:通过创建多个簿记对象以保持对 作业进度的跟踪,因为它将接受来自任务的进度和完成报告(步骤6)。接下 来,它接受来自共享文件系统的在客户端计算的输入分片(步骤7)。对每一 个分片创建一个map任务对象以及由mapreduce. job.reduces属性确定 的多个reduce任务对象。

接下来,application master决定如何运行构成MapReduce作业的各个任 务。如果作业很小,就选择在与它同一个JVM上运行任务。

相对于在一个节点上顺序运行它们,判断在新的容器中分配和运行任务的 开销大于并行运行它们的开销时,就会发生这一情况。这不同于 MapReduce 1, MapReduce 1从不在单个tasktracker上运行小作业。这样的作业称为uberized,或者作为uber任务运行。

哪些任务是小任务?默认情况下,小任务就是小于10mapper且只有1 reducer且输入大小小于一个HDFS块的任务。(通过设置 mapreduce.job.ubertask.maxmaps, mapreduce.job.ubertask.maxreduces mapreduce.job.ubertask.maxbytes可以改变一个作业的上述值。)将 mapreduce. job.ubertask.enable 设置为 false 也可以完全使 wher 任务不 可用。

在任何任务运行之前,作业的setup方法为了设置作业的OutputCommitter 被调用来建立作业的输出目录。在MapReduce 1中,它在一个由tasktracker 运行的特殊任务中被调用,而在YARN执行框架中,该方法由应用程序 master直接调用。

 

1.2.3. 任务分配

如果作业不适合作为uber任务运行,那么application master就会为该作业 中的所有map任务和reduce任务向资源管理器请求容器(步骤8)。附着心跳 信息的请求包括毎个map任务的数据本地化信息,特别是输入分片所在的 主机和相应机架信息。调度器使用这些信息来做调度决策(jobtracker的 调度器一样)。理想情况下,它将任务分配到数据本地化的节点,但如果不 可能这样做,调度器就会相对于非本地化的分配优先使用机架本地化的分配。 请求也为任务指定了内存需求。在默认情况下,map任务和reduce任务都 分配到1024 MB的内存,但这可以通过mapr'educe.map.memory.mbmapreduce. reduce.memory.mb 来设置0 内存的分配方式不同于MapReduce 1,后者中tasktrackers有在集群配置时 设置的固定数量的槽,每个任务在一个槽上运行。槽有最大内存分配限 制,这对集群是固定的,导致当任务使用较少内存时无法充分利用内存(因 为其他等待的任务不能使用这些未使用的内存)以及由于任务不能获取足够 内存而导致作业失败。

YARN中,资源分为更细的粒度,所以可以避免上述问题。具体而言, 应用程序可以请求最小到最大限制范围的任意最小值倍数的内存容量。默 认的内存分配容量是调度器特定的,对于容量调度器,它的默认值最小值 是 1024 MB(yarn.scheduler.capacity.minimum-allocation-mb设置),默认的最大值是10240M8(yarn.scheduler.capacity.maximum-allocation-mb设置)。因此,任务可以通过适当设置mapreduce.map.memory.mbmapreduce.reduce.memory.mb来请求1 GB10 GB间的任意1 GB倍数的内存容量(调 度器在需要的时候使用最接近的倍数)

 

1.2.4. 任务执行

一旦资源管理器的调度器为任务分配了容器,application master就通过与节点管理器通信来启动容器(步骤9a9b)。该任务由主类为YarnChildJava应用程序执行。在它运行任务之前,首先将任务需要的资源本地化, 包括作业的配置、JAR文件和所有来自分布式缓存的文件(步骤10)。最后,运行map任务或reduce任务(步骤11)

StreamingPipes程序以MapReduce 1 的方式运行。YarnChild 启动 StreamingPipes进程,并通过分别使用标准的输入/输出或套接字与它们 通信,如图6-2所示(child和子进程在节点管理器上运行,而非 tasktracker)0

1.2.5. 进度和状态更新

YARN下运行时,任务每三秒钟通过umbilical接口向application master 汇报进度和状态(包含计数器),作为作业的汇聚视图(aggregate view)。这个 过程如图6-5所示。相比之下,MapReduce 1通过tasktrackerjobtracker 来实现进度更新。

blob.png 

6-5•在MapReduce 2系统中状态更新信息的传播

 

客户端每秒钟(通过 mapreduce. client. progressmonitor. poll interval 设置)査询一次application master以接收进度更新,通常都会向用户显示。

MapReduce1中,作业跟踪器的Web UI展示运行作业列表及其进度。在 YARN中,资源管理器的Web UI展示了正在运行的应用以及连接到的对应 application master,每个 application master 展示 MapReduce 作业的进度等进一步的细节。

1.2.6. 作业完成

除了向application master査询进度外,客户端每5秒钟还通过调用]obwaitForCompletion()来检査作业是否完成。査询的间隔可以通过mapreduce.client.completion.pollinterval 属性进行设置。

注意,通过HTTP回调(callback)来完成作业也是支持的,就像在 MapReduce 1 中一样。然而在 Mapreduce 2 中,回调由 application master初始化。

作业完成后,application master和任务容器清理其工作状态,OutputCommitter 的作业清理方法会被调用。作业历史服务器保存作业的信息供用户需要时査询。

 

2. 失败

实际情况是,用户代码错误不断,进程崩溃,机器故障,如此种种。使用 Hadoop最主要的好处之一是它能处理此类故障并让你能够完成作业。

2.1. 经典MapReduce中的失败

MapReduce 1运行时,主要考虑三种失败的模式:运行任务失败、tasktracker失败以及jobtracker失败。下面将逐一分析。

2.1.1. 任务运行失败

首先考虑子任务失败的情况。最常见的情况是map任务或reduce任务中的用户代码抛出运行异常。如果发生这种情况,子任务JVM进程会在退出之前印其父tasktracker发送错误报告。错误报告最后被记入用户日志。

tasktracker将此次任务任务尝试标记为failed(失败),释放任务槽运行另外一个任务。

对于Streaming任务,如果Streaming进程以非零退出代码退出,则被标记为failed。这种行为由 stream.non.zero.exit.is.failure 属性(默认 值为true)来控制。

另一种错误情况是子进程JVM突然退出,可能由于JVM软件缺陷而导致 MapReduce用户代码由于某些特殊原因造成JVM退出。在这种情况下, tasktracker会注意到进程已经退出并将此次任务尝试标记为failed(失 败)。

任务挂起的处理方式则有不同。一旦tasktracker注意到已经有一段时间没 有收到进度的更新,便会将任务标记为failed。在此之后,JVM子进程将 被杀死。05任务失败的超时间隔通常为10分钟,可以以作业为基础(或以集 群为基础)mapred.task.timeout属性设置为以毫秒为单位的值。 超时(timeout)设置为0将关闭超时判定,所以长时间运行的任务永远不会被 标记为failed。在这种情况下,被挂起的任务永远不会释放它的任务槽并 随着时间的推移最终降低整个集群的效率。因此,尽量避免这种设置,同 时充分确保每个任务能够定期汇报其进度。参见6.1.1节的补充材料 “MapReduce中进度的组成”。

jobtracker被告知一个任务尝试失败后(通过tasktracker的“心跳”调用), 将重新调度该任务的执行。jobtracker会尝试避免重新调度以前尝试失败的 tasktracker上的任务。此外,如果一个任务的失败次数超过4次,将不会再 重试。这个值是可以设置的:对于map任务,运行任务的最多尝试次数由 mapred.map.max.attempts属性控制|而对于reduce任务,则由 mapred.reduce.max.attempts属性控制。在默认情况下,如果任何任务 失败次数大于4(或最多尝试次数被配置为4),整个作业都会失败。

 

对于一些应用程序,我们不希望一旦有少数几个任务失败就中止运行整个 作业,因为即使有任务失败,作业的一些结果可能还是可用的。在这种情 况下,可以为作业设置在不触发作业失败的情况下允许任务失败的最大百 分比。map任务和reduce任务可以独立控制,分别通过mapred.max. map.failures.percent mapred.max.reduce.failures.percent 属性来设置。

任务尝试(task attempt)也是可以中止的(killed),这与失败不同。任务尝试可 以被中止是因为它是一个推测副本(相关详情可参见6.5.2节“推测执 行”),或因为它所处的tasktracker失败,导致jobtracker将它上面运行的 所有任务尝试标记为killed。被中止的任务尝试不会被计入任务运行尝试次 数(由 mapred.map.max.attempts mapredreduce.max.attempts 设 置),因为尝试中止并不是任务的过错。

用户也可以使用Web UI或命令行方式(输入hadoop job査看相应的选项) 来中止或取消任务尝试。也可以采用相同的机制来中止作业。

2.1.2. tasktracker 失败

tasktracker失败是另一种失败模式。如果一个tasktracker由于崩溃或运行过于缓慢而失败,会停止向jobtracker发送“心跳”(或很少发送“心跳”)。jobtracker会注意到已经停止发送“心跳”的tasktracker (假设它有10分钟没有接收到一个“心跳”。这个值由mapred.tasktracker.expiry.interval属性设置,以毫秒为单位)并将它从等待任务调度的tasktracker 池中移除。如果是未完成的作业,jobtracker会安排此tasktracker上已经运 行并成功完成的map任务重新运行,因为reduce任务无法访问它们的中间 输出结果(都存放在失败的tasktracker的本地文件系统上)。任何正在进行的 任务也都会被重新调度。

即使tasktracker没有失败,也可能被jobtracker列入黑名单。如果在一个特定的 tasktracker 上超过 4 (通过 mapre.max.tracker.failures 设置)来 自同一个作业的任务失败了,jobtracker就会将此记录为出错。如果 tasktracker上面的失败任务数超过最小阈值(默认值为4,mapred.max. tracker.blacklists设置)并远远高于集群中tasktracker的平均失败任 务数,就会被列入黑名单。

列入黑名单的tasktracker不再被分配任务,但会继续和jobtracker通信。随着错误期满(每天一次的比率)tasktracker有机会再次运行作业。另外,如 果有可修复的潜在错误(比如替换硬件),在tasktracker重启并重新加入集群后,它将从jobtracker的黑名单中移出。

2.1.3. jobtracker 失败

在所有失败中,jobtracker失败是最严重的。目前,Hadoop没有处理jobtracker失败的机制一一它是一个单点故障——因此在这种情况下,作业的执行注定是失败的。然而,这种失败发生的概率很小,因为具体某台机 器失败的几率很小。YARN进一步改善了这种情况,它的设计目标之一是 消除MapReduce中单点失败的可能性。

jobtracker重启后,在它停止时运行的所有士业都需要再次提交。配置选项 mapred.jotracker. restart. recover(默认为关闭)可尝试恢复所有运行作业,但它还不太可靠,因此建议不要用。

 

2.2. YARN中的失败

对于在YARN中运行的MapReduce程序,需要考虑一下几种实体的失败: 任务、application master、节点管理器和资源管理器。

2.2.1. 任务运行失败

任务运行失败类似于经典的情况。JVM的运行时异常和突然退出被反馈给 applitation master,该任务尝试被标记为失败。类似的,通过在umbilical channe丨上的 ping 缺失(由 mapreduce.task.time 设定超时值),applitation master会注意到挂起的任务,任务尝试再次被标记为失败。

确定任务什么时候失败的配置属性和经典情况一样:4次尝试后任务标记为失败(map任务的由mapreduce.map.maxattemps设置,reducer任务的由mapreduce.reduce.maxattempts设置)。如果一个作业中超过mapreduce.map.failures.maxpercent map 任务或超过 mapreduce. reduce.failures.maxpercentreduce任务运行失败,那么整个作业就失败了。

2.2.2. application master 运行失败

YARN中的应用程序在运行失败的时候有几次尝试机会,就像MapReduce任务在遇到硬件或网络故障时要进行几次尝试一样。在默认情况下,只要应用程序运行失败一次就会被标记为失败,但我们可以设置yarn.resourcetnanager.am.max-retries属性增加允许失败的次数。

application master向资源管理器发送周期性的心跳,当application master发生故障时,资源管理器将检测到该故障并在一个新的容器(由节点管理器管 理)中开始一个新的master实例。Mapreduce application master可以恢复故障应用程序所运行任务的状态,使其不必重新运行。默认情况下是不能恢复的,因此故障application master将重新运行它们的所有任务,但我们可以设置 yarn.app.mapneduce.am.job. recovery.enable true,启用这个功能。

客户端向application master轮询进度报告,如果它的application master运行失败,客户端就需要定位新的实例。在作业初始化期间,客户端向资源管理器询问并缓存application master的地址,使其每次需要向application master査询时不必重载资源管理器。但是,如果application master运行失败,客户端就会在发出状态更新请求时超时,这时客户端会返回资源管理器请求新的application master的地址。

2.2.3. 节点管理器运行失败

如果节点管理器失败,就会停止向资源管理器发送心跳信息并被移出可用 节点资源管理器池。默认值为600000(10分钟)的属性yarn.resourcemanager. nm. liveness-monitor. expiry-interval-ms 决定着资源管理器认为节 点管理器失败之前的等待时间。

在故障节点管理器上运行的所有任务或application master都用前两节描述的机制进行恢复。

如果应用程序的运行失败次数高,那么节点管理器可能会被拉黑。由 application master管理黑名单,对于MapReduce,如果一个节点管理器上有 超过三个任务失败,application master就会尽量将任务调度到不同的节点 上〇 用户可以通过 mapneduce.job.maxtaskfailunes.per.tracker 设 置该阈值。

 

2.2.4. 资源管理器运行失败

资源管理器失败是非常严重的问题,没有资源管理器,作业和任务容器将无法启动。资源管理器的设计从一开始就通过使用检査点机制将其状态保存到持久性存储,从而实现从失败中恢复,不过在本书写作的时候,最新 版本还没有完全实现该功能。

在资源管理器失败后,由管理员启动一个新的资源管理器实例并恢复到保 存的状态。状态由系统中的节点管理器和运行的应用程序组成。(注意,任务并非资源管理器状态的组成部分,因为它们由application master管理。 因此,存储的状态数量比jobtmcker中的状态量更好管理。)

资源管理器使用的存储容量通过yarn.resourcemanager.store.class属性进行配置。 默认值为 org. apache. hadoop. yarn. server. resourcemanager. recovery.MemStore,这保存在内存中,因此可操作性不是很高。然而, 基于ZooKeeper的存储,以后会支持从资源管理器失败中进行可靠的恢复。

3. 作业的调度

早期版本的Hadoop使用一种非常简单的方法来调度用户的作业:按照作业提交的顺序,使用FIFO(先进先出)调度算法来运行作业。典型情况下,每个作业都会使用整个集群,因此作业必须等待直到轮到自己运行。虽然共 享集群极有可能为多个用户提供大量资源,但问题在于如何公平地在用户 之间分配资源,这要求有更好的调度器。生产作业需要及时完成,同时还 要保证正在进行较小临时查询的用户能够在合理时间内得到返回结果。

不久后,增加了设置作业优先级的功能,可以通过设置mapred.job.priority属性或JobClientsetJobPriority()方法来设置优先级(在这两种方法中,可以选择 VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW 中的任何值作为优先级)。在作业调度器选择要运行的下一个作业时,选择的是优先级 最高的作业。然而,在FIFO调度算法中,优先级并不支持抢占(preemption), 所以高优先级的作业仍然受阻于此前已经开始的、长时间运行的低优先级的作业。

Hadoop中,可以选择MapReduce的调度器。MapReduce 1的默认调度器 是最初基于队列的FIFO调度器,还有两个多用户调度器,分别为公平调度 器(Fair Scheduler)和容量调度器(Capacity Scheduler)

3.1. 公平调度器

公平调度器的目标是让每个用户公平共享集群能力。如果只有一个作业在 运行,就会得到集群的所有资源。随着提交的作业越来越多,闲置的任务槽会以“让每个用户公平共享集群”这种方式进行分配。某个用户的耗时短的作业将在合理的时间内完成,即便另一个用户的长好间作业正在运行而且还在运行过程中。

作业都放在作业池中,在默认情况下,每个用户都有自己的作业池。提交作业数较多的用户,不会因此而获得更多的集群资源。可以用mapreduce的任务槽数来定制作业池的最小容量,也可以设置每个池的权重。

公平调度器支持抢占机制,所以,如果一个池在特定的一段时间内未能公平共享资源,就会中止运行池中得到过多资源的任务,把空出来的任务槽让给运行资源不足的作业池。

公平调度器是一个后续模块。要使用它,需要将其JAR文件放在Hadoop 的类路径(classpath),即将它从Hadoopcontrib/fairscheduler目录复制到lib目录。随后,像下面这样设置mapred.jobtracker.taskScheduler属性:

org.apache.hadoop.mapred.FairSchedulen

经过这样的设置后,即可运行公平调度器。但要想充分发挥其特有的优势 和了解如何配置(包括它的网络接口),请参阅Hadoop发行版 src/contrib/fairscheduler目录下的 README 文件。

 

 

3.2. 容量调度器

针对多用户调度,容量调度器采用的方法稍有不同。集群由很多队列组成(类似于公平调度器的任务池),这些队列可能是层次结构的(因此,一个队列可能是另一个队列的子队列),每个队列被分配有一定的容量。这一点与公平调度器类似,只不过在每个队列内部,作业根据FIFO方式(考虑优先级)进行调度。本质上,容量调度器允许用户或组织(使用队列进行定义)为每个用户或组织模拟出一个使用FIFO调度策略的独立MapReduce集群。

相比之下,公平调度器(实际上也支持作业池内的FIFO作业调度,使其类似于容量调度器)强制每个池内公平共享,使运行的作业共享池的资源。

 

4. shuffle 和排序

MapReduce确保每个reducer的输入都是按键排序的。系统执行排序的过程 (即将map输出作为输入传给reducer)称为shuffle。在此,我们将学习shuffle是如何工作的,因为它有助于我们理解工作机制(如果需要优化 MapReduce程序)shuffle属于不断被优化和改进的代码库的一部分,因此下面的描述有必要隐藏一些细节(也可能随时间而改变,目前是0.20版本)。从许多方面来看,shuffleMapReduce的“心脏”,是奇迹发生的地方。

4.1. map

map函数开始产生输出时,并不是简单地将它写到磁盘。这个过程更复杂,它利用缓冲的方式写到内存并出于效率的考虑进行预排序。图6-6 展示了这个过程。

blob.png 

6-6. MapReduce shuffle 和排序

每个map任务都有一个环形内存缓冲区用于存储任务输出。在默认情况下,缓冲区的大小为100MB,此值可以通过改变io.sort.mb属性来调整。一旦缓冲内容达到阈值(io.sort.spill.percent,默认为0.80,80%), —个后台线程便开始把内容溢出到(spill)磁盘。在溢出写到磁盘过程中,map输出继续写到缓冲区,但如果在此期间缓冲区被填满,map会被阻塞直到写磁盘过程完成。

溢出写过程按轮询方式将缓冲区中的内容写到mapred.local.dir属性指定的作业特定子目录中的目录中。

在写磁盘之前,线程首先根据数据最终要传的reducer把数据划分成相应的分区(partition)。在每个分区中,后台线程按键进行内排序,如果有一个combiner,它就在排序后的输出上运行。运行combiner使得map输出结果更紧凑,因此减少写到磁盘的数据和传递给reducer的数据。

每次内存缓冲区达到溢出阈值,就会新建一个溢出文件(spill file),因此在map任务写完其最后一个输出记录之后,会有几个溢出文件。在任务完成之前,溢出文件被合并成一个已分区且已排序的输出文件。配置属性io.sort.factor控制着一次最多能合并多少流,默认值是10

如果至少存在3个溢出文件(通过min.num.spills.for.combine属性设置)时,则combiner就会在输出文件写到磁盘之前再次运行。前面曾讲过,combiner可以在输入上反复运行,但并不影响最终结果。如果只有一或两个溢出文件,那么对map输出的减少方面不值得调用combiner,不会为该map输出再次运行combiner

将压缩map输出写到磁盘的过程中对它进行压缩往往是个很好的主意,因为这样会写磁盘的速度更快,节约磁盘空间,并且减少传给reducer的数据量。在默认情况下,输出是不压缩的,但只要将mapred.compress.map.output设置为true,就可以轻松启用此功能。使用的压缩库由 mared.map.output.compression.codec 指定。

reducer通过HTTP方式得到输出文件的分区。用于文件分区的工作线程的数量由任务的tracker.http.threads属性控制,此设置针对的是每一个tasktracker,而不是针对每个map任务槽。默认值是40,在运行大型作业大型集群上,此值可以根据需要而增加。在MapReduce 2中,该属性是 适用的,因为使用的最大线程数是基于机器的处理器数量自动设定的。 mapRedcue 2使用Netty,默认情况下允许值为处理器数量的两倍。

 

4.2. reduce

现在转到处理过程的reduce部分。map输出文件位于运行map任务的 tasktracker的本地磁盘(注意,尽管map输出经常写到map tasktracker的本地磁盘,但reduce输出并不这样),现在,tasktracker需要为分区文件运行reduce任务。而且,reduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件。每个map任务的完成时间可能不同,因此只要有一个任务完成,reduce任务就开始复制其输出。这就是reduce任务的复制阶段。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是 5个线程,但这个默认值可以通过设置mapred. reduce.parallel.copies 属性来改变。

reducer如何知道要从哪台机器取得map输出呢?

map任务成功完成后,它们会通知其父tasktracker状态已更新,然后 tasktracker 进而通知 jobtracker。(在 MapReduce 2 中,任务直接通知其应用程序master。)这些通知在前面介绍的心跳通信机制中传输。因此,对于指定作业,jobtracker(或应用程序master)知道map输出和tasktracker之间的映射关系。reducer中的一个线程定期询问jobtracker以便获取map输出的位置,直到获得所有输出位置。

由于第一个reducer可能失败,因此tasktracker并没有在第一个 reducer检索到map输出时就立即从磁盘上删除它们。相反, tasktracker会等待,直到jobtracker告知它删除map输出,这是作业完成后执行的。

如果map输出相当小,会被复制到reduce任务JVM的内存(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,指定用于此用途的堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.job.shuffle.merge.percent决定)或达到map输出阈值(由mapred.inmem.merge.threshold控制),则合并后溢出写到磁盘中。如果指定combiner,则在合并期间运行它以降低写入硬盘的数据量。

随着磁盘上副本增多,后台线程会将它们合并为更大的、排好序的文件。 这会为后面的合并节省一些时间。注意,为了合并,压缩的map输出(通过 map任务)都必须在内存中被解压缩。

复制完所有map输出后,reduce任务进入排序阶段(更恰当的说法是合并阶段,因为排序是在map端进行的),这个阶段将合并map输出,维持其顺序排序。这是循环进行的。比如,如果有50map输出,而合并因子是 1(丨〇为默认设置,由io.sort.factor属性设置,与map的合并类似),合并 将进行5趟。每趟将10个文件合并成一个文件,因此最后有5个中间文件。

在最后阶段,即reduce阶段,直接把数据输入reduce函数,从而省略了一 次磁盘往返行程,并没有将这5个文件合并成一个已排序的文件作为最后 一趟。最后的合并可以来自内存和磁盘片段。

reduce阶段,对已排序输出中的每个键调用reduce函数。此阶段的输出直接写到输出文件系统,一般为HDFS。如果采用HDFS,由于tasktracker节点(或节点管理器)也运行数据节点,所以第一个块复本将被写到本地磁盘。

每趟合并的文件数实际上比示例中展示有所不同。目标是合并最小 数量的文件以便满足最后一趟的合并系数。因此如果有40个文件,

我们不会在四趟中每趟合并10个文件从而得到4个文件。相反,第 一趟只合并4个文件,随后的三趟合并所有10个文件。在最后一趟 中,4个已合并的文件和余下的6(未合并的)文件合计10个文 件。该过程如图6-7所述。

注意,这并没有改变合并次数,它只是一个优化措施,目的是尽量 减少写到磁盘的数据量,因为最后一趟总是直接合并到reduce

blob.png 

6-7 通过合并因子10有效地合并40个文件片段

4.3. 配置调优

现在我们已经有比较好的基础来理解如何调优shuffle过程来提高 MapReduce性能。表6-1和表6-2总结了相关设置和默认值,这些设置以作 业为单位(除非有特别说明),默认值适用于常规作业。

6-1. map端的调优属性

属性名称

类型

默认值

说明

io.sort.mb

int

100

排序map输出时所使 用的内存缓冲区的大 小,以兆字节为单位

io.sort.record.percent

float

0.05

用作存储map输出记 录边界的io.sort.mb 的比例。剩余的空间 用来map输出记 录本身。该属性在 1.x之后的版本删除了,因为shuffle代码被提高来更好地执行作业,速过使map输出可使用所有可用的内存和计数信息

io.sort.spill.percent

float

0.80

map输出内存缓冲和 用来开始磁盘溢出写 过程的记录边界索 引,这两者使用比例 的阈值

io.sort.factor

int

10

排序文件时,一次最 多合并的流数。这个 属性也在reduce中使 用。将此值增加到 100是很常见的

min.num.spills.for. combine

int

3

运行combiner所需的 最少溢出文件数(如果 已指定combiner)

mapred.compress. map.output

Boolean

false

压缩map输出

mapred.map.output, compression.codec

Class name

org.apache.hactoop.io.compress.DefaultCodec

用干map输出的压缩 编解码器

tasktracker.http.

threads

int

40

每个tasktracker的工 作线程数,用于将 map输出到reducer这是集群范围的设 置,不能由单个作业 设置。在MapReduce 2中不适用

 

总的原则是给shuffle过程尽量多提供内存空间。然而,有一个平衡问题,也就是要确保map函数和reduce函数能得到足够的内存来运行。这就是为什么编写map函数和reduce函数时尽量少用内存的原因,它们不应该无限使用内存(例如,应避免在map中堆积数据)

运行map任务和reduce任务的JVM,其内存大小由mapred .child, java .opts属性设置。任务节点上的内存大小应该尽量大。

map端,可以通过避免多次溢出写磁盘来获得最佳性能,一次是最佳的情况。如果能估算map输出大小,就可以合理地设置ip.sort.*.属性来尽可能减少溢出写的次数。具体而言,如果可以,就要增加io.sort.mb的值。MapReduce计数器(“Spilled records”)计算 在作业运行整个阶段中溢出写磁盘的记录数,这对于调优很有帮助。注 意,这个计数器包括mapreduce两端的溢出写。

reduce端,中间数据全部驻留在内存时,就能获得最佳性能。在默认情况下,这是不可能发生的,因为所有内存一般都预留给reduce函数。但如果reduce函数的内存需求不大,把mapred.inmem.merge.threshold设置为0,mapred.job.reduce.input.buffer.percent设置为 1.0(或一个更低的值,详见表6-2)就可以提升性能。

6-2. reduce端的调优属性

属性名称

类型

默认值

主苗述

mapred.reduce.parallel.copies

int

5

用于把map输出复制到reducer 的线程数

mapred.reduce.copy.backoff

int

300

在声明失败之前,reducer获取一个map输出所花的最大时间,以秒为单位。如果失败(根据指数后退),reducer可以在此时间内尝试重传

io.sort.factor

int

10

排序文件时一次最多合并的流的 数量。这个属性也在map端使用

mapred.job.shuffle. input.buffer.percent

float

0.70

shuffle的复制阶段,分配给 map输出的缓冲区占堆空间的百分比

mapred.iob.shuffle. merge.percent

float

0.66

map输出缓冲区(由mapred.job.shuffle.input.buffer.percent定义)的阈值使用比例,用于启动 合并输出和磁盘溢出写的过程

mapred.inmem.merge, threshold

int

1000

启动合并输出和磁盘溢出写过程的map输出的阈值数。0或更小 的数意味着没有阈值限制,溢出写行为由mapred. job. shuffle. merge.percent 单独控制

mapred.iob.reduce.input. buffer.percent

float

0.0

reduce过程中,在内存中保存map输出的空间占整个堆空间的比例。reduce阶段开始时,内存中的map输出大小不能大于这个 值。默认情况下,在reduce任务开始之前,所有map输出都合并到磁盘上,以便为reducer提供尽可能多的内存。然而,如果reducer需要的内存较少,可以增加此值来最小化访问磁盘的次数

更常见的情况是,Hadoop使用默认为4 KB的缓冲区,这是很低的,因此 应该在集群中增加这个值(通过设置io.file.buffer.size,详见9.4.5 节)。

20084月,Hadoop在通用TB字节排序基准测试中获胜, 它使用了一个优化方法,即将中间数据保存在reduce端的内存中。

5. 任务的执行

在本小节,我们将了解 MapReduce用户对任务执行的更多的控制。

5.1. 任务执行环境

Hadoopmap任务或reduce任务提供运行环境相关信息。例如,map任务可以知道它处理的文件的名称(参见7.2.2节),map任务或reduce任务可以得知任务的尝试次数。表6-3中的属性可以从作业的配置信息中获得,通过为mapperreducer提供一个configure()方法实现(其中,配置信息作为参数进行传递),便可获得这一信息。在新版本的API中,这些属性可以从传递给MapperReducer的所有方法的相关对象中获取。

6-3.任务执行环境的属性

属性名称

类型

说明

范例

mapred.job.id

string

作业ID

job_200811201130_0004

mapred.tip.id

string

任务ID

task_200811201130_ 0004_m_000003

mapred.task.id

string

任务尝试ID

(非任务ID)

attempt_2008112011300004_m_000003_0

mapred.task.partition

int

作业中任务ID

3

mapred.task.is.map

boolean

此任务是否是map任务

true

 

Streaming环境变量

Hadoop设置作业配置参数作为Streaming程序的环境变量。但它用下划线来代替非字母数字的符号,以确保名称的合法性。下面这个Python Streaming脚本解释了如何用Python Streaming 脚本来检索 mapred. job.id 属性的值。

os.environ ["mapred_Job_id"]

也可以应用Streaming启动程序的-cmdenv选项,来设置MapReduce所启动的Streaming进程的环境变量(一次设置一个变量)。比如,下面的语句设置了MAGIC_PARAMETER环境变量:

-cmdenv MAGIC_PARAMETER=abracadabra

 

5.2. 推测执行

MapReduce模型将作业分解成任务,然后并行地运行任务以使作业的整体执行时间少于各个任务顺序执行的时间。这使作业执行时间对运行缓慢的任务很敏感,因为只运行一个缓慢的任务会使整个作业所用的时间远远长于执行其他任务的时间。当一个作业由几百或几千个任务组成时,可能出现少数“拖后腿”的任务,这是很常见的。

任务执行缓慢可能有多种原因,包括硬件老化或软件配置错误,但是,检测具体原因很困难,因为任务总能够成功完成,尽管比预计执行时间长。 Hadoop不会尝试诊断或修复执行慢的任务,相反,在一个任务运行比预期慢的时候,它会尽量检测,并启动另一个相同的任务作为备份。这就是所谓的任务的“推测执行”(speculative execution)

必须认识到一点:如果同时启动两个重复的任务,它们会互相竞争,导致推测执行无法工作。这对集群资源是一种浪费。相反,只有在一个作业的所有任务都启动之后才启动推测任务,并且只针对那些已运行一段时间(至少一分钟)且比作业中其他任务平均进度慢的任务。一个任务成功完成后, 任何正在运行的重复任务都将被中止,因为已经不再需要它们了。因此, 如果原任务在推测任务前完成,推测任务就会被终止,同样,如果推测任 务先完成,那么原任务就会被中止。

推测执行是一种优化措施,它并不能使作业的运行更可靠。如果有一些软件缺陷会造成任务挂起或运行速度减慢,依靠推测执行来避免这些问题显然是不明智的,并且不能可靠地运行,因为相同的软件缺陷可能会影响推测式任务。应该修复软件缺陷,使任务不会挂起或运行速度减慢。

在默认情况下,推测执行是启用的。可以基于集群或基于每个作业,单独为map任务和reduce任务启用或禁用该功能。相关的属性如表6-4所示。

6-4.推测执行的属性

属性名称

类型

默认值

描述

mapred.map.tasks. speculative.execution

boolean

true

如果任务运行变慢,该属性决定着是否要启动map任务的另外个实例

mapred.reduce.tasks. speculative.execution

boolean

true

如果任务运行变慢,该属性决定 着是否要启动reduce任务的另外一个实例

Yarn.app.mapreduce. am.job.speculator.class

Class

Org.apache.hadoop. mapreduce.v2.app. speculate.Default Speculator

Speculator类实现推测执行策略(只针对 MapReduce2)

Yarn.app.mapreduce. am.job.estimator.class

Class

Org.apache.hadoop. mapreduce.v2.app. speculate.LegacyTaskRuntimeEstimator

Speculator实例使用的TaskRuntime Estimator的实现,提供任务运行时间的估计值只针对 MapReduce)

 

为什么会想到关闭推测执行?推测执行的目的是减少作业执行时间,但这 是以集群效率为代价的。在一个繁忙的集群中,推测执行会减少整体的吞吐量,因为冗余任务的执行时会减少作业的执行时间。因此,一些集群管 理员倾向于在集群上关闭此选项,而让用户根据个别作业需要而开启该功能。Hadoop老版本尤其如此,因为在调度推测任务时,会过度使用推测执行方式。

对于reduce任务,关闭推测执行是有益的,因为任意重复的reduce任务都必须将取得map输出作为最先的任务,这可能会大幅度地增加集群上的网络传输。

关闭推测执行的另一种情况是考虑到非幂等的(idempotent)任务。然而在很多情况下,将任务写成幂等的并使用OutputCommitter来提升任务成功时输出到最后位置的速度,这是可行的。详情将在下一节将介绍。

5.3. 关于 OutputCommitters

Hadoop MapReduce使用一个提交协议来确保作业和任务都完全成功或失败。这个行为通过对作业使用OutputCommitter来实现,在老版本 MapReduce API中通过调用JobConfsetOutputCommitter()或配置中的mapred.output.committer.class来设置。在新版本的MapReduce API中,OutputCommitter OutputFormat 通过它的 getOutputCommitter()方法确定。默认值为FileOutputCommitter,这对基于文件的MapReduce是适合的。可以定制已有的OutputCommitter或者在需要对作业或任务进行特别的安排或清理时,甚至还可以写一个新的实现。

OutputCommitterAPI如下所示(在新旧版本中的MapReduce API中):

public abstract class OutputCommitter {
    public abstract void setupJob(JobContext jobContext) throws IOException;
    public void commitJob(JobContext jobContext) throws IOException { }
    public void abortJob(JobContext jobContext, DobStatus.State state) throws IOException { }
    public abstract void setupTask(TaskAttemptContext taskContext) throws IOException;
    public abstract boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException;
    public abstract void commitTask(TaskAttemptContext taskContext) throws IOException;
    public abstract void abortTask(TaskAttemptContext taskContext) throws IOException;
}

setupJob()方法在作业运行前被调用,通常用来执行初始化操作。 FileOutputCommitter()方法通常为任务创建最后的输出目录${mapred.output.dir}以及一个临时的工作空间${mapred. outpur. dir}/ _temporary

如果作业成功,就调用commitJob()方法,在默认的基于文件的实现中,它用于删除临时的工作空间并在输出目录中创建一个名为_SUCCESS的隐藏的标志文件,以此告知文件系统的客户端该作业成功完成了。如果作业不成功,就通过状态对象调用abortJob(),意味着该作业是否失败或终止(例如由用户终止)。在默认的实现中,将删除作业的临时工作空间。

在任务级别上的操作与此类似。在任务执行之前先调用setupTask()方法,默认的实现不做任何事情,因为针对任务输出命名的临时目录是在写任务输出的时候被创建。

任务的提交阶段是可选的,并通过从needsTaskCornmit()返回的false值关闭它。这使得执行框架不必为任务运行分布提交协议,也不需要commitTask()或者abortTask()

当一个任务没有写任何输出时,FileOutputCommitter将跳过提交阶段。

如果任务成功,就调用commitTask(),在默认的实现中它将临时的任务输出目录(它的名字中有任务尝试的ID,以此避免任务尝试间的冲突)移动到最后的输出路径${mapred.output.dir}。否则,执行框架调用abortTask(),它负责删除临时的任务输出目录。

执行框架保证特定任务在有多次任务尝试的情况下只有一个任务会被提交,其他的则被取消。这种情况是可能出现的,因为第一次尝试出于某个原因而失败(这种情况下将被取消),提交的是稍后成功的尝试。另一种情况是如果两个任务尝试作为推测副本同时运行,则提交先完成的,而另一个被取消。

 

任务附属文件

对于map任务和reduce任务的输出,常用的写方式是通过 OutputCollector来收集键/值对。有一些应用需要比单个键/值对模式更灵活的方式,因此直接将mapreduce任务的输出文件写到分布式文件系统,如HDFS。(还有其他方法用于产生多个输出)

注意,要确保同一个任务的多个实例不向同一个文件进行写操作。如前所述,OutputCommitter协议解决了该问题。如果应用程序将附属文件导入其任务的工作目录中,那么成功完成的这些任务就会将其附属文件自动推送到输出目录,而失败的任务,其附属文件则被删除。

任务通过从其配置文件査询mapred.work.output.dir属性值找到其工作目录。另一种方法,使用Java APIMapReduce程序可以调用FileOutputFormat 上的getWorkOutputPath()静态方法获取描述工作目录的Path对象。执行框架在执行任务之前首先创建工作目录,因此不需要我们创建。

举一个简单的例子,假设有一个程序用来转换图像文件的格式。一种实现方法是用一个只有map任务的作业,其中每个map指定一组要转换的图像 (可以使用NlinelnputFormat,详情参见6.4.3)。如果map任务把转换后的图像写到其工作目录,那么在任务成功完成之后,这些图像就会被传到输出目录。

 

5.4. 任务JVM重用

Hadoop在它们自己的Java虚拟机上运行任务,以区别于其他正在运行的任务。为每个任务启动一个新的JVM耗时约1秒钟,对运行时间在1分钟左右的作业而言,这个额外消耗是微不足道的。但是,有大量超短任务(通常是map任务)的作业或初始化时间长的作业,它们如果能对后续任务重用JVM,就可以体现出性能上的优势。

启用任务重用JVM后,任务不会同时运行在一个JVM上。JVM顺序运行 各个任务。然而,tasktracker可以一次性运行多个任务,但都是在独立的 JVM上运行的。控制tasktrackermap任务槽数和reduce任务槽数的属性。

控制任务JVM重用的属性是mapred.job.reuse.jvm.num.tasks,它指定给定作业每个JVM运行的任务的最大数,默认值为1(参见表6-5)map 任务和reduce任务间没有什么不同,但不同作业的任务总是在独立的JVM 内运行。JobConf中的setNumTasksToExecuteperJvm()方法也可以用于设置这个属性。

6-5.任务JVM重用的属性

属性名称

类型

默认值

描述

mapred.job.reuse. jvm.num.tasks

int

1

在一个tasktracker上,对于给定的作业的每个JVM上可以运行的任务最大数。-1表示无限制,即同一个JVM可以被该作业的所有任务使用

 

通过充分利用HotSpot JVM所用的运行时优化,计算密集型任务也可以受益于任务JVM重用机制。在运行一段时间后,HotSpot JVM构建足够多的信息来检测代码中的性能关键部分并将热点部分的Java字节码动态转换成本地机器码。这对运行时间长的过程很有效,但只运行几秒钟或几分钟的 JVM并不能充分获得HotSpot带来的好处。在这些情况下,值得考虑启用 任务JVM重用功能。

5.5. 跳过坏记录

大型数据集十分庞杂。它们经常有损坏的记录。它们经常有不同格式的记录。它们经常有缺失的字段。在理想情况下,用户代码可以很好地处理这些情况。但实际情况中,忽略这些坏的记录只是权宜之计。取决于正在执行的分析,如果只有一小部分记录受影响,那么忽略它们并不会显著影响结果。然而,如果一个任务由于遇到一个坏的记录而发生问题一一通过抛出一个运行时异常一一任务就会失败。失败的任务将被重新运行(因为失败可能是由硬件故障或任务可控范围之外的一些原因造成的),但如果一个任务失败4次,那么整个作业会被标记为失败。如果数据是导致任务抛出异常的“元凶”,那么重新运行任务将无济于事,因为它每次都会因相同的原因而失败。

 

如果正在使用TextlnputFormat,则可以设置一个预期的行最大长度来防止损坏文件。文件中损坏的记录显示为一个非常长的行,这将引起内存溢出错误并导致任务失败。通过将mapred.linerecordreader.maxlength设置为一个适合内存的以字节衡量的值(一般长于输入数据中行的长度),记录reader将跳过(长的)损坏的行,而不是直接导致任务失败。

最好在mapperreducer代码中处理被损坏的记录。我们可以检测出坏记录并忽略它,或通过抛出个异常来中止作业运行。还可以使用计数器来计算作业中总的坏记录数,看问题影响的范围有多广。

有极少数情况是不能处理的。例如,软件缺陷存在于第三方的库中,我们是无法在mapperreducer中修改它的。在这些情况下,可以使用Hadoop skipping mode选项自动跳过坏记录。

启用skipping mode后,任务将正在处理的记录报告给tasktracker。任务失败时,tasktracker重新运行该任务,跳过导致任务失败的记录。由于额外的网络流量和记录错误以维护失败记录范围,所以只有在任务失败两次之后才会启用 skipping mode

因此,针对一个总是在某条坏记录上失败的任务,tasktracker将根据以下运行结果来启动任务尝试。

(1)任务失败。

(2)任务失败。

(3)开启skipping mode。任务失败,但是失败记录由tasktracker 保存。

(4)仍然启用skipping mode。任务继续运行,但跳过上一次尝试中失败的坏记录。

 

在默认情况下,skipping mode是关闭的,我们用SkipBadRedcord类单独为mapreduce任务启用此功能。值得注意的是,每次任务尝试, skipping mode都只能检测出一个坏记录,因此这种机制仅适用于检测个别坏记录(也就是说,每个任务只有少数几个坏记录)。为了给skipping mode 足够多尝试次数来检测并跳过一个输入分片中的所有坏记录,需要增加最多任务尝试次数(通过 mapned.map.max. attemps mapred. reduce. max.attemps 进行设置)

Hadoop检测出来的坏记录以序列文件的形式保存在_logs/skip子目录下的作业输出目录中。在作业完成后,我们可以查看这些记录(例如,使用hadoop fs-text)进行诊断。

 

转载请注明:全栈大数据 » 第六章 MapReduce的工作机制

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址