整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

4.5.1关于 SequenceFile

hadoop 花牛 10℃ 0评论

考虑日志文件,其中每一行文本代表一条日志记录。纯文本不合适记录二进制类型的数据。在这种情况下,Hadoop的SequenceFile类非常合适, 为二进制键/值对提供了一个持久数据结构。将它作为日志文件的存储格式时,你可以自己选择键(比如LongWritable类型所表示的时间戳),以及值可以是Writable类型(用于表示日志记录的数量)。

SequenceFiles也可以作为小文件的容器。HDFSMapReduce是针对大文件优化的,所以通过SequenceFile类型将小文件包装起来,可以获得更高效率的存储和处理。

SequenceFile的写操作

通过createWriter()静态方法可以创建SequenceFile对象,并返回SequenceFile.Writer实例。该静态方法有多个重载版本,但都需要指定待写入的数据流(FSDataOutputStreamFileSystem对象和Path对象),Configuration对象,以及键和值的类型。另外,可选参数包括压缩 类型以及相应的codec,Progressable回调函数用于通知写入的进度,以及在SequenceFile头文件中存储的Metadata实例。 存储在SequenceFile中的键和值并不一定需要是Writable类型。只要能被Serialization序列化和反序列化,任何类型都可以。

一旦拥有SequenceFile.Writer实例,就可以通过append()方法在文件末尾附加键/值对。写完后,可以调用close()方法(SequenceFile..Writer 实现了 java.io.Closeable 接口)。

范例4-14显示了一小段代码,它使用刚才描述的APT将键/值对写入一个SequenceFile

范例4-14.写入SequenceFile对象

public class SequenceFileWriteDemo {
    private static final String[] DATA = {
        "One, two, buckle my shoe",
        "Three, four, shut the door",
        "Five, six, pick up sticks",
        "Seven, eight, lay them straight",
        "Nine, ten, a big fat hen"
    };
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closestream(writer);
        }
    }
}

顺序文件中存储的键/值对,键是从1001降序排列的整数,表示为IntWritable对象,值是Text对象。在将每条记录追加到 SequenceFile.Writer实例末尾之前,我们调用getLength()方法来获取文件的当前位置。(在下一小节中,如果不按序读取文件,则使用这一信息作为记录的边界。)我们把这个位置信息和键/值对输出到控制台。结果如下所示:

% hadoop SequenceFileMriteDemo numbers.seq
[128] 100 One, two,buckle my shoe
[173] 99 Three, four, shut the door
[220] 98 Five, six, pick up sticks
[264] 97 Seven, eight, lay them straight
[314] 96 Nine, ten, a big fat hen
[359] 95 One, two, buckle my shoe
[404] 94 Three, four, shut the door
[451] 93 Five, six, pick up sticks
[495] 92 Seven, eight, lay them straight
[545] 91 Nine, ten, a big fat hen
...
[1976] 60 One, two, buckle my shoe
[2021] 59 Three, four, shut the door
[2088] 58 Five, six, pick up sticks
[2132] 57 Seven, eight, lay them straight
[2182] 56 Nine, ten, a big fat hen
...
[4557] 5 One, two, buckle my shoe
[4602] 4 Three, four, shut the door
[4649] 3 Five, six, pick up sticks
[4693] 2 Seven, eighty lay them straight
[4743] 1 Nine, ten, a big fat hen

SequenceFile 的读操作

从头到尾读取顺序文件不外乎创建SequenceFile.Reader实例后反复调用next()方法迭代读取记录。读取的是哪条记录与你使用的序列化框架相关。如果使用的是Writable类型,那么通过键和值作为参数的next()方法可以将数据流中的下一条键/值对读入变量中:

public boolean next(Writable key, Writable val)

如果键/值对成功读取,则返回true,如果已读到文件末尾,则返回 false。

对于其他非Writable类型的序列化框架(Apache Thrift),则应该使用下面两个方法:

public Object next(Object key) throws IOException

public Object getCurrentValue(Object val) throws IOException

在这种情况下,需要确保io.serializations属性已经设置了你想使用的 序列化框架

如果next()方法返回的是非null对象,则可以从数据流中读取键、值 对,并且可以通过getCuprentValue()方法读取该值。否则,如果next()返回null值,则表示已经读到文件末尾。

