整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

16.5.6 Hadoop 和 Cascading 在 ShareThis 的应用

hadoop 花牛 18℃ 0评论

ShareThis是一个方便用户共享在线内容的共享网络。通过单击网页上或浏 览器插件上的一个按钮,ShareThis允许用户无缝地访问他们的任何在线联 系人及在线网络,并且允许他们通过电子邮件、在线聊天、Facebook、 Digg和手机短信等方式共享它们的内容,而这一过程的执行甚至不要求他

们离开当前的访问网页。发布者能通过部署ShareThis按钮来挖掘服务的通 用共享能力,以此推动网络流量,刺激传播活动,追踪在线内容的共享。 通过减少网页杂乱的内容及提供跨社交网络、联盟群体和社区的实时内容 分布功能,ShareThis也简化了社交媒体服务。

ShareThis用户通过在线窗口共享网页和信息时,一个连续的事件数据流就 进入ShareThis网络。这些事件首先要过滤和处理,然后传送给各种后台系 统,包括 AsterData, Hypertable 和 Katta

这些事件信息的数据量能达到很大数量级,导致传统的系统无法处理。这 种数据的“污染”(dirty)也很严重,主要归咎于流氓软件系统的“注入式攻 击”、网页缺陷或错误窗口。因此,ShareThis选择为后台系统部署Hadoop 作为预处理和管理协调前台。他们也选择使用Amazon Web服务(基于弹性 云计算平台EC2)来托管其服务器,并且使用Amazon S3(简单服务存储服务提供长期的存储功能,目的是利用其灵活的MapReduce模式(Elastic MapReduce, EMR)

这里着重介绍“日志处理管道”(图16-19)。日志处理管道只是简单地从S3 文件夹(bucket)里读取数据,进行处理(稍后介绍),然后把结果存入另个 文件夹。简单消息队列服务(Simple Queue Service, SQS)用于协调各种事件 的处理,用它来标记数据处理执行程序的开始和完成状态。下行数据流是 一些其他的处理程序,它们用于拖动数据装载AsterData数据仓库,如从 Hypertable系统获取URL列表作为网络爬取工具的下载源,或把下载的网 页推入Katta系统来创建Lucene索引。注意,Hadoop系统是ShareThis整 个架构的中心组件。它用于协调架构组件之间的数据处理和数据移动工作。

有了 Hadoop系统作为前端处理系统,在所有事件日志文件被加载到 AsterData集群或被其他组件使用之前,它会基于一系列规则对数据进行解 析、过滤、清理和组织。AsterData是一个集群化数据仓库系统,它能支持 大数据存储,并允许使用标准的SQL语法发出复杂的即时奄询请求。 ShareThis选择Hadoop集群来进行数据清理和准备工作,然后它把数据加 载到AsterData集群实现即席分析和报表生成。尽管使用AsterData也有可 能达到我们的目的,但是在处理流程的第一阶段使用Hadoop系统来降低主 数据仓库的负载具有重要意义。

image.png

为了简化开发过程,制定不同架构组件间的数据协调规则以及为这些组件 提供面向开发者的接口,Cascading被选作主要的数据处理API。这显示出 它和传统Hadoop用例的差别,它们主要是用Hadoop来实现对存储数据的査 询处理。

相反,Cascading和Hadoop的结合使用为端到端的完整解决方案提供了一 个更好、更简单的结构,因此对用户来说更有价值。

对于开发者来说,Cascading的学习过程很简单,它从一个简单的文本解析 单元测试(通过创建cascading.ClusterTestCase类的子类)开始,然后把 这个单元程序放入有更多规则要求的处理层,并且在整个过程中,与系统 维护相关的应用逻辑组织不变。Cascading用以下几种方法帮助保持这种逻 辑组织的不变性。首先,独立的操作(Function, Filter)都可以进行独 立编程和测试。其次,应用程序被分成不同的处理阶段:一个阶段是解 析,一个阶段是根据规则要求进行处理,最后个阶段是封装/整理数据, 所有这些处理都是通过前述的SubAssembly基础类实现的。

ShareThis的日志文件数据看起来非常像Apache日志文件,它们有日期/时 间戳、共享URL、引用页URL和一些元数据。为了让分析下行数据流使用 这些数据,这些URL必须先解压(解析查询字符串数据和域名等)。因此需 要创建一个高层的SubAssembly对象来封装解析工作,并且,如果字段解 析很复杂,SubAssembly子对象就可被嵌入来解析一些特定字段。

我们使用同样的方式来应用处理规则。当每个Tuple对象通过规则处理 SubAssembly类对象的时候,如果有任何规则被触发,该对象就会被标记 上标签“坏”(bad)。具有“坏”字标签的Tuple对象,会被附上被标记的 原因用于后来的审查工作。

最后,创建一个切分SubAssembly类对象来做两件事。第一,用于对元组 数据流进行分流处理,一个数据流针对标记“好”(good)的数据,另一个针 对标记“坏”的数据。第二,切分器把数据切分成片,如以小时为单位。 为了实现这一动作,只需要两个操作:第一个是根据已有数据流的 timestamp(时间戳)创建区间段;第二个是使用interval(区间)good/bad元 数据来创建目录路径(例如,“〇5/good/”中“05”是早上5点,“good” 是通过所有规则验证的数据)。这个路径然后被Cascading TemplateTap使 用,这是一个特殊的Tap类型,它可以根据Tle对象值把元组数据流动 态输出到不同的路径位置。本例中,“path”值被TemplateTap用来创建最 终输出路径。

开发者也创建了第四个SubAssembly类型对象——它用于在单元测试时应用 Cascading Assertion(断言)类。这些断言用来复査规则组件和解析 Sub Assembly做的工作。

在范例16-5的单元测试中,我们看到partitioner没有被检测,但是它被放 入另外一个这里没有展示的集成测试中去了。

范例16-5.单元测试

public void testLogParsing() throws IOException
    Hfs source = new Hfs(new TextLine(new Fields("line")), sampleData);
    Hfs sink =
        new Hfs(new TextLine(), ou1>putPath + "/parser", SinkMode.REPLACE);
    Pipe pipe = new Pipe("parser");
    // split "line" on tabs
    pipe = new Each(pipe, new Fields("line"), new RegexSplitter("\t")); 
    pipe = new LogParser(pipe); pipe = new LogRules(pipe);
    // testing only assertions
    pipe = new ParserAssertions(pipe);
    Flow flow = new FlowConnector().connect(source, sink, pipe);
    // verify there are 98 tuples, 2 fields, and matches the regex pattern 
    // for TextLine schemes the tuples are {     "offset", "line } 
    validateLength(flow, 98, 2, Pattern.compile(,,A[0-9] + (\\t[A\\t]*){19}$M));
}

