整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

第十六章 实例学习

hadoop 花牛 16℃ 0评论

实例学习

16.1 Hadoop 在 Last.fm 的应用

16.1.1 Last.fm:社会音乐史上的革命

Last.fm创办于2002年,它是一个提供网络电台和网络音乐服务的社区网站,向用户提供免费在线音乐和音乐下载、音乐及重大事件推荐、个性化图表生成服务以及其他多种服务。每个月大约有2500万人使用Last.fm, 因而产生了大量需要处理的数据。一个例子就是用户会传输他们正在收听的音乐信息(也就是收藏[scrobbling])Last.fm处理并且存储这些数据,以便用户可以直接访问这些数据用图表的形式展示),并且可以利用这些数据 来推断用户的音乐品味、喜好和喜爱的艺术家,然后用于寻找相似的音乐。

16.1.2 Hadoop 在 Last.fm 中的应用

随着Last.fm各种网络服务的幵发,用户数目从数千增长到数百万,存储、 处理和管理这些用户数据渐渐变成一项挑战。幸运的是,当大家认识到 Hadoop技术能解决众多问题的时候,在共同努力下Hado叩的性能迅速稳定下来,并被积极地用于解决众多问题。2006年初,Last.fm开始使用 Hadoop,几个月之后便投入实际应用。Last.fm使用Hadoop的理由归纳如下:

    •分布式文件系统为它所存储的数据 例如,网络日志,用户收听音乐的相关数据提供冗余备份服务而不增加额外的费用 

    •可以方便地通过增添便宜、商用的硬件来满足可扩展性需求 

    •当时Last.fm财力有限,而Hadoop是免费的

    开源代码和活跃的社区意味着Last.fm能够自由地修改Hadoop,从而增添一些自定义特性和补丁

    Hadoop提供了一个灵活的、容易掌握的框架来进行分布式计算

现在,Hadoop已经成为Last.fm基础平台的关键组件,包括2Hadoop集群,涉及50台计算机、300个内核和100TB的硬盘空间。在这些集群上, 进行数百种日常作业的计算执行,例如日志文件分析、A/B测试评测、即席处理和图表生成。本节的例子将侧重于介绍产生图表的处理过程,因为这是Last.fmHadoop的第一个应用,它展示出Hadoop在处理大数据集时比其他方法具有更强的功能性和灵活性。 

16.1.3 Hadoop制作图表

Last.fm使用用户产生的音乐收听数据来生成许多不同类型的图表,例如针对每个国家或个人制作一周音乐汇总图表。许多Hadoop程序用于处理音乐 收听数据然后生成这些图表,它们可以以天、周或月为单位执行。图16-1 展示了这些数据在网站上如何被显示的一个例子,本例是音乐的周排行统计数据。

image.png

图16-1. Last.fm音乐排行统计图表

通常情况下,Last.fm有以下两种音乐收听信息来源:

    •用户播放自己的音乐(例如,在PC机或其他设备上听MP3文件), 这种信息通过Last.fm的官方客户端应用或第三方应用(有上百种) 发送到Last.fm

    •用户收听Last.fm某个网络电台的节目,并在本地计算机上通过流 技术缓冲一首歌。Last.fni播放器或站点能够用来访问这些流数据,然后提供给用户一些额外的功能,比如允许用户对她收听的音频进行喜爱、跳过或禁止等操作

在处理接收到的数据时,我们对它们进行分类:一类是用户提交的收听的 音乐数据(这是第一类数据,被称为“scrabble”,即收藏数据),另一类是用户收听的Last.fm的电台数据(这是第二类数据,被称为“radio listen”即 电台收听数据)。为了避免Last.fm的推荐系统出现信息反馈循环的问题, 对数据源的区分是非常重要的,Last.fm的推荐系统只使用收藏数据。 Last.fm的一项重要任务就是用Hadoop程序接受这些收听数据并处理成能够在Last.fm网站上进行显示的格式化数据,这些数据也能够作为其他 Hadoop程序的输入数据。这一过程是Track Statistics(音轨统计)程序实现 的,以下几节描述中将会用这段程序作为实例。

16.1.4  Track Statistics 程序

音乐收听信息被发送到Last.fm时,会经历验证和转换阶段,最终结果是一系列由空格分隔的文本文件,包含的信息有用户ID(userld)、曲目 ID(trackld)、收藏的次数(Scrobble)、收听的次数(Radio)以及被跳过的次数 (Skip)。表16-1包含一些采样的收听数据,后面介绍的例子将用到这些数 据,它是Track Statistics程序的输入(真实数据达GB数量级,并且具有 更多的属性字段,为了方便介绍,这里省略了其他字段)

表16-1.收听数据

UserId

TrackId

Scrobble

Radio

Skip

111115

222

0

1

0

111113

225

1

0

0

111117

223

0

1

1

111115

225

1

0

0

这些文本文件作为初始输入提供给Track Statistics程序,该程序包括利用这个输入数据计算各种数据值的两个作业和一个用来合并结果的作业(参见图 16-2)

image.png 

图 16-2. TrackStats 作业

Unique Listeners作业模块统计收听同一首音频曲目的不同用户数,实现手段是累计不同用户对该音频文件的第一次访问而忽略同一用户对这一文件的多次访问。Sum作业模块通过对所有用户的所有收听信息进行累加计数 来为每个音频曲目统计收听总数、收藏总数、电台收听总数以及被跳过的总数。

尽管这两个作业模块的输入格式是相同的,我们仍然需要两个作业模块, 因为Unique Listeners作业模块负责为每个用户对每个音频曲目产生统计值,而Sum作业模块为每个音频产生统计值。最后Merge作业模块负责合 并由这两个模块产生的中间输出数据得到最终统计结果。运行这段程序的 最终结果是对每个音频曲目产生以下几项数值:

    •不同的听众数 

    •音频曲目的收藏次数 

    •音频曲目在电台中的点播次数 

    •音频曲目在电台中被收听的总次数 

    •音频曲目在电台广播中被跳过的次数

下面我们将详细介绍每个作业模块和它的MapReduce过程。请注意,由于 篇幅有限,所提供的代码段已被简化。要想下载完整的代码,请参考本书“前言”。

1.计算不同的听众数

Unique Listeners作业模块用于计算每个音频曲目的不同收听用户数。

UniqueListenerMaper UniqueListenerMaper 程序处理用空格分隔的原始收听数据,然后输出user ID(用户ID), track ID(音频ID)对。:

public void map(LongWritable position, Text rawLine, OutputCollector<IntWritable, 
IntWritable> output, Reporter reporter) throws IOException {
String[] parts = (rawLine.toString()).split(""); 
    int scrobbles =
Integer.parselnt(parts[TrackStatistiesProgram.COL_SCROBBLES]); 
    int radioListens =
Integer.parselnt(parts[TrackStatisticsProgram.COL_RADIO]);
// if track somehow is marked with zero plays - ignore 
if (scrobbles <= 0 && radioListens <= 0) { 
return;
}
// if we get to here then user has listened to track,
// so output user id against track id 
    IntWritable trackld = new IntWritable(
Integer.parselnt(parts[TrackStatisticsProgram.COL_TRACKID])); 
    IntWritable userid = new IntWritable(
Integer.parselnt(parts[TrackStatisticsProgram.COL_USERID])); 
output.collect(trackld, userid);
}

UniqueListenersReducer UniqueListener'sReducer 接收到每个track ID 对应的user ID数据列表之后,把这个列表放入集合Set中以消除重复的用户HD。然后输出每个track ID对应的这个集合的大小(不同用户数)。但是如果某个键对应的值太多,在set中存储所有的reduce值可能会有内存溢出的危险。实际上还没有出现过远种问题,但是为了避免这一问题,我们可以引入一个额外的MapReduce处理步骤或使用辅助排序的方法来删除重复数据(详细内容请参考8.2.4)

public void reduce(IntWritable trackld, Iterator<IntWritable> values, 
OutputCollector<IntWritable, IntWritable〉 output, Reporter reporter) 
throws IOException {
    Set<Integer> userids = new HashSet<Integer>();
    // add all userids to the set, duplicates automatically removed (set contract) 
    while (values.hasNext()) {
        IntWritable userid = values.next(); 
        userids.add(Integer.valueOf(userid.get()));
    }
    //output tnackld -> number of unique listeners per track 
    output.collect(trackld, new IntWritable(userIds.size()));
}

表16-2是这一作业模块的输入数据样本。map输出结果如表16-3所示, reduce输出结果如表16-4所示。

表16-2. 作业的输入

Line of file

LongWritable

UserId

IntWriable

TrackId

IntWriteable

Scrobbled

Boolean

Radio play

Boolean

Skip

Boolean

0

11115

222

0

1

0

1

11113

225

1

0

0

2

11117

223

0

1

1

3

11115

225

1

0

0

表16-3. map输出

TrackId

UserId

IntWritable

IntWritable

222

11115

225

11113

223

11117

225

11115

2.统计音频播放总数

Sum作业相对简单,它只为每个音频曲目累计我们感兴趣的数据。

SumMapper    输入数据仍然是原始文本文件,但是这一阶段对输入数据的 处理完全不同。期望的输出结果是针对每个音轨的一系列累计值(不同用户 数、播放次数、收藏次数、电台收听次数和跳过次数)。为了方便处理,我 们使用一个由Hadoop Record I/O类产生的TrackStats类中间对象来保存这些数据,它实现了 WritableComparable接口(因此可被用作输出)。 Mapper创建一个TrackStats对象,并根据文件中的每一行数据设置相应的属性值,但是“不同的用户数”(unique listener count)这一项没有填写(这 项数据由merge作业填写)

public void map(LongWritable position, Text rawLine,
OutputCollector<IntWritableJ TrackStats> output, Reporter reporter) 
throws IOException {
    String[] parts = (rawLine.toString()).split("");
    int trackld = Integer.parseInt(parts[TrackStatisticsProgram.COL_TRACKID]); 
    int scrobbles = Integer.parseInt(parts[TrackStatisticsProgram.COL_SCROBBLES]); 
    int radio = Integer.parseInt(parts[TrackStatisticsProgram.COL^RADIO]); 
    int skip = Integer.parseInt(parts[TrackStatisticsProgram.COL_SKIP]);
    // set number of listeners to 0 (this is calculated later)
    // and other values as provided in text file
    TrackStats trackstat = new TrackStats(0, scrobbles + radio, scrobbles, radio, skip); 
    output.collect(new IntWritable(trackld), trackstat);
}

SumReducer   在本实例中,reducer执行和mapper相似的函数,对每个音 频曲目的播放情况进行统计,然后返回一个总的统计数据:

public void reduce(IntWritable trackld, Iterator<TrackStats> values, 
OutputCollector<IntWritableJ TrackStats> output, Reporter reporter) 
throws IOException {
    TrackStats sum = new TrackStats(); // holds the totals for this track 
    while (values.hasNext()) {
        TrackStats trackStats = (TrackStats) values.next(); 
        sum.setListeners(sum.getListeners() + trackStats.getListeners()); 
        sum.setPlays(sum.getPlays() + trackStats.getPlays()); 
        sum.setSkips(sum.getSkips() + trackStats.getSkips()); 
        sum.setScrobbles(sum.getScrobbles() + trackStats.getScrobbles()); 
        sum.setRadioPlays(sum.getRadioPlays() + trackStats.getRadioPlays());
    }
    output.collect(trackld, sum);
}

表16-5是这部分作业的输入数据(Unique Listener作业的输入一样)map 的输出结果如表16-6所示,reduce的输出结果如表16-7所示。

表16-5. 作业输入 

Line

UserId

TrackId

Scrobbled

Radio play

Skip

LongWritable

IntWritable

IntWritable

Boolean

Boolean

Boolean

0

11115

222

0

1

0

1

11113

225

1

0

0

2

11117

223

0

1

1

3

11115

225

1

0

0

表16-6. map输出

 

TrackId

#listeners

#plays

#scrobbles

#Radio play

#Skips

LongWritable

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

222

0

1

0

1

0

225

0

1

1

0

0

223

0

1

0

1

1

225

0

1

1

0

0

表16-7. reduce输出

TrackId

#listeners

#plays

#scrobbles

#radio plays

#skips

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

222

0

1

0

1

0

225

0

2

2

0

0

223

0

1

0

1

1

3.合并结果 

最后一个作业模块需要合并前面两个作业产生的输出数据:每个音频曲目对应的不同用户数和每个音频的使用统计信息。为了能够合并这两种不同的输人数据,我们采用了两个不同的mapper(对每一种输入定义一个)。两个中间作业被配置之后可以把它们的输出结果写入路径不同的文件, Multiplelnputs类用于指定mapper和文件的对应关系。下面的代码展示了如何设置作业的;JobConf类来完成这一过程:

MultipieInputs.addInputPath(conf, sumlnputDir,
    SequenceFilelnputFormat.class,IdentityMapper.class);
Multiplelnputs.addlnputPath(conf, listenerslnputDir,
    SequenceFileInputFormat.class, MergeListenersMapper.class);

虽然单用一个mapper也能处理不同的输入,但是示范解决方案更方便、更巧妙。

MergeListenersMapper 这个 mapper 用来处理 UniqueListenerDob 输出的每个音频曲目对应的不同用户数据。它采用和SumMapper'类似的方法创建TrackStats对象,但这次它只填写每个音频曲目对应的不同用户数信 息,不管其他字段:

public void map(IntWritable trackld, IntWritable uniqueListenerCount, 
OutputCollector<IntWritable, TrackStats) output, Reporter reporter) 
throws IOException {
    TrackStats trackStats = new TrackStats(); 
    trackStats.setListeners(uniqueListenerCount.get()); 
    output.collect(trackId, trackStats);
}

表16-8是mapper的一些输入数据,表169是对应的输出结果。

表 16-8. MergeListenersMapper 的输入

TrakId

#listeners

IntWritable

IntWritable

222

1

225

2

223

1

表 16-9. MergeListenersMapper 的输出

TrakId

#listeners

#plays

#scrobbles

#radio

#skips

222

0

0

0

0

0

225

0

0

0

0

0

223

0

0

0

0

0

IdentityMapper被配置为用来处理SurtOob输出的TrackStats对象,因为不要求对数据进行其他处理,所以它直接输出输入数据(参见表16-10)

表16-10. IdentityMapper的输入和输出

TrackId

#listeners

#plays

#scrobbles

#radio plays

#skips

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

222

0

1

0

1

0

225

0

2

2

0

0

223

0

1

0

1

1

前面两个mapper产生同一类型的数据:每个音频曲目对应的TrackStats 对象,只是数据赋值不同。最后的reduce阶段能够重用前面描述的 SumReducer来为每个音频曲目创建一个TrackStats对象,它综合前面两 个TrackStats对象的值,然后输出结果(参见表16-11)

最终输出文件被收集后复制到服务器端,Last.fm网站调用一个Web服务得 到并展示这些数据。如图16-3所示,这个网页展示了一个音频曲目的使用 统计信息:收听者总数和播放总次数。

表16-11. SumReducer的最终输出

TrackId

#listeners

#plays

#scrobbles

#radio plays

#skips

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

222

1

1

0

1

0

225

2

2

2

0

0

223

1

1

0

1

1

image.png

图 16-3. TrackStats 结果 

16.1.5总结

Hadoop已经成为Last.fm基础框架的一个重要部件,它用于产生和处理各种各样的数据集,如网页日志信息和用户收听数据。为了让大家能够掌握主要的概念,这里讲述的例子已经被大大地简化;在实际应用中输入数据具有更复杂的结构并且数据处理的代码也更加繁琐。虽然Hadoop本身已经足够成熟可以支持实际应用,但大家仍在积极地开发Hadoop,每周Hadoop 社区都会为它增加新的特性并提升它的性能。Last.fm作为代码和新想法的

贡献者很高兴是这个社区的一份子,同时也是对大量开源技术进行利用的终端用户。

16.2Hadoop 和 Hive Facebook 的应用

Hadoop可以用于构建核心的后台批处理以及近似实时计算的基础架构。它也可用于保存和存档大规模数据集。在下面这个实例中,我们将主要考察后台的数据架构以及Hadoop在其中充当的角色。我们将在假想的Hadoop 配置下描述具有潜力的Hive系统(Hive是建立于Hadoop之上的数据仓库和 SQL体系结构的开源代码)和使用该体系架构构建的各种各样的商业及产品应用。

16.2.1 Hadoop 在 Facebook 的使用

1.发展史

随着Facebook网站的使用量增加,网站上需要处理和存储的日志和维度数 据激增。在这种环境下对任何一种数据处理平台的一个关键性要求是它必 须具有快速的支持系统扩展的应变能力。此外,由于工程资源有限,所以 系统必须是可靠的,并且易于使用和维护。

Facebook最初使用的数据仓库都是在Oracle系统上实现的。在遇到可扩展 性和性能方面的问题之后,大家开始调査是否可以采用开源技术解决我们 的问题。作为调査工作的一部分,我们部署了 一个相对小规模的Hadoop系 统,并且把一部分核心数据集_发布到这个系统上。Hadoop对我们来说有相 当大的吸引力,一是因为雅虎*内部就一直使用这一技术支持后台数据批处 理需求,二是因为我们熟知由Google带动并普及使用的MapReduce模型, 它具有简单性和可扩展性的特点。

