Creating custom dataloaders#

This guide offers insights on adapting and optimizing PyTorch dataloaders for Cerebras systems, emphasizing the transition from conventional GPU dataloaders. It outlines the process for converting map-style and iterable-style datasets, ensuring consistent batch shapes and efficient data sharding. Key points include benchmarking dataloader performance and strategies to enhance speed, such as increasing parallelism and optimizing I/O operations. The guide is a crucial resource for developers aiming to optimize their dataloaders for the high-speed, parallel processing capabilities of Cerebras systems.

Converting a HuggingFace dataloader#

If your existing dataloader is implemented using the HuggingFace dataloader utilities, then a good first step is to look at our tools for converting HuggingFace datasets. This may be a good way to get a working solution up and running quickly.

A mental model for how data is fed to CS system#

Each CS system has one or more nodes whose sole responsibility is to feed data to the system. We’ll call these nodes “worker nodes” or input pre-processing servers. So if you have a cluster of 4 CS systems and have configured your run to use 2 workers per system, then in total you will have 8 worker nodes. Workers 0 and 1 will feed data to system 0, workers 2 and 3 will feed data to system 1, etc.

When you set up your model code, you supply a function that creates an instance of your dataloader. At run time, each worker node calls this function on setup to get its own independent copy of your dataloader. These worker nodes then start feeding batches to their respective CS systems in a round-robin fashion. In our 4 CS system 8 worker node example from above, this would mean that the first weight update would be performed using data from workers 0, 2, 4, and 6, then the second weight update step would be performed using data from workers 1, 3, 5, and 7.

Note

The number of workers is set on the runconfig parameters by the variable num_workers_per_csx. For more information about this flag in Cerebras Model Zoo models visit YAML params documentation.

Basic considerations for converting your dataloader#

There are a few details that you need to pay attention to when converting an existing GPU dataloader to run on CS system:

  • The shape and datatype of each tensor in each batch must stay the same throughout training. In practice, many dataloaders written for GPU already do this, but it’s good to double check this behavior and also to ensure that you set drop_last=True (yes, the batch dimension must also stay constant).

  • Since you stream data from each worker node independently, you need to make sure to shard your data so that you don’t encounter issues with data repetition. A given worker should have its own unique subset of the data which it is responsible for streaming, and this data should be disjoint from what other workers stream.

  • Speed matters. CS systems are really fast. That’s part of what makes them so useful. But all this speed means that sometimes dataloaders that are fast enough to run on GPU may end up bottlenecking speed when running on CS system. Later in this section, we will discuss more about Improve dataloader performance

  • If you’re not careful, your batches of data might get split up. Normally, samples in a batch are drawn IID from some global pool of samples, so we don’t really care about if batch boundary definitions are changed a little. But if your particular use case requires that samples that your dataloader batches together stay together, then you need to pay extra care. The batch size exposed to the dataloader is the global batch size, i.e. microbatch_size * num_systems * grad_accum_steps. However, under the hood the framework doesn’t care about what batch size the dataloader yields. Instead it repeatedly grabs the next chunk of microbatch_size samples from the data it has buffered and feeds these samples to the system. If this splitting of batches is unacceptable then you’ll need to implement some workarounds. However, this use case is both uncommon and advanced and so won’t be covered in detail here.

Example 1: converting a map-style dataset#

Lets get our hands dirty with some code. Consider the following dataset which reads a text file, tokenizes it, and slices it into samples using PyTorch map-style dataset paradigms.

import torch
import numpy as np

from tokenizers import Tokenizer

class TextDataset(torch.utils.data.Dataset):
    def __init__(self, input_file, sequence_length):
        self.sequence_length = sequence_length
        with open(input_file, "r") as f:
            text = f.read()
        tokenizer = Tokenizer.from_pretrained("gpt2")
        self.data = np.array(tokenizer.encode(text).ids, dtype=np.int32)
        self.data = [
            self.data[i : i + self.sequence_length + 1]
            for i in range(
                0, len(self.data) - self.sequence_length - 1, self.sequence_length
            )
        ]

    def __getitem__(self, i):
        x = self.data[i]
        return {
            "input_ids": x[:-1],
            "attention_mask": np.ones(self.sequence_length, dtype=np.int32),
            "labels": x[1:],
       }

    def __len__(self):
        return (len(self.data) - 1) // self.sequence_length


