整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

第15章 关于Sqoop

hadoop 小小明 17℃ 0评论

15 关于Sqoop

(作者:Aaron Kimball)

Hadoop平台的最大优势在于它支持使用不同形式的数据。HDFS能够可靠地存储日志和来自不同渠道的其他数据,MapReduce程序能够解析多种 “即席”(ad hoc)数据格式,抽取相关信息并将多个数据集组合成非常有用的结果。

但是为了能够和HDFS之外的数据存储库进行交互,MapReduce程序需要使用外部API来访问数据。通常,一个组织中有价值的数据都存储在关系型数据库系统(RDBMS)等结构化存储器中。Apache Sqoop是一个开源工具,它允许用户将数据从结构化存储器抽取到Hadoop中,用于进一步的处理。抽取出的数据可以被MapReduce程序使用,也可以被其他类似于Hive的工具使用。(甚至可以使用Sqoop将数据从数据库转移到HBase。)一旦生成最终的分析结果,Sqoop便可以将这些结果导回数据存储器,供其他客户端使用。

在本章中,我们将了解Sqoop是如何工作的,并且学习如何在数据处理过 程中使用它。

1. 获取Sqoop

在几个地方都可以获得Sqoop。该项目的主要位置是在http://sqoop.apache.org,这里有Sqoop的所有源代码和文档。在这个站点可以获得Sqoop的官方版本和当前正在开发的新版本的源代码,这里同时还提供项目编译说明。另外,Clouderas Distribution Including Apache Hadoop(CDH)也包含一个Sqoop的安装包,以及与之兼容的Hadoop版本和类似于Hive的其他工具。

如果已经从Apache下载了一个版本,它将被放在一个类似于/home/yourname/sqoop-x.y.z/的目录中。我们称这个目录为$sqoop_home。可以通过运行可执行脚本$SQOOP_HOME/bin/sqoop来启动Sqoop

如果使用Cloudera的版本,那么安装包会把Sqoop的脚本放在类似于 /usr/bin/sqoop的标准位置。可以通过在命令行上简单地键入sqoop来运行它。(无论通过何种方式安装了Sqoop,从现在起我们都用执行sqoop脚本来表示运行它。)

不带参数运行Sqoop是没有什么意义的:

% sqoop

Try sqoop help for usage.

Sqoop被组织成一组工具或命令。不选择工具,Sqoop便无所适从。help是其中一个工具的名称,它能够打印出可用工具的列表,如下所示:

 

% sqoop help

usage: sqoop COMMAND [ARGS]

 

Available commands:

codegen

Generate code to interact with database records

create-hive-table

Import a table definition into Hive

eval

Evaluate a SQL statement and display the results

export

Export an HDFS directory to a database table

help

List available commands

import

Import a table from a database to HDFS

import-a11-tables

Import tables from a database to HDFS

job

Work with saved jobs

list-databases

List available databases on a server

list-tables

List available tables in a database

merge

Merge results of incremental imports

metastore

Run a standalone Sqoop metastore

version

Display version information

 

See 'sqoop help COMMAND' for information on a specific command.

根据它的解释,通过将特定工具的名称作为参数,help还能够提供该工具的使用说明:

 

 

% sqoop help import

usage: sqoop import [GENERIC-ARGS] [TOOL-ARGS]

 

Common arguments:

–connect <jdbc-uri>

Specify DDBC connect string

–driver <class-name>

Manually specify DDBC driver class to use

–hadoop-home <dir>

Override $HADOOP_HOME

–help

Print usage instructions

-P

Read password from console

–password <password>

Set authentication password

–username <username>

Set authentication username

–verbose

Print more information while working

 

运行Sqoop工具的另外一种方法是使用与之对应的特定脚本。这样的脚本般被命名为sqoop-toolname例如,sqoop-help sqoop-import 等。运行这两个脚本与运行sqoop helpsqoop import命令是一样的。

2. Sqoop连接器

Sqoop拥有一个可扩展的框架,使得它能够从()任何支持批量数据传输的外部存储系统导入(导出)数据。一个Sqoop连接器(connector)就是这个框架下的一个模块化组件,用于支持Sqoop的导入和导出操作。Sqoop附带的连接器能够支持大多数常用的关系数据库系统,包括MySQLPostgreSQLOracleSQL ServerDB2。同时还有一个通用的JDBC连接器,用于连接支持JDBC协议的数据库。Sqoop所提供的MySQLPostgreSQL连接器都是经过优化的,通过使用数据库特定的API来提供高效率的批量数据传输(在第536页的“直接模式导入”小节中将会进行详细介绍)

除了内置的Sqoop连接器外,还有很多针对各种数据存储器的第三方连接器可用,能够支持对企业级数据仓库(包括NetezzaTeradataOracle)NoSQL存储器(例如Couchbase)的连接。这些连接器必须另外单独下载,可以根据连接器所附带的安装说明将其添加到已有的Sqoop安装中。

3. 个导入的例子

