整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

第8章 MapReduce的特性

MapReduce的特性

本章探讨MapReduce的一些高级特性,包括计数器、数据集的排序和连接。

1. 计数器

在许多情况下,用户需要了解待分析的数据,尽管这并非所要执行的分析任务的核心内容。以统计数据集中无效记录数目的任务为例,如果发现无效记录的比例相当高,那么就需要认真思考为何存在如此多无效记录。是所采用的检测程序存在缺陷,还是数据集质量确实很低,包含了大量无效记录?如果确实是数据集的质量问题,则可能需要扩大数据集的规模以增 大有效记录的比例,从而进行有意义的分析。

计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统 计。计数器还可辅助诊断系统故障。如果需要将日志信息传输到map或 reduce任务,更好的方法通常是看能否用一个计数器值来记录某一特定事 件的发生。对于大型分布式作业而言,使用计数器更为方便。除了因为获 取计数器值比输出日志更方便,坯有根据计数器值统计特定事件的发生次 数要比分析一堆日志文件容易得多。

1.1. 内置计数器

Hadoop为每个作业维护若干内置计数器,以描述多项指标。例如,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量。

这些内置计数器被划分为若干个组,参见表8-1。

8-1. 内置的计数器分组

组别

 

名称/类别

 

参考

 

MapReduce任务计数器

org.apache.hadoop.mapred.Task$Counter(1.x版本)

org.apache.hadoop.mapreduce.TaskCounter(1.x之后版本)

表8-2

文件系统计数器

FileSystemCounters(1.x版本)

org.apache.hadoop.mapreduce.FileSystemCounter(1.x之后的版本)

表8-3

FilelnputFormat

计数器

org.apache.hadoop.mapred.FileinputFormat$Counter(1.x版本)

org.apache.hadoop.mapreduce.lib.input.FilelnputFormatCounter(1.x之后的版本)

表8-4

FilelnputFormat

计数器

org.apache.hadoop.mapred.FileOutputFormat$Counter(1.x版本)

org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter(1.x之后的版本)

表8-5

作业计数器

org.apache.hadoop.mapred.JobInProgress$Counter(1.x版本)

org.apache.hadoop.mapreduce.JobCounter(1.x之后的版本)

表8-6

各组要么包含任务计数器(在任务处理过程中不断更新),要么包含作业计数器(在作业处理过程中不断更新)。这两种类型将在后续章节中进行介绍。

1.1.1. 任务计数器

在任务执行过程中,任务计数器采集任务的相关信息,每个作业的所有任务的结果会被聚集起来。例如,MAP_INPUT_RECORDS计数器统计每个map任务输入记录的总数,并在一个作业的所有map任务上进行聚集,使得最终数字是整个作业的所有输入记录的总数。

任务计数器由其关联任务维护,并定期发送给tasktracker,再由tasktracker发送给jobtracker。因此,计数器能够被全局地聚集。(参见6.1.1节中对进度和状态更新的介绍。注意,该流程与YARN不同,详情参见6.L2节)。任务计数器的值每次都是完整传输的,而非自上次传输之后再继续尚未完成的传输,从而避免由于消息丢失而引发的错误。另外,如果一个任务在作业执行期间失败,则相关计数器的值会减小。

虽然只有当整个作业执行完之后计数器的值才是完整可靠的,但是部分计数器仍然可以在任务处理过程中提供一些有用的诊断信息,以便由Web界面监控。例如,PHYSICAL_MEMORY_BYTES、VIRTUAL_MEMORY_BYTES 和COMMITTED_HEAP_BYTES计数器显示特定任务执行过程中的内存使用变化情况。

内置的任务计数器包括在MapReduce任务计数器分组中的计数器(表8-2), 以及在文件相关的计数器分组(表8-3、表8-4和表8-5)中。

8-2.内置的MapReduce任务计数器

计数器名称

说明

map输入的记录数

(MAP_INPUT_RECORDS)

作业中所有map已处理的输入记录数。每次 RecordReader读到一条记录并将其传给map的map()函数时,该计数器的值增加

map跳过的记录数(MAP_SKIPPED_RECORDS)

作业中所有map跳过的输入记录数,详情参见6.5.5节

map输入的字节数

(MAP一INPUT一BYTES)

作业中所有map已处理的未经压缩的输入数据的字节数。每次RecordReader读到一条记录并将其传给map的map()函数时,该计数器的值增加

分片(split)的原始字节数 (SPLIT一RAW一BYTES)

由map读取的输入分片对象的字节数。这些对象描述分 片元数据(文件的位移和长度),而不是分片的数据自 身,因此总规模是小的

map输出的记录数(MAP_OUTPUT_RECORDS)

作业中所有map产生的map输出记录数。每次某一个map的OutputCollector调用collect()方法时,该计数器的值增加

map输出的字节数 (MAP_OUTPUT_BYTES)

作业中所有map产生的未经压缩的输出数据的字节数。每次某一个map的OutCollector调用collect()方法时,该计数器的值增加

map输出的物化字节数

(MAP一OUTPUT_MATERIALIZED_BYTES)

Map输出后确实写到磁盘上的字节数;若map输出压缩 功能被启用,则会在计数器值上反映出来

combine输入的记录数 (COMBINE_INPUT—RECORDS)

