关于elasticsearch:elasticsearch的开发应用3

36次阅读

共计 6577 个字符,预计需要花费 17 分钟才能阅读完成。

后面的文章外面次要讲的是查问的用法,还是连续之前的文章格局,这里讲讲批改。

1. 单文档批改

1.1. insert

其实在数据筹备阶段曾经有新增的例子了。

DSL

POST /operation_log/_doc
{
  "ip": "0.0.0.0",
  "module": "测试数据"
}

spring

        OperationLog operationLog=new OperationLog();
        operationLog.setIp("0.0.0.0");
        operationLog.setModule("测试数据");
        return esRestTemplate.save(operationLog);

1.2. update-(save)

新增时,springboot 用到的是 save 办法,更新时也一样能够。不过得拿到文档的 id,这里 id=13OkA4QBMgWicIn2wBwM。

DSL

PUT /operation_log/_doc/13OkA4QBMgWicIn2wBwM
{
  "ip": "0.0.0.0",
  "module": "测试数据 1"
}

spring

esRestTemplate.save(operationLog);

1.3. update-(document)

DSL

POST /operation_log/_update/13OkA4QBMgWicIn2wBwM
{
  "doc": {"module":"测试数据 1"}
}

spring

        Document document = Document.create();
        document.put("module", "测试数据 1");
        UpdateQuery updateQuery = UpdateQuery
                .builder(id)
                .withDocument(document)
                .build();
        esRestTemplate.update(updateQuery,IndexCoordinates.of("operation_log"));

1.4. update-(script)

DSL

POST /operation_log/_update/13OkA4QBMgWicIn2wBwM
{
  "script": {
    "source": "ctx._source.module = params.module",
    "params": {"module": "测试数据 1"}
  }
}

spring

        Map<String, Object> params = new HashMap<>();
        params.put("module", "测试数据 1");
        UpdateQuery updateQuery = UpdateQuery
                .builder(id)
                .withScript("ctx._source.module = params.module")
                .withParams(params)
                .build();
        esRestTemplate.update(updateQuery, IndexCoordinates.of("operation_log"));

1.5. delete

DSL

DELETE /operation_log/_doc/13OkA4QBMgWicIn2wBwM

spring

        esRestTemplate.delete(id, OperationLog.class);

2. 批量批改 bulk

批量新增 DSL

POST /operation_log/_bulk
{"create":{"_index":"operation_log"}}
{"ip":"0.0.0.0","module":"测试数据 1"}
{"create":{"_index":"operation_log"}}
{"ip":"0.0.0.0","module":"测试数据 2"}
{"create":{"_index":"operation_log"}}
{"ip":"0.0.0.0","module":"测试数据 3"}

批量更新 DSL

POST /operation_log/_bulk
{"update":{"_id":"2HP9A4QBMgWicIn26BzR"}}
{"doc":{"module":"测试数据 11"}}
{"update":{"_id":"2XP9A4QBMgWicIn26BzR"}}
{"script":{"source":"ctx._source.module = params.module","params":{"module":"测试数据 22"}}}

批量删除 DSL

POST /operation_log/_bulk
{"delete":{"_id":"2HP9A4QBMgWicIn26BzR"}}
{"delete":{"_id":"2XP9A4QBMgWicIn26BzR"}}
{"delete":{"_id":"2nP9A4QBMgWicIn26BzR"}}

不知是否留神到,在批量更新的语句中,反对同时 doc、script 两种更新形式。实际上来说,_bulk 其实反对同时将上述的三种语句一起提交执行。
不过我的项目上个别不会如此利用,都是独自离开来。像批量新增,save 办法就反对批量新增操作,尽管底层代码还是调用 bulkOperation

spring bulkUpdate

    @PatchMapping("bulk-update")
    public void bulkUpdate() {Map<String, Object> params = new HashMap<>();
        params.put("module", "测试数据 2");
        String scriptStr = "ctx._source.module = params.module";
        Query query = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.termQuery("ip", "0.0.0.0"))
                .build();
        List<UpdateQuery> updateQueryList = esRestTemplate.search(query, OperationLog.class)
                .stream()
                .map(SearchHit::getContent)
                .map(obj -> UpdateQuery.builder(obj.getId())
                        .withScript(scriptStr)
                        .withParams(params)
                        .build())
                .collect(Collectors.toList());
        esRestTemplate.bulkUpdate(updateQueryList, OperationLog.class);
    }

