手撕torch系列总结

文摘   科技   2024-02-29 09:30   上海  

现在面试算法工程师,不一定会让我们手撕力扣,而是手撕一些深度学习常见的操作!

以下是一些在面试中可能会让写的torch操作,以下就有几个小律以前面试的时候,被要求过手撕,分享一下这些操作,一起学习!

1.IOU2D

IOU交并比的定义

IOU坐标图
import numpy as np

def compute_iou(boxA, boxB):
# box:(x1,y1,x2,y2), x1,y1为左上角。原点为左上角,x朝右为正,y朝下为正。
# 计算相交框的坐标
xA = max(boxA[0], boxB[0])
yA = max(boxA[1], boxB[1])
xB = min(boxA[2], boxB[2])
yB = min(boxA[3], boxB[3])

# 计算交区域,并区域,及IOU。要和0比较大小,如果是负数就说明压根不相交
interArea = max(0, xB - xA) * max(0, yB - yA)

boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])

iou = interArea / (boxAArea + boxBArea - interArea)

# return the intersection over union value
return iou

boxA = [1,1,3,3]
boxB = [2,2,4,4]
IOU = ComputeIOU(boxA, boxB)

2.NMS

NMS是一种用于去除冗余的边界框的方法,它首先将所有的边界框按照置信度排序,然后从中选择置信度最高的边界框,并移除所有与它有高IoU的边界框。这个过程会重复进行,直到没有剩余的边界框。

def nms(boxes, scores, threshold):
# boxes: 边界框列表,每个框是一个格式为 [x1, y1, x2, y2] 的列表
# scores: 每个边界框的得分列表
# threshold: NMS的IoU阈值

# 按得分升序排列边界框
sorted_indices = np.argsort(scores)
boxes = [boxes[i] for i in sorted_indices]
scores = [scores[i] for i in sorted_indices]

keep = [] # 保留的边界框的索引列表

while boxes:
# 取得分最高的边界框
current_box = boxes.pop()
current_score = scores.pop()

keep.append(sorted_indices[-1])
sorted_indices = sorted_indices[:-1]
discard_indices = [] # 需要丢弃的边界框的索引列表

for i, box in enumerate(boxes):
# 计算与当前边界框的IoU
iou = compute_iou(current_box, box)

# 如果IoU超过阈值,标记该边界框为需要丢弃
if iou > threshold:
discard_indices.append(i)

# 移除标记为需要丢弃的边界框。从后往前删,不然for循环会出错
for i in sorted(discard_indices, reverse=True):
boxes.pop(i)
scores.pop(i)
sorted_indices = np.delete(sorted_indices, i) # np与list的方法不同

return keep

3.conv2d

# 官方的Conv2d调用方法
torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=True)

输入和输出的特征图尺寸大小关系:
def conv2d(inputs, kernels, bias, stride, padding):
"""
正向卷积操作
inputs: 输入数据,形状为 (C, H, W)
kernels: 卷积核,形状为 (F, C, HH, WW),C是图片输入层数,F是图片输出层数
bias: 偏置,形状为 (F,)
stride: 步长
padding: 填充
"""
# 获取输入数据和卷积核的形状
C, H, W = inputs.shape
F, _, HH, WW = kernels.shape

# 对输入数据进行填充。在第一个轴(通常是通道轴)上不进行填充,在第二个轴和第三个轴(通常是高度和宽度轴)上在开始和结束位置都填充padding个值。
inputs_pad = np.pad(inputs, ((0, 0), (padding, padding), (padding, padding)))

# 初始化输出数据,卷积后的图像size大小
H_out = 1 + (H + 2 * padding - HH) // stride
W_out = 1 + (W + 2 * padding - WW) // stride
outputs = np.zeros((F, H_out, W_out))

# 进行卷积操作
for i in range(H_out):
for j in range(W_out): # 找到out图像对于的原始图像区域,然后对图像进行sum和bias
inputs_slice = inputs_pad[:, i*stride:i*stride+HH, j*stride:j*stride+WW]
outputs[:, i, j] = np.sum(inputs_slice * kernels, axis=(1, 2, 3)) + bias
# axis=(1, 2, 3)表示在通道、高度和宽度这三个轴上进行求和。
return outputs

