复现Resnet论文那些事

文摘   2024-09-28 12:38   广东  

Resnet(残差网络)的论文《Deep Residual Learning for Image Recognition》在深度学习和计算机视觉领域具有重要意义,是何凯明等人在2015提出的模型,获得了CVPR最佳论文奖,在多个比赛中获得成绩,被誉为撑起计算机视觉半边天的文章。

本文介绍如何复现Resnet论文,这是学习复现其他论文的基础。

论文提炼

Resnet网络结构表:

image-20240927160700367

Renset网络两种残差单元结构:

image-20240927162050232

ImageNet数据集准确率:

image-20240927165354088

Cifar10数据集准确率:

image-20240927165444802

代码复现

考虑到自己的电脑配置,代码复现选择的数据集是Cifar10数据集。

论文cifar数据集对应的网络结构表:

image-20240927171224922

代码设置n=5

我们将代码分为以下主要模块:

  • backbone模块
  • 图像增强模块
  • 损失函数模块
  • 数据加载模块
  • 绘制训练曲线模块
  • 优化器策略和学习率策略模块

backbone模块

根据论文给出的表格,构建resnet类。

import torch.nn as nn
import torch

class ResnetCifar(nn.Module):
    def __init__(self,ch1=16,ch2=32,ch3=64,n=5,cls_num=10):  # 论文的cifar网络的单元层数
        super().__init__()
        self.n = n
        self.ch1 = ch1
        self.ch2 = ch2
        self.ch3 = ch3
        self.conv_module1 = nn.Sequential(nn.Conv2d(3,ch1,3,1,1),nn.BatchNorm2d(ch1),nn.ReLU())
        self.conv_module2 = nn.Conv2d(ch1, ch2, 120)
        self.conv_module3 = nn.Conv2d(ch2, ch3, 120)
        self.res_module1 = self._build_res_unit(ch1,ch1,3,1,1)
        self.res_module2 = self._build_res_unit(ch1, ch2, 321)
        self.res_module3 = self._build_res_unit(ch2, ch2, 311)
        self.res_module4 = self._build_res_unit(ch2, ch3, 321)
        self.res_module5 = self._build_res_unit(ch3, ch3, 311)
        self.avgpool_module = nn.AdaptiveAvgPool2d(output_size=(1,1))
        self.linear_module = nn.Linear(ch3,cls_num)

    # 构建resnet单元,第一层需要通道映射
    def _build_res_unit(self,in_ch,out_ch,kernel_size,stride,padding):
        unit1_conv1 = nn.Conv2d(in_channels=in_ch,out_channels=out_ch,kernel_size=kernel_size,stride=stride,padding=padding)
        unit1_batch1 = nn.BatchNorm2d(out_ch)
        unit1_act1  = nn.ReLU()
        unit1_conv2 = nn.Conv2d(in_channels=out_ch, out_channels=out_ch,kernel_size=kernel_size,stride=1,padding=1)
        unit1_batch2 = nn.BatchNorm2d(out_ch)
        unit1_act2 = nn.ReLU()

        unit1 = nn.Sequential(*[unit1_conv1,unit1_batch1,unit1_act1,unit1_conv2,unit1_batch2,unit1_act2])

        return unit1

    # img:tensor(bs,3,32,32)
    def forward(self,img):
        x = self.conv_module1(img)
        # 第一个残差模块
        for i in range(self.n):
            x = x + self.res_module1(x)
        # 第二个残差模块
        for i in range(self.n):
            x = x + self.res_module3(x) if i>0 else self.conv_module2(x) + self.res_module2(x)
        # 第三个残差模块
        for i in range(self.n):
            x = x + self.res_module5(x) if i > 0 else self.conv_module3(x) + self.res_module4(x)
        # avg maxpool,x(bs,ch_out,1,1)
        x = self.avgpool_module(x)
        # x(bs,ch_out)
        x = torch.flatten(x,start_dim=1)
        # linear,x(bs,cls_num)
        x = self.linear_module(x)
        # 归一化
        x = nn.Softmax(dim=1)(x)

        return x

图像增强模块

代码选择imgaug库进行图像增强。

代码关于Cifar数据集的图像增强:

image-20240927172924953

图像增强包括:图像每条边padding4个像素,随机剪切像素,水平翻转。

图像增强模块:

import imgaug.augmenters as iaa
class TrainImgProcess:
    def __init__(self):
        #
        self.pad_crop_aug = iaa.Sequential([
            iaa.Sequential([
            iaa.Pad(px=4,keep_size=False),
            iaa.CenterCropToFixedSize(height=32,width=32)]),
            iaa.Fliplr(0.1)
            ])
    # 去均值
    def perpixel_mean_subtracted(self,img):
        return  img - np.mean(img)

损失函数模块

损失函数模块包括分类损失以及准确率计算,分类损失采用交叉熵损失。

class Loss:
    # input:tensor(N,M),M表示维度;target:tensor(N)
    def __init__(self,method='CrossEntropyLoss'):
        # CrossEntropyLoss将输出通过softmax转概率输出,
        self.module = nn.CrossEntropyLoss()

    # softmax->真实标签对应的预测位置的值
    def CrossEntropyLoss(self,input,target):
        v1 = self.module(input,target)
        return v1
    # pred_label(N,M),true_label(N,M)
    # 计算准确率
    def precision(self,pred_label,true_label):
        soft_pred_label = nn.Softmax(dim=1)(pred_label)
        pred_id = soft_pred_label.argmax(dim=1)
        # true_id = true_label.argmax(dim=1)
        # 相同个数
        num = len(torch.where(pred_id == true_label)[0])
        precision = num/len(true_label)
        return precision

数据加载模块

数据加载模块的基类采用torch.utils.data import Dataset,DataLoader

Dataset模块:

classnames = ['airplane','automobile','bird','cat','deer','dog','frog','horse','ship','truck']
class MyDataSet(Dataset):
    def __init__(self,dir1,state='train'): # dir1:当前目录
        self.img_paths = []
        self.aug_module = TrainImgProcess()
        self.state = state
        for root1,dirs1,files in os.walk(dir1):
            for file1 in files:
                path1 = os.path.join(root1,file1)
                self.img_paths.append(path1)  # 获取当前目录下所有图像路径

    # 获取图像路径对应的图像array,解析文件路径获取one-hot类别
    def __getitem__(self, index):
        img_path = self.img_paths[index]
        img0 = cv2.imread(img_path)
        img0 = img0/255.0
        # 解析label,one-hot
        filename = Path(img_path).name
        clsname = filename.split('_',1)[0]
        idx = classnames.index(clsname)
        label = np.array(idx)
        # l1 = len(classnames)
        # label = np.zeros(l1,dtype=np.int8)
        # label[idx] = 1
        if self.state=='train':
            # 随机增强,均值相减
            img0 = self.aug_module.perpixel_mean_subtracted(img0)
            # pad和随机剪切,并以0.1的概率进行水平翻转
            img0 = self.aug_module.pad_crop_aug.augment_image(img0)
        # 转tensor
        img0 = np.ascontiguousarray(img0.transpose(2,0,1))
        img0 = torch.from_numpy(np.ascontiguousarray(img0))

        label = torch.from_numpy(label)

        return img0,label

DataLoader模块:

class MyDataLoaders(DataLoader):
    def __init__(self,batch_size,dataset,collate_fn,num_workers=16):
        super(MyDataLoaders,self).__init__(dataset=dataset,collate_fn=collate_fn,batch_size=batch_size,shuffle=True,num_workers=num_workers)

绘制训练曲线模块

采用tensorboardX模块绘制训练曲线模型。

from tensorboardX import SummaryWriter

# 绘制训练损失函数和精度曲线图
writer.add_scalar('train_loss',train_loss,j)
writer.add_scalar('train_precision', train_precision, j)

优化器策略和学习率策略

根据论文的描写,采用SGD优化策略和分阶段调整学习率。

# 优化器设置
optimizer = optim.SGD(model.parameters(),weight_decay=0.0001,momentum=0.9,lr=0.1)
# 学习率设置
lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer,milestones=[32000,48000],gamma=0.1)

train.py代码

import argparse
import torch
import torch.nn as nn
import os
import logging
from backbone import ResnetCifar
import torch.optim as optim
from copy import deepcopy
from torch.utils.data import random_split
from dataloader import MyDataLoaders,MyDataSet
from loss import Loss
from val import val
from print_model_params import get_model_params

from tensorboardX import SummaryWriter

