【自然语言】手把手实现bilstm实体抽取,并使用TensorFlow自定义损失函数 - 大白社区
- 发帖时间:
- 2023-04-29 11:17:35
摘要:使用bilstm和手工自定义标签权重的损失函数,实现简单文本实体抽取
手把手实现bilstm简单实体抽取
自然语言之手把手实现bilstm实体抽取
实体抽取,其实简单来说就是把文字内容的关键部分文字字进行标注,接着放入深度学习bilstm模型里去预测每个字出现的概率。
本文只是从数据处理和定义函数方面进行研究,实现真正的模型效果需要进行修改
读取数据
这个文本数据集来自爬虫得到的文本数据,每一行文本都是对应一个文档。
这是每一篇文档标注好的信息,包括招标人、中标人、中标金额、中标时间,分别对应四个实体。
读取数据代码
# 读取标注数据
import csv
with open('merged1.csv', newline='', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile)
next(reader) # 跳过表头
#为了读取标注的索引
labeled_data = [(int(row[0])-1,row[1], row[2], row[3], row[4]) for row in reader]
# 读取文本数据
with open('merged1.txt', encoding='utf-8') as f:
text_data = f.read().splitlines()
数据BIO标注法
序列标注(Sequence labeling)在序列标注中,我们想对一个序列的每一个元素标注一个标签。一般来说,一个序列指的是一个句子,而一个元素指的是句子中的一个词。在本文中为了方便直接使用了是单个字作为元素。
BIO标注法:
B-begin,代表实体的开头
I-inside,代表实体的中间或结尾
O-outside,代表不属于实体
大多数情况下,直接用BIO就可以了; 大多数情况下BIO和BIOES的结果差不太多
最终实现效果应当如下:
BIO标注法代码实现
# 定义标注函数
def bilou_tagging(text, entities):
"""
使用BILOU方法对文本进行实体标注。
text: 文本字符串。
entities: 实体列表,每个实体由实体类型、开始位置、结束位置组成。
返回值: 实体标注后的文本字符串。
"""
tags = ['O'] * len(text)
for entity_type, start, end in entities:
for i in range(start, end+1):
if i == start:
tags[i] = 'B-' + entity_type
elif i == end:
# 在这里修改I-为L-就能变成BILOU标注法,如果不修改
# 那就是BIO标注法
tags[i] = 'I-' + entity_type
elif i == start + 1:
tags[i] = 'I-' + entity_type
else:
tags[i] = 'I-' + entity_type
return tags
# 将数据转换为适合训练的格式
data = []
for i, text in enumerate(text_data):
entities = []
#labeled_data,text_data这两个数据通过for循环找出对应匹配索引
for row in labeled_data:
if row[0] == i:
# 使用row[1]作为判断,当标注招标人信息为空时候,
# 返回fasle,直接跳过,减少代码运行时间
if row[1]:
start = text.find(row[1])
# find查询语句是查出对应文档中文字的位置
# 如果没有找到find函数就输出-1
if start !=-1:
end = start + len(row[1]) - 1
entities.append(('招标人', start, end))
if row[2]:
start = text.find(row[2])
if start !=-1:
end = start + len(row[2]) - 1
entities.append(('中标人', start, end))
if row[3]:
start = text.find(row[3])
if start !=-1:
end = start + len(row[3]) - 1
entities.append(('中标金额', start, end))
if row[4]:
# 标注数据中用Excel有可能存在粘贴之后时间格式发生改变
# 如使用替换将2012/12/12替换成2012-12-12
date_str = row[4].replace('/', '-')
start = text.find(date_str)
if start !=-1:
end = start + len(date_str) - 1
entities.append(('中标时间', start, end))
tags = bilou_tagging(text, entities)
entities = [{'start': start, 'end': end, 'label': entity_type} for entity_type, start, end in entities]
data.append((text, {'entities': entities, 'labels': tags}))
通过这两个代码我们获取了这个数据,我们使用spyder中的变量浏览器查看这个data变量里的各类数据。
数据预处理
对于处理标注好的数据并不能直接使用,还需要进一步处理,为了减少不必要的训练时间,我们将文档中超过整体文档4分之3的最大长度去掉(有些文档长度超过了1万字),处理后我们需要的文档长度仅仅只有1800长度,又能拥有不错的数据量。1890个文档处理后,一共得到的文档1438个。
##计算文本4分之3的文档最大长度
# 计算每个标签的长度
label_lengths = [len(seq) for seq in labels ]
sorted_lengths = sorted(label_lengths)
# 计算第三个四分位数(Q3)
q3 = np.percentile(sorted_lengths, 75)
# # 筛选长度小于等于Q3的标签,并计算它们的平均长度
long_labels = [label for label in labels if len(label) <= q3]
avg_length = sum(len(label) for label in long_labels) / len(long_labels)
print(f"75%的标签长度小于等于{q3:.2f}")
print(f"平均长度为{avg_length:.2f}")
我们把标记无关的0,文字标签设置为0,从文本数据中进行处理。
# 定义标记映射表
tag2idx = {'O': -1, 'B-招标人': 1, 'I-招标人': 2, 'B-中标人': 3, 'I-中标人': 4,'B-中标金额': 5, 'I-中标金额': 6, 'B-中标时间':7, 'I-中标时间': 8}
idx2tag = {idx: tag for tag, idx in tag2idx.items()}
texts = []
labels = []
for sample in data:
if len(sample[0]) < 1800 and len(sample[1]['labels']) < 1800:
texts.append(sample[0])
labels.append(sample[1]['labels'])
# 将每个字符映射到一个唯一的整数索引
char2idx = {}
for sent in texts:
for char in sent:
if char not in char2idx:
char2idx[char] = len(char2idx)
# 将文本转换为整数索引序列
X = [[char2idx.get(c, 1) for c in s] for s in texts]
# 将标记列表转换为索引列表
y = [[tag2idx[w] for w in sent] for sent in labels]
通过获得字符映射到一个唯一的整数索引,我们得到X,y
再通过tensorflow进行数据填充,为了方便处理数据,我们将填充的值设置为最后一个类别9
from tensorflow.keras.preprocessing.sequence import pad_sequences
maxlen=1800
vocab_size=len(char2idx)
# 对文本进行填充,使得所有序列的长度相同
X = pad_sequences(X, maxlen=maxlen, padding='post', value=9)
# 构建嵌入层,将每个整数索引映射到一个固定大小的向量
y = pad_sequences(y, maxlen=maxlen, padding='post', value=9)
y = to_categorical(y, num_classes=len(tag2idx)+1)
标签极度不平衡
我们的数据集中,一个文档1800个字,至少1200是不标注的,被标注为的数据,比如B-招标人,整个文档就一个打了这个标签,而I-招标人,可能也就10个字符左右,同理其他四个实体的标签求和加起来,可能有真正有标签的也就44个=(1+10)+(1+10)+(1+10)+(1+10)
而标签为0可能就有1200个,剩下的是500个就是填充缺失文字数据,
可以看出来
不是实体: 实体1: 实体2:实体3:实体4:填充文本= 1200 : 1 : 10 : 1 : 10 : 1 : 10 : 1 : 10 : 500
数据呈现极度不平衡,导致模型可能训练结果都是0或者都是填充值,准确率极高,但反而说明模型懒惰了,但只要给标签定义正确权重大小,那模型便有很好的效果。
构建自定义损失函数
为了解决这个问题,我尝试找相关的损失函数可以自定义权重,
softmax_cross_entropy_with_logits这个损失函数需要独热编码,但不能自定义标签权重。
sparse_categorical_crossentrop这个损失函数可以不使用独热编码,但只支持一维的标签权重。
通过查看帮助文档,我发现其中有一个
weighted_cross_entropy_with_logits,可以自定义权重的损失函数,但
由于我们的预测标签经过独热编码后是三维数组(1438,1800,10),方便使用sorftmax层进行一个个单词预测,因而不能直接使用的他们自带的损失函数。
自定义函数似乎很麻烦,本人也搞了很久,主要是被三维数组赋值这一块牵制,TensorFlow不能直接像numpy一样用赋值 (例如a1==a) 就能直接更新所需要的值,需要使用tensor_scatter_nd_update()这个函数对高纬度的数组进行值的更新。
# 这个是三维向量在TensorFlow中更新的方法
import tensorflow as tf
ref = tf.constant(np.ones(shape=[6, 6, 3], dtype=np.float32))
indices = tf.constant([[4, 4, 1], [3, 0, 2], [1, 5, 0], [5, 0, 1]], dtype=tf.int32)
updates = tf.constant([9, 10, 11, 12], dtype=tf.float32)
update = tf.tensor_scatter_nd_update(ref, indices, updates)
print(update)
tf.config.run_functions_eagerly(True)
这个代码是用在未定义好损失函数时,类似于for循环,
每个计算都是按顺序,这样带来的好处就方便调试代码,
使 tf.function 的 所有调用都急切地运行,而不是作为跟踪图函数运行。
但因为并非是先计算图结构,所以会导致计算效率下降。我们调试好后,就可以注释掉这段代码。
自定义损失python代码
当我们把代码调试好后,就能应用,实际上代码还能简化,但是
from tensorflow.keras.backend import expand_dims
from tensorflow.keras.losses import Loss
# tf.config.run_functions_eagerly(True)
class WeightedCrossEntropyLoss(Loss):
def __init__(self, pos_weight=None):
super(WeightedCrossEntropyLoss, self).__init__()
self.pos_weight = pos_weight
def call(self, y_true, y_pred):
if self.pos_weight is not None:
# 使用传递进来的 pos_weight
pos_weight = self.pos_weight
else:
# 如果没有传递 pos_weight,则默认为 1
pos_weight = tf.ones_like(y_true)
# 查找标签(对独热编码后标签切片,查找标签为1,返回二维数组,如(5,6))
mask = tf.equal(y_true[...,9], 1)
mask0 = tf.equal(y_true[...,0], 1)
# 索引(二维数组并非三维数组),并转化格式
indices_3d = tf.cast(tf.where(mask), dtype=tf.int32)
indices_3d_0 = tf.cast(tf.where(mask0), dtype=tf.int32)
# 根据二维数组的查找的对应的值的长度,新增一列维度,
#最后(查出是1的值的一共有多少个,9)
new_col = tf.fill([len(indices_3d), 1],9)
new_col_0 = tf.fill([len(indices_3d_0), 1],0)
# 合并,作为一个可以查询的数组(5,6,9)(5,6)查询为位置,9为维度
new_indices_3d = tf.concat([indices_3d[:, :2], new_col], axis=1)
new_indices_3d_0 = tf.concat([indices_3d_0[:, :2], new_col_0], axis=1)
# 这个是一个一维数据,赋值为权重大小
updates = tf.fill(tf.shape(indices_3d)[:-1], 0.01)
updates0 = tf.fill(tf.shape(indices_3d_0)[:-1], 0.5)
# 根据真实标签和索引,设置为
pos_weight = tf.tensor_scatter_nd_update(y_true, new_indices_3d, updates)
pos_weight0 = tf.tensor_scatter_nd_update(pos_weight, new_indices_3d_0, updates0)
loss = tf.nn.weighted_cross_entropy_with_logits(labels=y_true, logits=y_pred, pos_weight=pos_weight0)
#对三维的loss进行求和,当然也可以去tf.reduce_mean取决于任务要求
#(本文由于没有对每个标签进行权重修改,所以损失值很难下降,用sum求和更方便看效果)
loss = tf.reduce_sum(loss, axis=[1, 2])
return loss
我们已经学会了如何自定义一个可以修改各个标签权重的损失函数了,现在我们可以根据我们的标签,自定义权重,这里我简单的修改标签来修改权重。(这个代码可还能修改,修改的话就是在计算损失函数前就把这个权重计算好,并用batch分配)
可以看到不同标签被赋予了不同权重
模型训练
#导包
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Input, Embedding, Bidirectional, LSTM, Dropout, TimeDistributed, Dense
from tensorflow.keras.models import Model
from tensorflow.keras import metrics
from tensorflow.keras.layers import Embedding
from tensorflow.keras.backend import expand_dims
from tensorflow.keras.losses import Loss
embedding_dim = 16
input = Input(shape=(maxlen,))
model = Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=maxlen)(input)
model = Dropout(0.1)(model)
model = Bidirectional(LSTM(units=64, return_sequences=True, recurrent_dropout=0.1))(model)
#虽然使用relu可以有更好的效果,
#但是在TensorFlow.keras貌似只有默认的tanh支持加速
#没有加速一轮需要8分钟,有加速6分钟
model = TimeDistributed(Dense(32, activation="tanh"))(model)
model = TimeDistributed(Dense(len(idx2tag)+1, activation="softmax"))(model)
model = Model(input, model)
# 使用自定义损失函数
model.compile(optimizer="adam", loss=WeightedCrossEntropyLoss(), metrics=['categorical_accuracy'])
model.summary()
model.fit(X, y, batch_size=64, epochs=4, validation_split=0.1)
由于设备比较简陋(1650ti 4g显存),所使用的都是比较小的参数,也只有训练了4轮。模型不看准确率的效果,只看损失值是否下降。
模型测试
test_text = '''本次招标的中标人为华为公司,中标金额为100万,中标时间为2022年3月。'''
# 将测试文本转换为字索引序列
test_x = [[char2idx.get(c, 1) for c in test_text]]
# 对输入进行填充
test_x = pad_sequences(test_x, maxlen=maxlen, padding='post', value=0)
# 进行预测
pred = model.predict(test_x)
pred = np.argmax(pred, axis=-1)
idx2tag1 = {0: 'O',
1: 'B-招标人',
2: 'I-招标人',
3: 'B-中标人',
4: 'I-中标人',
5: 'B-中标金额',
6: 'I-中标金额',
7: 'B-中标时间',
8: 'I-中标时间',
9:'<pad>'}
# 将预测的标记序列转换为标记文本
pred_tags = [idx2tag1[i] for i in pred[0]]
print(pred_tags)
查看模型预测效果,模型通过训练只能分辨0和填充值
虽然预测效果很差,但是并不能说明模型没有用,它并没有随缘训练
虽然还是预测为0,但是从招标人或者是中标金额来看其实相比其他分类预测概率来看已经相比其他概率高很多,因为训练还不够,模型结构参数量太少了,修改模型参数,或者再加一层全连接分类层(或者是bilstm+CRF层),便应该能获得更好的效果。