关于golang:goElasticSearch实战篇二

36次阅读

共计 10319 个字符,预计需要花费 26 分钟才能阅读完成。

前言

哈喽,everybody,这是 go-elastic 学习系列教程第二篇文章。上一篇咱们学习了 ElasticSearch 根底,如果还不懂根底的,能够先看一看上一篇文章,传送门。这一篇咱们开始实战,写了一个小 demo,带你们轻松入门ElasticSearch 实战开发,再也不必放心 es 局部的需要开发了。代码已上传 github, 可自行下载学习。如果能给一个小星星就好啦。好啦,废话不多说,间接开始吧。

github 地址:https://github.com/asong2020/…

背景

在开始之前,我先来介绍一下我这个样例的性能:

  • 增加用户信息
  • 更新用户信息
  • 删除用户信息
  • 依据电话查问指定用户
  • 依据昵称、身份、籍贯查问相干用户(查找类似昵称的用户列表、身份雷同的用户列表、同城的用户列表)

1. 创立客户端

在进行开发之前,咱们须要下载一个 Es 依赖库。

$  go get -u github.com/olivere/elastic/v7

下载好了依赖库,上面咱们开始编写代码,首先咱们须要创立一个client,用于操作ES,先看代码,而后在进行解说:

func NewEsClient(conf *config.ServerConfig) *elastic.Client {url := fmt.Sprintf("http://%s:%d", conf.Elastic.Host, conf.Elastic.Port)
    client, err := elastic.NewClient(
        //elastic 服务地址
        elastic.SetURL(url),
        // 设置谬误日志输入
        elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC", log.LstdFlags)),
        // 设置 info 日志输入
        elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)))
    if err != nil {log.Fatalln("Failed to create elastic client")
    }
    return client
}

这里创立 client 是应用的 NewClient 这个办法进行实现的,在创立时,能够提供 ES 连贯参数。下面列举的不全,上面给大家介绍一下。

  • elastic.SetURL(url)用来设置 ES 服务地址,如果是本地,就是127.0.0.1:9200。反对多个地址,用逗号分隔即可。
  • elastic.SetBasicAuth("user", "secret")这个是基于 http base auth 验证机制的账号密码。
  • elastic.SetGzip(true)启动 gzip 压缩
  • elastic.SetHealthcheckInterval(10*time.Second)用来设置监控查看工夫距离
  • elastic.SetMaxRetries(5)设置申请失败最大重试次数,v7 版本当前已被弃用
  • elastic.SetSniff(false)容许指定弹性是否应该定期检查集群(默认为 true)
  • elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC", log.LstdFlags)) 设置谬误日志输入
  • elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)) 设置 info 日志输入

这面这些参数依据本人的应用进行抉择。

2. 创立 index 及 mapping

上一步,咱们创立了 client,接下来咱们就要创立对应的索引以及mapping。依据开始介绍的性能,咱们来设计咱们的mapping 构造:

mappingTpl = `{
    "mappings":{
        "properties":{"id":                 { "type": "long"},
            "username":         {"type": "keyword"},
            "nickname":            {"type": "text"},
            "phone":            {"type": "keyword"},
            "age":                {"type": "long"},
            "ancestral":        {"type": "text"},
            "identity":         {"type": "text"},
            "update_time":        {"type": "long"},
            "create_time":        {"type": "long"}
            }
        }
    }`

索引设计为:index =asong_golang_dream

设计好了 index 及 mapping 后,咱们开始编写代码进行创立:

func NewUserES(client *elastic.Client) *UserES {index := fmt.Sprintf("%s_%s", author, project)
    userEs := &UserES{
        client:  client,
        index:   index,
        mapping: mappingTpl,
    }

    userEs.init()

    return userEs
}

func (es *UserES) init() {ctx := context.Background()

    exists, err := es.client.IndexExists(es.index).Do(ctx)
    if err != nil {fmt.Printf("userEs init exist failed err is %s\n", err)
        return
    }

    if !exists {_, err := es.client.CreateIndex(es.index).Body(es.mapping).Do(ctx)
        if err != nil {fmt.Printf("userEs init failed err is %s\n", err)
            return
        }
    }
}

这里咱们首先判断 es 中是否曾经存在要创立的索引,不存在,调用 CreateIndex 进行创立。

3. 批量增加

实现所有筹备工作,咱们接下来就该进行数据的增删改查了。目前该索引下是没有数据,咱们先来学习批量增加,增加一些数据,不便前面的应用。

这里批量增加应用的是 bulkAPI,bulkAPI 容许在单个步骤中进行屡次createindexupdatedelete 申请。如果你须要索引一个数据流比方日志事件,它能够排队和索引数百或数千批次。bulk 与其余的申请体格式稍有不同,如下所示:

{action: { metadata}}\n
{request body}\n
{action: { metadata}}\n
{request body}\n
...

这种格局相似一个无效的单行 JSON 文档 ,它通过换行符(\n) 连贯到一起。留神两个要点:

  • 每行肯定要以换行符 (\n) 结尾,包含最初一行。这些换行符被用作一个标记,能够无效分隔行。
  • 这些行不能蕴含未本义的换行符,因为他们将会对解析造成烦扰。这意味着这个 JSON 能应用 pretty 参数打印。

action/metadata 行指定 哪一个文档 什么操作

action 必须是以下选项之一:

create:如果文档不存在,那么就创立它。

index:创立一个新文档或者替换一个现有的文档。

update:局部更新一个文档

delete:删除一个文档

这里我应用的是index,代码实现如下:

func (es *UserES) BatchAdd(ctx context.Context, user []*model.UserEs) error {
    var err error
    for i := 0; i < esRetryLimit; i++ {if err = es.batchAdd(ctx, user); err != nil {fmt.Println("batch add failed", err)
            continue
        }
        return err
    }
    return err
}

func (es *UserES) batchAdd(ctx context.Context, user []*model.UserEs) error {req := es.client.Bulk().Index(es.index)
    for _, u := range user {u.UpdateTime = uint64(time.Now().UnixNano()) / uint64(time.Millisecond)
        u.CreateTime = uint64(time.Now().UnixNano()) / uint64(time.Millisecond)
        doc := elastic.NewBulkIndexRequest().Id(strconv.FormatUint(u.ID, 10)).Doc(u)
        req.Add(doc)
    }
    if req.NumberOfActions() < 0 {return nil}
    if _, err := req.Do(ctx); err != nil {return err}
    return nil
}

写好了代码,接下来咱们就来测试一下,这个程序应用的 gin 框架,APIhttp://localhost:8080/api/user/create,运行代码,发送一个申请,测试一下:

$ curl --location --request POST 'http://localhost:8080/api/user/create' \
--header 'Content-Type: application/json' \
--data-raw '{"id": 6,"username":"asong6","nickname":"Golang 梦工厂 ","phone":"17897875432","age": 20,"ancestral":" 吉林省深圳市 ","identity":" 工人 "}'

返回后果:

{
    "code": 0,
    "msg": "success"
}

留神:这里有一个点须要说一下,这里我加了一个 for 循环是为了做重试机制的,重试机会为 3 次,超过则返回。

为了确保咱们插入胜利,能够验证一下,发送如下申请:

$ curl --location --request GET 'http://localhost:9200/asong_golang_dream/_search'

4. 批量更新

下面介绍了 bulkAPI,批量更新仍然也是采纳的这个办法,action 选项为update。实现代码如下:

func (es *UserES) BatchUpdate(ctx context.Context, user []*model.UserEs) error {
    var err error
    for i := 0; i < esRetryLimit; i++ {if err = es.batchUpdate(ctx, user); err != nil {continue}
        return err
    }
    return err
}

func (es *UserES) batchUpdate(ctx context.Context, user []*model.UserEs) error {req := es.client.Bulk().Index(es.index)
    for _, u := range user {u.UpdateTime = uint64(time.Now().UnixNano()) / uint64(time.Millisecond)
        doc := elastic.NewBulkUpdateRequest().Id(strconv.FormatUint(u.ID, 10)).Doc(u)
        req.Add(doc)
    }

    if req.NumberOfActions() < 0 {return nil}
    if _, err := req.Do(ctx); err != nil {return err}
    return nil
}

验证一下:

$ curl --location --request PUT 'http://localhost:8080/api/user/update' \
--header 'Content-Type: application/json' \
--data-raw '{"id": 1,"username":"asong","nickname":"Golang 梦工厂 ","phone":"17888889999","age": 21,"ancestral":" 吉林省 ","identity":" 工人 "}'

后果:

{
    "code": 0,
    "msg": "success"
}

5. 批量删除

批量删除也是采纳的 bulkAPI,即action 选项为delete。代码实现如下:

func (es *UserES) BatchDel(ctx context.Context, user []*model.UserEs) error {
    var err error
    for i := 0; i < esRetryLimit; i++ {if err = es.batchDel(ctx, user); err != nil {continue}
        return err
    }
    return err
}

func (es *UserES) batchDel(ctx context.Context, user []*model.UserEs) error {req := es.client.Bulk().Index(es.index)
    for _, u := range user {doc := elastic.NewBulkDeleteRequest().Id(strconv.FormatUint(u.ID, 10))
        req.Add(doc)
    }

    if req.NumberOfActions() < 0 {return nil}

    if _, err := req.Do(ctx); err != nil {return err}
    return nil
}

