MindSpore 25-Day Study Camp, Day 15 | ResNet50 Transfer Learning

Author: 筋斗云

Transfer Learning

A common practice is to pretrain a convolutional neural network on a very large dataset (such as ImageNet, which contains 1.2 million images in 1000 classes), and then either use that network directly on the actual task or fine-tune it further.

There are two main transfer learning schemes:
1. Pretrained model as a fixed feature extractor: take a network you trained yourself or someone else's pretrained model, remove the final fully connected layer (the layer that classifies the images), and treat the rest of the network as a fixed feature extractor. Then replace the classifier with a new one and retrain it on the new dataset.
2. Fine-tuning the ConvNet: on top of scheme 1, continue training the network weights by backpropagation. Some of the lower layers can be kept frozen while only the higher-level parts are fine-tuned.

Fine-tuning breaks down into several scenarios (a minimal sketch of the two regimes follows this list):
1. The new dataset is small and similar to the original: with so little data, fine-tuning risks overfitting, so it is best to train just a linear classifier on top of the fixed features.
2. The new dataset is large and similar to the original: with enough data there is little risk of overfitting, so the whole network can be fine-tuned.
3. The new dataset is small and different from the original: a small dataset again calls for training only a linear classifier, but because the data differ greatly, the classifier should not sit at the very top of the network, whose later layers encode features specific to the original dataset. Instead, train an SVM classifier on activations from somewhere earlier in the network.
4. The new dataset is large and different from the original: the whole network can be fine-tuned.
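
As a minimal sketch of the two regimes in MindSpore (here `net` stands for any pretrained nn.Cell whose classification head is named `fc`; both names are illustrative assumptions, not code from this article yet):

# Scheme 1: fixed feature extractor -- freeze everything except the new head
for param in net.get_parameters():
    if param.name not in ["fc.weight", "fc.bias"]:
        param.requires_grad = False

# Scheme 2: full fine-tuning -- leave every parameter trainable
for param in net.get_parameters():
    param.requires_grad = True

This article applies scheme 1: only the replaced classification head is trained.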

Data Loading

We use the Canidae dataset of wolves and dogs. Its images come from ImageNet, and each class provides 120 training images and 30 validation images.
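
The dataset has to be downloaded and unpacked first. A minimal sketch using the download package is shown below; the URL is the dataset archive published with the official MindSpore tutorial, so treat it as an assumption if you are following along from a different source:

from download import download

# Download and unzip the Canidae wolf/dog dataset (URL assumed from the
# official MindSpore transfer-learning tutorial; adjust if you host your own copy)
dataset_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/intermediate/Canidae_data.zip"
download(dataset_url, "./datasets-Canidae", kind="zip", replace=True)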

import mindspore as ms
import mindspore.dataset as ds
import mindspore.dataset.vision as vision
from mindspore import nn, train

batch_size = 18                             # batch size
image_size = 224                            # spatial size of the training images
num_epochs = 5                              # number of training epochs
lr = 0.001                                  # learning rate
momentum = 0.9                              # momentum
workers = 4                                 # number of parallel workers

# Paths to the unpacked dataset (adjust to match your local layout)
data_path_train = "./datasets-Canidae/data/Canidae/train/"
data_path_val = "./datasets-Canidae/data/Canidae/val/"

Create the training and validation datasets

def create_dataset_canidae(dataset_path, usage):
    data_set = ds.ImageFolderDataset(dataset_path,
                                     num_parallel_workers=workers,
                                     shuffle=True)

    # Normalization constants and the extra margin used before center cropping
    mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]
    std = [0.229 * 255, 0.224 * 255, 0.225 * 255]
    scale = 32

    if usage == "train":
        # Training set: random crop with decode and resize, random horizontal
        # flip, normalization, and HWC -> CHW channel reordering
        trans = [
            vision.RandomCropDecodeResize(size=image_size, scale=(0.08, 1.0), ratio=(0.75, 1.333)),
            vision.RandomHorizontalFlip(prob=0.5),
            vision.Normalize(mean=mean, std=std),
            vision.HWC2CHW()
        ]
    else:
        # Validation set: decode, resize, center crop, normalization,
        # and HWC -> CHW channel reordering
        trans = [
            vision.Decode(),
            vision.Resize(image_size + scale),
            vision.CenterCrop(image_size),
            vision.Normalize(mean=mean, std=std),
            vision.HWC2CHW()
        ]

    # Map the transforms onto the "image" column
    data_set = data_set.map(
        operations=trans,
        input_columns='image',
        num_parallel_workers=workers)

    # Batch the samples
    data_set = data_set.batch(batch_size)

    return data_set


dataset_train = create_dataset_canidae(data_path_train, "train")
step_size_train = dataset_train.get_dataset_size()

dataset_val = create_dataset_canidae(data_path_val, "val")
step_size_val = dataset_val.get_dataset_size()
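
As a quick sanity check (not part of the original tutorial), you can pull one batch and confirm the tensor shapes match the settings above:

# Fetch one batch: expect images of shape (18, 3, 224, 224) and labels of shape (18,)
data = next(dataset_train.create_dict_iterator())
print(data["image"].shape, data["label"].shape)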

Building the Model

First, build the basic residual block.

from typing import Type, Union, List, Optional
from mindspore.common.initializer import Normal

# Initializers for the convolution weights and the BatchNorm gamma parameter
weight_init = Normal(mean=0, sigma=0.02)
gamma_init = Normal(mean=1, sigma=0.02)


class ResidualBlockBase(nn.Cell):
    expansion: int = 1  # the last convolution has as many kernels as the first

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, norm: Optional[nn.Cell] = None,
                 down_sample: Optional[nn.Cell] = None) -> None:
        super(ResidualBlockBase, self).__init__()
        if not norm:
            self.norm = nn.BatchNorm2d(out_channel)
        else:
            self.norm = norm

        # Two 3x3 convolution layers
        self.conv1 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=3, stride=stride,
                               weight_init=weight_init)
        self.conv2 = nn.Conv2d(out_channel, out_channel,
                               kernel_size=3, weight_init=weight_init)
        self.relu = nn.ReLU()
        self.down_sample = down_sample

    def construct(self, x):
        identity = x  # shortcut branch (skip connection)

        out = self.conv1(x)  # main branch, first layer: 3x3 convolution
        out = self.norm(out)
        out = self.relu(out)
        out = self.conv2(out)  # main branch, second layer: 3x3 convolution
        out = self.norm(out)

        if self.down_sample is not None:
            identity = self.down_sample(x)
        out += identity  # the output is the sum of the main branch and the shortcut
        out = self.relu(out)  # apply ReLU after the addition

        return out
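
A quick illustrative check (the variable names here are hypothetical): with stride 1 and matching channel counts, the block preserves the input shape, so the identity shortcut can be added directly:

import numpy as np

block = ResidualBlockBase(64, 64)
x = ms.Tensor(np.random.randn(2, 64, 56, 56), ms.float32)
print(block(x).shape)  # (2, 64, 56, 56)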

Next, build the bottleneck residual block, the variant used by ResNet50.

class ResidualBlock(nn.Cell):
    expansion = 4  # the last convolution has 4x as many kernels as the first

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
        super(ResidualBlock, self).__init__()

        self.conv1 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=1, weight_init=weight_init)
        self.norm1 = nn.BatchNorm2d(out_channel)
        self.conv2 = nn.Conv2d(out_channel, out_channel,
                               kernel_size=3, stride=stride,
                               weight_init=weight_init)
        self.norm2 = nn.BatchNorm2d(out_channel)
        self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion,
                               kernel_size=1, weight_init=weight_init)
        self.norm3 = nn.BatchNorm2d(out_channel * self.expansion)

        self.relu = nn.ReLU()
        self.down_sample = down_sample

    def construct(self, x):
        identity = x  # shortcut branch

        out = self.conv1(x)  # main branch, first layer: 1x1 convolution
        out = self.norm1(out)
        out = self.relu(out)
        out = self.conv2(out)  # main branch, second layer: 3x3 convolution
        out = self.norm2(out)
        out = self.relu(out)
        out = self.conv3(out)  # main branch, third layer: 1x1 convolution
        out = self.norm3(out)

        if self.down_sample is not None:
            identity = self.down_sample(x)

        out += identity  # the output is the sum of the main branch and the shortcut
        out = self.relu(out)

        return out
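
An illustrative check of the expansion factor: a bottleneck block turns 64 input channels into 64 * 4 = 256 output channels, so the shortcut needs a matching 1x1 projection for the addition to work (the projection below is built by make_layer automatically in the next step):

block = ResidualBlock(64, 64, down_sample=nn.SequentialCell([
    nn.Conv2d(64, 256, kernel_size=1, weight_init=weight_init),
    nn.BatchNorm2d(256)
]))
x = ms.Tensor(np.random.randn(2, 64, 56, 56), ms.float32)
print(block(x).shape)  # (2, 256, 56, 56)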

Stack residual blocks into a stage.

def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]],
               channel: int, block_nums: int, stride: int = 1):
    down_sample = None  # shortcut branch

    if stride != 1 or last_out_channel != channel * block.expansion:
        # Build the downsampling module: a 1x1 convolution plus batch normalization.
        # Downsampling reduces the spatial resolution of the feature map while
        # increasing the abstraction level of the features.
        down_sample = nn.SequentialCell([
            nn.Conv2d(last_out_channel, channel * block.expansion,
                      kernel_size=1, stride=stride, weight_init=weight_init),
            nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init)
        ])

    layers = []
    layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample))

    in_channel = channel * block.expansion

    # Stack the remaining residual blocks
    for _ in range(1, block_nums):
        layers.append(block(in_channel, channel))

    return nn.SequentialCell(layers)
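
For example (an illustrative call that mirrors how ResNet50's first stage is built below), stacking three bottleneck blocks on a 64-channel input yields a stage whose output has 256 channels:

layer1 = make_layer(64, ResidualBlock, 64, block_nums=3)
x = ms.Tensor(np.random.randn(2, 64, 56, 56), ms.float32)
print(layer1(x).shape)  # (2, 256, 56, 56)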

Build the ResNet network.

class ResNet(nn.Cell):
    def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]],
                 layer_nums: List[int], num_classes: int, input_channel: int) -> None:
        super(ResNet, self).__init__()

        self.relu = nn.ReLU()
        # First convolution layer: 3 input channels (RGB images), 64 output channels
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init)
        # Batch normalization
        self.norm = nn.BatchNorm2d(64)
        # Max pooling layer to shrink the feature map
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
        # The four residual stages
        self.layer1 = make_layer(64, block, 64, layer_nums[0])
        self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2)
        self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2)
        self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2)
        # Average pooling layer
        self.avg_pool = nn.AvgPool2d()
        # Flatten layer
        self.flatten = nn.Flatten()
        # Fully connected layer
        self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes)

    def construct(self, x):
        x = self.conv1(x)
        x = self.norm(x)
        x = self.relu(x)
        x = self.max_pool(x)
        # The four residual stages
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        # Head: average pooling, flatten, fully connected layer
        x = self.avg_pool(x)
        x = self.flatten(x)
        x = self.fc(x)

        return x

Now we actually build the network, optionally loading a pretrained model.

from download import download
from mindspore import load_checkpoint, load_param_into_net


def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]],
            layers: List[int], num_classes: int, pretrained: bool, pretrained_ckpt: str,
            input_channel: int):
    model = ResNet(block, layers, num_classes, input_channel)

    if pretrained:
        # Download and load the pretrained model
        download(url=model_url, path=pretrained_ckpt, replace=True)
        param_dict = load_checkpoint(pretrained_ckpt)
        load_param_into_net(model, param_dict)

    return model


def resnet50(num_classes: int = 1000, pretrained: bool = False):
    resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt"
    resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt"
    return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes,
                   pretrained, resnet50_ckpt, 2048)

Training with a Fixed Feature Extractor

Here we freeze all layers except the last one by setting requires_grad = False on their parameters.

import os
import time
import matplotlib.pyplot as plt
import mindspore as ms

# Instantiate the network with pretrained weights
net_work = resnet50(pretrained=True)

# Input size of the fully connected layer
in_channels = net_work.fc.in_channels
# The output size is 2, the number of wolf/dog classes
head = nn.Dense(in_channels, 2)
# Replace the fully connected layer
net_work.fc = head

# Average pooling with kernel size 7 (a 224x224 input reaches this layer as a 7x7 feature map)
avg_pool = nn.AvgPool2d(kernel_size=7)
# Replace the average pooling layer
net_work.avg_pool = avg_pool

# Freeze all parameters except those of the last layer
for param in net_work.get_parameters():
    if param.name not in ["fc.weight", "fc.bias"]:
        param.requires_grad = False

# Define the optimizer and loss function
opt = nn.Momentum(params=net_work.trainable_params(), learning_rate=lr, momentum=0.5)
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')


def forward_fn(inputs, targets):
    logits = net_work(inputs)
    loss = loss_fn(logits, targets)
    return loss


grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters)


def train_step(inputs, targets):
    loss, grads = grad_fn(inputs, targets)
    opt(grads)
    return loss


# Wrap the network in a Model for evaluation
model1 = train.Model(net_work, loss_fn, opt, metrics={"Accuracy": train.Accuracy()})
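
To confirm the freeze took effect (a quick check, not part of the original walkthrough), list the remaining trainable parameters; only the new head should appear:

# Only the replaced classification head should remain trainable
print([param.name for param in net_work.trainable_params()])
# Expected: ['fc.weight', 'fc.bias']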

Training and Evaluation

# Create an iterator over the training set
data_loader_train = dataset_train.create_tuple_iterator(num_epochs=num_epochs)

# Where the best checkpoint will be stored (choose any location you like)
best_ckpt_dir = "./BestCheckpoint"
best_ckpt_path = "./BestCheckpoint/resnet50-best-freezing-param.ckpt"

# Start the training loop
print("Start Training Loop ...")

best_acc = 0

# Iterate for the configured number of epochs
for epoch in range(num_epochs):
    losses = []
    net_work.set_train()

    # Feed the training data batch by batch
    for i, (images, labels) in enumerate(data_loader_train):
        labels = labels.astype(ms.int32)
        loss = train_step(images, labels)
        losses.append(loss)

    # Validate the accuracy after each epoch
    acc = model1.eval(dataset_val)['Accuracy']

    print("-" * 20)
    print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % (
        epoch+1, num_epochs, sum(losses)/len(losses), acc
    ))

    # Keep the checkpoint with the best accuracy so far
    if acc > best_acc:
        best_acc = acc
        if not os.path.exists(best_ckpt_dir):
            os.mkdir(best_ckpt_dir)
        ms.save_checkpoint(net_work, best_ckpt_path)

print("=" * 80)
print(f"End of validation: the best Accuracy is {best_acc:5.3f}, "
      f"the best ckpt file is saved in {best_ckpt_path}", flush=True)

Prediction

import numpy as np

# Visualize the prediction results
def visualize_model(best_ckpt_path, val_ds):
    net = resnet50()
    # Input size of the fully connected layer
    in_channels = net.fc.in_channels
    # The output size is 2, the number of wolf/dog classes
    head = nn.Dense(in_channels, 2)
    # Replace the fully connected layer
    net.fc = head
    # Average pooling with kernel size 7
    avg_pool = nn.AvgPool2d(kernel_size=7)
    # Replace the average pooling layer
    net.avg_pool = avg_pool
    # Load the trained parameters
    param_dict = ms.load_checkpoint(best_ckpt_path)
    ms.load_param_into_net(net, param_dict)
    model = train.Model(net)
    # Take one batch from the validation set
    data = next(val_ds.create_dict_iterator())
    images = data["image"].asnumpy()
    labels = data["label"].asnumpy()
    class_name = {0: "dogs", 1: "wolves"}
    # Predict the image classes
    output = model.predict(ms.Tensor(data['image']))
    pred = np.argmax(output.asnumpy(), axis=1)

    # Show the images together with their predicted labels
    plt.figure(figsize=(5, 5))
    for i in range(4):
        plt.subplot(2, 2, i + 1)
        # Correct predictions are shown in blue, wrong ones in red
        color = 'blue' if pred[i] == labels[i] else 'red'
        plt.title('predict:{}'.format(class_name[pred[i]]), color=color)
        picture_show = np.transpose(images[i], (1, 2, 0))
        mean = np.array([0.485, 0.456, 0.406])
        std = np.array([0.229, 0.224, 0.225])
        picture_show = std * picture_show + mean
        picture_show = np.clip(picture_show, 0, 1)
        plt.imshow(picture_show)
        plt.axis('off')

    plt.show()
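
With training finished, call the function on the best checkpoint saved above to render one batch of validation images:

# Render predictions for one validation batch using the best checkpoint
visualize_model(best_ckpt_path, dataset_val)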

This produces the model's prediction results: four validation images are displayed, with correctly classified images titled in blue and misclassified ones in red.

Summary

This chapter again used the ResNet50 network, this time applying transfer learning to complete a wolf-vs-dog image classification task.

