后面的文章外面次要讲的是查问的用法,还是连续之前的文章格局,这里讲讲批改。
1. 单文档批改
1.1. insert
其实在数据筹备阶段曾经有新增的例子了。
DSL
POST /operation_log/_doc{ "ip": "0.0.0.0", "module": "测试数据"}
spring
OperationLog operationLog=new OperationLog(); operationLog.setIp("0.0.0.0"); operationLog.setModule("测试数据"); return esRestTemplate.save(operationLog);
1.2. update-(save)
新增时,springboot 用到的是 save 办法,更新时也一样能够。不过得拿到文档的id,这里id=13OkA4QBMgWicIn2wBwM。
DSL
PUT /operation_log/_doc/13OkA4QBMgWicIn2wBwM{ "ip": "0.0.0.0", "module": "测试数据1"}
spring
esRestTemplate.save(operationLog);
1.3. update-(document)
DSL
POST /operation_log/_update/13OkA4QBMgWicIn2wBwM{ "doc": { "module":"测试数据1" }}
spring
Document document = Document.create(); document.put("module", "测试数据1"); UpdateQuery updateQuery = UpdateQuery .builder(id) .withDocument(document) .build(); esRestTemplate.update(updateQuery,IndexCoordinates.of("operation_log"));
1.4. update-(script)
DSL
POST /operation_log/_update/13OkA4QBMgWicIn2wBwM{ "script": { "source": "ctx._source.module = params.module", "params": { "module": "测试数据1" } }}
spring
Map<String, Object> params = new HashMap<>(); params.put("module", "测试数据1"); UpdateQuery updateQuery = UpdateQuery .builder(id) .withScript("ctx._source.module = params.module") .withParams(params) .build(); esRestTemplate.update(updateQuery, IndexCoordinates.of("operation_log"));
1.5. delete
DSL
DELETE /operation_log/_doc/13OkA4QBMgWicIn2wBwM
spring
esRestTemplate.delete(id, OperationLog.class);
2. 批量批改 bulk
批量新增 DSL
POST /operation_log/_bulk{"create":{"_index":"operation_log"}}{"ip":"0.0.0.0","module":"测试数据1"}{"create":{"_index":"operation_log"}}{"ip":"0.0.0.0","module":"测试数据2"}{"create":{"_index":"operation_log"}}{"ip":"0.0.0.0","module":"测试数据3"}
批量更新 DSL
POST /operation_log/_bulk{"update":{"_id":"2HP9A4QBMgWicIn26BzR"}}{"doc":{"module":"测试数据11"}}{"update":{"_id":"2XP9A4QBMgWicIn26BzR"}}{"script":{"source":"ctx._source.module = params.module","params":{"module":"测试数据22"}}}
批量删除 DSL
POST /operation_log/_bulk{"delete":{"_id":"2HP9A4QBMgWicIn26BzR"}}{"delete":{"_id":"2XP9A4QBMgWicIn26BzR"}}{"delete":{"_id":"2nP9A4QBMgWicIn26BzR"}}
不知是否留神到,在批量更新的语句中,反对同时 doc、script 两种更新形式。实际上来说,_bulk
其实反对同时将上述的三种语句一起提交执行。
不过我的项目上个别不会如此利用,都是独自离开来。像批量新增,save
办法就反对批量新增操作,尽管底层代码还是调用 bulkOperation
。
spring bulkUpdate
@PatchMapping("bulk-update") public void bulkUpdate() { Map<String, Object> params = new HashMap<>(); params.put("module", "测试数据2"); String scriptStr = "ctx._source.module = params.module"; Query query = new NativeSearchQueryBuilder() .withQuery(QueryBuilders.termQuery("ip", "0.0.0.0")) .build(); List<UpdateQuery> updateQueryList = esRestTemplate.search(query, OperationLog.class) .stream() .map(SearchHit::getContent) .map(obj -> UpdateQuery.builder(obj.getId()) .withScript(scriptStr) .withParams(params) .build()) .collect(Collectors.toList()); esRestTemplate.bulkUpdate(updateQueryList, OperationLog.class); }
无关更具体、更好应用 bulk的局部,倡议查看 es官网材料
3. 批改ByQuery
3.1. updateByQuery
DSL
POST /operation_log/_update_by_query{ "script": { "source": "ctx._source.module = params.module", "params": { "module": "测试数据1" } }, "query": { "term": { "ip": "0.0.0.0" } }}
spring
@PatchMapping("update-by-query") public void updateByQuery() { Map<String, Object> params = new HashMap<>(); params.put("module", "测试数据2"); String scriptStr = "ctx._source.module = params.module"; UpdateQuery updateQuery = UpdateQuery .builder(new NativeSearchQueryBuilder() .withQuery(QueryBuilders.termQuery("ip", "0.0.0.0")) .build()) .withScript(scriptStr) .withScriptType(ScriptType.INLINE) .withLang("painless") .withParams(params) .build(); esRestTemplate.updateByQuery(updateQuery, IndexCoordinates.of("operation_log")); }
能够比照一下下面的 bulkUpdate
办法,发现有些不同:
- updateByQuery 只反对
Script
,不反对Document
的形式更新。 - updateByQuery 应用 Script 形式更新时,必须传递
scriptType
、Lang
这些辅助参数。本来 bulkUpdate 中也是要传的,只不过底层办法封装了,然而没有给 updateByQuery 封装。(理论踩过坑,看封装办法才得悉)
3.2. deleteByQuery
DSL
POST /operation_log/_delete_by_query{ "query": { "term": { "ip": "0.0.0.0" } }}
spring
Query query = new NativeSearchQueryBuilder() .withQuery(QueryBuilders.termQuery("ip", "0.0.0.0")) .build(); esRestTemplate.delete(query, OperationLog.class);
delete_by_query并不是真正意义上物理文档删除,而是只是版本变动并且对文档减少了删除标记。当咱们再次搜寻的时候,会搜寻全副而后过滤掉有删除标记的文档。因而,该索引所占的空间并不会随着该API的操作磁盘空间会马上开释掉,只有等到下一次段合并的时候才真正被物理删除,这个时候磁盘空间才会开释。相同,在被查问到的文档标记删除过程同样须要占用磁盘空间,这个时候,你会发现触发该API操作的时候磁盘岂但没有被开释,反而磁盘使用率回升了。
3.3. 调优参数
可参考es官网 ElasticSearch API guide,在批量批改文档时,有很多参数能够配合调优。
这里先列举几个罕用的,剩下具体的请看官网文档:
1. refresh
ES的索引数据是写入到磁盘上的。但这个过程是分阶段实现的,因为IO的操作是比拟费时的。
- 先写到内存中,此时不可搜寻。
- 默认通过 1s 之后会被写入 lucene 的底层文件 segment 中 ,此时能够搜寻到。
- refresh 之后才会写入磁盘
以上过程因为随时可能被中断导致数据失落,所以每一个过程都会有 translog 记录,如果两头有任何一步失败了,等服务器重启之后就会重试,保证数据写入。translog也是先存在内存里的,而后默认5秒刷一次写到硬盘里。
在 index ,Update , Delete , Bulk 等操作中,能够设置 refresh 的值。如下:
false:默认值。不要刷新相干的动作。在申请返回后,此申请所做的更改将在某个时刻显示。如:
创立一个文档,而不做任何使其能够搜寻的事件:PUT /test/test/1PUT /test/test/2?refresh=false
true或空字符串:更新数据之后,立即对相干的分片(包含正本) 刷新,这个刷新操作保障了数据更新的后果能够立即被搜寻到。
创立一个文档并立刻刷新索引,使其可见:PUT /test/test/1?refreshPUT /test/test/2?refresh=true
wait_for:期待申请所做的更改在返回之前通过冲刷显示。这不会强制立刻刷新,而是期待刷新产生。 Elasticsearch会主动每隔index.refresh_interval刷新曾经更改的分片,默认为1秒。该设置是动静的。
创立一个文档并期待它成为搜寻可见:PUT /test/test/1?refresh=wait_for
2. scroll_size
这个参数是执行删除的时候,每次每个线程会查问的数据量,而后进行删除。默认值是100,就是说每个线程每次都会查问出100条数据而后再删除。
3. slices
能够了解为,默认值是一个线程在进行查问数据并删除,当设置这个slices值时,会将es下的数据进行切分,启动多个task去做删除,了解为多线程执行操作。
然而就像不倡议滥用多线程一样,不倡议设置slices值太大,否则会导致es出问题。倡议设为索引分片数量的倍数(如:1倍、2倍),有助于基于每个分片的数据做切分。
4. conflicts
如果按查问删除遇到版本抵触,该怎么办,有两个值:
- abort:默认值,抵触时停止。
- proceed:抵触时持续。
举后面updateByQuery的例子。_update_by_query 在启动时获取索引的快照,并应用外部版本控制对其进行索引。这意味着如果文档在拍摄快照和解决索引申请之间发生变化,则会产生版本抵触。当版本匹配文档被更新并且版本号减少。
所有更新和查问失败导致 _update_by_query 停止并在响应失败中返回。已执行的更新依然保持。换句话说,过程没有回滚,只会停止。当第一个故障导致停止时,失败批量申请返回的所有故障都会返回到故障元素中;因而,有可能会有不少失败的实体。
如果你想简略地计算版本抵触,不会导致 _update_by_query停止,你能够在url设置conflicts=proceed
或在申请体设置"conflicts": "proceed"。如上例中改成:
POST /operation_log/_update_by_query?conflicts=proceed
4. 锁
Elasticsearch和数据库一样,在多线程并发拜访批改的状况下,会有一个锁机制来管制每次批改的均为最新的文档,外围是应用乐观锁的机制。
_version
在 Elasticsearch 通过 _version
来记录文档的版本。第一次创立一个document的时候,它的_version外部版本号就是1;当前,每次对这个document执行批改或者删除操作,都会对这个_version版本号主动加1;哪怕是删除,也会对这条数据的版本号加1
因为 segment 时不能被批改的,所以当对一个文档执行 DELETE 之后,在插入雷同id的文档,version 版本不会是0,而是在 DELETE 操作的version上递增。
在对文档进行批改和删除时,version 会递增,也能够由用户指定。只有当版本号大于以后版本时,才会批改删除胜利,否则失败。当并发申请时,先批改胜利的,version 会减少,这个时候其余申请就会犹豫 version 不匹配从而批改失败。
external version
es提供了一个feature,就是说,你能够不必它提供的外部_version版本号来进行并发管制,能够基于你本人保护的一个版本号来进行并发管制。
举个例子,如果你的数据在mysql里也有一份,而后你的利用零碎自身就保护了一个版本号,无论是什么本人生成的或程序控制的。这个时候,你进行乐观锁并发管制的时候,可能并不是想要用es外部的_version来进行管制,而是用你本人保护的那个version来进行管制。
?version=1 基于_version?version=1&version_type=external 基于external version
_version与version_type=external惟一的区别在于:
- _version,只有当你提供的version与es中的_version截然不同的时候,才能够进行批改,只有不一样,就报错。
- 当version_type=external的时候,只有当你提供的version比es中的_version大的时候,能力实现批改。
es,_version=1, ?version=1,能力更新胜利es,_version=1, ?version>1&version_type=external,能力胜利,比如说:?version=2&version_type=external
援用:
- Spring Data Elasticsearch 官网
- ElasticSearch系列-并发机制
- ElasticSearch API guide