整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

2.11. 查询文件系统

hadoop 花牛 7℃ 0评论

1.文件元数据:FileStatus

任何文件系统的一个重要特征都是提供其目录结构浏览和检索它所存文件和目录相关信息的功能。FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、复本、修改时间、所有者以及权限信息。

FileSystem的getFileStatus()方法用于获取文件或目录的 FileStatus对象。范例3-5显示了它的用法。

范例3-5.展示文件状态信息

import java.io.FileNotFoundException;

import java.io.IOException;

import java.io.OutputStream;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.junit.After;

import org.junit.Before;

import org.junit.Test;

 

public class ShowFileStatusTest {

private FileSystem fs;

 

@Before

public void setUp() throws IOException {

Configuration conf = new Configuration();

fs = FileSystem.get(conf);

OutputStream out = fs.create(new Path(“/dir/file”));

out.write(“content”.getBytes(“UTF-8”));

out.close();

}

 

@After

public void tearDown() throws IOException {

if (fs != null) {

fs.close();

}

}

 

@Test(expected = FileNotFoundException.class)

public void throwsFileNotFoundForNonExistentFile() throws IOException {

fs.getFileStatus(new Path(“no-such-file”));

}

 

@Test

public void fileStatusForFile() throws IOException {

Path file = new Path(“/dir/file”);

FileStatus stat = fs.getFileStatus(file);

assertThat(stat.getPath().toUri().getPath(), is(“/dir/file”));

assertThat(stat.isDir()is(false));

assertThat(stat.getLen(), is(7L));

assertThat(stat.getModificationTime(),is(lessThanOrEqualTo(System.currentTimeMillis())));

assertThat(stat.getReplication(), is((shortl));

assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));

assertThat(stat.getOwner(), is(“tom”));

assertThat(stat.getGroup(), is(“supergroup”));

assertThat(stat.getPermission().toString(), is(“rw-r–r–“));

}

 

@Test

public void fileStatusForDirectory() throws IOException {

Path dir = new Path(“/dir”);

FileStatus stat = fs.getFileStatus(dir);

assertThat (stat. getPath().toUri().getPath(), is(“/dir/file”));

assertThat(stat.isDir()is(true));

assertThat(stat.getLen(), is(0L));

assertThat(stat.getModificationTime()>

is(lessThanOrEqualTo(System.currentTimeMillis()))){

assertThat(stat.getReplication(), is((short) 0));

assertThat(stat.getBlockSize(), is(0L));

assertThat(stat.getOwner(), is(“tom”));

assertThat(stat.getGroup()> is(“supergroup”));

assertThat(stat.getPermission().toString(), is(“rwxr-xr-x”));

}

}

}

 

 

如果文件或目录均不存在,会抛出一个FileNotFoundException异常。 但如果只是想检査文件或目录是否存在,那么调用exists()方法会更 方便:

public boolean exists(Path f) throws IOException

列出文件 查找一个文件或目录相关的信息很实用,但通常还需要能够列出目录中的

 

内容。这就是Filesystem的listStatus()方法的功能:

public FileStatus[] listStatus(Path f) throws IOException

public FileStatusf] listStatus(Path i, PathFilter .filter) throws IOException

public FileStatus[] listStatus(Path[] files) throws IOException

public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException

当传入的参数是一个文件时,它会简单转变成以数组方式返回长度为i的 FileStatus对象。当传入参数是一个目录时,则返回0或多个 FileStatus对象,表示此目录中包含的文件和目录。 它的重载方法允许使用PathFilter来限制匹配的文件和目录——可以参见 3.5.5节提供的例子。最后,如果指定一组路径,其执行结果相当于依次轮 流传递每条路径并对其调用listStatus()方兹,再将FileStatus对象数 组累积存入同一数组中,但该方法更为方便。这从文件系统树的不同分支 构建输入文件列表时,这是很有用的。范例3-6简单显示了这个方法。注 意Filelltil中stat2Paths()方法的使用,它将一个FileStatus对象数组 转换为一个Path对象数组。

范例3-6.显示Hadoop文件系统中一组路径的文件信息

