整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

13.5.2加载数据

hadoop 花牛 18℃ 0评论

观测站的数量相对较少,所以我们可以使用任何.种接口来插入这些观测站的静态数据。

但是,假设我们要加载数十亿条观测数据。这种数据导入是一个极为复杂 的过程,是一个需要长时间运行的数据库操作。MapReduceHBase的分布式模型让我们可以充分利用集群。通过把原始输入数据复制到HDFS,接着运行MapReduce作业,我们就能读到输入数据并将其写入HBase

范例13-3展示了一个MapReduce作业,它将观测数据从前几章所用的输入文件导入HBase


范例13-3.HDFSHBase表导入气温数据的MapReduce应用

public class HBaseTemperaturelmporter extends Configured implements Tool {
    // Inner-class for map
    static class HBaseTemperatureMapper<K> V> extends MapReduceBase implements
            Mapper<LongWritableJ Text, K, V>{
        private NcdcRecordParser parser = new NcdcRecordPanser(); 
        private HTable table;
        public void map(LongWritable key, Text value,
            OutputCollector<K, V> output, Reporter reporter) 
        throws IOException {
            parser.parse(value.toString()); 
            if (parser.isValidTemperature()) {
                byte[] rowKey = RowKeyConverter.makeObservationRowKey(parser.getStationId(), 
                    parser.getObservationDate().getTime());
                Put p = new Put(rowKey);
                p.add(HBaseTemperatureCli.DATA_COLUMNFAMILY,
                    HBaseTemperatureCli.AIRTEMP_QUALIFIER,
                    Bytes.toBytes(parser.getAirTemperature())); 
                table.put(p);
            }
        }
        public void configure(JobConf jc) { 
            super.configure(jc);
            // Create the HBase table client once up-front and keep it around 
            // rather than create on each map invocation. 
            try {
                this .table = new HTable(new HBaseConfiguration( jc), "observations"); 
            } catch (IOException e) {
                throw new RuntimeException(” Failed HTable construction”,e);
            }
        }
        @Override
        public void close() throws IOException {
            super.close(); 
            table.close();
        }
    }
    public int run(String[] args) throws IOException { 
        if (args.length != 1) {
            System.err.println(w Usage: HBaseTemperaturelmporter <input>” ); 
            return -1;
        }
        JobConf jc = new JobConf(getConf(), getClass()); 
        FilelnputFormat.addInputPath(jc, new Path(args[0])); 
        jc.setMapperClass(HBaseTemperatureMapper.class); 
        jc.setNumReduceTasks(0);
        jc.setOutputFormat(NullOutputFormat.class);
        DobClient.runJob(jc);
        return(0);
    }
    public static void main(String[] args) throws Exception { 
        int exitCode = ToolRunner.run(new HBaseConfiguration(), 
            new HBaseTemperatureImporter(), args);
        System.exit(exitCode);
    }
}

HBaseTemperatureImporter有一个名为HbaseTemperatureMapper的内部类,它类似于第5章的MaxTemperatureMaper类。外部类实现了 Tool并对调用 HBaseTemperatureMapper 内部类进行设置。HBaseTemperatureMapper  MaxTemperatpreMapper 的输入相同,所进行的解析方法 都使用第5章所介绍的NcdcRecordParser来进行也相同。解析时会检查输入是否为有效的气温。但是不同于在 MaxTemperatureMapper中仅把有效气温加到输出集合中,这个类把有效 的气温值添加到HBaseobservations表的列。(我们使用了从HBaseTemperatureCli类中导出的dataairtemp静态常量。 后面对此会有介绍。)在configure()方法中,我们对observations表新 建了一个HTable实例,在后面调用mapHBase进行交互时会用到它。 最后,在HTable实例中调用close把所有尚未清空的写缓存中的数据刷入磁盘。

我们所用的行的键在 RowKeyConverter上的makeObservationRowKey()

方法中,用观测站ID和观测时间创建:

public class RowKeyConverter {
private static final int STATION_ID_LENGTH = 12; 
/**
    *@return A row key whose format is: <station_id> <reverse_order_epoch>
*/
public static byte[] makeObservationRowKey(String stationId,
    longobservationTime){
byte[]row=newbyte[STATION_ID_LENGTH+Bytes.SIZEOF_LONG];
Bytes.putBytes(row,0,Bytes.toBytes(stationld),0,STATION_ID_LENGTH); longreverseOrderEpoch=Long.MAX_VALUE-observationTime;
        Bytes.putLong(row,STATION_ID_LENGTHJ reverseOrderEpoch);
         returnrow;
        }
    }

观测站ID其实是一个定长字符串。在转换时利用了这一点。 makeObservationRowKey()中使用的Byte类来自HBase工具包,它包含 字节数组和普通的JavaHadoop数据类型间的转换方法。在 makeObservationRowKey()中,Bytes.putLong()方法被用来填充键的字 节数组Bytes.SIZEOF_LONG常量被用来确定数据行键的数组大小和其中元素的位置。

我们可以用下面的命令来运行程序:

% hbase HBaseTemperaturelmporter input/ncdc/all

对优化的几点说明。

•要特别当心数据导入所引发的“步调一致”的情况。这时所有的客 户端都对同一个表的区域(在单个节点上)进行操作,然后再对下一个区域进行操作,依次进行。这时加载操作并没有均勻地分布在所 有区域上。这通常是由“排序后输入”(sorted input)和切分的原理 共同造成的。如果在插入数据前,针对行的键按数据排列的次序进 行随机处理,可能有助于减少这种情况。在我们的示例中,基于当 前stationid值的分布情况和TextlnputFormat分割数据的方式,上传操作应该足以保证足够的分布式特性。

•每个任务只获得一个HTable实例。实例化HTable是有代价的,所以如果为每个插入操作实例化一个HTable,会对性能造成负面 影响,因此,我们在configure()步骤设置HTable

•在默认情况下,每个HTable.put(put)在进行插入操作时事实上 不使用任何缓存。可以使用HTable.setAutoFlush(false),接 着设置写缓存的大小,以此禁用HTable的自动刷入特性。插入的 数据占满写缓存之后,缓存才会被刷入存储。但要记住,必须在每 个任务的最后手工调用 HTable.flushCommits()或 HTable.close(), 后者会调用HTable.flushCommits(),以确保缓存中最后没有剩下未被刷入的更新。这可以在mapper重载的close()中完成。

• HBase 包含 TablelnputFormat 和 TableOutputFormat它们可 用于把Hbase作为源或目标的MapReduce(参见范例丨3-2)。也可以 像第5章那样使用MaxTemperatureMapper,增加.一个reducer任务 来接收 MaxTemperatureMapper 的输出并通过 TableOutputFormat把结果导入HBase

转载请注明:全栈大数据 » 13.5.2加载数据

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址