我们最初的原型系统开发得非常成功:工程师们都非常欣赏它能在合理的 时间范围内处理大数量级数据的能力,这正是我们以前所没有的。再者, 能用自己熟悉的编程语言来进行数据处理工作(使用Hadoop Streaming),他 们也感到非常髙兴。把我们的核心数据集发布到一个集中的数据仓库系统 也非常方便。几乎同时,我们开始开发Hive工具。这使用户在Hadoop

群上处理数据变得更加容易,因为普通的计算需求都能用大多数程序员和 分析师们熟悉的SQL语句来表达。

其结果是,集群的规模和对它的使用迅速增长,现在Facebook正在运行世 界第二大Hadoop集群系统。在写这篇文章的时候,我们在Hadoop上存放 的数据超过了 2PB,每天给它加载的数据超过10TB。我们的Hadoop系统 具有2400个内核,大约9 TB的内存,并且在一天之中的很多时间点,这 些设备都是满负荷运行的。根据系统的增长情况,我们能够迅速地进行集 群规模扩展,而且能够利用开源的优点,通过修改Hadoop代码让它适应我 们的需求。同时我们也对开源做出了贡献,比如我们开发的一些Hadoop核 心组件,我们提供了开源的Hive,它现在也是Hadoop上层的一个子项目。

2.使用实例

在Facebook,对Hadoop至少有下面四种相互关联但又不同的用法。

在大规模数据上产生以天和小时为单位的概要信息。这些概要信息 在公司内用于各种不同的目的。

    基于这些概要信息产生的报告,可供工程或非工程职能组用来 制定产品决策。

    概要信息包含用户数、网页浏览次数和网站访问时间的增长情 况,提供在Facebook上进行广告营销活动的相关的性能评估。

    数据对网站相关属性提供后台处理支持,比如计算你喜欢的人 和应用程序。

在历史数据上运行即席作业。数据分析结果有助于产品组和执行主管解决问题。

成为我们日志数据集实用而长期的存档存储器。

•通过特定的属性进行日志事件査询(用这些属性对日志建索引),这可以用于维护网站的数据完整性并且保护用户免受垃圾邮件程序的骚扰。

3.数据架构

图16-4展示了数据架构的基本组件以及这些组件间的数据流。

如图16-4所示,数据处理过程中使用了以下组件。

 image.png

图16-4. Facebook的数据仓库架构

• 记录器(Scribe)日志数据是由Web服务器以及后台服务如捜索后 台(Search backend)产生的。我们使用Facebook自己开发的一个开 源日志收集服务Scribe(记录器),它把几百个日志数据集(每天有几 十个TB的数据量)存放在几个网络文件服务器(NFS)上。

• HDFS大部分的日志数据被复制存入一个中央的HDFS系统。每 天,维度数据也从内部的MySQL数据库复制到这个HDFS文件 系统。

• Hive/Hadoop 我们备用由Facebook开发的Hadoop的子项目Hive HDFS收集的所有数据创建一个数据仓库。HDFS中的文件包括 来自Scribe的日志数据和来自MySQL的维度数据,它们都存作可以访问的具有逻辑分区的表(table)Hive提供了一种类SQL的査 询语言,它配合MapReduce创建/发布各种概要信息和报表以及对这些表格数据执行历史分析。

4.Tools(工具集)

建立于Hive之上的浏览器界面允许用户通过几次简单的鼠标点击来创建和发出Hive查询(依次启动相应的MapReduce作业)

• 传统的关系数据库系统我们使用OracleMySQL数据库来发布这些概要信息。虽然这些数据库存储的数据量相对较小,但是査询 频度很高并且我们需要对査询做出实时响应。

DataBee 它是一个内部的ETL(数据提取、转换和加载,即 Extraction-Transformation-Loading)工作流软件,它为跨数据处理作业的可靠批处理提供一个通用的框架。

在网络文件系统层(NFS tier)存储的来自Scribe的数据被复制作业持续复制 到HDFS集群上。NFS设备被挂接在Hadoop层,复制处理成为在Hadoop 集群上运行的只有map阶段的作业。这使对复制处理的扩展变得更容易, 而且使其具有错误恢复的能力。目前,我们每天用这种方法从Scribe复制6 TB以上的数据到HDFS。我们每天也从MySQL层下载多达4 TB的维度数 据到HDFS。在Hadoop集群上调度这些把数据从MySQL复制出来的map作 业是非常简单的。

5.Hadoop 配置

Hadoop部署工作的中心思想是整合。我们使用一个HDFS系统,大量的处 理工作在一个单独的MapReduce集群上完成(运行一个单独的jobtracker)。 这样做的原因很简单,如下所示。

• 只运行一个集群可以最小化管理成本

• 数据不必复制。对于前面描述的使用样例,可以利用同一个地方的 数据满足所有的任务需求

• 所有的部门都使用同一个计算机集群可以极大地提升效率 •我们的用户工作在一个相互协作的环境下,因此对于服务质量的要 求还不是很髙(至少就目前而言)

我们也拥有一个单独的共享的Hive元数据存储工具(MySQL数据库), 它管理HDFS上存储的所有Hive表涉及的元数据信息。

16.2.2虚构的使用样例

这一节,我们将描述在大型网站上经常遇到的问题,由于涉及的开销和规 模都太大,这些问题很难通过传统的数据仓库管理技术来解决。Hadoop和

Hive技术针对这些问题提供了一种扩展性更好、性价比更高的解决方案。

1.广告客户的洞察力和广告性能

Hadoop最普遍的一个用途是为大量数据产生概要信息。通常用于大型广告 网络,如Facebook广告网络,GoogleAdSense等等,为广告商提供他们所 发布的广告的常规汇总统计信息,这样做可以有效地帮助广告商调整他们 的广告营销活动。

在大规模数据集上统计广告效果是一种数据密集型操作,Hadoop和Hive 在可扩展性和计算开销上的优势非常适用于这一工作,它们可以在合理的 时间和资金消耗范围内完成这些计算。

许多广告网络为广告商提供了标准的基于CPC和CPM的广告计费单位。 CPC是根据广告的“点击数计费”(cost-per-click):广告商根据访问这个网 站的用户对广告的点击总数付费。CPM是根据在这个网站上看广告的人数 的比例计费(每千次曝光率费率cost-per-mille)。除了这些标准计费单位以 外,在最近几年,支持对个体用户进行剪辑的具有更多动态内容的广告(个 性化广告定制)在在线广告业中流行起来。雅虎通过SmartAds实现个性化 广告定制,Facebook则给广告商提供了 Social Ads。而后者允许广告商把来 自用户朋友网络的信息嵌入到广告中;例如,一则Nike广告可能指向某用 户的一位朋友,而这个朋友近期刚好也喜欢这个品牌,并且在Facebook上 和朋友公开共享这个喜好。另外,Facebook也为广告商提供了参与广告 (Engagement Ad),通过对广告发表意见/与嵌入视频交互,用户可以更有效 地和广告交互。总之,在线广告网络为广告商们提供了各种广告发布途 径,广告商们感兴趣的是其广告营销活动的相关效果数据,而这种多样性 又为计算各种各样的性能数据_增添了难度。

广告商的最基本要求是希望知道观看或点击了他们的广告的总用户数以及 不同用户总数。对于动态广告,他们甚至会对这些汇总信息的细节感兴 趣,如动态广告毎个单元播放的次数或交互性广告用户参与的次数。例 如,一个特定的广告可能向3万个不同用户播放了 10万次。类似地,一段 嵌入到Engagement Ad的视频可能已经被10万个不同的用户观看。另外, 通常我们会针对每则广告、每次广告营销活动和每个账户汇总这些性能数 据给出报表。一个帐号有可能会对应多个广告营销活动,而每个活动可能 运行多则网络广告。最后,广告网络通常会根据不同时间粒度报告这些数 据。典型的时间粒度有天、周、月(起始日期相同)和月(固定天数),甚至有

时候是整个广告营销活动周期。再者,根据不同的数据分片和分块的方 法,可以对汇总数据进行低粒度钻取,一个例子是广告商们可以査看汇总 数据在地理上的分布情况,比如,对于某一则特定广告,亚太地区的浏览 者或点击者占多大比率。

很明显,这里有四种主要的维度层次:账户、广告营销活动和广告维度, 时间段维度;交互类型维度,用户维度。最后一个用于汇报不同的用户人 数,而其他三个则是相应维度的汇总。用户维度也可用来产生浏览和点击 用户的地理分布汇总图。总而言之,广告商们利用这些信息来调整他们的 广告营销活动,从而提高他们在广告网络上的广告效果。除了这些数据的 多维属性之外,从处理的数据量以及每天数据量的增长速度来看,如果没 有Hadoop这样的技术,大型广告网络的可扩展社将遇到困难。举个例子, 写这篇文章的时候,Facebook为了计算广告的性能数据,每天所处理的日 志大约是I TB数量级(非压缩数据)20081月,每天处理的日志数据量 大约是30 GB,可见当前的数据量已经增长了 30倍。随着硬件的增加, Hadoop扩展性增强的特性才使我们对数据的处理能力能适应数据量的增 长,而它的代价只是对任务配置文件进行小小的修改。通常,配置修改是 指增加数据密集型计算Hadoop作业的reducer的个数。目前,这些计算中 最大的任务是运行400reducer(20081月所用的50reducer增加 了 8 )

2.即席分析和产品反馈

除了产生定期报表之外,数据仓库解决方案的另一种主要应用是支持即席 分析和产品反馈。例如,一个典型的网站对其产品功能做了修改之后,产 品经理或工程师们通常会基于与这个新特性相关的用户交互信息和点击率 来推断这个新特性的影响。产品团队甚至希望对这个改变带来的影响做更 深入的分析,分析有可能针对不同区域和国家进行,例如这个改变是否使 美国用户的点击率增加或印度用户的使用减少。使用Hive和标准SQL语 言,在Hadoop上可以完成很多类似的分析工作。点击率测定可以简单地表 达成广告的曝光数和与此新特性相关链接点击次数之间的join操作。这种 数据能和地理位置信息结合起来用于计算产品某个特性的改变对不同区域 用户产生的影响。因此,通过对这些数据进行聚集运算,我们可以得到平 均点击率在不同地理区域上的分布情况。Hive系统用几行SQL査询语句就 可以简单方便地表达所有这些工作需求(这也将相应地产生多个Hadoop作 业)。如果只需要估算,可以使用Hive本身支持的取样函数取一组样本用户

数据,然后运行同样的査询语句即可。其中有些分析工作需要使用自定义 map和reduce脚本与Hive SQL联合执行,这种脚本也可以轻松嵌入到 Hive査询语句。

一个更加复杂的分析任务是估算在过去一整年里每分钟登录到网站的峰值 用户数。这个工作涉及对网页浏览日志文件采样(因为人气网站的网页浏览 日志总数是很庞大的),根据时间对它们分组,然后运行自定义reduce脚本 找出不同时间点的新用户数。这是一个要求同时使用SQLMapReduce来解决终端用户问题的典型例子,而且利用Hive来解决这样的问题是非常容易的。

3.数据分析

Hive和Hadoop可以轻松地用于为数据分析应用进行训练和打分。这些数 据分析应用能跨越不同领域,如人气网站、生物信息公司和石油勘探公 司。对于在线广告产业来说,这种应用的一个典型实例是预测什么样的广 告特征能使广告更容易被用户注意到。通常,训练阶段涉及确定响应度量 标准和预测特征。在本例中,评测广告效用的一个良好度量标准可以是点 击率。广告的一些有趣的特征可能是广告所属的垂直分类、广告内容、广 告在网页中的位置等。Hive可以简便易行地收集训练数据,然后把数据输 入数据分析引擎(通常是R程序或用户写的MapReduce程序)。在本例中, 不同的广告性能效果数据和属性特征可以被结构化成为Hive的表格。用户 可以方便地对数据进行取样(R程序只能处理有限数据集,因此取样是必须 的),使用Hive査询语句执行合适的聚集和连接操作然后整合成一个结果响 应表,它包含了决定广告效果的最重要的广告特征。然而,取样会有信息 损失,有些更加重要的数据分析应用就用MapReduce框架完成对常用的数 据分析内核程序的并行实现来减少信息损失。

一旦模型训练出来,它就被部署用于根据每天的数据进行打分评估工作。 但是大多数数据分析任务并不执行每日评测打分工作。实际上,其中有很 多数据分析任务是即席的,要求做到一次分析,然后结果作为输入进入产 品设计过程。

16.2.3 Hive

刚开始使用Hadoop时,我们很快就倾倒于它的可扩展性和有效性。然而, 我们担心它是否可以被广泛采用,主要是因为用JavaMapReduce程序的

复杂度问题(还有培训用户写这种程序的代价)。我们知道很多公司的工程师 和分析师很了解SQL,它是一种査询和分析数据的工具,并且我们也清楚 很多人都精通几门脚本语言,如PHPPython。因此,我们必须开发出一 种软件来建立用户精通的脚本语言和Hadoop编程所需语言之间的沟通 渠道。

很明显,我们的很多数据集是结构化的,而且能够很容易进行数据分割。 这些要求很自然地形成一个结果:我们需要一个系统,它可以把数据模型 化成表格和数据块,并且它能够提供类似SQL的査询和分析语言。另外, 该系统能把用户所选编程语言编写的自定义MapReduce程序嵌入査询。这 个系统就是HiveHive是一个构建于Hadcwp之上的数据仓库架构,是 FacebookHadcwp中存储的数据进行査询的S"要工具。在下面几个小节, 我们将详细描述这一系统。

1.数据的组织

在所有数据集中,数据的组织形式是一致的,被压缩、分区和排序之后进 行存储。

• 压缩几乎所有数据集都采用gzip codec压缩存储成顺序文件。旧的数据采用bzip codec重新压缩,这样可以比用gzip编码压缩更 多。bzip压缩的速度比gzip慢,但是对旧数据的访问频率要低很 多,因此考虑到节省硬盘空间,这种性能损失还是很值得的。

• 分区大部分数据集根据日期进行分区。独立的分区块被加载到 Hive系统,实际是把每个分区块存入HDFS的一个单独的目录 下。大多数情况下,这种分区只根据相关联的记录日志文件(scribe logfile)的时间戳进行。然而,在某些情况下,我们扫描数据,然后 基于日志条目里能找到的时间戳进行数据划分。回顾前面的介绍, 我们也可以根据数据各种其他特征进行分区(如国家和日期)

• 排序在一张表里,每个分区块通常根据某个唯一标识(ID,如果 存在的话)进行排序。这样的设计有几个主要的优点:

在这样的数据集上容易执行取样查询操作 我们能基于排序数据建立索引

对具有唯一标识的数据进行聚集和连接运算更有效

日常的MapReduce作业会把数据加载成这种数据长期保存格式(和近实时的 数据导入处理不同)。

2.查询语言

Hive査询语言和SQL类似。它具有传统的SQL的结构,如joingroup by, where, select, from从句和from从句的子査询等。它试图把SQL命令 转换成一系列的MapReduce作业。除普通的SQL从句之外,它还有一些扩 展功能,如在查询语句中嵌入自定义mapperreducer脚本,对数据进行 一次扫描就可以进行多个表、数据块、HDFS文件和本地文件插入,在样本 数据而不是全部数据集上执行查询的功能(在测试查询的时候,这个功能非 常有用)。Hive metastore存储了表的元数据信息,并将提供元数据给Hive 编译器,以便进行将SQL命令转换成MapReduce作业。通过数据块修剪, map端的聚合和其他一些特色功能,Hive编译器会努力构造可以优化查询 运行时间的执行计划。

3.在数据流水线管道中使用Hive

另外,Hive提供了在SQL语句里表述数据流水线的能力,这•功能能够并 且已经提供相当的灵活性来支持用简单方便的方式对数据流进行合并操 作。这一功能对于改进中或开发中的系统和产品尤其有用。处理数据流时 所需的许多操作都是大家非常了解的SQL操作,如join, group by, distinct aggregation等。由干Hive能够把SQL语句转换成一系歹lj Hadoop MapReduce作业,所以创建和维护这些数据流就非常容易。在这一节,我 们用一个虚构的网络广告例子,展示使用Hive计算广告商所需的某些典型 汇总表,从而说明Hive这些方面的功能。例如,假设某个在线广告网络在 Hive系统里把广告信息存储在dim_adS表中,把和某个广告曝光相关的信 息存储在impression_logs表中,impressionjogs表数据根据日期进 行分区,那么在Hive系统中査询2008-12-01这天的广告曝光数(广告网络 会把广告营销中的每个以及总的广告曝光数定期反馈给广告商)可以表达为 如下SQL语句:

SELECT a.campaign_id, countCl)^ count(DISTINCT b.user_id)
FROM dim_ads a DOIN impression_logs b 0N(b.ad_id = a.ad_id)
WHERE b.dateid = '2008-12-01'
GROUP BY a.campaign_id;

