整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377
问答中心分类: 常见问题---同一个SparkStreaming任务中同时处理多个topic消息,每个topic中处理逻辑都不同,如何高效实现
asked 1月 ago

下面这段代码是我写的,有没有性能问题,如何优化:
//开始计算
topicAndMessages.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
// 先处理消息
processRdd(rdd)

// 再更新offsets
km.updateZKOffsets(rdd)

} else {
println(“没有数据,不需要处理,topoc:” + topics)
}
})

——————————————————-
def processRdd(topicAndMessages: RDD[(String, String)]): Unit = {
//按照topic分组
val groupedTopicAndMessages: RDD[(String, Iterable[String])] = topicAndMessages.groupByKey()
//groupedTopicAndMessages.repartition(10)
//groupedTopicAndMessages.cache()
groupedTopicAndMessages.map(tp =>{
processByTopic(tp._1, tp._2)
})
}

———————————————-
def processByTopic(topic: String, messages: Iterable[String]): Unit = {
topic match {
//满意度
case “satisfactionLog” => SatisfactionHandler.processMessage(messages)
case _ => println(“未知的topic不处理,topic:” + topic)
}
}

转载请注明:全栈大数据 » 同一个SparkStreaming任务中同时处理多个topic消息,每个topic中处理逻辑都不同,如何高效实现