作业中所有combiner(如果有)已处理的输入记录数。 comWner的迭代器每次读一个值,该计数器的值增加。 注意:本计数器代表combiner已经处理的值的个数,并 非不同的码分组数(后者并无实质意义,因为对于 combiner而言,并不要求毎个键对应一个组,详情参见 2.4.2节和6.4节

combine输出的记录数(COMBINE__OUTPUT_RECORDS)

作业中所有combiner(如果有)已产生的输出记录每 当一个 combiner 的OutputCollector 调用 collect() 方法时,该计数器的值增加数。

reduce输入的组(REDUCE_INPUT_GROUPS)

作业中所有reducer已经处理的不同的码分组的个数。毎当某一个reducer的reduce()被调用时,该计数器的值增加

reduce输入的记录数 (REDUCE_INPUT_RECORDS)

作业中所有reducer已经处理的输入记录的个数L每‘某 个reducer的迭代器读一个值时,该计数器的值增加。如 果所有reducer已经处理数完所有输入,则该计数器的值 与计数器“map输出的记录”的值相同

reduce输出的记录数(REDUCE_OUTPUT_RECORDS)

作业中所有map已经产生的reduce输出记录数。每当某 个 reducer 的 OutputCollector 调用 collect()方法时,该计数器的值增加

reduce跳过的组数(REDUCE_SKIPPED_GROUPS)

作业中所有reducer已经跳过的不同的码分组的个数。详 情参见6.5.5节

reduce跳过的记录数(REDUCE—SKIPPED_RECORDS)

作业中所有reducer已经跳过的输人记录数

reduce经过shuffle的字节数(REDUCE_SHUFFLE—BYTES)

shuffle将map的输出数据复制到reducer中的字节数

溢出的记录数 (SPILLED_RECORDS)

作业中所有map和reduce任务溢出到磁盘的记录数

CPU毫秒(CPU_MILLISECONDS)

总计的CPU时间,以毫秒为单位,由/proc/cpuinfo获取

物理内存字节数(PHYSICAL__MEMORY_BYTES )

一个任务所用物理内存的字节数,由/proc/meminfo获取

虚拟内存字节数 (VIRTUAL一MEMORY一BYTES)

一个任务所用虚拟内存的字节数,由/proc/meminfo获取

有效的堆字节数(COMMITTED__HEAP_BYTES)

在JVM中的总有效内存量(以字节为单位),可由Runtime. getRuntime().totalMemory()获取

GC运行时间毫秒数(GC_TIME_MILLIS)

在任务执行过程中,垃圾收集器(garbage collection)花费 的时间(以毫秒为单位),可由GarbageCollector MXBean.getCollectionTime()获取:该计数器并未出现在1.x版 本中

由shuffle传输的map输出数(SHUFFLED_MAPS)

由shuffle传输到reducer的map输出文件数,详情参见6.4节,该计数器没有包含在l.x版本中

失败的shuffle数 (FAILED_SHUFFLE)

在shuffle过程中,发生拷贝错误的map输出文件数,该计数器并没有包含在1.x版本中

被合并的map输出数 (MERGED—MAP—OUTPUTS)

在shuffle过程中,在reduce端被合并的map输出文件数,该计数器没有包含在1.x版本中

8-3.内置的文件系统任务计数器

计数器名称

说明

文件系统的读字节数(BYTES_READ)

由map和reduce等(Filesystem)任务在各个文件系统中 读取的字节数,各个文件系统分别对应一个计数器,可以是LocalHDFSS3和KFS等

文件系统的写字节数(BYTES_WRITTEN)

由map和reduce等任务在各个文件系统中写的字节数

8-4.内置的FilelnputFormat任务计数器

计数器名称

说明

读取的字节数 (BYTES__READ)

由map任务通过FilelnputFormat读取的字节数

8-5.内置的FileOutputFormat任务计数器

计数器名称

写的字节数

(BYTES_WRITTEN)

说明

由map任务(针对仅含map的作业)或者reduce任务通过FileOutputFormat写的字节数

1.1.2. 作业计数器

作业计数器(表8-6)由jobtracker(或者YARN中的应用宿主)维护,因此无需在网络间传输数据,这一点与包括“用户定义的计数器”在内的其他计数器不同。这些计数器都是作业级別的统计量,其值不会随着任务运行而改变。例如,TOTAL_LAUNCHED_MAPS统计在作业执行过程中启动的map任务数,包括失败的map任务。

8-6.内置的作业计数器

计数器名称

说明

启用的map任务数 (TOTAL_LAUNCHED_MAPS)

启动的map任务数,包括以“推测执行”方式启动的 任务

启用的reduce任务数(TOTAL_LAUNCHED_REDUCES)

启动的reduce任务数,包括以“推测执行”方式启动的任务

启用的uber任务数(TOTAL_LAUNCHED_UBERTASKS)

启用的uber任务数(详情参见6.1.2节),该计数器仅限干基于YARN 的 MapReduce

uber中的map任务数 (NUM_UBER_SUBMAPS)

在uber任务中的map任务数,该计数器仅限于基于YARN 的 MapReduce

uber中的reduce任务数(NUM_UBER__SUBREDUCES)

在uber任务中的reduce任务数,该计数器仅限于于YARN 的 MapReduce

失败的map任务数 (NUM_FAILED_MAPS)

失败的map任务数,用户可参见6.2.1节对任务失败的 讨论,了解失败原因

失败的reduce任务数 (NUM_FAILED_REDUCES)

失畋的reduce任务数

失败的uber任务数 (NUM_FAILED_UBERTASKS)

失败的uber任务数,该计数器仅限于基于YARN的 MapReduce

数据本地化的map任务数 (DATA_L0CAL_MAPS)

与输入数据在同一节点上的map任务数

机架本地化的map任务数 (RACK_LOCAL_MAPS)

与输入数据在同一机架范围内、但不在同一节点上的 map任务数

其他本地化的map任务数(OTHER_LOCAL_MAPS)

与输入数据不在同一机架范围内的map任务数。由于机 架之间的带宽资源相对较少,Hadoop会尽量让map任务靠近输入数据执行,因此该计数器值一般比较小。详情参见图2-2

map任务的总运行时间 (SLOTS_MILLIS__MAPS)

map任务的总运行时间,单位毫秒。该计数器包括以推 测执行方式启动的任务

Reduce任务的总运行时间 (SLOTS_MILLIS_REDUCES)

reduce任务的总运行时间,单位毫秒。该值包括以推测 执行方式启动的任务

在保留槽之后,map任务 等待的总时间fallow_slots_ MILLIS一MAPS)

在为map任务保留槽之后所花费的总等待时间,单位是 .毫秒。槽保留是针对大内存作业的容量调度器特性,参 见9.4.4节,该计数器在基于YARN的MapReduce中

无效

在保留槽之后,reduce任务等待的总时间(FALLOW_SLOTS_MILLIS一REDUCES)

在为reduce任务保留槽之后,花在等待上的总时间,单位是毫秒。槽保留是针对大内存作业的容量调度器特性,详情参见9.4.4节,该计数器在基于YARN的MapReduce中无效

 

 

1.2. 用户定义的Java计数器

MapReduce允许用户编写程序来定义计数器,计数器的值可在mapper或 reducer中增加,计数器由一个Java枚举(enum)类型来定义,以便对有关的 计数器分组。一个作业可以定义的枚举类型数量不限,各个枚举类型所包 含的字段数量也不限。枚举类型的名称即为组的名称,枚举类型的字段就 是计数器名称。计数器是全局的。换言之,MapReduce框架将跨所有mapreduce聚集这些计数器,并在作业结束时产生一个最终结果。

在第5章中,我们创建了若干计数器来统计天气数据集中不规范的记录数。范例8-1中的程序对此做了进一步的扩展,能统计缺失记录和气温质量代码的分布情况。

范例8-1.统计最高气温的作业,包括统计气温值缺失的记录、不规范的字段和质量代码

public class MaxTemperatureWithCounters extends Configured implements Tool {
    enum Temperature {
        MISSING,
        MALFORMED
    }
    static class MaxTemperatureMapperWithCounters
        extends Mapper<LongWritable, Text, Text, IntWritable> {
        private NedcRecordParser parser = new NcdcRecordParser();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            parser.parse(value);
            if (parser.isValidTemperature()) {
                int airTemperature = parser.getAirTemperature();
                context.write(new Text(parser.getYear()), new IntWritable(airTemperature));
            } else if (parser.isMalformedTemperature()) {
                System.err.printIn("Ignoring possibly corrupt input: " + value);
                context.getCounter(Temperature.MALFORMED).increment(1);
            } else if (parser.isMissingTemperature()) {
                context.getCounter(Temperature.MISSING).increment(1);
            }
            // dynamic counter
            context.getCounter("TemperatureQuality", parser.getQuality()).increment(1);
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this,getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        job.setMapperClass(MaxTemperatureMapperWithCounters.class);
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        return job.waitForCompletion(true)?0:1;
    }
        
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureWithCounters(), args);
        System.exit(exitCode);
    }
}

 

理解上述程序的最佳方法是在完整的数据集上运行一遍:

% hadoop jar hadoop-examples.jar MaxTemperatureWithCounters \

input/ncdc/all output-counters

作业成功执行完毕之后会输出各计数器的值(由作业的客户端完成)。以下是 一组我们感兴趣的计数器的值。

12/02/04 19:46:38 INFO mapred.DobClient: TemperatureQuality

12/02/04 19:46:38 INFO mapred.DobClient:2=1246032

