整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

第四章 Hadoop的I/O操作

Hadoop的I/O操作

Hadoop自带一套原子操作用于数据I/O操作。其中有一些技术比Hadoop本身更常用,如数据完整性保持和压缩,但在处理多达好几个TB的数据集时,特别值得关注。其他一些则是Hadoop工具或API,它们所形成的构建模块可用于开发分布式系统,比如序列化操作和在盘(ondisk)数据结构。

4.1. 数据完整性

Hadoop用户肯定都希望系统在存储和处理数据时不会丢失或损坏任何数据。尽管磁盘或网络上的每个I/O操作不太可能将错误引入自己正在读/写的数据中,但是如果系统中需要处理的数据量大到Hadoop的处理极限时,数据被损坏的概率还是很髙的。

检测数据是否损坏的常见措施是,在数据第-次引人系统时计算校验和(checksum)并在数据通过一个不可靠的通道进行传输时再次计算校验和,这样就能发现数据是否损坏。如果计算所得的新校验和与原来的校验和不匹配,我们就认为数据已损坏。但该技术并不能修复数据——它只能检测出数据错误。(这正是不使用低端硬件的原因。具体说来,一定要使用ECC内存。)注意,校验和也是可能损坏的,不只是数据,但由于校验和比数据小得多,所以损坏的可能性非常小。

常用的错误检测码是CRC-32(循环冗余校验),任何大小的数据输入均计算得到一个32位的整数校验和。

4.1.1. HDFS的数据完整性

HDFS会对写入的所有数据计算校验和,并在读取数据时验证校验和。它针对每个由io.bytes.per.checksum指定字节的数据计算校验和默认情况下为512个字节,由于CRC-32校验和是4个字节,所以存储校验和的额外开销低于1%。

datanode负责在收到数据后存储该数据及其验证校验和。它在收到客户端的数据或复制其他datanode的数据时执行这个操作。正在写数据的客户端将数据及其校验和发送到由一系列datanode组成的管线,管线中最后一个datanode负责验证校验和。如果datanode检测到错误,客户端便会收到一个ChecksumException异常,它是IOException异常的一个子类,后者应以应用程序特定的方式来处理,比如重试这个操作。

客户端从datanode读取数据时,也会验证校验和,将它们与datanode中存储的校验和进行比较。每个datanode均持久保存有一个用于验证的校验和日志(persistent log of checksum verification),所以它知道每个数据块的最后一次验证时间。客户端成功验证一个数据块后,会告诉这个datanode,datanode由此更新日志。保存这些统计信息对于检测损坏的磁盘很有价值。

不只是客户端在读取数据块时会验证校验和,每个datanode也会在一个后台线程中运行一个DataBlockScanner,从而定期验证存储在这个datanode上的所有数据块。该项措施是解决物理存储媒体上位损坏的有力措施。

由于HDFS存储着每个数据块的复本(replica),因此它可以通过数据复本来修复损坏的数据块,进而得到一个新的、完好无损的复本。基本思路是,客户端在读取数据块时,如果检测到错误,首先向namenode报告已损坏的数据块及其正在尝试读操作的这个datanode,再抛出ChecksumException异常。namenode将这个数据块复本标记为已损坏,因此,它不会将处理请求直接发送到这个节点,或尝试将这个复本复制到另一个datanode。之后,它安排这个数据块的一个复本复制到另一个datanode,如此一来,数据块的复本因子(replication factor)又回到期望水平。此后,已损坏的数据块复本便被删除。

在使用open()方法读取文件之前,将false值传递给Filesystem对象的setVerifyChecksum()方法,即可以禁用校验和验证。如果在命令解释器中使用带-get选项的-ignoreCrc命令或者使用等价的-copyToLocal命令,也可以达到相同的效果。如果有一个已损坏的文件需要检査并决定如何处理,这个特性是非常有用的。例如,也许你希望在删除该文件之前尝试看看是否能够恢复部分数据。

4.1.2. LocalFileSystem

HadoopLocalFileSystem执行客户端的校验和验证。这意味着在你写入一个名为filename的文件时,文件系统客户端会明确地在包含每个文件块校验和的同一个目录内新建一个名为.filename.crc的隐藏文件。就像HDFS一样,文件块的大小由属性io.bytes.per.checksum控制,默认为512个字节。文件块的大小作为元数据存储在.crc文件中,所以即使文件块大小的设置已经发生变化,仍然可以正确读回文件。在读取文件时需要验证校验和,并且如果检测到错误,LocalFileSystem还会抛出一个ChecksumException异常。

校验和的计算代价是相当低的(在Java中,它们是用本地代码实现的),一般只是增加少许额外的读/写文件时间。对大多数应用来说,付出这样的额外开销以保证数据完整性是可以接受的。此外,我们也可以禁用校验和计算,特别是在底层文件系统本身就支持校验和的时候。在这种情况下,使用RawLocalFileSystem替代LocalFileSystem。要想在一个应用中实现全局校验和验证,需要将fs.file.impl属性设置为org.apache.hadoop.fs.RawLocalFileSystem进而实现对文件URI的重新映射。还有一个可选方案可以直接新建一个RawLocalFileSystem实例。如果你想针对一些读操作禁用校验和,这个方案非常有用。示例如下:

Configuration conf=...
FileSystemfs=newRawLocalFileSystem();
fs.initialize(null,conf);

4.1.3. ChecksumFileSystem

LocalFileSystem通过ChecksumFileSystem来完成自己的任务,有了这个类,向其他文件系统(无校验和系统)加入校验和就非常简单,因为ChecksumFileSystem类继承自 FileSystem类。一般用法如下:

FileSystem rawFs =...
FileSystem checksummedFs = new ChecksumFileSystem(rawFs);

底层文件系统称为“源”(raw)文件系统,可以使用ChecksumFileSystem 实例的getRawFileSystem()方法获取它。ChecksumFileSystem 类还有其他一些与校验和有关的有用方法,比如getChecksumFile()可以获得任意一个文件的校验和文件路径。请参考文档了解其他方法。

如果ChecksumFileSystem类在读取文件时检测到错误,会调用自己的reportChecksumFailure()方法。默认实现为空方法,但 LocalFileSystem类会将这个出错的文件及其校验和移到同一存储设备上一个名为bad_files的边际文件夹(side directory)中。管理员应定期检査这些坏文件并采取相应的行动。

 

4.2. 压缩

文件压缩有两大好处:减少存储文件所需要&磁盘空间,并加速数据在网络和磁盘上的传输。这两大好处在处理大量数据时相当重要,所以我们值得仔细考虑在Hadoop中文件压缩的用法。

有很多种不同的压缩格式、工具和算法,它们各有千秋。表4-1列出了与Hadoop结合使用的常见压缩方法。

4-1.压缩格式总结

压缩格式

工具

算法

文件扩展名

否可切分

DEFLATE

DEFLATE

.deflate

Gzip

gzip

DEFLATE

.gz

bzip2

bzip2

bzip2

.bz2

LZO

Izop

LZO

.Izo

LZ4

LZ4

.lz4

Snappy

Snappy

.snappy

· DEFLATE是一个标准压缩算法,该算法的标准实现是zlib。没有可用干生成 DEFLATE文件的常用命令行工具,因为通常都用gzip格式。注意,gzip文件格式 只是在DEFLATE格式上增加了文件头和一个文件尾。.deflate文件扩展名是Hadoop约定的。

· 但如果LZO文件已经在预处理过程中被索引了,那么LZO文件是可切分的。

所有压缩算法都需要权衡空间/时间:压缩和解压缩速度更快,其代价通常是只能节省少量的空间。表4-1列出的所有压缩工具都提供9个不同的选 项来控制压缩时必须考虑的权衡:选项-1为优化压缩速度,-9为优化压缩空间。例如,下述命令通过最快的压缩方法创建一个名为识的压缩文件:

gzip -1 file

不同压缩工具有不同的压缩特性。gzip是一个通用的压缩工具,在空间/时间性能的权衡中,居于其他两个压缩方法之间。bzip2的压缩能力强于gzip,但压缩速度更慢一点。尽管bzip2的解压速度比压缩速度快,但仍比其他压缩格式要慢一些。另一方面,LZOLZ4和Snappy均优化压缩速度,其速度比gzip快一个数量级,但压缩效率稍逊一筹。SnappyLZ4的解压缩速度比LZO高出很多。

4-1中的“是否可切分”列表示对应的压缩算法是否支持切分 (splitable),也就是说,是否可以搜索数据流的任意位置并进一步往下读取 数据。可切分压缩格式尤其适合MapReduce

4.2.1. codec

codec实现一种压缩-解压缩算法。在Hadoop中,一个对CompressionCodec接口的实现代表一个codec。例如,GzipCodec包装了gzip的压缩和解压缩算法。表4-2列举了 Hadoop实现的codec

4-2. Hadoop 的压缩codec

压缩格式

HadoopCompressionCodec

DEFLATE

org.apache.hadoop.io.compress.DefaultCodec

gzip

org.apache.hadoop.io.compress.GzipCodec

bzip2

org.apache.hadoop.io.compress.BZip2Codec

LZO

com.hadoop.compression.lzo.LzopCodec

LZ4

Org.apache.hadoop.io.compress.Lz4Codec

Snappy

Org.apache.hadoop.io.compress.SnappyCodec

LZO代码库拥有GPL许可,因而可能没有包含在Apache的发行版本中,因此Hadoopcodec需要单独下载,代码库包含有修正的软件错误及其他一些工具。LzopCodec与lzop工具兼容,LzopCodec基本上是LZO格式的但包含额外的文件头,因此这通常就是你想要的。也有针对纯LZO格式的LzoCodec,并使用.Izo_deflate作为文件扩展名(类似于DEFLATE,但纯gzip并不包含文件头)。

通过CompressionCodec对数据流进行压缩和解

CompressionCodec包含两个函数,可以轻松用于压缩和解压缩数据。如 果要对写入输出数据流的数据进行压缩,可用createOutputStream(OutputStream out)方法在底层的数据流中对需要以压缩格式写入在此之前尚未压缩的数据新建一个CompressionOutputStream对象。相反,对输入数据流中读取的数据进行解压缩的时候,则调用createlnputStream (InputStream in)获取CompressionlnputStream,可通过该方法从底层 数据流读取解压缩后的数据。

CompressionOutputStreamCompressionlnputStream,类似于Java.util.zip.DeflaterOutputStream  java.util.zip.DeflaterlnputStream,只不过前两者能够重置其底层的压缩或解压缩方法,对于某些将部分数据流section of data stream)压缩为单独数据块(block)的应用——例如SequenceFile,这个能力是非常重要的。

范例4-1显示了如何用API来压缩从标准输入中读取的数据并将其写到标准输出。

范例4-1.该程序压缩从标准输入读取的数据,然后将其写到标准输出

public class StreamCompressor {
    public static void main(String[] args) throws Exception {
        String codecClassname = args[0];
        Class<?> codecClass = Class.forName(codecClassname);
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec)Reflectionlltils.newlnstance(codecClass, conf);
        CompressionOutputStream out = codec.createOutputStream(System.out);
        IOUtils.copyBytes(System.in, out, 4096, false);
        out.finish();
    }
}

该应用希望将符合CompressionCodec实现的完全合格名称作为第一个命令行参数。我们使用ReflectionUtils新建一个codec实例,并由此获得在 System.out上支持压缩的一个包裹方法。然后,对IOUtils对象调用copyBytes()方法将输入数据复制到输出,(输出由CompressionOutputStream对象压缩)。最后,我们对CompressionOutputStream对象调用finish()方法,要求压缩方法完成到压缩数据流的写操作,但不关闭这个数据流。我们可以用下面这行命令做一个测试,通过GzipCodec的StreamCompressor对象对字符串Text进行压缩,然后使用gunzip中从标准输入中对它进行读取并解压缩操作

% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec | gunzip

Text

通过CompressionCodecFactory推断CompressionCodec

在读取一个压缩文件时,通常可以通过文件扩展名推断需要使用哪个codec。如果文件以.gz结尾,则可以用GzipCodec来读取,如此等等。表4-1为每一种压缩格式列举了文件扩展名。

通过使用其getCodec()方法,CompressionCodecFactory提供了一种可以将文件扩展名映射到一个CompressionCodec的方法,该方法取文件的Path对象作为参数。范例4-2所示的应用便使用这个特性来对文件进行解压缩。

范例4-2.该应用根据文件扩展名选取codec解压缩文件

public class FileDecompressor {
    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri),conf);
        Path inputPath = new Path(uri);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factony.getCodec(inputPath);
        if (codec == null) {
            System.err.printIn("No codec found for " + uri);
            System.exit(l);
        }
        String outputUri =CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
        InputStream in = null;
        OutputStream out = null;
        try{
            in = codec.createlnputstream(fs.open(inputPath));
            out = fs.create(new Path(outputUri));
            IOUtils.copyBytes(in, out, conf);
        } finally {
            IOUtils.closestream(in);
            IOUtils.closeStream(out);
        }
    }
}

一旦找到对应的codec,便去除文件扩展名形成输出文件名,这是通过CompressionCodecFactory对象的静态方法removeSuffix()来实现的。按照这种方法,一个名为file.gz的文件可以通过调用该程序压缩为名为file的文件:

% hadoop FileDecompressor file.gz

CompressionCodecFactory从io.compression.codecs属性(参见表 4.3)定义的一个列表中找到codec。在默认情况下,该列表列出了Hadoop提供的所有codec,所以只有在你拥有一个希望注册的定制codec(例如外部管理的LZO codec)时才需要加以修改。每个codec都知道自己默认的文件扩展名,因此CompressionCodecFactory可通过搜索注册的codec找到匹配指定文件扩展名的codec(如果有的话)。

4-3.压缩codec的属性

属性名称