在安装了Sqoop之后,可以用它将数据导入Hadoop。在本章的所有示例中我们都使用支持很多平台的易用数据库系统MySQL作为外部数据源。

基于DebianLinux系统(Ubuntu)的用户可以通过键入sudo apt-get install mysql-client mysql-server进行安装;RedHat的用户可以通过键入sudo yum install mysql mysql-server进行安装。

现在,MySQL已经安装好,让我们先登录,然后创建一个数据库(范例15-1)

范例15-1.创建一个新的MySQL数据库模式

% mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 349
Server version: 5.1.37-lubuntu5.4 (Ubuntu)
 
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
 
mysql> CREATE DATABASE hadoopguide;
Query OK, 1 row affected (0.02 sec)
 
mysql> GRANT ALL PRIVILEGES ON hadoopguide.* TO '%'@'localhost';
Query OK, 0 rows affected (0.00 sec)
 
mysql> GRANT ALL PRIVILEGES ON hadoopguide.* TO ''@'localhost';
Query OK, 0 rows affected (0.00 sec)
 
mysql> quit;
Bye

前面的密码提示要求你输入root用户的密码,就像root用户通过密码进行shell登录一样。如果你正在使用Ubuntu或其他不支持root直接登录的Linux系统,则键入安装MySQL时设定的密码。

在这个会话中,我们创建了一个新的名为hadoopguide的数据库模式,本章中我们将一直使用这个数据库模式,接着我们允许所有本地用户査看和修改hadoopguide模式的内容;最后,关闭这个会话。

现在让我们登录到数据库(这次不是作为root,而是你自己),然后创建一个将被导入HDFS的表(范例15-2)

 

范例15-2.填充数据库

% mysql hadoopguide
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 352
Server version: 5.1.37-lubuntu5.4 (Ubuntu)
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> CREATE TABLE widgetS(id INT NOT NULL PRIMARY KEY AUTO.INCREMENT,
-> widget_name VARCHAR(64) NOT NULL,
-> price DECIMAL(10,2),
-> design_date DATE,
-> version INT,
-> design_comment VARCHAR(100));
Query OK, 0 rows affected (0.00 sec)
mysql> INSERT INTO widgets VALUER (NULL, 'sprocket', 0.25, '2010-02-10',
-> 1, 'Connects two gizmos');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO widgets VALUES (NULL, 'gizmo'、 4.00, '2009-11-30', 4,
-> NULL);
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO widgets VALUES (NULL, 'gadget', 99.99, '1983-08-13',
-> 13, 'Our flagship product');
Query OK, 1 row affected (0.00 sec)
mysql> quit;

在上面的示例中,我们创建了一个名为widgets的新表。在本章更多的例子中我们都将使用这个虚构的产品数据库。widgets表有几个不同数据类型的字段。

现在让我们使用Sqoop将这个导入HDFS:

% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
> --table widgets -m 1
10/06/23 14:44:18 INFO tool.CodeGenTool: Beginning code generation
...
10/06/23 14:44:20 INFO mapred.JobClient: Running job:
job_201006231439_0002
10/06/23 14:44:21 INFO mapred.JobClient: map 0% reduce 0%
10/06/23 14:44:32 INFO mapred.JobClient: map 100% reduce 0%
10/06/23 14:44:34 INFO mapred.JobClient: Job complete:
job_201006231439_0002
...
10/06/23 14:44:34 INFO mapreduce.ImportJobBase: Retrieved 3 records.

 

Sqoopimport工具会运行一个MapReduce作业,该作业会连接MySQL数据库并读取表中的数据。默认情况下,该作业会并行使用4map任务来加速导入过程。每个任务都会将其所导入的数据写到一个单独的文件,但所有4个文件都位于同一个目录中。在本例中,由于我们知道只有三行可以导入的数据,因此指定Sqoop只使用一个map任务(-m 1),这样我们只得到一个保存在HDFS中的文件。

 

我们可以检查这个文件的内容,如下所示:

% hadoop fs -cat widgets/part-m-00000
1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gizmo,4.00,2009-11-30,4,null
3,gadget,99.99,1983-08-13,13,0ur flagship product

