整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

8.1.2. 用户定义的Java计数器

hadoop 小红牛 8℃ 0评论

MapReduce允许用户编写程序来定义计数器,计数器的值可在mapper或
reducer中增加,计数器由一个Java枚举(enum)类型来定义,以便对有关的
计数器分组。一个作业可以定义的枚举类型数量不限,各个枚举类型所包 含的字段数量也不限。枚举类型的名称即为组的名称,枚举类型的字段就
是计数器名称。计数器是全局的。换言之,MapReduce框架将跨所有map
reduce聚集这些计数器,并在作业结束时产生一个最终结果。

在第5章中,我们创建了若干计数器来统计天气数据集中不规范的记录数。范例8-1中的程序对此做了进一步的扩展,能统计缺失记录和气温质量代码的分布情况。

范例8-1.统计最高气温的作业,包括统计气温值缺失的记录、不规范的字段和质量代码

public class MaxTemperatureWithCounters extends Configured implements Tool {
    enum Temperature {
        MISSING,
        MALFORMED
    }
static class MaxTemperatureMapperWithCounters
        extends Mapper<LongWritable, Text, Text, IntWritable> {
        private NedcRecordParser parser = new NcdcRecordParser();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            parser.parse(value);
            if (parser.isValidTemperature()) {
                int airTemperature = parser.getAirTemperature();
                context.write(new Text(parser.getYear()), new IntWritable(airTemperature));
            } else if (parser.isMalformedTemperature()) {
                System.err.printIn("Ignoring possibly corrupt input: " + value);
                context.getCounter(Temperature.MALFORMED).increment(1);
            } else if (parser.isMissingTemperature()) {
                context.getCounter(Temperature.MISSING).increment(1);
            }
            // dynamic counter
            context.getCounter("TemperatureQuality", parser.getQuality()).increment(1);
        }
    }
    @Override
public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this,getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
         
        job.setMapperClass(MaxTemperatureMapperWithCounters.class);
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        return job.waitForCompletion(true)?0:1;
    }
         
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureWithCounters(), args);
        System.exit(exitCode);
    }
}

理解上述程序的最佳方法是在完整的数据集上运行一遍:

% hadoop jar hadoop-examples.jar MaxTemperatureWithCounters \

input/ncdc/all output-counters

作业成功执行完毕之后会输出各计数器的值(由作业的客户端完成)。以下是 一组我们感兴趣的计数器的值。

12/02/04 19:46:38 INFO mapred.DobClient: TemperatureQuality

12/02/04 19:46:38 INFO mapred.DobClient:2=1246032

12/02/04 19:46:38 INFO mapred.JobClient:1=973422173

12/02/04 19:46:38 INFO mapred.DobClient:0=1

12/02/04 19:46:38 INFO mapred.JobClient:6=40066

12/02/04 19:46:38 INFO mapred.JobClient:5=158291879

12/02/04 19:46:38 INFO mapred.DobClient:4=10764500

12/02/04 19:46:38 INFO mapred.UobClient:9=66136858

12/02/04 19:46:38 INFO mapred.JobClient:Air Temperature Records

12/02/04 19:46:38 INFO mapred.JobClient:Malformed=3

12/02/04 19:46:38 INFO mapred.JobClient:Missing=66136856

1.2.1. 动态计数器

上述代码还使用了动态计数器,这是一种不由Java枚举类型定义的计数器。由于Java枚举类型的字段在编了译阶段就必须指定,因而无法使用枚举类型动态新建计数器。范例8-1统计气温质量的分布,尽管通过格式规范定义
了可以取的值,但相比之下,使用动态计数器来产生实际值更加方便。在该例
中,Reporter对象的incrCounter()方法有两个String类型的输入参 数,分别代表组名称和计数器名称:

public void incrCounter(String group, String counter, long amount)

鉴于Hadoop需先将Java枚举类型转变成String类型,再通过RPC发送
计数器值,这两种创建和访问计数器的方法(即使用枚举类型和String类 型)事实上是等价的。相比之下,枚举类型易于使用,还提供类型安全,适
合大多数作业使用。如果某些特定场合需要动态创建计数器,则可以使用 String 类型。

1.2.2. 易读的计数器名称

计数器的默认名称是枚举类型的Java完全限定类名。由于这种名称在Web
界面和终端上可读性较差,因此Hadoop提供“资源捆绑”(resource bundle)
这种方式来修改计数器的显示名称。前面的例子即是如此,显示的计数器名称是 Air Temperature Records,而非
Temperature$MISSING。对动态计数器而 言,组名称和计数器名称也用作显示名称,因而通常没有这个问题。

