整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

第十一章 关于Pig

                                                                                                                        第十一章

                                                                                      关于Pig

Pig为大型数据集的处理提供了更高层次的抽象。MapReduce使作为程序员 的你能够自己定义一个map函数和一个紧跟其后的reduce函数。但是,你 必须使你的数据处理过程与这一连续的mapreduce模式相匹配。很多时 候,数据处理需要多个MapReduce过程才能实现。而使得数据处理过程与 该模式匹配可能很困难。有了 Pig,就能使用更为丰富的数据结构。这些数 据结构往往都是多值和嵌套的。Pig还提供了一套更强大的数据变换操作, 包括在MapReduce中被忽视的连接(join)操作。

Pig包括两部分。

(1) 用于描述数据流的语言,称为Pig Latin

(2) 用于运行Pig Latin程序的执行环境。当前有两个环境:单JVM中 的本地执行环境和Hadoop集群上的分布式执行环境。

Pig Latin 程序由一系列的“操作”operation)“变换”transformation) 成。每个操作或变换对输入进行数据处理,并产生输出结果。从整体上 看,这些操作描述了一个数据流。Pig执行环境把数据流翻译为可执行的内 部表示并运行它。在Pig内酣,这些变换操作被转换成一系列MapReduce 作业。但作为程序员,你多数情况下并不需要知道这些转换是如何进行 的。这样一来,你便可以将精力集中在数据上,而非执行细节上。

Pig是一种探索大规模数据集的脚本语言。MapReduce的一个缺点是开发周 期太长。写mapperreducer,对代码进行编译和打包,提交作业,获取结 果,这整个过程非常耗时。即便使用Streaming能在这一过程中去除代码的 编译和打包步骤,仍不能改善这一情况。Pig的诱人之处在于仅用控制台上 的五六行Pig Latin代码就能够处理TB级的数据。事实上,正是由于雅虎 公司想让科研人员和工程师能够更便捷地挖掘大规模数据集,才设计开发PigPig提供了多个命令来检査和处理程序中已有的数据结构。因此, 它能够很好地支持程序员撰写查询。Pig的一个更有用的特性是它支持在输 入数据的一个有代表性的子集上试运行。这样一来,用户可以在处理整个数 据集前检査程序执行时是否会有错误。

Pig被设计为可扩展的。处理路径中的几乎每个部分,包括载入、存储、过 滤、分组、连接,都可以定制。这些操作都可以使用用户定义函数(userdefined function, UDF) 进行修改。这些用户定义函数作用于 Pig的嵌套数 据模型。因此,它们可以在底层与Pig的操作集成。UDF的另一个好处 是,相较于为了写MapReduce程序而开发库,它们更易于重用。

然而,Pig并不适合所有的数据处理任务。和^ MapReduce —样,它是为数据批处理而设计的。如果想执行的查询只涉及一个大型数据集中的一小部 分数据,Pig的表现并不会很好。这是因为它要扫描整个数据集或其中的很大一部分。

在有些情况下,Pig的表现不如MapReduce程序。但随着新版本的发布, Pig的开发团队使用了复杂、精巧的算法来实现Pig的关系型操作,二者的 差距不断缩小。公平地说,除非你愿意花大量时间来优化Java MapReduce 程序,否则使用Pig Latin来写查询的确能够帮你节约时间。

本章只介绍如何使用Pig的基础内容。要获得更详细的指南,可以参考 Alan Gates Pig 程序设计Programming Pig),网址为 http://shop.oreilly.com/ product/0636920018087.do

11.1安装与运行Pig

Pig是作为一个客户端应用程序运行的。即使准备在Hadoop集群上运行 Pig也无需在集群上额外安装什么东西:Pig从工作站上发出作业,并在 工作站上和HDFS(或其他Hadoop文件系统)进行交互。

Pig的安装很简单。安装前需要有Java6(在Windows上安装时,还需要 Cygwin)。从http://pig.apache.org/releases.html下载一个稳定版本,然后把 tar压缩包解压到工作站上的合适路径:

% tar xzf pig-x.y.z.tar.gz

Pig的二进制文件路径添加到命令行路径也很方便,例如:


% export PIG_INSTALL=/home/tom/pig-x.y.z

% export PATH=$PATH:$PIG_INSTALL/bin

你还需要设置]AVA_HOME环境变量,以指明Java的安装路径。

输入pig -help可获得使用帮助。

11.1.1执行类型

Pig有两种执行类型或称为模式(mode):本地模式(local mode)MapReduce 模式(MapReduce mode)。

1. 本地模式

在本地模式下,Pig运行在单个JVM中,访问本地文件系统。该模式只适 用于处理小规模数据集或试用Pig时。

执行类型可用-x-exectype选项进行设置。如果要使用本地模式运行, 应把该选项设置为local:

% pig -x local

grunt>

这样就能够启动GmntGrunt是与Pig进行交互的外壳程序(shell)。稍后我 们要对它进行详细讨论。

2. MapReduce 模式

MapReduce模式下,Pig将査询翻译为MapReduce作业,然后在Hadoop 集群上执行。集群可以是伪分布的,也可以是全分布的。如果要用Pig 理大规模数据集,应该使用(全分布集群上的)MapReduce模式。

要使用MapReduce模式,你首先需要检査下载的Pig版本是否与正在使用 Hadoop版本兼容。Pig发布版本只和特定的Hadoop版本对应。发行说 明中记录了版本的对应关系。

Pig根据HADOOPJHOME环境变量来寻找并运行对应的Hadoop客户端。但 是,如果该环境变量没有被设置过,那么Pig会运行捆绑在Pig中的 Hadoop库。注意,捆绑的库的版本可能和集群上的Hadoop版本不一样。 所以,最好明确指定hadoop_home.


然后,需要将Pig指向集群的namenodejobtracker。如果安装在 HADOOP_HOME中的Hadoop已经进行了设置,那么不需要进行额外的配置。否则,可以把HADOOP_CONF_DIR设置为一个目录,其中包含 fs.default.name  mapred.job.tracker定义的 Hadoop站点文件。

另一种办法是,在Pigconf目录(或由PIG_CONF_DIR指定的目录)下的 pig.properties文件中设置这两个属性。下面的示例是为伪分布集群进行

设置:

fs.default.name=hdfs://localhost/ mapred.job.tracker=localhost:8021

一旦设置好PigHadoop集群的连接,就可以设置x选项为mapreduce 或忽略该选项来运行Pig可以忽略该选项的惊因是MapReduce模式是Pig 的默认执行模式:

% pig

201201-18 20:23:05,764 [main] INFO org.apache.pig.Main – Logging error message

s to: /private/tmp/pig_l326946985762.log

201201-18 20:23:06,009 [main] INFO

org.apache.pig.backend.hadoop.executionengi

ne.HExecutionEngine – Connecting to hadoop file system at: hdfs://localhost/

201201-18 20:23:06,274 [main] INFO

org.apache.pig.backend.hadoop.executionengi

ne.HExecutionEngine – Connecting to map-reduce job tracker at: localhost:8021 grunt>

从输出中可以看到,Pig报告了它所连接的文件系统和jobtracker

11.1.2运行Pig程序

有三种执行Pig程序的方法。它们在本地和MapReduce模式下都适用。

1. 脚本

Pig可以运行包含Pig命令的脚本文件。例如pig script, pig运行在本 地文件中的命令。或者,对于很短的脚本,也可以使用e选项直

接在命令行中以字符串形式输入脚本。

2. Grunt

Grunt是运行Pig命令的交互式外壳环境(shell)。如果没有指明Pig要运行 的文件,而且也没有使用e选项,Pig就会启动GruntGrunt环境中, 还可以通过runexec命令运行Pig脚本。


3. 嵌入式方法 

还可以在Java中通过PigServer类运行Pig程序。这就像能够在java 使用JDBC运行SQL程序。如果要以编程的方式访问Grunt则需要使用 PigRunner

11.1.3 Grunt

Grunt包含的行编辑功能和GNU Readline(在bash外壳环境等命令行应用中 使用)类似。例如,组合键Ctrl+E将光标移到行末。Grunt也记录过去执行 过的命令。®可以使用Ctrl-PCtrl-N或上下光标键,回显命令历史缓存中 的上或下一行。

Grunt的另一个有用特性是自动补全机制。它能够在你按Tab键时试图自动 补全Pig Latin关键词和函数。例如,如果有如下未完成的命令行:

grunt> a = foreach b ge

此时如果按Tab键,那么ge会自动扩展成Pig Latin关键词generate:

grunt> a = foreach b generate

