第20篇不和谐如何索引数十亿条消息

41次阅读

共计 6982 个字符,预计需要花费 18 分钟才能阅读完成。

我的 Elasticsearch 系列文章,逐渐更新中,欢迎关注
0A. 关于 Elasticsearch 及实例应用
00.Solr 与 ElasticSearch 对比
01.ElasticSearch 能做什么?
02.Elastic Stack 功能介绍
03. 如何安装与设置 Elasticsearch API
04. 如果通过 elasticsearch 的 head 插件建立索引_CRUD 操作
05.Elasticsearch 多个实例和 head plugin 使用介绍
06. 当 Elasticsearch 进行文档索引时,它是如何工作的?
07.Elasticsearch 中的映射方式—简洁版教程
08.Elasticsearch 中的分析和分析器应用方式
09.Elasticsearch 中构建自定义分析器
10.Kibana 科普 - 作为 Elasticsearhc 开发工具
11.Elasticsearch 查询方法
12.Elasticsearch 全文查询
13.Elasticsearch 查询 - 术语级查询
14.Python 中的 Elasticsearch 入门
15. 使用 Django 进行 ElasticSearch 的简单方法
16. 关于 Elasticsearch 的 6 件不太明显的事情
17. 使用 Python 的初学者 Elasticsearch 教程
18. 用 ElasticSearch 索引 MongoDB, 一个简单的自动完成索引项目
19.Kibana 对 Elasticsearch 的实用介绍
20. 不和谐如何索引数十亿条消息

另外 Elasticsearch 入门,我强烈推荐 ElasticSearch 新手搭建手册给你,非常想尽的入门指南手册。

每月有数百万用户在 Discord 上发送数十亿条消息。一种搜索历史记录的方法迅速成为我们构建的最受欢迎的功能之一。让我们搜索吧!
要求
● 经济高效:Discord 的核心用户体验是我们的文本和语音聊天。搜索是一项辅助功能,而反映这一功能所需的基础架构价格。理想情况下,这意味着搜索的费用不应超过消息的实际存储量。
● 快速直观:我们构建的所有功能都必须快速直观,包括搜索。我们产品的搜索体验也需要看起来和使用起来很棒。
● 自我修复:我们还没有一支专门的 devop 小组(因此),因此搜索需要能够以最少的操作员干预或完全没有操作员的干预来容忍失败。
● 线性可扩展:就像我们存储消息的方式一样,增加搜索基础结构的容量应涉及添加更多节点。
● 懒惰地索引:并非所有人都使用搜索 - 我们不应该对消息建立索引,除非有人尝试至少搜索一次。此外,如果索引失败,我们需要能够动态地重新索引服务器。

在查看这些要求时,我们向自己提出了两个关键问题:
问:我们可以将搜索外包给托管的 SaaS 吗?(简易模式)
A. 不。我们研究过的每一项解决方案都进行了托管搜索,这会浪费我们的预算(天文数字很高)。此外,将消息从我们的数据中心中发送出去的想法与团队并不协调。作为一个注重安全的团队,我们希望控制用户消息的安全性,而不是让第三方知道他们在做什么。
问:是否存在可以使用的开源搜索解决方案?
答:是的!我们环顾四周,内部很快就开始讨论 Elasticsearch vs Solr,因为两者都适合我们的用例。Elasticsearch 具有优势:
● Solr 上的节点发现需要 ZooKeeper。我们运行 etcd,并且不想拥有专门用于 Solr 的其他基础结构。Elasticsearch 的 Zen Discovery 自成一体。
● Elasticsearch 支持自动分片重新平衡,这将使我们能够向集群添加新节点,从而满足开箱即用的线性可扩展性要求。
● Elasticsearch 具有内置的结构化查询 DSL,而您必须使用第三方库以 Solr 编程方式创建查询字符串。
● 团队的工程师拥有更多与 Elasticsearch 合作的经验
Elasticsearch 可以工作吗?
Elasticsearch 似乎具备了我们想要的一切,并且我们的工程师在过去曾有过使用它的经验。它提供了一种跨不同节点复制数据的方法,以容忍单个节点的故障,通过添加更多节点来扩展群集,并可以吸收要索引的消息而不会费劲。到处阅读,我们听到了一些有关管理大型 Elasticsearch 集群的恐怖故事,实际上,除了日志记录基础架构之外,我们的后端团队都没有任何管理 Elasticsearch 集群的经验。
我们想避免这些繁琐的大型集群,因此我们想到了将分片和路由委托给应用程序层的想法,使我们可以将消息索引到较小的 Elasticsearch 集群池中。这意味着在群集中断的情况下,仅受影响的群集上包含的 Discord 消息将不可搜索。这还为我们提供了以下优势:如果无法恢复整个群集的数据,则可以丢弃整个群集的数据(系统可以在用户下次执行搜索时懒惰地重新索引 Discord 服务器)。
组成部分
当文档被大量索引时,Elasticsearch 喜欢它。这意味着我们无法为实时发布的消息编制索引。取而代之的是,我们设计了一个队列,其中工作人员在单个批量操作中抓取一堆消息并将它们编入索引。我们认为,从发布消息到可搜索消息之间的微小延迟是一个完全合理的约束。毕竟,大多数用户搜索的都是历史记录而不是刚才所说的消息。