public class ListStatus {

public static void main(String[] args) throws Exception { String uri = args[0];

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(URI.create(uri), conf);

Path[] paths = new Path[args.length]; for (int i = 0; i < paths.length; i++) { paths[i] = new Path(args[i]);

}

FileStatus[] status = fs.listStatus(paths);

Path[] listedPaths = FileUtil.stat2Paths(status); for (Path p : listedPaths) {

System•out•println(p);

我们可以用这个程序来显示一组路径集目录列表的并集:

% hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom

hdfs://localhost/user

hdfs://localhost/user/tom/books

hdfs://localhost/user/tom/quangle.txt

 

  1. 文件模式

在单个操作中处理一批文件是一个很常见的需求。例如,一个用于处理日 志的MapReduce作业可能需要分析一个月内包含在大量目录中的日志文 件。在一个表达式中使用通配符来匹配多个文件是比较方便的,无需列举 每个文件和目录来指定输入,该操作称为“通配”(globbing)。Hadoop为执 行通配提供了两个FileSystem方法:

public FileStatus[] globStatus(Path pathPattern) throws IOException public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException

globStatus()方法返回与其路径匹配于指定模式的所有文件的Fiiestatus 对象数组,并按路径排序。PathFilter命令作为可选项可以进一步对匹配 结果进行限制。

Hadoop支持的通配符与Unix 的相同(参见表3-2)。

表3-2.通配符及其含义

通配符 名称 匹配

星号 匹配0或多个字符
? 问号 匹配单一字符
[ab] 字符类 匹配{a, b}集合中的一个字符
[Aab] 非字符类 匹配非{a, b}集合中的一个字符
[a-b] 字符范围 匹配一个在{a,b}范围内的字符(包括 ab), a在字典顺序上要小于或等于b
[Aa-b] 非字符范围 匹配一个不在{a,b}范围内的字符(包 括ab),a在字典顺序上要小于或等 于b
{a,b} 或选择 匹配包含a或b中的一个的表达式
\c 转义字符 匹配元字符c

 

 

假设有日志文件存储在按日期分层组织的目录结构中。如此一来,2007年 最后一天的日志文件就会保存在名为/2007/12/31的目录中。假设整个文件 列表如下所示:

 

I——2007/

I L—12/

I I,——30/

I 1 31/

1——2008/

一些文件通配符及其扩展如下所示。

通配符 扩展 > ‘•,免、:,*、.以 K、y:沒餘足丈為,康
r /2007/2008
/*/* /2007/12/2008/01
/*/12/* /2007/12/30/2007/12/31
/200? /2007/2008
/200[78] /2007/2008
/200[7-8] /20G7/2008
/200[八01234569] /2007/2008
/*/*/{31,01} /2007/12/31/2008/01/01
/*/*/3{0,1> /2007/12/30/2007/12/31
/*/{12/31,01/01} /2007/12/31/2008/01/01

 

 

  1. PathFilter 对象

通配符模式并不总能够精确地描述我们想要访问的文件集。比如,使用通 配格式排除一个特定的文件就不太可能。FileSystem中的listStatus() 和globStatus()方法提供了可选的PathFilter对象,以编程方式控制通 配符:

package org.apache.hadoop.fs;

public interface PathFilter { boolean accept(Path path);

}

PathFilter 与 java.io.FileFilter —样,是 Path 对象而不是 File 对象。

范例3-7显示了 PathFilter用于排除匹配正则表达式的路径。

范例3-7. PathFilter,用于排除匹配正则表达式的路径

public class RegexExcludePathFilter implements PathFilter {

 

public RegexExcludePathFilter(String regex) { this.regex = regex;

>

public boolean accept(Path path) {

return !path.toString().matches(regex);

这个过滤器只传递不匹配于正则表达式的文件。在通配符选出一组需要包 含的初始文件之后,过滤器可优化其结果。如下示例将扩展到/2007/12/30:

fs.globStatus(new Path(,72007/*/*”)> new RegexExcludeFilter(“A.*/2007/12/31$”))

过滤器由Path表示,只能作用于文件名。不能针对文件的属性(例如创建 时间)来构建过滤器。但是,通配符模式和正则表达式同样无法对文件属性 进行匹配。例如,如果将文件存储在按照日期排列的目录结构中(如前一节 中讲述的那样),则可以根据Pathfilter在给定时间范围内选出文件。

3.5.6删除数据

使用FileSystem的delete()方法可以永久性删除文件或目录。

public boolean delete(Path f, boolean recursive) throws IOException

如果f是一个文件或空目录,那么recursive的值就会被忽略。只有在 recrusive值为true时,非空目录及其内容才会被删除(否则会抛出IOException 异常)。

3.6数据流 _

3.6.1剖析文件读取

为了了解客户端及与之交互的HDFSnamenodedatanode之间的数据流是什 么样的,我们可参考图3-2,该图显示了在读取文件时事件的发生顺序。

客户端通过调用FileSyste对象的open()方法来打开希望读取的文件, 对于HDFS来说,这个对象是分布式文件系统(图3-2中的步骤1)的一个实 例。DistributedFileSystem通过使用RPC来调用namenode,以确定文 件起始块的位置(步骤2)。对于每一个块,namenode返回存有该块副本的

 

datanode地址。此外,这些datanode根据它们与客户端的距离来排序(根据 集群的网络拓扑;参见3.6.1节的的补充材料“网络拓扑与Hadoop”)。如 果该客户端本身就是一个datanode (比如,在一个MapReduce任务中),并 保存有相应数据块的一个副本本时,该节点就会从本地datanode读取数据 (参见图3-2)。

 

 

图3-2.客户端读取HDFS中的数据

DistributedFileSystem 类返回一个 FSDatalnputStream 对象(一个支持

文件定位的输入流)给客户端并读取数据。FSDatalnputStream类转而封 装 DFSInputStr’eam 对象,该对象管理着 datanode 和 namenode 的 I/O

接着,客户端对这个输入流调用readO方法(步骤3)。存储着文件起始几 个块的datanode地址的DFSInputStream随即连接距离最近的datanode。 通过对数据流反复调用read()方法,可以将数据从datanode传输到客户端 (步骤4)。到达块的末端时,DFSInputStream关闭与该datanode的连接, 然后寻找下一个块的最佳datanode(步骤5)。客户端只需要读取连续的流, 并且对于客户端都是透明的。

客户端从流中读取数据时,块是按照打开DFSInputStream与datanode新 建连接的顺序读取的。它也会根据需要询问namenode来检索下一批数据块 的datanode的位置。一旦客户端完成读取,就对FSDatalnputStream调用 close()方法(步骤6)。

在读取数据的时候,如果DFSInputStream在与datanode通信时遇到错

 

误,会尝试从这个块的另外一个最邻近datanode读取数据。它也记住那个 故障datanode,以保证以后不会反复读取该节点上后续的块。 DFSInputStream也会通过校验和确认从datanode发来的数据是否完整。 如果发现有损坏的块,就在DFSInputStreatn试图从其他datanode读取其复 本之前通知namenode

这个设计的一个重点是,namenode告知客户端每个块中最佳的datanode, 并让客户端直接连接到该datanode检索数据。由于数据流分散在集群中的 所有datanode,所以这种设计能使HDFS可扩展到大量的并发客户端。同 时namenode只需要响应块位置的请求(这些信息存储在内存中,因而非常 髙效),无需响应数据请求,否则随着客户端数量的增长,namenode会很快成 为瓶颈。

网络拓扑与Hadoop

在本地网络中,两个节点被称为“彼此近邻”是什么意思?在海量数据 处理中,其主要限制因素是节点之间数据的传输速率——带宽很稀缺。

这里的想法是将两个节点间的带宽作为距离的衡量标准。

不用衡量节点之间的带宽——实际上很难实现(它需要一个稳定的集群, 并且在集群中两两节点对数量是节点数量的平方)一Hadoop为此采用一 个简单的方法:把网络看作一棵树,两个节点间的距离是它们到最近共 同祖先的距离总和。该树中的层次是没有预先设定的,但是相对于数据 中心、机架和正在运行的节点,通常可以设定等级。具体想法是针对以 下每个场景,可用带宽依次递减:

  • 同一节点上的进程

  • 同一机架上的不同|节点

  • 同一数据中心中不同机架上的节点

  • 不同数据中心中的节点®

例如,假设有数据中心W机架r/中的节点/z /。该节点可以表示为 /以/r/M八利用这种标记,这里给出四种距离描述:

  • distance(/dl/rl/nl/dl/rl/nl)=0(^\一节点上的进程)

(f>在本书写作期间,Hadoop依旧不适合跨数据中心运行。

 

  • distance(/dl/rl/nl,/dl/rl/n2)=2(同一机架上的不同节点)

  • distance(/dl/rl/nl,/dl/r2/n3)=4(同一数据中心中不同机架上的 节点)

  • distance(/dl/rl/nl,/d2/r3/n4)=6(不同数据中心中的节点)

示意图参见图3-3(数学爱好者会注意到,这是一个测量距离的例子)。最 后,我们必须意识到Hadoop无法自行定义网络拓扑结构。它需要我们能 够理解并辅助定义,我们将在9.1.1节的“网络拓扑”中讨论如何配置网 络拓扑•不过在默认情况下,假设网络是扁平化的只有一层——或换句话 说,所有节点都在同一数据中心的同一机架上。_规糢小的集群可能如 此,不需要进一步配置。 •

 

图3-3. Hadoop中的网络距离

 

3.6.2剖析文件写入

接下来我们看看文件是如何写入HDFS的。尽管比较详细,但对于理解数 据流还是很有用的,因为它清楚地说明了 HDFS的•致模型。

我们要考虑的情况是如何新建一个文件,把数据写入该文件,最后关闭该 文件。参见图3-4。

客户端通过对DistributedFileSystem对象调用create()函数来新建文 件(图 3-4 中的步骤 1)。DistributedFileSystem 对 namenode 创建•个 RPC调用,在文件系统的命名空间中新建一个文件,此时该文件中还没有

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

图3-4.客户端将数据写入HDFS

相应的数据块(步骤2)。namenode执行各种不同的检査以确保这个文件不存 在以及客户端有新建该文件的权限。如果这些检査均通过,namenode就会 为创建新文件记录一条记录;否则,文件创建失败并向客户端抛出一个 IOException 异常。DistributedFileSystem 向客户端返回一个 FSDataOutputStream对象,由此客户端可以开始写人数据。就像读取事 件一样,FSDataOutputStream 封装一个 DFSoutPutstream 对象,该对象 负责处理datanodenamenode之间的通信。

在客户端写入数据时(步骤3),DFSOutputStream将它分成一个个的数据 包,并写入内部队列,称为“数据队列”(dataqueue)。DataStreamer处理 数据队列,它的责任是根据datanode列表来要求namenode分配适合的新块 来存储数据复本。这一组datanode构成一个管线——我们假设复本数为3, 所以管线中有3个节点。DataStreamer将数据包流式传输到管线中第1个 datanode,该datanode存储数据包并将它发送到管线中的第2个datanode。 同样,第2个datanode存储该数据包并且发送给管线中的第3个(也是最后 一个)datanode (步骤 4)。

DFSOutputStream也维护着一个内部数据包队列来等待datanode的收到确 认回执,称为“确认队列(ack queue)。收到管道中所有datanode确认信

 

息后,该数据包才会从确认队列删除(步骤5)。 如果在数据写入期间datanode发生故障,则执行以下操作(对写入数据的客 户端是透明的)。首先关闭管线,确认把队列中的所有数据包都添加回数据 队列的最前端,以确保故障节点下游的datanode不会漏掉任何一个数据 包。为存储在另一正常datanode的当前数据块指定一个新的标识,并将该 标识传送给namenode,以便故障datanode在恢复后可以删除存储的部分数 据块。从管线中删除故障数据节点并且把余下的数据块写入管线中另外两 个正常的的datanodenamenode注意到块复本量不足时,会在另一个节点 上创建一个新的复本。后续的数据块继续正常接受处理。

在一个块被写入期间可能会有多个dataru^de同时发生故障,但非常少见。 只要写入了 dfs.replication.min的复本数(默认为1),写操作就会成 功,并且这个块可以在集群中异步复制,直到达到其目标复本数 (dfsreplication 的默认值为 3)。

客户端完成数据的写入后,对数据流调用close()方法(步骤6)。该操作将 剩余的所有数据包写入datanode管线,并在联系到namenode且发送文件写 入完成信号之前,等待确认(步骤7)。namenode已经知道文件由哪些块组成 (通过Datastpeamer请求分配数据块),所以它在返回成功前只需要等待数

据块进行最小量的复制。

复本怎么放

namenode如何选择在哪个datanode存储复本(replica)?这里需要对可靠 性、写入带宽和读取带宽进行权衡。例如,把所有复本都存储在一个节 点损失的写入带宽最小,因为复制管线都是在同一节点上运行,但这并 不提供真实的冗余(如果节点发生故障,那么该块中的数据会丢失)。同 时,同一机架上服务器间的读取带宽是很高的。另一个极端,把复本放 在不同的数据中心可以最大限度地提高冗余,但带宽的损耗非常大。即 使在同一数据中心(到目前为止,所有Hadoop集群均运行在同一数据中 心内),也有许多不同的数据布局策略。其实,在发布的Hadoop 0.17.0 版中改变了数据布局策略来辅助保持数据块在集群内分布相对均匀(第 350页的“均衡器”详细说明了如何保持集群的均衡)。在丨.x之后的发 行版本,可即时选择数据块的布局策略。

Hadoop的默认布局策略是在运行客户端的节点上放第〗个复本(如果客

 

户端运行在集群之外,就随机选择一个节点,不过系统会避免挑选那些 存储太满或太忙的节点)。第2个复本放在与第一个不同且随机另外选择 的机架中节点上(离架)•第3个复本与第2个复本放在同一个机架上,且 随机选择另一个节点。其他复本放在集群中随机选择的节点上,不过系 统会尽量避免在同一个的机架上放太多复本。

一旦选之复本的放置位置,就根据网络拓扑创建一个管线。如果复本数 为3,则有图3-5所示的管线。

总的来说,这一方法不仅提供很好的稳定性(数据块存储在两个机架中)并 实现很好的负栽均衡,包括写入带宽(写入操作只需要遍历一个交换机)、 读取性能(可以从两个机架中选择读取)和集群中块的均匀分布(客户端只 在本地机架上写入一个块)。

 

 

 

 

 

 

 

 

 

 

 

 

图3-5. —个典型的复本管线

3.6.3 一致模型

文件系统的一致模型(coherency model)描述了文件读/写的数据可见性。 HDFS为性能牺牲了一些POSIX要求,因此一些操作与你期望的可能 不同。

新建一个文件之后,它能在文件系统的命名空间中立即可见,如下所示:

Path p = new Path(“p”);

Fs.create(p);

 

assertThat(fs.exists(p),is(true));

但是,写入文件的内容并不保证能立即可见,即使数据流已经刷新并存 储。所以文件长度显示为〇:

Path p = new Path(“p”);

OutputStream out = fs.create(p);

out.write(“content”.getBytes(“UTF-8“));

out.flush();

assertThat(fs.getFileStatus(p).getLen(is(0L));

当写入的数据超过一个块后,第一个数据块对新的reader就是可见的。之 后的块也不例外。总之,当前正在写入的块对其他reader不可见。

HDFS提供一个方法来使所有缓存与数据节点强行同步,即对 FSDataOutputStream调用sync()方法9sync()方法返回成功后,对 所有新的reader而言,HDFS能保证文件中到目前为止写入的数据均到达所 有datanode的写入管道并且对所有新的reader均可见:®

Path p = new Path(“p”);

FSDataOutputStream out = fs.create(p); out.write(“content”.getBytes(“UTF-8“))y out.flush();

out.sync(); J .

assertThat(fs.getFileStatus(p).getLen(), is(((long) “content”.length())));

该操作类似于POSIX中的fsync系统调用,该调用提交的是一个文件描述 符的缓冲数据。例如,利用标准Java API将数据写入本地文件,我们能够 在刷新数据流且同步之后看到文件内容:

FileOutputStream out = new FileOutputStream(localFile)j out.write(“content”.getBytes(“UTF-8”)); out.flush(); // flush to operating system out.getFD().sync(); // sync to disk

assertThat(localFile.length(), is(((long) “content”.length())));

HDFS中关闭文件其实还隐含执行sync()方法:

Path p = new Path(“p”);

OutputStream out = fs.create(p);

out.write(“content”.getBytes(“UTF-8”));

out.close();

assertThat(fs.getFileStatus(p).getLen()^ is(((long) “content”.length())));

①在HadcK^p l.x之后的版本,sync()方法被丢弃了,进而采用等价的hflushO方法。 另外还增加了一个hsync()方法,确保操作系统刷新数据到磁盘(类似于POS1X的 fsync方法)。但在本书写作期间,此方法还没有实现,只有hflush()方法。

转载请注明:全栈大数据 » 2.11. 查询文件系统

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址