End-to-end Examples#
Define Dataloader and Input Functions#
A dataloader can be defined in a file separate from the model and main execution loop, as shown below:
dataloader.py#
import os

import torch
from torchvision import datasets, transforms


def get_mnist_dataset(train=True):
    data_dir = os.path.join(os.getcwd(), 'mnist_dataset')
    return datasets.MNIST(
        data_dir,
        train=train,
        download=True,
        transform=transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Normalize((0.1307,), (0.3081,)),
                transforms.Lambda(
                    lambda x: torch.as_tensor(x, dtype=torch.float16)
                ),
            ]
        ),
        target_transform=transforms.Lambda(
            lambda x: torch.as_tensor(x, dtype=torch.int32)
        ),
    )


def input_fn_train(batch_size=4, drop_last=False):
    train_dataset = get_mnist_dataset(train=True)
    return torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, drop_last=drop_last, shuffle=True,
    )


def input_fn_eval(batch_size=4, drop_last=False):
    eval_dataset = get_mnist_dataset(train=False)
    return torch.utils.data.DataLoader(
        eval_dataset, batch_size=batch_size, drop_last=drop_last, shuffle=False,
    )
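To sanity-check the dataloader on the host before compiling for CSX, you can pull a single batch with plain PyTorch. The snippet below is a minimal, optional sketch (it is not part of dataloader.py and is not required for the CSX run); it simply confirms the batch shapes and the float16/int32 casts applied by the transforms:

from dataloader import input_fn_train

# The first call downloads MNIST into ./mnist_dataset if it is not already present
loader = input_fn_train(batch_size=4)
images, labels = next(iter(loader))

# Expected: images of shape [4, 1, 28, 28] in float16, labels of shape [4] in int32
print(images.shape, images.dtype)
print(labels.shape, labels.dtype)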
Training Example#
In the same directory as the dataloader, create a training script as follows:
training.py#
""" Example of training script for FC MNIST model on CSX with Weight Streaming. """
import logging
import os
import cerebras_pytorch as cstorch
import torch
import torch.nn as nn
import torch.nn.functional as F
from dataloader import input_fn_train, input_fn_eval
class MNISTModel(nn.Module):
    def __init__(self):
        super().__init__()

        self.fc_layers = []
        input_size = 784
        hidden_size = 50
        depth = 10
        hidden_sizes = [hidden_size] * depth

        for hidden_size in hidden_sizes:
            fc_layer = nn.Linear(input_size, hidden_size)
            self.fc_layers.append(fc_layer)
            input_size = hidden_size
        self.fc_layers = nn.ModuleList(self.fc_layers)
        self.last_layer = nn.Linear(input_size, 10)

        self.nonlin = nn.ReLU()
        self.dropout = nn.Dropout(p=0.0)

    def forward(self, inputs):
        x = torch.flatten(inputs, 1)
        for fc_layer in self.fc_layers:
            x = fc_layer(x)
            x = self.nonlin(x)
            x = self.dropout(x)
        pred_logits = self.last_layer(x)
        outputs = F.log_softmax(pred_logits, dim=1)
        return outputs
# CONFIGURABLE VARIABLES FOR THIS SCRIPT
# Can optionally move these arguments to a params file and configure from there.
MODEL_DIR = "./"
COMPILE_ONLY = False
VALIDATE_ONLY = False
TRAINING_STEPS = 10
CKPT_STEPS = 5
LOG_STEPS = 5
# Checkpoint-related configurations
CHECKPOINT_STEPS = 5
IS_PRETRAINED_CHECKPOINT = False
def main_training_loop(cs_config: cstorch.utils.CSConfig):
    """Main training loop for FC MNIST model"""
    torch.manual_seed(2023)

    backend = cstorch.backend(
        "CSX",
        artifact_dir=MODEL_DIR,
        compile_dir="./compile_dir",
        compile_only=COMPILE_ONLY,
        validate_only=VALIDATE_ONLY,
    )

    with backend.device:
        model = MNISTModel()

    model = cstorch.compile(model, backend)

    # Define loss function for FC MNIST Model
    loss_fn = torch.nn.NLLLoss()

    # Define the optimizer used for training.
    # This example will be using SGD from cerebras_pytorch.optim
    # For a complete list of optimizers available in the Cerebras PyTorch API, please see
    # https://docs.cerebras.net/en/latest/wsc/api/cerebras_pytorch/optim.html
    optimizer = cstorch.optim.configure_optimizer(
        optimizer_type="SGD", params=model.parameters(), lr=0.01, momentum=0.0,
    )

    # Optionally define the learning rate scheduler
    # This example will be using LinearLR from cerebras_pytorch.optim.lr_scheduler
    # For a complete list of lr schedulers available in the Cerebras PyTorch API, please see
    # https://docs.cerebras.net/en/latest/wsc/api/cerebras_pytorch/optim.html#learning-rate-schedulers-in-cerebras-pytorch
    lr_params = {
        "scheduler": "Linear",
        "initial_learning_rate": 0.01,
        "end_learning_rate": 0.001,
        "total_iters": 5,
    }
    lr_scheduler = cstorch.optim.configure_lr_scheduler(optimizer, lr_params)

    # Define gradient scaling parameters.
    grad_scaler = cstorch.amp.GradScaler(loss_scale="dynamic")

    loss_values = []
    total_steps = 0

    @cstorch.step_closure
    def accumulate_loss(loss):
        nonlocal loss_values
        nonlocal total_steps

        loss_values.append(loss.item())
        total_steps += 1

    lr_values = []

    @cstorch.step_closure
    def save_learning_rate():
        lr_values.append(lr_scheduler.get_last_lr())

    # Define method for saving ckpts
    @cstorch.checkpoint_closure
    def save_checkpoint(step):
        logging.info(f"Saving checkpoint at step {step}")

        checkpoint_file = os.path.join(MODEL_DIR, f"checkpoint_{step}.mdl")

        state_dict = {
            "model": model.state_dict(),
            "optimizer": optimizer.state_dict(),
            "grad_scaler": grad_scaler.state_dict(),
        }
        state_dict["global_step"] = step

        cstorch.save(state_dict, checkpoint_file)

        logging.info(f"Saved checkpoint {checkpoint_file}")
    global_step = 0

    # Define the actual training loop
    @cstorch.trace
    def training_step(batch):
        inputs, targets = batch
        outputs = model(inputs)
        loss = loss_fn(outputs, targets)

        cstorch.amp.optimizer_step(
            loss, optimizer, grad_scaler,
        )
        lr_scheduler.step()

        save_learning_rate()

        accumulate_loss(loss)

        # Save the loss value to be able to plot the loss curve
        cstorch.summarize_scalar("loss", loss)

        return loss

    # Define post-training loop if you are interested in tracking summaries, etc.
    writer = cstorch.utils.tensorboard.SummaryWriter(
        log_dir=os.path.join(MODEL_DIR, "train")
    )

    @cstorch.step_closure
    def post_training_step(loss):
        if LOG_STEPS and global_step % LOG_STEPS == 0:
            # Define the logging any way desired.
            logging.info(
                f"| Train: {model.device} "
                f"Step={global_step}, "
                f"Loss={loss.item():.5f}"
            )

        # Add handling for NaN values
        if torch.isnan(loss).any().item():
            raise ValueError(
                "NaN loss detected. "
                "Please try different hyperparameters "
                "such as the learning rate, batch size, etc."
            )
        if torch.isinf(loss).any().item():
            raise ValueError("inf loss detected.")

        for group, lr in enumerate(lr_scheduler.get_last_lr()):
            writer.add_scalar(f"lr.{group}", lr, global_step)

    # PERFORM TRAINING LOOPS
    batch_size = 4
    dataloader = cstorch.utils.data.DataLoader(input_fn_train, batch_size)
    executor = cstorch.utils.data.DataExecutor(
        dataloader,
        num_steps=TRAINING_STEPS,
        checkpoint_steps=CHECKPOINT_STEPS,
        writer=writer,
        cs_config=cs_config,
    )

    for _, batch in enumerate(executor):
        loss = training_step(batch)
        global_step += 1

        post_training_step(loss)

        if CHECKPOINT_STEPS and global_step % CHECKPOINT_STEPS == 0:
            save_checkpoint(global_step)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
os.makedirs(os.path.join(os.getcwd(),'mnist_dataset'), exist_ok=True)
cs_config = cstorch.utils.CSConfig(
mount_dirs=[os.getcwd()],
python_paths=[os.getcwd()],
max_wgt_servers=1,
num_workers_per_csx=1,
max_act_per_csx=1,
)
main_training_loop(cs_config)
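Once a run has produced checkpoints, they can be inspected on the host with cstorch.load, the same helper the evaluation script below uses to restore model weights. The following snippet is a minimal sketch and assumes the training run above completed all ten steps, so that a checkpoint_10.mdl file exists in MODEL_DIR:

import cerebras_pytorch as cstorch

state_dict = cstorch.load("./checkpoint_10.mdl")

# The keys mirror what save_checkpoint stored:
# "model", "optimizer", "grad_scaler", and "global_step"
print(list(state_dict.keys()))
print(state_dict["global_step"])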
Eval Example#
In the same directory as the dataloader, create an evaluation script as follows:
eval.py#
""" Example of training script for FC MNIST model on CSX with Weight Streaming. """
import logging
import os
import cerebras_pytorch as cstorch
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
class MNISTModel(nn.Module):
    def __init__(self):
        super().__init__()

        self.fc_layers = []
        input_size = 784
        hidden_size = 50
        depth = 10
        hidden_sizes = [hidden_size] * depth

        for hidden_size in hidden_sizes:
            fc_layer = nn.Linear(input_size, hidden_size)
            self.fc_layers.append(fc_layer)
            input_size = hidden_size
        self.fc_layers = nn.ModuleList(self.fc_layers)
        self.last_layer = nn.Linear(input_size, 10)

        self.nonlin = nn.ReLU()
        self.dropout = nn.Dropout(p=0.0)

    def forward(self, inputs):
        x = torch.flatten(inputs, 1)
        for fc_layer in self.fc_layers:
            x = fc_layer(x)
            x = self.nonlin(x)
            x = self.dropout(x)
        pred_logits = self.last_layer(x)
        outputs = F.log_softmax(pred_logits, dim=1)
        return outputs
# CONFIGURABLE VARIABLES FOR THIS SCRIPT
# Can optionally move these arguments to a params file and configure from there.
MODEL_DIR = "./"
COMPILE_ONLY = False
VALIDATE_ONLY = False
EVAL_STEPS = 10  # number of eval batches to run
CKPT_STEPS = 5

# Checkpoint-related configurations
CHECKPOINT_STEPS = 5
CHECKPOINT_PATH_EVAL = None
def main_eval_loop(cs_config: cstorch.utils.CSConfig):
    """Main evaluation loop for the MNIST model."""
    backend = cstorch.backend(
        "CSX",
        artifact_dir=MODEL_DIR,
        compile_dir="./compile_dir",
        compile_only=COMPILE_ONLY,
        validate_only=VALIDATE_ONLY,
    )

    with backend.device:
        model = MNISTModel()

    model = cstorch.compile(model, backend)

    def load_checkpoint(checkpoint_path):
        state_dict = cstorch.load(checkpoint_path)

        model.load_state_dict(state_dict["model"])
        global_step = state_dict.get("global_step", 0)
        return global_step

    global_step = 0
    if CHECKPOINT_PATH_EVAL is not None:
        global_step = load_checkpoint(CHECKPOINT_PATH_EVAL)
    else:
        logging.info(
            "No checkpoint was provided, model parameters will be "
            "initialized randomly"
        )

    writer = cstorch.utils.tensorboard.SummaryWriter(
        log_dir=os.path.join(MODEL_DIR, "eval")
    )
    # Define the eval metrics used by the model for evaluation.
    # This example shows two different eval metrics being used,
    # accuracy and perplexity. NOTE: For a complete list of eval metrics
    # available in the Cerebras PyTorch API, please see
    # https://docs.cerebras.net/en/latest/wsc/api/cerebras_pytorch/metrics.html
    accuracy = cstorch.metrics.AccuracyMetric("accuracy")
    perplexity = cstorch.metrics.PerplexityMetric("perplexity")

    # Define loss function for FC MNIST Model
    loss_fn = torch.nn.NLLLoss()
    @cstorch.trace
    def eval_step(batch):
        inputs, targets = batch
        outputs = model(inputs).to(torch.float16)
        loss = loss_fn(outputs, targets)

        accuracy(
            labels=targets.clone(), predictions=outputs.argmax(-1).int(),
        )
        perplexity(labels=targets.clone(), loss=loss)

        return loss

    total_loss = 0
    total_steps = 0

    @cstorch.step_closure
    def post_eval_step(loss: torch.Tensor):
        nonlocal total_loss
        nonlocal total_steps

        logging.info(
            f"| Eval: {model.device} "
            f"Step={global_step}, "
            f"Loss={loss.item():.5f}"
        )

        if torch.isnan(loss).any().item():
            raise ValueError("NaN loss detected.")
        if torch.isinf(loss).any().item():
            raise ValueError("inf loss detected.")

        total_loss += loss.item()
        total_steps += 1

        cstorch.summarize_scalar("loss", loss)
    # Perform evaluation loops
    batch_size = 4
    dataloader = cstorch.utils.data.DataLoader(input_fn_eval, batch_size)
    executor = cstorch.utils.data.DataExecutor(
        dataloader,
        num_steps=EVAL_STEPS,
        checkpoint_steps=CHECKPOINT_STEPS,
        writer=writer,
        cs_config=cs_config,
    )

    for _, batch in enumerate(executor):
        loss = eval_step(batch)
        global_step += 1

        post_eval_step(loss)

    writer.add_scalar("Eval Accuracy", float(accuracy), global_step)
    writer.add_scalar("Eval Perplexity", float(perplexity), global_step)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
os.makedirs(os.path.join(os.getcwd(),'mnist_dataset'), exist_ok=True)
cs_config = cstorch.utils.CSConfig(
mount_dirs=[os.getcwd()],
python_paths=[os.getcwd()],
max_wgt_servers=1,
num_workers_per_csx=1,
max_act_per_csx=1,
)
main_eval_loop(cs_config)
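To evaluate a checkpoint produced by training.py, point CHECKPOINT_PATH_EVAL at one of the saved files before running the script. For example, assuming the training run above wrote checkpoint_10.mdl into MODEL_DIR:

# Hypothetical example; use whichever checkpoint your training run actually saved
CHECKPOINT_PATH_EVAL = os.path.join(MODEL_DIR, "checkpoint_10.mdl")

Both scripts write their scalar summaries under MODEL_DIR/train and MODEL_DIR/eval respectively, so the loss curves and eval metrics can be viewed with a standard TensorBoard installation.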