@青牛 No, the data is on HDFS. The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.hive.HiveContext

// Case class for the three columns of links.txt (definition assumed here,
// matching the Hive table schema below); it must live at the top level,
// outside the method, so that toDF() can derive an encoder for it.
case class Links(movieId: Int, imdbId: Int, tmdbId: Int)

object ETL {
  def main(args: Array[String]): Unit = {
    val localClusterURL = "local[2]" // alternative master for local testing
    val clusterMasterURL = "spark://master:7077"
    val conf = new SparkConf().setAppName("ETL").setMaster(clusterMasterURL)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val hc = new HiveContext(sc)
    import sqlContext.implicits._

    // Set the number of RDD partitions; an integer multiple of the CPU cores
    // allocated to the application on the cluster is usually a good choice.
    val minPartitions = 8

    // links: drop malformed lines that end with a trailing comma (i.e. a
    // missing tmdbId), split on commas, and map each row into a Links object.
    val links = sc.textFile("data/links.txt", minPartitions)
      .filter(!_.endsWith(","))
      .map(_.split(","))
      .map(x => Links(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toInt))
      .toDF()

    // Write the DataFrame out as Parquet, then point a Hive table at it.
    links.write.mode(SaveMode.Overwrite).parquet("/tmp/links")
    hc.sql("drop table if exists links")
    hc.sql("create table if not exists links(movieId int, imdbId int, tmdbId int) stored as parquet")
    hc.sql("load data inpath '/tmp/links' overwrite into table links")
  }
}
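As a quick sanity check (a minimal sketch; the table name and paths are the ones from the snippet above, adjust to your setup), you can query the table right after the load to confirm Hive picked up the Parquet files:

    // "load data inpath" moves the files from /tmp/links into the Hive
    // warehouse directory; a count plus a small preview confirms the table.
    hc.sql("select count(*) from links").show()
    hc.sql("select * from links limit 5").show()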