About spark-streaming: a Spark Streaming write-to-Elasticsearch problem
A Spark Streaming job that consumes Kafka data and writes it into an ES cluster in real time keeps failing; below is the error log from the DolphinScheduler task instance. Reading the code, this job has more than one downstream output: besides ES it also writes a Hive partitioned table, and the logic writes Hive first, then ES. The first step was to split the job along business lines, separating the Hive write from the ES write, so that ingestion into Hive keeps working normally. After the split, the ES-writing job reported Elasticsearch 429 (too many requests) errors. Reviewing a colleague's code showed that the index is created every day by a scheduled script, but without specifying the shard count, replica count, and so on, so the following parameters were added at index creation time:

"settings": {
  "index.refresh_interval": "60s",
  "number_of_shards": 7,
  "number_of_replicas": 1
}

After this change the job runs for a few minutes each time, but it still fails. The error log:

[INFO] 2023-03-07 18:23:14.883 - [taskAppId=TASK-278-489615-614198]:[138] - -> 23/03/07 18:23:14 INFO Client: Application report for application_1665743026919_354608 (state: RUNNING)
[INFO] 2023-03-07 18:23:15.884 - [taskAppId=TASK-278-489615-614198]:[138] - -> 23/03/07 18:23:15 INFO Client: Application report for application_1665743026919_354608 (state: RUNNING)
[INFO] 2023-03-07 18:23:16.885 - [taskAppId=TASK-278-489615-614198]:[138] - -> 23/03/07 18:23:16 INFO Client: Application report for application_1665743026919_354608 (state: RUNNING)
[INFO] 2023-03-07 18:23:17.628 - [taskAppId=TASK-278-489615-614198]:[445] - find app id: application_1665743026919_354608
[INFO] 2023-03-07 18:23:17.628 - [taskAppId=TASK-278-489615-614198]:[238] - process has exited, execute path:/data/dolphinscheduler/exec/process/6/278/489615/614198, processId:13743 ,exitStatusCode:1 ,processWaitForStatus:true ,processExitValue:1
[INFO] 2023-03-07 18:23:17.885 - [taskAppId=TASK-278-489615-614198]:[138] - -> 23/03/07 18:23:17 INFO Client: Application report for application_1665743026919_354608 (state: FINISHED)
23/03/07 18:23:17 INFO Client:
     client token: N/A
     diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 5, udap-ambari092, executor 5): org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable
    at org.elasticsearch.hadoop.rest.RestRepository.getWriteTargetPrimaryShards(RestRepository.java:262)
    at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:688)
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:636)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:65)
    at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
    at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2039)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2060)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
    at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:101)
    at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:80)
    at org.elasticsearch.spark.sql.package$SparkDatasetFunctions.saveToEs(package.scala:67)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable
    at org.elasticsearch.hadoop.rest.RestRepository.getWriteTargetPrimaryShards(RestRepository.java:262)
    at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:688)
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:636)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:65)
    at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
    at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    ... 3 more
     ApplicationMaster host: 192.18.10.1
     ApplicationMaster RPC port: 0
     queue: default
     start time: 1678184482121
     final status: FAILED
     tracking URL: http://179:8088/proxy/application_1665743026919_354608/
     user: data
Exception in thread "main" org.apache.spark.SparkException: Application application_1665743026919_354608 finished with failed status
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:1269)
    at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1627)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:900)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:217)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/03/07 18:23:17 INFO ShutdownHookManager: Shutdown hook called
23/03/07 18:23:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-efd3f080-2790-4f79-a038-dfd67759952b
23/03/07 18:23:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-b066f7a0-615c-446d-be8b-225a997a5f2d

Searching for the exception message "please check whether your cluster is stable" turned up no working solution. From the source code on GitHub, this exception appears to be thrown roughly when the client requests cluster resources from ES and the returned value is empty. We then had the ops team log in to the server remotely and check the ES logs: ...
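For reference, the index-creation script mentioned earlier can apply those settings with a plain REST call against the create-index API. This is a sketch: the index name, host, and port are placeholders, and any authentication your cluster requires must be added.

```shell
# Sketch: create the daily index with explicit settings.
# "my_index-2023.03.07", localhost and 9200 are placeholders.
curl -X PUT "http://localhost:9200/my_index-2023.03.07" \
  -H 'Content-Type: application/json' \
  -d '{
    "settings": {
      "index.refresh_interval": "60s",
      "number_of_shards": 7,
      "number_of_replicas": 1
    }
  }'
```

Raising `refresh_interval` from the default 1s to 60s reduces the segment-refresh load a heavy streaming write puts on the cluster, which is why it was included alongside the shard and replica counts.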
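Separately from whatever the server-side logs reveal, the 429 responses mean ES is rejecting bulk requests because the job is writing faster than the cluster can index. elasticsearch-hadoop exposes batch-size and retry settings for exactly this; a sketch of passing them through `spark-submit` (the values and the jar name are illustrative, not tuned for this cluster; the `spark.es.*` prefix is the SparkConf form of the `es.*` options):

```shell
# Sketch: smaller bulk batches plus more patient retries ease the
# pressure that triggers 429 rejections. Values are examples only.
spark-submit \
  --conf spark.es.batch.size.entries=500 \
  --conf spark.es.batch.size.bytes=1mb \
  --conf spark.es.batch.write.retry.count=6 \
  --conf spark.es.batch.write.retry.wait=30s \
  your-streaming-job.jar   # placeholder jar name
```

The same keys (without the `spark.` prefix) can instead be passed in the options map of `saveToEs`, which keeps the tuning next to the write call.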