摘要:脏数据对数据计算的正确性带来了很重大的影响。因而,咱们须要摸索一种办法,可能实现Spark写入Elasticsearch数据的可靠性与正确性。

概述

Spark与Elasticsearch(es)的联合,是近年来大数据解决方案很炽热的一个话题。一个是杰出的分布式计算引擎,另一个是杰出的搜索引擎。近年来,越来越多的成熟计划落地到行业产品中,包含咱们耳熟能详的Spark+ES+HBase日志剖析平台。

目前,华为云数据湖摸索(DLI)服务已全面反对Spark/Flink跨源拜访Elasticsearch。而之前在实现过程中也遇到过很多场景化问题,本文将筛选其中比拟经典的分布式一致性问题进行探讨。

分布式一致性问题

问题形容

数据容错是大数据计算引擎面临的次要问题之一。目前,支流的开源大数据比方Apache Spark和Apache Flink曾经齐全实现了Exactly Once语义,保障了外部数据处理的正确性。然而在将计算结果写入到内部数据源时,因为内部数据源架构与拜访形式的多样性,始终没能找到一个对立的解决方案来保障一致性(咱们称为Sink算子一致性问题)。再加上es自身没有事务处理的能力,因而如何保障写入es数据一致性成为了热点话题。

咱们举一个简略的例子来阐明一下,图1在SparkRDD中(这里假如是一个task),每一条蓝色的线代表100万条数据,那么10条蓝色的线示意了有1000万条数据筹备写入到CSS(华为云搜寻服务,外部为es)的某个index中。在写入过程中,零碎产生了故障,导致只有一半(500万条)数据胜利写入。

task是Spark执行工作的最小单元,如果task失败了,以后task须要整个从新执行。所以,当咱们从新执行写入操作(图2),并最终重试胜利之后(这次用红色来示意雷同的1000万条数据),上一次失败留下的500万条数据仍然存在(蓝色的线),变成脏数据。脏数据对数据计算的正确性带来了很重大的影响。因而,咱们须要摸索一种办法,可能实现Spark写入es数据的可靠性与正确性。

图1 Spark task失败时向es写入了局部数据

图2 task重试胜利后上一次写入的局部数据成为脏数据

解决方案

1.写笼罩

从上图中,咱们能够很直观的看进去,每次task插入数据前,先将es的index中的数据都清空就能够了。那么,每次写入操作能够看成是以下3个步骤的组合:

  • 步骤一 判断以后index中是否有数据
  • 步骤二 清空以后index中的数据
  • 步骤三 向index中写入数据

换一种角度,咱们能够了解为,不论之前是否执行了数据写入,也不论之前数据写入了多少次,咱们只想要保障以后这一次写入可能独立且正确地实现,这种思维咱们称为幂等。

幂等式写入是大数据sink算子解决一致性问题的一种常见思路,另一种说法叫做最终一致性,其中最简略的做法就是“insert overwrite”。当Spark数据写入es失败并尝试从新执行的时候,利用笼罩式写入,能够将index中的残留数据笼罩掉。

图 应用overwrite模式,task重试时笼罩上一次数据

在DLI中,能够在DataFrame接口里将mode设置成“overwrite”来实现笼罩写es:

val dfWriter = sparkSession.createDataFrame(rdd, schema)//// 写入数据至es//dfWriter.write   .format("es")   .option("es.resource", resource)   .option("es.nodes", nodes)   .mode(SaveMode.Overwrite)   .save()

也能够间接应用sql语句:

// 插入数据至essparkSession.sql("insert overwrite table es_table values(1, 'John'),(2, 'Bob')")

2.最终一致性

利用上述“overwrite”的形式解决容错问题有一个很大的缺点。如果es曾经存在了正确的数据,这次只是须要追加写入。那么overwrite会把之前index的正确的数据都笼罩掉。

比如说,有多个task并发执行写入数据的操作,其中一个task执行失败而其余task执行胜利,从新执行失败的task进行“overwrite”会将其余曾经胜利写入的数据笼罩掉。再比如说,Streaming场景中,每一批次数据写入都变成笼罩,这是不合理的形式。

图 Spark追加数据写入es

图 用overwrite写入会将原先正确的数据笼罩掉

其实,咱们想做的事件,只是清理脏数据而不是所有index中的数据。因而,外围问题变成了如何辨认脏数据?借鉴其余数据库解决方案,咱们仿佛能够找到办法。在MySQL中,有一个insert ignore into的语法,如果遇到主键抵触,可能单单对这一行数据进行疏忽操作,而如果没有抵触,则进行一般的插入操作。这样就能够将笼罩数据的力度细化到了行级别。

