整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

第七章 MapReduce的类型与格式

MapReduce的类型与格式

MapReduce数据处理模型非常简单:mapreduce函数的输入和输出是键/值对。本章深入讨论MapReduce模型,重点介绍各种类型的数据(从简单文本到结构化的二进制对象)如何在MapReduce中使用。

1. MapReduce 的类型

HadoopMapReduce中,mapreduce函数遵循如下常规格式:

map(K1, V1) list(K2, V2)

reduce(K2, list(V2))  list(K3, V3)

一般来说,map函数输入的键/值类型(K1V1)不同于输出类型(K2V2)。 虽然reduee函数的输入类型必须与map函数的输出类型相同,但reduce函数的输出类型(K3V3)可以不同于输入类型。例如以下Java接口代码:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public class Context extends MapContext<KEYIN> VALUEIN, KEYOUT, VALUEOUT> {
        // ...
    }
    protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
        // ...
    }
}
public class Reducer<KEYINJ VALUEIN, KEYOUT, VALUEOUT> {
    public class Context extends ReducerContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        // ...
    }
    protected void reduce(KEYIN key, Iterable<VALUEIN> values,
        Context context) throws IOException,InterruptedException {
        // ...
    }
}

Context类对象用于输出键/值对,因此它们通过输出类型参数化,这样 write()方法的说明如下:

Public void write(KEYOUT key, VALUEOUT value)
throws IOException, InterruptedException

由于MapperReducer是单独的类,因此类型参数可能会不同,所以MapperKEYIN的实际类型参数可能与Reducer中同名的KEYIN类型参数不一致。例如,在前面章节的求最高温度例子中,MapperKEYINLongWritable 类型,而 Reducer 中为 Text 类型。

类似的,即使map输出类型与reduce的输入类型必须匹配,但这在Java编 译器中并不是强制要求的。

类型参数(type parameter)命名可以不同于抽象类型的定义(KEYIN对应于K1),但它们的格式是相同的。

如果使用combine函数,它与reduce函数的形式相同(它是Reducer的一个实现),不同之处是它的输出类型是中间的键/值对类型(K2V2),这些中 间值可以输入reduce函数:

map: (K1, V1) list(K2, V2)

combine: (K2, list(V2)) list(K2, V2)

reduce: (K2, list(V2)) list(K3, V3)

combine函数与reduce函数通常是一样的,在这种情况下,K3K2类型相同,V3V2类型相同。

partition函数对中间结果的键/值对(K2V2)进行处理,并且返回一个分区索引(partition index)。实际上,分区由键单独决定(值被忽略)

partition(K2, V2)  integer

或用Java的方式:

public abstrack class Partitioner<KEY,VALUE> {
   public abstract int gerPartition(KEY key, VALUE value, int numPartitions);
}

在老版本的API中,MapReduce的用法非常类似,类型参数的实际命名 也为K1V1等。在新旧版本API中类型上的约束也是完全一样的。

public abstrack class Partitioner<KEY,VALUE> {
    public abstract int gerPartition(KEY key, VALUE value, int numPartitions);
}
public interface Mapper<Kl, V1, K2, V2> extends JobConfigurable, Closeable {
    void map(K1 key, V1 value, 0utputCollector<K2, V2> output. Reporter reporter) throws IOException;
}
public interface Reducer<K2, V2, K3, V3> extends 3obConfigurable, Closeable {
    void reduce(K2 key, Iterator<V2> values,0utputCollector<K3, V3> output, Reporter reporter) throws IOException;
}
public interface Partitioner<K2, V2> extends 3obConfigurable {
    int getPartition(K2 key, V2 value, int numPartitions);
}

这些理论对配置MapReduce作业有帮助吗?表7-1总结了新版本API的配置选项(7-2为旧版本API的),把属性分为可以设置类型的属性和必须与类型相容的属性。

输入数据的类型由输入格式进行设置。例如,对应于TextlnputFormat的键类型是LongWritable,值类型是Text。其他的类型通过调用Job类的方法来进行显式设置(旧版本API中使用JobConf类的方法)。如果没有显式设置,中间的类型默认为(最终的)输出类型,也就是默认值LongWritableText。因此,如果K2K3是相同类型,就不需要调用setMapOutputKeyClass(),因为它将调用setOutputKeyClass()来设置同样,如果V2V3相同,只需要使用setOutputValueClass()

这些为中间和最终输出类型进行设置的方法似乎有些奇怪。为什么不能结合mapperreducer导出类型?原来,Java的泛型机制有很多限制:类型擦除(type erasure)导致运行过程中类型信息并非一直可见,所以Hadoop不得不进行明确设定。这也意味着可能会在MapReduce配置的作用中遇到不兼容的类型,因为这些配置在编译时无法检查。与MapReduce类型兼容的设置列在表7-1中。类型冲突是在作业执行过程中被检测出来的,所以 一个比较明智的做法是先用少量的数据跑一次测试任务,发现并修正任何类型不兼容的问题。

7-1 新的MapReduce API中的设置类型

属性

属性设置方法

输入类型

中间类型

输出类型

可以设置类型的属性

K1

V1

K2

V2

K3

V3

mapreduce.job.inputformat.class

setInputFormatClass()

*

mapreduce.map.output.key.class

setMapOutputKeyClass()

*

mapreduce.map.output.value.class

setMapOutputValueClass()

*

mapreduce.job.output.key.class

setOutputKeyClass()

*

mapreduce.job.output.value.class

setOutputValueClass()

*

类型必须一致的属性

mapreduce.job.map.class

setMapperClass()

*

*

*

*

mapreduce.job.combine.class

setCombinerClass()

*

*

mapreduce.job.partitioner.class

setPartitionerClass()

*

*

mapreduce.job.output.key.comparator.class

setGroupingComparatorClass()

*

mapreduce.job.output.group.comparator.class

setGroupingComparatorClass()

*

mapreduce.job.reduce.class

setReducerClass()

*

*

*

*

mapreduce.job.outputformat.class

setOutputFormatClass()

*

*

 

7-2旧版本MapReduce API的设置类型

属性

属性设置方法

输入类型

中间类型

输出类型

可以设置类型的属性

K1

V1

K2

V2

K3

V3

mapreduce.input.format.class

setInputFormat()

*

mapreduce.mapoutput.key.class

setMapOutputKeyClass()

*

mapreduce.mapoutput.value.class

setMapOutputValueClass()

*

mapreduce.output.key.class

setOutputKeyClass()

*

mapreduce.output.value.class

setOutputValueClass()

*

类型必须一致的属性

mapred.mapper.class

setMapperClass()

*

*

*

*

mapred.map.runner.class

setMapRunnerClass()

*

*

*

*

mapred.combine.class

setCombinerClass()

*

*

mapred.partitioner.class

setPartitionerClass()

*

*

mapred.output.key.comparator.class

setOutputKeyComparatorClass()

*

mapreduce.output.value.groupfn.class

setOutputValueGroupingComparatorClass()

*

mapreduce.reduce.class

setReducerClass()

*

*

*

*

mapreduce.output.format.class

setOutputFormat()

*

*

 

1.1. 默认的MapReduce作业

如果不指定mapperreducer就运行MapReduce,会发生什么情况?我们运行一个最简单的MapReduce程序来看看:

public class MinimalMapReduce extends Configured implements Tool { 
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System•err);
            return -1;
        }
        Job job = new Job(getConf());
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[l]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MinimalMapReduce(), args);
        System.exit(exitCode);
    }
}

我们唯一设置的是输入路径和输出路径。在气象数据的子集上运行以下 命令:

% hadoop MinimalMapReduce "input/ncdc/all/190{l,2}.gz" output

输出目录中得到命名为part-r-00000的输出文件。这个文件的前几行如下(为了适应页面进行了截断处理)

00029029070999991901010106004+64333+023450FM-

12+000599999V0202701N01591…

00035029070999991902010106004+64333+023450FM-

12+000599999V0201401N01181…

135-0029029070999991901010113004+64333+023450FM-

12+000599999V0202901N00821…

1410035029070999991902010113004+64333+023450FM-

12+000599999V0201401N01181

2700029029070999991901010120004+64333+02B450FM-

12+000599999V0209991C00001…

2820035029070999991902010120004+64333+023450FM-

12+000599999V0201401N01391…

每一行以整数开始,接着是制表符(Tab),然后是一段原始气象数据记录。虽然这并不是一个有用的程序,但理解它如何产生输出确实能够洞悉Hadoop是如何使用默认设置运行MapReduce作业的。范例7-1的示例与前面MinimalMapReduce完成的事情一模一样,但是它显式地把作业环境设置为默认值。

范例7-1.简化的MapReduce驱动程序,默认值显式设置

public class MinimalMapReduceWithDefaults extends Configured implements Tool {
    @Override
    public int run(String[] args) throws IOException {
        Job job = JobBuilder.parseInputAnOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setInputFormat(TextInputFormat.class);
        job.setMapperClass(Mapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(l);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormat(TextOutputFormat.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args);
        System.exit(exitCode);
    }
}

通过把打印使用说明的逻辑抽取出来并把输人/输出路径设定放到一个帮助方法中,实现对run()方法的前几行进行了简化。几乎所有MapReduce驱动程序都有两个参数(输入与输出),所以此处进行这样的代码约简是可行的。以下是JobBuilder类中的相关方法,供大家参考:

public static Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws IOException {
    if (args.length != 2) {
        printUsage(tool, "<input> <output>");
        return null;
    }
    Job job = new Job(conf);
    job.setJarByClass(tool.getClass());
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job;
}
public static void printUsage(Tool tool, String extraArgsUsage) {
    System.err.printf("Usage: %s [genericOptions] %s\n\n", tool.getClass().getSimpleName(), extraArgsUsage);
    GenericOptionsParser.printGenericCommandUsage(System.err);
}

回到范例7-1中的MinimalMapReducewithDefaults类,虽然有很多其他的默认作业设置,但加粗显示的部分是执行一个作业最关键的代码。接下来我们逐一讨论。

默认的输入格式是TextInputFormat,它产生的键类型是LongWritable(文件中每行中开始的偏移量值),值类型是Text(文本行)。这也解释了最后输出的整数的含义:行偏移量。

默认的mapperMapper类,它将输入的键和值原封不动地写到输出中:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    protected void map(KEYIN key, VALUEIN value,Context context) throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}

Mapper是一个泛型类型(generic type),它可以接受任何键或值的类型。在 这个例子中,map的输入输出键是LongWritable类型,map的输入输出值是Text类型。

默认的partitionerHashPartitioner,它对每条记录的键进行哈希操作以决定该记录应该属于哪个分区。每个分区对应一个reducer任务,所以分区数等于作业的reducer个数:

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value,int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

键的哈希码被转换为一个非负整数,它由哈希值与最大的整型值做一次按位与操作而获得,然后用分区数进行取模操作,来决定该记录属于哪个分区索引。

默认情况下,只有一个reducer,因此,也就只有一个分区,在这种情况下,由于所有数据都放入同一个分区,partitioner操作将变得无关紧要了。然而,如果有很多reducer,了解HashPartitioner的作用就非常重要。假设基于键的散列函数足够好,那么记录将被均匀分到若干个reduce任务中,这样,具有相同键的记录将由同一个reduce任务进行处理。

你可能已经注意到我们并没有设置map任务的数量。原因是该数量等于输入文件被划分成的分块数,这取决于输入文件的大小以及文件块的大小(如果此文件在HDFS中)。关于控制块大小的操作

 

选择reducer的个数

Hadoop新手而言,单个reducer的默认配置很容易上手。真实的应用中,作业都把它设置成一个较大的数字,否则由于所有的中间数据都会放到一个reducer任务中,作业处理极其低效。注意,在本地作业运行器上运行时,只支持0个或1reducer

reducer最优个数与集群中可用的reducer任务槽数相关。总槽数由集群中节点数与每个节点的任务槽数相乘得到。每个节点的任务槽数由mapred.tasktracker.reduce.tasks.maximum 属性的值决定

一个常用的方法是设置的reducer数比总槽数稍微少一些,给reducer任务留点儿余地(容忍一些错误发生而不需要延长作业运行时间).如果 reduce任务很大,比较明智的做法是使用更多reducer,使任务粒度更小,从而使任务的失败不至于显著影响作业执行时间。

默认的reducerReducer类型,它也是一个泛型类型,它简单地将所有的输入写到输出中:

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context Context context) throws IOException, InterruptedException {
        for (VALUEIN value: values) {
            context.write((KEYOUT) key, (VALUEOUT) value);
        }
    }
}

对于这个任务来说,输出的键是LongWritable类型,而值是Text类型。 事实上,对于这个MapReduce程序来说,所有键都是LongWritable类 型,所有值都是Text类型,因为它们是输入键/值,并且map函数和 reduce函数是恒等函数。然而,大多数MapReduce程序不会一直用相同的 键或值类型,所以就像上一节所描述的那样,必须配置作业来声明使用的类型。

记录在发送给reducer之前,会被MapReduce系统进行排序。在这个例子中,键是按照数值的大小进行排序的,因此来自输入文件中的行会被交叉放入一个合并后的输出文件。

默认的输出格式是TextOutputFormat,它将键和值转换成字符串并用制表符分隔开,然后一条记录一行地进行输出。这就是为什么输出文件是用制表符(Tab)分隔的,这是TextOutputFormat的特点。

 

1.2. 默认的Streaming作业

Streaming方式下,默认的作业与Java方式是相似的,但也有差别。最简单的形式如下:

% hadoop jar $HADOOP__INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper /bin/cat

注意,必须提供一个mapper:默认的mapper不能工作。因为默认输入格式TextlnputFormat产生的是LongWritable类型的键和Text类型的值, 而Streaming的输出键和值(包括map的键和值)都是Text类型。indentity mapper无法将LongWritable类型的键转换为Text类型的键,因而导致无法使用。

如果我们开发一个非Javamapper,输入的格式是TextlnputFormat, 那么Streaming会做一些特殊的处理。它并不会把键传递给mapper,而是 只传递值。对于其他输入类型,将stream.map.input.ignoreKey设亶为 true也可以达到相同的效果。这样做事实上是非常有用的,因为键只是文 件中的行偏移量,而值是行中的数据,这才是几乎所有应用都关心的内 容。这个作业的效果就是对输入的值进行排序。

