关于搜索:基于Kafka和Elasticsearch构建实时站内搜索功能的实践

8次阅读

共计 7057 个字符,预计需要花费 18 分钟才能阅读完成。

作者:京东物流 纪卓志

目前咱们在构建一个多租户多产品类网站,为了让用户更好的找到他们所须要的产品,咱们须要构建站内搜索性能,并且它应该是实时更新的。本文将会探讨构建这一性能的外围基础设施,以及反对此搜寻能力的技术栈。

问题的定义与决策

为了构建一个疾速、实时的搜索引擎,咱们必须做出某些设计决策。咱们应用 MySQL 作为主数据库存储,因而有以下抉择:

  1. 间接在 MySQL 数据库中查问用户在搜寻框中输出的每个关键词,就像 %#{word1}%#{word2}%... 这样。😐
  2. 应用一个高效的搜寻数据库,如 Elasticsearch。😮

思考到咱们是一个 多租户应用程序 ,同时被搜寻的实体可能须要大量的 关联操作 (如果咱们应用的是 MySQL 一类的关系型数据库),因为不同类型的产品有不同的数据结构,所以咱们还能够能须要同时遍历 多个数据表 来查问用户输出的关键词。所以咱们决定不应用间接在 MySQL 中查问关键词的计划。🤯

因而,咱们必须决定一种高效、牢靠的形式,将数据 实时 地从 MySQL 迁徙到 Elasticsearch 中。接下来须要做出如下的决定:

  1. 应用 Worker 定期查问 MySQL 数据库,并将所有变动的数据发送到 Elasticsearch。😶
  2. 在应用程序中应用 Elasticsearch 客户端,将数据同时写入到 MySQL 和 Elasticsearch 中。🤔
  3. 应用基于事件的流引擎,将 MySQL 数据库中的数据更改作为事件,发送到流解决服务器上,通过解决后将其转发到 Elasticsearch。🥳

选项 1 并不是实时的,所以能够间接排除,而且即便咱们缩短轮询距离,也会造成全表扫描给数据库造成查问压力。除了不是实时的之外,选项 1 无奈反对对数据的删除操作,如果对数据进行了删除,那么咱们须要额定的表记录之前存在过的数据,这样能力保障用户不会搜寻到曾经删除了的脏数据。对于其余两种抉择,不同的利用场景做出的决定可能会有所不同。在咱们的场景中,如果抉择选项 2,那么咱们能够预感一些问题:如过 Elasticsearch 建设网络连接并确认更新时速度很慢,那么这可能会升高咱们应用程序的速度;或者在写入 Elasticsearch 时产生了未知异样,咱们该如何对这一操作进行重试来保障数据完整性;不可否认开发团队中不是所有开发人员都能理解所有的性能,如果有开发人员在开发新的与产品无关的业务逻辑时没有引入 Elasticsearch 客户端,那么咱们将在 Elasticsearch 中更新这次数据的更改,无奈保障 MySQL 与 Elasticsearch 间的数据一致性。

接下来咱们该思考如何将 MySQL 数据库中的数据更改作为事件,发送到流解决服务器上。咱们能够在数据库变更后,在应用程序中应用音讯管道的客户端同步地将事件发送到音讯管道,然而这并没有解决下面提到的应用 Elasticsearch 客户端带来的问题,只不过是将危险从 Elasticsearch 转移到了音讯管道。最终咱们决定通过采集 MySQL Binlog,将 MySQL Binlog 作为事件发送到音讯管道中的形式来实现 基于事件的流引擎。对于 binlog 的内容能够点击链接,在这里不再赘述。

服务简介

为了对外提供对立的搜寻接口,咱们首先须要定义用于搜寻的数据结构。对于大部分的搜寻零碎而言,对用户展现的搜寻后果通常包含为 题目 内容 ,这部分内容咱们称之 可搜寻内容(Searchable Content)。在多租户零碎中咱们还须要在搜寻后果中标示出该搜寻后果属于哪个租户,或用来过滤以后租户下可搜寻的内容,咱们还须要额定的信息来帮忙用户筛选本人想要搜寻的产品类别,咱们将这部分通用的但不用来进行搜寻的内容称为 元数据(Metadata)。最初,在咱们展现搜寻后果时可能心愿依据不同类型的产品提供不同的展现成果,咱们须要在搜寻后果中返回这些个性化展现所须要的 原始内容(Raw Content)。到此为止咱们能够定义出了存储到 Elasticsearch 中的通用数据结构:

{
    "searchable": {
        "title": "string",
        "content": "string"
    },
    "metadata": {
        "tenant_id": "long",
        "type": "long",
        "created_at": "date",
        "created_by": "string",
        "updated_at": "date",
        "updated_by": "string"
    },
    "raw": {}}

基础设施

Apache Kafka:Apache Kafka 是开源的分布式事件流平台。咱们应用 Apache kafka 作为数据库事件(插入、批改和删除)的长久化存储。

mysql-binlog-connector-java:咱们应用 mysql-binlog-connector-java 从 MySQL Binlog 中获取数据库事件,并将它发送到 Apache Kafka 中。咱们将独自启动一个服务来实现这个过程。

在接收端咱们也将独自启动一个服务来生产 Kafka 中的事件,并对数据进行解决而后发送到 Elasticsearch 中。

Q:为什么不应用 Elasticsearch connector 之类的连接器对数据进行解决并发送到 Elasticsearch 中?A:在咱们的零碎中是不容许将大文本存入到 MySQL 中的,所以咱们应用了额定的对象存储服务来寄存咱们的产品文档,所以咱们无奈间接应用连接器将数据发送到 Elasticsearch 中。Q:为什么不在发送到 Kafka 前就将数据进行解决?A:这样会有大量的数据被长久化到 Kafka 中,占用 Kafka 的磁盘空间,而这部分数据实际上也被存储到了 Elasticsearch。Q:为什么要用独自的服务来采集 binlog,而不是应用 Filebeat 之类的 agent?A:当然能够间接在 MySQL 数据库中装置 agent 来间接采集 binlog 并发送到 Kafka 中。然而在局部状况下开发者应用的是云服务商或其余基础设施部门提供的 MySQL 服务器,这种状况下咱们无奈间接进入服务器装置 agent,所以应用更加通用的、无侵入性的 C / S 构造来生产 MySQL 的 binlog。

配置技术栈

咱们应用 docker 和 docker-compose 来配置和部署服务。为了简略起见,MySQL 间接应用了 root 作为用户名和明码,Kafka 和 Elasticsearch 应用的是单节点集群,且没有设置任何鉴权形式,仅供开发环境应用,请勿间接用于生产环境。

version: "3"
services:
  mysql:
    image: mysql:5.7
    container_name: mysql
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: app
    ports:
      - 3306:3306
    volumes:
      - mysql:/var/lib/mysql
  zookeeper:
    image: bitnami/zookeeper:3.6.2
    container_name: zookeeper
    ports:
      - 2181:2181
    volumes:
      - zookeeper:/bitnami
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    volumes:
      - kafka:/bitnami
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    volumes:
      - elasticsearch:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
volumes:
  mysql:
    driver: local
  zookeeper:
    driver: local
  kafka:
    driver: local
  elasticsearch:
    driver: local

在服务启动胜利后咱们须要为 Elasticsearch 创立索引,在这里咱们间接应用 curl 调用 Elasticsearch 的 RESTful API,也能够应用 busybox 根底镜像创立服务来实现这个步骤。

# Elasticsearch
curl "http://localhost:9200/search" -XPUT -d '{"mappings": {"properties": {"searchable": {"type":"nested","properties": {"title": {"type":"text"},"content": {"type":"text"}
        }
      },
      "metadata": {
        "type": "nested",
        "properties": {
          "tenant_id": {"type": "long"},
          "type": {"type": "integer"},
          "created_at": {"type": "date"},
          "created_by": {"type": "keyword"},
          "updated_at": {"type": "date"},
          "updated_by": {"type": "keyword"}
        }
      },
      "raw": {"type": "nested"}
    }
  }
}'

外围代码实现(SpringBoot + Kotlin)

