乐趣区

关于java:ElasticsearchES重建索引怎么才能做到数据无缝迁移呢

背景

家喻户晓,Elasticsearch 是⼀个实时的分布式搜索引擎,为⽤户提供搜寻服务。当咱们决定存储某种数据,在创立索引的时候就须要将数据结构,即 Mapping 确定下来,于此同时索引的设定和很多固定配置将不能扭转。
<!– more –>
那如果后续业务发生变化,须要扭转数据结构或者更换 ES 更换分词器怎么办呢?为此,Elastic 团队提供了很多通过辅助⼯具来帮忙开发⼈员进⾏重建索引的计划。
如果对 reindex API 不相熟,那么在遇到重构的时候,必然事倍功半,效率低下。反之,就能够不便地进行索引重构,省时省力。

步骤

假如之前咱们曾经存在一个 blog 索引,因为更换分词器须要对该索引中的数据进行重建索引,以便反对业务应用新的分词规定搜寻数据,并且尽可能使这个变动对外服务没有感知,大略分为以下几个步骤:​

  • 新增⼀个索引 blog_lastest,Mapping 数据结构与blog 索引统一
  • blog 数据同步至blog_lastest
  • 删除 blog 索引
  • 数据同步后给 blog_lastest 增加别名blog

新建索引

在这里举荐一个 ES 管理工具Kibana,次要针对数据的摸索、可视化和剖析。

put /blog_lastest/
{
    "mappings":{
        "properties":{
            "title":{
                "type":"text",
                "analyzer":"ik_max_word"
            },
            "author":{
                "type":"keyword",
                "fields":{
                    "seg":{
                        "type":"text",
                        "analyzer":"ik_max_word"
                    }
                }
            }
        }
    }
}

将旧索引数据 copy 到新索引

同步期待

接⼝将会在 reindex 完结后返回

POST /_reindex
{
    "source": {"index": "blog"},
    "dest": {"index": "blog_lastest"}
}

kibana 中的应用如下所示

当然高版本(7.1.1)中,ES 都有提供对应的Java REST Client,比方

ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices("blog").setSource.setDestIndex("blog_lastest");
TaskSubmissionResponse taskSubmissionResponse = client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT);

为了避免赘述,接下来举例全副以 kibana 中申请介绍,如果有须要用Java REST Client,能够自行去 ES 官网查看。

异步执⾏

如果 reindex 工夫过⻓,倡议加上 wait_for_completion=false 的参数条件,这样 reindex 将间接返回 taskId

POST /_reindex?wait_for_completion=false
{
    "source": {"index": "blog"},
    "dest": {"index": "blog_lastest"}
}

返回:

{"task" : "dpBihNSMQfSlboMGlTgCBA:4728038"}

op_type 参数

op_type 参数管制着写入数据的抵触解决形式,如果把 op_type 设置为 create【默认值】,在 _reindex API 中,示意写入时只在 dest index中增加不存在的 doucment,如果雷同的 document 曾经存在,则会报 version confilct 的谬误,那么索引操作就会失败。【这种形式与应用 _create API 时成果统一】

POST _reindex
{
  "source": {"index": "blog"},
  "dest": {
    "index": "blog_lastest",
    "op_type": "create"
  }
}

如果这样设置了,也就不存在更新数据的场景了【抵触数据无奈写入】,咱们也能够把 op_type 设置为 index,示意所有的数据全副从新索引创立。

conflicts 配置

默认状况下,当产生 version conflict 的时候,_reindex 会被 abort,工作终止【此时数据还没有 reindex 实现】,在返回体中的 failures 指标中会蕴含抵触的数据【有时候数据会十分多】,除非把 conflicts 设置为 proceed

对于 abort 的阐明,如果产生了 abort,曾经执行的数据【例如更新写入的】依然存在于指标索引,此时工作终止,还会有数据没有被执行,也就是漏数了。换句话说,该执行过程不会回滚,只会终止。如果设置了 proceed,工作在检测到数据抵触的状况下,不会终止,会跳过抵触数据继续执行,直到所有数据执行实现,此时不会漏掉失常的数据,只会漏掉有抵触的数据。

POST _reindex
{
  "source": {"index": "blog"},
  "dest": {
    "index": "blog_lastest",
    "op_type": "create"
  },
  "conflicts": "proceed"
}

咱们能够成心把 op_type 设置为 create,人为制作数据抵触的场景,测试时更容易察看到抵触景象。

如果把 conflicts 设置为 proceed,在返回体后果中不会再呈现 failures 的信息,然而通过 version_conflicts 指标能够看到具体的数量。

批次大小配置

