背景
家喻户晓,Elasticsearch 是⼀个实时的分布式搜索引擎,为⽤户提供搜寻服务。当咱们决定存储某种数据,在创立索引的时候就须要将数据结构,即 Mapping 确定下来,于此同时索引的设定和很多固定配置将不能扭转。
<!– more –>
那如果后续业务发生变化,须要扭转数据结构或者更换 ES 更换分词器怎么办呢?为此,Elastic 团队提供了很多通过辅助⼯具来帮忙开发⼈员进⾏重建索引的计划。
如果对 reindex
API 不相熟,那么在遇到重构的时候,必然事倍功半,效率低下。反之,就能够不便地进行索引重构,省时省力。
步骤
假如之前咱们曾经存在一个 blog 索引,因为更换分词器须要对该索引中的数据进行重建索引,以便反对业务应用新的分词规定搜寻数据,并且尽可能使这个变动对外服务没有感知,大略分为以下几个步骤:
- 新增⼀个索引
blog_lastest
,Mapping 数据结构与blog
索引统一 - 将
blog
数据同步至blog_lastest
- 删除
blog
索引 - 数据同步后给
blog_lastest
增加别名blog
新建索引
在这里举荐一个 ES 管理工具Kibana,次要针对数据的摸索、可视化和剖析。
put /blog_lastest/
{
"mappings":{
"properties":{
"title":{
"type":"text",
"analyzer":"ik_max_word"
},
"author":{
"type":"keyword",
"fields":{
"seg":{
"type":"text",
"analyzer":"ik_max_word"
}
}
}
}
}
}
将旧索引数据 copy 到新索引
同步期待
接⼝将会在 reindex 完结后返回
POST /_reindex
{
"source": {"index": "blog"},
"dest": {"index": "blog_lastest"}
}
在 kibana
中的应用如下所示
当然高版本(7.1.1)中,ES 都有提供对应的Java REST Client
,比方
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices("blog").setSource.setDestIndex("blog_lastest");
TaskSubmissionResponse taskSubmissionResponse = client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT);
为了避免赘述,接下来举例全副以 kibana
中申请介绍,如果有须要用Java REST Client
,能够自行去 ES 官网查看。
异步执⾏
如果 reindex 工夫过⻓,倡议加上 wait_for_completion=false
的参数条件,这样 reindex 将间接返回 taskId
。
POST /_reindex?wait_for_completion=false
{
"source": {"index": "blog"},
"dest": {"index": "blog_lastest"}
}
返回:
{"task" : "dpBihNSMQfSlboMGlTgCBA:4728038"}
op_type 参数
op_type
参数管制着写入数据的抵触解决形式,如果把 op_type
设置为 create
【默认值】,在 _reindex
API 中,示意写入时只在 dest
index
中增加不存在的 doucment,如果雷同的 document 曾经存在,则会报 version confilct
的谬误,那么索引操作就会失败。【这种形式与应用 _create API 时成果统一】
POST _reindex
{
"source": {"index": "blog"},
"dest": {
"index": "blog_lastest",
"op_type": "create"
}
}
如果这样设置了,也就不存在更新数据的场景了【抵触数据无奈写入】,咱们也能够把 op_type
设置为 index
,示意所有的数据全副从新索引创立。
conflicts 配置
默认状况下,当产生 version conflict
的时候,_reindex
会被 abort
,工作终止【此时数据还没有 reindex
实现】,在返回体中的 failures
指标中会蕴含抵触的数据【有时候数据会十分多】,除非把 conflicts
设置为 proceed
。
对于 abort
的阐明,如果产生了 abort
,曾经执行的数据【例如更新写入的】依然存在于指标索引,此时工作终止,还会有数据没有被执行,也就是漏数了。换句话说,该执行过程不会回滚,只会终止。如果设置了 proceed
,工作在检测到数据抵触的状况下,不会终止,会跳过抵触数据继续执行,直到所有数据执行实现,此时不会漏掉失常的数据,只会漏掉有抵触的数据。
POST _reindex
{
"source": {"index": "blog"},
"dest": {
"index": "blog_lastest",
"op_type": "create"
},
"conflicts": "proceed"
}
咱们能够成心把 op_type
设置为 create
,人为制作数据抵触的场景,测试时更容易察看到抵触景象。
如果把 conflicts
设置为 proceed
,在返回体后果中不会再呈现 failures
的信息,然而通过 version_conflicts
指标能够看到具体的数量。
批次大小配置
当你发现 reindex
的速度有些慢的时候,能够在 query
参数的同一档次【即 source
参数中】增加 size
参数,示意 scroll size
的大小【会影响批次的次数,进而影响整体的速度】,如果不显式设置,默认是一批 1000 条数据,在一开始的简略示例中也看到了。
如下,设置 scroll size
为 5000:
POST /_reindex?wait_for_completion=false
{
"source": {
"index": "blog",
"size":5000
},
"dest": {
"index": "blog_lastest",
"op_type": "create"
},
"conflicts": "proceed"
}
测试后,速度达到了 30 分钟 500 万左右,显著晋升了很多。
依据 taskId 能够实时查看工作的执行状态
一般来说,如果咱们的 source index
很大【比方几百万数据量】,则可能须要比拟长的工夫来实现 _reindex
的工作,可能须要几十分钟。而在此期间不可能始终期待后果返回,能够去做其它事件,如果中途须要查看进度,能够通过 _tasks
API 进行查看。
GET /_tasks/{taskId}
返回:
{
"completed" : false,
"task" : {
"node" : "dpBihNSMQfSlboMGlTgCBA",
"id" : 4704218,
"type" : "transport",
"action" : "indices:data/write/reindex",
……
}
当执行结束时,completed
为 true
查看工作进度以及勾销工作,除了依据 taskId 查看以外,咱们还能够通过查看所有的工作中筛选本次 reindex
的工作。
GET _tasks?detailed=true&actions=*reindex
返回后果:
{
"nodes" : {
"dpBihNSMQfSlboMGlTgCBA" : {
"name" : "node-16111-9210",
"transport_address" : "192.168.XXX.XXX:9310",
"host" : "192.168.XXX.XXX",
"ip" : "192.168.16.111:9310",
"roles" : [
"ingest",
"master"
],
"attributes" : {
"xpack.installed" : "true",
"transform.node" : "false"
},
"tasks" : {
"dpBihNSMQfSlboMGlTgCBA:6629305" : {
"node" : "dpBihNSMQfSlboMGlTgCBA",
"id" : 6629305,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"total" : 8361421,
"updated" : 0,
"created" : 254006,
"deleted" : 0,
"batches" : 743,
"version_conflicts" : 3455994,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 0,
"requests_per_second" : -1.0,
"throttled_until_millis" : 0
},
"description" : "reindex from [blog] to [blog_lastest][_doc]",
"start_time_in_millis" : 1609338953464,
"running_time_in_nanos" : 1276738396689,
"cancellable" : true,
"headers" : {}}
}
}
}
}
留神察看外面的几个重要指标,例如从 description
中能够看到工作形容,从 tasks 中能够找到工作的 id
【例如 dpBihNSMQfSlboMGlTgCBA:6629305
】,从 cancellable
能够判断工作是否反对勾销操作。
这个 API 其实就是含糊匹配,同理也能够查问其它类型的工作信息,例如应用 GET _tasks?detailed=true&actions=*byquery
查看查问申请的状态。
当集群的工作太多时咱们就能够依据 task_id
,也就是下面提到GET /_tasks/task_id
形式更加精确地查问指定工作的状态,防止集群的工作过多,不不便查看。
如果遇到操作失误的场景,想勾销工作,有没有方法呢?
当然有啦,尽管覆水难收,通过调用_tasks API
:
POST _tasks/task_id/_cancel
这里的 task_id
就是通过下面的查问工作接口获取的工作 id(工作要反对勾销操作,即【cancellable 为 true】时方能收效)。
删除旧索引
当咱们通过 API 查问发现工作实现后,就能够进行后续操作,我这里是要删除旧索引,而后再给新索引起别名,用于替换旧索引,这样能力保障对外服务没有任何感知。
DELETE /blog
应用别名
POST /_aliases
{
"actions":[
{
"add":{
"index":"blog_lastest",
"alias":"blog"
}
}
]
}
通过别名拜访新索引
进行过以上操作后,咱们能够应用一个简略的搜寻验证服务。
POST /blog/_search
{
"query": {
"match": {"author": "james"}
}
}
如果搜寻后果达到咱们的预期指标,至此,数据索引重建迁徙实现。
本文可转载,但需申明原文出处。程序员小明,一个很少加班的程序员。欢送关注微信公众号“程序员小明”,获取更多优质文章。