可以创建一个名为awrocowp/ere的文件并把它放在Pig的类路径(如Pig 目录的cow/目录中中或者启动Grunt的目录中,以此来定制自动补 全的单词。这个文件中每个单词占一行,且单词中不能出现空白字符。自 动补全的匹配是大小写敏感的。在这个文件中列出常用的文件路径特別有 用(因为Pig并不提供文件名自动补全)。在该文件中列出你创建的用户自定 义函数也能带来很多便利。

可以使用help命令列出命令列表。结束一个Grunt会话时,可以使用 quit命令退出。

11.1.4 Pig Latin 编辑器

PigPen是一个提供了 Pig程序开发环境的Eclipse插件。它包含Pig脚本编 辑器、示例生成器(等同于ILLUSTRATE命令)以及用于在Hadoop集群上运行脚本的按钮。它还提供一个操作图窗口,能够以图形方式显示脚本,将 ①历史保存在home目录下的文件中。


数据流可视化。完整的安装和使用说明请参见Pig维基页面:

https://cwiki.apache.org/confluence/display/PIG/PigTools 

还有一些为其他编辑器(包括VimTextMate)提供的Pig Latin语法高亮显

示工具。它们的详细说明可参见Pig的维基页面。

11.2示例

现在,让我们用Pig Latin写一个计算天气数据集中年度最高气温的程序(和 第2章用MapReduce写的程序功能相同),作为示例。这个程序只需要很少 几行代码:

–max_temp.pig: Finds the maximum temperature by year records = LOAD 'input/ncdc/micro-tab/sample.txt'

AS (year:chararray, temperature:int, quality:int); filtered_records = FILTER records BY temperature ! =9999 AND

(quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality 9); grouped_records = GROUP filtered_records BY year; max_temp = FOREACH grouped_records GENERATE group, MAX(filtered_records.temperature);

DUMP maxtemp;

为了看看这个程序都做了些什么事情,我们将使用PigGmnt解释器。它 让我们能够输入几行代码,然后通过交互来理解程序在做什么。在本地模 式下启动Grunt,然后输入Pig脚本的第一行

grunt> records =LOAD 'input/ncdc/micro-tab/sample.txt'

>> AS (yearrchararray, temperature:int, quality:int);

为了简单起见,程序假设输入是由制表符分割的文本,每行只包含年度、 气温和质量三个字段。(事实上,如后文所述,Pig的输入格式处理能力比 这个灵活得多)。这行代码描述我们要处理的输入数据。year:chararray 描述字段的名称和类型。chararrayJava字符串类似;intJava int类似。LOAD操作接受一个URI参数作为输入。在这个示例中,我们 只使用了一个本地文件。在LOAD中,我们也可以引用HDFS URI(可选 )AS子句设定了字段的名称,以便在随后的语句中更方便地引用它们。

LOAD操作的结果和Pig Latin其他所有操作的结果一样,都是一个关系 (relation),即一个元组集合。一个元组类似于数据库表中的一行数据,包含 按照特定顺序排列的多个字段。在这个示例中,LOAD函数根据输入文 件,生成一个(年份,气温,质量)元组的集合。我们把关系写成每个元组一


行,每行由括号括起,每项字段由逗号分隔:

(1950,0,1)

(1950.22.1)

(1950,-11,1)

(1949.111.1)

关系被赋予名称或别名(alias)以便于引用。这个关系的别名是records 们可以使用DUMP操作来查看某个别名所对应关系的内容:

grunt> DUMP records;

(1950,0,1)

(1950.22.1)

(1950,-11,1)

(1949.111.1)

(1949.78.1)

我们还可以把DESCRIBE操作作用于一个关系的别名,来査看该关系的结 构,即关系的模式(schema):

grunt> DESCRIBE records;

records: {year: chararray,temperature: int,quality: int}

从输出可以知道,records有三个字段,别名分别为year, temperature quality字段的别名是我们在AS子句中指定的。同样,字段的类型也 是在AS子句中指定的。我们在后面会对Pig中的数据类型进行更深入的介绍。

第二条语句去除了没有气温的记录(即用值9999表示气温的记录)以及读数 质量不令人满意的记录。在这个很小的数据集中,并没有记录被过滤掉:

grunt> filtered_records = FILTER records BY temperature ! = 9999 AND >> (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9); grunt> DUMP filtered__records;

(1950,0,1)

(1950.22.1)

(1950,-11,1)

(1949.111.1)

(1949.78.1)

第三条语句使用GROUP函数把records关系中的记录按照year字段分组。让我们用DUMP査看GROUP的结果:

grunt grouped^records = GROUP filtered_records BY year; grunt> DUMP grouped_records;

(1949,{(1949,111,1)7(1949,78,1)})

(1950,{(1950,0,1),(1950,22,1),(1950,-11,1)})


这样,我们就有了两行,或叫两个元组。每个元组对应于输入数据中的一个年度。每个元组的第一个字段是用干进行分组的字段(即年度)。第二个字 段是该年度的元组的包(bag)。(bag)是元组的无序集。在Pig Latin里,包 用大括号表示。

通过上述分组方式,我们已经为每个年度创建了一行。剩下的事情就是在 每个包中找到包含最高气温的那个元组。在做这件事情之前,让我们先来 理解grouped_records这一关系的结构:

grunt> DESCRIBE grouped_records;

grouped—records: {group: chararray,filtered_records: {year: chararray, temperature: int,quality: int}}

从输出结果可以看到,Pig给分组字段起十别名group第二个字段和被分 组的filtered_ecords关系的结构相同。根据这些信息,我们可以试着 执行第四条语句对数据进行变换:

grunt> max_temp =FOREACH grouped__records GENERATE group,

>> MAX(filtered_records.temperature);

FOREACH对每一行数据进行处理,并生成一组导出的行。导出的行的字段 GENERATE子句定义。在这个示例中,第一个字段是group,也就是年 度。第二个字段稍微复杂一点。filtered_records.temperature引用了 grouped_records 关系中的 filtered_recor'ds 包中的 temperature  段。MAX是计算包中字段的最大值的内置函数。在这个例子中,它计算了 filtered_records包中的最高温度。我们看•下它的结果:

gruntDUMP max__temp;

(1949,111)

(1950,22)

这样,我们便已成功计算出每年的最高气温。

生成示例

在这个示例中,我们已经使用了一个仅包含少数行的抽样数据集,以简化 数据流跟踪和调试。创建一个精简的数据集是一门艺术。理想情况下,这 个数据集的内容应该足够丰富,能够覆盖查询中可能碰到的各种情况(满足 完备性[completeness]条件);同时,这个数据集应该足够小,能够被程序员 直观理解(满足简明性[conciseness]条件)。通常情况下,使用随机取样并不能满足要求,因为连接和过滤这两个操作往往会去除掉所有的随机取样的数据,而导致产生一个空的结果集。这样是无法描述典型的数据流的。

Pig通过ILLUSTRATE操作,提供了生成相对完备和简明的数据集的工具。下面列出的是运行ILLUSTRATE后的输出(进行了少许格式重排):

image.png

注意,Pig既使用了部分的原始数据(这对于保持生成数据集的真实性很重 要),也创建了一些新的数据。Pig注意到査询中9999这一值,所以创建了 一个包含该值的元组来测试FILTER语句。

综上所述,ILLUSTRATE的输出本身易于理解,而且也能帮助你理解查询的执行过程。

11.3与数据库进行比较

我们已经展示了 Pig如何运行。看上去,Pig LatinSQL很相似。GROUP BYDESCRIBE这样的操作更加强了这种感觉。但是,两种语言之间, 以及Pig和关系数据库管理系统(RDBMS)之间,有几个方面是不同的。

它们之间最显著的不同是:Pig Latin是一种数据流编程语言,而SQL– 种声明式编程语言。换句话说,一个Pig Latin程序是一组针对输入关系的一步步操作。其中每一步都是对数据的简单变换。相反,SQL语句是一个

约束的集合。这些约束结合在一起,定义了输出。从很多方面看,用Pig Latin编程更像在RDBMS中查询规划器(query planner)这一层对数据进行操作。査询规划器决定了如何将声明式语句转化为一系列系统化执行的步骤。 RDBMS把数据存储在严格定义了模式的表内。Pig对它所处理的数据要求 则宽松得多:你可以在运行时定义模式,而且这是可选的。本质上,Pig 以在任何来源的元组上进行操作(当然,数据源必须支持并行的读操作,例如存放在多个文件中)。它使用UDF从原始格式中读取元组。最常用的输入格 式是用制表符分隔的字段组成的文本文件。Pig为这种输入格式提供了内置加 载函数。和传统的数据库不同,Pig并不提供专门的数据导入过程将数据加载 RDBMS从文件系统(通常是HDFS)中加载数据是处理的第一个步骤。

Pig对复杂、嵌套数据结构的支持也使其不伺于只能处理平面数据类型的 SQLPig的语言能够和UDF以及流式操作紧密集成Pig Latin的这一能 力及其嵌套数据结构,使它比大多数SQL的变种具有更强的定制能力。

RDBMS具有一些支持在线和低延迟查询的特性,如事务和索引。而这些特 性是Pig所缺乏的。如前所述,Pig并不支持随机读和几十毫秒级别的査 。它也不支持针对一小部分数据的随机写。和MapReduce —样,所有 的写都是批量的、流式的写操作。

Hive(在第12章中介绍)介于Pig和传统的RDBMS之间。和Pig—样,Hive 也被设计为用HDFS作为存储。但是它们之间有着显著的区别。Hive的査 询语言HiveQL是基于SQL的。任何熟悉SQL的人都可以轻松使用 HiveQL写查询。和RDBMS相同,Hive要求所有数据必须存储在表中,而 表必须有模式,且模式由Hive进行管理。但是,Hive允许为预先存在于 HDFS的数据关联一个模式。所以,数据的加载步骤是可选的。和Pig 样,Hive也不支持低延迟査询。

11.4 Pig Latin

本节对Pig Latin编程语言的语法和语义进行非正式的介绍/ 本小节并不是完整的编程语言参考,但是这里的内容足以帮助大家很好地理解Pig Latin 的组成。

11.4.1结构

一个Pig Latin程序由一組语句构成。一个语句可以理解为一个操作,或一个命令。®例如,GROUP操作是这样一种语句:

grouped_records = GROUP records BY year;

另一个语句的例子是列出Hadoop文件系统中文件的命令:

Is /

如前面的GROUP语句所示,一条语句通常用分号结束。实际上,那是一条 必须用分号表示结束的语句。如果省略了分号,它会产生一个语法错误。 而另一方面,丨s命令可以不用分号结束。一般的规则是:在Grunt中交互使 用的语句或命令不需要表示结束的分号。这包括交互式的Hadoop命令以及 用于诊断的操作,例如DESCRIBE加上表示结束的分号总是不会错。因此,如果不确定是否需要分号,把它加上是最简单的解决办法。

必须用分号表示结束的语句可以被分割成多行以便于阅读:

records = LOAD 'input/ncdc/micro-tab/sample.txt'

AS (year:chararray, temperature:int, quality:int);

Pig Latin有两种注释方法。双减号表示单行注释。Pig Latin解释器会忽略 从第一个减号开始到行尾的所有内容:

–My program

DUMP A; — What’s in A?

C语言风格的注释更灵活。这是因为它使用/**/符号表示注释块的开始和 结束。这样,注释既可以跨多行,也可以内嵌在某一行内:

/*

Description of my program spanning

multiple lines.

*/

A = LOAD ,input/pig/join/A,;

B = LOAD ’input/pig/join/B’;

C = JOIN A BY $0, /* ignored */ B BY $1;

DUMP C;

Pig Latin有一个关键词列表。其中的单词在Pig Latin中有特殊含义,不能 用作标识符。这些单词包括操作(LOAD, ILLUSTRATE)、命令(cat, Is)、 表达式(matches, FLATTEN)以及函数(DIFF, MAX)。它们会在随后的几个小节进行介绍。

Pig Latin采用混合的大小写敏感规则。操作和命令是大小写无关的(这样能 使得交互式操作更“宽容”),而别名和函数名是大小写敏感的。

11.4.2 语句 

Pig Latin程序执行时,每个命令按次序进行解析。如果遇到句法错误或 其他(语义)错误,例如未定义的别名,解释器会终止运行,并显示一条错误 消息。解释器会给每个关系操作建立一个“逻辑计划logical plan)。逻辑 计划构成了 Pig Latin程序的核心。解释器把为一个语句创建的逻辑计划加 到到目前为止已经解析完的程序的逻辑计划上,然后继续处理下一条语句。

特别需要注意的是,在构造整个程序的逻辑计划时,Pig并不处理数据。我 们仍然以前面的Pig Latin程序为例:

–max_temp.pig: Finds the maximum temperature by year records = LOAD ^ input/ncdc/micro-tab/sample.txt' AS (year:chararray, temperature:int, quality:int); filtered__records = FILTER records BY temperature != 9999 AND »

(quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9); gnouped_records = GROUP filtened^records BY year; max_temp = FOREACH grouped_records GENERATE group,

MAX(filtered_records.temperature);

DUMP max_temp;

Pig Latin解释器看到第一行LOAD语句时,首先确认它在语法和语义上是 正确的,然后再把这个操作加入逻辑计划。但是,解释器并不真的从文件 加载数据(它甚至不去检査该文件是否存在)Pig到底要把文件加载到哪里 呢?数据是加载到内存吗?使这些数据可以放入内存,Pig又如何处理数 据呢?我们可能并不需要所有的数据(因为后续的语句可能会过滤数据),因 此加载数据没有意义。关键问题在于,在没有定义整个数据流之前,开始 任何处理都是没有意义的。与此类似,Pig验证GROUPFOREACH


