01
引言
这是《手撕Transformer》系列的第八篇,也是最后一篇。编码器和解码器相结合,创建了一个能够轻松将德语翻译成英语的模型。
02
嵌入层
嵌入层为单词库中的每个标记Token提供相应的向量表示。这是每个序列必须经过的第一层。每个序列中的每个标记都必须嵌入到一个长度为 d_model 的向量中。这一层的输入是(batch_size,seq_length)。输出为(batch_size、seq_length、d_model)。
class Embeddings(nn.Module):
    """Token-id to vector look-up whose output is scaled by sqrt(d_model)."""

    def __init__(self, vocab_size: int, d_model: int):
        """
        Args:
            vocab_size: number of rows in the embedding table
            d_model: length of each embedding vector
        """
        super().__init__()
        # kept so forward() can compute the sqrt(d_model) scale factor
        self.d_model = d_model
        # embedding look-up table (lut): one d_model-sized row per token id
        self.lut = nn.Embedding(vocab_size, d_model)

    def forward(self, x: Tensor):
        """
        Args:
            x: token ids, documented by the tutorial as (batch_size, seq_length)

        Returns:
            the looked-up embeddings multiplied by sqrt(d_model)
        """
        scale = math.sqrt(self.d_model)
        embedded = self.lut(x)
        return embedded * scale
03
位置编码层
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to embeddings, then apply dropout."""

    def __init__(self, d_model: int, dropout: float = 0.1, max_length: int = 5000):
        """
        Args:
            d_model: dimension of embeddings (even or odd)
            dropout: dropout probability applied after the encodings are added
            max_length: number of positions for which encodings are precomputed
        """
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # table of encodings, one row per position
        pe = torch.zeros(max_length, d_model)
        # position column: (max_length, 1)
        k = torch.arange(0, max_length).unsqueeze(1)
        # one frequency per even embedding index: ceil(d_model / 2) entries
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        angles = k * div_term

        # sine on even indices
        pe[:, 0::2] = torch.sin(angles)
        # cosine on odd indices; when d_model is odd there is one fewer odd
        # column than even columns, so the last frequency is not used here
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # leading dimension of size 1 so the table broadcasts over the batch
        pe = pe.unsqueeze(0)
        # buffers are saved in state_dict but are not returned by parameters()
        self.register_buffer("pe", pe)

    def forward(self, x: Tensor):
        """
        Args:
            x: embeddings (batch_size, seq_length, d_model)

        Returns:
            embeddings + positional encodings (batch_size, seq_length, d_model)
        """
        # the buffer is created without requires_grad, so no explicit
        # requires_grad_(False) is needed on the slice
        x = x + self.pe[:, : x.size(1)]
        return self.dropout(x)
04
Multi-Head Attention
这些嵌入和位置编码后的序列被复制三份传递到多头注意力层,以创建对应的Query、Key和Value张量,并由线性层进行转换。它们的大小均为(batch_size、seq_length、d_model)。这些张量被分成各自的head数,大小为(batch_size, n_heads, seq_length, d_key),其中 d_key = (d_model / n_heads)。现在,每个序列都有 n_heads 表示,它们可以在训练过程中关注序列的不同方面。
Query 张量与转置后的 Key 张量相乘,再除以 √(d_key) 进行缩放,得到注意力分数。乘法的输出代表每个序列与自身的关系,在解码器的交叉注意力机制中则代表目标序列与源序列的关系。这些分数的大小为(batch_size、n_heads、Q_length、K_length)。填充(padding)位置会被掩码屏蔽;在解码器的自注意力机制中,当前位置之后的标记也会被屏蔽,使序列只能关注之前的标记,这就是解码器的自回归特性。掩码之后,对最后一维做 softmax 才得到概率分布,再与 Value 张量相乘得到注意力输出。
代码实现如下:
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention computed in parallel over n_heads heads."""

    def __init__(self, d_model: int = 512, n_heads: int = 8, dropout: float = 0.1):
        """
        Args:
            d_model: dimension of embeddings
            n_heads: number of attention heads; must divide d_model
            dropout: dropout probability applied to the attention probabilities
        """
        super().__init__()
        # every head receives an equally sized slice of the model dimension
        assert d_model % n_heads == 0

        self.d_model = d_model
        self.n_heads = n_heads
        self.d_key = d_model // n_heads  # per-head width, shared by keys and values

        self.Wq = nn.Linear(d_model, d_model)  # query projection
        self.Wk = nn.Linear(d_model, d_model)  # key projection
        self.Wv = nn.Linear(d_model, d_model)  # value projection
        self.Wo = nn.Linear(d_model, d_model)  # output projection

        self.dropout = nn.Dropout(p=dropout)

    def _split_heads(self, projected: Tensor, batch_size: int):
        """Reshape (batch, length, d_model) into (batch, n_heads, length, d_key)."""
        per_head = projected.view(batch_size, -1, self.n_heads, self.d_key)
        return per_head.transpose(1, 2)

    def forward(self, query: Tensor, key: Tensor, value: Tensor, mask: Tensor = None):
        """
        Args:
            query: query vector (batch_size, q_length, d_model)
            key: key vector (batch_size, k_length, d_model)
            value: value vector (batch_size, s_length, d_model)
            mask: optional mask; positions where it equals 0 are suppressed

        Returns:
            output: attention values (batch_size, q_length, d_model)
            attn_probs: softmax scores (batch_size, n_heads, q_length, k_length)
        """
        # the batch size is read from the key tensor and reused for every reshape
        batch_size = key.size(0)

        # project the inputs, then split each projection into heads
        Q = self._split_heads(self.Wq(query), batch_size)
        K = self._split_heads(self.Wk(key), batch_size)
        V = self._split_heads(self.Wv(value), batch_size)

        # QK^T / sqrt(d_key) -> (batch_size, n_heads, q_length, k_length)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_key)

        # masked positions get a large negative score so softmax sends them to ~0
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e10)

        attn_probs = torch.softmax(scores, dim=-1)

        # weighted sum of the values -> (batch_size, n_heads, q_length, d_key)
        context = torch.matmul(self.dropout(attn_probs), V)

        # merge the heads back into (batch_size, q_length, d_model)
        context = context.transpose(1, 2).contiguous()
        merged = context.view(batch_size, -1, self.n_heads * self.d_key)

        output = self.Wo(merged)
        return output, attn_probs