将更多的默认设置写出来,那么命令行看起来如下所示(注意,Streaming使 用的是老版本MapReduce API类):

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-inputformat org.apache.hadoop.mapred.TextInputFormat \
-mapper /bin/cat \
-partitioner org.apache.hadoop.mapred.lib.HashPartitioner \
-numBeduceTasks 1 \
-reducer org.apache.hadoop.mapred.lib.IdentityReducer \
-outputformat org.apache.hadoop.mapped.TextOutputFormat

mapper的参数和reducer的参数可以是一条命令或一个Java类。可以使 用-combiner参数按需设置combiner

Streaming中的键和值

Streaming应用可以决定分隔符的使用,该分隔符用于通过标准输入把键/值对转换为一串比特值发送到mapreduce函数。默认情况下是Tab(制表符),但是如果键或值中本身含有Tab分隔符,能将分隔符修改成其他符号 是很有用的。

类似地,当mapreduce输出结果键/值对时,也需要一个可配置的分隔符来进行分隔。更进一步,来自输出的键可以由多个字段进行组合:它可以由一条记录的前n个字段组成(stream.num.map.output.key.fields stream.nutn.reduce.output.key.Fields 进行定义),剩下的字段就是值。例如,一个Streaming处理的输出是“abc”(分隔符是逗号)n设为2,则键解释为“ab”,而值是“c”。

mapperreducer的分隔符是单独配置的。这些属性可参见表7-3,数据流 图如图7-1所示。

7-3. Streaming的分隔符属性

属性名称

类型

默认值

描述

stream.map.input.field.separator

String

\t

此分隔符用于将输入键/值字符串作为字节流传递到流map

stream.map.output.field.separator

String

\t

此分隔符用于把流map处理的输出分割成map输出需要的键/值字符串

stream.num.map.output.key.fields

int

1

stream.map.output.field.separator分隔的字段数,这些字段作为map输出键

stream.reduce.input.field.separator

String

\t

此分隔符用于将输入键/值字符串作为字节流传递到流reduce

stream.reduce.output.field.separator

String

\t

此分隔符用于将来自流reduce处理的输出分割成reduce最终输出需要的键/值字符串

stream.num.reduce.output.key.fields

int

1

 stream.reduce.output.field.separator 分隔的字段数量,这些字段作为 reduce输出键

 

blob.png 

7-1 Streaming MapReduce作业中使用分隔符的位置

这些属性与输入和输出的格式无关。例如,如果stream.reduce. output.field.separator被设置成冒号,reduce Streaming过程就把a : b行写入标准输出,那么Streamingreducer就会知道a作为键,b作为值。如果使用标准的TextOutputFormat那么这条记录就用Tab将键和值分隔开并写到输出文件。可以设置mapred.textoutputformat.Separator来修改TextOutputFormat的分隔符。

Streaming的配置参数列表可以参阅 Hadoop 网站:http://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html#Specifying_Configuration_Variables_with_the_-D_Option

 

2. 输入格式

从一般的文本文件到数据库,Hadoop可以处理很多不同类型的数据格式。本节将探讨数据格式问题。

2.1. 输入分片与记录

2章中讲过,一个输入分片(split)就是一个由单个map操作来处理的输入块。每一个map操作只处理一个输入分片。每个分片被划分为若干个记录,每条记录就是一个键/值对,map —个接一个地处理记录。输入分片和 记录都是逻辑概念,不必将它们对应到文f,尽管其常见形式都是文件。 在数据库的场景中,一个输入分片可以对应*于一个表上的若干行,而一条记 录对应到一行pBInputFormat正是这么做的,这种输入格式用于从关系数据库 读取数据h 输入分片在hva中被表示为InpiitSplit接口(和本章提到的所有类一样,它也在 org.apache.hadoop.mapred 包中)。

public abstract class InputSplit {
    public abstract long getLength() throws IOException, InterruptedException;
    public abstract String[] getLocations() throws IOException InterruptedException;
}

InputSplit包含一个以字节为单位的长度和一组存储位置(即一组主机名)。注意,分片并不包含数据本身,而是指向数据的引用(reference)。存储位置供MapReduce系统使用以便将map任务尽量放在分片数据附近,而分片大小用来排序分片,以便优先处理最大的分片,从而最小化作业运行时间(这也是贪婪近似算法的一个实例)

MapReduce应用开发人员不必直接处理InputSplit,因为它是由InputFormat创建的。InputFormat负责产生输入分片并将它们分割成记录。在我们探讨InputFormat的具体例子之前,先简单看一下它在 MapReduce中的用法。接口如下:

public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,InterruptedException;
}

运行作业的客户端通过调用getSplits()计算分片,然后将它们发送到jobtrackerjobtracker使用其存储位置信息来调度map任务从而在tasktracker上处理这些分片数据。在tasktracker上,map任务把输入分片传给InputFormatgetRecordReader()方法来获得这个分片的 RecordReaderRecordReader就像是记录上的迭代器,map任务用一个RecordReader来生成记录的键/值对,然后再传递给map函数。查看Mapperrun()方法可以看到这些情况:

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
        map (context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}

运行setup()之后,再重复调用Context上的nextKeyValue()委托给RecordRader的同名函数实现来为mapper产生keyvalue对象。通过Context, key value RecordReader 出重新取出传递给map()。当 reader 读到 stream 的结尾时,nextKeyValue()方法返回 false, map 任 务运行其cleanUp()方法,然后结束。

这段代码没有显示,但由于效率的原因,RecordReader程序每次调用gerCurrentKey()getCurreritValue()时将返回相同的键/值对象。只是这些对象的内容被readernextKeyValue()方法改变。用户对此可能有些不解。在map()函数之外有对键/值的引用时,这可能引起问题,因为它的值会在没有警告的情况下被改变。如果确实需要这样的引用,那么请保存你想保留的对象的一个副本,例如,对于Text对象,可以使用它的复制构造函数:new Text(value)

这样的情况在reducer中也会发生。reducer迭代器中的值对象被反复使用,所以,在调用迭代器之间,一定要复制任何需要保留的任 何对象(参见范例8-14)

最后注意,Mapperrun()方法是公共的,可以由用户定制。MultithreadedMapRunner是另一个MapRunnable接口的实现,它可以使用可配置个数的线程来并发运行多个mapper(mapreduce.mapper.multithreadedmapper.threads设置)。对于大多数数据处理任务来,默认的执行机制没有优势。但是,对于因为需要连接外部服务器而造成单个记录处理时间比较长的mapper来说,它允许多个mapper在同一个JVM下以尽量避免竞争的方式执行。参见16.3.2节,其中谈到的Fetcher正在运行的多线程类MapRunner)便使用了Multithreaded的版本(使用老版本的API)

2.1.1. FilelnputFormat

FilelnputFormat是所有使用文件作为其数据源的InputFormat实现的基类(参见图7-2)。它提供两个功能:一个用于指出作业的输入文件位置,一个是输入文件生成分片的实现代码段。把分片分割成记录的作业由其子类来完成。

blob.png 

2.1.2. FilelnputFormat类的输入路径

