整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

2.8.2. 通过FileSystem API读取数据

hadoop 花牛 8℃ 0评论

正如前一小节所解释的,有时根本不可能在应用中设置URLStreamHandlerFactory 实例。在这种情况下,需要使用FileSystem API来打开一个文件的输入流。

Hadoop文件系统中通过Hadoop Path对象(而非java.io.File对象,因为它的语义与本地文件系统联系太紧密)来代表文件。可以将路径视为一个 Hadoop 文件系统 URI,如 hdfs://localhast/use>-/tom/quang!e.txt。

FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用 的文件系统实例,这里是HDFS。获取FileSystem实例有下面这几个静态 工厂方法:

public static FileSystem get(Configuration conf) throws IOException

Public static FileSystem get(URI uri, Configuration conf) throws IOException

public static FileSystem get(URI uri, Configuration conf, String user) throws IOException

 

Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如conf/core-silt.xml)。第一个方法返回的是默认文件系统 (在conf/Awe-j/re.xm/中指定的,如果没有指定,则使用默认的本地文件系 统)。第二个方法通过给定的URI方案和权限来确定要使用的文件系统,如 果给定URI中没有指定方案,则返回默认文件系统。第三,作为给定用户 来访问文件系统,对安全来说是至关重要。(参见9.6节)。

在某些情况下,你可能希望获取本地文件系统的运行实例,此时你可以使用的getLocal()方法很方便地获取。

public static LocalFileSystem getLocal(Configuration conf) throws IOException

有了 FileSystem实例之后,我们调用open()函数来获取文件的输入流:

Public FSDatalnputStream open(Path f) throws IOException

Public abstract FSDatalnputStream open(Path f, int bufferSize) throws IOException

第一个方法使用默认的缓冲区大小4 KB。

最后,我们重写范例3-1,得到范例3-2。

 

范例3-2.直接使用FileSystem以标准输出格式显示Hadoop文件系统中的文件

public class FileSystemCat {

public static void main(String[] args) throws Exception {

String uri = args[0];

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(URI.create(uri), conf);

InputStream in = null;

try {

in = fs.open(new Path(uri));

IOUtils.copyBytes(in, System.out, 4096, false);

finally {

IOUtils.closeStream(in);

}

}

}

 

程序运行结果如下:

% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt

On the top of the Crumpetty Tree The Quangle Wangle sat,

But his face you could not see,

On account of his Beaver Hat.

 

FSDatalnputStream对象

实际上,FileSystem对象中的open()方法返回的是FSDatalnputStream对象,而不是标准的java.io类对象。这个类是继承了 java.io.DatalnputStream接口的一个特殊类,并支持随机访问,由此可以从流的任意位置读取数据。

public class FSDataInputStream extends DataInputStream

implements Seekable, PositionedReadable{

// implementation elided

}

Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量(getPos())的査询方法:

public interface Seekable {

void seek(long posthrows IOException;

long getPos() throws IOException;

@InterfaceAudience.Private

boolean seekToNewSource(long targetPos) throws IOException;

}

 

调用seek()来定位大于文件长度的位置会引发IOException异常。与 java.io.InputStream的skip()不同,seek()可以移到文件中任意一个绝对位置,skip()则只能相对于当前位置定位到另一个新位置。

范例3-3为范例3-2的简单扩展,它将一个文件写入标准输出两次:在一次写完之后,定位到文件的起始位置再次以流方式读取该文件。

 

范例3-3.使用seek()方法,将Hadoop文件系统中的一个文件在标准输出上显示两次

public class FileSystemCat {

public static void main(String[] args) throws Exception {

String uri = args[0];

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(URI.create(uri), conf);

FSDataInputStream in = null;

try {

in = fs.open(new Path(uri));

IOUtils.copyBytes(in, System.out, 4096, false);

in.seek(0); // go back to the start of the file

IOUtils.copyBytes(in, System.out, 4096, false);

finally {

IOUtils.closeStream(in);

}

}

}

 

在一个小文件上运行的结果如下:

 

% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt

On the top of the Crumpetty Tree The Quangle Wangle sat,

But his face you could not see,

On account of his Beaver Hat.

On the top of the Crumpetty Tree The Quangle Wangle sat,

But his face you could not see,

On account of his Beaver Hat.

 

FSDatalnputStream 类也实现了 PositionedReadable 接口,从一个指定偏移量处读取文件的一部分:

 

public interface PositionedReadable {

 

public int read(long position, byte[] buffer, int offset, int length)

throws IOException;

 

public void readFully(long position, byte[] buffer, int offset, int length)

throws IOException;

 

public void readFully(long position, byte[] buffer) throws IOException;

}

 

read()方法从文件的指定position处读取至多为length字节的数据并存入缓冲区buffer的指定偏离量offset处。返回值是实际读到的字节数:调用者需要检査这个值,它有可能小于指定的length长度。 readFully()方法将指定length长度的字节数数据读取到buffer中(或在只接受buffer字节数组的版本中,读取buffer.length长度字节数据),除非已经读到文件末尾,这种情况下将抛出EOFException异常。

所有这些方法会保留文件当前偏移量,并且是线程安全的(FSDatalnputStrean并不是为并发访问设计的,因此最好为此新建多个实例),因此它们提供了在读取文件——可能是元数据——的主体时,访问文件其他部分的便利方法。事实上,这只是按照以下模式实现的Seekable接口。

最后务必牢记,seek()方法是一个相对髙开销的操作,需要慎重使用。建议用流数据来构建应用的访问模式(如使用MapReduce),而非执行大量 seek()方法。

转载请注明:全栈大数据 » 2.8.2. 通过FileSystem API读取数据

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址