共计 4600 个字符,预计需要花费 12 分钟才能阅读完成。
摘要: 脏数据对数据计算的正确性带来了很重大的影响。因而,咱们须要摸索一种办法,可能实现 Spark 写入 Elasticsearch 数据的可靠性与正确性。
概述
Spark 与 Elasticsearch(es)的联合,是近年来大数据解决方案很炽热的一个话题。一个是杰出的分布式计算引擎,另一个是杰出的搜索引擎。近年来,越来越多的成熟计划落地到行业产品中,包含咱们耳熟能详的 Spark+ES+HBase 日志剖析平台。
目前,华为云数据湖摸索(DLI)服务已全面反对 Spark/Flink 跨源拜访 Elasticsearch。而之前在实现过程中也遇到过很多场景化问题,本文将筛选其中比拟经典的分布式一致性问题进行探讨。
分布式一致性问题
问题形容
数据容错是大数据计算引擎面临的次要问题之一。目前,支流的开源大数据比方 Apache Spark 和 Apache Flink 曾经齐全实现了 Exactly Once 语义,保障了外部数据处理的正确性。然而在将计算结果写入到内部数据源时,因为内部数据源架构与拜访形式的多样性,始终没能找到一个对立的解决方案来保障一致性(咱们称为 Sink 算子一致性问题)。再加上 es 自身没有事务处理的能力,因而如何保障写入 es 数据一致性成为了热点话题。
咱们举一个简略的例子来阐明一下,图 1 在 SparkRDD 中(这里假如是一个 task),每一条蓝色的线代表 100 万条数据,那么 10 条蓝色的线示意了有 1000 万条数据筹备写入到 CSS(华为云搜寻服务,外部为 es)的某个 index 中。在写入过程中,零碎产生了故障,导致只有一半(500 万条)数据胜利写入。
task 是 Spark 执行工作的最小单元,如果 task 失败了,以后 task 须要整个从新执行。所以,当咱们从新执行写入操作(图 2),并最终重试胜利之后(这次用红色来示意雷同的 1000 万条数据),上一次失败留下的 500 万条数据仍然存在(蓝色的线),变成脏数据。脏数据对数据计算的正确性带来了很重大的影响。因而,咱们须要摸索一种办法,可能实现 Spark 写入 es 数据的可靠性与正确性。
图 1 Spark task 失败时向 es 写入了局部数据
图 2 task 重试胜利后上一次写入的局部数据成为脏数据
解决方案
1. 写笼罩
从上图中,咱们能够很直观的看进去,每次 task 插入数据前,先将 es 的 index 中的数据都清空就能够了。那么,每次写入操作能够看成是以下 3 个步骤的组合:
- 步骤一 判断以后 index 中是否有数据
- 步骤二 清空以后 index 中的数据
- 步骤三 向 index 中写入数据
换一种角度,咱们能够了解为,不论之前是否执行了数据写入,也不论之前数据写入了多少次,咱们只想要保障以后这一次写入可能独立且正确地实现,这种思维咱们称为幂等。
幂等式写入是大数据 sink 算子解决一致性问题的一种常见思路,另一种说法叫做最终一致性,其中最简略的做法就是“insert overwrite”。当 Spark 数据写入 es 失败并尝试从新执行的时候,利用笼罩式写入,能够将 index 中的残留数据笼罩掉。
图 应用 overwrite 模式,task 重试时笼罩上一次数据
在 DLI 中,能够在 DataFrame 接口里将 mode 设置成“overwrite”来实现笼罩写 es:
val dfWriter = sparkSession.createDataFrame(rdd, schema)
//
// 写入数据至 es
//
dfWriter.write
.format("es")
.option("es.resource", resource)
.option("es.nodes", nodes)
.mode(SaveMode.Overwrite)
.save()
也能够间接应用 sql 语句:
// 插入数据至 es
sparkSession.sql("insert overwrite table es_table values(1,'John'),(2,'Bob')")
2. 最终一致性
利用上述“overwrite”的形式解决容错问题有一个很大的缺点。如果 es 曾经存在了正确的数据,这次只是须要追加写入。那么 overwrite 会把之前 index 的正确的数据都笼罩掉。
比如说,有多个 task 并发执行写入数据的操作,其中一个 task 执行失败而其余 task 执行胜利,从新执行失败的 task 进行“overwrite”会将其余曾经胜利写入的数据笼罩掉。再比如说,Streaming 场景中,每一批次数据写入都变成笼罩,这是不合理的形式。
图 Spark 追加数据写入 es
图 用 overwrite 写入会将原先正确的数据笼罩掉
其实,咱们想做的事件,只是清理脏数据而不是所有 index 中的数据。因而,外围问题变成了如何辨认脏数据?借鉴其余数据库解决方案,咱们仿佛能够找到办法。在 MySQL 中,有一个 insert ignore into 的语法,如果遇到主键抵触,可能单单对这一行数据进行疏忽操作,而如果没有抵触,则进行一般的插入操作。这样就能够将笼罩数据的力度细化到了行级别。
es 中有相似的性能么?如果 es 中每一条数据都有主键,主键抵触时能够进行笼罩(疏忽和笼罩其实都能解决这个问题),那么在 task 失败重试时,就能够仅针对脏数据进行笼罩。
咱们先来看一下 Elasticsearch 中的概念与关系型数据库之间的一种对照关系:
咱们晓得,MySQL 中的主键是对于一行数据(Row)的惟一标识。从表中能够看到,Row 对应的就是 es 中的 Document。那么,Document 有没有惟一的标识呢?
答案是必定的,每一个 Document 都有一个 id,即 doc_id。doc_id 是可配置的,index、type、doc_id 三者指定了惟一的一条数据(Document)。并且,在插入 es 时,index、type、doc_id 雷同,原先的 document 数据将会被笼罩掉。因而,doc_id 能够等效于“MySQL 主键抵触疏忽插入”性能,即“doc_id 抵触笼罩插入”性能。
因而,DLI 的 SQL 语法中提供了配置项“es.mapping.id”,能够指定一个字段作为 Document id,例如:
create table es_table(id int, name string) using es options(
'es.nodes' 'localhost:9200',
'es.resource' '/mytest/anytype',
'es.mapping.id' 'id')")
这里指定了字段“id”作为 es 的 doc_id,当插入数据时,字段“id”的值将成为插入 Document 的 id。值得注意的是,“id”的值要惟一,否则雷同的“id”将会使数据被笼罩。
这时,如果遇到作业或者 task 失败的状况,间接从新执行即可。当最终作业执行胜利时,es 中将不会呈现残留的脏数据,即实现了最终一致性。
图 在插入数据时将主键设为 doc_id,利用幂等插入来实现最终一致性
总结
本文能够一句话总结为“利用 doc_id 实现写入 es 的最终一致性”。而这种问题,实际上不须要如此大费周章的摸索,因为在 es 的原生 API 中,插入数据是须要指定 doc_id,这应该是一个基本常识:具体 API 阐明能够参考:https://www.elastic.co/guide/…)
图 es 应用 bulk 接口进行数据写入
权当消遣,聊以慰藉。
得益于 Base 实践,最终一致性成为分布式计算中重要的解决方案之一。只管该解决方案还有肯定的限度(比方本文的解决方案中数据必须应用主键),而业界还有很多分布式一致性的解决方案(比方 2PC、3PC)。但集体认为,掂量工作量与最终成果,最终一致性是一种很无效且很简洁的解决方案。
扩大浏览:Elasticsearch Datasource
简介
Datasource 是 Apache Spark 提供的拜访内部数据源的对立接口。Spark 提供了 SPI 机制对 Datasource 进行了插件式治理,能够通过 Spark 的 Datasource 模块自定义拜访 Elasticsearch 的逻辑。
华为云 DLI(数据湖摸索)服务已齐全实现了 es datasource 性能,用户只有通过简略的 SQL 语句或者 Spark DataFrame API 就能实现 Spark 拜访 es。
性能形容
通过 Spark 拜访 es,能够在 DLI 官网文档中找到详细资料:https://support.huaweicloud.com/usermanual-dli/dli_01_0410.html。(Elasticsearch 是由华为云 CSS 云搜寻服务提供)。
能够应用 Spark DataFrame API 形式来进行数据的读写:
//
// 初始化设置
//
// 设置 es 的 /index/type(es 6.x 版本不反对同一个 index 中存在多个 type,7.x 版本不反对设置 type)val resource = "/mytest/anytype";
// 设置 es 的连贯地址 (格局为”node1:port,node2:port...”, 因为 es 的 replica 机制, 即便拜访 es 集群, 只须要配置一个地址即可.)
val nodes = "localhost:9200"
// 结构数据
val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
val rdd = sparkSession.sparkContext.parallelize(Seq(Row(1, "John"),Row(2,"Bob")))
val dfWriter = sparkSession.createDataFrame(rdd, schema)
//
// 写入数据至 es
//
dfWriter.write
.format("es")
.option("es.resource", resource)
.option("es.nodes", nodes)
.mode(SaveMode.Append)
.save()
//
// 从 es 读取数据
//
val dfReader = sparkSession.read.format("es").option("es.resource",resource).option("es.nodes", nodes).load()
dfReader.show()
也能够应用 Spark SQL 来拜访:
// 创立一张关联 es /index/type 的 Spark 长期表, 该表并不寄存理论数据
val sparkSession = SparkSession.builder().getOrCreate()
sparkSession.sql("create table es_table(id int, name string) using es options('es.nodes''localhost:9200',
'es.resource' '/mytest/anytype')")
// 插入数据至 es
sparkSession.sql("insert into es_table values(1,'John'),(2,'Bob')")
// 从 es 中读取数据
val dataFrame = sparkSession.sql("select * from es_table")
dataFrame.show()
点击关注,第一工夫理解华为云陈腐技术~