在摄取方面,我们需要一些注意事项:
● 消息队列:我们需要一个队列,我们​​可以在消息实时发布时将其放入(供工作人员使用)。
● 索引工作人员:执行实际路由和批量插入的工作人员从队列插入 Elasticsearch。
我们已经在 Celery 之上构建了一个任务排队系统,因此我们也将其用于历史索引工作者。
● 历史索引工作人员:负责在给定服务器中遍历消息历史并将其插入到 Elasticsearch 索引中的工作人员。
我们还需要快速,轻松地映射 Discord 服务器的消息将驻留在哪个 Elasticsearch 集群上并建立索引。我们将此“群集 + 索引”对称为碎片(不要与索引中的 Elasticsearch 的本地碎片混淆)。我们创建的映射分为两层:
● 持久性碎片映射:我们将其放在 Cassandra 上,这是持久性数据的主要数据存储,是事实的来源。
● 分片映射缓存:当我们在工作人员上接收消息时,向 Cassandra 查询分片是一个很慢的操作。我们将这些映射缓存在 Redis 中,以便我们可以执行 mget 操作来快速确定需要将消息路由到的位置。

首次为服务器建立索引时,我们还需要一种方法来选择用于保留 Discord 服务器消息的碎片。由于分片是应用程序分层的抽象,因此我们可以对如何分配它们有所了解。通过利用 Redis 的功能,我们使用了排序集来构建负载感知的分片分配器。
● 分片分配器:在 Redis 中使用排序集,我们保留了一组分片,其得分代表其负荷。得分最低的分片是接下来应该分配的分片。分数随着每次新分配而增加,并且在 Elasticsearch 中索引的每条消息也都有可能增加其 Shard 的分数。随着分片中获得更多数据,它们被分配给新 Discord 服务器的可能性就较小。
当然,如果没有从应用程序层发现集群及其中的主机的方法,那么整个搜索基础架构将是不完整的。
● etcd:我们在系统的其他部分中使用 etcd 进行服务发现,因此我们也将其用于 Elasticsearch 集群。由于集群中的节点可以将自己声明到 etcd 上,以供系统其余部分查看,因此我们不必对任何 Elasticsearch 拓扑进行硬编码。
最后,我们需要一种让客户能够实际搜索事物的方法。
● 搜索 API:客户端可以向其发出搜索查询的 API 端点。它需要进行所有权限检查,以确保客户端仅搜索他们实际有权访问的消息。

索引和映射数据
在非常高的层次上,在 Elasticsearch 中,我们有一个“索引”的概念,其中包含许多“碎片”。在这种情况下,分片实际上是 Lucene 索引。Elasticsearch 负责将索引内的数据分发到属于该索引的分片。如果需要,可以使用“路由键”控制数据在分片之间的分配方式。索引也可以包含“复制因子”,即索引(及其中的分片)应复制到的节点数。如果索引所在的节点发生故障,则副本可以接管(不相关但相关,这些副本也可以用于搜索查询,因此您可以通过添加更多副本来扩展索引的搜索吞吐量)。
由于我们在应用程序级别(我们的分片)中处理了所有分片逻辑,因此让 Elasticsearch 为我们进行分片实际上没有任何意义。但是,我们可以使用它在集群中的节点之间进行索引的复制和平衡。为了让 Elasticsearch 使用正确的配置自动创建索引,我们使用了索引模板,其中包含索引配置和数据映射。索引配置非常简单:
● 索引只能包含一个分片(不要为我们做任何分片)
● 索引应复制到一个节点(能够容忍索引所在的主节点的故障)
● 索引每 60 分钟应刷新一次(为什么要这样做,下面将进行说明)。

