背景

家喻户晓,Elasticsearch是⼀个实时的分布式搜索引擎,为⽤户提供搜寻服务。当咱们决定存储某种数据,在创立索引的时候就须要将数据结构,即Mapping确定下来,于此同时索引的设定和很多固定配置将不能扭转。
<!-- more -->
那如果后续业务发生变化,须要扭转数据结构或者更换ES更换分词器怎么办呢?为此,Elastic团队提供了很多通过辅助⼯具来帮忙开发⼈员进⾏重建索引的计划。
如果对 reindex API 不相熟,那么在遇到重构的时候,必然事倍功半,效率低下。反之,就能够不便地进行索引重构,省时省力。

步骤

假如之前咱们曾经存在一个blog索引,因为更换分词器须要对该索引中的数据进行重建索引,以便反对业务应用新的分词规定搜寻数据,并且尽可能使这个变动对外服务没有感知,大略分为以下几个步骤:

  • 新增⼀个索引blog_lastest,Mapping数据结构与blog索引统一
  • blog数据同步至blog_lastest
  • 删除blog索引
  • 数据同步后给blog_lastest增加别名blog

新建索引

在这里举荐一个ES管理工具Kibana,次要针对数据的摸索、可视化和剖析。

put /blog_lastest/{    "mappings":{        "properties":{            "title":{                "type":"text",                "analyzer":"ik_max_word"            },            "author":{                "type":"keyword",                "fields":{                    "seg":{                        "type":"text",                        "analyzer":"ik_max_word"                    }                }            }        }    }}

将旧索引数据copy到新索引

同步期待

接⼝将会在 reindex 完结后返回

POST /_reindex{    "source": {        "index": "blog"    },    "dest": {        "index": "blog_lastest"    }}

kibana 中的应用如下所示

当然高版本(7.1.1)中,ES都有提供对应的Java REST Client,比方

ReindexRequest reindexRequest = new ReindexRequest();reindexRequest.setSourceIndices("blog").setSource.setDestIndex("blog_lastest");TaskSubmissionResponse taskSubmissionResponse = client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT);

为了避免赘述,接下来举例全副以kibana中申请介绍,如果有须要用Java REST Client,能够自行去ES官网查看。

异步执⾏

如果 reindex 工夫过⻓,倡议加上 wait_for_completion=false 的参数条件,这样 reindex 将间接返回 taskId

POST /_reindex?wait_for_completion=false{    "source": {        "index": "blog"    },    "dest": {        "index": "blog_lastest"    }}

返回:

{  "task" : "dpBihNSMQfSlboMGlTgCBA:4728038"}

op_type 参数

op_type 参数管制着写入数据的抵触解决形式,如果把 op_type 设置为 create【默认值】,在 _reindex API 中,示意写入时只在 dest index中增加不存在的 doucment,如果雷同的 document 曾经存在,则会报 version confilct 的谬误,那么索引操作就会失败。【这种形式与应用 _create API 时成果统一】

POST _reindex{  "source": {    "index": "blog"  },  "dest": {    "index": "blog_lastest",    "op_type": "create"  }}

如果这样设置了,也就不存在更新数据的场景了【抵触数据无奈写入】,咱们也能够把 op_type 设置为 index,示意所有的数据全副从新索引创立。

conflicts 配置

默认状况下,当产生 version conflict 的时候,_reindex 会被 abort,工作终止【此时数据还没有 reindex 实现】,在返回体中的 failures 指标中会蕴含抵触的数据【有时候数据会十分多】,除非把 conflicts 设置为 proceed

对于 abort 的阐明,如果产生了 abort,曾经执行的数据【例如更新写入的】依然存在于指标索引,此时工作终止,还会有数据没有被执行,也就是漏数了。换句话说,该执行过程不会回滚,只会终止。如果设置了 proceed,工作在检测到数据抵触的状况下,不会终止,会跳过抵触数据继续执行,直到所有数据执行实现,此时不会漏掉失常的数据,只会漏掉有抵触的数据。

POST _reindex{  "source": {    "index": "blog"  },  "dest": {    "index": "blog_lastest",    "op_type": "create"  },  "conflicts": "proceed"}

咱们能够成心把 op_type 设置为 create,人为制作数据抵触的场景,测试时更容易察看到抵触景象。

如果把 conflicts 设置为 proceed,在返回体后果中不会再呈现 failures 的信息,然而通过 version_conflicts 指标能够看到具体的数量。

批次大小配置

