本文主要從代碼的角度一步步來講解一下BERT模型是如何實作的,(后附完整代碼)
關于BERT的理論決議現在隨便一搜就可以找到很多,在這里就不在贅述,
BERT原始碼
- 模型架構
- 模型實作
- 定義超參
- 文本預處理
- make_batch()
- model
- Embedding
- get_attn_pad_mask
- layers
- MultiHeadAttention
- ScaledDotProductAttention
- PoswiseFeedForwardNet
- Loss
- 總結:
- 完整原始碼
模型架構
首先先說一下大佬傳授的技巧 😃
寫模型代碼要注意兩點:
1.從整體到區域;
2.資料流動形狀;
怎么理解呢?對于一個模型的搭建我們不可能一蹴而就,要先從整體入手,先把大框搭起來,然后在實作每個函式具體的功能,
對于“資料流動形狀”,要著重關注這個函式輸入輸出,比如經過Embedding層,原資料肯定會多一個維度,增加的這個維度后續我們會怎么處理,
先大概說一下我們這個模型要干什么事,先對bert這個模型有個大概一下印象,

- 對文本處理得到原始標簽
- 對原始標簽做mask
- 做Embedding
- 送入bert
- 對輸出部分做處理
a. 第一個字符 < cls>對應的輸出接一個linear層做一個二分類即NSP任務;
b.mask對應位置的輸出接一個解碼層 將768維的Embedding映射成詞表大小,然后與真實標簽做loss,
再來看一下4.中的bert都干了些什么?
首先將輸入文本做一個Embedding(①),然后送入多頭注意力機制中(②),輸出接一個Layer Normalization和殘差連接(③)最后送入兩個linear層中(④)
這就是上圖中一個Encoder做的事情,

到這大家對我們要干什么腦海里應該有了一個模糊的框架,
接下來就是代碼部分,完全按照上面描繪的走~
下面代碼的一個整體框架,其中make_batch ,model部分是重點
而model中的layers又是重重之中

模型實作
我們先從整體 即“main”入手:
定義超參
if __name__ == '__main__':
# BERT Parameters
maxlen = 30 # 句子的最大長度
batch_size = 6 # 每一組有多少個句子一起送進去模型
max_pred = 5 # max tokens of prediction
n_layers = 6 # number of Encoder of Encoder Layer
n_heads = 12 # number of heads in Multi-Head Attention
d_model = 768 # Embedding Size
d_ff = 3072 # 4*d_model, FeedForward dimension
d_k = d_v = 64 # dimension of K(=Q), V
n_segments = 2 # 用于NSP任務
首先是定義一些超參,具體作用均已標注,
其中需要注意的一個引數是:max_pred 它表示的是一個句子中最多可以有多少個mask,怎么用后面我們會談到(一個坑,在make_batch()部分會講到)
文本預處理
因為我們注重的是模型的實作,所以資料部分就自己定義了一些對話陳述句,在實際應用中往往會是海量的文本,
text = (
'Hello, how are you? I am Romeo.\n'
'Hello, Romeo My name is Juliet. Nice to meet you.\n'
'Nice meet you too. How are you today?\n'
'Great. My baseball team won the competition.\n'
'Oh Congratulations, Juliet\n'
'Thanks you Romeo'
)
資料有了,接下來就是對資料進行處理
sentences = re.sub("[.,!?\\-]", '', text.lower()).split('\n') # filter '.', ',', '?', '!'
通過re.sub函式將資料中的特殊字符清除掉 并將大寫字符全部轉變為小寫字符
效果:
word_list = list(set(" ".join(sentences).split()))
word_dict = {'[PAD]': 0, '[CLS]': 1, '[SEP]': 2, '[MASK]': 3}
for i, w in enumerate(word_list):
word_dict[w] = i + 4
number_dict = {i: w for i, w in enumerate(word_dict)}
vocab_size = len(word_dict)
word_list:根據“ ”空格切分后的單詞串列
word_dict: 加入特殊字符生成的詞典
number_dict:將word_dict的鍵值對調換(預測時候會用到)
效果:

# 把文本轉化成數字
token_list = list()
for sentence in sentences:
arr = [word_dict[s] for s in sentence.split()]
token_list.append(arr)
根據剛剛生成好的word_dict 將sentences 中的字符轉換成數字 方便后面處理
效果:

文本預處理完~~~~~~
繼續往下看就是最重要的資料構建部分了
make_batch()
batch = make_batch() # 最重要的一部分 預訓練任務的資料構建部分
input_ids, segment_ids, masked_tokens, masked_pos, isNext = map(torch.LongTensor, zip(*batch))# map把函式依次作用在list中的每一個元素上,得到一個新的list并回傳,注意,map不改變原list,而是回傳一個新list,
通過 make_batch() 對資料進行處理,
得到 input_ids, segment_ids, masked_tokens, masked_pos, isNext
我們跳到make_batch()函式部分看一下這幾個代表的是什么,和它具體是怎么處理的,
def make_batch():
batch = [] # list
positive = negative = 0 # 計數器 為了記錄NSP任務中的正樣本和負樣本的個數,比例最好是在一個batch中接近1:1
while positive != batch_size/2 or negative != batch_size/2:
# 抽出來兩句話 先隨機sample兩個index 再通過index找出樣本
tokens_a_index, tokens_b_index= randrange(len(sentences)), randrange(len(sentences)) # 比如tokens_a_index=3,tokens_b_index=1;從整個樣本中抽取對應的樣本;
tokens_a, tokens_b= token_list[tokens_a_index], token_list[tokens_b_index]## 根據索引獲取對應樣本:tokens_a=[5, 23, 26, 20, 9, 13, 18] tokens_b=[27, 11, 23, 8, 17, 28, 12, 22, 16, 25]
# 拼接
input_ids = [word_dict['[CLS]']] + tokens_a + [word_dict['[SEP]']] + tokens_b + [word_dict['[SEP]']] ## 加上特殊符號,CLS符號是1,sep符號是2:[1, 5, 23, 26, 20, 9, 13, 18, 2, 27, 11, 23, 8, 17, 28, 12, 22, 16, 25, 2]
segment_ids = [0] * (1 + len(tokens_a) + 1) + [1] * (len(tokens_b) + 1)##分割句子符號:[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
實作的效果:

input_ids 是下圖中的Token Embeddings
segment_ids就是下圖中的Segment Embeddings
接下來是要對剛剛拼接好的input_ids進行mask處理:
# MASK LM
n_pred = min(max_pred, max(1, int(round(len(input_ids) * 0.15)))) # n_pred=3;整個句子的15%的字符可以被mask掉,這里取和max_pred中的最小值,確保每次計算損失的時候沒有那么多字符以及資訊充足,有15%做控制就夠了;其實可以不用加這個,單個句子少了,就要加上足夠的訓練樣本
# 不讓特殊字符參與mask
cand_maked_pos = [i for i, token in enumerate(input_ids)
if token != word_dict['[CLS]'] and token != word_dict['[SEP]']] ## cand_maked_pos=[1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18];整個句子input_ids中可以被mask的符號必須是非cls和sep符號的,要不然沒意義
shuffle(cand_maked_pos)## 打亂順序:cand_maked_pos=[6, 5, 17, 3, 1, 13, 16, 10, 12, 2, 9, 7, 11, 18, 4, 14, 15] 其實取mask對應的位置有很多方法,這里只是一種使用shuffle的方式
在這部分就要用到我們最開始提到的那個值得注意的超參:max_pred(填坑)
為什么需要max_pred?
比如在mask時候,一個句子被mask了3個單詞,另一個句子被mask了7個單詞,
很難把這兩個句子組成一個有效的矩陣,我們之前做了一個最大長度的截斷,這max_pred也相當一個截斷引數,
cand_maked_pos的作用是去掉特殊字符< CLS > < SEP>,整個句子input_ids中可以被mask的符號必須是非cls和sep符號的,要不然沒意義
masked_tokens, masked_pos = [], []
for pos in cand_maked_pos[:n_pred]: # 取其中的三個;masked_pos=[6, 5, 17] 注意這里對應的是position資訊;masked_tokens=[13, 9, 16] 注意這里是被mask的元素之前對應的原始單字數字;
masked_pos.append(pos)
masked_tokens.append(input_ids[pos])
if random() < 0.8: # 80%
input_ids[pos] = word_dict['[MASK]'] # make mask
elif random() < 0.5: # 10%
index = randint(0, vocab_size - 1) # random index in vocabulary
input_ids[pos] = word_dict[number_dict[index]] # replace
masked_tokens對應的是被mask元素之前的原始的單字數字,
masked_pos 對應的是position資訊
然后對其按照8/1/1比例mask
想要實作的效果:

接下來是補零操作:
# Zero Paddings
n_pad = maxlen - len(input_ids)##maxlen=30;n_pad=10
input_ids.extend([0] * n_pad)
segment_ids.extend([0] * n_pad)# 這里有一個問題,0和之前的重了
# Zero Padding (100% - 15%) tokens 是為了計算一個batch中句子的mlm損失的時候可以組成一個有效矩陣放進去;不然第一個句子預測5個字符,第二句子預測7個字符,第三個句子預測8個字符,組不成一個有效的矩陣;
## 這里非常重要,為什么是對masked_tokens是補零,而不是補其他的字符?
## 我補1可不可以? 后面會講到
if max_pred > n_pred:
n_pad = max_pred - n_pred
masked_tokens.extend([0] * n_pad)## masked_tokens= [13, 9, 16, 0, 0] masked_tokens 對應的是被mask的元素的原始真實標簽是啥,也就是groundtruth
masked_pos.extend([0] * n_pad)## masked_pos= [6, 5, 17,0,0] masked_pos是記錄哪些位置被mask了
if tokens_a_index + 1 == tokens_b_index and positive < batch_size/2:
batch.append([input_ids, segment_ids, masked_tokens, masked_pos, True]) # IsNext
positive += 1
elif tokens_a_index + 1 != tokens_b_index and negative < batch_size/2:
batch.append([input_ids, segment_ids, masked_tokens, masked_pos, False]) # NotNext
negative += 1
return batch
為什么要補零呢?
是為了計算一個batch中句子的mlm損失的時候可以組成一個有效矩陣放進去;不然第一個句子預測5個字符,第二句子預測7個字符,第三個句子預測8個字符,組不成一個有效的矩陣;
還有一個點,為什么補的是零,而不是其他值?
在后面的loss部分會給出解釋,
通過 batch.append添加的欄位就是我們要得到 input_ids, segment_ids, masked_tokens, masked_pos, isNext
input_ids 是bert輸入的Token Embeddings
segment_idsbert輸入的Segment Embeddings
masked_tokens對應的是被mask元素之前的原始的單字數字,
masked_pos 對應的是position資訊
isNext 代表這兩個句子是否是相鄰的背景關系
make_batch()完~~~~~~
回到main()繼續往下看
定義模型,損失函式 和 優化策略
model = BERT()
criterion = nn.CrossEntropyLoss(ignore_index=0) # 只計算mask位置的損失
optimizer = optim.Adam(model.parameters(), lr=0.001)
這里有一個細節是nn.CrossEntropyLoss(ignore_index=0) 我們定義了一個ignore_index為0,

