整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

8.3.2. reduce 端连接

hadoop 小红牛 26℃ 0评论


由于reduce端连接并不要求输入数据集符合特定结构,因而reduce端连接比map端连接更为常用。但是,由于两个数据集均需经过MapReduce的
shuffle过程,所以reduce端连瘘的效率往往要低一些。基本思路是mapper
为各个记录标记源,并且使用连接键作为map输出键,使键相同的记录放 在同一reducer中。以下技术能帮助实现reduce端连接。

 1.多输入

数据集的输入源往往有多种格式,因此可以使用Multiplelnputs类(参见 7.2.4节)来方便地解析和标注各个源。

   2. 辅助排序

如前所述,reducer将从两个源中选出键相同的记录且并不介意这些记录是否已排好序。此外,为了更好地执行连接操作,先将某一个源的数据传输
到reducer会非常重要。以天气数据连接为例,当天气记录发送到reducer
的时候,与这些记录有相同键的气象站信息最好也已经放在reducer,使得
reducer能够将气象站名称填到天气记录之中再马上输出。虽然也可以不指 定数据传输次序,并将待处理的记录缓存在内存之中,但应该尽量避免这
种情况,因为其中任何一组的记录数量可能非常庞大,远远超出reducer的 可用内存容量。

8.2.4节介绍如何对reducer所看到的每个键的值进行排序,所以在此也用到了辅助排序技术。

我们使用第4章的TextPair类构建组合键,包括气象站ID和“标记”。 在这里,“标记”是一个虚拟的字段,其唯一目的是对记录排序,使气象站记录比天气记录先到达。一种简单的做法就是:对于气象站记录,“标记”值为0;对于天气记录,“标记”值为1。范例8-12和范例8-13分别描述了执行该任务的两个mapper类。

范例8-12.在reduce端连接中,标记气象站记录的mapper

public class JoinStationMapper
extends Mapper<LongWritable, Text, TextPair, Text> {
    private NcdcStationMetadataParser parser = new NcdcStationMetadataParser();
    @0verride
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if (parser.parse(value)) {
            context.write(new TextPair(parser.getStationId(), "0"), new Text(parser.getStationName()));
        }
}
}

范例8-13.在reduce端连接中标记天气记录的mapper
public class JoinRecordMapper
extends Mapper<LongWritable, Text, TextPair, Text> {
    private NcdcRecordParser parser = new NcdcRecordParser();
     
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value);
        context.write(new TextPair(parser.getStationId(), "1"), value);
    }
}

reducer知道自己会先接收气象站记录。因此从中抽取出值,并将其作为后续每条输出记录的一部分写到输出文件。如范例8-14所示。

范例8-14.用于连接已标记的气象站记录和天气记录的reducer

public class JoinReducer extends Reducer<TextPair, Text, Text, Text> {
    @Override
    protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Iterator<Text> iter = values.iterator();
        Text stationName = new Text(iter.next());
        while (iter.hasNext()) {
            Text record = iter.next();
            Text outvalue = new Text(stationName.toString() + "\t" + record.toString());
            context.write(key.getFirst(), outvalue);
        }
}
}

上述代码假设天气记录的每个气象站ID恰巧与气象站数据集中的一条记录准确匹配。如果该假设不成立,则需要泛化代码,使用另一个TextPair将标记放入值的对象中。reduce()方法在处理天气记录之前,要能够区分哪些记录是气象站名称,检测(和处理)缺失或重复的记录。

reducer的迭代部分中:对象被重复使用(为了提髙效率)。因此,从第一个Text对象获得点名称(即stationName)就非常关键。

Text stationName = new Text(iter.next());

如果不执行该语句,stationName就会指向上一条记录的值,这显然是错误的。

范例8-15显示了该作业的驱动类。在该类中,关键在于根据组合键的第一字段(即气象站ID)进行分区和分组,即使用一个自定义的partitioner(即KeyPartitioner)和一个自定义的分组 comparator(FirstComparator)作为 TextPair的嵌套类。

310-313

转载请注明:全栈大数据 » 8.3.2. reduce 端连接

喜欢 (1)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址