测试一下:

curl --location --request DELETE 'http://localhost:8080/api/user/delete' \
--header 'Content-Type: application/json' \
--data-raw '{"id": 1,"username":"asong","nickname":"Golang 梦工厂 ","phone":"17888889999","age": 21,"ancestral":" 吉林省 ","identity":" 工人 "}'

6. 查问

有了数据,咱们依据条件查问咱们想要的数据了。这里我应用的是 bool 组合查问,这个查问语法,我在之前的文章也解说过,不懂得能够先看一下这一篇文章:

https://mp.weixin.qq.com/s/mV…。

咱们先看代码吧:

func (r *SearchRequest) ToFilter() *EsSearch {
    var search EsSearch
    if len(r.Nickname) != 0 {search.ShouldQuery = append(search.ShouldQuery, elastic.NewMatchQuery("nickname", r.Nickname))
    }
    if len(r.Phone) != 0 {search.ShouldQuery = append(search.ShouldQuery, elastic.NewTermsQuery("phone", r.Phone))
    }
    if len(r.Ancestral) != 0 {search.ShouldQuery = append(search.ShouldQuery, elastic.NewMatchQuery("ancestral", r.Ancestral))
    }
    if len(r.Identity) != 0 {search.ShouldQuery = append(search.ShouldQuery, elastic.NewMatchQuery("identity", r.Identity))
    }

    if search.Sorters == nil {search.Sorters = append(search.Sorters, elastic.NewFieldSort("create_time").Desc())
    }

    search.From = (r.Num - 1) * r.Size
    search.Size = r.Size
    return &search
}

func (es *UserES) Search(ctx context.Context, filter *model.EsSearch) ([]*model.UserEs, error) {boolQuery := elastic.NewBoolQuery()
    boolQuery.Must(filter.MustQuery...)
    boolQuery.MustNot(filter.MustNotQuery...)
    boolQuery.Should(filter.ShouldQuery...)
    boolQuery.Filter(filter.Filters...)

    // 当 should 不为空时,保障至多匹配 should 中的一项
    if len(filter.MustQuery) == 0 && len(filter.MustNotQuery) == 0 && len(filter.ShouldQuery) > 0 {boolQuery.MinimumShouldMatch("1")
    }

    service := es.client.Search().Index(es.index).Query(boolQuery).SortBy(filter.Sorters...).From(filter.From).Size(filter.Size)
    resp, err := service.Do(ctx)
    if err != nil {return nil, err}

    if resp.TotalHits() == 0 {return nil, nil}
    userES := make([]*model.UserEs, 0)
    for _, e := range resp.Each(reflect.TypeOf(&model.UserEs{})) {us := e.(*model.UserEs)
        userES = append(userES, us)
    }
    return userES, nil
}

咱们查问之前进行了条件绑定,这个条件通过 API 进行设定的,依据条件绑定不同 queryphone 是具备唯一性的,所以咱们能够采纳准确查问,也就是应用 NewTermsQuery 进行绑定。NicknameIdentityAncestral这些都属于含糊查问,所以咱们能够应用匹配查问,用 NewMatchQuery 进行绑定·。查问的数据咱们在依据创立工夫进行排序。工夫由近到远进行排序。

代码量不是很多,看一篇就能懂了,我接下来测试一下:

$ curl --location --request POST 'http://localhost:8080/api/user/search' \
--header 'Content-Type: application/json' \
--data-raw '{"nickname":"",
    "phone": "","identity":"",
    "ancestral": "吉林省",
    "num": 1,
    "size":10
}'

这里进行阐明一下,应用 json 来抉择不同的条件,须要那个条件就填写 json 就好了。这个测试的查问条件就是查找出籍贯是 吉林省 的用户列表,通过 numsize 限度查问数据量,即第一页,数据量为 10。

验证后果:

{
    "code": 0,
    "data": [
        {
            "id": 6,
            "username": "asong6",
            "nickname": "Golang 梦工厂",
            "phone": "17897875432",
            "age": 20,
            "ancestral": "吉林省吉林市",
            "identity": "工人",
            "update_time": 1599905564941,
            "create_time": 1599905564941
        },
        {
            "id": 2,
            "username": "asong2",
            "nickname": "Golang 梦工厂",
            "phone": "17897873456",
            "age": 20,
            "ancestral": "吉林省吉林市",
            "identity": "学生",
            "update_time": 1599905468869,
            "create_time": 1599905468869
        },
        {
            "id": 1,
            "username": "asong1",
            "nickname": "Golang 梦工厂",
            "phone": "17897870987",
            "age": 20,
            "ancestral": "吉林省吉林市",
            "identity": "工人",
            "update_time": 1599900090160,
            "create_time": 1599900090160
        }
    ],
    "msg": "success"
}

