整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

Hadoop分布式文件系统

hadoop 小小明 64℃ 0评论

Hadoop分布式文件系统

当数据集的大小超过一台独立的物理计算机的存储能力时,就有必要对它 进行分区(partition)并存储到若干台单独的计算机上。管理网络中跨多台计 算机存储的文件系统称为分布式文件系统(distributed filesystem)。该系统架 构于网络之上,势必会引入网络编程的复杂性,因此分布式文件系统比普 通磁盘文件系统更为复杂。例如,使文件系统能够容忍节点故障且不丢失任何数据,就是一个极大的挑战。

Hadoop 有一个称为 HDFS 的分布式系统,即 Hadoop Distributed Filesystem。 在非正式文档或旧文档以及配置文件中,有时也简称为DFS,它们是一回 事儿。HDFS是Hadoop的旗舰级文件系统,同时也是本章的重点,但实际 上Hadoop是一个综合性的文件系统抽象,因此下面我们也将看到Hadoop 集成其他文件系统的方法(如本地文件系统和Amazon S3系统)。

1. HDFS的设计

HDFS以流式数据访问模式来存储超大文件,运行于商用硬件集群上。让我们仔细看看下面的描述。

  • 超大文件“超大文件”在这里指具有几百MB、几百GB甚至几 百TB大小的文件。目前已经有存储PB级数据的Hadoop集 群了。
  • 流式数据访问HDFS的构建思路是这样的:一次写入、多次读取 是最高效的访问模式。数据集通常由数据源生成或从数据源复制而 来,接着长时间在此数据集上进行各种分析。每次分析都将涉及该 数据集的大部分数据甚至全部,因此读取整个数据集的时间延迟比 读取第一条记录的时间延迟更重要。
  • 商用硬件Hadoop并不需要运行在昂贵且高可靠的硬件上。它是 设计运行在商用硬件(在各种零售店都能买到的普通硬件)的集群上的,因此至少对于庞大的集群来说,节点故障的几率还是非常高的。HDFS遇到上述故障时,被设计成能够继续运行且不让用户察觉到明显的中断。 同样,那些不适合在HDFS上运行&应用也值得研究。目前某些应用领域并不适合在HDFS上运行,不过以后可能会有所改进。
  • 低时间延迟的数据访问要求低时间延迟数据访问的应用,例如几十毫秒范围,不适合在HDFS上运行。记住,HDFS是为高数据吞 吐量应用优化的,这可能会以提高时间延迟为代价。目前,对于低延 迟的访问需求,HBase是更好的选择。
  • 大量的小文件由于namenode将文件系统的元数据存储在内存 中,因此该文件系统所能存储的文件总数受限于namenode的内存容量。根据经验,每个文件、目录和数据块的存储信息大约占150 字节。因此,举例来说,如果有一百万个文件,且每个文件占一个数据块,那至少需要300 MB的内存。尽管存储上百万个文件是可行的,但是存储数十亿个文件就超出了当前硬件的能力。
  • 多用户写入,任意修改文件HDFS中的文件可能只有一个writer,而且写操作总是将数据添加在文件的末尾。它不支持具有多个写入者的操作,也不支持在文件的任意位置进行修改。可能以后会支持这些操作,但它们相对比较低效。

2. HDFS的概念

2.1. 数据块

每个磁盘都有默认的数据块大小,这是磁盘进行数据读/写的最小单位。构建于单个磁盘之上的文件系统通过磁盘块来管理该文件系统中的块,该文件系统块的大小可以是磁盘块的整数倍。文件系统块一般为几千字节,而磁盘块一般为512字节。这些信息一一文件系统块大小——对于需要读/写文件的文件系统用户来说是透明的。尽管如此,系统仍然提供了一些工具(如df和fsck)来维护文件系统,由它们对文件系统中的块进行操作。

HDFS同样也有块(block)的概念,但是大得多,默认为64 MB。与单一磁盘上的文件系统相似,HDFS上的文件也被划分为块大小的多个分块 (chunk),作为独立的存储单元。但与其他文件系统’不同的是,HDFS中小于一个块大小的文件不会占据整个块的空间。如果没有特殊指出,本书中 提到的“块”特指HDFS中的块。

为何HDFS中的块如此之大?

HDFS的块比磁盘的块大,其目的是为了最小化寻址开销。如果块设置 得足够大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的 时间•因而,传输一个由多个块组成的文件的时间取决于磁盘传输速率。

我们来做一个速算,如果寻址时间约为10 ms,而传输速率为100 MB/s,为了使寻址时间仅占传输时间的1%,我们要将块大小设置约为 100 MB。默认的块大小实际为64 MB,但是很多情况下HDFS使用128 MB的块设置。以后随着新一代磁盘驱动器传输速率的提升,块的大小将被 设置得更大。

但是这个数也不会设置得过大。MapReduce中的map任务通常一次只处理一个块中的数据,因此如果任务数太少(少于集群中的节点数量),作 业的运行速度就会比较慢。

对分布式文件系统中的块进行抽象会带来很多好处。第一个最明显的好处 是,一个文件的大小可以大于网络中任意一个磁盘的容量。文件的所有块 并不需要存储在同一个磁盘上,因此它们可以利用集群上的任意一个磁盘 进行存储。事实上,尽管不常见,但对于整个HDFS集群而言,也可以仅存储一个文件,该文件的块占满集群中所有的磁盘。

第二个好处是,使用抽象块而非整个文件作为存储单元,大太简化了存储 子系统的设计。简化是所有系统的目标,但是这对于故障种类繁多的分布 式系统来说尤为重要。将存储子系统控制单元设置为块,可简化存储管理 (由于块的大小是固定的,因此计算单个磁盘能存储多少个块就相对容易)。 同时也消除了对元数据的顾虑(块只是存储数据的一部分——而文件的元数据,如权限信息,并不需要与块一同存储,这样一来,其他系统就可以单 独管理这些元数据)。

不仅如此,块还非常适合用于数据备份进而提供数据容错能力和提高可用 性。将每个块复制到少数几个独立的机器上(默认为3个),可以确保在块、 磁盘或机器发生故障后数据不会丢失。如果发现一个块不可用,系统会从 其他地方读取另一个复本,而这个过程对用户是透明的。一个因损坏或机 器故障而丢失的块可以从其他候选地点复制到另一台可以正常运行的机器 上,以保证复本的数量回到正常水平。参见4.1节对数据完整性的讨论,进 一步了解如何应对数据损坏。同样,有些应用程序可能选择为一些常用的 文件块设置更高的复本数量进而分散集群中的读取负载。

与磁盘文件系统相似,HDFS中fsck指令可以显示块信息。例如,执行以下命令将列出文件系统中各个文件由哪些块构成:

% hadoop fsck / -files -blocks

2.2. namenode 和 datanode

HDFS集群有两类节点以管理者-工作者模式运行,即一个namenode(管理者) 和多个datanode(工作者)。namenode管理文件系统的命名空间。它维护着 文件系统树及整棵树内所有的文件和目录。这些信息以两个文件形式永久 保存在本地磁盘上:命名空间镜像文件和编辑日志文件。namenode也记录着每个文件中各个块所在的数据节点信息,但它并不永久保存块的位置信 息,因为这些信息会在系统启动时由数据节点重建。

客户端(client)代表用户通过与namenode和datanode交互来访问整个文件系 统。客户端提供一个类似于POSIX(可移植操作系统界面)的文件系统接口, 因此用户在编程时无需知道namenode和datanode也可实现其功能。