为计数器提供易读名称也很容易。以Java枚举类型为名创建一个属性文件,用下划线(_)分隔嵌套类。属性文件与包含该枚举类型的顶级类放在同一目录。例如,例8-1中的Temperature枚举类型对应的属性文件被命名MaxTemperatureWithCounters_Temperature.properties0

属性文件应包含一个唯一的CounterGroupName属性,其值便是整个组的显示名称。在枚举类型中定义的每个字段均有一个属性与之对应,属性名称是“字段名称.name”,属性值是该计数器的显示名称。属性文件 MaxTemperatureWithCounters的内容如下:

CounterGroupName=Air Temperature Records

MISSING.name=Missing

MALFORMED.name=MaIformed

Hadoop使用标准的Java本地化机制将正确的属性文件载入到当前运行区
域•。例如,创逢一个名为 MaxTemperatui-eWithCountefs—Temperature—zhjCN.properties
的中文属性文件,在zh_CN区域运行时,就会使用这个属性文件。详情请 参见 java.util.Proper'tyResourceBundle
类的相关文档。

3.获取计数器

除了通过Web界面和命令行(执行hadoop
job -counter指令)之外,用 户还可以使用Java
API获取计数器的值。通常情况下,用户一般在作业运行完成、计数器的值已经稳定下来时再获取计数器的值,而Java
API还支持在作业运行期间就能够获取计数器的值。范例8-2展示了如何统计整个数据集中气温信息缺失记录的比例。

范例8-2.统计气温信息缺失记录所占的比例

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class MissingTemperatureFields extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 1) {
            JobBuilder. printUsage(this, "<job ID>");
            return -1;
        }
        String jobID = args[0];
        JobClient jobClient = new JobClient(new JobConf(getConf()));
        RunningJob job = jobClient.getJob(JobID.forName(jobID));
        if (job == null) {
            System.err.printf("No job with ID %s found.\n", jobID);
            return -1;
        }
        if (!job.isComplete()) {
            System.err.printf("Job %s is not complete.\n", jobID);
            return -1;
        }
        Counters counters = job.getCounters();
        long missing = counters.getCounter(MaxTemperatureWithCounters.Temperature.MISSING);
        long total = counters.getCounter(Task.Counter.MAP_INPUT__RECORDS);
        System.out.printf("Records with missing temperature fields: %.2f%%\n", 100.0 * missing / total);
 return 0;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MissingTemperatureFields(), args);
        System.exit(exitCode);
    }
}

首先,以作业ID为输入参数调用一个JobClient实例的getJob()方法,返回一个RunningJob对象,从而检查是否有一个作业与指定ID相匹配。 有多种因素可能导致无法找到一个有效的RunningJob对象,例如,错误地指定了作业ID,或jobtracker不再指向该作业。(内存中仅保留最新的
100 个作业,该阈值受 mapred. jobtracker.completeuserjobs.maximum
控制,当jobtracker重启时,所有作业信息都被清除。)

其次,如果确认该Running]ob对象(即作业)存在,则调用该对象的
getCounters()方法会返回一个Counters对象,封装了该作业的所有计
数器。Counters类提供了多个方法用于获取计数器的名称和值。上例调用
getCounter^)方法,它通过一个枚举值来获取气温信息缺失的记录数和被 处理的记录数(根据一个内置计数器)。

最后,输出气温信息缺失记录的比例。针对整个天气数据集的运行结果 如下:

% hadoop jar hadoop-examples.jar MissingTemperatureFields job_201202040938_0012

Records with missing temperature fields: 5.47%

使用新版MapReduce API上例使用了旧版API,原因是新版的在作业完成后获取计数器的方法并不适用于Hadoop 1.x版本。(使用旧版编写的代码可以运行在新版API上并获取计数器)。新旧版本API的主要差别在于是否使用Cluster对象来获取一个Job对象(而非一个RunningJob对象),然后再调用它的getCounters()方法。

Cluster cluster = new Cluster(getConf());
Job job = cluster.getJob(JobID.forName(jobID));
Counters counters = job.getCounters();
long missing = counters.findCounter(
MaxTemperatureWithCounters.Temperature.MISSING).getValue();
long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();

一个区別在于新版 API 使用 org.apache.hadoop.mapreduce.TaskCounter这个枚举类型,而旧版API则使用org.apache.hadoop.mapred. Task .Counter这个枚举类型。



转载请注明:全栈大数据 » 8.1.2. 用户定义的Java计数器

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址