整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

4.4.8使用Avro MapReduce进行排序

hadoop 花牛 7℃ 0评论

在本节中我们利用Avro的排序能力,并结合使用MapReduce写一个程序来 对Avro数据文件进行排序(范例4-13)。

范例4-13•对Avro数据文件进行排序的MapReduce程序

public class AvroSort extends Configured implements Tool {
    static class SortMapper<K> extends AvroMapper<K, Pair<K, K>> {
        public void map(K datum, AvroCollector<Pair<K, K>> collector, Reporter reporter) throws IOException {
            collector.collect(new Pair<K, K>(datum, null, datum, null));
        }
    }
    static class SortReducer<K> extends AvroReducer<Ki K, K> {
        public void reduce(K key, Iterable<K> values, AvroCollector<K> collector,Reporter reporter) throws IOException {
            for (K value : values) {
                collector.collect(value);
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.printf(
            "Usage: %s [generic options] <input> <output> <schema-file>\n", getClass().getSimpleName());
            ToolRunner. printGenericCommandUsage(System, err);
            return -1;
        }
        String input = args[0];
        String output = args[1];
        String schemaFile = args[2];
        
        JobConf conf = new JobConf(getConf(), getClass());
        conf.setJobName("Avro sort");
        FilelnputFormat.addInputPath(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));
        
        Schema schema = new Schema.Parser().parse(new File(schemaFile));
        AvroJob. setInputSchema (conf, schema);
        Schema intermediateSchema = Pair.getPairSchema(schema,schema);
        AvroJob.setMapOutputSchema(conf, intermediateSchema);
        AvroJob.setOutputSchema(conf, schema);
        AvroJob.setMapperClass(conf, SortMapper.class);
        AvroDob.setReducerClass(conf, SortReducer.class);
        
        JobClient.runDob(conf);
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvroSort(), args);
        System.exit(exitCode);
    }
}

这个程序(使用了泛型的Avro映射,因此无需生成任何代码)能够对由泛型类型参数K表示的任何Java类型的Avro记录进行排序。我们选择值的类型与键的类型相同,以便在值按照键分组后,我们可以对有多值对应于一个键的情况(根据排序函数)输出同一个键对应的所有值,不丢失任何记录。Mapper输出键和值的同时输出具有同一个键的org. apache, avro. mapred.Pair对象。Reducer作为一个识别器,将值(单值)传递给输出,并写入Avro数据文件。

排序发生在MapReduce的混洗过程中,并且排序函数由Avro模式确定并由 它传入程序中。下面我们使用SortedStringPair.avsc模式对右边的字段按照降序排列,实现对先前创建的pairs.avro文件排序。首先,我们使用Avro工具JAR检査输入数据:

% java -jar $AVRO_HOME/avro-tools-*.jar tojson input/avro/pairs.avro
{"left":"a","right":"1"}
{"left":"c","right":"2"}
{"left":"b","right":"3"}
{"left":"b","right":"2"}

然后我们运行排序程序:

% hadoop jar avro-examples.jar AvroSort input/avro/pairs.avro output \
ch04-avro/src/main/resources/SortedStringPair.avsc

最后,检査输出并査看是否正确排序。

% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-00000.avro
{"left":"b","right":"3"}
{"left":"c","right":"2"}
{"left":"b","right":"2"}
{"left":"a","right":"1"}

转载请注明:全栈大数据 » 4.4.8使用Avro MapReduce进行排序

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址