12/02/04 19:46:38 INFO mapred.JobClient:1=973422173

12/02/04 19:46:38 INFO mapred.DobClient:0=1

12/02/04 19:46:38 INFO mapred.JobClient:6=40066

12/02/04 19:46:38 INFO mapred.JobClient:5=158291879

12/02/04 19:46:38 INFO mapred.DobClient:4=10764500

12/02/04 19:46:38 INFO mapred.UobClient:9=66136858

12/02/04 19:46:38 INFO mapred.JobClient:Air Temperature Records

12/02/04 19:46:38 INFO mapred.JobClient:Malformed=3

12/02/04 19:46:38 INFO mapred.JobClient:Missing=66136856

 

1.2.1. 动态计数器

上述代码还使用了动态计数器,这是一种不由Java枚举类型定义的计数器。由于Java枚举类型的字段在编了译阶段就必须指定,因而无法使用枚举类型动态新建计数器。范例8-1统计气温质量的分布,尽管通过格式规范定义 了可以取的值,但相比之下,使用动态计数器来产生实际值更加方便。在该例 中,Reporter对象的incrCounter()方法有两个String类型的输入参 数,分别代表组名称和计数器名称:

public void incrCounter(String group, String counter, long amount)

鉴于Hadoop需先将Java枚举类型转变成String类型,再通过RPC发送 计数器值,这两种创建和访问计数器的方法(即使用枚举类型和String类 型)事实上是等价的。相比之下,枚举类型易于使用,还提供类型安全,适 合大多数作业使用。如果某些特定场合需要动态创建计数器,则可以使用 String 类型。

1.2.2. 易读的计数器名称

计数器的默认名称是枚举类型的Java完全限定类名。由于这种名称在Web 界面和终端上可读性较差,因此Hadoop提供“资源捆绑”(resource bundle) 这种方式来修改计数器的显示名称。前面的例子即是如此,显示的计数器名称是 Air Temperature Records,而非 Temperature$MISSING。对动态计数器而 言,组名称和计数器名称也用作显示名称,因而通常没有这个问题。

为计数器提供易读名称也很容易。以Java枚举类型为名创建一个属性文件,用下划线(_)分隔嵌套类。属性文件与包含该枚举类型的顶级类放在同一目录。例如,例8-1中的Temperature枚举类型对应的属性文件被命名MaxTemperatureWithCounters_Temperature.properties0

属性文件应包含一个唯一的CounterGroupName属性,其值便是整个组的显示名称。在枚举类型中定义的每个字段均有一个属性与之对应,属性名称是“字段名称.name”,属性值是该计数器的显示名称。属性文件 MaxTemperatureWithCounters的内容如下:

CounterGroupName=Air Temperature Records

MISSING.name=Missing

MALFORMED.name=MaIformed

Hadoop使用标准的Java本地化机制将正确的属性文件载入到当前运行区 域•。例如,创逢一个名为 MaxTemperatui-eWithCountefs—Temperature—zhjCN.properties 的中文属性文件,在zh_CN区域运行时,就会使用这个属性文件。详情请 参见 java.util.Proper'tyResourceBundle 类的相关文档。

3.获取计数器

除了通过Web界面和命令行(执行hadoop job -counter指令)之外,用 户还可以使用Java API获取计数器的值。通常情况下,用户一般在作业运行完成、计数器的值已经稳定下来时再获取计数器的值,而Java API还支持在作业运行期间就能够获取计数器的值。范例8-2展示了如何统计整个数据集中气温信息缺失记录的比例。

范例8-2.统计气温信息缺失记录所占的比例

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class MissingTemperatureFields extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 1) {
            JobBuilder. printUsage(this, "<job ID>");
            return -1;
        }
        String jobID = args[0];
        JobClient jobClient = new JobClient(new JobConf(getConf()));
        RunningJob job = jobClient.getJob(JobID.forName(jobID));
        if (job == null) {
            System.err.printf("No job with ID %s found.\n", jobID);
            return -1;
        }
        if (!job.isComplete()) {
            System.err.printf("Job %s is not complete.\n", jobID);
            return -1;
        }
        Counters counters = job.getCounters();
        long missing = counters.getCounter(MaxTemperatureWithCounters.Temperature.MISSING);
        long total = counters.getCounter(Task.Counter.MAP_INPUT__RECORDS);
        System.out.printf("Records with missing temperature fields: %.2f%%\n", 100.0 * missing / total);
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MissingTemperatureFields(), args);
        System.exit(exitCode);
    }
}

 

首先,以作业ID为输入参数调用一个JobClient实例的getJob()方法,返回一个RunningJob对象,从而检查是否有一个作业与指定ID相匹配。 有多种因素可能导致无法找到一个有效的RunningJob对象,例如,错误地指定了作业ID,或jobtracker不再指向该作业。(内存中仅保留最新的 100 个作业,该阈值受 mapred. jobtracker.completeuserjobs.maximum 控制,当jobtracker重启时,所有作业信息都被清除。)

其次,如果确认该Running]ob对象(即作业)存在,则调用该对象的 getCounters()方法会返回一个Counters对象,封装了该作业的所有计 数器。Counters类提供了多个方法用于获取计数器的名称和值。上例调用 getCounter^)方法,它通过一个枚举值来获取气温信息缺失的记录数和被 处理的记录数(根据一个内置计数器)。

最后,输出气温信息缺失记录的比例。针对整个天气数据集的运行结果 如下:

% hadoop jar hadoop-examples.jar MissingTemperatureFields job_201202040938_0012

Records with missing temperature fields: 5.47%

使用新版MapReduce API上例使用了旧版API,原因是新版的在作业完成后获取计数器的方法并不适用于Hadoop 1.x版本。(使用旧版编写的代码可以运行在新版API上并获取计数器)。新旧版本API的主要差别在于是否使用Cluster对象来获取一个Job对象(而非一个RunningJob对象),然后再调用它的getCounters()方法。

Cluster cluster = new Cluster(getConf());
Job job = cluster.getJob(JobID.forName(jobID));
Counters counters = job.getCounters();
long missing = counters.findCounter(
MaxTemperatureWithCounters.Temperature.MISSING).getValue();
long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();

一个区別在于新版 API 使用 org.apache.hadoop.mapreduce.TaskCounter这个枚举类型,而旧版API则使用org.apache.hadoop.mapred. Task .Counter这个枚举类型。

1.3. 用户定义的Streaming计数器

使用Streaming的MapReduce程序可以向标准错误流发送一行特殊格式的信息来增加计数器的值,这种技术可被视为一种计数器控制手段。信息的格式如下:

reporter:counter:group,counter,amount

以下Python代码片段将Temperature组的Missing计数器的值增加1:

sys.stderr.write("reporter:counter:Temperature,Missing,1\n")

状态信息也可以类似方法发出,格式如下:

reporter:status:messoge

2. 排序

排序是MapReduce的核心技术。尽管应用本身可能并不需要对数据排序,但仍可能使用MapReduce的排序功能来组织数据。本节将讨论几种不同的数据集排序方法,以及如何控制MapReduce的排序。4.4.8节介绍了如何对 Avro数据进行排序。

2.1. 准备

下面将按气温字段对天气数据集排序。由于气温字段是有符号整数,所以不能将该字段视为Text对象并以字典顺序排序。反之,我们要用顺序文 件存储数据,其IntWritable键代表气温(并且正确排序),其Text值就是数据行。

