介绍
本次项目使用深度学习自动生成图像字幕。如上图,模型自动生成“The person is riding a surfboard in the ocean”字幕。我们具体该如何实现呢?
如图所示,我们需要分别使用 CNN 和 RNN 模型来实现。
CNN 模型:
利用卷积网络对图像特征提取的强大能力,来提取特征信息。我们的 CNN 模型需要有强大的识别能力,因此该模型需要使用过大量,多类别的训练集进行训练,并且识别准确率较高。本次,我们利用迁移学习使用 Inception 模型实现此功能。通过迁移学习实现 OCT 图像识别 文章中有迁移学习的相关介绍。
RNN 模型:对于文本序列数据,目前我们最好的选择依然是 RNN 模型。为了提升模型预测能力,我们使用注意力机制实现文本预测。注意力机制实现机器翻译 文章中有注意力机制的相关介绍。
对模型的细节要求我们将在对应代码实现里进行介绍。
数据集介绍
我们使用 MS-COCO 数据集进行训练,为方便理解,简单介绍下数据格式。COCO 数据有 5 种类型,分别是:object detection, keypoint detection, stuff segmentation, panoptic segmentation,image captioning。基础数据结构如下图所示:
具体样例(部分):
本次项目使用的是 Image Captioning 其中,每张照片不少于 5 个字幕:
数据下载处理
import tensorflow as tf
# 开启 eager 模式
tf.enable_eager_execution()
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import re
import numpy as np
import os
import time
import json
from glob import glob
from PIL import Image
import pickle
annotation_zip=tf.keras.utils.get_file(
# cache_dir(默认值): `~/.keras`
# cache_subdir: `datasets`,
# ~/.keras/datasets/captions.zip
fname=’captions.zip’,
cache_subdir=os.path.abspath(‘.’),
origin=’http://images.cocodataset.org/annotations/annotations_trainval2014.zip’,
# 解压
extract=True
)
# 返回文件夹名,实现:split(file)[0]
annotation_file = os.path.dirname(annotation_zip)+’/annotations/captions_train2014.json’
name_of_zip=’train2014.zip’
if not os.path.exists(os.path.abspath(‘.’)+”/”+name_of_zip):
image_zip=tf.keras.utils.get_file(
fname=name_of_zip,
cache_subdir=os.path.abspath(‘.’),
origin=’http://images.cocodataset.org/zips/train2014.zip’,
extract=True
)
PATH=os.path.dirname(image_zip)+’train2014/’
else:
PATH=os.path.abspath(‘.’)+’/train2014/’
读取字幕和图片:
# 读取注释 json 文件
with open(annotation_file,’r’) as f:
annotations=json.load(f)
# 保存全部字幕
all_captions=[]
# 保存全部图片
all_img_name_vecotr=[]
# json 格式参考 COCO 数据集官网
for annot in annotations[‘annotations’]:
# 添加开始和结束标记
caption='<start>’+annot[‘caption’]+'<end>’
# 获取图片名字
image_id=annot[‘image_id’]
# 参考文章开始给出的“具体样例”
full_coco_image_path=PATH+’COCO_train2014_’+’%012d.jpg’%(image_id)
all_img_name_vecotr.append(full_coco_image_path)
all_captions.append(caption)
# random_state 随机种子,确保每次数据一致
train_captions,img_name_vector=shuffle(
all_captions,
all_img_name_vecotr,
random_state=1
)
# 使用训练集前 30000 样本
num_examples=30000
train_captions=train_captions[:num_examples]
img_name_vector=img_name_vector[:num_examples]
重训练 InceptionV3:
简单介绍下 InceptionV3 模型:
Inception 模型结构中最重要的思想就是卷积核分解。通过上图可知,5×5 的卷积可由 2 个 3 ×3 的卷积代替,3×3 卷积可由一个 3 ×1 卷积和一个 1 ×3 卷积代替,代替的好处是减少了权重参数量,增加了网络非线性(层增多)。比如,一个 5 ×5 卷积的权重参数量和 2 个 3 ×3 卷积的权重参数量分别是(5×5):(3×3)x2。InceptionV3 中就将 7 ×7 的卷积分解成 7 ×1 卷积和 1 ×7 卷积。
批标准化(BN)正式提出是在 InceptionV2,BN 通过将输入分布转变成均值为 0,标准差为 1 的正态分布,将值域处于激活函数敏感范围从而防止梯度消失问题。正因为梯度消失问题的解决,我们可以使用更大的学习率进行训练从而加快模型收敛。由于 BN 有类似 Dropout 的正则化作用,因此在训练的时候不使用或少使用 Dropout,并减轻 L2 正则。
使用非对称卷积,如:1×3 卷积,3×1 卷积(论文作者指出在 feature map 的大小 12×12~20×20 之间效果最好)。
使用 Label Smoothing 对损失修正。下图是新损失函数:
网络各层信息如下图所示:
# 使用 inception V3 要求图片分辨率:299,299
# 输入值范围[-1,1]
def load_image(image_path):
img=tf.image.decode_jpeg(tf.read_file(image_path))
img_reshape=tf.image.resize_images(img,(299,299))
# 像素范围[-1,1]
# (-255)/255
img_range=tf.keras.applications.inception_v3.preprocess_input(img_reshape)
return img_range,image_path
使用迁移学习构建新模型:
# 最后一层卷积输入 shape(8*8*2048), 并将结果向量保存为 dict
image_model=tf.keras.applications.InceptionV3(
# 不使用最后全连接层
include_top=False,
# inception 模型的训练集是 imagenet
weigths=’imagenet’
)
# shape:(batch_size,299,299,3)
new_input=image_model.input
# hidden_layer shape:(batch_size,8,8,2048)
hidden_layer=image_model.layers[-1].output
# 创建新模型
image_features_extract_model=tf.keras.Model(
new_input,
hidden_layer
)
保存通过使用 InceptionV3 获得的特征:
encode_train=sorted(set(img_name_vector))
# map: 可以并行处理数据,默认读取的文件具有确定性顺序
# 取消顺序可以加快数据读取
# 通过设置参数 num_parallel_calls 实现
image_dataset=tf.data.Dataset.from_tensor_slices(encode_train).map(load_image).batch(16)
for img,path in image_dataset:
# inception v3 得到的 feature
batch_features=image_features_extract_model(img)
batch_features=tf.reshape(
# shape:(batch_size,8,8,2048) reshape:(batch_size,64,2048)
batch_features,shape=(batch_features.shape[0],-1,batch_features[3])
)
# 保存
for bf,p in zip(batch_features,path):
path_of_feature=p.numpy().decode(‘utf-8′)
# 文件后缀.npy
np.save(path_of_feature,bf.numpy())
文本处理
文本处理方式还是老规矩,先将文本转成字典表示然后创建字符转 ID,ID 转字符,最后补长到预设长度。
# 计算最大长度
def calc_max_length(tensor):
return max(len(t)for t in tensor)
top_k=5000
tokenizer=tf.keras.preprocessing.text.Tokenizer(
num_words=top_k,
# 字典中没有的字符用 <unk> 代替
oov_token='<unk>’,
# 需要过滤掉的特殊字符
filters=’!”#$%&()*+.,-/:;=?@[\]^_`{|}~’
)
# 要用以训练的文本列表
tokenizer.fit_on_texts(train_captions)
# 转为序列列表向量
train_seqs=tokenizer.texts_to_sequences((train_captions))
tokenizer.word_index[‘<pad>’]=0
# 如果没有指定最大长度,pad_sequences 会自动计算最大长度
cap_vector=tf.keras.preprocessing.sequence.pad_sequences(
sequences=train_seqs,
# 后置补长
padding=’post’
)
max_length=calc_max_length(train_seqs)
模型训练参数
拆分训练集,验证集:
img_name_train,img_name_val,cap_trian,cap_val=train_test_split(
img_name_vector,
cap_vector,
# 验证数据集占 20%
test_size=0.2,
# 确保每次数据一致
random_state=0
# 最好是 2 的次幂,更适合 GPU 运算(加快二进制运算)
BATCH_SIZE=64
# shuffle 缓冲区大小
BUFFER_SIZE=1000
# 词嵌入维度
embedding_dim=256
units=512
vocab_size=len(tokenizer.word_index)
# 后面会将 (8,8,2048) 转为(64,2048)
# 维度一定要一致
feature_shape=2048
attention_features_shape=64
# 加载保存的之前 feature 文件
def map_func(img_name,cap):
img_tensor=np.load(img_name.decode(‘utf-8′)+’.npy’)
return img_tensor,cap
dataset=tf.data.Dataset.from_tensor_slices((img_name_train,cap_trian))
# num_parallel_calls 根据自己的 CPU 而定
dataset=dataset.map(lambda item1,item2:tf.py_func(
map_func,[item1,item2],[tf.float32,tf.int32]
),num_parallel_calls=4)
# prefetch 可以合理利用 CPU 准备数据,GPU 计算数据之间的空闲时间,加快数据读取
dataset=dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(1)
创建模型
编码器模型:
# 一层使用 relu 的全连接层
class CNN_Encoder(tf.keras.Model):
def __init__(self,embedding_dim):
super(CNN_Encoder, self).__init__()
# fc shape:(batch_size,64,embedding_dim)
self.fc=tf.keras.layers.Dense(embedding_dim)
def __call__(self,x):
x=self.fc(x)
x=tf.nn.relu(x)
return x
注意力层:详细介绍可以查看文章开始给出的链接,这里给出计算方程式:
class BahdanauAttention(tf.keras.Model):
def __init__(self,units):
super(BahdanauAttention, self).__init__()
self.W1=tf.keras.layers.Dense(units)
self.W2=tf.keras.layers.Dense(units)
self.V=tf.keras.layers.Dense(1)
def __call__(self, features,hidden):
# 参考注意力机制计算的方程
# feature shape:(batch_size,64,embedding_dim)
# hidden_state shape:(batch_size,hidden_size)
hidden_with_time_axis=tf.expand_dims(hidden,1)
# score shape:(batch_size,64,hidden_size)
score=tf.nn.tanh(self.W1(features)+self.W2(hidden_with_time_axis))
# attention_weights shape:(batch_size,64,1)
attention_weights=tf.nn.softmax(self.V(score),axis=1)
context_vector=tf.reduce_sum(attention_weights*features,axis=1)
return context_vector,attention_weights
解码器中的 GRU:
# 相比 LSTM 因为减少了一个门,参数少,收敛快
def gru(units):
if tf.test.is_gpu_available():
# 使用 GPU 加速计算
return tf.keras.layers.CuDNNGRU(
units=units,
return_state=True,
return_sequences=True,
# 循环核的初始化方法
# glorot_uniform 是 sqrt(2 / (fan_in + fan_out))的正态分布产生
# 其中 fan_in 和 fan_out 是权重张量的扇入扇出(即输入和输出单元数目)
recurrent_initializer=’glorot_uniform’
)
else:
return tf.keras.layers.GRU(
return_sequences=True,
return_state=True,
# 默认:hard_sigmoid <= -1 输出 0,>=1 输出 1,中间为线性
recurrent_activation=’sigmoid’,
recurrent_initializer=’glorot_uniform’
)
解码器模型:
# 使用注意力模型
class RNN_Decoder(tf.keras.Model):
def __init__(self,embedding_dim,units,vocab_size):
super(RNN_Decoder, self).__init__()
self.units=units
# 词嵌入将高维离散数据转为低维连续数据,并表现出数据之间的相似性(向量空间)
self.embedding=tf.keras.layers.Embedding(input_shape=vocab_size,output_dim=embedding_dim)
self.gru=gru(units)
self.fc1=tf.keras.layers.Dense(self.units)
self.fc2=tf.keras.layers.Dense(vocab_size)
self.attention=BahdanauAttention(self.units)
def __call__(self,x,features,hidden):
# 获取注意力模型输出
context_vector,attention_weights=self.attention(features,hidden)
# x shape:(batch_size,1,embedding_dim)
x=self.embedding(x)
# 注意力,当前输入合并
# 注意力 shape:(batch_size,1,hidden) x shape:(batch_size,1,embedding_size)
# x shape:(batch_size, 1, embedding_dim + hidden_size)
x=tf.concat([tf.expand_dims(context_vector,1),x],axis=-1)
output,state=self.gru(x)
# x shape:(batch_size,max_length,hidden_size)
x=self.fc1(output)
# x shape:(batch_size*max_length,hidden_size)
x=tf.reshape(x,shape=(-1,x.shape[2]))
# x shape:(batch_size*max_length,vocab_size)
x=self.fc2(x)
return x,state,attention_weights
def reset_state(self, batch_size):
return tf.zeros((batch_size, self.units))
模型训练
实例化模型:
encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size)
损失函数,优化器设置:
# InceptionV3 模型使用的不是 Adam 优化器
# 各种优化器以后放到一篇单独的文章详细介绍
optimizer=tf.train.AdamOptimizer(learning_rate=0.0001)
def loss_function(real,pred):
mask=1-np.equal(real,0)
# 带 mask 的交叉熵损失
loss_=tf.nn.sparse_softmax_cross_entropy_with_logits(
labels=real,
logits=pred
)*mask
return tf.reduce_mean(loss_)
训练:
将使用 InceptionV3 模型提取的特征作为编码器输入
编码器输出,hidden_state,字幕文本作为解码器输入
解码器 hidden_state 作为下一次输入,预测值用于计算模型损失
使用标签文本作为解码器输入(teacher-forcing 模式)
梯度计算及应用
loss_plot=[]
EPOCHS=20
for epoch in range(EPOCHS):
start=time.time()
total_loss=0
for (batch,(img_tensor,target)) in enumerate(dataset):
loss=0
# 每迭代一次 batch 后重置 hidden_state
hidden=decoder.reset_states(batch_size=target.shape[0])
# input 维度是 3 维
dec_input=tf.expand_dims([tokenizer.word_index[‘<start>’]*BATCH_SIZE],1)
# eager 模式下记录梯度
with tf.GradientTape() as tape:
# inception 模式提取的特征
features=encoder(img_tensor)
# 每张照片不止一个 captions
for i in range(1,target.shape[1]):
# attention_weights 此处暂不需要
predictions,hidden,_=decoder(dec_input,features,hidden)
loss+=loss_function(target[:,i],predictions)
# teacher forcing 使用标签数据作为输入替代 hidden-output
dec_input=tf.expand_dims(target[:,i],1)
total_loss+=(loss/int(target.shape[1]))
# 总训练参数
variables=encoder.variables+decoder.variables
# 梯度计算及应用
gradients=tape.gradient(loss,variables)
optimizer.apply_gradients(zip(gradients,variables))
if batch%100 == 0:
print(‘epoch{},batch{},loss{:.4}’.format(
epoch+1,
batch,
loss.numpy()/int(target.shape[1])
))
loss_plot.append(total_loss/len(cap_vector))
plt.plot(loss_plot)
plt.xlabel(‘epochs’)
plt.ylabel(‘loss’)
plt.show()
模型预测
模型预测不使用 Teacher forcing 模式,当遇到预设的结束标记“<end>”时模型结束训练。
def evaluate(image):
attention_plot = np.zeros((max_length, attention_features_shape))
# 初始化 hidden-state
hidden = decoder.reset_state(batch_size=1)
# shape:(1,299,299,3)
temp_input = tf.expand_dims(load_image(image)[0], 0)
# 特征提取
img_tensor_val = image_features_extract_model(temp_input)
# shape:(1,8,8,2048) reshape:(1,64,2048)
img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3]))
# shape:(1,64,256)
features = encoder(img_tensor_val)
# 增加 batchsize 维度
dec_input = tf.expand_dims([tokenizer.word_index[‘<start>’]], 0)
result = []
for i in range(max_length):
predictions, hidden, attention_weights = decoder(dec_input, features, hidden)
attention_plot[i] = tf.reshape(attention_weights, (-1,)).numpy()
# 我们使用 softmax 归一化结果,使用 argmax 查询最大值
# 对于分类数量大于 2,softmax 和 sigmoid 的区别是
# 类别之间有相互关系的使用 sigmoid,反之使用 softmax
predicted_id = tf.argmax(predictions[0]).numpy()
# ID 转字符,获取文本结果
result.append(tokenizer.index_word[predicted_id])
# 判断是否是预设的结束标记
if tokenizer.index_word[predicted_id] == ‘<end>’:
return result, attention_plot
# 将预测值作为输入,预测下一个结果(teacher-forcing 在这里使用数据标签作为输入)
dec_input = tf.expand_dims([predicted_id], 0)
attention_plot = attention_plot[:len(result), :]
return result, attention_plot
以下用于可视化注意力机制训练过程:此处代码主要是图像展示就不做过多介绍了。
def plot_attention(image, result, attention_plot):
temp_image = np.array(Image.open(image))
fig = plt.figure(figsize=(10, 10))
len_result = len(result)
for l in range(len_result):
temp_att = np.resize(attention_plot[l], (8, 8))
ax = fig.add_subplot(len_result//2, len_result//2, l+1)
ax.set_title(result[l])
img = ax.imshow(temp_image)
ax.imshow(temp_att, cmap=’gray’, alpha=0.6, extent=img.get_extent())
plt.tight_layout()
plt.show()
rid = np.random.randint(0, len(img_name_val))
image = img_name_val[rid]
real_caption = ‘ ‘.join([tokenizer.index_word[i] for i in cap_val[rid] if i not in [0]])
result, attention_plot = evaluate(image)
print (‘Real Caption:’, real_caption)
print (‘Prediction Caption:’, ‘ ‘.join(result))
plot_attention(image, result, attention_plot)
Image.open(img_name_val[rid])
总结
想要对图像生成字幕,首先需要提取图像特征,本文我们利用迁移学习使用 Inception 模型来提取特征,对于 Inception 模型,我们重点理解卷积核分解。至于文本预测部分与使用注意力机制实现机器翻译大体一致。有一点想说的是,类似这样的项目维度转换会比较多,也是很容易出错的地方,这一点需要格外留意。
本文代码内容来自 Yash Katariya 在此表示感谢。