GENERATE语句,并把它们加入逻辑计划中,但并不执行这两条语句。让 Pig开始执行的是DUMP语句。此时,逻辑计划被编译成物理计划,并执行。

多查询执行


由于DUMP是一个诊断工具,因此它总是会触发语句的执行。但是, STORE命令与DUMP不同。在交互模式下,STOREDUMP—样,总是 会触发语句的执行(这一过程包含了 run命令)。但是,在批处理模式下, 它不会触发执行(此时包含了 exec命令)。这是为了性能考虑而进行的设 计。在批处理模式下,Pig会解析整个脚本,看看是否能够为减少写或 读磁盘的数据量进行优化。考虑如下的简单示例:

A = LOAD 'input/pig/multiquery/A';

B = FILTER A BY $1 == 'banana';

C = FILTER A BY $1 != 'banana';

STORE B INTO 'output/b';

STORE C INTO 'output/c'

关系BC都是从A导出的。因此,为了防止读两遍A, Pig可以用一个 MapReduce作业从A读取数据,并把结果写到两个输出文件中去:一个给 B, 一个给C这一特性称为“多查询执行multiquery execution)。

Pig的以前版本不包括这一特性:批处理模式下脚本中的每个STORE语句 都会触发语句的执行,从而每个STORE语句都有一个对应的作业。可以 在执行时对pig使用-Mno_multiquery选项来禁用多查询执行,从而 恢复使用以前的设置。

Pig的物理计划是一系列的MapReduce作业。在本地模式下,这些作业在 本地JVM中运行,而在MapRyuce模式下,它们在Hadoop集群上运行。

可以用EXPLAIN命令对一个关系査看Pig所创建的逻辑和物理计划 (例如 EXPLAIN max_temp;)。

EXPLAIN也会显示MapReduce计划,即显示物理操作是如何组成 MapReduce作业的。这是一个査看Pig为査询运行多少个 MapReduce作业的好办法。

11-1概括了能够作为Pig逻辑计划一部分的关系操作。11.6节将详细介 绍这些操作。

表11-1   Pig Latain扥关系操作


类型

操作

描述

加载与存储

LOAD

从文件系统或其他存储加载数据,存入关系

STORE

将一个关系存放到文件系统或其他存储中

DUMP

将关系打印到控制台

过滤

FILTER

从关系中删除不需要的行

DISTINCT

从关系中删除重复的行

FOREACH…GENERATE

在关系中增加或删除字段

MAPREDUCE

以一个关系作为输入运行某个MapReduce作业

STREAM

使用外部程序对一个关系进行变换

SAMPLE

对一个关系进行随机取样

分组与连接

JOIN

连接两个或签个关系

COGROUP

对两个或更多关系中的数据进行分组

GROUP

在一个关系中对数据进行分组

CROSS

