整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

4.3.3. 实现定制的Writable集合

hadoop 花牛 7℃ 0评论

Hadoop有一套非常有用的Writable实现可以满足大部分需求,但在些情况下,我们需要根据自己的需求构造一个新的实现。有了定制的Writable类型,就可以完全控制二进制表示和排序顺序。由于Writable 是MapReduce数据路径的核心,所以调整二进制表示能对性能产生显著效果。虽然Hadoop自带的Writable实现已经过很好的性能调优,但如果希望将结构调整得更好,更好的做法往往是新建一个Writable类型(而不是组合打包的类型)。

为了演示如何新建一个定制的Writable,我们写一个表示一对字符串的实现,名为TextPair。范例4-7显示了最基本的实现。

范例4-7.存储一对Text对象的Writable

import java.io.*;
import org.apache.hadoop.io.*;
public class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;
    public TextPair() {
        set(new Text(), new Text());
    }
    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }
    public TextPair(Text first, Text second) {
        set(first, second);
    }
    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }
    public Text getFirst() {
        return first;
    }
    public Text getSecond() {
        return second;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }
    @Override
    public void readFields(Datalnput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }
    @Override
    public int hashCode() {
        return first.hashCode()*163 + second.hashCode();
    }
    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }
    @Override
    public String toString(){
        return first + "\t" + second;
    }
    @Override
    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }
}

这个定制Writable实现的第一部分非常直观:包括两个Text实例变量(first和second)和相关的构造函数,以及setter和getter(即设置函数和提取函数)。所有Writable实现都必须有一个默认的构造函数以便MapReduce框架可以对它们进行实例化,然后再调用readFields()函数査看(填充)各个字段的值。Writable实例是可变的并且通常可以重用,所以应该尽量避免在write()或readFields()方法中分配对象。

通过让Text对象自我表示,TextPair类的write()方法依次将每个Text对象序列化到输出流中。类似的,通过每个Text对象的表示,readFields()方法对来自输入流的字节进行反序列化。DataOutput和 Datalnput接口有一套丰富的方法可以用于对Java基本类型进行序列化和反序列化,所以,在通常情况下,你可以完全控制Writable对象在线上传输/交换(的数据)的格式(数据传输格式)。

就像针对Java语言构造的任何值对象那样,需要重写javalang.Object 中的 hashCode()、equals()和 toString()方法。HashPartitioner (MapReduce中的默认分区类)通常用hashCode()方法来选择reduce分区,所以应该确保有一个比较好的哈希函数来保证每个reduce分区的大小相似。

即便计划结合使用TextOutputFormat和定制的Writable,也得自己动手实现toString()方法。TextOutputFormat对键和值调用toString()方法,将键和值转换为相应的输出表示。针对TextPair,我们将原始的Text对象作为字符串写到输出,各个字符串之间用制表符来分隔。

TextPair 是 WritableComparable 的一个实现,所以它提供了 compareTo() 方法,该方法可以强制数据排序:先按照第一个字符排序,如果第一个字 符相同则按照第二个字符排序。注意,前一小节中已经提到TextPair不同于TextArrayWritable(可存储的Text对象数组除外),因为 TextArrayWritable 只继承了 Writable,并没有继承 WritableComparable

为速度实现一个RawComparator

范例4-7中的TextPair代码可以按照其描述的基本方式运行,但我们也可以进一步优化。当TextPair被用作MapReduce中的键时,需要将数据流反序列化为对象,然后再调用compareTo()方法进行比较。那么有没有可能看看它们的序列化表示就可以比较两个TextPair 对象呢?

事实证明,我们可以这样做,因为TextPair是两个Text对象连接而成的,而Text对象的二进制表示是一个长度可变的整数,包含字符串之UTF-8表示的字节数以及UTF-8字节本身。诀窍在于读取该对象的起始长度,由此得知第一个Text对象的字节表示有多长;然后将该长度传给 Text对象的RawComparator方法,最后通过计算第一个字符串和第二个字符串恰当的偏移量,这样便可以实现对象的比较。详细过程参见范例4-8(注意,这段代码已嵌入TextPair类)。

范例4-8.用于比较TextPair字节表示的RawComparator

public static class Comparator extends WritableComparator {
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator());
    public Comparator() {
        super(TextPair.class);
    }
    @Override
    public int compare(byte[] b1, int s1, int l1,byte[] b2, int s2, int l2) {
        try {
            int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
            int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
            int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
            if (cmp != 0) {
                return cmp;
            }
            return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,b2, s2 + firstL2, l2 - firstL2);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
static {
    WritableComparator.define(TextPair.class, new Comparator()));
}

事实上,我们采取的做法是继承WritableComparable类,而非实现RawComparator接口,因为它提供了一些比较好用的方法和默认实现。这段代码最本质的部分是计算firstL1和firstL2,这两个参数表示每个字节流中第一个Text字段的长度。两者分别由变长整数的长度(由WritableUtils 的 decodeVIntSize()方法返回)和编码值(在readVInt()方法返回)组成。

定制的comparator

从TextPair可以看出,编写原始的comparator需要谨慎,因为必须要处理字节级别的细节。如果真的需要自己编写comparator,有必要参考org.apache.hadoop.io包中对Writable接口的实现。WritableUtils提供的方法也比较好用。

如果可能,定制的comparator也应该继承自RawComparator。这些comparator定义的排列顺序不同于默认comparator定义的自然排列顺序。范例4-9显示了一个针对TextPair类型的comparator ,称为FirstCompartator,它只考虑TextPair对象的第一个字符串。注意,我们重载了针对该类对象的compare()方法,使两个compare()方法有相同的语法。

范例4-9.定制的RawComparator用于比较TextPair对象字节表示的第一个字段

public static class FirstComparator extends WritableComparator {
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
    public FirstComparator() {  super(TextPair.class); }
    @Override
    public int compare(byte[] bl, int si, int 11,byte[] b2, int s2, int 12) {
        try {
            int firstL1= Writablelltils.decodeVIntSize(b1[s1]) + readVInt (b1, s1);
            int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt (b2, s2);
            return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        if (a instanceof TextPair && b instanceof TextPair) {
            return ((TextPair) a).first.compareTo(((TextPair) b).first);
        }
            return super.compare(a, b);
        }
}

转载请注明:全栈大数据 » 4.3.3. 实现定制的Writable集合

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址