Benjamin Sturgeon

12 November 2023

Goals of this project

The aim of this project was to act as a simple exploratory project to practice building a fairly fundamental tool in NLP.

I learned the concepts behind the word2vec model, and while it was fairly understandable I wanted to see how it would translate to code.

I also got to practice working more with the pytorch library as a result, which was a big win.

The biggest challenge for me in building this was getting the vector dimensions right for matrix multiplication. Learning to respect that process and approach it slowly was valuable.

The original repo can be found at https://github.com/BenSturgeon/word_2_vec

import re
import random
import torch.nn as nn
import torch
import numpy as np
from fastcore import *
from nbdev.showdoc import *
from tqdm import tqdm
import pickle as pkl

Data ingestion

Here we define some functions for reading in and cleaning the data.

The primary data cleaning involves removing any characters which aren't letters as well making all those letters lowercase. We also remove all whitespace except for single spaces between words.

We want to remove words which do not carry much semantic weight on their own, which we will call stop words. We simply filter these out of the data.

def read_file(file_name):
    with open(file_name, 'r') as f:
        data = f.read()
    return data

def remove_non_alpha_characters(data):
    data = data.lower()
    # use regex to remove all non-alphanumeric characters
    data = re.sub(r'[^a-zA-Z\s]', '', data)
    # use regex to remove all whitespace characters
    data = re.sub(r'\s+', ' ', data)
    return data


def remove_stopwords(data):
    stopwords = ['a', 'an', 'the', 'and', 'or', 'but', 'if', 'then', 'else', 'when', 'at', 'from', 'by', 'on', 'off', 'for', 'in', 'out', 'over', 'to', 'into', 'with', ""]
    data = [word for word in data if word not in stopwords]
    return data

raw_data = read_file('data/shakespeare.txt')

data = remove_non_alpha_characters(raw_data)
data = data.split(" ")
data = remove_stopwords(data)

Here we create our dictionary of unique words which we will use for defining our embeddings.

We make this into a dictionary so that later we can refer to these values and get an index from each one.

unique_words = set(list(data))
unique_dict = {word: i for i, word in enumerate(unique_words)}

Create a train loader and dataset

Here we create a train_loader that will randomly generate samples for us to use during training.

We choose a large batch size as this task does not demand a great deal of memory and larger batches help with training speed.

We define only the functions here and call them later so that all hyper parameters can be defined and run in one place.

def return_list_without_a_value(data, value):
    return [x for x in data if x != value]

def create_dataset(window_size, data):
    dataset = []

    for index, val in enumerate(data):
        sub = data[max(0,index-window_size):index]
        sub.extend(data[index+1:min(index+window_size, len(data))])
        for target in sub:
            dataset.append((unique_dict[val],unique_dict[target]))
    return dataset  

def create_dataloader(dataset, batch_size):
    train_loader = torch.utils.data.DataLoader(dataset=dataset, 
                                           batch_size=batch_size, 
                                           shuffle=True)
    return train_loader

Defining our model

Here we define our skipgram model class.

We take advantage of the built in pytorch module and embedding class to create our vector embeddings and handle the updating of our embedding weights.

Our embeddings are essentially just matrices with dimensions matching the size of our dictionary and the embedding size we choose to represent the different features that will emerge from our data.

One thing to take note of is how we initialise our weights. Currently we simply use a normal distribution with a mean of 0 and std_deviation of 0.1. It is possible there are better initialisations to use here. We also use sparse embeddings many of the values will be very close to, or at 0.

class SkipGramModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(SkipGramModel, self).__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.u_embeddings = nn.Embedding(vocab_size, embedding_dim, sparse=True)
        self.v_embeddings = nn.Embedding(vocab_size, embedding_dim, sparse=True)
        self.init_emb()

    def init_emb(self):
        init_mean = 0
        init_std = 0.1
        self.u_embeddings.weight.data.normal_(init_mean, init_std)
        self.v_embeddings.weight.data.normal_(init_mean, init_std)

    def forward(self, pos_u, pos_v, neg_v):

        emb_u = self.u_embeddings(pos_u)
        emb_v = self.v_embeddings(pos_v)

        neg_emb_v = self.v_embeddings(neg_v)

        score = torch.einsum("ij,ij->i", emb_u, emb_v)
        score = torch.sigmoid(score)

        neg_score = torch.einsum("ij,ikj->ik", emb_u, neg_emb_v)
        neg_score = torch.sigmoid(neg_score)

        return score, neg_score


    def forward_without_negatives(self, word1, word2):
        """This lets us do a simple vector comparison rather than doing a full forward pass."""""
        pos_u = torch.tensor([unique_dict[word1]])
        pos_v = torch.tensor([unique_dict[word2]])
        emb_u = self.u_embeddings(pos_u)
        emb_v = self.v_embeddings(pos_v)

        score = torch.einsum("ij,ij->i", emb_u, emb_v)
        score = torch.sigmoid(score)
        return score

    def get_dict_embeddings(self):
        return self.u_embeddings.weight.data.cpu().numpy()
    
    def get_embedding_from_word(self, word):
        index = unique_dict[word]
        return self.u_embeddings.weight.data[index]
    
    def get_embedding_from_index(self, index):
        return self.u_embeddings.weight.data[index]

    def save_embedding(self, file_name):
        # Save embedding lookup table as pkl file
        with open(file_name, 'wb') as f:
            pkl.dump(self.u_embeddings.weight.data.cpu().numpy(), f)
    
    def import_embeddings(self, file_name):
        with open(file_name, 'rb') as f:
            self.u_embeddings.weight.data = torch.from_numpy(pkl.load(f)).to(torch.float32)

Loss function

Here we create a custom loss function which gives us a loss based on the model's error when predicting a 1 or 0 for the context word or randomly sampled words.

We use a custom loss function because it allows us to add weight decay to our training and capture the specific nature of what we want the model to improve at, which in this case is relatedness of words.

def loss_function(score, neg_score, lr, weight_decay, model):
    pos_loss = -torch.mean(torch.log(score))
    neg_loss = -torch.mean(torch.sum(torch.log(1 - neg_score), dim=1))
    loss = pos_loss + neg_loss
    # add L2 regularization term
    l2_loss = 0
    for param in model.parameters():
        l2_loss += torch.sum(param**2)
        loss += weight_decay * l2_loss
    return loss

Training loop

This is the training loop for our model.

Our train loader iterator is declared every epoch and we then iterate over it according to our steps per epoch.

We generate our negative samples randomly at runtime as the cost of doing so is very low.

The parameters passed in for training have a massive impact on model performance. The length of negative samples should be somewhere between 5 and 20. Having a lower numbers means the model may stray into simply having all of its values tend to 1 which is not what we want. Thus a higher number is favoured.

def train(model, train_loader, batch_size, negative_sample_length, weight_decay, learning_rate, steps_per_epoch, epochs, dictionary_length):
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    for epoch in range(epochs):
        loss_sum = 0
        train_loader_iter = iter(train_loader)
        for i in tqdm(range(steps_per_epoch)):
            x, y = next(train_loader_iter)
            pos_u = torch.tensor(x)
            pos_v = torch.tensor(y)
            neg_v = torch.randint(0, dictionary_length, (batch_size, negative_sample_length))
            optimizer.zero_grad()
            pos_score, neg_score = model(pos_u, pos_v, neg_v)
            loss = loss_function(pos_score, neg_score, learning_rate, weight_decay, model)
            loss.backward()
            optimizer.step()
            loss_sum += loss.item()
        print("Epoch: {}, Loss: {}".format(epoch, loss_sum / steps_per_epoch))
    return model

Instantiating our model

Here we instantiate our model with the embedding dimensions of our choice.

The embedding dimensions are critical because they dictate how many "features" our vectors can contain. More dimensions mean richer vectors, with the trade off being longer trading times.