作业的输入被设定为一组路径,这对指定作业输入提供了很强的灵活性。 FilelnputFormat提供四种静态方法来设定Job的输入路径:

public static void addInputPath(Job job, Path path)
public static void addInputPaths(Dob job, String commaSeparatedPaths)
public static void setInputPaths(Job job, Path... inputPaths)
public static void setInputPaths(Dob job, String commaSeparatedPaths)

其中,addInputPath()addInputPaths()方法可以将一个或多个路径加入路径列表。可以分别调用这两种方法来建立路径列表。setInputPaths()方法一次设定完整的路径列表(替换前面调用中在Job上所设置的所有路径)

一条路径可以表示一个文件、一个目录或是一个glob,即一个文件和目录的集合。路径是目录的话,表示要包含这个目录下所有的文件,这些文件都作为作业的输入。关于glob的使用,3.5.5节在讲到“文件模式”时有详 细讨论。

—个被指定为输入路径的目录,其内容不会被递归处理。事实上,这些目录只包含文件:如果包含子目录,也会被解释为文件(从而产生错误)。处理这个问题的方法是:使用一个文件glob或一个过滤器根据命名模式(name pattern)限定选择目录中的文件。另一种方法是,将 mapred.input.dir.recursive 设置为true从而强制对输入目录进行递归地读取。

add方法和set方法允许指定包含的文件。如果需要排除特定文件,可以使用FilelnputFormatsetlnputPathFilter()方法设置一个过滤器:

public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter)

过滤器的详细讨论参见3.5.5节中的“PathFilter”小节说细讨论了过滤器。

即使不设置过滤器,FilelnputFormat也会使用一个默认的过滤器来排除隐藏文件(名称中以._”开头的文件)。如果通过调用setlnputPathFilter()设置了过滤器,它会在默认过滤器的基础上进行过滤。换句话说,自定义的过滤器只能看到非隐藏文件。

路径和过滤器也可以通过配置属性来设置(参见表7-4),这对StreamingPipes应用很方便。StreamingPipes接口都使用-input选项来设置路径,所以通常不需要直接进行手动设置。

7-4.输入路径和过滤器属性

属性名称

类型

默认值

描述

mapred.input.dir

逗号分隔的路径

作业的输入文件。包含逗号的路径中的逗号由\”符号转义。例如:glob {a,b}变成了{a\, b}

mapred.input.pathFilter.class

PathFilter 类名

应用于作业输人文件的过滤器

2.1.3. FilelnputFormat类的输入分片

假设有一组文件,FilelnputFormat如何把它们转换为输入分片呢? FilelnputFormat只分割大文件。这里的“大”指的是文件超过HDFS块的大小。分片通常与HDFS块大小一样,这在大多应用中是合理的然而,这个值也可以通过设置不同的Hadoop属性来改变,如表7-5所示。

7-5.控制分片大小的属性

属性名称

类型

默认值

描述

mapred.min.split.size

int

1

一个文件分片最小的效字节数

mapred.max.split.size

long

Long.MAX_VALUE,9223372036854775807

一个文件分片中最大的 有效字节数(以字节算)

dfs.block.size

long

64 MB,67108864

HDFS中块的大小(按字节)

mapred.max.split.size这个属性在老版本的MapReduce API中没有出现(除了CombineFilelnputFormat)。然而,这个值是被间接计算的。计算方法是作业总的输入大小除以map任务数,该值由 mapred.map.tasks(JobConf 上的 SetNumMapTasks()方法)设置。因为 mapred.map.tasks默认值是1,所以,分片的最大值就是输入的大小

