后面的文章外面次要讲的是查问的用法,还是连续之前的文章格局,这里讲讲批改。

1. 单文档批改

1.1. insert

其实在数据筹备阶段曾经有新增的例子了。

DSL
POST /operation_log/_doc{  "ip": "0.0.0.0",  "module": "测试数据"}
spring
        OperationLog operationLog=new OperationLog();        operationLog.setIp("0.0.0.0");        operationLog.setModule("测试数据");        return esRestTemplate.save(operationLog);

1.2. update-(save)

新增时,springboot 用到的是 save 办法,更新时也一样能够。不过得拿到文档的id,这里id=13OkA4QBMgWicIn2wBwM。

DSL
PUT /operation_log/_doc/13OkA4QBMgWicIn2wBwM{  "ip": "0.0.0.0",  "module": "测试数据1"}
spring
esRestTemplate.save(operationLog);

1.3. update-(document)

DSL
POST /operation_log/_update/13OkA4QBMgWicIn2wBwM{  "doc": {    "module":"测试数据1"  }}
spring
        Document document = Document.create();        document.put("module", "测试数据1");        UpdateQuery updateQuery = UpdateQuery                .builder(id)                .withDocument(document)                .build();        esRestTemplate.update(updateQuery,IndexCoordinates.of("operation_log"));

1.4. update-(script)

DSL
POST /operation_log/_update/13OkA4QBMgWicIn2wBwM{  "script": {    "source": "ctx._source.module = params.module",    "params": {      "module": "测试数据1"    }  }}
spring
        Map<String, Object> params = new HashMap<>();        params.put("module", "测试数据1");        UpdateQuery updateQuery = UpdateQuery                .builder(id)                .withScript("ctx._source.module = params.module")                .withParams(params)                .build();        esRestTemplate.update(updateQuery, IndexCoordinates.of("operation_log"));

1.5. delete

DSL
DELETE /operation_log/_doc/13OkA4QBMgWicIn2wBwM
spring
        esRestTemplate.delete(id, OperationLog.class);

2. 批量批改 bulk

批量新增 DSL
POST /operation_log/_bulk{"create":{"_index":"operation_log"}}{"ip":"0.0.0.0","module":"测试数据1"}{"create":{"_index":"operation_log"}}{"ip":"0.0.0.0","module":"测试数据2"}{"create":{"_index":"operation_log"}}{"ip":"0.0.0.0","module":"测试数据3"}
批量更新 DSL
POST /operation_log/_bulk{"update":{"_id":"2HP9A4QBMgWicIn26BzR"}}{"doc":{"module":"测试数据11"}}{"update":{"_id":"2XP9A4QBMgWicIn26BzR"}}{"script":{"source":"ctx._source.module = params.module","params":{"module":"测试数据22"}}}
批量删除 DSL
POST /operation_log/_bulk{"delete":{"_id":"2HP9A4QBMgWicIn26BzR"}}{"delete":{"_id":"2XP9A4QBMgWicIn26BzR"}}{"delete":{"_id":"2nP9A4QBMgWicIn26BzR"}}

不知是否留神到,在批量更新的语句中,反对同时 doc、script 两种更新形式。实际上来说,_bulk 其实反对同时将上述的三种语句一起提交执行。
不过我的项目上个别不会如此利用,都是独自离开来。像批量新增,save 办法就反对批量新增操作,尽管底层代码还是调用 bulkOperation

spring bulkUpdate
    @PatchMapping("bulk-update")    public void bulkUpdate() {        Map<String, Object> params = new HashMap<>();        params.put("module", "测试数据2");        String scriptStr = "ctx._source.module = params.module";        Query query = new NativeSearchQueryBuilder()                .withQuery(QueryBuilders.termQuery("ip", "0.0.0.0"))                .build();        List<UpdateQuery> updateQueryList = esRestTemplate.search(query, OperationLog.class)                .stream()                .map(SearchHit::getContent)                .map(obj -> UpdateQuery.builder(obj.getId())                        .withScript(scriptStr)                        .withParams(params)                        .build())                .collect(Collectors.toList());        esRestTemplate.bulkUpdate(updateQueryList, OperationLog.class);    }

