整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

关于MapReduce

hadoop 小小明 40℃ 0评论

 

MapReduce是一种可用于数据处理的编程模型。该模型比较简单,但要想写出有用的程序却不太容易。Hadoop可以运行各种语言版本的MapReduce程序。在本文中,我们将看到同一个程序的Java、Ruby、Python和C++语言版本。最重要的是,MapReduce程序本质上是并行运行的,因此可以将大规模的数据分析任务分发给任何一个拥有足够多机器的数据中心。MapReduce的优势在于处理大规模数据集,所以这里先来看一个数据集。

 

1. 气象数据集

这里要写一个挖掘气象数据的程序。分布在全球各地的很多气象传感器每隔一小时收集气象数据和收集大量日志数据,这些数据是半结构化数据且是按照记录方式存储的,因此非常适合使用Map/Reduce来分析。

数据格式

数据来自美国国家气候数据中心。这些数据按行并以ASCII格式存储,其中每一行是一条记录。该存储格式支持丰富的气象要素,其中许多要素可以选择性地列入收集范围或其数据所需的存储长度是可变的。为了简单起见,我们重点讨论一些基本要素(比如气温),这些要素始终都有而且长度都是固定的。范例2-1显示了一行采样数据,其中重要字段被突出显示。该行数据被分成很多行以突出每个字段,但在实际文件中,这些字段被整合成一行且没有任何分隔符。

范例2-1.国家气候数据中心数据记录的格式

0057
332130 #USAFweatherstationidentifier
99999 #WBANweatherstationidentifier
19500101 #observationdate
0300 #observationtime
4
+51317 #latitude(degreesx1000)
+028783 #longitude(degreesx1000)
FM-12
+0171 #elevation(meters)
99999
V020
320 #winddirection(degrees)
1 #qualitycode
N
0072
1
00450 #skyceilingheight(meters)
1 #qualitycode
C
N
010000 #visibilitydistance(meters)
1 #qualitycode
N
9
-0128 #airtemperature(degreesCelsiusx10)
1 #qualitycode
-0139 #dewpointtemperature(degreesCelsiusx10)
1 #qualitycode
10268 #atmosphericpressure(hectopascals x10)
1 #qualitycode

 

数据文件按照日期和气象站进行组织。从1901年到2001年,每一年都有一个目录,每一个目录中包含各个气象站该年气象数据的打包文件及其说明文件。例如,1999年对应文件夹下面就包含下面的记录:

%Israw/1990|head

010010-99999-1990.gz

010014-99999-1990.gz

010015-99999-1990.gz

010016-99999-1990.gz

010017-99999-1990.gz

010030-99999-1990.gz

010040-99999-1990.gz

010080-99999-1990.gz

010100-99999-1990.gz

010150-99999-1990.gz

因为有成千上万个气象台,所以整个数据集由大量的小文件组成。通常情况下,处理少量的大型文件更容易、更有效,因此,这些数据需要经过预处理,将每年的数据文件拼接成一个单独的文件。

 

2. 使用Unix工具来分析数据

在这个数据集中,每年全球气温的最高记录是多少?我们先不使用Hadoop来解决这个问题,因为只有提供了性能基准和结果检査工具,才能和Hadoop进行有效对比。

传统处理按行存储数据的工具是awk。范例2-2是一个程序脚本,用于计算每年的最高气温。

范例2-2.该程序从NCDC气象记录中找出每年最高气温

#!/usr/bin/envbash