当你发现reindex的速度有些慢的时候,能够在 query 参数的同一档次【即 source 参数中】增加 size 参数,示意 scroll size 的大小【会影响批次的次数,进而影响整体的速度】,如果不显式设置,默认是一批 1000 条数据,在一开始的简略示例中也看到了。
如下,设置 scroll size 为 5000:

POST /_reindex?wait_for_completion=false{    "source": {        "index": "blog",        "size":5000    },    "dest": {        "index": "blog_lastest",        "op_type": "create"    },    "conflicts": "proceed"}

测试后,速度达到了 30 分钟 500 万左右,显著晋升了很多。

依据taskId能够实时查看工作的执行状态

一般来说,如果咱们的 source index 很大【比方几百万数据量】,则可能须要比拟长的工夫来实现 _reindex 的工作,可能须要几十分钟。而在此期间不可能始终期待后果返回,能够去做其它事件,如果中途须要查看进度,能够通过 _tasks API 进行查看。

GET /_tasks/{taskId}

返回:

{  "completed" : false,  "task" : {    "node" : "dpBihNSMQfSlboMGlTgCBA",    "id" : 4704218,    "type" : "transport",    "action" : "indices:data/write/reindex",    ……}

当执行结束时,completed为true
查看工作进度以及勾销工作,除了依据taskId查看以外,咱们还能够通过查看所有的工作中筛选本次reindex的工作。

GET _tasks?detailed=true&actions=*reindex

返回后果:

{  "nodes" : {    "dpBihNSMQfSlboMGlTgCBA" : {      "name" : "node-16111-9210",      "transport_address" : "192.168.XXX.XXX:9310",      "host" : "192.168.XXX.XXX",      "ip" : "192.168.16.111:9310",      "roles" : [        "ingest",        "master"      ],      "attributes" : {        "xpack.installed" : "true",        "transform.node" : "false"      },      "tasks" : {        "dpBihNSMQfSlboMGlTgCBA:6629305" : {          "node" : "dpBihNSMQfSlboMGlTgCBA",          "id" : 6629305,          "type" : "transport",          "action" : "indices:data/write/reindex",          "status" : {            "total" : 8361421,            "updated" : 0,            "created" : 254006,            "deleted" : 0,            "batches" : 743,            "version_conflicts" : 3455994,            "noops" : 0,            "retries" : {              "bulk" : 0,              "search" : 0            },            "throttled_millis" : 0,            "requests_per_second" : -1.0,            "throttled_until_millis" : 0          },          "description" : "reindex from [blog] to [blog_lastest][_doc]",          "start_time_in_millis" : 1609338953464,          "running_time_in_nanos" : 1276738396689,          "cancellable" : true,          "headers" : { }        }      }    }  }}

留神察看外面的几个重要指标,例如从 description 中能够看到工作形容,从 tasks 中能够找到工作的 id【例如 dpBihNSMQfSlboMGlTgCBA:6629305】,从 cancellable 能够判断工作是否反对勾销操作。
这个 API 其实就是含糊匹配,同理也能够查问其它类型的工作信息,例如应用 GET _tasks?detailed=true&actions=*byquery 查看查问申请的状态。
当集群的工作太多时咱们就能够依据task_id,也就是下面提到GET /_tasks/task_id 形式更加精确地查问指定工作的状态,防止集群的工作过多,不不便查看。
如果遇到操作失误的场景,想勾销工作,有没有方法呢?
当然有啦,尽管覆水难收,通过调用
_tasks API

POST _tasks/task_id/_cancel

这里的 task_id 就是通过下面的查问工作接口获取的工作id(工作要反对勾销操作,即【cancellable 为 true】时方能收效)。

删除旧索引

当咱们通过 API 查问发现工作实现后,就能够进行后续操作,我这里是要删除旧索引,而后再给新索引起别名,用于替换旧索引,这样能力保障对外服务没有任何感知。

DELETE /blog

应用别名

POST /_aliases{    "actions":[        {            "add":{                "index":"blog_lastest",                "alias":"blog"            }        }    ]}

通过别名拜访新索引

进行过以上操作后,咱们能够应用一个简略的搜寻验证服务。

POST /blog/_search{    "query": {        "match": {            "author": "james"        }    }}

如果搜寻后果达到咱们的预期指标,至此,数据索引重建迁徙实现。

本文可转载,但需申明原文出处。 程序员小明,一个很少加班的程序员。欢送关注微信公众号“程序员小明”,获取更多优质文章。