datanode是文件系统的工作节点。它们根据需要存储并检索数据块(受客户

没有namenode,文件系统将无法使用。事实上,如果运行namenode服务 的机器毁坏,文件系统上所有的文件将会丢失,因为我们不知道如何根据 datanode的块重建文件。因此,对namenode实现容错非常重要,Hadoop 为此提供两种机制。

第一种机制是备份那些组成文件系统元数据持久状态的文件。Hadoop可以 通过配置使namenode在多个文件系统上保存元数据的持久状态。这些写操 作是实时同步的,是原子操作。一般的配置是,将持久状态写入本地磁盘 的同时,写入一个远程挂载的网络文件系统(NFS)。

另一种可行的方法是运行一个辅助namenode,但它不能被用作namenode。 这个辅助namenode的重要作用是定期通过编辑日志合并命名空间镜像,以 防止编辑日志过大。这个辅助namenode —般在另一台单独的物理计算机上 运行,因为它需要占用大量CPU时间与namenode相同容量的内存来执行 合并操作。它会保存合并后的命名空间镜像的副本,并在namenode发生故 障时启用。但是,辅助namenode保存的状态总是滞后于主节点,所以在主 节点全部失效时,难免会丢失部分数据。在这种情况下,一般把存储在 NFS上的namenode元数据复制到辅助namenode并作为新的主namenode 运行。

 

 

2.3. 联邦 HDFS

namenode在内存中保存文件系统中毎个文件和每个数据块的引用关系,这 意味着对于一个拥有大量文件的超大集群来说,内存将成为限制系统横向 扩展的瓶颈(参见9.4.2节)。在2.x发行版本系列中引入的联邦HDFS允许 系统通过添加namenode实现扩展,其中每个namenode管理文件系统命名 空间中的一部分。例如,一个namenode可能管理/user目录下的所有文件, 而另一个namenode可能管理/share目录下的所有文件。

 

在联邦环境下,每个namenode维护一个命名空间卷(namespace volume), 包括命名空间的源数据和在该命名空间下的文件的所有数据块的数据块 池。命名空间卷之间是相互独立的,两两之间并不相互通信,甚至其中一个namenode的失效也不会影响由其他namenode维护的命名空间的可用性。数据块池不再进行切分,因此集群中的datanode需要注册到每个 namenode,并且存储着来自多个数据块池中的数据块。

要想访问联邦HDFS集群,客户端需要使用客户端挂载数据表将文件路径 映射到namenode。该功能可以通过ViewFileSystem和viewfs: //URI进行配置和管理。

 

2.4. HDFS的高可用性

通过联合使用在多个文件系统中备份namenode的元数据和通过备用 namenode创建监测点能防止数据丢失,但是依旧无法实现文件系统的髙可用性。Namenode依旧存在单点失效(SPOF)的问题。如果namenode失效了,那么所有的客户端——包括MapReduce作业——均无法读、写或列 (list)文件,因为namenode是唯一存储元数据与文件到数据块映射的地方。 在这一情况下,Hadoop系统无法提供服务直到有新的namenode上线。

在这样的情况下,要想从一个失效的namenode恢复,系统管理员得启动一个拥有文件系统元数据副本的新的namenode,并配置datanode和客户端以便使用这个新的namenode。新的namenode直到满足以下情形才能响应服务:1)将命名空间的映像导入内存中;2)重做编辑日志;3)接收到足够多的 来自datanode的数据块报告并退出安全模式。对于一个大型并拥有大量文 件和数据块的集群,namenode的冷启动需要30分钟,甚至更长时间。

系统恢复时间太长,也会影响到日常维护。事实上,namenode失效的可能性非常低,所以在实际应用中计划系统失效时间就显得尤为重要。

Hadoop的2.x发行版本系列针对上述问题在HDFS中增加了对高可用性 (HA)的支持。在这一实现中,配置了一对活动-备用(active-standby) namenode。当活动namenode失效,备用namenode就会接管它的任务并开 始服务于来自客户端的请求,不会有任何明显中断。实现这一目标需要在架构上做如下修改。

  • namenode之间需要通过高可用的共享存储实现编辑日志的共享。

(在早期的高可用实现版本中,需要一个NFS过滤器来辅助实现,但是在后期版本中将提供更多的选择,比如构建于ZooKeeper之上 的BookKeeper这样的系统。)当备用namenode接管工作之后,它 将通读共享编辑日志直至末尾,以实现与活动namenode的状态同

  • datanode需要同时向两个namenode发送数据块处理报告,因为数 据块的映射信息存储在namenode的内存中,而非磁盘。
  • 客户端需要使用特定的机制来处理namenode的失效问题,这一机 制对用户是透明的。

 

在活动namenode失效之后,备用namenode能够快速(几十秒的时间)实现 任务接管,因为最新的状态存储在内存中:包括最新的编辑日志条目和最 新的数据块映射信息。实际观察到的失效时间略长一点(需要1分钟左右), 这是因为系统需要保守确定活动namenode是否真的失效了。

在活动namenode失效且备用namenode也失效的情况下,当然这类情况发 生的槪率非常低,管理员依旧可以申明一个备用namenode并实现冷启动。 这类情况并不会比非高可用(no-HA)的情况更差,并且从操作的角度讲这是 一个进步,因为上述处理已是一个标准的处理过程并植入Hadoop中。

 

故障切换与规避

一个称为故障转移控制器(failover_controller)的系统中有一个新实体管理着将活动namenode转移为备用namenode的转换过程。故障转移控制器是可插拔的,但其最初的实现是基于ZooKeeper的并由此确保有且仅有-个活 动namenode。每一个namenode运行着一个轻量级的故障转移控制器,其 工作就是监视宿主namenode是否失效(通过一个简单的心跳机制实现)并在 namenode失效时进行故障切换。

管理员也可以手动发起故障转移,例如在进行日常维护时。这称为“平稳 的故障转移”,因为故障转移控制器可以组织两个namenode有序切换 角色。

但在非平稳故障转移的情况下,无法确切知道失效namenode是否已经停止 运行。例如,在网速非常慢或者网络被分割的情况下,同样也可能激发故 障转移,但是先前的活动namenode依然运行着并且依旧是活动 namenode。高可用实现做了更进一步的优化,以确保先前活动的namenode 不会执行危害系统并导致系统崩溃的操作——该方法称为“规避” (fencing)。系统引入了一系列的规避机制,包括杀死namenode进程,收回 访问共享存储目录的权限(通常使用供应商指定的NFS命令),通过远程管理命令以屏蔽相应网络端口。诉诸的最后手段是,先前活动namenode可以 通过一个相当形象的称为STONITH(shoot the other node in the head)的技,术 进行规避,该方法主要通过一个特定的供电单元对相应主机进行断电操作:

客户端的故障切换通过客户端类库实现透明处理。最简单的实现是通过客 户端的配置文件实现故障切换的控制。HDFSURI使用一个逻辑主机名,该 主机名映射到一对namenode地址(在配置文件中设置),客户端类库会访问 每一个namenode地址直至处理完成。

 

 

 

2.5. 命令行接口

现在我们通过命令行交互来进一步认识HDFS。HDFS还有很多其他接口, 但命令行是最简单的,同时也是许多开发者最熟悉的。

我们先在一台机器上运 行HDFS。稍后介绍如何在集群上运行HDFS,以提供伸缩性与容错性。

在我们设置伪分布配置时,有两个属性项需要进一步解释。第一项是 fs.default.name,设置为hdfs://localhost/,用于设置Hadoop的默认文件系统。文件系统是由URI指定的,这里我们已使用hdfs URI来配置 HDFS为Hadoop的默认文件系统。HDFS的守护程序通过该属性项来确定 HDFS namenode的主机及端口。我们将在localhost默认端口 8020上运 行namenode。这样一来,HDFS客户端可以通过该属性得知namenode 在哪里运行进而连接到它。

第二个属性dfs.replication,我们设为1,这样一来,HDFS就不会按默认设置将文件系统块复本设为3。在单独一个datanode上运行时,HDFS 无法将块复制到3个datanode上,所以会持续给出块复本不足的警告。设 置这个属性之后,就不会再有问题了。

 

文件系统的基本操作

 

至此,文件系统已经可以使用了,我们可以执行所有常用的文件系统操 作,例如,读取文件,新建目录,移动文件,删除数据,列出目录,等 等。可以输入hadoop fs -help命令获取每个命令的详细帮助文件。

首先从本地文件系统将一个文件复制到HDFS:

% hadoop fs -copyFromLocal input/docs/quangle.txt

hdfs://localhost/user/tom/quangle.txt

 

该命令调用Hadoop文件系统的shell命令fs,后者提供了一系列子命令, 在这个例子中,我们执行的是-copyFromLocal。本地文件quangle.txt被复制到运行在localhost上的HDFS实例中,路径为/user/tom/quangle.txt。事实上,我们可以简化命令格式以省略主机的URI并使用默认设置,即省略 hdfs://localhost,因为该项已在 core-site.xml中指定。

% hadoop fs -copyFromLocal input/docs/quangle.txt /user/tom/quangle.txt

我们也可以使用相对路径,并将文件复制到HDFS的home目录中,本例中/user/tom :

% hadoop fs -copyFromLocal input/docs/quangle.txt quangle.txt

我们把文件复制回本地文件系统,并检査是否一致:

% hadoop fs -copyToLocal quangle.txt quangle.copy.txt

% md5 input/docs/quangle.txt quangle.copy.txt

MD5 (input/docs/quangle.txt) = al6f231da6b05e2ba7a339320e7dacd9

MD5 (quangle.copy.txt) = al6f231da6b05e2ba7a3B9320e7dacd9

MD5键值相同,表明这个文件在HDFS之旅中得以幸存并保存完整。

最后,我们看一下HDFS文件列表。我们新建一个目录看它在列表中是怎么显示的:

% hadoop fs -mkdir books

% hadoop fs -Is .

Found 2 items

drwxr-xr-x – tom supergroup 0 2009-04-02 22:41 /user/tom/books

-rw-r–r– 1 tom supergroup 118 2009-04-02 22:29 /user/tom/quangle.txt

 

返回的结果信息与Unix命令Is -l的输出结果非常相似,仅有细微差别。 第1列显示的是文件模式。第2列是这个文件的备份数(这在传统Unix文件系统是没有的)。由于我们在整个文件系统范围内设置的默认复本数为 1,所以这里显示的也都是1。这一列的开头目录为空,因为本例中没有使用复本的概念–目录作为元数据保存在namenode中,而非datanode中。