有一个常用的方法能解决这个问题(特别是针对基于文本的Streaming应用):首 先,增加偏移量以消除所有负数> 其次,在数字前面增加〇,使所有数字的长度相 等;最后,用字典法排序。8.2.4节要介绍另一种方法。

 

范例8-3中的MapReduce作业只包含map任务,它过滤输入数据并移除包 含有无效气温的记录。各个map创建并输,出一个块压缩的顺序文件。相关 指令如下:

% hadoop jar hadoop-examples.jar SortDataPreprocessor input/ncdc/all \ input/ncdc/all-seq

范例8-3.该MapReduce程序将天气数据转成SequenceFile格式

 

public class SortDataPreprocessor extends Configured implements Tool {
    static class CleanerMapper
    extends Mapper<LongWritable, Text, IntWritable, Text> {
        private NcdcRecordParser parser = new NcdcRecordParser();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            parser.parse(value);
            if (parser.isValidTemperature()) {
                context.write(new IntWritable(parser.getAirTemperature()), value);
            }
        }
        @Override
        public int run(String[] args) throws Exception {
            Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
            if (job == null) {
                return -1;
            }
            job.setMapperClass(CleanerMapper.class); 
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(Text.class);
            job.setNumReduceTasks(0);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            SequenceFileOutputFormat?setCompressOutput(job, true);
            SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
            SequencefileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
            return job.waitForCompletion(true) ? 0 : 1;
        }
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new SortDataPreprocessor(), args);
            System.exit(exitCode);
        }
    }
}

2.2. 部分排序

7.1.1节所述,在默认情况下,MapReduce根据输入记录的键对数据集排序。范例8-4则是一个变种,它利用IntWritable键对顺序文件排序。

范例8-4.程序调用默认HashPartitioner按IntWritable键排序顺序文件

public class SortByTemperatureUsingHashPartitioner extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setInputFormatClass(SequenceFilelnputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressorClass(job,GzipCodec.class);
        SequenceFileOutputFormat?setOutputCompressionType(job,
        CompressionType.BLOCK);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SortByTemperatureUsingHashPartitioner(), args);
        System.exit(exitCode);
    }
}

 

2.2.1. 控制排列顺序

键的排列顺序是由RawComparator控制的,规则如下。

(1)若属性 mapred.output.key.comparator.class已经被显式设置,或者通过Job类的setSortComparatorClass()方法进行设置,则使用该类的实例(旧版API使用JobConf类的 setOutputKeyComparatorClass()方法。)

(2)否则,键必须是WritableComparable的子类,并使用针对该键类的已登记comparator。

(3)如果还没有已登记的comparator,则使用RawComparator将字节流反序列化为一个对象,再由WritableComparable的compareTo()方法进行操作。

上述规则彰显了为自定义Writable类登记RawComparators优化版本的重要性,详情可参见4.3.4节介绍的如何为速度实现一个RawComparator。 同时,通过定制comparator来重新定义排序顺序也很直观,详情可参见 8.2.4节对辅助排序的讨论。

 

假设采用30个reducer来运行该程序:

% hadoop jar hadoop-examples.jar SortByTemperatureUsingHashPartitionep \

-D mapred.reduce.tasks=30 input/ncdc/all-seq output-hashsort

 

该指令产生30个已排序的输出文件。但是如何将这些小文件合并成一个有序的文件却并非易事。例如,直接将纯文本文件连接起来无法保证全局有序。幸运的是,许多应用并不强求待处理的文件全局有序。例如,对于查找操作来说,部分排序的文件就已经足够了。

2.2.2. 应用:基于分区的MapFile查找技术

以按键执行查找操作为例,在多文件情况下效率更高。在范例8-5中,如果输出格式被改为MapFileOutputFormat,则输出30个map文件,可以 基于这些文件执行査找操作。®

范例8-5.该MapReduce程序对一个顺序文件排序并输出MapFile

public class SortByTemperatureToMapFile extends Configured implements Tool
{
    @Override
    public int run(String[] args) throws Exception {
        Job job = 3obBuilder.parselnputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setInputFormatClass(SequenceFilelnputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputFormatClass(MapFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job,
        CompressionType.BLOCK);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SortByTemperatureToMapFile(), args);
        System.exit(exitCode);
    }
}

MapFileOutputFormat提供了两个便捷的静态方法,用于对MapReduce输出文件执行查找操作,其用法如范例8-6所示。

范例8-6.从MapFiles集合中获取符合指定键的第一项记录

public class LookupRecordByTemperature extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            JobBuilder.printUsage(this, "<path> <key>");
            return -1;
        }
        Path path = new Path(args[0]);
        IntWritable key = new IntWritable(Integer.parseInt(args[1]));
        
        Reader[] readers = MapFileOutputFormat.getReaders(path, getConf());
        Partitioner<IntWritable, Text> partitioner =
new HashPartitioner<IntWritable, Text>();
        Text val = new Text();
        Writable entry = MapFileOutputFormat.getEntry(readers, partitioner, key, val);
        if (entry == null) {
            System.err.printIn("Key not found: " + key);
            return -1;
        }
        NcdcRecordParser parser = new NcdcRecordParser();
        parser.parse(val.toString());
        System.out.printf("%s\t%s\n", parser.getStationId(), parser.getYear());
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new LookupRecordByTemperature(), args);
        System.exit(exitCode);
    }
}

 

getReaders()方法为MapReduce作业创建的每个输出文件分别打开一个MapFile.Reader实例,用一个Reader数组(即readers)表示。之后,getEntry()方法使用Partitioner找到包含指定键的Reader实例,再通过该Reader实例的get()方法得到这个键对应的值(val)。如果getEntry()返回null,则表明没有找到匹配的键。否则,返回一个描述对应气象站ID和年份的值。

举例来说,若要查找首条气温为10℃的记录(注意,气温字段是整数类型,其值是真实气温的10倍。例如,10℃被记为-100),则有:

% hadoop jar hadoop-examples.jar LookupRecordByTemperature output-hashmapsort -100

357460-99999 1956

我们还可以直接用readers数组来获得包含指定键的所有记录。readers数组按分区排序,因而针对一个指定键的reader,均可以通过MapReduce作业中的同一个partitioner获得:

Reader reader = readers[partitioner.getPartition(key, val, readers.length)];

一旦找到reader,可通过MapFile的get()方法获取第一条包含指定键的记录。接着,循环调用next()获取下一条记录,直到键改变为止。相关程序如范例8-7所示。

范例8-7.从一个MapFiles集合中获取包含指定键的所有记录

public class LookupRecordsByTemperature extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            JobBuilder.printUsage(this, "<path> <key>");
            return -1;
        }
        Path path = new Path(args[0]);
        IntWritable key = new IntWritable(Integer.parselnt(args[1]));
        Reader[] readers = MapFileOutputFormat.getReaders(path, getConf());
        Partitioner<IntWritable, Text> partitioner =
        new HashPartitioner<IntWritable, Text>();
        Text val = new Text();
        Reader reader = readers[partitioner.getPartition(key, val, readers.length)];
        Writable entry = reader.get(key, val);
        if (entry == null) {
            System.err.printf("Key not found:" + key);
            return -1;
        }
        NcdcRecordParser parser = new NcdcRecordParser();
        IntWritable nextKey = new IntWritable();
        do {
            parser.parse(val.toString());
            System.out.printf("%s\t%s\n", parser.getStationId(), parser.getYear());
        } while(reader.next(nextKey, val) && key.equals(nextKey)); return 0;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new LookupRecordsByTemperature(), args);
        System.exit(exitCode);
    }
}

 