创建两个或更多关系的乘积(叉乘

排序

ORDER

根据一个或多个字段对某个关系进行排序

LIMIT

将一个关系的元组个数限定在一定数量内

组合和切分

UNION

合并两个或多个关系为一个关系

SPLIT

把某个关系切分两个或多个关系

 

 

有些种类的语句并不会被加到逻辑计划中去。例如,诊断操作 DESCRIBEEXPLAIN以及ILLUSTRATE这些操作是用来让用户能够与 逻辑计划进行交互以进行调试的(见表11-2)。DUMP也是一种诊断操作。 它只能用于与很小的结果集进行交互调试,或与LIMIT结合使用,来获得 某个较大的关系的一小部分行。当输出包含较多行的时候,应该使用 STORE语句。这是因为STORE语句把结果存入文件而不是在控制台显示。

11-2. Pig Latin的诊断操作

操作

描述

'

DESCRIBE

打印关系的模式

EXPLAIN

打印逻辑和物理计划

ILLUSTRATE

使用生成的输入子集显示逻辑计划的试运行结果

 

 

为了在Pig脚本中使用宏和用户自定义函数Pig Latin提供了 REGISTER DEFINEIMPORT这三个语句(参见表11-3)。


表11-3   Pig Latin的宏和UDF语句


语句

REGISTER

描述

Pig运行时环境中注册一个JAR文件

DEFINE

为宏、UDF流式脚本或命令规范新建别名

IMPORT

把在另一个文件中定义的宏导入脚本


因为这些命令不处理关系,所以它们不会被加入逻辑计划。相反,这些命
令会被立即执行。
Pig提供了与Hadcmp文件系统和MapReduce进行交互的 命令及其他一些工具命令(参见表11-4)。与Hadoop文件系统进行交互的命 令对在Pig处理前和处理后移动数据非常有用。

 11-4. PigLatin 命令

类别

命令

描述

Hadoop文件系统

cat

打印一个或多个文件的内容

cd

改变当前目录

copyFromLocal复制本地文件或目录

copyToLocal

将一个文件或目录从Hadoop文件系统 复制到本地文件系统

cp

把一个文件或目录复制到另一个目录

fs

访问Hadoop文件系统外壳程序

Is

打印文件列表信息

mkdir

创建新目录

mv

将一个文件或目录移动到另一个目录

pwd

打印当前工作目录的路径

rm

删除一个文件或目录

rmf

强制删除文件或目录(即使文件或目录 不存在也不会失畋)

Hadoop MapReduce 工具

kill 

终止某个MapReduce作业

 

 

exec

在新的Gnmt外壳程序中以批处理模式 运行脚本

help

显示可用的命令和选项

quit

退出解释器

run 在当前Grunt外壳程序中运行脚本

set

设置Pig选项和MapReduce作业属性

sh

Grunt中运行外壳命令

 

 


文件系统相关的命令可以对任何Hadoop文件系统的文件或目录进行操作。 这些命令和hadoop fs命令很像(这是意料之中的,因为两者都是Hadoop Filesystem接口的简单封装)。可以使用Pigfs命令访问所有的 Hadoop文件系统外壳命令。例如fs –Is显示文件列表fs –help则显 示所有可用命令的帮助信息。 准确地说,使用哪个Hadoop文件系统是由Hadoop Core站点文件中的 fs.default.name属性决定的。3.3节详细介绍了如何设置这个属性。

除了 set命令以外,这些命令的含义大都不言自明。set命令用于设置控 Pig行为的选项,包括所有MapReduce作业的属性。debug选项用于在 脚本中打开或关闭调试日志(也可以在启动Pig时用ddebug选项控制 日志的级别):

grunt>

set debug on

job.name是另一个很有用的选项。它为Pig作业设定一个有意义的名称。 这样,即可方便地知道在共享的Hadoop集群上有哪些Pig MapReduce作业 是自己的。如果Pig正在运行某个脚本(而不是通过Gnrnt运行交互式査 ),默认作业名称是基于脚本的名称的。

11-4中有两个命令可以运行Pig脚本:execrun它们的区别是 exec在一个新的Grunt外壳程序中以批处理方式运行脚本。因此,所有脚 本中定义的别名在脚本运行结束后不能在外壳程序中再被访问。另一方 面,如果用run运行脚本,那么效果就和在外壳中手工输入脚本的内容是 一样的。因此,运行该脚本的外壳的命令历史中包含脚本的所有语句。只 能用exec进行多査询执行,即Pig以批处理方式一次执行一批语句(详见 11.4.2节的补充内容“多査询执行”)。不能用run命令进行多査询执行。

Pig Latin的设计中缺少原生的控制流语句。如果要写需要条件逻辑 或循环结构的程序,建议把Pig Latin嵌入其他语言之中,如 Python, JavaScriptJava,由该语言管理控制流。在这一模型下,

宿主脚本使用comile-bind-run AP丨来执行Pig脚本,并获得脚本的 状态。要想获得API的详细信息,请参考Pig的帮助文档。

嵌入式Pig程序总是在JVM中运行。如果要执行Python JavaScript程序,应使用pig命令,后面跟脚本名。合适的Java 本引擎(对Python而言是Jython,JavaScript而言是Rhino)会被自动选择来执行脚本。


11.4.3  表达式

可以通过计算表达式得到某个值。在Pig中,表达式可以作为包含关系操作的语句的一部分。Pig可以使用丰富的表达式类型。Pig中的很多表达式 类型和其他编程语言中的表达式相像。表11-5列出了各种表达式及其简要 说明和示例。本章将有很多这样的表达式。


 


11.4.4 类型

到目前为止,你已经见过Pig的一些简单数据类型,例如int chararray本节我们将更详细地讨论Pig的内置数据类型。

Pig有四种数值类型:intlong、floatdouble它们和Java中对应 的数值类型相同。此外,Pig还有bytear^ay类型,这类似于于表示二进 制大对象(blob)Javabyte数组;chararray类型则类似于用UTF-16 格式表示文本数据的java.lang.String。chararray也可以被加载或存 储为UTF-8格式。Pig没有任何一种数据类型对应于Javabooleanbyteshort:,char类型。这些数据类型都能方便地使用Pigint型或chararray类型(针对char)表示。

数值、文本与二进制类型都是原子类型Pig Latin有三种用于表示嵌套结构的复杂类型:元组(tuple)、(bag)和映射(map)。11-6列出了 Pig Latin 的所有数据类型。

11-6. Pig Latin数据类型

类别

数值

数据类型 int

描述

32位有符号整数

文字示例 

long

64位有符号整数

1L

float

32位浮点数

1.0F

double

64位浮点数

1.0

文本

chararray

UTF-16格式的字符数组

'a'

二进制

bytearray

字节数组

不支持

复杂类型

tuple

任何类型的字段序列

(1,'pomegranate')

bag

元组的无序多重集合(允许 重复元组)

{(1,’pomegranate’),(2)}

map

一个键-值对的集合。键必 须是字符数组,值可以是 任何类型的数据

['a'#'pomegranate']

 

复杂类型通常从文件加载数据或使用关系操作进行构建。但是要当心,表11-6列出的文字形式只是在Pig Latin程序中用来表示常数值。用 PigStorage加载器从文件加载的数据的原始形式往往与之不同。例如,文 件中如表11-6所示的包的数据可能形如{(1,pomegranate),(2)}(注意, 此时没有单引号)。如果有合适的模式,该数据可以加载到一个关系。该关 系只有一个仅有一个字段的行,而该字段的值是包。

Pig提供了内置的TOTUPLETOBAG以及TOMAP函数。它们被用来将表达式转化为元组、包以及映射。

虽然关系和包在槪念上是相同的(本质上都是元组的无序多重集合),但事实 上Pig对它们的处理稍有不同。关系是顶层构造结构,而包必须在某个关 系中。正常情况下,不必操心它们的区别。但是,对它们的使用有一些限 制。对于新手,这些限制仍然可能导致错误。例如,不能根据包文字直接 创建一个关系。因此,如下语句会运行失败:

A = {(1,2),(3,4)}; –Error

针对这种情况,最简单的解决办法是使用LOAD语句从文件加载数据。

另一个例子是,.对待•关系,不能像处理包那样把一个字段投影为一个新的 关系(例如,使用位置符号,用$0指向A的第一个字段):

B = A.$0

为此,必须使用关系操作将一个关系A转换为另一个关系B:

B = FOREACH A GENERATE $0;

Pig Latin将来的版本可能采用相同的方法处理关系和包,以消除这种不一致性。

11.4.5模式

Pig中的一个关系可以关联一个模式。模式为关系的字段指定名称和类型。 我们前面已经介绍过LOAD语句的AS子句如何在关系上附以模式:

grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt'

>> AS (year:int, temperature:int, quality:int); grunt> DESCRIBE records;

records: {year: int,temperature: int,quality: int}

这次,虽然加载的是和上次一样的文件,但年份已声明为整数类型,而不chararray类型。如果要对年份这一字段进行算术操作(例如将它变为一个时间戳),那么用整数类型更为合适;相反,如果只是想把它作为一个简 单的标识符,那么表示为chararray类型就更合适。Pig的这种模式声明 方式提供了很大的灵活性。这和传统SQL数据库要求在数据加载前必须先 声明模式的方式截然不同。Pig的设计目的是用它来分析不包含数据类型信 息的纯文本输入文件,因此,它为字段确定类型的时机比RDBMS要晚也是理所当然的。

我们也可以完全忽略类型声明:

grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt'

>> AS (year> temperature, quality); grunt> DESCRIBE records;

records: {year: bytearray,temperature: bytearray,quality: bytearray}

在这个例子中,我们在模式中只确定了字段的名称:year、temperature quality默认的数据类型为最通用的bytearray,即二进制串。

不必为每一个字段都给出类型。你可以让某些字段的类型为默认的 bytearray,就像如下模式声明示例中的year字段:

grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt'

>> AS (year, temperature:int, quality:int); 

grunt> DESCRIBE records;

records: {year: bytearray,temperature: int,quality: int}

但是,如果要用这种方式确定模式,必须在模式中定义每一个字段。同 样,不能只确定字段的类型而不给出其名称。另一方面,模式本身是可选 的。可以省略AS子句,如下所示:

grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt'; 

grunt DESCRIBE records;

Schema for records unknown.

对于没有对应模式的关系中的字段,只能使用位置符号进行引用$0表示 关系中的第一个字段,$1表示第二个,依此类推。它们的类型都是默认的 bytearray

grunt projectedrecords = FOREACH records GENERATE $0, $1, $2; 

grunt> DUMP projected_records;

(1950,0,1)

(1950.22.1)

(1950,-11,1)

(1949.111.1)

(1949.78.1)

grunt> DESCRIBE projected_records;

projected—records: {bytearray,bytearray,bytearray}


虽然不为字段指明类型很省事(特别是在撰写查询的开始阶段),但如果指定 了字段的类型,我们能使Pig Latin程序更清晰,也使程序运行得更高效。 因此,一般情况下,建议指明字段的数据类型。

虽然在査询中声明模式的方式是灵活的,但这种方式并不利干模式 重用。处理相同输入数据的一组Pig査询常常使用相同的模式。如 果一个査询要处理很多字段,那么在毎个査询中维护重复出现的模 式会很困难。

Apache HCatalog 项目http://incubator.apache.org/hcatalog/)通过提供 基于Hivemetastore的表的元数据服务解决了这一问题。这样,

Pig査询就可以通过名称引用模式,而不必每次都得指明整个模式。

1. 验证与空值

SQL数据库在加载数据时,会强制检查表模式中的约束。例如,试图将一 个字符串加载到声明为数值型的列会失败。在Pig中,如果一个值无法被 强制转换为模式中声明的类型,Pig会用空值null替代。如果有如下天气 数据输入,在定义为整数型的地方出现了一个字符e,我们来看一下这 一验证机制是如何工作的:

1950 0 1

1950 22 1

1950 e 1

1949 111 1 

1949 78 1

Pig在处理损坏的行时会为违例的值产生一个null在输出到屏幕(或使用 STORE存储)时,空值null被显示(或存储)为一个空位:

grunt records = LOAD ’input,ncdc/micro-tab/samplecorrupt.txt,

>> AS (yeanchararray, temperature:int, quality: int); 

grunt> DUMP records;

(1950,0,1)

(1950.22.1)

(1950,,1)

(1949.111.1)

(1949.78.1)

Pig会为非法字段产生一个警告(在此没有显示),但是它不会终止处理。大的数据集普遍都有损坏的值、无效值或意料之外的值,因而逐步修正每一 条无法解析的记录一般都不太现实。作为一种替代方法,我们可以一次性 地把所有的非法记录都找出来,然后再一起处理它们。我们可以修正我们的程序(因为这些记录表示我们写程序时犯了错误),或把这些记录过滤掉 (因为这些数据无法使用):

grunt> corrupt_records = FILTER records BY temperature is null; grunt> DUMP corrupt_records;

(1950”1)

请注意这里对is null操作的使用和SQL中类似。事实上,我们可以在 原始记录中获得更多的信息(例如标识符和无法被解析的值等),以帮助分析问题数据。

我们可以使用如下对关系中的行进行计数的常用语句获得损坏记录的条数:

grunt> grouped = GROUP corrupt_records ALLj^ grunt> all_grouped = FOREACH grouped GENERATE group,

COUNT(corruptrecords); grunt DUMP all_grouped;

(all,l)

11.6.3节在介绍GROUP时详细解释了分组和ALL操作。

另一个有用的技巧是使用SPLIT操作把数据划分成“好”和“坏”两个关系,然后再分别对它们进行分析:

grunt> SPLIT records INTO good_records IF temperature is not null,

>> bad_records IF temperature is null; gnunt> DUMP good_records;

(1950,0,1)

(1950.22.1)

(1949.111.1)

(1949.78.1)

grunt> DUMP bad records:

(1950^1)

让我们回到前面temperature数据类型未声明的情况,此时无法轻松检测 到损坏的数据,因为它没有被当作null值:

grunt〉records = LOAD * input/ncdc/micro-1ab/sample_corrupt.txt*

>> AS (year:chararray, temperature, quality:int); grunt> DUMP records;

(1950,0,1)

(1950.22.1)

(1950,e,l)

(1949.111.1)

(1949.78.1)

grunt> filtered_records = FILTER records BY temperature != 9999 AND >> (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9); grunt> grouped_records = GROUP filtered_records BY year; grunt> max_temp = FOREACH grouped_records GENERATE group,

>> MAX(filtered_records.temperature); grunt DUMP max_temp;

(1949.111.0)

(1950.22.0)

在这种情况下,temperature字段被解释为bytearray,因此在数据加载 时,损坏的字段并没有被检测出来。在传输给MAX函数时,因为MAX只能 处理数值类型,所以temperature字段被强制转换为double类型。损坏 的字段不能被表示为double,所以它被当作null处理,MAX则会忽略这个值。通常,最好的解决办法是在加载数据时声明数据类型,并在进行主 要的处理前查看关系中缺失的值或损坏的值。

有时,因为有些字段缺失,损坏的数据被显示为比较短的元组。可以用 SIZE函数对它们进行过滤,如下所示:

grunt> A = LOAD 'input/pig/corrupt/missing_fields'

grunt> DUMP A;

(2,Tie)

(4,Coat)

(3)

(l, Scarf)

grunt> B = FILTER A BY SIZE(TOTUPLE(*)) > 1;

grunt> DUMP B;

(2,Tie)

(4,Coat)

(l,Scarf)

2. 模式合并

Pig中,不用为数据流中每一个新产生的关系声明模式。在大多数情况 下,Pig能够根据关系操作的输入关系的模式来确定输出结果的模式。

那么模式是如何传播到新关系的呢?有些关系操作并不改变模式。因此,LIMIT操作(对一个关系的最大元组数进行限制)产生的关系就和它所处理的 关系具有相同的模式。对于其他操作,情况可能更复杂一些。例如,UNION操作将两个或多个关系合并成一个,并试图同时合并输入关系的模 式。如果这些模式由于数据类型或字段个数不同而不兼容,那么UNION生的模式是未知的。

针对数据流中的任何关系,都可以使用DESCRIBE操作来获取它们的模 式。如果要重新定义一个关系的模式,可以使用带AS子句的FOREACHGENERATE操作来定义输入关系的一部分或全部字段的模式。

11.5节在讨论用户自定义函数时将进一步讨论模式。


11.4.6 函数

Pig中的函数有四种类型。

1. 计算函数Eval function)

计算函数获取-个或多个表达式作为输入,并返回另一个表达式。MAX 是一个内置计算函数的例子,它返回一个包内所有项中的最大值。有些计 算函数是“聚集函数aggregate function),这意味着它们作用于数据的 “包”bag),并产生一个标量值(scalar value)。MAX就是一个聚集函数。此 外,很多聚集函数是“代数的”algebraic)^也就是说这些函数的结果可以 增量计算。用MapReduce的术语来表示,通^过使用combiner进行计算,代 数函数的计算效率可以提高很多(参见2.4.2节对combiner函数的讨论)。 MAX是一个代数函数,而计算一组值的“中位数”median)的函数则不是代数函数。

2.过滤函数(Filter function)

 

过滤函数是一类特殊的计算函数。这类函数返回的是逻辑布尔值。正如其 名,过滤函数被FILTER操作用于移除不需要的行。它们也可以用于其他以 布尔条件作为输入的关系操作或用于使用布尔或条件表达式的表达式。 IsEmpty就是一个内置过滤函数。它测试一个包或映射是否包含有元素。

3. 加载函数(Load function) 


加载函数指明如何从外部存储加载数据到一个关系。

4. 存储函数(Store function)

存储函数指明如何把一个关系中的内容存到外部存储。通常,加载和存储 函数由相同的类型实现。例如,PigStorage从分隔的文本文件中加载数 据,也能以相同的格式存储数据。

Pig有很多内置的函数,表11-7列出了其中的一部分。完整的内置函数列表包括很多标准数学和字符串函数。Pig的每个发布版本文档都包含这些函数列表。

表11-7  Pig的部分内置函数

类别

函数名称

描述

计算

AVG

计算包中项的平均值

CONCAT

把两个字节数组或字符数组连接成一个

COUNT

计算一个包中非空值的项的个数

COUNT__STAR

计算一个包中的项的个数,包括空值

DIFF

计算两个包的差。如果两个参数不是包,那么如果它们相 同,则返回一个包含这两个参数的包否则返回一个空的包

MAX

计算一个包中项的最大值

MIN

计算一个包中项的最小值

SIZE

计算一个类型的大小。数值型的大小总是对干字符数 组,它返回字符的个数;对于字节数组,它返回字节的个 数1对于容器(container,包括元组、包、映射),它返回其 中项的个数

SUM

计算一个包中项的值的总和

TOBAG

把一个或多个表达式转换为单独的元组,然后把这些元组 放入包

AVG

计算包中项的平均值

TOKENIZE

对一个字符数组进行标记解析,并把结果词放入一个包

TOMAP

将偶数个表达式转换为一个键-值对的映射

TOP

计算包中最前面的〃个元组

TOTUPLE

将一个或多个表达式转换为一个元组

过滤

IsEmpty

判断一个包或映射是否为空

加载/ 存储

PigStonage

用字段分隔文本格式加载或存储关系。用一个可设置的分 隔符(默认为一个制表符)把每一行分隔为字段后,将它们 分别存储于元组的各个字段。这是不指定加载/存储方式时 的默认存储函数