无关更具体、更好应用 bulk 的局部,倡议查看 es 官网材料

3. 批改 ByQuery

3.1. updateByQuery

DSL

POST /operation_log/_update_by_query
{
  "script": {
    "source": "ctx._source.module = params.module",
    "params": {"module": "测试数据 1"}
  },
  "query": {
    "term": {"ip": "0.0.0.0"}
  }
}

spring

    @PatchMapping("update-by-query")
    public void updateByQuery() {Map<String, Object> params = new HashMap<>();
        params.put("module", "测试数据 2");
        String scriptStr = "ctx._source.module = params.module";
        UpdateQuery updateQuery = UpdateQuery
                .builder(new NativeSearchQueryBuilder()
                        .withQuery(QueryBuilders.termQuery("ip", "0.0.0.0"))
                        .build())
                .withScript(scriptStr)
                .withScriptType(ScriptType.INLINE)
                .withLang("painless")
                .withParams(params)
                .build();
        esRestTemplate.updateByQuery(updateQuery, IndexCoordinates.of("operation_log"));
    }

能够比照一下下面的 bulkUpdate 办法,发现有些不同:

  • updateByQuery 只反对 Script,不反对 Document 的形式更新。
  • updateByQuery 应用 Script 形式更新时,必须传递 scriptTypeLang 这些辅助参数。本来 bulkUpdate 中也是要传的,只不过底层办法封装了,然而没有给 updateByQuery 封装。(理论踩过坑,看封装办法才得悉)

3.2. deleteByQuery

DSL

POST /operation_log/_delete_by_query
{
  "query": {
    "term": {"ip": "0.0.0.0"}
  }
}

spring

        Query query = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.termQuery("ip", "0.0.0.0"))
                .build();
        esRestTemplate.delete(query, OperationLog.class);

delete_by_query 并不是真正意义上物理文档删除,而是只是版本变动并且对文档减少了删除标记。当咱们再次搜寻的时候,会搜寻全副而后过滤掉有删除标记的文档。因而,该索引所占的空间并不会随着该 API 的操作磁盘空间会马上开释掉,只有等到下一次段合并的时候才真正被物理删除,这个时候磁盘空间才会开释。相同,在被查问到的文档标记删除过程同样须要占用磁盘空间,这个时候,你会发现触发该 API 操作的时候磁盘岂但没有被开释,反而磁盘使用率回升了。

3.3. 调优参数

可参考 es 官网 ElasticSearch API guide,在批量批改文档时,有很多参数能够配合调优。

这里先列举几个罕用的,剩下具体的请看官网文档:

1. refresh

ES 的索引数据是写入到磁盘上的。但这个过程是分阶段实现的,因为 IO 的操作是比拟费时的。

  • 先写到内存中,此时不可搜寻。
  • 默认通过 1s 之后会被写入 lucene 的底层文件 segment 中,此时能够搜寻到。
  • refresh 之后才会写入磁盘

以上过程因为随时可能被中断导致数据失落,所以每一个过程都会有 translog 记录,如果两头有任何一步失败了,等服务器重启之后就会重试,保证数据写入。translog 也是先存在内存里的,而后默认 5 秒刷一次写到硬盘里。

在 index,Update , Delete , Bulk 等操作中,能够设置 refresh 的值。如下:

  • false:默认值。不要刷新相干的动作。在申请返回后,此申请所做的更改将在某个时刻显示。如:

     创立一个文档,而不做任何使其能够搜寻的事件:PUT /test/test/1
    PUT /test/test/2?refresh=false
  • true 或空字符串 :更新数据之后,立即对相干的分片 (包含正本) 刷新,这个刷新操作保障了数据更新的后果能够立即被搜寻到。

     创立一个文档并立刻刷新索引,使其可见:PUT /test/test/1?refresh
    PUT /test/test/2?refresh=true
  • wait_for:期待申请所做的更改在返回之前通过冲刷显示。这不会强制立刻刷新,而是期待刷新产生。Elasticsearch 会主动每隔 index.refresh_interval 刷新曾经更改的分片,默认为 1 秒。该设置是动静的。

     创立一个文档并期待它成为搜寻可见:PUT /test/test/1?refresh=wait_for

