01
Introduction
This is the sixth article in the 手撕Transformer (hand-built Transformer) series. It introduces the Encoder from scratch; the encoder is the first half of the Transformer architecture and wraps all of the layers covered in the previous articles.
02
Background
The encoder is a wrapper around the sublayers introduced in the previous articles. It takes positionally encoded embeddings and passes them through the multi-head attention mechanism and the feed-forward network (FFN). After each sublayer, it performs a residual addition and layer normalization.
The reason residual connections appear in the Transformer is more technical than architectural.
Residual connections mainly help mitigate the vanishing-gradient problem. During backpropagation, the signal is multiplied by the derivative of the activation function. For ReLU, this means the gradient is zero in roughly half of the cases. Without residual connections, much of the training signal would be lost during backpropagation. Residual connections reduce this effect because the summation is linear with respect to differentiation, so each residual block also receives a signal that is unaffected by the vanishing gradient. The summation of the residual connection forms a path in the computation graph along which the gradient does not vanish.
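As a quick illustration of why that summation matters, here is a minimal autograd sketch (not part of the series code, just standard PyTorch) comparing the gradient that reaches the input with and without a residual connection around a Linear + ReLU sublayer. The identity path contributes a constant gradient of 1 per element, no matter how many ReLU units are inactive.

import torch
import torch.nn as nn

torch.manual_seed(0)
x = torch.randn(1, 8, requires_grad=True)
sublayer = nn.Sequential(nn.Linear(8, 8), nn.ReLU())

# plain sublayer: the gradient w.r.t. x only flows through the ReLU branch
grad_plain = torch.autograd.grad(sublayer(x).sum(), x)[0]

# residual sublayer: y = x + f(x), so dy/dx = 1 + df/dx for every element
grad_res = torch.autograd.grad((x + sublayer(x)).sum(), x)[0]

print(grad_plain)             # contributions from inactive ReLU units are zeroed out
print(grad_res - grad_plain)  # 1.0 everywhere: the identity path's contribution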
03
The Encoder in the Transformer
Note that nn.LayerNorm is used here instead of the from-scratch implementation from the previous article. Either is acceptable, but PyTorch's implementation is used for simplicity.
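As a quick sanity check (plain PyTorch, independent of the series code), nn.LayerNorm(d_model) normalizes each position of the sequence over the embedding dimension, with a learnable gain and bias:

import torch
import torch.nn as nn

d_model = 8
layer_norm = nn.LayerNorm(d_model)  # eps=1e-5, learnable weight and bias by default

x = torch.randn(2, 4, d_model)      # (batch_size, seq_length, d_model)
out = layer_norm(x)

# every (batch, position) slice is normalized independently across d_model
print(out.mean(dim=-1))                 # approximately 0 at every position
print(out.var(dim=-1, unbiased=False))  # approximately 1 at every position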
class EncoderLayer(nn.Module):
def __init__(self, d_model: int, n_heads: int, d_ffn: int, dropout: float):
"""
Args:
d_model: dimension of embeddings
n_heads: number of heads
d_ffn: dimension of feed-forward network
dropout: probability of dropout occurring
"""
super().__init__()
# multi-head attention sublayer
self.attention = MultiHeadAttention(d_model, n_heads, dropout)
# layer norm for multi-head attention
self.attn_layer_norm = nn.LayerNorm(d_model)
# position-wise feed-forward network
self.positionwise_ffn = PositionwiseFeedForward(d_model, d_ffn, dropout)
# layer norm for position-wise ffn
self.ffn_layer_norm = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, src: Tensor, src_mask: Tensor):
"""
Args:
src: positionally embedded sequences (batch_size, seq_length, d_model)
src_mask: mask for the sequences (batch_size, 1, 1, seq_length)
Returns:
src: sequences after self-attention (batch_size, seq_length, d_model)
"""
# pass embeddings through multi-head attention
_src, attn_probs = self.attention(src, src, src, src_mask)
# residual add and norm
src = self.attn_layer_norm(src + self.dropout(_src))
# position-wise feed-forward network
_src = self.positionwise_ffn(src)
# residual add and norm
src = self.ffn_layer_norm(src + self.dropout(_src))
return src, attn_probs
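To see the residual add-and-norm pattern above in isolation, here is a self-contained sketch of one encoder-layer pass that swaps in PyTorch's built-in nn.MultiheadAttention as a stand-in for the custom MultiHeadAttention class from the earlier articles (the interfaces differ slightly, so treat this as an approximation of EncoderLayer, not a drop-in replacement):

import torch
import torch.nn as nn

d_model, n_heads, d_ffn, dropout = 8, 4, 32, 0.1
x = torch.randn(3, 6, d_model)  # (batch_size, seq_length, d_model)

attention = nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True)
ffn = nn.Sequential(nn.Linear(d_model, d_ffn), nn.ReLU(), nn.Linear(d_ffn, d_model))
attn_norm, ffn_norm = nn.LayerNorm(d_model), nn.LayerNorm(d_model)
drop = nn.Dropout(dropout)

# sublayer 1: self-attention, then residual add and layer norm
_x, attn_probs = attention(x, x, x, need_weights=True)
x = attn_norm(x + drop(_x))

# sublayer 2: position-wise FFN, then residual add and layer norm
x = ffn_norm(x + drop(ffn(x)))

print(x.shape)           # torch.Size([3, 6, 8]) -- the shape is preserved
print(attn_probs.shape)  # torch.Size([3, 6, 6]) -- averaged over heads by default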
04
Encoder Stack
To get the full benefit of the multi-head attention sublayer, the input tokens pass through a stack of encoder layers before being handed to the decoder. This is labeled Nx in the image at the beginning of the article, and the figure below shows how these stacked encoders pass their output on to the decoder layers.
After a forward pass, the attention scores can be accessed through encoder.attn_probs (as the code shows, this holds the attention probabilities of the final encoder layer only):
class Encoder(nn.Module):
def __init__(self, d_model: int, n_layers: int,
n_heads: int, d_ffn: int, dropout: float = 0.1):
"""
Args:
d_model: dimension of embeddings
n_layers: number of encoder layers
n_heads: number of heads
d_ffn: dimension of feed-forward network
dropout: probability of dropout occurring
"""
super().__init__()
# create n_layers encoders
self.layers = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ffn, dropout)
for layer in range(n_layers)])
self.dropout = nn.Dropout(dropout)
def forward(self, src: Tensor, src_mask: Tensor):
"""
Args:
src: embedded sequences (batch_size, seq_length, d_model)
src_mask: mask for the sequences (batch_size, 1, 1, seq_length)
Returns:
src: sequences after self-attention (batch_size, seq_length, d_model)
"""
# pass the sequences through each encoder
for layer in self.layers:
src, attn_probs = layer(src, src_mask)
self.attn_probs = attn_probs
return src
05
Forward Pass
The example below shows a forward pass using three equal-length sequences and no mask. The sequences are embedded, positionally encoded, and then passed through the multi-head attention mechanism and the FFN, with their residual additions and layer normalizations. The module definitions depend on the functions from the previous articles.
torch.set_printoptions(precision=2, sci_mode=False)
# convert the sequences to integers
sequences = ["I wonder what will come next!",
"This is a basic example paragraph.",
"Hello, what is a basic split?"]
# tokenize the sequences
tokenized_sequences = [tokenize(seq) for seq in sequences]
# index the sequences
indexed_sequences = [[stoi[word] for word in seq] for seq in tokenized_sequences]
# convert the sequences to a tensor
tensor_sequences = torch.tensor(indexed_sequences).long()
# parameters
vocab_size = len(stoi)
d_model = 8
d_ffn = d_model*4 # 32
n_heads = 4
n_layers = 4
dropout = 0.1
# create the embeddings
lut = Embeddings(vocab_size, d_model) # look-up table (lut)
# create the positional encodings
pe = PositionalEncoding(d_model=d_model, dropout=0.1, max_length=10)
# embed the sequence
embeddings = lut(tensor_sequences)
# positionally encode the sequences
X = pe(embeddings)
# initialize encoder
encoder = Encoder(d_model, n_layers, n_heads,
d_ffn, dropout)
# pass through encoder
encoder(src=X, src_mask=None)
# sequence 0
display_attention(tokenized_sequences[0], tokenized_sequences[0], encoder.attn_probs[0], n_heads, n_rows=2, n_cols=2)
06
What is a mask?
In the example above, src_mask was set to None. As described in the third article, an optional mask can be passed through the multi-head attention layer. For the encoder, this mask is usually created from the padding of the sequences. The three sequences used in this article all have a length of 6. To ensure that every sequence in a batch has the same length, padding is added to the shorter ones. When that happens, the model should not attend to the padding tokens, so a mask vector is created for each sequence to indicate which values should be attended to.
This mask has a shape of (batch_size, 1, 1, seq_length). It is broadcast across each head's representation of the sequence.
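Here is a minimal shape-level sketch of that broadcast (the actual masking happens inside the MultiHeadAttention class from the third article; the tensors below are just placeholders):

import torch

batch_size, n_heads, seq_length = 3, 4, 8

# attention scores for every head: (batch_size, n_heads, seq_length, seq_length)
scores = torch.randn(batch_size, n_heads, seq_length, seq_length)

# padding mask: (batch_size, 1, 1, seq_length), True where a real token sits
src_mask = torch.ones(batch_size, 1, 1, seq_length, dtype=torch.bool)
src_mask[:, :, :, 6:] = False  # pretend the last two positions are padding

# the two singleton dimensions broadcast over the heads and the query positions
masked_scores = scores.masked_fill(src_mask == 0, -1e10)
print(masked_scores.shape)        # torch.Size([3, 4, 8, 8])
print(masked_scores[0, 0, 0, :])  # the last two key positions are now -1e10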
For example, the three sequences below have different lengths:
"What will come next?" → [21, 22, 5, 15]
"This is a basic paragraph." → [20, 13, 0, 3, 17]
"A basic split will come next!" → [0, 3, 18, 22, 5, 15]
To be placed in a single tensor they must have the same length, so padding has to be added. This can be done with pad, a simple function from torch.nn.functional that pads each input to the same length. Sample code:
from torch.nn.functional import pad
def pad_seq(seq: Tensor, max_length: int = 10, pad_idx: int = 0):
pad_to_add = max_length - len(seq) # amount of padding to add
return pad(seq,(0, pad_to_add), value=pad_idx,)
sequences = ['What will come next?',
'This is a basic paragraph.',
'A basic split will come next!']
# tokenize the sequences
tokenized_sequences = [tokenize(seq) for seq in sequences]
# index the sequences
indexed_sequences = [[stoi[word] for word in seq] for seq in tokenized_sequences]
max_length = 8
pad_idx = len(stoi) # 24
padded_seqs = []
for seq in indexed_sequences:
# pad each sequence
padded_seqs.append(pad_seq(torch.Tensor(seq), max_length, pad_idx))
# create a tensor from the padded sequences
tensor_sequences = torch.stack(padded_seqs).long()
print(tensor_sequences)
The result is:

tensor([[21, 22,  5, 15, 24, 24, 24, 24],
        [20, 13,  0,  3, 17, 24, 24, 24],
        [ 0,  3, 18, 22,  5, 15, 24, 24]])

The padded sequences can then be embedded and passed through the encoder as before:
torch.set_printoptions(precision=2, sci_mode=False)
# parameters
vocab_size = len(stoi) + 1 # add one for the padding token
d_model = 8
d_ffn = d_model*4 # 32
n_heads = 4
n_layers = 4
dropout = 0.1
# create the embeddings
lut = Embeddings(vocab_size, d_model) # look-up table (lut)
# create the positional encodings
pe = PositionalEncoding(d_model=d_model, dropout=0.1, max_length=10)
# embed the sequence
embeddings = lut(tensor_sequences)
# positionally encode the sequences
X = pe(embeddings)
# initialize encoder
encoder = Encoder(d_model, n_layers, n_heads,
d_ffn, dropout)
# pass through encoder
encoder(src=X, src_mask=None)
# probabilities for sequence 0
encoder.attn_probs[0]
display_attention(tensor_sequences[0].int().tolist(), tensor_sequences[0].int().tolist(), encoder.attn_probs[0], n_heads, n_rows=2, n_cols=2)
07
The Source Mask
The source mask is created by comparing the tokens in the padded sequences, tensor_sequences, against the padding index; only the padding tokens are excluded. When the mask is passed into the encoder, the attention score at each padding position is replaced with a very large negative value, such as -∞ or -1e10. After exponentiation this value is negligible (e^-∞ = 0), so it has no meaningful effect on the softmax output. This means only the real tokens are considered in the probability distribution, while the padding tokens receive a value of 0. Starting from the padded tensor produced above:
tensor([[21, 22, 5, 15, 24, 24, 24, 24],
[20, 13, 0, 3, 17, 24, 24, 24],
[ 0, 3, 18, 22, 5, 15, 24, 24]])
# pad_idx is 24 in this example
tensor_sequences != pad_idx
src_mask = (tensor_sequences != pad_idx).unsqueeze(1).unsqueeze(2)
print(src_mask)
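Below is a small demonstration of the step that happens inside the attention layer once this mask is applied (only a sketch, with -1e10 standing in for -∞):

import torch
import torch.nn.functional as F

# attention scores of one query over an 8-token sequence whose last 4 tokens are padding
scores = torch.randn(8)
mask = torch.tensor([1, 1, 1, 1, 0, 0, 0, 0], dtype=torch.bool)

masked_scores = scores.masked_fill(~mask, -1e10)
probs = F.softmax(masked_scores, dim=-1)

print(probs)        # the padding positions receive (essentially) zero probability
print(probs.sum())  # the distribution still sums to 1 over the real tokens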
torch.set_printoptions(precision=2, sci_mode=False)
# parameters
vocab_size = len(stoi) + 1 # add one for the padding token
d_model = 8
d_ffn = d_model*4 # 32
n_heads = 4
n_layers = 4
dropout = 0.1
# create the embeddings
lut = Embeddings(vocab_size, d_model) # look-up table (lut)
# create the positional encodings
pe = PositionalEncoding(d_model=d_model, dropout=0.1, max_length=10)
# embed the sequence
embeddings = lut(tensor_sequences)
# positionally encode the sequences
X = pe(embeddings)
# initialize encoder
encoder = Encoder(d_model, n_layers, n_heads,
d_ffn, dropout)
# pass through encoder
encoder(src=X, src_mask=src_mask)
# probabilities for sequence 0
encoder.attn_probs[0]
# sequence 0
display_attention(tensor_sequences[0].int().tolist(), tensor_sequences[0].int().tolist(), encoder.attn_probs[0], n_heads, n_rows=2, n_cols=2)
08
Putting It All Together
def make_src_mask(src: Tensor, pad_idx: int = 0):
"""
Args:
src: raw sequences with padding (batch_size, seq_length)
Returns:
src_mask: mask for each sequence (batch_size, 1, 1, seq_length)
"""
    # assign 1 to tokens that need to be attended to and 0 to padding tokens, then add two dimensions
src_mask = (src != pad_idx).unsqueeze(1).unsqueeze(2)
return src_mask
def pad_seq(seq: Tensor, max_length: int = 10, pad_idx: int = 0):
"""
Args:
seq: raw sequence (batch_size, seq_length)
max_length: maximum length of a sequence
pad_idx: index for padding tokens
Returns:
padded seq: padded sequence (batch_size, max_length)
"""
pad_to_add = max_length - len(seq) # amount of padding to add
return pad(seq,(0, pad_to_add), value=pad_idx,)
sequences = ['What will come next?',
'This is a basic paragraph.',
'A basic split will come next!']
# tokenize the sequences
tokenized_sequences = [tokenize(seq) for seq in sequences]
# index the sequences
indexed_sequences = [[stoi[word] for word in seq] for seq in tokenized_sequences]
max_length = 8
pad_idx = len(stoi)
padded_seqs = []
for seq in indexed_sequences:
# pad each sequence
padded_seqs.append(pad_seq(torch.Tensor(seq), max_length, pad_idx))
# create a tensor from the padded sequences
tensor_sequences = torch.stack(padded_seqs).long()
# create the source masks for the sequences
src_mask = make_src_mask(tensor_sequences, pad_idx)
torch.set_printoptions(precision=2, sci_mode=False)
# parameters
vocab_size = len(stoi) + 1 # add one for the padding token
d_model = 8
d_ffn = d_model*4 # 32
n_heads = 4
n_layers = 4
dropout = 0.1
# create the embeddings
lut = Embeddings(vocab_size, d_model) # look-up table (lut)
# create the positional encodings
pe = PositionalEncoding(d_model=d_model, dropout=0.1, max_length=10)
# embed the sequence
embeddings = lut(tensor_sequences)
# positionally encode the sequences
X = pe(embeddings)
# initialize encoder
encoder = Encoder(d_model, n_layers, n_heads,
d_ffn, dropout)
# pass through encoder
encoder(src=X, src_mask=src_mask)
# preview each sequence
for i in range(0,3):
display_attention(tensor_sequences[i].int().tolist(), tensor_sequences[i].int().tolist(), encoder.attn_probs[i], n_heads, n_rows=2, n_cols=2)