I find that 100 is a good number for getting sensible results on vector comparison, but even having very few can produce decent results.

I instantiate this in a different step than running the training loop as I don't necessarily want to throw away my weights every time.

embedding_dim = 100
dictionary_length = len(unique_words)

model = SkipGramModel(dictionary_length, embedding_dim)

Training hyperparameters

The key hyperparameters we choose here are our window size, negative sample length, learning rate and weight decay.

  • Window size: Determines the size of the context we will include for each of our unique words. We will take in x words from the left and right. Having a smaller window size(2-15) gives us a sense of the interchangeability of the words, while a larger window size (15 - 50) gives us a sense of the relatedness of the words.

  • Negative sample length: To make the classifation task difficult for our model and avoid having it giving a score of 1 for relatedness on all words we need to have negative samples to make the problem harder. To get these samples we simply randomly sample from our dataset of unique words. The number of negative samples we use in each batch determines how many negative samples will be used for generating our negative score.

  • Learning rate: The degree to which we update our weights with each pass. In this case we want a fairly low learning rate as higher rates tend to cause the model to not converge.

  • Weight decay: Having some regularisation is important for this task as otherwise the weights tend to converge on very similar values. To avoid this we have the weights naturally decay over time. Small values seem to work for this task.

window_size = 2
batch_size = 32
learning_rate = 0.001
negative_sample_length = 15
weight_decay = 0.0006
steps_per_epoch = 300
epochs = 30

dataset = create_dataset(window_size , data) 
train_loader = torch.utils.data.DataLoader(dataset=dataset, 
                                           batch_size=batch_size, 
                                           shuffle=True)

train(model, train_loader, batch_size, negative_sample_length, weight_decay, learning_rate, steps_per_epoch, epochs, dictionary_length)

Testing

We now go to the testing phase to see how our model is performing.

Testing functions

The following functions primarily exist to add, subtract and compare vectors. The goal is to produce intuitive results from the comparisons of our vectors.

Eg the following should have a high correlation: flower and rose man and king

man and woman

queen and woman

The following should have a low correlation

Flower and cart

concept and dog

power and table

def cos_sim(vector1, vector2):
    return np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2))

def cos_sim_word(word1, word2):
    vector1 = get_emb(word1)
    vector2 = get_emb(word2)
    return np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2))

def get_emb(word):
    return model.get_embedding_from_word(word)

def invert_dictionary(dictionary):
    return {v: k for k, v in dictionary.items()}

def get_closest_vector(vector):
    max = 0
    target = None
    for key,item in unique_dict.items():
        comparative = get_emb(key)
        comparison = cos_sim(vector, comparative)
        if comparison > max:
            max = comparison
            target = key

        
    return target

print(cos_sim_word("flower", "rose"),("flower", "rose"))
print(cos_sim_word("flower", "tree"), ("flower", "tree"))
print(cos_sim_word("flower", "dog"), ("flower", "dog"))
print(cos_sim_word("flower", "metal"), ("flower", "metal"))
print(cos_sim_word("flower", "cart"), ("flower", "cart"))
print(cos_sim_word("worm", "dog"), ("worm", "dog"))
print(cos_sim_word("king", "queen"), ("king", "queen"))
print(cos_sim_word("king", "royalty"), ("king", "royalty"))
print(cos_sim_word("queen", "royalty"), ("queen", "royalty"))
print(cos_sim_word("man", "king"), ("man", "king"))
print(cos_sim_word("woman", "king"), ("woman", "king"))
print(cos_sim_word("child", "prince"), ("child", "prince"))
print(cos_sim_word("child", "thought"), ("child", "thought"))

0.31146833 ('flower', 'rose')
0.3206159 ('flower', 'tree')
0.6835493 ('flower', 'dog')
nan ('flower', 'metal')
nan ('flower', 'cart')
0.67522717 ('worm', 'dog')
0.9980375 ('king', 'queen')
-0.22151712 ('king', 'royalty')
-0.20813023 ('queen', 'royalty')
0.99990135 ('man', 'king')
0.9579921 ('woman', 'king')
0.9976936 ('child', 'prince')
0.9977772 ('child', 'thought')


