整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

16.4.4收集和存储

hadoop 花牛 20℃ 0评论

1.日志收集

产生日志的服务器分布于多个数据中心,但目前我们只有一个单独的 Hadoop集群,位于其中一个数据中心(参见图16-8)。为了汇总日志数据并把它们放人集群,我们使用syslog-ng(Unix syslog机制的替代)和一些简单的脚本来控制Hadoop上文件的创建。image.png

图 16-8. Rackspace 的 Hadoop 数据流

在一个数据中心里,syslog-ng用于从logsowrce机器传送日志数据到一 组负载均衡的日志收集机器c//ec;r。在这些收集器上,每种类型的日志 数据被汇成一个单独的数据流,并且用gzip格式进行轻量级的压缩(16-8 步骤A)。远程收集器的数据通过SSH通逭跨数据中心传送到Hadoop集群 所在的“本地收集器”(local collector)(步骤B)

一旦压缩的日志流到达本地收集器,数据就会被写入Hadoop(步骤C)。目 前我们使用简单的Python脚本把输入数据缓存到本地硬盘,并且定期使用 Hadoop命令行把数据放入Hadoop集群。当缓存日志数据量达到Hadoop数 据块大小的倍数或是缓存已经经过了足够长的时间时,脚本程序开始复制 日志缓存数据到Hadoop的各个输入文件夹。 这种从不同数据中心安全地汇总日志数据的方法在Hadoop支持SOCKS之 前就已经有人开发使用了,SOCKS是通过hadoop. rpc.socket.factory. class.default参数和SocksSocketFactory类实现的。通过直接使用远 程收集器对SOCKS的支持和HDFSAPI,我们能够从系统中消除一次磁 盘的写入操作和降低系统的复杂性。我们计划在将来的开发中实现一个使 用这些特性的替代产品。

一旦原始日志被存放到Hadoop上,这些日志就已经准备好交给我们的 MapReduce作业处理了。

2.日志存储

我们的Hadoop集群目前包含15个数据节点,每个节点都使用普通商用 CPU3500 GB的硬盘。我们对在6个月中必须存档处理的文件使用默认的备份因子3,而其他的文件采用2个备份机制。

Hadoop的域名节点namenode使用的硬件和数据节点datanode相同。为了 提供比较髙的可用性,我们使用两个辅助namenode和一个虚拟IP,IP 可以很容易地指向3台机器中任意一台具有HDFS快照的硬盘。这表明在 故障转移情形下,根据辅助namenode的快照时间,我们可能会丢失最多 30分钟的数据。数据丢失的量取决于辅助namenode的数据快照老化时间 设置。虽然这对于我们的日志处理应用来说是可接受的,但是其他Hadoop 应用可能要求通过为namenode镜像提供共享存储的能力来实现无损的故障转移。

16.4.5对日志的MapReduce处理 1.处理 在分布式系统中,唯一标识符令人失望的地方是它们基本不可能是真正唯 一的。所有的电子邮件消息都拥有一个(所谓的)唯一标识符,叫message- id, 它由消息发起的主机产生, 但是一个不良客户端能够轻松发送重复消 息。另外,因为Postfix设计者并不相信message-id可以唯一地标识消息, 所以他们不得不提出设计一个独立的ID(标识)queue-id,并保证它在消息 的本地机器生命周期内唯一。

尽管message-id趋向于成为消息的权威标识,但在Postfix日志中,需要使 用queue-id来査找message-id。看范例16-1第二行(为了适合页面大小,日 志行的格式做了调整),你将发现十六进制字符串1DBD21B48AE,它就是该 行消息的queue-id。因为日志收集的时候(可能每隔几小时进行一次),每个 消息(包括它的message-id)的信息都输出到单独的行,所以我们的解析代码有 必要去保留消息的状态信息。

范例16-1. Postfix日志行

Nov 12 17:36:54 gate8.gate.sat.mlsrvr.com postfix/smtpd[2552]: connect from hostname 
Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/qmgr[9489]: 1DBD21B48AE: 
from=<mapreduce@rackspace.com>, size=5950, nrcpt=l (queue active)
Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/smtpd[28085]: disconnect from hostname
Nov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/smtpd[22593]: too many errors after DATA from hostname
Nov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/smtpd[22593]: disconnect from hostname 
Nov 12 17:36:54 gatel0.gate.sat.mlsrvr.com postfix/smtpdf10311]: connect from hostname 
Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/smtp[28107]: D42001B48B5: 
to=<mapreduce@rackspace.com>, relay=hostname[ip]j delay=0.32, delays=0.28/0/0/0.04, 
dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as 1DBD21B48AE)
Nov 12 17:36:54 gate20.gate.sat.mlsrvr.com postfix/smtpd[27168]: disconnect from hostname 
Nov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/qmgr[1209]: 645965A0224: removed 
Nov 12 17:36:54 gate2.gate.sat.mlsrvr.com postfix/smtp[15928]: 732196384ED: 
to=<m apreduce@rackspace.com>, relay=hostname[ip], conn_use=2, delay=0.69, 
delays=0.04/ 0.44/0.04/0.17, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as 02E1544C005)
Nov 12 17:36:54 gate2.gate.sat.mlsrvr.com postfix/qmgr[13764]: 732196384ED: removed 
Nov 12 17:36:54 gatel.gate.sat.mlsrvr.com postfix/smtpd[26394]: NOQUEUE: reject: RCP T 
from hostname 554 5.7.1 <mapreduce@rackspace.com>: 
Client host rejected: The sender's mail server is blocked; 
from=<mapreduce@rackspace.com> to=<mapred uce@rackspace.com> 
proto=ESMTP helo=<mapreduce@rackspace.com>