最小的分片大小通常是1个字节,不过某些格式可以使分片大小有一个更低的下界。(例如,顺序文件在流中每次插入一个同步入口,所以,最小的分片大小不得不足够大以确保每个分片有一个同步点,以便reader根据记录边界进行重新同步。

应用程序可以强制设置一个最小的输入分片大小:通过设置一个比HDFS 块更大一些的值,强制分片比文件块大。如果数据存储在HDFS上,那么这样做是没有好处的,因为这样做会对map任务来说不是本地文件的文件块数。

最大的分片大小默认是由Java long类型表示的最大值。只有把它的值被设置成小于块大小才有效果,这将强制分片比块小。

分片的大小由以下公式计算(参见FilelnputFormatcomputeSplitSize() 方法):

max(minimumSize, min (maximumSize, blockSize))

在默认情况下:

minimumSize < blockSize < maximumSize

所以分片的大小就是blocksize。这些参数的不同设置及其如何影响最终 分片大小请参见表7-6的说明。

7-6.举例说明如何控制分片的大小

最小分片大小

最大分片大小

块的大小

分片大小

说明

1(默认值

Long.MAX_VALUE(默认值)

64MB(默认值)

64MB

默认情况下,分片大小与块大小相同

1(默认值

Long.MAX_VALUE(默认值)

128MB

128MB

增加分片大小最自然的方法是提供更大的HDFS块,通过dfs. block.size或在构建文件时针对单个文件进行设置

128MB

Long.MAX_VALUE(默认值)

64MB(默认值)

128MB

通过使最小分片大小的值大于块大小的方法来增大分片大小,但代价是增加了本地操作

1(默认值

32MB

64 MB

0 (默认值)

32MB

通过使最大分片大小的值大于块大小的方法来减少分片大小

 

2.1.4. 小文件与CombineFileInputFormat

相对于大批量的小文件,Hadoop更合适处理少量的大文件。个原因是FilelnputFormat生成的分块是一个文件或该文件的一部分。如果文件很小(“小”意味着比HDFS的块要小很多),并且文件数量很多,那么每次map任务只处理很少的输入数据,(一个文件)就会有很多map任务,每次map操作都会造成额外的开销。请比较一下把1GB的文件分割成1664 MB块与100 KB10000个块。10000个文件每个都需要使用一个map操作,作业时间比一个文件上的16map操作慢几十甚至几百倍。

CombineFilelnputFormat可以缓解这个问题,它是针对小文件而设计的。FilelnputFormat为每个文件产生 1 个分片,而 CombineFilelnputFormat 把多个文件打包到一个分片中以便每个mapper可以处理更多的数据。关键是,决定哪些块放入同一个分片时,CombineFilelnputFormat会考虑到节点和机架的因素,所以在典型MapReduce作业中处理输入的速度并不会下降。

当然,如果可能的话应该尽量避免许多小文件的情况,因为MapReduce处理数据的最佳速度最好与数据在集群中的传输速度相同,而处理小文件将增加运行作业而必需的寻址次数。还有,在HDFS集群中存储大量的小文件会浪费namenode的内存。一个可以减少大量小文件的方法是使用SequenceFile将这些小文件合并成一个或多个大文件:可以将文件名作为键(如果不需要键,可以用NullWritable等常量代替),文件的内容作为值。参见范例7-4。但如果HDFS中已经有大批小文件, CombineFilelnputFormat 方法值得一试。

CombineFilelnputFormat不仅可以很好地处理小文件,在处理大文件的时候也有好处。本质上,CombineFilelnputFormat使map 操作中处理的数据量与HDFS中文件的块大小之间的耦合度降低了。

如果mapper可以在几秒钟之内处理每个数据块,就可以把CombineFilelnputFormat的最大分片大小设成块数的较小的整数倍(通过mapred.max.split.size属性设置,以字节为单位),使毎个map可以处理多个块。这样,整个处理时间减少了,因为相对来说,少量mapper的运行,减少了运行大量短时mapper所涉及的任 务管理和启动开销。

由于CombineFilelnputFormat是一个抽象类,没有提供实体类(不同于FilelnputFormat),所以使用的时候需要一些额外的工作(希望日后会有一些通用的实现添加入库)。例如,如果要使CombineFilelnputFormatTextlnputFormat相同,需要创建一个 CombineFilelnputFormat的具体子类,并且实现getRecordReader()方法。

 

2.1.5. 避免切分

有些应用程序可能不希望文件被切分,而是用一个mapper完整处理每一个输入文件。例如,检査一个文件中所有记录是否有序,一个简单的方法是顺序扫描毎一条记录并且比较后一条记录是否比前一条要小。如果将它实现为一个map任务,那么只有一个map操作整个文件时,这个算法才可行。

有两种方法可以保证输入文件不被切分。第一种(最简单但不怎么漂亮)方法就是增加最小分片大小,将它设置成大于要处理的最大文件大小。把它设置为最大值Long.MAX_VALUE即可。第二种方法就是使用 FilelnputFormat具体子类,并且重载isSplitable()方法把返回值设 置为false。例如,以下就是一个不可分割的TexInputFormat:

import org.apache.hadoop.fs.path;
import org.apache.hadoop.mapreduce.DobContenxt;
import org.apache.hadoop.mapreduce.lib.input.TextInpusFormat;
public class NonSplittableTextInputFormat extends TextlnputFormat {
    @override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}

2.1.6. mapper中的文件信息

处理文件输入分片的mapper可以从作业配置对象的某些特定属性中读取输入分片的有关信息,这可以通过调用在MapperContext对象上的getInputSplit()方法来实现。当输入的格式源自于FilelnputFormat时,该方法返回的InputSplit可以被强制转换为一个FileSplit,以此来访问表7-7列出的文件信息。

在老版本的MapReduce APIStreamingPipes中,同一个文件分片的信息可通过从mapper配置的可读属性获取。(在老版本的MapReduce API 中,可以通过在Mappe类中编写configure()方法来获取]obConf对象来实现。)

除了表7-7中的属性,所有mapperreduce可访问的属性都在6.5.1节 “任务执行环境”中列出。

7-7.文件输入分片的属性

FileSplit方法

属性名称

类型

说明

getPath()

map.input.file

Path/String

正在处理的输入女件的路径

getStart()

map.input.start

long

分片开始处的字节偏移量

getLength()

map.input.length

long

分片的长度(按字节)

 

2.1.7. 把整个文件作为一条记录处理

有时,mapper需要访问一个文件中的全部内容。即使不分割文件,仍然需要一个RecordReader来读取文件内容作为record的值。范例7-2WholeFilelnputFormat展示了实现的方法。

范例7-2.把整个文件作为一条记录的InputFormat

public class WholeFilelnputFormat
    extends FileinputFormat<NullWritable, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
    InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader•initialize(split, context);
        return reader;
    }
}

WholeFilelnputFormat中,没有使用键,此处表示为NullWritable,值是文件内容,表示成BytesWritable实例。它定义了两个方法:一个是将isSplitable()方法重载返回false值,来指定输入文件不被分片;另一个是实现了createRecordReader()方法来返回一个定制的RecordReader实现,如范例7-3所示。

范例7-3. WholeFilelnputFormat使用RecordReader将整个文件读为一条记录

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDatalnputStream in = null;
            try {
                in = fs.open(file);
                IOUtiIs.readFully(in, contents, 0, contents.length); value.set(contents, 0, contents.length);
            } finally {
                IOUtils•closestream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }
    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }
    @Override
    public void close() throws IOException {
        //do nothing
    }
}

WholeFileRecordReader负责将FileSplit转换成一条记录,该记录的键是null ,值是这个文件的内容。因为只有一条记录,WholeFileRecordReader要么处理这条记录,要么不处理,所以它维护一个名称为processed的布尔变量来表示记录是否被处理过。如果当nextKeyValue()方法被调用时,文件没有被处理过,就打开文件,产生一个长度是文件长度的字节数组,并用HadoopIOUtils类把文件的内容放入字节数组。然后再被传递到next()方法的BytesWritable实例上设置数组,返回值为true则表示成功读取记录

其他一些方法都是一些直接的用来生成正确的键和值类型、获取reader位置和状态的方法,还有一个close()方法,该方法由MapReduce框架在reader完成后调用。

现在演示如何使用WholeFilelnputFormat。假设有一个将若干个小文件打包成顺序文件的MapReduce作业,键是原来的文件名,值是文件的内容。如范例7-4所示。

范例7-4.将若干个小文件打包成顺序文件的MapReduce程序

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {
    
    static class SequenceFileMapper
    extends Mapper<NullWritable, BytesWritable^ Text, BytesWritable> {
        private Text filenameKey;
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }
        @Override
        protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }
    @Override
    public int run(String[] args) throws IOException {
        Job job = JobBuilder.parselnputAndOutput(this, getConf(), args);
        if (conf == null) {
            return -1;
        }
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverterO, args);
        System.exit(exitCode);
    }
}

由于输入格式是wholeFilelnputFormat,所以mapper只需要找到文件输入分片的文件名。通过将InputSplitcontext强制转换为FileSplit来实现这点,后者包含一个方法可以获取文件路径。reducer的类型是相同的(没有明确设置),输出格式是SequenceFileOutputFormat

以下是在一些小文件上运行样例。此处使用了两个reducer,所以生成两个 输出顺序文件:

% hadoop jar job. jar SmallFilesToSequenceFileConverter \
-conf conf/hadoop-localhost. xml -D mapred.reduce.tasks=2 input/smallfiles output

 

由此产生两部分文件,每一个对应一个顺序文件,可以通过文件系统shell-text选项来进行检査:

% hadoop fs -conf conf/hadoop-localhost.xml -text output/part-r-00000
hdfs://localhost/user/tom/input/smallfiles/a 61 61 61 61 61 61 61 61 61 61
hdfs://localhost/user/tom/input/smallfiles/c 63 63 63 63 63 63 63 63 63 63
hdfs://localhost/user/tom/input/smallfiles/e
% hadoop fs -conf conf/hadoop-localhost.xml -text output/part-r-00001
hdfs://localhost/user/tom/input/smallfiles/b 62 62 62 62 62 62 62 62 62 62
hdfs://localhost/user/tom/input/smallfiles/d 64 64 64 64 64 64 64 64 64 64
hdfs://localhost/user/tom/input/smallfiles/f  66 66 66 66 66 66 66 66 66 66

输入文件的文件名分别是abcdef,每个文件分别包含10个相应字母(比如,a文件中包含10个“a”字母)e文件例外,它的内容为空。我们可以看到这些顺序文件的文本表示,文件名后跟着文件的十六进制的表示。

至少有一种方法可以改进我们的程序。前面提到,一个mapper处理一个文件的方法是低效的,所以较好的方法是继承CombineFilelnputFormat而不是FilelnputFormat

2.2. 文本输入