第3列和第4列显示文件的所属用户和组别。第5列是文件的大小,以字节为单位,目录为0。第6列和第7列是文件的最后修改日期与时间。最后,第8列是文件或目录的绝对路径。

2.6. HDFS中的文件访问权限

针对文件和目录,HDFS的权限模式与POSIX非常相似。

 

一共提供三类权限模式:只读权限(r)、写入权限(w)和可执行权限(x)。读取文件或列出目录内容时需要只读权限。写入一个文件或是在一个目 录上新建及删除文件或目录,需要写入权限。对于文件而言,可执行权限可以忽略,因为你不能在HDFS中执行文件(与POSIX不同),但在访问一个目录的子项时需要该权限。 每个文件和目录都有所属用户(owner)、所属组别(group)及模式(mode). 这个模式是由所属用户的权限、组内成员的权限及其他用户的权限组成的。

在默认情况下,可以通过正在运行进程的用户名和组名来唯一确定客户 端的标识•但由于客户端是远程的,任何用户都可以简单地在远程系统上以其名义新建一个账户来进行访问•因此,作为共享文件系统资源和 防止数据意外损失的一种机制,权限只能供合作团体中的用户使用,而 不能用于在一个不友好的环境中保护资源。注意,最新版的Hadoop已 经支持Kerberos用户认证,该认证去除了这些限制,详见第325页的 “安全”小节•但是,除了上述限制之外,为防止用户或自动工具及程 序意外修改或删除文件系统的重要部分,启用权限控制还是很重要的(这 也是默认的配置,参见dfs.permissions属性)。

如果启用权限检查,就会检查所属用户权限,以确认客户端的用户名与 所属用户是否匹配,另外也将检查所属组别权限,以确认该客户端是否 是该用户组的成员;若不符,则检查其他权限。

这里有一个超级用户(super-user)的概念,超级用户是namenode进程的 标识。对于超级用户,系统不会执行任何权限检查。

 

 

2.7. Hadoop文件系统

Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现。Java抽象类 org.apache.hadoop.fs.FileSystem 定义了 Hadoop 中的一个文件系统接口,并且该抽象类有几个具体实现,如表3-1所示。

表3-1 hadoop文件系统

文件系统 URI方案 Java实现(均包含在org.apache.hadoop 包中) 描述
Local file fs.LocalFileSystem 使用了客户端校验和的本地磁盘文件系统。没有使用校验和的本地磁盘文件系统RawLocalFileSystem。
HDFS hdfs hdfs.DistributedFileSystem Hadoop的分布式文件系统。将 HDFS设计成与MapReduce结合 使用,可以实现高性能
HFTP Hftp hdfs.hftpFileSystem 一个在HTTP上提供对HDFS只 读访问的文件系统(尽管名称为 HFTP,但与FTP无关)。通常与distcp结合使用(参见3.8节),以 实现在运行不同版本的HDFS的 集群之间复制数据
HSFTP hsftp hdfs.HsftpFileSyste 在HTTPS上提供对HDFS只读访问的文件系统(同上,与FTP无关)
WebHDFS Webhdfs 基于HTTP,对HDFS提供安全 读写访问的文件系统。WebHDFS 是为了替代HFTP和HSFTP而构 建的
HAR har fs.HarFileSystem 一个构建在其他文件系统之上用 于文件存档的文件系统。Hadoop 存档文件系统通常用于需要将 HDFS中的文件进行存档时,以 减少namenode内存的使用。
hfs

(云存储)

kfs CloudStore(其前身为Kosmos文件系统)是类似于HDFS或是谷歌的 GFS的文件系统,用C++写。详情 http://kosmosfs.sourceforge.net/
FTP ftp fs.ftp.FTPFileSystem 由FTP服务器支持的文件系统
S3

(原生)

S3n fs.s3native.NativeS3FileSystem 由Amazon S3支持的文件系统。
S3

(基于块)

S3 fs.sa.S3FileSystem 由Amazon S3支持的文件系统, 以块格式存储文件(与HDFS很相 似)以解决S3的5 GB文件大小 限制
分布式 RAID hdfs hdfs.DistributedRaidFileSystem RAID版本的HDFS是为了存档而设计的。针对HDFS中的每个文件,创建一个(更小的)校验文件, 并允许HDFS中的数据副本由3 降为.2,由此可以减少25%~30% 的存储空间,但是数据丢失的概 率保持不变。分布式RAID模式需要在集群中运行一个RaidNode后台进程
View viewfs viewfs.ViewFileSystem 针对其他Hadoop文件系统挂载的客户端表。通常用于联邦namenode建挂载点。

 

Hadoop对文件系统提供了许多接口,它一般使用URI方案来选取合适的 文件系统实例进行交互。举例来说,我们在前一小节中遇到的文件系统命令行解释器可以操作所有的Hadoop文件系统命令。要想列出本地文件系统根目录下的文件,可以输入以下命令:

% hadoop fs -Is file:///

尽管运行的MapReduce程序可以访问任何文件系统(有时也很方便),但在处理大数据集时,建议你还是选择一个有数据本地优化的分布式文件系统,如HDFS。

 

接口

Hadoop是用Java写的,通过Java API可以调用所有Hadoop文件系统的交互操作。例如,文件系统的命令解释器就是一个Java应用,它使用Java的FileSystem类来提供文件系统操作。其他一些文件系统接口也将在本小 节中做简单介绍。这些接口通常与HDFS —同使用,因为Hadoop中的其他文件系统一般都有访问基本文件系统的工具(对于FTP,有FTP客户端;对于S3,有S3工具,等等),但它们大多数都能用于任何Hadoop文件系统。

2.7.1. HTTP

通过HTTP来访问HDFS有两种方法:直接访问,HDFS后台进程直接服务 于来自客户端的请求;通过代理(一个对多个)访问,客户端通常使用 DistributedFileSystemAPI访问HDFS。这两种方法如图3-1所示。

 

在第一种情况中,由namenode内嵌的web服务器(运行在端口 50070上)提 供目录服务,目录列表以XML或者JSON格式存储,并且文件数据由 datanode的web服务器(运行在端口 50075上)以数据流的形式传输。

原来那个的HTTP接口 (HFTP和HSFTP)是只读的,但是新的WebHDFS实 现支持所有的文件系统操作,包括Kerberos认证。WebHDFS必须通过将dfs.webhdfs.enable选项设置为真后才能启用,并且只有启用它之后,你才可以使用webhdfs URI。

第二种方法依靠一个或者多个独立代理服务器通过HTTP访问HDFS。(由于代理服务是无状态的,因此可以运行在标准的负载均衡器之后。)所有到集群的网络通信都需要经过代理。使用代理服务器后可以使用更严格的防 火墙策略和带宽限制策略。通常情况下通过代理服务器,实现在不同数据 中心中部署的Hadoop集群之间的数据传输。

原来那个的HDFS代理服务器是只读的并且客户端 使.用HSFTP Filesystem实现(hsftp URI)进行访问。从1.0.0版本开始,实现了一个称为HttpFS的新代理服务器(具备读和写的能力),并且提供了 和WebHDFS的一样的HTTP接口,因此客户端可以通过webhdfs URI访 问这两类接口。 *

在规范正式定义了 WebHDFS中使用的HTTP REST API,以期望以后使用 非Java语言编写的客户端有望直接使用这个API。

2.7.2. C语言

Hadoop提供了一个名为libhdfs的C语言库,该语言库是Java, Filesystem接口类的一个镜像(它被写成访问HDFS的C语言库,但其实它可以访问全部Hadoop文件系统)。它使用Java原生接口(Java Native Interface,JNI)调用Java文件系统客户端。

这个C语言API与Java的API非常相似,但它的开发一般滞后于java API,因此目前一些新的特性可能还不支持。可以在Hapdoop发行包的 Libhdfs/docs/api目录找到CAPI的相关文档。

Hadoop中自带预先编译好的32位Linux的二进制编码,但对于其 他平台,需要按自行编译。

2.7.3. FUSE

用户空间文件系统(Filesystem in Userspace, FUSE)允许把按照用户空间实 现的文件系统整合成一个Unix文件系统。通过使用Hadoop的Fuse-DFS功 能模块,任何一个Hadoop文件系统(不过一般为HDFS)均可以作为一个标 准文件系统进行挂载。随后便可以使用Unix工具(如Is和cat)与该文件系 统交互,还可以通过任意一种编程语言调用POSIX库来访问文件系统。

Fuse-DFS是用C语言实现的,调用libhdfs并作为访问HDFS的接口。关于 如何编译和运行Fuse-DFS的文档,可以在Hadoop发行版目录中找到。

 

2.8. Java 接口

在本小节中,我们要深入探索Hadoop的Filesystem类:它是与Hadoop的某一文件系统进行交互的API。虽然我们主要聚焦于HDFS实例,即 DistributedFileSystem,但总体来说,还是应该集成Filesystem抽象类,并编写代码,使其在不同文件系统中可移植。这对测试你编写的程序 非常重要,例如,你可以使用本地文件系统中的存储数据快速进行测试。

