关于人工智能:使用CLIP构建视频搜索引擎

40次阅读

共计 5908 个字符,预计需要花费 15 分钟才能阅读完成。

CLIP(Contrastive Language-Image Pre-training) 是一种机器学习技术,它能够精确了解和分类图像和自然语言文本,这对图像和语言解决具备深远的影响,并且曾经被用作风行的扩散模型 DALL- E 的底层机制。在这篇文章中,咱们将介绍如何调整 CLIP 来辅助视频搜寻。

这篇文章将不深入研究 CLIP 模型的技术细节,而是展现 CLIP 的另外一个理论利用 (除了扩散模型外)。

首先咱们要晓得:CLIP 应用图像解码器和文本编码器来预测数据集中哪些图像与哪些文本是匹配的。

应用 CLIP 进行搜寻

通过应用来自 hugging face 的预训练 CLIP 模型,咱们能够构建一个简略而弱小的视频搜索引擎,并且具备自然语言能力,而且不须要进行特色工程的解决。

咱们须要用到以下的软件

Python≥= 3.8,ffmpeg,opencv

通过文本搜寻视频的技术有很多。咱们能够将搜索引擎将由两局部组成,索引和搜寻。

索引

视频索引通常波及人工和机器过程的联合。人类通过在题目、标签和形容中增加相干关键字来预处理视频,而自动化过程则是提取视觉和听觉特色,例如物体检测和音频转录。用户交互指标等等,这样能够记录视频的哪些局部是最相干的,以及它们放弃相关性的工夫。所有这些步骤都有助于创立视频内容的可搜寻索引。

索引过程的概述如下

  • 将视频宰割成多个场景
  • 为框架取样场景
  • 帧解决后进行像素嵌入
  • 索引建设存储

将视频分成多个场景

为什么场景检测很重要? 视频由场景组成,而场景由类似的帧组成。如果咱们只对视频中的任意场景进行采样,可能会错过整个视频中的关键帧。

所以咱们就须要精确地辨认和定位视频中的特定事件或动作。例如,如果我搜寻“公园里的狗”,而我正在搜寻的视频蕴含多个场景,例如一个男人骑自行车的场景和一个公园里的狗的场景,场景检测能够让我辨认出与搜寻查问最靠近的场景。

能够应用“scene detect”python 包来进行这个操作。

 mport scenedetect as sd
 
 video_path = '' # path to video on machine
 
 video = sd.open_video(video_path)
 sm = sd.SceneManager()
         
 sm.add_detector(sd.ContentDetector(threshold=27.0))
 sm.detect_scenes(video)
 
 scenes = sm.get_scene_list()

对场景的帧进行采样

而后就须要应用 cv2 对视频进行帧采样。

 import cv2
 
 cap = cv2.VideoCapture(video_path)
 
 every_n = 2 # number of samples per scene
 
 scenes_frame_samples = []    
 for scene_idx in range(len(scenes)):
     scene_length = abs(scenes[scene_idx][0].frame_num - scenes[scene_idx][1].frame_num)
     every_n = round(scene_length/no_of_samples)
     local_samples = [(every_n * n) + scenes[scene_idx][0].frame_num for n in range(3)]
             
     scenes_frame_samples.append(local_samples)

将帧转换为像素嵌入

在收集样本之后,咱们须要将它们计算成 CLIP 模型可用的货色。

首先须要将每个样本转换为图像张量嵌入。

 from transformers import CLIPProcessor
 from PIL import Image
 
 clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
 
 def clip_embeddings(image):
     inputs = clip_processor(images=image, return_tensors="pt", padding=True)
     input_tokens = {k: v for k, v in inputs.items()
     }
 
     return input_tokens['pixel_values']
 
 # ... 
 scene_clip_embeddings = [] # to hold the scene embeddings in the next step
 
 for scene_idx in range(len(scenes_frame_samples)):
     scene_samples = scenes_frame_samples[scene_idx]
 
     pixel_tensors = [] # holds all of the clip embeddings for each of the samples
     for frame_sample in scene_samples:
         cap.set(1, frame_sample)
         ret, frame = cap.read()
         if not ret:
             print('failed to read', ret, frame_sample, scene_idx, frame)
             break
 
          pil_image = Image.fromarray(frame)
                 
          clip_pixel_values = clip_embeddings(pil_image)
          pixel_tensors.append(clip_pixel_values)

下一步就是均匀同一场景中的所有样本,这样能够升高样本的维数,而且还能够解决单个样本中存在噪声的问题。

 import torch
 import uuid
 
 def save_tensor(t):
     path = f'/tmp/{uuid.uuid4()}'
     torch.save(t, path)
 
     return path
 
 # ..
 avg_tensor = torch.mean(torch.stack(pixel_tensors), dim=0)
 scene_clip_embeddings.append(save_tensor(avg_tensor))

这样就取得了一个 CLIP 嵌入的示意视频内容的的张量列表。

存储索引

对于底层索引存储,咱们应用 LevelDB(LevelDB 是由谷歌保护的键 / 值库)。咱们搜索引擎的架构将包含 3 个独立的索引:

视频场景索引:哪些场景属于特定视频

场景嵌入索引:保留特定的场景数据

视频元数据索引:保留视频的元数据。

