About spark-streaming: Problems writing to Elasticsearch from Spark Streaming

A Spark Streaming job that consumes Kafka data and writes it into an ES cluster in real time keeps failing; below is the error log from the DolphinScheduler task instance. Looking at the code, the job has more than one output: besides ES it also writes a Hive partitioned table, and logically it writes the Hive partitioned table first and then ES. The first step was to split the job along business lines, separating the Hive write from the ES write, so that the Hive write keeps running normally. After the split, the ES-writing job logged an Elasticsearch 429 "too many requests" error. Checking a colleague's code showed that the index is created every day by a scheduled script, but without specifying the number of shards, replicas or other parameters, so the following settings were added when creating the index:

    "settings": {"index.refresh_interval": "60s", "number_of_shards": 7, "number_of_replicas": 1}

After this change the program runs for a few minutes each time but still fails, with the following error log:

    [INFO] 2023-03-07 18:23:14.883 - [taskAppId=TASK-278-489615-614198]:[138] - -> 23/03/07 18:23:14 INFO Client: Application report for application_1665743026919_354608 (state: RUNNING)
    [INFO] 2023-03-07 18:23:15.884 - [taskAppId=TASK-278-489615-614198]:[138] - -> 23/03/07 18:23:15 INFO Client: Application report for application_1665743026919_354608 (state: RUNNING)
    [INFO] 2023-03-07 18:23:16.885 - [taskAppId=TASK-278-489615-614198]:[138] - -> 23/03/07 18:23:16 INFO Client: Application report for application_1665743026919_354608 (state: RUNNING)
    [INFO] 2023-03-07 18:23:17.628 - [taskAppId=TASK-278-489615-614198]:[445] - find app id: application_1665743026919_354608
    [INFO] 2023-03-07 18:23:17.628 - [taskAppId=TASK-278-489615-614198]:[238] - process has exited, execute path:/data/dolphinscheduler/exec/process/6/278/489615/614198, processId:13743 ,exitStatusCode:1 ,processWaitForStatus:true ,processExitValue:1
    [INFO] 2023-03-07 18:23:17.885 - [taskAppId=TASK-278-489615-614198]:[138] - -> 23/03/07 18:23:17 INFO Client: Application report for application_1665743026919_354608 (state: FINISHED)
    23/03/07 18:23:17 INFO Client:
        client token: N/A
        diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 5, udap-ambari092, executor 5): org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable
        at org.elasticsearch.hadoop.rest.RestRepository.getWriteTargetPrimaryShards(RestRepository.java:262)
        at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:688)
        at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:636)
        at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:65)
        at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
        at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2039)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2060)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
        at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:101)
        at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:80)
        at org.elasticsearch.spark.sql.package$SparkDatasetFunctions.saveToEs(package.scala:67)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable
        at org.elasticsearch.hadoop.rest.RestRepository.getWriteTargetPrimaryShards(RestRepository.java:262)
        at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:688)
        at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:636)
        at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:65)
        at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
        at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        ... 3 more
        ApplicationMaster host: 192.18.10.1
        ApplicationMaster RPC port: 0
        queue: default
        start time: 1678184482121
        final status: FAILED
        tracking URL: http://179:8088/proxy/application_1665743026919_354608/
        user: data
    Exception in thread "main" org.apache.spark.SparkException: Application application_1665743026919_354608 finished with failed status
        at org.apache.spark.deploy.yarn.Client.run(Client.scala:1269)
        at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1627)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:900)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:217)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    23/03/07 18:23:17 INFO ShutdownHookManager: Shutdown hook called
    23/03/07 18:23:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-efd3f080-2790-4f79-a038-dfd67759952b
    23/03/07 18:23:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-b066f7a0-615c-446d-be8b-225a997a5f2d

Searching for the exception message "please check whether your cluster is stable" turned up no effective solution. From the ES source code on GitHub it looks like this message is thrown roughly when the response ES obtains while requesting cluster resources comes back empty. We then had ops log in to the server remotely and check the ES logs: ...
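One common mitigation for the 429 bulk rejections described above is to shrink and throttle es-hadoop's bulk requests from the Spark side. A rough sketch of what that can look like around saveToEs is below; the helper name, ES hosts, and option values are illustrative placeholders, not the original job's code or tuned settings:

    import org.apache.spark.sql.DataFrame
    import org.elasticsearch.spark.sql._   // adds saveToEs to DataFrame/Dataset

    // Hypothetical helper: writes one micro-batch to ES with smaller, retried bulk requests.
    def writeBatchToEs(df: DataFrame, indexName: String): Unit = {
      val esConf = Map(
        "es.nodes"                   -> "es-node-1,es-node-2",  // placeholder ES hosts
        "es.port"                    -> "9200",
        "es.batch.size.entries"      -> "1000",  // fewer documents per bulk request
        "es.batch.size.bytes"        -> "1mb",   // smaller bulk payloads
        "es.batch.write.retry.count" -> "6",     // retry bulks rejected with 429
        "es.batch.write.retry.wait"  -> "30s"    // back off between retries
      )
      df.saveToEs(indexName, esConf)
    }

Combined with the index-level settings above (more primary shards, a longer refresh interval), this reduces the write pressure each executor puts on the cluster, though it does not by itself explain the later "Cluster state volatile" failure.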

March 13, 2023 · 3 min · jiezi

About spark-streaming: Part 10 - Several ways for Spark Streaming to manually maintain Kafka offsets

Spark Streaming's receiver-less createDirectStream method does not use a receiver: it creates an input stream that pulls messages directly from the Kafka cluster nodes, and it guarantees that every message pulled from Kafka is transformed exactly once, preserving semantic consistency. However, when the job fails or is restarted, it is not realistic to rely on Spark Streaming's own mechanisms alone to resume processing from the current consumption position (i.e. exactly-once semantics); in production the Kafka consumption position is usually maintained by managing offsets manually. This article describes how to manage Kafka offsets by hand, and hopefully it will help you. It mainly covers:

- How to manage Kafka offsets with MySQL
- How to manage Kafka offsets with Redis

How to manage Kafka offsets with MySQL

We can manage Kafka offsets manually from code in the Spark Streaming application. The offsets can be obtained from the RDDs produced by each processed batch:

    KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
      // get the offset ranges of this batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ...
    }

Once the offsets have been obtained, they can be saved to external storage (MySQL, Redis, ZooKeeper, HBase, etc.).

Example code

MySQL table used to store the offsets:

    CREATE TABLE `topic_par_group_offset` (
      `topic` varchar(255) NOT NULL,
      `partition` int(11) NOT NULL,
      `groupid` varchar(255) NOT NULL,
      `offset` bigint(20) DEFAULT NULL,
      PRIMARY KEY (`topic`,`partition`,`groupid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Configuration constants class: ConfigConstants

    object ConfigConstants {
      // Kafka configuration
      val kafkaBrokers = "kms-2:9092,kms-3:9092,kms-4:9092"
      val groupId = "group_test"
      val kafkaTopics = "test"
      val batchInterval = Seconds(5)
      val streamingStorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
      val kafkaKeySer = "org.apache.kafka.common.serialization.StringSerializer"
      val kafkaValueSer = "org.apache.kafka.common.serialization.StringSerializer"
      val sparkSerializer = "org.apache.spark.serializer.KryoSerializer"
      val batchSize = 16384
      val lingerMs = 1
      val bufferMemory = 33554432
      // MySQL configuration
      val user = "root"
      val password = "123qwe"
      val url = "jdbc:mysql://localhost:3306/kafka_offset"
      val driver = "com.mysql.jdbc.Driver"
      // checkpoint configuration
      val checkpointDir = "file:///e:/checkpoint"
      val checkpointInterval = Seconds(10)
      // Redis configuration
      val redisAddress = "192.168.10.203"
      val redisPort = 6379
      val redisAuth = "123qwe"
      val redisTimeout = 3000
    }

JDBC connection pool utility: JDBCConnPool

    object JDBCConnPool {
      val log: Logger = Logger.getLogger(JDBCConnPool.getClass)
      var dataSource: BasicDataSource = null

      /** Create the data source */
      def getDataSource(): BasicDataSource = {
        if (dataSource == null) {
          dataSource = new BasicDataSource()
          dataSource.setDriverClassName(ConfigConstants.driver)
          dataSource.setUrl(ConfigConstants.url)
          dataSource.setUsername(ConfigConstants.user)
          dataSource.setPassword(ConfigConstants.password)
          dataSource.setMaxTotal(50)
          dataSource.setInitialSize(3)
          dataSource.setMinIdle(3)
          dataSource.setMaxIdle(10)
          dataSource.setMaxWaitMillis(2 * 10000)
          dataSource.setRemoveAbandonedTimeout(180)
          dataSource.setRemoveAbandonedOnBorrow(true)
          dataSource.setRemoveAbandonedOnMaintenance(true)
          dataSource.setTestOnReturn(true)
          dataSource.setTestOnBorrow(true)
        }
        return dataSource
      }

      /** Release the data source */
      def closeDataSource() = {
        if (dataSource != null) {
          dataSource.close()
        }
      }

      /** Get a database connection */
      def getConnection(): Connection = {
        var conn: Connection = null
        try {
          if (dataSource != null) {
            conn = dataSource.getConnection()
          } else {
            conn = getDataSource().getConnection()
          }
        } catch {
          case e: Exception => log.error(e.getMessage(), e)
        }
        conn
      }

      /** Close the statement and connection */
      def closeConnection(ps: PreparedStatement, conn: Connection) {
        if (ps != null) {
          try {
            ps.close()
          } catch {
            case e: Exception => log.error("Failed to close PreparedStatement: " + e.getMessage(), e)
          }
        }
        if (conn != null) {
          try {
            conn.close()
          } catch {
            case e: Exception => log.error("Failed to close Connection: " + e.getMessage(), e)
          }
        }
      }
    }

Kafka producer: KafkaProducerTest

    object KafkaProducerTest {
      def main(args: Array[String]): Unit = {
        val props: Properties = new Properties()
        props.put("bootstrap.servers", ConfigConstants.kafkaBrokers)
        props.put("batch.size", ConfigConstants.batchSize.asInstanceOf[Integer])
        props.put("linger.ms", ConfigConstants.lingerMs.asInstanceOf[Integer])
        props.put("buffer.memory", ConfigConstants.bufferMemory.asInstanceOf[Integer])
        props.put("key.serializer", ConfigConstants.kafkaKeySer)
        props.put("value.serializer", ConfigConstants.kafkaValueSer)

        val producer: Producer[String, String] = new KafkaProducer[String, String](props)
        val startTime: Long = System.currentTimeMillis()
        for (i <- 1 to 100) {
          producer.send(new ProducerRecord[String, String](ConfigConstants.kafkaTopics, "Spark", Integer.toString(i)))
        }
        println("Elapsed time: " + (System.currentTimeMillis() - startTime))
        producer.close()
      }
    }

Reading and saving offsets: this object reads offsets from and writes offsets to external storage, including MySQL and Redis ...
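The excerpt is cut off before the offset reader/writer object. As a reference only, a minimal sketch of what the MySQL side of such an object might look like, reusing the table and connection pool above and assuming the kafka-0-10 integration; the object name, method names, and SQL are assumptions, not the original author's code:

    import java.sql.{Connection, PreparedStatement, ResultSet}
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.OffsetRange

    // Hypothetical helper, not the original article's object.
    object MySqlOffsetStore {

      // Upsert the end offset of every partition processed in this batch.
      def saveOffsets(groupId: String, offsetRanges: Array[OffsetRange]): Unit = {
        val conn: Connection = JDBCConnPool.getConnection()
        val sql = "REPLACE INTO topic_par_group_offset(`topic`,`partition`,`groupid`,`offset`) VALUES(?,?,?,?)"
        val ps: PreparedStatement = conn.prepareStatement(sql)
        try {
          offsetRanges.foreach { or =>
            ps.setString(1, or.topic)
            ps.setInt(2, or.partition)
            ps.setString(3, groupId)
            ps.setLong(4, or.untilOffset)  // position to resume from on restart
            ps.addBatch()
          }
          ps.executeBatch()
        } finally {
          JDBCConnPool.closeConnection(ps, conn)
        }
      }

      // Load the last committed offsets for a topic/group; an empty map means "start fresh".
      def readOffsets(topic: String, groupId: String): Map[TopicPartition, Long] = {
        val conn: Connection = JDBCConnPool.getConnection()
        val sql = "SELECT `partition`,`offset` FROM topic_par_group_offset WHERE `topic`=? AND `groupid`=?"
        val ps: PreparedStatement = conn.prepareStatement(sql)
        try {
          ps.setString(1, topic)
          ps.setString(2, groupId)
          val rs: ResultSet = ps.executeQuery()
          var offsets = Map.empty[TopicPartition, Long]
          while (rs.next()) {
            offsets += new TopicPartition(topic, rs.getInt("partition")) -> rs.getLong("offset")
          }
          offsets
        } finally {
          JDBCConnPool.closeConnection(ps, conn)
        }
      }
    }

The key design point is ordering: offsets are written only after the batch has been processed successfully, and readOffsets is used when building the direct stream so a restarted job resumes from the stored positions.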

November 18, 2020 · 11 min · jiezi

The road to optimizing Spark Streaming: from Receiver mode to Direct mode

Author: Xuezhang (学长), data R&D engineer at Getui

1 Business background

With the rapid development of big data, business scenarios have become increasingly complex, and the offline batch framework MapReduce can no longer keep up: many scenarios need real-time processing results for analysis and decision making. Spark Streaming is a distributed real-time computing framework for big data. It provides dynamic, high-throughput, fault-tolerant stream processing and can be used not only for user-behavior analysis but also in finance, public-opinion analysis, network monitoring and other areas. Getui's developer service - the scenario-based push (应景推送) capability of its message-push product - is built on Spark Streaming: it profiles audiences through big-data analysis and, combined with LBS geofencing, triggers precisely targeted pushes in real time for fine-grained user operations. In addition, when using Spark Streaming to process Kafka data in real time, Getui replaced Receiver mode with Direct mode, which optimized resource usage and improved program stability.

Starting from the two modes in which Spark Streaming pulls Kafka data, and drawing on Getui's practice, this article explains the principles and characteristics of Receiver and Direct mode and compares the optimization gained by moving from Receiver mode to Direct mode.

2 Principles and differences of the two modes

Receiver mode

1. Runtime architecture in Receiver mode
1) InputDStream: the input data received from the streaming source.
2) Receiver: receives the data stream and writes the data locally.
3) StreamingContext: represents Spark Streaming; handles task scheduling at the streaming level and generates jobs that are sent to the Spark engine for processing.
4) SparkContext: represents Spark Core; handles task scheduling at the batch level - the Spark engine that actually runs the jobs.

2. How a Receiver pulls data from Kafka
In this mode:
1) A receiver on an executor receives data from Kafka and stores it in the Spark executor; when the batch interval arrives, a job is triggered to process the received data. One receiver occupies one core.
2) To avoid losing data, the WAL mechanism has to be enabled, which writes a backup of the data received by the receiver to a third-party system (such as HDFS).
3) The receiver internally uses Kafka's high-level (consumer) API to consume data and updates offsets automatically.

Direct mode

1. Runtime architecture in Direct mode
Similar to Receiver mode, except that there is no receiver component in the executor and data is pulled from Kafka in a different way.

2. How Direct mode pulls data from Kafka
In this mode:
1) There is no receiver, so no extra cores are needed to continuously receive data. Instead, the latest offset of each Kafka partition is queried periodically, and each batch pulls and processes the data in the range between the last processed offset and the currently queried offset.
2) To avoid losing data there is no need to persist a backup of the data; it is enough to save the offsets manually.
3) It internally uses Kafka's simple (low-level) API to consume data; offsets have to be maintained manually and are not updated automatically in Kafka's ZooKeeper.

Differences between Receiver and Direct mode

1. The former has a Receiver in the executor receiving data, and each Receiver occupies one core; the latter has no Receiver and therefore does not tie up cores.
2. In the former, the number of InputDStream partitions is num_receiver * batchInterval / blockInterval; in the latter it equals the number of Kafka topic partitions. In Receiver mode a poorly chosen num_receiver hurts performance or wastes resources: if it is too small there is not enough parallelism and receiving data becomes the bottleneck of the whole pipeline; if it is too large, resources are wasted.
3. The former uses ZooKeeper to maintain the consumer's offsets, while the latter has to maintain the offsets itself.
4. To guarantee no data loss, the former has to enable the WAL, while the latter does not: it only needs to update the offsets after the data has been consumed successfully in the program.

3 Converting Receiver mode to Direct mode

Getui uses Spark Streaming for real-time processing of Kafka data and previously used Receiver mode, which has the following characteristics:
1. Each receiver occupies a dedicated core.
2. To guarantee no data loss, the WAL has to be enabled and checkpoints are used to save state.
3. When the receiver ingests data faster than it can be processed, the backlog grows and may eventually bring the program down.

Because of these characteristics, Receiver mode wastes a certain amount of resources; using checkpoints to save state means the checkpoint cannot be reused when the program has to be upgraded; point 3 makes the program less stable; and an unreasonable number of receivers can also make the receivers themselves the performance bottleneck. To optimize resources and improve stability, Receiver mode should be converted to Direct mode.

The changes are as follows:
1. Change how the InputDStream is created. Replace the receiver version:

    val kafkaStream = KafkaUtils.createStream(streamingContext,
      [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

with the direct version: ...
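The excerpt is cut off right before the direct version. For reference, with the 0.8 Kafka integration that createStream belongs to, the direct replacement generally looks like the sketch below; the broker list, group id, and topic set are placeholders rather than values from the article:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    // Direct stream: no receiver; each batch reads an offset range straight from the brokers.
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "broker1:9092,broker2:9092",  // placeholder broker list
      "group.id"             -> "consumer-group-id"           // placeholder group id
    )
    val topics = Set("topic-name")                             // placeholder topic

    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topics)

    // The partitions of each batch RDD map 1:1 to the Kafka topic partitions,
    // and the offsets of each batch can be read for manual bookkeeping:
    directStream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // process rdd here, then persist offsetRanges (ZK / MySQL / HBase ...) once processing succeeds
    }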

June 16, 2019 · 1 min · jiezi

Big data series: Spark study notes - Spark Streaming

1. Spark Streaming
- Spark Streaming is a real-time computing framework built on top of Spark Core that can consume data from many sources and process it.
- The most basic abstraction in Spark Streaming is the DStream (a proxy), which is essentially a series of consecutive RDDs; a DStream is really a wrapper around RDDs.
- A DStream can be thought of as an RDD factory: the RDDs it produces all carry the same business logic, only the data each RDD reads differs.
- Within one batch interval, a DStream produces exactly one RDD.
- A DStream is like a "template": we process the RDD produced in each time interval according to this template, and build the RDD DAG on that basis.

2. Popular real-time computing engines today

    Engine           Throughput   Language   Speed                    Ecosystem
    Storm            Low          Clojure    Very fast (sub-second)   Alibaba (JStorm)
    Flink            High         Scala      Fast (sub-second)        Little domestic adoption
    Spark Streaming  Very high    Scala      Fast (milliseconds)      Mature, complete ecosystem

3. Spark Streaming processing network data

    // Create the StreamingContext; at least two threads are needed:
    // one to receive data and one to process it
    val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(3000))
    val receiverDS: ReceiverInputDStream[String] = ssc.socketTextStream("uplooking01", 44444)
    val pairRetDS: DStream[(String, Int)] = receiverDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
    pairRetDS.print()
    // start the streaming computation
    ssc.start()
    // wait for termination (graceful shutdown)
    ssc.awaitTermination()

4. The two ways Spark Streaming receives data (Kafka)

Receiver
- Offsets are maintained by ZooKeeper
- Uses Kafka's high-level (consumer) API
- Simple to program
- Low efficiency (WAL is enabled to keep the data safe)
- The Receiver approach was dropped entirely in the Kafka 0.10 integration
- Generally not used in production

Direct
- Offsets are maintained manually by us
- High efficiency (Spark Streaming reads directly from the Kafka partitions)
- More complex to program
- Generally used in production

5. Integrating Spark Streaming with Kafka

Receiver-based integration with Kafka (not recommended for production; removed in 0.10):

    // Create the StreamingContext; at least two threads are needed:
    // one to receive data and one to process it
    val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(3000))
    val zkQuorum = "uplooking03:2181,uplooking04:2181,uplooking05:2181"
    val groupId = "myid"
    val topics = Map("hadoop" -> 3)
    val receiverDS: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
    receiverDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()

Direct-based integration (used in production) ...
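The excerpt ends before the Direct example. As a reference only, a minimal sketch of the 0.10 direct integration with manual offset commits, reusing the host, group, and topic names from the snippet above; the exact code is an assumption, not the original post's:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

    val conf = new SparkConf().setAppName("DirectOps").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(3000))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "uplooking01:9092",              // placeholder broker
      "key.deserializer"  -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"          -> "myid",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)       // offsets are committed by us, not by Kafka
    )

    val directDS = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("hadoop"), kafkaParams))

    directDS.foreachRDD { rdd =>
      // offsets of this batch, taken from the RDD itself
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
      // commit offsets back to Kafka only after the batch has been processed successfully
      directDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()

Here the RDD partitions map 1:1 to Kafka partitions, and committing after processing is what gives the "update the offset only after successful consumption" guarantee described above.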

June 14, 2019 · 3 min · jiezi