范例4-15中的程序显示了如何读取包含Writable类型键、值对的顺序文 件。注意如何通过调用getKeyClass()方法和getValueClass()方法进而SequenceFile中所使用的类型,然后通过ReflectionUtils对象常见键和值的实例。通过这个技术,该程序可用于处理有Writable类型键、值对的任意一个顺序文件。

范例 4-15.读取 SequenceFile

public class SequenceFileReadDemo {
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            Writable key = (Writable)ReflectionUtils.newlnstance(reader.getKeyClass(), conf);
            Writable value = (Writable)ReflectionUtils.newlnstance(reader.getValueClass(), conf);
            long position = reader.getPosition();
            while (reader.next(key, value)) {
                String syncSeen = reader. syncSeen() ? "*":"";
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key,value);
                position = reader.getPosition(); // beginning of next record
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}

该程序的另一个特性是能够显示顺序文件中同步点的位置信息。所谓同步点, 是指数据读取的实例出错后能够再一次与记录边界同步的数据流中的一个位 置,例如,在数据流中搜索到任意位置后。同步点是由SequenceFile.Writer记 录的,后者在顺序文件写入过程中插入一个特殊项以便每隔几个记录便有 一个同步标识。这样的特殊项非常小,因而只造成很小的存储开销,不到 1%。同步点始终位于记录的边界处。

运行范例4-15的程序后,会显示星号表示的顺序文件中的同步点。第一同步点位于2021处(第二个位于4075处,但本例中并没有显示出来):

% hadoop SequenceFileReadOemo numbers.seq
[128] 100 One, two, buckle my shoe
[173] 99 Three, four, shut the door
[220] 98 Five, six, pick up sticks
[264] 97 Seven, eight, lay them straight
[314] 96 Nine, ten, a big fat hen
[B59] 95 One, two, buckle my shoe
[404] 94 Three, four, shut the door
[451] 93 Five, six, pick up sticks
[495] 92 Seven, eigh^, lay them straight
[545] 91 Nine, ten, a big fat hen
[590] 90 One, two, buckle my shoe
...
[1976] 60 One, two, buckle my shoe
[2021*] 59 Three, four, shut the door
[2088] 58 Five, six, pick up sticks
[2132] 57 Seven, eight, lay them straight
[2182] 56 Nine, ten, a big fat hen
...
[4557] 5 One, two, buckle my shoe
[4602] 4 Three, four, shut the door
[4649] 3 Five, six, pick up sticks
...
[4693] 2 Seven, eight, lay them straight
[4743] 1 Nine, ten, a big fat hen

在顺序文件中搜索给定位置有两种方法。第一种是调用seek()方法,该方法将读指针指向文件中指定的位置。例如,可以按如下方式搜査记录边界:

reader.seek(359);
assertThat(reader.next(key, value), is(true));
assertThat(((IntWritable) key).get(), is(95));

但如果给定位置不是记录边界,调用next()方法时就会出错:

reader.seek(360);
reader.next(key, value); // fails with IOException

第二种方法通过同步点査找记录边界。SequenceFile.Reader对象的 sync(long position)方法可以将读取位置定位到position之后的下一个同步点。如果position之后没有同步了,那么当前读取位置将指向文 件末尾。这样,我们对数据流中的任意位置调用sync()方法(例如非记录 边界)而且可以重新定位到下一个同步点并继续向后读取:

reader.sync(360);
assertThat(reader.getPosition(), is(2021L));
assertThat(reader.next(key, value), is(true));
assertThat(((IntWritable) key).get(), is(59));

SequenceFile.Writer对象有一个sync()方法,该方法可以在数据流的当前位置插入一个同步点。不要把它和同名的Syncable接口中定义的sync()方法混为一谈,后者用于底层设备缓冲区的同步。

可以将加入同步点的顺序文件作为MapReduce的输入,因为该类顺序文件允许切分,由此该文件的不同部分可以由独立的map任务单独处理。

 通过命令行接口显示SequenceFile

hadoop fs命令有一个-text选项可以以文本形式显示顺序文件。该选项可以査看文件的代码,由此检测出文件的类型并将其转换成相应的文本。该选项可以识别gzip压缩文件和顺序文件,否则,便假设输入为纯文本文件。