BinStorage

从二进制文件加载一个关系或把关系存储到二进制文件 中。该函数使用基于Hadoop Writable对象的Pig内部格式

TextLoader

从纯文本格式加载一个关系。每一行对应于一个元组。每 个元组只包含一个字段,即该行文本

]sonLoader,]soPig定义的)JSON格式加载关系,或将关系存储为 nStorage JSON格式。每个元组存储为一行

HBaseStroage

HBase表中加载关系,或将关系存储至HBase表中

如果表中没有你需要的函数,可以自己写。但在此之前,可以看一下Piggy Bank这是一个Pig社区共享Pig函数的库。例如Piggy Bank中有Avro


数据文件、
CSV文件Hive RCFilesSequenceFiles以及XML文件的加 载和存储函数。Pig网站有关于如何浏览和获取Piggy Bank函数的指导。如 Piggy Bank也没有你所需要的函数,你可以撰写自己的函数(如果你的函 数很通用,可以考虑把它贡献给Piggy Bank好让其他人也能从中受益)。 这些函数就是“用户自定义函数”user-defined function, UDF)。

11.4.7 

宏提供了在Pig Latin内对可重用的Pig Latin代码进行打包的功能。例如, 我们可以把对关系进行分组,然后在每一组内查找最大值的Pig Latin程序 抽出来,定义成如下的宏:

DEFINE max_by_group(X, group_key, max_field) RETURNS Y {

A = GROUP $X by $group_key;

$Y = FOREACH A GENERATE group, MAX($X.$max_field);

这个宏的名字为max_by_group它包含三个参数:一个关系X以及两个 字段名:grup_keymax_field它返回一个关系Y在宏的体内,参 数和返回别名通过使用$前缀进行引用,例如$X

宏的使用方法如下:

records = LOAD 'input/ncdc/micro-tab/sample.txt'

AS (year:chararray, temperature:int, quality:int); filtered_records = FILTER records BY temperature != 9999 AND

(quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9); max_temp = max__by_group(filtered_records, year, temperature);

DUMP max_temp

在运行时,Pig将使用宏定义展开宏。展开后的程序如下所示,被展开的部 分用粗体表示:

records = LOAD 'input/ncdc/micro-tab/sample.txt'

AS (year:chararray, temperature:int, quality:int); filtered_records = FILTER records BY temperature != 9999 AND

(quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9); macro_max_by_group_A_0 = GROUP filtered_records by (year); max_temp = FOREACH macro_max_by_group_A_0 GENERATE group,

MAX(filtered_records.(temperature));

DUMP max_temp

一般情况下,你看不到程序展开后的形式,这是因为Pig是在内部进行这 一操作的。但是,有些情况下,在撰写和调试宏时,如果能看到展开后的形式会比较有益。可以通过给pig命令传递dryrun参数让Pig只进行宏 扩展(而不执行脚本)。

注意,传递给宏的参数(filtered_recordsyear,以及 temperature) 已经取代了宏定义中的参数名。宏定义中不使用$前缀的别名,例如示例中 的A仅在宏定义的局部有效。在展开时,它们被重写,以避免和程序的其 他部分的另丨J名冲突。在这个例子中,A在展开后的形式为macro_max_by_group_A_0。

为了方便重用,宏可以定义在Pig脚本以外的文件中。这种情况下,需要在使用宏的脚本中导入这些文件。导入(import)语句的形式如下:

IMPORT './chll/src/main/pig/max_temp.macro';

11.5用户自定义函数

P i g设计者认识到以插件形式提供使用用户定制代码的能力是一个关键问 题,但这也是数据处理中最琐碎的工作。因此,他们的设计简化了用户自 定义函数的定义与使用。我们在本节中只介绍Java UDF你也可以使用 Python  JavaScript 来撰写 UDF,它们都使用 Java Scripting API 来运行。

11.5.1 过滤 UDF

我们通过编写一个过滤不满足气温质量读数要求的天气记录的函数来演示 如何写过滤函数。我们的基本思路是修改下面的代码:

filtered_records = FILTER records BY temperature != 9999 AND

(quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);

修改后的代码如下:

filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);

这样写有两个好处:使pig脚本更精简;而且还封装了处理逻辑,以便轻 松重用于其他脚本。如果只是编写一个即时査询,我们可能不需要麻烦地 写一个UDF只有需要不断进行相同的处理时,才需要如此写可重用的 UDF

所有的过滤函数都是FilterFunc的子类,而FilterFunc本身是 EvalFunc的子类。我们后面会对EvalFunc进行更详细的介绍。在这里, 我们只需要知道EvalFunc本质上就像下面的类这样:

public abstract class EvalFunc<T> {
    public abstract T exec(Tuple input) throws IOException;
}


EvalFunc只有一个抽象方法exec()。它的输入是一个元组,输出则只有 一个值,其(参数化)类型为T输入元组的字段包含传递给函数的表达式一 一在这个例子里,它是一个整数。对于FilterFuncTBoolean类型

的,对于那些不应该被过滤掉的元组,该方法应该返回true

对于本例中的质量过滤器,我们要写一个IsGoodQuality类,扩展 FilterFunc并实现exec(),参见范例11-1。Tuple类本质上是一个与某 个类型关联的对象列表。在这里,我们只关心第一个字段(因为函数只有一 个参数)。我们用Tupleget()方法,根M序号来获取这个字段。该字段 的类型是整型。因此,如果它非空,我们就对它进行类型转换,检査它是 否表示气温读数是正确的,并根据检査结果返回相应的值:true false

范例11-1.FiterFunc UDF删除包含不符合质量要求的气温读数记录

package com.hadoopbook.pig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FilterFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
public class IsGoodQuality extends FilterFunc {
    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            return false;
        }
        try{
            Object object = tuple.get(0);
            if (object == null) {
                return false;
            }
            int i = (Integer) object;
            return i == 0 || i == 1 || i == 4 || i == 5 || i == 9; 
        } catch (ExecException e) {
            throws new IOException(e);
        }
    }
}

为了使用新函数,我们首先进行编译,并把它打包到一个JAR文件(在本书 所附的示例代码中,包含了如何进行打包的指导)。然后,我们通过 REGISTER操作指定文件的本地路径(不带引号),告诉Pig这个JAR文件的信息:

grunt> REGISTER pig-examples.jar;

最后,我们就可以调用这个函数:

grunt filteredrecords = FILTER records BY temperature != 9999 AND 

>> com.hadoopbook.pig.IsGoodQuality(quality);

Pig把函数名作为Java类名并试图用该类名来加载类以完成函数调用。(这也就是为什么函数名是大小写敏感的,因为Java类名是大小写敏感的。)在 搜索类的时候Pig会使用包含已注册JAR文件的类加载器。运行于分布式模式时,Pig会确保将JAR文件传输到集群。

对于本例中的 UDF, Pig 使用 com.hadoopbook.pig.IsGoodQuality 名称进行査找,能在我们注册的JAR文件中找到它。 内置函数的解析也使用同样的方式处理。内置函数和UDF的处理有两个区 别:Pig会搜索一组内置包。因此,对于内置函数的调用并不一定要提供完 整的名称。例如,函数MAX实际上是由包org.apache.pig.builtin中的 类MAX实现的。这也是Pig搜索的内置包,所以我们在Pig程序中可以使用 MAX 而不需要用 org. apache .pig. built in. MAX

我们可以通过在调用Grunt时使用命令行参数:

Dudf. import. list=com. hadoopbook • pig来把我们的包加人到捜索路

径中去。或者,我们也可以使用DEFINE操作为函数定义別名,以缩短函 教名:

grunt> DEFINE isGood com.hadoopbook.pig.IsGoodQuality(); 

grunt> filtered__records = FILTER records BY temperature != 9999 AND 

isGood(quality);

需要在一个脚本里多次使用一个函数时,为函数定义别名是一个好办法。

如果要向UDF的实现类传递参数,必须定义别名。



使用类型 


只有在质量字段的类型定义为int时,前面定义的过滤器才能正常工作。 如果没有类型信息,这个UDF就不能正常处理。因为此时该字段的类型是 默认类型 bytearray,表示为 DataByteArray 类。因为 DataByteArray不是整型,因此类型转换失败。 

修正这一问题最直接的办法是在exec()方法中把该字段转换成整型。但更 好的办法是告诉Pig该函数所期望的各个字段的类型。EvalFunc为此提供 getArgToFuncMapping()方法。我们可以重载这个方法来告诉Pig第一个字段应该是整型。

@Override
public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
 List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(); 
  funcSpecs.add(new FuncSpec(this.getClass().getName(),
  new Schema(new Schema.FieldSchemaCnull^ DataType.INTEGER))));
  return funcSpecs;
}

这个方法为传递给exec()方法的那个元组的每个字段返回一个FuncSpec 对象。在这个例子里,只有一个字段。我们构造一个匿名FieldSchema(因为Pig在进行类型转换时忽略其名称,因此其名称以null传递)。该类型 使用PigDataType类的常量INTEGER进行指定。

使用这个修改后的函数,Pig将尝试把传递给函数的参数转换成整型。如果 无法转换这个字段,则把这个字段传递为null如果字段为null, exec() 方法返回的结果总是false在这个应用中,因为我们想要在过滤掉质量 字段无法理解的记录,所以这样做很合适。

以下是使用这个新函数的最终程序:

–max_temp_filter_udf.pig REGISTER pig-examples.jar;

DEFINE isGood com.hadoopbook.pig.IsGoodQuality()j records = LOAD ^input/ncdc/micro-tab/sample.txt'AS (yeanchararray, temperature: int, quality: int); filteredrecords = FILTER records BY temperature != 9999 AND isGood(quality); grouped_records = GROUP filtered_records BY year;

 max—temp = FOREACH grouped_records GENERATE group,

MAX(filtered_records.temperature);

DUMP max_temp;


11.5.1 计算 UDF

写计算函数比写过滤函数的步骤要稍微多一些(参见范例丨1-2)。让我们考虑 范例11-2中的UDF它类似于java.lang.Stringtrim()方法,从 chararray值中去掉开头和结尾的空白符。我们将在本章稍后用到这个

函数。

范例11-2.EvalFunc UDFchararray值中去除开头和结尾的空白符 

public class Trim extends EvalFunc<String> {
    @Override
    public String exec(Tuple input) throws IOException { 
        if (input == null || input.size() == 0) {
            return null;
        }
        try {
            Object object = input•get(0); 
            if (object == null) { 
                return null;
            }
            return ((String) object).trim();
        } catch (ExecException e) { 
            throw new IOException(e);
        }
    }
    @Override
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException { 
        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
        funcList.add(new FuncSpec(this.getClass().getName(), new Schema( new Schema.FieldSchema(null, DataType.CHARARRAY))));
        return funcList;
    }
}

要写一个计算函数,需要参数化返回值的类型,扩展EvalFunc(在Triin UDF中,该类型为String)。IsGoodQuality UDF中的方法一样, exec() getArgToFuncMapping()方法都很直观。

在写计算函数的时候,需要考虑输出模式。在下面的语句中,B的模式由函数Udf决定:

B = FOREACH A GENERATE udf($0)j

