通过矢量相似性搜寻,能够在〜50ms内响应〜640K论文上的语义搜寻查问

Arxiv.org大家肯定都不生疏,学习数据迷信的最佳办法之一是浏览Arxiv.org上的开源钻研论文。然而即便对于经验丰富的钻研人员来说,从大量的钻研论文中找出想读的内容也是十分不容易的。Connected等论文之类的工具能够提供一些帮忙,然而它们依据论文之间共享的援用和参考书目来掂量相似性的,这当然十分的好,并且也很简略,然而文档中文本的语义含意也是一个掂量类似度十分重要的特色。

在这篇文章中,咱们将手动构建一个语义相似性搜索引擎,该引擎将单个论文作为“查问”输出,并查找Top-K的最相似论文。咱们测试,在单个笔记本电脑中对Arxiv语料库中的640k计算机科学论文进行查问的的提早<50ms!如果你感兴趣,那么本文的次要内容总结如下:

  • 设置环境并从Kaggle下载ARXIV数据
  • 应用dask将数据加载到Python中
  • 应用MILVUS矢量数据库进行语义相似性搜寻

本文中应用的技术不仅仅局限在科学论文,可用作建设任何NLP语义相似性搜索引擎的模板。惟一的区别是应用的预训练模型不同。

这篇文章应用Kaggle的ARXIV数据集是在CC0:公共域许可证下公布的,所以请先浏览其应用受权的要求。

Milvus Vector是一个矢量数据库,咱们也能够应用其余矢量数据库,如果应用其余库替换的话,有许多步骤完全相同并且改变并不简单。

设置环境并从Kaggle下载ARXIV数据。

Cornel University已将整个Arxiv语料库上传到Kaggle,并依据CC0:公共畛域许可证取得许可。咱们能够应用Kaggle API间接下载数据集。

这里还要应用Conda环境,创立一个称为Semantic_sibilarity的环境。上面的步骤是创立必要的目录和Conda环境,装置所需的Python库,而后从Kaggle下载ARXIV数据集。

 # Create the necessary directories  mkdir -p semantic_similarity/notebooks semantic_similarity/data semantic_similarity/milvus  # CD into the data directory cd semantic_similarity/data  # Create and activate a conda environment conda create -n semantic_similarity python=3.9 conda activate semantic_similarity  ## Create Virtual Environment using venv if not using conda # python -m venv semantic_similarity # source semantic_similarity/bin/activate  # Pip install the necessary libraries pip install jupyterlab kaggle matplotlib scikit-learn tqdm ipywidgets  pip install "dask[complete]" sentence-transformers pip install pandas pyarrow pymilvus protobuf==3.20.0  # Download data using the kaggle API kaggle datasets download -d Cornell-University/arxiv  # Unzip the data into the local directory unzip arxiv.zip  # Delete the Zip file rm arxiv.zip

应用dask将数据加载到Python中

咱们从Kaggle下载的数据是一个3.3GB JSON文件,其中蕴含大概200万篇论文!为了无效地解决如此大的数据集,应用PANDA将整个数据集加载到内存中并不是一个好主见。为了解决这样大的数据,咱们抉择应用DASK将数据分为多个分区,并且仅将一些须要解决的分区加载到内存中。

Dask

Dask是一个开源库,能够让咱们应用相似于PANDA的API进行并行计算。通过运行“ pip install dask[complete]”在本地计算机上进行装置。装置实现后要导入必要的库。

 import dask.bag as db import json from datetime import datetime import time  data_path = '../data/arxiv-metadata-oai-snapshot.json'

咱们将应用两个无效地解决大型ARXIV JSON文件的DASK的组件。

  • Dask Bag:使咱们能够将JSON文件加载到固定大小的块中,并在每行数据上运行一些预处理性能
  • DASK DATAFRAME:将DASK Bag转换为DASK DATAFRAME,并能够用相似Pandas的API拜访

步骤1:将JSON文件加载到Dask Bag中

