整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377

16.1.4 Track Statistics 程序

hadoop 花牛 13℃ 0评论

音乐收听信息被发送到Last.fm时,会经历验证和转换阶段,最终结果是一系列由空格分隔的文本文件,包含的信息有用户ID(userld)、曲目 ID(trackld)、收藏的次数(Scrobble)、收听的次数(Radio)以及被跳过的次数 (Skip)。表16-1包含一些采样的收听数据,后面介绍的例子将用到这些数 据,它是Track Statistics程序的输入(真实数据达GB数量级,并且具有 更多的属性字段,为了方便介绍,这里省略了其他字段)

表16-1.收听数据

UserId

TrackId

Scrobble

Radio

Skip

111115

222

0

1

0

111113

225

1

0

0

111117

223

0

1

1

111115

225

1

0

0

这些文本文件作为初始输入提供给Track Statistics程序,该程序包括利用这个输入数据计算各种数据值的两个作业和一个用来合并结果的作业(参见图 16-2)

image.png 

图 16-2. TrackStats 作业

Unique Listeners作业模块统计收听同一首音频曲目的不同用户数,实现手段是累计不同用户对该音频文件的第一次访问而忽略同一用户对这一文件的多次访问。Sum作业模块通过对所有用户的所有收听信息进行累加计数 来为每个音频曲目统计收听总数、收藏总数、电台收听总数以及被跳过的总数。

尽管这两个作业模块的输入格式是相同的,我们仍然需要两个作业模块, 因为Unique Listeners作业模块负责为每个用户对每个音频曲目产生统计值,而Sum作业模块为每个音频产生统计值。最后Merge作业模块负责合 并由这两个模块产生的中间输出数据得到最终统计结果。运行这段程序的 最终结果是对每个音频曲目产生以下几项数值:

    •不同的听众数 

    •音频曲目的收藏次数 

    •音频曲目在电台中的点播次数 

    •音频曲目在电台中被收听的总次数 

    •音频曲目在电台广播中被跳过的次数

下面我们将详细介绍每个作业模块和它的MapReduce过程。请注意,由于 篇幅有限,所提供的代码段已被简化。要想下载完整的代码,请参考本书“前言”。

1.计算不同的听众数

Unique Listeners作业模块用于计算每个音频曲目的不同收听用户数。

UniqueListenerMaper UniqueListenerMaper 程序处理用空格分隔的原始收听数据,然后输出user ID(用户ID), track ID(音频ID)对。:

public void map(LongWritable position, Text rawLine, OutputCollector<IntWritable, 
IntWritable> output, Reporter reporter) throws IOException {
String[] parts = (rawLine.toString()).split(""); 
    int scrobbles =
Integer.parselnt(parts[TrackStatistiesProgram.COL_SCROBBLES]); 
    int radioListens =
Integer.parselnt(parts[TrackStatisticsProgram.COL_RADIO]);
// if track somehow is marked with zero plays - ignore 
if (scrobbles <= 0 && radioListens <= 0) { 
return;
}
// if we get to here then user has listened to track,
// so output user id against track id 
    IntWritable trackld = new IntWritable(
Integer.parselnt(parts[TrackStatisticsProgram.COL_TRACKID])); 
    IntWritable userid = new IntWritable(
Integer.parselnt(parts[TrackStatisticsProgram.COL_USERID])); 
output.collect(trackld, userid);
}

UniqueListenersReducer UniqueListener'sReducer 接收到每个track ID 对应的user ID数据列表之后,把这个列表放入集合Set中以消除重复的用户HD。然后输出每个track ID对应的这个集合的大小(不同用户数)。但是如果某个键对应的值太多,在set中存储所有的reduce值可能会有内存溢出的危险。实际上还没有出现过远种问题,但是为了避免这一问题,我们可以引入一个额外的MapReduce处理步骤或使用辅助排序的方法来删除重复数据(详细内容请参考8.2.4)

