整套大数据学习资料(视频+笔记)百度网盘无门槛下载:http://www.edu360.cn/news/content?id=3377
问答中心分类: 常见问题---同一个SparkStreaming任务中同时处理多个topic消息,每个topic中处理逻辑都不同,如何高效实现
asked 1月 ago

下面这段代码是我写的,有没有性能问题,如何优化:
//开始计算
topicAndMessages.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
// 先处理消息
processRdd(rdd)

// 再更新offsets
km.updateZKOffsets(rdd)

} else {
println(“没有数据,不需要处理,topoc:” + topics)
}
})

——————————————————-
def processRdd(topicAndMessages: RDD[(String, String)]): Unit = {
//按照topic分组
val groupedTopicAndMessages: RDD[(String, Iterable[String])] = topicAndMessages.groupByKey()
//groupedTopicAndMessages.repartition(10)
//groupedTopicAndMessages.cache()
groupedTopicAndMessages.map(tp =>{
processByTopic(tp._1, tp._2)
})
}

———————————————-
def processByTopic(topic: String, messages: Iterable[String]): Unit = {
topic match {
//满意度
case “satisfactionLog” => SatisfactionHandler.processMessage(messages)
case _ => println(“未知的topic不处理,topic:” + topic)
}
}

转载请注明:全栈大数据 » 同一个SparkStreaming任务中同时处理多个topic消息,每个topic中处理逻辑都不同,如何高效实现

1 个回答
zuiw 用户 回答于 4周 以前

可以在processRdd(rdd)之前将topic的名字取出,用case进行模式匹配,不用的topic用不同的业务逻辑处理
生成环境不建议一个SparkStreaming程序读取多个不相关的topic,如果确实需要,可以创建多个SparkStreaming任务
可以在processRdd(rdd)之前将topic的名字取出,用case进行模式匹配,不同的topic用不同的业务逻辑处理
 

转载请注明:全栈大数据 » 回答给同一个SparkStreaming任务中同时处理多个topic消息,每个topic中处理逻辑都不同,如何高效实现