《动手学深度学习pytorch版》代码详解——WORD2VEC的实现

# 导入包
import collections
import math
import random
import time
import os
import numpy as np
import torch
from torch import nn
import sys
import torch.utils.data as Data

1.处理数据集

# 打开并读取数据集ptb
dataset_path = "ptb.train.txt"
raw_dataset = []
with open (dataset_path, \'r\') as fo:
    lines = fo.readlines() # 逐行读取
    for st in lines:
        raw_dataset.append(st.split()) # split函数不带参数时以空格分割

print("# sentences %d" % len(raw_dataset))
sum_words = sum([len(st) for st in raw_dataset])

# 我在这里增加了一句原始数据集的所有单词总数,方便对照
print("all words number:", sum_words)
# 打印前三句以及每句的前五个词
for st in raw_dataset[:3]:
    print("#token:", len(st), st[:5])

1.1建立词语索引

# 建立词语索引
# 保留在数据集中至少出现了5次以上的词语
# 计数
counter = collections.Counter([tk for st in raw_dataset for tk in st])

# 创建词语对应出现次数字典
# filter过滤函数, lambda创建一次性匿名函数对象
counter = dict(filter(lambda x: x[1] >= 5, counter.items())) 
counter
# 将词语映射至整数索引
#字符_不做用处,作用是将每个词出现的次数给丢弃,保留单词
idx_to_token = [tk for tk, _ in counter.items()]

# 建立词语索引字典, key对应单词,value对应位置序号(默认从0开始)
# enumerate函数作用遍历列表并计数
token_to_idx = {tk: idx for idx, tk in enumerate(idx_to_token)}

# 遍历原始数据选取其中一行st,遍历st选取其中一单词tk, 如果在token_to_idx字典中,既满足出现次数>=5,其对应的索引号存入列表
# 所有单词将转换成数字
dataset = [[token_to_idx[tk] for tk in st if tk in token_to_idx] for st in raw_dataset]
num_tokens = sum([len(st) for st in dataset])

# 其实就是单词转成数字后还剩多少个词,887100相比于原始数据887521减少了一些
"# token:%d" % num_tokens

1.2二次采样

# 二次采样
# 函数传入单词对应的数字,根据idx_to_token返回对应索引的单词(字母式),再根据计数字典counter返回该单词的出现次数
# counter, idx_to_token, token_to_idx单词对应的索引位置下标全部是一样的
def discard(idx):
     return random.uniform(0, 1) < 1 - math.sqrt( 1e-4 / counter[idx_to_token[idx]] * num_tokens)
    
# 如果此单词的丢弃概率计算为random.uniform(0, 1)中的数字,则进入二次采样的新数据集
subsampled_dataset = [[tk for tk in st if not discard(tk)] for st in dataset]

# 新数据集结果不固定, 如果将random,uniform(0, 1)直接换为0,数据集结果固定不变
\'# tokens: %d\' % sum([len(st) for st in subsampled_dataset])
# 比较一个词在二次采样前后出现在数据集中的次数
def compare_counts(token):
    return "# %s: before = %d, after = %d" % (token, sum([st.count(token_to_idx[token]) for st in dataset]), sum([st.count(token_to_idx[token]) for st in subsampled_dataset]))

# 第二个参数sum([st.count(token_to_idx[token]) for st in dataset]) 可以换成counter[token]

print(compare_counts("the"))
print(compare_counts("join"))

1.3提取中心词和背景词

背景词:与中心词距离不超过背景窗口大小的词

def get_centers_and_contexts(dataset, max_window_size):
    centers, contexts = [], []
    for st in dataset:
        if len(st) < 2: # 每个句子至少有2个词才可能组成一对“中心词-背景词”
            continue
        # 可以组成“中心词-背景词”进入centers列表
        centers += st # 类似centers.extend()
        for center_i in range(len(st)):
            # 背景窗口大小
            # 每次在1和max_window_size之间随机均匀采样一个整数作为背景窗口大小
            window_size = random.randint(1, max_window_size)
            # 切片另存储为列表,max,min函数防止出st边界
            # indices内的数字不是单词索引,只是满足背景词范围的数字,从边界到边界计数
            indices = list(range(max(0, center_i - window_size), min(len(st), center_i+1+window_size)))
            indices.remove(center_i) # 将中心词排除在背景词之外
            # 前面的indices作为单词数字形式的索引
            #eg:单词索引[4, 8, 1, 7, 6, 9, 35]
            #   indices背景词列表[1, 2, 4, 5], 去掉了中心词3, 背景窗口为2
            #   contexts列表为[8, 1, 6, 9]
            contexts.append([st[idx] for idx in indices])
    return centers, contexts
all_centers, all_contexts = get_centers_and_contexts(subsampled_dataset, 5)

2.负采样

对于一对中心词和背景词,随机采样K个噪声词

噪声词采样概率\(p(w) = (\frac{w词频}{总词频})^{0.75}\)

def get_negatives(all_contexts, sampling_weights, K):
    all_negatives, neg_candidates, i = [], [], 0
    population = list(range(len(sampling_weights)))
    for contexts in all_contexts:
        negatives = []
        while len(negatives) < len(contexts) * K:
            if i == len(neg_candidates):
                # 根据每个词的权重(sampling_weights)随机⽣成k个词的索引作为噪声词。
                i, neg_candidates = 0, random.choices(population, sampling_weights, k=int(1e5))
    
            neg, i = neg_candidates[i], i + 1
            # 噪声词不能是背景词
            if neg not in set(contexts):
                negatives.append(neg)
        all_negatives.append(negatives)
    return all_negatives

#计算采样概率
sampling_weights = [(counter[w]/num_tokens)**0.75 for w in idx_to_token]
all_negatives = get_negatives(all_contexts, sampling_weights, 5)

random.choice(seq)函数,作用在seq序列中随机抽取

random.choices(population, weights=None, cum_weights=None, k=1)函数,作用从集群中按权重随机选取k次数据,返回一个列表,选取不会影响原序列

population 集群指被选取的对象,weights 相对权重(集群中各个值被抽取的概率),

cum_weights累加权重(根据相对权重,类似离散型随即变量的分布函数计算),k选取次数

3.读取数据

class MyDataset(torch.utils.data.Dataset):
    def __init__(self, centers, contexts, negatives):
        self.centers = centers
        self.contexts = contexts
        self.negatives = negatives
    
    def __getitem__(self, index):
        return (self.centers[index], self.contexts[index], self.negatives[index])
    
    def __len__(self):
        return len(self.centers)

自定义函数batchify指定了小批量的读取方式,用作DataLoader的参数collate_fn

传入参数data包含中心词,背景词和噪声词,输入长度是batch_size的list,其中每个元素是MyDataset类调用_getitem_得到的结果

为了避免填充项对损失函数计算的影响,构造了掩码变量 masks , 其每⼀个元素分别与连结后的背景词和噪声词contexts_negatives 中的元素⼀⼀对应。当 contexts_negatives 变量中的某个元素为填充项时,相同位置的掩码变量 masks 中 的元素取0,否则取 1 。

为了区分正负类,即区分背景词和噪声词,模仿masks构造labels,背景词对应的元素为1,其余为0

def batchify(data): 
    # 找出data中背景词+噪声词最大长度
    max_len = max(len(c) + len(n) for _, c, n in data)
    centers, contexts_negatives, masks, labels = [], [], [], []
    for center, context, negative in data:
        # 计算现有背景词+噪声词长度
        cur_len = len(context) + len(negative)
        centers += [center]
        # 现有长度和最大长度比较,小于就填充0,保持contexts_negatives每一项长度相等
        contexts_negatives += [context + negative + [0] * (max_len - cur_len)]
        # 有填充时, 填充项的位置为0
        masks += [[1] * cur_len + [0] * (max_len - cur_len)]
        # 长度与contexts_negatives相同,背景词的长度全设为1,其余额外的长度为0
        labels += [[1] * len(context) + [0] * (max_len - len(context))]
    return (torch.tensor(centers).view(-1, 1), torch.tensor(contexts_negatives), torch.tensor(masks), torch.tensor(labels))
batch_size = 512
num_workers = 0 if sys.platform.startswith("win32") else 4

dataset = MyDataset(all_centers, all_contexts, all_negatives)
data_iter = Data.DataLoader(dataset, batch_size, shuffle=True, collate_fn=batchify, num_workers=num_workers)

# 打印第一个批量的各个变量的形状
for batch in data_iter:
    for name, data in zip([\'centers\', \'contexts_negatives\', \'masks\', \'labels\'], batch):
        print(name, \'shape:\', data.shape)
    break

4.跳字模型

根据嵌入层和小批量乘法实现跳字模型

嵌入层:获取词嵌入的层;

nn.Embedding(num_embedddings, embedding_dim)看作是一个矩阵;num_embedddings为行数,由词典大小决定,一般输入为词的索引;embedding_dim为列数,作为每个词向量的维度,是超参数,自定义。

def skip_gram(center, contexts_and_negatives, embed_v, embed_u):
    v = embed_v(center)
    u = embed_u(contexts_and_negatives)
    pred = torch.bmm(v, u.permute(0, 2, 1))
    return pred

5.训练模型

# 二元交叉熵损失函数
class SigmoidBinaryCrossEntropyLoss(nn.Module):
    def __init__(self): # none mean sum
        super(SigmoidBinaryCrossEntropyLoss, self).__init__()
    def forward(self, inputs, targets, mask=None):
        """
        input – Tensor shape: (batch_size, len)
        target – Tensor of the same shape as input
        """
        inputs, targets, mask = inputs.float(), targets.float(), mask.float()
        res =nn.functional.binary_cross_entropy_with_logits(inputs, targets, reduction="none", weight=mask)
        return res.mean(dim=1)
    
loss = SigmoidBinaryCrossEntropyLoss()
# 初始化模型参数
embed_size = 100

# 嵌入层输入为词的索引

net = nn.Sequential(nn.Embedding(num_embeddings=len(idx_to_token), embedding_dim=embed_size),
                    nn.Embedding(num_embeddings=len(idx_to_token), embedding_dim=embed_size)
                   )
net
"""
Sequential(
  (0): Embedding(9858, 100)
  (1): Embedding(9858, 100))
"""
def train(net, lr, num_epoches):
    device = torch.device(\'cuda\' if torch.cuda.is_available() else \'cpu\')
    print("train on", device)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    for epoch in range(num_epoches):
        start, l_sum, n = time.time(), 0.0, 0
        for batch in data_iter:
            center, context_negative, mask, label = [d.to(device) for d in batch]
            pred = skip_gram(center, context_negative, net[0], net[1])

            # 使⽤掩码变量mask来避免填充项对损失函数计算的影响, 当mask=1,相应位置的预测值和标签将将参与损失函数的计算
            l = (loss(pred.view(label.shape), label, mask) * mask.shape[1] / mask.float().sum(dim=1)).mean()
            # 一个batch的平均loss
            optimizer.zero_grad()
            l.backward()
            optimizer.step()
            l_sum += l.cpu().item()
            n += 1
        print(\'epoch %d, loss %.2f, time %.2fs\' % (epoch + 1, l_sum / n, time.time() - start))

train(net, 0.01, 10)