针对集成和部署,许多Cascading内置属性使得该系统和外部系统更容易集 成,并允许进行更大规模的处理工作。

在生产环境中运行时,所有的SubAssembly对象都连接起来并规划到一个 Flow对象里,但是除了源和目标Tap对象之外,我们也设计了错误捕捉 (trap)Tap对象(16-20)。通常,当远程的Mappej•或Reducer任务抛出一 个异常的时候,Flow对象就会失败并杀死它管理岛所有MapReduce作业。 当一个Flow有错误捕捉类的时候,所有的异常都会被捕捉并且造成异常的 数据信息会被保存到当前这个捕捉程序对应的Tap对象里。然后可以在不 终止当前Flow的情况下,继续处理下一个Tuple对象。有时你想让程序 在出现错误的时候就停止,但在这里,ShareThis开发者知道在系统运行的 时候,他们能同时回览并査看“失败”的数据,然后更新其单元测试。丢 失几个小时的处理时间比丢失几个坏记录数据更糟糕。

 image.png

图 16-20. ShareThis 日志处理 Flow

使用Cascading的事件监听器,Amazon SQS可被集成进来。当一个Flow 结束的时候,系统就发送一条消息来通知其他系统它们已经可以从 Amazon S3上获取准备好的数据了。当Flow处理失败的时候,发送不同 的消息向其他的进程报警。

其余的位于不同的独立集群的下行数据流进程将在中断的日志处理管道位

置处开始处理。现在日志处理管道一天运行-次,而我们没有必要让loo 个节点的集群空转23个小时。因此我们是每24小时执行一次终止和启用 操作。

将来,在小型的集群上根据业务需求,增加运行间歇期到每6个小时一次 或1小时•次都是非常简单的。其他的集群系统可以独立地根据各自负责 的业务需要以不同的间隔期启用或关闭。例如,网络数据爬取组件(使用 Bixo,它是EMIShareThis开发的基于Cascading的网络数据爬取工具可以在一个小型集群上与Hypertable集群协作连续运转。这种随需应变的 模型在Hadoop上运行良好,每个集群都能把工作负载调节到它期望处理的 量级。

转载请注明:全栈大数据 » 16.5.6 Hadoop 和 Cascading 在 ShareThis 的应用

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址