整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

8.2.4. 辅助排序

hadoop 小红牛 54℃ 0评论

MapReduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为
它们来自不同的map任务且这些map任务在不同轮次中的完成时间各不相
同。一般来说,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值
的排序。

例如,考虑如何设计一个MapReduce程序以计算每年的最高气温。如果全部记录均按照气温降序排列,则无需遍历整个数据集即可获得査询结果一 一获取各年份的首条记录并忽略剩余记录。尽管该方法并不是最佳方案,但演示了辅助排序的工作机理。

为此,首先构建一个同时包含年份和气温信息的组合键,然后对键值先按年份升序排序,再按气温降序排列:

1900 35℃

1900 34℃

1900 34℃

1901 36℃

1901 35℃

如果仅仅是使用组合键的话,并没有太大的帮助,因为这会导致同一年的记录可能有不同的键,通常这种情况下记录并不会被送到同一个reducer中。例如,(1900,
35℃)和(1900,
34℃)就可能被送到不同的reducer中。通过设置一个按照键的年份进行分区的patitioner,可以确保同一年的记录会被发送到同一个reducer中。但是,这样做还不够。因为partitioner只保证每一个reducer接受一个年份的所有记录,而在一个分区之内,reducer仍是通过键进行分组的分区:

分区

1900 35?

1900 34℃

1900 34℃

1900 36℃

1900 35℃

 

该问题的最终解决方案是进行分组设置。如果reducer中的值按照键的年份进行分组,则一个reduce组将包括同一年份的所有记录。鉴于这些记录已经按气温降序排列,所以各组的首条记录就是这一年的最髙气温:

分区

1900 35℃

1900 34℃

1900 34℃

1900 36℃

1900 35℃

下面对记录按值排序的方法做一个总结:

•定义包括自然键和自然值的组合键

•根据组合键对记录进行排序,即同时用自然键和自然值进行排序

•针对组合键进行分区和分组时均只考虑自然键

1. Java代码

综合起来便得到范例8-9中的源代码,该程序再一次使用了纯文本输入。

范例8-9.该应用程序通过对键中的气温进行排序来找出最高气温

public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {
    static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
        private NcdcRecordParser parser = new NcdcRecordParser();
        @Override
        protected void map(LongWritable key, Text value,
            Context context) throws IOException, InterruptedException {
            parser.parse(value);
            if (parser.isValidTemperature()) {
                context.write(new IntPair(parser.getYearInt(),
                    parser.getAirTemperature()), NullWritable.get());
            }
        }
    }
static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPai〜NullWritable〉 {
        @Override
        protected void reduce(IntPair key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write (key, NullWritable.get());
        }
    }
    public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
        @Override
        public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            // multiply by 127 to perform some mixing
            return Math.abs(key.getFirst() * 127) % numPartitions;
        }
    }
    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(IntPair.class,true);
        }
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
            if (cmp != 0) {
                return cmp;
            }
            return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse
        }
    }
public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(IntPair.class, true);
        }
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            return IntPair.compare(ip1.getFirst(), ip2.getFirst());
        }
        @Override
        public int run(String[] args) throws Exception {
            Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
            if (job == null) {
                return -1;
            }
            job.setMapperClass(MaxTemperatureMapper.class);
            job.setPartitionerClass(FirstPartitioner.class);
            job.setSortComparatorClass(KeyComparator.class);
            job.setGroupingComparatorClass(GroupComparator.class);
            job.setReducerClass(MaxTemperatureReducer.class);
            job.setOutputKeyClass(IntPair.class);
            job.setOutputValueClass(NullWritable.class);
            return job.waitForCompletion(true) ? 0 : 1;
        }
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);
            System.exit(exitCode);
        }
    }
}

在上述mapper中,我们利用IntPair类定义了一个代表年份和气温的组合键,该类实现了Writable接口。IntPair与TextPair类相似,后者可参见4.3.2节的相关讨论。由于可以根据各reducer的组合键获得最高气温,因此无需在值上附加其他信息,使用NullWritable即可。根据辅助排序,reducer输出的第一个键就是包含年份和最高气温信息的IntPair对象。IntPair的toString()方法返回一个以制表符分隔的字符串,因而该程序输出一组由制表符分隔的年份/气温对。

