初学spark,自己尝试写了个矩阵乘法的小程序。
pair1,pair2分别是两个二元组,记录着一组矩阵的值和编号(矩阵数据是从文件逐行读入,
文件格式是每行有一个数值,要生成行主序的矩阵。
本例中从文件读入100行数据(1010矩阵),并逐行依次编号0~99,
对应产生100个pair1,2的二元组。
接下来映射成为行号,列号和值的三元组,进一步map,reduce并行的来算举证乘法。
经测试当矩阵很小时,或者是文件很小,像1010的是没有问题的和串行的矩阵运算结果一致。
但是当计算10001000,总共有1000000行数据时,collet()和sortByKey()基本是卡死的。
这个测试程序是放在单机上IDEA上来跑。
同样的问题还发现在搭建的集群上运行其他spark程序,数据量大时,
一回收(collect())也出现类似情况(不是内存溢出。
就这个程序,需要怎么改可以计算10001000矩阵,我特意测了个时间发现到reduceByKey()都是
很快的,foreach也可以在终端打印出最后一小部分结果(因为数据量比较大)。
可是一继续后面就会卡死,求解决。
val rdd_pairs1 = sc.parallelize(pairs1)
val rdd_pairs2 = sc.parallelize(pairs2)
var beg = System.currentTimeMillis()
val coordinateAndvalue1 = rdd_pairs1.map(a => (a.get_id() / 10, a.get_id() % 10, a.get_value()))
val coordinateAndvalue2 = rdd_pairs2.map(a => (a.get_id() / 10, a.get_id() % 10, a.get_value()))
val k_v1 = coordinateAndvalue1.map(b => (b._2, b._1, b._3))
val cart = k_v1.cartesian(coordinateAndvalue2).filter(x => (x._1._1 == x._2._1))
val mul = cart.map(y => (y._1._2 + "" + y._2._2, y._1._3 * y._2._3))
val add = mul.reduceByKey((x, y) => x + y)
println("time" + (System.currentTimeMillis() - beg))
val sort = add.sortByKey()
val col = sort.collect()
println("half done")
val output_path = "result.txt"
val writer = new FileWriter(new File(output_path) , false)
for (i <- 0 to 100 - 1) {
writer.write(col(i)._1 + " " + col(i)._2 + "\n")
}
writer.flush()
writer.close()
println("Finish")