无关更具体、更好应用 bulk的局部,倡议查看 es官网材料

3. 批改ByQuery

3.1. updateByQuery

DSL
POST /operation_log/_update_by_query{  "script": {    "source": "ctx._source.module = params.module",    "params": {      "module": "测试数据1"    }  },  "query": {    "term": {      "ip": "0.0.0.0"    }  }}
spring
    @PatchMapping("update-by-query")    public void updateByQuery() {        Map<String, Object> params = new HashMap<>();        params.put("module", "测试数据2");        String scriptStr = "ctx._source.module = params.module";        UpdateQuery updateQuery = UpdateQuery                .builder(new NativeSearchQueryBuilder()                        .withQuery(QueryBuilders.termQuery("ip", "0.0.0.0"))                        .build())                .withScript(scriptStr)                .withScriptType(ScriptType.INLINE)                .withLang("painless")                .withParams(params)                .build();        esRestTemplate.updateByQuery(updateQuery, IndexCoordinates.of("operation_log"));    }

能够比照一下下面的 bulkUpdate 办法,发现有些不同:

  • updateByQuery 只反对Script,不反对 Document 的形式更新。
  • updateByQuery 应用 Script 形式更新时,必须传递 scriptTypeLang 这些辅助参数。本来 bulkUpdate 中也是要传的,只不过底层办法封装了,然而没有给 updateByQuery 封装。(理论踩过坑,看封装办法才得悉)

3.2. deleteByQuery

DSL
POST /operation_log/_delete_by_query{  "query": {    "term": {      "ip": "0.0.0.0"    }  }}
spring
        Query query = new NativeSearchQueryBuilder()                .withQuery(QueryBuilders.termQuery("ip", "0.0.0.0"))                .build();        esRestTemplate.delete(query, OperationLog.class);

delete_by_query并不是真正意义上物理文档删除,而是只是版本变动并且对文档减少了删除标记。当咱们再次搜寻的时候,会搜寻全副而后过滤掉有删除标记的文档。因而,该索引所占的空间并不会随着该API的操作磁盘空间会马上开释掉,只有等到下一次段合并的时候才真正被物理删除,这个时候磁盘空间才会开释。相同,在被查问到的文档标记删除过程同样须要占用磁盘空间,这个时候,你会发现触发该API操作的时候磁盘岂但没有被开释,反而磁盘使用率回升了。

3.3. 调优参数

可参考es官网 ElasticSearch API guide,在批量批改文档时,有很多参数能够配合调优。

这里先列举几个罕用的,剩下具体的请看官网文档:

1. refresh

ES的索引数据是写入到磁盘上的。但这个过程是分阶段实现的,因为IO的操作是比拟费时的。

  • 先写到内存中,此时不可搜寻。
  • 默认通过 1s 之后会被写入 lucene 的底层文件 segment 中 ,此时能够搜寻到。
  • refresh 之后才会写入磁盘

以上过程因为随时可能被中断导致数据失落,所以每一个过程都会有 translog 记录,如果两头有任何一步失败了,等服务器重启之后就会重试,保证数据写入。translog也是先存在内存里的,而后默认5秒刷一次写到硬盘里。

在 index ,Update , Delete , Bulk 等操作中,能够设置 refresh 的值。如下:

  • false:默认值。不要刷新相干的动作。在申请返回后,此申请所做的更改将在某个时刻显示。如:

    创立一个文档,而不做任何使其能够搜寻的事件:PUT /test/test/1PUT /test/test/2?refresh=false
  • true或空字符串:更新数据之后,立即对相干的分片(包含正本) 刷新,这个刷新操作保障了数据更新的后果能够立即被搜寻到。

    创立一个文档并立刻刷新索引,使其可见:PUT /test/test/1?refreshPUT /test/test/2?refresh=true
  • wait_for:期待申请所做的更改在返回之前通过冲刷显示。这不会强制立刻刷新,而是期待刷新产生。 Elasticsearch会主动每隔index.refresh_interval刷新曾经更改的分片,默认为1秒。该设置是动静的。

    创立一个文档并期待它成为搜寻可见:PUT /test/test/1?refresh=wait_for