许多应用需要访问所有已排序的值,而非像上例一样只需要第一个值。鉴于在reducer中用户只能够获取第一个键,所以必须通过填充值字段来获取所有已排序的值,这样不可避免会在键和值之间产生—些冗余信息。

我们创建一个自定义的partitioner以按照组合键的首字段(年份)进行分区,即FirstPartitioner。为了按照年份(升序)和气温(降序)排列键,我们使用setSortComparatorClass()设置一个自定义键comparator(即KeyComparator),以抽取字段并执行比较操作。类似的,为了按年份对键进行分组,我们使用SetGroupingComparatorClass来自定义一个分组comparator,只取键的首字段进行比较。

运行该程序,返回各年的最髙气温:

% hadoop jar hadoop-examples.jar MaxTemperatureUsingSecondarySort input/ncdc/all \

> output-secondarysort

% hadoop fs -cat output-secondarysort/part-* | sort | head

1901 317

1902 244

1903 289

1904 256

1905 283

1906 294

1907 283

1908 289

1909 278

1910 294

2.4.2. Streaming

我们可以借助Hadoop所提供的一组库来实现Streaming的辅助排序,下面就是用来进行辅助排序的驱动:

hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-D stream.num.map.output.key.fields=2 \
-D mapred.text.key.partitioner.options=-k1,1 \
-D mapred.output.key.comparator.class= \
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k1n -k2nr" \
-input input/ncdc/all \
-output output_secondarysort一streaming \
-mapper ch08/src/main/python/secondary_sort_map.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-reducer ch08/src/main/python/secondary_sort_reduce.py \
-file ch08/src/main/python/secondany_sort_map.py \
-file ch08/src/main/python/secondary_sort_reduce.py

范例 8-10中的map函数输出年份和气温两个字段。为了将这两个字段看成一个组合键,需要将 stream.num.map.output.key.fields的值设为2。这意味着值是空的,就像java程序(范例8-9)—样。

范例8-10.针对辅助排序的map函数(Python版本)

#! /usr/bin/env python
import re
import sys
for line in sys.stdin:
    val = line.strip()
    (year, temp, q) = (val[15:19], int(val[87:92]), val[92:93])
    if temp == 9999:
 sys.stderr.write("reporter:counter:Temperature,Missing,l\n")
    elif re.match("[01459]", q):
        print "%s\t%s" % (year, temp)

鉴于我们并不期望根据整个组合键来划分数据集,因此可以利用KeyFieldBasedPatitioner类以组合键的一部分进行划分。具体实现是使用mapred.text.key.partitioner.options 配置该 partitioner。在上例中,值-k1,1表示该partitioner只使用组合键的第一个字段。map.output.key.field. separator属性所定义的字符串能分隔各个字段(默认是制表符)。

接下来,我们还需要一个comparator以对年份字段升序排列、对气温字段降序排列,使reduce函数能够方便地返回各组中的第一个记录。Hadoop提

供的KeyFieldBasedComparator类能有效解决这个问题。该类通过mapred.text.key.comparator.options属性来设置排列次序,其格式规范与GNU sort类似。本例中的-k1n  -k2nr选项表示“首字段按数值顺序排序,字段按数值顺序反向排序”。与KeyFieldBasedPartitioner类似,KeyFieldBasedComparator也采用在 map.output.key.field.separator中定义的分隔符将一个组合键划分成多个字段。

Java版本的程序需要定义分组comparator。但是在Streaming中,组并未以任何方式划分,因此必须在reduce函数中不断地査看年份是否改变来检测组的边界(范例8-11)。

范例8-11.针对辅助排序的reducer函数(Python版本)

#!/usr/bin/env python
import sys
last_group = None
for line in sys.stdin:
    val = line.strip()
    (year, temp) = val.split("\t")
    group = year
if
 
last_group !
=
 
group:
        print val
        last_group = group

运行此程序之后,得到与Java版本一样的结果。

最后牢记一点: KeyFieldBasedPartitioner 和 KeyFieldBasedComparator不仅在Streaming程序中使用,也能够在Java版本的MapReduce程序中使用。



转载请注明:全栈大数据 » 8.2.4. 辅助排序

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址