整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

7.2. 输入格式 7.2.1. 输入分片与记录

hadoop 花牛 9℃ 0评论

从一般的文本文件到数据库,Hadoop可以处理很多不同类型的数据格式。本节将探讨数据格式问题

2章中讲过,一个输入分片(split)就是一个由单个map操作来处理的输入块。每一个map操作只处理一个输入分片。每个分片被划分为若干个记录,每条记录就是一个键/值对,map —个接一个地处理记录。输入分片和 记录都是逻辑概念,不必将它们对应到文f,尽管其常见形式都是文件。 在数据库的场景中,一个输入分片可以对应*于一个表上的若干行,而一条记 录对应到一行pBInputFormat正是这么做的,这种输入格式用于从关系数据库 读取数据输入分片在hva中被表示为InpiitSplit接口(和本章提到的所有类一样,它也在 org.apache.hadoop.mapred 包中)。

public abstract class InputSplit {
    public abstract long getLength() throws IOException, InterruptedException;
    public abstract String[] getLocations() throws IOException InterruptedException;
}

InputSplit包含一个以字节为单位的长度和一组存储位置(即一组主机名)。注意,分片并不包含数据本身,而是指向数据的引用(reference)。存储位置供MapReduce系统使用以便将map任务尽量放在分片数据附近,而分片大小用来排序分片,以便优先处理最大的分片,从而最小化作业运行时间(这也是贪婪近似算法的一个实例)

MapReduce应用开发人员不必直接处理InputSplit,因为它是由InputFormat创建的。InputFormat负责产生输入分片并将它们分割成记录。在我们探讨InputFormat的具体例子之前,先简单看一下它在 MapReduce中的用法。接口如下:

public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,InterruptedException;
}

运行作业的客户端通过调用getSplits()计算分片,然后将它们发送到jobtrackerjobtracker使用其存储位置信息来调度map任务从而在tasktracker上处理这些分片数据。在tasktracker上,map任务把输入分片传给InputFormatgetRecordReader()方法来获得这个分片的 RecordReaderRecordReader就像是记录上的迭代器,map任务用一个RecordReader来生成记录的键/值对,然后再传递给map函数。查看Mapperrun()方法可以看到这些情况:

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
        map (context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}

运行setup()之后,再重复调用Context上的nextKeyValue()委托给RecordRader的同名函数实现来为mapper产生keyvalue对象。通过Context, key 和 value 从 RecordReader 出重新取出传递给map()。当 reader 读到 stream 的结尾时,nextKeyValue()方法返回 false, map 任 务运行其cleanUp()方法,然后结束。

这段代码没有显示,但由于效率的原因,RecordReader程序每次调用gerCurrentKey()和getCurreritValue()时将返回相同的键/值对象。只是这些对象的内容被readernextKeyValue()方法改变。用户对此可能有些不解。在map()函数之外有对键/值的引用时,这可能引起问题,因为它的值会在没有警告的情况下被改变。如果确实需要这样的引用,那么请保存你想保留的对象的一个副本,例如,对于Text对象,可以使用它的复制构造函数:new Text(value)

这样的情况在reducer中也会发生。reducer迭代器中的值对象被反复使用,所以,在调用迭代器之间,一定要复制任何需要保留的任 何对象(参见范例8-14)

最后注意,Mapper的run()方法是公共的,可以由用户定制。MultithreadedMapRunner是另一个MapRunnable接口的实现,它可以使用可配置个数的线程来并发运行多个mapper(mapreduce.mapper.multithreadedmapper.threads设置)。对于大多数数据处理任务来,默认的执行机制没有优势。但是,对于因为需要连接外部服务器而造成单个记录处理时间比较长的mapper来说,它允许多个mapper在同一个JVM下以尽量避免竞争的方式执行。参见16.3.2节,其中谈到的Fetcher正在运行的多线程类MapRunner)便使用了Multithreaded的版本(使用老版本的API)

转载请注明:全栈大数据 » 7.2. 输入格式 7.2.1. 输入分片与记录

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址