整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

5 MapReduce应用开发

MapReduce应用开发

 

在第2章中,我们介绍了MapReduce模型。本章中,我们从实现层面介绍在Hadoop中开发MapReduce应用程序。

MapReduce编程遵循一个特定的流程。首先写map函数和reduce函数,最好使用单元测试来确保函数的运行符合预期。然后,写一个驱动程序来运行作业,看这个驱动程序是否可以正确运行,可以先从本地IDE中用一个小的数据集来运行它。如果驱动程序不能正确运行,就用本地IDE调试器来找出问题根源。根据这些调试信息,可以通过扩展单元测试来覆盖这一测试用例,从而改进mapperreducer,使其能正确处理类似输入

一旦程序按预期通过小型数据集的测试,就可以考虑把它放到集群上运行了。当运行程序对整个数据集进行测试的时候,可能会暴露更多的问题,这些问题可以像前面一样修复,即通过扩展测试用例的方式改进mapperreducer。在集群中调试程序很具有挑战性,我们来看一些常用的技术使其变得更简单一些。

程序可以正确运行之后,如果想进行一些优化调整,首先需要执行一些标准检査,借此加快MapReduce程序的运行速度,然后再做任务剖析task profiling)。分布式程序的分栌并不简单,Hadoop提供了钩子(hook)来辅助这个分析过程。

在开始写MapReduce程序之前,需要设置和配置开发环境。为此我们需要先学习如何配置Hadoop

1. 用于配置的API

Hqdoop中的组件是通过Hadoop自己的配置API来配置的。一个Configuration类的实例(可以在org.apache.hadoop.conf/包中找到)代表配置属性及其取值的一个集合。每个属性由一个String来命名,而值的类型可以是多种类型之一,包括Java基本类型(如booleanintlong•和float)和其他有用的类型(如StringClassjava.io.FileString

集合)。

Configuration从资源——(即使用简单结构定义名-值对的XML文件)中

读取其属性值。参见范例5-1。

范例5-1.—个简单的配置文件configuration-1.xml

<?xml version="l.0"?>

<configuration>

<property>

<name>color</name>

<value>yellow</value>

<description>Color</description>

</property>

<property>

<name>size</name>

<value>10</value>

<description>Size</description>

</property>

<property>

<name>weight</name>

<value>heavy</value>

<final>true</final>

<description>Weight</description

</property>

<property>

<namesize-weight</name>

<value>${size},${weight}</value

<description>Size and weight</description>

</property>

/configuration

假定此配置文件位于文件中,我们可以通过如下代码访问其属性:

Configuration conf=new Configuration(); conf.addResource("configuration-1.xml"); assertThat(conf.get("color"), is("yellow")); assertThat(conf.getlnt("size", 0), is(10)); assertThat(conf.get("breadth"j "wide"), is("wide"));

有这样几点需要注意XML文件中不保存类型信息;取而代之的是属性在被读取的时候,可以被解释为指定的类型;此外,get()方法允许为XML文件中没有定义的属性指定默认值,正如这一代码中最后一行的breadth属性一样。

1.1. 资源合并

使用多个资源文件来定义一个配置时,事情变得有趣了。在Hadoop中,这用于分离(core-default.xml文件内部定义的)系统默认属性与(core-site.xml文件中定义的)位置相关(site-specific)的覆盖属性。范例5-2中的文件定义了size属性和weight属性。

范例5-2.第二个配置文件configuration-2.xml

<?xml version="l.0"?>

<configuration>

<property>

<name>size</name>

<value>12</value

</property>

<property>

<name>weight</name>

<value>light</value>

</property>

</configuration>

资源文件按顺序添加到Configuration:

Configuration conf = new Configuration(); conf.addResource("configuration-1.xml"); conf.addResource("configuration-2.xml");

后来添加到资源文件的属性会覆盖(override)之前定义的属性。所以size属性的取值来自于第二个配置文件

assertThat(conf.getlnt("size",0), is(12));

不过,被标记为final的属性不能被后面的定义所覆盖。在第一个配置文件中,weight属性的final状态是true,因此,第二个配置文件中的覆盖设置失败,weight取值仍然是第一个配置文件中的heavy:

assertThat(conf.get ("weight"), is("heavy"));

试图覆盖final属性的操作通常意味着配置错误,所以最后会弹出警告消息来帮助进行故障诊断。一般来说,管理员将守护进程站点中的属性标记为final,表明他们不希望用户在客户端的配置文件或作业提交参数(job submission parameter)中有任何改动。

1.2. 可变的扩展

配置属性可以用其他属性或系统属性进行定义。例如,在第一个配置文件中的size-weight属性可以定义S{size}${weight},而且这些属性是用配置文件中的值来扩展的:

assertThat(conf.get("size-weight"), is("12,heavy"));

系统属性的优先级高于资源文件中定义的属性:

System.setProperty("size", "14");

assertThat(conf.get("size-weight"), is("14,heavy"));

该特性特别适用于在命令行方式下使用JVM参数-Dproperty=value来覆盖属性。

注意,虽然配置属性可以通过系统属性来定义,但除非系统属性使用配置属性重新定义,否则,它们是无法通过配置API进行访问的。因此:

System.setProperty("length","2");

assertThat(conf.get("length"), is((String) null));

5.2配置开发环境

首先新建一个项目,以便编译MapReduce程序并通过命令行或在自己的IDE中以本地(独立,standalone)模式运行它们。在范例5-3中的Maven POM说明了编译和测试MapReduce程序时需要的依赖项(dependency)。

范例5-3•编译和测试MapReduce应用的Maven PoM

<project>

<modelVersion>4•0•0</modelVersion>

<gnoupId>com.hadoopbook</groupId> <artifactId>hadoop-book-mr-dev</artifactId>

<version>3.0</version>

<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

 </properties

<dependencies>

<!– Hadoop main artifact –>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-core</artifactId>

<version>l.0.0</version>

</dependency>

<!– Unit test artifacts –>

<dependency>

<groupId>junit</groupId>

<artifactld>junit</artifactld>

<version>4.10</vers ion >

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.hamcrest</groupId> <artifactld>hamcrest-all</artifactld>

<version>1.1</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.apache.mrunit</groupId> <artifactld>mrunit</artifactld>

<version>0.8.0-incubating</version>

<scope>test</scope>

</dependency>

<!– Hadoop test artifacts for running mini clusters –> <dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-test</artifactId>

<version>1.0.0</version>

<scope>test</scope>

</dependency>

<!– Missing dependency for running mini clusters –> <dependency>

<groupId>com.sun.jersey</groupId>

<artifactld>jersey-core</artifactId>

<version>1.8</version>

<scope>test</scope>

</dependency>

"dependencies

<build>

<finalName> hadoop-examples </-6inalName>

<plugins>

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-compiler-plugin</artifactId> <version>2.B.2</version>

<configuration>

<source>1.6</source>

<target>1.6</target>

</configuration>

</plugin>

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactld>maven-jar-plugin</artifactld>

<version>2.4</version>

<configuration>

<outputDirectory>${basedir}</outputDirectory>

</configuration>

</plugin>

</plugins>

</build>

</project>

依赖是POM中有趣的一部分。(只要你使用此处定义的依赖,就可以直接使用其他的构建工具,例如Gradle或者Ant with Ivy)要想构建MapReduce作业,你只需要有hadoop-core依赖,它包含所有的Hadoop类。当运行单元测试时,我们要使用junit类以及两个辅助库,hamcrestall提供了帮助撰写测试断言的匹配符,而mrunit则被用于写MapReduce测试。hadoooptest库中包含了mini–”集群,这有助于在一个单JVM中运行Hadoop集群进行测试(由于HadoopPOM中没有jerseycore,我们把它加了进来)。

 JAR包在版本l.x后发生了变化,因此我们不能仅仅改变hadoop-—core依赖的版本号使得它正常工作。本书站点的范例代码包含针对?不同Hadoop版本的最新依赖声明。

很多IDE可以直接读MavenPOM,因此你只需要在包含pom.xml文件的目录中指向这些MavenPOM,就可以开始写代码®。也可以使用MavenIDE生成配置文件。例如,如下创建Eclipse配置文件以便将项目导入Eclipse

%mvn eclipse:eclipse -DdownloadSources=true -Ddownload3avadocs=true

5.2.1管理配置

开发Hadoop应用时,经常需要在本地运行和集群运行之间进行切换。事实上,可能在几个集群上工作,也可能在本地“伪分布式”集群上测试。伪 分布式集群是其守护进程运行在本机的集群,

应对这些变化的一种方法是使Hadoop配置文件包含每个集群的连接设置,并且在运行Hadoop应用或工具时指定使用哪一个连接设置。最好的做法是,把这些文件放在Hadoop安装目录树之外,以便于轻松地在Hadoop不同版本之间进行切换,从而避免重复或丢失设置信息。

为了方便本书的介绍,我们假设目录co«/包含三个配置文件:Aac/oop-/oca/.xw//jat/oo/?-/oca//ioW.;c7K//z<3rfoop-c/w57er.;cm/(这些文件在本书的范例代码里)。注意,文件名没有特殊要求,这样命名只是为了方便打包配置的设置。(将此与附录A的表A-1进行对比,后者存放的是对应服务器端的配置信息。)

针对默认的文件系统和jobtracker, /2acfoo/?-/oca/.xwl包含默认的Hadoop配置:

<?xml version="l.0"?>

<configuration>

<property>

<name>fs.default.name</name>

<value>file:///</value>

</property>

<property>

<name>mapred.job.tracker</name>

<value>local</value>

</property>

/configuration

hadoop-localhost.xml文件中的设置指向本地主机上运行的namenodejobtracker

<?xml version="l.0"?>

<configuration>

<property>

<name>fs.default.name</name>

<value>hdfs://localhost/</value>

</property>

<property>

<name>mapred.job.tracker</name>

<value>localhost:8021</value>

</property>

</configuration>

最后,hadoop-cluster.xml文件包含集群上namenodejobtracker的详细信息。事实上,我们会以集群的名称来命名这个文件,而不是这里显示的那样用cluster泛指:

<?xml version="l.0"?>

<configuration>

<property>

<name>fs . default • name"name> <value>hdfs://namenode/</value> "property >

<property>

<name>mapred.job.traeker</name> <value>jobtracker:8021</value> </property>

</configuration

还可以根据需要为这些文件添加其他配置信息。例如,如果想为特定的集群设定Hadoop用户名,则可以在相应的文件中进行这些设置。

设置用户标识

HDFS中,可以通过在客户端系统上运行whoami命令来确定Hadoop用户标识identity)。类似,组名groupname)来自groups命令的输出。

如果Hadoop用户标识不同于客户机上的用户账号,可以通过设置hadoop.job.ugi属性来显式设定Hadoop用户名和组名。用户名和组名由一个逗号分隔的字符串来表示,例如preston,directors,inventors表示用户名为preston,组名是directorsinventors

可以使用相同的语法设置HDFS网络接口(该接口通过设置dfs.web.ugi来运行的用户标识。在默认情况下,webuserwebgroup不是超级用户,因此,不能通过网络接口访问系统文件。

注意,在默认情况下,系统没有认证机制。2.4.1节在讲的安全时介绍了如何在Hadoop中使用Kerberos认证。

有了这些设置,便可以轻松通过conf命令行开关来使用各种配置。例如,下面的命令显示了一个在伪分布式模式下运行于本地主机上的HDFS服务器上的目录列表:

%hadoop fs -conf conf/hadoop-localhost.xml -Is .

Found 2 items

drwxr-xr-x – tom supergroup 0 2009-04-08 10:32 /user/tom/input drwxr-xr-x – tom supergroup 0 2009-04-08 13:09 /user/tom/output

如果省略-conf选项,可以从co«/子目录下的$HADOOP_INSTALL中找到Hadoop的配置信息。至于是独立模式还是伪分布式集群模式,则取决于

 

具体的设置。

Hadoop自带的工具支持conf选项,也可以直接用程序(例如运行MapReduce作业的程序)通过使用Tool接口来支持conf选项。

5.2.2辅助类GenericOptionsParser, ToolToolRunner

为了简化命令行方式运行作业,Hadoop自带了一些辅助类。GenericOptionsParser是一个类,用来解释常用的Hadoop命令行选项,并根据需要,为Configuration对象设置相应的取值。通常不直接使用GenericOptionsParser。更方便的方式是实现Tool接口,通过ToolRunner来运行应用程序。ToolRunner内部调用GenericOptionsParser:

public interface Tool extends Configurable {
int run(String [] args) throws Exception;

}

范例5-4给出了一个非常简单的Tool的实现,用来打印ToolConfiguration对象中所有属性的键值对。

范例5-4. Too丨实现用于打印一个Configuration对象的属性的范例

public class ConfigurationPrinter extends Configured implements Tool {

static {

Configuration.addDefauItResource("hdfs-default.xml");

Configuration.addDefaultResource("hdfs-site.xml");

Configuration•addDefauItResource("mapred-default•xml");

Configuration.addDefauItResource("mapred-site.xml");

gOverride

public int run(String[] args) throws Exception {

Configuration conf = getConf();

for (Entry<String>String> en^ry: conf) {

System.out.printf("%s=%s\nM, entry.getKey(), entry.getValue());

return 0;

public static void main(String[] args) throws Exception {

int exitCode = ToolRunner.run(new ConfigurationPrinter(), args); System.exit(exitCode);

我,们把ConfigurationPrinter作为Configured的一个子类,

 

ConfiguredConfigurable接口的…•个实现。Tool的所有实现都需要实现Configurable(因为Tool继承于Configurable),Configured子类通常是一种最简单的实现方式。run()方法通过ConfigurablegetConf()方法获取Configuration,然后重复执行,将每个属性打印到标准输出。

静态代码部分确保核心配置外的HDFSMapReduce配置能够被获取(因为Configuration已经获取了核心配置)。

可以设置哪些属性?

作为一个有用的工具,ConfigurationPrinter可用于了解在环境中某个属性是如何进行设置的。

你也可以在Hadoop安装路径的docs目录中,查看所有公共属性的默认设相失文括coredefauk.htmlhdfs-default.htmlmapred-default.html这几个HTML文件。每个属性都有用来解释属性作用和取值范围的描述。

默认的配置文档可通过以下形式的URL找到:http://hadoop.apache.org/common/docs/r<version>/<component>-default.html.例如,在1.0.0版本中,HDFS的默认配置在http://hadoop.apache.Org/common/docs/rl_0.0/hdfsdefault.html

注意:在客户端配置中设置某些属性,将不会产生影响。例如,如果在作业提交时想通过设置inapred.tasktracker.map.tasks.maxiinum

改变运行作业的tasktracker的任务槽)数,结果会令你失望,因为这个属 性只能在tasktracker的文件中进行设置。般情况下,可以通过属性名的组成部分来获知该属性应该在哪里进行设置。由于mapred.tasktracker.map.tasks.maximummapned.tasktracker

开头,因此,我们知道它只能为tasktracker守护进程进行设置。但是,这

不是硬性的,在有些情况下,我们需要进行尝试,甚至去阅读源代码。

本书讨论了Hadoop的很多重要的配置属性。

ConfigurationPrintermain()方法没有直接调用自身的run()方法,

而是调用ToolRunner的静态run()方法,该方法负责在调用自身的run()方法之前,为Tool建立一个Configuration对象。ToolRunner还使用了GenepicOptionsParser来获取在命令行方式中指定所有标准的选项,

 

然后,在Configuration实例上进行设置。运行下列代码,可以看到在

conf/hadoop-localhost.xml中设置的属性。

%mvn compile

%export HADOOP_CLASSPATH: target/classes

%hadoop ConfigurationPrinter -conf conf/hadoop-localhost.xml \

| grep mapred.job.tracker= mapred•job.tracker=localhost:8021

GenericOptionsParser也允许设置个別属性。例如:

%hadoop ConfigurationPrinter -D color=yellow | grep color

color=yellow

D选项用于将键color的配置属性值设置为yellow。设置为D的选项优先级要高于配置文件里的其他属性。这一点很有用:可以把默认属性放入配置文件中,然后再在需要时,用D选项来覆盖它们。一个常见的例子是:通过-Dmapred.reduce.tasks=n来设置MapReduce作业中reducer的数量。这样会覆盖集群上或客户端配置属性文件中设置的reducer数量。

GenericOptionsParserToolRunner支持的其他选项可参见表5-1。更 多的Hadoop用于配置的API可以在5.1节中找到。

-D property = value选项将Hadoop属性设置为GenericOptionsParser(和ToolRunner),不同于用Java命令-Dproperty=value选项对JVM系统属性的设置。JVM系统属性的语法不允许D耜属性名之间有任何空格,而GenericOptionsParser则允许用空格。

JVM系统属性来自于java.lang.System类,而Hadoop属性只能从Configuration对象中获取。所以,下面的命令行将没有任何输出,因为Conf igurationPrinter没有使用System类:

%hadoop -Dcolor=yellow ConfigurationPrinter | grep color

如果希望通过系统属性进行配置,则需要在配置文件中反映相关的系统属性。具体讨论请参见5.1.2节。

5-1.GenericOptionsParser选项和ToolRunner选项

选项名称描述

-Dproperty=vaLue将指定值赋值给某个Hadoop配置属性。覆盖配置文件里的

默认属性或站点属性,或通过-conf选项设置的任何属性

 

选项名称

-fs uri

用指定的URI设置默认文件系统。这是4 fs.default.name=uri的快捷方式

-jt host:port

用指定主机和端口设置jobtracker这是-Dmapred. job.tracker= host:port的快捷方式

-files

fiLeljfiLe2j…

从本地文件系统(或任何指定模式的文件系统)中复制指定 文件到jobtracker所用的共享文件系统(通常是HDFS),确保在任务工作目录的MapReduce程序可以访问这些文 件(要想进一步了解复制文件到tasktracker机器时的分布 式缓存机制,请参见8.4.2节)

-archives

archivelj archi\/e2j

从本地文件系统(或任何指定模式的文件系统)复制指定存 档到jobtracker所用的共享文件系统(通常是HDFS),打 开存档文件,确保任务f作目录的MapReduce程序可以

访问这些存档

-libjarsjarljjar2,…

从本地文件系统(或任何指定模式的文件系统)复制指定JAR文件到被jobtracker使用的共享文件系统(通常是HDFS),把它们加入MapReduce任务的类路径中。这个 选项适用于传输作业需要的JAR文件

 

 

5.3用MRUnit来写单元测试

MapReduce中,map函数和reduce函数的独立测试非常方便,这是由函数风格决定的。MRUnit(/?"p:///?jcMZ»fl/o/*.apacAe.org/mrMm7/)是一个测试库,它便于将已知的输入传递给mapper或者检査reducer的输出是否符合预期。MRUnit与标准的执行框架(如JUnit)—起使用,因此可以将MapReduce作业的测试作为正常开发环境的一部分运行。例如这里描述的所有测试都可根据5.2节介绍的指令在IDE中运行。

5.3.1关于Mapper

范例5-5是一个mapper的测试。

范例5-5.MaxTemperatureMapper的单元测试

import java.io.IOException; import ong.apache.hadoop.io.*;

import org.apache.hadoop.mrunit.mapreduce.MapDriver; import org.junit.*;

 

gTest

public void processesValidRecord() throws IOException,InterruptedException { Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +

// YearAAAA

"99999V0203201N00261220001CN9999999N9-00111+99999999999");

// TemperatureAAAAA

new MapDriver<LongWritable, Text, Text, IntWritable>()

•withMapper(new MaxTemperatureMapper())

• withinputValue(Value)

.withOutput(new Text (,r1950w), new IntWritable(-11))

.runTest();

测试很简单:传递一个天气记录作为mapper的输人,然后检査输出是否是读入的年份和气温。

由于测试的是mapper,所以可以使用MRUnitMapDriver.在调用runTest()方法执行这个测试之前,在test(MaxTemperatureMapper)下配置mapper、输入值、期望的输出key(1950,表示年份的Text对象)和期望的输 出值(-1.1X:,表示温度的IntWritable)。如果没有期望的输出值发出mapper,MRUnit测试失败。注意,由于mapper忽略输入key,因此,我们此处不需要设置输入key

在测试驱动的方式下,范例5-6创建了一个能够通过测试的Mapper实现。由于本章要进行类的扩展,所以每个类被放在包含版本信息的不同包中。例如,vl.MaxTemperatur'eMapper'是MaxTemperatureMapper的第一个版本。当然,不重新打包实际上也可以对类进行扩展。

范例5-6.第一个版本的Mapper函数通过了MaxTemperatureMapper测试

public class MaxTemperatureMapper*extends Mapper<LongWritableiText, Text, IntWritable> {

^Override

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();

String year = line.substring(15, 19);

int airTemperature = Integer.parselnt(line.substring(87, 92)); context.write(new Text(year), new IntWritable(airTemperature));

这是i个非常简单的实现,从行中抽出年份和气温,并将它们写到Context

 

中。现在,让我们增加一个缺失值的测试,该值在原始数据中表示气温+9999:

@Test*

public void ignoresMissingTemperatureRecord() throws IOException { InterruptedException {

Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" + // YearAAAA

,,99999V0203201N00261220001CN9999999N9+99991+99999999999,');

// TemperatureAAAAA

new MapDpiver<LongWritable, Text, Text, IntWritable>()

.withMapper(new MaxTemperatureMapper())

.withinputValue(value)

.runTest();

}

根据withOutput()被调用的次数,MapDriver能用来检査0、1或多个输

出记录。在这个测试中由于缺失温度的记录已经被过滤掉,该测试保证对于这种特定的输入值不产生任何输出。

由于parselnt()不能解析带加号的整数,所以测试最后抛出NumberFormatException异常,以失败告终。下面修改此实现(版本2)来处理缺失值:

@Overide

public void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line = value.toString();

String year = line.substring(15, 19);

String temp = line.substring(87, 92); if (!missing(temp)) {

int airTemperature = Integer.parselnt(temp);

output.collect(new Text(year), new IntWritable(airTemperature));

private boolean missing(String temp) { return temp.equals("+9999");

这个测试通过后,我们接下来写reducer

5.3.2关于Reducer

reducer必须找出指定键的最大值。这是针对此特性的一个简单的测试,其中使用了一个ReduceDriver

 

@Test

public void returnsMaximumlntegerlnValues() throws IOException, InterruptedException {

new ReduceDriver<Text, IntWritable, Text^ IntWritable>()

.withReducer(new MaxTemperatureReducer())

• withInputKey(new Text("1950"))

.withInputValues(Arrays.asList(new IntWritable(10), new IntWritable(5))) .withOutput(new Text("1950M), new IntWritable(10))

.runTest();

}

我们对一些IntWritable值构建一个迭代器来验证MaxTemperatureReducer能找到最大值。范例5-7里的代码是一个通过测试的MaxTemperatureReducer的实现。

范例5-7.用来计算最高气温的reducer

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

^Override

public void reduce(Text key, Iterable<IntWritable> values,

Context context)

throws IOException, InterruptedException {

int maxValue = Integer .MIN__VALUE; for (IntWritable value : values) {

maxValue = Math.max(maxValue, value.get());

>

context.write(key, new IntWritable(maxValue));

5.4本地运行测试数据

现在mapperreducer已经能够在受控的输入上进行工作了,下一步是写—个作业驱动程序(Jobdriver),然后在开发机器上使用测试数据运行它。

5-4.1在本地作业运行器上运行作业

通过使用前面介绍的Tool接口,可以轻松写一个MapReducer作业的驱动程序,来计算按照年度査找最高气温(参范例5-8的MaxTemperatureDriver)。

 

范例5-8.查找最高气温

public class MaxTemperatureDriver extends Configured implements Tool {

^Override

public int run(String[] args) throws Exception { if (args.length != 2) {

System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());

ToolRunner.printGenericCommandUsage(System.err); return -1;

Dob job = new Job(getConf(), "Max temperature"); job. set JarByClass(getClass());••

FilelnputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(MaxTemperatureMapper.class); job.setCombinerClass(MaxTemperatureReducer.class); job.setReducerClass(MaxTemperatureR«ducer.class);

job.setOutputKeyClass(Text•class);

job.setOutputValueClass(IntWritable.class);

return job.waitForCompletion(true) ? 0 : 1;

}

public static void main(String[] args) throws Exception {

int exitCode = ToolRunner.run(new MaxTemperatureDriverC), args); System.exit(exitCode);

MaxTemperatureDriver实现了Tool接口,所以,我们能够设置GenericOptionsParser支持的选项。runO方法根据工具的配置创建一个]ob对象来启动一个作业。在所有可能的作业配置参数中,可以设置输入和输出文件路径,mapperreducercombiner以及输出类型(输入类型 由输入格式决定,默认为TextlnputFormat,包括LongWritable键和Text值)。为作业设置一个名称(Maxtemperature)也是很好的做法,这样可以在执行过程中或作业完成后方便地从作业列表中査找作业。默认情况下,作业名称是JAR文件名,通常情况下没有特殊的描述。

现在我们可以在一些本地文件上运行这个应用。Hadoop有一个本地作业运行器(jobrunner)它是在MapReduce执行引擎运行单个JVM上的MapReduce作业的简化版本。它是为测试而设计的,在IDE中使用起来非

 

常方便,因为我们可以在调试器中单步运行mapperreducer代码。

本地作业运行器只能用于简单测试MapReduce程序,因为它不同于完全的MapReduce实现。最大的区别是它不能运行多个reducer

(它也支持〇个reducer的情况。)通常情况下,这是没有问题的,因为虽然在集群上用户可以选择多个reducer来充分利用并行计算的优势,但是大多数应用可以在一个reducer的情况下工作。注意:即使把reducer的数量设置为大于1的值,本地作业运行器也会忽略这个设置而只使用一个reducer

Hadoop的后续版本可能会放宽这些限制。

本地作业运行器通过一个配置设置来激活。正常情况下,tnapred.job.tracker是一个主机:端口(host:port),用来设置jobtracker的地址,但它是一个特殊的local值(实际是个默认值)时,作业就在不访问外部jobtracker的情况下运行。

MapReduce2(包含在6.1.2节)中,等价的设置是mapreduce.framework.name,它必须设置为local

可以在命令,行方式下输入如下命令来运行驱动程序:

%mvn compile

%export HADOOP_CLASSPATH=target/classes/

%hadoop v2.MaxTemperatureDriver -conf conf/hadoop-local.xml \ input/ncdc/micro output

类似地,可以使用GenericOptionsParser提供的-fsjt选项:

%hadoop v2.MaxTemperatureDriver -fsfile:///-jt local input/ncdc/micro

这条指令使用本地目录作为输入来执行MaxTemperatureDriver,产生的输出存放在本地目录中。注意:虽然我们设置了fs,可以使用本地文件系统(file:///),但本地作业运行器实际上可以在包括HDFS在内的任何文件系统上正常工作(如果HDFS里有一些文件,可以马上进行尝试)。

我们运行这个程序时,运行失败,并打印如下异常:

java.lang.NumberFormatException: For input string:"+0000"

 

修复mapper

这个异常表明map方法仍然不能解析带正号的气温。如果堆栈跟踪不能提供足够的信息来诊断这个错误,因为程序运行在一个JVM中,我们可以在本地调试器中进行测试。前面我们已经使程序能够处理缺失气温值(+9999)的特殊情况,但不是任意正气温的一般情况。如果mapper中有更多的逻辑,那么给出一个解析类来封装解析逻辑是非常有意义的。参见范例5-9(这 是第三个版本)。

范例5-9.该类解析NCDC格式的气温记录

public class NcdcRecordParser {•.

private static final int MISSING_TEMPERATURE = 9999;

private String year; private int airTemperature; private String quality;

public void parse(String record) { year = record.substring(15, 19);

String airTemperatureString;

// Remove leading plus sign as parselnt doesn't like them if (record.charAt(87) == '+') {

airTemperaturestring = record.substring(88, 92);

} else {

airTemperaturestring = record.substring(87, 92);

airTemperature = Integer.parselnt(airTemperatureString); quality = record,substring(92j 93);

public void parse(Text record) { pa rse( record. toString);

public boolean isValidTemperature() {

return airTemperature != MISSING_TEMPERATURE && quality.matches(M[01459]H);

>

public String getYear() { return year; } public int getAirTemperature() { return airTemperature;

最终的mapper相当简单(参见范例5-10)。只调用解析类的parserO方法,后者解析输入行中的相关字段,用isValidTemperature()方法检査174 第5章

 

是否是合法气温,如果是,就用解析类的getter方法获取年份和气温数据。注意,我们也会在isValidTemperature()方法中检查质量状态字段和缺失的气温信息,以便过滤气温读取错误。

创建解析类的另一个好处是:相似作业的mapper不需要重写代码。也提供了一个机会直接针对解析类编写单元测试,用于更多目标测试。

范例5-10•这个mapper使用utility类来解析记录

public class MaxTemperatureMapper

extends Mapper<LongWritable, Text, Text, IntWritable> {

private NcdcRecordParser parser = new NcdcRecordParser();

^Override

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

parser.parse(value);

if(parser.isValidTemperature) {context.write(new Text(parser.getYear()),

new IntWritable(parser.getAirTemperature()));

经过这些修改以后,测试得以通过。

5.4.2测试驱动程序

除了灵活的配置选项可以使应用程序实现Tool,还可以插入任意Configuration来增加可测试性。可以利用这点来编写测试程序,它将利用本地作业运行器在已知的输《入数据上运行作业,借此来检査输出是否满足预期。

要实现这个目标,有两种方法。第一种方法是使用本地作业运行器,在本地文件系统的测试文件上运行作业。范例5-11的代码给出了一种思路。

范例5-11•这个MaxTemperatureDriver测试使用了一个正在运行的本地作业运行器

gTest

public void test() throws Exception {

Configuration conf = new Configuration(); conf.set("fs.default.name", "file:///"); conf.set("mapred.job.tracker", "local");

 

Path input = new Path("input/ncdc/micro"); Path output = new Path("output");

FileSystem fs = FileSystem.getLocal(conf); fs.delete(output, true); // delete old output

MaxTemperatureDriver driver = new MaxTemperatureDriver(); driver.setConf(conf);

int exitCode = driven.run(new String[] { input.toString(), output.toStning() }); assertThat(exitCode, is(0));

checkOutput(conf, output);

测试代码明确设置fs.default.namemapVed.job.tracker,所以,它

使用的是本地文件系统和本地作业运行器。随后,通过其Tool接口在少数已知数据上运行MaxTemperatureDriver。最后,Check0utput()方法被调用

以逐行对比实际输出与预期输出。

测试驱动程序的第二种方法是使用一个mini集群来运行它。Hadoop有一组测试类,名为MiniDFSCluster、MiniMRClusterMiniYARNCluster

它以程序方式创建正在运行的集群。不同于本地作业运行器,它们允许在整个HDFSMapReduce机器上运行运行测试。注意,mini集群上的tasktracker启动不同的JVM来运行任务,这会使调试更困难。

mini集群广泛应用于Hadoop自带的自动测试包中,但也可以用于测试用户代码。HadoopClusterMapReduceTestCase抽象类提供了一个编写此类测试的基础,它的setUp()和tearDown()方法可处理启动和停止运行中的HDFSMapReduce集群的细节,同时产生一个合适的被配置为一起工作的配置对象。子类只需要得到HDFS中的数据(可能从本地文件中复制得到),运行MapReduce作业,然后确认输出是否满足要求。参见本书示例代码中的MaxTemperatureDriverMiniTest类。

这样的测试是回归测试,暴一个非常有用的输入边界用例和相应的期望结果的资源库。随着测试用例的增加,简单将其加入输入文件,然后更新相应输出即可。

5.5在集群上运行

目前,程序已经可以在少量测试数据上正确运行,下面可以准备在Hadm>p

 

集群的完整数据集上运行了。第9章将介绍如何建立完全分布的集群,同 时,该章中的方法也可以用在伪分布集群上。

5.5.1打包作业

本地作业运行器使用单JVM运行一个作业,只要作业需要的所有类都在类路径(classpath)上,那么作业就可以正常执行。

在分布式的环境中,情况稍微复杂一些。开始的时候作业的类必须打包进作业的JAR文件中并发送给集群。Hadoop通过搜索驱动程序的类路径自动找到作业的JAR文件,该类路径包含了]obConf]ob上的set]arByClass方法中设置的类。另一种方法,如果你想通过文件路径设置一个指定的JAR文件,可以使用setJar〇方法。

通过使用像AntMaven的工具可以方便地创建作业的JAR文件。例如下面的Maven命令将在包含所有已编译的类的工程目录中创建一个名为hadoop-examples.jar3AR文件:

%mvn package -DskipTests

如果每个JAR文件都有一个作业,可以在JAR文件的manifest中指定要运行的main类。如果main类不在manifest中,则必须在命令行指定(见下 文)》任何非独立的JAR文件应该打包到JAR文件的子目录中。当然也有其他的方法将依赖包含进来,这我们稍后会讨论。类似地,资源文件也可以打包进类的子目录。这与Javaarc/j/_veWAR文件类似,只不过JAR文件是放在fT£5-/A^//沁子目录下WAR文件中的。

1.客户端的类路径

hgdoop jar <jar>设置的用户客户端类路径包括以下几个组成部分:

•作业的JAR文件

•作业JAR文件的//fc目录中的所有JAR文件以及类目录(如果定义)

HADOOP_CLASSPH定义的类路径(如果已经设置)

顺便说一下,这解释了如果你在没有作业〗AR(hadoopCLASSNAME)情况下使用本地作业运行器时,为什么你必须设置HADOOP_CLASSPATH来指明依赖类和库。

 

2.任务的类路径

在集群上(包括伪分布式模式),mapreduce任务在各自的JVM上运行,它们的类路径不受HADOOP_CLASSPATH控制。HADOOP_CLASSPATH是一项

客户端设置,并只针对驱动程序的JVM的类路径进行设置。

反之,用户任务的类路径有以下几个部分组成:

•作业的JAR文件

作业JAR文件的/A目录中包含的所有JAR文件以及类目录(如果

定义)

•使用-libjars选项(参阅表5-1)Distr^butedCacheaddFileToClassPath()方法(老版本的API)hb(新版本沾API)添加到分布式缓存的所有

文件

3.打包依赖

给定这些不同的方法来控制客户端和类路径上的内容,也有相应的操作处理作业的库依赖:

•将库解包和重新打包到作业的JAR•对作业的JAR的目录中的库打包

•保持库与作业的JAR分开,并且通过HADOOPJILASSPATH将它们添加到客户端的类路径,通过-libjars将它们添加到任务的类路径

从创建的角度来看,最后使用分布式缓存的选项是最简单的,因为依赖不需要在作业的JAR中重新创建。同时,分布式缓存意味着在集群上更少的JAR文件转移,因为文件可能缓存在任务间的一个节点上了。详情可参见8.4.2节。

4.任务类路径的优先权

用户的JAR文件被添加到客户端类路径和任务类路径的最后,如果Hadoop使用的库版本和你的代码使用的不同或不相容,在某些情况下可能会引发和Hadoop内置库的依赖冲突。有时需要控制任务类路径的次序,这样你的类能够被先提取出来。在客户端,可以通过设置环境变量HADOOPUSER_CLASSPATH_FIRSTtrue强制使Hadoop将用户的类路径优先放到捜索顺序对于任务的类路径,你可以将mapreduce.task.classpath.first设为true。注意,设置这些选项就改变了针对Hadoop框架依赖的类(但仅仅对你的作业而

 

),这可能会引起作业的提交失败或者任务失败,因此请谨慎使用这些选项。5.5.2启动作业

为了启动作业,我们需要运行驱动程序,使用-conf选项来指定想要运行作业的集群(同样,也可以使用-fs-jt选项):

%unset HADOOP_CLASSPATH

%hadoop jar hadoop-examples.jar v3.MaxTemperatureDriver \

-conf conf/hadoop-cluster.xml input/ncdc/all max-temp

我们不设置HADOOP_CLASSPATH环境变量是因为对于该作业没有任何第三方依赖。如果它被设置为/target/classes/(本章前面的内容),那么当Hadooptarget/classes而不是从JAR装载MaxTempratureDriver类时,Hadoop将找不到作业的JAR,从而导致作业失败。

]〇b上的waitForCompletion()方法启动作业并检査进展情况。如果有任何变化,就输出一行mapreduce进度总结。输入如下(为了清楚起见,有些行特意删除了):

09/04/11 08:15:52INFO mapred.FilelnputFormat:Total input paths to process : 101 09/04/11 08:15:53 INFO mapred.DobClient: Running job: job_200904110811_0002 09/04/11 08:15:54 INFO mapred.JobClient: map0%reduce0%

09/04/11 08:16:06 INFO mapred.DobClient: map28%reduce0%

09/04/11 08:16:07 INFO mapred.DobClient: map30%reduce0%

09/04/11 08:21:36 INFO mapred.DobClient: 09/04/11 08:21:38 INFO mapred.DobClient:Dob

09/04/11 08:21:38 INFO mapred.DobClient: 09/04/11 08:21:38 INFO mapred.DobClient: 09/04/11 08:21:38 INFO mapred.DobClient: 09/04/11 08:21:38 INFO maprecKDobClient: 09/04/11 08:21:38 INFO mapred.DobClient: 09/04/11 08:21:38 INFO mapred.DobClient: 09/04/11 08:21:38 INFO mapred.3obClient: 09/04/11 08:21:38 INFO mapred.DobClient: 09/04/11 08:21:38 INFO mapred.DobClient: 09/04/11 08:21:38 INFO mapred.DobClient: 09/04/11 08:21:38 INFO mapped.DobClient: 09/04/11 08:21:38 INFO mapred.DobClient: 09/04/11 08:21:38 INFO mapred.DobClient: 09/04/11 08:21:38 INFO mapred.DobClient: 09/04/11 08:21:38 INFO mapred.DobClient: 09/04/11 08:21:38 INFO mapred.DobClient: 0*9/04/11 08:21:38 INFO mapred. DobClient: 09/04/11 08:21:38 INFO mapred.DobClient:

 

09/04/11 08:21:38INFO mapred.DobClient:09/04/11 08:21:38 INFO mapred.DobClient: 09/04/11 08:21:38 INFO mapped.DobClient: 09/04/11 08:21:38 INFO mapred.JobClient: 09/04/11 08:21:38 INFO mapred.DobClient:

输出包含很多有用的信息。在作业开始之前,打印作业ID;如果需要在日志文件中或通过hadoopjob命令查询某个作业,必须要有ID信息。作业完成后,统计信息(例如计数器)被打印出来。这对于确认作业是否完成是很有用的。例如,对于这个作业,大约分析275GB输入数据(“Mapinput bytes),读取了HDFS大约34GB压缩文件(HDFS_BYTES_READ)。输入数据被分成101个大小合适的gzipped文件,因此即使不能划分数据也没有问题。..

作业、任务和任务尝试ID

作业ID的格式包含两部分:jobtracker(不是作业的)开始时间和唯一标识此作业的由jobtracker维护的增量计数'器。例如:ID_job_200904110811_0002的作业是第二个作业(0002,作业丨D1开 始),jobtracker2009年4月11日08:11开始运行这个作业。计数器 的数字前面由0开始,以便于作业ID在目录列表中进行排序。然而,计数器达到10000时,不能重新设置,导致作业ID更长(这些1D不能很好地排序)。

任务属于作业,任务ID通过替换作业ID的作业前缀为任务前缀,然后加上一个后缀表示哪个作业里的任务。例如:task_200904110811_ 0002_m_000003表示IDjob_200904110811_0002的作业的第4i

map任各(000003,任务ID0开始计数)。作业的任务ID在初始化时产生,因此,任务ID的顺序不必是任务执行的顺序。

由于失败(参见6.2节)或推测执行(参见6.5.2),任务可以执行多次, 所以,为了标识任务执行的不同实例,任务尝试都会被指定一个在jobtracker上唯一的ID.如:attempt_200904110811_0002_m_000003_0表示正在运行的task_200904110811_0002_m_000003任务的第一}attemp(0,attemptID0开始计数)。任务尝试在作业运行时根据 需要分配,所以.,它们的顺序代表tasktracker产生并运行的先后顺序。

如果在jobtracker重启并恢复运抒作业后,作业被重启,那么任务尝试ID中最后的计数值将从1000递增。该动作默认是不可用的,参见

6.2.1节。

 

i CompU<*d

5_5.3MapReduceWeb界面

HadoopWeb界面用来浏览作业信息,对于跟踪作业运行进度、查找作业完成后的统计信息和日志非常有用。可以在找到用户界面信息。

1_jobtracker页面

ip-10-250-110-47 Hadoop Map/Reduce Administration

State.* HMNMNH

St»rt«d;SatApr U 0«rJl 53 EOT ?000 ,

V*r*Um:O_2D 0, "63504

ComjrfkrtlAor&05^18s*& UTC 2f»S t>y rvWifiy

W#n"fl2〇〇«>川〇0”

Clutter Summary (Heap Sizeis53.75 MB/888.94 MB)

 

 

 

 

5-1给出了主页的截屏。第一部分是Hadoop的安装细节,包括版本号、编译时间和jobtracker的当前状态(在本例中,状态是running)和启动时间。

 

j Qu»u« NAme

I ScIviduilng information |

i

FUt»f (Jobid, PrktrUy,Nam*)

tLmimht.:utiu;:!tmv< Hiiiitso^–9»i^ 0>

 

 

JoWd iPrtortty iNOOMAL

U*w

Qornci

otwpsI.

M«p%

Ccwttpl

W".w

Ms

C»«pl«W>

,M_…

««ducc%R«kJuc«Retfwos,;Job SchwJuHnSfXtit»i \Cmi«40() ; irtlrm«tkm

100I30U\na

:;;:

 

 

5-1 •jobtracker页面的屏薄截图

 

接下来是关于集群的概要信息,包括集群的负载情况和使用情况。这表明当前正在集群上运行的mapreduce的数量,作业提女的数量,可用的tasktracker节点数和集群的负载能力,集群中可用mapreduce的任务槽数“MapTaskCapacity”和“ReduceTaskCapacity”),每个节点平均可用的任务槽数。被jobtracker列入黑名单的tasktrackers数也被列出,关于黑名单的详细信息,请参见6.2.1节中对tasktracker失败的论述。

概要信息的下面是正在运行的作业调度器的相关信息(此处是“默认值”)。 可以单击査看作业队列。

随后,显示的是正在运行、(成功地)完成和失败的作业。每部分都有一个作业表,其中每行显示作业的ID、所属者_、作业名(使用obConfset]obName()方法设置的mapred. job.name属性)和进度信息。

最后,页面的底部是一些链接信息,指向jobtracker日志和jobtracker历史信息:记录jobtracker运行过的所有作业的信息。在作业存储到历史信息页之前,主页上只显示100个作业(通过maprecLjobtracker. completeuserjobs.maximum属性来配置)。注意,作业历史是永久存储的,因此,可以从以前运行的jobtracker中找到作业。

作业历史

作业历史包括已完成作业的事件和配置信息。不管作业是否成功执行,作业历史都将保存下来,为运行作业的用户提供有用信息。

作业历史文件存放在jobtracker本地文件系统中logs目录的子目录中。通过hadoop.job.history.location属性来设置历史文件存放在Hadoop文件系统的任意位置。jobtracker的历史文件会保存30天,随后被系统删除。

作业输出目录下的_/ogs/7ni/or_v子目录为用户存放第二个备份。这个存放位置可以通过设置hadoop.job.history.user.location进行覆盖。如果将其值设置为特殊值none,则不会有用户作业历史被保存,虽然作业历史仍然是集中存放的。用户的作业历史文件不会被系统删除。

历史日志包括作业、任务和尝试事件,所有这些信息以纯文本方式存储。特殊作业的历史可以通过Web界面或在命令行方法下用hadoop job -history(指定的作业输出目录中)查看。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

作业运行期间,可以在作业页面监视作业进度,页面信息会定期自动更新。摘要信息下方的表展示mapreduce进度NumTask显示该作业mapreduce的总数。其他列显示的是这些任务的状态:Pending(等待运行)、RunningComplete(成功完成)和Killed(失败任务——用Failed标记更准确)。最后一列显示的是一个作业所有mapreduce任务中失败和中止的taskattempt总数(taskattempt可标记为killed,原因可能是:它们是推测执行的副本6.2.1节对任务失败进行了详细的讨论。taskattempt运行的tasktracker已结束,或这些taskattempt已被用户中止)。

在该页面的随后部分,可以找到显示每个任务进度的完成图。reduce完成图被分为reduce任务的三个阶段:copy(map输出传输到reducetasktracker时)、sort(合并reduce输入时)和feduce(reduce函数运行产生最后输出时)。这些阶段的详细描述参见6.4节。

在该页的中间部分是作业计数器表。这些信息在作业运行期间动态更新,为作业进度和整体健康程度提供另一个有用的信息。关于这些计数器的详细信息,请参见8.1.1节。

5.5.4获取结果

一旦作业完成,有许多方法可以获取结果。每个reducer产生一个输出文件,因此,在目录中会有30个部分文件(partfile),命名为par/-00000part-00029

正如文件名所示,这些“part”文件可以认为是文件的一

部分。

如果输出文件很大(本例不是这种情况),那么把文件分为多个part文件很重要,这样才能使多个reducer并行工作。通常情况下,如果文件采用这种分割形式,使用起来仍然很方便:例如作为另一个MapReduce作业的输人。在某些情况下,可以探索多个分割文件的结构来进行map端连接操作(参阅8.3.1)或执行一个MapFile的査找操作(参阅8.2.2)。

这个作业产生的输出很少,所以很容易从HDFS中将其复制到开发机器上。-getmerge选项对hadoopfs命令很有用,因为它得到了源模式指定目录下所有的文件,并将其合并为本地文件系统的一个文件:

%hadoop fs -getmerge max-temp max-temp-local

 

%sort max-temp-local | tail

1991607

1992605

1993567

1994568

1995567

1996561

1997565

1998568

1999568

2000558

因为reduce的输出分区文件是无序的(使用hashpartitioner的缘故),我们对输出进行排序。对MapReduce的数据做些后期处理是很常见的,把这些数据送入分析工具(例如R、电子数据表甚至关系数据库)进行处理。

如果输出文件比较小,另外一种获取方式是使用cat选项将输出文件打印到控制台:

%hadoop fs -cat max-temp/*

深入分析后,我们发现某些结果看起来似乎没有道理。比如1951年(此处 没有显示)的最高气温是590€!这个结果是怎么产生的呢?是不正确的输 入数据还是程序中的bug?

5.5.5作业调试

最经典的方法通过打印语句来调试程序,这在Hadoop中同样适用。然而,需要考虑复杂的情况:当程序运行在几十台、几百台甚至几千台节点上时,如何找到并检测调试语句分散在这些节点中的输出呢?为了处理这种情况,我们要査找一个特殊情况,我们用一个调试语句记录到一个标准错误中,它将发送一个信息来更,新任务的状态信息以提示我们査看错误日志。我们将看到WebUI简化了这个操作。

我们还要创建一个自定义的计数器来统计整个数据集中不合理的气温记录总数。这就提供了很有价值的信息来处理如下情况,如果这种情况经常发生,我们需要从中进一步了解事件发生的条件以及如何提取气温值,而不是简单地丢掉这些记录。事实上,调试一个作业的时候,应当总想是否能够使用计数器来获得需要找出事件发生来源的相关信息。即使需要使用日志或状态信息,但使用计数器来衡量问题的严重程度仍然也是有帮助的(详情参见8.丨节)。

 

如果调试期间产生的日志数据规模比较大,可以有如下选择。第一是将这些信息写到map的输出流供reduce分析和汇总,而不是写到标准错误流。这种方法通常必须改变程序结构,所以先选用其他技术。第二种方法,•可 以编写一个程序(当然是MapReduce程序)来分析作业产生的日志。 我们把调试加入mapper(版本4),而不是reducer,因为我们希望找到导致这些异常输出的数据源:

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

enum Temperature {

OVERJL00

} •

private NcdcRecordParser parser = new NcdcRecordParser();

@ Override

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

parser.parse(value);

if (parser.isValidTemperature()) {

int airTemperature = parser.getAirTemperature(); if (airTemperature > 1000) {

System.err.println("Temperature over 100 degrees for input: " + value); context.setStatus("Detected possibly corrupt record: see logs,〉;

context.getCounter(Temperature.OVER_100.increment(1));

}

context.write(new Text(parser.getYear), new IntWritable(airTemperature));

如果气温超过i(rc(表示为looo,因为气温只保留小数点后一位),我们输出一行到标准错误流以代表有问题的行,同时使用ContextsetStatus()方法来更新map中的状态信息,引导我们査看日志。我们还增加了计数器,表示为Javaenum类型的字段。在这个程序中,定义一个OVERJL00字段来统计气温超过l(TC的记录数。

完成这些修改,我们重新编译代码,重新创建JAR文件,然后重新运行作业并在运行时进入任务页面。

 

1.任务页面

任务页面包括一些査看作业中任务细节的链接。例如,点击map链接,进入一个页面,所有map任务的信息都列在这一页上。还可以只査看已完成的任务。图5-3中的截图显示了带有调试语句的作业页面中的一部分。表中的每行代表一个任务,提供的信息包括每个任务的开始时间和结束时间,来自tasktracker的错误报告,一个用来査看每个任务的计数器的链接。

Completed Tasks

Task

Complele

Status

StartTime

FinishTune

Errors

Counters

task?00904110B110003mOOQC43

100.00*4

hdfs;//ip

10*250-110-47ec2.in«emal

i'user/root/input/nr^lc/all

yid4d.g2<}+2203.»475

1tApr-2009 09:00:06

11-Apr-200S

09:01:25

ISsqc)

in

task200904110811QQQ3mQ0Q044

100.00%

Detectedpossiblycorrupt record:seelogs.

11-Apr-2009 09.00:06

11-Apr-2009

09:01:28

<1mins.

21sec)

n

task200904110B11 0003mO0QD4S

100.00%

hdfe://ip

10-250-11j>-47ec2.inlemflJ

UApr-2009 0900:06

11-Apr-2009 09 01:28

21sec)

ifl

;/1970.g7Di-2083M610

 

 

 

 

5-3.任务页面的屏幕截图

 

Status列对调试非常有用,因为它显示了任务的最新状态信息。任务开始之前,显示的状态为initializing,—旦开始读取记录,它便以字节偏移 量和长度作为文件名,显示它正在读取的文件的划分信息。你可以看到我 们为任务task_200904110811*003_m_000044进行调试时的状态显示,单击日志页面找到相关的调试信息。注意,这个任务有一个附加计数器,因为这个任务的用户计数器有一个非零的计数。

2.任务详细信息页面

从任务页面中,可以单击任何任务获得更多相关信息。图5-4的详细任务信息页面显示了每个taskattempt。在这个范例中,只有一个成功完成的taskattempt。此图表进一步提供了十分有用的数据,如taskattempt的运行节点和指向任务日志文件和计数器的链接。

 

Twfc.AiluK**

*:js

iAcioi*

一軟HtfUCOKW.t KM rl,,,»〇〇〇

",Apr.

1

ST

12

 

 

5-4.任务详细信息页面的屏幕截图

Actions列包括终止taskattempt的链接。在默认情况下,这项功能是禁用的,Web用户界面是只读接口。将webinterface.private.actions设置成true,即可启用此动作的链接。

f:将webinterface.Private.actions设置为true,意味着允许任

何人访问HDFS Web界面来删除文件。dfs.web.ugi属性决定以哪个用户身份运行HDFS Web UI,从而控制可以査看或删除哪些文件。

对于map任务,页面中还有一部分显示了输入分片分布在哪些节点上。

通过跟踪成功taskattempt的日志文件链接(可以看到每个日志文件的最后 4KB8KB或整个文件),会发现存在问题输入记录。这里考虑到篇幅,已经进行了转行和截断处理:

Temperature over100degrees for input:

0335999999433181957042302005+37950+139117SAO + 0004R3SNV020113590031500703569999994 33201957010100005+35317+1B9650SAO+000899999V02002359002650076249N0040005994-0067…

此记录的格式看上去与其他记录不同。可能是因为行中有空格,规范中没有这方面的描述。

作业完成后,査看我们定义的计数器的值,检査在整个数据集中有多少记录超过100‘C。通过Web界面或命令行,可以査看计数器:

%hadoop job -counter job_200904110811_0003,v4.MaxTemperatureMapper$Temperature'\ OVER_100

3

 

-counter选项的输入参数包括作业ID,计数器的组名(这里一般是类名)和 计数器名称(enum名)。这里,在超过十亿条记录的整个数据集中,只有三个异常记录。直接扔掉不正确的记录,是许多大数据问题中的标准做法。然而,这里我们需要谨慎处理这种情况,因为我们寻找的是一个极限值-最 高气温值,而不是一个总量。当然,扔掉三个记录也许不会改变最终 结果。

3.处理不合理的数据

捕获引发问题的输入数据是很有价值的,因为我们可以在测试中用它来检査mapper的工作是否正常:

@Test

public void parsesMalformedTemperature() throws IOException, InterruptedException {

Text value = new Text(,,0335999999433181957042302005+37950+139117SAO +0004" +

// YearAAAA

"RDSN V02011359003150070356999999433201957010100005+353M);

// TemperatureAAAAACounters counters = new Counters();

new MapDriver<longWritable, Text^ Text, IntWritable()> withMapper [new MaxTemperature Mapper()] withlnputValue (value). withCauters(counters) runTest();

OutputCollector<Text, IntWritable> output = mock(OutputCollector.class);

Reporter reporter = mock(Reporter.class)j

mapper.map(null>value, output, reporter);

verify(output, never()).collect(any(Text.class), any(IntWritable.class));

verify(reporter).incrCounter(MaxTemperatureMapper.Temperature. MALFORMED, 1);

} •

引发问题的记录与其他行的格式是不同的。范例5-12显示了修改过的程序(版本5),它使用的解析器忽略了那些没有首符号(+或_)气温字段的行。我们还引入一个计数器来统计因为这个原因而被忽略的记录数。

范例5-12.mapper用于查找最高气温

public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>,

enum Temperature {

•MALFORMED

 

private NcdcRecordParser parser = new NcdcRecordParser(); public void map(LongWritable key, Text value, context context throws IOException, InterruptedException { parser.parse(value);

if (parser.isValidTemperature()) {

int airTemperature = parser.getAirTemperature();

context.write(new Text(parser.getYear), new IntWritable(airTemperatune));

} else if (parser.isMalformedTemperature()) {

System, err. print In ("Ignoring possibly corrupt input: .. + value); cotext•getCounter(Temperature.MALFORMED, 1);

5.5.6 Hadoop日志

针对不同用户,Hadoop在不同的地方生成日志。表5-2对此进行了总结。

从前文可以看到,MapReduce任务日志可以从Web界面访问,这是最便捷的方式。也可以从执行taskattempt(tasktracker的本地文件系统中找到日志文件,目录以taskattempt来命名。如果启用任务]VM重用功能(参见6.5.4节),每个日志文件累加成为整个JVM运行日志,所以,多个taskattempt存放在一个日志文件中。Web界面隐藏了这一点,只显示与正在査看的taskattempt相关的部分日志。

对这些日志文件的写操作是很直观的。任何到标准输出或标准错误流的写操作都直接写到相关日志文件。当然,在Streaming方式下,标准输出被用于mapreduce的输出,所以不会出现在标准输出日志文件中。

Java中,如果想用ApacheCommons LoggingAIM,就可以写入任务的系统日志文件中(syslogfile)。如范例5-13所示。

范例5-13•这个等价的Mapper写到标准输出(使用Apache Commons Logging API)

import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapreduce.Mapper;

public class LoggingIdentityMapper<KEYIN>VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

private static final Log LOG = LogFactory.getLog(LoggingIdentityMapper.class);

 

@Override

public void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {

// Log to stdout file

System.out.printIn("Map key: " + key); .

// Log to syslog file

LOG.info("Map key: " + key);

if (LOG.isDebugEnabled()) {

LOG.debug("Map value: " + value);

}

context.write((KEYOUT) key, (VALUEOUT) value);

默认的日志级别是INFO,因此BEBUG级别的消息不在syslog任务日志文件中出现。然而,有时候如果希望看到这些消息可以适当设置mapred.map.child.log.level或者mapred.reduce.child.log.level(从

0.22开始)。例如,对于上面的情况你可以为mapper进行如下设置,以便能够看到日志中的map值。

%hadoop jar hadoop-examples.jar LoggingDriver -conf conf/hadoop-cluster.xml \

-D mapred.map.child.log.level=DEBUG input/ncdc/sample.txt ^logging-out

有一些控制用于管理任务日志的大小和记录保留时间。在默认情况下,日志最短在24小时后删除(通过mapred.user'log.retain.hours属性来设置)。也可以用mapred.userlog.limit.kb属性在每个日志文件的最大大小上设置一个阈值,默认值是〇,表示没有上限。

5-2. Hadoop曰志的类型

日志

主要对象

描述

更多信息

系统守护 进程日志

管理员

每个Hadoop守护进程产生一个日志文件 (使用log^fj)和另一个(文件合并标准输出 和错误)。这些文件分别写入hadoop_ L0G_DIR环境变量定义的目录

参见9.4.2节 和10.2.1

HDFS审计日志

管理员

这个日志记录所有HDFS请求,默认是关 闭状态。虽然该日志存放位置可以配置, 但一般写入namenode的日志

参见丨0.1.3

MapReduce作业历史

日志

用户

记录作业运行期间发生的事件(如任务完 成)。集中保存在jobtracker上的_logs/ history子目录中的作业输出目录中

参见5.2.2的补充内容任务历史

 

 

 

续表

日志主要对象描述更多信息

MapReduce用户毎个tasktr^cker子进程都用log4j产生一参见本节

任务日志个日志文件(称作—个保存发到

标准输出(^也《0数据的文件,一个保存标准错误(sWerr)的文件。这些文件写入到HADOOP_LOG_DIR环境变量定义的目录的twer/ogs的子目录中

有时你可能需要调试一个问题,这个问题你怀疑在运行一个Hadoop命令的JVM上发生,而不是在襄群上。你可以通过如下调用将DEBUG级别曰志发送给控制台:

%HAD00P_R00T_L06GER=DEBUG,console hadoop fs -text /foo/bar

5.5.7远程调试

当一个任务失败并且没有足够多的记录信息来诊断错误时,可以选择用调试器运行该任务。在集群上运行作业时,很难使用调试器,因为你不知道哪个节点处理哪部分输入,所以不能在错误发生之前安装调试器。然而,有一些其他可用的方法。

•在本地重新产生错误对于特定的输入,失败的任务通常总会失败。你可以尝试通过下载致使任务失败的文件到本地运行重现问题,这可以使用到调试器(如JavaVisualVM)。

•使用JVM调试选项失败产生的一个通常原因是任务JVMJava内存溢出错误。可以将Mapred.child.java.opt设置包含-XX: HeapDiimpOnOutOfMemoryError-XX:HeapDumpPath=/path/to/dumps

该设置将产生一个堆转储(heapdump),这可以通过或Eclipse MemoryAnalyzer这样的工具来检査。注意,该JVM选项应当添加到由mapred.child,java.opts指定的已有内存设置中。9.4.2

节在讨论内存时有进一步的讨论。

•使用任务分析Javaprofiler提供了很多JVM的内部细节,Hadoop提供了分析作业中部分任务的机制。参阅5.6.1节。

 

• 使用丨solationRunner老版本的Hadoop提供了一个称为

IsolationRunner的任务运行器,它可以在集群的原来位置重新运行失败的任务。不幸的是,在目前的版本中不再支持了,但可以在https://issues.apache•or'g/jira/browse/MAPREDUCE-ZSS?找一下它的代替者。

在一些情况下保存失败的任务尝试的中间结果文件对于以后的检査是有用的,特别是在任务工作路径中建立转储或配置文件。可以将keep..failed.task.files设置为true来保存失败的任务文件。

你也可以保存成功任务的中间结果文件,以便解释任务没有失败。这时,将属性keep.task.files.pattern设置为一个正则表达式(与保留的任务ID匹配)。

为了检査中间结果文件,登录到任务失败的节点并找到该任务尝试的目录。它在一个本地MapReduce目录下,由mapred.local.dir的设置决定(细节 请参阅9_4.3)。如果这个属性是以逗号分隔的目录列表(在一台机器的物 理磁盘上分布负载),在找到那个特定的task attempt之前,需要搜索整个目录。taskattemp的目录在以下位置:

mapred.local.dir/taskTracker/jobcache/job'ID/task-attempt-ID

5.6作业调优

作业运行后,许多开发人员可能会问:“能够让它运行得更快一些吗?”

有一些Hadoop相关的“疑点”值得检査一下,看看它们是不是引发性能问 题的“元凶”。在开始任务级别的分析或优化之前,必须仔细研究表5-3所示的检査内容。

5-3.作业调优检查表

范围

mapper数量

 

续表

范围

最佳实践

进一步信息

reducer的 数量

为了达到最髙性能,集群中reducer数 应该略少于reducer的任务槽数。这将 使reducer能够在同一个周期in one wave)完成任务,并在reducer阶段充分 使用集群

7_1_1

combiner

作业能否充分利用combiner来减少通 过shuffle传输的数据量

2.4.2

中间值的压缩

map输出进行压缩几乎总能使作业执 行得更快

4.2.3

自定义序列

如果使用自定义的Writable对象或自 定义的comparator,则必须确保芑实现RawComparator

4.3.3

调整shuffle

MapReduceshuffle过程可以对一些 内存管理的参数进行调整,以弥补性能 的不足

6.4.3

 

 

5.6.1分析任务

正如调试一样,对MapReduce这类分布式系统上运行的作业进行分析也有诸多挑战。Hadoop允许分析作业中的一部分任务,并且在每个任务完成时,把分析信息放到用户的机器上,以便日后使用标准分析工具进行分析。

当然,对本地作业运行器中运行的作业进行分析可能稍微简单些。如果你有足够的数据运行mapreduce任务,那么对于提高mapperreducer的性能有很大的帮助。但必须注意一些问题。本地作业运行器是一个与集群完全不同的环境,并且数据流模式也截然不同。如果MapReduce作业是I/O密集型的(很多作业都属于此类),那么优化代码的CPU性能是没有意义的。为了保证所有调整都是有效的,应该在实际集群上对比新老执行时间。这说起来容易做起来难,因为作业执行时间会随着与其他作业的资源争夺和调度器决定的任务顺序不同而发生改变。为了在这类情况下得到较短的作业执行时间,必须不断运行(改变代码或不改变代码),并检査是否有明显的改进。

有些问题(如内存溢出)只能在集群上重现,在这些情况下,必须能够在发生问题的地方进行分析。

 

1.HPROF分析工具

许多配置属性可以控制分析过程,这些属性也可以通过]obConf的简便方法获取。下面对MaxTemperatureDrive(版本6)的修改将启用远程HPROF分析。HPROFJDK自带的分析工具,虽然只有基本功能,但是同样能提供程序的CPU和堆使用情况等有用信息。®

Configuration conf=getConf();

conf.setBoolean("mapred.task.profile", true)j

conf.set("mapred.task.profile.params", "-agentlib:hprof=cpu=samples," + "heap=sites, depth=6j forcesn, thread=y, verbose=n, f ile=%s ••); conf .set ("mapred. task, profile, maps", "0-2•_); conf.set("mapred.task.profile.reduces", "); // no reduces ]ob job = new ]ob(conf, "Max temperature");

第一行启用了分析工具(默认是关闭状态)。在新的API中,可以使用]obContext.TASK_PROFIL常量,而不使用mapred.task.profile

接下来设置分析参数,即传到任务JVM的额外的命令行参数。一旦启用分析,即使启用JVM重用,也会给每个任务分配一个新的JVM。详见6.5.4节。默认参数定义了HPROF分析器,示例中设置一个额外的HPROF选项depth=6,以便能达到更深的栈跟踪深度(相比HPROF默认值)。使用DobContext.Task_PROFILE_PARAMS相当于设置mapred.task.profile.params0

最后,指定希望分析的任务。一般只需要少数几个任务的分析信息,所以mapred.task.profile.map mapred.task.profile.reduces M个属性来指定想要分析的mapreduce任务ID的范围。我们将map属性设置为〇-2(默认情况下),这意味着ID0、1、2的map任务将被分析。允许范围集合(a set of ranges)的表示方法,使用一个标注允许开放范(openrange)。例如0-1、4、6-将指定除了ID2、3、5之外的所有任 务。要分析的map任务,还可以使用HobContext.NUM_MAP_PROGILES量来控制,reduce任务则由]obContext.NUM_REDUCE_PROFILES常量来控制。

使用修改过的驱动程序来运行作业时,分析结果将输出到在启动作业的文

HPROF使用字节码插入来解析代码,所以在使用自己的应用程序之前,我们不需要带上特殊选项来重新编译它。有关HPROF的详情,请参见题为“HPROF: A Heap/CPU Profiling. Tool in J2SE 5.0”的文章,作者Kelly O’Hair,网址为http://java.sun.com/developer/technicalArticles/Programming/HPROF.html

 

件夹中该作业的末尾。因为我们只分析少数几个任务,所以可以在数据集的子集上运行该作业。下面取自mapper的一个分析文件,它显示了CPU的抽样信息:

CPU

SAMPLES BEGIN

(total

=1002)

Sat Apr 11

11:17:52 2009

rank

self

accum

count

trace

method

1

3.49%

3.49%

35

307969

java.lang.Object.<init>

2

3.39%

6.89%

34

307954

java.lang.Object.<init>

3

3.19%

10.08%

32

307945

java.util.regex.Matcher.<init>

4

3.19%

13.27%

32

307963

java.lang.Object.<init>

5

3.19%

16.47%

32

307973

java.lang.Object.<init>

 

 

交叉引用跟踪号307973显示了同一文件的栈跟踪轨迹:

TRACE307973:(thread=200001)

java•lang.Object•<init>(Object.java:2汉)

ong.apache.hadoop.io.IntWritable.<init>(IntWritable.java:29) v5.MaxTemperatureMapper.map(MaxTemperatureMapper.java:30) v5.MaxTemperatureMapper.map(MaxTemperatureMapper.java:14) org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:356)

因此可以看出,mapper花了3%的时间来构建IntWritable对象。这表明重用Writable实例作为输出来观察在我们的集群和数据集上是否有任何的性能提升。在一个实验中,我修改mapperWritable对象存储为实例变量。当我在〗1个节点的集群上运行修改后的程序时,在作业整体执行时 间上没有看到有明显的变化。当然,你的实验结果可能有所不同。

2.其他分析工具

获取分析输出的机制是HPROF专有的,因此如果你使用其他的分析工具必须从tasktracker中手动检索分析输出。

如果在所有tasktracker的机器上都没有安装分析工具,可以考虑使用DistributedCache(分布式缓存,详见8.4.2节)在需要的机器上安装该分析 工具。

5.7MapReduce的工作流

至此,你已经知道MapReduce应用开发的机制了。我们目前还未考虑如何将数据处理问题转化成MapReduce模型。本书前面的数据处理都用来解决十分简单的问题(如在指定年份找到最高气 温值的记录)。如果处理过程更复杂,这种复杂度一般是因为有更多的

 

MapReduce作业,而不是更复杂的mapreduce函数。换而言之,通常是增加更多的作业,而不是增加作业的复杂度。

对干更复杂的问题,可考虑使用比MapReduce更高级的语言,如PighiveCascadingCascalogCrunch。一个直接的好处是:有了它之后,就用不着处理到MapReduce作业的转换,而是集中精力分析正在执行的任务。

最后JimmyLinChrisDyer合著的《MapReduce数据密集型文本处理》7fcc/w/7/7—书是学习MapReduce算法

设计的优秀资源,强烈推荐。该书由Morgan&Claypool出版社于2010出 版,网址为we/。

5.7.1将问题分解成MapReduce作业

让我们看一个更复杂的问题,我们想把它转换成MapReduce工作流。

假设我们想找到每个气象台每年每天的最高气温记录的均值。例如,要计算029070~99999气象台的1月1日的每日最高气温的均值,我们将从这个气象台的190丨年1月丨日,1902年1月1日,直到2000年的1月1日的气温中找出每日最高气温的均值。

我们如何使用MapReduce来计算它呢?计算自然分解为下面两个阶段。

⑴计算毎对stationdate的毎日最髙气温。

本例中的MapReduce程序是最高气温程序的一个变种,不同之处在于本例中的键是一个综合的stationdate对,而不只是年份。

(2)计算毎个stationdaymonth键的每日最高气温的均值。

mapper从上一个作业得到输出记录(stationdate,最高气温值),丢掉年份部分,将其值投影到记录(stationdaymonth,最髙气温值)。 然后reducer为每个stationdaymonth键计算最髙气温值的均值。

第一阶段的输出看上去就是我们想要的气象台的信息。范例中的

脚本提供了HadoopStreaming的一个实现:

029070-99999190101010029070-9999919020101-94

前两个字段形成键,最后一列是指定气象台和日期所有记录中的最高气

 

温。第二阶段计算这些年份中每日最高气温的平均值:

029070-999990101-68

以上是气象台029070-99999在整个世纪中1月1日的日均最高气温-6.8‘C

只用一个MapReduce过程就能完成这个计算,但它可能会让部分程序员花更多精力。®

个作业可以包含多少(简单的)MapReduce步骤,这样整个作业由多个可分解的、可维护的mapperreducer组成。第16章的案例学习包括使用MapReduce来解决的大量实际问题,在每个例子中,数据处理任务都是使用两个或更多MapReduce作业来实现的。对于理解如何将问题分解成MapReduce工作流,第16章所提供的详细介绍非常有价值。

相对于我们已经做的maPPer_reducer完全可以进一步分解。maPPer一般执行输入格式解析、投影(选择相关的字段)和过滤(去掉无关记录)。在前 面的mapper中,我们在一个mapper中实现了所有这些函数。然而,还可以将这些函数分割到不同的maPPer,然后使用HadooP自带的ChainMapper类库将它们连接成一个mapper。结合使用ChainReducer,你可以在一个MapReduce作业中运行一系列的mapper,再运行一个reducer和另一个mapper链。

5.7.2关于JobControl

MapReduce工作流中的作业不止一个时,问题随之而来:如何管理这些作业按顺序执行?有几种方法,其中主要考虑是否有一个线性的作业链或一 个更复杂的作业有向无环图(directedacyclicgraphDAG)。

对于线性链表,最简单的方法是一个接一个地运行作业,等前一个作业运行结束后再运行下一个:

JobClient.runJob(confl);

]obClient•run]ob(conf2);

如果一个作业失败,run]ob()方法就抛出一个IOException,这样一来,管道中后面的作业就无法执行。根据具体的应用程序,你可能想捕获异常,并清除前一个作业输出的中间数据。①这个一个很有趣的练习。提示:使用8.2.4节介绍的内容。

 

这种方法类似新的MapReduceAPI,除了需要Job上的waitForCompletion()

方法的布尔返回值:true表示作业成功,而false表示失败。

对于比线性链表更复杂的结构,有相关的类库可以帮助你合理安排工作流。它们也适用于线性链表或一次性作业。最简单的是org.apache.hadoop.mapned.jobcontrol包中的DobContnol类。在org • apache .hadoop.mapred.jobcontrol包中也有一个等价的类。]obControl的实例表示一个作业的运行图,你可以加入作业配置,然后告知]obControl实例作业之间的依赖关系。在一个线程中运行DobControl时,它将按照依赖顺序来执行这些作业。也可以査看进程,在作业结束后,可以査询作业的所有状态和每个失败相关的错误信息。如果一个作业失败,IlobControl将不执行与之有依赖关系的后续作业。

5.7.3关于ApacheOozie

不同于在客户端运行并提交作业的]obControl, Ooize作为服务器运行,客户端提交一个立即或稍后执行的工作流定义到服务器。在Ooize中,工作流是一个由动作(action)节点和控制流节点组成的DAG(有向无环图)。

动作节点执行工作流任务,例如在HDFS中移动文件,运行MapReducePigHive作业,执行Sqoop导入,又或者是运行shell脚本或Java程序。控制流节点通过构建条件逻辑(不同执行分支的执行依赖于前一个动作节点 的输出结果)或并行执行来管理活动之间的工作流执行情况。当工作流结束 时,Oozie通过发送一个HTTP的回调向客户端通知工作流的状态。还可以在每次进入工作流或退出一个动作节点时接收到回调。

1.定义Oozie工作流

工作流定义是使用Hadoop Process Difinition LanguageXML格式来书写,这个规范可在Oozie网站(/z"p.7//«CM6a?or.apac/?e.org/oozie/)找到。范例5-14展示了一个运行单MapReduce作业的简单OoZie工作流定义。

范例5-14.用来运行求最高温度的MapReduce作业的Oozie工作流定义

<workflow-app xmlns="uri:oozie:workflow:0.1" name="max-temp-workflow">

<start to="max-temp-mr"/>

<action name="max-temp-mr">

<map-reduce>

<job-tracker>${jobTracker}</job-tracker>

 

<name-node>${nameNode}</name-node>

<prepare>

<delete path=M${nameNode}/user/${wf:user()}/output"/>

</prepare>

configuration

<property>

<name>mapred.mapper.class</name>

<value>01dMaxTemperature$01dMaxTemperatureMapper</value>

"property

<property>

<name>mapred.combiner.class</name>

<value>01dMaxTemperature$01dMaxTemperatureReducer</value>

</property>

<property>

<name>mapred.reducer.class/name>

<value>01dMaxTempenature$01dMaxTempe^atureReducer</value>

</property>

<property>

<name>mapred.output.key.class</name>

<value>org.apache.hadoop.io.Text</value>

</property>

<property>

<name>mapred.output.value.class</name>

<value>org.apache.hadoop.io.IntWritable</value>

</property>

<property>

<name>mapred.input.dir</name>

<value>/user/${wf:user()>/input/ncdc/micro</value>

</property>•

<property>

<name>mapred.output.dir</name>

<value>/user/${wf:user()}/output</value>

"property

</configuration

</map-reduce>

<ok to="end"/>

<error to="fail"/>

</action>

<kill name="fail">

<message>MapReduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}] </message>

</kill>

<end name="end"/>

</workflow-app>

这个工作流有三个控制流节点和一个动作节点:一个start控制节点、一mapreduce动作节点、一个kill控制节点和一个end控制节点。节点及其之间的转换如图5-5所示。

 

 

 

5-5•—个Oozie工作流的转移图

每个工作流都必须有一个start节点和一个end节点。当工作流作业开始时,它转移到有start节点指定的节点上(本例中maxtempmr动作)。当一个工作流作业转移到end节点时就意味着它成功完成了。然而,如果一个工作流作业转移到了kill节点,那么就被认为失败了并且报告在工作流定义中的message元素指定的错误消息。

这个工作流定义文件的大部分都是指定mapreduce动作。前两个元素(jobtrackernamenode)用于指定提交作业的jobtracker和输入输出数据的namenode(实际上是一个Hadoop文件系统的URI)。两者都被参数化,使得工作流定义不受限于特定的集群,更有利于测试。这些参数在提交时指定为工作流属性,我们稍后会看到。

可选项prepare元素在MapReduce作业之前运行并用于目录删除(在需要的时候也可以创建目录,但这里没有指明)。通过确保输出目录在运行作业 之前处于一致的状态,如果作业失败的话,那么Oozie也可以安全地重新执行动作。*

运行MapReduce作业是在configuration元素中设定的,通过为Hadoop配置的名值对来设置嵌套的元素。可以把MapReduce配置部分看作本书中其他地方运行MapReduce程序(如范例2-6所示)使用的驱动程序表的一个 替代。

有两个非标准的Hadoop属性mapred.input.dirmapred.output.dir,它们分别用于设置FilelnputFormat输入路径和FileOutputFormat输出路径。

 

我们在工作流的定义中的几个地方利用了JSPExpressionLanguage(EL)语法。Oozie提供了一组与工作流交互的函数。例如${wf:user()}返回开始当前工作流作业的用户名,我们用它来指定正确的文件系统路径Oozie规范中列出所有Oozie支持的EL函数。

2.打包和配置Oozie工作流应用

工作流应用由工作流定义和所有运行它所需的资源(例如MapReduce JAR文件、Pig脚本等)构成。应用必须遵循一个简单的目录结构,并在HDFS上配置,这样它们才能被Oozie访问。对于这个工作流应用,我们将所有的文件放到max-temp-workflow根目录中,如下所示:

maxtempworkflow/

!— lib/

|1——hadoopexamples.jar

1——workflow.xml

工作流定义文件workflow.xm丨必须在该目录的顶层出现。包含应用的MapReduce类的JAR文件放在lib目录中。

遵循这种布局的工作流应用能通过很多合适的工具创建,例如AntMaven,你可以在本书附带的代码中找到样例。一旦创建应用,就使用正规的Hadoop工具将它复制到HDFS。命令如下:

%hadoop fs -put hadoop-examples/target/max-temp-workflow max-temp-workflow

3.运行Oozie工作流作业

接下来,我们看看如何为刚刚上载的应用运行一个工作流作业。为此,使用oozie命令行工具,它是用于和Oozie服务器通信客户端程序。方便起见,我们输出〇〇ZIE_URL环境变量来告诉oozie命令使用哪个Oozie服务器(这里我们使用本地运行的服务器):

%export OOZIE_URL="http://localhost:11000/oozie"

oozie工具有很多子命令(输入ooziehelp可得到这些子命令的列表),但我们将调用带有-run选项的job子命令:

%oozie job -config ch05/src/main/resources/max-temp-workflow.properties

-run job: 0000009-120119174508294-oozie-tom-W

config选项设定本地Java属性文件,它包含工作流XML文件里的参数

 

的定义以及oozie.wf.application.path,这个路径告知OoziHDFS中工作流应用的位置。该属性文件的内容如下:

nameNode=hdfs://localhost:8020jobT racken=localhost:8021

oozie.wf.application.path=${nameNode}/user/${user.name}/max-temp-workflow

为了得到工作流作业的状态信息,我们使用info选项,指定由前面运行的命令打印的作业ID(输入ooziejob将得到所有作业的列表):

%oozie job -info 0000009-120119174508294-oozie-tom-W

输出显示如下状态:RUNNINGKILLED或者SUCCEDED。通过Oozie的网页UI(A:///oca//2〇从•"0/oozie),你也可以找到这些信息。

作业成功后,可以通过以下常用方法检査结果:

%hadoop fs -cat output/part-*

1949111

195022

这个例子只描述了Oozie工作流的基本写入。Oozie网站上的文档介绍了如何创建更复杂的工作流以及如何写和运行coordinator作业。

 

 

转载请注明:全栈大数据 » 5 MapReduce应用开发

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址