整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

12.8.2 写 UDAF

hadoop 小红牛 8℃ 0评论

聚集函数比普通的UDF难写。因为值是在块内进行聚集的(这些块可能分布
在很多map或reduce任务中),从而实现时要能够把部分的聚集值组合成最 终结果。实现此功能的代码最好用示例来进行解释。让我们来看一个简单
的UDAF的实现,它用于计算一组整数的最大值(范例12-4)。

范例12-4.计算一组整数中最大值的UDAF

package com.hadoopbook.hive; 
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; 
import org.apache.hadoop.io.IntWritable;
public class Maximum extends UDAF {
    public static class MaximumlntUDAFEvaluator implements UDAFEvaluaton {
        private IntWritable result;
        public void init() { 
            result = null;
        }
        public boolean iterate(IntWritable value){
            if (value == null) {
                return true;
            if (result == null) {
                result = new IntWritable(value.get());
            } else {
                result.set(Math.max(result.get(), value.get()));
            }
            return true;
            }
        public IntWritable terminatePartial() {
            return result;
        }
        public boolean merge(IntWritable other) { 
            return iterate(other);
        }
        public IntWritable terminate { 
            return result;
        }
    }
}

这个类的结构和UDF的稍有不同。UDAF必须是org.apache.hadoop.
hive.ql.exec.UDAF(注意UDAF中的“A”)的子类,且包含一个或多个 嵌套的、实现了
org.apache.hadoop.hive.ql.UDAFEvaluator 的静态
类。在这个示例中,只有一个嵌套类,MaximumlntUDAFEvaluator。但是
我们也可以添加更多的计算函数(如MaximumLongUDAFEvaluator和
MaximumFloatUDAFEvaluator)来提供计算长整型、浮点型等类型数最大 值的UDAF的重载。

一个计算函数必须实现下面5个方法(处理流程如图12-4所示)。


init()方法 init()方法负责初始化计算函数并重设它的内部状态。在
MaximumlntUDAFEvaluator中,我们把存放最终结果的 IntWritable对象设为null。我们使用null来表示目前还没有
对任何值进行聚集计算,这和对空集NULL计算最大值应有的结果是一致的。

图片.png 

图12-4.包含UDAF部分结果的数据流

•iterate()方法每次对一个新值进行聚集计算时都会调用iterate()
方法。计算函数要根据聚集计算的结果更新其内部状态。 iterate()接受的参数和Hive中被调用函数的参数是对应的。在
这个示例中,只有一个参数。方法首先检査参数值是否为null, 如果是,则将其忽略。否则,resul变量实例就被设为value的整
数值(如果这是方法第一次接受输入),或设为当前值和value值中 的较大值(如果已经接受一些值)。如果输入值合法,我们就让方法 返回true0

•terminatePartial()方法 Hive需要部分聚集结果时会调用 terminatePartial()方法。这个方法必须返回一个封装了聚集计 算当前状态的对象。在这里,因为只需要对已知的最大值或在没有

值时的空值null进行封装,所以使用一个intWritable即可。

•merge()方法在Hive决定要合并一个部分聚集值和另一个部分聚
集值时会调用merge()方法。该方法接受一个对象作为输入。这个对 象的类型必须和terminatePartial()方法的返回类型一致。在这
个示例里,merge()方法可以直接使用iterate()方法,因为部分 结果的聚集和原始值的聚集的表达方法是相同的。但一般情况下不
能这样做(我们后面会看到更普遍的示例),这个方法实现的逻辑会 合并计算函数和部分聚集的状态。

•terminate()方法 Hive需要最终聚集结果时会调用terminate()方法。计算函数需要把状态作为一个值返回。在这里,我们返回实 例变量result。

现在让我们来执行这个新写的函数:

hive> CREATE TEMPORARY FUNCTION maximum AS’com.hadoopbook.hive.Maximum’;
hive> SELECT maximum(temperature) FROM records;
111

1.—个更复杂的UDAF

前面的示例有一个特别的现象:部分聚集结果可以使用和最终结果相同的 类型(IntWritable)来表示。对于更复杂的聚集函数,情况并非如此。考虑 一个计算一组double类型值均值的UDAF,就可以看出这一点。从数学角 度来看,要把两个部分的均值合并成最终的均值是不可能的(参见2.4.2节

combiner函数的讨论)。作为替代,我们可以用一个数对 目前已经处理过的double值的累积和,以及目前已经处理过的数的个数一来表示部分 聚集结果。

这个思路在UDAF中的实现如范例12-5所示。注意,部分聚集结果用一个
嵌套的静态类struct实现,类名是PartialResult,由于我们使用了 Hive
能够处理的字段类型(Java原子数据类型),所以Hive足够“聪明”,能够 自己对这个类进行序列化和反序列化。

范例12-5.计算一组double值均值的UDAF

package com.hadoopbook.hive;
 
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
public class Mean extends UDAF{
    public static class MeanDoubleUDAFEvaluator implements UDAFEvaluator {
        public static class PartialResult {
            double sum; 
            long count;
    }
    private PartialResult partial;
    
    public void init() { 
        partial = null;
    }
    public boolean iterate(DoubleWritable value) { 
     if (value == null) { 
            return true;
        }
        if (partial == null) {
            partial = new PartialResult();
        }
        partial.sum += value.get();
        partial.count++;
        return true;
    }
    
    public PartialResult terminatePartial() {
        return partial;
    }
    public boolean merge(PartialResult other) { 
        if (other == null) { 
            return true;
        }
        if (partial == null) { 
            partial = new PartialResult();
        }
        partial.sum += other.sum;
        partial.count += other.count; 
        return true;
    }
    public DoubleWritable terminate() { 
        if (partial == null) {
            return null;
        }
        return new DoubleWritable(partial.sum / partial.count);
        }
    }
}

在这个示例中,merge()方法和iterate()方法不同,因为它把“部分
和”(partial sum)和“部分计数值”(partial count)分别进行成对的加法合
并。此外,terminatePartial()的返回类型为PartialResult,这个类
型当然不会给调用函数的用户看到,terminate()的返回类型则是最终用户 可以看到的DoubleWritable。


转载请注明:全栈大数据 » 12.8.2 写 UDAF

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址