在本例中使用了连接字符串(jdbc:mysql://localhost/hadoopguide),表明需要从本地机器上的数据库中读取数据。如果使用了分布式Hadoop集群,则在连接字符串中不能使用localhost,否则,与数据库不在同一台机器上运行的map任务都将无法连接到数据库。即使是从数据库服务器所在主机上运行Sqoop,也需要为数据库服务器指定完整的主机名。

在默认情况下,Sqoop会将我们导入的数据保存为逗号分隔的文本文件。如果导入数据的字段内容中存在分隔符,我们可以另外指定分隔符、字段包围字符和转义字符。使用命令行参数可以指定分隔符、文件格式、压缩以及对导入过程进行更细粒度的控制,Sqoop自带的《Sqoop使用指南》或 Sqoop的在线帮助(sqoop help import,CDH 中使用 man sqoop-import可以找到对相关参数的描述。

 

 

文本和二进制文件格式

Sqoop可以将数据导入成几种不同的格式。文本文件(默认)是一种人类可读的数据表示形式,并且是平台独立和最简单的数据格式。但是,文本文件不能保存二进制字段(例如数据库中类型为VARBINARY的列),并且在区分null值和字符串null时可能会出现问题(尽管使用–null-string选项可以控制空值的表示方式)

为了处理这些情况,应该使用SqoopSequenceFile格式或Avro格式。AvroSequenceFile格式的文件能够为导入的数据提供最精确的表示方式,同时还允许对数据进行压缩,并支持MapReduce并行处理同一文件的不同部分。然而,Sqoop的目前版本还不能将AvroSequenceFile文件加载到Hive中(尽管可以手动地将Avro数据文件加载到Hive中)。SequenceFile文件格式的最后一个缺点是它只支持java语言,而Avro数据文件却可以被很多种语言处理。

4. 生成代码

除了能够将数据库表的内容写到HDFSSqoop同时还生成了一个Java源文件(widgets.java),保存在当前的本地目录中。(在运行了前面的sqoop import命令之后,可以通过Is widgets, java命令看到这个文件。)

15.5节中,将看到Sqoop在将源数据库的表数据写到HDFS之前,会首先用生成的代码对其进行反序列化。

生成的类(widgets)中能够保存一条从被导入表中取出的记录。该类可以在 MapReduce中使用这条记录,也可以将这条记录保存在HDFS中的一个SequenceFile文件中。(在导入过程中,由Sqoop生成的类会将每一条被导入的行保存在SequenceFile文件的键/值对格式中“值”的位置。)

也许你不想将生成的类命名为widgets,因为每一个类的实例只对应于一条记录。我们可以使用另外一个Sqoop工具来生成源代码,但并不真正执行导入操作;但生成的代码仍然会检査数据库表,以确定与每个字段相匹配的数据类型:

% sqoop codegen --connect jdbc:mysql://localhost/hadoopguide \
> --table widgets --class-name Widget

codegen工具只是简单地生成代码,它不执行完整的导入操作。我们指定生成一个名为Widget的类,这个类将被写到Widget.java文件中。在之前执行的导入过程中,我们也可以指定类名–class-name和其他代码生成参数。如果你意外地删除了生成的源代码,或希望使用不同于导入过程的设定来生成代码,都可以用这个工具来重新生成代码。

如果计划使用导入到SequenceFile文件中的记录,你将不可避免地用到生成的类(SequenceFile文件中的数据进行反序列化)。在使用文本文件中的记录时不需要用到生成的代码,但在15.6节中,我们将看到Sqoop生成的代码有助于解决数据处理过程中的一些繁琐问题。

其他序列化系统

最近的Sqoop版本支持基于Avro的序列化和模式生成(参见4.4),允许你在项目中使用Sqoop,但无须集成生成的代码。

5. 深入了解数据库导入

如前所述,Sqoop是通过一个MapReduce作业从数据库中导入一个表,这个作业从表中抽取一行行记录,然后将记录写入HDFSMaPReduce是如何读取记录的呢?本节将解释Sqoop的底层工作机理。

15-1粗略演示了Sqoop是如何与源数据库及Hadoop进行交互的。像 Hadoop —样,Sqoop是用Java语言编写的。Java提供了一个称为JDBC(Java Database Connectivity)API,应用程序可以使用这个API来访问存储在RDBMS中的数据,同时可以检査数据类型。大多数数据库厂商都提供了JDBC驱动程序,其中实现了JDBC API并包含用于连接其数据库服务器的必要代码。

blob.png 

15-1. Sqoop的导入过程

 

根据用于访问数据库的连接字符串中的URL, Sqoop尝试预测应该加载哪个驱动程序,而你要事先下载所需的JDBC驱动并且将其安装在Sqoop客户端。在Sqoop无法判定使用哪个JDBC驱动程序时,用户可以明确指定如何将JDBC驱动程序加载到Sqoop,这样 Sqoop就能够支持大多数的数据库平台。

在导入开始之前,Sqoop使用JDBC来检查将要导入的表。它检索出表中所有的列以及列的SQL数据类型。这些SQL类型(VARCHARINTEGER)被映射到Java数据类型(StringInteger等),在MapReduce应用中将使用这些对应的Java类型来保存字段的值。Sqoop的代码生成器使用这些信息来创建对应表的类,用于保存从表中抽取的记录。

例如,之前提到的Widget类包含下列方法,这些方法用于从抽取的记录中检索所有的列:

public Integer get_id();
public String get_widget_name();
public java.math.BigDecimal get_price();
public java.sql.Date get_design_date();
public Integer get_version();
public String get_design_comment();

不过,对于导入来说,更关键的是DBWritable接口的序列化方法,这些 方法能使Widget类和JDBC进行交互:

public void readFields(ResultSet _dbResults) throws SQLException;
public void write(PreparedStatement _dbStmt) throws SQLException;

JDBCResultSet接口提供了一个用于从査询结果中检索记录的游标,这里的readFields()方法将用ResultSet中一行数据的列来填充Widget 对象的字段。这里的write()方法允许Sqoop将新的Widget行插入表,这个过程称为“导出”(exporting)15.8节将对此进行讨论。

Sqoop启动的MapReduce作业用到一个InputFormat,它可以通过JDBC从一个数据库表中读取部分内容。Hadoop提供的DataDrivenDBInputFormat能够为几个map任务对査询结果进行划分。

使用一个简单的査询通常就可以读取一张表的内容,例如:

SELECT col1,col2,coL3, FROM tabLeName

但是,为了获得更好的导入性能,人们经常将这样的查询划分到多个节点上执行。査询是根据一个“划分列”(splitting column)来进行划分的。根据表的元数据,Sqoop会选择一个合适的列作为划分列(通常是表的主键)。主键列中的最小值和最大值会被读出,与目标任务数一起用来确定每个map 任务要执行的查询。

例如,假设widgets表中有100,000条记录,其id列的值为0~99,999。在导入这张表时,Sqoop会判断出id是表的主键列。启动MapReduce作业时,用来执行导入的DataDrivenDBInputFormat便会发出一条类似于SELECT MIN(id), MAX(id) FROM widgets的査询语句。检索出的数据将用于对整个数据集进行划分。假设我们指定并行运行5map任务(使用-m 5),这样便可以确定每个map任务要执行的査询分別为:SELECT id, widget_name, … FROM widgets WHERE id >= 0 AND id < 20000SELECT id, widget_name, … FROM widgets WHERE id >= 20000 AND id < 40000,…,以此类推。

划分列的选择是影响并行执行效率的重要因素。如果id列的值不是均匀分布的(也许在id50,00075,000的范围内没有记录),那么有一部分map任务可能只有很少或没有工作要做,而其他任务则有很多工作要做。在运行一个导入作业时,用户可以指定一个列作为划分列,从而调整作业的划分使其符合数据的真实分布。如果使用-m 1参数来让一个任务执行导入作业,就不再需要这个划分过程。

在生成反序列化代码和配置InputFormat之后,Sqoop将作业发送到MapReduce集群。map任务执行查询并且将ResultSet中的数据反序列化到生成类的实例,这些数据要么被直接保存在SequenceFile文件中,要么在写到HDFS之前被转换成分隔的文本。

5.1. 导入控制

Sqoop不需要每次都导入整张表。例如,可以指定仅导入表的部分列。用户也可以在查询中加入WHERE子句,以此来限定需要导入的记录。例如, 如果上个月已经将id0~99,999的记录导入,而本月供应商的产品目录中增加了1000种新部件,那么导入时在查询中加入子句WHERE id >= 100000,就可以实现只导入所有新增的记录。

用户提供的WHERE子句会在任务分解之前执行,并且被下推至每个任务所执行的査询中。

5.2. 导入和一致性

在向HDFS导入数据时,重要的是要确保访问的是数据源的一致性快照。从一个数据库中并行读取数据的Map任务分别运行在不同的进程中,因此它们不可能共享同一个数据库事务。保证一致性的最好方法就是在导入时不允许运行任何对表中现有数据进行更新的进程。

5.3. 直接模式导入

Sqoop的架构支持它在多种可用的导入方法中进行选择,而多数数据库都使用上述基于DataDrivenDBInputFormat的方法。一些数据库提供了能够快速抽取数据的特定工具,例如MySQLmysqldump能够以大于JDBC的吞吐率从表中读取数据。在Sqoop的文档中将这种使用外部工具的方法称为“直接模式”(direct mode)。由于直接模式并不像JDBC方法那样通用,(例如,MySQL的直接模式不能处理大对象数据——类型为CLOBBLOB的列,Sqoop需要使用JDBC专用的API将这些列载入HDFS。)所以使用直接模式导入时必须由用户明确地启动(通过direct参数)

对于那些提供了此类特定工具的数据库,Sqoop使用这些工具能够得到很好的效果。采用直接模式从MySQL中导入数据通常比基于JDBC的导入更加高效(map任务和所需时间而言)Sqoop仍然并行启动多个map任务,接着这些任务将分别创建mysqldump程序的实例并且读取它们的运行结果,其效果类似于使用Maatkit(http://www.maatkit.org/)工具集中 tnk-parallel- dump的分布式实现。Sqoop也支持采用直接模式从PostgreSQL中导入数据。

即使是用直接模式来访问数掲库的内容,元数据的查询仍然是通过JDBC 来实现的。

6. 使用导入的数据

一旦数据被导入HDFS,就可以供定制的MapReduce程序使用。导入的文 本格式数据可以供Hadoop Streaming中的脚本或以TextlnputFormat为默 认格式运行的MapReduce作业使用。

为了使用导入记录的个别字段,必须对字段分隔符(以及转义/包围字符)进行解析,抽出字段的值并转换为相应的数据类型。例如,在文本文件中,sprocketid被表示成字符串“1”,但必须被解析为JavaIntegerint类型的变量。

Sqoop生成的表类能够自动完成这个过程,使你可以将精力集中在真正要运行的MacReduce作业上。每个自动生成的类都有几个名为parse()的重载方法,这些方法可以对表示为TextCharSequencechar[]或其他常见类型的数据进行操作。

名为MaxWidgetldMapReduce应用(在示例代码中)可以找到具有最大ID的部件。

这个类可以和Widget.java一起被编译成一个JAR文件。在编译时需要将Hadoop(hadoop-core-version.jar)Sqoop(sqoop-version.jar)的位置加入类路径,然后类文件就可以被合并生成一个MR文件,并且像这样运行:

% jar cvvf widgets.jar *.class
% HADOOP_CLASSPATH=/usr/lib/sqoop/sqoop-version jar hadoop jar \
>widgets.jar MaxWidgetld -libjars /usr/lib/sqoop/sqoop-version.jar

MaxWidgetld.run()方法在运行时,以及map任务在集群上运行时(通过libjars参数),该命令确保了Sqoop位于本地的类路径中(通过$HADOOP_ CLASSPATH)

运行之后,HDFSmaxwidgets路径中便有一个名为的文件,其内容如下:

3,gadget,99.99,1983-08-13,13,Our flagship product

注意,在这个MapReduce示例程序中,一个Widget对象从mapper被发送到reducer;这个自动生成的Widget类实现了Hadoop提供的Writable接口,该接口允许通过Hadoop的序列化机制来发送对象以及写到SequenceFile文件或从SequenceFile文件读出对象。

这个MaxWidgetID的例子是建立在新的MapReduce API之上的。虽然某些高级功能(例如使用大对象数据)只有在新的API中使用起来才更方便,但无论新旧API,都可以用来构建依赖于Sqoop生成代码的MapReduce应用。

前面4.4.7节介绍了用于处理Avro格式导入的API。采用通用的Avm映射时,MapReduce程序不需要使用针对数据表的模式所生成的代码(尽管使用 Avro的特定编译器时这也是其中一个选项;在这种情况下,Sqoop不会生成代码)。示例代码中包含有一个名为MaxWidgetldGenericAvro的程序,用于找出具有最大ID的部件并将结果写入一个Avro数据文件。

导入的数据与Hive

如第12章所述,对于很多类型的分析任务来说,使用类似于Hive的系统来处理关系操作有利于加快分析任务的开发。特别是对于那些来自于关系数据源的数据,使用Hive是非常有帮助的。HiveSqoop共同构成了一个强大的服务于分析任务的工具链。

假设在我们的系统中有另外一组数据记录,来自一个基于Web的零部件采购系统。这个系统返回的记录文件中包含部件ID、数量、送货地址和订单曰期。

下面是此类记录的例子:

1,15,120 Any St.,Los Angeles,CA,90210,2010-08-01

3,4,120 Any St.,Los Angeles,CA,90210,2010-08-01

2,5,400 Some PI.,Cupertino,CA,95014,2010-07-30

2,7,88 Mile Rd.,Manhattan,NY,10005,2010-07-18

通过使用Hadoop来分析这组采购记录,我们可以深入了解我们的销售业务。将这些数据与来自关系数据源(widgets)的数据相结合,可以使我们做得更好。在这个例子中,我们将计算哪个邮政编码区域的销售业绩最好,便可以让我们的销售团队更加关注于该区域。为了做到这一点,我们同时需要来自销售记录和widgets表的数据。

上述销售记录数据被保存在一个名为sales.log的本地文件中。

首先,让我们将销售数据载入Hive:

hive> CREATE TABLE sales(widget_id INT, qty INT,
>street STRING, city STRING, state STRING,
>zip INT, sale_date STRING)
>ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
OK
Time taken: 5.248 seconds
hive> LOAD DATA LOCAL INPATH "sales.log" INTO TABLE sales;
Copying data from file:/home/sales.log
Loading data to table sales
OK
Time taken: 0.188 seconds

Sqoop能够根据一个关系数据源中的表来生成一个Hive表。既然我们已经将widgets表的数据导人到HDFS,那么我们就直接生成相应Hive表的定义,然后加载保存在HDFS中的数据:

% sqoop create-hive-table --connect jdbc:mysql://localhost/hadoopguide \
>--table widgets --fields-terminated-by ','
...
10/06/23 18:05:34 INFO hive.Hivelmport: OK
10/06/23 18:05:34 INFO hive.Hivelmport: Time taken: 3.22 seconds
10/06/23 18:05:35 INFO hive.Hivelmport: Hive import complete.
% hive
hive> LOAD DATA INPATH "widgets" INTO TABLE widgets;
Loading data to table widgets
OK
Time taken: 3.265 seconds

在为一个特定的已导人数据集创建相应的Hive表定义时,我们需要指定该数据集所使用的分隔符。否则,Sqoop将允许Hive使用它自己的默认分隔符(Sqoop的默认分隔符不同)。

Hive的数据类型不如大多数SQL系统的丰富。很多SQL类型在Hive中都没有直接对应的类型。当Sqoop为导入操作生成Hive表定义时,它会为数据列选择最合适的Hive类型,这样可能会导致数据精度的下降。一旦出现这种情况,Sqoop就会提供一条警告信息,如下所示:

10/06/23 18:09:36 WARN hive.TableDefWriter:

Column design_date had to be

cast to a less precise type in Hive

如果想直接从数据库将数据导人到Hive,可以将上述的三个步骤(将数据导入HDFS,创建Hive表,将HDFS中的数据导入Hive)缩短为一个步骤。在进行导入时,Sqoop可以生成Hive表的定义,然后直接将数据导入Hive表。如果我们还没有执行过导入操作,就可以使用下面的命令,根据 MySQL中的数据直接创建Hive中的widgets表:

% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
>--table widgets -m 1 --hive-import

使用–hive-import参数来运行sqoop import工具,可以从源数据库中直接将数据载入Hive,它自动根据源数据库中表的模式来推断Hive表的模式。这样,只需要一条命令,你就可以在Hive中来使用自己的数据。

无论选择哪一种数据导入的方式,现在我们都可以使用widgets数据集和sales数据集来计算最赚钱的邮政编码地区。让我们来做这件事,并且把 査询的结果保存在另外一张表中,供将来使用:

hive> CREATE TABLE zip_profits (sales_vol DOUBLE, zip INT);
OK
hive> INSERT OVERWRITE TABLE zip_profits
>SELECT SUM(w.price * s.qty) AS sales_vol, s.zip FROM SALES s
>JOIN widgets w ON (s.widget_id = w.id) GROUP BY s.zip;
...
3 Rows loaded to zip_profits
OK
 
hive> SELECT * FROM zip_profits ORDER BY sales_vol DESC;
OK
403.71 90210
28.0 10005
20.0 95014

7. 导入大对象

很多数据库都支持在一个字段中保存大量的数据,根据数据是文本还是二进制类型,通常将其保存在表中CLOBBLOB类型的列中。数据库一般会对这些“大对象”进行特殊处理。大多数的表在磁盘上的物理存储都如图15-2所示。通过行扫描来确定哪些行匹配特定的査询条件时,通常需要从磁盘上读出每一行的所有列。如果也是以这种方式“内联”(inline)存储大对象,它们会严重影响扫描的性能。因此,一般将大对象与它们的行分开存储,如图15-3所示。在访问大对象时,需要通过行中包含的引用来“打开”它。

blob.png 

15-2.数据库通常以行数组的方式来存储表,行中所有列存储在相邻的位置

blob.png 

15-3.大对象通常保存在单独的存储区域,行的主存储区域包含指向大对象的间接引用

 

在数据库中使用大对象的困难表明,像Hadoop这样的系统更适合于处理大型的、复杂的数据对象,也是存储此类信息的理想选择。Sqoop能够从表中抽取大对象数据,并且将它们保存在HDFS中供进一步处理。

同在数据库中一样,MapReduce在将每条记录传递给mapper之前一般要对其进行“物化”(materialize)。如果单条记录真的很大,物化操作将非常低效。

如前所示,Sqoop所导入的记录在磁盘上的存储格式与数据库的内部数据结构非常相似:将每条记录的所有字段放在一起组成的一个记录数组。在导入的记录上运行一个MapReduce程序时,每个map任务必须将所读记录的所有字段完全物化。如果MapReduce程序所处理的输入记录中仅有很小一部分的大对象字段的内容,那么将所有记录完全物化将导致程序效率低下。此外,从大对象的大小来看,在内存中进行完全物化也许是无法实现的。

为了克服这些困难,Sqoop将导入的大对象数据存储在LobFile格式的单独文件中。LobFile格式能够存储非常大的单条记录(使用了64位的地址空间),每条记录保存一个大对象。LobFile格式允许客户端持有对记录的引用,而不访问记录内容;对记录的访问是通过java.io.InputStream(用于二进制对象)java.io.Reader(用于字符对象)来实现的。

在导入一条记录时,所有的“正常”字段会在一个文本文件中一起被物化,同时还生成一个指向保存CLOBBLOB列的LobFile文件的引用。 例如,假设我们的widgets表有一个名为schematicBLOB字段,该字段用于保存每个部件的原理图。

导入的记录可能像下面这样:

2,gizmo,4.00,2009-11-30,4,null,externalLob(lf,lobfile0,100,5011714)

externalLob(…)是对外部存储大对象的一个引用,这个大对象以LobFile格式(If)存储在名为lobfile0的文件中,同时还给出了该对象在文件中的字节位移和长度。

在使用这条记录时,Widget.get_scheinatic()方法会返回一个类型为BlobRef的对象,用于引用schematic列,但这个对象并不真正包含记录的内容。BlobRef.getDataStream()方法实际会打开LobFile文件并返回一个InputStream,用于访问schematic字段的内容。

在使用一个MapReduce作业来处理许多Widget记录时,可能你只需要访问少数几条记录的schematic字段。单个原理图数据可能有几兆大小或更大,使用这种方式时,只需要承担访问所需大对象的I/o开销。

在一个map任务中,BlobRefClobRef类会缓存对底层LobFile文件的引用。如果你访问几个顺序排列记录的schematic字段,就可以利用现有文件指针来定位下一条记录。

8. 执行导出

Sqoop中,“导入”(import)是指将数据从数据库系统移动到HDFS。与之相反,“导出”(export)是将HDFS作为数据源,而将一个远程数据库作为目标。在前面的几个小节中,我们导入了一些数据并且使用Hive对数据进行了分析。我们可以将分析的结果导出到一个数据库中,供其他工具使用。

将一张表从HDFS导出到数据库时,我们必须在数据库中创建一张用于接收数据的目标表。虽然Sqoop可以推断出哪个Java类型适合存储SQL数据类型,但反过来却是行不通的(例如,有几种SQL列的定义都可以用来存储JavaString类型,如CHAR(64)VARCHAR(200)或其他一些类似定义)。因此,必须由用户来确定哪些类型是最合适的。

我们打算从Hive中导出zip_profits表。首先需要在MySQL中创建一个具有相同列顺序及合适SQL类型的目标表:

% mysql hadoopguide
mysql> CREATE TABLE sales_by_zip (volume DECIMAL(8,2), zip INTEGER);
Query OK, 0 rows affected (0.01 sec)

接着我们运行导出命令:

% sqoop export --connect jdbc:mysql://localhost/hadoopguide -m 1 \
>--table sales_by_zip --export-dir /user/hive/warehouse/zip_profits \
>--input-fields-terminated-by '\0001'
...
10/07/02 16:16:50 INFO maprerfuce.ExportJobBase: Transferred 41 bytes in
10.8947
seconds (3.7633 bytes/sec)
10/07/02 16:16:50 INFO mapreduce.ExportDobBase: Exported 3 records.


最后,我们可以通过检査MySQL来确认导出成功:

% mysql hadoopguide -e 'SELECT * FROM sales_by_zip'
|====================|
|  volume  |   zip   |
|====================|
|  28.00   |  10005  |
|--------------------|
|  403.71  |  90210  |
|--------------------|
|  20.00   |  95014  |
|--------------------|

Hive中创建zip_profits表时,我们没有指定任何分隔符。因此Hive使用了自己的默认分隔符:字段之间使用Ctrl-A字符(Unicode编码0x0001)分隔,每条记录末尾使用一个换行符分隔。当我们使用Hive来访问这张表的内容时(SELECT语句),Hive将数据转换为制表符分隔的形式,用于在控制台上显示。但是直接从文件中读取这张表时,我们要将所使用的分隔符告知SqoopSqoop默认记录是以换行符作为分隔符,但还需要将字段分隔符Ctrl-A告之Sqoop。可以在 sqoop export 命令中使用–input-fields- terminated-by参数来指定字段分隔符。Sqoop在指定分隔符时支持几种转义序列(以字符'\'开始)

在示例语法中,所用的转义序列被包围在'单引号'中,以确保shell会按字面意义处理它。如果不使用引号,前导的反斜杠就需要转义处理(例如,— input-fields-terminated-by \\0001)。表 15-1列出了 Sqoop 所支持的转义序列。

15-1.转义序列可以用于指定非打印字符作为Sqoop中字段和记录的分隔符

转义序列

描述

\b

退格

\n

换行

\n

回车

\t

制表符

\'

单引号

\"

双引号

\\

反斜扛

\0

NUL。用于在字段或行之间插入NUL字符或在–enclosed-by–optionally-enclosed-by –escaped-by 参数中使用时表示禁用包围/转义

\0ooo

一个Unicode字符代码点的八进制表示,实际字符由八进制值ooo指定

\0xhhh

一个Unicode字符代码点的十六进制表示,采用\0xhhh的形式,其中hhh是十六进制值。例如,–fields-terminated-by '\0x10'指定的是回车符

 

9. 深入了解导出功能

Sqoop导出功能的架构与其导入功能的非常相似(参见图15-4)。在执行导出操作之前,Sqoop会根据数据库连接字符串来选择一个导出方法。对于大多数系统来说,Sqoop都会选择JDBC,然后,Sqoop会根据目标表的定义生成一个Java类;这个生成的类能够从文本文件中解析出记录,并能够向表中插入类型合适的值(除了能够读取ResultSet中的列);接着会启动一个MapReduce作业,从HDFS中读取源数据文件,使用生成的类解析出记录,并且执行选定的导出方法。

基于JDBC的导出方法会产生一批INSERT语句,每条语句会向目标表中插入多条记录。在大部分的数据库系统中,通过一条语句插入多条记录的执行效率要髙于多次执行插入单条记录的INSERT语句。多个独立的线程被用于从HDFS读取数据并与数据库进行通信,以确保涉及不同系统的I/o操作能够尽量重叠执行。

blob.png 

15-4.使用MapReduce并行执行导出

 

对于MySQL数据库来说,Sqoop可以采取使用mysqlimport的直接模式方法。每个map任务会生成一个mysqlimport进程,该进程通过本地文件系统上的一个命名FIFO通道进行通信,数据通过这个FIFO通道流入mysqlimpopt,然后再被写入数据库。

虽然从HDFS读取数据的MapReduce作业大多根据所处理文件的数量和大小来选择并行度(map任务的数量),但Sqoop的导出工具允许用户明确设定任务的数量。由于导出性能会受并行的数据库写入线程数量的影响,所以Sqoop使用CombineFilelnputFormat类将输入文件分组分配给少数几个map任务去执行。

9.1. 导出与事务

进程的并行特性决定了导出操作往往不是原子操作。Sqoop会生成多个并行执行的任务,分别导出数据的一部分。这些任务的完成时间各不相同,即使在每个任务内部都使用事务,不同任务的执行结果也不可能同时提交。 此外,数据库系统经常使用固定大小的缓冲区来存储事务数据,这使得一个任务中的所有操作不可能在一个事务中完成。Sqoop每导入几千条记录便会执行一次提交,以确保不会出现内存不足的情况。在导出操作进行过程 中,提交过的中间结果都是可见的。因此在导出过程完成前,不要启动那些要使用导出结果的应用程序,否则这些应用会看到不完整的导出结果。

为了解决这个问题,Sqoop可以将数据先导出到一个临时阶段表中,然后在导出任务完成前(即导出操作成功执行后)在一个事务中将临时阶段表中的数 据全部移动到目标表中。可以通过–staging-table选项来指定一个临时阶段表,这个临时阶段表必须是已经存在的,并且和目标表具有相同的模式定义。如果不使用–clear-staging-table选项,则这个临时阶段表必须是空表。

9.2. 导出和SequenceFile

之前的导出示例是从一个Hive表中读取源数据,该Hive表以分隔文本文件形式保存在HDFS中。Sqoop也可以从非Hive表的分隔文本文件中导出数据。例如,Sqoop可以导出MapReduce作业结果的文本文件。

Sqoop还可以将存储在SequenceFile中的记录导出到输出表,不过有一些限制。SequenceFile中可以保存任意类型的记录,但由于Sqoop的导出工具从 SequenceFile中读取对象后直接发送到OutputCollector,由它将这些对 象传递给数据库导出OutputFormat,因此为了能让Sqoop使用,记录必须被保存在SequenceFile/值对格式的“值”部分,并且必须继承抽象 类 com.cloudera.sqoop.lib.SqoopRecord(Sqoop 生成的所有类 那样) .

如果基于导出目标表,使用codegen工具(sqoop-codegen)为记录生成一个 SqoopRecord的实现,那你就可以写一个MapReduce程序,填充这个类的 实例并将它们写入SequenceFile,接着sqoop-export就可以将这些SequenceFile文件导出到表中。还有另外一种方法,即将数据放入 SqoopRecord实例中,然后保存到SequenceFile中。如果数据是从数据库表导入HDFS的,那么在经过某种形式的修改后,可以将结果保存在持有相同数据类型记录的SequenceFile中。

在这种情况下,Sqoop应当利用现有的类定义从SequenceFile中读取数据,而不是像导出文本记录时所做的那样,为执行导出生成一个新的(临时的)记录容器类。通过为Sqoop提供–class-name–jar-file参数,可以禁止它生成代码,而使用现有的记录类和jar包。在导出记录时,Sqoop将使用指定jar包中指定的类。

在下面的例子中,我们将重新导入widgets表到SequenceFile中,然后再将其导出到另外一个数据库表:

% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
>--table widgets -m 1 --class-name WidgetHolder --as-sequencefile \
>--target-dir widget_sequence_files --bindir .
...
10/07/05 17:09:13 INFO mapreduce.ImportJobBase: Retrieved 3 records.
 
% mysql hadoopguide
mysql> CREATE TABLE widgets2(id INT, widget_name VARCHAR(100),
-> price DOUBLE, designed DATE, version INT, notes VARCHAR(200));
Query OK, 0 rows affected (0.03 sec)
 
mysql> exit;
 
% sqoop export --connect jdbc:mysql://localhost/hadoopguide \
>--table widgets2 -m 1 --class-name WidgetHolder \
>--jar-file widgets.jar --export-dir widget_sequence_files
...
10/07/05 17:26:44 INFO mapreduce.ExportJobBase: Exported 3 records.

 

在导入过程中,我们指定使用SequenceFile格式,并且将JAR文件放入当前目录(使用–bindir),这样将来便可以重用它,否则它将会被保存在一个临时目录中。然后我们创建一个用于导出的目标表,该表在模式上稍有不同但与源数据兼容;在接下来的导出操作中,我们使用现有的生成代码从SequenceFile中读取记录并将它们写入数据库。

 

 

转载请注明:全栈大数据 » 第15章 关于Sqoop

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址