Hadoop非常擅长处理非结构化文本数据。本节讨论Hadoop提供的用于处理文本的不同InputFormat类。

2.2.1. TextlnputFormat

TextlnputFormat是默认的InputFormat。每条记录是一行输入。键是 LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符),它被打包成一个Text对象。所以,包含如下文本的文件被切分为包含4条记录的一个分片:

On the top of the Crumpetty Tree

The Quangle Wangle sat,

But his face you could not see,

On account of his Beaver Hat.

每条记录表示为以下键/值对:

(0, On the top of the Crumpetty Tree)

(33, The Quangle Wangle sat,)

(57, But his face you could not see,)

(89, On account of his Beaver Hat.)

很明显,键并不是行号。一般情况下,很难取得行号,因为文件按字节而不是按行切分为分片。每个分片单独处理。行号实际上是一个顺序的标记,即每次读取一行的时候需要对行号进行计数。因此,在分片内知道行号是可能的,但在文件中是不可能的。

然而,每一行在文件中的偏移量是可以在分片内单独确定的,而不需要知道分片的信息,因为每个分片都知道上一个分片的大小,只需要加到分片内的偏移量上,就可以获得每行在整个文件中的偏移量了。通常,对于每行需要唯一标识的应用来说,有偏移量就足够了。如果再加上文件名,那么它在整个文件系统内就是唯一的。当然,如果每一行都是定长的,那么这个偏移量除以每一行的长度即可算出行号。

输入分片与HDFS块之间的关系

FilelnputFormat定义的逻辑记录有时并不能很好地匹配HDFS的文件块。例如,TextlnputFormat的逻辑记录是以行为单位的,那么很有可能某一行会跨文件块存放。虽然这对程序的功能没有什么影响,如行不会丢失或出错,但这种现象应该引起注意,因为这意味着那些“本地的”map(map运行在输入数据所在的主机上)会执行一些远程的读操作。由此而来的额外开销一般不是特别明显。 图7-3展示了一个例子。一个文件被分成几行,行的边界与HDFS块的边界没有对齐。分片的边界与逻辑记录的边界对齐,这里是行边界,所 以第一个分片包含前5行,即使第5行跨了第一块和第二块。第二个分片从第6行开始。

blob.png 

7-3. TextlnputFormat的逻辑记录和HDFS

 

2.2.2. 关于 KeyValueTextlnputFormat

TextlnputFormat的键,即每一行在文件中的字节偏移量,通常并不是特别有用。通常情况下,文件中每一行是一个键/值对,使用某个分界符进行分隔,比如制表符。例如以下由Hadoop默认OutputFormat(TextOutputFormat)产生的输出。如果要正确处理这类文件, KeyValueTextlnputFormat 比较合适。

可以通过mapreduce.input. keyvaluelinerecordreader.key.value.separator属性(或者老版本API中的key.value.separator.in.input.line属性)来指定分隔符。它的默认值是一个制表符。以下是一个范例,其中表示一个(水平方向的)制表符:

line1On the top of the Crumpetty Tree

line2The Quangle Wangle sat,

Iine3But his face you could not see,

line4On account of his Beaver Hat.

TextlnputFormat类似,输入是一个包含4条记录的分片,不过此时的键是每行排在制表符之前的Text序列:

(line1, On the top of the Crumpetty Tree)

(line2, The Quangle Wangle sat,)

(line3, But his face you could not see,)

(line4, On account of his Beaver Hat.)

2.2.3. 关于NLinelnputFormat

通过TextlnputFormat KeyValueTextlnputFormat每个mapper收到的输入行数不同。行数取决于输入分片的大小和行的长度。如果希望mapper收到固定行数的输入,需要将NLinelnputFormat作为InputFormat。与TextlnputFormat—样,键是文件中行的字节偏移量,值是行本身。

N是每个mapper收到的输入行数。N设置为1(默认值)时,每个mapper正好收到一行输入。mapreduce.input.lineinputformat.linespermap属性(在老版本API中的mapred.line.input.format.linespermap属性)实现N的设定。仍然以刚才的4行输入为例:

On the top of the Crumpetty Tree

The Quangle Wangle sat,

But his face you could not see,

On account of his Beaver Hat.

例如,如果N2,则每个输入分片包含两行。一个mapper收到前两行键/值对:

(0, On the top of the Crumpetty Tree)

(33, The Quangle Wangle sat,)

另一个mapper则收到后两行:

(57, But his face you could not see,)

(89, On account of his Beaver Hat.)

键和值与TextlnputFormat生成的一样。不同的是输入分片的构造方法。

通常来说,对少量输入行执行map任务是比较低效的(任务初始化的额外开销造成的),但有些应用程序会对少量数据做一些扩展的(也就是CPU密集型的)计算任务,然后产生输出。仿真是一个不错的例子。通过生成一个指定输入参数的输入文件,每行一个参数,便可以执行一个参数扫描分析 (parameter sweep):并发运行一组仿真试验,看模型是如何随参数不同而变化的。

在一些长时间运行的仿真实验中,可能会出现任务超时的情况。一个任务在10分钟内没有报告状态,tasktracker就会认为任务失败,进场止进程(£ 6.2.1节娜细讨论)

这个问题最佳解决方案是定期报告状态,如写一段状态信息,或增加计数器的值。详情可以参见6.1节的补充材料“MapReduce中进度的组成”。

另一个例子是用Hadoop引导从多个数据源(如数据库)加载数据。创建一个“种子”输入文件,记录所有的数据源,一行一个数据源。然后每个mapper分到一个数据源,并从这些数据源中加载数据到HDFS中。这个作业不需要reduce阶段,所以reducer的数量应该被设成0(通过调用JobsetNumReduceTasks()来设置)。MapReduce作业就可以处理加载到HDFS中的数据。

2.2.4. 关于XML

大多数XML解析器会处理整个XML文档,所以如果一个大型XML文档由多个输入分片组成,那么单独解析每个分片就相当有挑战。当然,可以在一个mapper(如果这个文件不是很大),可以使用7.2.1节介绍的方法来处理整个XML文档。

由很多“记录”(此处是XML文档片断)组成的XML文档,可以使用简单的字符串匹配或正则表达式匹配的方法来查找记录的开始标签和结束标签,而得到很多记录。这可以解决由MapReduce框架进行分割的问题,因为一条记录的下一个开始标签可以通过简单地从分片开始处进行扫描轻松找到,就像TextlnputFormat确定新行的边界一样。

Hadoop提供了StreamXmlRecordReader(org.apache.hadoop.streaming包中,还可以在Streaming之外使用)。通过把输入格式设置为StreamlnputFormat,stream.recordreader.class 属性设置为org.apache.Hadoop.Streaming.StreamXmlRecordReader来使用StreamXmlRecordReader类。reader的配置方法是通过作业配置属性来设置reader开始标签和结束标签(详情参见该类的帮助文档)

例如,维基百科用XML格式来提供大量数据内容,非常适合用MapReduce来并行处理。数据包含在一个大型的XML打包文档中,文档中有一些元素,例如包含每页内容和相关元数据的page元素。使用streamXmlRecordReader后,这些page元素便可解释为一系列的记录,交由一个mapper来处理。

2.3. 二进制输入

