整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

8.2.2. 部分排序

hadoop 小红牛 7℃ 0评论

7.1.1节所述,在默认情况下,MapReduce根据输入记录的键对数据集排序。范例8-4则是一个变种,它利用IntWritable键对顺序文件排序。

范例8-4.程序调用默认HashPartitioner按IntWritable键排序顺序文件

public class SortByTemperatureUsingHashPartitioner extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setInputFormatClass(SequenceFilelnputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressorClass(job,GzipCodec.class);
        SequenceFileOutputFormat?setOutputCompressionType(job,
        CompressionType.BLOCK);
        return job.waitForCompletion(true) ? 0 : 1;
    }
 public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SortByTemperatureUsingHashPartitioner(), args);
        System.exit(exitCode);
    }
}

2.2.1. 控制排列顺序

键的排列顺序是由RawComparator控制的,规则如下。

(1)若属性 mapred.output.key.comparator.class已经被显式设置,或者通过Job类的setSortComparatorClass()方法进行设置,则使用该类的实例(旧版API使用JobConf类的 setOutputKeyComparatorClass()方法。)

(2)否则,键必须是WritableComparable的子类,并使用针对该键类的已登记comparator。

(3)如果还没有已登记的comparator,则使用RawComparator将字节流反序列化为一个对象,再由WritableComparable的compareTo()方法进行操作。

上述规则彰显了为自定义Writable类登记RawComparators优化版本的重要性,详情可参见4.3.4节介绍的如何为速度实现一个RawComparator。 同时,通过定制comparator来重新定义排序顺序也很直观,详情可参见 8.2.4节对辅助排序的讨论。

 

假设采用30个reducer来运行该程序:

% hadoop jar hadoop-examples.jar SortByTemperatureUsingHashPartitionep \

-D mapred.reduce.tasks=30 input/ncdc/all-seq output-hashsort

 

该指令产生30个已排序的输出文件。但是如何将这些小文件合并成一个有序的文件却并非易事。例如,直接将纯文本文件连接起来无法保证全局有序。幸运的是,许多应用并不强求待处理的文件全局有序。例如,对于查找操作来说,部分排序的文件就已经足够了。

2.2.2. 应用:基于分区的MapFile查找技术

以按键执行查找操作为例,在多文件情况下效率更高。在范例8-5中,如果输出格式被改为MapFileOutputFormat,则输出30个map文件,可以 基于这些文件执行査找操作。®

范例8-5.该MapReduce程序对一个顺序文件排序并输出MapFile

public class SortByTemperatureToMapFile extends Configured implements Tool
{
    @Override
    public int run(String[] args) throws Exception {
        Job job = 3obBuilder.parselnputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setInputFormatClass(SequenceFilelnputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputFormatClass(MapFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job,
        CompressionType.BLOCK);
        return job.waitForCompletion(true) ? 0 : 1;
    }
public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SortByTemperatureToMapFile(), args);
        System.exit(exitCode);
    }
}

MapFileOutputFormat提供了两个便捷的静态方法,用于对MapReduce输出文件执行查找操作,其用法如范例8-6所示。

范例8-6.从MapFiles集合中获取符合指定键的第一项记录

public class LookupRecordByTemperature extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            JobBuilder.printUsage(this, "<path> <key>");
            return -1;
        }
        Path path = new Path(args[0]);
        IntWritable key = new IntWritable(Integer.parseInt(args[1]));
         
        Reader[] readers = MapFileOutputFormat.getReaders(path, getConf());
        Partitioner<IntWritable, Text> partitioner =
new HashPartitioner<IntWritable, Text>();
        Text val = new Text();
        Writable entry = MapFileOutputFormat.getEntry(readers, partitioner, key, val);
        if (entry == null) {
            System.err.printIn("Key not found: " + key);
            return -1;
        }
        NcdcRecordParser parser = new NcdcRecordParser();
        parser.parse(val.toString());
        System.out.printf("%s\t%s\n", parser.getStationId(), parser.getYear());
        return 0;
    }
 public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new LookupRecordByTemperature(), args);
        System.exit(exitCode);
    }
}

getReaders()方法为MapReduce作业创建的每个输出文件分别打开一个MapFile.Reader实例,用一个Reader数组(即readers)表示。之后,getEntry()方法使用Partitioner找到包含指定键的Reader实例,再通过该Reader实例的get()方法得到这个键对应的值(val)。如果getEntry()返回null,则表明没有找到匹配的键。否则,返回一个描述对应气象站ID和年份的值。

举例来说,若要查找首条气温为10℃的记录(注意,气温字段是整数类型,其值是真实气温的10倍。例如,10℃被记为-100),则有:

% hadoop jar hadoop-examples.jar LookupRecordByTemperature output-hashmapsort -100

357460-99999 1956

我们还可以直接用readers数组来获得包含指定键的所有记录。readers数组按分区排序,因而针对一个指定键的reader,均可以通过MapReduce作业中的同一个partitioner获得:

Reader reader = readers[partitioner.getPartition(key, val, readers.length)];

一旦找到reader,可通过MapFile的get()方法获取第一条包含指定键的记录。接着,循环调用next()获取下一条记录,直到键改变为止。相关程序如范例8-7所示。

范例8-7.从一个MapFiles集合中获取包含指定键的所有记录

public class LookupRecordsByTemperature extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            JobBuilder.printUsage(this, "<path> <key>");
            return -1;
        }
        Path path = new Path(args[0]);
        IntWritable key = new IntWritable(Integer.parselnt(args[1]));
        Reader[] readers = MapFileOutputFormat.getReaders(path, getConf());
        Partitioner<IntWritable, Text> partitioner =
        new HashPartitioner<IntWritable, Text>();
        Text val = new Text();
        Reader reader = readers[partitioner.getPartition(key, val, readers.length)];
        Writable entry = reader.get(key, val);
        if (entry == null) {
            System.err.printf("Key not found:" + key);
            return -1;
        }
        NcdcRecordParser parser = new NcdcRecordParser();
        IntWritable nextKey = new IntWritable();
        do {
            parser.parse(val.toString());
            System.out.printf("%s\t%s\n", parser.getStationId(), parser.getYear());
        } while(reader.next(nextKey, val) && key.equals(nextKey)); return 0;
    }
public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new LookupRecordsByTemperature(), args);
        System.exit(exitCode);
    }
}

下例描述如何获取所有-10℃的读数,并计数:

% hadoop jar hadoop-examples.jar LookupRecordsByTemperature output-hashmapsort -100 \

2> /dev/null | wc -1

1489272



转载请注明:全栈大数据 » 8.2.2. 部分排序

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址