05
前馈神经网络FFN
经过残差连接和层归一化后,注意力机制的输出被传递到 FFN。FFN 由两层线性层和一个 ReLU 激活函数组成。第一层的形状为 (d_model, d_ffn)。每个序列的张量形状为(batch_size、seq_length、d_model),经过该层后,模型可以学习到更多关于每个序列的信息。此时,张量的形状为(batch_size、seq_length、d_ffn),并通过 ReLU。然后,它将通过第二层,第二层的形状为(d_ffn,d_model)。这样,张量就会收缩为原来的大小(batch_size, seq_length, d_model)。FFN 的输出先与其输入做残差相加,再经过层归一化处理。
代码如下:
class PositionwiseFeedForward(nn.Module):
    """Two linear layers with a ReLU and dropout in between."""

    def __init__(self, d_model: int, d_ffn: int, dropout: float = 0.1):
        """
        Args:
            d_model: dimension of embeddings (input and output width)
            d_ffn: width of the hidden layer
            dropout: dropout probability applied to the hidden activations
        """
        super().__init__()
        self.w_1 = nn.Linear(d_model, d_ffn)  # expansion
        self.w_2 = nn.Linear(d_ffn, d_model)  # contraction
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        Args:
            x: output from attention (batch_size, seq_length, d_model)

        Returns:
            expanded-and-contracted representation (batch_size, seq_length, d_model)
        """
        # (batch_size, seq_length, d_model) -> (batch_size, seq_length, d_ffn)
        hidden = torch.relu(self.w_1(x))
        hidden = self.dropout(hidden)
        # (batch_size, seq_length, d_ffn) -> (batch_size, seq_length, d_model)
        return self.w_2(hidden)
06
层归一化和残差连接
针对输入形状为(batch_size、seq_length、d_model)张量,层归一化将对每个 d_model 向量进行归一化。使用修改后的 z-score 公式对其进行标准化,这样可以防止梯度下降出现问题。
残差加法将进入层之前的嵌入向量添加到输出中。这就利用从多头注意力和 FFN 中获得的信息丰富了嵌入向量。
层归一化或残差加法都不会影响其输入的形状。这些都在编码器和解码器模块中实现,使用 nn.LayerNorm 是为了简单起见,而不是前序文章中创建的自定义模块。
07
编码器
每个编码器层包括上述所有层。它负责丰富源序列的嵌入。输入的大小为(batch_size、seq_length、d_model)。嵌入序列直接传递给多头注意力机制。在编码器堆栈中经过 Nx 层后,输出是每个序列的丰富表示,其中包含尽可能多的上下文。它的大小为(batch_size、seq_length、d_model)。
代码如下:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and a feed-forward network, each
    followed by dropout, a residual add and layer normalisation."""

    def __init__(self, d_model: int, n_heads: int, d_ffn: int, dropout: float):
        """
        Args:
            d_model: dimension of embeddings
            n_heads: number of heads
            d_ffn: dimension of feed-forward network
            dropout: probability of dropout occurring
        """
        super().__init__()
        # self-attention sublayer and the norm applied after its residual add
        self.attention = MultiHeadAttention(d_model, n_heads, dropout)
        self.attn_layer_norm = nn.LayerNorm(d_model)
        # feed-forward sublayer and the norm applied after its residual add
        self.positionwise_ffn = PositionwiseFeedForward(d_model, d_ffn, dropout)
        self.ffn_layer_norm = nn.LayerNorm(d_model)
        # dropout applied to each sublayer output before the residual add
        self.dropout = nn.Dropout(dropout)

    def forward(self, src: Tensor, src_mask: Tensor):
        """
        Args:
            src: positionally embedded sequences (batch_size, seq_length, d_model)
            src_mask: mask for the sequences (batch_size, 1, 1, seq_length)

        Returns:
            src: sequences after both sublayers (batch_size, seq_length, d_model)
            attn_probs: attention probabilities returned by the self-attention sublayer
        """
        # sublayer 1: self-attention (query, key and value are all src)
        attended, attn_probs = self.attention(src, src, src, src_mask)
        src = self.attn_layer_norm(src + self.dropout(attended))

        # sublayer 2: position-wise feed-forward network
        transformed = self.positionwise_ffn(src)
        src = self.ffn_layer_norm(src + self.dropout(transformed))

        return src, attn_probs
class Encoder(nn.Module):
    """Stack of n_layers EncoderLayer blocks applied in sequence."""

    def __init__(self, d_model: int, n_layers: int,
                 n_heads: int, d_ffn: int, dropout: float = 0.1):
        """
        Args:
            d_model: dimension of embeddings
            n_layers: number of encoder layers
            n_heads: number of heads
            d_ffn: dimension of feed-forward network
            dropout: probability of dropout occurring
        """
        super().__init__()
        # create n_layers encoder layers
        self.layers = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ffn, dropout)
                                     for _ in range(n_layers)])
        # not used in forward(); kept so the module's attributes are unchanged
        self.dropout = nn.Dropout(dropout)

    def forward(self, src: Tensor, src_mask: Tensor):
        """
        Args:
            src: embedded sequences (batch_size, seq_length, d_model)
            src_mask: mask for the sequences (batch_size, 1, 1, seq_length)

        Returns:
            src: sequences after all encoder layers (batch_size, seq_length, d_model)

        Side effects:
            stores the last layer's attention probabilities on self.attn_probs
            (None when the stack has no layers)
        """
        # initialised so an empty stack does not leave the name unbound
        attn_probs = None
        for layer in self.layers:
            src, attn_probs = layer(src, src_mask)
        self.attn_probs = attn_probs
        return src