Binlog 采集端:

    override fun run() {
        client.serverId = properties.serverId
        val eventDeserializer = EventDeserializer()
        eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG)
        client.setEventDeserializer(eventDeserializer)
        client.registerEventListener {val header = it.getHeader<EventHeader>()
            val data = it.getData<EventData>()
            if (header.eventType == EventType.TABLE_MAP) {tableRepository.updateTable(Table.of(data as TableMapEventData))
            } else if (EventType.isRowMutation(header.eventType)) {
                val events = when {EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
                    EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
                    EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
                    else -> emptyList()}
                logger.info("Mutation events: {}", events)
                for (event in events) {kafkaTemplate.send("binlog", objectMapper.writeValueAsString(event))
                }
            }
        }
        client.connect()}

在这段代码外面,咱们首先是对 binlog 客户端进行了初始化,随后开始监听 binlog 事件。binlog 事件类型有很多,大部分都是咱们不须要关怀的事件,咱们只须要关注 TABLE\_MAP 和 WRITE/UPDATE/DELETE 就能够。当咱们接管到 TABLE\_MAP 事件,咱们会对内存中的数据库表构造进行更新,在后续的 WRITE/UPDATE/DELETE 事件中,咱们会应用内存缓存的数据库构造进行映射。整个过程大略如下所示:

Table: ["id", "title", "content",...]
Row: [1, "Foo", "Bar",...]
=>
{
    "id": 1,
    "title": "Foo",
    "content": "Bar"
}

随后咱们将收集到的事件发送到 Kafka 中,并由 Event Processor 进行生产解决。

事件处理器

@Component
class KafkaBinlogTopicListener(val binlogEventHandler: BinlogEventHandler) {

    companion object {private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
    }

    private val objectMapper = jacksonObjectMapper()

    @KafkaListener(topics = ["binlog"])
    fun process(message: String) {val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
        logger.info("Consume binlog event: {}", binlogEvent)
        binlogEventHandler.handle(binlogEvent)
    }
}

首先应用 SpringBoot Message Kafka 提供的注解对事件进行生产,接下来将事件委托到 binlogEventHandler 去进行解决。实际上 BinlogEventHandler 是个自定义的函数式接口,咱们自定义事件处理器实现该接口后通过 Spring Bean 的形式注入到 KafkaBinlogTopicListener 中。

@Component
class ElasticsearchIndexerBinlogEventHandler(val restHighLevelClient: RestHighLevelClient) : BinlogEventHandler {override fun handle(binlogEvent: BinlogEvent) {
        val payload = binlogEvent.payload as Map<*, *>
        val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
        // Should delete from Elasticsearch
        if (binlogEvent.eventType == EVENT_TYPE_DELETE) {val deleteRequest = DeleteRequest()
            deleteRequest
                .index("search")
                .id(documentId)
            restHighLevelClient.delete(deleteRequest, DEFAULT)
        } else {
            // Not ever WRITE or UPDATE, just reindex
            val indexRequest = IndexRequest()
            indexRequest
                .index("search")
                .id(documentId)
                .source(
                    mapOf<String, Any>(
                        "searchable" to mapOf("title" to payload["title"],
                            "content" to payload["content"]
                        ),
                        "metadata" to mapOf("tenantId" to payload["tenantId"],
                            "type" to payload["type"],
                            "createdAt" to payload["createdAt"],
                            "createdBy" to payload["createdBy"],
                            "updatedAt" to payload["updatedAt"],
                            "updatedBy" to payload["updatedBy"]
                        )
                    )
                )
            restHighLevelClient.index(indexRequest, DEFAULT)
        }
    }
}

在这里咱们只须要简略地判断是否为删除操作就能够,如果是删除操作须要在 Elasticsearch 中将数据删除,而如果是非删除操作只须要在 Elasticsearch 从新依照为文档建设索引即可。这段代码简略地应用了 Kotlin 中提供的 mapOf 办法对数据进行映射,如果须要其余简单的解决只须要依照 Java 代码的形式编写处理器即可。

总结

其实 Binlog 的解决局部有很多开源的解决引擎,包含 Alibaba Canal,本文应用手动解决的形式也是为其余应用非 MySQL 数据源的同学相似的解决方案。大家能够按需所取,就地取材,为本人的网站设计属于本人的实时站内搜索引擎!

正文完
 0