如果udf创建了有标量字段的元组,那么Pig可以通过反射reflection) 确定B的模式。对于复杂数据类型,例如包(bag)、元组或映射,Pig需要 更多的信息。此时需要实现utputSchema()方法将输出模式的相关信息 告诉Pig

Trim UDF返回一个字符串。Pig把返回值翻译为chararray类型,如以

下会话所示:

grunt> DUMP A;

(pomegranate)

(banana ) 

(apple)

(lychee ) grunt> DESCRIBE A;

A: {fruit: chararray}

grunt> B = FOREACH A GENERATE com.hadoopbook.pig.Trim(fruit); grunt> DUMP B;

(pomegranate)

(banana)

(apple)

(lychee)

grunt> DESCRIBE B;

B: {chararray}

A包含的chararray字段中有开头和结尾的空白符。我们将Trim函数应 用于A的第一个字段(名为fruit),从而创建了 BB的字段被正确推断为 chararray

动态调用

有时你希望使用由某个Java库提供的函数,而不是自己编写UDF动态调 用器(invoker)使你可以在Pig脚本中直接调用Java方法。使用这种方法的 代价是,对于方法的调用是通过Java的反射(reflection)机制进行的。如果 要为一个很大的数据集的每行记录进行调用,则会导致巨大的处理开销。 所以,对于需要重复运行的脚本,最好使用专用的UDF

下面的代码片段展示如何定义和使用基于Apache Commons Lang StringUtils 类的 trim UDF:

grunt> DEFINE trim InvokeForString('org.apache.comnons.lang.StringUtils.trim*,'String');


grunt> B = FOREACH A GENERATE trim(fruit); grunt> DUMP B;

(pomegranate)

(banana)

(apple)

(lychee)

使用调用器InvokeForString的原因是该方法的返回值类型是String此外 还有 InvokeForlntInvokeForLongInvokeForDoubleInvokeForFload

调用器构造函数的第一个参数应是可被调用的方法。第二个参数为以空格 分隔的方法参数类的列表。

11.5.2 加载 UDF

我们将演示一个定制的加载函数,该函数可以通过指定纯文本的列的区域 定义字段,和Unixcut命令非常类似。它的用法如下:

grunt> records = LOAD 'input/ncdc/micro/sample.txt'

>> USING com.hadoopbook.pig.CutLoadFunc('16-19,88-92,93-93')

>> AS (year:int,temperature:int, quality:int); 

grunt> DUMP records;

(1950,0,1)

(1950.22.1)

(1950,-11,1)

(1949.111.1)

(1949.78.1)

传递给CutLoadFunc的字符串是对列的说明:每一个由逗号分隔的区域定 义了一个字段。字段的名称和数据类型通过AS子句进行定义。让我们来看 范例11-3给出的CutLoadFunc的实现:

范例11-3.该加载函数以列区域作为字段加载元组

public class CutLoadFunc extends LoadFunc {
    private static final Log LOG = LogFactory.getLog(CutLoadFunc.class);
    private final List<Range> ranges;
    private final TupleFactory tupleFactory = TupleFactory.getlnstance(); 
    private RecordReader reader;
    public CutLoadFunc(String cutPattern) { 
        ranges = Range.parse(cutPattern);
    }
    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job,lacation);
    }
    @Override
    public InputFormat getlnputFormat() { 
        return new TextInputFormat();
    }
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) { 
        this.reader = reader;
    }
    @Override
    public Tuple getNext() throws IOException { 
        try { 
            if (!reader.nextKeyValue()) { 
                return null;
            }
            Text value = (Text) reader.getCurrentValue();
            String line = value.toString();
            Tuple tuple = tupleFactory.newTuple(ranges.size()); 
            for (int i = 0; i < ranges.size(); i++) {
                Range range = ranges.get(i); 
                if (range.getEnd() > line.length()) {
                    LOG.warn(String.format(
                    " Range end (%s) is longer than line length (%s)" , range. getEnd(), line. length())); 
                    continue;
                }
                tuple.set(i, new DataByteArray(range.getSubstring(line)));
            }
            return tuple;
        } catch (InterruptedException e) { 
            throw new ExecException(e);
        }
    }
}

Hadoop类似,Pig的数据加载先于mapper运行,所以保证数据可以被分割成能被各个mapper独立处理的部分非常重要,7.2.1节在讨论输入分片和 记录时,介绍了更多背景知识)。

Pig 0.7.0开始,加载和存储函数接口已经进行了大幅修改,以便与 HadoopInputFormatOutputFormat类基本一致。针对Pig以前版本的函数需要重写(重写指南可从http://wike.apache.org/pig/LoadStoreMigrationGuide获得)。LoadFunc —般使用底层已有的InputFormat来创建记录,而 LoadFunc自身则提供把返回记录变为Pig元组的程序逻辑。


CutLoadFunc类使用说明了毎个字段列区域的字符串作为参数进行构造。 解析该字符串并创建内部Range对象列表以封装这些区域的程序逻辑包含 Range类中。这里没有列出这些代码(可在本书所附的示例代码中找到)。

Pig调用LoadFuncsetLocation()把输入位置传输给加载器。因为 CutLoadFunc使用TextlnputFormat把输人切分成行,因此我们只用 FilelnputFormat的一个静态方法传递设置输入路径的位置信息。

 Pig使用了新的MapReduce API因此,我们使用的是org.apache.

 hadoop.mapreduce包的输人和输出格式及其关联的类。

然后,和在MapReduce中一样Pig调用getInputFormat()方法为每一 个分片新建一个RecordReaderPig把每个RecordReader传递给 CutLoadFuncprepareToRead()方法以便通过引用来进行传递,这样, 我们就可以在getNext()方法中用它遍历记录。

Pig运行时环境会反复调用getNext(),然后加载函数从reader中读取元组 直到reader读到分片中的最后一条记录。此时,加载函数返回空值null 报告已经没有可读的元组。

负责把输入文件的行转换为Tuple对象是getNext()的任务。它利用Pig 用于创建Tuple实例的类TupleFactory来完成这一工作。newTuple()法新建一个包含指定字段数的元组。字段数就是Range类的个数,而这些 字段使用Range对象所确定的输入行中的子串填充。

我们还需要考虑输入行比设定范围短的情况。一种选择是抛出异常并停止 进一步的处理。如果你的应用不准备在碰到不完整或损坏的记录时继续工 作,这样处理当然没有问题。在很多情况下,另一种更好的选择是返回一 个有null字段的元组,然后让Pig脚本根据情况来处理不完整的数据。我 们这里采取的是后一种方法:当区域超出行尾时,通过终止for循环把元 组随后的字段都设成默认值null

使用模式 现在让我们来考虑加载的字段数据类型。如果用户指定模式,那么字段就 需要转换成相应的数据类型。但在Pig中,这是在加载后进行的。因此, 加载器应该始终用类型DataByteArray来构造包含bytearray字段的元组。当然,我们也可以让加载器函数来完成类型转换。这时需要重载 getLoadCaster(),以返回包含一组类型转换方法的定制LoadCaster接 口实现:

public interface LoadCaster {

public Integer bytesToInteger(byte[] b) throws IOException; public Long bytesToLong(byte[] b) throws IOException; public Float bytesToFloat(byte[] b) throws IOException; public Double bytesToDouble(byte[] b) throws IOException; public String bytesToCharArray(byte[] b) throws IOException; public Map<String, Object> bytesToMap(byte[] b) throws IOException; public Tuple bytesToTuple(byte[] b) throws IOException; public DataBag bytesToBag(byte[] b) throws IOException;

CutLoadFunc 并没有重载 getLoadCaterO 因为默认的 getLoadCaster()实现返回了 UtfSStorageConverter它提供了 UTF-8

编码数据到Pig数据类型的标准的转换功能。

在有些情况下,加载函数本身可以确定模式。例如,如果我们在加载XML JSON这样的自描述数据,则可以为Pig创建一个模式来处理这些数据。 此外,加载函数可以使用其他方法来确定模式,例如使用外部文件,或通 过传递模式信息给构造函数。为了满足这些需要,加载函数应该(在实现 LoadFunc接口之外)实现LoadMetadata接口,向Pig运行时环境提供模 式。但是请注意,如果用户通过LOADAS子句定义模式,那么它的优先级 将高于通过LoadMetadata接口定义的模式。

加载函数还可以实现LoadPushDown接口,了解査询需要哪些列。因为此 时加载器可以只加载査询需要的列,因此这可能有助于按列存储的优化。 在示例中,CutLoadFunc需要读取元组的整行,所以只加载部分列不容易 实现,鉴于此,我们在这里不使用这种优化技术。

11.6数据处理操作 11.6.1数据的加载和存储

在本章中,我们已经看过Pig如何从外部存储加载数据来进行处理。与之 相似,存储处理结果也是非常直观的。下面的例子使用PigStorage将元 组存储为以冒号分隔的纯文本值:

grunt STORE A INTO out USING PigStorage(‘’);


gnunt> cat out 

Joe:cherry:2 

Ali:apple:3 

Doe:banana:2 

Eve:apple:7

其他内置存储函数参见前面的表丨1-7。 

11.6.2数据的过滤

如果你已经把数据加载到关系中,那么下一步往往是对这些数据进行过 滤,移除你不感兴趣的数据。通过在整个数据处理流水线的早期对数据进 行过滤,可以使系统数据处理总量最小化,从而提升处理性能。

1. FOREACH···GENERATE 操作

我们已经介绍了如何使用带有简单表达式和UDFFILTER操作从一个关 系中移除行。FOREACHGENERATE操作用于逐个处理一个关系中的 行。它可用于移除字段或创建新的字段。在这个示例里,我们既要删除字 段,也要创建字段:

grunt> DUMP A;

(Doe,cherry,2)

(Ali^apple^)

(Doe,banana,2)

$Eve,apple,7)

grunt> B = FOREACH A GENERATE $0, $2+1, "Constant"; grunt> DUMP B;

(Doe,B,Constant)

(Ali,4,Constant)

(3oe, B^onstant)

(Eve,8,Constant)

在这里,我们已经创建了一个有三个字段的新关系B它的第一个字段是 A的第一个字段($0)的投影。B的第二个字段是A的第三个字段($2)1。

B的第三个字段是一个常量字段(目卩B中每一行在第三个字段的取值都相 同),其类型为chararray,取值为Constant

FOREACHGENERATE操作可以使用嵌套形式以支持更复杂的处理。在如下示例中,我们计算天气数据集的多个统计值:

–year_stats.pig REGISTER pig-examples.jar;

DEFINE isGood com.hadoopbook.pig.Is6oodQuality(); records = LOAD 'input/ncdc/all/19{lJ2,3,4,5}0*'


USING com.hadoopbook•pig.CutLoadFunc(,5-10,11-15,16-19,88-92,93-93,

AS (usaf:chararray, wban:chararray, year:int, temperature:int, quality:int);

grouped_records = GROUP records BY year PARALLEL 30;

year_stats = FOREACH grouped_records { uniq_stations = DISTINCT records.usaf; good—records = FILTER records BY isGood(quality);

GENERATE FLATTEN(group), COUNT(uniq_stations) AS station_count, COUNT(good_records) AS good_record_count, COUNT(records) AS record_count;

DUMP year_stats;

通过使用我们前面开发的cut UDF,我们从输入数据集加载多个字段到 records关系中。接下来,我们根据年份对records进行分组。请注意, 我们使用关键字PARALLEL来设置要使用多少个reducer这在使用集群进 行处理时非常重要。然后,我们使用嵌套的FOREACHGENERATE操作 对每个组分别进行处理。第一重嵌套语句使用DISTINCT操作为每一个气 象观测站的USAF标识创建一个关系。第二层嵌套语句使用FILTER操作和 一个UDF为包含“好”的读数的记录创建一个关系。最后一层嵌套语句是 GENERATE 语句(嵌套 FOREACH…GENERATE 语句必须以 GENERATE  句作为最后一层嵌套语句)。该语句使用分组后的记录和嵌套语句块创建的 关系生成了需要的汇总字段。

在若干年数据上运行以上程序,我们得到如下结果:

(1920,8L,8595L,8595L)

fl950,1988L,8635452L,8641353L)

(1930,121L,89245L,89262L)

(1910,7L,7650L,76S0L)

(1940,732L,1052333L,10S2976L)

这些字段分别表示年份、不同气象观测站的个数、好的读数的总数和总的 读数。从中我们可以看到气象观测站个数和读数个数是如何随着时间变化 而增长的。

1. STREAM 操作

STREAM操作让你可以用外部程序或脚本对关系中的数据进行变换。这一 操作的命名对应于HadoopStreaming,后者为MapReduce的提供类似能 (参见2.5节对HadoopStreaming的讨论)。


STREAM可以使用内置命令作为参数。下面的例子使用Unix cut命令从A 中每个元组抽取第二个字段。注意,命令及其参数要用反向撇号引用:

grunt> C = STREAM A THROUGH 'cut -f 2';

grunt> DUMP C;

(cherry)

(apple)

(banana)

(apple)

STREAM操作使用PigStorage来序列化/反序列化关系,输出为程序的标准 输出流或从标准输入流读入。A中的元组变换成由制表符分隔的行,然后 传递给脚本。脚本的输出结果被逐行读入,并根据制表符来划分以创建新 的元组,然后输出到关系C也可以使用DEFINE操作,通过实现 PigToStream  StreamToPig(两者都包含在 org.apache.pig 包中)来提 供定制的序列化和反序列化程序。 在编写定制的处理脚本时,Pig流式处理是最有用的。以下的Python脚本 用于筛选气温记录:

#!/usr/bin/env python

import re import sys

for line in sys.stdin:

(year, temp, q) = line.strip().split() if (temp != "9999" and re.match("[01459]", q)): print "%s\t%s" % (year, temp)

要使用这一脚本,需要把脚本传输到集群上。这可以通过DEFINE子句来 完成。该子句还为STREAM命令创建了一个别名。然后,便可以像下面的 Pig脚本那样在STREAM语句中使用该别名:

–max_temp_filter_stream.pig

DEFINE is_goodquality is_goodquality.py

SHIP (•chll/src/main/python/isgood_quality•py•〉; records = LOAD 'input/ncdc/micro-tab/sample.txt'

AS (year:chararray, temperature:int, quality:int); filtered_records = STREAM records THROUGH is_good__quality AS (year:chararray, temperature:int); grouped_records = GROUP filtered_records BY yean; max_temp = FOREACH grouped__records GENERATE group,

MAX(filtered_records.temperature);

DUMP max_temp;


11.6.3数据的分组与连接

MapReduce中对数据集进行连接操作需要程序员写不少程序,详情参见 8.3节对连接的讨论。Pig为连接操作提供很好的内置支持,简化了数据集 的连接。因为只有非规范化的大规模数据集才最适宜使用Pig(或 MapReduce)这样的工具进行分析,因此连接在Pig中的使用频率远小于在 SQL中的使用频率。

1.关于JOIN语句

让我们来看一个“内连接inner join)的示倒。考虑有如下关系AB:

grunt> DUMP A;

(2,Tie)

(4,Coat)

(3,Hat)

(1,Scarf) grunt> DUMP B;

(oe,2)

(Hank,4)

(Ali,0)

(Eve,3)

(Hank,2)

我们可以在两个关系的数值型(标识符)属性上对它们进行连接操作

grunt> C = DOIN A BY $0, B BY $1 grunt DUMP C;

(2,Tie,Doe,2)

(2,Tie,Hank,2)

(3,Hat,Eve,3)

(4,Coat,Hank,4)

这是一个典型的“内连接”(inner join)操作:两个关系元组的每次匹配都和 结果中的一行相对应。因为连接谓词(join predicate)为相等,所以这其实是 一个等值连接(equijoin)。结果中的字段由桥有输入关系的所有字段组成。

如果要进行连接的关系太大,不能全部放在内存中,则应该使用通用的连 接操作。如果有一个关系小到能够全部放在内存中,则可以使用一种特殊 的连接操作,即“分段复制连接fragment replicate join),它把小的输入 关系发送到所有mapper,并在map端使用内存査找表对(分段的)较大的关系进行连接。要使用特殊的语法让Pig使用分段复制连接'

grunt> C = 30IN A BY $Q, B BY $1 USING "replicated";

这里,第一个关系必须是大的关系,后面则是一个或多个相对较小的关系 (能够全部存放在内存中)。

Pig也支持通过使用类似于SQL的语法(12.7.3节在讨论外连接时将介绍 Hive相关语法)进行外连接(outer join)。例如:

grunt> C = 30IN A BY $0 LEFT OUTER, B BY $1; grunt> DUMP C

(l,Scarf,,)

(2,Tie,]oe,2)