从MapReduce的角度看,日志的每一行是一个单独的键/值对。第一步,我 们需要把所有具有相同queue-id的行放在一起,然后执行reduce过程判断

日志消息数据是否能完整地表示这个queue-id对应的数据。 类似地,一旦我们拥有一个queue-id对应的完整的消息,在第二步,我们 需要根据message-id对消息进行分组。我们把每个完整的queue-id和其对 应的message-id放在一起作为键(key),而所对应的日志行作为值(value)。 在Reduce阶段,我们判断针对某个message-id的所有的queue-id是否都表明 消息已经离开我们的系统。

邮件日志的MapReduce作业的两阶段处理和它们的InputFonnat与 OutputFormat 形成了一种 SEDA (staged event-driven archilecture,分阶段事件 驱动处理架构)。在SEDA里,一个应用被分解为若干个“阶段”,“阶 段”通过数据队列区分。在Hadoop环境下,队列可能是MapReduce作业 使用的HDFS中的一个输入文件夹或MapReduce作业在MapReduce处 理步骤之间形成的隐性的数据队列。

在图16-9中,各个阶段之间的箭头代表数据队列,虚线箭头表示隐性的 MapReduce数据队列。每个阶段都能通过这些队列发送键值对(SEDA称之 为事件或消息)给其他处理阶段。

阶段1: Map在邮件日志处理作业的第一阶段,Map阶段的输入或是以行 号为键、以对应的日志消息为值的数据,或是以queue-id为键、以对应的日志消息数组作为值的数据。当我们处理来自输入文件数据队列的源曰志文件的时候,产生第一种类型的输入,而第二种类型是一种中间格式,它用来表示一个我们已经试图处理但因为queue-id对应数据不完整而重新进 行数据排队的queue-id的状态信息。

为了能处理这两种格式的输入,我们实现了 Hadoop的InputFonnat类, 它根据FileSplit输入文件的扩展名把工作委托给底层的 SequenceFileRecordReade?类或 LineRecordReader 类处理。这两种输 入格式的文件来自HDFS中不同的输入文件夹(即数据队列)

阶段1: Reduce在这一阶段,Reduce根据queue-id是否拥有足够的曰志 行来判定它是否完整。假如queue-id已经具有完整的消息,便输出以 message-id作为键、以HopWritable对象为值的数据对。否则,queue-id被设置为键,日志行数组重新列队并和下一组原始日志进行Map处理。这个过程将持续到queue-id已经完整或操作超时。

 image.png

图 16-9. MapReduce 处理链

 

HopWritable 对象是 POJO 对象(Plain Old Java Objects,简单 Java 对象),实现了 Hadoop的Writable接口。它从一台单独服务器的视角完整地描述一条消息,包括发送地址和ip,消息发送给其他服 务器的尝试记录,标准的消息头信息。

通过实现OutputFormat类完成输出不同的结果,这一过程对应于 我们的两个InputFormat类输入格式。Hadoop API在版本r0.17.0中添加 MultipleSequenceFileOutputFormat 类之前,我们已经实现 MultipleSequenceFileOutputFormat 类,它们实现同样的目标: Reduce作业根据键的特点存储数据对到不同的文件。

阶段2: Map在邮件日志处理作业的第二个步骤,输入是从上个阶段得到 的数据,它是以message-id为键、以HopWritable类对象数据为值的数据 对。这一步骤并不包含任何逻辑处理,而是使用标准的SequenceFilelnputFormat

类和IdentityMapper类简单地合并来自第一阶段的输入数据。

阶段3: Reduce在最终的reduce步骤,我们想判断针对某个通过系统的 message-id,收集到的所有HopWritable_对象是否能表示它经过系统的整 个消息路径。一条消息路径实际上是一个有向图.(通常是没有循环的,但如 果服务器被错误设置,有可能会包含循环)。在这个图里,点代表服务器, 可标记多个queue-id,服务器之间消息的传送形成了边。对这个应用的实 现,我们使用了 JGraphT图库。