2. scroll_size

这个参数是执行删除的时候,每次每个线程会查问的数据量,而后进行删除。默认值是 100,就是说每个线程每次都会查问出 100 条数据而后再删除。

3. slices

能够了解为,默认值是一个线程在进行查问数据并删除,当设置这个 slices 值时,会将 es 下的数据进行切分,启动多个 task 去做删除,了解为多线程执行操作。

然而就像不倡议滥用多线程一样,不倡议设置 slices 值太大,否则会导致 es 出问题。倡议设为索引分片数量的倍数(如:1 倍、2 倍),有助于基于每个分片的数据做切分。

4. conflicts

如果按查问删除遇到版本抵触,该怎么办,有两个值:

  • abort:默认值,抵触时停止。
  • proceed:抵触时持续。

举后面 updateByQuery 的例子。_update_by_query 在启动时获取索引的快照,并应用外部版本控制对其进行索引。这意味着如果文档在拍摄快照和解决索引申请之间发生变化,则会产生版本抵触。当版本匹配文档被更新并且版本号减少。

所有更新和查问失败导致 _update_by_query 停止并在响应失败中返回。已执行的更新依然保持。换句话说,过程没有回滚,只会停止。当第一个故障导致停止时,失败批量申请返回的所有故障都会返回到故障元素中;因而,有可能会有不少失败的实体。

如果你想简略地计算版本抵触,不会导致 _update_by_query 停止,你能够在 url 设置 conflicts=proceed 或在申请体设置 ”conflicts”: “proceed”。如上例中改成:

POST /operation_log/_update_by_query?conflicts=proceed

4. 锁

Elasticsearch 和数据库一样,在多线程并发拜访批改的状况下,会有一个锁机制来管制每次批改的均为最新的文档,外围是应用乐观锁的机制。

_version

在 Elasticsearch 通过 _version 来记录文档的版本。第一次创立一个 document 的时候,它的_version 外部版本号就是 1;当前,每次对这个 document 执行批改或者删除操作,都会对这个_version 版本号主动加 1;哪怕是删除,也会对这条数据的版本号加 1

因为 segment 时不能被批改的,所以当对一个文档执行 DELETE 之后,在插入雷同 id 的文档,version 版本不会是 0,而是在 DELETE 操作的 version 上递增。

在对文档进行批改和删除时,version 会递增,也能够由用户指定。只有当版本号大于以后版本时,才会批改删除胜利,否则失败。当并发申请时,先批改胜利的,version 会减少,这个时候其余申请就会犹豫 version 不匹配从而批改失败。

external version

es 提供了一个 feature,就是说,你能够不必它提供的外部_version 版本号来进行并发管制,能够基于你本人保护的一个版本号来进行并发管制。

举个例子,如果你的数据在 mysql 里也有一份,而后你的利用零碎自身就保护了一个版本号,无论是什么本人生成的或程序控制的。这个时候,你进行乐观锁并发管制的时候,可能并不是想要用 es 外部的_version 来进行管制,而是用你本人保护的那个 version 来进行管制。

?version=1   基于_version

?version=1&version_type=external   基于 external version

_version 与 version_type=external 惟一的区别在于:

  • _version,只有当你提供的 version 与 es 中的_version 截然不同的时候,才能够进行批改,只有不一样,就报错。
  • 当 version_type=external 的时候,只有当你提供的 version 比 es 中的_version 大的时候,能力实现批改。
es,_version=1,?version=1,能力更新胜利

es,_version=1,?version>1&version_type=external,能力胜利,比如说:?version=2&version_type=external

援用:

  • Spring Data Elasticsearch 官网
  • ElasticSearch 系列 - 并发机制
  • ElasticSearch API guide

正文完
 0