es中有相似的性能么?如果es中每一条数据都有主键,主键抵触时能够进行笼罩(疏忽和笼罩其实都能解决这个问题),那么在task失败重试时,就能够仅针对脏数据进行笼罩。

咱们先来看一下Elasticsearch中的概念与关系型数据库之间的一种对照关系:

咱们晓得,MySQL中的主键是对于一行数据(Row)的惟一标识。从表中能够看到,Row对应的就是es中的Document。那么,Document有没有惟一的标识呢?

答案是必定的,每一个Document都有一个id,即doc_id。doc_id是可配置的,index、type、doc_id三者指定了惟一的一条数据(Document)。并且,在插入es时,index、type、doc_id雷同,原先的document数据将会被笼罩掉。因而,doc_id能够等效于“MySQL主键抵触疏忽插入”性能,即“doc_id抵触笼罩插入”性能。

因而,DLI的SQL语法中提供了配置项“es.mapping.id”,能够指定一个字段作为Document id,例如:

create table es_table(id int, name string) using es options('es.nodes' 'localhost:9200','es.resource' '/mytest/anytype','es.mapping.id' 'id')")

这里指定了字段“id”作为es的doc_id,当插入数据时,字段“id”的值将成为插入Document的id。值得注意的是,“id”的值要惟一,否则雷同的“id”将会使数据被笼罩。

这时,如果遇到作业或者task失败的状况,间接从新执行即可。当最终作业执行胜利时,es中将不会呈现残留的脏数据,即实现了最终一致性。

图 在插入数据时将主键设为doc_id,利用幂等插入来实现最终一致性

总结

本文能够一句话总结为“利用doc_id实现写入es的最终一致性”。而这种问题,实际上不须要如此大费周章的摸索,因为在es的原生API中,插入数据是须要指定doc_id,这应该是一个基本常识:具体API阐明能够参考:https://www.elastic.co/guide/...)

图 es应用bulk接口进行数据写入

权当消遣,聊以慰藉。

得益于Base实践,最终一致性成为分布式计算中重要的解决方案之一。只管该解决方案还有肯定的限度(比方本文的解决方案中数据必须应用主键),而业界还有很多分布式一致性的解决方案(比方2PC、3PC)。但集体认为,掂量工作量与最终成果,最终一致性是一种很无效且很简洁的解决方案。

扩大浏览:Elasticsearch Datasource

简介

Datasource是Apache Spark提供的拜访内部数据源的对立接口。Spark提供了SPI机制对Datasource进行了插件式治理,能够通过Spark的Datasource模块自定义拜访Elasticsearch的逻辑。

华为云DLI(数据湖摸索)服务已齐全实现了es datasource性能,用户只有通过简略的SQL语句或者Spark DataFrame API就能实现Spark拜访es。

性能形容

通过Spark拜访es,能够在DLI官网文档中找到详细资料:https://support.huaweicloud.com/usermanual-dli/dli_01_0410.html。(Elasticsearch是由华为云CSS云搜寻服务提供)。

能够应用Spark DataFrame API形式来进行数据的读写:

//// 初始化设置//// 设置es的/index/type(es 6.x版本不反对同一个index中存在多个type,7.x版本不反对设置type)val resource = "/mytest/anytype";// 设置es的连贯地址(格局为”node1:port,node2:port...”,因为es的replica机制,即便拜访es集群,只须要配置一个地址即可.)val nodes = "localhost:9200"// 结构数据val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))val rdd = sparkSession.sparkContext.parallelize(Seq(Row(1, "John"),Row(2,"Bob")))val dfWriter = sparkSession.createDataFrame(rdd, schema)//// 写入数据至es//dfWriter.write   .format("es")   .option("es.resource", resource)   .option("es.nodes", nodes)   .mode(SaveMode.Append)   .save()//// 从es读取数据//val dfReader = sparkSession.read.format("es").option("es.resource",resource).option("es.nodes", nodes).load()dfReader.show()

也能够应用Spark SQL来拜访:

// 创立一张关联es /index/type的Spark长期表,该表并不寄存理论数据val sparkSession = SparkSession.builder().getOrCreate()sparkSession.sql("create table es_table(id int, name string) using es options(    'es.nodes' 'localhost:9200',    'es.resource' '/mytest/anytype')")// 插入数据至essparkSession.sql("insert into es_table values(1, 'John'),(2, 'Bob')")// 从es中读取数据val dataFrame = sparkSession.sql("select * from es_table")dataFrame.show()

点击关注,第一工夫理解华为云陈腐技术~