foryearinall/*

do

echo-ne’basename$year.gz'”\t”

gunzip-c$year|\

awk'{temp=substr($0,88,5)+0;

q=substr($0,93,1);

if(temp!=9999&&q-/[01459]/&&temp>max)max=temp}

END{printmax}’

done

 

 

这个脚本循环遍历按年压缩的数据文件,首先显示年份,然后使用awk处理每一个文件。awk从数据中提取两个字段:气温和质量代码。气温值加0后转换为整数。接着测试气温值是否有效(用9999替代NCDC数据集中的缺失的值),通过质量代码来检测读取的数值是否有疑问或错误。如果数据读取正确,那么该值将与目前读取到的最大气温值进行比较,如果该值比原先的最大值大,就替换目前的最大值。处理完文件中所有的行后,再执行END块中的代码并在屏幕上输出最大气温值。

 

下面是某次运行结果的起始部分:

%./max_temperature.sh

1901 317

1902 244

1903 289

1904 256

1905 283

由于源文件中的气温值被放大10倍,所以1901年的最高气温是31.7oC(20世纪初记录的气温数据比较少,所以这个结果也是可能的)。使用亚马逊的EC2High-CPUExtraLargeInstance运行这个程序,只需要42分钟就可以处理完一个世纪的气象数据,找出最高气温。

为了加快处理速度,我们需要并行处理程序来进行数据分析。从理论上讲,这很简单:我们可以使用计算机上所有可用的硬件线程(hardwarethread)来处理,每个线程负责处理不同年份的数据。但这样做仍然存在一些问题。

首先,将任务划分成大小相同的作业通常并不是一件容易的事情。在我们这个例子中,不同年份数据文件的大小差异很大,所以有一部分线程会比其他线程更早结束运行。即使可以再为它们分配下一个作业,但总的运行时间仍然取决于处理最长文件所需要的时间。另一种更好的方法是将输入数据分成固定大小的块(chunk),然后每块分到各个进程去执行,这样一来,即使有一些进程可以处理更多数据,我们也可以为它们分配更多的数据。

其次,合并各个独立进程的运行结果,可能还需要额外进行处理。在我们的例子中,每年的结果独立于其他年份,所以可能需要把所有结果拼接起来,然后再按年份进行排序。如果使用固定块大小的方法,则需要一种精巧的方法来合并结果。在这个例子中,某年的数据通常被分割成几个块,每个块独立处理。我们最终获得每个块的最高气温,所以最后一步找出最大值作为该年的最高气温,其他年份的数据都像这样处理。

最后,还是得受限于单台计算机的处理能力。即便开足马力,用上所有处理器,至少也得花20分钟,系统无法更快了。另外,某些数据集的增长可能会超出单台计算机的处理能力。一旦开始使用多台计算机,整个大环境中的其他因素就会互相影响,最主要的两个因素是协调性和可靠性。哪个进程负责运行整个作业?我们如何处理失败的进程?

因此,虽然并行处理是可行的,不过实际上也很麻烦。使用Hadoop这样的框架来解决这些问题很有帮助。

 

 

3. 使用Hadoop来分析数据

为了充分利用Hadoop提供的并行处理优势,我们需要将查询表示成MapReduce作业。完成某种本地端的小规模测试之后,就可以把作业部署到在集群上运行。

3.1. map和reduce

MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段。每个阶段都以键值对作为输入和输出,其类型由程序员来选择。程序员还需要写两个函数:map函数和reduce函数。

map阶段的输入是NCDC原始数据。我们选择文本格式作为输入格式,将数据集的每一行作为文本输入。键是某一行起始位置相对于文件起始位置的偏移量,不过我们不需要这个信息,所以将其忽略。

我们的map函数很简单。由于我们只对年份和气温属性感兴趣,所以只需要取出这两个字段数据。在本例中,map函数只是一个数据准备阶段,通过这种方式来准备数据,使reducer函数能够继续对它进行处理:即找出每年的最高气温。map函数还是一个比较适合去除已损记录的地方:此处,我们筛掉缺失的、可疑的或错误的气温数据。

为了全面了解map的工作方式,我们考虑以下输入数据的示例数据(去除了一些未使用的列,并用省略号表):

0067011990999991950051507004…9999999N9+00001+99999999999…

0043011990999991950051512004…9999999N9+00221+99999999999…

0043011990999991950051518004…9999999N9-00111+99999999999…

0043012650999991949032412804…0500001N9+01111+99999999999…

0043012650999991949032418004…0500001N9+00781+99999999999…

这些行以键/值对的方式作为map函数的输入:

(0,0067011990999991950051507004…9999999N9+00001+99999999999…)

(106,0043011990999991950051512004…9999999N9+00221+99999999999…)

(212,0043011990999991950051518004…9999999N9-00111+99999999999…)

(318,004B012650999991949032412004…0500001N9+01111+99999999999…)

(424,0043012650999991949032418004…0500001N9+00781+99999999999…)

 

键(key)是文件中的行偏移量,map函数并不需要这个信息,所以将其忽略。map函数的功能仅限于提取年份和气温信息(以粗体显示),并将它们作为输出(气温值已用整数表示):

(1950,0)

(1950,22)

(1950,-11)

(1949,111)

(1949,78)

map函数的输出经由MapReduce框架处理后,最后发送到reduce函数。这个处理过程基于键来对键值对进行排序和分组。reduce函数看到的是如下输入:

(1949,[111,78])

(1950,[0,22,-11])

每一年份后紧跟着一系列气温数据。reduce适数现在要做的是遍历整个列表并从中找出最大的读数:

(1949,111)

(1950,22)

这是最终输出结果:每一年的全球最髙气温记录。

整个数据流如图2-1所示。在图的底部是Unix管线,用于模拟整个MapReduce的流程,部分内容将在讨论HadoopStreaming时再次涉及。

图2-1.MapReduce的逻辑数据流

 

3.2. JavaMapReduce

明白MapReduce程序的工作原理之后,下一步就是写代码实现它。我们需要三样东西:一个map函数、一个reduce函数和一些用来运行作业的代码。map函数由Mapper类实现来表示,后者声明一个map()虚方法。范例2-3显示了我们的map函数实现。

范例2-3.查找最高气温的Mapper类

importjava.io.IOException;

 

importorg.apache.hadoop.io.IntWritable;

importorg.apache.hadoop.io.LongWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.mapreduce.Mapper;

 

publicclassMaxTempenatureMapperextendsMapper<LongWritable,Text,Text,IntWritable>{

 

privatestaticfinalintMISSING=9999;

 

@Override

publicvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{

Stringline=value.toString();

Stringyear=line.substring(15,19);

intairTemperature;

if(line.charAt(87)==’+’){//parselntdoesn’tlikeleadingplussigns

airTemperature=Integer.parseInt(line.substring(88,92));

}else{

airTemperature=Integer.parseInt(line.substring(87,92));

}

Stringquality=line.substring(92,93);

if(airTemperature!=MISSING&&quality.matches(“[01459]”)){

context.write(newText(year),newIntWritable(airTemperature));

}

}

}

 

这个Mapper类是一个泛型类型,它有四个形参类型,分别指定map函数的输入键、输入值、输出键和输出值的类型。就现在这个例子来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是年份,输出值是气温(整数)。Hadoop本身提供了一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。这些类型都在org.apache.hadoop.io包中。这里使用LongWritable类型(相当于Java的Long类型)、Text类型(相当于]ava中的String类型)和IntWritable类型(相当于Java的Integer类型)。

map()方法的输入是一个键和一个值。我们首先将包含有一行输入的Text值转换成Java的String类型,之后使用substring()方法提取我们感兴趣的列。

map()方法还提供了Context实例用于输出内容的写入。在这种情况下,我们将年份数据按Text对象进行读/写(因为我们把年份当作键),将气温值封装在IntWritable类型中。只有气温数据不缺并且所对应质量代码显示为正确的气温读数时,这些数据才会被写入输出记录中。

以类似方法用Reducer来定义reduce函数,如范例2-4所示。

范例2-4.查找最高气温的Reducer类

importjava.io.IOException;

 

importorg.apache.hadoop.io.IntWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.mapreduce.Reducer;

 

publicclassMaxTemperatureReducerextendsReducer<Text,IntWritable,Text,IntWritable>{

@Override

publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)

throwsIOException,InterruptedException{

intmaxValue=Integer.MIN_VALUE;

for(IntWritablevalue:values){

maxValue=Math.max(maxValue,value.get());

}

context.write(key,newIntWritable(maxValue));

}

}

 

同样,reduce函数也有四个形式参数类型用于指定输入和输出类型。reduce函数的输入类型必须匹配map函数的输出类型:即Text类型和IntWritable类型。在这种情况下,reduce函数的输出类型也必须是Text和IntWritable类型,分别输出年份及其最高气温。这个最高气温是通过循环比较每个气温与当前所知最髙气温所得到的。

第三部分代码负责运行MapReduce作业(请参见范例2-5)。

范例2-5.这个应用程序在气象数据集中找出最高气温

importorg.apache.hadoop.fs.Path;

importorg.apache.hadoop.io.IntWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.mapreduce.Job;

importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;

importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

publicclassMaxTemperature{

publicstaticvoidmain(String[]args)throwsException{

if(args.length!=2){

System.err.println(“Usage:MaxTemperature〈inputpath>〈outputpath〉”);

System.exit(-1);

}

Jobjob=Job.getInstance();

job.setJarByClass(MaxTemperature.class);

job.setJobName(“Maxtemperature”);

FileInputFormat.addInputPath(job,newPath(args[0]));

FileOutputFormat.setOutputPath(job,newPath(args[1]));

job.setMapperClass(MaxTempenatureMapper.class);

job.setReducerClass(MaxTemperatureReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

System.exit(job.waitForCompletion(true)?0:1);

}

}

 

Job对象指定作业执行规范。我们可以用它来控制整个作业的运行。我们在Hadoop集群上运行这个作业时,要把代码打包成一个JAR文件(Hadoop在集群上发布这个文件)。不必明确指定JAR文件的名称,在Job对象的setJarByClass()方法中传递一个类即可,Hadoop利用这个类来査找包含它的JAR文件,进而找到相关的JAR文件。

构造Job对象之后,需要指定输入和输出数据的路径。调用FilelnputFormat类的静态方法addInputPath()来定义输人数据的路径,这个路径可以是单个的文件、一个目录(此时,将目录下所有文件当作输入)或符合特定文件模式的一系列文件。由函数名可知,可以多次调用addInputPath()来实现多路径的输入。

调用FileOutputFormat类中的静态方法setOutputPath()来指定输出路径(只能有一个输出路径)。这个方法指定的是reduce函数输出文件的写入目录。在运行作业前该目录是不应该存在的,否则Hadoop会报错并拒绝运行作业。这种预防措施的目的是防止数据丢失(长时间运行的作业如果结果被意外覆盖,肯定是非常恼人的)。

接着,通过setMapperClass()和setReducerClass()指定map类型和reduce类型。

setOutputKeyClass()和setOutputValueClass()控制map和reduce函数的输出类型,正如本例所示,这两个输出类型一般都是相同的。如果不同,则通过setMapOutputKeyClass()和setMapOutputValueClass()来设置map函数的输出类型。

输入的类型通过InputFonnat类来控制,我们的例子中没有设置,因为使用的是默认的TextInputFormat(文本输入格式)。

在设置定义map和reduce函数的类之后,可以开始运行作业。Job中的waitForCompletion()方法提交作业并等待执行完成。该方法中的布尔参数是个详细标识,所以作业会把进度写到控制台。

waitForCompletion()方法返回一个布尔值,表示执行的成(true)败(false),这个布尔值被转换成程序的退出代码0或者1。

 

 

3.3. 数据流

首先定义一些术语。MapReduce作业(Job)是客户端需要执行的一个工作单元:它包括输入数据、MapReduce程序和配置信息。Hadoop将作业分成若干个小任务(task)来执行,其中包括两类任务:map任务和reduce任务。

有两类节点控制着作业执行过程:一个jobtracker及一系列tasktracker。jobtracker通过调度tasktracker上运行的任务•来协调所有运行在系统上的作业。tasktracker在运行任务的同时将运行进度报告发送给jobtracker,jobtracker由此记录每项作业任务的整体进度情况。如果其中一个任务失败,jobtracker可以在另外一个tasktracker节点上重新调度该任务。

Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(inputsplit)或简称“分片”。Hadoop为每个分片构建一个map任务,并由该任务来运行用户自定义的map函数从而处理分片中的每条记录。

拥有许多分片,意味着处理每个分片所需要的时间少于处理整个输入数据所花的时间。因此,如果我们并行处理每个分片,且每个分片数据比较小,那么整个处理过程将获得更好的负载平衡,因为一台较快的计算机能够处理的数据分片比一台较慢的计算机更多,且成一定的比例。即使使用相同的机器,失败的进程或其他同时运行的作业能够实现满意的负载平衡,并且如果分片被切分得更细,负载平衡的会更高。

另一方面,如果分片切分得太小,那么管理分片的总时间和构建map任务的总时间将决定作业的整个执行时间。对于大多数作业来说,一个合理的分片大小趋向于HDFS的一个块的大小,默认是64MB,不过可以针对集群调整这个默认值(对新建的所有文件),或对新建的每个文件具体指定。

Hadoop在存储有输入数据(HDFS中的数据)的节点上运行map任务,可以获得最佳性能。这就是所谓的“数据本地化优化”(datalocalityoptimization),因为它无需使用宝贵的集群带宽资源。但是,有时对于一个map任务的输入来说,存储有某个HDFS数据块备份的三个节点可能正在运行其他map任务,此时作业调度需要在三个备份中的某个数据寻求同个

机架中空闲的机器来运行该map任务。仅仅在非常偶然的情况下(该情况基本上不会发生),会使用其他机架中的机器运行该map任务,这将导致机架与机架之间的网络传输。图2-2显示了这三种可能性。

图2-2.本地数据(a)、本地机架(b)和跨机架(c)map任务

现在我们应该清楚为什么最佳分片的大小应该与块大小相同:因为它是确保可以存储在单个节点上的最大输入块的大小。如果分片跨越两个数据块,那么对于任何一个HDFS节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到map任务节点。与使用本地数据运行整个map任务相比,这种方法显然效率更低。

map任务将其输出写入本地硬盘,而非HDFS。这是为什么?因为map的输出是中间结果:该中间结果由reduce任务处理后才产生最终输出结果,而且一旦作业完成,map的输出结果就可以删除。因此,如果把它存储在HDFS中并实现备份,难免有些小题大做。如果该节点上运行的map任务在将map中间结果传送给reduce任务之前失败,Hadoop将在另一个节点上重新运行这个map任务以再次构建map中间结果。

reduce任务并不具备数据本地化的优势——单个reduce任务的输入通常来自于所有mapper的输出。在本例中,我们仅有一个reduce任务,其输入是所有map任务的输出。因此,排过序的map输出需通过网络传输发送到运行reduce任务的节点。数据在reduce端合并,然后由用户定义的reduce函数处理。reduce的输出通常存储在HDFS中以实现可靠存储。如第3章所述,对于每个reduce输出的HDFS块,第一个复本存储在本地节点上,其他复本存储在其他机架节点中。因此,将reduce的输出写入HDFS确实

需要占用网络带宽,但这与正常的HDFS流水线写入的消耗一样。

一个reduce任务的完整数据流如图2-3所示。虚线框表示节点,虚线箭头表示节点内部的数据传输,而实线箭头表示不同节点之间的数据传输。

reduce任务的数量并非由输入数据的大小决定,而事实上是独立指定的。

如果有好多个reduce任务,每个map任务就会针对输出进行分区(partition),即为每个reduce任务建一个分区。每个分区有许多键(及其对应的值),但每个键对应的键/值对记录都在同一分区中。分区由用户定义的partition函数控制,但通常用默认的partitioner通过哈希函数来分区,很高效。

 

 

 

 

 

 

 

 

 

 

 

转载请注明:全栈大数据 » 关于MapReduce

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址