4.pooling

# 官方的pooling调用方法
nn.MaxPool2d(kernel_size=2, stride=(2, 1), padding=(0, 1))


# 手撕以max pooling为例,如果是平均pooling,只需要把np.max改成np.mean

def max_pooling(inputs, pool_size, stride):
"""
最大池化操作
inputs: 输入数据,形状为 (C, H, W)
pool_size: 池化核的大小
stride: 步长
"""
C, H, W = inputs.shape

# 初始化输出数据
H_out = (H - pool_size) // stride + 1
W_out = (W - pool_size) // stride + 1
outputs = np.zeros((C, H_out, W_out))

# 进行最大池化操作
for i in range(H_out):
for j in range(W_out):
inputs_slice = inputs[:, i*stride:i*stride+pool_size, j*stride:j*stride+pool_size]
outputs[:, i, j] = np.max(inputs_slice, axis=(1, 2))

return outputs

5.BN

BN的公式
def batch_norm(inputs, gamma, beta, eps):
"""
批量归一化操作。N, C, H, W样本数可以看做N*H*W,CNN中,BN通常在每个通道上独立进行
inputs: 输入数据,形状为 (N, C, H, W)
gamma: 缩放因子,形状为 (C,)
beta: 偏移因子,形状为 (C,)
eps: 防止除0的小数值
"""
N, C, H, W = inputs.shape

# 在N、H和W的维度上计算每个通道的均值和方差
mean = np.mean(inputs, axis=(0, 2, 3), keepdims=True) # (1,C,1,1)
var = np.var(inputs, axis=(0, 2, 3), keepdims=True)

# 计算归一化的输入。eps防止除0
inputs_norm = (inputs - mean) / np.sqrt(var + eps)

# 缩放和偏移
outputs = gamma * inputs_norm + beta

return outputs

6.resnet

import torch
import torch.nn as nn
import torch.nn.functional as F

class ResNetBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride=1):
# 下面这行代码的作用是调用父类nn.Module的构造函数,这是在创建自定义的神经网络模块时的必须步骤。
super(ResNetBlock, self).__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(out_channels)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(out_channels)

if stride != 1 or in_channels != out_channels:
self.shortcut = nn.Sequential( # 当通道数变化时,用1*1卷积来配准通道数。
nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(out_channels)
)
else:
self.shortcut = nn.Sequential()

def forward(self, x):
out = F.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out)) # 这里不加relu
out += self.shortcut(x)
out = F.relu(out)
return out

7.全连接FCN

class FCN(nn.Module):
def __init__(self, input_size, hidden_size, num_classes):
super(FCN, self).__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(hidden_size, num_classes)

def forward(self, x):
out = self.fc1(x)
out = self.relu(out)
out = self.fc2(out)
return out

8.多头注意力(建议看下面的注意力)

from torch import nn

class MultiHeadAttention(nn.Module):
"""多头自注意力机制,来自'Attention Is All You Need'论文"""

def __init__(self, config):
super().__init__()
self.config = config
self.hidden_size = config.hidden_size
self.num_heads = config.num_attention_heads
self.head_dim = self.hidden_size // self.num_heads
self.max_position_embeddings = config.max_position_embeddings

# 定义线性变换层,用于生成Q、K、V
self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
self.k_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
self.v_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
self.output_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)

def forward(
self,
hidden_states: torch.Tensor,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_value: Optional[Tuple[torch.Tensor]] = None,
output_attentions: bool = False,
use_cache: bool = False,
) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
batch_size, seq_len, _ = hidden_states.size()

# 通过线性变换生成Q、K、V
query_states = self.q_proj(hidden_states).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
key_states = self.k_proj(hidden_states).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
value_states = self.v_proj(hidden_states).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

# 如果存在past_key_value,则将其与当前的K、V拼接
if past_key_value is not None:
key_states = torch.cat([past_key_value[0], key_states], dim=2)
value_states = torch.cat([past_key_value[1], value_states], dim=2)

