阅读量:0
迁移学习
通常的做法是在非常大的数据集(如ImageNet,其中包含120万张图片和1000个类别)上对卷积神经网络预训练,再在实际任务中使用这个网络或再进行微调。
有两种主要的迁移学习方案:
1. 预训练模型作为固定特征提取器:自己训练或者拿别人的预训练模型,移除最后的全连接层(这层是用来对图片分类的),把这个网络作为固定特征提取器。再为新数据集替换和重新训练分类器。
2. 微调ConvNet:在1的基础上,还继续通过反向传播训练网络权重。这里可以保持某些底层的参数固定,只微调一些更高级别的部分。
微调分成多个场景:
1. 新数据集量级小,和原数据集相似:由于数据集较小,因此再去微调就会有过拟合的风险。这里最好是直接训练一个线性分类器
2. 新数据集量级大,和原数据集相似:数据集较大时没有过拟合风险,因此可以对整个网络微调训练。
3. 新数据量级小,不同于原数据集:小数据集最好只训练线性分类器,而数据集差异很大,最好就从网络顶部训练分类器,因为网络中包含了很多训练数据集的特征。因此这里选择从网络的早期某处激活中训练SVM分类器。
4. 新数据量级大,不同于原数据集:可以对整个网络微调
数据加载
使用分类狼与狗的Canidae数据集,该数据集图像来自ImageNet,每个分类包含120张训练图和30张验证图。
batch_size = 18 # 批量大小 image_size = 224 # 训练图像空间大小 num_epochs = 5 # 训练周期数 lr = 0.001 # 学习率 momentum = 0.9 # 动量 workers = 4 # 并行线程个数
创建训练数据集
def create_dataset_canidae(dataset_path, usage): data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=workers, shuffle=True,) # 数据增强操作 mean = [0.485 * 255, 0.456 * 255, 0.406 * 255] std = [0.229 * 255, 0.224 * 255, 0.225 * 255] scale = 32 if usage == "train": #训练集则随机剪裁、解码、缩放、水平翻转、正则化、改变通道顺序。 trans = [ vision.RandomCropDecodeResize(size=image_size, scale=(0.08, 1.0), ratio=(0.75, 1.333)), vision.RandomHorizontalFlip(prob=0.5), vision.Normalize(mean=mean, std=std), vision.HWC2CHW() ] else: # 解码、重定义大小、中心裁剪、正则化、改变通道顺序 trans = [ vision.Decode(), vision.Resize(image_size + scale), vision.CenterCrop(image_size), vision.Normalize(mean=mean, std=std), vision.HWC2CHW() ] # 数据映射操作 data_set = data_set.map( operations=trans, input_columns='image', num_parallel_workers=workers) # 批量操作 data_set = data_set.batch(batch_size) return data_set dataset_train = create_dataset_canidae(data_path_train, "train") step_size_train = dataset_train.get_dataset_size() dataset_val = create_dataset_canidae(data_path_val, "val") step_size_val = dataset_val.get_dataset_size()
模型搭建
构建一个基础的残差块
class ResidualBlockBase(nn.Cell): expansion: int = 1 # 最后一个卷积核数量与第一个卷积核数量相等 def __init__(self, in_channel: int, out_channel: int, stride: int = 1, norm: Optional[nn.Cell] = None, down_sample: Optional[nn.Cell] = None) -> None: super(ResidualBlockBase, self).__init__() if not norm: self.norm = nn.BatchNorm2d(out_channel) else: self.norm = norm # 创建两个3*3的卷积层 self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=stride, weight_init=weight_init) self.conv2 = nn.Conv2d(in_channel, out_channel, kernel_size=3, weight_init=weight_init) self.relu = nn.ReLU() self.down_sample = down_sample def construct(self, x): identity = x # shortcuts分支(跳跃连接) out = self.conv1(x) # 主分支第一层:3*3卷积层 out = self.norm(out) out = self.relu(out) out = self.conv2(out) # 主分支第二层:3*3卷积层 out = self.norm(out) if self.down_sample is not None: identity = self.down_sample(x) out += identity # 输出为主分支与shortcuts之和 # 加在一起后再经过ReLU激活函数 out = self.relu(out) return out
构建残差层
class ResidualBlock(nn.Cell): expansion = 4 # 最后一个卷积核的数量是第一个卷积核数量的4倍 def __init__(self, in_channel: int, out_channel: int, stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None: super(ResidualBlock, self).__init__() self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=1, weight_init=weight_init) self.norm1 = nn.BatchNorm2d(out_channel) self.conv2 = nn.Conv2d(out_channel, out_channel, kernel_size=3, stride=stride, weight_init=weight_init) self.norm2 = nn.BatchNorm2d(out_channel) self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion, kernel_size=1, weight_init=weight_init) self.norm3 = nn.BatchNorm2d(out_channel * self.expansion) self.relu = nn.ReLU() self.down_sample = down_sample def construct(self, x): identity = x # shortscuts分支 out = self.conv1(x) # 主分支第一层:1*1卷积层 out = self.norm1(out) out = self.relu(out) out = self.conv2(out) # 主分支第二层:3*3卷积层 out = self.norm2(out) out = self.relu(out) out = self.conv3(out) # 主分支第三层:1*1卷积层 out = self.norm3(out) if self.down_sample is not None: identity = self.down_sample(x) out += identity # 输出为主分支与shortcuts之和 out = self.relu(out) return out
构建堆叠的残差块。
def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]], channel: int, block_nums: int, stride: int = 1): down_sample = None # shortcuts分支 if stride != 1 or last_out_channel != channel * block.expansion: # 构建 下采样模块:1个卷积层+批正则化层 # 下采样用于减少特征图的空间分辨率,同时增加特征的抽象程度 down_sample = nn.SequentialCell([ nn.Conv2d(last_out_channel, channel * block.expansion, kernel_size=1, stride=stride, weight_init=weight_init), nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init) ]) layers = [] layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample)) in_channel = channel * block.expansion # 堆叠残差网络 for _ in range(1, block_nums): layers.append(block(in_channel, channel)) return nn.SequentialCell(layers)
构建ResNet网络
class ResNet(nn.Cell): def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]], layer_nums: List[int], num_classes: int, input_channel: int) -> None: super(ResNet, self).__init__() self.relu = nn.ReLU() # 第一个卷积层,输入channel为3(彩色图像),输出channel为64 self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init) # 使用批正则化 self.norm = nn.BatchNorm2d(64) # 最大池化层,缩小图片的尺寸 self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same') # 各个残差网络结构块定义, self.layer1 = make_layer(64, block, 64, layer_nums[0]) self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2) self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2) self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2) # 平均池化层 self.avg_pool = nn.AvgPool2d() # flattern层 self.flatten = nn.Flatten() # 全连接层 self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes) def construct(self, x): x = self.conv1(x) x = self.norm(x) x = self.relu(x) x = self.max_pool(x) # 四个残差块 x = self.layer1(x) x = self.layer2(x) x = self.layer3(x) x = self.layer4(x) # 收尾:平均池化层、展平、全连接层 x = self.avg_pool(x) x = self.flatten(x) x = self.fc(x) return x
真正开始构建,可以选择使用预训练模型。
def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]], layers: List[int], num_classes: int, pretrained: bool, pretrianed_ckpt: str, input_channel: int): model = ResNet(block, layers, num_classes, input_channel) if pretrained: # 加载预训练模型 download(url=model_url, path=pretrianed_ckpt, replace=True) param_dict = load_checkpoint(pretrianed_ckpt) load_param_into_net(model, param_dict) return model def resnet50(num_classes: int = 1000, pretrained: bool = False): resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt" resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt" return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes, pretrained, resnet50_ckpt, 2048)
固定特征训练
这里冻结最后一层意外的所有网络层,使用requires_grad == False
冻结参数
import mindspore as ms import matplotlib.pyplot as plt import os import time # 实例化网络 net_work = resnet50(pretrained=True) # 全连接层输入层的大小 in_channels = net_work.fc.in_channels # 输出通道数大小为狼狗分类数2 head = nn.Dense(in_channels, 2) # 重置全连接层 net_work.fc = head # 平均池化层kernel size为7 avg_pool = nn.AvgPool2d(kernel_size=7) # 重置平均池化层 net_work.avg_pool = avg_pool # 冻结除最后一层外的所有参数 for param in net_work.get_parameters(): if param.name not in ["fc.weight", "fc.bias"]: param.requires_grad = False # 定义优化器和损失函数 opt = nn.Momentum(params=net_work.trainable_params(), learning_rate=lr, momentum=0.5) loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean') def forward_fn(inputs, targets): logits = net_work(inputs) loss = loss_fn(logits, targets) return loss grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters) def train_step(inputs, targets): loss, grads = grad_fn(inputs, targets) opt(grads) return loss # 实例化模型 model1 = train.Model(net_work, loss_fn, opt, metrics={"Accuracy": train.Accuracy()})
训练和评估
import mindspore as ms import matplotlib.pyplot as plt import os import time # 开始循环训练 print("Start Training Loop ...") best_acc = 0 # 按照定好的epoch数进行迭代 for epoch in range(num_epochs): losses = [] net_work.set_train() # 为每轮训练读入数据 for i, (images, labels) in enumerate(data_loader_train): labels = labels.astype(ms.int32) loss = train_step(images, labels) losses.append(loss) # 每个epoch结束后,验证准确率 acc = model1.eval(dataset_val)['Accuracy'] print("-" * 20) print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % ( epoch+1, num_epochs, sum(losses)/len(losses), acc )) # 保存下来准确度最佳的模型 if acc > best_acc: best_acc = acc if not os.path.exists(best_ckpt_dir): os.mkdir(best_ckpt_dir) ms.save_checkpoint(net_work, best_ckpt_path) print("=" * 80) print(f"End of validation the best Accuracy is: {best_acc: 5.3f}, " f"save the best ckpt file in {best_ckpt_path}", flush=True)
预测
import matplotlib.pyplot as plt import mindspore as ms # 可视化预测结果 def visualize_model(best_ckpt_path, val_ds): net = resnet50() # 全连接层输入层的大小 in_channels = net.fc.in_channels # 输出通道数大小为狼狗分类数2 head = nn.Dense(in_channels, 2) # 重置全连接层 net.fc = head # 平均池化层kernel size为7 avg_pool = nn.AvgPool2d(kernel_size=7) # 重置平均池化层 net.avg_pool = avg_pool # 加载模型参数 param_dict = ms.load_checkpoint(best_ckpt_path) ms.load_param_into_net(net, param_dict) model = train.Model(net) # 加载验证集的数据进行验证 data = next(val_ds.create_dict_iterator()) images = data["image"].asnumpy() labels = data["label"].asnumpy() class_name = {0: "dogs", 1: "wolves"} # 预测图像类别 output = model.predict(ms.Tensor(data['image'])) pred = np.argmax(output.asnumpy(), axis=1) # 显示图像及图像的预测值 plt.figure(figsize=(5, 5)) for i in range(4): plt.subplot(2, 2, i + 1) # 若预测正确,显示为蓝色;若预测错误,显示为红色 color = 'blue' if pred[i] == labels[i] else 'red' plt.title('predict:{}'.format(class_name[pred[i]]), color=color) picture_show = np.transpose(images[i], (1, 2, 0)) mean = np.array([0.485, 0.456, 0.406]) std = np.array([0.229, 0.224, 0.225]) picture_show = std * picture_show + mean picture_show = np.clip(picture_show, 0, 1) plt.imshow(picture_show) plt.axis('off') plt.show()
最后得到模型的预测结果。
总结
本章依然使用了ResNet50网络模型,完成了狼狗图片分类任务。