整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

5.4.1本地运行测试数据 在本地作业运行器上运行作业

hadoop 小红牛 22℃ 0评论

现在mapperreducer已经能够在受控的输入上进行工作了,下一步是写—个作业驱动程序(Jobdriver),然后在开发机器上使用测试数据运行它。

通过使用前面介绍的Tool接口,可以轻松写一个MapReducer作业的驱动程序,来计算按照年度査找最高气温(参范例5-8的MaxTemperatureDriver)。

 

范例5-8.查找最高气温

public class MaxTemperatureDriver extends Configured implements Tool {

^Override

public int run(String[] args) throws Exception { if (args.length != 2) {

System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());

ToolRunner.printGenericCommandUsage(System.err); return -1;

Dob job = new Job(getConf(), "Max temperature"); job. set JarByClass(getClass());••

FilelnputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureR«ducer.class);

job.setOutputKeyClass(Text•class);

job.setOutputValueClass(IntWritable.class);

return job.waitForCompletion(true) ? 0 : 1;

}

public static void main(String[] args) throws Exception {

int exitCode = ToolRunner.run(new MaxTemperatureDriverC), args); System.exit(exitCode);

MaxTemperatureDriver实现了Tool接口,所以,我们能够设置GenericOptionsParser支持的选项。runO方法根据工具的配置创建一个]ob对象来启动一个作业。在所有可能的作业配置参数中,可以设置输入和输出文件路径,mapperreducercombiner以及输出类型(输入类型 由输入格式决定,默认为TextlnputFormat,包括LongWritable键和Text值)。为作业设置一个名称(Maxtemperature)也是很好的做法,这样可以在执行过程中或作业完成后方便地从作业列表中査找作业。默认情况下,作业名称是JAR文件名,通常情况下没有特殊的描述。

现在我们可以在一些本地文件上运行这个应用。Hadoop有一个本地作业运行器(jobrunner)它是在MapReduce执行引擎运行单个JVM上的MapReduce作业的简化版本。它是为测试而设计的,在IDE中使用起来非

 

常方便,因为我们可以在调试器中单步运行mapperreducer代码。

本地作业运行器只能用于简单测试MapReduce程序,因为它不同于完全的MapReduce实现。最大的区别是它不能运行多个reducer

(它也支持〇个reducer的情况。)通常情况下,这是没有问题的,因为虽然在集群上用户可以选择多个reducer来充分利用并行计算的优势,但是大多数应用可以在一个reducer的情况下工作。注意:即使把reducer的数量设置为大于1的值,本地作业运行器也会忽略这个设置而只使用一个reducer

Hadoop的后续版本可能会放宽这些限制。

本地作业运行器通过一个配置设置来激活。正常情况下,tnapred.job.tracker是一个主机:端口(host:port),用来设置jobtracker的地址,但它是一个特殊的local值(实际是个默认值)时,作业就在不访问外部jobtracker的情况下运行。

MapReduce2(包含在6.1.2节)中,等价的设置是mapreduce.framework.name,它必须设置为local

可以在命令,行方式下输入如下命令来运行驱动程序:

%mvn compile

%export HADOOP_CLASSPATH=target/classes/

%hadoop v2.MaxTemperatureDriver -conf conf/hadoop-local.xml \ input/ncdc/micro output

类似地,可以使用GenericOptionsParser提供的-fsjt选项:

%hadoop v2.MaxTemperatureDriver -fsfile:///-jt local input/ncdc/micro

这条指令使用本地目录作为输入来执行MaxTemperatureDriver,产生的输出存放在本地目录中。注意:虽然我们设置了fs,可以使用本地文件系统(file:///),但本地作业运行器实际上可以在包括HDFS在内的任何文件系统上正常工作(如果HDFS里有一些文件,可以马上进行尝试)。

我们运行这个程序时,运行失败,并打印如下异常:

java.lang.NumberFormatException: For input string:"+0000"

 

修复mapper

这个异常表明map方法仍然不能解析带正号的气温。如果堆栈跟踪不能提供足够的信息来诊断这个错误,因为程序运行在一个JVM中,我们可以在本地调试器中进行测试。前面我们已经使程序能够处理缺失气温值(+9999)的特殊情况,但不是任意正气温的一般情况。如果mapper中有更多的逻辑,那么给出一个解析类来封装解析逻辑是非常有意义的。参见范例5-9(这 是第三个版本)。

范例5-9.该类解析NCDC格式的气温记录

public class NcdcRecordParser {•.

private static final int MISSING_TEMPERATURE = 9999;

private String year; private int airTemperature; private String quality;

public void parse(String record) { year = record.substring(15, 19);

String airTemperatureString;

// Remove leading plus sign as parselnt doesn't like them if (record.charAt(87) == '+') {

airTemperaturestring = record.substring(88, 92);

} else {

airTemperaturestring = record.substring(87, 92);

airTemperature = Integer.parselnt(airTemperatureString); quality = record,substring(92j 93);

public void parse(Text record) { pa rse( record. toString);

public boolean isValidTemperature() {

return airTemperature != MISSING_TEMPERATURE && quality.matches(M[01459]H);

>

public String getYear() { return year; } public int getAirTemperature() { return airTemperature;

最终的mapper相当简单(参见范例5-10)。只调用解析类的parserO方法,后者解析输入行中的相关字段,用isValidTemperature()方法检査174 第5章

 

是否是合法气温,如果是,就用解析类的getter方法获取年份和气温数据。注意,我们也会在isValidTemperature()方法中检查质量状态字段和缺失的气温信息,以便过滤气温读取错误。

创建解析类的另一个好处是:相似作业的mapper不需要重写代码。也提供了一个机会直接针对解析类编写单元测试,用于更多目标测试。

范例5-10•这个mapper使用utility类来解析记录

public class MaxTemperatureMapper

extends Mapper<LongWritable, Text, Text, IntWritable> {

private NcdcRecordParser parser = new NcdcRecordParser();

^Override

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

parser.parse(value);

if(parser.isValidTemperature) {context.write(new Text(parser.getYear()),

new IntWritable(parser.getAirTemperature()));

经过这些修改以后,测试得以通过。

转载请注明:全栈大数据 » 5.4.1本地运行测试数据 在本地作业运行器上运行作业

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址