将JSON文件加载到一个Dask Bag中,每个块的大小为10MB。能够调整blocksize参数,管制每个块的大小。而后应用.map()函数将JSON.LOADS函数利用于Dask Bag的每一行,将JSON字符串解析为Python字典。

 # Read the file in blocks of 10MB and parse the JSON. papers_db = db.read_text(data_path, blocksize="10MB").map(json.loads)  # Print the first row papers_db.take(1)

步骤2:编写预处理辅助函数

从打印输出中能够看到每行蕴含与论文相干的几个元数据。让咱们编写三个辅助函数,能够帮忙咱们对数据集进行预处理。

v1_date():此函数是提取作者将论文的第一个版上传到arxiv的日期。咱们将将日期转换为UNIX工夫戳,并将其存储在该行中新的字段。

text_col():此函数是应用“ [sep]”令牌组合“题目”和“摘要”字段,以便咱们能够将这些文本发送到SPECTRE embedding模型中。

filters():此函数过滤合乎某些条件的行,例如计算机科学类别中各个列和论文中的最大文本长度等等。

 def v1_date(row):     """     For each row in the dask bag,      find the date of the first version of the paper      and add it to the row as a new column     Args:       row: a row of the dask bag     Returns:       A row of the dask bag with added "unix_time" column     """          versions = row["versions"]      date = None     for version in versions:         if version["version"] == "v1":             date = datetime.strptime(version["created"], "%a, %d %b %Y %H:%M:%S %Z")             date = int(time.mktime(date.timetuple()))      row["unix_time"] = date      return row   def text_col(row):     """     It takes a row of a dataframe, adds a new column called 'text'      that is the concatenation of the 'title' and 'abstract' columns     Args:       row: the row of the dataframe     Returns:       A row with the text column added.     """      row["text"] = row["title"] + "[SEP]" + row["abstract"]     return row   def filters(row):     """     For each row in the dask bag, only keep the row if it meets the filter criteria          Args:       row: the row of the dataframe     Returns:       Boolean mask     """          return ((len(row["id"])<16) and              (len(row["categories"])<200) and             (len(row["title"])<4096) and             (len(row["abstract"])<65535) and             ("cs." in row["categories"]) # Keep only CS papers            )

步骤3:在Dask Bag上运行预处理辅助函数

如下所示,咱们能够应用.map()和.filter()函数在Dask Bag的每一行上运行。因为Dask反对办法链,因而咱们能够仅保留一些必须的列,而后删除不须要的列。

 # Specify columns to keep in the final table cols_to_keep = ["id", "categories", "title", "abstract", "unix_time", "text"]  # Apply the pre-processing papers_db = (     papers_db.map(lambda row: v1_date(row))     .map(lambda row: text_col(row))     .map(         lambda row: {             key: value              for key, value in row.items()              if key in cols_to_keep         }     )     .filter(filters) )  # Print the first row papers_db.take(1)

步骤4:将Dask Bag转换为DASK DATAFRAME

数据加载的最初一步是将Dask Bag转换为DASK DATAFRAME,这样咱们能够应用相似Pandas的API进行拜访。

 # Convert the Dask Bag to a Dask Dataframe schema = {     "id": str,     "title": str,     "categories": str,     "abstract": str,     "unix_time": int,     "text": str, } papers_df = papers_db.to_dataframe(meta=schema)  # Display first 5 rows papers_df.head()

应用MILVUS矢量数据库进行语义相似性搜寻

Milvus是最受欢迎的开源矢量数据库之一,所以咱们在本文中抉择应用它,并且咱们这里应用的是单机版,因为咱们只在本地机器上运行Milvus。

步骤1:本地装置MILVUS矢量数据库

应用Docker装置Milvus Vector数据库很简略,因而咱们首先须要装置Docker。而后就是下载Docker-compose.yml并启动Docker容器,如下所示!MILVUS.IO网站提供了许多其余抉择来装置Milvus单机版和Milvus群集版;如果须要在Kubernetes群集上装置或离线装置,请参考具体文档。

 # CD into milvus directory cd semantic_similarity/milvus   # Download the Standalone version of Milvus docker compose wget https://github.com/milvus-io/milvus/releases/download/v2.1.0/milvus-standalone-docker-compose.yml -O ./docker-compose.yml  # Run the Milvus server docker container on your local sudo docker-compose up -d

步骤2:创立一个Milvus汇合

咱们能够应用Pymilvus库与Milvus Vector数据库服务进行交互。emb_dim参数是文本转换为嵌入的维度。在SPECTRE的状况下,嵌入维度为768。

 # Make sure a Milvus server is already running from pymilvus import connections, utility from pymilvus import Collection, CollectionSchema, FieldSchema, DataType  # Connect to Milvus server connections.connect(alias="default", host="localhost", port="19530")  # Collection name collection_name = "arxiv"  # Embedding size emb_dim = 768  # # Check for existing collection and drop if exists # if utility.has_collection(collection_name): #     print(utility.list_collections()) #     utility.drop_collection(collection_name)

Milvus的汇合是相似于传统数据库中的表格。要创立一个汇合,首先须要指定汇合的模式。在本文示例中利用Milvus 2.1字符串索引和字段来存储与每篇论文相干的所有必要元数据。主键idx和其余字段categories、title、abstract是VARCHAR数据类型,而嵌入是蕴含emb_dim维度嵌入的FLOAT_VECTOR字段。Milvus反对多种数据类型,如下所示。

 # Create a schema for the collection idx = FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=16) categories = FieldSchema(name="categories", dtype=DataType.VARCHAR, max_length=200) title = FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=4096) abstract = FieldSchema(name="abstract", dtype=DataType.VARCHAR, max_length=65535) unix_time = FieldSchema(name="unix_time", dtype=DataType.INT64) embedding = FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=emb_dim)  # Fields in the collection fields = [idx, categories, title, abstract, unix_time, embedding] schema = CollectionSchema(     fields=fields, description="Semantic Similarity of Scientific Papers" )  # Create a collection with the schema collection = Collection(     name=collection_name, schema=schema, using="default", shards_num=10 )

一旦创立了汇合,当初就能够将文本和向量插入到汇合中。

步骤3:遍历Dask分区,应用SPECTER进行文本嵌入,并将它们插入到Milvus。

咱们须要将Dask DATAFRAME中的文本转换为嵌入向量来进行语义类似度搜寻。所以首先须要生成文本的嵌入。本文将应用名为SPECTRE的SBERT双编码器模型。

SPECTER : Scientific Paper Embeddings using Citation-informed TransformERs。

简略的说SPECTER 是通过论文数据进行专门训练的模型,所以在选题分类、引文预测、科学论文举荐等方面的体现优于SciBERT,这就是咱们抉择他的文章。

通过Sentence Transformer库,应用事后训练的SPECTRE模型非常简单。只须要一行代码就能够下载预训练的模型,咱们还编写了一个简略的辅助函数,将Dask dataframe分区的整个文本列转换为嵌入。

 from sentence_transformers import SentenceTransformer from tqdm import tqdm  # Scientific Papers SBERT Model model = SentenceTransformer('allenai-specter')  def emb_gen(partition):     return model.encode(partition['text']).tolist()

咱们能够应用dask.map_partitions() API将嵌入生成的函数利用到分区中的每一行,而后能够应用collection.insert将数据上传到Milvus。

 # Initialize collection = Collection(collection_name)  for partition in tqdm(range(papers_df.npartitions)):     # Get the dask dataframe for the partition     subset_df = papers_df.get_partition(partition)      # Check if dataframe is empty     if len(subset_df.index) != 0:         # Metadata         data = [             subset_df[col].values.compute().tolist()             for col in ["id", "categories", "title", "abstract", "unix_time"]         ]          # Embeddings         data += [             subset_df             .map_partitions(emb_gen)             .compute()[0]         ]          # Insert data         collection.insert(data)

须要留神的是增加到数据变量中的列的程序必须与创立时定义的字段变量的程序雷同!

步骤4:对插入的数据将创立一个近似最近街坊(ANN)索引

在咱们将所有的嵌入插入到Milvus向量数据库后,还须要创立一个神经网络索引来放慢搜寻速度。在这个例子中,我应用的是HNSW索引,这是最快、最精确的ANN索引之一。无关HNSW指数及其参数的更多信息,请参阅Milvus文档。

 # Add an ANN index to the collection index_params = {     "metric_type": "L2",     "index_type": "HNSW",     "params": {"efConstruction": 128, "M": 8}, }  collection.create_index(field_name="embedding", index_params=index_params)

步骤5:运行向量类似度搜寻查问!

实现了以上步骤当前就能够查问Milvus汇合中的数据了。首先加载汇合:

 collection = Collection(collection_name) collection.load()

接下来,我创立了一个简略的辅助函数,它接管query_text并将其转换为SPECTRE嵌入,在Milvus汇合中执行ANN搜寻,并打印出后果。还有一些search_params能够管制搜寻的品质和速度,请参考Milvus文档。

 def query_and_display(query_text, collection, num_results=10):     # Embed the Query Text     query_emb = [model.encode(query_text)]      # Search Params     search_params = {"metric_type": "L2", "params": {"ef": 128}}      # Search     query_start = datetime.now()     results = collection.search(         data=query_emb,         anns_field="embedding",         param=search_params,         limit=num_results,         expr=None,         output_fields=["title", "abstract"],     )     query_end = datetime.now()      # Print Results     print(f"Query Speed: {(query_end - query_start).total_seconds():.2f} s")     print("Results:")     for res in results[0]:         title = res.entity.get("title").replace("\n ", "")         print(f"➡️ ID: {res.id}. L2 Distance: {res.distance:.2f}")         print(f"Title: {title}")         print(f"Abstract: {res.entity.get('abstract')}")

咱们看看后果怎么样

 # Query for papers that are similar to the SimCSE paper title = "SimCSE: Simple Contrastive Learning of Sentence Embeddings" abstract = """This paper presents SimCSE, a simple contrastive learning framework that greatly advances state-of-the-art sentence embeddings. We first describe an unsupervised approach, which takes an input sentence and predicts itself in a contrastive objective, with only standard dropout used as noise. This simple method works surprisingly well, performing on par with previous supervised counterparts. We find that dropout acts as minimal data augmentation, and removing it leads to a representation collapse. Then, we propose a supervised approach, which incorporates annotated pairs from natural language inference datasets into our contrastive learning framework by using "entailment" pairs as positives and "contradiction" pairs as hard negatives. We evaluate SimCSE on standard semantic textual similarity (STS) tasks, and our unsupervised and supervised models using BERT base achieve an average of 76.3% and 81.6% Spearman's correlation respectively, a 4.2% and 2.2% improvement compared to the previous best results. We also show -- both theoretically and empirically -- that the contrastive learning objective regularizes pre-trained embeddings' anisotropic space to be more uniform, and it better aligns positive pairs when supervised signals are available."""  query_text = f"{title}[SEP]{abstract}" query_and_display(query_text, collection, num_results=10)

如果不须要查问了,能够开释汇合来开释机器的内存。

 collection.release()

这在单机运行时是很好的办法,然而如果提供线上的服务则不要这样利用,因为每次加载都须要读取硬盘的数据,会很慢。

总结

在这篇文章中,咱们应用SPECTRE嵌入和Milvus向量数据库和几个简略的步骤中实现了一个可扩大的科学论文语义搜寻服务。这种办法在生产中可扩大到数亿甚至数十亿的数据。Milvus在30毫秒内返回了前10个后果,这个速度对咱们来说还是十分不错的。

https://avoid.overfit.cn/post/36ad9ebf46ad43f78b84595e793e1a34

作者:Marie Stephen Leo