咱们将首先将视频中所有计算出的元数据以及视频的惟一标识符,插入到元数据索引中,这一步都是现成的,非常简单。

 import leveldb
 import uuid
 
 def insert_video_metadata(videoID, data):
     b = json.dumps(data)
 
     level_instance = leveldb.LevelDB('./dbs/videometadata_index')
     level_instance.Put(videoID.encode('utf-8'), b.encode('utf-8'))
 
 # ...
 video_id = str(uuid.uuid4())
 insert_video_metadata(video_id, {'VideoURI': video_path,})

而后在场景嵌入索引中创立一个新条目保留视频中的每个像素嵌入,还须要一个惟一的标识符来辨认每个场景。

 import leveldb
 import uuid
 
 def insert_scene_embeddings(sceneID, data):
     level_instance = leveldb.LevelDB('./dbs/scene_embedding_index')
     level_instance.Put(sceneID.encode('utf-8'), data)
 
 # ...
 for f in scene_clip_embeddings:
     scene_id = str(uuid.uuid4())
     
     with open(f, mode='rb') as file:
         content = file.read()
             
         insert_scene_embeddings(scene_id, content)

最初,咱们须要保留哪些场景属于哪个视频。

 import leveldb
 import uuid
 
 def insert_video_scene(videoID, sceneIds):
     b = ",".join(sceneIds)
     
     level_instance = leveldb.LevelDB('./dbs/scene_index')
     level_instance.Put(videoID.encode('utf-8'), b.encode('utf-8'))
 
 # ...
 scene_ids = []
 for f in scene_clip_embeddings:
     # .. as shown in previous step
     scene_ids.append(scene_id)
     scene_embedding_index.insert(scene_id, content)
 
 scene_index.insert(video_id, scene_ids)

搜寻

当初咱们有了一种将视频的索引,上面就能够依据模型输入对它们进行搜寻和排序。

第一步须要遍历场景索引中的所有记录。而后,创立一个视频中所有视频和匹配场景 id 的列表。

 records = []
 
 level_instance = leveldb.LevelDB('./dbs/scene_index')
 
 for k, v in level_instance.RangeIter():    
     record = (k.decode('utf-8'), str(v.decode('utf-8')).split(','))
     records.append(record)

下一步须要收集每个视频中存在的所有场景嵌入张量。

import leveldb

def get_tensor_by_scene_id(id):
    level_instance = leveldb.LevelDB('./dbs/scene_embedding_index')
    b = level_instance.Get(bytes(id,'utf-8'))

    return BytesIO(b)

for r in records:
    tensors = [get_tensor_by_scene_id(id) for id in r[1]]

在咱们有了组成视频的所有张量之后,咱们能够把它传递到模型中。该模型的输出是“pixel_values”,示意视频场景的张量。

import torch
from transformers import CLIPProcessor, CLIPModel

processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")

inputs = processor(text=text, return_tensors="pt", padding=True)

for tensor in tensors:
    image_tensor = torch.load(tensor)
    inputs['pixel_values'] = image_tensor   
    outputs = model(**inputs)

而后拜访模型输入中的“logits_per_image”取得模型的输入。

Logits 实质上是对网络的原始非标准化预测。因为咱们只提供一个文本字符串和一个示意视频中的场景的张量,所以 logit 的构造将是一个单值预测。

logits_per_image = outputs.logits_per_image    
probs = logits_per_image.squeeze()

prob_for_tensor = probs.item()

将每次迭代的概率相加,并在运算完结时将其除以张量的总数来取得视频的均匀概率。

def clip_scenes_avg(tensors, text):
    avg_sum = 0.0

    for tensor in tensors:
        # ... previous code snippets
        probs = probs.item()
        avg_sum += probs.item()

    return avg_sum / len(tensors)

最初在失去每个视频的概率并对概率进行排序后,返回申请的搜寻后果数目。

import leveldb
import json

top_n = 1 # number of search results we want back

def video_metadata_by_id(id):
    level_instance = leveldb.LevelDB('./dbs/videometadata_index')
    b = level_instance.Get(bytes(id,'utf-8'))
    return json.loads(b.decode('utf-8'))

results = []
for r in records:
    # .. collect scene tensors

    # r[0]: video id
    return (clip_scenes_avg, r[0]) 

sorted = list(results)
sorted.sort(key=lambda x: x[0], reverse=True)

results = []
for s in sorted[:top_n]:
    data = video_metadata_by_id(s[1])

    results.append({'video_id': s[1],
        'score': s[0],
        'video_uri': data['VideoURI']
     })

就是这样! 当初就能够输出一些视频并测试搜寻后果。

总结

通过 CLIP 能够轻松地创立一个频搜索引擎。应用预训练的 CLIP 模型和谷歌的 LevelDB,咱们能够对视频进行索引和解决,并应用自然语言输出进行搜寻。通过这个搜索引擎使用户能够轻松地找到相干的视频,最次要的是咱们并不需要大量的预处理或特色工程。

那么咱们还能有什么改良呢?

  • 应用场景的工夫戳来确定最佳场景。
  • 批改预测让他在计算集群上运行。
  • 应用向量搜索引擎,例如 Milvus 代替 LevelDB
  • 在索引的根底上建设举荐零碎
  • 等等

最初:

能够在这里找到本文的代码:

https://avoid.overfit.cn/post/a190cdd81cf74c5dadd651a87697d14c

作者:Guy Ross

正文完
 0