手撕Transformer之组合各组件

文摘   科技   2024-10-13 11:23   江苏  


点击蓝字
 
关注我们










01


引言



这是《手撕Transformer》系列的第八篇,也是最后一篇。编码器和解码器相结合,创建了一个能够轻松将德语翻译成英语的模型。

闲话少说,我们直接开始吧!







02


嵌入层


嵌入层为单词库中的每个标记Token提供相应的向量表示。这是每个序列必须经过的第一层。每个序列中的每个标记都必须嵌入到一个长度为 d_model 的向量中。这一层的输入是(batch_size,seq_length)。输出为(batch_size、seq_length、d_model)

class Embeddings(nn.Module):  def __init__(self, vocab_size: int, d_model: int):    """    Args:      vocab_size:     size of vocabulary      d_model:        dimension of embeddings    """    # inherit from nn.Module    super().__init__()       # embedding look-up table (lut)                              self.lut = nn.Embedding(vocab_size, d_model)       # dimension of embeddings     self.d_model = d_model                          
def forward(self, x: Tensor): """ Args:      x:              input Tensor (batch_size, seq_length) Returns: embedding vector """ # embeddings by constant sqrt(d_model) return self.lut(x) * math.sqrt(self.d_model)




03


位置编码层


接着需要对这些嵌入序列进行位置编码,为每个单词提供额外的语义。这也使得单个单词可以根据其在句子中的不同位置而具有不同的含义。该层的输入为(batch_size、seq_length、d_model)。位置编码矩阵的维度大小为(max_length, d_model),它必须被切片成与批次中每个序列相同的长度,使其维度变为(seq_length, d_model)。最终该层输出为(batch_size, seq_length, d_model)
class PositionalEncoding(nn.Module):  def __init__(self, d_model: int, dropout: float = 0.1, max_length: int = 5000):    """    Args:      d_model:      dimension of embeddings      dropout:      randomly zeroes-out some of the input      max_length:   max sequence length    """    # inherit from Module    super().__init__()     
# initialize dropout self.dropout = nn.Dropout(p=dropout) # create tensor of 0s pe = torch.zeros(max_length, d_model) # create position column     k = torch.arange(0, max_length).unsqueeze(1)   # calc divisor for positional encoding div_term = torch.exp( torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model) )
# calc sine on even indices pe[:, 0::2] = torch.sin(k * div_term) # calc cosine on odd indices     pe[:, 1::2] = torch.cos(k * div_term)   # add dimension pe = pe.unsqueeze(0) # buffers are saved in state_dict but not trained by the optimizer self.register_buffer("pe", pe)
def forward(self, x: Tensor): """ Args:      x:        embeddings (batch_size, seq_length, d_model) Returns: embeddings + positional encodings (batch_size, seq_length, d_model) """ # add positional encoding to the embeddings x = x + self.pe[:, : x.size(1)].requires_grad_(False)
# perform dropout return self.dropout(x)





04


Multi-Head Attention


这些嵌入和位置编码后的序列被复制三份传递到多头注意力层,以创建对应的Query、Key和Value张量,并由线性层进行转换。它们的大小均为(batch_size、seq_length、d_model)。这些张量被分成各自的head数,大小为(batch_size, n_heads, seq_length, d_key),其中 d_key = (d_model / n_heads)。现在,每个序列都有 n_heads 表示,它们可以在训练过程中关注序列的不同方面。

查询张量Query和Key张量相乘,生成一个概率分布,再除以 √(d_key)。Key张量必须转置。乘法的输出代表每个序列与自身的关系,在解码器的交叉注意力机制中代表目标序列与源序列的关系。这些分布的大小为(batch_size、n_heads、Q_length、K_length)。根据序列的填充情况,填充的位置将会被掩模;如果它们处于解码器的自注意力机制中,它们也会被掩模,以使序列只注意之前的标记,这就是解码器的自回归特性。

代码实现如下:

class MultiHeadAttention(nn.Module):  def __init__(self, d_model: int = 512, n_heads: int = 8, dropout: float = 0.1):    """    Args:        d_model:      dimension of embeddings        n_heads:      number of self attention heads        dropout:      probability of dropout occurring    """    super().__init__()    assert d_model % n_heads == 0            # ensure an even num of heads    self.d_model = d_model                   # 512 dim    self.n_heads = n_heads                   # 8 heads    self.d_key = d_model // n_heads          # assume d_value equals d_key | 512/8=64
self.Wq = nn.Linear(d_model, d_model) # query weights self.Wk = nn.Linear(d_model, d_model) # key weights self.Wv = nn.Linear(d_model, d_model) # value weights self.Wo = nn.Linear(d_model, d_model) # output weights
self.dropout = nn.Dropout(p=dropout) # initialize dropout layer
def forward(self, query: Tensor, key: Tensor, value: Tensor, mask: Tensor = None): """ Args: query: query vector (batch_size, q_length, d_model) key: key vector (batch_size, k_length, d_model) value: value vector (batch_size, s_length, d_model) mask: mask for decoder
Returns: output: attention values (batch_size, q_length, d_model) attn_probs: softmax scores (batch_size, n_heads, q_length, k_length) """ batch_size = key.size(0) # calculate query, key, and value tensors Q = self.Wq(query) # (32, 10, 512) x (512, 512) = (32, 10, 512) K = self.Wk(key) # (32, 10, 512) x (512, 512) = (32, 10, 512) V = self.Wv(value) # (32, 10, 512) x (512, 512) = (32, 10, 512)
# split each tensor into n-heads to compute attention
# query tensor Q = Q.view(batch_size, # (32, 10, 512) -> (32, 10, 8, 64) -1, # -1 = q_length self.n_heads, self.d_key ).permute(0, 2, 1, 3) # (32, 10, 8, 64) -> (32, 8, 10, 64) = (batch_size, n_heads, q_length, d_key) # key tensor K = K.view(batch_size, # (32, 10, 512) -> (32, 10, 8, 64) -1, # -1 = k_length self.n_heads, self.d_key ).permute(0, 2, 1, 3) # (32, 10, 8, 64) -> (32, 8, 10, 64) = (batch_size, n_heads, k_length, d_key) # value tensor V = V.view(batch_size, # (32, 10, 512) -> (32, 10, 8, 64) -1, # -1 = v_length self.n_heads, self.d_key ).permute(0, 2, 1, 3) # (32, 10, 8, 64) -> (32, 8, 10, 64) = (batch_size, n_heads, v_length, d_key) # computes attention # scaled dot product -> QK^{T} scaled_dot_prod = torch.matmul(Q, # (32, 8, 10, 64) x (32, 8, 64, 10) -> (32, 8, 10, 10) = (batch_size, n_heads, q_length, k_length) K.permute(0, 1, 3, 2) ) / math.sqrt(self.d_key) # sqrt(64) # fill those positions of product as (-1e10) where mask positions are 0 if mask is not None: scaled_dot_prod = scaled_dot_prod.masked_fill(mask == 0, -1e10)
# apply softmax     attn_probs = torch.softmax(scaled_dot_prod, dim=-1)   # multiply by values to get attention A = torch.matmul(self.dropout(attn_probs), V) # (32, 8, 10, 10) x (32, 8, 10, 64) -> (32, 8, 10, 64) # (batch_size, n_heads, q_length, k_length) x (batch_size, n_heads, v_length, d_key) -> (batch_size, n_heads, q_length, d_key)
# reshape attention back to (32, 10, 512) A = A.permute(0, 2, 1, 3).contiguous() # (32, 8, 10, 64) -> (32, 10, 8, 64)    A = A.view(batch_size, -1self.n_heads*self.d_key) # (32, 10, 8, 64) -> (32, 10, 8*64) -> (32, 10, 512) = (batch_size, q_length, d_model) # push through the final weight layer output = self.Wo(A) # (32, 10, 512) x (512, 512) = (32, 10, 512)
return output, attn_probs






05


 前馈神经网络FFN


经过层归一化和残差连接后,注意力机制的输出被传递到 FFN。FFN 由两层线性层和一个 ReLU 激活函数组成。第一层的形状为 (d_model, d_ffn)。 每个序列的张量形状为(batch_size、seq_length、d_model)经过该层,它允许模型学习更多关于每个序列的信息。此时,张量的形状为(batch_size、seq_length、d_ffn),并通过 ReLU。然后,它将通过第二层,第二层的形状为(d_ffn,d_model)。这样,张量就会收缩为原来的大小(batch_size, seq_length, d_model)。输出经过层归一化处理,并进行残差加法。


代码如下:

class PositionwiseFeedForward(nn.Module):  def __init__(self, d_model: int, d_ffn: int, dropout: float = 0.1):    """    Args:        d_model:      dimension of embeddings        d_ffn:        dimension of feed-forward network        dropout:      probability of dropout occurring    """    super().__init__()
self.w_1 = nn.Linear(d_model, d_ffn) self.w_2 = nn.Linear(d_ffn, d_model) self.dropout = nn.Dropout(dropout)
def forward(self, x): """ Args: x: output from attention (batch_size, seq_length, d_model) Returns: expanded-and-contracted representation (batch_size, seq_length, d_model) """ # w_1(x).relu(): (batch_size, seq_length, d_model) x (d_model,d_ffn) -> (batch_size, seq_length, d_ffn) # w_2(w_1(x).relu()): (batch_size, seq_length, d_ffn) x (d_ffn, d_model) -> (batch_size, seq_length, d_model) return self.w_2(self.dropout(self.w_1(x).relu()))








06


 层归一化和残差连接


针对输入形状为(batch_size、seq_length、d_model)张量,层归一化将对每个 d_model 向量进行归一化。使用修改后的 z-score 公式对其进行标准化,这样可以防止梯度下降出现问题。



残差加法将进入层之前的嵌入向量添加到输出中。这就利用从多头注意力和 FFN 中获得的信息丰富了嵌入向量。


层归一化或残差加法都不会影响其输入的形状。这些都在编码器和解码器模块中实现,使用 nn.LayerNorm 是为了简单起见,而不是前序文章中创建的自定义模块。







07


 编码器


每个编码器层包括上述所有层。它负责丰富源序列的嵌入。输入的大小为(batch_size、seq_length、d_model)。嵌入序列直接传递给多头注意力机制。在编码器堆栈中经过 Nx 层后,输出是每个序列的丰富表示,其中包含尽可能多的上下文。它的大小为(batch_size、seq_length、d_model)


代码如下:

class EncoderLayer(nn.Module):    def __init__(self, d_model: int, n_heads: int, d_ffn: int, dropout: float):    """    Args:        d_model:      dimension of embeddings        n_heads:      number of heads        d_ffn:        dimension of feed-forward network        dropout:      probability of dropout occurring    """    super().__init__()    # multi-head attention sublayer    self.attention = MultiHeadAttention(d_model, n_heads, dropout)    # layer norm for multi-head attention    self.attn_layer_norm = nn.LayerNorm(d_model)
# position-wise feed-forward network self.positionwise_ffn = PositionwiseFeedForward(d_model, d_ffn, dropout) # layer norm for position-wise ffn self.ffn_layer_norm = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, src: Tensor, src_mask: Tensor): """ Args: src: positionally embedded sequences (batch_size, seq_length, d_model) src_mask: mask for the sequences (batch_size, 1, 1, seq_length) Returns: src: sequences after self-attention (batch_size, seq_length, d_model) """ # pass embeddings through multi-head attention _src, attn_probs = self.attention(src, src, src, src_mask)
# residual add and norm src = self.attn_layer_norm(src + self.dropout(_src))
# position-wise feed-forward network _src = self.positionwise_ffn(src)
# residual add and norm src = self.ffn_layer_norm(src + self.dropout(_src))
return src, attn_probs
class Encoder(nn.Module): def __init__(self, d_model: int, n_layers: int, n_heads: int, d_ffn: int, dropout: float = 0.1): """ Args: d_model: dimension of embeddings n_layers: number of encoder layers n_heads: number of heads d_ffn: dimension of feed-forward network dropout: probability of dropout occurring """ super().__init__()
# create n_layers encoders self.layers = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ffn, dropout) for layer in range(n_layers)])
self.dropout = nn.Dropout(dropout)
def forward(self, src: Tensor, src_mask: Tensor): """ Args: src: embedded sequences (batch_size, seq_length, d_model) src_mask: mask for the sequences (batch_size, 1, 1, seq_length)
Returns: src: sequences after self-attention (batch_size, seq_length, d_model) """
# pass the sequences through each encoder for layer in self.layers: src, attn_probs = layer(src, src_mask)
self.attn_probs = attn_probs
return src







08


 解码器


每个解码器层有两个职责:(1) 学习移位目标序列的自回归表示;(2) 学习目标序列如何与编码器的丰富嵌入相关联。与编码器一样,解码器栈也有 Nx 个解码器层。如前所述,编码器的输出被传递到每个解码器层。


第一个解码层的输入被右移,并被嵌入和位置编码。它的形状为(batch_size, seq_length, d_model)。它将通过第一个注意力机制,在该机制中,模型将学习序列与自身的自回归表示。该机制的输出保持其形状,并传递给第二个交叉注意力机制。它与编码器的丰富嵌入相乘,输出再次保持原来的形状。


通过 FFN 后,将通过最后的线性层,该层的形状为(d_model,vocab_size)。这样就生成了一个大小为(batch_size、seq_length、vocab_size)的张量。之后可以通过一个 softmax 函数,最高概率就是对每个标记的预测值。


代码如下:

class DecoderLayer(nn.Module):
def __init__(self, d_model: int, n_heads: int, d_ffn: int, dropout: float): """ Args: d_model: dimension of embeddings n_heads: number of heads d_ffn: dimension of feed-forward network dropout: probability of dropout occurring """ super().__init__() # masked multi-head attention sublayer self.masked_attention = MultiHeadAttention(d_model, n_heads, dropout) # layer norm for masked multi-head attention self.masked_attn_layer_norm = nn.LayerNorm(d_model)
# multi-head attention sublayer self.attention = MultiHeadAttention(d_model, n_heads, dropout) # layer norm for multi-head attention self.attn_layer_norm = nn.LayerNorm(d_model)
# position-wise feed-forward network self.positionwise_ffn = PositionwiseFeedForward(d_model, d_ffn, dropout) # layer norm for position-wise ffn self.ffn_layer_norm = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, trg: Tensor, src: Tensor, trg_mask: Tensor, src_mask: Tensor): """ Args: trg: embedded sequences (batch_size, trg_seq_length, d_model) src: embedded sequences (batch_size, src_seq_length, d_model) trg_mask: mask for the sequences (batch_size, 1, trg_seq_length, trg_seq_length) src_mask: mask for the sequences (batch_size, 1, 1, src_seq_length)
Returns: trg: sequences after self-attention (batch_size, trg_seq_length, d_model) attn_probs: self-attention softmax scores (batch_size, n_heads, trg_seq_length, src_seq_length) """ # pass trg embeddings through masked multi-head attention _trg, attn_probs = self.masked_attention(trg, trg, trg, trg_mask)
# residual add and norm trg = self.masked_attn_layer_norm(trg + self.dropout(_trg))
# pass trg and src embeddings through multi-head attention _trg, attn_probs = self.attention(trg, src, src, src_mask)
# residual add and norm trg = self.attn_layer_norm(trg + self.dropout(_trg))
# position-wise feed-forward network _trg = self.positionwise_ffn(trg)
# residual add and norm trg = self.ffn_layer_norm(trg + self.dropout(_trg))
return trg, attn_probs
class Decoder(nn.Module): def __init__(self, vocab_size: int, d_model: int, n_layers: int, n_heads: int, d_ffn: int, dropout: float = 0.1): """ Args: vocab_size: size of the target vocabulary d_model: dimension of embeddings n_layers: number of encoder layers n_heads: number of heads d_ffn: dimension of feed-forward network dropout: probability of dropout occurring """ super().__init__()
# create n_layers encoders self.layers = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ffn, dropout) for layer in range(n_layers)])
self.dropout = nn.Dropout(dropout) # set output layer self.Wo = nn.Linear(d_model, vocab_size)
def forward(self, trg: Tensor, src: Tensor, trg_mask: Tensor, src_mask: Tensor): """ Args: trg: embedded sequences (batch_size, trg_seq_length, d_model) src: encoded sequences from encoder (batch_size, src_seq_length, d_model) trg_mask: mask for the sequences (batch_size, 1, trg_seq_length, trg_seq_length) src_mask: mask for the sequences (batch_size, 1, 1, src_seq_length)
Returns: output: sequences after decoder (batch_size, trg_seq_length, vocab_size) attn_probs: self-attention softmax scores (batch_size, n_heads, trg_seq_length, src_seq_length) """
# pass the sequences through each decoder for layer in self.layers: trg, attn_probs = layer(trg, src, trg_mask, src_mask)
self.attn_probs = attn_probs
return self.Wo(trg)








09


 The Transformer


编码器和解码器可在一个模块中组合,以创建Transformer模型。该模块可通过编码器、解码器以及目标嵌入和源嵌入进行初始化。


前向传递需要源序列和经过移位的目标序列。源序列被嵌入并通过编码器。编码器的输出和嵌入的目标序列通过解码器。创建源掩码和目标掩码的函数也是该模块的一部分。

class Transformer(nn.Module):  def __init__(self, encoder: Encoder, decoder: Decoder,               src_embed: Embeddings, trg_embed: Embeddings,               src_pad_idx: int, trg_pad_idx: int, device):    """    Args:        encoder:      encoder stack                            decoder:      decoder stack        src_embed:    source embeddings and encodings        trg_embed:    target embeddings and encodings        src_pad_idx:  padding index                  trg_pad_idx:  padding index        device:       cuda or cpu
Returns: output: sequences after decoder (batch_size, trg_seq_length, vocab_size) """ super().__init__()
self.encoder = encoder self.decoder = decoder self.src_embed = src_embed self.trg_embed = trg_embed self.device = device self.src_pad_idx = src_pad_idx self.trg_pad_idx = trg_pad_idx
def make_src_mask(self, src: Tensor): """ Args: src: raw sequences with padding (batch_size, seq_length)
Returns: src_mask: mask for each sequence (batch_size, 1, 1, seq_length) """ # assign 1 to tokens that need attended to and 0 to padding tokens, then add 2 dimensions src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
return src_mask
def make_trg_mask(self, trg: Tensor): """ Args: trg: raw sequences with padding (batch_size, seq_length)
Returns: trg_mask: mask for each sequence (batch_size, 1, seq_length, seq_length) """
seq_length = trg.shape[1] # assign True to tokens that need attended to and False to padding tokens, then add 2 dimensions trg_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2) # (batch_size, 1, 1, seq_length)
# generate subsequent mask trg_sub_mask = torch.tril(torch.ones((seq_length, seq_length), device=self.device)).bool() # (batch_size, 1, seq_length, seq_length) # bitwise "and" operator | 0 & 0 = 0, 1 & 1 = 1, 1 & 0 = 0 trg_mask = trg_mask & trg_sub_mask
return trg_mask
def forward(self, src: Tensor, trg: Tensor): """ Args: trg: raw target sequences (batch_size, trg_seq_length) src: raw src sequences (batch_size, src_seq_length)
Returns: output: sequences after decoder (batch_size, trg_seq_length, output_dim) """
# create source and target masks src_mask = self.make_src_mask(src) # (batch_size, 1, 1, src_seq_length) trg_mask = self.make_trg_mask(trg) # (batch_size, 1, trg_seq_length, trg_seq_length)
# push the src through the encoder layers src = self.encoder(self.src_embed(src), src_mask) # (batch_size, src_seq_length, d_model) # decoder output and attention probabilities output = self.decoder(self.trg_embed(trg), src, trg_mask, src_mask)
return output








10


 构建模型


下面的简单函数初始化了编码器、解码器、位置编码和嵌入层。然后,它将这些信息传递给 Transformer 模块,创建一个可以训练的模型。

def make_model(device, src_vocab, trg_vocab, n_layers: int = 3, d_model: int = 512,                d_ffn: int = 2048, n_heads: int = 8, dropout: float = 0.1,                max_length: int = 5000):  """    Construct a model when provided parameters.
Args: src_vocab: source vocabulary trg_vocab: target vocabulary n_layers: Number of Encoder and Decoders d_model: dimension of embeddings d_ffn: dimension of feed-forward network n_heads: number of heads dropout: probability of dropout occurring max_length: maximum sequence length for positional encodings
Returns: Transformer model based on hyperparameters """
# create the encoder encoder = Encoder(d_model, n_layers, n_heads, d_ffn, dropout) # create the decoder decoder = Decoder(len(trg_vocab), d_model, n_layers, n_heads, d_ffn, dropout)
# create source embedding matrix src_embed = Embeddings(len(src_vocab), d_model) # create target embedding matrix trg_embed = Embeddings(len(trg_vocab), d_model)
# create a positional encoding matrix pos_enc = PositionalEncoding(d_model, dropout, max_length)
# create the Transformer model model = Transformer(encoder, decoder, nn.Sequential(src_embed, pos_enc), nn.Sequential(trg_embed, pos_enc), src_pad_idx=src_vocab.get_stoi()["<pad>"], trg_pad_idx=trg_vocab.get_stoi()["<pad>"], device=device)
# initialize parameters with Xavier/Glorot for p in model.parameters(): if p.dim() > 1: nn.init.xavier_uniform_(p)
return model







11


 将德语翻译成英语之数据预处理


上一篇文章使用一个小型数据集训练了一个Transformer模型,用于将德语翻译成英语。本文将使用 torchtext.datasets 中的 Multi30k 数据集。它包含训练集、验证集和测试集。所有加载标记符、生成词汇表、处理数据和生成批次的自定义函数都可以在附录中找到。


第一步是从 spaCy 中加载每种语言的标记符,并使用 load_vocab 创建两种语言的词汇表。它调用 build_vocabary,这是一个使用 torchtext.vocab中的build_vocab_from_iterator 函数的自定义函数。词汇表中出现单词的最小频率为 2,词汇表中的每个单词都是小写。build_vocabulary 函数加载 Multi30k 数据集以生成词汇表。
# global variables used later in the scriptspacy_de, spacy_en = load_tokenizers()vocab_src, vocab_trg = load_vocab(spacy_de, spacy_en)

Loaded English and German tokenizers.
Building German Vocabulary...Building English Vocabulary...
Vocabulary sizes: Source: 8147 Target: 6082

词汇表生成后,可以设置一些全局变量,这些变量将用大写字母表示。下面的变量用于、和的索引,源词汇表和目标词汇表中的这些索引是相同的。
BOS_IDX = vocab_trg['<bos>']EOS_IDX = vocab_trg['<eos>']PAD_IDX = vocab_trg['<pad>']

可以加载数据集并处理
# raw datatrain_data_raw, val_data_raw, test_data_raw = datasets.Multi30k(language_pair=("de", "en"))

每个集合都是一个数据迭代器,可以看作是一个元组列表。每个元组包含一对德语-英语词组,如("Wie heißt du?","What is your name?")。可以根据词汇表对这些数据进行标记化并转换为相应的索引。这些操作在自定义函数 data_process 中执行。
# processed datatrain_data = data_process(train_data_raw)val_data = data_process(val_data_raw)test_data = data_process(test_data_raw)

这些数据迭代器现在可以传递给 torch.utils.data 中的 DataLoader,用于在训练过程中生成batch。DataLoader 需要一个数据迭代器、batchsize和一个用于自定义批次的处理函数。它还允许对批次进行洗牌,并在最后一个批次不完整时将其丢弃。
MAX_PADDING = 20BATCH_SIZE = 128
train_iter = DataLoader(to_map_style_dataset(train_data), batch_size=BATCH_SIZE, shuffle=True, drop_last=True, collate_fn=generate_batch)
valid_iter = DataLoader(to_map_style_dataset(val_data), batch_size=BATCH_SIZE, shuffle=True, drop_last=True, collate_fn=generate_batch)
test_iter = DataLoader(to_map_style_dataset(test_data), batch_size=BATCH_SIZE, shuffle=True, drop_last=True, collate_fn=generate_batch)

在上面的代码中,MAX_PADDING 表示序列可以处理的最大标记数。torch.nn.functional 中的 pad 函数会截断任何长度超过它的序列。generate_batch函数会调用pad函数,在序列中添加标记,并生成用于训练的批次。





12


 将德语翻译成英语之搭建模型


下一步是创建模型来训练数据。可以通过 make_model 函数传递参数来创建模型,还可以使用 model.cuda() 来确保模型在 GPU 可用的情况下在 GPU 上进行训练。以下超参的值是根据经验选择的。

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = make_model(device, vocab_src, vocab_trg, n_layers=3, n_heads=8, d_model=256, d_ffn=512, max_length=50)model.cuda()

还可以预览模型的全部可训练参数,以评估其规模。

def count_parameters(model):    return sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'The model has {count_parameters(model):,} trainable parameters')

输出如下:

The model has 9,159,362 trainable parameters.







13


 将德语翻译成英语之设置训练参数


为了训练模型,可以使用学习率为 0.0005 的 Adam 优化器,损失函数可以使用 Cross Entropy LossCross Entropy Loss 将模型的对数作为输入,用softmax函数进行转换,提取每个标记的 argmax,并将其与预期目标输出进行比较。
LEARNING_RATE = 0.0005
optimizer = torch.optim.Adam(model.parameters(), lr = LEARNING_RATE)criterion = nn.CrossEntropyLoss(ignore_index = PAD_IDX)

该模型可使用以下函数进行训练,这些函数是每个epoch训练期间要执行的步骤。模型根据损失函数更新参数。最后,函数会返回该周期内各批次的平均损失。

def train(model, iterator, optimizer, criterion, clip):  """    Train the model on the given data.
Args: model: Transformer model to be trained iterator: data to be trained on optimizer: optimizer for updating parameters criterion: loss function for updating parameters clip: value to help prevent exploding gradients
Returns: loss for the epoch """
# set the model to training mode  model.train()   epoch_loss = 0
# loop through each batch in the iterator for i, batch in enumerate(iterator):
# set the source and target batches src,trg = batch
# zero the gradients optimizer.zero_grad()
# logits for each output logits = model(src, trg[:,:-1])
# expected output expected_output = trg[:,1:]
# calculate the loss loss = criterion(logits.contiguous().view(-1, logits.shape[-1]),                     expected_output.contiguous().view(-1)) # backpropagation    loss.backward()   # clip the weights torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
# update the weights    optimizer.step()  # update the loss epoch_loss += loss.item()
# return the average loss for the epoch return epoch_loss / len(iterator)


下面的评价函数执行与 train 函数相同的过程,但不更新权重。它将用于测试集和验证集,以了解模型的泛化情况。
def evaluate(model, iterator, criterion):  """    Evaluate the model on the given data.
Args: model: Transformer model to be trained iterator: data to be evaluated criterion: loss function for assessing outputs
Returns: loss for the data """
# set the model to evaluation mode model.eval() epoch_loss = 0
# evaluate without updating gradients with torch.no_grad(): # loop through each batch in the iterator for i, batch in enumerate(iterator):
# set the source and target batches src, trg = batch # logits for each output logits = model(src, trg[:,:-1]) # expected output expected_output = trg[:,1:]
# calculate the loss loss = criterion(logits.contiguous().view(-1, logits.shape[-1]), expected_output.contiguous().view(-1)) # update the loss epoch_loss += loss.item()
# return the average loss for the epoch return epoch_loss / len(iterator)
最后,还可以创建一个函数来计算每个epoch所需的时间。
def epoch_time(start_time, end_time):  elapsed_time = end_time - start_time  elapsed_mins = int(elapsed_time / 60)  elapsed_secs = int(elapsed_time - (elapsed_mins * 60))  return elapsed_mins, elapsed_secs






14


 将德语翻译成英语之模型训练


现在可以创建训练循环,以训练模型并评估其在验证集上的性能。
N_EPOCHS = 10CLIP = 1
best_valid_loss = float('inf')
# loop through each epochfor epoch in range(N_EPOCHS):
start_time = time.time()
# calculate the train loss and update the parameters train_loss = train(model, train_iter, optimizer, criterion, CLIP)
# calculate the loss on the validation set valid_loss = evaluate(model, valid_iter, criterion)
end_time = time.time()
# calculate how long the epoch took epoch_mins, epoch_secs = epoch_time(start_time, end_time)
# save the model when it performs better than the previous run if valid_loss < best_valid_loss: best_valid_loss = valid_loss torch.save(model.state_dict(), 'transformer-model.pt')
print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s') print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}') print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}')
结果如下:
Epoch: 01 | Time: 0m 21s Train Loss: 4.534 | Train PPL:  93.169  Val. Loss: 3.474 |  Val. PPL:  32.280Epoch: 02 | Time: 0m 13s Train Loss: 3.219 | Train PPL:  24.992  Val. Loss: 2.735 |  Val. PPL:  15.403Epoch: 03 | Time: 0m 13s Train Loss: 2.544 | Train PPL:  12.733  Val. Loss: 2.225 |  Val. PPL:   9.250Epoch: 04 | Time: 0m 14s Train Loss: 2.096 | Train PPL:   8.131  Val. Loss: 1.980 |  Val. PPL:   7.246Epoch: 05 | Time: 0m 13s Train Loss: 1.801 | Train PPL:   6.055  Val. Loss: 1.829 |  Val. PPL:   6.229Epoch: 06 | Time: 0m 14s Train Loss: 1.588 | Train PPL:   4.896  Val. Loss: 1.743 |  Val. PPL:   5.717Epoch: 07 | Time: 0m 13s Train Loss: 1.427 | Train PPL:   4.166  Val. Loss: 1.700 |  Val. PPL:   5.476Epoch: 08 | Time: 0m 13s Train Loss: 1.295 | Train PPL:   3.650  Val. Loss: 1.679 |  Val. PPL:   5.358Epoch: 09 | Time: 0m 13s Train Loss: 1.184 | Train PPL:   3.268  Val. Loss: 1.677 |  Val. PPL:   5.349Epoch: 10 | Time: 0m 13s Train Loss: 1.093 | Train PPL:   2.984  Val. Loss: 1.677 |  Val. PPL:   5.351

在评估结果之前,还可以使用评估函数在测试集上评估准确性。
# load the weightsmodel.load_state_dict(torch.load('transformer-model.pt'))
# calculate the loss on the test settest_loss = evaluate(model, test_iter, criterion)
print(f'Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f}')

结果如下:

Test Loss: 1.692 | Test PPL:   5.430

虽然损失大幅减少,但没有迹象表明该模型在从德语翻译成英语方面有多成功。这可以通过两种方法来评估。第一种方法是向模型提供一个句子,并在推理过程中预览其翻译效果。第二种方法是通过另一种指标来计算其准确性,比如翻译任务的标准指标 BLEU。







15


 将德语翻译成英语之模型推理


将一个句子传递给下面的函数,即可进行实时翻译。句子将被标记化并通过模型,每次生成一个标记。一旦出现标记,就会返回输出结果。
def translate_sentence(sentence, model, device, max_length = 50):  """    Translate a German sentence to its English equivalent.
Args: sentence: German sentence to be translated to English; list or str model: Transformer model used for translation device: device to perform translation on max_length: maximum token length for translation
Returns: src: return the tokenized input trg_input: return the input to the decoder before the final output trg_output: return the final translation, shifted right attn_probs: return the attention scores for the decoder heads masked_attn_probs: return the masked attention scores for the decoder heads """
model.eval()
# tokenize and index the provided string if isinstance(sentence, str): src = ['<bos>'] + [token.text.lower() for token in spacy_de(sentence)] + ['<eos>'] else: src = ['<bos>'] + sentence + ['<eos>']
# convert to integers src_indexes = [vocab_src[token] for token in src] # convert list to tensor src_tensor = torch.tensor(src_indexes).int().unsqueeze(0).to(device)
# set <bos> token for target generation trg_indexes = [vocab_trg.get_stoi()['<bos>']]
# generate new tokens for i in range(max_length):
# convert the list to a tensor trg_tensor = torch.tensor(trg_indexes).int().unsqueeze(0).to(device)
# generate the next token with torch.no_grad(): # generate the logits logits = model.forward(src_tensor, trg_tensor) # select the newly predicted token pred_token = logits.argmax(2)[:,-1].item()
# if <eos> token or max length, stop generating if pred_token == vocab_trg.get_stoi()['<eos>'] or i == (max_length-1):
# decoder input trg_input = vocab_trg.lookup_tokens(trg_indexes) # decoder output trg_output = vocab_trg.lookup_tokens(logits.argmax(2).squeeze(0).tolist())
return src, trg_input, trg_output, model.decoder.attn_probs, model.decoder.masked_attn_probs
# else, continue generating else: # add the token trg_indexes.append(pred_token)

可以使用训练集中的一个示例来确保所生成的可视化效果能够展示注意力是如何运作的。
# 'a woman with a large purse is walking by a gate'src = ['eine', 'frau', 'mit', 'einer', 'großen', 'geldbörse', 'geht', 'an', 'einem', 'tor', 'vorbei', '.']
src, trg_input, trg_output, attn_probs, masked_attn_probs = translate_sentence(src, model, device)
print(f'source = {src}')print(f'target input = {trg_input}')print(f'target output = {trg_output}')

结果如下:
source = ['<bos>', 'eine', 'frau', 'mit', 'einer', 'großen', 'geldbörse', 'geht', 'an', 'einem', 'tor', 'vorbei', '.', '<eos>']target input = ['<bos>', 'a', 'woman', 'with', 'a', 'large', 'purse', 'walking', 'past', 'a', 'gate', '.']target output = ['a', 'woman', 'with', 'a', 'large', 'purse', 'walking', 'past', 'a', 'gate', '.', '<eos>']

target output是模型对源序列的预测,target input是解码器在生成序列结束标记之前的最终输入。这就是注意力矩阵中源序列的可视化效果。
display_attention(src, trg_input, attn_probs)




基于mask掩膜的注意力矩阵也可与目标输入一起查看。
display_attention(trg_input, trg_input, masked_attn_probs)
输出如下 :

虽然这些可视化效果很有用,但不在训练集中的句子也可以用来确定模型在实际翻译中的实用性。为了评估模型在整个测试集上的准确度,现在可以计算 BLEU 分数。





16


 将德语翻译成英语之模型评价


BLEU是评估机器翻译模型的常用指标。分值介于 0 和 1 之间,1 表示预测翻译和预期翻译完全相同。
根据 Google 的官方文档,BLEU 分数的含义如下(以百分比表示):

< 10:几乎无用 

10-19: 难以理解 

20-29:可以理解,但有明显语法错误 

30-39:可以理解到良好 

40-49: 高质量 

50-59: 高质量、充分、流畅 

60:优于人类质量

要计算 BLEU 分数,需要生成模型的预测值及其预期值。这可以通过下面的函数来完成,该函数使用了 translate_sentence 函数。

def compute_metrics(model, iterator):  """    Generate predictions for the provided iterator.
Args: model: Transformer model to be trained iterator: data to be evaluated
Returns: predictions: list of predictions, which are tokenized strings labels: list of expected output, which are tokenized strings """
# set the model to evaluation mode model.eval()
predictions = [] labels = [] # evaluate without updating gradients with torch.no_grad(): # loop through each batch in the iterator for i, batch in enumerate(iterator): # set the source and target batches src, trg = batch # predict the output src_out, trg_input, trg_output, attn_probs, masked_attn_probs = translate_sentence(vocab_src.lookup_tokens(src.tolist()), model, device)
# prediction | remove <eos> token predictions.append(trg_output[:-1])
# expected output | add extra dim for calculation labels.append([vocab_trg.lookup_tokens(trg.tolist())])
# return the average loss for the epoch return predictions, labels

之前生成的 test_data可以传递给 compute_metrics 函数。然后,可以将预测结果和标签传递给 torchtext.data.metrics 中的 bleu_score,以计算 BLEU 分数。

from torchtext.data.metrics import bleu_scorebleu_score(predictions, labels)
得到结果如下:
0.3588869571685791




这一输出结果表明翻译效果良好,是本教程可以接受的结果。

本例完成后,《手撕Transformer》系列也就结束了。



请不要忘记点赞和关注,以获取更多信息!      :)






附录: 
  • Packages

!pip install -q portalocker
# importing required librariesimport mathimport copyimport timeimport randomimport spacyimport numpy as npimport os
# torch packagesimport torchimport torch.nn as nnimport torch.nn.functional as Ffrom torch import Tensorimport torch.optim as optim
# load and build datasetsimport torchtextfrom torchtext.data.functional import to_map_style_datasetfrom torch.nn.functional import padfrom torch.utils.data import DataLoaderfrom torchtext.vocab import build_vocab_from_iteratorimport torchtext.datasets as datasetsimport portalocker
# visualization packagesfrom mpl_toolkits import mplot3dimport matplotlib.pyplot as pltimport matplotlib.ticker as ticker
  • Loading the Tokenizers

def load_tokenizers():  """    Load the German and English tokenizers provided by spaCy.
Returns: spacy_de: German tokenizer spacy_en: English tokenizer """ try: spacy_de = spacy.load("de_core_news_sm") except OSError: os.system("python -m spacy download de_core_news_sm") spacy_de = spacy.load("de_core_news_sm")
try: spacy_en = spacy.load("en_core_web_sm") except OSError: os.system("python -m spacy download en_core_web_sm") spacy_en = spacy.load("en_core_web_sm") print("Loaded English and German tokenizers.") return spacy_de, spacy_en
  • Tokenize the Sequences

def tokenize(text: str, tokenizer):  """    Split a string into its tokens using the provided tokenizer.
Args: text: string tokenizer: tokenizer for the language Returns: tokenized list of strings """ return [tok.text.lower() for tok in tokenizer.tokenizer(text)]
  • Yield Tokens

def yield_tokens(data_iter, tokenizer, index: int):  """    Return the tokens for the appropriate language.
Args: data_iter: text here tokenizer: tokenizer for the language index: index of the language in the tuple | (de=0, en=1) Yields: sequences based on index """ for from_tuple in data_iter: yield tokenizer(from_tuple[index])
  • Building the Vocabulary

def build_vocabulary(spacy_de, spacy_en, min_freq: int = 2):    def tokenize_de(text: str):    """      Call the German tokenizer.
Args: text: string min_freq: minimum frequency needed to include a word in the vocabulary Returns: tokenized list of strings """ return tokenize(text, spacy_de)
def tokenize_en(text: str): """ Call the English tokenizer.
Args: text: string Returns: tokenized list of strings """ return tokenize(text, spacy_en)
print("Building German Vocabulary...") # load train, val, and test data pipelines train, val, test = datasets.Multi30k(language_pair=("de", "en"))
# generate source vocabulary vocab_src = build_vocab_from_iterator( yield_tokens(train + val + test, tokenize_de, index=0), # tokens for each German sentence (index 0) min_freq=min_freq, specials=["<bos>", "<eos>", "<pad>", "<unk>"], )
print("Building English Vocabulary...")
# generate target vocabulary vocab_trg = build_vocab_from_iterator( yield_tokens(train + val + test, tokenize_en, index=1), # tokens for each English sentence (index 1) min_freq=2, # specials=["<bos>", "<eos>", "<pad>", "<unk>"], )
# set default token for out-of-vocabulary words (OOV) vocab_src.set_default_index(vocab_src["<unk>"]) vocab_trg.set_default_index(vocab_trg["<unk>"])
return vocab_src, vocab_trg
  • Load the Vocabulary

def load_vocab(spacy_de, spacy_en, min_freq: int = 2):  """    Args:        spacy_de:     German tokenizer        spacy_en:     English tokenizer        min_freq:     minimum frequency needed to include a word in the vocabulary        Returns:        vocab_src:    German vocabulary        vocab_trg:     English vocabulary         """    if not os.path.exists("vocab.pt"):    # build the German/English vocabulary if it does not exist    vocab_src, vocab_trg = build_vocabulary(spacy_de, spacy_en, min_freq)    # save it to a file    torch.save((vocab_src, vocab_trg), "vocab.pt")  else:    # load the vocab if it exists    vocab_src, vocab_trg = torch.load("vocab.pt")
print("Finished.\nVocabulary sizes:") print("\tSource:", len(vocab_src)) print("\tTarget:", len(vocab_trg)) return vocab_src, vocab_trg
  • Indexing Sequences

def data_process(raw_data):  """    Process raw sentences by tokenizing and converting to integers based on     the vocabulary.
Args: raw_data: German-English sentence pairs Returns: data: tokenized data converted to index based on vocabulary """
data = [] # loop through each sentence pair for (raw_de, raw_en) in raw_data: # tokenize the sentence and convert each word to an integers de_tensor_ = torch.tensor([vocab_src[token.text.lower()] for token in spacy_de.tokenizer(raw_de)], dtype=torch.long) en_tensor_ = torch.tensor([vocab_trg[token.text.lower()] for token in spacy_en.tokenizer(raw_en)], dtype=torch.long)
# append tensor representations data.append((de_tensor_, en_tensor_)) return data
  • Generating Batches

def generate_batch(data_batch):  """    Process indexed-sequences by adding <bos>, <eos>, and <pad> tokens.
Args: data_batch: German-English indexed-sentence pairs Returns: two batches: one for German and one for English """ de_batch, en_batch = [], []
# for each sentence for (de_item, en_item) in data_batch: # add <bos> and <eos> indices before and after the sentence de_temp = torch.cat([torch.tensor([BOS_IDX]), de_item, torch.tensor([EOS_IDX])], dim=0).to(device) en_temp = torch.cat([torch.tensor([BOS_IDX]), en_item, torch.tensor([EOS_IDX])], dim=0).to(device)
# add padding de_batch.append(pad(de_temp,(0, # dimension to pad MAX_PADDING - len(de_temp), # amount of padding to add ),value=PAD_IDX,)) # add padding en_batch.append(pad(en_temp,(0, # dimension to pad MAX_PADDING - len(en_temp), # amount of padding to add ), value=PAD_IDX,)) return torch.stack(de_batch), torch.stack(en_batch)
  • Displaying Attention

def display_attention(sentence: list, translation: list, attention: Tensor,                       n_heads: int = 8, n_rows: int = 4, n_cols: int = 2):  """    Display the attention matrix for each head of a sequence.
Args: sentence: German sentence to be translated to English; list translation: English sentence predicted by the model attention: attention scores for the heads n_heads: number of heads n_rows: number of rows n_cols: number of columns """ # ensure the number of rows and columns are equal to the number of heads assert n_rows * n_cols == n_heads # figure size fig = plt.figure(figsize=(15,25)) # visualize each head for i in range(n_heads): # create a plot ax = fig.add_subplot(n_rows, n_cols, i+1) # select the respective head and make it a numpy array for plotting _attention = attention.squeeze(0)[i,:,:].cpu().detach().numpy()
# plot the matrix cax = ax.matshow(_attention, cmap='bone')
# set the size of the labels ax.tick_params(labelsize=12)
# set the indices for the tick marks ax.set_xticks(range(len(sentence))) ax.set_yticks(range(len(translation)))
# if the provided sequences are sentences or indices if isinstance(sentence[0], str): ax.set_xticklabels([t.lower() for t in sentence], rotation=45) ax.set_yticklabels(translation) elif isinstance(sentence[0], int): ax.set_xticklabels(sentence) ax.set_yticklabels(translation)
plt.show()






点击上方小卡片关注我




添加个人微信,进专属粉丝群!

AI算法之道
一个专注于深度学习、计算机视觉和自动驾驶感知算法的公众号,涵盖视觉CV、神经网络、模式识别等方面,包括相应的硬件和软件配置,以及开源项目等。
 最新文章