# 如果使用缓存,则保存当前的K、V
past_key_value = (key_states, value_states) if use_cache else None

# 计算注意力权重
attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) / math.sqrt(self.head_dim)

# 如果存在attention_mask,则将其加到注意力权重上
if attention_mask is not None:
attn_weights = attn_weights + attention_mask
dtype_min = torch.tensor(
torch.finfo(attn_weights.dtype).min, device=attn_weights.device, dtype=attn_weights.dtype
)
attn_weights = torch.maximum(attn_weights, dtype_min)

# 计算softmax得到最终的注意力权重
attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype)
attn_output = torch.matmul(attn_weights, value_states)

# 调整维度并通过输出线性变换
attn_output = attn_output.transpose(1, 2)
attn_output = attn_output.reshape(batch_size, seq_len, self.hidden_size)
attn_output = self.output_proj(attn_output)

# 如果不需要输出注意力权重,则将其设为None
if not output_attentions:
attn_weights = None

return attn_output, attn_weights, past_key_value

注意力简单版本

多头注意力(Multi-head Attention)是一种在自注意力(Self-Attention)机制中引入的技术,它的目的是让模型能够关注输入的不同部分(即从不同的“角度”或“视点”关注输入)。这种方法的基本思想是将模型的注意力分散到多个不同的表示子空间(或称为“头”)中,而不是仅在一个单一的表示空间中进行。

在具体实现中,我们首先将输入的每个向量(通常是词嵌入向量或某一层的输出向量)分割成多个较小的向量,每个向量对应一个头。然后,对于每个头,我们都独立地计算其自注意力权重,并用这些权重来更新其对应的向量。最后,我们将所有头的输出向量连接起来,形成最终的输出。

当我们将输入向量拆分为多个较小的向量时,每个较小的向量都包含原始向量的一部分信息。然后,每个头独立地对其对应的小向量进行自注意力计算。这意味着每个头都在其自己的子空间中工作,可以专注于捕获输入的某种特定方面的信息。最后,我们将所有头的输出向量连接起来,形成最终的输出。这个输出向量包含了所有头捕获的信息,因此并没有丢失原始的特征。

import torch
import torch.nn.functional as F
from torch import nn
import math
# 在计算自注意力的时候,首先计算每个元素的 Query 和所有其他元素的 Key 之间的相似度(通常用点积来计算),
# 然后对相似度进行 softmax 操作得到权重,最后将这个权重应用到每个元素的 Value 上,再将所有元素的加权 Value 求和
# ,就得到了当前元素的输出。
class SimpleAttention(nn.Module):
def __init__(self, dim, num_heads):
super().__init__()
self.num_heads = num_heads
self.dim = dim # dim是特征长度
self.head_dim = dim // num_heads

def forward(self, query, key, value, mask=None):
B, _, N = query.size() # N是序列长度

# 将输入的Q、K、V拆分为多头。num_heads多头、head_dim每个头的特征维度、N是序列长度
query = query.view(B, self.num_heads, self.head_dim, N)
key = key.view(B, self.num_heads, self.head_dim, N)
value = value.view(B, self.num_heads, self.head_dim, N)


# 计算注意力分数。计算query和key的点积(dot product)。点积是相似性度量方法,
# 我们使用点积来衡量query和每个key之间的相似性,相似性越高,对应的value在最终的注意力输出中的权重就越大。
# sqrt这部分是在对点积的结果进行缩放(scaling),用于防止点积的结果过大,导致softmax函数的梯度过小,从而影响模型的训练。
attn_scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.head_dim)

# 如果提供了mask,将其应用到注意力分数上
if mask is not None:
attn_scores = attn_scores.masked_fill(mask == 0, float('-inf'))

# 计算注意力权重
attn_weights = F.softmax(attn_scores, dim=-1)

# 计算加权的V
output = torch.matmul(attn_weights, value)

# 合并多头
output = output.contiguous().view(B, -1, N)

return output


未完待续...

不摸鱼的小律
互联网大厂算法工程师一枚,分享各种技术、职场热点和感悟。不做每日打卡的路人。
 最新文章