08
解码器
每个解码器层有两个职责:(1) 学习移位目标序列的自回归表示;(2) 学习目标序列如何与编码器的丰富嵌入相关联。与编码器一样,解码器栈也有 Nx 个解码器层。如前所述,编码器的输出被传递到每个解码器层。
第一个解码层的输入被右移,并被嵌入和位置编码。它的形状为(batch_size, seq_length, d_model)。它将通过第一个注意力机制,在该机制中,模型将学习序列与自身的自回归表示。该机制的输出保持其形状,并传递给第二个交叉注意力机制。它与编码器的丰富嵌入相乘,输出再次保持原来的形状。
通过 FFN 后,将通过最后的线性层,该层的形状为(d_model,vocab_size)。这样就生成了一个大小为(batch_size、seq_length、vocab_size)的张量。之后可以通过一个 softmax 函数,最高概率就是对每个标记的预测值。
代码如下:
class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, cross-attention over the
    encoder output, and a feed-forward network, each followed by dropout,
    a residual add and layer normalisation."""

    def __init__(self, d_model: int, n_heads: int, d_ffn: int, dropout: float):
        """
        Args:
            d_model: dimension of embeddings
            n_heads: number of heads
            d_ffn: dimension of feed-forward network
            dropout: probability of dropout occurring
        """
        super().__init__()
        # masked multi-head self-attention sublayer and its layer norm
        self.masked_attention = MultiHeadAttention(d_model, n_heads, dropout)
        self.masked_attn_layer_norm = nn.LayerNorm(d_model)
        # multi-head cross-attention sublayer and its layer norm
        self.attention = MultiHeadAttention(d_model, n_heads, dropout)
        self.attn_layer_norm = nn.LayerNorm(d_model)
        # position-wise feed-forward network and its layer norm
        self.positionwise_ffn = PositionwiseFeedForward(d_model, d_ffn, dropout)
        self.ffn_layer_norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        # masked self-attention probabilities from the most recent forward()
        self.masked_attn_probs = None

    def forward(self, trg: Tensor, src: Tensor, trg_mask: Tensor, src_mask: Tensor):
        """
        Args:
            trg: embedded sequences (batch_size, trg_seq_length, d_model)
            src: embedded sequences (batch_size, src_seq_length, d_model)
            trg_mask: mask for the sequences (batch_size, 1, trg_seq_length, trg_seq_length)
            src_mask: mask for the sequences (batch_size, 1, 1, src_seq_length)

        Returns:
            trg: sequences after all sublayers (batch_size, trg_seq_length, d_model)
            attn_probs: cross-attention softmax scores
                (batch_size, n_heads, trg_seq_length, src_seq_length)

        Side effects:
            stores the masked self-attention softmax scores
            (batch_size, n_heads, trg_seq_length, trg_seq_length) on
            self.masked_attn_probs; the return value stays a 2-tuple
        """
        # masked self-attention over the target sequence
        _trg, masked_attn_probs = self.masked_attention(trg, trg, trg, trg_mask)
        # kept on the module so it is not lost when cross-attention runs next
        self.masked_attn_probs = masked_attn_probs
        trg = self.masked_attn_layer_norm(trg + self.dropout(_trg))

        # cross-attention: queries from the target, keys/values from the source
        _trg, attn_probs = self.attention(trg, src, src, src_mask)
        trg = self.attn_layer_norm(trg + self.dropout(_trg))

        # position-wise feed-forward network
        _trg = self.positionwise_ffn(trg)
        trg = self.ffn_layer_norm(trg + self.dropout(_trg))

        return trg, attn_probs


class Decoder(nn.Module):
    """Stack of n_layers DecoderLayer blocks followed by a projection to the
    target vocabulary."""

    def __init__(self, vocab_size: int, d_model: int, n_layers: int,
                 n_heads: int, d_ffn: int, dropout: float = 0.1):
        """
        Args:
            vocab_size: size of the target vocabulary
            d_model: dimension of embeddings
            n_layers: number of decoder layers
            n_heads: number of heads
            d_ffn: dimension of feed-forward network
            dropout: probability of dropout occurring
        """
        super().__init__()
        # create n_layers decoder layers
        self.layers = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ffn, dropout)
                                     for _ in range(n_layers)])
        # not used in forward(); kept so the module's attributes are unchanged
        self.dropout = nn.Dropout(dropout)
        # output projection: d_model -> vocab_size
        self.Wo = nn.Linear(d_model, vocab_size)
        # attention probabilities of the last layer, filled in by forward()
        self.attn_probs = None
        self.masked_attn_probs = None

    def forward(self, trg: Tensor, src: Tensor, trg_mask: Tensor, src_mask: Tensor):
        """
        Args:
            trg: embedded sequences (batch_size, trg_seq_length, d_model)
            src: encoded sequences from encoder (batch_size, src_seq_length, d_model)
            trg_mask: mask for the sequences (batch_size, 1, trg_seq_length, trg_seq_length)
            src_mask: mask for the sequences (batch_size, 1, 1, src_seq_length)

        Returns:
            output: logits after the decoder (batch_size, trg_seq_length, vocab_size)

        Side effects:
            stores the last layer's cross-attention scores on self.attn_probs and
            its masked self-attention scores on self.masked_attn_probs (both None
            when the stack has no layers)
        """
        # initialised so an empty stack does not leave the names unbound
        attn_probs = None
        masked_attn_probs = None
        for layer in self.layers:
            trg, attn_probs = layer(trg, src, trg_mask, src_mask)
            masked_attn_probs = layer.masked_attn_probs

        self.attn_probs = attn_probs
        self.masked_attn_probs = masked_attn_probs
        return self.Wo(trg)
09
The Transformer
编码器和解码器可在一个模块中组合,以创建Transformer模型。该模块可通过编码器、解码器以及目标嵌入和源嵌入进行初始化。
前向传递需要源序列和经过移位的目标序列。源序列被嵌入并通过编码器。编码器的输出和嵌入的目标序列通过解码器。创建源掩码和目标掩码的函数也是该模块的一部分。
class Transformer(nn.Module):
    """Encoder-decoder model that builds its own padding and causal masks."""

    def __init__(self, encoder: "Encoder", decoder: "Decoder",
                 src_embed: nn.Module, trg_embed: nn.Module,
                 src_pad_idx: int, trg_pad_idx: int, device):
        """
        Args:
            encoder: encoder stack
            decoder: decoder stack
            src_embed: module mapping raw source ids to embeddings + encodings
            trg_embed: module mapping raw target ids to embeddings + encodings
            src_pad_idx: padding index of the source vocabulary
            trg_pad_idx: padding index of the target vocabulary
            device: cuda or cpu; stored on the instance as self.device
        """
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.trg_embed = trg_embed
        self.device = device
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx

    def make_src_mask(self, src: Tensor):
        """
        Args:
            src: raw sequences with padding (batch_size, seq_length)

        Returns:
            src_mask: boolean mask (batch_size, 1, 1, seq_length); True where the
                token is not padding
        """
        # True for real tokens, False for padding, then add two broadcast dims
        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        return src_mask

    def make_trg_mask(self, trg: Tensor):
        """
        Args:
            trg: raw sequences with padding (batch_size, seq_length)

        Returns:
            trg_mask: boolean mask (batch_size, 1, seq_length, seq_length); entry
                [b, 0, i, j] is True when j <= i and token j is not padding
        """
        seq_length = trg.shape[1]
        # padding mask: (batch_size, 1, 1, seq_length)
        pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)
        # lower-triangular causal mask: (seq_length, seq_length). It is created
        # on trg's own device so the "&" below cannot mix devices.
        causal_mask = torch.tril(
            torch.ones((seq_length, seq_length), device=trg.device)
        ).bool()
        # broadcasts to (batch_size, 1, seq_length, seq_length)
        return pad_mask & causal_mask

    def forward(self, src: Tensor, trg: Tensor):
        """
        Args:
            src: raw src sequences (batch_size, src_seq_length)
            trg: raw target sequences (batch_size, trg_seq_length)

        Returns:
            output: decoder output (batch_size, trg_seq_length, vocab_size)
        """
        # create source and target masks
        src_mask = self.make_src_mask(src)  # (batch_size, 1, 1, src_seq_length)
        trg_mask = self.make_trg_mask(trg)  # (batch_size, 1, trg_seq_length, trg_seq_length)

        # embed and encode the source: (batch_size, src_seq_length, d_model)
        src = self.encoder(self.src_embed(src), src_mask)

        # decode the embedded target against the encoded source
        output = self.decoder(self.trg_embed(trg), src, trg_mask, src_mask)
        return output
10
构建模型
下面的简单函数初始化了编码器、解码器、位置编码和嵌入层。然后,它将这些信息传递给 Transformer 模块,创建一个可以训练的模型。
def make_model(device, src_vocab, trg_vocab, n_layers: int = 3, d_model: int = 512,
               d_ffn: int = 2048, n_heads: int = 8, dropout: float = 0.1,
               max_length: int = 5000):
    """
    Assemble a Transformer from hyperparameters and two vocabularies.

    Args:
        device: passed through to the Transformer constructor
        src_vocab: source vocabulary (supports len() and get_stoi())
        trg_vocab: target vocabulary (supports len() and get_stoi())
        n_layers: number of encoder layers and of decoder layers
        d_model: dimension of embeddings
        d_ffn: dimension of feed-forward network
        n_heads: number of heads
        dropout: probability of dropout occurring
        max_length: maximum sequence length for positional encodings

    Returns:
        Transformer whose parameters with more than one dimension have been
        re-initialised with Xavier/Glorot uniform
    """
    # the two stacks
    encoder = Encoder(d_model, n_layers, n_heads, d_ffn, dropout)
    decoder = Decoder(len(trg_vocab), d_model, n_layers, n_heads, d_ffn, dropout)

    # one embedding table per language
    src_embed = Embeddings(len(src_vocab), d_model)
    trg_embed = Embeddings(len(trg_vocab), d_model)

    # a single positional-encoding module is shared by both input pipelines
    pos_enc = PositionalEncoding(d_model, dropout, max_length)
    src_pipeline = nn.Sequential(src_embed, pos_enc)
    trg_pipeline = nn.Sequential(trg_embed, pos_enc)

    model = Transformer(
        encoder,
        decoder,
        src_pipeline,
        trg_pipeline,
        src_pad_idx=src_vocab.get_stoi()["<pad>"],
        trg_pad_idx=trg_vocab.get_stoi()["<pad>"],
        device=device,
    )

    # Xavier/Glorot initialisation for every parameter with more than one
    # dimension; 1-D parameters keep their default initialisation
    for weights in (p for p in model.parameters() if p.dim() > 1):
        nn.init.xavier_uniform_(weights)

    return model
11
将德语翻译成英语之数据预处理
上一篇文章使用一个小型数据集训练了一个Transformer模型,用于将德语翻译成英语。本文将使用 torchtext.datasets 中的 Multi30k 数据集。它包含训练集、验证集和测试集。所有加载标记符、生成词汇表、处理数据和生成批次的自定义函数都可以在附录中找到。
# Module-level globals: data_process() and translate_sentence() read spacy_de,
# vocab_src and vocab_trg directly instead of taking them as arguments.
spacy_de, spacy_en = load_tokenizers()
vocab_src, vocab_trg = load_vocab(spacy_de, spacy_en)
Loaded English and German tokenizers.
Building German Vocabulary...
Building English Vocabulary...
Vocabulary sizes:
Source: 8147
Target: 6082
# Special-token indices, read from the target vocabulary; generate_batch()
# applies the same indices to both the German and the English sequences.
BOS_IDX = vocab_trg['<bos>']
EOS_IDX = vocab_trg['<eos>']
PAD_IDX = vocab_trg['<pad>']

# raw data
train_data_raw, val_data_raw, test_data_raw = datasets.Multi30k(language_pair=("de", "en"))

# processed data: lists of (German index tensor, English index tensor)
train_data = data_process(train_data_raw)
val_data = data_process(val_data_raw)
test_data = data_process(test_data_raw)

# every sequence is padded to this length by generate_batch()
MAX_PADDING = 20
BATCH_SIZE = 128

# training batches are shuffled, and the last incomplete batch is dropped
train_iter = DataLoader(to_map_style_dataset(train_data), batch_size=BATCH_SIZE,
                        shuffle=True, drop_last=True, collate_fn=generate_batch)

# evaluation loaders keep every example in a fixed order, so the reported
# validation/test loss covers the whole split and is reproducible
valid_iter = DataLoader(to_map_style_dataset(val_data), batch_size=BATCH_SIZE,
                        shuffle=False, drop_last=False, collate_fn=generate_batch)
test_iter = DataLoader(to_map_style_dataset(test_data), batch_size=BATCH_SIZE,
                       shuffle=False, drop_last=False, collate_fn=generate_batch)
12
将德语翻译成英语之搭建模型
下一步是创建模型来训练数据。可以通过 make_model 函数传递参数来创建模型,还可以使用 model.cuda() 来确保模型在 GPU 可用的情况下在 GPU 上进行训练。以下超参的值是根据经验选择的。
# use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = make_model(device, vocab_src, vocab_trg,
                   n_layers=3, n_heads=8, d_model=256,
                   d_ffn=512, max_length=50)

# move the model to the selected device; an unconditional model.cuda() would
# raise on machines without CUDA even though `device` already fell back to CPU
model.to(device)
还可以预览模型的全部可训练参数,以评估其规模。
def count_parameters(model):
    """Return the total number of elements in the parameters of `model`
    that have requires_grad set."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total