2. scroll_size

这个参数是执行删除的时候,每次每个线程会查问的数据量,而后进行删除。默认值是100,就是说每个线程每次都会查问出100条数据而后再删除。

3. slices

能够了解为,默认值是一个线程在进行查问数据并删除,当设置这个slices值时,会将es下的数据进行切分,启动多个task去做删除,了解为多线程执行操作。

然而就像不倡议滥用多线程一样,不倡议设置slices值太大,否则会导致es出问题。倡议设为索引分片数量的倍数(如:1倍、2倍),有助于基于每个分片的数据做切分。

4. conflicts

如果按查问删除遇到版本抵触,该怎么办,有两个值:

  • abort:默认值,抵触时停止。
  • proceed:抵触时持续。

举后面updateByQuery的例子。_update_by_query 在启动时获取索引的快照,并应用外部版本控制对其进行索引。这意味着如果文档在拍摄快照和解决索引申请之间发生变化,则会产生版本抵触。当版本匹配文档被更新并且版本号减少。

所有更新和查问失败导致 _update_by_query 停止并在响应失败中返回。已执行的更新依然保持。换句话说,过程没有回滚,只会停止。当第一个故障导致停止时,失败批量申请返回的所有故障都会返回到故障元素中;因而,有可能会有不少失败的实体。

如果你想简略地计算版本抵触,不会导致 _update_by_query停止,你能够在url设置conflicts=proceed 或在申请体设置"conflicts": "proceed"。如上例中改成:

POST /operation_log/_update_by_query?conflicts=proceed

4. 锁

Elasticsearch和数据库一样,在多线程并发拜访批改的状况下,会有一个锁机制来管制每次批改的均为最新的文档,外围是应用乐观锁的机制。

_version

在 Elasticsearch 通过 _version 来记录文档的版本。第一次创立一个document的时候,它的_version外部版本号就是1;当前,每次对这个document执行批改或者删除操作,都会对这个_version版本号主动加1;哪怕是删除,也会对这条数据的版本号加1

因为 segment 时不能被批改的,所以当对一个文档执行 DELETE 之后,在插入雷同id的文档,version 版本不会是0,而是在 DELETE 操作的version上递增。

在对文档进行批改和删除时,version 会递增,也能够由用户指定。只有当版本号大于以后版本时,才会批改删除胜利,否则失败。当并发申请时,先批改胜利的,version 会减少,这个时候其余申请就会犹豫 version 不匹配从而批改失败。

external version

es提供了一个feature,就是说,你能够不必它提供的外部_version版本号来进行并发管制,能够基于你本人保护的一个版本号来进行并发管制。

举个例子,如果你的数据在mysql里也有一份,而后你的利用零碎自身就保护了一个版本号,无论是什么本人生成的或程序控制的。这个时候,你进行乐观锁并发管制的时候,可能并不是想要用es外部的_version来进行管制,而是用你本人保护的那个version来进行管制。

?version=1   基于_version?version=1&version_type=external   基于external version

_version与version_type=external惟一的区别在于:

  • _version,只有当你提供的version与es中的_version截然不同的时候,才能够进行批改,只有不一样,就报错。
  • 当version_type=external的时候,只有当你提供的version比es中的_version大的时候,能力实现批改。
es,_version=1,    ?version=1,能力更新胜利es,_version=1,    ?version>1&version_type=external,能力胜利,比如说:?version=2&version_type=external

援用:

  • Spring Data Elasticsearch 官网
  • ElasticSearch系列-并发机制
  • ElasticSearch API guide