/var/folders/xr/3pxy4p9914ld14l6mxvvknvw0000gn/T/ipykernel_36271/221498206.py:8: RuntimeWarning: invalid value encountered in float_scalars
  return np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2))

This classic test still doesn't seem to work great

vector = get_emb("king") - get_emb("man") + get_emb("woman") 
closest_vector = get_closest_vector(vector)
print(closest_vector)
/var/folders/xr/3pxy4p9914ld14l6mxvvknvw0000gn/T/ipykernel_36271/221498206.py:3: RuntimeWarning: divide by zero encountered in float_scalars
  return np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2))


growing

Here we have the option to generate some common words. Currently I prefer using a small hand picked set


def find_most_common_words(words, n, m):
    word_counts = {}
    for word in words:
        if word not in word_counts:
            word_counts[word] = 0
        word_counts[word] += 1
    sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
    sorted_words = [word[0] for word in sorted_words]
    return sorted_words[n:m]

common_words = find_most_common_words(data, 100, 200)

Save the embeddings using any filename you like

path = "outputs/embeddings.emb"
model.save_embedding(path)

Import the embeddings if you simply wish to display them without training

model.import_embeddings("outputs/embeddings.emb")

Here we choose some possibly interesting words to look at in our dataset, based on my poor knowledge of Shakespeare

words = ["king","queen", "prince", "princess", "knight","skull", "sword", "skull", "flower", "tree", "fool", "metal", "cart", "worm", "boy", "witch"]

Displaying our results

Here we use principal component analysis to reduce our dimensions down to 2 so we can display them

from sklearn.decomposition import PCA

number_components = 2

embeddings = model.get_dict_embeddings()
pca = PCA(n_components=number_components)
reduced_embeddings= pca.fit_transform(embeddings)

Display the embeddings

display embeddings visually using matplotlib.

Each word is represented by a point in 2D space.

The x and y coordinates of the point are the first and second dimensions of the word's embedding.

The words are labeled by their actual word.

import matplotlib.pyplot as plt
plt.figure(figsize=(10, 10))

for word in words:
  coord = reduced_embeddings[unique_dict[word]]
  plt.scatter(coord[0], coord[1])
  plt.annotate(word, (coord[0], coord[1]))

plt.savefig('outputs/embeddings.png')

The following is a way of displaying the vectors and their embeddings clearly.

import matplotlib.pyplot as plt
from IPython.display import HTML, display

def plot_embeddings(words, number_dimensions, reduced_embeddings):
  # Get the embeddings for the given list of words
  embeddings = [reduced_embeddings[unique_dict[word]] for word in words]

  # Create a list to store the table rows
  rows = []

  # Plot the first x embeddings
  for word in words:
    embedding = reduced_embeddings[unique_dict[word]]

    # Convert the embedding array to a list and take the first 'number_dimensions' elements
    embedding = embedding.tolist()
    embedding = embedding[:number_dimensions]

    # Create a row with the word and the embeddings, followed by the color for each embedding
    row = [word] + [f"{e:.2f}" for e in embedding] 
    rows.append(row)

  # Use IPython's display function to display the table
  display(HTML(
      '<table><tr>{}</tr></table>'.format(
          '</tr><tr>'.join(
              '<td>{}</td>'.format('</td><td>'.join(str(_) for _ in row)) for row in rows)
          )
  ))

words = ["king","queen"]
plot_embeddings(words, 5, reduced_embeddings)

Conclusion

The following has been an initial exploration of word embeddings.

While I would like to revisit this kind of work in future and I am sure there are many improvements I could make there are other projects to move on to.

Any feedback is very welcome at bwm.sturgeon@gmail.com

I hope reading this has been a clear and useful insight into how to train a basic word2vec model using skipgram with negative sampling.

Questions, thoughts?

Comments, questions, thoughts?
Share them below:

Your name:

Your email:

Your words: