“””
orginal from:
https://github.com/graykode/n…
“””
import math
import re
from random import *
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
数据预处理
def make_batch():
batch = [] # list
positive = negative = 0 # 计数器 为了记录 NSP 工作中的正样本和负样本的个数,比例最好是在一个 batch 中靠近 1:1
while positive != batch_size/2 or negative != batch_size/2:
# 抽出来两句话 先随机 sample 两个 index 再通过 index 找出样本
tokens_a_index, tokens_b_index= randrange(len(sentences)), randrange(len(sentences)) # 比方 tokens_a_index=3,tokens_b_index=1;从整个样本中抽取对应的样本;tokens_a, tokens_b= token_list[tokens_a_index], token_list[tokens_b_index]## 依据索引获取对应样本:tokens_a=[5, 23, 26, 20, 9, 13, 18] tokens_b=[27, 11, 23, 8, 17, 28, 12, 22, 16, 25]
# 拼接
input_ids = [word_dict['[CLS]']] + tokens_a + [word_dict['[SEP]']] + tokens_b + [word_dict['[SEP]']] ## 加上特殊符号,CLS 符号是 1,sep 符号是 2:[1, 5, 23, 26, 20, 9, 13, 18, 2, 27, 11, 23, 8, 17, 28, 12, 22, 16, 25, 2]
segment_ids = [0] * (1 + len(tokens_a) + 1) + [1] * (len(tokens_b) + 1)## 宰割句子符号:[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
# MASK LM
n_pred = min(max_pred, max(1, int(round(len(input_ids) * 0.15)))) # n_pred=3;整个句子的 15% 的字符能够被 mask 掉,这里取和 max_pred 中的最小值,确保每次计算损失的时候没有那么多字符以及信息短缺,有 15% 做管制就够了;其实能够不必加这个,单个句子少了,就要加上足够的训练样本
# 不让特殊字符参加 mask
cand_maked_pos = [i for i, token in enumerate(input_ids)
if token != word_dict['[CLS]'] and token != word_dict['[SEP]']] ## cand_maked_pos=[1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18];整个句子 input_ids 中能够被 mask 的符号必须是非 cls 和 sep 符号的,要不然没意义
shuffle(cand_maked_pos)## 打乱程序:cand_maked_pos=[6, 5, 17, 3, 1, 13, 16, 10, 12, 2, 9, 7, 11, 18, 4, 14, 15] 其实取 mask 对应的地位有很多办法,这里只是一种应用 shuffle 的形式
masked_tokens, masked_pos = [], []
for pos in cand_maked_pos[:n_pred]: # 取其中的三个;masked_pos=[6, 5, 17] 留神这里对应的是 position 信息;masked_tokens=[13, 9, 16] 留神这里是被 mask 的元素之前对应的原始单字数字;masked_pos.append(pos)
masked_tokens.append(input_ids[pos]) # 回到 ppt 看一下
if random() < 0.8: # 80%
input_ids[pos] = word_dict['[MASK]'] # make mask
elif random() < 0.5: # 10%
index = randint(0, vocab_size - 1) # random index in vocabulary
input_ids[pos] = word_dict[number_dict[index]] # replace
# Zero Paddings
n_pad = maxlen - len(input_ids)##maxlen=30;n_pad=10
input_ids.extend([0] * n_pad)
segment_ids.extend([0] * n_pad)# 这里有一个问题,0 和之前的重了
# Zero Padding (100% - 15%) tokens 是为了计算一个 batch 中句子的 mlm 损失的时候能够组成一个无效矩阵放进去;不然第一个句子预测 5 个字符,第二句子预测 7 个字符,第三个句子预测 8 个字符,组不成一个无效的矩阵;## 这里十分重要,为什么是对 masked_tokens 是补零,而不是补其余的字符????我补 1 可不可以??if max_pred > n_pred:
n_pad = max_pred - n_pred
masked_tokens.extend([0] * n_pad)## masked_tokens= [13, 9, 16, 0, 0] masked_tokens 对应的是被 mask 的元素的原始实在标签是啥,也就是 groundtruth
masked_pos.extend([0] * n_pad)## masked_pos= [6, 5, 17,0,0] masked_pos 是记录哪些地位被 mask 了
if tokens_a_index + 1 == tokens_b_index and positive < batch_size/2:
batch.append([input_ids, segment_ids, masked_tokens, masked_pos, True]) # IsNext
positive += 1
elif tokens_a_index + 1 != tokens_b_index and negative < batch_size/2:
batch.append([input_ids, segment_ids, masked_tokens, masked_pos, False]) # NotNext
negative += 1
return batch
符号矩阵
def get_attn_pad_mask(seq_q, seq_k): # 在自注意力层 q k 是统一的
batch_size, len_q = seq_q.size()
batch_size, len_k = seq_k.size()
# eq(zero) is PAD token
# eq(0) 示意和 0 相等的返回 True,不相等返回 False。pad_attn_mask = seq_k.data.eq(0).unsqueeze(1) # batch_size x 1 x len_k(=len_q), one is masking
return pad_attn_mask.expand(batch_size, len_q, len_k) # 反复了 len_q 次 batch_size x len_q x len_k 不懂能够看一下例子
def gelu(x):
"Implementation of the gelu activation function by Hugging Face"
return x * 0.5 * (1.0 + torch.erf(x / math.sqrt(2.0)))
Embedding 层
class Embedding(nn.Module):
def __init__(self):
super(Embedding, self).__init__()
self.tok_embed = nn.Embedding(vocab_size, d_model) # token embedding
self.pos_embed = nn.Embedding(maxlen, d_model) # position embedding
self.seg_embed = nn.Embedding(n_segments, d_model) # segment(token type) embedding
self.norm = nn.LayerNorm(d_model)
def forward(self, input_ids, segment_ids):# x 对应 input_ids, seg 对应 segment_ids
seq_len = input_ids.size(1)
pos = torch.arange(seq_len, dtype=torch.long)
pos = pos.unsqueeze(0).expand_as(input_ids) # (seq_len,) -> (batch_size, seq_len)
embedding = self.tok_embed(input_ids) + self.pos_embed(pos) + self.seg_embed(segment_ids)
return self.norm(embedding)
注意力打分函数
class ScaledDotProductAttention(nn.Module):
def __init__(self):
super(ScaledDotProductAttention, self).__init__()
def forward(self, Q, K, V, attn_pad):
## 输出进来的维度别离是 [batch_size x n_heads x len_q x d_k] K:[batch_size x n_heads x len_k x d_k] V: [batch_size x n_heads x len_k x d_v]
## 首先通过 matmul 函数失去的 scores 形态是 : [batch_size x n_heads x len_q x len_k]
scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)]
## 而后关键词中央来了,上面这个就是用到了咱们之前重点讲的 attn_pad,把被 pad 的中央置为无限小,softmax 之后根本就是 0,对 q 的单词不起作用
scores.masked_fill_(attn_pad, -1e9) # Fills elements of self tensor with value where mask is one.
attn = nn.Softmax(dim=-1)(scores)
context = torch.matmul(attn, V)
return context, attn
多头注意力机制
class MultiHeadAttention(nn.Module):
def __init__(self):
super(MultiHeadAttention, self).__init__()
## 输出进来的 QKV 是相等的,应用映射 linear 做一个映射失去参数矩阵 Wq, Wk,Wv
self.W_Q = nn.Linear(d_model, d_k * n_heads)
self.W_K = nn.Linear(d_model, d_k * n_heads)
self.W_V = nn.Linear(d_model, d_v * n_heads)
def forward(self, Q, K, V, attn_pad):
## 这个多头分为这几个步骤,首先映射分头,而后计算 atten_scores,而后计算 atten_value;
## 输出进来的数据形态:Q: [batch_size x len_q x d_model], K: [batch_size x len_k x d_model], V: [batch_size x len_k x d_model]
# q: [batch_size x len_q x d_model], k: [batch_size x len_k x d_model], v: [batch_size x len_k x d_model]
residual, batch_size = Q, Q.size(0)
# (B, S, D) -proj-> (B, S, D) -split-> (B, S, H, W) -trans-> (B, H, S, W)
## 上面这个就是先映射,后分头;肯定要留神的是 q 和 k 分头之后维度是统一额,所以这里都是 dk
q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2) # q_s: [batch_size x n_heads x len_q x d_k]
k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2) # k_s: [batch_size x n_heads x len_k x d_k]
v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2) # v_s: [batch_size x n_heads x len_k x d_v]
## 输出进行的 attn_pad 形态是 batch_size x len_q x len_k,而后通过上面这个代码失去 新的 attn_pad : [batch_size x n_heads x len_q x len_k],就是把 pad 信息反复了 n 个头上
attn_pad = attn_pad.unsqueeze(1).repeat(1, n_heads, 1, 1) # repeat 对张量反复裁减
# context: [batch_size x n_heads x len_q x d_v], attn: [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)]
context, attn = ScaledDotProductAttention()(q_s, k_s, v_s, attn_pad)
context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v) # context: [batch_size x len_q x n_heads * d_v]
output = nn.Linear(n_heads * d_v, d_model)(context)
return nn.LayerNorm(d_model)(output + residual), attn # output: [batch_size x len_q x d_model]
基于地位的前馈神经网络
class PoswiseFeedForwardNet(nn.Module):
def __init__(self): # 对每个字的加强语义向量再做两次线性变换,以加强整个模型的表达能力。super(PoswiseFeedForwardNet, self).__init__()
self.fc1 = nn.Linear(d_model, d_ff)
self.fc2 = nn.Linear(d_ff, d_model)
def forward(self, x):
# (batch_size, len_seq, d_model) -> (batch_size, len_seq, d_ff) -> (batch_size, len_seq, d_model)
return self.fc2(gelu(self.fc1(x)))
Encoder
class EncoderLayer(nn.Module):
def __init__(self):
super(EncoderLayer, self).__init__()
self.enc_self_attn = MultiHeadAttention()
self.pos_ffn = PoswiseFeedForwardNet()
def forward(self, enc_inputs, enc_self_attn_pad):
enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_pad) # enc_inputs to same Q,K,V enc_self_attn_mask 是 pad 符号矩阵
enc_outputs = self.pos_ffn(enc_outputs) # enc_outputs: [batch_size x len_q x d_model]
return enc_outputs, attn
1. BERT 模型整体架构
class BERT(nn.Module):
def __init__(self):
super(BERT, self).__init__()
self.embedding = Embedding() ## 词向量层,构建词表矩阵
self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)]) ## 把 N 个 encoder 重叠起来,具体 encoder 实现一会看
self.fc = nn.Linear(d_model, d_model) ## 前馈神经网络 -cls
self.activ1 = nn.Tanh() ## 激活函数 -cls
self.linear = nn.Linear(d_model, d_model)#-mlm
self.activ2 = gelu ## 激活函数 --mlm
self.norm = nn.LayerNorm(d_model)
self.classifier = nn.Linear(d_model, 2)## cls 这是一个分类层,维度是从 d_model 到 2,[利率期货](https://www.gendan5.com/ff/if.html) 对应咱们架构图中就是这种:# decoder is shared with embedding layer
embed_weight = self.embedding.tok_embed.weight
n_vocab, n_dim = embed_weight.size()
self.decoder = nn.Linear(n_dim, n_vocab, bias=False)
self.decoder.weight = embed_weight
self.decoder_bias = nn.Parameter(torch.zeros(n_vocab))
def forward(self, input_ids, segment_ids, masked_pos):
input = self.embedding(input_ids, segment_ids) # 将 input_ids,segment_ids,pos_embed 加和
##get_attn_pad_mask 是为了失去句子中 pad 的地位信息,给到模型前面,在计算自注意力和交互注意力的时候去掉 pad 符号的影响,去看一下这个函数 4.
enc_self_attn_pad = get_attn_pad_mask(input_ids, input_ids)
for layer in self.layers:
output, enc_self_attn = layer(input, enc_self_attn_pad) ## enc_self_attn 这里是 QK 转置相乘之后 softmax 之后的矩阵值,代表的是每个单词和其余单词相关性;# output : [batch_size, len, d_model], attn : [batch_size, n_heads, d_mode, d_model]
h_pooled = self.activ1(self.fc(output[:, 0])) # [batch_size, d_model] cls 对应的地位 能够看一下例子
logits_clsf = self.classifier(h_pooled) # [batch_size, 2]
masked_pos = masked_pos[:, :, None].expand(-1, -1, output.size(-1)) # [batch_size, max_pred, d_model] 其中一个 masked_pos= [6, 5, 17,0,0]
# get masked position from final output of transformer.
h_masked = torch.gather(output, 1, masked_pos) #在 output 取出一维对应 masked_pos 数据 masking position [batch_size, max_pred, d_model]
h_masked = self.norm(self.activ2(self.linear(h_masked)))
logits_lm = self.decoder(h_masked) + self.decoder_bias # [batch_size, max_pred, n_vocab]
return logits_lm, logits_clsf
1. 从整体到部分
2. 数据流动形态(输出 输入)
if name == ‘__main__’:
# BERT Parameters
maxlen = 30 # 句子的最大长度
batch_size = 6 # 每一组有多少个句子一起送进去模型
max_pred = 5 # max tokens of prediction
n_layers = 6 # number of Encoder of Encoder Layer
n_heads = 12 # number of heads in Multi-Head Attention
d_model = 768 # Embedding Size
d_ff = 3072 # 4*d_model, FeedForward dimension
d_k = d_v = 64 # dimension of K(=Q), V
n_segments = 2
text = (
'Hello, how are you? I am Romeo.\n'
'Hello, Romeo My name is Juliet. Nice to meet you.\n'
'Nice meet you too. How are you today?\n'
'Great. My baseball team won the competition.\n'
'Oh Congratulations, Juliet\n'
'Thanks you Romeo'
)
sentences = re.sub("[.,!?\\-]", '', text.lower()).split('\n') # filter'.',',','?','!'word_list = list(set(" ".join(sentences).split()))
word_dict = {'[PAD]': 0, '[CLS]': 1, '[SEP]': 2, '[MASK]': 3}
for i, w in enumerate(word_list):
word_dict[w] = i + 4
number_dict = {i: w for i, w in enumerate(word_dict)}
vocab_size = len(word_dict)
# 把文本转化成数字
token_list = list()
for sentence in sentences:
arr = [word_dict[s] for s in sentence.split()]
token_list.append(arr)
batch = make_batch() # 最重要的一部分 预训练任务的数据构建局部
input_ids, segment_ids, masked_tokens, masked_pos, isNext = map(torch.LongTensor, zip(*batch))# map 把函数顺次作用在 list 中的每一个元素上,失去一个新的 list 并返回。留神,map 不扭转原 list,而是返回一个新 list。model = BERT()
criterion = nn.CrossEntropyLoss(ignore_index=0) # 只计算 mask 地位的损失
optimizer = optim.Adam(model.parameters(), lr=0.001)
for epoch in range(100):
optimizer.zero_grad()
# logits_lm 语言词表的输入
# logits_clsf 二分类的输入
# logits_lm:[batch_size, max_pred, n_vocab]
logits_lm, logits_clsf = model(input_ids, segment_ids, masked_pos)## logits_lm【6,5,29】bs*max_pred*voca logits_clsf:[6*2]
loss_lm = criterion(logits_lm.transpose(1, 2), masked_tokens) # for masked LM ;masked_tokens [6,5]
loss_lm = (loss_lm.float()).mean()
loss_clsf = criterion(logits_clsf, isNext) # for sentence classification
loss = loss_lm + loss_clsf
if (epoch + 1) % 10 == 0:
print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
loss.backward()
optimizer.step()