PyTorch 深度学习实践-循环神经网络(高级篇)

avatar
作者
筋斗云
阅读量:4

视频指路
参考博客笔记
参考笔记二

文章目录


上课笔记

个人能力有限,重看几遍吧,第一遍基本看不懂

在这里插入图片描述

名字的每个字母都是一个特征x1,x2,x3…,一个名字是一个序列

rnn用GRU

在这里插入图片描述

用ASCII表作为词典,长度为128,每一个值对应一个独热向量,比如77对应128维向量中第77个位置为1其他位置为0,但是对于embed层只要告诉它哪个是1就行,这些序列长短不一,需要padding到统一长度

在这里插入图片描述

把国家的名字变成索引标签,做成下图所示的词典

在这里插入图片描述
数据集构建

class NameDataset(Dataset):     def __init__(self, is_train=True):         # 文件读取         filename = './dataset/names_train.csv.gz' if is_train else './dataset/names_test.csv.gz'         with gzip.open(filename, 'rt') as f:  # rt表示以只读模式打开文件,并将文件内容解析为文本形式             reader = csv.reader(f)             rows =list(reader)  # 每个元素由一个名字和国家组成         # 提取属性         self.names = [row[0] for row in rows]         self.len = len(self.names)         self.countries = [row[1] for row in rows]         # 编码处理         self.country_list = list(sorted(set(self.countries)))  # 列表,按字母表顺序排序,去重后有18个国家名         self.country_dict = self.get_countries_dict()  # 字典,国家名对应序号标签         self.country_num = len(self.country_list)      def __getitem__(self, item):         # 索引获取         return self.names[item], self.country_dict[self.countries[item]]  # 根据国家去字典查找索引      def __len__(self):         # 获取个数         return self.len      def get_countries_dict(self):         # 根据国家名对应序号         country_dict = dict()         for idx, country_name in enumerate(self.country_list):             country_dict[country_name] = idx         return country_dict      def idx2country(self, index):         # 根据索引返回国家名字         return self.country_list[index]      def get_countries_num(self):         # 返回国家名个数(分类的总个数)         return self.country_num  

双向RNN,从左往右走一遍,把得到的值和逆向计算得到的hN拼到一起,比如最后一个 [ h 0 b , h n f ] [h^b_0, h^f_n] [h0b,hnf]

		self.n_directions = 2 if bidirectional else 1          self.gru = torch.nn.GRU(hidden_size, hidden_size, num_layers=n_layers, bidirectional=bidirectional)#bidirectional双向神经网络 

在这里插入图片描述

h i d d e n = [ h N f , h N b ] hidden = [h{^f_N},h{^b_N}] hidden=[hNf,hNb]

# 进行打包(不考虑0元素,提高运行速度)首先需要将嵌入数据按长度排好 gru_input = pack_padded_sequence(embedding, seq_lengths) 

pack_padded_sequence:这是 PyTorch 提供的一个函数,用于将填充后的序列打包。其主要目的是跳过填充值,并且在 RNN 中只处理实际的序列数据。它会将填充后的嵌入和实际序列长度作为输入,并返回一个打包后的序列,便于 RNN 处理。可以只把非零序列提取出来放到一块,也就是把为0的填充量都丢掉,这样将来fru就可以处理长短不一的输入序列

在这里插入图片描述

首先要根据序列长度进行排序,然后再经过嵌入层

在这里插入图片描述

如下图所示:这样用gru的时候效率就会更高,因为可以方便去掉好多padding的数据

在这里插入图片描述

双向RNN要拼接起来

if self.n_directions == 2:             hidden_cat = torch.cat([hidden[-1], hidden[-2]], dim=1)  # hidden[-1]的形状是(1,256,100),hidden[-2]的形状是(1,256,100),拼接后的形状是(1,256,200) 

总代码

import csv import time import matplotlib.pyplot as plt import numpy as np import math import gzip  # 用于读取压缩文件 import torch import torch.optim as optim from torch.utils.data import Dataset, DataLoader from torch.nn.utils.rnn import pack_padded_sequence  # 1.超参数设置 HIDDEN_SIZE = 100 BATCH_SIZE = 256 N_LAYER = 2  # RNN的层数 N_EPOCHS = 100 N_CHARS = 128  # ASCII码的个数 USE_GPU = False   # 工具类函数 # 把名字转换成ASCII码,    b  返回ASCII码值列表和名字的长度 def name2list(name):     arr = [ord(c) for c in name]     return arr, len(arr)   # 是否把数据放到GPU上 def create_tensor(tensor):     if USE_GPU:         device = torch.device('cuda:0')         tensor = tensor.to(device)     return tensor   def timesince(since):     now = time.time()     s = now - since     m = math.floor(s / 60)  # math.floor()向下取整     s -= m * 60     return '%dmin %ds' % (m, s)  # 多少分钟多少秒   # 2.构建数据集 class NameDataset(Dataset):     def __init__(self, is_train=True):         # 文件读取         filename = './dataset/names_train.csv.gz' if is_train else './dataset/names_test.csv.gz'         with gzip.open(filename, 'rt') as f:  # rt表示以只读模式打开文件,并将文件内容解析为文本形式             reader = csv.reader(f)             rows =list(reader)  # 每个元素由一个名字和国家组成         # 提取属性         self.names = [row[0] for row in rows]         self.len = len(self.names)         self.countries = [row[1] for row in rows]         # 编码处理         self.country_list = list(sorted(set(self.countries)))  # 列表,按字母表顺序排序,去重后有18个国家名         self.country_dict = self.get_countries_dict()  # 字典,国家名对应序号标签         self.country_num = len(self.country_list)      def __getitem__(self, item):         # 索引获取         return self.names[item], self.country_dict[self.countries[item]]  # 根据国家去字典查找索引      def __len__(self):         # 获取个数         return self.len      def get_countries_dict(self):         # 根据国家名对应序号         country_dict = dict()         for idx, country_name in enumerate(self.country_list):             country_dict[country_name] = idx         return country_dict      def idx2country(self, index):         # 根据索引返回国家名字         return self.country_list[index]      def get_countries_num(self):         # 返回国家名个数(分类的总个数)         return self.country_num   # 3.实例化数据集 train_set = NameDataset(is_train=True) train_loader = DataLoader(train_set, shuffle=True, batch_size=BATCH_SIZE, num_workers=2) test_set = NameDataset(is_train=False) test_loder = DataLoader(test_set, shuffle=False, batch_size=BATCH_SIZE, num_workers=2) N_COUNTRY = train_set.get_countries_num()  # 18个国家名,即18个类别   # 4.模型构建 class GRUClassifier(torch.nn.Module):     def __init__(self, input_size, hidden_size, output_size, n_layers=1, bidirectional=True):         super(GRUClassifier, self).__init__()         self.hidden_size = hidden_size         self.n_layers = n_layers         self.n_directions = 2 if bidirectional else 1          # 词嵌入层,将词语映射到hidden维度         self.embedding = torch.nn.Embedding(input_size, hidden_size)         # GRU层(输入为特征数,这里是embedding_size,其大小等于hidden_size))         self.gru = torch.nn.GRU(hidden_size, hidden_size, num_layers=n_layers, bidirectional=bidirectional)#bidirectional双向神经网络         # 线性层         self.fc = torch.nn.Linear(hidden_size * self.n_directions, output_size)      def _init_hidden(self, bath_size):         # 初始化权重,(n_layers * num_directions 双向, batch_size, hidden_size)         hidden = torch.zeros(self.n_layers * self.n_directions, bath_size, self.hidden_size)         return create_tensor(hidden)      def forward(self, input, seq_lengths):         # 转置 B X S -> S X B         input = input.t()  # 此时的维度为seq_len, batch_size         batch_size = input.size(1)         hidden = self._init_hidden(batch_size)          # 嵌入层处理 input:(seq_len,batch_size) -> embedding:(seq_len,batch_size,embedding_size)         embedding = self.embedding(input)          # 进行打包(不考虑0元素,提高运行速度)需要将嵌入数据按长度排好         gru_input = pack_padded_sequence(embedding, seq_lengths)          # output:(*, hidden_size * num_directions),*表示输入的形状(seq_len,batch_size)         # hidden:(num_layers * num_directions, batch, hidden_size)         output, hidden = self.gru(gru_input, hidden)         if self.n_directions == 2:             hidden_cat = torch.cat([hidden[-1], hidden[-2]], dim=1)  # hidden[-1]的形状是(1,256,100),hidden[-2]的形状是(1,256,100),拼接后的形状是(1,256,200)         else:             hidden_cat = hidden[-1]  # (1,256,100)         fc_output = self.fc(hidden_cat)         return fc_output   # 3.数据处理(姓名->数字) def make_tensors(names, countries):     # 获取嵌入长度从大到小排序的seq_tensor(嵌入向量)、seq_lengths(对应长度)、countries(对应顺序的国家序号)-> 便于pack_padded_sequence处理     name_len_list = [name2list(name) for name in names]  # 每个名字对应的1列表     name_seq = [sl[0] for sl in name_len_list]  # 姓名列表     seq_lengths = torch.LongTensor([sl[1] for sl in name_len_list])  # 名字对应的字符个数     countries = countries.long()   # PyTorch 中,张量的默认数据类型是浮点型 (float),这里转换成整型,可以避免浮点数比较时的精度误差,从而提高模型的训练效果      # 创建全零张量,再依次进行填充     # 创建了一个 len(name_seq) * seq_length.max()维的张量     seq_tensor = torch.zeros(len(name_seq), seq_lengths.max()).long()     for idx, (seq, seq_len) in enumerate(zip(name_seq, seq_lengths)):         seq_tensor[idx, :seq_len] = torch.LongTensor(seq)      # 为了使用pack_padded_sequence,需要按照长度排序     # perm_idx是排序后的数据在原数据中的索引,seq_tensor是排序后的数据,seq_lengths是排序后的数据的长度,countries是排序后的国家     seq_lengths, perm_idx = seq_lengths.sort(dim=0, descending=True)  # descending=True 表示按降序进行排序,即从最长的序列到最短的序列。     seq_tensor = seq_tensor[perm_idx]     countries = countries[perm_idx]      return create_tensor(seq_tensor), create_tensor(seq_lengths), create_tensor(countries)   # 训练循环 def train(epoch, start):     total_loss = 0     for i, (names, countries) in enumerate(train_loader, 1):         inputs, seq_lengths, target = make_tensors(names, countries)  # 输入、每个序列长度、输出         output = model(inputs, seq_lengths)         loss = criterion(output, target)         optimizer.zero_grad()         loss.backward()         optimizer.step()          total_loss += loss.item()         if i % 10 == 0:             print(f'[{timesince(start)}] Epoch {epoch} ', end='')             print(f'[{i * len(inputs)}/{len(train_set)}] ', end='')             print(f'loss={total_loss / (i * len(inputs))}')  # 打印每个样本的平均损失      return total_loss   # 测试循环 def test():     correct = 0     total = len(test_set)     print('evaluating trained model ...')     with torch.no_grad():         for i, (names, countries) in enumerate(test_loder, 1):             inputs, seq_lengths, target = make_tensors(names, countries)             output = model(inputs, seq_lengths)             pred = output.max(dim=1, keepdim=True)[1]  # 返回每一行中最大值的那个元素的索引,且keepdim=True,表示保持输出的二维特性             correct += pred.eq(target.view_as(pred)).sum().item()  # 计算正确的个数         percent = '%.2f' % (100 * correct / total)         print(f'Test set: Accuracy {correct}/{total} {percent}%')      return correct / total  # 返回的是准确率,0.几几的格式,用来画图   if __name__ == '__main__':     model = GRUClassifier(N_CHARS, HIDDEN_SIZE, N_COUNTRY, N_LAYER)     criterion = torch.nn.CrossEntropyLoss()     optimizer = optim.Adam(model.parameters(), lr=0.001)     device = 'cuda:0' if USE_GPU else 'cpu'     model.to(device)     start = time.time()     print('Training for %d epochs...' % N_EPOCHS)     acc_list = []     # 在每个epoch中,训练完一次就测试一次     for epoch in range(1, N_EPOCHS + 1):         # Train cycle         train(epoch, start)         acc = test()         acc_list.append(acc)      # 绘制在测试集上的准确率     epoch = np.arange(1, len(acc_list) + 1)     acc_list = np.array(acc_list)     plt.plot(epoch, acc_list)     plt.xlabel('Epoch')     plt.ylabel('Accuracy')     plt.grid()     plt.show()  

