整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

7.2.1.7. 把整个文件作为一条记录处理

hadoop 花牛 13℃ 0评论

有时,mapper需要访问一个文件中的全部内容。即使不分割文件,仍然需要一个RecordReader来读取文件内容作为record的值。范例7-2WholeFilelnputFormat展示了实现的方法。

范例7-2.把整个文件作为一条记录的InputFormat

public class WholeFilelnputFormat
    extends FileinputFormat<NullWritable, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
    InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader•initialize(split, context);
        return reader;
    }
}

在 WholeFilelnputFormat中,没有使用键,此处表示为NullWritable,值是文件内容,表示成BytesWritable实例。它定义了两个方法:一个是将isSplitable()方法重载返回false值,来指定输入文件不被分片;另一个是实现了createRecordReader()方法来返回一个定制的RecordReader实现,如范例7-3所示。

范例7-3. WholeFilelnputFormat使用RecordReader将整个文件读为一条记录

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDatalnputStream in = null;
            try {
                in = fs.open(file);
                IOUtiIs.readFully(in, contents, 0, contents.length); value.set(contents, 0, contents.length);
            finally {
                IOUtils•closestream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }
    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }
    @Override
    public void close() throws IOException {
        //do nothing
    }
}

WholeFileRecordReader负责将FileSplit转换成一条记录,该记录的键是null ,值是这个文件的内容。因为只有一条记录,WholeFileRecordReader要么处理这条记录,要么不处理,所以它维护一个名称为processed的布尔变量来表示记录是否被处理过。如果当nextKeyValue()方法被调用时,文件没有被处理过,就打开文件,产生一个长度是文件长度的字节数组,并用HadoopIOUtils类把文件的内容放入字节数组。然后再被传递到next()方法的BytesWritable实例上设置数组,返回值为true则表示成功读取记录

其他一些方法都是一些直接的用来生成正确的键和值类型、获取reader位置和状态的方法,还有一个close()方法,该方法由MapReduce框架在reader完成后调用。

现在演示如何使用WholeFilelnputFormat。假设有一个将若干个小文件打包成顺序文件的MapReduce作业,键是原来的文件名,值是文件的内容。如范例7-4所示。

范例7-4.将若干个小文件打包成顺序文件的MapReduce程序

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

     

    static class SequenceFileMapper
    extends Mapper<NullWritable, BytesWritable^ Text, BytesWritable> {
        private Text filenameKey;
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }
        @Override
        protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }
    @Override
    public int run(String[] args) throws IOException {
        Job job = JobBuilder.parselnputAndOutput(this, getConf(), args);
        if (conf == null) {
            return -1;
        }
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        return job.waitForCompletion(true) ? 0 1;
    }

     

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverterO, args);
        System.exit(exitCode);
    }
}

由于输入格式是wholeFilelnputFormat,所以mapper只需要找到文件输入分片的文件名。通过将InputSplitcontext强制转换为FileSplit来实现这点,后者包含一个方法可以获取文件路径。reducer的类型是相同的(没有明确设置),输出格式是SequenceFileOutputFormat。

以下是在一些小文件上运行样例。此处使用了两个reducer,所以生成两个 输出顺序文件:

% hadoop jar job. jar SmallFilesToSequenceFileConverter \
-conf conf/hadoop-localhost. xml -D mapred.reduce.tasks=2 input/smallfiles output

由此产生两部分文件,每一个对应一个顺序文件,可以通过文件系统shell-text选项来进行检査:

% hadoop fs -conf conf/hadoop-localhost.xml -text output/part-r-00000
hdfs://localhost/user/tom/input/smallfiles/a 61 61 61 61 61 61 61 61 61 61
hdfs://localhost/user/tom/input/smallfiles/c 63 63 63 63 63 63 63 63 63 63
hdfs://localhost/user/tom/input/smallfiles/e
% hadoop fs -conf conf/hadoop-localhost.xml -text output/part-r-00001
hdfs://localhost/user/tom/input/smallfiles/b 62 62 62 62 62 62 62 62 62 62
hdfs://localhost/user/tom/input/smallfiles/d 64 64 64 64 64 64 64 64 64 64
hdfs://localhost/user/tom/input/smallfiles/f  66 66 66 66 66 66 66 66 66 66

输入文件的文件名分别是a、bcdef,每个文件分别包含10个相应字母(比如,a文件中包含10个“a”字母)e文件例外,它的内容为空。我们可以看到这些顺序文件的文本表示,文件名后跟着文件的十六进制的表示。

至少有一种方法可以改进我们的程序。前面提到,一个mapper处理一个文件的方法是低效的,所以较好的方法是继承CombineFilelnputFormat而不是FilelnputFormat

转载请注明:全栈大数据 » 7.2.1.7. 把整个文件作为一条记录处理

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址