Can Spark set a custom id when inserting into Es?

Q&A zhangyupeng ⋅ posted 2020-10-21 17:48:19 ⋅ last reply by zhangyupeng 2020-10-21 20:04:40 ⋅ 105 views

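When I write to Es from Spark, I don't want to use the ID that Es generates automatically, but supplying my own ID raises an error. The code is as follows: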

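import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._ // brings saveToEs into scope for RDDs

val conf: SparkConf = new SparkConf().setAppName("SparkElasticsearch").setMaster("local[*]")
conf.set("es.nodes", "s1.hadoop")
conf.set("es.port", "9200")
// The original set this key twice ("true", then "false"); only the last value takes effect.
conf.set("es.index.auto.create", "false")
val sc: SparkContext = new SparkContext(conf)

// Insert a document that carries its own "_id" -> 6; this is the write that fails
val saveRdd: RDD[Map[String, Any]] = sc.parallelize(Array(Map("_id" -> 6, "content" -> "es4444插入数据")))
saveRdd.saveToEs("news/fulltext")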

The error is as follows:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/F:/apache-maven-3.5.0/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/F:/apache-maven-3.5.0/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[Stage 0:>                                                         (0 + 0) / 12]20/10/21 17:42:52 ERROR spark.TaskContextImpl: Error in TaskCompletionListener
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [192.168.17.83:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
    at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251)
    at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203)
    at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:222)
    at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:244)
    at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:269)
    at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:121)
    at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply(EsRDDWriter.scala:60)
    at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply(EsRDDWriter.scala:60)
    at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:123)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:97)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:95)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:95)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
20/10/21 17:42:52 ERROR executor.Executor: Exception in task 11.0 in stage 0.0 (TID 11)
org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [192.168.17.83:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
20/10/21 17:42:52 WARN scheduler.TaskSetManager: Lost task 11.0 in stage 0.0 (TID 11, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [192.168.17.83:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

20/10/21 17:42:52 ERROR scheduler.TaskSetManager: Task 11 in stage 0.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 0.0 failed 1 times, most recent failure: Lost task 11.0 in stage 0.0 (TID 11, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [192.168.17.83:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
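
The key line in the trace is "Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters." In other words, Es rejects `_id` as a field in the document body; the ID has to travel as bulk/index metadata instead. A minimal sketch of two ways elasticsearch-hadoop can do that, assuming the same cluster settings as in the question: the field name `id` and the second document's ID `7` are made up for illustration, and this has not been run against the poster's cluster.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // adds saveToEs / saveToEsWithMeta to RDDs

object SaveWithCustomId {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkElasticsearch")
      .setMaster("local[*]")
      .set("es.nodes", "s1.hadoop")
      .set("es.port", "9200")
      .set("es.index.auto.create", "false")
    val sc = new SparkContext(conf)

    // Option 1: keep the ID in an ordinary field (hypothetical name "id",
    // NOT "_id") and tell the connector to use it as the document ID.
    val docs = sc.parallelize(Seq(Map("id" -> 6, "content" -> "es4444插入数据")))
    docs.saveToEs("news/fulltext", Map("es.mapping.id" -> "id"))

    // Option 2: use a pair RDD; the key becomes the document ID and the
    // value is the document body, so no ID field is needed in the body.
    val keyed = sc.parallelize(Seq((7, Map("content" -> "es4444插入数据"))))
    keyed.saveToEsWithMeta("news/fulltext")

    sc.stop()
  }
}
```

With option 1 the `id` field is still sent as part of the document body, so it should also show up in `_source`; option 2 avoids that if the ID should live only in the metadata.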