在准确率最高点save模型

保存整个模型:

torch.save(model,'save.pt') 

只保存训练好的权重:

torch.save(model.state_dict(), 'save.pt') 

练习

数据集地址,判断句子是哪类(0-negative,1-somewhat negative,2-neutral,3-somewhat positive,4-positive)情感分析

import pandas as pd import torch from torch.utils.data import Dataset, DataLoader import torch.nn as nn import torch.optim as optim from sklearn.preprocessing import LabelEncoder from torch.nn.utils.rnn import pad_sequence from torch.nn.utils.rnn import pack_padded_sequence import zipfile   # 超参数设置 BATCH_SIZE = 64 HIDDEN_SIZE = 100 N_LAYERS = 2 N_EPOCHS = 10 LEARNING_RATE = 0.001  # 数据集路径 TRAIN_ZIP_PATH = './dataset/train.tsv.zip' TEST_ZIP_PATH = './dataset/test.tsv.zip'  # 解压缩文件 def unzip_file(zip_path, extract_to='.'):     with zipfile.ZipFile(zip_path, 'r') as zip_ref:         zip_ref.extractall(extract_to)  unzip_file(TRAIN_ZIP_PATH) unzip_file(TEST_ZIP_PATH)  # 数据集路径 TRAIN_PATH = './train.tsv' TEST_PATH = './test.tsv'  # 自定义数据集类 class SentimentDataset(Dataset):     def __init__(self, phrases, sentiments=None):         self.phrases = phrases         self.sentiments = sentiments      def __len__(self):         return len(self.phrases)      def __getitem__(self, idx):         phrase = self.phrases[idx]         if self.sentiments is not None:             sentiment = self.sentiments[idx]             return phrase, sentiment         return phrase  # 加载数据 def load_data():     train_df = pd.read_csv(TRAIN_PATH, sep='\t')     test_df = pd.read_csv(TEST_PATH, sep='\t')     return train_df, test_df  train_df, test_df = load_data()  # 数据预处理 def preprocess_data(train_df, test_df):     le = LabelEncoder()     train_df['Sentiment'] = le.fit_transform(train_df['Sentiment'])     train_phrases = train_df['Phrase'].tolist()     train_sentiments = train_df['Sentiment'].tolist()     test_phrases = test_df['Phrase'].tolist()     return train_phrases, train_sentiments, test_phrases, le  train_phrases, train_sentiments, test_phrases, le = preprocess_data(train_df, test_df)  # 构建词汇表 def build_vocab(phrases):     vocab = set()     for phrase in phrases:         for word in phrase.split():             vocab.add(word)     word2idx = {word: idx for idx, word in enumerate(vocab, start=1)}     word2idx['<PAD>'] = 0     return word2idx  word2idx = build_vocab(train_phrases + test_phrases)  # 将短语转换为索引 def phrase_to_indices(phrase, word2idx):     return [word2idx[word] for word in phrase.split() if word in word2idx]  train_indices = [phrase_to_indices(phrase, word2idx) for phrase in train_phrases] test_indices = [phrase_to_indices(phrase, word2idx) for phrase in test_phrases]  # 移除长度为0的样本 train_indices = [x for x in train_indices if len(x) > 0] train_sentiments = [y for x, y in zip(train_indices, train_sentiments) if len(x) > 0] test_indices = [x for x in test_indices if len(x) > 0]  # 数据加载器 def collate_fn(batch):     phrases, sentiments = zip(*batch)     lengths = torch.tensor([len(x) for x in phrases])     phrases = [torch.tensor(x) for x in phrases]     phrases_padded = pad_sequence(phrases, batch_first=True, padding_value=0)     sentiments = torch.tensor(sentiments)     return phrases_padded, sentiments, lengths  train_dataset = SentimentDataset(train_indices, train_sentiments) train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)  test_dataset = SentimentDataset(test_indices) test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=lambda x: pad_sequence([torch.tensor(phrase) for phrase in x], batch_first=True, padding_value=0))  # 模型定义 class SentimentRNN(nn.Module):     def __init__(self, vocab_size, embed_size, hidden_size, output_size, n_layers):         super(SentimentRNN, self).__init__()         self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=0)         self.lstm = nn.LSTM(embed_size, hidden_size, n_layers, batch_first=True, bidirectional=True)         self.fc = nn.Linear(hidden_size * 2, output_size)      def forward(self, x, lengths):         x = self.embedding(x)         x = pack_padded_sequence(x, lengths.cpu(), batch_first=True, enforce_sorted=False)         _, (hidden, _) = self.lstm(x)         hidden = torch.cat((hidden[-2], hidden[-1]), dim=1)         out = self.fc(hidden)         return out  vocab_size = len(word2idx) embed_size = 128 output_size = len(le.classes_)  model = SentimentRNN(vocab_size, embed_size, HIDDEN_SIZE, output_size, N_LAYERS)  criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)  # 训练和测试循环 def train(model, train_loader, criterion, optimizer, n_epochs):     model.train()     for epoch in range(n_epochs):         total_loss = 0         for phrases, sentiments, lengths in train_loader:             optimizer.zero_grad()             output = model(phrases, lengths)             loss = criterion(output, sentiments)             loss.backward()             optimizer.step()             total_loss += loss.item()         print(f'Epoch: {epoch+1}, Loss: {total_loss/len(train_loader)}')  def generate_test_results(model, test_loader, test_ids):     model.eval()     results = []     with torch.no_grad():         for phrases in test_loader:             lengths = torch.tensor([len(x) for x in phrases])             output = model(phrases, lengths)             preds = torch.argmax(output, dim=1)             results.extend(preds.cpu().numpy())     return results  train(model, train_loader, criterion, optimizer, N_EPOCHS)  test_ids = test_df['PhraseId'].tolist() preds = generate_test_results(model, test_loader, test_ids)  # 保存结果 output_df = pd.DataFrame({'PhraseId': test_ids, 'Sentiment': preds}) output_df.to_csv('sentiment_predictions.csv', index=False)  