# report the trainable-parameter count with thousands separators
print(f'The model has {count_parameters(model):,} trainable parameters')
输出如下:
The model has 9,159,362 trainable parameters.
13
将德语翻译成英语之设置训练参数
# learning rate passed to Adam
LEARNING_RATE = 0.0005
optimizer = torch.optim.Adam(model.parameters(), lr = LEARNING_RATE)
# ignore_index=PAD_IDX: target positions holding the padding index do not
# contribute to the loss
criterion = nn.CrossEntropyLoss(ignore_index = PAD_IDX)
该模型可使用以下函数进行训练,这些函数是每个epoch训练期间要执行的步骤。模型根据损失函数更新参数。最后,函数会返回该周期内各批次的平均损失。
def train(model, iterator, optimizer, criterion, clip):
    """
    Run one training epoch over `iterator`.

    Args:
        model: Transformer model to be trained
        iterator: sized iterable of (src, trg) batches
        optimizer: optimizer for updating parameters
        criterion: loss function
        clip: maximum gradient norm passed to clip_grad_norm_

    Returns:
        mean of the per-batch losses for the epoch
    """
    model.train()
    running_loss = 0

    for src, trg in iterator:
        optimizer.zero_grad()

        # teacher forcing: the decoder sees every token except the last and is
        # scored against every token except the first
        decoder_input = trg[:, :-1]
        targets = trg[:, 1:]

        logits = model(src, decoder_input)
        vocab_size = logits.shape[-1]

        # flatten to (batch * length, vocab) vs (batch * length,)
        loss = criterion(logits.contiguous().view(-1, vocab_size),
                         targets.contiguous().view(-1))

        loss.backward()
        # clip the gradient norm (not the weights) to guard against exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += loss.item()

    return running_loss / len(iterator)