下例描述如何获取所有-10℃的读数,并计数:

% hadoop jar hadoop-examples.jar LookupRecordsByTemperature output-hashmapsort -100 \

2> /dev/null | wc -1

1489272

2.3. 全排序

如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区(a single partition)但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构的优势。

事实上仍有替代方案:首先,创建一系列排好序的文件其次,串联这些文件最后,生成一个全局排序的文件。主要的思路是使用一个partitioner来描述输出的全局排序。例如,可以为上述文件创建4个分区,在第一分区中,各记录的气温小于-10℃,第二分区的气温介于-10℃0之间,第三个分区的气温在010之间,最后一个分区的气温大于10

该方法的关键点在于如何划分各个分区。理想情况下,各分区所含记录数应该大致相等,使作业的总体执行时间不会受制于个別reducer。在前面提 到的分区方案中,各分区的相对大小如下所示。

气温范围

<-1O℃

[-10℃,0℃)

[0℃,10℃)

≥10

记录所占的比例

11%

13%

17%

59%

 

显然,记录没有被均匀划分。只有深入了解整个数据集的气温分布才能建立更均匀的分区。写一个MapReduce作业来计算落入各个气温桶的记录数,并不困难。例如,图8-1显示了桶大小为1℃时各桶的分布情况,各点分别对应一个桶。

获得气温分布信息意味着可以建立一系列分布非常均匀的分区。但由于该操作需要遍历整个数据集,因此并不实用。通过对键空间进行采样,就可较为均匀地划分数据集。采样的核心思想是只査看一小部分键,获得键的近似分布,并由此构建分区。幸运的是,Hadoop已经内置了若干采样器,不需要用户自己编写。

InputSampler类实现了Sampler接口,该接口的唯一成员方法(即getSampler)有两个输入参数(一个InputFormat对象和一个Job对象),返回一系列样本键:

public interface Sampler<K, V> {
    K[] getSample(InputFormat<K, V> inf, Job job)
    throws IOException, InterruptedException;
}

该接口通常不直接由客户端调用,而是由InputSampler类的静态方法writePartitionFile()调用,目的是创建一个顺序文件来存储定义分区的键。

public static <K, V> void writePartitionFile(Job job, Sampler<K, V> sampler) throws IOException, ClassNotFoundException, InterruptedException

blob.png 

8-1.天气数据集合的气温分布

顺序文件由TotalOrderPartitioner使用,为排序作业创建分区。范例8-8整合了上述内容。

 

范例8-8.调用TotalOrderPartitioner按IntWritable键对顺序文件进行全局排序

public class SortByTemperatureUsingTotalOrderPartitioner extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parselnputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job,CompressionType.BLOCK);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        
        InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
        InputSampler.writePartitionFile(job, sampler);
        
        // Add to DistributedCache
        Configuration conf = job.getConfiguration();
        String partitionFile =TotalOrderPartitioner.getPartitionFile(conf);
        URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
        DistributedCache.addCacheFile(partitionUri, conf);
        DistributedCache.createSymlink(conf);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(
            new SortByTemperatureUsingTotalOrderPartitioner(), args);
        System.exit(exitCode);
    }
}

 

该程序使用RandomSampler以指定的采样率均匀地从一个数据集中选择样本。在本例中,采样率被设为0.1RamdomSampler的输入参数还包括最大样本数和最大分区(本例中这两个参数分别是1000010,这也是InputSampler作为应用程序运行时的默认设置),只要任意一个限制条件满足,即停止采样。采样器在客户端运行,因此,限制分片的下载数量以加速采样器的运行就尤为重要。在实践中,采样器的运行时间仅占作业总运行时间的一小部分。

为了和集群上运行的其他任务共享分区文件,InputSampler需将其所写的分区文件加到分布式缓存中(参见8.4.2节)。

以下方案别以-5.6℃、13.9℃和22.0℃为边界得到4个分区。易知,新方案比旧方案更为均匀。

气温范围

<5.6℃

[ -5.6℃, 13.9℃)

[13.9℃, 22.0℃)

≥22.0℃

记录所占的比例

29%

24%

23%

24%

 

输入数据的特性决定如何挑选最合适的采样器。以SplitSampler为例,它只采样一个分片中的前n条记录。由于并未从所有分片中广泛采样,该采样器并不适合已经排好序的数据。

另一方面,IntervalSample以一定的间隔定期从分片中选择键,因此对于已排好序的数据来说是一个更好的选择。RandomSampler是优秀的通用采样器。如果没有采样器可以满足应用需求(记住,采样目的是创建大小近似相等的一系列分区),则只能写程序来实现Sampler接口。

InputSampler类和TotalOrderPartitioner类的一个好特性是用户可以自由定义分区数,该值通常取决于集群上reducer槽的数量(该值需稍小于reducer槽的总数,以应付可能出现的故障)。由于TotalOrderPartitioner只用于分区边界均不相同的时候,因而当键空间较小时,设置太大的分区数可能会导致数据冲突。

以下是运行方式:

% hadoop jar hadoop-examples.jar SortByTemperatureUsingTotalOrderPartitioner \

-D mapred.reduce.tasks=30 input/ncdc/all-seq output-totalsort

该程序输出30个已经内部排好序的分区。且分区i中的所有键都小于分区 i+1中的键。

2.4. 辅助排序

MapReduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为 它们来自不同的map任务且这些map任务在不同轮次中的完成时间各不相 同。一般来说,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值 的排序。

例如,考虑如何设计一个MapReduce程序以计算每年的最高气温。如果全部记录均按照气温降序排列,则无需遍历整个数据集即可获得査询结果一 一获取各年份的首条记录并忽略剩余记录。尽管该方法并不是最佳方案,但演示了辅助排序的工作机理。

为此,首先构建一个同时包含年份和气温信息的组合键,然后对键值先按年份升序排序,再按气温降序排列:

1900 35℃

1900 34℃

1900 34℃

1901 36℃

1901 35℃

 

如果仅仅是使用组合键的话,并没有太大的帮助,因为这会导致同一年的记录可能有不同的键,通常这种情况下记录并不会被送到同一个reducer中。例如,(1900, 35℃)和(1900, 34℃)就可能被送到不同的reducer中。通过设置一个按照键的年份进行分区的patitioner,可以确保同一年的记录会被发送到同一个reducer中。但是,这样做还不够。因为partitioner只保证每一个reducer接受一个年份的所有记录,而在一个分区之内,reducer仍是通过键进行分组的分区:

分区

1900 35?

1900 34℃

1900 34℃

1900 36℃

1900 35℃

 

该问题的最终解决方案是进行分组设置。如果reducer中的值按照键的年份进行分组,则一个reduce组将包括同一年份的所有记录。鉴于这些记录已经按气温降序排列,所以各组的首条记录就是这一年的最髙气温:

分区

1900 35℃

1900 34℃

1900 34℃

1900 36℃

1900 35℃

下面对记录按值排序的方法做一个总结:

•定义包括自然键和自然值的组合键

•根据组合键对记录进行排序,即同时用自然键和自然值进行排序

•针对组合键进行分区和分组时均只考虑自然键

