后面的文章外面次要讲的是查问的用法,还是连续之前的文章格局,这里讲讲批改。
1. 单文档批改
1.1. insert
其实在数据筹备阶段曾经有新增的例子了。
DSL
POST /operation_log/_doc
{
"ip": "0.0.0.0",
"module": "测试数据"
}
spring
OperationLog operationLog=new OperationLog();
operationLog.setIp("0.0.0.0");
operationLog.setModule("测试数据");
return esRestTemplate.save(operationLog);
1.2. update-(save)
新增时,springboot 用到的是 save 办法,更新时也一样能够。不过得拿到文档的 id,这里 id=13OkA4QBMgWicIn2wBwM。
DSL
PUT /operation_log/_doc/13OkA4QBMgWicIn2wBwM
{
"ip": "0.0.0.0",
"module": "测试数据 1"
}
spring
esRestTemplate.save(operationLog);
1.3. update-(document)
DSL
POST /operation_log/_update/13OkA4QBMgWicIn2wBwM
{
"doc": {"module":"测试数据 1"}
}
spring
Document document = Document.create();
document.put("module", "测试数据 1");
UpdateQuery updateQuery = UpdateQuery
.builder(id)
.withDocument(document)
.build();
esRestTemplate.update(updateQuery,IndexCoordinates.of("operation_log"));
1.4. update-(script)
DSL
POST /operation_log/_update/13OkA4QBMgWicIn2wBwM
{
"script": {
"source": "ctx._source.module = params.module",
"params": {"module": "测试数据 1"}
}
}
spring
Map<String, Object> params = new HashMap<>();
params.put("module", "测试数据 1");
UpdateQuery updateQuery = UpdateQuery
.builder(id)
.withScript("ctx._source.module = params.module")
.withParams(params)
.build();
esRestTemplate.update(updateQuery, IndexCoordinates.of("operation_log"));
1.5. delete
DSL
DELETE /operation_log/_doc/13OkA4QBMgWicIn2wBwM
spring
esRestTemplate.delete(id, OperationLog.class);
2. 批量批改 bulk
批量新增 DSL
POST /operation_log/_bulk
{"create":{"_index":"operation_log"}}
{"ip":"0.0.0.0","module":"测试数据 1"}
{"create":{"_index":"operation_log"}}
{"ip":"0.0.0.0","module":"测试数据 2"}
{"create":{"_index":"operation_log"}}
{"ip":"0.0.0.0","module":"测试数据 3"}
批量更新 DSL
POST /operation_log/_bulk
{"update":{"_id":"2HP9A4QBMgWicIn26BzR"}}
{"doc":{"module":"测试数据 11"}}
{"update":{"_id":"2XP9A4QBMgWicIn26BzR"}}
{"script":{"source":"ctx._source.module = params.module","params":{"module":"测试数据 22"}}}
批量删除 DSL
POST /operation_log/_bulk
{"delete":{"_id":"2HP9A4QBMgWicIn26BzR"}}
{"delete":{"_id":"2XP9A4QBMgWicIn26BzR"}}
{"delete":{"_id":"2nP9A4QBMgWicIn26BzR"}}
不知是否留神到,在批量更新的语句中,反对同时 doc、script 两种更新形式。实际上来说,_bulk
其实反对同时将上述的三种语句一起提交执行。
不过我的项目上个别不会如此利用,都是独自离开来。像批量新增,save
办法就反对批量新增操作,尽管底层代码还是调用 bulkOperation
。
spring bulkUpdate
@PatchMapping("bulk-update")
public void bulkUpdate() {Map<String, Object> params = new HashMap<>();
params.put("module", "测试数据 2");
String scriptStr = "ctx._source.module = params.module";
Query query = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.termQuery("ip", "0.0.0.0"))
.build();
List<UpdateQuery> updateQueryList = esRestTemplate.search(query, OperationLog.class)
.stream()
.map(SearchHit::getContent)
.map(obj -> UpdateQuery.builder(obj.getId())
.withScript(scriptStr)
.withParams(params)
.build())
.collect(Collectors.toList());
esRestTemplate.bulkUpdate(updateQueryList, OperationLog.class);
}
无关更具体、更好应用 bulk 的局部,倡议查看 es 官网材料
3. 批改 ByQuery
3.1. updateByQuery
DSL
POST /operation_log/_update_by_query
{
"script": {
"source": "ctx._source.module = params.module",
"params": {"module": "测试数据 1"}
},
"query": {
"term": {"ip": "0.0.0.0"}
}
}
spring
@PatchMapping("update-by-query")
public void updateByQuery() {Map<String, Object> params = new HashMap<>();
params.put("module", "测试数据 2");
String scriptStr = "ctx._source.module = params.module";
UpdateQuery updateQuery = UpdateQuery
.builder(new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.termQuery("ip", "0.0.0.0"))
.build())
.withScript(scriptStr)
.withScriptType(ScriptType.INLINE)
.withLang("painless")
.withParams(params)
.build();
esRestTemplate.updateByQuery(updateQuery, IndexCoordinates.of("operation_log"));
}
能够比照一下下面的 bulkUpdate
办法,发现有些不同:
- updateByQuery 只反对
Script
,不反对Document
的形式更新。 - updateByQuery 应用 Script 形式更新时,必须传递
scriptType
、Lang
这些辅助参数。本来 bulkUpdate 中也是要传的,只不过底层办法封装了,然而没有给 updateByQuery 封装。(理论踩过坑,看封装办法才得悉)
3.2. deleteByQuery
DSL
POST /operation_log/_delete_by_query
{
"query": {
"term": {"ip": "0.0.0.0"}
}
}
spring
Query query = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.termQuery("ip", "0.0.0.0"))
.build();
esRestTemplate.delete(query, OperationLog.class);
delete_by_query 并不是真正意义上物理文档删除,而是只是版本变动并且对文档减少了删除标记。当咱们再次搜寻的时候,会搜寻全副而后过滤掉有删除标记的文档。因而,该索引所占的空间并不会随着该 API 的操作磁盘空间会马上开释掉,只有等到下一次段合并的时候才真正被物理删除,这个时候磁盘空间才会开释。相同,在被查问到的文档标记删除过程同样须要占用磁盘空间,这个时候,你会发现触发该 API 操作的时候磁盘岂但没有被开释,反而磁盘使用率回升了。
3.3. 调优参数
可参考 es 官网 ElasticSearch API guide,在批量批改文档时,有很多参数能够配合调优。
这里先列举几个罕用的,剩下具体的请看官网文档:
1. refresh
ES 的索引数据是写入到磁盘上的。但这个过程是分阶段实现的,因为 IO 的操作是比拟费时的。
- 先写到内存中,此时不可搜寻。
- 默认通过 1s 之后会被写入 lucene 的底层文件 segment 中,此时能够搜寻到。
- refresh 之后才会写入磁盘
以上过程因为随时可能被中断导致数据失落,所以每一个过程都会有 translog 记录,如果两头有任何一步失败了,等服务器重启之后就会重试,保证数据写入。translog 也是先存在内存里的,而后默认 5 秒刷一次写到硬盘里。
在 index,Update , Delete , Bulk 等操作中,能够设置 refresh 的值。如下:
-
false:默认值。不要刷新相干的动作。在申请返回后,此申请所做的更改将在某个时刻显示。如:
创立一个文档,而不做任何使其能够搜寻的事件:PUT /test/test/1 PUT /test/test/2?refresh=false
-
true 或空字符串 :更新数据之后,立即对相干的分片 (包含正本) 刷新,这个刷新操作保障了数据更新的后果能够立即被搜寻到。
创立一个文档并立刻刷新索引,使其可见:PUT /test/test/1?refresh PUT /test/test/2?refresh=true
-
wait_for:期待申请所做的更改在返回之前通过冲刷显示。这不会强制立刻刷新,而是期待刷新产生。Elasticsearch 会主动每隔 index.refresh_interval 刷新曾经更改的分片,默认为 1 秒。该设置是动静的。
创立一个文档并期待它成为搜寻可见:PUT /test/test/1?refresh=wait_for
2. scroll_size
这个参数是执行删除的时候,每次每个线程会查问的数据量,而后进行删除。默认值是 100,就是说每个线程每次都会查问出 100 条数据而后再删除。
3. slices
能够了解为,默认值是一个线程在进行查问数据并删除,当设置这个 slices 值时,会将 es 下的数据进行切分,启动多个 task 去做删除,了解为多线程执行操作。
然而就像不倡议滥用多线程一样,不倡议设置 slices 值太大,否则会导致 es 出问题。倡议设为索引分片数量的倍数(如:1 倍、2 倍),有助于基于每个分片的数据做切分。
4. conflicts
如果按查问删除遇到版本抵触,该怎么办,有两个值:
- abort:默认值,抵触时停止。
- proceed:抵触时持续。
举后面 updateByQuery 的例子。_update_by_query 在启动时获取索引的快照,并应用外部版本控制对其进行索引。这意味着如果文档在拍摄快照和解决索引申请之间发生变化,则会产生版本抵触。当版本匹配文档被更新并且版本号减少。
所有更新和查问失败导致 _update_by_query 停止并在响应失败中返回。已执行的更新依然保持。换句话说,过程没有回滚,只会停止。当第一个故障导致停止时,失败批量申请返回的所有故障都会返回到故障元素中;因而,有可能会有不少失败的实体。
如果你想简略地计算版本抵触,不会导致 _update_by_query 停止,你能够在 url 设置 conflicts=proceed
或在申请体设置 ”conflicts”: “proceed”。如上例中改成:
POST /operation_log/_update_by_query?conflicts=proceed
4. 锁
Elasticsearch 和数据库一样,在多线程并发拜访批改的状况下,会有一个锁机制来管制每次批改的均为最新的文档,外围是应用乐观锁的机制。
_version
在 Elasticsearch 通过 _version
来记录文档的版本。第一次创立一个 document 的时候,它的_version 外部版本号就是 1;当前,每次对这个 document 执行批改或者删除操作,都会对这个_version 版本号主动加 1;哪怕是删除,也会对这条数据的版本号加 1
因为 segment 时不能被批改的,所以当对一个文档执行 DELETE 之后,在插入雷同 id 的文档,version 版本不会是 0,而是在 DELETE 操作的 version 上递增。
在对文档进行批改和删除时,version 会递增,也能够由用户指定。只有当版本号大于以后版本时,才会批改删除胜利,否则失败。当并发申请时,先批改胜利的,version 会减少,这个时候其余申请就会犹豫 version 不匹配从而批改失败。
external version
es 提供了一个 feature,就是说,你能够不必它提供的外部_version 版本号来进行并发管制,能够基于你本人保护的一个版本号来进行并发管制。
举个例子,如果你的数据在 mysql 里也有一份,而后你的利用零碎自身就保护了一个版本号,无论是什么本人生成的或程序控制的。这个时候,你进行乐观锁并发管制的时候,可能并不是想要用 es 外部的_version 来进行管制,而是用你本人保护的那个 version 来进行管制。
?version=1 基于_version
?version=1&version_type=external 基于 external version
_version 与 version_type=external 惟一的区别在于:
- _version,只有当你提供的 version 与 es 中的_version 截然不同的时候,才能够进行批改,只有不一样,就报错。
- 当 version_type=external 的时候,只有当你提供的 version 比 es 中的_version 大的时候,能力实现批改。
es,_version=1,?version=1,能力更新胜利
es,_version=1,?version>1&version_type=external,能力胜利,比如说:?version=2&version_type=external
援用:
- Spring Data Elasticsearch 官网
- ElasticSearch 系列 - 并发机制
- ElasticSearch API guide