dataloader = torch.utils.data.DataLoader(
    TextDataset("/path/to/data.txt", 128),
    batch_size=16,
    shuffle=True,
)

Modify the code for CS compatibility by setting drop_last = True and sharding the data.

For simpler data sharding, use the helper functions num_tasks and task_id from modelzoo.data.common.input_utils. They return the total number of worker nodes and the current node’s index, respectively.

To create a dataloader with data sharding, you can use the num_tasks and task_id functions for efficient distribution. Here’s how you might structure the dataloader setup:

import torch
import numpy as np

from tokenizers import Tokenizer
from modelzoo.transformers.pytorch.input_utils import num_tasks, task_id

class ShardedTextDataset(torch.utils.data.Dataset):
    def __init__(self, input_file, sequence_length):
       self.sequence_length = sequence_length
        with open(input_file, "r") as f:
            text = f.read()
        tokenizer = Tokenizer.from_pretrained("gpt2")
        self.data = np.array(tokenizer.encode(text).ids, dtype=np.int32)
        self.data = [
            self.data[i : i + self.sequence_length + 1]
            for i in range(
                0, len(self.data) - self.sequence_length - 1, self.sequence_length
            )
        ]
        self.data = self.data[task_id()::num_tasks()]

    def __getitem__(self, i):
        x = self.data[i]
        return {
            "input_ids": x[:-1],
            "attention_mask": np.ones(self.sequence_length, dtype=np.int32),
            "labels": x[1:],
        }

    def __len__(self):
        return (len(self.data) - 1) // self.sequence_length


dataloader = torch.utils.data.DataLoader(
    ShardedTextDataset("/path/to/data.txt", 128),
    batch_size=16,
    shuffle=True,
    drop_last=True,
)

To simplify the dataloader adjustment for data sharding, you only need to modify two lines:

  • Dataset Sharding: Within the __init__ method of your dataset class, add a line to shard the data. This ensures each worker gets a unique subset:

self.data = self.data[task_id()::num_tasks()]
  • Drop Last Batch: When initializing the dataloader, set drop_last = True. This ensures that all distributed batches have the same size, avoiding issues with uneven batch sizes at the end of each epoch:

dataloader = DataLoader(dataset, batch_size=..., shuffle=..., drop_last=True)

By sharding the dataset in the __init__ method, you ensure that each worker node gets a distinct subset of the data, eliminating concerns about sample repetition within an epoch. Additionally, this approach works seamlessly on a GPU without needing different dataloader versions.

Furthermore, you can streamline this process using your original TextDataset alongside a Cerebras-defined sampler, which automates the sharding. By integrating this sampler, you delegate the complexity of data sharding to the sampler, simplifying your dataloader configuration and making your code more maintainable:

from modelzoo.data.common.h5_map_dataset.samplers import CBSampler


dataset = TextDataset("/path/to/data.txt", 128)
sampler = CBSampler(
    dataset,
    shuffle=True,
    seed=0,
    batch_size=16,
    drop_last=True,
)

dataloader = torch.utils.data.DataLoader(
    dataset, batch_sampler=sampler
)

This CBSampler approach comes with a few additional benefits such as a data order that is independent of the number of systems you use or the number of workers streaming to each system. As with the previous example, this approach also works on GPU.

Example 2: converting an iterable-style dataset#

Next lets walk through an example of adapting an iterable-style dataset. The following dataset written for GPU has similar functionality to our previous datasets except that instead of reading all the data up front from a single text file, data is distributed across several text files each of which is read, tokenized, and split at iteration time. For the sake of brevity, this example doesn’t support multiprocessing, but in practice many dataloaders of this type might shard their data for this type of parallelism using a worker_init_fn or similar.

import glob
import os

class ShardedIterableTextDataset(torch.utils.data.IterableDataset):
    def __init__(self, input_dir, sequence_length, seed):
        self.sequence_length = sequence_length
        self.files = sorted(glob.glob(os.path.join(input_dir, "*.txt")))
        self.rng = np.random.default_rng(seed)
        self.rng.shuffle(self.files)
        self.tokenizer = Tokenizer.from_pretrained("gpt2")

    def __iter__(self):
        for file_name in self.files:
            with open(file_name, "r") as f:
                text = f.read()
            data = np.array(self.tokenizer.encode(text).ids, dtype=np.int32)
            data = [
                data[i : i + self.sequence_length + 1]
                for i in range(
                    0, len(data) - self.sequence_length - 1, self.sequence_length
                )
            ]
            self.rng.shuffle(np.array(data))
            for x in data:
                yield {
                    "input_ids": x[:-1],
                    "attention_mask": np.ones(self.sequence_length, dtype=np.int32),
                    "labels": x[1:],
                }