HadoopMapReduce不只是可以处理文本信息,它还可以处理二进制格式 的数据。

2.3.1. 关于SequenceFilelnputFormat

Hadoop的顺序文件格式存储二进制的键/值对的序列。由于它们是可分割的(它们有同步点,所以reader可以从文件中的任意一点与记录边界进行同 步,例如分片的起点),所以它们很符合MapReduce数据的格式要求,并且 它们还支持压缩,可以使用一些序列化技术来储任意类型。详情参见 4.5.1 节。

如果要用顺序文件数据作为MapReduce的输入,应用SequenceFilelnputFormat。键和值是由顺序文件决定,所以只需要保证 map输入的类型匹配。例如,如果输入文件中键的格式是IntWritable,值是Text,那么就像第4章生成的那样,mapper的格式应该是Mapper<IntWritableText, K, V>,其中KV是这个mapper输出的键和值的类型。

虽然从名称上看不出来,但SequenceFilelnputFormat可以读MapFileSequenceFile。如果在处理顺序文件时遇到目录,SequenceFilelnputFormat类会认为自己正在读MapFile,使用的是其数据文件。因此,如果没有MapFileInputFormat类,也是可以理解的。

2.3.2. 关于sequenceFileAsTextlnputFormat

sequenceFileAsTextlnputFormat sequenceFilelnputFormat的变体,它将顺序文件的键和值转换为Text对象。这个转换通过在键和值上调用toString()方法实现。这个格式使顺序文件作为Streaming的合适的输入类型。

2.3.3. 关于SequenceFileAsBinarylnputFormat

SequenceFileAsBinaryInputFormatSequenceFileInputFormat的一种变体,它获取顺序文件的键和值作为二进制对象。它们被封装为BytesWritable对象,因而应用程序可以任意地解释这些字节数组。结合使用SequenceFile.ReaderappendRaw()方法或SequenceFileAsBinary OutputFormat,它提供了在MapReduce中可以使用任意二进制数据类型的方法(作为顺序文件打包),然而,插入Hadoop序列化机制通常更简洁(详情 参见4.3.4)

2.4. 多个输入

虽然一个MapReduce作业的输入可能包含多个输入文件(由文件glob、过滤器和路径组成),但所有文件都由同一个InputFormat和同一个Mapper来解释。然而,数据格式往往会随时间演变,所以必须写自己的mapper来处理应用中的遗留数据格式问题。或者,有些数据源会提供相同的数据,但是格式不同。对不同的数据集进行连接(join,也称“联接”)操作时,便会产生这样的问题。详情参见8.2.2节。例如,有些数据可能是使用制表符分隔的文本文件,另一些可能是二进制的顺序文件。即使它们格式相同, 它们的表示也可能不同,因此需要分别进行解析。

这些问题可以用Multiplelnputs类来妥善处理,它允许为每条输入路径指定InputFormatMapper。例如,我们想把英国Met Office的气象数据和NCDC的气象数据放在一起来分析最高气温,则可以按照下面的方式来设置输入路径:

Multiplelnputs.addInputPath(job,ncdcInputPath, TextInputFormat.class,MaxTemperatureMapper.class);

MultipieInputs.addInputPath(job,metofficelnputPath, TextInputFormat.class,MetofficeMaxTemperatureMapper.class);

这段代码取代了对 FilelnputFormat.addlnputPath()job.setMapperClass()的常规调用。Met OfficeNCDC的数据都是文本文件,所以对两者都使用TextlnputFormat数据类型。但这两个数据源的行格式不同,所以我们使用了两个不一样的mapperMaxTemperatureMapper读取NCDC的输入数据并抽取年份和气温字段的值。MetOfficeMaxTemperatureMapper'读取 Met Office的输入数据,抽取年份和气温字段的值。重要的是两个mapper的输出类型一样,因此,reducer看到的是聚集后的map输出,并不知道这些输入是由不同的mapper产生的。

Multiplelnputs类有一个重载版本的addInputPath()方法,它没有mapper参数:

public static void addInputPath(Job job,Path path,class<? extends InputFormat> inputFormatClass)

如果有多种输入格式而只有一个mapper(通过]obsetMapperClass()方法设定),这种方法很有用。

2.5. 数据库输入(和输出)

DBInputFormat这种输入格式用于使用JDBC从关系数据库中读取数据。因为它没有任何共享能力,所以在访问数据库的时候必须非常小心,在数据库中运行太多的mapper读数据可能会使数据库受不了。正是由于这个原因,DBInputFormat最好用于加载小量的数据集,如果需要与来自HDFS 的大数据集连接,要使用Multiplelnputs。与之相对应的输出格式是 DBOutputFormat,它适用于将作业输出数据(中等规模的数据)转储到数 据库。®

在关系数据库和HDFS之间移动数据的另一个方法是:使用Sqoop,具体描述见第15章。

HBaseTablelnputFormat的设计初衷是让MapReduce程序操作存放在HBase表中的数据。而TableOutputFormat则是把MapReduce的输出写到 HBase 表。

3. 输出格式

针对前一节介绍的输入格式,Hadoop都有相应的输出格式。 OutputFormat类的层次结构如图7-4所示。

blob.png 

7-4. OutputFormat类的层次结构

 

3.1. 文本输出

默认的输出格式是TextOutputFormat它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。每个键/值对由制表符进行分隔,当然也可以设定 mapreduce.output.textoutputformat.separator属性(老版本API中的mapred.textoutputformat.separator)改变默认的分隔符。与TextOutputFormat对应的输入格式是KeyValueTextlnputFormat,它通过可配置的分隔符将键/值对文本行分隔(详情参见7.2.2)

可以使用NullWritable来省略输出的键或值(或两者都省略,相当于NullOutputFormat输出格式,后者什么也不输出)。这也会导致无分隔符输出,以使输出适合用TextlnputFormat读取。

3.2. 二进制输出

3.2.1. 关于SequenceFileOutputFormat

正如名字所示,SequenceFileOutputFormat将它的输出写为一个顺序文件。如果输出需要作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。压缩由SequenceFileOutputFormat的静态方法来实现,详情参见4.2.3节。8.2节用一个例子展示了如何使用SequenceFileOutputFormat

3.2.2. 关于SequenceFileAsBinaryOuputFormat

SequenceFileAsBinaryOutputFormat SequenceFileAsBinanylnputFormat相对应,它把键/值对作为二进制格式写到一个SequenceFile容器中。

3.2.3. 关于MapFileOutputFormat

MapFileOutputFormatMapFile作为输出。MapFile中的键必须顺序添加,所以必须确保reducer输出的键已经排好序。

reduce输入的键一定是有序的,但输出的键由reduce函数控制,MapReduce框架中没有硬性规定reduce输出键必须是有序的。所以要使用MapFileOutputFormat,就需要额外的限制来保证reduce输出的键是有序的。

3.3. 多个输出

FileOutputFormat及其子类产生的文件放在输出目录下。每个reducer—个文件并且文件由分区号命名:part-r-00000,part-r-00001,等等。有时可能需要对输出的文件名进行控制或让每个reducer输出多个文件。Mapreduce为此提供了MultipleOutputFormat类。

3.3.1. 范例:数据分割

