《动手学深度学习pytorch版》代码详解——WORD2VEC的实现
# 导入包
import collections
import math
import random
import time
import os
import numpy as np
import torch
from torch import nn
import sys
import torch.utils.data as Data
1.处理数据集
# 打开并读取数据集ptb
dataset_path = "ptb.train.txt"
raw_dataset = []
with open (dataset_path, \'r\') as fo:
lines = fo.readlines() # 逐行读取
for st in lines:
raw_dataset.append(st.split()) # split函数不带参数时以空格分割
print("# sentences %d" % len(raw_dataset))
sum_words = sum([len(st) for st in raw_dataset])
# 我在这里增加了一句原始数据集的所有单词总数,方便对照
print("all words number:", sum_words)
# 打印前三句以及每句的前五个词
for st in raw_dataset[:3]:
print("#token:", len(st), st[:5])
1.1建立词语索引
# 建立词语索引
# 保留在数据集中至少出现了5次以上的词语
# 计数
counter = collections.Counter([tk for st in raw_dataset for tk in st])
# 创建词语对应出现次数字典
# filter过滤函数, lambda创建一次性匿名函数对象
counter = dict(filter(lambda x: x[1] >= 5, counter.items()))
counter
# 将词语映射至整数索引
#字符_不做用处,作用是将每个词出现的次数给丢弃,保留单词
idx_to_token = [tk for tk, _ in counter.items()]
# 建立词语索引字典, key对应单词,value对应位置序号(默认从0开始)
# enumerate函数作用遍历列表并计数
token_to_idx = {tk: idx for idx, tk in enumerate(idx_to_token)}
# 遍历原始数据选取其中一行st,遍历st选取其中一单词tk, 如果在token_to_idx字典中,既满足出现次数>=5,其对应的索引号存入列表
# 所有单词将转换成数字
dataset = [[token_to_idx[tk] for tk in st if tk in token_to_idx] for st in raw_dataset]
num_tokens = sum([len(st) for st in dataset])
# 其实就是单词转成数字后还剩多少个词,887100相比于原始数据887521减少了一些
"# token:%d" % num_tokens
1.2二次采样
# 二次采样
# 函数传入单词对应的数字,根据idx_to_token返回对应索引的单词(字母式),再根据计数字典counter返回该单词的出现次数
# counter, idx_to_token, token_to_idx单词对应的索引位置下标全部是一样的
def discard(idx):
return random.uniform(0, 1) < 1 - math.sqrt( 1e-4 / counter[idx_to_token[idx]] * num_tokens)
# 如果此单词的丢弃概率计算为random.uniform(0, 1)中的数字,则进入二次采样的新数据集
subsampled_dataset = [[tk for tk in st if not discard(tk)] for st in dataset]
# 新数据集结果不固定, 如果将random,uniform(0, 1)直接换为0,数据集结果固定不变
\'# tokens: %d\' % sum([len(st) for st in subsampled_dataset])
# 比较一个词在二次采样前后出现在数据集中的次数
def compare_counts(token):
return "# %s: before = %d, after = %d" % (token, sum([st.count(token_to_idx[token]) for st in dataset]), sum([st.count(token_to_idx[token]) for st in subsampled_dataset]))
# 第二个参数sum([st.count(token_to_idx[token]) for st in dataset]) 可以换成counter[token]
print(compare_counts("the"))
print(compare_counts("join"))
1.3提取中心词和背景词
背景词:与中心词距离不超过背景窗口大小的词
def get_centers_and_contexts(dataset, max_window_size):
centers, contexts = [], []
for st in dataset:
if len(st) < 2: # 每个句子至少有2个词才可能组成一对“中心词-背景词”
continue
# 可以组成“中心词-背景词”进入centers列表
centers += st # 类似centers.extend()
for center_i in range(len(st)):
# 背景窗口大小
# 每次在1和max_window_size之间随机均匀采样一个整数作为背景窗口大小
window_size = random.randint(1, max_window_size)
# 切片另存储为列表,max,min函数防止出st边界
# indices内的数字不是单词索引,只是满足背景词范围的数字,从边界到边界计数
indices = list(range(max(0, center_i - window_size), min(len(st), center_i+1+window_size)))
indices.remove(center_i) # 将中心词排除在背景词之外
# 前面的indices作为单词数字形式的索引
#eg:单词索引[4, 8, 1, 7, 6, 9, 35]
# indices背景词列表[1, 2, 4, 5], 去掉了中心词3, 背景窗口为2
# contexts列表为[8, 1, 6, 9]
contexts.append([st[idx] for idx in indices])
return centers, contexts
all_centers, all_contexts = get_centers_and_contexts(subsampled_dataset, 5)
2.负采样
对于一对中心词和背景词,随机采样K个噪声词
噪声词采样概率\(p(w) = (\frac{w词频}{总词频})^{0.75}\)
def get_negatives(all_contexts, sampling_weights, K):
all_negatives, neg_candidates, i = [], [], 0
population = list(range(len(sampling_weights)))
for contexts in all_contexts:
negatives = []
while len(negatives) < len(contexts) * K:
if i == len(neg_candidates):
# 根据每个词的权重(sampling_weights)随机⽣成k个词的索引作为噪声词。
i, neg_candidates = 0, random.choices(population, sampling_weights, k=int(1e5))
neg, i = neg_candidates[i], i + 1
# 噪声词不能是背景词
if neg not in set(contexts):
negatives.append(neg)
all_negatives.append(negatives)
return all_negatives
#计算采样概率
sampling_weights = [(counter[w]/num_tokens)**0.75 for w in idx_to_token]
all_negatives = get_negatives(all_contexts, sampling_weights, 5)
random.choice(seq)函数,作用在seq序列中随机抽取
random.choices(population, weights=None, cum_weights=None, k=1)函数,作用从集群中按权重随机选取k次数据,返回一个列表,选取不会影响原序列
population 集群指被选取的对象,weights 相对权重(集群中各个值被抽取的概率),
cum_weights累加权重(根据相对权重,类似离散型随即变量的分布函数计算),k选取次数
3.读取数据
class MyDataset(torch.utils.data.Dataset):
def __init__(self, centers, contexts, negatives):
self.centers = centers
self.contexts = contexts
self.negatives = negatives
def __getitem__(self, index):
return (self.centers[index], self.contexts[index], self.negatives[index])
def __len__(self):
return len(self.centers)
自定义函数batchify指定了小批量的读取方式,用作DataLoader的参数collate_fn
传入参数data包含中心词,背景词和噪声词,输入长度是batch_size的list,其中每个元素是MyDataset类调用_getitem_得到的结果
为了避免填充项对损失函数计算的影响,构造了掩码变量 masks , 其每⼀个元素分别与连结后的背景词和噪声词contexts_negatives 中的元素⼀⼀对应。当 contexts_negatives 变量中的某个元素为填充项时,相同位置的掩码变量 masks 中 的元素取0,否则取 1 。
为了区分正负类,即区分背景词和噪声词,模仿masks构造labels,背景词对应的元素为1,其余为0
def batchify(data):
# 找出data中背景词+噪声词最大长度
max_len = max(len(c) + len(n) for _, c, n in data)
centers, contexts_negatives, masks, labels = [], [], [], []
for center, context, negative in data:
# 计算现有背景词+噪声词长度
cur_len = len(context) + len(negative)
centers += [center]
# 现有长度和最大长度比较,小于就填充0,保持contexts_negatives每一项长度相等
contexts_negatives += [context + negative + [0] * (max_len - cur_len)]
# 有填充时, 填充项的位置为0
masks += [[1] * cur_len + [0] * (max_len - cur_len)]
# 长度与contexts_negatives相同,背景词的长度全设为1,其余额外的长度为0
labels += [[1] * len(context) + [0] * (max_len - len(context))]
return (torch.tensor(centers).view(-1, 1), torch.tensor(contexts_negatives), torch.tensor(masks), torch.tensor(labels))
batch_size = 512
num_workers = 0 if sys.platform.startswith("win32") else 4
dataset = MyDataset(all_centers, all_contexts, all_negatives)
data_iter = Data.DataLoader(dataset, batch_size, shuffle=True, collate_fn=batchify, num_workers=num_workers)
# 打印第一个批量的各个变量的形状
for batch in data_iter:
for name, data in zip([\'centers\', \'contexts_negatives\', \'masks\', \'labels\'], batch):
print(name, \'shape:\', data.shape)
break
4.跳字模型
根据嵌入层和小批量乘法实现跳字模型
嵌入层:获取词嵌入的层;
nn.Embedding(num_embedddings, embedding_dim)看作是一个矩阵;num_embedddings为行数,由词典大小决定,一般输入为词的索引;embedding_dim为列数,作为每个词向量的维度,是超参数,自定义。
def skip_gram(center, contexts_and_negatives, embed_v, embed_u):
v = embed_v(center)
u = embed_u(contexts_and_negatives)
pred = torch.bmm(v, u.permute(0, 2, 1))
return pred
5.训练模型
# 二元交叉熵损失函数
class SigmoidBinaryCrossEntropyLoss(nn.Module):
def __init__(self): # none mean sum
super(SigmoidBinaryCrossEntropyLoss, self).__init__()
def forward(self, inputs, targets, mask=None):
"""
input – Tensor shape: (batch_size, len)
target – Tensor of the same shape as input
"""
inputs, targets, mask = inputs.float(), targets.float(), mask.float()
res =nn.functional.binary_cross_entropy_with_logits(inputs, targets, reduction="none", weight=mask)
return res.mean(dim=1)
loss = SigmoidBinaryCrossEntropyLoss()
# 初始化模型参数
embed_size = 100
# 嵌入层输入为词的索引
net = nn.Sequential(nn.Embedding(num_embeddings=len(idx_to_token), embedding_dim=embed_size),
nn.Embedding(num_embeddings=len(idx_to_token), embedding_dim=embed_size)
)
net
"""
Sequential(
(0): Embedding(9858, 100)
(1): Embedding(9858, 100))
"""
def train(net, lr, num_epoches):
device = torch.device(\'cuda\' if torch.cuda.is_available() else \'cpu\')
print("train on", device)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
for epoch in range(num_epoches):
start, l_sum, n = time.time(), 0.0, 0
for batch in data_iter:
center, context_negative, mask, label = [d.to(device) for d in batch]
pred = skip_gram(center, context_negative, net[0], net[1])
# 使⽤掩码变量mask来避免填充项对损失函数计算的影响, 当mask=1,相应位置的预测值和标签将将参与损失函数的计算
l = (loss(pred.view(label.shape), label, mask) * mask.shape[1] / mask.float().sum(dim=1)).mean()
# 一个batch的平均loss
optimizer.zero_grad()
l.backward()
optimizer.step()
l_sum += l.cpu().item()
n += 1
print(\'epoch %d, loss %.2f, time %.2fs\' % (epoch + 1, l_sum / n, time.time() - start))
train(net, 0.01, 10)