def evaluate(model, iterator, criterion):
    """
    Compute the mean per-batch loss over `iterator` without updating the model.

    Args:
        model: Transformer model to be evaluated
        iterator: sized iterable of (src, trg) batches
        criterion: loss function for assessing outputs

    Returns:
        mean of the per-batch losses
    """
    model.eval()
    running_loss = 0

    # no gradients are needed for evaluation
    with torch.no_grad():
        for src, trg in iterator:
            # same input/target shift as in training
            decoder_input = trg[:, :-1]
            targets = trg[:, 1:]

            logits = model(src, decoder_input)
            vocab_size = logits.shape[-1]

            loss = criterion(logits.contiguous().view(-1, vocab_size),
                             targets.contiguous().view(-1))
            running_loss += loss.item()

    return running_loss / len(iterator)
def epoch_time(start_time, end_time):
    """Split the interval between two timestamps (in seconds) into whole
    minutes and the remaining whole seconds."""
    elapsed = end_time - start_time
    minutes = int(elapsed / 60)
    seconds = int(elapsed - (minutes * 60))
    return minutes, seconds
14
将德语翻译成英语之模型训练
N_EPOCHS = 10
CLIP = 1

# lowest validation loss seen so far; starts at +inf
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    start_time = time.time()

    # one pass over the training data (updates the parameters), then the
    # loss on the validation set
    train_loss = train(model, train_iter, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iter, criterion)

    end_time = time.time()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    # checkpoint only when the validation loss improves
    improved = valid_loss < best_valid_loss
    if improved:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'transformer-model.pt')

    # per-epoch report: loss and perplexity (exp of the loss)
    print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}')