(2,Tie,Hank, 2)

(3,Hat,Eve,3)

<4,Coat,Hank,4)

2.关于COGROUP语句

JOIN结果的结构总是“平面”的,即一组元组。COGROUP语句和JOIN 类似,但是不同点在于,它会创建一组嵌套的输出元组集合。如果你希望 利用如下语句中输出结果那样的结构,那么COGROUP将会有用:

grunt> D = COGROUP A BY $0, B BY $1; 

grunt> DUMP D;

(0,{>,{(Ali,0)})

(l,{(l,Scarf)h{})

(2,{(2,Tie)},{(Joe,2),(Hank, 2)}) (3,{(3,Hat)M(Eve,3)})

(4,{(4,Coat)},{(Hank,4)})

COGROUP为每个不同的分组键值生成一个元组。每个元组的第一个字段 就是那个键值。其他字段是各个关系中匹配该键值的元组所组成的“包” (bag)。第一个包中包含关系A中有该键值的匹配元组。同样,第二个包 中包含关系B中有该键值的匹配元组。

如果某个键值在一个关系中没有匹配的元组,那么对应于这个关系的包就 为空。在前面的示例中,因为没有人购买围巾(ID1),所以对应元组的第 二个包就为空。这是一个外连接的例子。COGROUP的默认类型是外连 接。可以使用关键词OUTER来显式指明使用外连接,COGROUP产生的结 果和前一个语句相同:

①在USING子句中还可以使用其他关键词,包括“skewed(为包含偏斜的键值空间 的大规模数据集使用)“merge(为在要连接的键上已经进行了排序的输入关系 上使用合并连接)。具体如何使用这些特殊的连接操作,请参见Pig的帮助文档。


也可以使用关键词INNERCOGROUP使用内连接的语义,剔除包含空包 的行。INNER关键词是针对关系进行使用的,因此如下语句只去除关系A 中不匹配的行(在这个示例中就是去掉未知商品0对应的行):

grunt> E = COGROUP A BY $0 INNER, B BY $1; grunt> DUMP E;

(1,{(1,Scarf)},{})

(2,{(2,Tie)},{(Joe,2),(Hank,2)>)

(B,{(B,Hat)}J{(EveJ3)>)

(4,{(4,Coat)},{(Hank,4)})

我们可以把这个结构平面化,从A找出买了每一项商品的人。

grunt> F = FOREACH E GENERATE FLATTEN(A)/b.$0 grunt> DUMP F;

(1,Scarf,{})

(2,Tie,{(]oe),(Hank)})

(3,Hat,{(Eve)})

(4,Coat,{(Hank)})

COGROUPINNERFLATTEN(消除嵌套)组合起来使用相当于实现了 (内)连接:

grunt> G = COGROUP A BY $0 INNER, B BY $1 INNER; grunt> H = FOREACH G GENERATE FLATTEN($1), FLATTEN($2);

grunt> DUMP Hj (2,Tie,]oe,2)

(2,Tie,Hank,2)

(3,Hat,Eve,3)

(4,Coat,HankJ4)

这和JOIN A BY $0,B BY $1的结果是一样的。

如果要连接的键由多个字段组成,则可以在JOINCOGROUP语句的BY 子句中把它们都列出来。这时要保证每个BY子句中的字段个数相同。下 面是如何在Pig中进行连接的另一个示例。该脚本计算输入的时间段内每 个观测站报告的最高气温:

–max__temp_station_name. pig REGISTER pig-examples.jar;

DEFINE isGood com.hadoopbook.pig.IsGoodQuality();

stations = LOAD ' input/ncdc/metadata/stations-fixed-width.txt'

USING com.hadoopbook.pig.CutLoadFunc(’1-6,8-12,14-42’)

AS (usafrchararray, wbanrchararray, name:chararray);


trimmed_stations = FOREACH stations GENERATE usaf, wban, com.hadoopbook.pig.Trim(name);

records = LOAD 'input/ncdc/all/191*' USING com.hadoopbook.pig.CutLoadFunc('5-10,11-15,88-92,93-93')

AS (usaf:chararray, wban:chararray, temperature:int, quality:int);

filtered_records = FILTER records BY temperature != 9999 AND

isGood(quality);

grouped__records = GROUP filtered__records BY (usaf, wban) PARALLEL 30;

maxtemp = FOREACH grouped—records GENERATE FLATTEN(group),

MAX(filtered_records.temperature);

maxtempnamed = OIN maxtemp BY (usaf, wban), trimmedstations BY (usaf, wban) PARALLEL 30

max__temp_result = FOREACH max_temp_named GENERATE $0, $1, $5, $2;

STORE max_temp_result INTO ,max_temp_by_station,;

我们使用先前开发的cut UDF来加载包括气象观测站ID(USAFWBAN 标识)、名称的关系以及包含所有气象记录且以观测站ID为键的关系。我 们在根据气象观测站进行连接之前,先根据观测站ID对气象记录进行分 组和过滤,并计算最髙气温的聚集值。最后,在进行连接之后,我们把所 需要的字段——目卩USAFWBAN观测站名称和最高气温——投影到最终 结果。

下面是20世纪头10年的结果:

228020 99999 SORTAVALA 029110 99999 VAASA AIRPORT 040650 99999 GRIMSEY

因为观测站的元数据较少,所以这个査询可以通过使用“分段复制连接” 来进一步提升运行效率。

3. 关于CROSS语句

Pig Latin包含“叉乘”crossproduct,也称“笛卡儿积”操作。这一操作 把一个关系中的每个元组和第二个中的所有元组进行连接(如果有更多的关 系,那么这个操作就进一步把结果逐一和这些关系的每一个元组进行连 接)。这个操作的输出结果的大小是输入关系的大小的乘积。输出结果可能 会非常大:

grunt> I = CROSS A, B;

 grunt> DUMP I; 

(2,Tie,3oe,2)

(2,Tie,Hank,4) 

(2,Tie,Ali,0)

(2,Tie,Eve,3)

(2,Tie,Hank,2)

(4,Coat,]oe,2)

(4,Coat,Hank,4)

(4,Coat,Ali,0)

(4,Coat,Eve,3)

(4j Coat,Hank,2)

(3,Hat,3oe,2)

(3,Hat,Hank,4)

(3,Hat,Eve,3)

(3,Hat,Hank,2)

(1,Scarf,]oe)2)