我們可以看到loss 中給出的解釋的我們可以指定一個值,這個值不參與計算,也就是說我們后面在計算loss的時候,0不參與計算,即我們對masked_tokens補零后不影響結果,
接下來是main()中的呼叫部分
for epoch in range(100):
optimizer.zero_grad()
# logits_lm 語言詞表的輸出
# logits_clsf 二分類的輸出
# logits_lm:[batch_size, max_pred, n_vocab]
logits_lm, logits_clsf = model(input_ids, segment_ids, masked_pos)## logits_lm 【6,5,29】 bs*max_pred*voca logits_clsf:[6*2]
model
我們傳給model 三個引數input_ids, segment_ids, masked_pos,分別為
input_ids :bert輸入的Token Embeddings
segment_ids :bert輸入的Segment Embeddings
masked_pos :對應的是選中那15%的position資訊
我們剛剛在make_baatch已經說的很清楚了,還是不太明白的可以在回去看一下~
我們來看一下model = BERT()的詳細處理程序:
首先是一些定義
class BERT(nn.Module):
def __init__(self):
super(BERT, self).__init__()
self.embedding = Embedding() ## 詞向量層,構建詞表矩陣
self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)]) ## 把N個encoder堆疊起來,具體encoder實作一會看
self.fc = nn.Linear(d_model, d_model) ## 前饋神經網路-cls
self.activ1 = nn.Tanh() ## 激活函式-cls
self.linear = nn.Linear(d_model, d_model)#-mlm
self.activ2 = gelu ## 激活函式--mlm
self.norm = nn.LayerNorm(d_model)
self.classifier = nn.Linear(d_model, 2)## cls 這是一個分類層,維度是從d_model到2,對應我們架構圖中就是這種:
# decoder is shared with embedding layer
# 注意這部分的decoder不是transformer中的decoder 而是將‘mlm任務’輸出解碼到詞表大小的一個映射
embed_weight = self.embedding.tok_embed.weight
n_vocab, n_dim = embed_weight.size()
self.decoder = nn.Linear(n_dim, n_vocab, bias=False)
self.decoder.weight = embed_weight
self.decoder_bias = nn.Parameter(torch.zeros(n_vocab))
其中值得關注的是self.embedding,self.layers 這也是我們要重點講的
下面是BERT的實作部分
def forward(self, input_ids, segment_ids, masked_pos):
# 將input_ids,segment_ids,pos_embed加和得到input
input = self.embedding(input_ids, segment_ids)
我們將input_ids, segment_ids傳給Embedding,那Embedding會進行什么操作呢?
Embedding
class Embedding(nn.Module):
def __init__(self):
super(Embedding, self).__init__()
self.tok_embed = nn.Embedding(vocab_size, d_model) # token embedding
self.pos_embed = nn.Embedding(maxlen, d_model) # position embedding
self.seg_embed = nn.Embedding(n_segments, d_model) # segment(token type) embedding
# self.norm = nn.LayerNorm(d_model)
def forward(self, input_ids, segment_ids)
seq_len = input_ids.size(1) # input_ids:batch_size x len x d_model
pos = torch.arange(seq_len, dtype=torch.long) #生成陣列
pos = pos.unsqueeze(0).expand_as(input_ids) # (seq_len,) -> (batch_size, seq_len)
embedding = self.tok_embed(input_ids) + self.pos_embed(pos) + self.seg_embed(segment_ids)
return embedding
首先是在定義部分( init )定義了三個映射規則
然后在實作部分( forward )通過arange生成一個與input_ids 維度一致的陣列(已填充好的),然后將傳入的input_ids, segment_ids和生成的pos 相加得到bert的最終輸入,即下圖中的input,
arange函式的效果:

Embedding完~~~~~~
回到model部分我們繼續往下看
是一個get_attn_pad_mask函式,它的作用是為了得到句子中pad的位置資訊,給到模型后面,在計算自注意力和互動注意力的時候去掉pad符號的影響,
##get_attn_pad_mask是為了得到句子中pad的位置資訊,給到模型后面,在計算自注意力和互動注意力的時候去掉pad符號的影響
enc_self_attn_pad = get_attn_pad_mask(input_ids, input_ids)
下面我們來具體看一下這個函式
get_attn_pad_mask
def get_attn_pad_mask(seq_q, seq_k): # 在自注意力層q k是一致的
batch_size, len_q = seq_q.size()
batch_size, len_k = seq_k.size()
# eq(zero) is PAD token
# eq(0)表示和0相等的回傳True,不相等回傳False,
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1) # batch_size x 1 x len_k(=len_q), one is masking
return pad_attn_mask.expand(batch_size, len_q, len_k) # 重復了len_q次 batch_size x len_q x len_k 不懂可以看一下例子
內容不多也很好懂,seq_k.data.eq(0)是將input_ids中=0的置為True 其他置為False,.unsqueeze(1)的作用是增加一維,然后通過.expand函式重復 len_q次 ,最侄訓return我們想要的 符號矩陣,
.expand函式:

get_attn_pad_mask完~~~~~~
回到model部分我們繼續往下看
layers
for layer in self.layers:
output, enc_self_attn = layer(input, enc_self_attn_pad) ## enc_self_attn這里是QK轉置相乘之后softmax之后的矩陣值,代表的是每個單詞和其他單詞相關性;
# output : [batch_size, len, d_model], attn : [batch_size, n_heads, d_mode, d_model]
對于layers是我們在最開始就提到它是整個模型的重中之中,因為我們要在layer部分實作最重要的多頭注意力機制和 pos_ffn

class EncoderLayer(nn.Module):
def __init__(self):
super(EncoderLayer, self).__init__()
self.enc_self_attn = MultiHeadAttention()
self.pos_ffn = PoswiseFeedForwardNet()
def forward(self, enc_inputs, enc_self_attn_pad):
enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_pad) # enc_inputs to same Q,K,V enc_self_attn_mask是pad符號矩陣
對于多頭注意力機制我們的輸入有四個enc_inputs, enc_inputs, enc_inputs, enc_self_attn_pad 分別代表Q K V 和我們之前求出的符號矩陣,
MultiHeadAttention
lass MultiHeadAttention(nn.Module):
def __init__(self):
super(MultiHeadAttention, self).__init__()
## 輸入進來的QKV是相等的,使用映射linear做一個映射得到引數矩陣Wq, Wk,Wv
self.W_Q = nn.Linear(d_model, d_k * n_heads)
self.W_K = nn.Linear(d_model, d_k * n_heads)
self.W_V = nn.Linear(d_model, d_v * n_heads)
def forward(self, Q, K, V, attn_pad):
## 這個多頭分為這幾個步驟,首先映射分頭,然后計算atten_scores,然后計算atten_value;
## 輸入進來的資料形狀: Q: [batch_size x len_q x d_model], K: [batch_size x len_k x d_model], V: [batch_size x len_k x d_model]
# q: [batch_size x len_q x d_model], k: [batch_size x len_k x d_model], v: [batch_size x len_k x d_model]
residual, batch_size = Q, Q.size(0)
# (B, S, D) -proj-> (B, S, D) -split-> (B, S, H, W) -trans-> (B, H, S, W)
##下面這個就是先映射,后分頭;一定要注意的是q和k分頭之后維度是一致額,所以這里都是dk
q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2) # q_s: [batch_size x n_heads x len_q x d_k]
k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2) # k_s: [batch_size x n_heads x len_k x d_k]
v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2) # v_s: [batch_size x n_heads x len_k x d_v]
## 輸入進行的attn_pad形狀是 batch_size x len_q x len_k,然后經過下面這個代碼得到 新的attn_pad : [batch_size x n_heads x len_q x len_k],就是把pad資訊重復了n個頭上
attn_pad = attn_pad.unsqueeze(1).repeat(1, n_heads, 1, 1) # repeat 對張量重復擴充
首先使用映射linear做一個映射得到引數矩陣Wq, Wk,Wv 注意這里的引數矩陣的維度是d_k * n_heads 是‘多頭’之后的
q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2)
的意思是先通過映射得到引數矩陣,在通過.view將其“分頭”并調整維度順序
.view函式效果:

然后通過打分函式得到注意力矩陣context, 注意力分數attn(沒乘V之前的矩陣)
# context: [batch_size x n_heads x len_q x d_v], attn: [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)]
context, attn = ScaledDotProductAttention()(q_s, k_s, v_s, attn_pad)
context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v) # context: [batch_size x len_q x n_heads * d_v]
output = nn.Linear(n_heads * d_v, d_model)(context)
return nn.LayerNorm(d_model)(output + residual), attn # output: [batch_size x len_q x d_model]
ScaledDotProductAttention部分要實作的就是點積注意力計算公式:
ScaledDotProductAttention
點積注意力計算公式:

class ScaledDotProductAttention(nn.Module):
def __init__(self):
super(ScaledDotProductAttention, self).__init__()
def forward(self, Q, K, V, attn_pad):
## 輸入進來的維度分別是 [batch_size x n_heads x len_q x d_k] K: [batch_size x n_heads x len_k x d_k] V: [batch_size x n_heads x len_k x d_v]
##首先經過matmul函式得到的scores形狀是 : [batch_size x n_heads x len_q x len_k]
scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)]
## 然后關鍵詞地方來了,下面這個就是用到了我們之前重點講的attn_pad,把被pad的地方置為無限小,softmax之后基本就是0,對q的單詞不起作用
scores.masked_fill_(attn_pad, -1e9) # Fills elements of self tensor with value where mask is one.
attn = nn.Softmax(dim=-1)(scores)
context = torch.matmul(attn, V)
return context, attn
其中一個值的注意的點是我們通過scores.masked_fill_(attn_pad, -1e9) 將符號矩陣對應的位置 置為無窮小,這樣經過softmax后它就不會對q的單詞起作用,也就實作了我們想要的去除掉pad對其他單詞影響的效果,
MultiHeadAttention完~~~~~~
回到layers我們繼續看
enc_outputs = self.pos_ffn(enc_outputs) # enc_outputs: [batch_size x len_q x d_model]
return enc_outputs, attn
將剛剛通過多頭注意力機制得到的enc_outputs 送入PoswiseFeedForwardNet ,得到layers最終的結果
PoswiseFeedForwardNet
class PoswiseFeedForwardNet(nn.Module):
def __init__(self): # 對每個字的增強語意向量再做兩次線性變換,以增強整個模型的表達能力,
super(PoswiseFeedForwardNet, self).__init__()
self.fc1 = nn.Linear(d_model, d_ff)
self.fc2 = nn.Linear(d_ff, d_model)
def forward(self, x):
# (batch_size, len_seq, d_model) -> (batch_size, len_seq, d_ff) -> (batch_size, len_seq, d_model)
return self.fc2(gelu(self.fc1(x)))
這部分就是一個兩層的linear層,沒什么好說的,對每個字的增強語意向量再做兩次線性變換,以增強整個模型的表達能力,這里,變換后的向量與原向量保持長度相同,
pos_ffn完~~~~~~
layers 完~~~~~~
回到model部分我們繼續往下看
h_pooled = self.activ1(self.fc(output[:, 0])) # [batch_size, d_model] cls 對應的位置 可以看一下例子
logits_clsf = self.classifier(h_pooled) # [batch_size, 2]
masked_pos = masked_pos[:, :, None].expand(-1, -1, output.size(-1)) # [batch_size, max_pred, d_model] 其中一個 masked_pos= [6, 5, 17,0,0]
# get masked position from final output of transformer.
h_masked = torch.gather(output, 1, masked_pos) #在output取出一維對應masked_pos資料 masking position [batch_size, max_pred, d_model]
h_masked = self.norm(self.activ2(self.linear(h_masked)))
logits_lm = self.decoder(h_masked) + self.decoder_bias # [batch_size, max_pred, n_vocab]
return logits_lm, logits_clsf
通過output[:, 0]切片的方式得到cls對應位置的資訊 將其送入一個linear層得到h_pooled 再將其送入一個二分類的linear 得到nsp任務的結果:logits_clsf
通過torch.gather 在output中取出一維masked_pos 對應的資料 h_masked 再將其送入linear層 并解碼(decoder)得到mlm任務的結果: logits_lm
model 完~~~~~~
model的回傳值
logits_lm, 代表 mask對應位置的輸出
logits_clsf,為nsp任務的輸出,
回到main()部分我們繼續往下看
Loss
loss_lm = criterion(logits_lm.transpose(1, 2), masked_tokens) # for masked LM ;masked_tokens [6,5]
loss_lm = (loss_lm.float()).mean()
loss_clsf = criterion(logits_clsf, isNext) # for sentence classification
loss = loss_lm + loss_clsf
if (epoch + 1) % 10 == 0:
print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
loss.backward()
optimizer.step()
BERT 的損失函式由兩部分組成,第一部分是來自 Mask-LM 的「單詞級別分類任務」,另一部分是「句子級別的分類任務」,通過這兩個任務的聯合學習,可以使得 BERT 學習到的表征既有 token 級別資訊,同時也包含了句子級別的語意資訊,
最后再來看一下我們一開始提到的模型框架圖,是不是很輕松就能理解了 😃

總結:
BERT文章作者提出了兩個預訓練任務:Masked LM和Next Sentence Prediction
BERT的第一個任務是采用 MaskLM 的方式來訓練語言模型,通俗地說就是在輸入一句話的時
候,隨機地選一些要預測的詞,然后用一個特殊的符號[MASK]來代替它們,因為我們知道被蓋起來的部分是什么,但BERT不知道,所以BERT學習的目標就是 :輸出跟蓋起來的越接近越好,思想來源于 「完形填空」 的 任 務 , 具體來說 , 文章作者在一句話中隨機選擇 15% 的 詞匯用于預 測 , 對于在原句中被 抹 去 的 詞 匯 :
80% 情況下采用 一 個特殊符號 [MASK] 替 換 ,
10% 情況下采用 一 個任意詞替換,
剩余 10% 情況下保持原詞匯不變
這樣做的好處是,BERT 并不知道[MASK]替換的是這 15%個 Token 中的哪一個詞(「注意:這里意思是輸入的時候不知道[MASK] 替換的是哪一個詞,但是輸出還是知道要預測哪個詞的」)
Next Sentence Prediction 的任務描述為:給定一篇文章中的兩句話,判斷第二句話在文本中是否緊跟在第一句話之后,
這個類似于 「段落重排序」 的任務
只考慮兩句話,判斷是否是一篇文章中的前后句,在實際預訓練程序中,
文章作者從文本語料庫中隨機選擇 50% 正確陳述句對和 50% 錯誤陳述句對進行訓練,在第一個句子的首部會加上一個[CLS] token,在兩個句子中間以及最后一個句子的尾部會加上一個[SEP] token,
這樣能讓模型去學習一下句子層面的資訊,
本文完~~~~~~
完整原始碼
"""
orginal from :
https://github.com/graykode/nlp-tutorial/tree/master/5-2.BERT
"""
import math
import re
from random import *
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
# 資料預處理
def make_batch():
batch = [] # list
positive = negative = 0 # 計數器 為了記錄NSP任務中的正樣本和負樣本的個數,比例最好是在一個batch中接近1:1
while positive != batch_size/2 or negative != batch_size/2:
# 抽出來兩句話 先隨機sample兩個index 再通過index找出樣本
tokens_a_index, tokens_b_index= randrange(len(sentences)), randrange(len(sentences)) # 比如tokens_a_index=3,tokens_b_index=1;從整個樣本中抽取對應的樣本;
tokens_a, tokens_b= token_list[tokens_a_index], token_list[tokens_b_index]## 根據索引獲取對應樣本:tokens_a=[5, 23, 26, 20, 9, 13, 18] tokens_b=[27, 11, 23, 8, 17, 28, 12, 22, 16, 25]
# 拼接
input_ids = [word_dict['[CLS]']] + tokens_a + [word_dict['[SEP]']] + tokens_b + [word_dict['[SEP]']] ## 加上特殊符號,CLS符號是1,sep符號是2:[1, 5, 23, 26, 20, 9, 13, 18, 2, 27, 11, 23, 8, 17, 28, 12, 22, 16, 25, 2]
segment_ids = [0] * (1 + len(tokens_a) + 1) + [1] * (len(tokens_b) + 1)##分割句子符號:[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
# MASK LM
n_pred = min(max_pred, max(1, int(round(len(input_ids) * 0.15)))) # n_pred=3;整個句子的15%的字符可以被mask掉,這里取和max_pred中的最小值,確保每次計算損失的時候沒有那么多字符以及資訊充足,有15%做控制就夠了;其實可以不用加這個,單個句子少了,就要加上足夠的訓練樣本
# 不讓特殊字符參與mask
cand_maked_pos = [i for i, token in enumerate(input_ids)
if token != word_dict['[CLS]'] and token != word_dict['[SEP]']] ## cand_maked_pos=[1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18];整個句子input_ids中可以被mask的符號必須是非cls和sep符號的,要不然沒意義
shuffle(cand_maked_pos)## 打亂順序:cand_maked_pos=[6, 5, 17, 3, 1, 13, 16, 10, 12, 2, 9, 7, 11, 18, 4, 14, 15] 其實取mask對應的位置有很多方法,這里只是一種使用shuffle的方式
masked_tokens, masked_pos = [], []
for pos in cand_maked_pos[:n_pred]: # 取其中的三個;masked_pos=[6, 5, 17] 注意這里對應的是position資訊;masked_tokens=[13, 9, 16] 注意這里是被mask的元素之前對應的原始單字數字;
masked_pos.append(pos)
masked_tokens.append(input_ids[pos]) # 回到ppt看一下
if random() < 0.8: # 80%
input_ids[pos] = word_dict['[MASK]'] # make mask
elif random() < 0.5: # 10%
index = randint(0, vocab_size - 1) # random index in vocabulary
input_ids[pos] = word_dict[number_dict[index]] # replace
# Zero Paddings
n_pad = maxlen - len(input_ids)##maxlen=30;n_pad=10
input_ids.extend([0] * n_pad)
segment_ids.extend([0] * n_pad)# 這里有一個問題,0和之前的重了
# Zero Padding (100% - 15%) tokens 是為了計算一個batch中句子的mlm損失的時候可以組成一個有效矩陣放進去;不然第一個句子預測5個字符,第二句子預測7個字符,第三個句子預測8個字符,組不成一個有效的矩陣;
## 這里非常重要,為什么是對masked_tokens是補零,而不是補其他的字符????我補1可不可以??
if max_pred > n_pred:
n_pad = max_pred - n_pred
masked_tokens.extend([0] * n_pad)## masked_tokens= [13, 9, 16, 0, 0] masked_tokens 對應的是被mask的元素的原始真實標簽是啥,也就是groundtruth
masked_pos.extend([0] * n_pad)## masked_pos= [6, 5, 17,0,0] masked_pos是記錄哪些位置被mask了
if tokens_a_index + 1 == tokens_b_index and positive < batch_size/2:
batch.append([input_ids, segment_ids, masked_tokens, masked_pos, True]) # IsNext
positive += 1
elif tokens_a_index + 1 != tokens_b_index and negative < batch_size/2:
batch.append([input_ids, segment_ids, masked_tokens, masked_pos, False]) # NotNext
negative += 1
return batch
# 符號矩陣
def get_attn_pad_mask(seq_q, seq_k): # 在自注意力層q k是一致的
batch_size, len_q = seq_q.size()
batch_size, len_k = seq_k.size()
# eq(zero) is PAD token
# eq(0)表示和0相等的回傳True,不相等回傳False,
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1) # batch_size x 1 x len_k(=len_q), one is masking
return pad_attn_mask.expand(batch_size, len_q, len_k) # 重復了len_q次 batch_size x len_q x len_k 不懂可以看一下例子
def gelu(x):
"Implementation of the gelu activation function by Hugging Face"
return x * 0.5 * (1.0 + torch.erf(x / math.sqrt(2.0)))
#Embedding層
class Embedding(nn.Module):
def __init__(self):
super(Embedding, self).__init__()
self.tok_embed = nn.Embedding(vocab_size, d_model) # token embedding
self.pos_embed = nn.Embedding(maxlen, d_model) # position embedding
self.seg_embed = nn.Embedding(n_segments, d_model) # segment(token type) embedding
self.norm = nn.LayerNorm(d_model)
def forward(self, input_ids, segment_ids):# x對應input_ids, seg對應segment_ids
seq_len = input_ids.size(1)
pos = torch.arange(seq_len, dtype=torch.long)
pos = pos.unsqueeze(0).expand_as(input_ids) # (seq_len,) -> (batch_size, seq_len)
embedding = self.tok_embed(input_ids) + self.pos_embed(pos) + self.seg_embed(segment_ids)
return self.norm(embedding)
# 注意力打分函式
class ScaledDotProductAttention(nn.Module):
def __init__(self):
super(ScaledDotProductAttention, self).__init__()
def forward(self, Q, K, V, attn_pad):
## 輸入進來的維度分別是 [batch_size x n_heads x len_q x d_k] K: [batch_size x n_heads x len_k x d_k] V: [batch_size x n_heads x len_k x d_v]
##首先經過matmul函式得到的scores形狀是 : [batch_size x n_heads x len_q x len_k]
scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)]
## 然后關鍵詞地方來了,下面這個就是用到了我們之前重點講的attn_pad,把被pad的地方置為無限小,softmax之后基本就是0,對q的單詞不起作用
scores.masked_fill_(attn_pad, -1e9) # Fills elements of self tensor with value where mask is one.
attn = nn.Softmax(dim=-1)(scores)
context = torch.matmul(attn, V)
return context, attn
#多頭注意力機制
class MultiHeadAttention(nn.Module):
def __init__(self):
super(MultiHeadAttention, self).__init__()
## 輸入進來的QKV是相等的,使用映射linear做一個映射得到引數矩陣Wq, Wk,Wv
self.W_Q = nn.Linear(d_model, d_k * n_heads)
self.W_K = nn.Linear(d_model, d_k * n_heads)
self.W_V = nn.Linear(d_model, d_v * n_heads)
def forward(self, Q, K, V, attn_pad):
## 這個多頭分為這幾個步驟,首先映射分頭,然后計算atten_scores,然后計算atten_value;
## 輸入進來的資料形狀: Q: [batch_size x len_q x d_model], K: [batch_size x len_k x d_model], V: [batch_size x len_k x d_model]
# q: [batch_size x len_q x d_model], k: [batch_size x len_k x d_model], v: [batch_size x len_k x d_model]
residual, batch_size = Q, Q.size(0)
# (B, S, D) -proj-> (B, S, D) -split-> (B, S, H, W) -trans-> (B, H, S, W)
##下面這個就是先映射,后分頭;一定要注意的是q和k分頭之后維度是一致額,所以這里都是dk
q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2) # q_s: [batch_size x n_heads x len_q x d_k]
k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2) # k_s: [batch_size x n_heads x len_k x d_k]
v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2) # v_s: [batch_size x n_heads x len_k x d_v]
## 輸入進行的attn_pad形狀是 batch_size x len_q x len_k,然后經過下面這個代碼得到 新的attn_pad : [batch_size x n_heads x len_q x len_k],就是把pad資訊重復了n個頭上
attn_pad = attn_pad.unsqueeze(1).repeat(1, n_heads, 1, 1) # repeat 對張量重復擴充
# context: [batch_size x n_heads x len_q x d_v], attn: [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)]
context, attn = ScaledDotProductAttention()(q_s, k_s, v_s, attn_pad)
context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v) # context: [batch_size x len_q x n_heads * d_v]
output = nn.Linear(n_heads * d_v, d_model)(context)
return nn.LayerNorm(d_model)(output + residual), attn # output: [batch_size x len_q x d_model]
#基于位置的前饋神經網路
class PoswiseFeedForwardNet(nn.Module):
def __init__(self): # 對每個字的增強語意向量再做兩次線性變換,以增強整個模型的表達能力,
super(PoswiseFeedForwardNet, self).__init__()
self.fc1 = nn.Linear(d_model, d_ff)
self.fc2 = nn.Linear(d_ff, d_model)
def forward(self, x):
# (batch_size, len_seq, d_model) -> (batch_size, len_seq, d_ff) -> (batch_size, len_seq, d_model)
return self.fc2(gelu(self.fc1(x)))
#Encoder
class EncoderLayer(nn.Module):
def __init__(self):
super(EncoderLayer, self).__init__()
self.enc_self_attn = MultiHeadAttention()
self.pos_ffn = PoswiseFeedForwardNet()
def forward(self, enc_inputs, enc_self_attn_pad):
enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_pad) # enc_inputs to same Q,K,V enc_self_attn_mask是pad符號矩陣
enc_outputs = self.pos_ffn(enc_outputs) # enc_outputs: [batch_size x len_q x d_model]
return enc_outputs, attn
## 1. BERT模型整體架構
class BERT(nn.Module):
def __init__(self):
super(BERT, self).__init__()
self.embedding = Embedding() ## 詞向量層,構建詞表矩陣
self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)]) ## 把N個encoder堆疊起來,具體encoder實作一會看
self.fc = nn.Linear(d_model, d_model) ## 前饋神經網路-cls
self.activ1 = nn.Tanh() ## 激活函式-cls
self.linear = nn.Linear(d_model, d_model)#-mlm
self.activ2 = gelu ## 激活函式--mlm
self.norm = nn.LayerNorm(d_model)
self.classifier = nn.Linear(d_model, 2)## cls 這是一個分類層,維度是從d_model到2,對應我們架構圖中就是這種:
# decoder is shared with embedding layer
embed_weight = self.embedding.tok_embed.weight
n_vocab, n_dim = embed_weight.size()
self.decoder = nn.Linear(n_dim, n_vocab, bias=False)
self.decoder.weight = embed_weight
self.decoder_bias = nn.Parameter(torch.zeros(n_vocab))
def forward(self, input_ids, segment_ids, masked_pos):
input = self.embedding(input_ids, segment_ids) # 將input_ids,segment_ids,pos_embed加和
##get_attn_pad_mask是為了得到句子中pad的位置資訊,給到模型后面,在計算自注意力和互動注意力的時候去掉pad符號的影響,去看一下這個函式 4.
enc_self_attn_pad = get_attn_pad_mask(input_ids, input_ids)
for layer in self.layers:
output, enc_self_attn = layer(input, enc_self_attn_pad) ## enc_self_attn這里是QK轉置相乘之后softmax之后的矩陣值,代表的是每個單詞和其他單詞相關性;
# output : [batch_size, len, d_model], attn : [batch_size, n_heads, d_mode, d_model]
h_pooled = self.activ1(self.fc(output[:, 0])) # [batch_size, d_model] cls 對應的位置 可以看一下例子
logits_clsf = self.classifier(h_pooled) # [batch_size, 2]
masked_pos = masked_pos[:, :, None].expand(-1, -1, output.size(-1)) # [batch_size, max_pred, d_model] 其中一個 masked_pos= [6, 5, 17,0,0]
# get masked position from final output of transformer.
h_masked = torch.gather(output, 1, masked_pos) #在output取出一維對應masked_pos資料 masking position [batch_size, max_pred, d_model]
h_masked = self.norm(self.activ2(self.linear(h_masked)))
logits_lm = self.decoder(h_masked) + self.decoder_bias # [batch_size, max_pred, n_vocab]
return logits_lm, logits_clsf
# 1.從整體到區域
# 2.資料流動形狀(輸入 輸出)
if __name__ == '__main__':
# BERT Parameters
maxlen = 30 # 句子的最大長度
batch_size = 6 # 每一組有多少個句子一起送進去模型
max_pred = 5 # max tokens of prediction
n_layers = 6 # number of Encoder of Encoder Layer
n_heads = 12 # number of heads in Multi-Head Attention
d_model = 768 # Embedding Size
d_ff = 3072 # 4*d_model, FeedForward dimension
d_k = d_v = 64 # dimension of K(=Q), V
n_segments = 2
text = (
'Hello, how are you? I am Romeo.\n'
'Hello, Romeo My name is Juliet. Nice to meet you.\n'
'Nice meet you too. How are you today?\n'
'Great. My baseball team won the competition.\n'
'Oh Congratulations, Juliet\n'
'Thanks you Romeo'
)
sentences = re.sub("[.,!?\\-]", '', text.lower()).split('\n') # filter '.', ',', '?', '!'
word_list = list(set(" ".join(sentences).split()))
word_dict = {'[PAD]': 0, '[CLS]': 1, '[SEP]': 2, '[MASK]': 3}
for i, w in enumerate(word_list):
word_dict[w] = i + 4
number_dict = {i: w for i, w in enumerate(word_dict)}
vocab_size = len(word_dict)
# 把文本轉化成數字
token_list = list()
for sentence in sentences:
arr = [word_dict[s] for s in sentence.split()]
token_list.append(arr)
batch = make_batch() # 最重要的一部分 預訓練任務的資料構建部分
input_ids, segment_ids, masked_tokens, masked_pos, isNext = map(torch.LongTensor, zip(*batch))# map把函式依次作用在list中的每一個元素上,得到一個新的list并回傳,注意,map不改變原list,而是回傳一個新list,
model = BERT()
criterion = nn.CrossEntropyLoss(ignore_index=0) # 只計算mask位置的損失
optimizer = optim.Adam(model.parameters(), lr=0.001)
for epoch in range(100):
optimizer.zero_grad()
# logits_lm 語言詞表的輸出
# logits_clsf 二分類的輸出
# logits_lm:[batch_size, max_pred, n_vocab]
logits_lm, logits_clsf = model(input_ids, segment_ids, masked_pos)## logits_lm 【6,5,29】 bs*max_pred*voca logits_clsf:[6*2]
loss_lm = criterion(logits_lm.transpose(1, 2), masked_tokens) # for masked LM ;masked_tokens [6,5]
loss_lm = (loss_lm.float()).mean()
loss_clsf = criterion(logits_clsf, isNext) # for sentence classification
loss = loss_lm + loss_clsf
if (epoch + 1) % 10 == 0:
print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
loss.backward()
optimizer.step()
特別鳴謝:
DASOU
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/379391.html
標籤:AI