引入随机性:重要性采样,对分类的样本按照它的分布来进行随机采样

gpt优化后代码:停词表自己在网上找个博客复制粘贴成stopwords.txt文件就行
主要改进点:

  1. 增加了嵌入层维度和隐藏层大小,LSTM 层数。
  2. 使用 GloVe 预训练词向量。
  3. 使用了 dropout 防止过拟合。
  4. 使用 AdamW 优化器。
  5. 移除停用词。
  6. 增加了验证集并使用学习率调度器用于模型性能评估。
  7. 保存和加载最优模型。
 import pandas as pd import torch from torch.utils.data import Dataset, DataLoader import torch.nn as nn import torch.optim as optim from sklearn.preprocessing import LabelEncoder from torch.nn.utils.rnn import pad_sequence from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence from sklearn.model_selection import train_test_split import zipfile import os from torchtext.vocab import GloVe  # 超参数设置 BATCH_SIZE = 64 HIDDEN_SIZE = 256 N_LAYERS = 3 N_EPOCHS = 20 LEARNING_RATE = 0.01 EMBED_SIZE = 300 DROPOUT = 0.5  # 数据集路径 TRAIN_ZIP_PATH = './dataset/train.tsv.zip' TEST_ZIP_PATH = './dataset/test.tsv.zip'   # 解压缩文件 def unzip_file(zip_path, extract_to='.'):     with zipfile.ZipFile(zip_path, 'r') as zip_ref:         zip_ref.extractall(extract_to)   unzip_file(TRAIN_ZIP_PATH) unzip_file(TEST_ZIP_PATH)  # 数据集路径 TRAIN_PATH = './train.tsv' TEST_PATH = './test.tsv'   # 自定义数据集类 class SentimentDataset(Dataset):     def __init__(self, phrases, sentiments=None):         self.phrases = phrases         self.sentiments = sentiments      def __len__(self):         return len(self.phrases)      def __getitem__(self, idx):         phrase = self.phrases[idx]         if self.sentiments is not None:             sentiment = self.sentiments[idx]             return phrase, sentiment         return phrase   # 加载数据 def load_data():     train_df = pd.read_csv(TRAIN_PATH, sep='\t')     test_df = pd.read_csv(TEST_PATH, sep='\t')     return train_df, test_df   train_df, test_df = load_data()   # 数据预处理 def preprocess_data(train_df, test_df):     le = LabelEncoder()     train_df['Sentiment'] = le.fit_transform(train_df['Sentiment'])     train_phrases = train_df['Phrase'].tolist()     train_sentiments = train_df['Sentiment'].tolist()     test_phrases = test_df['Phrase'].tolist()     test_ids = test_df['PhraseId'].tolist()     return train_phrases, train_sentiments, test_phrases, test_ids, le   train_phrases, train_sentiments, test_phrases, test_ids, le = preprocess_data(train_df, test_df)   # 移除停用词 def remove_stopwords(phrases):     stopwords = set(open('./dataset/stopwords.txt').read().split())     return [' '.join([word for word in phrase.split() if word not in stopwords]) for phrase in phrases]   train_phrases = remove_stopwords(train_phrases) test_phrases = remove_stopwords(test_phrases)   # 构建词汇表 def build_vocab(phrases):     vocab = set()     for phrase in phrases:         for word in phrase.split():             vocab.add(word)     word2idx = {word: idx for idx, word in enumerate(vocab, start=1)}     word2idx['<PAD>'] = 0     return word2idx   word2idx = build_vocab(train_phrases + test_phrases)  # 加载预训练的词向量 glove = GloVe(name='6B', dim=EMBED_SIZE)   def create_embedding_matrix(word2idx, glove):     vocab_size = len(word2idx)     embedding_matrix = torch.zeros((vocab_size, EMBED_SIZE))     for word, idx in word2idx.items():         if word in glove.stoi:             embedding_matrix[idx] = glove[word]         else:             embedding_matrix[idx] = torch.randn(EMBED_SIZE)     return embedding_matrix   embedding_matrix = create_embedding_matrix(word2idx, glove)   # 将短语转换为索引 def phrase_to_indices(phrase, word2idx):     return [word2idx[word] for word in phrase.split() if word in word2idx]   train_indices = [phrase_to_indices(phrase, word2idx) for phrase in train_phrases] test_indices = [phrase_to_indices(phrase, word2idx) for phrase in test_phrases]  # 移除长度为0的样本 train_indices, train_sentiments = zip(*[(x, y) for x, y in zip(train_indices, train_sentiments) if len(x) > 0]) # 注意:这里不移除 test_indices 中的空样本,因为我们需要保持 test_ids 的完整性 test_indices_with_default = [phrase if len(phrase) > 0 else [0] for phrase in test_indices]  # 划分训练集和验证集 train_indices, val_indices, train_sentiments, val_sentiments = train_test_split(     train_indices, train_sentiments, test_size=0.2, random_state=42)   # 数据加载器 def collate_fn(batch):     phrases, sentiments = zip(*batch)     lengths = torch.tensor([len(x) for x in phrases])     phrases = [torch.tensor(x) for x in phrases]     phrases_padded = pad_sequence(phrases, batch_first=True, padding_value=0)     sentiments = torch.tensor(sentiments)     return phrases_padded, sentiments, lengths   train_dataset = SentimentDataset(train_indices, train_sentiments) val_dataset = SentimentDataset(val_indices, val_sentiments) train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn) val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)  test_dataset = SentimentDataset(test_indices_with_default) test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False,                          collate_fn=lambda x: pad_sequence([torch.tensor(phrase) for phrase in x], batch_first=True,                                                            padding_value=0))   # 模型定义 class SentimentRNN(nn.Module):     def __init__(self, vocab_size, embed_size, hidden_size, output_size, n_layers, dropout, embedding_matrix):         super(SentimentRNN, self).__init__()         self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False)         self.lstm = nn.LSTM(embed_size, hidden_size, n_layers, batch_first=True, bidirectional=True, dropout=dropout)         self.fc = nn.Linear(hidden_size * 2, output_size)         self.dropout = nn.Dropout(dropout)      def forward(self, x, lengths):         x = self.embedding(x)         x = pack_padded_sequence(x, lengths.cpu(), batch_first=True, enforce_sorted=False)         packed_output, (hidden, _) = self.lstm(x)         hidden = torch.cat((hidden[-2], hidden[-1]), dim=1)         hidden = self.dropout(hidden)         out = self.fc(hidden)         return out   vocab_size = len(word2idx) output_size = len(le.classes_)  model = SentimentRNN(vocab_size, EMBED_SIZE, HIDDEN_SIZE, output_size, N_LAYERS, DROPOUT, embedding_matrix)  criterion = nn.CrossEntropyLoss() optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)  # 学习率调度器 scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)   # 训练和测试循环 def train(model, train_loader, val_loader, criterion, optimizer, scheduler, n_epochs):     model.train()     best_val_accuracy = 0     for epoch in range(n_epochs):         total_loss = 0         model.train()         for phrases, sentiments, lengths in train_loader:             optimizer.zero_grad()             output = model(phrases, lengths)             loss = criterion(output, sentiments)             loss.backward()             optimizer.step()             total_loss += loss.item()         print(f'Epoch: {epoch + 1}, Loss: {total_loss / len(train_loader)}')          val_accuracy = evaluate(model, val_loader)         scheduler.step(total_loss / len(train_loader))         if val_accuracy > best_val_accuracy:             best_val_accuracy = val_accuracy             torch.save(model.state_dict(), 'best_model.pt')         print(f'Epoch: {epoch + 1}, Val Accuracy: {val_accuracy}')   def evaluate(model, val_loader):     model.eval()     correct, total = 0, 0     with torch.no_grad():         for phrases, sentiments, lengths in val_loader:             output = model(phrases, lengths)             preds = torch.argmax(output, dim=1)             correct += (preds == sentiments).sum().item()             total += sentiments.size(0)     return correct / total   def generate_test_results(model, test_loader):     model.eval()     results = []     with torch.no_grad():         for phrases in test_loader:             lengths = torch.tensor([len(x) for x in phrases])             output = model(phrases, lengths)             preds = torch.argmax(output, dim=1)             results.extend(preds.cpu().numpy())     return results   train(model, train_loader, val_loader, criterion, optimizer, scheduler, N_EPOCHS)  # 加载最优模型 model.load_state_dict(torch.load('best_model.pt'))  preds = generate_test_results(model, test_loader)    # 确保 test_ids 和 preds 长度一致 assert len(test_ids) == len(preds), f"Lengths do not match: {len(test_ids)} vs {len(preds)}"   # 保存结果 output_df = pd.DataFrame({'PhraseId': test_ids, 'Sentiment': preds}) output_df.to_csv('sentiment_predictions2.csv', index=False) 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!