整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

7.1.1. 默认的MapReduce作业

hadoop 花牛 9℃ 0评论

如果不指定mapper或reducer就运行MapReduce,会发生什么情况?我们运行一个最简单的MapReduce程序来看看:

public class MinimalMapReduce extends Configured implements Tool { 
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System•err);
            return -1;
        }
        Job job = new Job(getConf());
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[l]));
        return job.waitForCompletion(true) ? 0 1;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MinimalMapReduce(), args);
        System.exit(exitCode);
    }
}

我们唯一设置的是输入路径和输出路径。在气象数据的子集上运行以下 命令:

% hadoop MinimalMapReduce "input/ncdc/all/190{l,2}.gz" output

输出目录中得到命名为part-r-00000的输出文件。这个文件的前几行如下(为了适应页面进行了截断处理)

0—0029029070999991901010106004+64333+023450FM-

12+000599999V0202701N01591…

0—0035029070999991902010106004+64333+023450FM-

12+000599999V0201401N01181…

135-0029029070999991901010113004+64333+023450FM-

12+000599999V0202901N00821…

141—0035029070999991902010113004+64333+023450FM-

12+000599999V0201401N01181

270—0029029070999991901010120004+64333+02B450FM-

12+000599999V0209991C00001…

282—0035029070999991902010120004+64333+023450FM-

12+000599999V0201401N01391…

每一行以整数开始,接着是制表符(Tab),然后是一段原始气象数据记录。虽然这并不是一个有用的程序,但理解它如何产生输出确实能够洞悉Hadoop是如何使用默认设置运行MapReduce作业的。范例7-1的示例与前面MinimalMapReduce完成的事情一模一样,但是它显式地把作业环境设置为默认值。

范例7-1.简化的MapReduce驱动程序,默认值显式设置

public class MinimalMapReduceWithDefaults extends Configured implements Tool {
    @Override
    public int run(String[] args) throws IOException {
        Job job = JobBuilder.parseInputAnOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setInputFormat(TextInputFormat.class);
        job.setMapperClass(Mapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(l);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormat(TextOutputFormat.class);
        return job.waitForCompletion(true) ? 0 1;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args);
        System.exit(exitCode);
    }
}

通过把打印使用说明的逻辑抽取出来并把输人/输出路径设定放到一个帮助方法中,实现对run()方法的前几行进行了简化。几乎所有MapReduce驱动程序都有两个参数(输入与输出),所以此处进行这样的代码约简是可行的。以下是JobBuilder类中的相关方法,供大家参考:

public static Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws IOException {
    if (args.length != 2) {
        printUsage(tool, "<input> <output>");
        return null;
    }
    Job job = new Job(conf);
    job.setJarByClass(tool.getClass());
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job;
}
public static void printUsage(Tool tool, String extraArgsUsage) {
    System.err.printf("Usage: %s [genericOptions] %s\n\n", tool.getClass().getSimpleName(), extraArgsUsage);
    GenericOptionsParser.printGenericCommandUsage(System.err);
}

回到范例7-1中的MinimalMapReducewithDefaults类,虽然有很多其他的默认作业设置,但加粗显示的部分是执行一个作业最关键的代码。接下来我们逐一讨论。

默认的输入格式是TextInputFormat,它产生的键类型是LongWritable(文件中每行中开始的偏移量值),值类型是Text(文本行)。这也解释了最后输出的整数的含义:行偏移量。

默认的mapper是Mapper类,它将输入的键和值原封不动地写到输出中:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    protected void map(KEYIN key, VALUEIN value,Context context) throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}

Mapper是一个泛型类型(generic type),它可以接受任何键或值的类型。在 这个例子中,map的输入输出键是LongWritable类型,map的输入输出值是Text类型。

默认的partitioner是HashPartitioner,它对每条记录的键进行哈希操作以决定该记录应该属于哪个分区。每个分区对应一个reducer任务,所以分区数等于作业的reducer个数:

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value,int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

键的哈希码被转换为一个非负整数,它由哈希值与最大的整型值做一次按位与操作而获得,然后用分区数进行取模操作,来决定该记录属于哪个分区索引。

默认情况下,只有一个reducer,因此,也就只有一个分区,在这种情况下,由于所有数据都放入同一个分区,partitioner操作将变得无关紧要了。然而,如果有很多reducer,了解HashPartitioner的作用就非常重要。假设基于键的散列函数足够好,那么记录将被均匀分到若干个reduce任务中,这样,具有相同键的记录将由同一个reduce任务进行处理。

你可能已经注意到我们并没有设置map任务的数量。原因是该数量等于输入文件被划分成的分块数,这取决于输入文件的大小以及文件块的大小(如果此文件在HDFS中)。关于控制块大小的操作

 

选择reducer的个数

对Hadoop新手而言,单个reducer的默认配置很容易上手。真实的应用中,作业都把它设置成一个较大的数字,否则由于所有的中间数据都会放到一个reducer任务中,作业处理极其低效。注意,在本地作业运行器上运行时,只支持0个或1reducer

reducer最优个数与集群中可用的reducer任务槽数相关。总槽数由集群中节点数与每个节点的任务槽数相乘得到。每个节点的任务槽数由mapred.tasktracker.reduce.tasks.maximum 属性的值决定

一个常用的方法是设置的reducer数比总槽数稍微少一些,给reducer任务留点儿余地(容忍一些错误发生而不需要延长作业运行时间).如果 reduce任务很大,比较明智的做法是使用更多reducer,使任务粒度更小,从而使任务的失败不至于显著影响作业执行时间。

默认的reducer是Reducer类型,它也是一个泛型类型,它简单地将所有的输入写到输出中:

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context Context context) throws IOException, InterruptedException {
        for (VALUEIN value: values) {
            context.write((KEYOUT) key, (VALUEOUT) value);
        }
    }
}

对于这个任务来说,输出的键是LongWritable类型,而值是Text类型。 事实上,对于这个MapReduce程序来说,所有键都是LongWritable类 型,所有值都是Text类型,因为它们是输入键/值,并且map函数和 reduce函数是恒等函数。然而,大多数MapReduce程序不会一直用相同的 键或值类型,所以就像上一节所描述的那样,必须配置作业来声明使用的类型。

记录在发送给reducer之前,会被MapReduce系统进行排序。在这个例子中,键是按照数值的大小进行排序的,因此来自输入文件中的行会被交叉放入一个合并后的输出文件。

默认的输出格式是TextOutputFormat,它将键和值转换成字符串并用制表符分隔开,然后一条记录一行地进行输出。这就是为什么输出文件是用制表符(Tab)分隔的,这是TextOutputFormat的特点。

转载请注明:全栈大数据 » 7.1.1. 默认的MapReduce作业

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址