索引包含一个文档类型:
message

将原始消息数据存储在 Elasticsearch 中几乎没有意义,因为数据的格式不是易于搜索的格式。相反,我们决定采用每条消息,并将其转换为一堆字段,其中包含有关消息的元数据,我们可以对其进行索引和搜索:

您会注意到,我们没有在这些字段中包含时间戳,并且如果您从我们以前的博客文章中回忆起,我们的 ID 是 Snowflakes,这意味着它们固有地包含时间戳(我们可以在之前,之后和之后使用它来加电)使用最小和最大 ID 范围进行查询)。

但是,这些字段实际上并没有“存储”在 Elasticsearch 中,而是仅存储在反向索引中。实际存储和返回的唯一字段是张贴消息的消息,通道和服务器 ID。这意味着消息数据在 Elasticsearch 中不会重复。折衷是,我们必须在返回搜索结果时从 Cassandra 获取消息,这是完全可以的,因为我们必须从 Cassandra 中提取消息上下文(前后 2 条消息)以始终为 UI 供电。将实际的消息对象保留在 Elasticsearch 之外意味着我们不必为存储它而额外的磁盘空间。但是,这意味着我们无法使用 Elasticsearch 突出显示搜索结果中的匹配项。我们必须将标记生成器和语言分析器内置到我们的客户端中以进行突出显示(这确实很容易做到)。

实际编码
我们认为可能不需要微服务来搜索,而是向 Elasticsearch 公开了一个封装了路由和查询逻辑的库。我们唯一需要运行的附加服务是索引工作程序(它将使用此库来执行实际的索引工作)。暴露给团队其他成员的 API 表面积也很小,因此,如果确实需要将其转移到它自己的服务中,则可以轻松地将其包装在 RPC 层中。该库也可以由我们的 API 工作者导入,以实际执行搜索查询并通过 HTTP 将结果返回给用户。

对于团队的其他成员,该库暴露了用于搜索消息的最小表面积:
排队要编制索引或删除的消息:

批量索引工作人员中的实时消息(大致):

为了对服务器的历史消息建立索引,一个历史索引作业将执行一个工作单元,并返回继续运行该服务器所需的下一个作业。每个作业代表进入服务器消息历史记录和固定执行单位的光标(在这种情况下,默认值为 500 条消息)。作业将新游标返回到要索引的下一批消息,如果没有更多工作要做,则返回“无”。为了快速返回大型服务器的结果,我们将历史索引分为两个阶段,即“初始”阶段和“深度”阶段。“初始”阶段为服务器上最近 7 天的邮件编制索引,并使索引可供用户使用。之后,我们在“深层”阶段对整个历史进行索引,该阶段以较低的优先级执行。本文显示给用户的外观。这些作业在一组芹菜工作者中执行,从而可以在这些工作者执行的其他任务中安排这些工作。大致如下所示:

在生产中进行测试

在对此进行编码并在我们的开发环境上对其进行测试之后,我们决定是时候看看它在生产中的性能了。我们创建了一个包含 3 个节点的单个 Elasticsearch 集群,配置了索引工作器,并计划对 1,000 个最大的 Discord 服务器进行索引。一切似乎都正常,但是在查看集群中的指标时,我们注意到了两件事:

  1. CPU 使用率高于预期。
  2. 磁盘使用率增长得太快了,无法索引大量消息。

我们很困惑,在让它运行了一段时间并用完了太多的磁盘空间之后,我们取消了索引作业,并将其命名为通宵。不太正确。
第二天早上回来时,我们注意到磁盘使用量减少了很多。Elasticsearch 是否丢弃了我们的数据?我们尝试在我们索引其中一台服务器所在的一台服务器上发出搜索查询。结果返回的很好 - 而且速度也很快!是什么赋予了?

磁盘使用率快速增长然后逐渐减少

CPU 使用率
经过研究后,我们提出了一个假设!默认情况下,Elasticsearch 的索引刷新间隔设置为 1 秒。这就是在 Elasticsearch 中提供“近实时”搜索功能的原因。每隔一千个索引(跨越一千个索引),Elasticsearch 会将内存缓冲区刷新到 Lucene 段,并打开该段使其可搜索。一整夜,Elasticsearch 在空闲时将其生成的大量细小段合并为磁盘上更大(但更节省空间)的段。
测试这一点非常简单:我们将所有索引都放在了集群上,将刷新间隔设置为任意大的数字,然后我们计划对同一服务器进行索引。提取文档时,CPU 使用率几乎降为零,并且磁盘使用率没有以惊人的速度增长。晕!
减少刷新间隔后的磁盘使用率

