共计 10120 个字符,预计需要花费 26 分钟才能阅读完成。
介绍
Attention 模型形象的比喻就是“图像对焦”。
上图是 Encoder-Decoder 模型,Decoder 中每个单词生成过程如下:
其中 C 是“语义编码 C”,f 是 Decoder 的非线性变换函数。由此,我们可以看出生成目标句子的每个单词都使用同一个语义编码 C,即:源句子中的每个单词的影响力都是一样的,这如同图像没有对焦的情况,现实项目中也存在明显的不合理。比如一个机器翻译模型,输入是“Tom chase Jerry”, 模型输出:“汤姆”,“追逐”,“杰瑞”。在翻译“杰瑞”的时候显然“Jerry”的贡献值最大,如果每个单词的贡献值相同明显不合理。这个问题在输入句子长度较短时问题不大,但是当输入句子较长时会丢失很多细节信息(个人觉得此处类似平均池化和最大值池化)。正因为如此,我们引入了 Attention 思想。
Soft Attention 模型
使用 Attention 模型翻译“杰瑞”的时候,我们可以得到输入句子中的每个单词对输出当前单词的贡献值大小如:(Tom,0.3)(Chase,0.2) (Jerry,0.5)。这意味着生成每个单词 yi 时不再使用同一个语义编码 C,而是根据 yi 使用不同的 Ci。在引入 Attention 模型后 yi 的计算过程改变如下所示:
每个 Ci 对应源句子中每个单词的注意力分配概率,示例如下:
f2 是 Encoder 对每个单词的变换函数,g 函数代表整个源句子的中间语义表示的变换函数,一般形式是加权求和:
aji 代表注意力分配系数,hj 代表源句子中某个单词的语义编码,Lx 代表源句子中单词数量。g 函数的计算过程如下图所示:
Attention 模型概率计算
如果所示,当我们要生成 yi 单词,此时我们用 i - 1 时刻的隐藏节点输出值 Hi- 1 去和源句子中的每个单词对应 RNN 隐藏节点状态 hj 依次进行对比,即:通过函数 F(hj,Hi-1)来获得 yi 对源句子中每个单词对应的对齐可能性,函数 F 常见方法如下图所示:
然后使用 Softmax 函数进行数值归一化处理。如对“对齐概率”不理解的朋友,可以查看下图英语 - 德语翻译系统中加入 Attention 机制后,Encoder 和 Decoder 两个句子中每个单词对应注意力分配概率分布。
Self Attention 模型
在 Soft Attention 模型中,Attention 机制发生在 Decoder 中 Yi 和 Encoder 中的所有元素之间。Self Attention 模型不是在两者之间,而是 Decoder 内部元素之间或者 Encoder 内部元素之间发生的 Attention 机制,计算方法和 Soft Attention 模型一致。那么 Self Attention 模型有什么好处?我们依然以机器翻译为例:
如图所示,Self Attention 模型在内部可以捕获一些句法特征或语义特征。Self Attention 模型相比传统 RNN 模型需要依次序序列计算,它的感受野更大,可以直接将句子中的任意两个单词的联系通过一个计算步骤联系起来,可以捕获远距离的相互依赖特征(就像列表和数组的区别)。此外,Self Attention 模型对于增加计算的并行性也有帮助。
案例
我们使用的语言数据集是“英语 - 西班牙语”,数据集样本如下图所示:
数据导入
# 数据下载
path_to_zip=tf.keras.utils.get_file(
fname=’spa-eng.zip’,
origin=’http://download.tensorflow.org/data/spa-eng.zip’,
# 解压 tar zip 文件
extract=True
)
path_to_file=os.path.dirname(path_to_zip)+’/spa-eng/spa.txt’
转码:
def unicode_to_ascii(sen):
return ”.join(
char for char in unicodedata.normalize(‘NFD’,sen)
if unicodedata.category(char) != ‘Mn’
)
数据预处理
每条训练语句添加开始和结束标记
移除句子中的特殊字符
字符转 ID,ID 转字符并排序
将句子补长到预设的最大长度
def preprocess_sentence(w):
w = unicode_to_ascii(w.lower().strip())
# 在单词和标点之间创建空格
# 如:“he is a boy.” => “he is a boy .”
w = re.sub(r”([?.!,¿])”, r” \1 “, w)
w = re.sub(r'[” “]+’, ” “, w)
# 特殊字符以空格代替
w = re.sub(r”[^a-zA-Z?.!,¿]+”, ” “, w)
w = w.rstrip().strip()
# 添加开始和结束标记
w = ‘<start> ‘ + w + ‘ <end>’
return w
创建数据集:
def create_dataset(path, num_examples):
lines = open(path, encoding=’UTF-8′).read().strip().split(‘\n’)
word_pairs = [[preprocess_sentence(w) for w in l.split(‘\t’)] for l in lines[:num_examples]]
# 返回格式:[ENGLISH, SPANISH]
return word_pairs
字符转 ID,ID 转字符,并排序:
class LanguageIndex():
def __init__(self,lang):
self.lang=lang
self.wrod2idx={}
self.id2word={}
self.vacab=set()
self.create_index()
def create_index(self):
for phrase in self.lang:
# 添加到集合中,重复内容不添加
self.vacab.update(phrase.split(‘ ‘))
self.vacab=sorted(self.vacab)
self.wrod2idx[‘<pad>’]=0
#字符 -ID 转换
for index,word in enumerate(self.vacab):
self.wrod2idx[word]=index+1
for word,index in self.wrod2idx.items():
self.id2word[index]=word
加载数据集:
# 计算最大长度
def max_length(tensor):
return max(len(t) for t in tensor)
def load_dataset(path,num_example):
#get inputs outputs
pairs=create_dataset(path,num_example)
# 获取 ID 表示
inp_lang=LanguageIndex(sp for en,sp in pairs)
targ_lang=LanguageIndex(en for en,sp in pairs)
# LanguageIndex 不包含重复值,以下包含重复值
input_tensor=[[inp_lang.wrod2idx[s]for s in sp.split(‘ ‘)]for en,sp in pairs]
target_tensor=[[targ_lang.wrod2idx[s]for s in en.split(‘ ‘)]for en,sp in pairs]
max_length_inp,max_length_tar=max_length(input_tensor),max_length(target_tensor)
# 将句子补长到预设的最大长度
# padding: post: 后补长,pre: 前补长
input_tensor=tf.keras.preprocessing.sequence.pad_sequences(
sequences=input_tensor,
maxlen=max_length_inp,
padding=’post’
)
target_tensor=tf.keras.preprocessing.sequence.pad_sequences(
sequences=target_tensor,
maxlen=max_length_tar,
padding=’post’
)
return input_tensor,target_tensor,inp_lang,targ_lang,max_length_inp,max_length_tar
创建训练集验证集:
# 本次项目只使用前 30000 条数据
num_examples = 30000
input_tensor, target_tensor, inp_lang, targ_lang, max_length_inp, max_length_targ = load_dataset(path_to_file, num_examples)
# 训练集 80%,验证集 20%
input_tensor_train, input_tensor_val, target_tensor_train, target_tensor_val = train_test_split(input_tensor, target_tensor, test_size=0.2)
模型训练配置
# 打乱数据集
BUFFER_SIZE=len(input_tensor_train)
BATCH_SIZE=64
# 每个 epoch 迭代次数
N_BATCH=BUFFER_SIZE // BATCH_SIZE
# 词嵌入维度
embedding_dim=256
# 隐藏神经元数量
units=1024
vocab_inp_size=len(inp_lang.wrod2idx)
vocab_tar_size=len(targ_lang.wrod2idx)
dataset=tf.data.Dataset.from_tensor_slices((input_tensor_train,target_tensor_train)).shuffle(BUFFER_SIZE)
# drop_remainder 当剩余数据量小于 batch_size 时候,是否丢弃
dataset=dataset.batch(BATCH_SIZE,drop_remainder=’True’)
案例 Attention 模型计算
文章开始我们介绍了 Attention 模型的计算过程,相信你会很容易理解上图的内容。对每个节点具体方程实现如下:
FC= 全连接层,EO= 编码器输出,H= 隐藏层状态,X= 解码器输入,模型计算过程如下表示:
score = FC(tanh(FC(EO) + FC(H)))
attention weights = softmax(score, axis = 1)
context vector = sum(attention weights * EO, axis = 1)
embedding output= 解码器输入 X,输入词嵌入层
merged vector=concat(embedding output, context vector)
将 merged vector 输入到 GRU
创建模型
GRU 配置:
def gru(units):
# 使用 GPU 加速运算
if tf.test.is_gpu_available():
return tf.keras.layers.CuDNNGRU(units,
return_sequences=True,
return_state=True,
# 循环核的初始化方法
# glorot_uniform 是 sqrt(2 / (fan_in + fan_out)) 的正态分布产生
# 其中 fan_in 和 fan_out 是权重张量的扇入扇出(即输入和输出单元数目)
recurrent_initializer=’glorot_uniform’)
else:
return tf.keras.layers.GRU(units,
return_sequences=True,
return_state=True,
# hard_sigmoid <= -1 输出 0,>=1 输出 1,中间为线性
recurrent_activation=’sigmoid’,
recurrent_initializer=’glorot_uniform’)
编码器:
class Encoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
super(Encoder, self).__init__()
self.batch_sz = batch_sz
self.enc_units = enc_units
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
self.gru = gru(self.enc_units)
def call(self, x, hidden):
x = self.embedding(x)
output, state = self.gru(x, initial_state = hidden)
return output, state
def initialize_hidden_state(self):
return tf.zeros((self.batch_sz, self.enc_units))
解码器:
class Decoder(tf.keras.Model):
def __init__(self,vocab_size,embedding_dim,dec_units,batch_sz):
super(Decoder, self).__init__()
self.batch_sz=batch_sz
self.dec_units=dec_units
self.embedding=tf.keras.layers.Embedding(
input_shape=vocab_size,
output_dim=embedding_dim
)
self.gru=gru(self.dec_units)
self.fc=tf.keras.layers.Dense(units=vocab_size)
# 用于计算 score,即:注意力权重系数
self.W1=tf.keras.layers.Dense(self.dec_units)
self.W2=tf.keras.layers.Dense(self.dec_units)
self.V=tf.keras.layers.Dense(units=1)
def __call__(self,x,hidden,ec_output):
# tf.expand_dims: 在指定索引出增加一维度,值为 1,从索引 0 开始
# axis: 取值范围是 [- 阶数,阶数],二维的时候 0 指的是列,1 指的是行,
# 更高维度的时候,数值是由外向里增加,如:3 维向量,外向内依次是:0,1,2
# 通过计算 score 公式可得,需要将 hidden 维度扩展至:[batch_size,1,hidden_size]
hidden_with_time_axis=tf.expand_dims(hidden,axis=1)
# score=[batch_size, max_length, 1]
score=self.V(tf.nn.tanh(self.W1(ec_output)+self.W2(hidden_with_time_axis)))
# 数值归一化和为 1 的概率分布值
attention_weight=tf.nn.softmax(score,axis=1)
context_vetor=attention_weight*ec_output
# 求和平均
context_vetor=tf.reduce_sum(context_vetor,axis=1)
X=self.embedding(x)
# 合并解码器 embedding 输出和 context vector
x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
# output shape=(batch_size,time_step,hidden_size)
# state shape=(batch_size,hidden_size)
output,state=self.gru(x)
# output[batch_size*1,hidden_size]
output=tf.reshape(output,shape=(-1,output.shape[2]))
x-self.fc(output)
return x,state,attention_weight
def initilize_hidden_size(self):
return tf.zeros((self.batch_sz,self.dec_units))
实例化模型:
encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE)
decoder = Decoder(vocab_tar_size, embedding_dim, units, BATCH_SIZE)
损失函数,优化器:
optimizer = tf.train.AdamOptimizer()
def loss_function(real, pred):
mask = 1 – np.equal(real, 0)
loss_ = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=real, logits=pred) * mask
return tf.reduce_mean(loss_)
模型保存:
checkpoint_dir = ‘./training_checkpoints’
checkpoint_prefix = os.path.join(checkpoint_dir, “ckpt”)
checkpoint = tf.train.Checkpoint(optimizer=optimizer,
encoder=encoder,
decoder=decoder)
训练
由于我们使用 Teacher Forcing 进行训练,所以我们简单介绍下。
如图所示 Teacher Forcing 与 Free-running 不同,在训练过程中不再是前一时刻的 hidden-state 作为当前输入,而是在 Ground Truth 中找到对应的上一项作为当前输入。早期的 RNN 很弱,如果生成了非常差的结果 Free-running 的运行方式会导致后面的 hidden-state 都受到影响。Teacher Forcing 运行方式就可以避免这种问题,缺点也很明显它严重依赖标签数据。
# 迭代 10 次训练集
EPOCHS = 10
for epoch in range(EPOCHS):
start = time.time()
hidden = encoder.initialize_hidden_state()
total_loss = 0
for (batch, (inp, targ)) in enumerate(dataset):
loss = 0
# 先记录梯度
with tf.GradientTape() as tape:
# 编码器输出
enc_output, enc_hidden = encoder(inp, hidden)
dec_hidden = enc_hidden
dec_input = tf.expand_dims([targ_lang.word2idx[‘<start>’]] * BATCH_SIZE, 1)
# 使用 Teacher forcing 运行方式
for t in range(1, targ.shape[1]):
# 解码器输出
predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
loss += loss_function(targ[:, t], predictions)
# 样本标签作为输入
dec_input = tf.expand_dims(targ[:, t], 1)
batch_loss = (loss / int(targ.shape[1]))
# one_loss++;batch_loss++
total_loss += batch_loss
variables = encoder.variables + decoder.variables
gradients = tape.gradient(loss, variables)
optimizer.apply_gradients(zip(gradients, variables))
if batch % 100 == 0:
print(‘Epoch {} Batch {} Loss {:.4f}’.format(epoch + 1,
batch,
batch_loss.numpy()))
# 每迭代 2 次训练集保存一次模型
if (epoch + 1) % 2 == 0:
checkpoint.save(file_prefix = checkpoint_prefix)
翻译
评估函数我们不使用 teacher-forcing 模式,解码器的每步输入是它前一时刻的 hidden-state 和编码器输出,当模型遇到 <end> 标记停止运行。
# 和训练模型函数代码基本一致
def evaluate(sentence, encoder, decoder, inp_lang, targ_lang, max_length_inp, max_length_targ):
attention_plot = np.zeros((max_length_targ, max_length_inp))
# 数据预处理
sentence = preprocess_sentence(sentence)
# 向量化表示输入数据
inputs = [inp_lang.word2idx[i] for i in sentence.split(‘ ‘)]
# 后置补长
inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs], maxlen=max_length_inp, padding=’post’)
inputs = tf.convert_to_tensor(inputs)
result = ”
hidden = [tf.zeros((1, units))]
enc_out, enc_hidden = encoder(inputs, hidden)
dec_hidden = enc_hidden
# 维度扩展 batch_size
dec_input = tf.expand_dims([targ_lang.word2idx[‘<start>’]], 0)
for t in range(max_length_targ):
predictions, dec_hidden, attention_weights = decoder(dec_input, dec_hidden, enc_out)
# 保存权重用于稍后可视化展示
attention_weights = tf.reshape(attention_weights, (-1,))
attention_plot[t] = attention_weights.numpy()
predicted_id = tf.argmax(predictions[0]).numpy()
# 获取文本翻译结果
result += targ_lang.idx2word[predicted_id] + ‘ ‘
# 预设的结束标记
if targ_lang.idx2word[predicted_id] == ‘<end>’:
return result, sentence, attention_plot
# 预测值作为输入,以此输出下一时刻单词
dec_input = tf.expand_dims([predicted_id], 0)
return result, sentence, attention_plot
可视化权重值:
fig = plt.figure(figsize=(10,10))
ax = fig.add_subplot(1, 1, 1)
ax.matshow(attention, cmap=’viridis’)
fontdict = {‘fontsize’: 14}
ax.set_xticklabels([”] + sentence, fontdict=fontdict, rotation=90)
ax.set_yticklabels([”] + predicted_sentence, fontdict=fontdict)
plt.show()
总结
本篇文章篇幅较多,不过项目的重点是 Attention 思想的理解,Self Attention 模型具有更长的感受野,更容易捕获长距离的相互依赖特征,目前 Google 机器翻译模型就大量使用到 Self Attention。Attention 模型目前在机器翻译,图片描述任务,语音识别都有大量应用,熟练使用 Attention 对于解决实际问题会有很大的帮助。
文章部分内容参考 Yash Katariya 和 张俊林,在此表示感谢。