LSTM+CRF序列标注 AI代码解析
本篇案例暂不支持在windows系统上运行。
概述
序列标注指给定输入序列,给序列中每个Token进行标注标签的过程。序列标注问题通常用于从文本中进行信息抽取,包括分词(Word Segmentation)、词性标注(Position Tagging)、命名实体识别(Named Entity Recognition, NER)等。以命名实体识别为例:
输入序列 | 清 | 华 | 大 | 学 | 座 | 落 | 于 | 首 | 都 | 北 | 京 |
---|---|---|---|---|---|---|---|---|---|---|---|
输出标注 | B | I | I | I | O | O | O | O | O | B | I |
如上表所示,清华大学
和 北京
是地名,需要将其识别,我们对每个输入的单词预测其标签,最后根据标签来识别实体。
这里使用了一种常见的命名实体识别的标注方法——“BIOE”标注,将一个实体(Entity)的开头标注为B,其他部分标注为I,非实体标注为O。
条件随机场(Conditional Random Field, CRF)
从上文的举例可以看到,对序列进行标注,实际上是对序列中每个Token进行标签预测,可以直接视作简单的多分类问题。但是序列标注不仅仅需要对单个Token进行分类预测,同时相邻Token直接有关联关系。以清华大学
一词为例:
输入序列 | 清 | 华 | 大 | 学 | |
---|---|---|---|---|---|
输出标注 | B | I | I | I | √ |
输出标注 | O | I | I | I | × |
如上表所示,正确的实体中包含的4个Token有依赖关系,I前必须是B或I,而错误输出结果将清
字标注为O,违背了这一依赖。将命名实体识别视为多分类问题,则每个词的预测概率都是独立的,易产生类似的问题,因此需要引入一种能够学习到此种关联关系的算法来保证预测结果的正确性。而条件随机场是适合此类场景的一种概率图模型。下面对条件随机场的定义和参数化形式进行简析。
考虑到序列标注问题的线性序列特点,本节所述的条件随机场特指线性链条件随机场(Linear Chain CRF)
设为输入序列,为输出的标注序列,其中为序列的最大长度,表示对应的所有可能的输出序列集合。则输出序列的概率为:
设, 为序列的第个Token和对应的标签,则需要能够在计算和的映射的同时,捕获相邻标签和之间的关系,因此我们定义两个概率函数:
- 发射概率函数:表示的概率。
- 转移概率函数:表示的概率。
则可以得到的计算公式:
设标签集合为,构造大小为的矩阵,用于存储标签间的转移概率;由编码层(可以为Dense、LSTM等)输出的隐状态可以直接视作发射概率,此时的计算公式可以转化为:
完整的CRF完整推导可参考Log-Linear Models, MEMMs, and CRFs
接下来我们根据上述公式,使用MindSpore来实现CRF的参数化形式。首先实现CRF层的前向训练部分,将CRF和损失函数做合并,选择分类问题常用的负对数似然函数(Negative Log Likelihood, NLL),则有:
由公式可得,
根据公式,我们称被减数为Normalizer,减数为Score,分别实现后相减得到最终Loss。
Score计算
首先根据公式计算正确标签序列所对应的得分,这里需要注意,除了转移概率矩阵外,还需要维护两个大小为的向量,分别作为序列开始和结束时的转移概率。同时我们引入了一个掩码矩阵,将多个序列打包为一个Batch时填充的值忽略,使得计算仅包含有效的Token。
def compute_score(emissions, tags, seq_ends, mask, trans, start_trans, end_trans):
# emissions: (seq_length, batch_size, num_tags)
# tags: (seq_length, batch_size)
# mask: (seq_length, batch_size)
seq_length, batch_size = tags.shape # 获取序列长度和批次大小 mask = mask.astype(emissions.dtype) # 将mask的类型转换为与emissions一致 # 将score设置为初始转移概率 # shape: (batch_size,) score = start_trans[tags[0]] # 初始化score为起始转移概率,其中tags[0]表示第一个时间步的标签索引 # score += 第一次发射概率 # shape: (batch_size,) score += emissions[0, mnp.arange(batch_size), tags[0]] # 将第一次发射概率加到score上 for i in range(1, seq_length): # 从第二个时间步开始迭代 # 标签由i-1转移至i的转移概率(当mask == 1时有效) # shape: (batch_size,) score += trans[tags[i - 1], tags[i]] * mask[i] # 添加标签转移概率,只有当mask[i]为1时才有效 # 预测tags[i]的发射概率(当mask == 1时有效) # shape: (batch_size,) score += emissions[i, mnp.arange(batch_size), tags[i]] * mask[i] # 添加当前时间步的发射概率,只有当mask[i]为1时才有效 # 结束转移 # shape: (batch_size,) last_tags = tags[seq_ends, mnp.arange(batch_size)] # 获取每个batch中序列结束时的标签 # score += 结束转移概率 # shape: (batch_size,) score += end_trans[last_tags] # 将结束转移概率加到score上 return score # 返回最终的score
代码解析
- 获取序列长度和批次大小:
seq_length, batch_size = tags.shape
: 从标签数组中获取序列长度和批次大小。
- 转换mask类型:
mask = mask.astype(emissions.dtype)
: 确保mask与发射概率emissions的数据类型一致,以便进行后续计算。
- 初始化score:
score = start_trans[tags[0]]
: 用第一个时间步的标签索引来获取起始转移概率,初始化score。
- 添加第一次发射概率:
score += emissions[0, mnp.arange(batch_size), tags[0]]
: 将第一个时间步的发射概率加到score上。
- 循环遍历序列:
for i in range(1, seq_length)
: 从第二个时间步开始进行遍历。score += trans[tags[i - 1], tags[i]] * mask[i]
: 计算标签转移概率并加到score上,仅当mask[i]为1时有效。score += emissions[i, mnp.arange(batch_size), tags[i]] * mask[i]
: 计算当前时间步的发射概率并加到score上,仅当mask[i]为1时有效。
- 结束转移:
last_tags = tags[seq_ends, mnp.arange(batch_size)]
: 获取每个batch中序列结束时的标签。score += end_trans[last_tags]
: 将结束转移概率加到score上。
- 返回最终的score:
return score
:返回计算得到的score。
API解析
start_trans
: 初始转移概率数组,尺寸通常为标签数量。mask
: 用于指示哪些时间步是有效的,通常是一个布尔数组,包含值1(有效)或0(无效)。emissions
: 发射概率数组,表示在每个时间步和每个标签的发射概率。trans
: 标签之间的转移概率矩阵,表示从一个标签转移到另一个标签的概率。end_trans
: 结束状态的转移概率数组,表示从某个标签转移到结束状态的概率。tags
: 包含每个时间步标签的数组,通常是一个二维数组,形状为(seq_length, batch_size)。seq_ends
: 用于标识每个序列结束的索引,通常是一个一维数组。
Normalizer计算
根据公式,Normalizer是对应的所有可能的输出序列的Score的对数指数和(Log-Sum-Exp)。此时如果按穷举法进行计算,则需要将每个可能的输出序列Score都计算一遍,共有个结果。这里我们采用动态规划算法,通过复用计算结果来提高效率。
假设需要计算从第至第个Token所有可能的输出序列得分,则可以先计算出从第至第个Token所有可能的输出序列得分。因此,Normalizer可以改写为以下形式:
其中为第个Token的发射概率,是转移矩阵。由于发射概率矩阵和转移概率矩阵独立于的序列路径计算,可以将其提出,可得:
根据公式(7),Normalizer的实现如下:
def compute_normalizer(emissions, mask, trans, start_trans, end_trans):
# emissions: (seq_length, batch_size, num_tags)
# mask: (seq_length, batch_size)
seq_length = emissions.shape[0] # 获取序列的长度,即发射概率的第一个维度 # 将score设置为初始转移概率,并加上第一次发射概率 # shape: (batch_size, num_tags) score = start_trans + emissions[0] # 初始化score为起始转移概率加上第一次发射概率 for i in range(1, seq_length): # 从第二个时间步开始迭代 # 扩展score的维度用于总score的计算 # shape: (batch_size, num_tags, 1) broadcast_score = score.expand_dims(2) # 将score的最后一个维度扩展为1,以便用于广播计算 # 扩展emission的维度用于总score的计算 # shape: (batch_size, 1, num_tags) broadcast_emissions = emissions[i].expand_dims(1) # 将当前时间步的发射概率扩展为1,以便用于广播计算 # 根据公式(7),计算score_i # 此时broadcast_score是由第0个到当前Token所有可能路径 # 对应score的log_sum_exp # shape: (batch_size, num_tags, num_tags) next_score = broadcast_score + trans + broadcast_emissions # 计算下一步的score # 对score_i做log_sum_exp运算,用于下一个Token的score计算 # shape: (batch_size, num_tags) next_score = ops.logsumexp(next_score, axis=1) # 对最后一个维度进行log_sum_exp,以整合所有路径的得分 # 当mask == 1时,score才会变化 # shape: (batch_size, num_tags) score = mnp.where(mask[i].expand_dims(1), next_score, score) # 根据mask更新score # 最后加结束转移概率 # shape: (batch_size, num_tags) score += end_trans # 将结束转移概率加到score上 # 对所有可能的路径得分求log_sum_exp # shape: (batch_size,) return ops.logsumexp(score, axis=1) # 返回最终的得分
代码解析
- 获取序列长度:
seq_length = emissions.shape[0]
: 从发射概率数组中获取序列的长度。
- 初始化score:
score = start_trans + emissions[0]
: 将初始转移概率与第一次发射概率相加,初始化score。
- 循环遍历序列:
for i in range(1, seq_length)
: 从第二个时间步开始进行遍历。- 扩展score的维度:
broadcast_score = score.expand_dims(2)
: 将score的最后一个维度扩展为1,以便与其他数组进行广播计算。
- 扩展emissions的维度:
broadcast_emissions = emissions[i].expand_dims(1)
: 将当前时间步的发射概率扩展为1,以便在计算中进行广播。
- 计算下一步的score:
next_score = broadcast_score + trans + broadcast_emissions
: 计算当前时间步的所有可能路径得分,包括转移概率和发射概率。
- 进行log_sum_exp计算:
next_score = ops.logsumexp(next_score, axis=1)
: 对最后一个维度进行log_sum_exp,整合所有路径的得分,以便生成下一个时间步的score。
- 更新score:
score = mnp.where(mask[i].expand_dims(1), next_score, score)
: 根据mask更新当前的score,只有mask为1的情况下score才会被更新。
- 扩展score的维度:
- 添加结束转移概率:
score += end_trans
: 将结束转移概率加到score上。
- 返回最终得分:
return ops.logsumexp(score, axis=1)
: 对所有可能路径的得分进行log_sum_exp计算,返回最终的结果。
API解析
start_trans
: 初始转移概率数组,通常大小为(num_tags,),表示从起始状态到每个标签的概率。emissions
: 发射概率数组,形状为(seq_length, batch_size, num_tags),表示每个时间步的发射概率。trans
: 标签之间的转移概率矩阵,形状为(num_tags, num_tags),表示从一个标签转移到另一个标签的概率。end_trans
: 结束状态的转移概率数组,形状为(batch_size, num_tags),表示每个标签转移到结束状态的概率。mask
: 布尔数组,形状为(seq_length, batch_size),用于指示哪些时间步是有效的。ops.logsumexp
: 用于计算给定维度上的log_sum_exp,这是一种数值稳定的计算方式,通常用于处理对数概率和数值溢出问题。mnp.where
: 根据条件生成新的数组,在这里用来根据mask的值决定更新的score。
Viterbi算法
在完成前向训练部分后,需要实现解码部分。这里我们选择适合求解序列最优路径的Viterbi算法。与计算Normalizer类似,使用动态规划求解所有可能的预测序列得分。不同的是在解码时同时需要将第个Token对应的score取值最大的标签保存,供后续使用Viterbi算法求解最优预测序列使用。
取得最大概率得分,以及每个Token对应的标签历史后,根据Viterbi算法可以得到公式:
从第0个至第个Token对应概率最大的序列,只需要考虑从第0个至第个Token对应概率最大的序列,以及从第个至第个概率最大的标签即可。因此我们逆序求解每一个概率最大的标签,构成最佳的预测序列。
由于静态图语法限制,我们将Viterbi算法求解最佳预测序列的部分作为后处理函数,不纳入后续CRF层的实现。
def viterbi_decode(emissions, mask, trans, start_trans, end_trans):
# emissions: (seq_length, batch_size, num_tags)
# mask: (seq_length, batch_size)
seq_length = mask.shape[0] # 获取序列长度,即mask的第一维度 score = start_trans + emissions[0] # 初始化score为起始转移概率加上第一次发射概率 history = () # 初始化历史记录,用于保存每个时间步的最佳标签索引 for i in range(1, seq_length): # 从第二个时间步开始迭代 broadcast_score = score.expand_dims(2) # 扩展score的维度以便与其他数组广播 broadcast_emission = emissions[i].expand_dims(1) # 扩展当前时间步的发射概率 next_score = broadcast_score + trans + broadcast_emission # 计算当前时间步的score # 求当前Token对应score取值最大的标签,并保存 indices = next_score.argmax(axis=1) # 获取每个batch中score最大的标签索引 history += (indices,) # 将索引添加到历史记录中 next_score = next_score.max(axis=1) # 取出每个batch中score的最大值 score = mnp.where(mask[i].expand_dims(1), next_score, score) # 根据mask更新score score += end_trans # 加上结束转移概率 return score, history # 返回最终的score和历史记录
代码解析
- 获取序列长度:
seq_length = mask.shape[0]
: 从mask中获取序列长度。
- 初始化score和历史记录:
score = start_trans + emissions[0]
: 将起始转移概率与第一次发射概率相加,初始化score。history = ()
: 初始化一个空的元组,用于记录每个时间步的最佳标签索引。
- 循环遍历序列:
for i in range(1, seq_length)
: 从第二个时间步开始进行遍历。- 扩展score和发射概率的维度:
broadcast_score = score.expand_dims(2)
: 将score的最后一个维度扩展为1,以便于广播计算。broadcast_emission = emissions[i].expand_dims(1)
: 将当前时间步的发射概率扩展为1,以便与其他数组进行广播。
- 计算当前时间步的score:
next_score = broadcast_score + trans + broadcast_emission
: 计算下一时间步的所有可能路径得分。
- 获取最佳标签索引:
indices = next_score.argmax(axis=1)
: 获取每个batch中得分最高的标签的索引,并保存到indices中。history += (indices,)
: 将当前时间步的最佳标签索引记录到历史记录中。
- 更新score:
next_score = next_score.max(axis=1)
: 取出每个batch的最大得分。score = mnp.where(mask[i].expand_dims(1), next_score, score)
: 根据mask更新当前的score,只有在mask为1时score才会被更新。
- 扩展score和发射概率的维度:
- 添加结束转移概率:
score += end_trans
: 将结束转移概率加到score上。
- 返回结果:
return score, history
: 返回最终的得分和历史记录。
API解析
start_trans
: 初始转移概率数组,形状为(num_tags,),表示从起始状态到每个标签的概率。emissions
: 发射概率数组,形状为(seq_length, batch_size, num_tags),表示每个时间步的发射概率。trans
: 标签之间的转移概率矩阵,形状为(num_tags, num_tags),表示从一个标签转移到另一个标签的概率。end_trans
: 结束状态的转移概率数组,形状为(batch_size, num_tags),表示每个标签转移到结束状态的概率。mask
: 布尔数组,形状为(seq_length, batch_size),用于指示哪些时间步是有效的。mnp.where
: 根据条件生成新的数组,在这里用来根据mask的值决定score是否更新。argmax
: 返回指定轴上最大值的索引,用于确定得分最高的标签。
def post_decode(score, history, seq_length): # 使用Score和History计算最佳预测序列 batch_size = seq_length.shape[0] # 获取batch大小 seq_ends = seq_length - 1 # 计算每个序列的结束索引 best_tags_list = [] # 用于存储每个样本的最佳预测标签序列 # 依次对一个Batch中每个样例进行解码 for idx in range(batch_size): # 查找使最后一个Token对应的预测概率最大的标签, # 并将其添加至最佳预测序列存储的列表中 best_last_tag = score[idx].argmax(axis=0) # 找到当前样本最后一个时间步的最佳标签 best_tags = [int(best_last_tag.asnumpy())] # 将最佳标签转换为整数并添加到列表中 # 重复查找每个Token对应的预测概率最大的标签,加入列表 for hist in reversed(history[:seq_ends[idx]]): # 按时间步反向遍历历史记录 best_last_tag = hist[idx][best_tags[-1]] # 获取当前最佳标签对应的历史标签 best_tags.append(int(best_last_tag.asnumpy())) # 将该标签添加到最佳标签列表中 # 将逆序求解的序列标签重置为正序 best_tags.reverse() # 反转列表,以便得到正序的标签序列 best_tags_list.append(best_tags) # 将结果添加到最终列表中 return best_tags_list # 返回所有样本的最佳标签序列
代码解析
- 函数定义:
def post_decode(score, history, seq_length)
: 定义一个名为post_decode
的函数,用于根据得分和历史记录计算最佳预测序列。
- 获取batch大小和结束索引:
batch_size = seq_length.shape[0]
: 从seq_length
中获取batch的大小。seq_ends = seq_length - 1
: 计算每个序列的结束索引,表示最后一个有效时间步的索引。
- 初始化存储最佳标签的列表:
best_tags_list = []
: 初始化一个空列表,用于存储每个样本的最佳标签序列。
- 对每个样本进行解码:
for idx in range(batch_size)
: 遍历每个样本。- 查找最后一个Token的最佳标签:
best_last_tag = score[idx].argmax(axis=0)
: 获取当前样本最后一个时间步的得分最高的标签索引。best_tags = [int(best_last_tag.asnumpy())]
: 将最佳标签转换为整数,并添加到best_tags
列表中。
- 查找历史标签:
for hist in reversed(history[:seq_ends[idx]])
: 反向遍历当前样本的历史记录。best_last_tag = hist[idx][best_tags[-1]]
: 获取当前最佳标签对应的历史标签。best_tags.append(int(best_last_tag.asnumpy()))
: 将该历史标签添加到最佳标签列表中。
- 重置标签序列为正序:
best_tags.reverse()
: 反转列表,使其顺序与原始时间步一致。best_tags_list.append(best_tags)
: 将反转后的最佳标签序列添加到最终结果列表中。
- 查找最后一个Token的最佳标签:
- 返回最佳标签列表:
return best_tags_list
: 返回计算得到的所有样本的最佳标签序列。
API解析
score
: 每个样本在每个时间步的得分,形状为(batch_size, num_tags),用于判断得分最高的标签。history
: 包含每个时间步最佳标签索引的元组,长度为seq_length,记录了每个样本在每个时间步的最佳标签。seq_length
: 包含每个序列长度的数组,形状为(batch_size,),用于确定序列的有效部分。argmax
: 用于返回指定维度上最大值的索引,表示在当前状态下得分最高的标签。asnumpy
: 将张量转换为NumPy数组,以便进行常规的数值操作和类型转换。
CRF层
完成上述前向训练和解码部分的代码后,将其组装完整的CRF层。考虑到输入序列可能存在Padding的情况,CRF的输入需要考虑输入序列的真实长度,因此除发射矩阵和标签外,加入seq_length
参数传入序列Padding前的长度,并实现生成mask矩阵的sequence_mask
方法。
综合上述代码,使用nn.Cell
进行封装,最后实现完整的CRF层如下:
import mindspore as ms # 导入MindSpore框架 import mindspore.nn as nn # 导入神经网络模块 import mindspore.ops as ops # 导入操作模块 import mindspore.numpy as mnp # 导入NumPy兼容模块 from mindspore.common.initializer import initializer, Uniform # 导入初始化器 def sequence_mask(seq_length, max_length, batch_first=False): """根据序列实际长度和最大长度生成mask矩阵""" range_vector = mnp.arange(0, max_length, 1, seq_length.dtype) # 生成从0到max_length的向量 result = range_vector < seq_length.view(seq_length.shape + (1,)) # 生成mask矩阵,形状为(seq_length, max_length) if batch_first: return result.astype(ms.int64) # 如果batch_first为True,直接返回 return result.astype(ms.int64).swapaxes(0, 1) # 否则转置返回 class CRF(nn.Cell): def init(self, num_tags: int, batch_first: bool = False, reduction: str = 'sum') -> None: """初始化CRF层参数""" if num_tags <= 0: # 检查标签数量是否合法 raise ValueError(f'invalid number of tags: {num_tags}') super().init() # 调用父类初始化 if reduction not in ('none', 'sum', 'mean', 'token_mean'): # 检查reduction参数是否合法 raise ValueError(f'invalid reduction: {reduction}') self.num_tags = num_tags # 保存标签数量 self.batch_first = batch_first # 记录batch_first参数 self.reduction = reduction # 记录reduction参数 # 初始化起始、结束和转移参数 self.start_transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags,)), name='start_transitions') self.end_transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags,)), name='end_transitions') self.transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags, num_tags)), name='transitions')
代码解析
- 导入模块:
- 导入MindSpore及其相关模块,包括
nn
(神经网络)、ops
(操作)、numpy
(NumPy兼容)和初始化器。
- 导入MindSpore及其相关模块,包括
sequence_mask
函数:- 功能:根据序列长度和最大长度生成一个mask矩阵,用于指示哪些位置是有效的。
range_vector = mnp.arange(0, max_length, 1, seq_length.dtype)
: 生成一个从0到max_length
的向量。result = range_vector < seq_length.view(seq_length.shape + (1,))
: 创建一个mask矩阵,比较range_vector
和seq_length
,生成布尔值。- 返回:如果
batch_first
为True
,则返回不转置的结果,否则返回转置结果。
CRF
类:- 功能:定义条件随机场(CRF)层。
def init(self, num_tags: int, batch_first: bool = False, reduction: str = 'sum')
: 初始化方法,构造CRF层的参数。- 参数合法性检查:
- 检查
num_tags
是否大于0。 - 检查
reduction
参数是否在合法选项中。
- 检查
- 参数初始化:
self.start_transitions
: 初始化起始转移参数,形状为(num_tags,),表示从起始状态到每个标签的概率。self.end_transitions
: 初始化结束转移参数,形状为(num_tags,),表示每个标签到结束状态的概率。self.transitions
: 初始化转移参数,形状为(num_tags, num_tags),表示从一个标签转移到另一个标签的概率。
API解析
ms.Parameter
: 用于定义可训练的参数,MindSpore中的可学习变量。initializer
: 用于初始化参数,创建时指定初始化方法,如均匀分布。Uniform(0.1)
: 初始化参数时指定均匀分布的范围为[0, 0.1)
,用于生成随机初始值。mnp.arange
: 类似于NumPy的arange
,用于生成一维数组。view()
: 用于调整张量的形状。astype()
: 用于转换数据类型,这里将布尔值转换为int64
类型。
def construct(self, emissions, tags=None, seq_length=None): """构建CRF层的前向计算或解码过程""" if tags is None: # 如果没有提供标签,则进行解码 return self._decode(emissions, seq_length) return self._forward(emissions, tags, seq_length) # 否则进行前向计算 def _forward(self, emissions, tags=None, seq_length=None): """CRF的前向传播计算""" if self.batch_first: batch_size, max_length = tags.shape # 提取batch_size和max_length emissions = emissions.swapaxes(0, 1) # 调整emissions的维度 tags = tags.swapaxes(0, 1) # 调整tags的维度 else: max_length, batch_size = tags.shape # 提取batch_size和max_length if seq_length is None: seq_length = mnp.full((batch_size,), max_length, ms.int64) # 如果seq_length为None,填充为max_length mask = sequence_mask(seq_length, max_length) # 根据序列长度生成mask # 计算分子,shape: (batch_size,) numerator = compute_score(emissions, tags, seq_length - 1, mask, self.transitions, self.start_transitions, self.end_transitions) # 计算分母,shape: (batch_size,) denominator = compute_normalizer(emissions, mask, self.transitions, self.start_transitions, self.end_transitions) # 计算对数似然 llh = denominator - numerator # shape: (batch_size,) # 根据reduction类型返回结果 if self.reduction == 'none': return llh if self.reduction == 'sum': return llh.sum() if self.reduction == 'mean': return llh.mean() return llh.sum() / mask.astype(emissions.dtype).sum() # token_mean情况 def _decode(self, emissions, seq_length=None): """解码过程,使用Viterbi算法""" if self.batch_first: batch_size, max_length = emissions.shape[:2] # 提取batch_size和max_length emissions = emissions.swapaxes(0, 1) # 调整emissions的维度 else: batch_size, max_length = emissions.shape[:2] # 提取batch_size和max_length if seq_length is None: seq_length = mnp.full((batch_size,), max_length, ms.int64) # 如果seq_length为None,填充为max_length mask = sequence_mask(seq_length, max_length) # 根据序列长度生成mask return viterbi_decode(emissions, mask, self.transitions, self.start_transitions, self.end_transitions) # 调用Viterbi解码
代码解析
construct
方法:- 功能:根据提供的参数决定是进行前向计算(
_forward
)还是解码(_decode
)。 if tags is None
: 如果没有提供标签,调用解码方法。return self._forward(emissions, tags, seq_length)
: 否则调用前向传播方法。
- 功能:根据提供的参数决定是进行前向计算(
_forward
方法:- 功能:进行CRF模型的前向传播计算,返回对数似然值。
- 维度调整:根据
batch_first
参数调整emissions
和tags
的维度。 - 序列长度处理:如果
seq_length
为None
,则填充为最大长度。 mask = sequence_mask(seq_length, max_length)
: 生成mask矩阵。- 分子计算:调用
compute_score
计算分子。 - 分母计算:调用
compute_normalizer
计算分母。 - 对数似然计算:用分母减去分子得到对数似然
llh
。 - 结果返回:根据减少方法(
reduction
)返回对应的对数似然值。
_decode
方法:- 功能:进行序列解码,通常使用Viterbi算法。
- 维度调整:根据
batch_first
参数调整emissions
的维度。 - 序列长度处理:如果
seq_length
为None
,则填充为最大长度。 mask = sequence_mask(seq_length, max_length)
: 生成mask矩阵。- 返回:调用
viterbi_decode
进行解码并返回结果。
API解析
emissions
: 每个时间步的发射概率,形状为(batch_size, max_length, num_tags)。tags
: 标签序列,形状为(batch_size, max_length),用于前向计算。seq_length
: 实际序列长度,形状为(batch_size,),用于生成mask。compute_score
: 计算给定emissions
和tags
的得分。compute_normalizer
: 计算归一化因子,用于分母。viterbi_decode
: 实现Viterbi算法的解码函数,获取最优标签序列。sequence_mask
: 生成mask矩阵,指示哪些位置是有效的,避免计算填充值。
BiLSTM+CRF模型
在实现CRF后,我们设计一个双向LSTM+CRF的模型来进行命名实体识别任务的训练。模型结构如下:
nn.Embedding -> nn.LSTM -> nn.Dense -> CRF
其中LSTM提取序列特征,经过Dense层变换获得发射概率矩阵,最后送入CRF层。具体实现如下:
class BiLSTM_CRF(nn.Cell): def init(self, vocab_size, embedding_dim, hidden_dim, num_tags, padding_idx=0): """初始化BiLSTM-CRF模型的各个层""" super().init() # 调用父类的初始化方法 # 初始化嵌入层,将词汇转换为嵌入向量 self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx) # 初始化双向LSTM层 self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2, bidirectional=True, batch_first=True) # 初始化全连接层,将LSTM输出转换为标签的得分 self.hidden2tag = nn.Dense(hidden_dim, num_tags, 'he_uniform') # 初始化CRF层 self.crf = CRF(num_tags, batch_first=True) def construct(self, inputs, seq_length, tags=None): """构建BiLSTM-CRF的前向计算过程""" embeds = self.embedding(inputs) # 将输入词汇转换为嵌入向量 outputs, _ = self.lstm(embeds, seq_length=seq_length) # LSTM层的前向传播 feats = self.hidden2tag(outputs) # 将LSTM输出经过全连接层转换为标签得分 crf_outs = self.crf(feats, tags, seq_length) # 通过CRF层计算最终输出 return crf_outs # 返回CRF层的输出
代码解析
BiLSTM_CRF
类:- 功能:定义一个双向LSTM-CRF模型,用于序列标注任务,如命名实体识别等。
init
方法:- 参数:
vocab_size
: 词汇表大小,用于嵌入层。embedding_dim
: 嵌入维度,词的表示向量的维度。hidden_dim
: LSTM隐藏层的维度(双向LSTM时实际输出维度为hidden_dim
)。num_tags
: 标签的数量,输出层的维度。padding_idx
: 用于填充的索引(默认值为0)。
- 层的初始化:
self.embedding
: 嵌入层,将词汇转换为向量。self.lstm
: 双向LSTM层,hidden_dim // 2
是每个方向的隐藏层维度。self.hidden2tag
: 全连接层,将LSTM的输出转换为每个标签的得分。self.crf
: CRF层,用于对标签序列进行建模。
- 参数:
construct
方法:- 功能:实现BiLSTM-CRF模型的前向传播过程。
- 参数:
inputs
: 输入的词汇序列,形状为(batch_size, max_length)。seq_length
: 序列长度,用于LSTM和mask的生成。tags
: 可选参数,实际标签序列,用于训练时的损失计算。
- 前向传播步骤:
embeds = self.embedding(inputs)
: 将输入词汇序列转换为嵌入向量。outputs, _ = self.lstm(embeds, seq_length=seq_length)
: 通过LSTM层进行前向传播,输出为每个时间步的隐藏状态。feats = self.hidden2tag(outputs)
: 将LSTM输出通过全连接层转换为标签得分(特征)。crf_outs = self.crf(feats, tags, seq_length)
: 通过CRF层计算最终的输出,返回CRF的输出。
API解析
nn.Embedding
: 嵌入层,用于将词汇表中的索引映射到固定维度的稠密向量。nn.LSTM
: LSTM层,具有双向特性,能处理序列数据。nn.Dense
: 全连接层,用于将LSTM的输出映射到标签得分空间。CRF
: 条件随机场层,用于处理序列标注任务,考虑标签之间的依赖关系。construct
: 该方法是MindSpore中定义模型前向传播的标准方法,负责执行计算图中的操作。
完成模型设计后,我们生成两句例子和对应的标签,并构造词表和标签表。
embedding_dim = 16 # 定义嵌入维度 hidden_dim = 32 # 定义LSTM隐藏层维度 training_data = [ # 定义训练数据 ( "清 华 大 学 坐 落 于 首 都 北 京".split(), # 输入序列 "B I I I O O O O O B I".split() # 标签序列 ), ( "重 庆 是 一 个 魔 幻 城 市".split(), "B I O O O O O O O".split() ) ] # 创建词汇到索引的映射 word_to_idx = {} word_to_idx['<pad>'] = 0 # 添加填充标记 for sentence, tags in training_data: # 遍历训练数据 for word in sentence: # 遍历每个句子中的单词 if word not in word_to_idx: # 如果单词不在词典中,添加 word_to_idx[word] = len(word_to_idx) # 创建标签到索引的映射 tag_to_idx = {"B": 0, "I": 1, "O": 2} # 标签映射 # 获取词汇大小 len(word_to_idx) # 计算词汇表的大小 # 实例化BiLSTM-CRF模型 model = BiLSTM_CRF(len(word_to_idx), embedding_dim, hidden_dim, len(tag_to_idx)) # 选择优化器 optimizer = nn.SGD(model.trainable_params(), learning_rate=0.01, weight_decay=1e-4) # 使用value_and_grad生成损失和梯度函数 grad_fn = ms.value_and_grad(model, None, optimizer.parameters) def train_step(data, seq_length, label): """进行一次训练步骤,计算损失和更新参数""" loss, grads = grad_fn(data, seq_length, label) # 计算损失和梯度 optimizer(grads) # 使用优化器更新参数 return loss # 返回损失值 # 将生成的数据打包成Batch def prepare_sequence(seqs, word_to_idx, tag_to_idx): """将序列转换为Tensor,并进行填充""" seq_outputs, label_outputs, seq_length = [], [], [] max_len = max([len(i[0]) for i in seqs]) # 找到输入序列的最大长度 for seq, tag in seqs: # 遍历序列和标签 seq_length.append(len(seq)) # 记录每个序列的长度 idxs = [word_to_idx[w] for w in seq] # 获取单词索引 labels = [tag_to_idx[t] for t in tag] # 获取标签索引 # 填充序列和标签 idxs.extend([word_to_idx['<pad>'] for i in range(max_len - len(seq))]) labels.extend([tag_to_idx['O'] for i in range(max_len - len(seq))]) seq_outputs.append(idxs) # 添加到输出列表 label_outputs.append(labels) return ms.Tensor(seq_outputs, ms.int64), \ ms.Tensor(label_outputs, ms.int64), \ ms.Tensor(seq_length, ms.int64) # 返回输入序列、标签和序列长度 # 准备训练数据 data, label, seq_length = prepare_sequence(training_data, word_to_idx, tag_to_idx) # 输出数据、标签和序列长度的形状 data.shape, label.shape, seq_length.shape # 预编译模型并进行训练 steps = 500 # 定义训练步骤 from tqdm import tqdm # 导入进度条库 with tqdm(total=steps) as t: for i in range(steps): # 进行训练 loss = train_step(data, seq_length, label) # 执行训练步骤 t.set_postfix(loss=loss) # 更新进度条的后缀信息为损失值 t.update(1) # 更新进度 # 观察训练后的模型效果 score, history = model(data, seq_length) # 获取模型输出的得分和历史 # 使用后处理函数进行预测得分的后处理 predict = post_decode(score, history, seq_length) # 处理得分以获得预测序列 # 将预测的index序列转换为标签序列 idx_to_tag = {idx: tag for tag, idx in tag_to_idx.items()} # 标签索引映射 def sequence_to_tag(sequences, idx_to_tag): """将索引序列转换为标签序列""" outputs = [] for seq in sequences: # 遍历每个预测序列 outputs.append([idx_to_tag[i] for i in seq]) # 转换为标签 return outputs # 返回标签列表 # 打印输出预测结果 sequence_to_tag(predict, idx_to_tag)
代码解析
- 定义超参数和训练数据:
embedding_dim
和hidden_dim
是嵌入层和LSTM隐藏层的维度。training_data
定义了训练用的句子和对应的标签。
- 构建词汇和标签映射:
word_to_idx
和tag_to_idx
分别构建词汇到索引和标签到索引的映射。- 使用填充标记
<pad>
,确保所有输入序列有相同长度。
- 模型实例化:
- 创建
BiLSTM_CRF
模型实例,并定义优化器SGD
。
- 创建
- 训练步骤定义:
train_step
函数执行一次训练,计算损失并更新模型参数。
- 准备序列数据:
prepare_sequence
函数将输入序列及其标签转换为Tensor,确保统一的长度。
- 训练过程:
- 通过
tqdm
库可视化训练过程,执行500个训练步骤。
- 通过
- 模型预测:
- 训练完成后,使用模型进行预测,并对得分进行后处理。
sequence_to_tag
函数将模型输出的索引序列转换为标签序列。
API解析
nn.Embedding
: 将单词映射为稠密的向量表示。nn.LSTM
: 实现LSTM层,能够处理序列数据。nn.Dense
: 全连接层,将LSTM的输出映射到标签空间。ms.Tensor
: MindSpore中的张量类型,用于存储数据。value_and_grad
: 计算模型的损失和梯度,用于优化。post_decode
: 处理模型输出分数并获得最终预测序列。tqdm
: 用于显示训练进度条的库。