整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

16.3.3 Nutch系统利用Hadoop进行数据处理的精选实例

hadoop 花牛 10℃ 0评论

下面几节详细描述了几种Nutch工具,主要用于说明Nutch系统如何利用 MapReduce模式来完成具体的数据处理任务。

1.链接逆转

爬取到的HTML页面包含HTML链接,这些链接可能指向它本身(内部链接)或指向其他网页。HTML链接从源网页指向目标网页,参见图16-6

然而,许多计算网页重要性(或质量)的算法需要反向链接的信息,也就是那 些具有链接指向当前页面的网页。进行网页爬取的时候,我们并不能直接 得到这些信息。另外,如果能把入链接的锚文本也用于索引,索引也会受 益,因为这些锚文本可以从语义上丰富当前页面的内容。

 image.png

图16-6.链接逆转 

如前所述,Nutch收集出链接信息,然后用这些数据构造一个LinkDb,用 来存放以入链接和锚文本形式存在的反向链接数裾。

本小节槪述一下LinkDb工具的实现过程,这里只为了展现处理过程的清晰 画面而忽略了很多细节描述(URL规范化处理和过滤)。我们保留了描述 一个典型的实例的内容,解释为什么MapReduce模型能够如此合适地被应 用到搜索引擎所要求的这种关键数据转换处理任务。大规模搜索引擎需要 处理大量的网络图数据(许多页面都有很多出/入链接),Hadoop提供的并行 处理和容错机制使得这样的处理成为可能。另外,使用map-sort-reduce基 本操作可以很容易地表达链接逆转这一过程,我们将在下面进行介绍。

下面的代码片段展示了开发LinkDb工具的作业初始化过程:

DobConf job = new DobConf(configuration);
FilelnputFormat.addInputPath(job, new Path(segmentPath,"parse_data"));
job.setInputFormat(SequenceFileinputFormat.class);
job.setMapperClass(LinkDb.class);
job.setReducerClass(LinkDb.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Inlinks.class);
job.setOutputFormat(MapFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, newLinkDbPath);

可以看出,这个作业的输入源数据是爬取的URL列表(键)以及相应的 ParseData记录,ParseData包含的其中一项数据是每个页面的出链接信息,它是一个数组。一个出链接记录包含目标URL以及相应的锚文本。

这个作业的输出也是一个URL列表(键),但是值是入链接,它其实只是一 个包含目标URL和锚文本的特殊入链接集合。

出乎意料的是,这些URL—般以纯文本而并非以java.net.URLjava.net.URI实例的形式存储和处理。这么做有几个原因:从下载内容里提 取的URL通常需要做规范化处理(如把主机名变成小写形式,解析相对路 径),或它们是已经坏掉或无效的URL,或它们引用的是不支持的协议。许 多规范化和过滤操作能更好地以文本模式进行表达,它可以跨一个URL的 多个组成部分。此外,考虑到链接分析,我们仍然要处理和计算这些非 URL类型的数据。

让我们进一步査看map()和reduce()的实现。在这个例子中,它们非常简 单以至于这两个函数可以在同一个类实现:

public void map(Text fromUrl, PanseData parseData, 
    OutputCollector<Text, Inlinks> output, Reporter reporter) {
    ...
    Outlink[ ]outlinks = parseData.getOutlinks();
    Inlinks inlinks = new Inlinks(); 
    for (Outlink out : outlinks) {
        inlinks.clear(); // instance reuse to avoid excessive GC 
        String toUrl = out.getToUrl();
        String anchor = out.getAnchor(); 
        inlinks.add(new Inlink(fromUrl, anchor)); 
        output. collect (new Text(tollrl), inlinks);
    }
}

从这个代码段可以看到,对每个出链接Outlink,我们的map()函数产生一 对ctoUrl, Inlinks〉,其中 Inlinks 只包含一个 Inlink,该 Inlink 是 由fromUrl和它的锚文本组成。链接的指向实现了反转。

接着,这些只有一个元素的Inlinks对象用reduceO方法实现聚集处理:

public void reduce(Text toUrl,Iterator<Inlinks> values,
OutputCollector<Text, Inlinks> output, Reporter reporter) {
        Inlinks result = new Inlinks(); 
        while (values.hasNext() { 
            result.add(values.next());
        }
    output.collect(toUrl, result);
}

从这段代码来看,很明显我们已经得到了想要的数据,即指向toUrl变量的所有fromUr'l列表以及相应的锚文本信息。逆转过程完成。

然后这些数据以MapFileOutputFormat格式保存,形成新的LinkDb数据库。

2.生成 fetchlist

现在来看一个更加复杂的用例。fetchlist产生于CrawlDb的数据(map文 件的格式是<url, crawlDatum>,其中crawlDatum包含URL的状态信 息),它存放准备爬取的URL列表,然后Nutch Fetcher工具处理这个列 表。Fetcher工具本身是一个MapReduce应用程序(后面会介绍)。也就是说 输入数据(被分成W)将由Wmap任务处理,Fetcher工具强制执行这样 的规则即SequenceFilelutFormat格式的数据不能继续切分。前面我们 简单提过,fetchlist是通过一个特殊的方法产生的,因此fetchlist的每部分 数据(随后由每个map任务处理)必须满足特定的要求。

(1)来自同一台主机的所有URL最后要放入同一个分区。这是必须 的,以便Nutch可以轻松实现in-JVM(java虚拟机里)宿主级封锁 来避免目标主机超载。

(2)为了减少发生宿主级的封锁,来自同一台主机的URL应该尽量分 开存放(比如和其他主机的URL充分混合)

(3)任何一个单独主机的URL链接数不能多于;c个,从而使得具有很 多URL的大网站相对于小网站来说,就不会占主导地位(来自小网 站的URL仍然有机会被爬取)

(4)具有髙网页排序值的URL应该优先于低的那些URL

(5)在fetchlist中,URL总数不能超过广

(6)输出数据分区数应该和最优的爬取map任务数目一致。

本例中,需要实现两个MapReduce作业来满足所有这些要求,如图16-7所 示。同样,为了简洁,对下面的列表内容,我们将跳过对这些步骤的某些 细节描述。

步骤1:选择,基于网页排序值排序,受限于每台主机的URL数这一步 骤,Nutch运行一个MapReduce作业来选择一些被认为有资格被爬取的 URL列表,并根据它们的网页排序值(赋给每个页面的浮点数,如 PageRank值)对它们进行排序。输入数据来自CrawlDb,是一个<url, datum〉格式的map文件。这一作业的输出是〈score, <url, datum〉〉格 式的序列化文件,根据排序属性值降序排列。

 image.png

图 16-7.生成 fetchlist

首先,我们来看一下作业的设置:

FileinputFormat.addInputPath(job, crawlDbPath);
job.setInputFormat(SequenceFileinputFormat.class);
job.setMapperClass(Selector.class); 
job.setPartitionerClass(Selector.class); 
job.setReducerClass(Selector.class);
FileOutputFormat.setOutputPath(job, tempDir); 
job.setOutputFormat(SequenceFileOutputFormat.class); 
job.setOutputKeyClass(FloatWritable.class);
job.setOutputKeyComparatorClass(DecreasingFloatCompanator.class); 
job.setOutputValueClass(SelectorEntry.class);

Selector类实现了 个函数:mapper, reducer 和 partitioner。最后一个函 数非常有趣:Selector用了一个自定义Partitioner把来自同一主机 的URL分配给同一个reduce任务,这样我们就能满足前面列出的要求 3-5

如果我们不重写默认的partitioner,来自同一主机的URL最终会输出到不同的分区文件里面,这样我们就不能跟踪和限制URL总数,因为 MapReduce任务彼此之间不做任何交流。那么现在的情况是,属于同一台主机的所有URL都会由同一个reduce任务处理,这意味着我们能控制每台主机选择处理的URL数目。

很容易实现一个自定义的partitioner把需要在同一个任务中处理的数据最终放入同一个分区文件。我们首先来看一下Selector类如何实现Partitioner接口(它只包含一个方法):

/** Partition by host. */
public int getPartition(FloatWritable key, Writable value,
int numReduceTasks) {
    return hostPartitioner.getPartition(((SelectorEntry)value).url, key)
    numReduceTasks)
}

该方法返回在0到numReduceTasks – 1之间的一个整数。它简单地用原 始的URL替换了键,URL数据从SelectorEntry获取,这样做就可以把 URL(不是页面排序值)传递给PartitionUrlByHost类对象,并且在这里计算出URL属于的分区文件号:

/** Hash by hostname. */
public int getPartition(Text key, Writable value, int numReduceTasks) {
    String urlString = key.toString();
    URL url = null;
    try {
        url = new URL(urlString);
    } catch (MalformedURLException e) {
        LOG.warn("Malformed URL: '" + urlString
    }
    int hashCode = (url == null ? urlString : url.getHost〇).hashCode();
    // make hosts wind up in different partitions on different runs 
    hashCode A= seed;
    return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
}

从这个代码片断能看到,分区号的计算只针对URL的主机部分的地址,这意味着属于同一个主机的所有URL最终会被放入同一个分区文件。

这个作业的输出数据根据网页排序值降序排列。因为CrawlDB中有很多记录有相同的排序值,所以我们不能用MapFileOutputFormat来存储输出 文件,否则会违反map文件严格基于主键排序的固有规则。

细心的读者会注意到一点,我们没直接使用初始键值,但是我们又想保留这种初始的键值对,这里使用一个SelectorEntry类把初始的键值对传递给下一处理过程。

Selector.reduce()函数计算URL的总数和每个主机对应的最大URL 数,然后简单地摒弃多余的记录。注意,必须对URL总个数的限制进行近似 化处理。我们用总的限制数除以ruduce任务的个数得到当前任务允许处理的URL数目的限制范围。但是我们并不能肯定每个任务都能够得到平均的分配数,实际上在大多数情况下很难实现,因为在各个主机中分布的URL 数目是不均匀的。不管怎么样,对于Nutch来说,这种近似的控制已经足够了。

步驟2 :逆转,基于主机分区,随机排序在前面,我们得到<score, selectorEntry〉格式的序列化文件。现在我们必须产生<url, datum〉格式的序列化文件来满足前面描述的要求126。这个处理步骤的输入数据是步骤1的输出数据。

以下代码片断展示这个作业过程的初始设置:

FileInputFormat.addInputPath(job, tempDir); 
job.setInputFormat(SequenceFilelnputFormat.class); 
job.setMapperClass(SelectorInverseMapper.class);
 job.setMapOutputKeyClass(Text.class); 
job.setMapOutputValueClass(SelectorEntry.class); 
job. setPartitionerClass(PartitionllrlByHost .class);
 job.setReducerClass(PartitionReducer.class)j;
job.setNumReduceTasks(numParts);
FileOutputFormat.setOutputPath(job, output);
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
job.setOutputKeyComparatorClass(HashComparator.class);

SelectorlnverseMapper类简单地丢弃当前键(排序值),抽取原始的URL 并且把它设置为键,使用SelectorEntr'y类型对象作为值。细心的读者可能质疑:“为什么我们不进一步去再抽取原始的CrawlDatum,把它作为值? ”详情参见后文。

这个作业的最终输出是序列化文件,格式CrawlDatum>,但是 map阶段我们得到的输出是〈Text, SelectorEntry>格式数据。必须指出 的是我们要为map输出指定不同的键/值类,如采用setMapOutputKeyClass(),setMapOutputValueClass() 否则,Hadoop会假定我们用的类与为reduce输出声明的类一样(这样就会导致作业失败)

map阶段的输出使用PartitionUrlByHost类实现数据切分,它把来自同一主机的URL分配到同一个分区。这就满足要求1

一旦数据从map任务进入到reduce任务,Hadoop就会根据输出数据的键值 比较结果对数据排序,可利HashComparator类实现比较。这个比较类 采用简单的哈希机制来组织URL,该机制可保证来自同一主机的URL被尽量放在一起。

为了满足要求6,我们把reduce任务的数量设置成希望的Fetcher map任务 的数量(即前面提到的mjmParts),记住,每个reduce分区稍后将用于创建一PartitionReducer 类负责完成最后一步,即把<url,selectorEntry>数 据格式转换成<url, crawlDatum〉数据格式。使用HashComparator的一个令人吃惊的副作用是几个URL可能哈希成相等的哈希值,并且Hadoop 调用reduceO函数时只处理具有相等键值的第一个键对应的值,其他的值 系统认为是相等的从而被删除。现在能明白当初为什么我们必须在 SelectorEntry类的记录中保留所有的URL值,因为这么做我们就可以在 遍历值的时候抽取所有的URL。下面是这个方法的实现:

public void reduce(Text key, Iterator<SelectorEntry> values, 
OutputCollector<Text, CrawlDatum> output,
    Reporter reporter) throws IOException { 
    // when using HashComparator, we get only one input key in case of hash collisions 
    // so use only URLs extracted from values 
    while (values.hasNext()) {
        SelectorEntry entry = values.next(); 
        output.collect(entry.url, entry.datum);
    }
}

最终,reduce任务的输出在Nutch分区目录crawl_generate子目录下以 SequenceFileOutputFormat格式保存。输出文件满足前面的1-6项全部要求。

3.Fetcher:真正的多线程类MapRunner

Nutch的Fetcher应用程序负责从远程站点下载网页内容。因此,为了尽量 减少爬取fetchlist的时间,对于这个处理过程来说利用每个机会来做并行处 理相当重要。

在Fetcher应用中,已经有一级并行机制 输入fetchlist分成多个部分,然后它们被分配给多个map任务。然而,这么做实际上远远不够:顺序下 载来自不同主机(见前一节对HashComparator的介绍)URL相当浪费时 间。正因如此,Fetchermap任务使用多个工作线程同时处理数据下载 任务。

Hadoop使用MapRunner类来实现对输入数据记录的顺序处理。Fetcher 类实现自己的MapRunner类,它使用若干个线程并行处理输入记录。

先从这个作业的设置开始:

job.setSpeculativeExecution(false);
FilelnputFormat.addInputPath(job, "segment/crawl_generate"); 
job.setInputFormat(InputFormat.class); 
job.setMapRunnerClass(Fetcher.class);
FileOutputFormat.setOutputPath(job, segment); 
job.setOutputFormat(FetcherOutputFormat.class); 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(NutchWritable.class);

首先,我们关闭推测执行(speculative execution)。我们不能同时让几个map 任务从同一个主机下载内容,因为这可能会打破宿主级的负载限制(如并发 请求数和每秒请求数)。 其次,我们使用自定义的InputFormat类防止Hadoop将输入数据分区进 一步切分为更小的块(分片)导致map任务的数量超过输入分区的数量。这 么做又一次保证我们可以控制宿主级的访问限制。 输出数据用自定义的OutputFormat类对象来存储,通过使用NutchWirtable 类的数据值创建几个输出map文件和顺序文件。NutchWritable类是 GenericWritable的子类,通过事先声明,它能传递几种不同Writable

类的实例对象。

Fetcher类实现MapRunner接口,我们把这个类设置为作业的MapRunner实现。相关代码如下:

public void run(RecordReader<Text, CrawlDatum> input,
    OutputCollector<Text, NutchWritable> output,
    Reporter reporter) throws IOException { 
    int threadCount = getConf().getlnt("fetcher.threads.fetch", 10);
    feeder = new QueueFeeder(input, fetchQueues, threadCount * 50); 
    feeder.start();
    for (int i = 0; i < threadCount; i++) { // spawn threads
        new FetcherThread(getConf()).start();
    }
    do { // wait for threads to exit
        try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {} 
        reportStatus(reporter);
    } while (activeThreads.get() > 0);
}

Fetcher类提前读取许多输入记录数据,使用QueueFeeder线程把输入记录放入为每个主机建立的队列中。然后启动几个FetcherThread实例对象,它们将读取每个主机对应的数据队列这时QueueFeeder继续读取输入数据来填充这些队列。,每个FetcherThread都可以读取任意非空队列中 的数据项。

与此同时,map任务的主线程也在运行等待所有的线程完成它们的作业。 它定期向系统报告状态以保证Hadoop不会认为这个任务已经死掉并把它杀 掉。一旦所有项目处理完,等待过程结束,控制权返回给Hadoop,然后 Hadoop将会结束这个map任务。

4.索引器:使用自定义的OutputFormat类 

这个MapReduce应用程序的输出即不是序列化文件或也不是map文件,而 是Lucene索引。再提一下,因为MapReduce应用可能由几个reduce任务 组成,所以这个应用程序的输出可能包含几个不完整的Lucene索引。

Nutch Indxer工具处理来自CrawlDb, LinkDbNutch分区的信息(爬取状 态信息,解析状态,页面元数据和纯文本数据),因此对这个作业的设置程 序将包括多个输入路径:

FileInputFormat.addInputPath(job, crawlDbPath); 
FileInputFormat.addInputPath(job, linkDbPath);
// add segment data 
FileInputFormat.addlnputPath(job, "segment/crawl_fetch");
FileInputFormat.addInputPath(job,"segment/crawl_fetch");
 FileInputFormat.addlnputPath(job, "segment/crawl_fetch");
FileInputFormat.addInputPath(job,"segment/crawl_fetch");
job.setInputFormat(SequenceFilelnputFormat.class);
job.setMapperClass(Indexer.class); 
job.setReducerClass(Indexer.class);
FileOutputFormat.setOutputPath(job, indexDir); 
job.setOutputFormat(OutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LuceneDocumentWrapper.class);

分散存储在这些输入位置的对应于一个URL的所有记录需要合并起来新建 一个Lucene文档(将被加入索引)。

Indexer类通过Mapper把输入数据(不关心它的源数据以及实现类)简单地 封装到NutchWritable中,这样reduce阶段可能要使用不同的类来接收不同的源数据,但是它仍然能够为map和reduce阶段的输出值声明一个 输出值类(类似于NutchWritable)

Reducer方法遍历同一个键(URL)对应的所有值,解封数据 (fetch CrawlDatum, CrawlDb CrawlDatum, LinkDb Inlinks, ParseData 和 ParseText),并用这些信息构建一个Lucene文档,后者被 WritableLuceneDocumentWrapper对象封装并被收集。除了所有文本内 容外(纯文本数据或是元数据),这个文档也包含类似PageRank值的信息 (取自CrawlDb)Nutch使用这种数值来设置Lucene文档的权重值。

OutputFormat类的实现是这个工具最有意思的部分:

public static class OutputFormat extends
    FileOutputFormat<WritableComparable, LuceneDocumentWrapper〉 {
public RecordWriter<WritableComparable, LuceneDocumentWrapper> 
    getRecordWriter(final FileSystem fs,DobConf job,
        String name, final Progressable progress) throws IOException { 
    final Path out = new Path(FileOutputFormat.getOutputPath(job), name); 
    final IndexWriter writer = new IndexWriter(out.toString(), 
                    new NutchDocumentAnalyzer(job), true);
return new RecordWriter<WritableComparableJ LuceneDocumentWrapper>() { 
    boolean closed;
 public void write(WritableComparable key, LuceneDocumentWrapper value) 
    throws IOException { 
    // unwrap & index doc
    Document doc = value.get(); 
    writer.addDocument(doc); 
    progress.progress();
}
public void close(final Reporter reporter) throws IOException { 
    // spawn a thread to give progress heartbeats 
    Thread prog = new Thread() { 
        public void run() { 
            while (Iclosed) { 
                try {
                    reporter.setStatus("closing");
                    Thread.sleep(1000);
                } catch (InterruptedException e) { continue; } catch (Throwable e) { return; }
                try {
                    prog.start();
                    // optimize & close index writer.optimize();
                    writer.close(); 
                } finally {
                    closed = true;
                }
                }
            }
    }
}

当请求生成一个RecordWriter类的实例对象时,OutputFormat类通过 打开一个IndexWriter类对象新建一个Lucene索引。然后,针对reduce 方法中收集的每个新的输出记录,它解封LuceneDocumentWrapper对象中的Lucene文档,并把它添加到索引中。 

reduce任务结束的时候,Hadoop会设法关闭RecordWriter对象。本例 中,关闭的过程可能持续较长时间,因为我们想在关闭它之前进行索引优 化工作。在这段时间中,因为该任务已经没有任何进度更新,所以Hadoop 可能会推断其已经被挂起,然后它可能会尝试杀死这个任务。因此,我们 首先启动一个后台线程来传送进度更新消息,然后才开始索引优化工作。 一旦优化完成,我们便停止进度更新线程。现在索引完成创建、优化和停 止更新,它已经准备好被应用于任何捜索程序。

转载请注明:全栈大数据 » 16.3.3 Nutch系统利用Hadoop进行数据处理的精选实例

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址