考虑这样一个需求:按气象站来区分气象数据。这需要运行一个作业,作业的输出是每个气象站一个文件,此文件包含该气象站的所有数据记录。

一种方法是每个气象站对应一个reducer。为此,我们必须做两件事。第一,写一个partitioner.把同一个气象站的数据放到同一个分区。第二,把作业的reducer数设为气象站的个数。partitioner如下:

public class StationPartitioner extend Partitioner<LongWritable, Text> {

    private NcdcRecordParser parser = new NcdcRecordParser();

    @Override

    public int getPartition(LongWritable key, Text value^ int numPartitions) {

        parser.parse(value);

        return getPartition(parser.getStationId());

    }

    private int getPartition(String stationld) {

        …

    }

}

这里没有给出getPartition(String)方法的实现,它将气象站ID转换成分区索引号。为此,它的输入是一个列出所有气象站ID的列表,然后返回 列表中气象站ID的索引。

这样做有两个缺点。第一,需要在作业运行之前知道分区数和气象站的个数。虽然NCDC数据集提供了气象站的元数据,但无法保证数据中的气象站ID与元数据匹配。如果元数锯中有某个气象站但数据中却没有该气象站 的数据,就会浪费一个reducer任务槽。更糟糕的是,数据中有但元数据中却没有的气象站,也没有对应的reducer任务槽,只好将这个气象站扔掉。解决这个问题的方法是写一个作业来抽取唯一的气象站ID,但很遗憾,这需要额外的作业来实现。

在老版本的MapReduce API中,有两个类用于产生多个输出:MultipleOutputFormat MultipleOutputs。简单地说,虽然MultipleOutputs更具有特色,但MultipleOutputs在输出目录结构和文件命名上有更多的控制。新版本API中的MultipleOutputs结合了老版本API中两种多个输出类的特点。本书网站上的代码包.含了本节例子的老版本API等价样例,该样例使用了MultipleOutputsMuItipleOutputFormat

第二个缺点更微妙。一般来说,让应用程序来严格限定分区数并不好,.因为可能导致分区数少或分区不均。让很多reducer做少量工作不是一个高效的作业组织方法,比较好的办法是使用更少reducer做更多的事情,因为运行任务的额外开销减少了。分区不均的情况也是很难避免的。不同气象站的数据量差异很大:有些气象站是一年前刚投入使用的,而另一些气象站可能已经工作近一个世纪了。如果其中一些reduce任务运行时间远远超过另一些,作业执行时间将由它们决定,从而导致作业的运行时间超出预期。

在以下两种特殊情况下,让应用程序来设定分区数(等价于reducer的个数)是有好处的:

0reducer这是一个很罕见的情况:没有分区,因为应用只需要执行map任务

1reducer可以很方便地运行若干小作业,从而把以前作业的 输出合并成单个文件。前提是数据量足够小,以便一个reducer能轻松处理

 

最好让集群为作业决定分区数:集群的reducer任务槽越多,作业完成就越快。这就是默认的HashPartitioner表现如此出色的原因,因为它处理的分区数不限,并且确保每个分区都有一个很好的键组合使分区更均匀

如果使用HashPartitioner,每个分区就会包含多个气象站,因此,要实 现每个气象站输出一个文件,必须安排每个reducer写多个文件,由此就有了MultipleOutput

 

 

3.3.2. 关于 MultipleOutput

MultipleOutputFormat类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。这允许每个reducer(或者只有map作业的 mapper)创建多个文件。采用name-m-nnnnn形式的文件名用于map输出, name-r-nnnnn形式的文件名用于reduce输出,其中name是由程序设定的任意名字,nnnnn是一个指明块号的整数(0开始)。块号保证从不同块 (mapperreducer)写的输出在相同名字情况下不会冲突。

范例7-5显示了如何使用MultipleOutputs按照气象站划分数据。

 

范例7-5.MultipleOutput类将整个数据集分区到以气象站ID命名的文件

public class PartitionByStationUsingMultipleOutputs extends Configured implements Tool {
    static class StationMapper extends Mapper<LongWritableJ Text, Text, Text> {
        private NcdcRecordParser parser = new NcdcRecordParser();
        @Override
        protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
            parser.parse(value);
            context.write(new Text(parser.getStationId()), value);
        }
    }
    static class MultipleOutputsReducer
    extends Reducer<Text, Text, NullWritable, Text> {
        private MultipleOutputs<NullWritable, Text> multipleOutputs;
        
        @Override
        protected void setup(Context context)
        throws IOException InterruptedException {
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
        }
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
            for (Text value : values) {
                multipleOutputs.write(NullWritable.get(),value,key.toString());
            }
        }
        @Override
        protected void cleanup(Context context)
        throws IOException, InterruptedException {
            multipleOutputs.close();
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setMapperClass(StationMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setReducerClass(MultipleOutputsReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new PartitionByStationUsingMultipleOutputs(), args);
        System.exit(exitCode);
    }
}

在生成输出的reducer中,在setup()方法中构造一个MultipleOutputs的实例并将它赋给一个实例变量。在reduce()方法中使用MultipleOutputs实例来写输出,而不是contextwrite()方法作用于键、值和名字。这里使用气象站标识符作为名字,因此最后产生的输出名字的形式为 station_identifier_r-nnnnn

运行一次后,前面几个输出文件的命名如下:

/output/010010-99999-r-00027

/output/010050-99999-r-00013

/output/010100-99999-r-00015

/output/010280-99999-r-00014

/output/010550-99999-r-00000

/output/010980-99999-r-00011

/output/011060-99999-r-00025

/output/012030-99999-r-00029

/output/012350-99999-r-00018

/output/012620-99999-r-00004

MultipleOutputswrite()方法中指定的基本路径相对于输出路径进行解释,因为它可以包含文件路径分隔符(/),创建任意深度的子目录是有可能的。例如,下面的改动将数据根据气象站和年份进行划分,这样每年的数据就被包含到一个名为气象站ID的目录中(例如029070-99999/1901/part-r-00000):

@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
    for (Text value : values) {
        parser.parse(value);
        String basePath = String.format("%s/%s/part", parser.getStationId(), parser.getYear());
        multipleOutputs.write(NullWritable.get(), value, basePath);
    }
}

MultipleOutput传递给mapperOutputFormat该例子中为TextOutputFormat但可能有更复杂的情况。例如,可以创建命名的输出,每个都有自己的OutputForamt、键和值的类型(这可以与mapperreducer的输出类型不相同)。此外,mapperreducer可以为每条被处理的记录写多个输出文件。可査阅Java帮助文档获取更多的信息。

3.4. 延迟输出

FileOutputFormat的子类会产生输出文件(part-r-nnnnn)即使文件是空的。有些应用倾向于不创建空文件,此时LazyOutputFormat就有用武之地了。它是一个封装输出格式,可以保证指定分区第一条记录输出时才真正创建文件。要使用它,用JobConf和相关的输出格式作为参数来调用 setOutputFormatClass()方法即可。

Streaming Pipes 支持-LazyOutput 选项来启用 LazyOutputFormat功能。

3.5. 数据库输出

写到关系数据库和HBase的输出格式可以参见7.2.5节。

转载请注明:全栈大数据 » 第七章 MapReduce的类型与格式

喜欢 (2)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址