spark collect (),当数据量比较大时,卡死怎么解决?

问答 七里芬芳 ⋅ 于 2019-04-22 10:09:30 ⋅ 最后回复由 青牛 2019-04-22 10:37:53 ⋅ 3201 阅读

初学spark,自己尝试写了个矩阵乘法的小程序。
pair1,pair2分别是两个二元组,记录着一组矩阵的值和编号(矩阵数据是从文件逐行读入,
文件格式是每行有一个数值,要生成行主序的矩阵。
本例中从文件读入100行数据(1010矩阵),并逐行依次编号0~99,
对应产生100个pair1,2的二元组。
接下来映射成为行号,列号和值的三元组,进一步map,reduce并行的来算举证乘法。
经测试当矩阵很小时,或者是文件很小,像10
10的是没有问题的和串行的矩阵运算结果一致。
但是当计算10001000,总共有1000000行数据时,collet()和sortByKey()基本是卡死的。
这个测试程序是放在单机上IDEA上来跑。
同样的问题还发现在搭建的集群上运行其他spark程序,数据量大时,
一回收(collect())也出现类似情况(不是内存溢出。
就这个程序,需要怎么改可以计算1000
1000矩阵,我特意测了个时间发现到reduceByKey()都是
很快的,foreach也可以在终端打印出最后一小部分结果(因为数据量比较大)。
可是一继续后面就会卡死,求解决。
val rdd_pairs1 = sc.parallelize(pairs1)
val rdd_pairs2 = sc.parallelize(pairs2)
var beg = System.currentTimeMillis()
val coordinateAndvalue1 = rdd_pairs1.map(a => (a.get_id() / 10, a.get_id() % 10, a.get_value()))
val coordinateAndvalue2 = rdd_pairs2.map(a => (a.get_id() / 10, a.get_id() % 10, a.get_value()))
val k_v1 = coordinateAndvalue1.map(b => (b._2, b._1, b._3))
val cart = k_v1.cartesian(coordinateAndvalue2).filter(x => (x._1._1 == x._2._1))
val mul = cart.map(y => (y._1._2 + "" + y._2._2, y._1._3 * y._2._3))
val add = mul.reduceByKey((x, y) => x + y)
println("time" + (System.currentTimeMillis() - beg))
val sort = add.sortByKey()
val col = sort.collect()
println("half done")
val output_path = "result.txt"
val writer = new FileWriter(new File(output_path) , false)
for (i <- 0 to 100 - 1) {
writer.write(col(i)._1 + " " + col(i)._2 + "\n")
}
writer.flush()
writer.close()
println("Finish")

成为第一个点赞的人吧 :bowtie:
回复数量: 1
  • 青牛 国内首批大数据从业者,就职于金山,担任大数据团队核心研发工程师
    2019-04-22 10:37:53

    spark本来就很吃内存 你单机的机器多少可用内存?有没有计算过这个矩阵乘法的规模?

暂无评论~~
  • 请注意单词拼写,以及中英文排版,参考此页
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
  • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
  • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
  • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
Ctrl+Enter