类型

默认值

描述

io.compression.codecs

逗号分隔 的类名

•org.apache.hadoop.io. compress.DefaultCodec

•org.apache.hadoop.io. compress.GzipCodec

•org.apache.hadoop.io. compress.Bzip2Codec

用于压缩/解压缩的各 个CompressionCodec类的列表

原生类库

为了性能,最好使用"原生"(native)类库来实现压缩和解压缩。例如,在一个测试中,使用原生gzip类库可以减少约一半的解压缩时间和约10%的压缩时间(与内置的Java实现相比)。表4-4给出了每种压缩格式的Java实现和原生类库实现。并非所有格式都有原生实现(如bzip2),有些则只有原生类库实现(如LZO)。

4-4.压缩代码库的实现

压缩格式

是否有Java实现

否有原生实现

DEFLATE

gzip

bzip2

LZO

LZ4

Snappy

Hadoop本身包含有为32位和64位Linux构建的压缩代码库(位于lib/native目录)。其他平台,需要根据Hadoop英文维基页面(http://wiki.apache.org/hadoop/nativeHadoop)的指令编译代码库。

可以通过Java系统的java.library.path属性指定原生代码库。bin文件夹中的hadoop脚本可以帮你设置该属性,但如果不用这个脚本,则需要在应用中手动设置该属性。

默认情况下,Hadoop会根据自身运行的平台搜索原生代码库,如果找到相应的代码库就会自动加载。这意味着,你无需为了使用原生代码库而修改任何设置。但是,在某些情况下,例如调试一个压缩相关问题时,可能需要禁用原生代码库。将属性hadoop.native.lib的值设置成false即可,这可确保使用内置的Java代码库(如果有的话)。

 CodecPool

如果使用的是原生代码库并且需要在应用中执行大量压缩和解压缩操作, 可以考虑使用CodecPool,它支持反复使用压缩和解压缩,以分摊创建这些对象的开销。

范例4-3中的代码显示了 API函数,不过在这个程序中,它只新建了一个Compressor,并不需要使用压缩/解压缩池。

范例4-3.使用压缩池对读取自标准输入的数据进行压缩,然后将其写到标准输出

public class PooledStreamCompressor {
    public static void main(String[] args) throws Exception {
        String codecClassname = args[0];
        Class<?> codecClass = Class.forName(codecClassname);
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec)ReflectionUtils.newlnstance(codecClass, conf);
        Compressor compressor = null;
        try {
            compressor = CodecPool.getCompressor(codec);
            CompressionOutputStream out = codec.createOutputStream(System.out, compressor);
            IOUtils.copyBytes(System.in, out, 4096, false);
            out.finish();
        }finally {
            CodecPool.returnCompressor(compressor);
        }
    }
}

codec 的重载方法 createOutputStream()中,对于指定的 CompressionCodec 我们从池中获取一个Compressor实例。通过使用finally数据块,我们 在不同的数据流之间来回复制数据,即使出现IOException异常,也可以确保compressor可以返回池中。

4.2.2压缩和输入分片

在考虑如何压缩将由MapReduce处理的数据时,理解这些压缩格式是否支持切分(splitting)是非常重要的。以一个存储在HDFS文件系统中且压缩前大小为1GB的文件为例。如果HDFS的块大小设置为64MB,那么该文件将被存储在16个块中,把这个文件作为输人数据的MapReduce作业,将创建16个数据块,其中每个数据块作为一个map任务的输入。

现在,经过gzip压缩后,文件大小为1GB。与以前一样,HDFS将这个文件保存为16个数据块。但是,将每个数据块单独作为一个输入分片是无法实现工作的,因为无法实现从gzip压缩数据流的任意位置读取数据,所以让map任务独立于其他任务进行数据读取是行不通的。gzip格式使用DEFLATE算法来存储压缩后的数据,而DEFLATE算法将数据存储在一系列连续的压缩块中。问题在于从每个块的起始位置进行读取与从数据流的 任意位置开始读取时一致并接着往后读取下一个数据块,因此需要与整个数据流进行同步。由于上述原因,gzip并不支持文件切分。

在这种情况下,MapReduce会采用正确的做法,它不会尝试切分gzip压缩文件,因为它知道输入是gzip压缩文件(通过文件扩展名看出)且gzip不支持切分。这是可行的,但牺牲了数据的本地性:一个map任务处理16个HDFS块,而其中大多数块并没有存储在执行该map任务的节点。而且,map任务数越少,作业的粒度就较大,因而运行的时间可能会更长。

在前面假设的例子中,如果文件是通过LZO压缩的,我们会面临相同的问题,因为这个压缩格式也不支持数据读取和数据流同步。但是,在预处理LZO文件的时候使用包含在Hadoop LZO库文件中的索引工具是可能的。该工具构建了切分点索引,如果使用恰当的MapReduce输入格式可有效实现文件的可切分特性。

另一方面,bzip2文件提供不同数据块之间的同步标识(pi48位近似值), 因而它支持切分。可以参见表4-1,了解每个压缩格式是否支持切分。

应该使用哪种压缩格式?

Hadoop应用处理的数据集非常大,因此需要借助于压缩。使用哪种压缩格式与待处理的文件的大小、格式和所使用的工具相关。下面有一些建议,大致是按照效率从高到低排列的。

· 使用容器文件格式,例如顺序文件(第130页)、RCFile(第438 页)或者Avro数据文件(第117页),所有这些文件格式同时支持压缩和切分。通常最好与一个快速压缩工具联合使用,例如 LZOLZ4,或者Snappy。

· 使用支持切分的压缩格式,例如bzip2(尽管bzip2非常慢),或者使用通过索引实现切分的压缩格式,例如LZO

· 在应用中将文件切分成块,并使用任意一种压缩格式为每个数据块建立压缩文件(不论它是否支持切分)。这种情况下,需要合理选择数据块的大小,以确保压缩后数据块的大小近似于HDFS 块的大小。

· 存储未经压缩的文件。

对于大文件来说,不用使用不支持切分整个文件的压缩格式,因为会失去数据的本地特性,进而造成MapReduce应用效率低下。

4.2.3在MapReduce中使用压缩

前面讲到通过 CompressionCodecFactory 来推断 CompressionCodec 时指出,如果输入文件是压缩的,那么在根据文件扩展名推断出相应的codec 后,MapReduce会在读取文件时自动解压缩文件。

要想压缩MapReduce作业的输出,应在作业配置过程中将mapned.output.compress属性设为true 和 mapred. output, compression.codec属性设置为打算使用的压缩codec的类名。另一种方案是在FileOutputFormat中使用更便捷的方法设置这些属性,如范洳4-4所示。

范例4-4.对查找最高气温作业所产生输出进行压缩

public class MaxTemperatureWithCompression {
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.printIn("Usage: MaxTemperatureWithCompression <input path>" +"<output path>");
            System.exit(-1);
        }
        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        FilelnputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

我们按照如下指令对压缩后的输入运行程序(输出数据不必使用相同的压缩格式进行压缩,尽管本例中不是这样):

% hadoop MaxTemperatureWithCompression input/ncdc/sample.txt.gz output

最终输出的每个部分都是经过压缩的。在这里,只有一部分结果:

% gunzip -c output/part-r-00000.gz

1949 111

1950 22

如果为输出生成顺序文件(sequence file),可以设置mapred.output.compression.type属性来控制限制使用压缩格式。默认值是RECORD,即针对每条记录进行压缩。如果将其改为BLOCK,将针对一组记录进行压缩,这是推荐的压缩策略,因为它的压缩效率更高。

SequenceFileOutputFormat类中还有一个静态方法 putCompressionType()可用来便捷地设置该属性。

4-5归纳概述了用于设置MpaReduce作业输出的压缩格式的配置属性。如果你的MapReduce驱动使用Tool接口,则可以通过命令行将这些属性传递给程序,这比通过程序代码来修改压缩属性更加简便。

4-5. MapReduce的压缩属性

属性名称

类型

默认值

描述

mapred.output,compress

boolean

false

压缩输

mapred.output.compession.codec

类名称

org.apache.hadoop.io. compress.DefaultCodec

map输出所用的压缩codec

Mapred.output.compression.type

String

RECORD

SqeuenceFile 的输出可以使用的压缩类型:NONE、RECORD或者BLOCK

map任务输出进行压缩

尽管MapReduce应用读/写的是未经压缩的数据,但如果对map阶段的中间输入进行压缩,也可以获得不少好处。由于map任务的输出需要写到磁盘并通过网络传输到reducer节点,所以如果使用LZOLZ4或者Snappy 样的快速压缩方式,是可以获得性能提升的,因为需要传输的数据减少了。启用map任务输出压缩和设置压缩格式的配置属性如表4-6所示。

4-6. map任务输出的压缩属性

属性名称

类型

默认值

描述

mapred.compress.map. output

boolean

false

对map任务输出进行压缩

mapred.map.output. compression.codec

Class

ong.apache.hadoop.io. compress.DefaultCodec

map输出所用的压缩codec

下面是在作业中启用map任务输出gzip压缩格式的代码(使用新API):

Configuration conf = new Configuration();
conf.setBoolean("mapred.compress.map.output", true);
conf.setClass("mapred.map.output.compression.codec",GzipCodec.class,CompressionCodec.class);
Job job = new Job(conf);

 

在旧的API中,JobConf对象中可通过更便捷的方法实现该功能:

conf.setCompressMapOutput(true);
conf.setMap0utputCompressorClass(GzipCodec.class);

4.3. 序列化

序列化(serialization)是指将结构化对象转化为字节流以便在网络传输或写到磁盘进行永久存储的过程。反序列化(deserialization)是指将字节流转回结构化对象的逆过程。

序列化在分布式数据处理的两大领域经常出现:进程间通信和永久存储。

Hadoop中,系统中多个节点上进程间的通信是通过“远程过程调用” (remote procedure call, RPC)实现的。RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息。通常情况下,RPC序列化格式如下。

· 紧凑

紧凑格式能充分利用网络带宽(数据中心中最稀缺的资源)。

· 快速

进程间通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是最基本的。

· 可扩展

为了满足新的需求,协议不断变化。所以在控制客户端和服务器的过程中,需要直接引进相应的协议。例如,需要能够在方法调用的过程中增添新的参数,并且新的服务器需要能够接受来自老客户端的老格式的消息(无新增的参数)。

· 支持互操作

对于某些系统来说,希望能支持以不同语言写的客户端与服务器交互,所以需要设计需要一种特定的格式来满足这一需求。

表面看来,序列化框架对选择用于数据持久存储的数据格式应该会有不同的要求。毕竟,RPC的存活时间不到1秒钟,永久存储的数据却可能在写到磁盘若干年后才会被读取。这么看来,对数据永久存储而言,RPC序列化格式的4大理想属性非常重要。我们希望存储格式比较紧凑(进而高效使用存储空间)、快速(以读/写数据的额外开销比较小)、可扩展(以可以透明地读取老格式的数据)且可以互操作(以可以使用不同的语言读/写永久存储的数据)。

Hadoop使用的是自己的序列化格式Writable,它绝对紧凑、速度快,但不太容易用Java以外的语言进行扩展或使用。因为WritableHadoop的核心(大多数MapReduce程序都会为键和值使用它),所以在接下来的三个小节中,我们要进行深入探讨,然后再从总体上看看序列化框架和Avro,后者是一个克服了Writable部分不足的序列化系统。

4.3.1. Writable 接口

Writable接口定义了两个方法:一个将其状态写到DataOutput二进制流, 另一个从Datalnput二进制流读取状态:

package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.Datalnput;
import java.io.IOException;
public interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(Datalnput in) throws IOException;
}

让我们通过一个特殊的Writable类来看看它的具体用途。我们将使用 IntWritable来封装Java int类型。我们可以新建一个对象并使用set()方法来设置它的值:

IntWritable writable = new IntWritable();
writable.set(163);

也可以通过使用一个整数值作为输入参数的构造函数来新建一个对象:

IntWritable writable = new IntWritable(163);

为了检査 IntWritable 的序列化形式,我们在 java.io.DataOutputStream (java.io. DataOutput的一个实现)中加入一个帮助函数来封装java .io.ByteArrayOutputSteam,以便在序列化流中捕捉字节:

public static byte[] serialize(Writable writable) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(out);
    writable.write(dataOut);
    dataOut.close();
    return out.toByteArray();
}

一个整数占用4个字节(因为我们使用JUnit4进行声明):

byte[] bytes = serialize(writable);
assertThat(bytes.length, is(4));

每个字节是按照大端顺序写入的(按照java.io.DataOutput接口中的声明,最重要的字节先写到流),并且通过HadoopStringUtils,我们可以看到这些字节的十六进制表示:

assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));

让我们试试反序列化。我们再次新建一个辅助方法,从一个字节数组中读取一个Writable对象:

public static byte[] deserialize(Writable writable, byte[] bytes) throws IOException {
    ByteArrayInputStream in = new ByteArraylnputStream(bytes);
    DatalnputStream dataln = new DatalnputStream(in);
    writable.readFields(dataIn);
    dataln.close();
    return bytes;
}

我们构建了一个新的、空值的IntWritable对象,然后调用 deserialize()方法从我们刚写的输出数据中读取数据。最后,我们看到该值(通过get()方法获取)是原始的数值163:

IntWritable newWritable = new IntWritable();
deserialize(newWritable, bytes);
assertThat(newWritable.get(), is(163));

WritableComparable接口和comparator

IntWritable实现原始的WritableComparable接口,该接口继承自 Writable 和 java.lang.Comparable 接口:

package org.apache.hadoop.io;
    public interface WritableComparable<T> extends Writable, Comparable<T> {
}

MapReduce来说,类型比较非常重要,因为中间有个基于键的排序阶段。Hadoop提供的一个优化接口是继承自]ava Comparator的RawComparator接口:

package org.apache.hadoop.io;
import java.util.Comparator;
public interface RawComparator<T> extends Comparator<T> {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

该接口允许其实现直接比较数据流中的记录,无须先把数据流反序列化为对象,这样便避免了新建对象的额外开销。例如,我们根据IntWritable接口实现的comparator实现原始的compare()方法,该方法可以从每个字节数组b1b2中读取给定起始位置(s1s2)以及长度(l1和l2)的一个整数进而直接进行比较。

WritableComparator 是对继承自 WritableComparable 类的RawComparator类的一个通用实现。它提供两个主要功能。第一,它提供了对原始compare()方法的一个默认实现,该方法能够反序列化将在流中进行比较的对象,并调用对象的compare()方法。第二,它充当的是RawComparator实例的工厂(已注册Writable的实现)。例如,为了获得IntHratablecomparator,我们直接如下调用:

RawCompanator<IntWritable> comparator = WritableComparator.get (IntWritable.class);

这个comparator可以用于比较两个IntWritable对象:

IntWritable w1= new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(wl, w2), greaterThan(0));

或其序列化表示:

byte[] b1 = serialize(wl);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(bl, 0, bl.length, b2, 0, b2.length), greaterThan(0));

4.3.2. Writable

Hadoop自带的org.apache.hadoop.io包中有广泛的Writable类可供选

择。它们形成如图4-1所示的层次结构。

 blob.png

4-1. Writable类的层次结构

Java基本类型的Writable封装器

Writable类对Java基本类型(参见表4-7)提供封装,char除外(可以存储在IntWritable中)。所有的封装包含get()和set()两个方法用于读取或存储封装的值。

4-7.java基本类型的Writable类

Java基本类型

Writable 实现

序列化大小(字节}

boolean

BooleanWritable

1

byte

ByteWritable

1

Short

ShortWritable

2

int

IntWritable

4

VintWritable

1~5

float

FloatWritable

4

long

LongWritable

8

VlongWritable

1〜9

double

DoubleWritable

8

对整数进行编码时,有两种选择,即定长格式(IntWritale和LongWritable)和变长格式(VIntWritable  VLongWritable)。需要编码的数值如果相当小(在-127和127之间,包括-127和127),变长格式就是只用一个字节进行编码;否则,使用第一个字节来表示数值的正负和后跟多少个字节。例如,值163需要两个字节:

byte[] data = serialize(new VIntWritable(163));
assertThat(StringUtils.byteToHexString(data), is("8fa3"));

如何在定长格式和变长格式之间进行选择呢?定长格式非常适合对整个值域空间中分布非常均勻的数值进行编码,如精心设计的哈希函数。大多数数值变量的分布都不均匀,而且变长格式一般更节省空间。变长编码的另一个优点是可以在VIntWritable和VLongWritable转换,因为它们的编码实际上是一致的。所以选择变长格式之后,便有增长的空间,不必一开始就用8字节的long表示。

Text类型

Text是针对UTF-8序列的Writable类。一般可以认为它等价于java.lang.String Writable。Text 替代了 UTF8 类,但这并不是一个很好的替代,一因为不支持对字节数超过32767的字符串进行编码,二者因为它使用的是JavaUTF-8修订版。

Text类使用整型(通过边长编码的方式)来存储字符串编码中所需的字节数,因此该最大值为2 GB。另外,Text使用标准UTF-8编码,这使得能够更简便地与其他理解UTF-8编码的工具进行交互操作。

索引 由于着重使用标准的UTF-8编码,因此Text类和Java String类之间存在一定的差别。对Text类的索引是根据编码后字节序列中的位置实现的,并非字符串中的Unicode字符,也不是Java char的编码单元(String)。对于ASCII字符串,这三个索引位置的概念是一致的。charAt() 方法的用法如下例所示:

Text t = new Text("hadoop");
assertThat(t.getLength(), is(6));
assertThat(t.getBytes().length, is(6));
assertThat(t.charAt(2), is((int) 'd'));
assertThat("Out of bounds", t.charAt(100), is(-l));

注意charAt()方法返回的是一个表示Unicode编码位置的int类型值,而 String返回一个char类型值。Text还有一个find()方法,该方法类似 于 String  indexOf()方法:

Text t = new Text("hadoop");
assertThat("Find a substring", t.find("do"), is(2));
assertThat("Finds first 'o'", t.find("o"), is(3));
assertThat("Finds 'o' from position 4 or later", t.find("o", 4), is(4));
assertThat("No match", t.find("pig"), is(-1));

Unicode  —旦开始使用需要多个字节来编码的字符时,TextString之间的区别就昭然若揭了。考虑表4-8显示的Unicode字符。

所有字符(除了表中最后一个字符U+10400),都可以使用单个Java char类型来表示。U+10400是一个候补字符,并且需要两个Java char来表示,称为"字符代理对"(surrogate pair)。范例4-5中的测试显示了处理一个字符串(表4-8中的由4个字符组成的字符串)时StringText之间的差别。

 4-8. Unicode 字符

Unicode

编码点

U+0041

U+00DF

U+6771

U+10400

名称

拉丁大写字母A

拉丁小写字母 SHARP S

无(统一表示 的汉字)

DESERET CAPITAL LETTER LONG I

UTF-8

41

c39f

e69dbl

F0909080

编码单元

Java表示

\u0041

\u00DF

\u6771

\uuD801\uDC00

 

范例4-5.验证String和Text的差异性的测试

public class StringTextComparisonTest {
    @Test
    public void string() throws UnsupportedEncodingException {
        String s = "\u0041\u00DF\u6771\uD801\uDC00H; assertThat(s.length(),is(5));
        assertThat(s.getBytes("UTF-8").length, is(10));
        assertThat(s.indexOf("\u0041"), is(0));
        assertThat(s.indexOf("\u00DF"), is(l));
        assertThat(s.index0f("\u6771"), is(2));
        assertThat(s.indexOf("\uD801\uDC00"), is(3));
        assertThat(s.charAt(0), is("\u0041"));
        assertThat(s.charAt(l), is("\u00DF"));
        assertThat(s.charAt(2), is("\u6771"));
        assertThat(s.charAt(3), is("\uD801"));
        assertThat(s.charAt(4), is("\uDC00"));
        assertThat(s.codePointAt(0),is(0x0041));
        assertThat(s.codePointAt(1),is(0x00DF));
        assertThat(s.codePointAt(2),is(0x6771));
        assertThat(s.codePointAt(3),is(0x10400));
    }
    @Test
    public void text() {
        Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");
        assertThat(t.getLength(), is(10));
        assertThat(t.find("\u0041"), is(0));
        assertThat(t.find("\u00DF"), is(1));
        assertThat(t.find("\u6771"), is(3));
        assertThat(t.find("\uD801\uDC00"), is(6));
        assertThat(t.charAt(0),is(0x0041));
        assertThat(t.charAt(l),is(0x00DF));
        assertThat(t.charAt(3),is(0x6771));
        assertThat(t.charAt(6),is(0x10400));
    }
}

这个测试证实String的长度是其所含char编码单元的个数(5,由该字符串的前三个字符和最后的一个代理对组成),但Text对象的长度却是其UTF-8编码的字节数(10=1+2+3+4)。相似的,String类的indexOf()方法返回char编码单元中的索引位置,Text类的find()方法则返回字节偏移量。

当代理对不能代表整个Unicode字符时,String类中的charAt()方法会根据指定的索引位置返回char编码单元。根据char编码单元索引位置,需要codePointAt()方法来获取表示成int类型的单个Unicode字符。事实上,Text类中的charAt()方法与String中的codePointAt()更加相似(相较名称而言)。唯一的区别是通过字节的偏移量进行索引。

迭代 利用字节偏移量实现的位置索引,对Text类中的Unicode字符进行迭代是非常复杂的,因为无法通过简单地增加索引值来实现该迭代。同时迭代的语法有些模糊(参见范例4-6):将Text对象转换为java.nio.ByteBuffer对象,然后利用缓冲区对Text对象反复调用bytesToCodePoint()静态方法。该方法能够获取下一代码的位置,并返回相应的int值,最后更新缓冲区中的位置。当bytesToCodePoint()返回-1 时,则检测到字符串的末尾。

范例4-6.遍历Text对象中的字符

public class Textlterator {
    public static void main(String[] args) {
        Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");
        ByteBuffer buf = ByteBuffer.wrap(t.getBytes(), 0, t.getLength());
        int cp;
        while(buf.hasRemaining() && (cp = Text.bytesToCodePoint(buf))!=-l){
            System.out.println(Integer.toHexString(cp));
        }
    }
}

运行这个程序,打印出字符串中四个字符的编码点(code point):

% hadoop Textlterator

41

df

6771

10400

可变性 String相比,Text的另一个区别在于它是可变的(与所有 Hadoop的Writable接口实现相似,NullWritable除外,它是单实例对象)。可以通过调用其中一个set()方法来重用Text实例。例如:

Text t = new Text("hadoop");
t.set("pig");
assertThat(t.getLength(), is(3));
assertThat(t.getBytes().length, is(3));

在某些情况下,getBytes()方法返回的字节数组可能比getLength()函数返回的长度更长:

Text t = new Text("hadoop");
t.set(new Text("pig"));
assertThat(t.getLength(), is(3));
assertThat("Byte length not shortened", t.getBytes().length, is(6));

以上代码说明了为什么在调用getBytes()之前始终要调用getLength()方法,因为可以由此知道字节数组中多少字符是有效的。

String重新排序 Text类并不像java. lang.String类那样有丰富的字符串操作API。所以,在多数情况下需要将Text对象转换成String 象。这一转换通常通过调用toString()方法来实现:

assertThat(new Text("hadoop").toString(), is("hadoop"));

BytesWritable

BytesWritable是对二进制数据数组的封装。它的序列化格式为一个指定所含数据字节数的整数域(4字节),后跟数据内容本身。例如,长度为2的字节数组包含数值3和5,序列化形式为一个4字节的整数(00000002)和该数组中的两个字节(03和05):

BytesWritable b = new BytesWritable(new byte[] { 3, 5 });
byte[] bytes = serialize(b);
assertThat(StringUtils.byteToHexString(bytes), is("000000020305"));

BytesWritable是可变的,其值可以通过set()方法进行修改。和Text 相似,BytesWritable类的getBytes()方法返回的字节数组长度一一容量——可能无法体现BytesWitable所存储数据的实际大小。可以通过getLength()方法来确定BytesWritable的大小。示例如下:

b.setCapacity(11);
assertThat(b.getLength(), is(2));
assertThat(b.getBytes().length, is(11));

NullWritable

NullWritableWritable的特殊类型,它的序列化长度为0。它并不从数据流中读取数据,也不写入数据。它充当占位符例如,在MapReduce中,如果不需要使用键或值的序列化地址,就可以将键或值声明为NullWritable,结果是高效的存储常量空值。如果希望存储一系列数值,与键/值对相对,NullWritable也可以用作在SequenceFile中的键。它是一个不可变的单实例类型,通过调用NullWritable.get()方法可以获取这个实例。

ObjectWritable GenericWritable

ObjectWritable是对Java 基本类型(String,enum,Writable,null或这些类型组成的数组)的一个通用封装。它在Hadoop RPC中用于对方法的参数和返回类型进行封装和解封装。

当一个字段中包含多个类型时,ObjectWritable非常有用:例如,如果 SequenceFile中的值包含多个类型,就可以将值类型声明为ObjectWritable,并将每个类型封装在一个ObjectWritable中。作为一个通用的机制,每次序列化都写封装类型的名称,这非常浪费空间。如果封装的类型数量比较少并且能够提前知道,那么可以通过使用静态类型的数组,并使用对序列化后的类型的引用加入位置索引来提高性能。GenericWritable取的就是这种方式,所以你得在继承的子类中指定支持什么类型。

Writable集合类

org.apache.hadoop.io 软件包中一共有 6 个 Writable 集合类:ArrayWritable , ArnayPrimitiveWritable , TwoDArrayWritable , MapWritable SortedMapWritable  EnumMapWritable

ArrayWritable  TwoDArrayWritable 是对 Writable 的数组和两维数组(数组的数组)的实现。ArrayWritableTwoDArrayWritable中所有元素必须是同一类的实例(在构造函数中指定),如下所示:

ArrayWritable writable = new ArrayWritable(Text.class);

如果Writable根据类型来定义,例如SequenceFile的键或值,或更多时候作为MapReduce的输入,则需要继承ArrayWritable(或相应的TwoDArray Writable)并设置静态类型。示例如下:

public class TextArrayWritable extends ArrayWritable {
    public TextArrayWritable() {
        super(Text.class);
    }
}

ArrayWritable  TwoDArrayWritable都有get()、set()和 toArray() 方法。toArray()方法用于新建该数组(或二维数组)的一个“浅拷贝”(shallow copy)。

ArrayPrimitiveWritable是对Java基本数组类型的一个封装。调用 set()方法时,可以识别相应组件类型,因此无需通过继承该类来设置类型。

MapWritable  SortedMapWritable 分别实现了java.util.Map<WritableWritable>和java.util.SortedMap<WritableComparable,Writable>每个键/值字段使用的类型是相应字段序列化形式的一部分。类型存储为单个字节(充当类型数组的索引)。在org.apache.hadoop.io包中,数组经常与标准类型结合使用,而定制的Writable类型也通常结合使用,但对于非标准类型,则需要在包头指明所使用的数组类型。根据实现,MapWritable类和SortedMapWritable类通过正byte值来指示定制的类型,所以在MapWritable和SortedMapWritable实例中最多可以使用 127个不同的非标准Wirtable类。下面显示使用了不同键和值类型的MapWritable实例:

MapWritable src = new MapWritable();
src.put(new IntWritable(1), new Text("cat"));
src.put(new VIntWritable(2), new LongWritable(163));
MapWritable dest = new MapWritable();
WritableUtils.cloneInto(dest, src);
assertThat((Text) dest.get(new IntWritable(1)), is(new Text("cat")));
assertThat((LongWritable)dest.get(new VIntWritable(2)), is(new LongWritable(163)));

显然,可以通过Writable合类来实现集合和列表。可以使用MapWritable类型(或针对排序集合,使用SortedMapWritable类型)来枚举集合中的元素,用NullWritable类型枚举值。对集合的枚举类型可采用EnumSetWritable。对于单类型的 Writable 列表,使用 ArrayWritable就足够了,但如果需要把不同的Writable类型存储在单个列表中,可以用GenericWritable将元素封装在一个ArrayWritable 中。另一个可选方案是,可以借鉴MapWritable的思路写一个通用的 ListWritable

 

4.3.3. 实现定制的Writable集合

Hadoop有一套非常有用的Writable实现可以满足大部分需求,但在些情况下,我们需要根据自己的需求构造一个新的实现。有了定制的Writable类型,就可以完全控制二进制表示和排序顺序。由于Writable 是MapReduce数据路径的核心,所以调整二进制表示能对性能产生显著效果。虽然Hadoop自带的Writable实现已经过很好的性能调优,但如果希望将结构调整得更好,更好的做法往往是新建一个Writable类型(而不是组合打包的类型)。

为了演示如何新建一个定制的Writable,我们写一个表示一对字符串的实现,名为TextPair。范例4-7显示了最基本的实现。

范例4-7.存储一对Text对象的Writable

import java.io.*;
import org.apache.hadoop.io.*;
public class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;
    public TextPair() {
        set(new Text(), new Text());
    }
    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }
    public TextPair(Text first, Text second) {
        set(first, second);
    }
    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }
    public Text getFirst() {
        return first;
    }
    public Text getSecond() {
        return second;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }
    @Override
    public void readFields(Datalnput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }
    @Override
    public int hashCode() {
        return first.hashCode()*163 + second.hashCode();
    }
    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }
    @Override
    public String toString(){
        return first + "\t" + second;
    }
    @Override
    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }
}

这个定制Writable实现的第一部分非常直观:包括两个Text实例变量(first和second)和相关的构造函数,以及setter和getter(即设置函数和提取函数)。所有Writable实现都必须有一个默认的构造函数以便MapReduce框架可以对它们进行实例化,然后再调用readFields()函数査看(填充)各个字段的值。Writable实例是可变的并且通常可以重用,所以应该尽量避免在write()或readFields()方法中分配对象。

通过让Text对象自我表示,TextPair类的write()方法依次将每个Text对象序列化到输出流中。类似的,通过每个Text对象的表示,readFields()方法对来自输入流的字节进行反序列化。DataOutput Datalnput接口有一套丰富的方法可以用于对Java基本类型进行序列化和反序列化,所以,在通常情况下,你可以完全控制Writable对象在线上传输/交换(的数据)的格式(数据传输格式)。

就像针对Java语言构造的任何值对象那样,需要重写java. lang.Object 中的 hashCode()、equals()和 toString()方法HashPartitioner (MapReduce中的默认分区类)通常用hashCode()方法来选择reduce分区,所以应该确保有一个比较好的哈希函数来保证每个reduce分区的大小相似。

即便计划结合使用TextOutputFormat和定制的Writable,也得自己动手实现toString()方法。TextOutputFormat对键和值调用toString()方法,将键和值转换为相应的输出表示。针对TextPair,我们将原始的Text对象作为字符串写到输出,各个字符串之间用制表符来分隔。

TextPair  WritableComparable 的一个实现,所以它提供了 compareTo() 方法,该方法可以强制数据排序:先按照第一个字符排序,如果第一个字 符相同则按照第二个字符排序。注意,前一小节中已经提到TextPair不同于TextArrayWritable(可存储的Text对象数组除外),因为 TextArrayWritable 只继承了 Writable,并没有继承 WritableComparable

3.3.1. 为速度实现一个RawComparator

范例4-7中的TextPair代码可以按照其描述的基本方式运行,但我们也可以进一步优化。当TextPair被用作MapReduce中的键时,需要将数据流反序列化为对象,然后再调用compareTo()方法进行比较。那么有没有可能看看它们的序列化表示就可以比较两个TextPair 对象呢?

事实证明,我们可以这样做,因为TextPair是两个Text对象连接而成的,而Text对象的二进制表示是一个长度可变的整数,包含字符串之UTF-8表示的字节数以及UTF-8字节本身。诀窍在于读取该对象的起始长度,由此得知第一个Text对象的字节表示有多长;然后将该长度传给 Text对象的RawComparator方法,最后通过计算第一个字符串和第二个字符串恰当的偏移量,这样便可以实现对象的比较。详细过程参见范例4-8(注意,这段代码已嵌入TextPair类)。

范例4-8.用于比较TextPair字节表示的RawComparator

public static class Comparator extends WritableComparator {
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator());
    public Comparator() {
        super(TextPair.class);
    }
    @Override
    public int compare(byte[] b1, int s1, int l1,byte[] b2, int s2, int l2) {
        try {
            int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
            int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
            int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
            if (cmp != 0) {
                return cmp;
            }
            return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,b2, s2 + firstL2, l2 - firstL2);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
static {
    WritableComparator.define(TextPair.class, new Comparator()));
}

事实上,我们采取的做法是继承WritableComparable类,而非实现RawComparator接口,因为它提供了一些比较好用的方法和默认实现。这段代码最本质的部分是计算firstL1firstL2,这两个参数表示每个字节流中第一个Text字段的长度。两者分别由变长整数的长度(由WritableUtils  decodeVIntSize()方法返回)和编码值(在readVInt()方法返回)组成。

定制的comparator

TextPair可以看出,编写原始的comparator需要谨慎,因为必须要处理字节级别的细节。如果真的需要自己编写comparator,有必要参考org.apache.hadoop.io包中对Writable接口的实现。WritableUtils提供的方法也比较好用。

如果可能,定制的comparator也应该继承自RawComparator。这些comparator定义的排列顺序不同于默认comparator定义的自然排列顺序。范例4-9显示了一个针对TextPair类型的comparator ,称为FirstCompartator,它只考虑TextPair对象的第一个字符串。注意,我们重载了针对该类对象的compare()方法,使两个compare()方法有相同的语法。

范例4-9.定制的RawComparator用于比较TextPair对象字节表示的第一个字段