writer = SummaryWriter('log')
def train(args):
    bs,gpu_id,img_sz,train_dir,val_dir,test_dir = args.bs,args.gpu_id,args.img_sz,args.train_dir,args.val_dir,args.test_dir
    # 迭代次数,平均评价间隔,若在stop_epoch损失没有减少,则停止训练
    epoch,eval_interval,stop_epoch = args.epoch,args.eval_interval,args.stop_epoch
    # 保存权重目录
    weight_dir = 'weights'
    os.makedirs(weight_dir,exist_ok=True)
    best_pt_path = os.path.join(weight_dir,'best.pt')
    last_pt_path = os.path.join(weight_dir,'last.pt')
    # load backbone
    model = ResnetCifar()
    # 是否gpu训练
    gpu_bool = True if gpu_id!='cpu' else False
    device = 'cpu'
    # gpu是否并行
    if gpu_bool:
        device = 'cuda:0'
        device_ids = gpu_id.rsplit(',')
        device_ids = [ int(f) for f in device_ids]
        # 每个GPU都运行一个模型的副本,并处理一部分输入数据,最后所有GPU上的结果被收集并合并,以产生与单个GPU上运行模型相同的输出
        model = nn.DataParallel(model,device_ids=device_ids)
        model.to(device_ids[0])
    # 优化器设置
    optimizer = optim.SGD(model.parameters(),weight_decay=0.0001,momentum=0.9,lr=0.1)
    # 学习率设置
    lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer,milestones=[32000,48000],gamma=0.1)
    # 损失函数
    loss_module = Loss()
    # 数据加载
    train_dataset = MyDataSet(train_dir,state='train')
    val_dataset = MyDataSet(val_dir, state='val')
    # train_size = int(len(dataset0)*0.9)
    # val_size = len(dataset0)-train_size
    # train_dataset,val_dataset = random_split(dataset0,[train_size,val_size])
    test_dataset = MyDataSet(test_dir,state='test')
    # train_dataloader = MyDataLoaders(bs,train_dataset,train_dataset.collate_fn)
    train_dataloader = MyDataLoaders(bs, train_dataset, train_dataset.collate_fn)
    val_dataloader = MyDataLoaders(bs,val_dataset,val_dataset.collate_fn)
    test_dataloader = MyDataLoaders(bs,test_dataset,test_dataset.collate_fn)
    train_sample_iter = len(train_dataloader)  # 训练数据迭代次数
    # 迭代训练
    val_loss_std = 1e5
    for j in range(epoch):
        train_loss = 0
        train_precision = 0
        model.train()
        # 迭代数据
        for i,(train_samples,train_labels) in enumerate(train_dataloader):
            train_samples = train_samples.to(device).float()
            train_labels = train_labels.to(device).long()
            outputs = model(train_samples)
            # 计算损失
            loss_v = loss_module.CrossEntropyLoss(outputs,train_labels)
            train_loss += loss_v
            # 计算准确率
            precision = loss_module.precision(outputs,train_labels)
            train_precision += precision
            # 梯度清零
            optimizer.zero_grad()
            # 反向传播
            loss_v.backward()
            # 更新参数
            optimizer.step()
        # 打印模型权重
        conv_weight_mean, conv_weight_std, bs_weight_mean, bs_weight_std, linear_weight_mean, linear_weight_std = get_model_params(model)
        # ema = deepcopy(model).eval()
        # 打印训练结果
        train_loss /= train_sample_iter
        train_precision /= train_sample_iter
        # print('train_loss=',train_loss)
        # print('train_precision=', train_precision)
        train_loss = round(train_loss.item(),3)
        train_precision = round(train_precision, 3)
        print(f'epoch/epochs:{j}/{epoch},train_loss:{train_loss},train_precision:{train_precision}')
        # 绘制训练损失函数和精度曲线图
        writer.add_scalar('train_loss',train_loss,j)
        writer.add_scalar('train_precision', train_precision, j)
        # 最后一个epoch或者每隔interval评价
        # if j==epoch-1 or j%eval_interval==0:
        # 验证集准确率
        val_loss, val_precision = val(val_dataloader, model, loss_module)
        val_loss = round(val_loss.item(), 3)
        val_precision = round(val_precision, 3)
        # 保存最优模型
        if val_loss < val_loss_std: # 更新最优模型
            val_loss_std = val_loss
            dict1 = {'epoch':epoch,'val_loss':val_loss,'val_precision':val_precision,'params':model.state_dict()}
            torch.save(dict1,best_pt_path)
        # 保存最新模型
        dict1 = {'epoch': epoch, 'last_val_loss': val_loss, 'last_val_precision': val_precision, 'params': model.state_dict()}
        torch.save(dict1, last_pt_path)
        # 测试集准确率
        test_loss,test_precision = val(test_dataloader,model,loss_module)
        test_loss = round(test_loss.item(), 3)
        test_precision = round(test_precision, 3)
        #
        print(f'epoch/epochs:{j}/{epoch},val_loss:{val_loss},val_precision:{val_precision}')
        print(f'epoch/epochs:{j}/{epoch},test_loss:{test_loss},test_precision:{test_precision}')
        # 绘制验证集准确率
        writer.add_scalar('val_loss', val_loss, j)
        writer.add_scalar('val_precision', val_precision, j)
        # 绘制测试集准确率
        writer.add_scalar('test_loss', test_loss, j)
        writer.add_scalar('test_precision', test_precision, j)
        # 学习率策略更新
        lr_scheduler.step()

