整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

7.3.3. 多个输出

hadoop 花牛 40℃ 0评论

FileOutputFormat及其子类产生的文件放在输出目录下。每个reducer—个文件并且文件由分区号命名:part-r-00000,part-r-00001,等等。有时可能需要对输出的文件名进行控制或让每个reducer输出多个文件。Mapreduce为此提供了MultipleOutputFormat类。

3.3.1. 范例:数据分割

考虑这样一个需求:按气象站来区分气象数据。这需要运行一个作业,作业的输出是每个气象站一个文件,此文件包含该气象站的所有数据记录。

一种方法是每个气象站对应一个reducer。为此,我们必须做两件事。第一,写一个partitioner.把同一个气象站的数据放到同一个分区。第二,把作业的reducer数设为气象站的个数。partitioner如下:

public class StationPartitioner extend Partitioner<LongWritable, Text> {

    private NcdcRecordParser parser = new NcdcRecordParser();

    @Override

    public int getPartition(LongWritable key, Text value^ int numPartitions) {

        parser.parse(value);

        return getPartition(parser.getStationId());

    }

    private int getPartition(String stationld) {

        …

    }

}

这里没有给出getPartition(String)方法的实现,它将气象站ID转换成分区索引号。为此,它的输入是一个列出所有气象站ID的列表,然后返回 列表中气象站ID的索引。

这样做有两个缺点。第一,需要在作业运行之前知道分区数和气象站的个数。虽然NCDC数据集提供了气象站的元数据,但无法保证数据中的气象站ID与元数据匹配。如果元数锯中有某个气象站但数据中却没有该气象站 的数据,就会浪费一个reducer任务槽。更糟糕的是,数据中有但元数据中却没有的气象站,也没有对应的reducer任务槽,只好将这个气象站扔掉。解决这个问题的方法是写一个作业来抽取唯一的气象站ID,但很遗憾,这需要额外的作业来实现。

在老版本的MapReduce API中,有两个类用于产生多个输出:MultipleOutputFormat 和MultipleOutputs。简单地说,虽然MultipleOutputs更具有特色,但MultipleOutputs在输出目录结构和文件命名上有更多的控制。新版本API中的MultipleOutputs结合了老版本API中两种多个输出类的特点。本书网站上的代码包.含了本节例子的老版本API等价样例,该样例使用了MultipleOutputsMuItipleOutputFormat

第二个缺点更微妙。一般来说,让应用程序来严格限定分区数并不好,.因为可能导致分区数少或分区不均。让很多reducer做少量工作不是一个高效的作业组织方法,比较好的办法是使用更少reducer做更多的事情,因为运行任务的额外开销减少了。分区不均的情况也是很难避免的。不同气象站的数据量差异很大:有些气象站是一年前刚投入使用的,而另一些气象站可能已经工作近一个世纪了。如果其中一些reduce任务运行时间远远超过另一些,作业执行时间将由它们决定,从而导致作业的运行时间超出预期。

在以下两种特殊情况下,让应用程序来设定分区数(等价于reducer的个数)是有好处的:

0reducer这是一个很罕见的情况:没有分区,因为应用只需要执行map任务

1个reducer可以很方便地运行若干小作业,从而把以前作业的 输出合并成单个文件。前提是数据量足够小,以便一个reducer能轻松处理

 

最好让集群为作业决定分区数:集群的reducer任务槽越多,作业完成就越快。这就是默认的HashPartitioner表现如此出色的原因,因为它处理的分区数不限,并且确保每个分区都有一个很好的键组合使分区更均匀

如果使用HashPartitioner,每个分区就会包含多个气象站,因此,要实 现每个气象站输出一个文件,必须安排每个reducer写多个文件,由此就有了MultipleOutput

 

 

3.3.2. 关于 MultipleOutput 类

MultipleOutputFormat类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。这允许每个reducer(或者只有map作业的 mapper)创建多个文件。采用name-m-nnnnn形式的文件名用于map输出, name-r-nnnnn形式的文件名用于reduce输出,其中name是由程序设定的任意名字,nnnnn是一个指明块号的整数(0开始)。块号保证从不同块 (mapperreducer)写的输出在相同名字情况下不会冲突。

范例7-5显示了如何使用MultipleOutputs按照气象站划分数据。

 

范例7-5.用MultipleOutput类将整个数据集分区到以气象站ID命名的文件

public class PartitionByStationUsingMultipleOutputs extends Configured implements Tool {
    static class StationMapper extends Mapper<LongWritableJ Text, Text, Text> {
        private NcdcRecordParser parser = new NcdcRecordParser();
        @Override
        protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
            parser.parse(value);
            context.write(new Text(parser.getStationId()), value);
        }
    }
    static class MultipleOutputsReducer
    extends Reducer<Text, Text, NullWritable, Text> {
        private MultipleOutputs<NullWritable, Text> multipleOutputs;

         

        @Override
        protected void setup(Context context)
        throws IOException InterruptedException {
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
        }
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
            for (Text value : values) {
                multipleOutputs.write(NullWritable.get(),value,key.toString());
            }
        }
        @Override
        protected void cleanup(Context context)
        throws IOException, InterruptedException {
            multipleOutputs.close();
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setMapperClass(StationMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setReducerClass(MultipleOutputsReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0 1;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new PartitionByStationUsingMultipleOutputs(), args);
        System.exit(exitCode);
    }
}

在生成输出的reducer中,在setup()方法中构造一个MultipleOutputs的实例并将它赋给一个实例变量。在reduce()方法中使用MultipleOutputs实例来写输出,而不是contextwrite()方法作用于键、值和名字。这里使用气象站标识符作为名字,因此最后产生的输出名字的形式为 station_identifier_r-nnnnn

运行一次后,前面几个输出文件的命名如下:

/output/010010-99999-r-00027

/output/010050-99999-r-00013

/output/010100-99999-r-00015

/output/010280-99999-r-00014

/output/010550-99999-r-00000

/output/010980-99999-r-00011

/output/011060-99999-r-00025

/output/012030-99999-r-00029

/output/012350-99999-r-00018

/output/012620-99999-r-00004

在MultipleOutputs的write()方法中指定的基本路径相对于输出路径进行解释,因为它可以包含文件路径分隔符(/),创建任意深度的子目录是有可能的。例如,下面的改动将数据根据气象站和年份进行划分,这样每年的数据就被包含到一个名为气象站ID的目录中(例如029070-99999/1901/part-r-00000):

@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
    for (Text value : values) {
        parser.parse(value);
        String basePath = String.format("%s/%s/part", parser.getStationId(), parser.getYear());
        multipleOutputs.write(NullWritable.get(), value, basePath);
    }
}

MultipleOutput传递给mapperOutputFormat该例子中为TextOutputFormat但可能有更复杂的情况。例如,可以创建命名的输出,每个都有自己的OutputForamt、键和值的类型(这可以与mapperreducer的输出类型不相同)。此外,mapperreducer可以为每条被处理的记录写多个输出文件。可査阅Java帮助文档获取更多的信息。

转载请注明:全栈大数据 » 7.3.3. 多个输出

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址