public static class FirstComparator extends WritableComparator {
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
    public FirstComparator() {  super(TextPair.class); }
    @Override
    public int compare(byte[] bl, int si, int 11,byte[] b2, int s2, int 12) {
        try {
            int firstL1= Writablelltils.decodeVIntSize(b1[s1]) + readVInt (b1, s1);
            int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt (b2, s2);
            return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        if (a instanceof TextPair && b instanceof TextPair) {
            return ((TextPair) a).first.compareTo(((TextPair) b).first);
        }
            return super.compare(a, b);
        }
}

 

4.3 序列化框架

尽管大多数MapReduce程序使用的都是Writable类型的键和值,但这并不是MapReduce API强制使用的。事实上,可以使用任何类型,只要能有一种机制对每个类型进行类型与二进制表示的来回转换。

为了支持这一机制,Hadoop有一个针对可替换序列化框架(serialization framework)的API。序列化框架用一个Serialization实现(包含在org.apache.hadoop.io.serializer)来表示。例如,WritableSerialization类是对Writable类型的Serialization实现。

Serialization对象定义了从类型到serializer实例(将对象转换为字节流)和Deserializer实例(将字节流转换为对象)的映射方式。

io.serizalizations属性设置为一个由逗号分隔的类名列表,即可注册Serialization实现。它的默认值包括org.apache.hadoop. io.serializer.WritableSerialization  Avro 指定序列化和自反序列化类,这意味着只有Writable对象和Avro对象才可以在外部序列化和反序列化。

Hadoop 包含一个名为 JavaSerialization 的类,该类使用 Java Object Serialization。尽管它方便了我们在MapReduce程序中使用标准的Java类型,如IntegerString,但不如Writable高效,所以不建议使用(参见以下内容)。

为什么不用 Java Object Serialization?

Java有自己的序列化机制,称为“Java Object Serialization”(通常简称 为“Java Serialization”),该机制与编程语言紧密相关,所以我们很自然会问为什么不在Hadoop中使用该机制针对这个问题,Doug Cutting 是这样解释的:

“为什么开始设计Hadoop的时候我不用Java Serialization?因为它看起来太复杂,而我认为需要有一个至精至简的机制,可以用于精确控制对象的读和写,这个机制将是Hadoop的核心。你用Java Serialization虽然可以获得一些控制权,但用起来非常纠结。

不用RMI也出于类似的考虑高效、高性能的进程间通信是Hadoop的关键。我觉得我们需要精确控制连接、延迟和缓冲的处理方式,RMI对此无能为力。

问题在于Java Serialization不满足先前列出的序列化格式标准:精简、 快速、可扩展、支持互操作

Java Serialization 并不精简。实例化 java.io.Serializable 或者java.io.Externalizable的类需要将其类名和对象表示写入序列化后的数据流中。同一个类后续的实例只引用第一次出现的句柄,这占5个字节。引用句柄不太适用于随机访问,因为被引用的类可能出现在先前数据流的任何位置,也就是说,需要在数据流中存储状态。更糟糕的是句柄引用会对序列化数据流中的排序记录造成巨大的破坏,因为一个特定类的第一个记录是不同的,必须当作特殊情况区别对待。

不把类名写到数据流可以避免所有这些问题,Writable接口采取的正是这种做法。这需要假设客户端知道会收到什么类型。其结果是,这个格式比Java Serialization更精简,同时又支持随机存取和排序,因为流中的每一条记录均独立于其他记录(所以数据流是无状态的)。

Java Serialization是序列化图对象的通用机制,所以它有序列化和反序列化开销。更有甚者,它有一些从数据流中反序列化对象时,反序列化程序需要为每个对象新建一个实例。另一方面,Writable对象可以(并且通常)重用。例如,对于MapReduce作业(主要对只有几个类型的几十亿个对象进行序列化和反序列化),不需要为新建对象分配空间而得到的存储节省是非常可观的。

至于可扩展性,Java Serialization支持演化类型,但是难以使用。(不支持Writable类型,得靠程序员自行搞定。)

从原则上讲,其他编程语言能够理解Java Serialization流协议(Java ObjectSerialization定义),但事实上,其他语言的实现并不多,所以只有Java实现。对此,Writable的情况也不例外。

序列化IDL

还有许多其他序列化框架从不同的角度来解决该问题:不通过代码来定义类型,而是使用“接口定义语言”(Interface Description Language,IDL)以不依赖于具体语言的方式进行声明。由此,系统能够为其他语言生成类型,这种形式能有效提髙互操作能力。它们一般还会定义版本控制方案(使类型的演化直观易懂)。

Hadoop 自己的 Record I/O(可以在 org.apache.hadoop.record 包中找到)有一个IDL(已编译到Writable对象中,有利于生成与MapReduce兼容的类型)。但是,不知何故,Record I/O并未得到广泛应用,而且Avro也不支持它。

Apache Thrift(http://thrift.apache.org/)和 Google  Protocol Buffers (https://developers.google.com/protocol-buffers/)是两个比较流行的序列化框架,但它们常用作二进制数据的永久存储格式。MapReduce格式对该类的支持有限,但在Hadoop内部,部分组件仍使用上述两个序列化框架来实现RPC和数据交换。

在下一小节,我们深入探讨Avro,这是一个基于IDL的序列化框架,非常适用于Hadoop的大规模数据处理。

4.4Avro

Apache Avro(http://avro.apache.org/)是一个独立于编程语言的数据序列化系统。该项目由Doug Cutting(Hadoop之父)创建,旨在解决Hadoop Writable类型的不足:缺乏语言的可移植性。拥有一个可被多种语言(当前是CC++、C#、JavaPHPPythonRuby)处理的数据格式与绑定到单一语言的数据格式相比,前者更易于与公众共享数据集。Avro同时也更具生命力,该语言将使得数据具有更长的生命周期,即使原先用于读/写该数据的语言已经不再使用。

但为什么要有一个新的数据序列化系统?与Apache Thrift和Google的Protocol Buffers相比,Avro有其独有的特性。与前述系统及其他系统相似,Avro数据是用语言无关的模式定义的。但与其他系统不同的是,在Avro中,代码生成是可选的,这意味着你可以对遵循指定模式的数据进行读/写操作,即使在此之前代码从来没有见过这个特殊的数据模式。为此,Avro假设数据模式总是存在的(在读/写数据时)形成的是非常精简的编码,因为编码后的数值不需要用字段标识符来打标签。

Avro模式通常用JSON来写,数据通常采用二进制格式来编码,但也有其他选择。还有一种高级语言称为Avro IDL,可以使用开发人员更熟悉的类 C语言来写模式。还有一个基于JSON的数据编码方式(对构建原型和调试Avro数据很有用,因为它是我们人类可读的)。

Avro 规范(http://avro.apache.org/docs/current/spec.html)精确定义所有实现都必须支持的二进制格式。同时它还指定这些实现还需要支持的其他Avro特性。但是,该规范并没有给API制定规范:实现可以根据自己的需求操作Avro数据并给出相应的API,因为每个API都与语言相关。事实上,只有一种二进制格式比较重要,这表明绑定一种新的编程语言来实现是比较容易的,可以避免语言和格式组合爆炸问题(否则将对互操作性造成一定的问题)。

Avro有丰富的模式解析(schema resolution)能力。在精心定义的约束条件下,读数据所用的模式不必与写数据所用的模式相同。由此Avro是支持模式演化的。例如,如果有一个新的、可选择的字段要加入一个记录中,那么需要在用于读取老数据的模式中声明它即可。新客户端和以前的客户端非常相似,均能读取按旧模式存储的数据,同时新的客户端可以使用新字段写入的新内容。相反,如果老客户端读取新客户端写入的数据,会忽略新加入的字段并按照先前的数据模式处理。

Avro为一系列对象指定了一个对象容器格式——类似于Hadoop的顺序文件。 Avro数据文件包含元数据项(模式数据存储在其中),使此文件可以自我声明。Avro数据文件支持压缩,并且是可切分的,这对MapReduce的输入格式至关重要。另外,Avro本身是为MapReduce设计的,所以在不久的将来可能使Avro用于顶层的MapReduce API(即,比Streaming更丰富的API,就像Java API或C++管道一样)像使用其他编程语言一样使用。

Avro还可以用于RPC,但这里不进行详细说明。详情参见规范文档。

4.4.1. Avro数据类型和模式

Avro定义了少量数据基本类型,它们可用于以写模式的方式来构建应用特定的数据结构。考虑到互操作性,其实现必须支持所有的Avro类型。

4-9列举了 Avro的基本类型。每个基本类型还可以使用更冗长的形式和使用type属性来指定,示例如下:

{"type": "null"}

4-9. Avro基本类型

类型

描述

模式示例

null

空值

"null"

boolean

二进制值

"boolean"

int

32位带符号整数

"int"

long

64位带符号整数

"long"

float

单精度(32位)IEEE 754浮点数

"float"

double

双精度(64位)IEEE 754浮点数

"double"

bytes

8位无符号字节序列

"bytes"

string

Unicode字符序列

"string"

4-10列举了 Avro的复杂类型,并为每个类型给出模式示例。

4-10. Avro复杂类型

类型

描述

模式示例

array

一个排过序的对象集合。特定数组中的所有对象必须模式相同

{

"type": "array",

"items" : "long"

}

map

未排过序的键/值对。键必须是字符串,值可以是任何类型,但一个特定map中所有值必须模式相同

{

"type": "map",

"values" : "string"

}

record

一个任意类型的命名字段集合

{

"type":"record",

"name":"WeatherRecord",

"doc":"A weather reading.",

"fields":[

{"name": "year", "type": "int"},

{"name": "temperature", "type": "int"},

{"name".: "stationld", "type": "string"}

]

}

enum

一个命名的值集合

{

"type": "enum",

"name": "Cutlery",

"doc": "An eating utensil.",

"symbols": ["KNIFE", "FORK”, "SPOON"]

}

fixed

一组固定数量的8位无符号字节

{

"type": "fixed",

"name" : "MdSHash",

"size": 16

}

union

模式的并集。并集可用JSON数组表示,其中每个元素为一个模式。并集表示的数据必须与其其中一个模式相匹配

[

"null",

"string",

{"type": "map", "values":"string"}

]

每个Avro语言API都包含该语言特定的Avro类型表示。例如Avrodouble类型可以用CC++和Java语言的double类型,Pythonfloat类型以及Ruby的Float类型来表示。

而且,一种语言可能有多种表示或映射。所有的语言都支持动态映射,即 使运行前并不知道具体模式,也可以使用动态映射。对此,Java称为"通用"(generic)映射。

另外,JavaC++实现可以自动生成代码来表示符合某种Avro模式的数据。代码生成(code generationJava中称为“特殊映射”)能优化数据处理,如果读/写数据之前就有一个模式备份。那么,为用户代码生成的类和为通用代码生成的类相比,前者提供的API更贴近问题域。

Java拥有第三类映射,即自反映射(reflect mapping,将Avro类型映射到已有的Java类型)。它的速度比通用映射和特殊映射都慢,所以不推荐在新应用中使用。

4-11列举了 Java的类型映射。如表所示,特殊映射和通用映射相同,除非有特别说明(同样的,自反映射与特殊映射也相同,除非特别说明)。特殊映射与通用映射仅在record, enum和fixed三个类型方面有区别,其他所有类型均有自动生成的类(类名由name属性和可选的namesapce属性决定)。

4-11. Avro的Java类型映射

Avro类型

Java通用映射

Java特殊映射

Java自反映射

null

null类型

boolean

boolean

int

int

short或int

long

long

float

float

bytes

java.nio.bytebuffer

字节数组

string

org.apache.avro.util.utf8

java.lang.String

array

org.apache.avro.generic.GenericArray

数组或java.util.Collection

map

java.util.map

record

org.apache.avro.generic.genenicrecord

生成实现org.apache.avro.specific.Specific Record类的实现

具有零参数构造函数的任意用户类。继承了所有不传递的实例字段

enum

java.lang.string

生成]ava enum类型

任意]ava enum类型

fixed

org.apache.avro.generic.genericfixed

生成实现org.apache.avro.pecific.SpecificFixed的

org.apache.avro.generic.genericFixed

union

java.lang.Object

Avro string既可以通过Java String表示,也可以通过Avro Utf8 Java类型表示。之所以使用Utf8表示比较高效,是因为:Avro Utf8类型是易变的,单个Utf8实例可以重用,并对一系列值进行读/写操作。另外Java String在新建对象时进行UTF-8解码,但 Avro执行Utf8解码更晚一些,某些情况下这样做可以提高系统性能。

Utf8实现了 Java 的:java.lang.CharSequence 接口,该接口可以 Java类库实现互操作。在其他情况下可能需要通过调用 toString()方法将Utf8实例转化成String对象。

Avro从1.6.0版本之后加入一个选项使Avro始终执行向String对 象的转换。有两个方法可以达到该目的。第一个方法是,将模式中的 avro. java, string属性设置成 String:

{ "type": "string", "avro.java.string": "String" }

另外一个方法是,对于一些特定的映射操作,你可以构建基于 String的get()和set()方法的类。如果使用Avro Maven插件,该功能可以通过将stringType属性设置成String来实现。

最后,请注意,Java自反映射始终使用了 Java的String对象,其 主要因素是Java的兼容性而非性能。

4.4.2内存中的序列化和反序列化

Avro为序列化和反序列化提供了API,如果想把Avro集成到现有系统(比如已定义帧格式的消息系统),这些API函数就很有用。其他情况,请考虑使用Avro的数据文件格式。

让我们写一个Java程序从数据流读/写Avro数据。首先以一个简单的Avro 模式为例,它用于表示以记录形式出现的一对字符串:

{

"type": "record",

"name": "StingPair",

"doc": "A pair of strings.",

"fields":[

{"name": "left", "type": "string"},

{"name": "right", "type": "string"}

]

}

如果这一模式存储在类路径下一个名为StringPair.avsc的文件中(.avsc是 Avro模式文件的常用扩展名),我们可以通过下列两行代码进行加载:

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(getClass().getResourceAsStream("StringPair.avsc"));

我们可以使用以下通用API新建一个Avro记录的实例:

GenericRecord datum = new GenericData.Record(schema);
datum.put("left", new Utf8("L"));
datum.put("right", new Utf8(",R"));

注意,我们为记录的String字段构造了一个Avro Utf 8实例。

接下来,我们将记录序列化到输出流中:

ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
writer.write(datum, encoder);
encoder.flush();
out.close();

这里有两个重要的对象:DaturnWriter和Encoder。DatumWriter对象将数据对象翻译成Encoder对象可以理解的类型,然后由后者写到输出流。这里,我们使用GenericDatumWriter对象,它将GenericRecord字段的值传递给Encoder对象。我们将null传给encoder工厂,因为我们这里没有重用先前构建的encoder。

在本例中,只有一个对象被写到输出流,但如果需要写若干个对象,我们可以调用write()方法,然后再关闭输入流。

需要向GenericDatumWriter对象传递模式,因为它要根据模式来确定将数据对象中的哪些数值写到输出流。在我们调用writer的write()方法后,刷新encoder,然后关闭输出流。

我们可以使用反向的处理过程从字节缓冲区中读回对象:

DatumReader<GenericRecord> reader = new GenericDatumReader <GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().createBinaryDecoder(out.toByteArray(),null);
GenericRecord result = reader.read(null, decoder);
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toString(), is("R"));

我们需要传递空值(null)并调用binaryDecoder()和read()方法,因为这里没有重用对象(分别是decoder或记录)。

result.get("left")和 result.get("right")的输出对象是 Utf8类型,因此我们需要通过调用toString()方法将它们转型为java String 类型。

特定API

让我们现在来看看使用特定API的等价代码。通过使用Avro的Maven插件编译模式,我们可以根据模式文件生成StringPair类。以下是与Maven Project Object Model(POM)相关的部分:

<project>
  ...
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>${avro.version}</version>
        <executions>
          <execution>
            <id>schemas</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <includes>
                <include>StringPair.avsc</include>
              </includes>
              <sourceDirectory>src/main/resources</sourceDirectory>
              <outputDirectory>${project.build.directory}/generated-sources/java </outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  ...
</project>

作为Maven的替代,你可以使用Avro的Ant任务org.apache.avro.specific.SchemaTask或者Avro的命令行工具针对模式生成Java代码。

可以从http://avro.apache.org/releases.html下载avro的源文件和二进制文件。键入命令java -jar avro-tools-*.jar可以获得使用指南。

在序列化和反序列化的代码中,我们构建一个StringPair实例来替代GenericRecord对象(使用SpecificDatumWriter类将该对象写入数据流)并使用SpecificDatumReader类读回数据:

StringPair datum = new StringPair();
datum.left = "L";
datum.right = "R";
ByteArrayOutputStream out = new ByteArrayOutputStneam();
DatumWriter<StringPair> writer =new SpecificDatumWriter<StringPair>(StringPair.class);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(datum, encoder);
encoder.flush();
out.close();
DatumReader<StringPair> reader =new SpecificDatumReader<StringPair>(StringPair.class);
Decoder decoder = DecoderFactory.get().biriaryDecoder(out.toByteArray(), null);
StringPair result = reader.read(null, decoder);
assertThat(result.left.toString(), is("L"));
assertThat(result.right.toString(), is("R"));

Avro 1.6.0版本开始,生成的Java代码中包含有get()和set()方法,不用再写datum.setLeft("L")和 result.getLeft()方法。

4.4.3Avro数据文件

Avro的对象容器文件格式主要用于存储Avro对象序列。这与Hadoop顺序文件的设计非常相似。主要区别在于Avro数据文件主要是面向跨语言使用而设计的,所以,可以用Python写入文件,而用C语言来读取文件。

数据文件的头部分包含元数据,包括一个Avro模式和一个sync marker(同 步标识),紧接着是一系列包含序列化Avro对象的数据块(压缩可选)。数据块由sync marker来分隔,它对该文件而言,是唯一的(特定文件的标识信息存储在文件头部),并且允许在文件中搜索到任意位置之后通过块边界快速地重新进行同步。因此,Avro数据文件是可切分的,适合MapReduce快速处理。

Avro的对象写到数据文件与写到数据流类似。像以前一样,我们使用一个DatumWriter,但没有用Encoder,而是用DatumWriter来创建一个DataFileWriter实例。然后便可以新建一个数据文件(该文件一般有.avro扩展名)并向它附加新写入的对象:

File file = new File("data.avro");
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(writer);
dataFileWriter.create(schema, file);
dataFileWriter.append(datum);
dataFileWriter.close();

写入数据文件的对象必须遵循相应的文件模式,否则在调用append()方法时会抛出异常。

这个例子演示了如何将对象写到本地文件(前面代码段中的java.io.File), 但使用重载的DataFileWriter的create()方法,可以将数据对象写到任何一个java.io.OutputStream对象中。例如,通过对FileSystem对象调用create()方法可以返回OutputStream对象,进而将文件写到HDFS中。

从数据文件中读取对象与前面例子中在内存数据流中读取数据类似,只有一个重要的区别:我们不需要指定模式,因为可以从文件元数据中读取它。事实上,还可以对DataFileReader实例调用getSchema()方法来获取该模式,并验证该模式是否和原始写入对象的模式相同:

DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, reader);
assertThat("Schema is the same", schema, is(dataFileReader.getSchema()));

DataFileReader对象是一个常规的Java迭代器,由此我们可以调用其hashNext()和next()方法来迭代其数据对象。下面的代码检査是否只有一条记录以及该记录是否有期望的字段值:

assertThat(dataFileReader.hasNext(), is(true));
GenericRecord result = dataFileReader.next();
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toString(), is("R"));
assertThat(dataFileReader.hasNext(), is(false));

但是,更适合的做法是使用重载并将返回对象实例作为输入参数(该例中,

GenericRecord对象),而非直接使用next()方法,因为这样会可以实现对象,减少对象分配和垃圾回收所产生的开销,特别是文件中包含有很多对象时。代码如下所示:

GenericRecord record = null;
while (dataFileReader.hasNext()) {
    record = dataFileReader.next(record);
    // process record
}

如果对象重用不是那么重要,则可以使用如下更简短的形式:

for (GenericRecord record : dataFileReader) {
    // process record
}

如果是一般的从Hadoop文件系统中读取文件,可以使用Avro的Fslnput对象来指定使用Hadoop Path对象作为输入对象。事实上,DataFileReader对象提供对Avro数据文件的随机访问(通过seek()和sync()方法)。但在大多数情况下,如果顺序访问数据流足够了,则使用DataFileStream对象。DataFileStream对象可以从任意Java InputStream对象中读取数据。

4.4.4互操作性

为了说明Avro的语言互操作性,让我们试着用一种语言(Python)来写入数据文件,用另一种语言来读取这个文件。

关于 PythonAPI

范例4-10中的程序从标准输入中读取由逗号分隔的字符串,并将它们以StringPair记录的方式写入Avro数据文件。与写数据文件的Java代码类 似,我们新建了一个DaturnWriter对象和一个DataFileWriter对象,注 意,我们在代码中嵌入了 Avro模式,尽管没有该模式,我们仍然可以从文件中正确读取数据。