目前我的数据量没有那么大,所以只有三条数据,你们能够本人测试一下,增加更多的数据进行测试。

6. 批量查问

在一些场景中,咱们须要通过多个 ID 批量查问文档。es中提供了一个 multiGet 进行批量查问,不过我这里实现的不是用这个办法。因为用更好的办法能够应用。multiGet批量查问的实现是跟 redispipeline是一个情理的,缓存所有申请,而后对立进行申请,所以这里只是缩小了 IO 的应用。所以咱们能够应用更好的办法,应用 search 查问,它提供了依据 id 查问的办法,这个办法是一次申请,实现所有的查问,更高效,所以举荐大家应用这个办法进行批量查问。

代码实现如下:

// 依据 id 批量获取
func (es *UserES) MGet(ctx context.Context, IDS []uint64) ([]*model.UserEs, error) {userES := make([]*model.UserEs, 0, len(IDS))
    idStr := make([]string, 0, len(IDS))
    for _, id := range IDS {idStr = append(idStr, strconv.FormatUint(id, 10))
    }
    resp, err := es.client.Search(es.index).Query(elastic.NewIdsQuery().Ids(idStr...)).Size(len(IDS)).Do(ctx)

    if err != nil {return nil, err}

    if resp.TotalHits() == 0 {return nil, nil}
    for _, e := range resp.Each(reflect.TypeOf(&model.UserEs{})) {us := e.(*model.UserEs)
        userES = append(userES, us)
    }
    return userES, nil
}

好啦,写好了代码咱们进行验证一下吧。

$ curl --location --request GET 'http://localhost:8080/api/user/info?id=1,2,3'

验证后果:

{
    "code": 0,
    "data": [
        {
            "id": 1,
            "username": "asong",
            "nickname": "Golang 梦工厂",
            "phone": "88889999",
            "age": 18,
            "ancestral": "广东省深圳市",
            "identity": "工人"
        },
        {
            "id": 2,
            "username": "asong1",
            "nickname": "Golang 梦工厂",
            "phone": "888809090",
            "age": 20,
            "ancestral": "吉林省吉林市",
            "identity": "学生"
        },
        {
            "id": 3,
            "username": "asong2",
            "nickname": "Golang 梦工厂",
            "phone": "88343409090",
            "age": 21,
            "ancestral": "吉林省吉林市",
            "identity": "学生"
        }
    ],
    "msg": "success"
}

总结

这一篇到这里就完结了。本文通过一个代码样例,学习应用 go 进行 eslatic 开发,本文没有将所有办法都讲全,只是将咱们日常应用的一些办法整理出来,供大家入门应用,也能够批改一下应用到我的项目中呦,认为我在我的项目中也是这么应用的。如果下面的代码段没有看懂,能够到我的 github 上下载源代码进行学习,运行整个我的项目,通过 api 进行测试。如果感觉有用,给个小星星呗!!!

github 地址:https://github.com/asong2020/…

结尾给大家发一个小福利吧,最近我在看 [微服务架构设计模式] 这一本书,讲的很好,本人也收集了一本 PDF,有须要的小伙能够到自行下载。获取形式:关注公众号:[Golang 梦工厂],后盾回复:[微服务],即可获取。

我翻译了一份 GIN 中文文档,会定期进行保护,有须要的小伙伴后盾回复 [gin] 即可下载。

我是 asong,一名普普通通的程序猿,让我一起缓缓变强吧。我本人建了一个 golang 交换群,有须要的小伙伴加我vx, 我拉你入群。欢送各位的关注,咱们下期见~~~

举荐往期文章:

  • go-ElasticSearch 入门看这一篇就够了(一)
  • 面试官:go 中 for-range 应用过吗?这几个问题你能解释一下起因吗
  • 学会 wire 依赖注入、cron 定时工作其实就这么简略!
  • 据说你还不会 jwt 和 swagger- 饭我都不吃了带着实际我的项目我就来了
  • 把握这些 Go 语言个性,你的程度将进步 N 个品位(二)
  • go 实现多人聊天室,在这里你想聊什么都能够的啦!!!
  • grpc 实际 - 学会 grpc 就是这么简略
  • go 规范库 rpc 实际
  • 2020 最新 Gin 框架中文文档 asong 又捡起来了英语,用心翻译
  • 基于 gin 的几种热加载形式
  • boss: 这小子还不会应用 validator 库进行数据校验,开了~~~

正文完
 0