2.4.1. Java代码

综合起来便得到范例8-9中的源代码,该程序再一次使用了纯文本输入。

范例8-9.该应用程序通过对键中的气温进行排序来找出最高气温

public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {
    static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
        private NcdcRecordParser parser = new NcdcRecordParser();
        @Override
        protected void map(LongWritable key, Text value,
            Context context) throws IOException, InterruptedException {
            parser.parse(value);
            if (parser.isValidTemperature()) {
                context.write(new IntPair(parser.getYearInt(),
                    parser.getAirTemperature()), NullWritable.get());
            }
        }
    }
    static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPai〜NullWritable〉 {
        @Override
        protected void reduce(IntPair key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write (key, NullWritable.get());
        }
    }
    public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
        @Override
        public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            // multiply by 127 to perform some mixing
            return Math.abs(key.getFirst() * 127) % numPartitions;
        }
    }
    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(IntPair.class,true);
        }
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
            if (cmp != 0) {
                return cmp;
            }
            return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse
        }
    }
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(IntPair.class, true);
        }
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            return IntPair.compare(ip1.getFirst(), ip2.getFirst());
        }
        @Override
        public int run(String[] args) throws Exception {
            Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
            if (job == null) {
                return -1;
            }
            job.setMapperClass(MaxTemperatureMapper.class);
            job.setPartitionerClass(FirstPartitioner.class);
            job.setSortComparatorClass(KeyComparator.class);
            job.setGroupingComparatorClass(GroupComparator.class);
            job.setReducerClass(MaxTemperatureReducer.class);
            job.setOutputKeyClass(IntPair.class);
            job.setOutputValueClass(NullWritable.class);
            return job.waitForCompletion(true) ? 0 : 1;
        }
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);
            System.exit(exitCode);
        }
    }
}

 

在上述mapper中,我们利用IntPair类定义了一个代表年份和气温的组合键,该类实现了Writable接口。IntPair与TextPair类相似,后者可参见4.3.2节的相关讨论。由于可以根据各reducer的组合键获得最高气温,因此无需在值上附加其他信息,使用NullWritable即可。根据辅助排序,reducer输出的第一个键就是包含年份和最高气温信息的IntPair对象。IntPair的toString()方法返回一个以制表符分隔的字符串,因而该程序输出一组由制表符分隔的年份/气温对。

许多应用需要访问所有已排序的值,而非像上例一样只需要第一个值。鉴于在reducer中用户只能够获取第一个键,所以必须通过填充值字段来获取所有已排序的值,这样不可避免会在键和值之间产生—些冗余信息。

我们创建一个自定义的partitioner以按照组合键的首字段(年份)进行分区,即FirstPartitioner。为了按照年份(升序)和气温(降序)排列键,我们使用setSortComparatorClass()设置一个自定义键comparator(即KeyComparator),以抽取字段并执行比较操作。类似的,为了按年份对键进行分组,我们使用SetGroupingComparatorClass来自定义一个分组comparator,只取键的首字段进行比较。

运行该程序,返回各年的最髙气温:

% hadoop jar hadoop-examples.jar MaxTemperatureUsingSecondarySort input/ncdc/all \

> output-secondarysort

% hadoop fs -cat output-secondarysort/part-* | sort | head

1901 317

1902 244

1903 289

1904 256

1905 283

1906 294

1907 283

1908 289

1909 278

1910 294

 

2.4.2. Streaming

我们可以借助Hadoop所提供的一组库来实现Streaming的辅助排序,下面就是用来进行辅助排序的驱动:

hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-D stream.num.map.output.key.fields=2 \
-D mapred.text.key.partitioner.options=-k1,1 \
-D mapred.output.key.comparator.class= \
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k1n -k2nr" \
-input input/ncdc/all \
-output output_secondarysort一streaming \
-mapper ch08/src/main/python/secondary_sort_map.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-reducer ch08/src/main/python/secondary_sort_reduce.py \
-file ch08/src/main/python/secondany_sort_map.py \
-file ch08/src/main/python/secondary_sort_reduce.py

 

范例 8-10中的map函数输出年份和气温两个字段。为了将这两个字段看成一个组合键,需要将 stream.num.map.output.key.fields的值设为2。这意味着值是空的,就像java程序(范例8-9)—样。

范例8-10.针对辅助排序的map函数(Python版本)

#! /usr/bin/env python
import re
import sys
for line in sys.stdin:
    val = line.strip()
    (year, temp, q) = (val[15:19], int(val[87:92]), val[92:93])
    if temp == 9999:
        sys.stderr.write("reporter:counter:Temperature,Missing,l\n")
    elif re.match("[01459]", q):
        print "%s\t%s" % (year, temp)

 

鉴于我们并不期望根据整个组合键来划分数据集,因此可以利用KeyFieldBasedPatitioner类以组合键的一部分进行划分。具体实现是使用mapred.text.key.partitioner.options 配置该 partitioner。在上例中,值-k1,1表示该partitioner只使用组合键的第一个字段。map.output.key.field. separator属性所定义的字符串能分隔各个字段(默认是制表符)。

接下来,我们还需要一个comparator以对年份字段升序排列、对气温字段降序排列,使reduce函数能够方便地返回各组中的第一个记录。Hadoop提 供的KeyFieldBasedComparator类能有效解决这个问题。该类通过mapred.text.key.comparator.options属性来设置排列次序,其格式规范与GNU sort类似。本例中的-k1n  -k2nr选项表示“首字段按数值顺序排序,字段按数值顺序反向排序”。与KeyFieldBasedPartitioner类似,KeyFieldBasedComparator也采用在 map.output.key.field.separator中定义的分隔符将一个组合键划分成多个字段。

Java版本的程序需要定义分组comparator。但是在Streaming中,组并未以任何方式划分,因此必须在reduce函数中不断地査看年份是否改变来检测组的边界(范例8-11)。

范例8-11.针对辅助排序的reducer函数(Python版本)

#!/usr/bin/env python
import sys
last_group = None
for line in sys.stdin:
    val = line.strip()
    (year, temp) = val.split("\t")
    group = year
    if last_group != group:
        print val
        last_group = group

运行此程序之后,得到与Java版本一样的结果。

最后牢记一点: KeyFieldBasedPartitioner 和 KeyFieldBasedComparator不仅在Streaming程序中使用,也能够在Java版本的MapReduce程序中使用。

3. 连接

MapReduce能够执行大型数据集间的“连接” (join)操作,但是,自己从头编写相关代码来执行连接的确非常棘手。除了编写MapReduce程序,还可以考虑采用更髙级的框架,如Pig、Hive或Cascading等,它们都将连接操作视为整个实现的核心部分。

先简要地描述待解决的问题。假设有两个数据集:气象站数据库和天气记录数据集,并考虑如何合二为一。一个典型的査询是:输出各气象站的历史信息,同时各行记录也包含气象站的元数据信息,如图8-2所示。

连接操作的具体实现技术取决于数据集的规模及分区方式。如果一个数据集很大(例如天气记录)而另外一个集合很小,以至于可以分发到集群中的每一个节点之中(例如气象站元数据),则可以执行一个MapReduce作业,将各个气象站的天气记录放到一块(例如,根据气象站ID执行部分排序),从而实现连接。mapper或reducer根据各气象站ID从较小的数据集合中找到气象站元数据,使元数据能够被写到各条记录之中。该方法将在8.4节中详细介绍,它侧重于将数据分发到tasktracker的机制。