Python以目录形式表示Avro记录,从标准输入中读取的每一行都转换为dict对象并被附加到DataFileWriter对象末尾。

范例4-10.这个Python程序将Avro的成对形式的记录写入一个数据文件

import os
import string
import sys
from avro import schema
from avro import io
from avro import datafile
if __name__ == "__main__":
    if len(sys.angv) != 2:
        sys.exit('Usage: %s <data_file>' % sys.argv[0])
    avro_file = sys.argv[1]
    writer = open(avro_file, 'wb')
    datum_writer = io.DatumWriter()
    schema_object = schema.parse("\
    { "type": "record",
    "name": "Pair",
    "doc": "StringPair",
    "fields":[
        {"name": "left", "type": "strijig"},
        {"name": "right", "type": "string")
    }")
    dfw = datafile.DataFileWriter(writer, datum_writer, schema_object)
    for line in sys.stdin.readlines():
        (left,right) = string.split(line.strip(),',')
        dfw.append({'left':left, 'right':right});
    dfw.close()

在运行该程序之前,我们需要为Python安装Avro:

% easy_install avro

为了行该程序,我们指定文件名(pairs.avro,输出结果会被写到这个文件)和通过标准输入发送输入的成对记录,结束文件输入时键入CtrlD:

% python avro/src/main/py/write_pairs.py pairs.avro
a,1
c,2
b,3
b,2
^D

关于C API

下面转向C API,写程序显示pairs.avro文件的内容,如范例4-11所示。一般情况下,Avro工具JAR文件有一个tojson命令,该命令可以将Avro数据文件中的内容转储为JSON。

范例4-11.这个C程序从数据文件中读取Avro的成对记录

#include <avro.h>
#include <stdio.h>
#include <stdlib.h>
int main(int argc, char *argv[]) {
    if (argc != 2) {
        fprintf(stderr, "Usage: dump_pairs <data_file>\n");
        exit(EXIT_FAILURE);
    }
    const char *avrofile = argv[1];
    avro_schema_error_t error;
    avro_file_reader_t filereader;
    avro_datum_t pair;
    avro_datum_t left;
    avro_datum_t right;
    int rval;
    char *p;
    avro_file_reader(avrofile, &filereader):
    while (1) {
        rval = avro_file_reader_read(filereader,NULL, &pair);
        if (rval) break;
        if (avro_record_get(pair, "left", &left) == 0) {
            avro_string_get(left, &p);
            fprintf(stdout, "%s,", p);
        }
        if (avro_record_get(pair, "right", &right) == 0) {
            avro_string_get(right, &p);
            fprintf(stdout, "%s\n", p);
        }
    }
    avro_file_reader_close(filereader);
    return 0;
}

该程序的核心部分主要处理三件事情。

· 通过调用Avro的avro_file_reader函数打开一个 avro_file_reader_t类型的文件读取实例。Avro的方法和类型都有avro_前缀,并在avro.h文件中定义。

· 通过文件读取实例的avro—file^eader'方法循环读取Avro数据 直到读完所有的成对记录(由返回值rval决定)。

· 通过avro_file_reader_close方法关闭文件读取实例。

将数据模式作为avrc_file_reader_read方法的第二个参数,即可支持读取模式不同于文件写入模式的情况(下一节将详细说明),但如果参数设为null则说明希望Avro使用数据文件的模式来读取数据。第三个参数为指向avro_datuin_t对象的指针,该指针的内容是从文件中读取的下一条记录的内容。通过调用avro_record_get方法,我们将成对的结构分解成两个字段,然后通过avro_string_get方法抽取出每个字段的值,最后打印输出到控制台。

使用Python程序的输出来运行程序,打印出以下原始输入:

% ./dump_pairs pairs.avro
a,1
c,2
b,3
b,2

这样,我们便成功交换了两个Avro实现的复杂数据。

4.4.5模式的解析

在选择的时候,读加数据的模式(reader的模式)可以不同于我们用于写入数据的模式(writer的模式)。这非常有用,因为它意味着模式演化。例如,为了便于说明,我们考虑新增一个description字段,形成一个新的模式:

{
    "type": "record",
    "name": "StringPair",
    "doc": "A pair of strings with an added field.",
    "fields":[
        {"name": "left", "type": "string"},
        {"name": "right", "type": "string"},
        {"name": "description", "type": "string","default": ""}
    ]
}

我们可以使用该模式来读取前面序列化的数据,因为我们已经为 description字段指定了一个默认值(空字符串)供Avro在读取没有定义字段的记录时使用。如果忽略default属性,在读取旧数据时会报错。

要想将默认值设为null而非空字符串,我们要用null Avro类型 的并集来定义description字段

{"name": "description", "type": ["null", "string"], "default":"null"}

读模式不同于写模式时,我们调用GenericDatumReader的构造函数,它取两个模式对象,即读取对象和写入对象,并按照以下顺序:

DatumReader<GenericRecord> reader =new GenericDatumReader<GenericRecord>(schema, newSchema);
Decoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder (out.toByteArray(), null);
GenericRecord result = reader.read(null, decoder);
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toStning(), is("R"));
assertThat(result.get("description").toString(), is(""));

对于元数据中存储有写入模式的数据文件,我们只需要显式指定写入模式,具体做法是向写入模式传递null:

DatumReader<GenericRecord> reader =new GenericDatumReader<GenericRecord>(null, newSchema);

不同读取模式的另外一个应用是去掉记录中的某些字段,该操作可以称为"投影"projection)。记录中有大量字段但如果只需读取其中的一部分,这种做法非常有用。例如,可以使用这一模式只读取StringPair对象的 right字段:

{
    "type": "record",
    "name": "StringPair",
    "doc": "The right field of a pair of strings.",
    "fields":[
        {"name": "right", "type": "string"}
    ]
}

模式解析规则可以直接解决模式由一个版本演化为另一个版本时可能产生的问题,Avro规范中对所有Avro类型均有详细说明。表4-12从类型读/写(客户端和服务器端)的角度总结了记录演化规则。

4-12•记录的模式演化

新模式

写入

读取

操作

增加的字段

通过默认值读取新字段,因为写入时没有该字段

读取时不知道新写人的新字段,所以忽略该字段(投影)

删除的字段

读取时忽略已删除的字段(投影)

写入时不写入已删除的字段。如果旧模式对该字段有 默认值,那么读取时可以使用该默认值,否则报错。 在这种情况下,最好同时更新读取模式或在更新写人 模式之前更新读取模式

Avro模式演化的另一个有用的技术是使用别名。别名允许你在读取Avro数据的模式与写入Avro数据的模式中使用不同的名称。例如,下列读取模式可以使用新的字段名称(即firstsecond),来读取StringPair数据,而非写入数据时使用的字段名称(leftright)。

{
    "type": "record",
    "name": "StringPair",
    "doc": "A pair of strings with aliased field names.",
    "fields":[
        {"name": "first", "type": "string", "aliases": ["left"]},
        {"name": "second", "type": "string", "aliases": ["right"]}
    ]
}

注意,别名的主要作用是将写入模式转换(在读取的时候)为读取模式,但是别名对读取程序是不可见的。在上述例子中,读取程序无法再使用字段名称leftright,因为它们已经被转换为firstsecond

4.4.6排列顺序

Avro定义了对象的排列顺序。Avro大多数类型的排列顺序与用户的期望符合例如,数值型按照数值的升序进行排列。其他的就没有那么巧妙了。

例如,枚举通过符号的定义而非符号字符串的值来排序。

除了 record之外,所有类型均按照Avro规范中预先定义的规则来排序, 这些规则不能被用户改写。但对于记录,可以指定order属性来控制排列顺序。它有三个值:ascending(默认值)、descending(降序)或ignore(所以为了比较的目的,可以忽略该字段。)

例如,通过将right字段设置为descending,下述模式(SortedStringPair.avsc)定义StringPair记录按降序顺序。为了排序的目的,忽略了left字段, 但依旧保留在投影中:

{
    "type": "record",
    "name": "StringPair",
    "doc": "A pair of strings, sorted by right field descending.",
    "fields":[
        {"name": "left", "type": "string", "order": "ignore"},
        {"name": "right", "type": "string", "order": "descending"}
    ]
}

按照读取模式中的文档顺序,记录中的字段两两进行比较。这样,通过指定一个恰当的读取模式,便可以对数据记录使用任意顺序。该模式(SwitchedStringpair.avsc)首先定义了一组right字段的顺序,然后是left字段的顺序:

{
    "type": "record",
    "name": "StringPair",
    "doc": "A pair of strings, sorted by right then left.",
    "fields":[
        {"name": "right", "type": "string"},
        {"name": "left", "type": "string"}
    ]
}

Avro实现了高效的二进制比较。也就是说,Avro不需要将二进制对象反序列化成对象即可实现比较,因为它可以直接对字节流进行操作。该属性的一个有用结果是,我们根据可以从对象或相应的二进制表示(后者在BinaryData对象上使用hashCode()静态方法)算出Avro数据的哈希代码,两种情况下获得的结果相同。在使用StringPair模式的情况下(没有order属性),Avro按以下方式实现了二进制比较。

第一个字段(即left字段),使用UTF-8编码,由此Avro可以根据字母表顺序进行比较。如果它们的顺序不确定的,那么Avro可以在该处停止比较。否则,如果这两个字节顺序是相同的,那么它们比较第二个字段 (right),同样在字节尺度上使用字母表序排列,因为该字段同样也使用 UTF-8编码。

Avro为我们提供了比较器,所以我们无需重写和维护这部分代码。同时,可以通过修改读取模式来简单修改排列顺序。对于SortedStringPair.avscSwitchedStringpair.avsc模式来说,Avro所使用的比较方法本质上与刚才所描述的是一致的,只不过要考虑比较哪个字段,考虑使用哪种顺序,是升序还是降序。

在本章后面,我们将Avro的排序逻辑和MapReduce联合使用,实现Avro数据文件的并行排序。

4.4.7关于Avro MapReduce

Avro提供了很多类,以便对Avro数据运行MapReduce程序。例如,org.apache.avro.mapreduce包中的AvroMapper类和AvroReducer类是Hadoop规范(旧版)中Mapper和Reducer类。它们去除了作为输入和 输出的键/值对的不同,因为Avro数据文件只是一系列顺序排列的值。但 是,考虑到混洗,中间结果数据仍然被划分为键/值对。

这次,我们使用Avro MapReduce API重写找出天气数据集中每年最髙温度的MapReduce程序。我们用下列模式来表征天气记录:

{
    "type": "record",
    "name": "WeatherRecord",
    "doc": "A weather reading.",
    "fields":[
        {"name": "year", "type": "int"},
        {"name": "temperature", "type": "int"},
        {"name": "stationld", "type": "string"}
    ]
}

范例4-12中的程序读取文本输入,并将包含 天气记录的Avro数据文件。

public class AvroGenericMaxTemperature extends Configured implements Tool{
  private static final Schema SCHEMA = new Schema.Parser().parse(
    "{"+
    "  \"type\": \" record\","  +
    "  \"name\": \"WeatherRecord\"," +
    "  \"doc\": \"A weather reading.\"," +
    "  \"fields \": [" +
    "    {\"name\": \"year\", \"type\": \"int\"}," +
    "    {\"name\": \"temperature\", \"type\": \"int\"}," +
    "    {\"name\": \"stationId\", \"type\": \"string\"}" +
    "  ]" +
    "}"
  );
  public static class MaxTemperatureMapper extends AvroMapper<Utf8, Pair<Integer, GenericRecord>> {
    private NcdcRecordParser parser = new NcdcRecondParser();
    private GenericRecord record = new GenericData.Record(SCHEMA);
    @Override
    public void map(Utf8 line,AvroCollector<Pair<Integer, GenericRecord>> collector,Reporter reporter) throws IOException {
      parser.parse(line.toString());
      if (parser.isValidTemperature()) {
        record.put("year", parser.getYearlnt());
        record.put("temperature", parser.getAirTemperature());
        record.put("stationld", parser.getStationId());
        collector.collect(new Pair<IntegerJ GenericRecord>(parser.getYearlnt(), record));
      }
    }
  }
  
  public static class MaxTemperatureReducer extends AvnoReducer<Integer, GenericRecord, GenericRecord> {
    @Override
    public void reduce(Integer key, Iterable<GenericRecord> values,AvroCollector<GenericRecord> collector, Reporter reporter) throws IOException {
      GenericRecord max = null;
      for (GenericRecord value : values) {
        if (max == null ||(Integer) value.get("temperature") > (Integer)max.get("temperature")) {
          max = newWeatherRecord(value);
        }
      }
      collector.collect(max);
    }
    private GenericRecord newWeatherRecord(GenericRecord value) {
      GenericRecord record = new GenericData.Record(SCHEMA);
      record.put("year",value.get("year"));
      record.put("temperature", value.get("temperature"));
      record.put("stationld", value.get("stationld"));
      return record;
    }
  }
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }
    JobConf conf = new JobConf(getConf(), getClass());
    conf.setDobName("Max temperature");
    FilelnputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    AvroJob.setInputSchema(conf, Schema.create(Schema.Type.STRING));
    AvroJob.setMapOutputSchema(conf,
    Pair.getPairSchema(Schema.create(Schema.Type.INT), SCHEMA));
    AvroJob.setOutputSchema(conf, SCHEMA);
    conf.setInputFormat(AvroUtf8InputFormat.class);
    AvroJob.setMapperClass(conf, MaxTemperatureMapper.class);
    AvroJob.setReducerClass(conf, MaxTemperatureReducer.class);
    JobClient.runDob(conf);
    return 0;
  }
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), args);
    System.exit(exitCode);
  }
}

这个程序使用的是泛型Avro映射。这样可以避免我们以损失类型安全(通过字符串值来引用字段名称,例如"temperature")为代价,生成代码来表征数据记录。为了方便,天气记录的模式已加入代码中(读取SCHMA常量),但是在实际情况中,从驱动器本地文件中读取模式并通过Hadoop作业配置将模式传递给mpperreducer,可以提高代码的可维护性。

API与常规的Hadoop MapReduce API有两个较大不同之处。第一个不同是通过 org.apache.avro.mapred.Pair 来包裹 MaxTempenatureMapper map输出的键、值。(org.apache.avro.mapred.AvroMapper不具备定输出键、值的原因是,方便仅有map任务的作业将值存入Avro数据文件中)针对这个MapReduce程序,键是年份(一个整数),值是天气记录,由Avro的GenericRecord表征。

Avro MapReduce确实为reducer的输入保留了键-值对格式,但这是混洗的输出,所以它在调用 org.apache.avro.mapred.AvroReducer 之前对 Pair进行解包。MaxTemperatureReducer针对每个键(年份)所对应的所有记录执行迭代运算,并找到那条有最高温度的记录。有必要对目前找到的最髙温度的记录拷贝一份,因为该迭代运算为了达到高效的目的需要重用该实例(并且仅更新相关字段)。

与传统MapReduce的第二个差异是,它使用AvroJob来配置作业。AvroJob类非常适用于为输入、map输出以及最后输出数据指定Avro模式。在上述程序中,输入模式是Avro string,因为我们是从文本文件中读取数据且输入格式被设置为AvroUtf8InputFormatmap输出模式是键值对模式,键模式是Avro int,值模式是天气记录模式。最后输出数据模式是天气记录模式,并且写入Avro数据文件中的输出格式是默认的 AvroOutputFormat 格式

下面的命令行代码显示了如何在一个小的采样数据集上运行该程序:

% hadoop jar avro-examples.jar AvroGenericMaxTemperature \
input/ncdc/sample.txt output

执行完成之时,我们可以使用Avro工具JAR来査看输出结果,该结果是JSON格式的Avro数据文件,每行一条记录如下:

% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-00000.avro

{"year" : 1949, "temperature" : 111, "stationld" : "012650-99999"}

{"year" : 1950, "temperature" : 22, "stationld" : "011990-99999"}

在上述例子中,我们使用了一个AvroMapper和一个AvroReducer,但API支持传统MapReducermapper、reducer  Avro 指定的 mapper、reducer 混合使用这有利于将数据格式在Avro格式和其他格式之间来回转换,例如 SequenceFile

4.4.8使用Avro MapReduce进行排序

在本节中我们利用Avro的排序能力,并结合使用MapReduce写一个程序来 Avro数据文件进行排序(范例4-13)。

范例4-13•对Avro数据文件进行排序的MapReduce程序

public class AvroSort extends Configured implements Tool {
    static class SortMapper<K> extends AvroMapper<K, Pair<K, K>> {
        public void map(K datum, AvroCollector<Pair<K, K>> collector, Reporter reporter) throws IOException {
            collector.collect(new Pair<K, K>(datum, null, datum, null));
        }
    }
    static class SortReducer<K> extends AvroReducer<Ki K, K> {
        public void reduce(K key, Iterable<K> values, AvroCollector<K> collector,Reporter reporter) throws IOException {
            for (K value : values) {
                collector.collect(value);
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.printf(
            "Usage: %s [generic options] <input> <output> <schema-file>\n", getClass().getSimpleName());
            ToolRunner. printGenericCommandUsage(System, err);
            return -1;
        }
        String input = args[0];
        String output = args[1];
        String schemaFile = args[2];
        
        JobConf conf = new JobConf(getConf(), getClass());
        conf.setJobName("Avro sort");
        FilelnputFormat.addInputPath(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));
        
        Schema schema = new Schema.Parser().parse(new File(schemaFile));
        AvroJob. setInputSchema (conf, schema);
        Schema intermediateSchema = Pair.getPairSchema(schema,schema);
        AvroJob.setMapOutputSchema(conf, intermediateSchema);
        AvroJob.setOutputSchema(conf, schema);
        AvroJob.setMapperClass(conf, SortMapper.class);
        AvroDob.setReducerClass(conf, SortReducer.class);
        
        JobClient.runDob(conf);
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvroSort(), args);
        System.exit(exitCode);
    }
}

这个程序(使用了泛型的Avro映射,因此无需生成任何代码)能够对由泛型类型参数K表示的任何Java类型的Avro记录进行排序。我们选择值的类型与键的类型相同,以便在值按照键分组后,我们可以对有多值对应于一个键的情况(根据排序函数)输出同一个键对应的所有值,不丢失任何记录。Mapper输出键和值的同时输出具有同一个键的org. apache, avro. mapred.Pair对象。Reducer作为一个识别器,将值(单值)传递给输出,并写入Avro数据文件。

排序发生在MapReduce的混洗过程中,并且排序函数由Avro模式确定并由 它传入程序中。下面我们使用SortedStringPair.avsc模式对右边的字段按照降序排列,实现对先前创建的pairs.avro文件排序。首先,我们使用Avro工具JAR检査输入数据:

% java -jar $AVRO_HOME/avro-tools-*.jar tojson input/avro/pairs.avro
{"left":"a","right":"1"}
{"left":"c","right":"2"}
{"left":"b","right":"3"}
{"left":"b","right":"2"}

然后我们运行排序程序:

% hadoop jar avro-examples.jar AvroSort input/avro/pairs.avro output \
ch04-avro/src/main/resources/SortedStringPair.avsc

最后,检査输出并査看是否正确排序。

% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-00000.avro
{"left":"b","right":"3"}
{"left":"c","right":"2"}
{"left":"b","right":"2"}
{"left":"a","right":"1"}

4.4.9其他语言的 Avro MapReduce

除了 Java语言之外,还有其他语言也可以使用Avro数据。

AvroAsTextlnputFormat 被设计用来允许 Hadoop Streaming 程序读取 Avro数据文件。文件中的每条数据均被转化为一个字符串,通过JSON格式表示,或者是原始字节(如果是Avro bytes类型的话)。另一方面,你可以指定AvroTextOutputFormat作为Streaming作业的输出格式,并按照bytes模式创建Avro数据文件,其中每条记录是从Streaming输出的、由制表符分隔的键值对。这两个类均可以在org.apache.avro.mapred包中找到。

针对比Streaming更复杂的接口,Avro提供了一个连接框架(在org.apache.avro.mapred.tether包中),该框架与 Hadoop Pipe类似。在写作本书的时候,依旧役有绑定其他语言,但是Python实现将在以后的发行版本中出现。

还值得考虑通过Pig和Hive来处理Avro数据文件,因为两者均可以通过指定适合的数据存储格式来读/写Avro数据文件。

4.5基于文件的数据结构

对于某些应用,我们需要一种特殊的数据结构来存储自己的数据。对于基于MapReduce的数据处理,将每个二进制数据大对象(blob)单独放在各自的文件中不能实现可扩展性,所以,Hadoop为此开发了很多更髙层次的容器。

4.5.1关于 SequenceFile

考虑日志文件,其中每一行文本代表一条日志记录。纯文本不合适记录二进制类型的数据。在这种情况下,Hadoop的SequenceFile类非常合适, 为二进制键/值对提供了一个持久数据结构。将它作为日志文件的存储格式时,你可以自己选择键(比如LongWritable类型所表示的时间戳),以及值可以是Writable类型(用于表示日志记录的数量)。

SequenceFiles也可以作为小文件的容器。HDFSMapReduce是针对大文件优化的,所以通过SequenceFile类型将小文件包装起来,可以获得更高效率的存储和处理。

SequenceFile的写操作

通过createWriter()静态方法可以创建SequenceFile对象,并返回SequenceFile.Writer实例。该静态方法有多个重载版本,但都需要指定待写入的数据流(FSDataOutputStreamFileSystem对象和Path对象),Configuration对象,以及键和值的类型。另外,可选参数包括压缩 类型以及相应的codec,Progressable回调函数用于通知写入的进度,以及在SequenceFile头文件中存储的Metadata实例。 存储在SequenceFile中的键和值并不一定需要是Writable类型。只要能被Serialization序列化和反序列化,任何类型都可以。

一旦拥有SequenceFile.Writer实例,就可以通过append()方法在文件末尾附加键/值对。写完后,可以调用close()方法(SequenceFile..Writer 实现了 java.io.Closeable 接口)。

范例4-14显示了一小段代码,它使用刚才描述的APT将键/值对写入一个SequenceFile

范例4-14.写入SequenceFile对象

public class SequenceFileWriteDemo {
    private static final String[] DATA = {
        "One, two, buckle my shoe",
        "Three, four, shut the door",
        "Five, six, pick up sticks",
        "Seven, eight, lay them straight",
        "Nine, ten, a big fat hen"
    };
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closestream(writer);
        }
    }
}

顺序文件中存储的键/值对,键是从1001降序排列的整数,表示为IntWritable对象,值是Text对象。在将每条记录追加到 SequenceFile.Writer实例末尾之前,我们调用getLength()方法来获取文件的当前位置。(在下一小节中,如果不按序读取文件,则使用这一信息作为记录的边界。)我们把这个位置信息和键/值对输出到控制台。结果如下所示:

% hadoop SequenceFileMriteDemo numbers.seq
[128] 100 One, two,buckle my shoe
[173] 99 Three, four, shut the door
[220] 98 Five, six, pick up sticks
[264] 97 Seven, eight, lay them straight
[314] 96 Nine, ten, a big fat hen
[359] 95 One, two, buckle my shoe
[404] 94 Three, four, shut the door
[451] 93 Five, six, pick up sticks
[495] 92 Seven, eight, lay them straight
[545] 91 Nine, ten, a big fat hen
...
[1976] 60 One, two, buckle my shoe
[2021] 59 Three, four, shut the door
[2088] 58 Five, six, pick up sticks
[2132] 57 Seven, eight, lay them straight
[2182] 56 Nine, ten, a big fat hen
...
[4557] 5 One, two, buckle my shoe
[4602] 4 Three, four, shut the door
[4649] 3 Five, six, pick up sticks
[4693] 2 Seven, eighty lay them straight
[4743] 1 Nine, ten, a big fat hen

SequenceFile 的读操作

从头到尾读取顺序文件不外乎创建SequenceFile.Reader实例后反复调用next()方法迭代读取记录。读取的是哪条记录与你使用的序列化框架相关。如果使用的是Writable类型,那么通过键和值作为参数的next()方法可以将数据流中的下一条键/值对读入变量中:

public boolean next(Writable key, Writable val)

如果键/值对成功读取,则返回true,如果已读到文件末尾,则返回 false。

对于其他非Writable类型的序列化框架(Apache Thrift),则应该使用下面两个方法:

public Object next(Object key) throws IOException

public Object getCurrentValue(Object val) throws IOException

在这种情况下,需要确保io.serializations属性已经设置了你想使用的 序列化框架

如果next()方法返回的是非null对象,则可以从数据流中读取键、值 对,并且可以通过getCuprentValue()方法读取该值。否则,如果next()返回null值,则表示已经读到文件末尾。

范例4-15中的程序显示了如何读取包含Writable类型键、值对的顺序文 件。注意如何通过调用getKeyClass()方法和getValueClass()方法进而SequenceFile中所使用的类型,然后通过ReflectionUtils对象常见键和值的实例。通过这个技术,该程序可用于处理有Writable类型键、值对的任意一个顺序文件。

范例 4-15.读取 SequenceFile

public class SequenceFileReadDemo {
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            Writable key = (Writable)ReflectionUtils.newlnstance(reader.getKeyClass(), conf);
            Writable value = (Writable)ReflectionUtils.newlnstance(reader.getValueClass(), conf);
            long position = reader.getPosition();
            while (reader.next(key, value)) {
                String syncSeen = reader. syncSeen() ? "*":"";
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key,value);
                position = reader.getPosition(); // beginning of next record
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}

该程序的另一个特性是能够显示顺序文件中同步点的位置信息。所谓同步点, 是指数据读取的实例出错后能够再一次与记录边界同步的数据流中的一个位 置,例如,在数据流中搜索到任意位置后。同步点是由SequenceFile.Writer 录的,后者在顺序文件写入过程中插入一个特殊项以便每隔几个记录便有 一个同步标识。这样的特殊项非常小,因而只造成很小的存储开销,不到 1%。同步点始终位于记录的边界处。

运行范例4-15的程序后,会显示星号表示的顺序文件中的同步点。第一同步点位于2021处(第二个位于4075处,但本例中并没有显示出来):

% hadoop SequenceFileReadOemo numbers.seq
[128] 100 One, two, buckle my shoe
[173] 99 Three, four, shut the door
[220] 98 Five, six, pick up sticks
[264] 97 Seven, eight, lay them straight
[314] 96 Nine, ten, a big fat hen
[B59] 95 One, two, buckle my shoe
[404] 94 Three, four, shut the door
[451] 93 Five, six, pick up sticks
[495] 92 Seven, eigh^, lay them straight
[545] 91 Nine, ten, a big fat hen
[590] 90 One, two, buckle my shoe
...
[1976] 60 One, two, buckle my shoe
[2021*] 59 Three, four, shut the door
[2088] 58 Five, six, pick up sticks
[2132] 57 Seven, eight, lay them straight
[2182] 56 Nine, ten, a big fat hen
...
[4557] 5 One, two, buckle my shoe
[4602] 4 Three, four, shut the door
[4649] 3 Five, six, pick up sticks
...
[4693] 2 Seven, eight, lay them straight
[4743] 1 Nine, ten, a big fat hen

在顺序文件中搜索给定位置有两种方法。第一种是调用seek()方法,该方法将读指针指向文件中指定的位置。例如,可以按如下方式搜査记录边界:

reader.seek(359);
assertThat(reader.next(key, value), is(true));
assertThat(((IntWritable) key).get(), is(95));

但如果给定位置不是记录边界,调用next()方法时就会出错:

reader.seek(360);
reader.next(key, value); // fails with IOException

第二种方法通过同步点査找记录边界。SequenceFile.Reader对象的 sync(long position)方法可以将读取位置定位到position之后的下一个同步点。如果position之后没有同步了,那么当前读取位置将指向文 件末尾。这样,我们对数据流中的任意位置调用sync()方法(例如非记录 边界)而且可以重新定位到下一个同步点并继续向后读取:

reader.sync(360);
assertThat(reader.getPosition(), is(2021L));
assertThat(reader.next(key, value), is(true));
assertThat(((IntWritable) key).get(), is(59));

SequenceFile.Writer对象有一个sync()方法,该方法可以在数据流的当前位置插入一个同步点。不要把它和同名的Syncable接口中定义的sync()方法混为一谈,后者用于底层设备缓冲区的同步。

可以将加入同步点的顺序文件作为MapReduce的输入,因为该类顺序文件允许切分,由此该文件的不同部分可以由独立的map任务单独处理。

 通过命令行接口显示SequenceFile

hadoop fs命令有一个text选项可以以文本形式显示顺序文件。该选项可以査看文件的代码,由此检测出文件的类型并将其转换成相应的文本。该选项可以识别gzip压缩文件和顺序文件,否则,便假设输入为纯文本文件。

对于顺序文件,如果键和值是有具体含义的字符串表示,那么这个命令就非常有用(通过toString()方法定义)。同样,如果有自己定义的键或值的类,则需要确保它们在Hadoop类路径目录下。

对前一小节中创建的顺序文件执行这个命令,我们得到如下输出:

% hadoop fs -text numbers.seq | head
100 One, two, buckle my shoe
99 Three,four, shut the door
98 Five,six, pick up sticks
97 Seven,eight, lay them straight
96 Nine,ten^ a big fat hen
95 One, two, buckle my shoe
94 Three,four, shut the door
93 Five,six, pick up sticks
92 Seven,eight, lay them straight
91 Nine,ten, a big fat hen

SequenceFile的排序和合并

MapReduce是对多个顺序文件进行排序(或合并)最有效的方法。MapReduce 本身是并行的,并且可由你指定要使用多少个reducer(该数决定着输出分区数)。例如,通过指定一个reducer,可以得到一个输出文件。我们可以使用 Hadoop发行版自带的例子,通过指定键和值的类型来将输入和输出指定为顺序文件:

% hadoop jar $HADOOP_INSTALL/hadoop-*-examples.jar sort -r 1 \
-inFormat org.apache.hadoop.mapred.SequenceFilelnputFormat \
-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
-outKey org.apache.hadoop.io.IntWritable \
-outvalue org.apache.hadoop.io.Text \
numbers.seq sorted

% hadoop fs -text sorted/part-00000 | head

1 Nine,ten, a big fat hen

2 Seven,eight, lay them straight

3 Five,six, pick up sticks

4 Three,four, shut the door

5 One, two, buckle my shoe

6 Nine,ten^ a big fat hen

7 Seven,eight, lay them straight

8 Five,six, pick up sticks

9 Three,four, shut the door

10 One, two, buckle my shoe

除了通过MapReduce实现排序/归并,还有一种方法是使用SequenceFile.Sorter类sort()方法和merge()方法。它们比MapReduce更早出现,比MapReduce更底层(例如,为了实现并行,你需要手动对数据进行分区),所 以对顺序文件进行排序合并时MapReduceSequenceFile的格式

顺序文件由文件头和随后的一条或多条记录组成(参见图4-2)。顺序文件的前三个字节为SEQ(顺序文件代码),紧随其后的一个字节表示顺序文件的版本号。文件头还包括其他字段,例如键和值类的名称、数据压缩细节、用户定义的元数据以及伺步标识。@如前所述,同步标识用于在读取文件时能 够从任意位置开始识别记录边界。每个文件都有一个随机生成的同步标 识,其值存储在文件头中。同步标识位于顺庆文件中的记录与记录之间。 同步标识的额外存储开销要求小于1%,所以没有必要在每条记录末尾添加该标识(特别是比较短的记录)。

blob.png 

4-2.压缩前和压缩后的顺序文件的内部结构

记录的内部结构取决于是否启用压缩。如果已经启用,则取决于记录压缩 数据块压缩。

如果没有启用压缩(默认情况),那么每条记录则由记录长度(字节数)、键长度、键和值组成。长度字段为4字节长的整数,遵循java.io.DataOutput类中writelnt()方法的协定。为写入顺序文件的类定义Serialization类,通过它来实现键和值的序列化。

记录压缩格式与无压缩情况基本相同,只不过值是用文件头中定义的codec 压缩的。注意,键没有被压缩。

块压缩(block compression)是指一次性压缩多条记录,因为它可以利用记录间的相似性进行压缩,所以相较于单条记录压缩方法,该方法的压缩效率 更高。如图4-3所示。可以不断向数据块中压缩记录,直到块的字节数不小于io.seqfile.compress.blocksize属性中设置的字节数:默认为1 MB。每一个新块的开始处都需要插入同步标识。数据块的格式如下:首先是一个指示数据块中字节数的字段;紧接着是4个压缩字段(键长度、键、值长度 和值)。

blob.png 

4-3.采用块压缩方式之后,顺序文件的内部结构

 

4.5.2关于MapFile 

MapFile是已经排过序的SequenceFile,它有索引,所以可以按键查 找。可以将MapFile视为java.util.Map的持久化形式(尽管它并没有实现该接口),它的大小可以超过保存在内存中一个map的大小。

 MapFile的写操作

MapFile的写操作类似于SequenceFile的写操作。新建一个MapFile.Writer实例,然后调用append()方法顺序写入文件内容。如果不顺序写入,就抛出一个IOException异常。键必须是WritableComparable类型的实例,值必须是Writable类型的实例,这与SequenceFile正好相反,后者可以为其条目使用任意序列化框架。

范例4-16中的程序新建一个MapFile对象,然后向它写入一些记录,与范 4-14中新建SequenceFile对象的程序非常相似。

范例4-16.写入MapFile

public class MapFileWriteDemo {
    private static final String[] DATA = {
        "One, two, buckle my shoe",
        "Three, four, shut the door",
        "Five, six, pick up sticks",
        "Seven, eight, lay them straight",
        "Nine, ten, a big fat hen"
    };
    public static void main(String[] args) throws IOException {
        String uri = angs[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
         
        IntWritable key = new IntWritable();
        Text value = new Text();
        MapFile.Writer writer = null;
        try {
            writer.= new MapFile.Writer(conf, fs, uri, key.getClass(), value.getClass());
            for (int i = 0; i < 1024; i++) {
                key.set(i + 1);
                value.set(DATA[i % DATA.length]);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closestream(writer);
        }
    }
}

让我们使用这个程序构建一个MapFile:

% hadoop MapFileWriteDemo numbers.map

仔细观察这个MapFile,会发现它实际上是一个包含dataindex这两个 文件的文件夹:

% Is -1 numbers.map
total 104
-rw-r--r-- 1 tom tom 47898 Jul 29 22:06 data
-rw-r--r-- 1 tom tom 251 Jul 29 22:06 index

这两个文件都是SequenceFile。data文件包含所有记录,依次如下:

% hadoop fs -text numbers.map/data | head
1 One, two, buckle my shoe
2 Three, four, shut the door
3 Five, six, pick up sticks
4 Seven, eight, lay them straight
5 Nine, ten, a big fat hen
6 One, two, buckle my shoe
7 Three, four, shut the door
8 Five, six, pick up sticks
9 Seven, eight, lay them straight
10 Nine, ten, a big fat hen

index文件包含一部分键和data文件中键到其偏移量的映射:

% hadoop fs -text numbers.map/index
1 128
129 6079
257 12054
385 18030
513 24002
641 29976
769 35947
897 41922

从输出中可以看到,默认情况下只有每隔128个键才有一个包含在文件中,当然也可以调整,调用MapFile.Writer实例的setlndexlnterval() 方法来设置io.map.index.interval属性即可。增加索引间隔大小可以有效减少MapFile存储索引所需要的内存。相反,降低该间隔可以提高随 机访问效率(因为减少了),代价是消耗了更多内存。

因为索引只保留一部分键,所以MapFile无法枚举所有键甚至计算它自己 有多少键。唯一的办法是读取整个文件。

MapFile的读操作

MapFile依次遍历文件中所有条目的过程类似于SequenceFile中的过 程:首先新建一个MapFile.Reader实例,然后调用next()方法,直到返回值为false,表示没有条目返回,因为已经读到文件末尾:

public boolean next(WritableComparable key, Writable val) throws IOException

调用get()方法可以随机访问文件中的数据:

public Writable get(WritableComparable key, Writable val) throws IOException

返回值用于确定是否在MapFile中找到相应的条目如果是null,说明指定key没有相应的条目。如果找到相应的key,则将该键对应的值读入val变量,通过方法调用返回。这有助于我们理解实现过程。下面的代码是我们在前一小节中建立的,用于检索MapFile中的条目:

Text value = new Text();
reader.get(new IntWnitable(496), value);
assertThat(value.toString(), is("One, two, buckle my shoe"));

对于这个操作,MapFile.Reader首先将index文件读入内存(由于索引是缓存的,所以后续的随机访问将使用内存中的同一个索引)。接着对内存中的索引进行二分查找,最后找到小于或等于搜索索引的键496。在本例中,找到的键位于385,对应的值为18030, data文件中的偏移量,接着顺序读 data文件中的键,直到读取到496为止。至此,找到键所对应的值,最后从data文件中读取相应的值。就整体而言,一次査找需要一次磁盘寻址和 一次最多有128个条目的扫描。对于随机访问,这是非常髙效的。

getClosest()方法与get()方法类似,只不过它返回的是与指定键匹配的 最接近的键,而不是在不匹配时返回null。更准确地说,如果MapFile 含指定的键,则返回对应的条目,否则,返回MapFile中第一个大于(或小 于,由相应的boolean参数指定)指定键的键。

大型MapFile的索引会占据大量内存。不要在修改索引间隔之后重建索 引,要在读取索引时设置io.mao.index.skip属性来加载一定比例的索引 键。该属性通常设置为0,意味着不跳过索引键;如果设置为1,则表示每 次跳过索引键中的一个(也就是索引键中的每隔一个键),如果设置为2,则 表示每次读取索引时跳过2个键(也就是说,只读索引三分之一的键),以此 类推。设置的值越大,节省大量的内存,但会增加搜索时间,因为平均而言,扫描的键更多。

MapFile的变种

Hadoop在通用的键/值对MapFile接口上提供了一些变种。

· SetFile是一个特殊的MapFile,用于存储Writable键的集合。键必须按照排好的顺序添加。

· ArrayFile也是一个MapFile变种,该变种中的键是一个整型, 用于表示数组中元素的索引,而值是一个Writable值。

· BloomMapFile也是一个MapFile变种,该变种提供了 get()方法的一个高性能实现,对稀疏文件特别有用。该实现使用一个动态 的布隆过滤器来检测某个给定的键是否在map文件中。这个测试 非常快,因为是在内存中完成的,但是该测试结果出现假阳性的概 率大于零。经过布隆过滤器过滤之后,如果存在相应结果,则调用 get()方法。

它有两个调优参数,一个是io.mapfile.bloom.size,(近似)指出map文件中有多少个条目;另一个是ip.map.file.bloom.error.rate,设置布隆过滤器出现假阳性的概率值(默认为0.005,即0.5%)。

SequenceFile转换为 MapFile

MapFile中搜索相当于在加有索引和排过序的SequenceFile中捜索。 所以我们自然联想到把SequenceFile转换为MapFile。这里讨论如何为SequenceFile创建索引。范例4-17中的程序显示了对MapFile调用fix()静态方法,该方法能够为MapFile重建索引。

范例4-17.对MapFile再次创建索引

public class MapFileFixer {
    public static void main(String[] args) throws Exception {
        String mapUri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(mapUri), conf);
        Path map = new Path(mapUri);
        Path mapData = new Path(map, MapFile.DATA_FILE_NAME);
        // Get key and value types from data sequence file
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf);
        Class keyClass = reader.getKeyClass();
        Class valueClass = reader.getValueClass();
        reader.close();
        // Create the map file index file
        long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf);
        System.out.printf("Created MapFile %s with %d entries\n", map, entries);
    }
}

Fix()方法通常用于重建已损坏的索引,但是由于它能从头开始建立新的索 引,所以此处我们可以使用该方法满足需求。具体用法如下。 .

⑴将名为叫的顺序文件排序后,保存到名为的文件夹下,该文件夹就是最终的MapFile(如果顺序文件已排过序,则可以跳过这一步。只需要把这个文件复制到

文件,然后直接跳到第3步):

% hadoop jar $HADOOP_INSTALL/hadoop-*-examples.jar sort -r 1 \
-inFormat org.apache.hadoop.mapred.SequenceFilelnputFormat \
-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
-outKey org.apache.hadoop.io.IntMritable \
-outvalue org.apache.hadoop.io.Text \
numbers.seq numbers.map

(2)MapReduce的输出重命名为data文件:

% hadoop fs -mv numbers.map/part-00000 numbers.map/data

(3)建立index文件:

% hadoop MapFileFixer numbers.map

Created MapFile numbers.map with 100 entries

现在,名为的MapFile已经存在并可以使用了。

转载请注明:全栈大数据 » 第四章 Hadoop的I/O操作

喜欢 (4)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址