2.8.1. 从Hadoop URL读取数据

要从Hadoop文件系统读取文件,最简单的方法是使用java.net.URL对象打开数据流,从中读取数据。具体格式如下:

InputStream in = null;

try {

in = new URL(“hdfs://host/path”).openStream();

// process in

} finally {

IOUtils.closestream(in);

}

 

 

让Java程序能够识别Hadoop的hdfs URL方案还需要一些额外的工作。这里采用的方法是通过FsUrlStreamHandlerFactory实例调用java.net.URL 对象的setURLStreamHandlerFactory方法。每个Java虚拟机只能调用—次这个方法,因此通常在静态方法中调用。这个限制意味着如果程序的其他组件–如不受你控制的第三方组件–已经声明一个URLStreamHandlerFactory实例,你将无法使用这种方法从Hadoop中读取数据。下一节将讨论另一种 备选方法。

范例3-1展示的程序以标准输出方式显示Hadoop文件系统中的文件,类似 于Unix中的cat命令。

范例3-1.通过URLStreamHandler实例以标准输出方式显示Hadoop文件系统的文件

public class URLCat { •

static {

URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());

}

public static void main(String[] args) throws Exception {

InputStream in = null;

try {

in = new URL(args[0]).openStream();

IOUtils.copyBytes(in, System.out, 4096, false);

} finally {

IOUtils.closestream(in);

}

}

 

 

 

我们可以调用Hadoop中简洁的IOUtils类,并在finally子句中关闭数据流,同时也可以在输入流和输出流之间复制数据(本例中为 System.out)。copyBytes方法的最后两个参数,第一个设置用于复制的 缓冲区大小,第二个设置复制结束后是否关闭数据流。这里我们选择自行 关闭输入流,因而System.out不必关闭输入流。

下面是一个运行示例:

% hadoop URLCat hdfs://localhost/user/tom/quangle.txt

On the top of the Crumpetty Tree The Quangle Wangle sat^

But his face you could not see,

On account of his Beaver Hat.

2.8.2. 通过FileSystem API读取数据

正如前一小节所解释的,有时根本不可能在应用中设置URLStreamHandlerFactory 实例。在这种情况下,需要使用FileSystem API来打开一个文件的输入流。

Hadoop文件系统中通过Hadoop Path对象(而非java.io.File对象,因为它的语义与本地文件系统联系太紧密)来代表文件。可以将路径视为一个 Hadoop 文件系统 URI,如 hdfs://localhast/use>-/tom/quang!e.txt。

FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用 的文件系统实例,这里是HDFS。获取FileSystem实例有下面这几个静态 工厂方法:

public static FileSystem get(Configuration conf) throws IOException

Public static FileSystem get(URI uri, Configuration conf) throws IOException

public static FileSystem get(URI uri, Configuration conf, String user) throws IOException

 

Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如conf/core-silt.xml)。第一个方法返回的是默认文件系统 (在conf/Awe-j/re.xm/中指定的,如果没有指定,则使用默认的本地文件系 统)。第二个方法通过给定的URI方案和权限来确定要使用的文件系统,如 果给定URI中没有指定方案,则返回默认文件系统。第三,作为给定用户 来访问文件系统,对安全来说是至关重要。(参见9.6节)。

在某些情况下,你可能希望获取本地文件系统的运行实例,此时你可以使用的getLocal()方法很方便地获取。

public static LocalFileSystem getLocal(Configuration conf) throws IOException

有了 FileSystem实例之后,我们调用open()函数来获取文件的输入流:

Public FSDatalnputStream open(Path f) throws IOException

Public abstract FSDatalnputStream open(Path f, int bufferSize) throws IOException

第一个方法使用默认的缓冲区大小4 KB。

最后,我们重写范例3-1,得到范例3-2。

 

范例3-2.直接使用FileSystem以标准输出格式显示Hadoop文件系统中的文件

public class FileSystemCat {

public static void main(String[] args) throws Exception {

String uri = args[0];

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(URI.create(uri), conf);

InputStream in = null;

try {

in = fs.open(new Path(uri));

IOUtils.copyBytes(in, System.out, 4096, false);

} finally {

IOUtils.closeStream(in);

}

}

}

 

程序运行结果如下:

% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt

On the top of the Crumpetty Tree The Quangle Wangle sat,

But his face you could not see,

On account of his Beaver Hat.

 

FSDatalnputStream对象

实际上,FileSystem对象中的open()方法返回的是FSDatalnputStream对象,而不是标准的java.io类对象。这个类是继承了 java.io.DatalnputStream接口的一个特殊类,并支持随机访问,由此可以从流的任意位置读取数据。

public class FSDataInputStream extends DataInputStream

implements Seekable, PositionedReadable{

// implementation elided

}

Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量(getPos())的査询方法:

public interface Seekable {

void seek(long pos) throws IOException;

long getPos() throws IOException;

@InterfaceAudience.Private

boolean seekToNewSource(long targetPos) throws IOException;

}

 

调用seek()来定位大于文件长度的位置会引发IOException异常。与 java.io.InputStream的skip()不同,seek()可以移到文件中任意一个绝对位置,skip()则只能相对于当前位置定位到另一个新位置。

范例3-3为范例3-2的简单扩展,它将一个文件写入标准输出两次:在一次写完之后,定位到文件的起始位置再次以流方式读取该文件。

 

范例3-3.使用seek()方法,将Hadoop文件系统中的一个文件在标准输出上显示两次

public class FileSystemCat {

public static void main(String[] args) throws Exception {

String uri = args[0];

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(URI.create(uri), conf);

FSDataInputStream in = null;

try {

in = fs.open(new Path(uri));

IOUtils.copyBytes(in, System.out, 4096, false);

in.seek(0); // go back to the start of the file

IOUtils.copyBytes(in, System.out, 4096, false);

} finally {

IOUtils.closeStream(in);

}

}

}

 

在一个小文件上运行的结果如下:

 

% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt

On the top of the Crumpetty Tree The Quangle Wangle sat,

But his face you could not see,

On account of his Beaver Hat.

On the top of the Crumpetty Tree The Quangle Wangle sat,

But his face you could not see,

On account of his Beaver Hat.

 

FSDatalnputStream 类也实现了 PositionedReadable 接口,从一个指定偏移量处读取文件的一部分:

 

public interface PositionedReadable {

 

public int read(long position, byte[] buffer, int offset, int length)

throws IOException;

 

public void readFully(long position, byte[] buffer, int offset, int length)

throws IOException;

 

public void readFully(long position, byte[] buffer) throws IOException;

}

 

read()方法从文件的指定position处读取至多为length字节的数据并存入缓冲区buffer的指定偏离量offset处。返回值是实际读到的字节数:调用者需要检査这个值,它有可能小于指定的length长度。 readFully()方法将指定length长度的字节数数据读取到buffer中(或在只接受buffer字节数组的版本中,读取buffer.length长度字节数据),除非已经读到文件末尾,这种情况下将抛出EOFException异常。

所有这些方法会保留文件当前偏移量,并且是线程安全的(FSDatalnputStrean并不是为并发访问设计的,因此最好为此新建多个实例),因此它们提供了在读取文件——可能是元数据——的主体时,访问文件其他部分的便利方法。事实上,这只是按照以下模式实现的Seekable接口。

最后务必牢记,seek()方法是一个相对髙开销的操作,需要慎重使用。建议用流数据来构建应用的访问模式(如使用MapReduce),而非执行大量 seek()方法。

 

2.9. 写入数据

Filesystem类有一系列新建文件的方法。最简单的方法是给准备建的文件指定一个Path对象,然后返回一个用于写入数据的输出流:

public FSDataOutputStream create(Path f) throws IOException

此方法有多个重载版本,允许我们指定是否需要强制覆盖现有的文件、文 件备份数量、写入文件时所用缓冲区大小、文件块大小以及文件权限。

create()方法能够为需要写入且当前不存在的文件创建父目录。尽 管这样很方便,但有时并不希望这样。如果希望父目录不存在就导 致文件写入失败,则应该先调用exists()方法检査父目录是否存在。

 

还有一个重载方法Progressable用于传递回调接口,如此一来,可以把数据写入datanode的进度通知给应用:

package org.apache.hadoop.util;

public interface Progressable {

public void progress();

}

另一种新建文件的方法是使用append()方法在一个已有文件末尾追加数据(还有其他一些重载版本):

public FSDataOutputStream append(Path f) throws IOException

这样的追加操作允许一个writer打开文件后在访问该文件的最后偏移量处追加数据。有了这个API,某些应用可以创建无边界文件,例如,应用可以在关闭日志文件之后继续追加日志。该追加操作是可选的,并非所有 Hadoop文件系统都实现了该操作。例如,HDFS支持追加,但S3文件系统就不支持。

