共计 2370 个字符,预计需要花费 6 分钟才能阅读完成。
bulk api 奇特的 json 格式
{“action”: {“meta”}}n
{“data”}n
{“action”: {“meta”}}n
{“data”}n
…
为什么 bulk 要采用这种奇特的 json 格式?
由于 bulk 中的每个操作都可能要转发到不同的 node 的 shard 去执行,假设我们不用这种奇特的 json 格式,采用比较良好的 json 数组格式,允许任意的换行,整个可读性非常棒,读起来很爽。但是 ES 拿到这种标准格式的 json 串之后,要按照下述流程去进行执行处理。
格式如下:
[
{
"action": {
},
"data": {}
}
]
(1)将 json 数组解析为 JSONArray 对象,这个时候,整个数据,就会在内存中出现一份一摸一样的拷贝,一份数据是 json 文本,一份数据是 JSONArray 对象
(2)解析 json 数组里面的每个 json,对每个请求中的 document 进行路由
(3)为路由到同一个 shard 上的多个请求,创建一个请求数组
(4)将这个请求数组序列化
(5)将序列化后的请求数组发送到对应的节点上去
不难看出这样就会耗费更多的内存,更多的 jvm gc 开销。
假设一个场景,对于 bulk size 的大小一般建议在几千条,大小在 10MB 左右,所以说,可怕的事情来了。假设说现在 100 个 bulk 请求发送到了一个节点上去,然后每个请求是 10MB,100 个请求就是 1000MB=1G,然后每个请求的 json 都 copy 一份 JSONArray 对象,此时内存中的占用就会翻倍,就会占用 2GB 的内存,甚至还不止,因为弄成 JSONArray 对象之后,还可能会多弄一些其它的数据结构,2GB+ 的内存占用。
占用更多的内存可能就会积压其它请求的内存使用量,比如说最重要的搜索请求,分析请求等等。此时就可能会导致其它请求的性能急速下降,另外的话,占用内存更多,就会导致 java 虚拟机的垃圾回收次数更多,更加频繁,每次要回收的垃圾对象更多,耗费的时间更多,导致 ES 的 java 虚拟机停止工作线程的时间更多。
而使用这个奇特格式的 json
{“action”: {“meta”}}n
{“data”}n
{“action”: {“meta”}}n
{“data”}n
…
(1)不用将其转换为 json 对象,不会出现内存中的相同数据的拷贝,直接按照换行符切割 json
(2)对每两个一组的 json, 读取 meta,进行 document 路由
(3)直接将对应的 json 发送到 node 上去
和标准格式的 json 相比,最大的优势在于不需要将 json 数组解析为一个 JSONArray 对象,形成一份大数据的拷贝,浪费内存空间,尽可能的保证性能。
实战:
PUT _bulk
{"index": {"_index": "test", "_id": "1"}}
{"field1": "value1", "field2": "value2"}
{"index": {"_index": "test", "_id": "2"}}
{"field1": "value1 id2", "field2": "value2 id2"}
{"delete": {"_index": "test", "_id": "2"}}
{"create": {"_index": "test", "_id": "3"}}
{"field1": "value3"}
{"update": {"_index": "test", "_id": "1"}}
{"doc": {"field2": "value2"}}
{
"took" : 68,
"errors" : true,
"items" : [
{
"index" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 4,
"_primary_term" : 1,
"status" : 200
}
},
{
"index" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "2",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 5,
"_primary_term" : 1,
"status" : 201
}
},
{
"delete" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "2",
"_version" : 2,
"result" : "deleted",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 6,
"_primary_term" : 1,
"status" : 200
}
},
{
"create" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "3",
"status" : 409,
"error" : {
"type" : "version_conflict_engine_exception",
"reason" : "[3]: version conflict, document already exists (current version [1])",
"index_uuid" : "rOLJZzIVTDCWtDQcJuei6w",
"shard" : "0",
"index" : "test"
}
}
},
{
"update" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"result" : "noop",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"status" : 200
}
}
]
}