(1,Scarf,Hank,4)

(IjScarf^Ali,0)

(1)Scarf,Eve,3)

(1,Scarf,Hank,2)

在处理大规模数据集时,应该尽量避免会产生平方(或更差)级中间结果的操 作。只有在极少数情况下,才需要对整个输入数据集计算叉乘。

例如,一开始,用户可能觉得必须生成文档集合中所有文档的两两配对組 合才能计算文档两两之间的相似度。但是,随着对数据和应用的深入了 解,他会发现大多数文档配对的相似度为零(即它们之间没有关系)。于是, 我们就能找到一种更好的算法来计算相似度。

在此,解决这一问题的主要思路是把计算聚焦于用于计算相似度的实体, 如文档中的关键词(term),让它们成为算法的核心。事实上,我们还要删去 对区分文档没有帮助的词,即禁用词(stopword),进一步缩减问题的搜索空 间。使用这一技术,分析近一百万个(106)文档大约会产生约十亿个(109)中 间结果文档配对。®而如果用朴素的方法(即生成输入集合的叉乘)或不消除 停用词,会产生一万亿个(1012)个文档配对。

4. 关于GROUP语句

COGROUP用于把两个或多个关系中的数据放到一起,而GROUP语句则对 一个关系中的数据进行分组。GROUP不仅支持对键值进行分组(即把键值 相同的元组放到一起),你还可以使用表达式或用户自定义函数作为分组键。例 如,有如下关系A:

 ElsayedLinOard. 2008. “Pairwise Document Similarity in Large Collections with MapReduce ” Proceedings of the 46th Annual Meeting of the Association of Computational Linguistics (ACL 2008): 265-268.


grunt> DUMP A;

(Joe,cherry)

(Ali,apple)

(Doe,banana)

(Eve,apple)

我们根据这个关系的第二个字段的字符个数进行分组:

gnunt> B = GROUP A BY SIZE($1); grunt> DUMP B;

(5,{(Ali,apple),(Eve,apple)})

(6,{(3oe,cherry)^(Doe,banana)})

GROUP会创建一个关系,它的第一个字段是分组字段,其別名为group 第二个字段是包含与原关系(在本示例中就是A)模式相同的被分组字段 的包。

有两种特殊的分组操作:ALLANYALL把一个关系中的所有元组放入 一个包。这和使用某个常量函数作为分组函数所获得的结果一样:

grunt> C = GROUP A ALL; grunt> DUMP C;

(all,{(Joe,cherry),(Alijapple),(Doe,banana)^(Eve,apple)})

注意,在这种GROUP语句中,没有关键词BYALL分组常用于计算关系 中的元组个数(参见11.3.4节对验证与空值的讨论)。

关键词ANY用于对关系中的元组随机分组。它对于取样非常有用。

11.6.4数据的排序

Pig中的关系是无序的。考虑如下关系A:

grunt> DUMP A;

(2.3) 

(1,2)

(2.4)

Pig按什么顺序来处理这个关系中的行是不一定的。特别是在使用DUMP STORE检索A中的内容时,Pig可能以任何顺序输出结果。如果想设置 输出的顺序,可以使用ORDER操作按照某个或某几个字段对关系中的数据 进行排序。默认的排序方式是对具有相同类型的字段值使用自然序进行排 (natural ordering),而不同类型字段值之间的排序则是任意的、确定的 (例如,一个元组总是小于一个包)。


如下示例对A中元组根据第一个字段的升序和第二个字段的降序进行排序:

grunt> B = ORDER A BY $0, $1 OESC; grunt> DUMP B;

(1,2)

(2,4)

(2.3)

对排序后关系的后续处理并不保证能够维持已排好的顺序。例如

grunt> C = FOREACH B GENERATE *;

即使关系C和关系B有相同的内容,关系CDUMPSTORE仍然可能 产生以任意顺序排列的输出结果。正是由于这样,通常只在获取结果前一 步才使用ORDER操作。 

LIMIT语句对于限制结果的大小以快速获得一个关系样本,非常有用。 “取原型化”prototyping)操作(即丨LLUSTRATE命令)则更适用于根据数据 产生有代表性的样本。LIMIT语句可紧跟ORDER语句使用,来获得排在最 前面的〃个元组。通常,LIMIT会随意选择一个关系中的个元组。但 是,当它紧跟ORDER语句使用时,ORDER产生的次序会继续保持(这和其 他操作不保持输入关系数据顺序的规则不同,是一个例外):

grunt> D = LIMIT B 2; grunt> DUMP D;

(1,2)

(2.4)

如果所给的限制值远远大于关系中所有元组个数的总数,则返回所有元组 (LIMIT操作没有作用)。

使用LIMIT能够提升系统的性能。因为Pig会在处理流水线中尽早使用限 制操作,以最小化需要处理的数据总量。因此,如果不需要所有输出数 据,就应该用LIMIT操作。

11.6.5数据的组合和切分

有时,你把几个关系组合在一起。为此可以使用UNION语句。例如:

grunt> DUMP A;

(2.3)

(1,2)

(2.4)

grunt> DUMP B;

(z,X,8)


(w,y,l)

grunt> C = UNION A, B; grunt> DUMP C;

(2.3)

(1,2)

(2.4)

(z,z,8)

(w,y,l)

C是关系AB“并”(union)。因为关系本身是无序的,因此C中元组 的顺序是不确定的。另外,如示例中那样,我们可以对两个模式不同或字 段个数不同的关系进行并操作。Pig会试图合并正在执行UNION操作的两 个关系的模式。在这个例子中,两个模式是不兼容的,因此C没有模式:

grunt> DESCRIBE A;

A: {f0: intjfl: int} grunt> DESCRIBE B

B: {f0: chararray,fl: chararray,f2: int} grunt> DESCRIBE C;

Schema for C unknown.

如果输出关系没有模式,脚本就需要能够处理字段个数和数据类型都不同 的元组。

SPLIT操作是UNION操作的反操作。它把一个关系划分成两个或多个关 系。可参考11.3.4节讨论验证与空值时所提供的示例,了解如何使用 SPLIT

11.7 Pig 实战

在开发和运行Pig应用程序时,知道一些实用技术是非常有帮助的。本小 节将介绍一些这样的技术。

11.7.1并行处理

运行在MapReduce模式时,一件重要的事情是使得处理的并行度与所处理 的数据集大小相匹配。Pig将根据输入数据的大小设置reducer的个数:每 1 GB输入使用一个reducer,reducer的个数不超过999。你可以通过设 置 pig.exec, reducers.bytes.per. reducer(默认为1 000 000 000 字节) 和pig.exec.reducers.max(默认为999)来修改这一设置。

为了告诉Pig每个作业要用多少个reducer,你可以在reduce阶段的操作中


使用PARALLEL子句。在reduce阶段使用的操作包括所有的“分组” (grouping)“连接”Joining)操作(GROUPCOGROUP、JOIN  CROSS), 以及DISCTINCTORDER下面的代码将GROUPreducer个数设为30:

grouped_records = GROUP records BY year PARALLEL 30;

也可以通过设置default_parallel选项来达到同样的目的。修改的选项

将作用于所有后续的作业:

grunt>

set default_parallel 30

设置reduce任务个数的一种比较好的方式是把该参数设为稍小于集群中的 reduce任务的槽(slot)的个数数。对于这个问题的详细讨论,可以参7.1.1 见的补充内容“选择reducer的个数”。

map任务的个数由输人的大小决定(每个HDFS块一个map),不受 PARALLEL子句的影响。

11.7.2参数代换

如果有定期运行的Pig脚本,你可能希望让这个脚本能够在不同参数设置 下运行。例如,一个每天运行一次的脚本可能要根据日期来决定它要处理 哪些输入文件。Pig支持“参数代换parameter substitution),即用运行时 提供的值替换脚本中的参数。参数由前缀为$字符的标识符来表示。例如, 在以下脚本中,$input$output用来指定输入和输出路径:

–max_temp__param. pig

records = LOAD '$input'AS (year:chararray' temperature:int, quality:int); filtered_records = FILTER records BY temperature != 9999 AND

(quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9); grouped_records = GROUP filtered_records BY year; max_temp = FOREACH grouped_records GENERATE group,

MAX(filtered_records.temperature);

STORE max_temp into '$output';

参数值可以在启动Pig时使用-param选项指定,每个参数一个:

% pig -param input=/user/tom/input/ncdc/micro-tab/sample.txt \

-param output=/tmp/out \

chll/src/main/pig/max_temp_param.pig

也可以把参数值放在文件中,通过-param_file选项把参数传递给Pig


例如,我们把参数定义放在文件中,也可以获得相同的结果:

Input file

input=/user/tom/input/ncdc/micro-tab/sample.txt

Output file output=/tmp/out

Pig的调用相应进行如下调整:

% pig -param_file chll/src/main/pig/max_temp_param.param \

> chll/src/main/pig/max_temp_param.pig

可以重复使用-param_file来指定多个参数文件,还可以同时使用-param -paran^file选项。如果同一个参数在参数文件和命令行中都有定义, 那么命令行中最后出现的参数值优先级最高。

1.动态参数

针对使用-paratri选项来提供的参数,很容易使其值变成动态的,运行命令 或脚本即可变为动态的。很多Unix shell都用反引号引用的命令来替换实际 值。我们可以使用这一功能实现根据日期来确定输出目录:

% pig -param input=/user/tom/input/ncdc/micro-tab/sample.txt \

-param output=/tmp/'date "+%Y-%m-%d" '/out \

chll/src/main/pig/max_temp_param.pig

Pig也支持在参数文件中的反引号,在shell中执行用反引号引用的命令, 并使用shell的输出结果作为替换值。如果命令或脚本返回一个非零的退出 状态并退出,Pig会报告错误消息并终止执行。在参数文件中使用反引号是 一种很有用的特性,它意味着可以使用完全相同的方法在文件中或命令行 中定义参数。

2. 参数代换处理

参数代换是脚本运行前的一个预处理步骤。可以使用dryrun选项运行Pig 来査看预处理器所进行的代换。在dryrun模式下,Pig对参数进行代换(以 及宏扩展)并生成一个使用了代换值的原来脚本的副本,但并不执行该脚 本。在普通模式下,可以在运行之前査看生成的脚本,检查参数代换是否 合理(例如,在动态生成代换的情况下)。

在写作本书时,Grunt仍然不支持参数代换。


转载请注明:全栈大数据 » 第十一章 关于Pig

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址