范例3-4显示了如何将本地文件复制到Hadoop文件系统。每次Hadoop调用progress()方法时——也就是每次将64 KB数据包写入datanode管线后——打印一个时间点来显示整个运行过程。注意,这个操作并不是通过 API实现的,因此Hadoop后续版本能否执行该操作,取决于该版本是否修改过上述操作。API只让你知道到“正在发生什么事情”。

范例3-4 将本地文件复制到Hadoop文件系统

public class FileCopyWithProgress {

public static void main(String[] args) throws Exception {

String localSrc = args[0];

String dst = args[1];

InputStream in=new BufferedInputStream(new FileInputStream(localSrc));

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(URI.create(dst), conf);

OutputStream out = fs.create(new Path(dst),new Progressable() {

@Override

public void progress() {

System.out.print(“.”);

}

});

IOUtils.copyBytes(in, out, 4096, true);

}

}

 

典型应用如下:

% hadoop FileCopyWithProgress input/docs/1400-8.txt hdfs://localhost/user/tom/1400-8.txt

……

目前,其他Hadoop文件系统写入文件时均不调用progress()方法。后面几章将展示进度对MapReduce应用的重要性。

 

FSDataOutputStream 对象

 

FileSystem 实例的 create()方法返回 FSDataOutputStream 对象,与 FSDatalnputStream类相似,它也有一个查询文件当前位置的方法:

package org.apache.hadoop.fs;

 

public class FSDataOutputStream extends DataOutputStream

implements Syncable, CanSetDropBehind {

 

public long getPos() throws IOException {

// implementation elided

}

 

// implementation elided

 

}

 

但与 FSDatalnputStream 类不同的是,FSDataOutputStream 类不允许在文件中定位。这是因为HDFS只允许对一个已打开的文件顺序写入,或在现有文件的末尾追加数据。换句话说,它不支持在除文件末尾之外的其他位置进行写入,因此,写入时定位就没有什么意义。

 

2.10. 目录

Filesystem实例提供了创建目录的方法: ‘

public boolean mkdirs(Path f) throws IOException

这个方法可以一次性新建所有必要但还没有的父目录,就像java.io.File 类的mkdirs()方法。如果目录(以及所有父目录)都已经创建成功,则返回 true。

通常,你不需要显式创建一个目录,因为调用create()方法写入文件时会 自动创建父目录。

2.11. 查询文件系统

1.文件元数据:FileStatus

任何文件系统的一个重要特征都是提供其目录结构浏览和检索它所存文件和目录相关信息的功能。FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、复本、修改时间、所有者以及权限信息。

FileSystem的getFileStatus()方法用于获取文件或目录的 FileStatus对象。范例3-5显示了它的用法。

范例3-5.展示文件状态信息

import java.io.FileNotFoundException;

import java.io.IOException;

import java.io.OutputStream;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.junit.After;

import org.junit.Before;

import org.junit.Test;

 

public class ShowFileStatusTest {

private FileSystem fs;

 

@Before

public void setUp() throws IOException {

Configuration conf = new Configuration();

fs = FileSystem.get(conf);

OutputStream out = fs.create(new Path(“/dir/file”));

out.write(“content”.getBytes(“UTF-8”));

out.close();

}

 

@After

public void tearDown() throws IOException {

if (fs != null) {

fs.close();

}

}

 

@Test(expected = FileNotFoundException.class)

public void throwsFileNotFoundForNonExistentFile() throws IOException {

fs.getFileStatus(new Path(“no-such-file”));

}

 

@Test

public void fileStatusForFile() throws IOException {

Path file = new Path(“/dir/file”);

FileStatus stat = fs.getFileStatus(file);

assertThat(stat.getPath().toUri().getPath(), is(“/dir/file”));

assertThat(stat.isDir(), is(false));

assertThat(stat.getLen(), is(7L));

assertThat(stat.getModificationTime(),is(lessThanOrEqualTo(System.currentTimeMillis())));

assertThat(stat.getReplication(), is((short) l));

assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));

assertThat(stat.getOwner(), is(“tom”));

assertThat(stat.getGroup(), is(“supergroup”));

assertThat(stat.getPermission().toString(), is(“rw-r–r–“));

}

 

@Test

public void fileStatusForDirectory() throws IOException {

Path dir = new Path(“/dir”);

FileStatus stat = fs.getFileStatus(dir);

assertThat (stat. getPath().toUri().getPath(), is(“/dir/file”));

assertThat(stat.isDir(), is(true));

assertThat(stat.getLen(), is(0L));

assertThat(stat.getModificationTime()>

is(lessThanOrEqualTo(System.currentTimeMillis()))){

assertThat(stat.getReplication(), is((short) 0));

assertThat(stat.getBlockSize(), is(0L));

assertThat(stat.getOwner(), is(“tom”));

assertThat(stat.getGroup()> is(“supergroup”));

assertThat(stat.getPermission().toString(), is(“rwxr-xr-x”));

}

}

}

 

 

如果文件或目录均不存在,会抛出一个FileNotFoundException异常。 但如果只是想检査文件或目录是否存在,那么调用exists()方法会更 方便:

public boolean exists(Path f) throws IOException

列出文件 查找一个文件或目录相关的信息很实用,但通常还需要能够列出目录中的

 

内容。这就是Filesystem的listStatus()方法的功能:

public FileStatus[] listStatus(Path f) throws IOException

public FileStatusf] listStatus(Path i, PathFilter .filter) throws IOException

public FileStatus[] listStatus(Path[] files) throws IOException

public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException

当传入的参数是一个文件时,它会简单转变成以数组方式返回长度为i的 FileStatus对象。当传入参数是一个目录时,则返回0或多个 FileStatus对象,表示此目录中包含的文件和目录。 它的重载方法允许使用PathFilter来限制匹配的文件和目录——可以参见 3.5.5节提供的例子。最后,如果指定一组路径,其执行结果相当于依次轮 流传递每条路径并对其调用listStatus()方兹,再将FileStatus对象数 组累积存入同一数组中,但该方法更为方便。这从文件系统树的不同分支 构建输入文件列表时,这是很有用的。范例3-6简单显示了这个方法。注 意Filelltil中stat2Paths()方法的使用,它将一个FileStatus对象数组 转换为一个Path对象数组。

范例3-6.显示Hadoop文件系统中一组路径的文件信息

