整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

4.4.7关于Avro MapReduce

hadoop 花牛 6℃ 0评论

Avro提供了很多类,以便对Avro数据运行MapReduce程序。例如,org.apache.avro.mapreduce包中的AvroMapper类和AvroReducer类是Hadoop规范(旧版)中的Mapper和Reducer类。它们去除了作为输入和 输出的键/值对的不同,因为Avro数据文件只是一系列顺序排列的值。但 是,考虑到混洗,中间结果数据仍然被划分为键/值对。

这次,我们使用Avro MapReduce API重写找出天气数据集中每年最髙温度的MapReduce程序。我们用下列模式来表征天气记录:

{
    "type": "record",
    "name": "WeatherRecord",
    "doc": "A weather reading.",
    "fields":[
        {"name": "year", "type": "int"},
        {"name": "temperature", "type": "int"},
        {"name": "stationld", "type": "string"}
    ]
}

范例4-12中的程序读取文本输入,并将包含 天气记录的Avro数据文件。

public class AvroGenericMaxTemperature extends Configured implements Tool{
  private static final Schema SCHEMA = new Schema.Parser().parse(
    "{"+
    "  \"type\": \" record\","  +
    "  \"name\": \"WeatherRecord\"," +
    "  \"doc\": \"A weather reading.\"," +
    "  \"fields \": [" +
    "    {\"name\": \"year\", \"type\": \"int\"}," +
    "    {\"name\": \"temperature\", \"type\": \"int\"}," +
    "    {\"name\": \"stationId\", \"type\": \"string\"}" +
    "  ]" +
    "}"
  );
  public static class MaxTemperatureMapper extends AvroMapper<Utf8, Pair<Integer, GenericRecord>> {
    private NcdcRecordParser parser = new NcdcRecondParser();
    private GenericRecord record = new GenericData.Record(SCHEMA);
    @Override
    public void map(Utf8 line,AvroCollector<Pair<Integer, GenericRecord>> collector,Reporter reporter) throws IOException {
      parser.parse(line.toString());
      if (parser.isValidTemperature()) {
        record.put("year", parser.getYearlnt());
        record.put("temperature", parser.getAirTemperature());
        record.put("stationld", parser.getStationId());
        collector.collect(new Pair<IntegerJ GenericRecord>(parser.getYearlnt(), record));
      }
    }
  }
  
  public static class MaxTemperatureReducer extends AvnoReducer<Integer, GenericRecord, GenericRecord> {
    @Override
    public void reduce(Integer key, Iterable<GenericRecord> values,AvroCollector<GenericRecord> collector, Reporter reporter) throws IOException {
      GenericRecord max = null;
      for (GenericRecord value : values) {
        if (max == null ||(Integer) value.get("temperature") > (Integer)max.get("temperature")) {
          max = newWeatherRecord(value);
        }
      }
      collector.collect(max);
    }
    private GenericRecord newWeatherRecord(GenericRecord value) {
      GenericRecord record = new GenericData.Record(SCHEMA);
      record.put("year",value.get("year"));
      record.put("temperature", value.get("temperature"));
      record.put("stationld", value.get("stationld"));
      return record;
    }
  }
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }
    JobConf conf = new JobConf(getConf(), getClass());
    conf.setDobName("Max temperature");
    FilelnputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    AvroJob.setInputSchema(conf, Schema.create(Schema.Type.STRING));
    AvroJob.setMapOutputSchema(conf,
    Pair.getPairSchema(Schema.create(Schema.Type.INT), SCHEMA));
    AvroJob.setOutputSchema(conf, SCHEMA);
    conf.setInputFormat(AvroUtf8InputFormat.class);
    AvroJob.setMapperClass(conf, MaxTemperatureMapper.class);
    AvroJob.setReducerClass(conf, MaxTemperatureReducer.class);
    JobClient.runDob(conf);
    return 0;
  }
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), args);
    System.exit(exitCode);
  }
}

这个程序使用的是泛型Avro映射。这样可以避免我们以损失类型安全(通过字符串值来引用字段名称,例如"temperature")为代价,生成代码来表征数据记录。为了方便,天气记录的模式已加入代码中(读取SCHMA常量),但是在实际情况中,从驱动器本地文件中读取模式并通过Hadoop作业配置将模式传递给mpperreducer,可以提高代码的可维护性。

该API与常规的Hadoop MapReduce API有两个较大不同之处。第一个不同是通过 org.apache.avro.mapred.Pair 来包裹 MaxTempenatureMapper 中map输出的键、值。(org.apache.avro.mapred.AvroMapper不具备定输出键、值的原因是,方便仅有map任务的作业将值存入Avro数据文件中)针对这个MapReduce程序,键是年份(一个整数),值是天气记录,由Avro的GenericRecord表征。

Avro MapReduce确实为reducer的输入保留了键-值对格式,但这是混洗的输出,所以它在调用 org.apache.avro.mapred.AvroReducer 之前对 Pair进行解包。MaxTemperatureReducer针对每个键(年份)所对应的所有记录执行迭代运算,并找到那条有最高温度的记录。有必要对目前找到的最髙温度的记录拷贝一份,因为该迭代运算为了达到高效的目的需要重用该实例(并且仅更新相关字段)。

与传统MapReduce的第二个差异是,它使用AvroJob来配置作业。AvroJob类非常适用于为输入、map输出以及最后输出数据指定Avro模式。在上述程序中,输入模式是Avro string,因为我们是从文本文件中读取数据且输入格式被设置为AvroUtf8InputFormat。map输出模式是键值对模式,键模式是Avro int,值模式是天气记录模式。最后输出数据模式是天气记录模式,并且写入Avro数据文件中的输出格式是默认的 AvroOutputFormat 格式

下面的命令行代码显示了如何在一个小的采样数据集上运行该程序:

% hadoop jar avro-examples.jar AvroGenericMaxTemperature \
input/ncdc/sample.txt output

执行完成之时,我们可以使用Avro工具JAR来査看输出结果,该结果是JSON格式的Avro数据文件,每行一条记录如下:

% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-00000.avro

{"year" : 1949, "temperature" : 111, "stationld" : "012650-99999"}

{"year" : 1950, "temperature" : 22, "stationld" : "011990-99999"}

在上述例子中,我们使用了一个AvroMapper和一个AvroReducer,但API支持传统MapReducermapper、reducer 与 Avro 指定的 mapper、reducer 混合使用这有利于将数据格式在Avro格式和其他格式之间来回转换,例如 SequenceFile

转载请注明:全栈大数据 » 4.4.7关于Avro MapReduce

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址