Fine-tune a BERT model on social media data
Data retrieval and preparation
The dataset we will use comes from Kaggle; you can download it here: https://www.kaggle.com/datasets/farisdurrani/sentimentsearch (CC BY 4.0 license). In my experiments, I used only the Facebook and Twitter datasets.
The following snippet reads the CSV files and saves 3 splits (training, validation and testing) wherever you want them. I recommend saving them to Google Cloud Storage.
You can run the script with:
python make_splits.py --output-dir gs://your-bucket/
import argparse

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split


def make_splits(output_dir):
    # Combine the Twitter and Facebook exports into a single DataFrame
    df = pd.concat((
        pd.read_csv("data/farisdurrani/twitter_filtered.csv"),
        pd.read_csv("data/farisdurrani/facebook_filtered.csv")
    ))
    # Drop rows without a sentiment score
    df = df.dropna(subset=['sentiment'], axis=0)
    # Map the sign of the score to a class: 0 = negative, 1 = neutral, 2 = positive
    df['Target'] = df['sentiment'].apply(lambda x: 1 if x == 0 else np.sign(x) + 1).astype(int)

    # 80% train, then split the remaining 20% evenly into validation and test
    df_train, df_ = train_test_split(df, stratify=df['Target'], test_size=0.2)
    df_eval, df_test = train_test_split(df_, stratify=df_['Target'], test_size=0.5)

    # Avoid a double slash when the output directory ends with "/"
    output_dir = output_dir.rstrip("/")
    print(f"Files will be saved in {output_dir}")
    df_train.to_csv(output_dir + "/train.csv", index=False)
    df_eval.to_csv(output_dir + "/eval.csv", index=False)
    df_test.to_csv(output_dir + "/test.csv", index=False)

    print(f"Train : {len(df_train)} samples")
    print(f"Val : {len(df_eval)} samples")
    print(f"Test : {len(df_test)} samples")


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--output-dir')
    args, _ = parser.parse_known_args()
    make_splits(args.output_dir)
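To make the label mapping used in make_splits concrete, here is a minimal sketch in plain Python. It assumes, as the script does, that sentiment is a signed score where negative values mean negative sentiment, 0 means neutral and positive values mean positive sentiment. The helper name sentiment_to_target and the example scores are hypothetical and only there for illustration.

```python
def sentiment_to_target(score: float) -> int:
    """Map a signed sentiment score to a class index: 0 negative, 1 neutral, 2 positive."""
    if score < 0:
        return 0
    if score > 0:
        return 2
    return 1


# Illustrative scores, not taken from the dataset
examples = [-0.8, 0.0, 0.35]
targets = [sentiment_to_target(s) for s in examples]
print(targets)  # [0, 1, 2]
```

These three class indices are what the 3-unit output layer of the classifier is trained to predict.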
The data should look something like this:
Using a small BERT pre-trained model
For our model, we will use a lightweight BERT model, BERT-Tiny. This model has already been pre-trained on a large amount of data, but not necessarily on social media data and not necessarily with the aim of performing sentiment analysis. That's why we're going to fine-tune it.
It contains only 2 layers with a hidden dimension of 128 units. The full list of models is visible here if you want to take a bigger one.
Let's first create a main.py file with all the necessary modules:
import argparse
import logging
import time

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel


def train_and_evaluate(**params):
    # will be updated as we go
    pass
Let's also list our requirements in a dedicated requirements.txt file:
transformers==4.40.1
torch==2.2.2
pandas==2.0.3
scikit-learn==1.3.2
gcsfs
We will now load the 2 parts we need to train our model:
- The tokenizer, which takes care of splitting the text inputs into the tokens BERT was trained with.
- The model itself.
You can get both from Hugging Face here. You can also upload them to Cloud Storage. This is what I did, so I load them with:
# Load pretrained tokenizers and bert model
tokenizer = BertTokenizer.from_pretrained('models/bert_uncased_L-2_H-128_A-2/vocab.txt')
model = BertModel.from_pretrained('models/bert_uncased_L-2_H-128_A-2')
Now let's add the following part to our file:
class SentimentBERT(nn.Module):
    def __init__(self, bert_model):
        super().__init__()
        self.bert_module = bert_model
        self.dropout = nn.Dropout(0.1)
        # 128 is the hidden size of BERT-Tiny, 3 is the number of sentiment classes
        self.final = nn.Linear(in_features=128, out_features=3, bias=True)

        # Uncomment the below if you only want to retrain certain layers.
        # self.bert_module.requires_grad_(False)
        # for param in self.bert_module.encoder.parameters():
        #     param.requires_grad = True

    def forward(self, inputs):
        ids, mask, token_type_ids = inputs['ids'], inputs['mask'], inputs['token_type_ids']
        # print(ids.size(), mask.size(), token_type_ids.size())
        x = self.bert_module(input_ids=ids, attention_mask=mask, token_type_ids=token_type_ids)
        # The pooled [CLS] representation feeds the classification head
        x = self.dropout(x['pooler_output'])
        out = self.final(x)
        return out
A quick break here. We have several options when it comes to reusing an existing model.
- Transfer learning: we freeze the model weights and use the model as a “feature extractor”. We can then add additional layers downstream. This is frequently used in computer vision, where models like VGG, Xception, etc. can be reused to train a custom model on small datasets.
- Fine-tuning: we unfreeze all or part of the model weights and retrain the model on a custom dataset. This is the preferred approach when training custom LLMs.
More details on transfer learning and fine-tuning here:
In our model, we have chosen to unfreeze the entire BERT module, but feel free to freeze one or more layers of the pre-trained BERT module and see how that influences performance.
The key element here is to add a fully connected layer after the BERT module to “tie” it to our classification task, hence the final layer with 3 units. This will allow us to reuse the pre-trained BERT weights and adapt our model to our task.
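The difference between the two approaches comes down to which parameters keep requires_grad=True. The sketch below illustrates this on a small stand-in module rather than the real BERT model: the TinyBackbone class, its layer sizes and the count_trainable helper are hypothetical and only serve the illustration.

```python
import torch.nn as nn


class TinyBackbone(nn.Module):
    """Hypothetical stand-in for a pre-trained encoder with a 128-d output."""

    def __init__(self):
        super().__init__()
        self.encoder = nn.Linear(128, 128)

    def forward(self, x):
        return self.encoder(x)


def count_trainable(*modules):
    """Count the parameters that will receive gradient updates."""
    return sum(p.numel() for m in modules for p in m.parameters() if p.requires_grad)


backbone = TinyBackbone()
head = nn.Linear(in_features=128, out_features=3)

# Transfer learning: freeze the backbone, train only the head
backbone.requires_grad_(False)
frozen_count = count_trainable(backbone, head)   # head only: 128 * 3 + 3

# Fine-tuning: unfreeze everything
backbone.requires_grad_(True)
full_count = count_trainable(backbone, head)     # backbone + head

print(frozen_count, full_count)
```

With the backbone frozen, only the weights and biases of the 3-unit head are updated; once it is unfrozen, every parameter is trained.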
Creating data loaders
To create the data loaders, we will need the Tokenizer loaded above. The Tokenizer takes a string as input, and returns several outputs among which we can find the tokens ('input_ids' in our case):
The BERT tokenizer is a bit special and returns several outputs, but the most important is input_ids: these are the tokens used to encode our sentence. They can be words or parts of words. For example, the word “looking” can be made up of 2 tokens, “look” and “##ing”.
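To give an intuition of how a word ends up split into sub-word tokens, here is a minimal sketch of greedy longest-match-first splitting, the idea behind WordPiece tokenization. It is not the Hugging Face implementation: the wordpiece function and the toy vocabulary are assumptions made for illustration, and the real vocabulary may well keep a common word as a single token.

```python
def wordpiece(word, vocab, unk="[UNK]"):
    """Greedy longest-match-first split of a single lowercase word."""
    tokens, start = [], 0
    while start < len(word):
        end = len(word)
        piece = None
        # Try the longest remaining substring first, then shrink it
        while start < end:
            candidate = word[start:end]
            if start > 0:
                candidate = "##" + candidate  # continuation pieces carry a ## prefix
            if candidate in vocab:
                piece = candidate
                break
            end -= 1
        if piece is None:
            return [unk]  # no piece matches: the whole word is unknown
        tokens.append(piece)
        start = end
    return tokens


# Toy vocabulary, made up for this example
toy_vocab = {"look", "##ing", "##ed", "the"}
print(wordpiece("looking", toy_vocab))  # ['look', '##ing']
print(wordpiece("look", toy_vocab))     # ['look']
print(wordpiece("xyz", toy_vocab))      # ['[UNK]']
```

The tokenizer then replaces each piece with its integer index in the vocabulary, which is what input_ids contains.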
Now let's create a dataloader module that will manage our datasets:
class BertDataset(Dataset):
    def __init__(self, df, tokenizer, max_length=100):
        super(BertDataset, self).__init__()
        self.df = df
        self.tokenizer = tokenizer
        self.target = self.df['Target']
        self.max_length = max_length

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        X = self.df['bodyText'].values[idx]
        y = self.target.values[idx]

        # Tokenize, then pad or truncate every sample to max_length tokens
        # (padding='max_length' replaces the deprecated pad_to_max_length=True)
        inputs = self.tokenizer.encode_plus(
            X,
            padding='max_length',
            truncation=True,
            add_special_tokens=True,
            return_attention_mask=True,
            max_length=self.max_length,
        )
        ids = inputs["input_ids"]
        token_type_ids = inputs["token_type_ids"]
        mask = inputs["attention_mask"]

        x = {
            'ids': torch.tensor(ids, dtype=torch.long).to(DEVICE),
            'mask': torch.tensor(mask, dtype=torch.long).to(DEVICE),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long).to(DEVICE)
        }
        y = torch.tensor(y, dtype=torch.long).to(DEVICE)

        return x, y
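Every sample must have the same length so that the DataLoader can stack samples into a batch, which is why the dataset pads or truncates to max_length. The sketch below shows the idea in plain Python. The pad_or_truncate helper is hypothetical and simplified: the real tokenizer keeps the closing [SEP] token when truncating, which this version does not.

```python
def pad_or_truncate(token_ids, max_length, pad_id=0):
    """Return (input_ids, attention_mask), both exactly max_length long."""
    ids = token_ids[:max_length]
    mask = [1] * len(ids)              # 1 = real token
    padding = max_length - len(ids)
    return ids + [pad_id] * padding, mask + [0] * padding  # 0 = padding


# 101 and 102 are the [CLS] and [SEP] ids in the standard BERT uncased vocabulary;
# the two ids in between are made up for illustration.
ids, mask = pad_or_truncate([101, 2298, 2075, 102], max_length=6)
print(ids)   # [101, 2298, 2075, 102, 0, 0]
print(mask)  # [1, 1, 1, 1, 0, 0]
```

The attention mask tells the model which positions hold real tokens and which are padding to be ignored.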
Write the main script to train the model
Let us first define two functions to manage the training and evaluation stages:
def train(epoch, model, dataloader, loss_fn, optimizer, max_steps=None):
    model.train()
    total_acc, total_count = 0, 0
    log_interval = 50
    start_time = time.time()

    for idx, (inputs, label) in enumerate(dataloader):
        optimizer.zero_grad()
        predicted_label = model(inputs)

        loss = loss_fn(predicted_label, label)
        loss.backward()
        optimizer.step()

        total_acc += (predicted_label.argmax(1) == label).sum().item()
        total_count += label.size(0)

        if idx % log_interval == 0:
            elapsed = time.time() - start_time
            print(
                "Epoch {:3d} | {:5d}/{:5d} batches "
                "| accuracy {:8.3f} | loss {:8.3f} ({:.3f}s)".format(
                    epoch, idx, len(dataloader), total_acc / total_count, loss.item(), elapsed
                )
            )
            # Reset the running counters after each log line
            total_acc, total_count = 0, 0
            start_time = time.time()

        # Stop early when a step limit is set (used for dry runs)
        if max_steps is not None and idx == max_steps:
            break

    # max(..., 1) avoids a division by zero when the counters were just reset
    return {'loss': loss.item(), 'acc': total_acc / max(total_count, 1)}


def evaluate(model, dataloader, loss_fn):
    model.eval()
    total_acc, total_count = 0, 0

    with torch.no_grad():
        for idx, (inputs, label) in enumerate(dataloader):
            predicted_label = model(inputs)
            loss = loss_fn(predicted_label, label)
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            total_count += label.size(0)

    # Note: the reported loss is the loss of the last batch only
    return {'loss': loss.item(), 'acc': total_acc / total_count}
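Both functions rely on two quantities: the cross-entropy loss computed from the raw model outputs (logits) and the accuracy obtained by taking the argmax of each row. As a rough sketch of what these compute, here is a plain-Python version on made-up logits. The cross_entropy and accuracy helpers are written for illustration and are not the PyTorch implementations.

```python
import math


def cross_entropy(logits, label):
    """Negative log-probability of the true class after a softmax."""
    shift = max(logits)  # subtract the max for numerical stability
    log_sum_exp = shift + math.log(sum(math.exp(z - shift) for z in logits))
    return log_sum_exp - logits[label]


def accuracy(batch_logits, labels):
    """Share of rows whose largest logit sits at the true class index."""
    hits = sum(row.index(max(row)) == y for row, y in zip(batch_logits, labels))
    return hits / len(labels)


# Made-up logits for 3 samples over the classes (negative, neutral, positive)
batch_logits = [[2.0, 0.5, -1.0], [0.1, 0.2, 0.3], [0.0, 0.0, 0.0]]
labels = [0, 2, 1]

losses = [cross_entropy(row, y) for row, y in zip(batch_logits, labels)]
mean_loss = sum(losses) / len(losses)  # nn.CrossEntropyLoss averages over the batch by default
batch_accuracy = accuracy(batch_logits, labels)
print(f"loss {mean_loss:.3f} | accuracy {batch_accuracy:.3f}")
```

A sample with identical logits for all 3 classes, like the last one here, contributes a loss of log(3), which is what an untrained classifier would score on average.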
We're getting closer to getting our main script up and running. Let's stitch the pieces together. We have:
- A BertDataset class to handle data loading
- A SentimentBERT model that takes our BERT-Tiny model and adds an extra layer for our custom use case
- train() and evaluate() functions to manage these steps
- A train_and_evaluate() function that brings everything together
We will use argparse to be able to launch our script with arguments. These arguments are usually the training/evaluation/test files to run our model with any dataset, the path where our model will be stored, and the training-related parameters.
import argparse
import logging
import time

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel

logging.basicConfig(format='%(asctime)s (%(levelname)s): %(message)s', level=logging.DEBUG)
logging.getLogger().setLevel(logging.INFO)

# --- CONSTANTS ---
BERT_MODEL_NAME = 'small_bert/bert_en_uncased_L-2_H-128_A-2'

if torch.cuda.is_available():
    logging.info(f"GPU: {torch.cuda.get_device_name(0)} is available.")
    DEVICE = torch.device('cuda')
else:
    logging.info("No GPU available. Training will run on CPU.")
    DEVICE = torch.device('cpu')

# --- Data preparation and tokenization ---
class BertDataset(Dataset):
    def __init__(self, df, tokenizer, max_length=100):
        super(BertDataset, self).__init__()
        self.df = df
        self.tokenizer = tokenizer
        self.target = self.df['Target']
        self.max_length = max_length

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        X = self.df['bodyText'].values[idx]
        y = self.target.values[idx]

        # Tokenize, then pad or truncate every sample to max_length tokens
        # (padding='max_length' replaces the deprecated pad_to_max_length=True)
        inputs = self.tokenizer.encode_plus(
            X,
            padding='max_length',
            truncation=True,
            add_special_tokens=True,
            return_attention_mask=True,
            max_length=self.max_length,
        )
        ids = inputs["input_ids"]
        token_type_ids = inputs["token_type_ids"]
        mask = inputs["attention_mask"]

        x = {
            'ids': torch.tensor(ids, dtype=torch.long).to(DEVICE),
            'mask': torch.tensor(mask, dtype=torch.long).to(DEVICE),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long).to(DEVICE)
        }
        y = torch.tensor(y, dtype=torch.long).to(DEVICE)

        return x, y

# --- Model definition ---
class SentimentBERT(nn.Module):
    def __init__(self, bert_model):
        super().__init__()
        self.bert_module = bert_model
        self.dropout = nn.Dropout(0.1)
        # 128 is the hidden size of BERT-Tiny, 3 is the number of sentiment classes
        self.final = nn.Linear(in_features=128, out_features=3, bias=True)

    def forward(self, inputs):
        ids, mask, token_type_ids = inputs['ids'], inputs['mask'], inputs['token_type_ids']
        x = self.bert_module(input_ids=ids, attention_mask=mask, token_type_ids=token_type_ids)
        # The pooled [CLS] representation feeds the classification head
        x = self.dropout(x['pooler_output'])
        out = self.final(x)
        return out

# --- Training loop ---
def train(epoch, model, dataloader, loss_fn, optimizer, max_steps=None):
    model.train()
    total_acc, total_count = 0, 0
    log_interval = 50
    start_time = time.time()

    for idx, (inputs, label) in enumerate(dataloader):
        optimizer.zero_grad()
        predicted_label = model(inputs)

        loss = loss_fn(predicted_label, label)
        loss.backward()
        optimizer.step()

        total_acc += (predicted_label.argmax(1) == label).sum().item()
        total_count += label.size(0)

        if idx % log_interval == 0:
            elapsed = time.time() - start_time
            print(
                "Epoch {:3d} | {:5d}/{:5d} batches "
                "| accuracy {:8.3f} | loss {:8.3f} ({:.3f}s)".format(
                    epoch, idx, len(dataloader), total_acc / total_count, loss.item(), elapsed
                )
            )
            # Reset the running counters after each log line
            total_acc, total_count = 0, 0
            start_time = time.time()

        # Stop early when a step limit is set (used for dry runs)
        if max_steps is not None and idx == max_steps:
            break

    # max(..., 1) avoids a division by zero when the counters were just reset
    return {'loss': loss.item(), 'acc': total_acc / max(total_count, 1)}

# --- Validation loop ---
def evaluate(model, dataloader, loss_fn):
    model.eval()
    total_acc, total_count = 0, 0

    with torch.no_grad():
        for idx, (inputs, label) in enumerate(dataloader):
            predicted_label = model(inputs)
            loss = loss_fn(predicted_label, label)
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            total_count += label.size(0)

    # Note: the reported loss is the loss of the last batch only
    return {'loss': loss.item(), 'acc': total_acc / total_count}

# --- Main function ---
def train_and_evaluate(**params):
    logging.info("running with the following params :")
    logging.info(params)
    dry_run = params.get('dry_run', False)

    # Load pretrained tokenizers and bert model
    # update the paths to whichever you are using
    tokenizer = BertTokenizer.from_pretrained('models/bert_uncased_L-2_H-128_A-2/vocab.txt')
    model = BertModel.from_pretrained('models/bert_uncased_L-2_H-128_A-2')

    # Training parameters
    epochs = int(params.get('epochs'))
    batch_size = int(params.get('batch_size'))
    learning_rate = float(params.get('learning_rate'))

    # Load the data
    df_train = pd.read_csv(params.get('training_file'))
    df_eval = pd.read_csv(params.get('validation_file'))
    df_test = pd.read_csv(params.get('testing_file'))

    # Create dataloaders
    train_ds = BertDataset(df_train, tokenizer, max_length=100)
    train_loader = DataLoader(dataset=train_ds, batch_size=batch_size, shuffle=True)
    eval_ds = BertDataset(df_eval, tokenizer, max_length=100)
    eval_loader = DataLoader(dataset=eval_ds, batch_size=batch_size)
    test_ds = BertDataset(df_test, tokenizer, max_length=100)
    test_loader = DataLoader(dataset=test_ds, batch_size=batch_size)

    # Create the model
    classifier = SentimentBERT(bert_model=model).to(DEVICE)

    # Count total and trainable parameters
    # (kept in their own variables so that `params` is not overwritten)
    total_parameters = sum(np.prod(p.size()) for p in classifier.parameters())
    trainable_parameters = sum(np.prod(p.size()) for p in classifier.parameters() if p.requires_grad)
    logging.info(
        f"Total params : {total_parameters} - Trainable : {trainable_parameters} "
        f"({trainable_parameters / total_parameters * 100:.1f}% of total)"
    )

    # Optimizer and loss functions
    optimizer = torch.optim.Adam((p for p in classifier.parameters() if p.requires_grad), learning_rate)
    loss_fn = nn.CrossEntropyLoss()

    # If dry run, we only train for a single epoch and a couple of steps
    logging.info(f'Training model with {BERT_MODEL_NAME}')
    if dry_run:
        logging.info("Dry run mode")
        epochs = 1
        steps_per_epoch = 1
    else:
        steps_per_epoch = None

    # Action !
    for epoch in range(1, epochs + 1):
        epoch_start_time = time.time()
        train_metrics = train(epoch, classifier, train_loader, loss_fn=loss_fn, optimizer=optimizer, max_steps=steps_per_epoch)
        eval_metrics = evaluate(classifier, eval_loader, loss_fn=loss_fn)

        print("-" * 59)
        print(
            "End of epoch {:3d} - time: {:5.2f}s - loss: {:.4f} - accuracy: {:.4f} - valid_loss: {:.4f} - valid accuracy {:.4f} ".format(
                epoch, time.time() - epoch_start_time, train_metrics['loss'], train_metrics['acc'], eval_metrics['loss'], eval_metrics['acc']
            )
        )
        print("-" * 59)

    if dry_run:
        # If dry run, we do not run the evaluation on the test set
        return None

    test_metrics = evaluate(classifier, test_loader, loss_fn=loss_fn)
    metrics = {
        'train': train_metrics,
        'val': eval_metrics,
        'test': test_metrics,
    }
    logging.info(metrics)

    # Save the model weights (state dict) to a single file
    if params.get('job_dir') is None:
        logging.warning("No job dir provided, model will not be saved")
    else:
        logging.info("Saving model to {} ".format(params.get('job_dir')))
        torch.save(classifier.state_dict(), params.get('job_dir'))

    logging.info("Bye bye")

if __name__ == '__main__':
    # Create arguments here
    parser = argparse.ArgumentParser()
    parser.add_argument('--training-file', required=True, type=str)
    parser.add_argument('--validation-file', required=True, type=str)
    # The test file is read unconditionally in train_and_evaluate, so it is required too
    parser.add_argument('--testing-file', required=True, type=str)
    parser.add_argument('--job-dir', type=str)
    parser.add_argument('--epochs', type=int, default=2)
    parser.add_argument('--batch-size', type=int, default=1024)
    parser.add_argument('--learning-rate', type=float, default=0.01)
    parser.add_argument('--dry-run', action="store_true")

    # Parse them
    args, _ = parser.parse_known_args()

    # Execute training
    train_and_evaluate(**vars(args))
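One detail worth spelling out is how the command-line flags end up as keys in params: argparse turns the hyphens of a flag such as --training-file into underscores, and parse_known_args returns any argument it does not recognize separately instead of failing. The sketch below shows this on a reduced set of flags. The file name and the extra flag are placeholders, and the argument list is passed explicitly rather than read from the command line.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--training-file', required=True, type=str)
parser.add_argument('--epochs', type=int, default=2)
parser.add_argument('--dry-run', action='store_true')

# Explicit argument list instead of sys.argv; the values are placeholders
args, unknown = parser.parse_known_args(
    ['--training-file', 'train.csv', '--dry-run', '--some-extra-flag', '1']
)
params = vars(args)

print(params)   # {'training_file': 'train.csv', 'epochs': 2, 'dry_run': True}
print(unknown)  # ['--some-extra-flag', '1']
```

This is why the main function can read its settings with calls such as params.get('training_file').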
This is great, but unfortunately this model will take a long time to train. Indeed, with around 4.7M parameters to train, a step takes around 3 s on a 16 GB MacBook Pro with an Intel chip.
3 seconds per step can be quite long when you have 1238 steps to do and 10 epochs to complete…
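As a back-of-the-envelope check, the figures above can be turned into a total training time. This small calculation assumes that the 1238 steps are per epoch and that the step time stays constant at about 3 seconds.

```python
steps_per_epoch = 1238   # from the text, assumed to be per epoch
seconds_per_step = 3     # approximate CPU timing from the text
epochs = 10

seconds_per_epoch = steps_per_epoch * seconds_per_step
total_hours = seconds_per_epoch * epochs / 3600

print(f"{seconds_per_epoch / 60:.1f} min per epoch, {total_hours:.1f} h in total")
# 61.9 min per epoch, 10.3 h in total
```

That is roughly an hour per epoch and more than ten hours for the full run on CPU.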
No GPU, no party.