对于顺序文件,如果键和值是有具体含义的字符串表示,那么这个命令就非常有用(通过toString()方法定义)。同样,如果有自己定义的键或值的类,则需要确保它们在Hadoop类路径目录下。

对前一小节中创建的顺序文件执行这个命令,我们得到如下输出:

% hadoop fs -text numbers.seq | head
100 One, two, buckle my shoe
99 Three,four, shut the door
98 Five,six, pick up sticks
97 Seven,eight, lay them straight
96 Nine,ten^ a big fat hen
95 One, two, buckle my shoe
94 Three,four, shut the door
93 Five,six, pick up sticks
92 Seven,eight, lay them straight
91 Nine,ten, a big fat hen

SequenceFile的排序和合并

MapReduce是对多个顺序文件进行排序(或合并)最有效的方法。MapReduce 本身是并行的,并且可由你指定要使用多少个reducer(该数决定着输出分区数)。例如,通过指定一个reducer,可以得到一个输出文件。我们可以使用 Hadoop发行版自带的例子,通过指定键和值的类型来将输入和输出指定为顺序文件:

% hadoop jar $HADOOP_INSTALL/hadoop-*-examples.jar sort -r 1 \
-inFormat org.apache.hadoop.mapred.SequenceFilelnputFormat \
-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
-outKey org.apache.hadoop.io.IntWritable \
-outvalue org.apache.hadoop.io.Text \
numbers.seq sorted

% hadoop fs -text sorted/part-00000 | head

1 Nine,ten, a big fat hen

2 Seven,eight, lay them straight

3 Five,six, pick up sticks

4 Three,four, shut the door

5 One, two, buckle my shoe

6 Nine,ten^ a big fat hen

7 Seven,eight, lay them straight

8 Five,six, pick up sticks

9 Three,four, shut the door

10 One, two, buckle my shoe

除了通过MapReduce实现排序/归并,还有一种方法是使用SequenceFile.Sorter类的sort()方法和merge()方法。它们比MapReduce更早出现,比MapReduce更底层(例如,为了实现并行,你需要手动对数据进行分区),所 以对顺序文件进行排序合并时MapReduceSequenceFile的格式

顺序文件由文件头和随后的一条或多条记录组成(参见图4-2)。顺序文件的前三个字节为SEQ(顺序文件代码),紧随其后的一个字节表示顺序文件的版本号。文件头还包括其他字段,例如键和值类的名称、数据压缩细节、用户定义的元数据以及伺步标识。@如前所述,同步标识用于在读取文件时能 够从任意位置开始识别记录边界。每个文件都有一个随机生成的同步标 识,其值存储在文件头中。同步标识位于顺庆文件中的记录与记录之间。 同步标识的额外存储开销要求小于1%,所以没有必要在每条记录末尾添加该标识(特别是比较短的记录)。

blob.png 

图4-2.压缩前和压缩后的顺序文件的内部结构

记录的内部结构取决于是否启用压缩。如果已经启用,则取决于记录压缩 数据块压缩。

如果没有启用压缩(默认情况),那么每条记录则由记录长度(字节数)、键长度、键和值组成。长度字段为4字节长的整数,遵循java.io.DataOutput类中writelnt()方法的协定。为写入顺序文件的类定义Serialization类,通过它来实现键和值的序列化。

记录压缩格式与无压缩情况基本相同,只不过值是用文件头中定义的codec 压缩的。注意,键没有被压缩。

块压缩(block compression)是指一次性压缩多条记录,因为它可以利用记录间的相似性进行压缩,所以相较于单条记录压缩方法,该方法的压缩效率 更高。如图4-3所示。可以不断向数据块中压缩记录,直到块的字节数不小于io.seqfile.compress.blocksize属性中设置的字节数:默认为1 MB。每一个新块的开始处都需要插入同步标识。数据块的格式如下:首先是一个指示数据块中字节数的字段;紧接着是4个压缩字段(键长度、键、值长度 和值)。

blob.png 

图4-3.采用块压缩方式之后,顺序文件的内部结构

转载请注明:全栈大数据 » 4.5.1关于 SequenceFile

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址