Epoch: 01 | Time: 0m 21s
Train Loss: 4.534 | Train PPL: 93.169
Val. Loss: 3.474 | Val. PPL: 32.280
Epoch: 02 | Time: 0m 13s
Train Loss: 3.219 | Train PPL: 24.992
Val. Loss: 2.735 | Val. PPL: 15.403
Epoch: 03 | Time: 0m 13s
Train Loss: 2.544 | Train PPL: 12.733
Val. Loss: 2.225 | Val. PPL: 9.250
Epoch: 04 | Time: 0m 14s
Train Loss: 2.096 | Train PPL: 8.131
Val. Loss: 1.980 | Val. PPL: 7.246
Epoch: 05 | Time: 0m 13s
Train Loss: 1.801 | Train PPL: 6.055
Val. Loss: 1.829 | Val. PPL: 6.229
Epoch: 06 | Time: 0m 14s
Train Loss: 1.588 | Train PPL: 4.896
Val. Loss: 1.743 | Val. PPL: 5.717
Epoch: 07 | Time: 0m 13s
Train Loss: 1.427 | Train PPL: 4.166
Val. Loss: 1.700 | Val. PPL: 5.476
Epoch: 08 | Time: 0m 13s
Train Loss: 1.295 | Train PPL: 3.650
Val. Loss: 1.679 | Val. PPL: 5.358
Epoch: 09 | Time: 0m 13s
Train Loss: 1.184 | Train PPL: 3.268
Val. Loss: 1.677 | Val. PPL: 5.349
Epoch: 10 | Time: 0m 13s
Train Loss: 1.093 | Train PPL: 2.984
Val. Loss: 1.677 | Val. PPL: 5.351
# load the weights saved by the training loop, i.e. the checkpoint with the
# lowest validation loss
model.load_state_dict(torch.load('transformer-model.pt'))
# calculate the loss on the test set
test_loss = evaluate(model, test_iter, criterion)
# report the loss and the corresponding perplexity (exp of the loss)
print(f'Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f}')
结果如下:
Test Loss: 1.692 | Test PPL: 5.430
虽然损失大幅减少,但没有迹象表明该模型在从德语翻译成英语方面有多成功。这可以通过两种方法来评估。第一种方法是向模型提供一个句子,并在推理过程中预览其翻译效果。第二种方法是通过另一种指标来计算其准确性,比如翻译任务的标准指标 BLEU。
15
将德语翻译成英语之模型推理
def translate_sentence(sentence, model, device, max_length = 50):
    """
    Greedily translate a German sentence to its English equivalent.

    Reads the module-level spacy_de, vocab_src and vocab_trg.

    Args:
        sentence: German sentence; either a str (tokenized with spacy_de and
            lower-cased) or a sequence of tokens
        model: Transformer model used for translation
        device: device to perform translation on
        max_length: maximum number of decoding steps

    Returns:
        src: the tokenized input, wrapped in <bos>/<eos>
        trg_input: tokens fed to the decoder on the final step
        trg_output: tokens predicted on the final step (trg_input shifted by one)
        attn_probs: model.decoder.attn_probs from the final step
        masked_attn_probs: model.decoder.masked_attn_probs from the final step,
            or None when the decoder does not expose that attribute
    """
    model.eval()

    # tokenize the provided string, or accept pre-tokenized input
    if isinstance(sentence, str):
        tokens = [token.text.lower() for token in spacy_de(sentence)]
    else:
        tokens = list(sentence)
    src = ['<bos>'] + tokens + ['<eos>']

    # convert tokens to indices; long dtype matches the training data
    src_indexes = [vocab_src[token] for token in src]
    src_tensor = torch.tensor(src_indexes, dtype=torch.long).unsqueeze(0).to(device)

    # looked up once instead of rebuilding the stoi mapping on every step
    bos_idx = vocab_trg['<bos>']
    eos_idx = vocab_trg['<eos>']

    # the target starts with <bos> and grows by one token per step
    trg_indexes = [bos_idx]

    for i in range(max_length):
        trg_tensor = torch.tensor(trg_indexes, dtype=torch.long).unsqueeze(0).to(device)

        with torch.no_grad():
            logits = model(src_tensor, trg_tensor)

        # greedy choice for the newest position
        pred_token = logits.argmax(2)[:, -1].item()

        # stop on <eos> or when the step budget is exhausted
        if pred_token == eos_idx or i == (max_length - 1):
            trg_input = vocab_trg.lookup_tokens(trg_indexes)
            trg_output = vocab_trg.lookup_tokens(logits.argmax(2).squeeze(0).tolist())
            # tolerate decoders that do not record masked self-attention
            masked_attn_probs = getattr(model.decoder, "masked_attn_probs", None)
            return src, trg_input, trg_output, model.decoder.attn_probs, masked_attn_probs

        # otherwise feed the prediction back in on the next step
        trg_indexes.append(pred_token)