dataloader = torch.utils.data.DataLoader(
    ShardedIterableTextDataset("/path/to/data_dir", 128, seed=0), batch_size=16
)

As with the previous example, the main changes we need to make are with sharding and consistency of batch shapes. We achieve this in much the same way as we did with the map style dataset.

from modelzoo.data.common.input_utils import num_tasks, task_id

class IterableTextDataset(torch.utils.data.IterableDataset):
    def __init__(self, input_dir, sequence_length, seed):
        self.sequence_length = sequence_length
        self.files = sorted(glob.glob(os.path.join(input_dir, "*.txt")))
        self.files = self.files[task_id()::num_tasks()]
        self.rng = np.random.default_rng(seed)
        self.rng.shuffle(self.files)
        self.tokenizer = Tokenizer.from_pretrained("gpt2")

    def __iter__(self):
        for file_name in self.files:
            with open(file_name, "r") as f:
                text = f.read()
            data = np.array(self.tokenizer.encode(text).ids, dtype=np.int32)
            data = [
                data[i : i + self.sequence_length + 1]
                for i in range(
                    0, len(data) - self.sequence_length - 1, self.sequence_length
                )
            ]
            self.rng.shuffle(np.array(data))
            for x in data:
                yield {
                    "input_ids": x[:-1],
                    "attention_mask": np.ones(self.sequence_length, dtype=np.int32),
                    "labels": x[1:],
                }

dataloader = torch.utils.data.DataLoader(
    IterableTextDataset("/path/to/data_dir", 128, seed=0), batch_size=16, drop_last=True
)

In this case, we shard the list of files to read instead of the preprocessed data itself. This way a worker node only reads and tokenizes the data it’s actually going to stream, which saves wasted work. Also note the placement of the sharding before any randomized operations. If we had elected not to seed our random number generator (by passing in seed=None) and had moved the line self.files = self.files[task_id()::num_tasks()] to the end of the __init__ function, then the order of each worker’s copy of self.files might be different, in which case each worker’s list of files post sharding might not be disjoint from the lists of the other workers. Some data would get repeated, and some data would be ignored all together, which might create undesirable convergence characteristics.

The only other change made was to specify drop_last=True to ensure that the model gets the consistency of data shape that it needs.

Again, this dataloader works for both CS system and GPU.

Improve dataloader performance#

As mentioned earlier, the speed of your dataloader can play a much larger role when trying to keep a CS system fed compared to when it only needs to feed a small number of GPUs. Luckily since all the dataloader code runs on the CPUs of the worker nodes, there’s nothing too out of the ordinary about optimizing it, and all the tricks you’re used to for optimizing the rest of the CPU code in your various applications will apply here. We’ve also built a few tools and compiled a list of a few tricks and tips to help streamline the process.

Is my dataloader too slow?#

Luckily, there’s an easy way to answer this question. If your dataloader is too slow to keep up with your model, you’ll see the following warning message

WARNING:   Input starvation detected
Please check dataloader throughput

If the problem persists, the following error will be thrown

ERROR:   Declaring stall due to input starvation, no change in status for 630 secs

This tells you that your model is too fast compared to the speed of your dataloader, and it’s time to start thinking of ways you can speed up your data pipeline. All things considered though, “my model is too fast” isn’t the worst problem to have, and it’s often easy enough to fix too.

Benchmarking#

To start with, if your dataloader is too slow the first step is to measure its current performance so you have a solid place to start iterating from. We’ve exposed a simple tool to help do this easily while reducing boilerplate code required.

import cerebras.pytorch as cstorch

def make_dataloader():
    dataset = ShardedIterableTextDataset("/path/to/data_dir", 128, seed=0)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=16, drop_last=True)
    return dataloader

cstorch.utils.benchmark.benchmark_dataloader(
    input_fn=make_dataloader,
    num_epochs=1,
    steps_per_epoch=1000,
)

Note

