The ResNet (residual network) paper, "Deep Residual Learning for Image Recognition", is a landmark in deep learning and computer vision. Proposed by Kaiming He et al. in 2015, it won the CVPR Best Paper Award, took top places in several competitions, and is often described as a pillar of modern computer vision.
This post walks through reproducing the ResNet paper, which is a good foundation for reproducing other papers.
Paper digest
The ResNet architecture table:
The two residual unit structures used in ResNet:
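Both unit types implement the paper's core residual mapping: a unit computes y = F(x, {W_i}) + x, where F is the stacked convolutional layers and the identity shortcut adds the input back. When the dimensions of x and F(x) differ, the shortcut is replaced by a linear projection: y = F(x, {W_i}) + W_s·x.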
Accuracy on ImageNet:
Accuracy on CIFAR-10:
Code reproduction
Given my machine's hardware, the reproduction uses the CIFAR-10 dataset.
The paper's architecture table for the CIFAR experiments:
The code sets n = 5.
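For reference, the paper's CIFAR architecture (6n+2 weighted layers in total) is, as best I recall the table:
output map size: 32×32, 16×16, 8×8
# layers: 1+2n, 2n, 2n
# filters: 16, 32, 64
With n = 5 this is the 32-layer model, for which the paper reports a test error of about 7.51%.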
The code is organized into the following main modules:
backbone module
image augmentation module
loss function module
data loading module
training-curve plotting module
optimizer and learning-rate schedule module
Backbone module
Build the ResNet class according to the table given in the paper.
import torch
import torch.nn as nn

class ResnetCifar(nn.Module):
    def __init__(self, ch1=16, ch2=32, ch3=64, n=5, cls_num=10):  # n: residual units per stage in the paper's CIFAR network
        super().__init__()
        self.n = n
        self.ch1 = ch1
        self.ch2 = ch2
        self.ch3 = ch3
        self.conv_module1 = nn.Sequential(nn.Conv2d(3, ch1, 3, 1, 1), nn.BatchNorm2d(ch1), nn.ReLU())
        # 1x1 stride-2 convolutions that project the shortcut at the stage transitions
        self.conv_module2 = nn.Conv2d(ch1, ch2, 1, 2, 0)
        self.conv_module3 = nn.Conv2d(ch2, ch3, 1, 2, 0)
        self.res_module1 = self._build_res_unit(ch1, ch1, 3, 1, 1)
        self.res_module2 = self._build_res_unit(ch1, ch2, 3, 2, 1)
        self.res_module3 = self._build_res_unit(ch2, ch2, 3, 1, 1)
        self.res_module4 = self._build_res_unit(ch2, ch3, 3, 2, 1)
        self.res_module5 = self._build_res_unit(ch3, ch3, 3, 1, 1)
        self.avgpool_module = nn.AdaptiveAvgPool2d(output_size=(1, 1))
        self.linear_module = nn.Linear(ch3, cls_num)

    # build one residual unit; the first unit of a stage maps to the new channel count
    # NOTE: the trailing ReLU here sits before the shortcut addition, whereas the paper
    # applies the ReLU after the addition
    def _build_res_unit(self, in_ch, out_ch, kernel_size, stride, padding):
        unit1_conv1 = nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=kernel_size, stride=stride, padding=padding)
        unit1_batch1 = nn.BatchNorm2d(out_ch)
        unit1_act1 = nn.ReLU()
        unit1_conv2 = nn.Conv2d(in_channels=out_ch, out_channels=out_ch, kernel_size=kernel_size, stride=1, padding=1)
        unit1_batch2 = nn.BatchNorm2d(out_ch)
        unit1_act2 = nn.ReLU()
        unit1 = nn.Sequential(unit1_conv1, unit1_batch1, unit1_act1, unit1_conv2, unit1_batch2, unit1_act2)
        return unit1

    # img: tensor(bs,3,32,32)
    def forward(self, img):
        x = self.conv_module1(img)
        # NOTE: each stage below reuses a single unit n times, so the n units of a
        # stage share weights; in the paper every unit has its own parameters
        # first residual stage
        for i in range(self.n):
            x = x + self.res_module1(x)
        # second residual stage (the first unit downsamples; its shortcut uses the 1x1 stride-2 projection)
        for i in range(self.n):
            x = x + self.res_module3(x) if i > 0 else self.conv_module2(x) + self.res_module2(x)
        # third residual stage
        for i in range(self.n):
            x = x + self.res_module5(x) if i > 0 else self.conv_module3(x) + self.res_module4(x)
        # global average pooling, x(bs,ch_out,1,1)
        x = self.avgpool_module(x)
        # x(bs,ch_out)
        x = torch.flatten(x, start_dim=1)
        # linear, x(bs,cls_num)
        x = self.linear_module(x)
        # return raw logits: nn.CrossEntropyLoss applies log-softmax internally,
        # so adding an nn.Softmax here would apply softmax twice
        return x
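As a quick sanity check, here is a minimal sketch (assuming the class above is in scope) that pushes a dummy batch through the network and verifies the output shape:

import torch

model = ResnetCifar(n=5)
dummy = torch.randn(2, 3, 32, 32)  # a batch of two CIFAR-sized images
out = model(dummy)
print(out.shape)  # expected: torch.Size([2, 10]), one logit per class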
Image augmentation module
The code uses the imgaug library for image augmentation.
The augmentation applied to the CIFAR images follows the paper: pad each side by 4 pixels, randomly crop a 32×32 patch, and horizontally flip.
The augmentation module:
import imgaug.augmenters as iaa
import numpy as np

class TrainImgProcess:
    def __init__(self):
        # pad 4 px on every side, then crop a random 32x32 patch, then maybe flip
        self.pad_crop_aug = iaa.Sequential([
            iaa.Sequential([
                iaa.Pad(px=4, keep_size=False),
                iaa.CropToFixedSize(width=32, height=32)]),  # random crop position, as the paper describes (CenterCropToFixedSize would always take the center)
            iaa.Fliplr(0.1)  # horizontal flip with probability 0.1 (the paper flips with 0.5)
        ])

    # mean subtraction; note this subtracts the scalar mean of the single image,
    # a simplification of the paper's per-pixel mean computed over the training set
    def perpixel_mean_subtracted(self, img):
        return img - np.mean(img)
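A minimal usage sketch, with a random array standing in for a CIFAR image (cast to float32, which imgaug handles reliably):

import numpy as np

aug = TrainImgProcess()
img = np.random.randint(0, 256, (32, 32, 3), dtype=np.uint8)
img = aug.perpixel_mean_subtracted(img / 255.0)               # scale to [0,1], subtract the mean
img = aug.pad_crop_aug.augment_image(img.astype(np.float32))  # pad, random crop, maybe flip
print(img.shape)  # (32, 32, 3)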
Loss function module
The loss module covers the classification loss and the accuracy computation; the classification loss is cross-entropy.
import torch
import torch.nn as nn

class Loss:
    # input: tensor(N,M), M = number of classes; target: tensor(N) of class indices
    def __init__(self, method='CrossEntropyLoss'):
        # nn.CrossEntropyLoss applies log-softmax to the logits internally
        self.module = nn.CrossEntropyLoss()

    # softmax -> negative log of the predicted probability at the true label
    def CrossEntropyLoss(self, input, target):
        v1 = self.module(input, target)
        return v1

    # pred_label(N,M), true_label(N)
    # compute classification accuracy (called "precision" throughout this code)
    def precision(self, pred_label, true_label):
        soft_pred_label = nn.Softmax(dim=1)(pred_label)
        pred_id = soft_pred_label.argmax(dim=1)
        # number of correct predictions
        num = len(torch.where(pred_id == true_label)[0])
        precision = num / len(true_label)
        return precision
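A quick usage sketch with random logits (the labels here are arbitrary):

import torch

loss_module = Loss()
logits = torch.randn(4, 10)          # batch of 4 samples, 10 classes
labels = torch.tensor([3, 1, 0, 9])  # ground-truth class indices
print(loss_module.CrossEntropyLoss(logits, labels))  # scalar loss tensor
print(loss_module.precision(logits, labels))         # accuracy in [0, 1]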
Data loading module
The data-loading module builds on Dataset and DataLoader from torch.utils.data.
The Dataset module:
import os
import cv2
import numpy as np
import torch
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
from img_process import TrainImgProcess  # hypothetical module name; adjust to wherever TrainImgProcess is defined

classnames = ['airplane','automobile','bird','cat','deer','dog','frog','horse','ship','truck']

class MyDataSet(Dataset):
    def __init__(self, dir1, state='train'):  # dir1: dataset root directory
        self.img_paths = []
        self.aug_module = TrainImgProcess()
        self.state = state
        for root1, dirs1, files in os.walk(dir1):
            for file1 in files:
                path1 = os.path.join(root1, file1)
                self.img_paths.append(path1)  # collect every image path under the directory

    # required by the DataLoader's default sampler
    def __len__(self):
        return len(self.img_paths)

    # load the image for a path and parse the class index from the file name
    def __getitem__(self, index):
        img_path = self.img_paths[index]
        img0 = cv2.imread(img_path)
        img0 = img0 / 255.0
        # parse the label: file names are expected to look like 'cat_00001.png'
        filename = Path(img_path).name
        clsname = filename.split('_', 1)[0]
        idx = classnames.index(clsname)
        label = np.array(idx)
        if self.state == 'train':
            # mean subtraction, then pad + random crop + horizontal flip
            img0 = self.aug_module.perpixel_mean_subtracted(img0)
            img0 = self.aug_module.pad_crop_aug.augment_image(img0.astype(np.float32))
        # HWC -> CHW, then to tensor
        img0 = np.ascontiguousarray(img0.transpose(2, 0, 1))
        img0 = torch.from_numpy(img0)
        label = torch.from_numpy(label)
        return img0, label

    # stack individual samples into a batch; train.py passes this to the DataLoader
    @staticmethod
    def collate_fn(batch):
        imgs, labels = zip(*batch)
        return torch.stack(imgs, dim=0), torch.stack(labels, dim=0)
The DataLoader module:
class MyDataLoaders(DataLoader):
    def __init__(self, batch_size, dataset, collate_fn, num_workers=16):
        super(MyDataLoaders, self).__init__(dataset=dataset, collate_fn=collate_fn, batch_size=batch_size, shuffle=True, num_workers=num_workers)
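Wiring the two together (a sketch; the directory layout matches the defaults used in train.py below):

train_dataset = MyDataSet('public_data/cifar10/train', state='train')
train_loader = MyDataLoaders(128, train_dataset, train_dataset.collate_fn, num_workers=4)
imgs, labels = next(iter(train_loader))
print(imgs.shape, labels.shape)  # torch.Size([128, 3, 32, 32]) torch.Size([128])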
Training-curve plotting module
The tensorboardX package is used to plot the training curves.
from tensorboardX import SummaryWriter

writer = SummaryWriter('log')  # same writer as created in train.py
# plot the training loss and accuracy curves (j is the epoch index)
writer.add_scalar('train_loss', train_loss, j)
writer.add_scalar('train_precision', train_precision, j)
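The curves can then be viewed by pointing TensorBoard at the log directory from a shell:

tensorboard --logdir log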
Optimizer and learning-rate schedule
Following the paper, training uses SGD with momentum and a multi-step learning-rate decay.
# optimizer settings: SGD with momentum 0.9, weight decay 1e-4, initial lr 0.1
optimizer = optim.SGD(model.parameters(), weight_decay=0.0001, momentum=0.9, lr=0.1)
# learning-rate schedule: divide the lr by 10 at each milestone
lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[32000, 48000], gamma=0.1)
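One caveat worth noting: the paper's 32k and 48k milestones count training iterations, while in train.py below lr_scheduler.step() is called once per epoch, so the milestones here are effectively epochs. With the full 50,000 CIFAR-10 training images and a batch size of 1920, one epoch is only about 26 iterations, so 32k iterations would correspond to roughly 1,200 epochs rather than 32,000.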
train.py
import argparse
import torch
import torch.nn as nn
import os
import logging
from backbone import ResnetCifar
import torch.optim as optim
from copy import deepcopy
from torch.utils.data import random_split
from dataloader import MyDataLoaders, MyDataSet
from loss import Loss
from val import val
from print_model_params import get_model_params
from tensorboardX import SummaryWriter

writer = SummaryWriter('log')

def train(args):
    bs, gpu_id, img_sz, train_dir, val_dir, test_dir = args.bs, args.gpu_id, args.img_sz, args.train_dir, args.val_dir, args.test_dir
    # epochs, evaluation interval, and early-stop patience (note: stop_epoch is not actually used below)
    epoch, eval_interval, stop_epoch = args.epoch, args.eval_interval, args.stop_epoch
    # directory for saved weights
    weight_dir = 'weights'
    os.makedirs(weight_dir, exist_ok=True)
    best_pt_path = os.path.join(weight_dir, 'best.pt')
    last_pt_path = os.path.join(weight_dir, 'last.pt')
    # load backbone
    model = ResnetCifar()
    # train on gpu?
    gpu_bool = gpu_id != 'cpu'
    device = 'cpu'
    # data-parallel gpu training
    if gpu_bool:
        device = 'cuda:0'
        device_ids = gpu_id.rsplit(',')
        device_ids = [int(f) for f in device_ids]
        # each GPU runs a replica of the model on a slice of the batch; the results
        # are gathered so the output matches single-GPU execution
        model = nn.DataParallel(model, device_ids=device_ids)
        model.to(device_ids[0])
    # optimizer settings
    optimizer = optim.SGD(model.parameters(), weight_decay=0.0001, momentum=0.9, lr=0.1)
    # learning-rate schedule
    lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[32000, 48000], gamma=0.1)
    # loss function
    loss_module = Loss()
    # data loading
    train_dataset = MyDataSet(train_dir, state='train')
    val_dataset = MyDataSet(val_dir, state='val')
    # alternative: split a single directory randomly
    # train_size = int(len(dataset0)*0.9)
    # val_size = len(dataset0)-train_size
    # train_dataset, val_dataset = random_split(dataset0, [train_size, val_size])
    test_dataset = MyDataSet(test_dir, state='test')
    train_dataloader = MyDataLoaders(bs, train_dataset, train_dataset.collate_fn)
    val_dataloader = MyDataLoaders(bs, val_dataset, val_dataset.collate_fn)
    test_dataloader = MyDataLoaders(bs, test_dataset, test_dataset.collate_fn)
    train_sample_iter = len(train_dataloader)  # batches per epoch
    # training loop
    val_loss_std = 1e5  # best validation loss so far
    for j in range(epoch):
        train_loss = 0
        train_precision = 0
        model.train()
        # iterate over batches
        for i, (train_samples, train_labels) in enumerate(train_dataloader):
            train_samples = train_samples.to(device).float()
            train_labels = train_labels.to(device).long()
            outputs = model(train_samples)
            # compute the loss
            loss_v = loss_module.CrossEntropyLoss(outputs, train_labels)
            train_loss += loss_v.detach()  # detach so the per-batch graphs are not retained
            # compute the accuracy
            precision = loss_module.precision(outputs, train_labels)
            train_precision += precision
            # zero gradients
            optimizer.zero_grad()
            # backpropagate
            loss_v.backward()
            # update parameters
            optimizer.step()
        # print model weight statistics
        conv_weight_mean, conv_weight_std, bs_weight_mean, bs_weight_std, linear_weight_mean, linear_weight_std = get_model_params(model)
        # ema = deepcopy(model).eval()
        # report training results
        train_loss /= train_sample_iter
        train_precision /= train_sample_iter
        train_loss = round(train_loss.item(), 3)
        train_precision = round(train_precision, 3)
        print(f'epoch/epochs:{j}/{epoch},train_loss:{train_loss},train_precision:{train_precision}')
        # plot training loss and accuracy curves
        writer.add_scalar('train_loss', train_loss, j)
        writer.add_scalar('train_precision', train_precision, j)
        # evaluate on the last epoch or every eval_interval epochs (check currently disabled)
        # if j==epoch-1 or j%eval_interval==0:
        # validation accuracy
        val_loss, val_precision = val(val_dataloader, model, loss_module)
        val_loss = round(val_loss.item(), 3)
        val_precision = round(val_precision, 3)
        # save the best model
        if val_loss < val_loss_std:
            val_loss_std = val_loss
            dict1 = {'epoch': j, 'val_loss': val_loss, 'val_precision': val_precision, 'params': model.state_dict()}
            torch.save(dict1, best_pt_path)
        # save the latest model
        dict1 = {'epoch': j, 'last_val_loss': val_loss, 'last_val_precision': val_precision, 'params': model.state_dict()}
        torch.save(dict1, last_pt_path)
        # test accuracy
        test_loss, test_precision = val(test_dataloader, model, loss_module)
        test_loss = round(test_loss.item(), 3)
        test_precision = round(test_precision, 3)
        print(f'epoch/epochs:{j}/{epoch},val_loss:{val_loss},val_precision:{val_precision}')
        print(f'epoch/epochs:{j}/{epoch},test_loss:{test_loss},test_precision:{test_precision}')
        # plot validation curves
        writer.add_scalar('val_loss', val_loss, j)
        writer.add_scalar('val_precision', val_precision, j)
        # plot test curves
        writer.add_scalar('test_loss', test_loss, j)
        writer.add_scalar('test_precision', test_precision, j)
        # step the learning-rate schedule (once per epoch)
        lr_scheduler.step()

def parse_params():
    parser = argparse.ArgumentParser(description='resnet reproduction')
    parser.add_argument('--gpu_id', type=str, default='0')
    parser.add_argument('--bs', type=int, default=1920)
    parser.add_argument('--img_sz', type=int, default=32)
    parser.add_argument('--train_dir', type=str, default='public_data/cifar10/train')
    parser.add_argument('--val_dir', type=str, default='public_data/cifar10/val')
    parser.add_argument('--test_dir', type=str, default='public_data/cifar10/test_batch')
    parser.add_argument('--epoch', type=int, default=50000)
    parser.add_argument('--stop_epoch', type=int, default=20)
    parser.add_argument('--eval_interval', type=int, default=10)
    args = parser.parse_args()
    return args

if __name__ == '__main__':
    args = parse_params()
    train(args)
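train.py imports get_model_params from print_model_params, which is not listed in this post. A minimal sketch consistent with how it is called, returning mean/std statistics for the conv, batch-norm, and linear weights (the aggregation details are my assumption):

import torch
import torch.nn as nn

# sketch: aggregate mean/std over all Conv2d, BatchNorm2d, and Linear weights
def get_model_params(model):
    buckets = {nn.Conv2d: [], nn.BatchNorm2d: [], nn.Linear: []}
    for m in model.modules():
        for cls in buckets:
            if isinstance(m, cls):
                buckets[cls].append(m.weight.detach().flatten())
    stats = []
    for cls in (nn.Conv2d, nn.BatchNorm2d, nn.Linear):
        w = torch.cat(buckets[cls]) if buckets[cls] else torch.zeros(1)
        stats += [w.mean().item(), w.std().item()]
    # (conv_mean, conv_std, bn_mean, bn_std, linear_mean, linear_std)
    return tuple(stats)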
val.py
"""
验证评价损失和准确率
"""
import torch
import torch.nn as nn
def val(dataloader,model,loss_module):
device = list(model.parameters())[0].device
loss_v = 0
# 切换到评估模式,以关闭Dropout和BatchNorm层的训练模式
# 训练时用的是一个batchsize的均值和方差,并更新滑动平均法计算全局的running_mean和running_var
# 测试时用的是全局的running_mean和running_var
# model.eval()
pred_label = []
val_label = []
with torch.no_grad(): # 禁止梯度计算,节省内存和计算资源
for i,(val_samples,val_labels) in enumerate(dataloader):
val_samples = val_samples.to(device).float()
val_labels = val_labels.to(device).long()
output = model(val_samples)
temp_loss = loss_module.CrossEntropyLoss(output,val_labels)
loss_v += temp_loss
# 计算准确个数
softmax_output = nn.Softmax(dim=1)(output)
output_id1 = softmax_output.argmax(dim=1)
# val_id1 = val_labels.argmax(dim=1)
pred_label.append(output_id1)
val_label.append(val_labels)
# 拼接
pred_label = torch.cat(pred_label,dim=0)
val_label = torch.cat(val_label,dim=0)
# 相同个数
num = len(torch.where(pred_label==val_label)[0])
precision = num/len(pred_label)
loss_v /= len(dataloader)
# 返回损失值和精确率
return loss_v,precision
if __name__=='__main__':
pass
Discussion
Unfortunately, this reproduction does not yet reach the paper's accuracy with the paper's optimization strategy: training accuracy is high, but validation accuracy is low.
With model.train() during training and model.eval() during validation and testing, training accuracy reaches the paper's level, but validation and test accuracy are very low.
With model.train() during training and model.eval() removed for validation and testing, validation and test accuracy improve, but they converge around 60%, still far below the paper.
I also wonder whether the multi-step learning-rate milestones are set too large: the paper only decays the learning rate at 32k and 48k iterations. Would smaller milestone values work better?
How do you all read this? Comments and discussion are welcome.