# 'a woman with a large purse is walking by a gate'
# pre-tokenized, lower-cased input; translate_sentence adds <bos> and <eos>
src = ['eine', 'frau', 'mit', 'einer', 'großen', 'geldbörse', 'geht', 'an', 'einem', 'tor', 'vorbei', '.']
# src is rebound to the wrapped token list returned by translate_sentence
src, trg_input, trg_output, attn_probs, masked_attn_probs = translate_sentence(src, model, device)
print(f'source = {src}')
print(f'target input = {trg_input}')
print(f'target output = {trg_output}')
source = ['<bos>', 'eine', 'frau', 'mit', 'einer', 'großen', 'geldbörse', 'geht', 'an', 'einem', 'tor', 'vorbei', '.', '<eos>']
target input = ['<bos>', 'a', 'woman', 'with', 'a', 'large', 'purse', 'walking', 'past', 'a', 'gate', '.']
target output = ['a', 'woman', 'with', 'a', 'large', 'purse', 'walking', 'past', 'a', 'gate', '.', '<eos>']
# decoder attention over the source: source tokens label the x-axis, decoder input the y-axis
display_attention(src, trg_input, attn_probs)
# decoder input against itself, using the masked attention scores
display_attention(trg_input, trg_input, masked_attn_probs)
16
将德语翻译成英语之模型评价
BLEU 分数(按百分制,0-100)的大致解读如下:
< 10:几乎无用
10-19:难以理解
20-29:可以理解,但有明显语法错误
30-39:可以理解,翻译质量良好
40-49:高质量
50-59:高质量、充分、流畅
> 60:质量通常优于人工翻译
要计算 BLEU 分数,需要生成模型的预测值及其预期值。这可以通过下面的函数来完成,该函数使用了 translate_sentence 函数。
def compute_metrics(model, iterator):
    """
    Generate predictions and reference translations for the provided data.

    Reads the module-level vocab_src, vocab_trg and device, and calls
    translate_sentence once per item.

    Args:
        model: Transformer model to be evaluated
        iterator: iterable of (src, trg) index tensors; each item is handed to
            translate_sentence as a single token sequence

    Returns:
        predictions: list of predicted token lists, without a trailing <eos>
        labels: list of single-element lists holding the expected token list
    """
    model.eval()

    predictions = []
    labels = []

    # evaluate without tracking gradients
    with torch.no_grad():
        for src, trg in iterator:
            src_tokens = vocab_src.lookup_tokens(src.tolist())
            # only the predicted tokens are needed here
            _, _, trg_output, _, _ = translate_sentence(src_tokens, model, device)

            # strip the end marker only when it is really there: a translation
            # that stopped at max_length ends in an ordinary token
            if trg_output and trg_output[-1] == '<eos>':
                trg_output = trg_output[:-1]
            predictions.append(trg_output)

            # each label is wrapped in an extra list (one reference per prediction)
            labels.append([vocab_trg.lookup_tokens(trg.tolist())])

    return predictions, labels
之前生成的 test_data可以传递给 compute_metrics 函数。然后,可以将预测结果和标签传递给 torchtext.data.metrics 中的 bleu_score,以计算 BLEU 分数。
from torchtext.data.metrics import bleu_score

# translate every test pair, collecting predictions and reference labels
predictions, labels = compute_metrics(model, test_data)

# corpus-level BLEU on a 0-1 scale
bleu_score(predictions, labels)
0.3588869571685791
bleu_score 返回的是 0 到 1 之间的数值,约 0.359 即百分制下的 35.9 分,落在上表“30-39”的区间。这一结果表明翻译效果良好,是本教程可以接受的结果。
本例完成后,《手撕Transformer》系列也就结束了。
Packages
!pip install -q portalocker
# importing required libraries
import math
import copy
import time
import random
import spacy
import numpy as np
import os
# torch packages
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import Tensor
import torch.optim as optim
# load and build datasets
import torchtext
from torchtext.data.functional import to_map_style_dataset
from torch.nn.functional import pad
from torch.utils.data import DataLoader
from torchtext.vocab import build_vocab_from_iterator
import torchtext.datasets as datasets
import portalocker
# visualization packages
from mpl_toolkits import mplot3d
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
Loading the Tokenizers
def _load_spacy_model(model_name: str):
    """Load a spaCy pipeline, attempting one download if it is not installed."""
    import subprocess
    import sys

    try:
        return spacy.load(model_name)
    except OSError:
        # download with the interpreter that is running this code, and without
        # a shell; a failed download still surfaces as OSError from the retry
        subprocess.run([sys.executable, "-m", "spacy", "download", model_name])
        return spacy.load(model_name)


def load_tokenizers():
    """
    Load the German and English tokenizers provided by spaCy.

    Each pipeline is downloaded on first use if spacy.load raises OSError.

    Returns:
        spacy_de: German tokenizer
        spacy_en: English tokenizer
    """
    spacy_de = _load_spacy_model("de_core_news_sm")
    spacy_en = _load_spacy_model("en_core_web_sm")

    print("Loaded English and German tokenizers.")
    return spacy_de, spacy_en
Tokenize the Sequences
def tokenize(text: str, tokenizer):
    """
    Split a string into lower-cased tokens.

    Args:
        text: string to split
        tokenizer: object whose .tokenizer(text) yields items with a .text attribute

    Returns:
        list of lower-cased token strings
    """
    lowered = []
    for tok in tokenizer.tokenizer(text):
        lowered.append(tok.text.lower())
    return lowered
Yield Tokens
def yield_tokens(data_iter, tokenizer, index: int):
    """
    Lazily tokenize one element of every tuple in `data_iter`.

    Args:
        data_iter: iterable of tuples of text
        tokenizer: callable applied to the selected text
        index: position of the language in each tuple | (de=0, en=1)

    Yields:
        tokenizer(item[index]) for each item, in order
    """
    yield from (tokenizer(pair[index]) for pair in data_iter)