public class ListStatus {

public static void main(String[] args) throws Exception { String uri = args[0];

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(URI.create(uri), conf);

Path[] paths = new Path[args.length]; for (int i = 0; i < paths.length; i++) { paths[i] = new Path(args[i]);

}

FileStatus[] status = fs.listStatus(paths);

Path[] listedPaths = FileUtil.stat2Paths(status); for (Path p : listedPaths) {

System•out•println(p);

我们可以用这个程序来显示一组路径集目录列表的并集:

% hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom

hdfs://localhost/user

hdfs://localhost/user/tom/books

hdfs://localhost/user/tom/quangle.txt

 

  1. 文件模式

在单个操作中处理一批文件是一个很常见的需求。例如,一个用于处理日 志的MapReduce作业可能需要分析一个月内包含在大量目录中的日志文 件。在一个表达式中使用通配符来匹配多个文件是比较方便的,无需列举 每个文件和目录来指定输入,该操作称为“通配”(globbing)。Hadoop为执 行通配提供了两个FileSystem方法:

public FileStatus[] globStatus(Path pathPattern) throws IOException public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException

globStatus()方法返回与其路径匹配于指定模式的所有文件的Fiiestatus 对象数组,并按路径排序。PathFilter命令作为可选项可以进一步对匹配 结果进行限制。

Hadoop支持的通配符与Unix 的相同(参见表3-2)。

表3-2.通配符及其含义

通配符 名称 匹配

星号 匹配0或多个字符
? 问号 匹配单一字符
[ab] 字符类 匹配{a, b}集合中的一个字符
[Aab] 非字符类 匹配非{a, b}集合中的一个字符
[a-b] 字符范围 匹配一个在{a,b}范围内的字符(包括 ab), a在字典顺序上要小于或等于b
[Aa-b] 非字符范围 匹配一个不在{a,b}范围内的字符(包 括ab),a在字典顺序上要小于或等 于b
{a,b} 或选择 匹配包含a或b中的一个的表达式
\c 转义字符 匹配元字符c

 

 

假设有日志文件存储在按日期分层组织的目录结构中。如此一来,2007年 最后一天的日志文件就会保存在名为/2007/12/31的目录中。假设整个文件 列表如下所示:

 

I——2007/

I L—12/

I I,——30/

I 1 31/

1——2008/

一些文件通配符及其扩展如下所示。

通配符 扩展 > ‘•,免、:,*、.以 K、y:沒餘足丈為,康
r /2007/2008
/*/* /2007/12/2008/01
/*/12/* /2007/12/30/2007/12/31
/200? /2007/2008
/200[78] /2007/2008
/200[7-8] /20G7/2008
/200[八01234569] /2007/2008
/*/*/{31,01} /2007/12/31/2008/01/01
/*/*/3{0,1> /2007/12/30/2007/12/31
/*/{12/31,01/01} /2007/12/31/2008/01/01

 

 

  1. PathFilter 对象

通配符模式并不总能够精确地描述我们想要访问的文件集。比如,使用通 配格式排除一个特定的文件就不太可能。FileSystem中的listStatus() 和globStatus()方法提供了可选的PathFilter对象,以编程方式控制通 配符:

package org.apache.hadoop.fs;

public interface PathFilter { boolean accept(Path path);

}

PathFilter 与 java.io.FileFilter —样,是 Path 对象而不是 File 对象。

范例3-7显示了 PathFilter用于排除匹配正则表达式的路径。

范例3-7. PathFilter,用于排除匹配正则表达式的路径

public class RegexExcludePathFilter implements PathFilter {

 

public RegexExcludePathFilter(String regex) { this.regex = regex;

>

public boolean accept(Path path) {

return !path.toString().matches(regex);

这个过滤器只传递不匹配于正则表达式的文件。在通配符选出一组需要包 含的初始文件之后,过滤器可优化其结果。如下示例将扩展到/2007/12/30:

fs.globStatus(new Path(,72007/*/*”)> new RegexExcludeFilter(“A.*/2007/12/31$”))

过滤器由Path表示,只能作用于文件名。不能针对文件的属性(例如创建 时间)来构建过滤器。但是,通配符模式和正则表达式同样无法对文件属性 进行匹配。例如,如果将文件存储在按照日期排列的目录结构中(如前一节 中讲述的那样),则可以根据Pathfilter在给定时间范围内选出文件。

3.5.6删除数据

使用FileSystem的delete()方法可以永久性删除文件或目录。

public boolean delete(Path f, boolean recursive) throws IOException

如果f是一个文件或空目录,那么recursive的值就会被忽略。只有在 recrusive值为true时,非空目录及其内容才会被删除(否则会抛出IOException 异常)。

3.6数据流 _

3.6.1剖析文件读取

为了了解客户端及与之交互的HDFSnamenodedatanode之间的数据流是什 么样的,我们可参考图3-2,该图显示了在读取文件时事件的发生顺序。

客户端通过调用FileSyste对象的open()方法来打开希望读取的文件, 对于HDFS来说,这个对象是分布式文件系统(图3-2中的步骤1)的一个实 例。DistributedFileSystem通过使用RPC来调用namenode,以确定文 件起始块的位置(步骤2)。对于每一个块,namenode返回存有该块副本的

 

datanode地址。此外,这些datanode根据它们与客户端的距离来排序(根据 集群的网络拓扑;参见3.6.1节的的补充材料“网络拓扑与Hadoop”)。如 果该客户端本身就是一个datanode (比如,在一个MapReduce任务中),并 保存有相应数据块的一个副本本时,该节点就会从本地datanode读取数据 (参见图3-2)。

 

 

图3-2.客户端读取HDFS中的数据

DistributedFileSystem 类返回一个 FSDatalnputStream 对象(一个支持

文件定位的输入流)给客户端并读取数据。FSDatalnputStream类转而封 装 DFSInputStr’eam 对象,该对象管理着 datanode 和 namenode 的 I/O

接着,客户端对这个输入流调用readO方法(步骤3)。存储着文件起始几 个块的datanode地址的DFSInputStream随即连接距离最近的datanode。 通过对数据流反复调用read()方法,可以将数据从datanode传输到客户端 (步骤4)。到达块的末端时,DFSInputStream关闭与该datanode的连接, 然后寻找下一个块的最佳datanode(步骤5)。客户端只需要读取连续的流, 并且对于客户端都是透明的。

客户端从流中读取数据时,块是按照打开DFSInputStream与datanode新 建连接的顺序读取的。它也会根据需要询问namenode来检索下一批数据块 的datanode的位置。一旦客户端完成读取,就对FSDatalnputStream调用 close()方法(步骤6)。

在读取数据的时候,如果DFSInputStream在与datanode通信时遇到错

 

误,会尝试从这个块的另外一个最邻近datanode读取数据。它也记住那个 故障datanode,以保证以后不会反复读取该节点上后续的块。 DFSInputStream也会通过校验和确认从datanode发来的数据是否完整。 如果发现有损坏的块,就在DFSInputStreatn试图从其他datanode读取其复 本之前通知namenode

这个设计的一个重点是,namenode告知客户端每个块中最佳的datanode, 并让客户端直接连接到该datanode检索数据。由于数据流分散在集群中的 所有datanode,所以这种设计能使HDFS可扩展到大量的并发客户端。同 时namenode只需要响应块位置的请求(这些信息存储在内存中,因而非常 髙效),无需响应数据请求,否则随着客户端数量的增长,namenode会很快成 为瓶颈。

网络拓扑与Hadoop

在本地网络中,两个节点被称为“彼此近邻”是什么意思?在海量数据 处理中,其主要限制因素是节点之间数据的传输速率——带宽很稀缺。

这里的想法是将两个节点间的带宽作为距离的衡量标准。

不用衡量节点之间的带宽——实际上很难实现(它需要一个稳定的集群, 并且在集群中两两节点对数量是节点数量的平方)一Hadoop为此采用一 个简单的方法:把网络看作一棵树,两个节点间的距离是它们到最近共 同祖先的距离总和。该树中的层次是没有预先设定的,但是相对于数据 中心、机架和正在运行的节点,通常可以设定等级。具体想法是针对以 下每个场景,可用带宽依次递减:

  • 同一节点上的进程
  • 同一机架上的不同|节点
  • 同一数据中心中不同机架上的节点
  • 不同数据中心中的节点®

例如,假设有数据中心W机架r/中的节点/z /。该节点可以表示为 /以/r/M八利用这种标记,这里给出四种距离描述:

  • distance(/dl/rl/nl, /dl/rl/nl)=0(^\一节点上的进程)

(f>在本书写作期间,Hadoop依旧不适合跨数据中心运行。

 

  • distance(/dl/rl/nl,/dl/rl/n2)=2(同一机架上的不同节点)
  • distance(/dl/rl/nl,/dl/r2/n3)=4(同一数据中心中不同机架上的 节点)
  • distance(/dl/rl/nl,/d2/r3/n4)=6(不同数据中心中的节点)

示意图参见图3-3(数学爱好者会注意到,这是一个测量距离的例子)。最 后,我们必须意识到Hadoop无法自行定义网络拓扑结构。它需要我们能 够理解并辅助定义,我们将在9.1.1节的“网络拓扑”中讨论如何配置网 络拓扑•不过在默认情况下,假设网络是扁平化的只有一层——或换句话 说,所有节点都在同一数据中心的同一机架上。_规糢小的集群可能如 此,不需要进一步配置。 •

 

图3-3. Hadoop中的网络距离

 

3.6.2剖析文件写入

接下来我们看看文件是如何写入HDFS的。尽管比较详细,但对于理解数 据流还是很有用的,因为它清楚地说明了 HDFS的•致模型。

我们要考虑的情况是如何新建一个文件,把数据写入该文件,最后关闭该 文件。参见图3-4。

客户端通过对DistributedFileSystem对象调用create()函数来新建文 件(图 3-4 中的步骤 1)。DistributedFileSystem 对 namenode 创建•个 RPC调用,在文件系统的命名空间中新建一个文件,此时该文件中还没有

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

图3-4.客户端将数据写入HDFS

相应的数据块(步骤2)。namenode执行各种不同的检査以确保这个文件不存 在以及客户端有新建该文件的权限。如果这些检査均通过,namenode就会 为创建新文件记录一条记录;否则,文件创建失败并向客户端抛出一个 IOException 异常。DistributedFileSystem 向客户端返回一个 FSDataOutputStream对象,由此客户端可以开始写人数据。就像读取事 件一样,FSDataOutputStream 封装一个 DFSoutPutstream 对象,该对象 负责处理datanodenamenode之间的通信。

在客户端写入数据时(步骤3),DFSOutputStream将它分成一个个的数据 包,并写入内部队列,称为“数据队列”(dataqueue)。DataStreamer处理 数据队列,它的责任是根据datanode列表来要求namenode分配适合的新块 来存储数据复本。这一组datanode构成一个管线——我们假设复本数为3, 所以管线中有3个节点。DataStreamer将数据包流式传输到管线中第1个 datanode,该datanode存储数据包并将它发送到管线中的第2个datanode。 同样,第2个datanode存储该数据包并且发送给管线中的第3个(也是最后 一个)datanode (步骤 4)。

DFSOutputStream也维护着一个内部数据包队列来等待datanode的收到确 认回执,称为“确认队列(ack queue)。收到管道中所有datanode确认信

 

息后,该数据包才会从确认队列删除(步骤5)。 如果在数据写入期间datanode发生故障,则执行以下操作(对写入数据的客 户端是透明的)。首先关闭管线,确认把队列中的所有数据包都添加回数据 队列的最前端,以确保故障节点下游的datanode不会漏掉任何一个数据 包。为存储在另一正常datanode的当前数据块指定一个新的标识,并将该 标识传送给namenode,以便故障datanode在恢复后可以删除存储的部分数 据块。从管线中删除故障数据节点并且把余下的数据块写入管线中另外两 个正常的的datanodenamenode注意到块复本量不足时,会在另一个节点 上创建一个新的复本。后续的数据块继续正常接受处理。

在一个块被写入期间可能会有多个dataru^de同时发生故障,但非常少见。 只要写入了 dfs.replication.min的复本数(默认为1),写操作就会成 功,并且这个块可以在集群中异步复制,直到达到其目标复本数 (dfs. replication 的默认值为 3)。

客户端完成数据的写入后,对数据流调用close()方法(步骤6)。该操作将 剩余的所有数据包写入datanode管线,并在联系到namenode且发送文件写 入完成信号之前,等待确认(步骤7)。namenode已经知道文件由哪些块组成 (通过Datastpeamer请求分配数据块),所以它在返回成功前只需要等待数

据块进行最小量的复制。

复本怎么放

namenode如何选择在哪个datanode存储复本(replica)?这里需要对可靠 性、写入带宽和读取带宽进行权衡。例如,把所有复本都存储在一个节 点损失的写入带宽最小,因为复制管线都是在同一节点上运行,但这并 不提供真实的冗余(如果节点发生故障,那么该块中的数据会丢失)。同 时,同一机架上服务器间的读取带宽是很高的。另一个极端,把复本放 在不同的数据中心可以最大限度地提高冗余,但带宽的损耗非常大。即 使在同一数据中心(到目前为止,所有Hadoop集群均运行在同一数据中 心内),也有许多不同的数据布局策略。其实,在发布的Hadoop 0.17.0 版中改变了数据布局策略来辅助保持数据块在集群内分布相对均匀(第 350页的“均衡器”详细说明了如何保持集群的均衡)。在丨.x之后的发 行版本,可即时选择数据块的布局策略。

Hadoop的默认布局策略是在运行客户端的节点上放第〗个复本(如果客

 

户端运行在集群之外,就随机选择一个节点,不过系统会避免挑选那些 存储太满或太忙的节点)。第2个复本放在与第一个不同且随机另外选择 的机架中节点上(离架)•第3个复本与第2个复本放在同一个机架上,且 随机选择另一个节点。其他复本放在集群中随机选择的节点上,不过系 统会尽量避免在同一个的机架上放太多复本。

一旦选之复本的放置位置,就根据网络拓扑创建一个管线。如果复本数 为3,则有图3-5所示的管线。

总的来说,这一方法不仅提供很好的稳定性(数据块存储在两个机架中)并 实现很好的负栽均衡,包括写入带宽(写入操作只需要遍历一个交换机)、 读取性能(可以从两个机架中选择读取)和集群中块的均匀分布(客户端只 在本地机架上写入一个块)。

 

 

 

 

 

 

 

 

 

 

 

 

图3-5. —个典型的复本管线

3.6.3 一致模型

文件系统的一致模型(coherency model)描述了文件读/写的数据可见性。 HDFS为性能牺牲了一些POSIX要求,因此一些操作与你期望的可能 不同。

新建一个文件之后,它能在文件系统的命名空间中立即可见,如下所示:

Path p = new Path(“p”);

Fs.create(p);

 

assertThat(fs.exists(p),is(true));

但是,写入文件的内容并不保证能立即可见,即使数据流已经刷新并存 储。所以文件长度显示为〇:

Path p = new Path(“p”);

OutputStream out = fs.create(p);

out.write(“content”.getBytes(“UTF-8“));

out.flush();

assertThat(fs.getFileStatus(p).getLen(is(0L));

当写入的数据超过一个块后,第一个数据块对新的reader就是可见的。之 后的块也不例外。总之,当前正在写入的块对其他reader不可见。

HDFS提供一个方法来使所有缓存与数据节点强行同步,即对 FSDataOutputStream调用sync()方法9sync()方法返回成功后,对 所有新的reader而言,HDFS能保证文件中到目前为止写入的数据均到达所 有datanode的写入管道并且对所有新的reader均可见:®

Path p = new Path(“p”);

FSDataOutputStream out = fs.create(p); out.write(“content”.getBytes(“UTF-8“))y out.flush();

out.sync(); J .

assertThat(fs.getFileStatus(p).getLen(), is(((long) “content”.length())));

该操作类似于POSIX中的fsync系统调用,该调用提交的是一个文件描述 符的缓冲数据。例如,利用标准Java API将数据写入本地文件,我们能够 在刷新数据流且同步之后看到文件内容:

FileOutputStream out = new FileOutputStream(localFile)j out.write(“content”.getBytes(“UTF-8”)); out.flush(); // flush to operating system out.getFD().sync(); // sync to disk

assertThat(localFile.length(), is(((long) “content”.length())));

HDFS中关闭文件其实还隐含执行sync()方法:

Path p = new Path(“p”);

OutputStream out = fs.create(p);

out.write(“content”.getBytes(“UTF-8”));

out.close();

assertThat(fs.getFileStatus(p).getLen()^ is(((long) “content”.length())));

①在HadcK^p l.x之后的版本,sync()方法被丢弃了,进而采用等价的hflushO方法。 另外还增加了一个hsync()方法,确保操作系统刷新数据到磁盘(类似于POS1X的 fsync方法)。但在本书写作期间,此方法还没有实现,只有hflush()方法。

82 第3章

 

对应用设计的重要性 这个一致模型和设计应用程序的具体方法息息相关。如果不调用sync()方 法,就要准备好在客户端或系统发生故障时可能会丢失数据块。对很多应 用来说,这是不可接受的,所以需要在适当的地方调用sync()方法,例如 在写入一定的记录或宇节之后。尽管sync()操作被设计成尽量减少HDFS 负载,但它有许多额外的开销,所以在数据鲁棒性和吞吐量之间就会有所 取舍。怎样权衡与具体的应用相关,通过设置不同调用sync()方法的频率 来衡量应用程序的性能,最终找到一个合适的频率。

3.7通过Flume和Sqoop导入数据

不需要重新写一个应用程序来将数据导入HDFS中,更值得考虑的是使用 —些现成的芏具将数据导入,因为这些工具已经涵盖了很多常用的需求。

Apache FIume(A//p://z>?cwZ^/or.是一个将大规模流数据导入 HDFS的工具。最典型的应用是从另外一个系统中收集日志数据——例如, 银行的网络服务器——并实现在HDFS中的聚集操作以便用于后期的分析 操作。Flume能够支持大量的数据源,其中一些通常用于包含tail(如同 Unixtail —样,通过管道的方式将本地文件写入F1画e中),syslogapache log4j(允许Java应用通过Flume将事件写入HDFS中的文件)的 系统。

Flume节点允许以任何拓扑方式进行组织。典型配置是在每个源机器上(例 如每个Web服务器)运行一个Flume节点,通过多个层级的聚合节点,最后 将数据存入HDFS中。 •

Flume提供了不同级别的数据投递可靠性,例如:最大努力投递,该级别不 能容忍任何Flume节点失效,端到端投递,该级别能确保当源节点和HDFS 节点之间有多个Flume节点失效的情况下数据成功投递。

另一方面Apache Sqoop(A//p://叫oop.apac&.org/)是为了将数据从结构化存 储设备批量导入HDFS中设计的,例如关系数据库。Sqoop的应用场景,是 组织将白天生产的数据库中的数据在晚间导入Hive数据仓库中进行分 析。第15章将详细介绍Sqoop

 

3.8通过distcp并行复制

前面着重介绍单线程访问的HDFS访问模型。例如,通过指定文件通配 符,可以对一组文件进行处理,但是为了提高性能,需要写一个程序来并 行处理这些文件。Hadoop有一个有用的出wcp分布式复制程序,该程序可 以从Hadoop文件系统间复制大量数据,也可以将大量的数据复制到 Hadoop 中 0

的典型应用场景是在两个HDFS集群之间传输数据。如果两个集群运 行相同版本的Hadoop,就非常适合使用/zc/yi方案:

% hadoop distcp hdfs://namenodel/foo hdfs :7/namenode2/bar

这行指令把第一个集群//〇〇目录(及其内容)复制到第二个集群的目录 下,所以第二个集群最后的目录结构是如果/ftar不存在,则新建 一个。也可以指定多个源路径,并把所有路径都复制到目标路径下。注 意,源路径必须是绝对路径。

在默认情况下,出会跳过目标路径下已经存在的文件,但可以通过- overvvr^e选项覆盖现有的文件。也可以通过-Mpda/e选项来选择有改动的 文件。

使用-overwrite和-update选项中任意一个(或两个)需要改变源路 径和目标路径的解释方式。这里最好用一个例子来说明。如果改变 先前例子中第一个集群/f〇〇子树下的一个文件,就会进行下面的命 令将修改同步到第二个集群上:

% hadoop distcp -update hdfs://namenodel/foo hdfs://namenode2/bar/foo

因为源目录下的内容已被复制到目标目录下,所以需要在目标路径 中添加额外的子目录//〇〇。(如果对rsync命令比较熟悉,可以 认为-overwrite或-update选项就是在源路径末尾添加一个斜 杠。)

如果不确定必scp操作的效果,最好先在一个小的测试目录树下试 着运行一次。

有很多选项可以用来控制的复制方式,包括保留文件属性,忽略节 点故障和限制文件或总数据的复制量。不带任何选项运行时,将显示使用

 

cfo/c;?是作为一个MapReduce作业来实现的,该复制作业是通过集群中并 行运行的map来完成。这里没有reducer。每个文件通过一个map进行复 制,并且试图为每一个map分配大致相等的数据来执行,即把文件 划分为大致相等的块。

map的数量是这样确定的。让每一个map复制合理的数据量来尽量减少构 建任务时所涉及的开销,这是一个很好的想法,所以每个map至少复制 256 MB数据(除非输入的总数据量较少,否则一个map就可以完成所有的 复制)。例如,将1 GB大小的文件分给4个map任务。如果数据非常大, 则有必要限制map的数量进而限制带宽和集群的使用。默认情况下,每个 集群节点(或tasktracker)最多分配20个map任务。例如,将1000 GB的文 件复制到一个由1〇〇个节点组成的集群,一共分配2000个map任务(每个 节点20个map任务),所以每个map任务平均复制512 MB数据。通过对 cfcvcp指定-m参数,可以减少分配的map任务数。例如-ni 1000将分配 1000个map任务,每个平均复制1GB数据。

如果试图在两个运行着不同HDFS版本的集群上使用cto/cp复制数据并使 用Zu/力协议,会导致复制作业失败,因为两个系统版本的RPC是不兼容 的。想要弥补这种情况,可以使用基于只读HTTP协议的HFTP文件系统 并从源文件系统中读取数据。这个作业必须运行在目标集群上,进而实现 HDFSRPC版本的兼容。使用HFTP协议重复前面的例子:

% hadoop distcp hftp://namenodel:50070/foo hdfs://namenode2/bar

注意,需要在URI源中指定namenodeWeb端口。这是由 dfs.http.address属性决定的,其默认值为50070。

使用新出的webhdfs协议(替代hftp)后,对源集群和目标集群均可以使用 HTTP协议进行通信,且不会造成任何不兼容的问题。

% hadoop distcp webhdfs://namenodel:50070/foo webhdfs://namenode2:50070/bar

另外一个变种是使用HDFS HTTP代理服务作为源distcp或者目标 distcp,进而具备了设置防火墙和控制带宽的优点,详情参见3.4.1节对 HTTP的讨论。

 

保持HDFS集群的均衡 向HDFS复制数据时,考虑集群的均衡性是相当重要的。当文件块在集_ 中均匀分布时,HDFS能达到最佳工作状态。回到前面1000 GB数据的例 子,将-m选项指定为1,即由一个map来执行复制作业,它的意思是—— 不考虑速度变慢和未充分利用集群资源——每个块的第一个复本将存储到运 行map的节点上(直到磁盘被填满)。第二和第三个复本将分散在集群中, 但这一个节点是不均衡的。将map的数量设定为多于集群中节点的数量, 可以避免这个问题。鉴于此,最好首先使用默认的每个节点20个map来运 行distcp命令。

然而,这也并不总能阻止集群的不均衡。也许 1想限制map的数量以便另外 一些节点可以运行其他作业。若是这样,可以使用均衡器(balancer)工具(参 见10.1.4节对均衡器的讨论)进而改善集群中块分布的均匀程度。

3.9 Hadoop 存档

每个文件均按块方式存储,每个块的元数据存储在namenode的内存中,因 此Hadoop存储小文件会非常低效。因为大量的小文件会耗尽namenode中 的大部分内存。但注意,存储小文件所需的磁盘容量和存储这些文件原始 内容所需要的磁盘空间相比也不会增多。例如,一个1MB的文件以大小为 128 MB的块存储,使用的是1 MB的磁盘空间,而不是128 MB

Hadoop存档文件或HAR文件,是一个更髙效的文件存档工具,它将文件 存入HDFS块,在减少namenode内存使用的同时,允许对文件进行透明 地访问。具体说来,Hadoop存档文件可以用作MapReduce的输入。

3.9.1使用Hadoop存档工具

Hadoop存档是通过a/r/uVe工具根据一组文件创建而来的。该存档工具运 行一个MapReduce作业来并行处理所有的输入文件,因此你需要一个 MapReduce集群来运行和使用它。这里,HDFS中有一些文档我们希望对它 们进行存档:

 

现在我们可以运行archive指令:

% hadoop archive -archiveName files.har /my/files /my

第一个选项是存档文件的名称,这里是HAR文件总是一个以.kr 为扩展名的文件,这是必需的,具体理由见后文描述。接下来的参数是需 要存档的文件。这里我们只存档一棵源文件树下的文件,即HDFS下 /m_y/力中的文件,但事实上该工具可以接受多棵源文件树。最后一个参数 是HAR文件的输出目录。让我们看看这个存档文件是怎么创建的:

% hadoop fs -Is /my

Found 2 items

drwxr-xr-x – tom supergroup 0 2009-04-09 19:13 /my/files

drwxr-xr-x – tom supergroup 0 2009-04-09 19:13 /my/files.har

% hadoop fs -Is /my/files.har Found 3 items

-rw-r–r– 10 tom supergroup 165 2009-04-09 19:13 /my/files.har/_index

-rw-r–r– 10 tom supergroup 23 2009-04-09 19:13 /my/files.har/一 masterindex

-rw-r–r– 1 tom supergroup 2 2009-04-09 19:13 /my/files.har/part-0

这个目录列表显示了 HAR文件的组成部分:两个索引文件以及部分文件 的集合——本例中只有一个。这些部分文件中包含已经链接在一起的大量 原始文件的内容,并且我们通过索引可以找到包含在存档文件中的部分文 件,它的起始点和长度。但所有这些细节对于使用/wfr URI方案与HAR文 件交互的应用都是隐式的,并且HAR文件系统是建立在基础文件系统上的 (本例中是HDFS)。以下命令以递归方式列出了存档文件中的部分文件:

% hadoop fs -lsr har:///my/files.har

drw-r–r– – tom supergroup 0 2009-04-09 19:13 /my/files.har/my drw-r–r– – tom supergroup 0 2009-04-09 19:13 /my/files.har/my/files -rw-r–r– 10 tom supergroup 1 2009-04-09 19:13 /my/files.har/my/files/a drw-r–r– – tom supergroup 0 2009-04-09 19:13 /my/files.har/my/files/dir -rw-r–r– 10 tom supergroup 1 2009-04-09 19:13 /my/files.har/my/files/dir/b

如果HAR文件所在的文件系统是默认的文件系统,这就非常直观易懂。另 一方面,如果想在其他文件系统中引用HAR文件,则需要使用一个不同于 正常情况的URI路径格式。以下两个指令作用相同,示例如下:

% hadoop fs -lsr har:///my/files.har/my/files/dir

% hadoop fs -lsr har://hdfs-localhost:8020/my/files.har/my/files/dir

注意第二个格式,仍以方案表示一个HAR文件系统,但是由/Wyi指定 基础文件系统方案的权限,后面加上一个斜杠和HDFS主机(localhost)及端 口(8020)。我们现在知道为什么HAR文件必须要有.kr扩展名。通过查看 权限和路径及扩展名的组成,HAR文件系统将Aw URI转换成为一个 基”础文件系统的 URI。在本例中是 /!6(/i:///oca//ios/.’5020/z«er//oW/^?/e5./zflr

 

路径的剩余部分是文件在存档文件系统中的路径:/ws^/rw/y?/e:r/A>。 要想删除HAR文件,需要使用递归格式进行删除,因为对于基础文件系统 来说,HAR文件是一个目录:

% hadoop fs -rmr /my/files.har

  • 不足

对于HAR文件,还需要了解它的一些不足。新建一个存档文件会创建原始 文件的一个副本,因此至少需要与要存档(尽管创建了存档文件后可以删除 原始文件)的文件容量相同大小的磁盘空间。虽然存档文件中源文件能被压 缩(HAR文件在这方面更接近于hr文件),但目前还不支持存档文件的 压缩。

一旦创建,存档文件便不能再修改。要想从中增加或删除文件,必须重新 创建存档文件。事实上,一般不会再对存档后的文件进行修改,因为它们 是定期成批存档的,比如每日或每周。

如前所述,HAR文件可以作为MapReduce的输入。然而InputFormat 类并不知道文件已经存档,尽管该类可以将多个文件打包成一个 MapReduce分片,所以即使在HAR文件中处理许多小文件,也仍然低效 的。7.2.1节对小文件和CombinenielnputFormat的讨论中,将提到该问题

的另一种解决方案。

最后,如果已经尽量减少系统中小文件的数量,但仍然受制于namenode的 内存容量,可以考虑使用联邦HDFS来提高命名空间的可扩展性(请参 见 3.2.3 节)

 

转载请注明:全栈大数据 » Hadoop分布式文件系统

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址