整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

16.5.4 Cascading 实战

hadoop 花牛 9℃ 0评论

现在我们知道Cascading是什么,清楚地了解它是如何工作的,但是用 Cascading写的应用程序是什么样子呢?我们来看看范例16-2

Scheme sourceScheme = 
    new TextLine(new Fields("line")); ①
Tap source =
    new Hfs(sourceScheme, inputPath); ②
Scheme sinkScheme = new TextLine(); ③
Tap sink =
    new Hfs(sinkScheme, outputPath, SinkMode.REPLACE); ④
Pipe assembly = new Pipe("wordcount"); ⑤
String regexString = "(?< !  \\pL) ( ? = \\pL) [^ ]*( ?< = \\pL) (?!  \\pL)";
Function regex = new RegexGenerator(new Fields("word"), regexString); 
assembly =
    new Each(assembly, new Fields("line"), regex); ⑥
assembly =
    new GroupBy(assembly, new Fields("word")); ⑦
Aggregator count = new Count(new Fields("count")); 
assembly = new Every(assemblyJ count); ⑧
assembly =
new 6roupBy(assembly, new Fields("count"),new Fields("word")); ⑨
FlowConnector flowConnector = 
    new FlowConnector();
Flow flow =
    flowConnector.connect("word-count", source, sink, assembly); ⑩
flow.complete();⑩

①创建一个新的Scheme对象读取简单的文本文件,为每一行名为 "line"字段(Fields实例声明)输出一个新的Tuple对象。

②创建源和目标Tap实例分别指向输入文件和

③创建一个新的Scheme对象用于写简单文本文件,并且它期望输出的是 一个具有任意多个字段/值的Tuple对象。假如有多个值要输出,这些 值在输出文件里将以制表符分隔。

④输出目录。目标Tap对象输出数据时将覆盖目录下现有的文件。

⑤构建管道装配线的头,并把它命名为“wordcount”。这个名称用于绑 定源及目标数据对象到这个管道处理流程。多个头或尾要求必须有自己 唯一的名称。

⑥构建Each类型管道,它将解析line字段里的每个词,把解析结果放 入一个新的Tuple对象。

⑦构建GroupBy管道,它将创建一个新的Tuple组,实现基于word

段的分组。

⑧构建一个具有Aggregator操作的Every类型管道,它将对基于不同 词的分组Tuple对象分別进行字数统计。统计结果存于count的字段里。

⑨构建GroupBy类型管道,它将根据数值对count字段进行分组,形成 新的Tuple分组,然后对word字段值进行二级排序。结果是一组基于 count字段值升序排列的count字段值和word字段值列表。

⑩用Flow对象把管道装配线和数据源及目标联系起来,然后

⑩ 在集群上执行这个Flow

在这个例子里,我们统计输人文件中的不同单词的数量,并根据它们的自 然序(升序)进行排序。假如有些词的统计值相同,这些词就根据它们的自然 顺序(字母序)排序。

这个例子有一个明显的问题,即有些词可能会有大写字母,例如“the”和 “The”,当它出现在句首的时候就是“The”。因此我们可以插入一个新 的操作来强制所有单词都转换为小写形式,但是我们意识到那些需要从文 档中解析词语的所有的应用将来都需要做同样的操作,因此我们决定创建 一个可重用的管道SubAssembly,如同我们在传统应用程序中创建一个子 程序一样(参见范例16-3)

范例 16-3创建一个 SubAssembly

public class ParseWordsAssembly extends SubAssembly ①
{
   public ParseWordsAssembly(Pipe previous)
   {
      String negexString = "( ?< !\\pL) (? = \\pL) [^ ]*( ?< =\\pL) (? !\\pL)";
      Function regex = new RegexGenerator(new Fields("word"), regexString); previous = new Each(previous, new Fields("line"), regex);
      String exprString = "word.toLowerCase()";
      Function expression =
      new ExpressionFunction(new Fields("word"), exprString,String.class); ②
      previous = new Each(previous, new Fields("word"), expression); setTails(previous); ③
   }
}

①声明SubAssembly是子类,它本身是一种管道类型。

②创建一个Java的表达式函数,它将调用toLowerCase()方法来处理 “word”字段对应的字符串类型值。我们要传入一个Java类型来指明 “word”字段处理之后所期望的类型,这里是String类型。后台用 Janino(h(p.7/www.yflm.«o./ie^〇来编译。

③我们必须告知SubAssembly的父类这个管道子组件在哪里结束。

首先,我们新建一个SubAssembly类,它管理我们的“解析词”管道装配 线。因为这是一个Java类,所以可用于其他任何应用程序,当然这要求它 们处理的数据中有word字段(范例16-4)。注意,也有办法可以使这个函数 更加通用,这些方法在《Cascading用户手册》中都有介绍。

范例16-4.用一个SubAssembly扩展单词计数和排序

Scheme sourceScheme = new TextLine(new Fields("line"));
Tap source = new Hfs(sourceSchemGj inputPath);
Scheme sinkScheme = new TextLine(new Fields("word", "count"));
Tap sink = new Hfs(sinkScheme, outputPath, SinkMode.REPLACE);
Pipe assembly = new Pipe("wordcount");
assembly =
new ParseWordsAssembly(assembly); ①
assembly = new GroupBy(assembly, new Fields("word"));
Aggregator count = new Count(new Fields("count")); 
assembly = new Every(assembly> count);
assembly = new GroupBy(assembly, new Fields("count"), new Fields("word")); 
FlowConnector flowConnector = new FlowConnector();
Flow flow = flowConnector.connectf"word-count", source, sink, assembly);
flow.complete();

①我们用ParseWordsAssetnbly管道组件替换了之前例子中的Each 类型管道。最后,我们只是用新的SubAssembly类型子管道替代 了前面Every类型管道和单词解析函数。有必要的话,还可以继 续进行更深入的嵌套处理。

转载请注明:全栈大数据 » 16.5.4 Cascading 实战

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址