blob.png 

8-2.两个数据集的内连接

连接操作如果由mapper执行,则称为“map端连接”;如果由reducer执行,则称为“reduce端连接”。

如果两个数据集的规模均很大,以至于没有哪个数据集可以被完全复制到集群的每个节点,我们仍然可以使用MapReduce来进行连接,至于到底采用map端连接还是reduce端连接,则取决于数据的组织方式。最常见的一个例子便是用户数据库和用户活动日志(例如访问日志)。对于一个热门服务来说,将用户数据库(或日志)分发到所有MapReduce节点中是行不通的。

3.1. map端连接

在两个大规模输入数据集之间的map端连接会在数据到达map函数之前就执行连接操作。为达到该目的,各map的输入数据必须先分区并且以特定方式排序。各个输入数据集被划分成相同数量的分区,并且均按相同的键 (连接键)排序。同一键的所有记录均会放在同一分区之中。听起来似乎要求非常严格(的确如此),但这的确合乎MapReduce作业的输出。

map端连接操作可以连接多个作业的输出,只要这些作业的reducer数量相同、键相同并且输出文件是不可切分的(例如,小于一个HDFS块,或gzip 压缩)。在天气例子中,如果气象站文件以气象站ID部分排序,记录文件也以气象站ID部分排序,而且reducer的数量相同,则就满足了执行map端连接的前提条件。

利用 org.apache.hadoop.mapreduce.join包中的CompositelnputFormat类来运行一个map端连接。CompositeInputFormat类的输入源和连接类型(内连接或外连接)可以通过一个连接表达式进行配置,连接表达式的语法简单。详情与示例可参见包文档。

org.apache.hadoop.examples.Join是一个通用的执行map端连接的命令行程序样例。该例运行一个基于多个输入数据集的mapper和reducer的 MapReduce作业,以执行给定的连接操作。

3.2. reduce 端连接

由于reduce端连接并不要求输入数据集符合特定结构,因而reduce端连接比map端连接更为常用。但是,由于两个数据集均需经过MapReduce的 shuffle过程,所以reduce端连瘘的效率往往要低一些。基本思路是mapper 为各个记录标记源,并且使用连接键作为map输出键,使键相同的记录放 在同一reducer中。以下技术能帮助实现reduce端连接。

3.2.1. 多输入

数据集的输入源往往有多种格式,因此可以使用Multiplelnputs类(参见 7.2.4节)来方便地解析和标注各个源。

3.2.2. 辅助排序

如前所述,reducer将从两个源中选出键相同的记录且并不介意这些记录是否已排好序。此外,为了更好地执行连接操作,先将某一个源的数据传输 到reducer会非常重要。以天气数据连接为例,当天气记录发送到reducer 的时候,与这些记录有相同键的气象站信息最好也已经放在reducer,使得 reducer能够将气象站名称填到天气记录之中再马上输出。虽然也可以不指 定数据传输次序,并将待处理的记录缓存在内存之中,但应该尽量避免这 种情况,因为其中任何一组的记录数量可能非常庞大,远远超出reducer的 可用内存容量。

8.2.4节介绍如何对reducer所看到的每个键的值进行排序,所以在此也用到了辅助排序技术。

我们使用第4章的TextPair类构建组合键,包括气象站ID和“标记”。 在这里,“标记”是一个虚拟的字段,其唯一目的是对记录排序,使气象站记录比天气记录先到达。一种简单的做法就是:对于气象站记录,“标记”值为0;对于天气记录,“标记”值为1。范例8-12和范例8-13分别描述了执行该任务的两个mapper类。

范例8-12.在reduce端连接中,标记气象站记录的mapper

public class JoinStationMapper
extends Mapper<LongWritable, Text, TextPair, Text> {
    private NcdcStationMetadataParser parser = new NcdcStationMetadataParser();
    @0verride
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if (parser.parse(value)) {
            context.write(new TextPair(parser.getStationId(), "0"), new Text(parser.getStationName()));
        }
    }
}

范例8-13.在reduce端连接中标记天气记录的mapper

public class JoinRecordMapper
extends Mapper<LongWritable, Text, TextPair, Text> {
    private NcdcRecordParser parser = new NcdcRecordParser();
    
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        parser.parse(value);
        context.write(new TextPair(parser.getStationId(), "1"), value);
    }
}

reducer知道自己会先接收气象站记录。因此从中抽取出值,并将其作为后续每条输出记录的一部分写到输出文件。如范例8-14所示。

范例8-14.用于连接已标记的气象站记录和天气记录的reducer

public class JoinReducer extends Reducer<TextPair, Text, Text, Text> {
    @Override
    protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Iterator<Text> iter = values.iterator();
        Text stationName = new Text(iter.next());
        while (iter.hasNext()) {
            Text record = iter.next();
            Text outvalue = new Text(stationName.toString() + "\t" + record.toString());
            context.write(key.getFirst(), outvalue);
        }
    }
}

上述代码假设天气记录的每个气象站ID恰巧与气象站数据集中的一条记录准确匹配。如果该假设不成立,则需要泛化代码,使用另一个TextPair将标记放入值的对象中。reduce()方法在处理天气记录之前,要能够区分哪些记录是气象站名称,检测(和处理)缺失或重复的记录。

reducer的迭代部分中:对象被重复使用(为了提髙效率)。因此,从第一个Text对象获得点名称(即stationName)就非常关键。

Text stationName = new Text(iter.next());

如果不执行该语句,stationName就会指向上一条记录的值,这显然是错误的。

范例8-15显示了该作业的驱动类。在该类中,关键在于根据组合键的第一字段(即气象站ID)进行分区和分组,即使用一个自定义的partitioner(即KeyPartitioner)和一个自定义的分组 comparator(FirstComparator)作为 TextPair的嵌套类。

310-313

4. 边数据分布

4.1. 利用JobConf来配置作业

4.2. 分布式缓存

4.2.1. 用法

 

310-313

 

该程序通过气象站查找最高气温,因此mapper(StationTemperatureMapper)仅仅输出(气象站ID,气温)对。对于combiner,该程序重用MaxTemperatureReducer(参见第2章和第5章)来为map端的map输出分组获得最高气温。reducer(MaxTemperatureReducerWithStationLookup)则有所不同,它不仅要查找最髙气温,还需要根据缓存文件査找气象站名称。

该程序调用reducer的setup()方法来获取缓存文件;输入参数是文件的原始名称,文件的路径与任务的工作目录相同。

当文件无法整个放到内存中时,可以使用分布式缓存进行复制。MapFile采用在盘检索格式(参见4.5.2节),在这方面非常有用。由于MapFile是一组已定义的目录结构的文件,用户可以将这些文件整理成存档格式(JAR、ZIP、TAR或gzipped TAR),再用-archives选项将其加入缓存。

以下是输出的小片段,显示部分气象站的最髙气温值。

 

PEATS RIDGE WARATAH

372

STRATHALBVN RACECOU

410

SHEOAKS AWS

399

WANGARATTA AERO

409

MOOGARA

334

MACKAY AERO

331

 

4.2.2. 工作机制