public void reduce(IntWritable trackld, Iterator<IntWritable> values, 
OutputCollector<IntWritable, IntWritable〉 output, Reporter reporter) 
throws IOException {
    Set<Integer> userids = new HashSet<Integer>();
    // add all userids to the set, duplicates automatically removed (set contract) 
    while (values.hasNext()) {
        IntWritable userid = values.next(); 
        userids.add(Integer.valueOf(userid.get()));
    }
    //output tnackld -> number of unique listeners per track 
    output.collect(trackld, new IntWritable(userIds.size()));
}

表16-2是这一作业模块的输入数据样本。map输出结果如表16-3所示, reduce输出结果如表16-4所示。

表16-2. 作业的输入

Line of file

LongWritable

UserId

IntWriable

TrackId

IntWriteable

Scrobbled

Boolean

Radio play

Boolean

Skip

Boolean

0

11115

222

0

1

0

1

11113

225

1

0

0

2

11117

223

0

1

1

3

11115

225

1

0

0

表16-3. map输出

TrackId

UserId

IntWritable

IntWritable

222

11115

225

11113

223

11117

225

11115

2.统计音频播放总数

Sum作业相对简单,它只为每个音频曲目累计我们感兴趣的数据。

SumMapper    输入数据仍然是原始文本文件,但是这一阶段对输入数据的 处理完全不同。期望的输出结果是针对每个音轨的一系列累计值(不同用户 数、播放次数、收藏次数、电台收听次数和跳过次数)。为了方便处理,我 们使用一个由Hadoop Record I/O类产生的TrackStats类中间对象来保存这些数据,它实现了 WritableComparable接口(因此可被用作输出)。 Mapper创建一个TrackStats对象,并根据文件中的每一行数据设置相应的属性值,但是“不同的用户数”(unique listener count)这一项没有填写(这 项数据由merge作业填写)

public void map(LongWritable position, Text rawLine,
OutputCollector<IntWritableJ TrackStats> output, Reporter reporter) 
throws IOException {
    String[] parts = (rawLine.toString()).split("");
    int trackld = Integer.parseInt(parts[TrackStatisticsProgram.COL_TRACKID]); 
    int scrobbles = Integer.parseInt(parts[TrackStatisticsProgram.COL_SCROBBLES]); 
    int radio = Integer.parseInt(parts[TrackStatisticsProgram.COL^RADIO]); 
    int skip = Integer.parseInt(parts[TrackStatisticsProgram.COL_SKIP]);
    // set number of listeners to 0 (this is calculated later)
    // and other values as provided in text file
    TrackStats trackstat = new TrackStats(0, scrobbles + radio, scrobbles, radio, skip); 
    output.collect(new IntWritable(trackld), trackstat);
}

SumReducer   在本实例中,reducer执行和mapper相似的函数,对每个音 频曲目的播放情况进行统计,然后返回一个总的统计数据:

public void reduce(IntWritable trackld, Iterator<TrackStats> values, 
OutputCollector<IntWritableJ TrackStats> output, Reporter reporter) 
throws IOException {
    TrackStats sum = new TrackStats(); // holds the totals for this track 
    while (values.hasNext()) {
        TrackStats trackStats = (TrackStats) values.next(); 
        sum.setListeners(sum.getListeners() + trackStats.getListeners()); 
        sum.setPlays(sum.getPlays() + trackStats.getPlays()); 
        sum.setSkips(sum.getSkips() + trackStats.getSkips()); 
        sum.setScrobbles(sum.getScrobbles() + trackStats.getScrobbles()); 
        sum.setRadioPlays(sum.getRadioPlays() + trackStats.getRadioPlays());
    }
    output.collect(trackld, sum);
}

表16-5是这部分作业的输入数据(Unique Listener作业的输入一样)map 的输出结果如表16-6所示,reduce的输出结果如表16-7所示。

表16-5. 作业输入 

Line

UserId