当你发现 reindex 的速度有些慢的时候,能够在 query 参数的同一档次【即 source 参数中】增加 size 参数,示意 scroll size 的大小【会影响批次的次数,进而影响整体的速度】,如果不显式设置,默认是一批 1000 条数据,在一开始的简略示例中也看到了。
如下,设置 scroll size 为 5000:

POST /_reindex?wait_for_completion=false
{
    "source": {
        "index": "blog",
        "size":5000
    },
    "dest": {
        "index": "blog_lastest",
        "op_type": "create"
    },
    "conflicts": "proceed"
}

测试后,速度达到了 30 分钟 500 万左右,显著晋升了很多。

依据 taskId 能够实时查看工作的执行状态

一般来说,如果咱们的 source index 很大【比方几百万数据量】,则可能须要比拟长的工夫来实现 _reindex 的工作,可能须要几十分钟。而在此期间不可能始终期待后果返回,能够去做其它事件,如果中途须要查看进度,能够通过 _tasks API 进行查看。

GET /_tasks/{taskId}

返回:

{
  "completed" : false,
  "task" : {
    "node" : "dpBihNSMQfSlboMGlTgCBA",
    "id" : 4704218,
    "type" : "transport",
    "action" : "indices:data/write/reindex",
    ……
}

当执行结束时,completed为 true
查看工作进度以及勾销工作,除了依据 taskId 查看以外,咱们还能够通过查看所有的工作中筛选本次 reindex 的工作。

GET _tasks?detailed=true&actions=*reindex

返回后果:

{
  "nodes" : {
    "dpBihNSMQfSlboMGlTgCBA" : {
      "name" : "node-16111-9210",
      "transport_address" : "192.168.XXX.XXX:9310",
      "host" : "192.168.XXX.XXX",
      "ip" : "192.168.16.111:9310",
      "roles" : [
        "ingest",
        "master"
      ],
      "attributes" : {
        "xpack.installed" : "true",
        "transform.node" : "false"
      },
      "tasks" : {
        "dpBihNSMQfSlboMGlTgCBA:6629305" : {
          "node" : "dpBihNSMQfSlboMGlTgCBA",
          "id" : 6629305,
          "type" : "transport",
          "action" : "indices:data/write/reindex",
          "status" : {
            "total" : 8361421,
            "updated" : 0,
            "created" : 254006,
            "deleted" : 0,
            "batches" : 743,
            "version_conflicts" : 3455994,
            "noops" : 0,
            "retries" : {
              "bulk" : 0,
              "search" : 0
            },
            "throttled_millis" : 0,
            "requests_per_second" : -1.0,
            "throttled_until_millis" : 0
          },
          "description" : "reindex from [blog] to [blog_lastest][_doc]",
          "start_time_in_millis" : 1609338953464,
          "running_time_in_nanos" : 1276738396689,
          "cancellable" : true,
          "headers" : {}}
      }
    }
  }
}

留神察看外面的几个重要指标,例如从 description 中能够看到工作形容,从 tasks 中能够找到工作的 id【例如 dpBihNSMQfSlboMGlTgCBA:6629305】,从 cancellable 能够判断工作是否反对勾销操作。
这个 API 其实就是含糊匹配,同理也能够查问其它类型的工作信息,例如应用 GET _tasks?detailed=true&actions=*byquery 查看查问申请的状态。
当集群的工作太多时咱们就能够依据 task_id,也就是下面提到GET /_tasks/task_id 形式更加精确地查问指定工作的状态,防止集群的工作过多,不不便查看。
如果遇到操作失误的场景,想勾销工作,有没有方法呢?
当然有啦,尽管覆水难收,通过调用
_tasks API

POST _tasks/task_id/_cancel

这里的 task_id 就是通过下面的查问工作接口获取的工作 id(工作要反对勾销操作,即【cancellable 为 true】时方能收效)。

删除旧索引

当咱们通过 API 查问发现工作实现后,就能够进行后续操作,我这里是要删除旧索引,而后再给新索引起别名,用于替换旧索引,这样能力保障对外服务没有任何感知。

DELETE /blog

应用别名

POST /_aliases
{
    "actions":[
        {
            "add":{
                "index":"blog_lastest",
                "alias":"blog"
            }
        }
    ]
}

通过别名拜访新索引

进行过以上操作后,咱们能够应用一个简略的搜寻验证服务。

POST /blog/_search
{
    "query": {
        "match": {"author": "james"}
    }
}

如果搜寻后果达到咱们的预期指标,至此,数据索引重建迁徙实现。

本文可转载,但需申明原文出处。程序员小明,一个很少加班的程序员。欢送关注微信公众号“程序员小明”,获取更多优质文章。

退出移动版