当用户启动一个作业,Hadoop会把由-files、-archives和-libjars等选项所指定的文件复制到分布式文件系统(一般是HDFS)之中。接着,在任务运行之前,tasktracker将文件从分布式文件系统复制到本地磁盘(缓存)使任务能够访问文件。此时,这些文件就被视为“本地化”了。从任务的角度来看,这些文件就已经在那儿了(它并不关心这些文件是否来自HDFS)。 此外,由-libjars指定的文件会在任务启动前添加到任务的类路径 (classpath)中。

tasktracker为缓存的文件各维护一个计数器来统计这些文件的被使用情况。当任务即将运行时,该任务所使用的所有文件的对应计数器值增1;当任务执行完毕之后,这些计数器值均减1。当相关计数器值为0时,表明该文件没有被任何任务使用,可以从缓存中移除。缓存的容量是有限的(默认10GB),因此需要经常删除无用的文件以腾出空间来装载新文件。缓存大小可以通过属性local.cache.size进行配置,以字节为单位。

尽管该机制并不确保在同一个tasktracker上运行的同一作业的后续任务肯定能在缓存中找到文件,但是成功的概率相当大。原因在于作业的多个任务在调度之后几乎同时开始运行,因此,不会有足够多的其他作业在运行而导致原始任务的文件从缓存中被删除。

文件存放在 tasktracker 的${mapred.local.dir}/taskTracker/archive目录下。但是应用程序不必知道这一点,因为这些文件同时以符号链接的方式指向任务的工作目录。

4.2.3. 分布式缓存API

由于可以通过GenericOptionsParser间接使用分布式缓存(如范例8-16所示),大多数应用不需要使用分布式缓存API。然而,一些应用程序需要用到分布式缓存的更高级的特性,这就需要直接使用API了。API包括两部分:将数据放到缓存中的方法(参见Job),以及从缓存中读取数据的方法(参见JobContext)。以下列举Job中可将数据放入到缓存中的相关方法:

public void addCacheFile(URI uri)

public void addCacheAnchive(URI uri)

public void setCacheFiles(URI[] files)

public void setCacheArchives(URI[] archives)

public void addFileToClassPath(Path file)

public void addArchiveToClassPath(Path archive)

public void createSymlink()

 

在缓存中可以存放两类对象:文件(files)和存档(achives)。文件被直接放置在任务节点上,而存档则会被解档之后再将具体文件放置在任务节点上。每种对象类型都包含三种方法:addCacheXXXX()、setCacheXXXXs()和addXXXXToClassPath()。其中,addCacheXXXX()方法将文件或存档添加到分布式缓存,setCacheXXXXs()方法将一次性向分布式缓存中添加一组文件或存档(之前调用所生成的集合将被替换),addXXXXToClassPath()方法将文件或存档添加到MapReduce任务的类路径。表8-7对上述API方法与GenericOptionsParser选项做了一个比较(参见表5-1)。

8-7.分布式缓存API

blob.png 

add()和set()方法中的输入参数URI是指在作业运行时位于共享文件系统中的(一组)文件。而GenericOptionsParser选项所指定的文件(例如,-files)可以是本地文件,如果是本地文件的话,则 会被复制到默认的共享文件系统(一般是HDFS)。

 

这也是使用Java API和使用GenericOptionsParser的关键区别:

Java API的add()和set()方法不会将指定文件复制到共享文件系统中,但GenericOptionsParser会这样做。

Job的分布式缓存API方法还包括createSymlink()。前面提到,当前作业的一些文件会被本地化到任务节点,该方法则为所有本地化文件创建符号化链接。符号化链接的名称是由该文件URI的片段标识符(fragment identifier)决定。例如,由 URI hdfs://namenode/foo/bar#myfile所对应的文件是在任务的工作目录中的myfile文件。(范例8-8使用了该API)如果没有片段标识符,则无法创建符号化链接。当使用GenericOptionsParser将文件添加到分布式缓存之中时,会自动为这些文件创建符号化链接。

当使用本地作业时,并不会为分布式缓存中的文件创建符号化链接。因此,若想同时在本地或集群上运行作业时,需调用getLocalCacheFilesO 或getLocalCacheArchives()方法(后面会进一步讨论)。

另一部分是在Jobcontext类中的分布式缓存API。当用户想要访问分布式 缓存中的文件时,需要在map或者reduce任务代码中使用这部分API。

public Path[] getLocalCacheFilesO throws IOException;

public Path[] getLocalCacheArchives() throws IOException;

public Path[] getFileClassPaths();

public Path[] getArchiveClassPaths();

如果在分布式缓存中的文件已经在任务的工作目录中有符号化链接了,则 用户可以通过名称直接访问本地文件,如范例8-16所示。此外,也可以使 用 getLocalCacheFilesO和 getLocalCacheArchives()方法获取缓存中 的文件或者存档的引用。当处理存档时,将会返回一个包含解档文件的目 录。(相应的,用户也可以通过 getFileClassPaths()和 getArchiveClassPaths() 方法获取被添加到任务的类路径下的文件和存档。)

注意,文件将以“本地的” Path对象的形式返回。为了读取文件,用户需 要首先使用getLocal()方法获得一个Hadoop本地FileSystem实例。

用户也可以使用java.io.File API进行操作,请参考以下针对MaxTemperatureReducerWithStationLookup 的新版 setup()方法的代码片段。

@Override
protected void setup(Context context)
    throws IOException, InterruptedException {
    metadata = new NcdcStationMetadata();
    Path[] localPaths = context.getLocalCacheFilesO;
    if (localPaths.length == 0) {
        throw new FileNotFoundException("Distributed cache file not found.");
    }
    File localFile = new File(localPaths[0].toString());
    metadata.initialize(localFile);
}

 

在使用旧版MapReduce API时,可以调用DistributedCache的静态方法,如下所示:

@Override
public void configure(JobConf conf) {
    metadata = new NcdcStationMetadata();
    try {
        Path[] localPaths = DistributedCache.getLocalCacheFiles(conf);
        if (localPaths.length == 0) {
            throw new FileNotFoundException("Distributed cache file not found.");
        }
        File localFile = new File(localPaths[0].toString());
        metadata.initialize(localFile);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

 

 

5. MapReduce 库类

Hadoop还为mapper和reducer提供了一个包含了常用函数的库。表8-8简要描述了这些类。如需了解详细用法,可参考相关Java文档。

8-8. MapReduce 库的类

 

 

类名称

描述

ChainMapper, ChainReducer

在一个mapper中运行多个mapper,再运行—个reducer,最后在一个reducer中运行多个mapper。符号表示:M+RM*,其中M是mapper,R是reducer。与运行多个MapReduce作业相比,该方案能够显著降低磁盘I/O开销

FieldSelectionMapReduce(旧版 API) FieldSelectionMapper 和 FieldSelectionReducer (新版 API)

能从输入键和值中选择字段(类似Unix的 cut命令),并输出键和值的mapper和 reducer

IntSumReducer, LongSumReducer

对各键的所有整数值执行求和操作的 reducer

InverseMapper

一个能交换键和值的mapper

MultithreadedMapRunner (旧版API)

MultithreadedMapper (新版API)

一个能在多个独立线程中分别并发运行

mapper的mapper(或者旧版API中的map runner)。该技术对于非CPU受限的mapper

比较有用

TokenCounterMapper

将输入值分解成独立的单词(使用Java的StringTokenizer)并输出每个单词和计数值1的mapper

RegexMapper

检査输入值是否匹配某正则表达式,输出

匹配字符串和计数1的mapper

 

 

转载请注明:全栈大数据 » 第8章 MapReduce的特性

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址