TrackId

Scrobbled

Radio play

Skip

LongWritable

IntWritable

IntWritable

Boolean

Boolean

Boolean

0

11115

222

0

1

0

1

11113

225

1

0

0

2

11117

223

0

1

1

3

11115

225

1

0

0

表16-6. map输出

 

TrackId

#listeners

#plays

#scrobbles

#Radio play

#Skips

LongWritable

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

222

0

1

0

1

0

225

0

1

1

0

0

223

0

1

0

1

1

225

0

1

1

0

0

表16-7. reduce输出

TrackId

#listeners

#plays

#scrobbles

#radio plays

#skips

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

222

0

1

0

1

0

225

0

2

2

0

0

223

0

1

0

1

1

3.合并结果 

最后一个作业模块需要合并前面两个作业产生的输出数据:每个音频曲目对应的不同用户数和每个音频的使用统计信息。为了能够合并这两种不同的输人数据,我们采用了两个不同的mapper(对每一种输入定义一个)。两个中间作业被配置之后可以把它们的输出结果写入路径不同的文件, Multiplelnputs类用于指定mapper和文件的对应关系。下面的代码展示了如何设置作业的;JobConf类来完成这一过程:

MultipieInputs.addInputPath(conf, sumlnputDir,
    SequenceFilelnputFormat.class,IdentityMapper.class);
Multiplelnputs.addlnputPath(conf, listenerslnputDir,
    SequenceFileInputFormat.class, MergeListenersMapper.class);

虽然单用一个mapper也能处理不同的输入,但是示范解决方案更方便、更巧妙。

MergeListenersMapper 这个 mapper 用来处理 UniqueListenerDob 输出的每个音频曲目对应的不同用户数据。它采用和SumMapper'类似的方法创建TrackStats对象,但这次它只填写每个音频曲目对应的不同用户数信 息,不管其他字段:

public void map(IntWritable trackld, IntWritable uniqueListenerCount, 
OutputCollector<IntWritable, TrackStats) output, Reporter reporter) 
throws IOException {
    TrackStats trackStats = new TrackStats(); 
    trackStats.setListeners(uniqueListenerCount.get()); 
    output.collect(trackId, trackStats);
}

表16-8是mapper的一些输入数据,表16-9是对应的输出结果。

表 16-8. MergeListenersMapper 的输入

TrakId

#listeners

IntWritable

IntWritable

222

1

225

2

223

1

表 16-9. MergeListenersMapper 的输出

TrakId

#listeners

#plays

#scrobbles

#radio

#skips

222

0

0

0

0

0

225

0

0

0

0

0

223

0

0

0

0

0

IdentityMapper被配置为用来处理SurtOob输出的TrackStats对象,因为不要求对数据进行其他处理,所以它直接输出输入数据(参见表16-10)

表16-10. IdentityMapper的输入和输出

TrackId

#listeners

#plays

#scrobbles

#radio plays

#skips

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

222

0

1

0

1

0

225

0

2

2

0

0

223

0

1

0

1

1

前面两个mapper产生同一类型的数据:每个音频曲目对应的TrackStats 对象,只是数据赋值不同。最后的reduce阶段能够重用前面描述的 SumReducer来为每个音频曲目创建一个TrackStats对象,它综合前面两 个TrackStats对象的值,然后输出结果(参见表16-11)

最终输出文件被收集后复制到服务器端,Last.fm网站调用一个Web服务得 到并展示这些数据。如图16-3所示,这个网页展示了一个音频曲目的使用 统计信息:收听者总数和播放总次数。

表16-11. SumReducer的最终输出

TrackId

#listeners

#plays

#scrobbles

#radio plays

#skips

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

IntWritable

222

1

1

0

1

0

225

2

2

2

0

0

223

1

1

0

1

1

image.png

图 16-3. TrackStats 结果 

转载请注明:全栈大数据 » 16.1.4 Track Statistics 程序

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址