To execute code using cerebras_pytorch, use the Cerebras virtual environment. Instructions to set it up can be found here.

The cerebras.pytorch.utils.benchmark.benchmark_dataloader function takes a function that creates a new dataloader as well as some arguments specifying how many steps to run for and similar. The output will look something like this

##################################################################
################## Dataloader Benchmark Results ##################
##################################################################

Overall statistics
------------------
  Dataloader creation time: 178.2ms
  Total iteration time: 141.7ms
  Total steps: 41
  Total throughput: 289.28 steps/s (4628.52 samples/s)


Epoch statistics
----------------
        Steps Sample Points Iterator Creation Epoch Iteration Local steps/s             Global steps/s
                                                                        min   avg   max            min   avg   max
Epoch 1    41             1           166.6us         141.3ms         33.52 33.52 33.52          33.27 33.27 33.27


Unique Batch Specs
-------------------
  First occurence: epoch=1, epoch step=1
  Total number of occurences: 41
  PyTree:
    { 'attention_mask': 'TensorSpec(shape=(16, 128), dtype=torch.int32)',
      'input_ids': 'TensorSpec(shape=(16, 128), dtype=torch.int32)',
      'labels': 'TensorSpec(shape=(16, 128), dtype=torch.int32)'}

The dataloader benchmark results include

  • Overall statistics summary is the first place to look for useful information on the performance of your dataloader, including:

    • Total throughput line gives a view of the average speed,

    • Dataloader creation time tells you how long you’ll have to wait around for your dataloader to start up when you first launch your run.

  • Epoch statistics section shows a more detailed breakdown of the runtime.

  • Unique Batch Specs section helps to sanity check the shape and data type of the batches that get fed to the model.

Evaluating data loaders using simplified datasets#

You may notice that this report shows that the profiling job ran for 41 steps even though we specified 1000. This is because the example profiling job above was run with a small “toy” dataset instead of something more production-scale, so there aren’t many samples in the toy dataset overall. This isn’t uncommon for real-world profiling scenarios, and it leads nicely to our next topic of discussion.

One common pitfall of profiling i/o bound dataloaders using toy datasets is caching. Your OS is constantly trying to be smart and speed up your workloads as much as possible. When you write a small amount of data to disk, the OS will often cache this data in memory so that if you access it again soon, the OS can just grab the data from memory instead of doing an expensive disk read. This is awesome most of the time as it makes your code invisibly faster.

However, the OS doesn’t know when you’re running a benchmark, so it’ll also speed up your benchmarking code. These optimizations will make the benchmarking code unrepresentative of the workload you’re ultimately trying to learn about. The details of how to work around this type of issue are beyond the scope of this tutorial.

Note

If your disk performance seems too good to be true for a toy dataset, it probably is. It might be time to dig deeper into what’s actually happening (e.g. using a tool like iostat).

Optimization Strategies for Data loaders#

Now you have a handle on how fast your dataloader is running. If you decide that this speed isn’t fast enough, there are a couple of common tricks that are good to have in mind to supplement all the other speedup methods you might be thinking of.

  • Increase parallelism for compute bound workloads. You can do this either by increasing the number of worker processes used by your PyTorch dataloader itself or by running this dataloader across more worker nodes at the same time. With compute bound workloads this can easily increase your throughput several fold, but unfortunately it might not do much if your workload is i/o bound.

  • Move as much of the work as possible to pre-processing. For example, our toy datasets above tokenized the data at runtime. However, you could consider tokenizing the data at pre-processing time and reading the already tokenized data from disk at runtime. You might even consider pre-processing to one of the formats expected by our dataloaders implemented in ModelZoo if you’re interested in saving some time writing extra code.

  • Be mindful of where you shard. As mentioned above, it’s usually worth while to shard your data between workers as early in the pipeline as possible. This reduces the repeated work between your parallel processes.

  • Optimize disk access patterns for i/o bound workloads. If your data pipeline requires frequently reading small chunks of data from random locations on disk, this can reduce the total throughput of what you’re reading from disk. In many application, this won’t end up mattering, but if your profiling suggests that disk access is your primary bottleneck, then you might consider optimizing your disk access patterns. Sequential reads are much faster than random reads, so if you can think of any tricks (like shuffling your data at preprocessing time rather than runtime) that make your disk access pattern more regular, these can often speed up your i/o performance substantially.