CPU 使用率

但是,不幸的是,实际上,关闭刷新间隔是无效的……
刷新困境
显而易见,Elasticsearch 的自动近实时索引可用性无法满足我们的需求。可能服务器无需执行单个搜索查询就可以运行数小时。我们需要建立一种方法来控制应用程序层的刷新。我们通过 Redis 中过期的 hashmap 做到了这一点。假设 Discord 上的服务器已在 Elasticsearch 上共享为共享索引,我们可以构建一个快速映射,该索引随索引一起更新,跟踪是否需要刷新索引(给定要搜索的服务器)。数据结构很简单:存储哈希图的 Redis 密钥
prefix + shard_key 到标记 guild_id

值的哈希图,表示需要刷新。回想起来,这可能是一个集合。
因此,索引生命周期变为:
从队列中提取 N 条消息。
找出这些消息应由其路由到何处 guild_id
对相关集群执行批量插入操作。
更新 Redis 映射,表示该碎片和该碎片中的给定 guild_id

s 现在已变脏。1 小时后使该密钥过期(因为此时 Elasticsearch 会自动刷新)。
搜索生命周期变成:
如果脏了,请刷新碎片的 Elasticsearch 索引,并将整个碎片标记为干净。
执行搜索查询并返回结果。
您可能已经注意到,即使我们现在已经在 Elasticsearch 上显式控制了刷新逻辑,我们仍然让它每小时自动刷新基础索引。如果在我们的 Redis 映射上发生数据丢失,则系统最多需要一个小时才能自动更正自身。
未来
自 1 月份部署以来,我们的 Elasticsearch 基础架构已扩展到 2 个集群中的 14 个节点,使用 GCP 上的 n1-standard- 8 实例类型,每个实例类型具有 1TB 的 Provisioned SSD。文件总数约为 260 亿。索引速率达到峰值,约为每秒 30,000 条消息。Elasticsearch 毫不费力地处理了它 - 在我们推出搜索的整个过程中,CPU 保持在 5 -15%。

到目前为止,我们已经能够轻松地向集群添加更多节点。在某个时候,我们将启动更多集群,以便新的 Discord 服务器被索引到它们上(这要归功于我们的加权分片分发系统)。在我们现有的集群上,随着向集群中添加更多数据节点,我们将需要限制主合格节点的数量。
我们还偶然发现了 4 个主要指标,用于确定何时需要增长集群:

  1. heap_free:(又名 heap_committed — heap_used)当我们用完了可用的堆空间时,JVM 被迫执行一个完整的世界各地的 GC 来快速回收空间。如果无法回收足够的空间,则该节点将崩溃并燃烧。在此之前,JVM 将进入一种状态,在这种状态下,随着堆已满,并且在每个完整的 GC 期间释放的内存太少,JVM 会不断地执行世界范围内的 GC。我们将其与 GC 统计信息一起查看,以了解垃圾回收花费了多少时间。
  2. disk_free:显然,当我们用完磁盘空间时,我们需要添加更多节点或更多磁盘空间来处理被索引的新文档。在 GCP 上,这非常容易,因为我们可以增加磁盘的大小而无需重新启动实例。选择添加新节点还是调整磁盘大小取决于此处提到的其他指标的外观。例如,如果磁盘使用率很高,但其他指标处于可接受的水平,则我们将选择添加更多的磁盘空间而不是新节点。
  3. cpu_usage:如果我们在高峰时段达到 CPU 使用量的阈值。
  4. io_wait:如果集群上的 IO 操作变得太慢。

不健康的群集(堆满)
无堆(MiB)
耗用时间 GC / s
健康集群
无堆(GiB)
耗用时间 GC / s

结论
自我们启动搜索功能以来,距离现在已经有三个多月了,到目前为止,该系统几乎没有遇到任何问题。

Elasticsearch 在大约 16,000 个索引和数百万个 Discord 服务器中显示了从 0 到 260 亿个文档的稳定一致的性能。我们将继续通过向现有集群添加更多集群或更多节点来扩展规模。在某个时候,我们可能会考虑编写代码,使我们能够在群集之间迁移索引,从而减轻群集负载,或者如果 Discord 服务器是特别健谈的服务器,则可以为 Discord 服务器提供自己的索引(尽管我们的加权分片系统做得很好确保大型 Discord 服务器当前通常拥有自己的碎片)。

正文完
 0