整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

8.4. 边数据分布

hadoop 小红牛 46℃ 0评论

该程序通过气象站查找最高气温,因此mapper(StationTemperatureMapper)仅仅输出(气象站ID,气温)对。对于combiner,该程序重用MaxTemperatureReducer(参见第2章和第5章)来为map端的map输出分组获得最高气温。reducer(MaxTemperatureReducerWithStationLookup)则有所不同,它不仅要查找最髙气温,还需要根据缓存文件査找气象站名称。

该程序调用reducer的setup()方法来获取缓存文件;输入参数是文件的原始名称,文件的路径与任务的工作目录相同。

当文件无法整个放到内存中时,可以使用分布式缓存进行复制。MapFile采用在盘检索格式(参见4.5.2节),在这方面非常有用。由于MapFile是一组已定义的目录结构的文件,用户可以将这些文件整理成存档格式(JAR、ZIP、TAR或gzipped TAR),再用-archives选项将其加入缓存。

以下是输出的小片段,显示部分气象站的最髙气温值。

 

PEATS RIDGE WARATAH

372

STRATHALBVN RACECOU

410

SHEOAKS AWS

399

WANGARATTA AERO

409

MOOGARA

334

MACKAY AERO

331

 

4.2.2. 工作机制

当用户启动一个作业,Hadoop会把由-files、-archives和-libjars等选项所指定的文件复制到分布式文件系统(一般是HDFS)之中。接着,在任务运行之前,tasktracker将文件从分布式文件系统复制到本地磁盘(缓存)使任务能够访问文件。此时,这些文件就被视为“本地化”了。从任务的角度来看,这些文件就已经在那儿了(它并不关心这些文件是否来自HDFS)。
此外,由-libjars指定的文件会在任务启动前添加到任务的类路径 (classpath)中。

tasktracker为缓存的文件各维护一个计数器来统计这些文件的被使用情况。当任务即将运行时,该任务所使用的所有文件的对应计数器值增1;当任务执行完毕之后,这些计数器值均减1。当相关计数器值为0时,表明该文件没有被任何任务使用,可以从缓存中移除。缓存的容量是有限的(默认10GB),因此需要经常删除无用的文件以腾出空间来装载新文件。缓存大小可以通过属性local.cache.size进行配置,以字节为单位。

尽管该机制并不确保在同一个tasktracker上运行的同一作业的后续任务肯定能在缓存中找到文件,但是成功的概率相当大。原因在于作业的多个任务在调度之后几乎同时开始运行,因此,不会有足够多的其他作业在运行而导致原始任务的文件从缓存中被删除。

文件存放在 tasktracker 的${mapred.local.dir}/taskTracker/archive目录下。但是应用程序不必知道这一点,因为这些文件同时以符号链接的方式指向任务的工作目录。

4.2.3. 分布式缓存API

由于可以通过GenericOptionsParser间接使用分布式缓存(如范例8-16所示),大多数应用不需要使用分布式缓存API。然而,一些应用程序需要用到分布式缓存的更高级的特性,这就需要直接使用API了。API包括两部分:将数据放到缓存中的方法(参见Job),以及从缓存中读取数据的方法(参见JobContext)。以下列举Job中可将数据放入到缓存中的相关方法:

public void addCacheFile(URI uri)

public void addCacheAnchive(URI uri)

public void setCacheFiles(URI[] files)

public void setCacheArchives(URI[] archives)

public void addFileToClassPath(Path file)

public void addArchiveToClassPath(Path archive)

public void createSymlink()

 

在缓存中可以存放两类对象:文件(files)和存档(achives)。文件被直接放置在任务节点上,而存档则会被解档之后再将具体文件放置在任务节点上。每种对象类型都包含三种方法:addCacheXXXX()、setCacheXXXXs()和addXXXXToClassPath()。其中,addCacheXXXX()方法将文件或存档添加到分布式缓存,setCacheXXXXs()方法将一次性向分布式缓存中添加一组文件或存档(之前调用所生成的集合将被替换),addXXXXToClassPath()方法将文件或存档添加到MapReduce任务的类路径。表8-7对上述API方法与GenericOptionsParser选项做了一个比较(参见表5-1)。

8-7.分布式缓存API

blob.png

add()和set()方法中的输入参数URI是指在作业运行时位于共享文件系统中的(一组)文件。而GenericOptionsParser选项所指定的文件(例如,-files)可以是本地文件,如果是本地文件的话,则
会被复制到默认的共享文件系统(一般是HDFS)。

 

这也是使用Java API和使用GenericOptionsParser的关键区别:

Java API的add()和set()方法不会将指定文件复制到共享文件系统中,但GenericOptionsParser会这样做。

Job的分布式缓存API方法还包括createSymlink()。前面提到,当前作业的一些文件会被本地化到任务节点,该方法则为所有本地化文件创建符号化链接。符号化链接的名称是由该文件URI的片段标识符(fragment identifier)决定。例如,由 URI hdfs://namenode/foo/bar#myfile所对应的文件是在任务的工作目录中的myfile文件。(范例8-8使用了该API)如果没有片段标识符,则无法创建符号化链接。当使用GenericOptionsParser将文件添加到分布式缓存之中时,会自动为这些文件创建符号化链接。

当使用本地作业时,并不会为分布式缓存中的文件创建符号化链接。因此,若想同时在本地或集群上运行作业时,需调用getLocalCacheFilesO 或getLocalCacheArchives()方法(后面会进一步讨论)。

另一部分是在Jobcontext类中的分布式缓存API。当用户想要访问分布式 缓存中的文件时,需要在map或者reduce任务代码中使用这部分API。

public Path[] getLocalCacheFilesO throws IOException;

public Path[] getLocalCacheArchives() throws IOException;

public Path[] getFileClassPaths();

public Path[] getArchiveClassPaths();

如果在分布式缓存中的文件已经在任务的工作目录中有符号化链接了,则 用户可以通过名称直接访问本地文件,如范例8-16所示。此外,也可以使
用 getLocalCacheFilesO和 getLocalCacheArchives()方法获取缓存中
的文件或者存档的引用。当处理存档时,将会返回一个包含解档文件的目 录。(相应的,用户也可以通过 getFileClassPaths()和
getArchiveClassPaths() 方法获取被添加到任务的类路径下的文件和存档。)

注意,文件将以“本地的” Path对象的形式返回。为了读取文件,用户需 要首先使用getLocal()方法获得一个Hadoop本地FileSystem实例。

用户也可以使用java.io.File API进行操作,请参考以下针对MaxTemperatureReducerWithStationLookup 的新版 setup()方法的代码片段。

@Override
protected void setup(Context context)
    throws IOException, InterruptedException {
    metadata = new NcdcStationMetadata();
    Path[] localPaths = context.getLocalCacheFilesO;
    if (localPaths.length == 0) {
        throw new FileNotFoundException("Distributed cache file not found.");
    }
File localFile = new File(localPaths[0].toString());
    metadata.initialize(localFile);
}

在使用旧版MapReduce API时,可以调用DistributedCache的静态方法,如下所示:

@Override
public void configure(JobConf conf) {
    metadata = new NcdcStationMetadata();
    try {
        Path[] localPaths = DistributedCache.getLocalCacheFiles(conf);
        if (localPaths.length == 0) {
            throw new FileNotFoundException("Distributed cache file not found.");
        }
        File localFile = new File(localPaths[0].toString());
        metadata.initialize(localFile);
    } catch (IOException e) {
        throw new RuntimeException(e);
}
}

转载请注明:全栈大数据 » 8.4. 边数据分布

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址