对于输出,我们又一次使用MultiSequenceFileOutputFormat类。如果 reducer判定对于某个message-id的所有queue-id能够创建一条完整的消息 路径,消息就会被序列化,并排队等候SolrOutputFormat类的处理。否 则,消息的HopWritable对象会被列入阶段2: Map阶段的队列,然后与 下一批queue-id —起重新处理。SolrOutputFormat类包含一个嵌入式Apache Solr实例对象模仿的是Solrwiki(/i取最初提出的一种方法一一来产生 本地硬盘的索引信息。关闭OutputFonnat类同时要求把硬盘索引压缩到 输出文件的最终地址。与使用Solr's HTTP接口或直接使用Lucene相比, 这种方法有以下几个优点:

    •我们能实施 Solr 模式

    • MapReduce保持幂等性 

    •搜索节点不承担索引负载

我们目前使用默认的HashPartitioner类来决定Reduce任务和特定键之 间的对应关系,就是说键是半随机分布的。在以后的新版系统中,我们将 实现一个新的Partitioned,它通过发送地址(我们最常用的搜索词)来切 分数据。一旦索引以发送者为单位分割,我们就能够使用地址的哈希值来 判断在哪里合并或查询索引,并且我们的搜索API也只需要和相关的节点 进行通信。

2.合并相近词搜索

在一系列的MapReduce阶段完成之后,一系列不同计算机会得知新的索引 信息,进而可以进行索引合并。这些搜索节点不仅具有把索引合并置于本 地磁盘的服务(参见图16.8步骤D),它们还运行Apache TomeateSolr

托管已完成的索引信息。

来自SolrOutputFormat类的每个压缩文件都是一个完整的Lucene索引, Lucene提供IndexWriter.addIndexes()方法支持快速合并多个索引。我 们的MergeAgent服务把每个新索引解压到Lucene RAMDirectory或 FSDirectory(根据文件的大小),把它们和本地硬盘索引合并,然后发送一 t<commit/>请求给Solr对象,它负责提供索引服务并使更新后的索引能够 用于査询处理。

切片 Query/Management(查询/管理)API是一个PHP代码层,它主要是处 理输出索引在所有搜索节点上的“切片”(sharding)。我们使用一个简单的 “一致性哈希”(consistent hashing)来判定搜索节点和索引文件之间的对应 关系。目前,索引首先按照创建时间切片,然后再根据其文件名的哈希值 切片,但是我们计划将来用发送地址的哈希值取代文件名的哈希值来进行 切片(参见阶段2: Reduce)。 因为HDFS已经处理了 Lucene索引的备份问题,所以没有必要在Solr对象 中保留多个副本。相反,在故障转移时,相应的搜索节点会被完全删除, 然后由其他节点负责合并索引。

搜索结果使用这个系统,从产生日志到获得搜索结果供客服团队使用, 我们获得了 15分钟的周转时间。

搜索API支持Lucene的全部查询语法,因此我们常常可以看到下面这样的 复杂査询:

sender:"mapreduce@rackspace.com" -recipient:"hadoopgrackspace.com" 
recipient:"grackspace.com" short-status:deferred timestamp:[1228140900 TO 
2145916799]

查询返回的每个结果都是一个完整的序列化消息路径,它表明了各个服务器和接收者是否收到了这个消息。现在我们把这个路径用一个二维图展示 出来(图16-10),用户可以通过扩展自己感兴趣的节点来和这个图互动,但 是在这个数据的可视化方面还有很多需要改进的地方。

3.为分析进行存档

除了为客服提供简短词语的捜索功能之外,我们也对日志数据的分析感兴趣。

 image.png

图16-10.数据树

每晚,我们运行一系列的MapReduce作业,它们的输入是白天产生的索引 数据。我们实现了 SolrlnputFormat类,它可以拖回并解压索引文件然后 用键值对的形式输出毎个文档。使用这种InputFormat类,我们可以遍历 一天产生的所有消息路径,几乎可以回答我们邮件系统的任何问题,包括:

    •每个域的数据(病毒程序,垃圾邮件,连接状况,收件人)

    •最有效的垃圾邮件规则 

    •特定用户产生的负载 

    •消息量反弹的原因 

    •连接的地理分布信息 

    •特定机器之间的平均时间延迟

因为在Hadoop上,我们拥有好几个月的压缩索引信息,所以还能够回顾性地回答夜间日志概要工作忽略的问题。例如,我们近期想确定每个月消息 发送量最大的IP地址,这个任务我们可以通过做一次简单的MapReduce作 业来完成。

转载请注明:全栈大数据 » 16.4.4收集和存储

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址