Building the Vocabulary
def build_vocabulary(spacy_de, spacy_en, min_freq: int = 2):
    """
    Build the German (source) and English (target) vocabularies from Multi30k.

    Args:
        spacy_de: German tokenizer
        spacy_en: English tokenizer
        min_freq: minimum frequency needed to include a word in either vocabulary

    Returns:
        vocab_src: German vocabulary
        vocab_trg: English vocabulary
    """
    # both vocabularies use the same special tokens in the same order
    specials = ["<bos>", "<eos>", "<pad>", "<unk>"]

    def tokenize_de(text: str):
        """Tokenize `text` with the German tokenizer."""
        return tokenize(text, spacy_de)

    def tokenize_en(text: str):
        """Tokenize `text` with the English tokenizer."""
        return tokenize(text, spacy_en)

    print("Building German Vocabulary...")

    # load train, val, and test data pipelines
    train, val, test = datasets.Multi30k(language_pair=("de", "en"))

    # generate source vocabulary from the German side (index 0)
    vocab_src = build_vocab_from_iterator(
        yield_tokens(train + val + test, tokenize_de, index=0),
        min_freq=min_freq,
        specials=specials,
    )

    print("Building English Vocabulary...")

    # generate target vocabulary from the English side (index 1); uses the
    # caller's min_freq rather than a hard-coded value
    vocab_trg = build_vocab_from_iterator(
        yield_tokens(train + val + test, tokenize_en, index=1),
        min_freq=min_freq,
        specials=specials,
    )

    # out-of-vocabulary (OOV) words map to <unk>
    vocab_src.set_default_index(vocab_src["<unk>"])
    vocab_trg.set_default_index(vocab_trg["<unk>"])

    return vocab_src, vocab_trg
Load the Vocabulary
def load_vocab(spacy_de, spacy_en, min_freq: int = 2):
    """
    Return the German/English vocabularies, building and caching them on first use.

    When "vocab.pt" already exists it is loaded as-is; the arguments are only
    used when the vocabularies have to be built.

    Args:
        spacy_de: German tokenizer
        spacy_en: English tokenizer
        min_freq: minimum frequency needed to include a word in the vocabulary

    Returns:
        vocab_src: German vocabulary
        vocab_trg: English vocabulary
    """
    cache_path = "vocab.pt"

    if os.path.exists(cache_path):
        # reuse the cached vocabularies
        vocab_src, vocab_trg = torch.load(cache_path)
    else:
        # build both vocabularies and cache them for the next run
        vocab_src, vocab_trg = build_vocabulary(spacy_de, spacy_en, min_freq)
        torch.save((vocab_src, vocab_trg), cache_path)

    print("Finished.\nVocabulary sizes:")
    print("\tSource:", len(vocab_src))
    print("\tTarget:", len(vocab_trg))

    return vocab_src, vocab_trg
Indexing Sequences
def data_process(raw_data):
    """
    Tokenize raw sentence pairs and convert them to vocabulary indices.

    Reads the module-level spacy_de, spacy_en, vocab_src and vocab_trg.

    Args:
        raw_data: iterable of (German, English) sentence pairs

    Returns:
        data: list of (German, English) pairs of 1-D long tensors of indices
    """

    def _to_indices(text, nlp, vocab):
        # lower-case every token and look it up in the given vocabulary
        indices = [vocab[token.text.lower()] for token in nlp.tokenizer(text)]
        return torch.tensor(indices, dtype=torch.long)

    return [
        (_to_indices(raw_de, spacy_de, vocab_src),
         _to_indices(raw_en, spacy_en, vocab_trg))
        for raw_de, raw_en in raw_data
    ]
Generating Batches
def generate_batch(data_batch):
    """
    Collate indexed sentence pairs into two padded batches.

    Reads the module-level BOS_IDX, EOS_IDX, PAD_IDX, MAX_PADDING and device.

    Args:
        data_batch: German-English indexed-sentence pairs

    Returns:
        two stacked batches: one for German and one for English
    """

    def _wrap_and_pad(item):
        # <bos> + sentence + <eos>, moved to the target device
        wrapped = torch.cat(
            [torch.tensor([BOS_IDX]), item, torch.tensor([EOS_IDX])], dim=0
        ).to(device)
        # right-pad with PAD_IDX up to MAX_PADDING entries
        # NOTE(review): the pad amount is negative for sequences longer than
        # MAX_PADDING - confirm that the resulting behaviour of pad() is intended
        return pad(wrapped, (0, MAX_PADDING - len(wrapped)), value=PAD_IDX)

    de_batch, en_batch = [], []
    for de_item, en_item in data_batch:
        de_batch.append(_wrap_and_pad(de_item))
        en_batch.append(_wrap_and_pad(en_item))

    return torch.stack(de_batch), torch.stack(en_batch)
Displaying Attention
def display_attention(sentence: list, translation: list, attention: Tensor,
                      n_heads: int = 8, n_rows: int = 4, n_cols: int = 2):
    """
    Plot one attention matrix per head in an n_rows x n_cols grid.

    Args:
        sentence: tokens (or indices) labelling the x-axis
        translation: tokens (or indices) labelling the y-axis
        attention: attention scores; after squeeze(0) the first axis indexes heads
        n_heads: number of heads
        n_rows: number of rows
        n_cols: number of columns
    """
    # the grid must hold exactly one subplot per head
    assert n_rows * n_cols == n_heads

    fig = plt.figure(figsize=(15,25))

    for head in range(n_heads):
        ax = fig.add_subplot(n_rows, n_cols, head+1)

        # this head's matrix as a numpy array for plotting
        head_scores = attention.squeeze(0)[head,:,:].cpu().detach().numpy()
        ax.matshow(head_scores, cmap='bone')

        # tick positions and label size
        ax.tick_params(labelsize=12)
        ax.set_xticks(range(len(sentence)))
        ax.set_yticks(range(len(translation)))

        # label with tokens (lower-cased on the x-axis) or with raw indices
        first = sentence[0]
        if isinstance(first, str):
            ax.set_xticklabels([t.lower() for t in sentence], rotation=45)
            ax.set_yticklabels(translation)
        elif isinstance(first, int):
            ax.set_xticklabels(sentence)
            ax.set_yticklabels(translation)

    plt.show()
点击上方小卡片关注我
添加个人微信,进专属粉丝群!