def parse_params():
    parser = argparse.ArgumentParser(description='renet reproduction')
    parser.add_argument('--gpu_id',type=str,default='0')
    parser.add_argument('--bs', type=int, default=1920)  # 1920
    parser.add_argument('--img_sz',type=int,default=32)
    parser.add_argument('--train_dir',type=str,default='public_data/cifar10/train')
    parser.add_argument('--val_dir', type=str, default='public_data/cifar10/val')
    parser.add_argument('--test_dir', type=str, default='public_data/cifar10/test_batch')
    parser.add_argument('--epoch',type=int,default=50000)
    parser.add_argument('--stop_epoch',type=int,default=20)
    parser.add_argument('--eval_interval',type=int,default=10)

    args = parser.parse_args()

    return args

if __name__=='__main__':
    args = parse_params()
    train(args)

val.py

"""
验证评价损失和准确率
"""

import torch
import torch.nn as nn
def val(dataloader,model,loss_module):
    device = list(model.parameters())[0].device
    loss_v = 0
    # 切换到评估模式,以关闭Dropout和BatchNorm层的训练模式
    # 训练时用的是一个batchsize的均值和方差,并更新滑动平均法计算全局的running_mean和running_var
    # 测试时用的是全局的running_mean和running_var
    # model.eval()
    pred_label = []
    val_label = []
    with torch.no_grad(): # 禁止梯度计算,节省内存和计算资源
        for i,(val_samples,val_labels) in enumerate(dataloader):
            val_samples = val_samples.to(device).float()
            val_labels = val_labels.to(device).long()
            output = model(val_samples)
            temp_loss = loss_module.CrossEntropyLoss(output,val_labels)
            loss_v += temp_loss
            # 计算准确个数
            softmax_output = nn.Softmax(dim=1)(output)
            output_id1 = softmax_output.argmax(dim=1)
            # val_id1 = val_labels.argmax(dim=1)
            pred_label.append(output_id1)
            val_label.append(val_labels)
        # 拼接
        pred_label = torch.cat(pred_label,dim=0)
        val_label = torch.cat(val_label,dim=0)
        # 相同个数
        num = len(torch.where(pred_label==val_label)[0])
        precision = num/len(pred_label)
        loss_v /= len(dataloader)
    # 返回损失值和精确率
    return loss_v,precision

if __name__=='__main__':
    pass

问题讨论

很遗憾目前按照论文的优化策略并没有复现论文的准确率,训练准确率较高,但是验证率很低。

  • 模型训练阶段设置model.train(),验证和测试阶段设置了model.val(),训练准确率符合论文的目标,但是验证准确率和测试准确率非常低
  • 模型训练阶段设置model.train(),验证和测试阶段取消了model.val(),训练准确率符合论文的目标,但是验证准确率和测试准确率虽有提高,但准确率收敛在60%附近,和论文相差甚远。

我在想是不是多阶段的学习率策略设置的过大,论文在迭代次数达到32k和48k才进行衰减,是不是把迭代次数改小点会好些?

不知道大家是怎么理解的了?欢迎评论交流

机器学习实战
多名大厂算法工程师共同运营,主要专注机器学习算法、深度学习算法、计算机视觉等领域技术干货分享,一天进步一点点
 最新文章