这也是大家能在其他RDMS(关系数据库系统)OracleDB2等上使用的 典型的SQL语句。

为了从前面同样的关联好的数据上以广告和账户为单位计算每天的广告曝 光次数,Hive提供了同时做多个group by操作的能力,査询如下所示(类似 SQL语句但不是严格意思上的SQL):

FROM(
SELECT a.ad_id, a.campaign_id, a.account_id, b.user_id 
FROM dim ads a JOIN impression_logs b ON (b.ad_id = a.ad_id)
WHERE b.dateid = '2008-12-01') x
 INSERT OVERWRITE DIRECTORY 'results_gby_adid'
SELECT x.ad_id, count(l), count(DISTINCT x.user_id) GROUP BY x.ad_id 
INSERT OVERWRITE DIRECTORY ' result s_gby._campaignid '
SELECT x.campaign_id, count(l), count(DISTINCT x.user_id) GROUP BY 
x.campaign_id
INSERT OVERWRITE DIRECTORY 1 results_gby_accountid1
SELECT x.account_id, count(l)j count(DISTINCT x.user_id) GROUP BY 
x.account_id;

Hive增添的一项优化功能是查询能被转换成一系列适用于“偏斜数据” (skewed data)Hadoop MapReduce作业。实际上,join操作转换成一个 MapReduce作业,三个group by操作转换成四个MapReduce任务,其中第 一个任务通过unique_id产生部分聚集数据。这样做非常重要,因为 impression_logs表的数据在unique_id的分布比在ad_id上的分布更 均匀(通常在一个广告网络中,有些广告因为其客户分布更均匀使得其广告 份额占主导地位)。因此,通过uniquejd进行部分聚集操作能让数据流把 工作更均勻地分配到各个reducer。简单改变査询中的日期谓词,同一个査询模板便可以用于计算不同时间段的性能数据。

但是计算整个广告周期的数据可以采用更好的方法,如果使用前面介绍的 计算策略,我们必须扫描impression_logs表中的所有分区。因此,为了计算整个广告周期的数据,一个更可行的方法是在每天的中间表的分区上 执行根据ad_iduniqUe_id的分组操作。这张表上的数据可以和次日的 impression_logs合并增量产生整个周期的广告效果数据。例如,要想得 到2008-12-01日的广告曝光数据,就需要用到2008-11-30日对应的中间表分区数据块。如下面的Hive查询语句所示:

INSERT OVERWRITE lifetime—partial—imps PARTITION(dateid='2008-12-01,) 
SELECT x.ad_id, x.user一id, sum(x.cnt)
FROM (
    SELECT a.ad_id, a.usen_id, a.cnt 
    FROM lifetime_partial_imps a 
    WHERE a.dateid = '2008-11-30'
    UNION ALL
    SELECT b.ad_id, b.user_id,1 as cnt 
    FROM impression_log b
    WHERE b.dateid = '2008-12-01'
)x
GROUP BY x.ad_idj x.user_id;

这个查询为2008-12-01计算局部汇总数据,它可以用来计算2008-12-01的 数据以及2008-12-02的数据(这里没有显示)。SQL语句转换成一个单独 Hadoop MapReduce作业,它实际上是在合并的输入流上做group by计 算。在这个SQL语句之后,可以做如下的Hive查询,它为每个分组计算 出实际的数据(与前面对日汇总的数据流水线的查询相似)

FR0M(
SELECT a.ad_id, a.campaign_id, a.account_id, b.user_id, b.cnt 
FROM dim_ads a DOIN lifetime_partial_imps b ON (b.ad_id = a.ad_id)
WHERE b.dateid = '2008-12-01') x
INSERT OVERWRITE DIRECTORY ' results_gby_adid '
SELECT x.ad_id^ sum(x.cnt), count(DISTINCT x.user_id) GROUP BY 
x.ad id
INSERT OVERWRITE DIRECTORY ' results_gby_campaignid'
SELECT x.campaign_id, sum(x.cnt), count(DISTINCT x.user_id) GROUP BY 
x.campaign_id
INSERT OVERWRITE DIRECTORY 'results—gby_accounticT
SELECT x.account—id, sum(x.cnt), count(DISTINCT x.u$e「_id) GROUP BY 
x.account_id;

Hive和Hadoop都是批处理系统,它们计算数据的延迟比标准RDBMS要 高,如OracleMySQL。因此,在许多情况下,把HiveHadoop系统产 生的概要信息加载到传统的RDBMS还是非常有用的,这样用户可以通过 不同的商业智能(BI)工具或网络门户来使用这些数据。 16.2.4存在的问题与未来工作计划

1.公平共享

Hadoop集群通常同时运行多个日常报表生成作业”(production daily job) 和“即席作业”(ad hoc job),日常报表生成作业需要在某个时间段内完成计算任务,而即席作业则可能具有不同的优先级以及不同的计算规模。在 选择典型安装时,日常报表生成作业倾向于夜间运行,这时来自用户运行 的即席作业的干扰最小。然而,大规模即席作业和日常报表生成作业之间 的工作时间重合常常是不可避免的,如果没有充分健壮的保障措施,这种 作业重合会导致报表生成作业的高延迟。ETL(数据提取、转换和加载)处理 也包含几个近实时的作业,它们都必须以小时为间隔地运行(包括对以下数 据进行处理:从NFS服务器复制Scribe数据以及在某些数据集上以小时为

单位计算得到的概要数据)。这也意味着只要有一个意外作业就会使整个集 群宕机,使结果生成处理作业处于危险境地。

Facebook开发并且贡献给Hadoop系统的Hadoop公平共享作业调度器(job scheduler)为许多类似的问题提供了解决方案。它为特定作业池中的作业保 留了保障性计算资源,同时让闲置资源可以被任何作业使用。通过在各个 作业池之间以一种公平手段分配计算资源也可以防止大规模作业霸占整个 集群资源。在集群里,内存是其中一种竞争比较激烈的资源。我们对 Hadoop系统做了一些修改,即如果发现jobtracker的内存短缺,Hadoop就 会减缓或遏制作业提交。这能保证用户进程能够得到合理的进程内存空间,并且为了阻止在同一个节点运行的MapReduce作业影响HDFS后台程 序(主要是因为大内存消耗),可以在节点运行时^玫置一些监控脚本程序。日志目录存储在单独的硬盘分区,并且定期清理,我们认为把MapReduce中 间结果存储放在单独的硬盘分区上也是有用的。

2.空间管理

硬盘容量管理仍然是一个大挑战——数据的增长带来了硬盘使用的急速增 加。许多数据集日益攀升的成长型公司面临着同样的问题。在许多情况下,很多数据实际是临时数据。这种情况下,我们可以使用Hive的保留 期设置,也可以用bzip格式重新压缩老数据以节省存储空间。尽管可以通 过设置配置项来减少硬盘存储空间,但增加一个高存储密度机器层来管理 旧数据可能会有很大好处。这将使Hadoop存储存档数据的代价变低。然而,对这些数据的访问应该是透明的。目前我们正在为这个数据存档层 的实现而努力工作,把对旧数据处理的方方面面的要求都加进来。

3.Scribe-HDFS 集成

目前,Scribe把数据写入一些NFS文件存档器(filer),然后前面所描述的自 定义复制作业(copier job)就从这里收集和传送数据给HDFS。我们正致力于 开发让Scribe直接把数据写入其他HDFS的方法。这将简化对Scribe的扩展和管理工作。基于对Scribe的正常运行时间的高要求,它的目标写入 HDFS很可能与数据产生HDFS不同(因此它不会产生由于用户作业而出现 负载/停机的问题)

4.改进Hive

对Hive系统的开发目前还是有很多工作要做。我们的开发关注着几个重要 特性,如支持order byhaving从句,更多聚集函数,更多内置函数,日期类型,数据类型等等。同时,我们也在进行大量优化工作,如谓词下推和共同子表达式消除等等。在集成方面,正在开发JDBCODBC驱动程 序用于和OLAPBI工具集成。通过所有这些优化措施,我们希望能够释 放MapReduceHadoop的潜能,把它更进一步推向非工程化社区以及用于 Facebook。该项目相关的更多详细信息,请访问http://hadoop.apache.org/hive/

16.3 Nutch搜索引擎

16.3.1背景介绍

Nutch这个框架用于构建可扩展的网络爬虫(crawler)和搜索引擎。它是 Apache 软件基金会(Apache Software Foundation)的一个项目,Lucene 的一 个子项目,遵循Apache 2.0许可。

我们不会深入地细究网络爬虫的识一一这个案例研究的目的是为了展示 Hadoop是如何实现搜索引擎各种典型、复杂处理任务的。感兴趣的用户可 以在Nutch官方主页(http://lucene.apache.org/butch)找到项目相关的大量信息。可以这样说,为了创建和维护一个搜索引擎,必须要有下面几个子系统。

网页数据库 这个数据库监控网络爬虫要抓取的所有网页和它们的 状态,如上一次访问的时间,它的抓取状态信息,刷新间隔,内容 校验和,等等。用Ndtch的术语来说,这个数据库称为CrawlDb

• 爬取网页清单 网络爬虫定期刷新其Web视图信息,然后下载新 的网页(以前没有抓取的)或刷新它们认为已经过期的网页。这些准 备爬取的候选网页清单,Nutch称为fetchlist

• 原始网页数据 网页内容从远程网站下载,以原始的未解释的格式 在本地存储成字节数组。Nutch称这种数据为page content

• 解析的网页数据 网页内容用适合的解析器进行解析,Nutch为各 种常见格式的文档提供了解析器,如HTML, PDF, Open OfficeMicrosoft Office,RSS 等。

链接图数据库 对于计算基于链接(link)的网页排序(page rank)值来 说,如PageRank,这个数据库是必须的。对于Nutch记录的每一 个URL,它会包含一串指向它的其他的URL值以及这些URL关 联的锚文本(HTML文件的<a href="..">锚文本</3>元素中得 到)。这个数据库称为LinkDb。

全文检索索引 这是一个传统的倒排索引,基于搜集到的所有网页 元数据与抽取到的纯文本内容而建立。它是使用性能优越的 Lucene (http://lucene.apache.org/java)来实现的。

前面我们简单提到Hadoop作为个组件在Nutch系统上得到实现,试图用 它提高Nutch系统的可扩展性以及解决那些由集中式数据处理模型引起的 一系列瓶颈问题。Nutch也是第一个构建在后来称之为Hadoop的架构之上 的公开应用,并且事实证明,把Nutch算法和数据结构移植到Hadoop架构 所需的工作量非常少。这一特点有可能激励大家把Hadoop的开发作为一个子项目,为类似Nutch的其他应用提供可重用的架构。

目前,几乎所有的Nutch工具都通过运行一个或多个MapReduce作业来处 理数据。

16.3.2数据结构

在Nutch系统中维护着几种主要的数据结构,它们都利用Hadoop I/O类和 数据格式来构造。根据数据使用目的和数据创建之后的访问方式,这些数 据可以使用Hadoop的映射(map)文件或顺序(sequence)文件进行保存。 因为数据是MapReduce的作业产生和处理的,而这一过程反过来又会执行几个mapreduce任务,所以它的硬盘存储格式符合常用的Hadoop输出 格式,即 MapFileOutputFormat SequenceFileOutputFormat 两种格式。精确地说,数据被保存成几个不完整的map文件或顺序文件,而文件 数和创建数据作业中的reduce任务数相等。为简单起见,在下面几节的介 绍中,我们将忽略这点。

1. CrawlDb

CrawlDb存储每个URL的当前状态信息,存储文件是map文件,形式是 <url, CrawlDatum>,这里键(key)使用Text类型定义,值(value)使用Nutch特定的CrawlDatum类型(它实现Writable接口)。

为了对这些记录提供快速的随机访问能力(有时对于诊断是有用的,这时用户会想在CrawlDb里面检査单个记录信息),这些数据被存储成map文件而 不是顺序文件。

CrawlDb最初通过Tnjector工具创建,它只是简单地把初始URL列表(种子 列表)的纯文本文件转换成一个map文件,格式如前所述。接着,用爬取和 解析的网页信息来对它做更新。稍后将对此进行详细介绍。

2.LinkDb

这个数据库为Nutch记载的每个URL存储“入链接”(incoming link)信息。 它采用map文件格式<url, Inlinks〉进行存储,其中InlinksURL列 表和锚文本数据。注意,这些信息在网页数据收集阶段并不是立刻可见 的,但是可以获取反向信息,就是这个页面的“出链接”(outlink)信息。链 接信息的互反操作是通过一个MapReduce作业完成的,相关详情可参见后文。

3.分段

在Nutch定义中,“分段”(segment)指的是爬取和解析一组URL。图16-5 展示了分段的创建和处理过程。一个分段(对应文件系统里的一个目录)包含以下几个部分(它们只不过是一 些包含 MapFileOutputFormat SequenceFileOutputFormat 格式数据 的子目录)

content content包食下载页面的原始数据,存储为map文件,格 式是<url, Content〉。为了展示缓存页的视图,这里使用map文 件存储数据从而支持Nutch对文件的快速随机访问。

crawl_generate 它包含将要爬取的URL列表以及从CrawlDb取 到的与这些URL页相关的页面状态信息,对应的顺序文件格式是 <url, CrawlDatum>。这个数据采用顺序文件存储,原因有二: 第一,这些数据是按顺序逐个处理的;第二,我们不能满足map 文件排序键值的不变性要求。我们需要尽量分散属于同一台主机的 URL,以此减少每个目标主机的负载,这就意味着记录信息基本上是随机排列的。

 image.png

图16-5.分段

• crawl_fetch 它包含数据爬取的状态信息,即爬取是否成功,响应码是类别,等等。这个数据以<url, CrawlDatum>格式。存储 在map文件里。

• rawl_parse毎个成功爬取并被解析的页面的出链接列表都保存在这里,因此Nutch通过学习新的URL可以扩展它的爬取前端页 范围。

• parse_data 解析过程中收集的元数据,其中还有页面的出链接(outlinks)列表。这些信息对于后面介绍的建立反向图(入链接, inlink方向图)是相当关键的。

• parsejext页面的纯文本内容适合用Lucene进行索引。这些纯 文本存储成map文件,格式是<111, ParseText>,因此要展示搜 索结果列表的概要信息(摘要)的时候,Nutch可以快速地访问这些文件。

Generator工具(16-5中编号1)运行的时候,CrawlDb里面的数据就会产生一些新的分段,并且开始只包括要爬取的URL列表(crawl_generat下 的子目录)。当这个列表经过几个步骤的处理之后,该分段程序就从处理工具那里收集输出数据并存放在一系列的子目录里面。

例如,content从Fetecher工具(2)接收数据,这个工具根据FetecherURL 列表下载网页原始数据。这个工具也把URL的状态信息存储在里面,因此这些数据后来可以用于更新CrawlDb的页面状态信息。

在分段模块中的其他小模块接收来自Parse分段工具(3)的数据,这个工具读入网页内容,然后基于声明的(或检测到的)MIME类型,选择合适的内容解析器,最后把解析结果存为三部分:和然后这些数据被用于更新CrawlDb(4)和创建LinkDb(5)

这些分段数据一直保留到它们包含的所有数据都过期为止。Nutch采用的是 可配置的最大时间限制的方法,当页面保存的时间段超过这个时间限制 后,这个页面会被强制进行重新获取;这将有助于操作员淘汰所有过期的 分段数据(因为操作员能肯定超过这个时间限制之后,这个分段里面的所有页面都已经被重新爬取)

分段数据用来创建Lucene索引(主要是parse_text和parse_data部分的数据,图中编号6),但是它也提供一种数据存储机制来支持对纯文本数据和原始内容数据的快速检索。当Nutch产生摘要信息的时候(和査询最匹配的 文档文本片段),需要纯文本数据;原始数据提供了展现页面的缓存视图的能力。这两种用例下,或是要求产生摘要信息或是要求展现缓存页面,都是直接从map文件获取数据。实际上,即使是针对大规模数据,直接从 map文件访问数据的效率已经足够满足性能方面的要求。

16.3.3 Nutch系统利用Hadoop进行数据处理的精选实例

下面几节详细描述了几种Nutch工具,主要用于说明Nutch系统如何利用 MapReduce模式来完成具体的数据处理任务。

1.链接逆转

爬取到的HTML页面包含HTML链接,这些链接可能指向它本身(内部链接)或指向其他网页。HTML链接从源网页指向目标网页,参见图16-6

然而,许多计算网页重要性(或质量)的算法需要反向链接的信息,也就是那 些具有链接指向当前页面的网页。进行网页爬取的时候,我们并不能直接 得到这些信息。另外,如果能把入链接的锚文本也用于索引,索引也会受 益,因为这些锚文本可以从语义上丰富当前页面的内容。

 image.png

图16-6.链接逆转 

如前所述,Nutch收集出链接信息,然后用这些数据构造一个LinkDb,用 来存放以入链接和锚文本形式存在的反向链接数裾。

本小节槪述一下LinkDb工具的实现过程,这里只为了展现处理过程的清晰 画面而忽略了很多细节描述(URL规范化处理和过滤)。我们保留了描述 一个典型的实例的内容,解释为什么MapReduce模型能够如此合适地被应 用到搜索引擎所要求的这种关键数据转换处理任务。大规模搜索引擎需要 处理大量的网络图数据(许多页面都有很多出/入链接),Hadoop提供的并行 处理和容错机制使得这样的处理成为可能。另外,使用map-sort-reduce基 本操作可以很容易地表达链接逆转这一过程,我们将在下面进行介绍。

下面的代码片段展示了开发LinkDb工具的作业初始化过程:

DobConf job = new DobConf(configuration);
FilelnputFormat.addInputPath(job, new Path(segmentPath,"parse_data"));
job.setInputFormat(SequenceFileinputFormat.class);
job.setMapperClass(LinkDb.class);
job.setReducerClass(LinkDb.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Inlinks.class);
job.setOutputFormat(MapFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, newLinkDbPath);

可以看出,这个作业的输入源数据是爬取的URL列表(键)以及相应的 ParseData记录,ParseData包含的其中一项数据是每个页面的出链接信息,它是一个数组。一个出链接记录包含目标URL以及相应的锚文本。

这个作业的输出也是一个URL列表(键),但是值是入链接,它其实只是一 个包含目标URL和锚文本的特殊入链接集合。

出乎意料的是,这些URL—般以纯文本而并非以java.net.URLjava.net.URI实例的形式存储和处理。这么做有几个原因:从下载内容里提 取的URL通常需要做规范化处理(如把主机名变成小写形式,解析相对路 径),或它们是已经坏掉或无效的URL,或它们引用的是不支持的协议。许 多规范化和过滤操作能更好地以文本模式进行表达,它可以跨一个URL的 多个组成部分。此外,考虑到链接分析,我们仍然要处理和计算这些非 URL类型的数据。

让我们进一步査看map()和reduce()的实现。在这个例子中,它们非常简 单以至于这两个函数可以在同一个类实现:

public void map(Text fromUrl, PanseData parseData, 
    OutputCollector<Text, Inlinks> output, Reporter reporter) {
    ...
    Outlink[ ]outlinks = parseData.getOutlinks();
    Inlinks inlinks = new Inlinks(); 
    for (Outlink out : outlinks) {
        inlinks.clear(); // instance reuse to avoid excessive GC 
        String toUrl = out.getToUrl();
        String anchor = out.getAnchor(); 
        inlinks.add(new Inlink(fromUrl, anchor)); 
        output. collect (new Text(tollrl), inlinks);
    }
}

从这个代码段可以看到,对每个出链接Outlink,我们的map()函数产生一 对ctoUrl, Inlinks〉,其中 Inlinks 只包含一个 Inlink,Inlink 是 由fromUrl和它的锚文本组成。链接的指向实现了反转。

接着,这些只有一个元素的Inlinks对象用reduceO方法实现聚集处理:

public void reduce(Text toUrl,Iterator<Inlinks> values,
OutputCollector<Text, Inlinks> output, Reporter reporter) {
        Inlinks result = new Inlinks(); 
        while (values.hasNext() { 
            result.add(values.next());
        }
    output.collect(toUrl, result);
}

从这段代码来看,很明显我们已经得到了想要的数据,即指向toUrl变量的所有fromUr'l列表以及相应的锚文本信息。逆转过程完成。

然后这些数据以MapFileOutputFormat格式保存,形成新的LinkDb数据库。

2.生成 fetchlist

现在来看一个更加复杂的用例。fetchlist产生于CrawlDb的数据(map文 件的格式是<url, crawlDatum>,其中crawlDatum包含URL的状态信 息),它存放准备爬取的URL列表,然后Nutch Fetcher工具处理这个列 表。Fetcher工具本身是一个MapReduce应用程序(后面会介绍)。也就是说 输入数据(被分成W)将由Wmap任务处理,Fetcher工具强制执行这样 的规则即SequenceFilelutFormat格式的数据不能继续切分。前面我们 简单提过,fetchlist是通过一个特殊的方法产生的,因此fetchlist的每部分 数据(随后由每个map任务处理)必须满足特定的要求。

(1)来自同一台主机的所有URL最后要放入同一个分区。这是必须 的,以便Nutch可以轻松实现in-JVM(java虚拟机里)宿主级封锁 来避免目标主机超载。

(2)为了减少发生宿主级的封锁,来自同一台主机的URL应该尽量分 开存放(比如和其他主机的URL充分混合)

(3)任何一个单独主机的URL链接数不能多于;c个,从而使得具有很 多URL的大网站相对于小网站来说,就不会占主导地位(来自小网 站的URL仍然有机会被爬取)

(4)具有髙网页排序值的URL应该优先于低的那些URL

(5)在fetchlist中,URL总数不能超过广

(6)输出数据分区数应该和最优的爬取map任务数目一致。

本例中,需要实现两个MapReduce作业来满足所有这些要求,如图16-7所 示。同样,为了简洁,对下面的列表内容,我们将跳过对这些步骤的某些 细节描述。

步骤1:选择,基于网页排序值排序,受限于每台主机的URL数这一步 骤,Nutch运行一个MapReduce作业来选择一些被认为有资格被爬取的 URL列表,并根据它们的网页排序值(赋给每个页面的浮点数,如 PageRank值)对它们进行排序。输入数据来自CrawlDb,是一个<url, datum〉格式的map文件。这一作业的输出是〈score, <url, datum〉〉格 式的序列化文件,根据排序属性值降序排列。

 image.png

图 16-7.生成 fetchlist

首先,我们来看一下作业的设置:

FileinputFormat.addInputPath(job, crawlDbPath);
job.setInputFormat(SequenceFileinputFormat.class);
job.setMapperClass(Selector.class); 
job.setPartitionerClass(Selector.class); 
job.setReducerClass(Selector.class);
FileOutputFormat.setOutputPath(job, tempDir); 
job.setOutputFormat(SequenceFileOutputFormat.class); 
job.setOutputKeyClass(FloatWritable.class);
job.setOutputKeyComparatorClass(DecreasingFloatCompanator.class); 
job.setOutputValueClass(SelectorEntry.class);

Selector类实现了 3 个函数:mapper, reducer partitioner。最后一个函 数非常有趣:Selector用了一个自定义Partitioner把来自同一主机 的URL分配给同一个reduce任务,这样我们就能满足前面列出的要求 3-5

如果我们不重写默认的partitioner,来自同一主机的URL最终会输出到不同的分区文件里面,这样我们就不能跟踪和限制URL总数,因为 MapReduce任务彼此之间不做任何交流。那么现在的情况是,属于同一台主机的所有URL都会由同一个reduce任务处理,这意味着我们能控制每台主机选择处理的URL数目。

很容易实现一个自定义的partitioner把需要在同一个任务中处理的数据最终放入同一个分区文件。我们首先来看一下Selector类如何实现Partitioner接口(它只包含一个方法):

/** Partition by host. */
public int getPartition(FloatWritable key, Writable value,
int numReduceTasks) {
    return hostPartitioner.getPartition(((SelectorEntry)value).url, key)
    numReduceTasks)
}

该方法返回在0到numReduceTasks – 1之间的一个整数。它简单地用原 始的URL替换了键,URL数据从SelectorEntry获取,这样做就可以把 URL(不是页面排序值)传递给PartitionUrlByHost类对象,并且在这里计算出URL属于的分区文件号:

/** Hash by hostname. */
public int getPartition(Text key, Writable value, int numReduceTasks) {
    String urlString = key.toString();
    URL url = null;
    try {
        url = new URL(urlString);
    } catch (MalformedURLException e) {
        LOG.warn("Malformed URL: '" + urlString
    }
    int hashCode = (url == null ? urlString : url.getHost〇).hashCode();
    // make hosts wind up in different partitions on different runs 
    hashCode A= seed;
    return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
}

从这个代码片断能看到,分区号的计算只针对URL的主机部分的地址,这意味着属于同一个主机的所有URL最终会被放入同一个分区文件。

这个作业的输出数据根据网页排序值降序排列。因为CrawlDB中有很多记录有相同的排序值,所以我们不能用MapFileOutputFormat来存储输出 文件,否则会违反map文件严格基于主键排序的固有规则。

细心的读者会注意到一点,我们没直接使用初始键值,但是我们又想保留这种初始的键值对,这里使用一个SelectorEntry类把初始的键值对传递给下一处理过程。

Selector.reduce()函数计算URL的总数和每个主机对应的最大URL 数,然后简单地摒弃多余的记录。注意,必须对URL总个数的限制进行近似 化处理。我们用总的限制数除以ruduce任务的个数得到当前任务允许处理的URL数目的限制范围。但是我们并不能肯定每个任务都能够得到平均的分配数,实际上在大多数情况下很难实现,因为在各个主机中分布的URL 数目是不均匀的。不管怎么样,对于Nutch来说,这种近似的控制已经足够了。

步驟2 :逆转,基于主机分区,随机排序在前面,我们得到<score, selectorEntry〉格式的序列化文件。现在我们必须产生<url, datum〉格式的序列化文件来满足前面描述的要求126。这个处理步骤的输入数据是步骤1的输出数据。

以下代码片断展示这个作业过程的初始设置:

FileInputFormat.addInputPath(job, tempDir); 
job.setInputFormat(SequenceFilelnputFormat.class); 
job.setMapperClass(SelectorInverseMapper.class);
 job.setMapOutputKeyClass(Text.class); 
job.setMapOutputValueClass(SelectorEntry.class); 
job. setPartitionerClass(PartitionllrlByHost .class);
 job.setReducerClass(PartitionReducer.class)j;
job.setNumReduceTasks(numParts);
FileOutputFormat.setOutputPath(job, output);
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
job.setOutputKeyComparatorClass(HashComparator.class);

SelectorlnverseMapper类简单地丢弃当前键(排序值),抽取原始的URL 并且把它设置为键,使用SelectorEntr'y类型对象作为值。细心的读者可能质疑:“为什么我们不进一步去再抽取原始的CrawlDatum,把它作为值? ”详情参见后文。

这个作业的最终输出是序列化文件,格式CrawlDatum>,但是 map阶段我们得到的输出是〈Text, SelectorEntry>格式数据。必须指出 的是我们要为map输出指定不同的键/值类,如采用setMapOutputKeyClass(),setMapOutputValueClass() 否则,Hadoop会假定我们用的类与为reduce输出声明的类一样(这样就会导致作业失败)

map阶段的输出使用PartitionUrlByHost类实现数据切分,它把来自同一主机的URL分配到同一个分区。这就满足要求1

一旦数据从map任务进入到reduce任务,Hadoop就会根据输出数据的键值 比较结果对数据排序,可利HashComparator类实现比较。这个比较类 采用简单的哈希机制来组织URL,该机制可保证来自同一主机的URL被尽量放在一起。

为了满足要求6,我们把reduce任务的数量设置成希望的Fetcher map任务 的数量(即前面提到的mjmParts),记住,每个reduce分区稍后将用于创建一PartitionReducer 类负责完成最后一步,即把<url,selectorEntry>数 据格式转换成<url, crawlDatum〉数据格式。使用HashComparator的一个令人吃惊的副作用是几个URL可能哈希成相等的哈希值,并且Hadoop 调用reduceO函数时只处理具有相等键值的第一个键对应的值,其他的值 系统认为是相等的从而被删除。现在能明白当初为什么我们必须在 SelectorEntry类的记录中保留所有的URL值,因为这么做我们就可以在 遍历值的时候抽取所有的URL。下面是这个方法的实现:

public void reduce(Text key, Iterator<SelectorEntry> values, 
OutputCollector<Text, CrawlDatum> output,
    Reporter reporter) throws IOException { 
    // when using HashComparator, we get only one input key in case of hash collisions 
    // so use only URLs extracted from values 
    while (values.hasNext()) {
        SelectorEntry entry = values.next(); 
        output.collect(entry.url, entry.datum);
    }
}

最终,reduce任务的输出在Nutch分区目录crawl_generate子目录下以 SequenceFileOutputFormat格式保存。输出文件满足前面的1-6项全部要求。

3.Fetcher:真正的多线程类MapRunner

Nutch的Fetcher应用程序负责从远程站点下载网页内容。因此,为了尽量 减少爬取fetchlist的时间,对于这个处理过程来说利用每个机会来做并行处 理相当重要。

Fetcher应用中,已经有一级并行机制 输入fetchlist分成多个部分,然后它们被分配给多个map任务。然而,这么做实际上远远不够:顺序下 载来自不同主机(见前一节对HashComparator的介绍)URL相当浪费时 间。正因如此,Fetchermap任务使用多个工作线程同时处理数据下载 任务。

Hadoop使用MapRunner类来实现对输入数据记录的顺序处理。Fetcher 类实现自己的MapRunner类,它使用若干个线程并行处理输入记录。

先从这个作业的设置开始:

job.setSpeculativeExecution(false);
FilelnputFormat.addInputPath(job, "segment/crawl_generate"); 
job.setInputFormat(InputFormat.class); 
job.setMapRunnerClass(Fetcher.class);
FileOutputFormat.setOutputPath(job, segment); 
job.setOutputFormat(FetcherOutputFormat.class); 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(NutchWritable.class);

首先,我们关闭推测执行(speculative execution)。我们不能同时让几个map 任务从同一个主机下载内容,因为这可能会打破宿主级的负载限制(如并发 请求数和每秒请求数)。 其次,我们使用自定义的InputFormat类防止Hadoop将输入数据分区进 一步切分为更小的块(分片)导致map任务的数量超过输入分区的数量。这 么做又一次保证我们可以控制宿主级的访问限制。 输出数据用自定义的OutputFormat类对象来存储,通过使用NutchWirtable 类的数据值创建几个输出map文件和顺序文件。NutchWritable类是 GenericWritable的子类,通过事先声明,它能传递几种不同Writable

类的实例对象。

Fetcher类实现MapRunner接口,我们把这个类设置为作业的MapRunner实现。相关代码如下:

public void run(RecordReader<Text, CrawlDatum> input,
    OutputCollector<Text, NutchWritable> output,
    Reporter reporter) throws IOException { 
    int threadCount = getConf().getlnt("fetcher.threads.fetch", 10);
    feeder = new QueueFeeder(input, fetchQueues, threadCount * 50); 
    feeder.start();
    for (int i = 0; i < threadCount; i++) { // spawn threads
        new FetcherThread(getConf()).start();
    }
    do { // wait for threads to exit
        try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {} 
        reportStatus(reporter);
    } while (activeThreads.get() > 0);
}

Fetcher类提前读取许多输入记录数据,使用QueueFeeder线程把输入记录放入为每个主机建立的队列中。然后启动几个FetcherThread实例对象,它们将读取每个主机对应的数据队列这时QueueFeeder继续读取输入数据来填充这些队列。,每个FetcherThread都可以读取任意非空队列中 的数据项。

与此同时,map任务的主线程也在运行等待所有的线程完成它们的作业。 它定期向系统报告状态以保证Hadoop不会认为这个任务已经死掉并把它杀 掉。一旦所有项目处理完,等待过程结束,控制权返回给Hadoop,然后 Hadoop将会结束这个map任务。

4.索引器:使用自定义的OutputFormat类 

这个MapReduce应用程序的输出即不是序列化文件或也不是map文件,而 是Lucene索引。再提一下,因为MapReduce应用可能由几个reduce任务 组成,所以这个应用程序的输出可能包含几个不完整的Lucene索引。

Nutch Indxer工具处理来自CrawlDb, LinkDbNutch分区的信息(爬取状 态信息,解析状态,页面元数据和纯文本数据),因此对这个作业的设置程 序将包括多个输入路径:

FileInputFormat.addInputPath(job, crawlDbPath); 
FileInputFormat.addInputPath(job, linkDbPath);
// add segment data 
FileInputFormat.addlnputPath(job, "segment/crawl_fetch");
FileInputFormat.addInputPath(job,"segment/crawl_fetch");
 FileInputFormat.addlnputPath(job, "segment/crawl_fetch");
FileInputFormat.addInputPath(job,"segment/crawl_fetch");
job.setInputFormat(SequenceFilelnputFormat.class);
job.setMapperClass(Indexer.class); 
job.setReducerClass(Indexer.class);
FileOutputFormat.setOutputPath(job, indexDir); 
job.setOutputFormat(OutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LuceneDocumentWrapper.class);

分散存储在这些输入位置的对应于一个URL的所有记录需要合并起来新建 一个Lucene文档(将被加入索引)。

Indexer类通过Mapper把输入数据(不关心它的源数据以及实现类)简单地 封装到NutchWritable中,这样reduce阶段可能要使用不同的类来接收不同的源数据,但是它仍然能够为map和reduce阶段的输出值声明一个 输出值类(类似于NutchWritable)

Reducer方法遍历同一个键(URL)对应的所有值,解封数据 (fetch CrawlDatumCrawlDb CrawlDatumLinkDb Inlinks, ParseData ParseText),并用这些信息构建一个Lucene文档,后者被 WritableLuceneDocumentWrapper对象封装并被收集。除了所有文本内 容外(纯文本数据或是元数据),这个文档也包含类似PageRank值的信息 (取自CrawlDb)Nutch使用这种数值来设置Lucene文档的权重值。

OutputFormat类的实现是这个工具最有意思的部分:

public static class OutputFormat extends
    FileOutputFormat<WritableComparable, LuceneDocumentWrapper〉 {
public RecordWriter<WritableComparable, LuceneDocumentWrapper> 
    getRecordWriter(final FileSystem fs,DobConf job,
        String name, final Progressable progress) throws IOException { 
    final Path out = new Path(FileOutputFormat.getOutputPath(job), name); 
    final IndexWriter writer = new IndexWriter(out.toString(), 
                    new NutchDocumentAnalyzer(job), true);
return new RecordWriter<WritableComparableJ LuceneDocumentWrapper>() { 
    boolean closed;
 public void write(WritableComparable key, LuceneDocumentWrapper value) 
    throws IOException { 
    // unwrap & index doc
    Document doc = value.get(); 
    writer.addDocument(doc); 
    progress.progress();
}
public void close(final Reporter reporter) throws IOException { 
    // spawn a thread to give progress heartbeats 
    Thread prog = new Thread() { 
        public void run() { 
            while (Iclosed) { 
                try {
                    reporter.setStatus("closing");
                    Thread.sleep(1000);
                } catch (InterruptedException e) { continue; } catch (Throwable e) { return; }
                try {
                    prog.start();
                    // optimize & close index writer.optimize();
                    writer.close(); 
                } finally {
                    closed = true;
                }
                }
            }
    }
}

当请求生成一个RecordWriter类的实例对象时,OutputFormat类通过 打开一个IndexWriter类对象新建一个Lucene索引。然后,针对reduce 方法中收集的每个新的输出记录,它解封LuceneDocumentWrapper对象中的Lucene文档,并把它添加到索引中。 

reduce任务结束的时候,Hadoop会设法关闭RecordWriter对象。本例 中,关闭的过程可能持续较长时间,因为我们想在关闭它之前进行索引优 化工作。在这段时间中,因为该任务已经没有任何进度更新,所以Hadoop 可能会推断其已经被挂起,然后它可能会尝试杀死这个任务。因此,我们 首先启动一个后台线程来传送进度更新消息,然后才开始索引优化工作。 一旦优化完成,我们便停止进度更新线程。现在索引完成创建、优化和停 止更新,它已经准备好被应用于任何捜索程序。

16.3.4总结

这里对Nutch系统的简短综述省略了很多细节,比如错误处理、日志记录、URL过滤和规范化、处理重定向或其他形式的网页“别名”(如镜 像)、剔除重复内容、计算PageRank值等。在这个项目的官方主页和wiki页面 (http://wiki.apache.org/nutch),可以找到这些内容的介绍及其他更多信息。

当前,Nutch正在被很多组织或个人用户使用。然而,运作一个搜索引擎要 求有大量的投资来支持硬件配备,系统集成,自定义开发和索引维护。因此,在大多数情况下,Nutch用于构建商业的垂直或针对领域的搜索引擎。

Nutch正处于积极的开发中,并且该项目紧跟Hadoop的最新版本。因此, 它将继续成为使用Hadoop平台的核心部件,并且具有良好产出的应用实例。

16.4 backspace的日志处理

Rackspace Hosting —直为企业客户提供管理系统,同样,Mailtrust207 秋变成backspace的邮件分部。Rackspace目前在几百台服务器上为100多 万用户和几千家公司提供邮件服务。

16.4.1要求/问题

系统传输的Rackspace用户的邮件产生了相当大的文件路径信息,它们以各种格式的日志文件形式存放,每天大约有150 GB。统计这些数据对系统发 展规划以及了解用户如何使用我们的系统是非常有帮助的,并且这些记录 对系统故障排査也有好处。

假如一封邮件发送失败或用户无法登陆系统,这时非常重要的事是让我们 的客服能找到足够的与问题相关的信息然后开始调试工作。为了能够快速 发现这些信息,我们不能把日志文件就这么放在产生它们的机器上或以其 原始格式存放。相反,我们使用Hadoop来做大量的日志处理工作,而其结 果被Lucene索引之后用来支持客服的査询需求。 曰志

数量级最大的两种日志是由Postfix邮件发送代理和Microsoft Exchange Server产生的。所有通过我们系统的邮件都要在某个地方使用Postfix邮件 代理服务器,并且大部分消息都要经过多个Postfix服务器。Exchange是必 须独立的系统,但是其中有一类profix服务器充当一个附加保护层,它们 使用SMTP协议在各个环境下的托管邮箱之间传递消息。

消息要传递经过很多机器,但是每个服务器只知道邮件的目的地,然后发送邮件到下一个负责的服务器。因此,为了给消息建立完整的历史信息, 我们的日志处理系统需要拥有系统的全局视图。这就是Hadoop对我们帮助 最大的地方:随着我们的系统发展壮大,系统日志量也随之增长。为了使 我们的日志处理逻辑仍然可行,我们必须确保它能扩展。MapReduce就是 一个可以处理这种数据增长的完美系统架构。

16.4.2简史

我们日志处理系统的前几个版本都基于MySQL的,但随着我们拥有越来越 多的日志存储机器,我们达到了一个MySQL服务器能够处理的极限。虽然 该数据库模式已经进行了适度的非规范化处理,使其能够较轻松地进行数 据切片,但目前MySQL对数据分区的支持仍然很脆弱。但我们没有选择在 MySQL上去实现自己的切片和处理方案,而是选择使用Hadoop

16.4.3选择 Hadoop

一旦选择在RDBMS(关系型数据库管理系统)上对数据进行分片存储,你就 丧失了 SQL在数据集分析处理方面的很多优势。Hadoop使我们能够使用针 对小型数据集的同样的算法来轻松地并行处理所有数据。

16.4.4收集和存储

1.日志收集

产生日志的服务器分布于多个数据中心,但目前我们只有一个单独的 Hadoop集群,位于其中一个数据中心(参见图16-8)。为了汇总日志数据并把它们放人集群,我们使用syslog-ng(Unix syslog机制的替代)和一些简单的脚本来控制Hadoop上文件的创建。image.png

图 16-8. Rackspace 的 Hadoop 数据流

在一个数据中心里,syslog-ng用于从logsowrce机器传送日志数据到一 组负载均衡的日志收集机器c//ec;r。在这些收集器上,每种类型的日志 数据被汇成一个单独的数据流,并且用gzip格式进行轻量级的压缩(16-8 步骤A)。远程收集器的数据通过SSH通逭跨数据中心传送到Hadoop集群 所在的“本地收集器”(local collector)(步骤B)

一旦压缩的日志流到达本地收集器,数据就会被写入Hadoop(步骤C)。目 前我们使用简单的Python脚本把输入数据缓存到本地硬盘,并且定期使用 Hadoop命令行把数据放入Hadoop集群。当缓存日志数据量达到Hadoop数 据块大小的倍数或是缓存已经经过了足够长的时间时,脚本程序开始复制 日志缓存数据到Hadoop的各个输入文件夹。 这种从不同数据中心安全地汇总日志数据的方法在Hadoop支持SOCKS之 前就已经有人开发使用了,SOCKS是通过hadoop. rpc.socket.factory. class.default参数和SocksSocketFactory类实现的。通过直接使用远 程收集器对SOCKS的支持和HDFSAPI,我们能够从系统中消除一次磁 盘的写入操作和降低系统的复杂性。我们计划在将来的开发中实现一个使 用这些特性的替代产品。

一旦原始日志被存放到Hadoop上,这些日志就已经准备好交给我们的 MapReduce作业处理了。

2.日志存储

我们的Hadoop集群目前包含15个数据节点,每个节点都使用普通商用 CPU3500 GB的硬盘。我们对在6个月中必须存档处理的文件使用默认的备份因子3,而其他的文件采用2个备份机制。

Hadoop的域名节点namenode使用的硬件和数据节点datanode相同。为了 提供比较髙的可用性,我们使用两个辅助namenode和一个虚拟IP,IP 可以很容易地指向3台机器中任意一台具有HDFS快照的硬盘。这表明在 故障转移情形下,根据辅助namenode的快照时间,我们可能会丢失最多 30分钟的数据。数据丢失的量取决于辅助namenode的数据快照老化时间 设置。虽然这对于我们的日志处理应用来说是可接受的,但是其他Hadoop 应用可能要求通过为namenode镜像提供共享存储的能力来实现无损的故障转移。

16.4.5对日志的MapReduce处理 1.处理 在分布式系统中,唯一标识符令人失望的地方是它们基本不可能是真正唯 一的。所有的电子邮件消息都拥有一个(所谓的)唯一标识符,叫message- id, 它由消息发起的主机产生, 但是一个不良客户端能够轻松发送重复消 息。另外,因为Postfix设计者并不相信message-id可以唯一地标识消息, 所以他们不得不提出设计一个独立的ID(标识)queue-id,并保证它在消息 的本地机器生命周期内唯一。

尽管message-id趋向于成为消息的权威标识,但在Postfix日志中,需要使 用queue-id来査找message-id。看范例16-1第二行(为了适合页面大小,日 志行的格式做了调整),你将发现十六进制字符串1DBD21B48AE,它就是该 行消息的queue-id。因为日志收集的时候(可能每隔几小时进行一次),每个 消息(包括它的message-id)的信息都输出到单独的行,所以我们的解析代码有 必要去保留消息的状态信息。

范例16-1. Postfix日志行

Nov 12 17:36:54 gate8.gate.sat.mlsrvr.com postfix/smtpd[2552]: connect from hostname 
Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/qmgr[9489]: 1DBD21B48AE: 
from=<mapreduce@rackspace.com>, size=5950, nrcpt=l (queue active)
Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/smtpd[28085]: disconnect from hostname
Nov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/smtpd[22593]: too many errors after DATA from hostname
Nov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/smtpd[22593]: disconnect from hostname 
Nov 12 17:36:54 gatel0.gate.sat.mlsrvr.com postfix/smtpdf10311]: connect from hostname 
Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/smtp[28107]: D42001B48B5: 
to=<mapreduce@rackspace.com>, relay=hostname[ip]j delay=0.32, delays=0.28/0/0/0.04, 
dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as 1DBD21B48AE)
Nov 12 17:36:54 gate20.gate.sat.mlsrvr.com postfix/smtpd[27168]: disconnect from hostname 
Nov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/qmgr[1209]: 645965A0224: removed 
Nov 12 17:36:54 gate2.gate.sat.mlsrvr.com postfix/smtp[15928]: 732196384ED: 
to=<m apreduce@rackspace.com>, relay=hostname[ip], conn_use=2, delay=0.69, 
delays=0.04/ 0.44/0.04/0.17, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as 02E1544C005)
Nov 12 17:36:54 gate2.gate.sat.mlsrvr.com postfix/qmgr[13764]: 732196384ED: removed 
Nov 12 17:36:54 gatel.gate.sat.mlsrvr.com postfix/smtpd[26394]: NOQUEUE: reject: RCP T 
from hostname 554 5.7.1 <mapreduce@rackspace.com>: 
Client host rejected: The sender's mail server is blocked; 
from=<mapreduce@rackspace.com> to=<mapred uce@rackspace.com> 
proto=ESMTP helo=<mapreduce@rackspace.com>

从MapReduce的角度看,日志的每一行是一个单独的键/值对。第一步,我 们需要把所有具有相同queue-id的行放在一起,然后执行reduce过程判断

日志消息数据是否能完整地表示这个queue-id对应的数据。 类似地,一旦我们拥有一个queue-id对应的完整的消息,在第二步,我们 需要根据message-id对消息进行分组。我们把每个完整的queue-id和其对 应的message-id放在一起作为键(key),而所对应的日志行作为值(value)。 在Reduce阶段,我们判断针对某个message-id的所有的queue-id是否都表明 消息已经离开我们的系统。

邮件日志的MapReduce作业的两阶段处理和它们的InputFonnatOutputFormat 形成了一种 SEDA (staged event-driven archilecture,分阶段事件 驱动处理架构)。在SEDA里,一个应用被分解为若干个“阶段”,“阶 段”通过数据队列区分。在Hadoop环境下,队列可能是MapReduce作业 使用的HDFS中的一个输入文件夹或MapReduce作业在MapReduce处 理步骤之间形成的隐性的数据队列。

在图16-9中,各个阶段之间的箭头代表数据队列,虚线箭头表示隐性的 MapReduce数据队列。每个阶段都能通过这些队列发送键值对(SEDA称之 为事件或消息)给其他处理阶段。

阶段1: Map在邮件日志处理作业的第一阶段,Map阶段的输入或是以行 号为键、以对应的日志消息为值的数据,或是以queue-id为键、以对应的日志消息数组作为值的数据。当我们处理来自输入文件数据队列的源曰志文件的时候,产生第一种类型的输入,而第二种类型是一种中间格式,它用来表示一个我们已经试图处理但因为queue-id对应数据不完整而重新进 行数据排队的queue-id的状态信息。

为了能处理这两种格式的输入,我们实现了 Hadoop的InputFonnat类, 它根据FileSplit输入文件的扩展名把工作委托给底层的 SequenceFileRecordReade?类或 LineRecordReader 类处理。这两种输 入格式的文件来自HDFS中不同的输入文件夹(即数据队列)

阶段1: Reduce在这一阶段,Reduce根据queue-id是否拥有足够的曰志 行来判定它是否完整。假如queue-id已经具有完整的消息,便输出以 message-id作为键、以HopWritable对象为值的数据对。否则,queue-id被设置为键,日志行数组重新列队并和下一组原始日志进行Map处理。这个过程将持续到queue-id已经完整或操作超时。

 image.png

图 16-9. MapReduce 处理链

 

HopWritable 对象是 POJO 对象(Plain Old Java Objects,简单 Java 对象),实现了 Hadoop的Writable接口。它从一台单独服务器的视角完整地描述一条消息,包括发送地址和ip,消息发送给其他服 务器的尝试记录,标准的消息头信息。

通过实现OutputFormat类完成输出不同的结果,这一过程对应于 我们的两个InputFormat类输入格式。Hadoop API在版本r0.17.0中添加 MultipleSequenceFileOutputFormat 类之前,我们已经实现 MultipleSequenceFileOutputFormat 类,它们实现同样的目标: Reduce作业根据键的特点存储数据对到不同的文件。

阶段2: Map在邮件日志处理作业的第二个步骤,输入是从上个阶段得到 的数据,它是以message-id为键、以HopWritable类对象数据为值的数据 对。这一步骤并不包含任何逻辑处理,而是使用标准的SequenceFilelnputFormat

类和IdentityMapper类简单地合并来自第一阶段的输入数据。

阶段3: Reduce在最终的reduce步骤,我们想判断针对某个通过系统的 message-id,收集到的所有HopWritable_对象是否能表示它经过系统的整 个消息路径。一条消息路径实际上是一个有向图.(通常是没有循环的,但如 果服务器被错误设置,有可能会包含循环)。在这个图里,点代表服务器, 可标记多个queue-id,服务器之间消息的传送形成了边。对这个应用的实 现,我们使用了 JGraphT图库。

对于输出,我们又一次使用MultiSequenceFileOutputFormat类。如果 reducer判定对于某个message-id的所有queue-id能够创建一条完整的消息 路径,消息就会被序列化,并排队等候SolrOutputFormat类的处理。否 则,消息的HopWritable对象会被列入阶段2: Map阶段的队列,然后与 下一批queue-id —起重新处理。SolrOutputFormat类包含一个嵌入式Apache Solr实例对象模仿的是Solrwiki(/i取最初提出的一种方法一一来产生 本地硬盘的索引信息。关闭OutputFonnat类同时要求把硬盘索引压缩到 输出文件的最终地址。与使用Solr's HTTP接口或直接使用Lucene相比, 这种方法有以下几个优点:

    •我们能实施 Solr 模式

    MapReduce保持幂等性 

    •搜索节点不承担索引负载

我们目前使用默认的HashPartitioner类来决定Reduce任务和特定键之 间的对应关系,就是说键是半随机分布的。在以后的新版系统中,我们将 实现一个新的Partitioned,它通过发送地址(我们最常用的搜索词)来切 分数据。一旦索引以发送者为单位分割,我们就能够使用地址的哈希值来 判断在哪里合并或查询索引,并且我们的搜索API也只需要和相关的节点 进行通信。

2.合并相近词搜索

在一系列的MapReduce阶段完成之后,一系列不同计算机会得知新的索引 信息,进而可以进行索引合并。这些搜索节点不仅具有把索引合并置于本 地磁盘的服务(参见图16.8步骤D),它们还运行Apache TomeateSolr

托管已完成的索引信息。

来自SolrOutputFormat类的每个压缩文件都是一个完整的Lucene索引, Lucene提供IndexWriter.addIndexes()方法支持快速合并多个索引。我 们的MergeAgent服务把每个新索引解压到Lucene RAMDirectoryFSDirectory(根据文件的大小),把它们和本地硬盘索引合并,然后发送一 t<commit/>请求给Solr对象,它负责提供索引服务并使更新后的索引能够 用于査询处理。

切片 Query/Management(查询/管理)API是一个PHP代码层,它主要是处 理输出索引在所有搜索节点上的“切片”(sharding)。我们使用一个简单的 “一致性哈希”(consistent hashing)来判定搜索节点和索引文件之间的对应 关系。目前,索引首先按照创建时间切片,然后再根据其文件名的哈希值 切片,但是我们计划将来用发送地址的哈希值取代文件名的哈希值来进行 切片(参见阶段2: Reduce)。 因为HDFS已经处理了 Lucene索引的备份问题,所以没有必要在Solr对象 中保留多个副本。相反,在故障转移时,相应的搜索节点会被完全删除, 然后由其他节点负责合并索引。

搜索结果使用这个系统,从产生日志到获得搜索结果供客服团队使用, 我们获得了 15分钟的周转时间。

搜索API支持Lucene的全部查询语法,因此我们常常可以看到下面这样的 复杂査询:

sender:"mapreduce@rackspace.com" -recipient:"hadoopgrackspace.com" 
recipient:"grackspace.com" short-status:deferred timestamp:[1228140900 TO 
2145916799]

查询返回的每个结果都是一个完整的序列化消息路径,它表明了各个服务器和接收者是否收到了这个消息。现在我们把这个路径用一个二维图展示 出来(图16-10),用户可以通过扩展自己感兴趣的节点来和这个图互动,但 是在这个数据的可视化方面还有很多需要改进的地方。

3.为分析进行存档

除了为客服提供简短词语的捜索功能之外,我们也对日志数据的分析感兴趣。

 image.png

图16-10.数据树

每晚,我们运行一系列的MapReduce作业,它们的输入是白天产生的索引 数据。我们实现了 SolrlnputFormat类,它可以拖回并解压索引文件然后 用键值对的形式输出毎个文档。使用这种InputFormat类,我们可以遍历 一天产生的所有消息路径,几乎可以回答我们邮件系统的任何问题,包括:

    •每个域的数据(病毒程序,垃圾邮件,连接状况,收件人)

    •最有效的垃圾邮件规则 

    •特定用户产生的负载 

    •消息量反弹的原因 

    •连接的地理分布信息 

    •特定机器之间的平均时间延迟

因为在Hadoop上,我们拥有好几个月的压缩索引信息,所以还能够回顾性地回答夜间日志概要工作忽略的问题。例如,我们近期想确定每个月消息 发送量最大的IP地址,这个任务我们可以通过做一次简单的MapReduce作 业来完成。

16.5 关于 Cascading

Cascading是一个开源的Java库和应用程序编程接口(API),它为 MapReduce提供了一个抽象层。它允许开发者构建出能在Hadoop集群上运 行的复杂的、关键任务类型的数据处理应用。

Cascading项目始于2007年夏天。它的第一个公开版本,即版本0.1,发布 于20081月。版本1.0发布于20091月。从该项目的主页http://www.cascading.org/.可以下载二进制版本、源代码以及一些插件模块。

map和reduce操作提供了强大的原语操作。然而,在创建复杂的、可以被 不同开发者共享的合成性高的代码时,它们的粒度级别似乎不合适。再者,许多开发者发现当他们面对实际问题的时候,很难用MapReduce的模 式来思考解决问题。

为了解决第一个问题,Cascading用简单字段名和一个数据元组模型来替代 MapReduce使用的键和值,而该模型的元组是由值的列表构成的。对第二 个问题,Cascading直接从MapReduce操作分离出来,引入了更髙层次 的抽象元语:Function, Filter, Aggregator Buffer

其他一些可选择的方案在该项目初始版本公开发布的同时也出现了,但 Cascading的设计初衷是对它们进行补充和完善。主要是考虑到大部分可选 的架构都是对系统强加一些前置和后置条件或有其他方面的要求而已。

例如,在其他几种MapReduce工具里,运行应用程序之前,你必须对数据 进行格式化预处理、过滤或把数据导入HDFS。数据准备步骤必须在系统的 程序设计抽象之外完成。相反,Cascading提供方法实现把数据准备和管理 作为系统程序设计抽象的组成部分。

该实例将首先介绍Cascading的主要槪念,最后槪括介绍ShareThis项目 cow)如何在自己的基础框架上使用Cascading

如果希望进一步了解Cascading处理模型,请参见项目主页上的 Cascading用户手册》。

16.5.1字段、元组和管道

MapReduce模型使用键和值的形式把输入数据和Map函数,Map函数和 Reduce函数以及Reduce函数和输出数据联系起来。

但据我们所知,实际的Hadoop应用程序通常会使用多个关联的MapReduce 作业。看一下用MapReduce模型实现的一个典型的字数统计例子。如果需 要根据统计出来的数值进行降序排列,这是一个可能的要求,它将需要启 动另一个MapReduce作业来进行这项工作。

因此,理论上来说,键和值的模式不仅把Map和Reduce绑定到起,它 也把Reduce和下一次的Map绑定了,这样一直进行下去(16-11)。更确 切地说,键/值对源自输入文件,流过MapReduce操作形成的操作链, 并且最后终止到一个输出文件。实现足够多这样链接的MapReduce应用程 序,便能看出一系列定义良好的键/值操作,它们被一遍一遍地执行来修改 键/值数据流的内容。

Cascading系统通过使用具有相应字段名的元组(与关系型数据库中的表名和 列名类似)来替代键/值模式的方法来简化这一处理流程。在处理过程中,由 这些字段和元组组成的流数据在它们通过用户定义的、由管道(pipe)链接在 一起的操作时得以处理(16-12)image.png

图16-11.基于MapReduce的计数和排序

image.png

图16-12.由字段和元祖链接的管道

因此,MapReduce的键和值形式被简化成如下形式。

字段 字段是一个String(字符串)类型的名称(如“first_name”)、 表示位置信息的数值(2-1分别瘥第三和最后一个位置)或是两 者混合使用的集合,它与列名非常像。因此字段用来声明元组里值 的名称和通过名称在元组中选出对应的值。后者就像执行SQLselect 语句。

元组 元组就是由java.lang.Comparable类对象组成的数组。元组与数据库中的行或记录类似。

Map个Reduce操作都被抽象隐藏到一个或多个管道实例之后(图16-13)

image.png

图16-13 管道类型

•  Each Each管道一次只处理一个单独的输入元组。它可以对输入 元组执行一个Function或一个Filter操作(后文马上要介绍)

•  GroupBy GroupBy管道在分组字段上对元组进行分组。该操作类 似于SQLgroup by语句。如果元组的字段名相同,它也能把多 个输入元组数据流合并成一个元组数据流。

• CoGroup CoGroup管道既可以实现元组在相同的字段名上连接, 也可以实现基于相同字段的分组。所有的标准连接类型(内连接一 inner join,外连接一outer join)以及自定义连接都可以用于两个 或多个元组数据流。

• Every Every管道每次只处理元组的一个单独分组的数据,分组数 据可以由GroupByCoGroup管道产生。Every管道可以对分组 数据应用AggregatorBuffer操作。

• SubAssembly SubAssembly管道允许在一个单独的管道内部进行循环嵌套装配组件,或反过来,一个管道也可以被嵌入更加复杂 的流水线处理中。

所有这些管道被开发者连在一起形成“管道装配处理流程”,这里每个装 配线可以有很多输入元组流(源数据,source)和很多输出元组流(目标数据, sink),‘如图 16-14 所示。

image.png

图16-14.简单的管道装配线

从表面上看来,这可能比传统的MapReduce模型更复杂。并且,不可否 认,相较于Map, Reduce, KeyValue,这里涉及的概念更多。但实际 上,我们引入了更多的概念,它们都可以相互协作来提供不同的功能。

例如,如果一个开发者想对reducer的输出值提供“辅助排序”功能,她将 需要实现以下类:MapReduce、一个“合成” Key(嵌套在父Key中的两 个Key)、值、partitioner、一个用于“输出值分组”的comparator和一个 “输出键”的comparator,所有这些概念以各种方式结合协作使用,并且在 后续的应用中几乎不可重用。

在Cascading里,这项工作只对应一行代码:new GroupBy(<previous>, groupingfields〉,〈secondary sorting fields〉),其中 previous 是数据源管道。

16.5.2操作

如前所述,Cascading通过引入一些其他操作而脱离了 MapReduce模式,这 些操作或应用于单个元组,或应用于元组分组(16-15)

 image.png

图16-15 操作原型

• Function Function作用于单个的输入元组,对每个输人,它可 能返回〇或多个输出元组。Function操作供Each类型的管道 使用。

• Filter Filter是一种特殊的函数,它的返回值是boolean(布尔) 值,用于指示是否把当前的元组从元组流中删除。虽然定义一个 Function操作也能实现这一目的,但是Filter是为实现这一目的 而优化过的操作,并且很多过滤器能够通过逻辑运算符(AndOrXorNot)分组,可以快速创建更复杂的过滤操作。

• Aggregator Aggregator对一组元组执行某种操作,这呰分组元 组是通过一组共同字段分组得到的。比如,字段“last-name”值相 同的元组。常见的Aggregator方法是Sum(求和)Count(计数), Average(均值)Max(最大)Min(最小)

Buffer BufferAggregator操作类似,不同的是,它被优化

用来充当一个“滑动窗口”在一次分组中扫描所有的元组。当开发 者需要有效地为一组排序的元组插入遗漏的值时,或计算动态均值 的时候,这个操作非常有用。通常,处理元组分组数据的时候, Aggregator也是一个可选的操作,因为很多Aggregator能够有 效地链接起来工作,但有时,Buffer才是处理这种作业的最佳工具。

管道装配线创建的时候,这些操作便绑定到各管道(图16-16)

 image.png

图16-16.操作装配线

Each和Every类型的管道提供了 种简单的元组选择机制,它们可以选择 一些或所有的输入元组,然后把这些选择的数据传送给它的子操作。并且 我们有一个简单的机制把这些操作的结果和原来的输人元组进行合并,然 后产生输出元组。这里并不详细说明这个过程,它使得每个操作只关心参数指定的元组值和字段,而不是当前输入元组的整个字段集。其次,操作 在不同应用程序之间可重用,这点和Jave方法重用的方式相同。

例如,在 Java 中,声明一个方法 concatenatefString first, String second),比直接定义concatenate(Person person)更抽象。第二个方法的 定义,concatenate()函数必须“了解” Person对象;而第一个方法的定义 并不清楚数据来自哪里。Cascading操作展现了同样的抽象能力。

16.5.3 Tap、Scheme Flow

在前面的几个图中,我们多次提到源数据(source)和目标数据(sink)。在 Cascading系统中,所有的数据都是读自或写入Tap类实例,但是它们都是 通过Scheme对象被转换成最取自元组实例对象。

• Tap Tap类负责如何访问数据以及从哪个位置访问数据。例如, 判断数据是存于HDFS还是存于本地?在Amazon S3中,还是跨 HTTP协议进行访问?

• Scheme Scheme类负责读取原始数据并把它们转换成元组格式/ 或把元组数据写入原始数据格式文件,这里的原始数据可以是文本 行、Hadoop二进制的序列化文件或是一些专用格式数据。

注意,Tap类对象不是管道装配线的一部分,因此它们不是Pipe类型。

但是当Tap对象在集群上变得可执行的时候,它们就和管道装配线关联到 一起。当一个管道装配线与必要的源和目标数据Tap实例关联一起后,我 们就得到一个Flow对象。Flow对象是在管道处理流程与指定数量的源及 目标数据Tap关联时创建的,而Tap对象的功能是输出或获取管道装配线 期望的字段名。就是说,如果Tap对象输出一个具有字段名“line”的元组 (通过读取HDFS上的文件数据),那么这个管道装配线头部必须也希望输入 时字段名是“line”的数据值。否则,连接管道处理流程和Tap的处理程序 会立刻失败并报错。

因此,管道装配线实际上就是数据处理定义,并且它们本身不是“可执 行”的。在它们可以在集群上运行之前,必须^接到源和目标Tap对象。 这种把Tap和管道装配线分开处理的特性也是Cascading系统功能强大的原因之一。

如果把管道装配线想象成Java类,那么Flow就像;lava对象实例(参见图 16-17)。也就是说,在同一个应用程序里面,同样的管道装配线可以被实例 化很多次从而形成新的Flow,不用担心它们之间会有任何干扰。如此一 来,管道装配线就可以像标准Java库一样创建和共享。

16.5.4 Cascading 实战

现在我们知道Cascading是什么,清楚地了解它是如何工作的,但是用 Cascading写的应用程序是什么样子呢?我们来看看范例16-2

Scheme sourceScheme = 
    new TextLine(new Fields("line")); ①
Tap source =
    new Hfs(sourceScheme, inputPath); ②
Scheme sinkScheme = new TextLine(); ③
Tap sink =
    new Hfs(sinkScheme, outputPath, SinkMode.REPLACE); ④
Pipe assembly = new Pipe("wordcount"); ⑤
String regexString = "(?< !  \\pL) ( ? = \\pL) [^ ]*( ?< = \\pL) (?!  \\pL)";
Function regex = new RegexGenerator(new Fields("word"), regexString); 
assembly =
    new Each(assembly, new Fields("line"), regex); ⑥
assembly =
    new GroupBy(assembly, new Fields("word")); ⑦
Aggregator count = new Count(new Fields("count")); 
assembly = new Every(assemblyJ count); ⑧
assembly =
new 6roupBy(assembly, new Fields("count"),new Fields("word")); ⑨
FlowConnector flowConnector = 
    new FlowConnector();
Flow flow =
    flowConnector.connect("word-count", source, sink, assembly); ⑩
flow.complete();⑩

①创建一个新的Scheme对象读取简单的文本文件,为每一行名为 "line"字段(Fields实例声明)输出一个新的Tuple对象。

②创建源和目标Tap实例分别指向输入文件和

③创建一个新的Scheme对象用于写简单文本文件,并且它期望输出的是 一个具有任意多个字段/值的Tuple对象。假如有多个值要输出,这些 值在输出文件里将以制表符分隔。

④输出目录。目标Tap对象输出数据时将覆盖目录下现有的文件。

⑤构建管道装配线的头,并把它命名为“wordcount”。这个名称用于绑 定源及目标数据对象到这个管道处理流程。多个头或尾要求必须有自己 唯一的名称。

⑥构建Each类型管道,它将解析line字段里的每个词,把解析结果放 入一个新的Tuple对象。

⑦构建GroupBy管道,它将创建一个新的Tuple组,实现基于word

段的分组。

⑧构建一个具有Aggregator操作的Every类型管道,它将对基于不同 词的分组Tuple对象分別进行字数统计。统计结果存于count的字段里。

⑨构建GroupBy类型管道,它将根据数值对count字段进行分组,形成 新的Tuple分组,然后对word字段值进行二级排序。结果是一组基于 count字段值升序排列的count字段值和word字段值列表。

⑩用Flow对象把管道装配线和数据源及目标联系起来,然后

⑩ 在集群上执行这个Flow

在这个例子里,我们统计输人文件中的不同单词的数量,并根据它们的自 然序(升序)进行排序。假如有些词的统计值相同,这些词就根据它们的自然 顺序(字母序)排序。

这个例子有一个明显的问题,即有些词可能会有大写字母,例如“the”和 “The”,当它出现在句首的时候就是“The”。因此我们可以插入一个新 的操作来强制所有单词都转换为小写形式,但是我们意识到那些需要从文 档中解析词语的所有的应用将来都需要做同样的操作,因此我们决定创建 一个可重用的管道SubAssembly,如同我们在传统应用程序中创建一个子 程序一样(参见范例16-3)

范例 16-3创建一个 SubAssembly

public class ParseWordsAssembly extends SubAssembly ①
{
   public ParseWordsAssembly(Pipe previous)
   {
      String negexString = "( ?< !\\pL) (? = \\pL) [^ ]*( ?< =\\pL) (? !\\pL)";
      Function regex = new RegexGenerator(new Fields("word"), regexString); previous = new Each(previous, new Fields("line"), regex);
      String exprString = "word.toLowerCase()";
      Function expression =
      new ExpressionFunction(new Fields("word"), exprString,String.class); ②
      previous = new Each(previous, new Fields("word"), expression); setTails(previous); ③
   }
}

①声明SubAssembly是子类,它本身是一种管道类型。

②创建一个Java的表达式函数,它将调用toLowerCase()方法来处理 “word”字段对应的字符串类型值。我们要传入一个Java类型来指明 “word”字段处理之后所期望的类型,这里是String类型。后台用 Janino(h(p.7/www.yflm.«o./ie^〇来编译。

③我们必须告知SubAssembly的父类这个管道子组件在哪里结束。

首先,我们新建一个SubAssembly类,它管理我们的“解析词”管道装配 线。因为这是一个Java类,所以可用于其他任何应用程序,当然这要求它 们处理的数据中有word字段(范例16-4)。注意,也有办法可以使这个函数 更加通用,这些方法在《Cascading用户手册》中都有介绍。

范例16-4.用一个SubAssembly扩展单词计数和排序

Scheme sourceScheme = new TextLine(new Fields("line"));
Tap source = new Hfs(sourceSchemGj inputPath);
Scheme sinkScheme = new TextLine(new Fields("word", "count"));
Tap sink = new Hfs(sinkScheme, outputPath, SinkMode.REPLACE);
Pipe assembly = new Pipe("wordcount");
assembly =
new ParseWordsAssembly(assembly); ①
assembly = new GroupBy(assembly, new Fields("word"));
Aggregator count = new Count(new Fields("count")); 
assembly = new Every(assembly> count);
assembly = new GroupBy(assembly, new Fields("count"), new Fields("word")); 
FlowConnector flowConnector = new FlowConnector();
Flow flow = flowConnector.connectf"word-count", source, sink, assembly);
flow.complete();

①我们用ParseWordsAssetnbly管道组件替换了之前例子中的Each 类型管道。最后,我们只是用新的SubAssembly类型子管道替代 了前面Every类型管道和单词解析函数。有必要的话,还可以继 续进行更深入的嵌套处理。

16.5.5灵活性

回顾一下,让我们来看看这个新的模型给我们带来了什么好处或消除了哪些不足。

可以看出,我们不必再用MapReduce作业模式来考虑问题,或考虑Mapper Reducer接口的实现问题,后续的MapReduce作业和前面的MapReduce 作业如何绑定或链接。在运行的时候,Cascading “规划器”(planner)会算 出最优的方法把管道装配线切分成MapReduce作业,并管理作业之间的链 接(16-18)

 image.png

图16-18.怎么把Flow变成链式MapReduce作业

因此,开发者可以以任何粒度来构造自己的应用程序。它们可以一开始就 只是一个很小的做日志文件过滤处理的应用程’序,并且以后可以根据需要 不断增添新的功能。

Cascading是一个API而不是类似SQL的字符串句法,因此它更灵活。首 先,开发者能用他们熟悉的语言创建特定领域语言(domain-specific language, DSL),Groovy, JRuby, JythonScala 等(示例参见项目网 站)。其次,开发者能对Cascading不同的部分进行扩展,例如允许对自定 义ThriftJSON对象进行读写,并且允许它们以元组数据流的形式传输。

16.5.6 Hadoop 和 Cascading ShareThis 的应用

ShareThis是一个方便用户共享在线内容的共享网络。通过单击网页上或浏 览器插件上的一个按钮,ShareThis允许用户无缝地访问他们的任何在线联 系人及在线网络,并且允许他们通过电子邮件、在线聊天、FacebookDigg和手机短信等方式共享它们的内容,而这一过程的执行甚至不要求他

们离开当前的访问网页。发布者能通过部署ShareThis按钮来挖掘服务的通 用共享能力,以此推动网络流量,刺激传播活动,追踪在线内容的共享。 通过减少网页杂乱的内容及提供跨社交网络、联盟群体和社区的实时内容 分布功能,ShareThis也简化了社交媒体服务。

ShareThis用户通过在线窗口共享网页和信息时,一个连续的事件数据流就 进入ShareThis网络。这些事件首先要过滤和处理,然后传送给各种后台系 统,包括 AsterData, Hypertable Katta

这些事件信息的数据量能达到很大数量级,导致传统的系统无法处理。这 种数据的“污染”(dirty)也很严重,主要归咎于流氓软件系统的“注入式攻 击”、网页缺陷或错误窗口。因此,ShareThis选择为后台系统部署Hadoop 作为预处理和管理协调前台。他们也选择使用Amazon Web服务(基于弹性 云计算平台EC2)来托管其服务器,并且使用Amazon S3(简单服务存储服务) 提供长期的存储功能,目的是利用其灵活的MapReduce模式(Elastic MapReduce, EMR)

这里着重介绍“日志处理管道”(图16-19)。日志处理管道只是简单地从S3 文件夹(bucket)里读取数据,进行处理(稍后介绍),然后把结果存入另个 文件夹。简单消息队列服务(Simple Queue Service, SQS)用于协调各种事件 的处理,用它来标记数据处理执行程序的开始和完成状态。下行数据流是 一些其他的处理程序,它们用于拖动数据装载AsterData数据仓库,如从 Hypertable系统获取URL列表作为网络爬取工具的下载源,或把下载的网 页推入Katta系统来创建Lucene索引。注意,Hadoop系统是ShareThis整 个架构的中心组件。它用于协调架构组件之间的数据处理和数据移动工作。

有了 Hadoop系统作为前端处理系统,在所有事件日志文件被加载到 AsterData集群或被其他组件使用之前,它会基于一系列规则对数据进行解 析、过滤、清理和组织。AsterData是一个集群化数据仓库系统,它能支持 大数据存储,并允许使用标准的SQL语法发出复杂的即时奄询请求。 ShareThis选择Hadoop集群来进行数据清理和准备工作,然后它把数据加 载到AsterData集群实现即席分析和报表生成。尽管使用AsterData也有可 能达到我们的目的,但是在处理流程的第一阶段使用Hadoop系统来降低主 数据仓库的负载具有重要意义。

image.png

为了简化开发过程,制定不同架构组件间的数据协调规则以及为这些组件 提供面向开发者的接口,Cascading被选作主要的数据处理API。这显示出 它和传统Hadoop用例的差别,它们主要是用Hadoop来实现对存储数据的査 询处理。

相反,Cascading和Hadoop的结合使用为端到端的完整解决方案提供了一 个更好、更简单的结构,因此对用户来说更有价值。

对于开发者来说,Cascading的学习过程很简单,它从一个简单的文本解析 单元测试(通过创建cascading.ClusterTestCase类的子类)开始,然后把 这个单元程序放入有更多规则要求的处理层,并且在整个过程中,与系统 维护相关的应用逻辑组织不变。Cascading用以下几种方法帮助保持这种逻 辑组织的不变性。首先,独立的操作(Function, Filter)都可以进行独 立编程和测试。其次,应用程序被分成不同的处理阶段:一个阶段是解 析,一个阶段是根据规则要求进行处理,最后个阶段是封装/整理数据, 所有这些处理都是通过前述的SubAssembly基础类实现的。

ShareThis的日志文件数据看起来非常像Apache日志文件,它们有日期/时 间戳、共享URL、引用页URL和一些元数据。为了让分析下行数据流使用 这些数据,这些URL必须先解压(解析查询字符串数据和域名等)。因此需 要创建一个高层的SubAssembly对象来封装解析工作,并且,如果字段解 析很复杂,SubAssembly子对象就可被嵌入来解析一些特定字段。

我们使用同样的方式来应用处理规则。当每个Tuple对象通过规则处理 SubAssembly类对象的时候,如果有任何规则被触发,该对象就会被标记 上标签“坏”(bad)。具有“坏”字标签的Tuple对象,会被附上被标记的 原因用于后来的审查工作。

最后,创建一个切分SubAssembly类对象来做两件事。第一,用于对元组 数据流进行分流处理,一个数据流针对标记“好”(good)的数据,另一个针 对标记“坏”的数据。第二,切分器把数据切分成片,如以小时为单位。 为了实现这一动作,只需要两个操作:第一个是根据已有数据流的 timestamp(时间戳)创建区间段;第二个是使用interval(区间)good/bad元 数据来创建目录路径(例如,“〇5/good/”中“05”是早上5点,“good” 是通过所有规则验证的数据)。这个路径然后被Cascading TemplateTap使 用,这是一个特殊的Tap类型,它可以根据Tle对象值把元组数据流动 态输出到不同的路径位置。本例中,“path”值被TemplateTap用来创建最 终输出路径。

开发者也创建了第四个SubAssembly类型对象——它用于在单元测试时应用 Cascading Assertion(断言)类。这些断言用来复査规则组件和解析 Sub Assembly做的工作。

在范例16-5的单元测试中,我们看到partitioner没有被检测,但是它被放 入另外一个这里没有展示的集成测试中去了。

范例16-5.单元测试

public void testLogParsing() throws IOException
    Hfs source = new Hfs(new TextLine(new Fields("line")), sampleData);
    Hfs sink =
        new Hfs(new TextLine(), ou1>putPath + "/parser", SinkMode.REPLACE);
    Pipe pipe = new Pipe("parser");
    // split "line" on tabs
    pipe = new Each(pipe, new Fields("line"), new RegexSplitter("\t")); 
    pipe = new LogParser(pipe); pipe = new LogRules(pipe);
    // testing only assertions
    pipe = new ParserAssertions(pipe);
    Flow flow = new FlowConnector().connect(source, sink, pipe);
    // verify there are 98 tuples, 2 fields, and matches the regex pattern 
    // for TextLine schemes the tuples are {     "offset", "line } 
    validateLength(flow, 98, 2, Pattern.compile(,,A[0-9] + (\\t[A\\t]*){19}$M));
}

针对集成和部署,许多Cascading内置属性使得该系统和外部系统更容易集 成,并允许进行更大规模的处理工作。

在生产环境中运行时,所有的SubAssembly对象都连接起来并规划到一个 Flow对象里,但是除了源和目标Tap对象之外,我们也设计了错误捕捉 (trap)Tap对象(16-20)。通常,当远程的Mappej•或Reducer任务抛出一 个异常的时候,Flow对象就会失败并杀死它管理岛所有MapReduce作业。 当一个Flow有错误捕捉类的时候,所有的异常都会被捕捉并且造成异常的 数据信息会被保存到当前这个捕捉程序对应的Tap对象里。然后可以在不 终止当前Flow的情况下,继续处理下一个Tuple对象。有时你想让程序 在出现错误的时候就停止,但在这里,ShareThis开发者知道在系统运行的 时候,他们能同时回览并査看“失败”的数据,然后更新其单元测试。丢 失几个小时的处理时间比丢失几个坏记录数据更糟糕。

 image.png

图 16-20. ShareThis 日志处理 Flow

使用Cascading的事件监听器,Amazon SQS可被集成进来。当一个Flow 结束的时候,系统就发送一条消息来通知其他系统它们已经可以从 Amazon S3上获取准备好的数据了。当Flow处理失败的时候,发送不同 的消息向其他的进程报警。

其余的位于不同的独立集群的下行数据流进程将在中断的日志处理管道位

置处开始处理。现在日志处理管道一天运行-次,而我们没有必要让loo 个节点的集群空转23个小时。因此我们是每24小时执行一次终止和启用 操作。

将来,在小型的集群上根据业务需求,增加运行间歇期到每6个小时一次 或1小时•次都是非常简单的。其他的集群系统可以独立地根据各自负责 的业务需要以不同的间隔期启用或关闭。例如,网络数据爬取组件(使用 Bixo,它是EMIShareThis开发的基于Cascading的网络数据爬取工具) 可以在一个小型集群上与Hypertable集群协作连续运转。这种随需应变的 模型在Hadoop上运行良好,每个集群都能把工作负载调节到它期望处理的 量级。

16.5.7总结

对于处理和协调跨不同架构组件的数据移动问题,Hadoop是一个非常强大 的平台。它唯一的缺点是它的主要计算模型是MapReduce

Cascading的目标是(不用按照MapRedue模式来考虑设计方案)帮助开发者 通过使用一个逻辑定义良好的API来快速而简单地建立强大的应用程序, 而同时又把提高数据分布、复制、分布式处理管理的性能和程序活性的工 作都留给了 Hadoop

访问该项目的网站(如叹可以读到更多关于Cascading 的信息,也可以加入在线社区和下载范例应用程序。

16.6Apache Hadoop上万亿数量级排序

这篇文章来自 印它写于 2008 5 月。每年,Jim Gray和他的后继者定义一系列的benchmark (基准测试程序) 用以发现最快的排序程序。几年来,万亿字节排序(TB Sort)和其他排序的 benchmarks 及其获胜者在 http://sortbenchmark.org/YahooHadoop.pdf网站列出。2009 4月,Arun Murthy和我在每分钟排序(目标是在一分钟之内排序尽可能多的 数据)的竞争中获胜,我们在1406Hadoop节点上于59秒之内完成了对 500 GB数据的排序工作。在同一个集群上我们也对1TB的数据进行了排序,花了 62秒的时间。2009年,我们使用的集群和下面列出的硬件配置相 似,不同的是网络较好,与前一年相比,我们机架间的超载比由5: 1变成 了 2 : 1。我们对节点之间产生的中间数据也采用了 LZO压缩方法。我们在 3658个节点上在975分钟之内完成了对1PB数据的排序,平均速度达 到每分钟排序1.03 TB数据。

Apache Hadoop是一个开源软件框架,它显著简化了分布式数据密集型应用 程序的编写。它提供了一个基于Google File System l;(Google文件系统)的分 布式文件系统,它还提供了对MapReduce模型®的实现,用于管理分布式 计算。因为MapReduce模型的主要原语操作是分布式排序,所以大部分自 定义代码能实现期望的功能。 

我写了三个Hadoop应用来执行万亿字节数据排序。

(1)TeraGen是一个用于产生数据的MapReduce程序。

(2)TeraSort对输入数据取样,并用MapReduce模型对数据进行全序

排列。

(3)TeraValidate是一个用于验证输出有序性的MapReduce程序。

整个程序是大约1000行的Java代码,它将被放在Hadoop范例目录下。

TeraGen产生输出数据,它的代码和C语言版本是完全样的,包括换行 和特定的键值定义。它根据预期的任务数把数据分成期望的数据行数,并 把数据行段分配给每个map作业。map作业让随机数产生器产生第一行数据的正确值,然后产生其他行的数据。最终运行时,我为TeraGen配置使 用1800个任务在HDFS上产生总数达100亿行的数据,HDFS上每个存储 文件块的大小是512 MB

除了一个自定义的数据切分程序之外,TepaSort是个标准的MapReduce 排序程序,这个数据切分程序使用了 一个已经排序的个取样的键值, 用这些排序好的键值来定义每个reduce作业键的范围。所以,所有的键如

sample[i-1] <=key<都被发送到 reduce /。这就保证,reduce / 的输

出都比reduce/+1的值小。为了sample[i]加速数据切分过程,这个数据切分程序构建 了基干键的前两个字节的两层字典树(two-level trie)索引,它可以帮助快速 地对取样的键建立索引。TeraSort在作业被提交之前通过对输入数据取样 并把样本键列表写入HDFS来获得样本键。

我写了一个输入和输出格式供三个应用程序共同使用,它负责以正确格式 读写文本文件。因为竞赛没有要求输出文件需要在多个节点上备份,所以 我们为reduce作业输出设置的副本数是1而不是默认的3。为该作业配置 使用了 1800 map1800 reduce,并设置 io.sort.mbio.sort.factor 参数、fs.inmemory.size.mb参数来保证足够大的任务堆内存,这可以确 保直到map结束都不用把中间数据写入硬盘。取样程序使用了 10万个键来 生成每个reduce的处理范围,然而从图16-21可以看出,受益于更多的取 样,各个reduce作业的数据分布愈加完美。在图16-22中,可以看到在作 业执行期间运行的任务的分布情况。

image.png

图16-21. reduce作业输出数据的大小和作业结束时间的分布图

TeraValidate确保输出是全局排序数据。它为输出文件目录中的每个文件 新建一个map作业,每个map作业确保每个键值都不大于前一个。这个 map作业也产生每个文件的第一个和最后一个键对应的记录,reduce作业 确保文件i的第一个键值大于文件卜1的最后一个键。如果键值没有排序好,错误信息作为reduce作业的输出而汇报出来。

 image.png

图16-22 整个运行时间内每个阶段的任务数分布

我使用的集群如下:

    910个节点

    •每个节点有22.0Ghz的双核Xeon芯片

    •每个节点有4SATA硬盘 

    •每个节点有8 GBRAM

    •每个节点有1千兆比特的以太网带宽 

    •每个机架上安放40个节点

    •从每个机架到中心内核有8千兆比特的以太网上行带宽

    Red Hat Enterprise Linux Server 5.1 版(内核 2.6.18)

    Sun Java JDK 1.6.0—05-bl3

排序过程只花了 209秒(3.48分钟)。我运行的是Hadoop trunk(pre-0.18.0), 使用的是为 HADOOP-3443 HADOOP-3446写的补 丁,这样可以删除对磁盘的中间结果写入。尽管这910个节点主要供我使 用,但我要与另外一个运行2000个节点的集群共享网络核心模块,因此运 行时间会因为其他节点执行的操作而变化。

16.7用PigWukong探索10亿数量级边的网络图 

超大规模的网络是非常有魅力的。它们所能建模的东西具有普遍的意义: 假如你有一堆东西(我们称它们是节点,node),它们是相关联的(边,edge) 并且假如节点和边(node/edge元数据)能用故事关联起来,就能得到一个网 络图。

我以前做过一个称为Infochimps的项目,这是一个发现、共享或销售数据 集的全球性网站。在Infochimps网站,我们有很多技术可以应用到任何加 入该项目数据集的有趣的网络图上。我们主要使用Pig(参见第H)Wukong (A.这是我们用 Rubby 语言开发的处理 Hadoop流数据的工具箱。这些技术可以让我们用简单的脚本语言(如下面给 出的例子一样)一基本上所有这些脚本都不超过一页一来处理terabyte(千兆,TB)量级的图数据。在infochimps.org上査询network得到以下几个数 据集。

• 社交网络,如TwitterFacebook。我们客观地把人模型化为节 点,把关系(@wr/7/p和你ow_e_w/n7e是朋友)或行为提到了(ghrfoop)模型化为边。用户已发送的消息数和所有这些消 息的词集便是节点元数据的各个重要信息片段。

• 链接的文档集(如维基百科或整个网络)。每个页面是一个节点(把 标题、浏览次数和网页类别作为节点元数据)。每个超链接是一条 边,用户从一个页面点击进入另一个网页的频率作为边的元数据。

C.elegans roundworm研究项目中的神经元(节点)和突角虫(边)的联系。 

•髙速公路地图,出口是节点,高速公路的分段是边。Open Street Map项目的数据集是具有全球性覆盖的地点名称(节点元数据),街 道编号范围(边的元数据)及更多其他信息。

•或是一些不易发现的隐秘的图,假如你能用一个有趣的系统来做分析的话,这个网络图就会很清晰。浏览几百万条Twitter消息,为同一条消息中出现的每对非键盘字符产生一条边。简单地通过观察 “oftenwhen humans use 最,they also use 近”这句话,你就能重建人类语言地图(参见图16-23)image.png

图16-23. Twitter语言地图

这些有机相连的网络图让人惊讶的地方是,如果有足够的数据,一系列功能强大的工具软件通常就能够使用这种网络结构来揭示出更深层次的知识。例如,我们可以使用同一种算法的各种变体来做下面各个任务。

• 从维基百科链接文档集中找出最重要的网页。Google使用这个算 法的更加精良的改进版来确定排序靠前的搜索结果。

• 确定Twitter社区图中的名人和专家。如果用户的跟随者人数比用户trstrank(—种排序值)推算出的数高出很多,意味着他们往往就是 垃圾制造者。

•通过收集5年以上的几百万个匿名考试分数来预测某个学校在学生 教育问题上的影响力。

16.7.1社区判断

在Infochimps数据集中,最有趣的网络是大规模爬取Twitter社区数据,分析得到的网络图。它有多达9千万个节点和20亿条边,这个图对于帮助我们理解人们的谈话和掌握他们之间的关系来说是一个非常了不起的工具。下面使用“谈论InfochimpsHadoop的用户"构成的子图,我们利用三 种方法来构建用户社区图:

    • 和他们一起讨论的用户(@reply)是谁?

    • 他们是否与参与问题讨论的人互换了意见(对称链接)

    • 在用户社区里,用户彼此关注度有多少(聚类因子)

16.7.2每个人都在和我说话:Twitter回复关系图

Twitter允许你回复其他人的消息,从而参与谈论。因为这是一种明显的公众行为,所以回复就代表一种强的“社会性标记”(social token):它表明对 别人谈论事情感兴趣,并表明这种兴趣值得转播。 处理过程的第一步是用Wukong完成的,Wukong是面向HadoopRuby程序库。它能让我们编写出处理吁13级数据流的小巧的程序。以下代码片断取 自一个用于表示twitter消息(tweet)的类:

class Tweet < Struct.new(:tweet_id, :screen_name, :created_at,
:reply_tweet_id, :reply_screen_name, :text) 
def initialize(raw_tweet)
# ... gory details of parsing raw tweet omitted 
end

   

 # Tweet is a reply if there's something in the reply_tweet_id slot 
    def is_reply?
        not reply_tweet_id.blank?
    true 
end

Twitter的Stream API可以让大家轻松得到千兆字节的消息。它们是原始 的JSON格式数据:

{"text":"Dust finished the final draft for Hadoop: the Definitive Guide!", "screen_name": "tom_e_white"/,reply_screen_name" inull, "id": 3239897342, 
"reply_tweet_id":null,...}
{"text":"@tom_e_white Can't wait to get a copy!",
••screen_name": "mrf lip",reply_screen_name": "tom_e_white", "id": 3239873453, 
reply_tweet_idM:3239897342,...}
{"text":"@josephkelly great job on the #InfoC^imps API.
Remind me to tell you about the time a baboon broke into our house.", "screen_name":"wattsteve"J,,reply_screen_name":"josephkelly"> "id":16434069252,...}
{"text":@mza Re: http://j.mp/atbroxmr Check out @names_Rubino's
 http://bit.ly/clusterfork ? Lots of good hadoop refs there too",
"screen_name":"mrflip",Veply.screen.name": "id":7809927173,...}
{"text":"@tlipcon divide lots of data into little parts. Magic software gnomes fix up the parts, elves then assemble those into whole things #hadoop", "screen_name": "nealrichter","reply_screen_name": "tlipcon","id":4491069515,. . . }

replyfScreer^name和reply_tweet_id让你能跟踪整个交流过程(否则正如你看到的,这两个值被设置为null)。我们可以找到每个回复,并且输 出相应的用户ID,然后形成一条边:

class ReplyGraphMapper < LineStreamer 
    def process(raw_tweet) tweet = 
        Tweet.new(raw_tweet) 
        if tweet.is_reply?
            emit [tweet.screen_name, tweet.reply_screen_name]
        end 
    end 
end

mapper从LineStreamer类派生出来,该类把每一行作为一个单独记录提 供给process方法。我们只需定义process方法,其余的工作由Wukong和Hadoop完成。这个案例里,我们使用原始JSON格式的记录来创建 tweet对象。遇到用户A回复用户B的地方,就输出一条边,格式为用制表 符分割的AB。原始输出数据如下所示:

% reply_graph_mapper --run raw一tweets.json a_replies_b.tsv
mrflip tom_e_white
wattsteve josephkelly
mrflip mza
nealrichter tlipcon

这条边读作“a回复b”,理解为一条有向“出”边:@wattsteve@josephkelly转移了社交中心。

1 .边对(edge)与邻接表(list)

上述网络是采用“边对”(edge pair)方式来表示网络方法。它很简单,并且 对入(in)和出(out)边来说,它们有同样的起始点,但是这样会引入一些重复 数据。从节点的角度来看,把信息都集中到链接源节点可以表达相同的信 息(并节省一些磁盘空间)。我们把这个称作“邻接列表”(adjacency list),它 能用Pig工具通过一个简单的GROUP BY操作产生。加载数据文件:

a_replies_b = LOAD 'a_replies_b.tsv' AS (src:chararray, dest:chararray);

通过在源节点上进行分组,找到从每个节点出来的边:

replies_out = GROUP a_replies_b BY src;
DUMP replies_out
(cutting,{(tom_e_white)})
(josephkelly,{(wattsteve)})
(mikeolson,CLusciousPear), (kevinweil), (LusciousPear), (tlipcon)}) 
(mndoci,{(mrflip),(peteskomoroch),(LusciousPear),(mrflip)}) 
(mrflip,{(LusciousPear),(mndoci),(mndoci),(esammer),(ogrisel),(esammer),(wattsteve)}) 
(peteskomoroch,{(CMastication),(esammer),(DataDunkie),(mndoci),(nealrichter),... 
(tlipcon,{(LusciousPear),(LusciousPear),(nealrichter),(mrflip),(kevinweil)}) 
(tom_e_white,{(mrflip),(lenbust)})

2.度(degree)

对影响力,一种简单而有效的度量就是一个用户收到的回帖数。用图的术语来说,是度(degreeX因为这是一个有向图,所以入度尤其重要)

Pig的嵌套FOREACH语法能使我们在一次数据扫描之后,计算参与进来的 不同的回帖者总数(邻居节点)以及回帖总数:

a_replies_b = LOAD ' a_replies_b.tsv' AS (src:chararray, dest:chararray); 
replies_in = GROUP a_replies_b BY dest; -- group on dest to get in-links
replies_in_degree = FOREACH replies一in {
 nbrs = DISTINCT a_replies_b.src;
GENERATE group, COUNT(nbrs), COUNT(a_replies_b);
};
DUMP replies_in_degnee
(cutting,1L,1L)
(josephkelly,1L,1L)
(mikeolson,3Lj4L)
(mndoci,3L,4L)
(mrflip,5L,9L) 
(peteskomoroch,9L,18L)
(tlipcorij4L, 8L)
(tom_e_white,2L,2L)

在这个示例里,@peteskomoroch有9个邻居节点和18个回帖,远远多于 其他大多数节点的数据。社交网络中度的大小通常存在很大的区别。大多 数用户都只有少数几个回帖,但是少数的名人——如@THE_REAL_SHAQ(篮球

明星Shaquille 0Neill@sockington(—只虚构的猫) 能收到上百万的回帖。相比之下,公路地图上几乎每个交叉点都是简单的十字形。®由于度 的巨大偏差而产生的偏斜数据流对如何处理这样的图数据有很大的影响, 后面会有更多介绍。

16.7.3对称链接

有几百万人在twitter上给@THE_REAL_SHAQ回帖声援支持的时候,他不回 复这几百万人是可以理解的。如图所示,我经常和@mndoci交流,a让我们 之间的边是“对称链接”(symmetric link)。这精确地反映了我和@mndoci

①由于边对记录的规模比较小以及Hadoop实现细节的繁琐,mapper也许会很快就把 数据存储到硬盘上。如果jobtracker的运行界面上出现spilled records严重超出

map output records” 字样,请试着把 io. sort .record .percent 参数值调大一 些:PIG_OPTS=" – Dio. sort. record. percent=0.25 -Dio. sort. mb=350M pig my__file.pig0

②我们能够想到的最大的道路地图特殊情况就是位于英国斯温顿的著名大转盘Magic

Roundabout,它的度是 10

%28Swindon%29 0

③开源数据倡导者Deepak Singh,也是Amazon AWS云计算的业务开发经理

有更多共同兴趣(相较于@THE_REAL_SHAQ)

找到对称链接的一个方法是获取那些同时出现在A Replied To B(A回帖 给B)边集合和A Replied By B(B回帖给A)的边集合的边。我们能通过内 部“自连接”操作来实现交集操作,以此来发现对称链接:

a__repl_to_b = LOAD 'a_replies_b.tsv' AS (user_a:chararray, user_b:chararray); a_repl_by_b = LOAD 'a_replies_b.tsv' AS (user_b:chararray, user_a:chararray);
 --symmetric edges appear in both sets 
a_symm_b_j = DOIN a_repl_to_b BY (user_a, user_b), 
a_repl_by_b BY (user_a> user_b);
...

但是,这个过程结束之后,它将发送两个完全的边-对列表给reduce阶段, 这要求系统提供双倍内存。如果从一个节点的角度来看,一个对称链接等同于一对边:一个出一个进,利用这个信息我们能做得更好。如下代码所示,我们可以根据节点进行排序,把排序值低的那个节点放在第一位,然 后我们可以得到一个无向图,但把链接的方向保存为边的一种元数据:

a_replies_b = LOAD 'a_replies_b.tsv' AS (src:chararray, dest:chararray); 
a_b_rels = FOREACH a_replies_b GENERATE 
    ((src <= dest) ? src : dest) AS user_a,
    ((src <= dest) ? dest : src) AS user_b,
    ((src <= dest) ? 1 : 0) AS a_re_b:int^
    ((src <= dest) ? 0 : 1) AS b_re_a:int;
DUMP a_b_rels 
(mrflip,tom_e_white,1,0)
(josephkelly,wattsteve,0,1)
(mrflip,mza,1,0)
(nealrichter,tlipcon,0,1)

现在我们汇总每对节点间的所有边。一个对称边在每个方向至少有一个回帖:

a_b_rels_g = GROUP a_b_rels BY (user_a, user_b); 
a_symm_b_all = FOREACH a _b_rels_g GENERATE 
    group.user_a AS user_a, 
    group.user_b AS user_b,
    (((SUM(a_b_rels.a_re_b) > 0) AND
        (SUM(a_b_rels.b_re_a) > 0) ) ? 1 : 0) AS is_symmetric:intj;
    DUMP a_symm_b_all
    (mrflipj tom_e_white,1)
    (mrflip,mza,0)
    (josephkelly,wattsteve,0)

(nearlrichter,tlipcon,1)
...
a_symm_b=FILTER a_symm_b_all BY (is_symmetric==1);
STORE a_symm_b  INTO 'a_symm_b.tsv';

这里有一部分输出,显示和@tom_e_white之间有一个对称链接:

(mrflip,tom_e_white,1)
(nealrichter,tlipconj1)

16.7.4社区提取 

到目前为止,我们已经提供了节点度量(入度)@度量的方法(对称链接判 定)。让我们进一步看看如何度量邻居关系:一个I旨定用户的朋友中有多少 人彼此之间是朋友?同时,我们将产生一个边集来实现前一个例子那样的 可视化展示。

1.获取邻居

选择一个种子节点(这里是@hadoop)。首先“收集”种子节点的邻居节点:

a_replies_b = LOAD 1a_replies_b.tsv' AS (src:chararray, dest:chararray); 
--Extract edges that originate or terminate on the seed 
n0_edges = FILTER a_replies_b BY (src == 'hadoop') OR (dest == 'hadoop');
--Choose the node in each pair that *isn't* our seed:
n1_nodes_all = FOREACH n0_edges GENERATE
((src == 'hadoop') ? dest : src) AS screen_name; 
nl_nodes = DISTINCT nl_nodes_all;
DUMP nl_nodes

现在我们把这个邻居集和开始节点集进行交集处理,从而找出所有始于 n1_nodes集合的边:

n1_edges_out_j = DOIN a_replies_b BY src,
n1_nodes BY screen_name USING 'replicated'; 
n1_edges._out = FOREACH nl_edges_out_j GENERATE src, dest;

我们得到的图的副本数据(超过io亿条边)仍然太大而不能搬入内存。另一 方面,一个单独用户的邻居人数很少会超过百万,所以它可以轻松地读入 内存。在JOIN操作里包含USING 'replicated'会让Pig做一个map端的连 接操作(也称作fragment replicate join,片断复制连接)Pignl_nodes关系数据读入内存当作一个査找表,然后把整个边集的数据连续地载入内存。 只要连接条件满足(srcnl_nodes查询表中),就产生输出。没有reduce步骤意味着速度得以显著提升。

为了只留下源和目标节点都是种子节点的邻居的边,重复执行如下连接操作:

n1_edges_j = 30IN n1_edges_out BY dest,
n1_nodes BY screen_name USING 'replicated'; 
n1_edges = FOREACH n1_edges_j GENERATE src, dest;
DUMP n1_edges
(mrflip,tom_e_white)
(mrflip,mza)
(wattsteve,josephkelly)
(nealrichter,tlipcon)
(bradfordcross,lusciouspear)
(mrflipj jeromatron)
(mndoci,mrflip)
(nealrichter, datajunkie)

2.社区度量标准和1百万百万数量级的问题

把@hadoop、@cloudera@infochimps作为种子节点,我把类似的脚本应用到20亿条消息集中来创建图16-24,本书网站(http:hadoopbook.cow)上 也有。

 image.png

图16-24. Twitter上的大数据社区

可以看出,这种大数据社区的关联度是很髙的。相对来说,名人(如 @THE_REAL_SHAQ)的邻居非常稀疏。我们可以用“集聚系数”(clustering coefficient)来表示这样的关系,定义为:实际的nl_edges和可能的 nl_edges的集合数据量的比值。值的范围是从0(邻居节点互不关联)1(邻居节点两两互相关联)。集聚系数值髙,表明它是一个凝聚性高的社 区。集聚系数值较低表明社区节点的兴趣很分散(@THE_REAL_SHAQ节点 的情况),或表明这是一个非有机组织社区,可能存在垃圾账户。

3.在全局数据上的局部特性

我们已经算出了对一个节点、一条边以及一个邻居的社区度量标准。那么 面对整个网络该怎么做?这里没有足够的篇幅来4井述这个问题,但是通过 在图上产生节点的“三角关系”,你能同时测量每个节点的集聚系数值。 对每个用户,比较他们所属的三角关系的个数及其链接度,就可以得到集聚 系数值。

请注意,还记得我们前面对节点度巨大差异性的讨论吗?不假思索地扩展前面的方法会导致数据的激增流行音乐明星@britneyspears(在2010年7月有520万粉丝,42万关注)@WholeFoods(170万粉丝,60万关 注),每个人都会产生上万亿的数据记录。更糟糕的是,因为大社区具有稀 疏的集聚系数,所以几乎所有这些数据都会被丢弃掉!我们有更巧妙的方法 在整个图上做数据处理。但一定要牢记现实世界是如何描述这个问题的。如果你断言@britneyspears和这42万人不是朋友,你可以只保留强链 接。给每条边赋权重(考虑关注数、是否是对称链接等因素)并且给来自某个节点.的链接数设置上限。这将大幅缩减中间数据的规模,但仍然可以合理 地估计社区的凝聚性。

转载请注明:全栈大数据 » 第十六章 实例学习

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址