乐趣区

关于大数据:ElasticSearch进阶篇一版本控制

一、前言

ElasticSearch(以下简称 ES)的数据写入反对高并发,高并发就会带来很广泛的数据一致性问题。常见的解决办法就是加锁。同样,ES 为了保障高并发写的数据一致性问题,退出了相似于锁的实现办法 – 版本控制。锁从其中的一个角度可分为乐观锁和乐观锁。

对于同一个数据的并发操作,乐观锁认为本人在应用数据的时候肯定会有别的线程过去批改数据,因而在获取数据的时候会先加锁,确保数据不会被别的线程批改。而乐观锁则认为本人在应用数据时不会有别的线程来批改数据,所以不会增加锁,只是在更新或者提交数据的时候去判断之前有没有别的线程更新了这个数据。那么 ES 属于那种锁呢?上面大狮兄就和大家一起探讨官网的具体做法来答复这个问题。

二、版本控制实现及验证

1. ES6.7 Before

# 新建测试索引
PUT test
{
  "settings" : {
    "number_of_shards" : "3",
    "number_of_replicas" : "0"
  }
}

## 插入文档
PUT test/_doc/1 
{"user": "zhangsan", "age": 12}

## 响应后果
{
  "_index" : "test",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

更新文档(version 版本大于已写入文档版本),更新年龄为 10,版本号为 200

## 更新文档
PUT test/_doc/1?version=200&version_type=external
{"user": "zhangsan", "age": 10}

## 返回后果
{
  "_index" : "test",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 200,
  "result" : "updated",
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}

## 查问文档
GET test/_doc/1
## 返回后果
{
  "_index" : "test",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 200,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "user" : "zhangsan",
    "age" : 10
  }
}

更新胜利,年龄更新为 10 且版本号更新为 200

更新文档(version 版本小于或等于已写入文档版本),更新年龄为 22,版本号为 180

## 更新文档
PUT test/_doc/1?version=180&version_type=external
{"user": "zhangsan", "age": 22}

## 返回后果
{
  "error" : {
    "root_cause" : [
      {
        "type" : "version_conflict_engine_exception",
        "reason" : "[1]: version conflict, current version [200] is higher or equal to the one provided [180]",
        "index_uuid" : "fCv7Q1dkTl6e9E1Z0dNE1g",
        "shard" : "2",
        "index" : "test"
      }
    ],
    "type" : "version_conflict_engine_exception",
    "reason" : "[1]: version conflict, current version [200] is higher or equal to the one provided [180]",
    "index_uuid" : "fCv7Q1dkTl6e9E1Z0dNE1g",
    "shard" : "2",
    "index" : "test"
  },
  "status" : 409
}

## 查问文档
GET test/_doc/1

## 返回后果
{
  "_index" : "test",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 200,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "user" : "zhangsan",
    "age" : 10
  }
}

更新失败,数据没有变动,提醒版本抵触,现有的版本号大于要插入的版本号。

  • vertion_type=external 或者 vertion_type=external_gt:指标版本号大于已有的版本号才会更新胜利。
  • vertion_type=external_gte:指标版本号大于或等于已有的版本号才会更新胜利。

2. ES6.7 OR Later

# 新建测试索引
PUT testccc
{
  "settings" : {
    "number_of_shards" : "1",
    "number_of_replicas" : "0"
  }
}


## 插入文档
PUT testccc/_doc/1 
{"user": "lisi", "age": 12}

## 返回后果
{
  "_index" : "testccc",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

返回后果留神最初的两个字段,_seq_no 示意序列号是自增的,_primary_term 表是文档位于哪个 shard。

更新数据(seq_no 大于已写入文档序列号),更新年龄为 10,序列号为 20

## 更新文档
PUT testccc/_doc/1?if_seq_no=20&if_primary_term=1
{"user": "lisi", "age": 10}

## 返回后果
{
  "error" : {
    "root_cause" : [
      {
        "type" : "version_conflict_engine_exception",
        "reason" : "[1]: version conflict, required seqNo [20], primary term [1]. current document has seqNo [0] and primary term [1]",
        "index_uuid" : "N6LzBNj9S5yqVWFubt3x4Q",
        "shard" : "0",
        "index" : "testccc"
      }
    ],
    "type" : "version_conflict_engine_exception",
    "reason" : "[1]: version conflict, required seqNo [20], primary term [1]. current document has seqNo [0] and primary term [1]",
    "index_uuid" : "N6LzBNj9S5yqVWFubt3x4Q",
    "shard" : "0",
    "index" : "testccc"
  },
  "status" : 409
}


## 查问文档
GET testccc/_doc/1 

## 返回后果
{
  "_index" : "testccc",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "user" : "lisi",
    "age" : 12
  }
}

更新失败,数据无变动,提醒版本抵触,最近文档的序列号为 0,要更新的序列号为 20。

更新数据(seq_no 等于已写入文档序列号),更新年龄为 10

## 更新文档
PUT testccc/_doc/1?if_seq_no=0&if_primary_term=1
{"user": "lisi", "age": 10}

## 返回后果
{
  "_index" : "testccc",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}

## 查问文档
GET testccc/_doc/1 
## 返回后果
{
  "_index" : "testccc",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "user" : "lisi",
    "age" : 10
  }
}

更新胜利,且 seq_no 自增为 1。

## 插入新文档
PUT testccc/_doc/2
{"user": "wangwu", "age": 40}

## 返回后果
{
  "_index" : "testccc",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 2,
  "_primary_term" : 1
}

## 更新原文档
PUT testccc/_doc/1?if_seq_no=1&if_primary_term=1
{"user": "lisi", "age": 50}

## 返回后果
{
  "_index" : "testccc",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 3,
  "result" : "updated",
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 3,
  "_primary_term" : 1
}

## 更新新写入文档
PUT testccc/_doc/2?if_seq_no=2&if_primary_term=1
{"user": "wangwu", "age": 80}

## 返回后果
{
  "_index" : "testccc",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 4,
  "_primary_term" : 1
}

能够察看到对于不同的文档,seq_no 总是自增 1 的。

三、总结

  1. ES 版本控制相似于 Java 中的乐观锁,尤其对版本号字段的奇妙应用与解决乐观锁 ABA 问题的 CAS 算法有殊途同归之妙。
  2. ES6.7 之后增加的 if_seq_no 与 if_primary_term 版本控制是针对于整个索引的,而_version 和 version_type 版本控制是针对于单条记录(即单个文档)的,不同的利用场景可应用不同的版本控制策略。
  3. if_seq_no 配置的值必须等于存在于现有文档中能力更新胜利,而_version 配置的值依据不同的 version_type,必须大于或者大于等于文档最近更改过的_version 值能力更新胜利。
退出移动版