Creating Custom Dataloaders#

This tutorial walks you through converting a dataloader that already works on GPU into one that will run on a CS system.

Converting a HuggingFace dataloader#

If your existing dataloader is built on the HuggingFace dataloader utilities, a good first step is to look at our tools for converting HuggingFace datasets. This can be a quick way to get a working solution up and running.

A mental model for how data is fed to CS system#

Each CS system has one or more nodes whose sole responsibility is to feed data to the system. We'll call these nodes "worker nodes" or input pre-processing servers. For example, if you have a cluster of 4 CS systems and configure your run to use 2 workers per system, you will have 8 worker nodes in total. Workers 0 and 1 will feed data to system 0, workers 2 and 3 will feed data to system 1, and so on.

When you set up your model code, you supply a function that creates an instance of your dataloader. At run time, each worker node calls this function during setup to get its own independent copy of your dataloader. The worker nodes then start feeding batches to their respective CS systems in a round-robin fashion. In the 4-system, 8-worker example above, this means the first weight update is performed using data from workers 0, 2, 4, and 6, the second weight update using data from workers 1, 3, 5, and 7, and so on.
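
As a rough illustration of this mapping, here is a short sketch in plain Python (the numbers and variable names are purely illustrative, not part of any Cerebras API):

# Toy sketch of the worker-to-system mapping and round-robin order described above.
num_systems = 4
workers_per_system = 2

for worker in range(num_systems * workers_per_system):
    system = worker // workers_per_system         # workers 0-1 feed system 0, workers 2-3 feed system 1, ...
    first_step = worker % workers_per_system + 1  # which weight update this worker's first batch lands in
    print(f"worker {worker} -> system {system}, first feeds weight update {first_step}")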

Note

The number of workers is set in the runconfig parameters via the num_workers_per_csx variable. For more information about this flag in Cerebras Model Zoo models, see the YAML params documentation.

Basic considerations for converting your dataloader#

There are a few details to pay attention to when converting an existing GPU dataloader to run on a CS system:

  • The shape and datatype of each tensor in each batch must stay the same throughout training. Many dataloaders written for GPU already do this, but it's worth double checking, and also ensuring that you set drop_last=True (yes, the batch dimension must stay constant too).

  • Since each worker node streams data independently, you need to shard your data to avoid repetition: each worker should stream its own unique subset of the data, disjoint from what the other workers stream.

  • Speed matters. CS systems are fast; that's part of what makes them so useful. But all this speed means that a dataloader that is fast enough for GPU may become the bottleneck when running on a CS system. We discuss this further in Improve dataloader performance below.

  • If you're not careful, your batches of data might get split up. Normally, samples in a batch are drawn IID from some global pool of samples, so we don't really care if batch boundaries shift a little. But if your use case requires that the samples your dataloader batches together stay together, you need to take extra care. The batch size exposed to the dataloader is the global batch size, i.e. microbatch_size * num_systems * grad_accum_steps (see the short sketch after this list). However, under the hood the framework doesn't care what batch size the dataloader yields; it repeatedly grabs the next chunk of microbatch_size samples from the data it has buffered and feeds those samples to the system. If this splitting of batches is unacceptable, you'll need to implement some workarounds. However, this use case is both uncommon and advanced, so it won't be covered in detail here.
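
To make the batch-size bookkeeping in the last bullet concrete, here is a minimal sketch with made-up numbers (these variable names are illustrative, not Cerebras API names):

# Hypothetical numbers purely for illustration.
microbatch_size = 4
num_systems = 4
grad_accum_steps = 1

# This is the batch size your dataloader is asked to yield...
global_batch_size = microbatch_size * num_systems * grad_accum_steps  # 16

# ...but under the hood the framework streams it to the systems in chunks of
# `microbatch_size` samples, so a yielded batch may be split across systems and steps.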

Example 1: converting a map-style dataset#

Let's get our hands dirty with some code. Consider the following dataset, which reads a text file, tokenizes it, and slices it into samples using PyTorch's map-style dataset paradigm.

import torch
import numpy as np

from tokenizers import Tokenizer

class TextDataset(torch.utils.data.Dataset):
    def __init__(self, input_file, sequence_length):
        self.sequence_length = sequence_length
        with open(input_file, "r") as f:
            text = f.read()
        tokenizer = Tokenizer.from_pretrained("gpt2")
        self.data = np.array(tokenizer.encode(text).ids, dtype=np.int32)
        self.data = [
            self.data[i : i + self.sequence_length + 1]
            for i in range(
                0, len(self.data) - self.sequence_length - 1, self.sequence_length
            )
        ]

    def __getitem__(self, i):
        x = self.data[i]
        return {
            "input_ids": x[:-1],
            "attention_mask": np.ones(self.sequence_length, dtype=np.int32),
            "labels": x[1:],
        }

    def __len__(self):
        return len(self.data)


dataloader = torch.utils.data.DataLoader(
    TextDataset("/path/to/data.txt", 128),
    batch_size=16,
    shuffle=True,
)

There are only a few minor changes needed to make this code CS compatible, namely specifying drop_last=True and sharding the data. To make data sharding easier, the two helpers num_tasks and task_id (in modelzoo.transformers.pytorch.input_utils) return the total number of worker nodes and the index of the current worker node, respectively. Using these functions, our next pass at creating a dataloader might be:

import torch
import numpy as np

from tokenizers import Tokenizer
from modelzoo.transformers.pytorch.input_utils import num_tasks, task_id

class ShardedTextDataset(torch.utils.data.Dataset):
    def __init__(self, input_file, sequence_length):
        self.sequence_length = sequence_length
        with open(input_file, "r") as f:
            text = f.read()
        tokenizer = Tokenizer.from_pretrained("gpt2")
        self.data = np.array(tokenizer.encode(text).ids, dtype=np.int32)
        self.data = [
            self.data[i : i + self.sequence_length + 1]
            for i in range(
                0, len(self.data) - self.sequence_length - 1, self.sequence_length
            )
        ]
        self.data = self.data[task_id()::num_tasks()]

    def __getitem__(self, i):
        x = self.data[i]
        return {
            "input_ids": x[:-1],
            "attention_mask": np.ones(self.sequence_length, dtype=np.int32),
            "labels": x[1:],
        }

    def __len__(self):
        return len(self.data)


dataloader = torch.utils.data.DataLoader(
    ShardedTextDataset("/path/to/data.txt", 128),
    batch_size=16,
    shuffle=True,
    drop_last=True,
)

This is only a two-line change from what we started with, namely the last line of __init__ and the drop_last=True setting. Since self.data = self.data[task_id()::num_tasks()] ensures that each worker has a unique subset of the data, we no longer have to worry about samples being repeated within an epoch. Note that this dataset still works perfectly well on GPU, so there's no need to maintain several versions of your dataloaders.

However, we can make this conversion even more easily by using our original TextDataset implementation in combination with a Cerebras-defined sampler that handles the sharding under the hood for you.

from modelzoo.transformers.data_processing.h5_map_dataset.samplers import CBSampler


dataset = TextDataset("/path/to/data.txt", 128)
sampler = CBSampler(
    dataset,
    shuffle=True,
    seed=0,
    batch_size=16,
    drop_last=True,
)

dataloader = torch.utils.data.DataLoader(
    dataset, batch_sampler=sampler
)

This CBSampler approach comes with a few additional benefits such as a data order that is independent of the number of systems you use or the number of workers streaming to each system. As with the previous example, this approach also works on GPU.

Example 2: converting an iterable-style dataset#

Next, let's walk through an example of adapting an iterable-style dataset. The following dataset, written for GPU, has similar functionality to the previous ones, except that instead of reading all the data up front from a single text file, the data is distributed across several text files, each of which is read, tokenized, and split at iteration time. For the sake of brevity, this example doesn't support multiprocessing, although in practice many dataloaders of this type shard their data for that kind of parallelism using a worker_init_fn or similar.

import glob
import os

class IterableTextDataset(torch.utils.data.IterableDataset):
    def __init__(self, input_dir, sequence_length, seed):
        self.sequence_length = sequence_length
        self.files = sorted(glob.glob(os.path.join(input_dir, "*.txt")))
        self.rng = np.random.default_rng(seed)
        self.rng.shuffle(self.files)
        self.tokenizer = Tokenizer.from_pretrained("gpt2")

    def __iter__(self):
        for file_name in self.files:
            with open(file_name, "r") as f:
                text = f.read()
            data = np.array(self.tokenizer.encode(text).ids, dtype=np.int32)
            data = [
                data[i : i + self.sequence_length + 1]
                for i in range(
                    0, len(data) - self.sequence_length - 1, self.sequence_length
                )
            ]
            self.rng.shuffle(data)
            for x in data:
                yield {
                    "input_ids": x[:-1],
                    "attention_mask": np.ones(self.sequence_length, dtype=np.int32),
                    "labels": x[1:],
                }

dataloader = torch.utils.data.DataLoader(
    IterableTextDataset("/path/to/data_dir", 128, seed=0), batch_size=16
)

As with the previous example, the main changes we need to make concern sharding and consistency of batch shapes. We achieve this in much the same way as we did with the map-style dataset.

from modelzoo.transformers.pytorch.input_utils import num_tasks, task_id

class ShardedIterableTextDataset(torch.utils.data.IterableDataset):
    def __init__(self, input_dir, sequence_length, seed):
        self.sequence_length = sequence_length
        self.files = sorted(glob.glob(os.path.join(input_dir, "*.txt")))
        self.files = self.files[task_id()::num_tasks()]
        self.rng = np.random.default_rng(seed)
        self.rng.shuffle(self.files)
        self.tokenizer = Tokenizer.from_pretrained("gpt2")

    def __iter__(self):
        for file_name in self.files:
            with open(file_name, "r") as f:
                text = f.read()
            data = np.array(self.tokenizer.encode(text).ids, dtype=np.int32)
            data = [
                data[i : i + self.sequence_length + 1]
                for i in range(
                    0, len(data) - self.sequence_length - 1, self.sequence_length
                )
            ]
            self.rng.shuffle(data)
            for x in data:
                yield {
                    "input_ids": x[:-1],
                    "attention_mask": np.ones(self.sequence_length, dtype=np.int32),
                    "labels": x[1:],
                }

dataloader = torch.utils.data.DataLoader(
    ShardedIterableTextDataset("/path/to/data_dir", 128, seed=0), batch_size=16, drop_last=True
)

In this case, we shard the list of files to read instead of the preprocessed data itself. This way a worker node only reads and tokenizes the data it's actually going to stream, which avoids wasted work. Also note that the sharding happens before any randomized operations. If we had elected not to seed our random number generator (by passing in seed=None) and had moved the line self.files = self.files[task_id()::num_tasks()] to the end of __init__, then the order of each worker's copy of self.files might differ, in which case each worker's post-sharding list of files might not be disjoint from the other workers' lists. Some data would be repeated and some ignored altogether, which can lead to undesirable convergence behavior.
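
To see why the ordering matters, here is a toy, self-contained sketch (not part of any Cerebras API; the file names are hypothetical) of what can go wrong when an unseeded shuffle happens before sharding:

import numpy as np

files = [f"shard_{i}.txt" for i in range(6)]  # hypothetical file names

def shuffle_then_shard(worker_id, num_workers):
    # Mimics one worker's __init__ with seed=None and sharding moved after the shuffle.
    order = list(files)
    np.random.default_rng().shuffle(order)  # unseeded: a different order on every worker
    return order[worker_id::num_workers]

# The two workers' lists will very likely overlap, so some files would be
# streamed twice and others not at all.
print(shuffle_then_shard(0, 2))
print(shuffle_then_shard(1, 2))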

The only other change was to specify drop_last=True to ensure that the model gets the consistent data shapes it needs.

Again, this dataloader works on both CS systems and GPUs.

Improve dataloader performance#

As mentioned earlier, the speed of your dataloader plays a much larger role in keeping a CS system fed than it does when feeding a small number of GPUs. Luckily, since all the dataloader code runs on the CPUs of the worker nodes, there's nothing out of the ordinary about optimizing it, and all the tricks you're used to for optimizing CPU code in other applications apply here. We've also built a few tools and compiled some tips and tricks to help streamline the process.

Is my dataloader too slow?#

Luckily, there's an easy way to answer this question. If your dataloader is too slow to keep up with your model, you'll see the following warning message:

WARNING:   Input starvation detected
Please check dataloader throughput

If the problem persists, the following error is thrown:

ERROR:   Declaring stall due to input starvation, no change in status for 630 secs

This tells you that your model is outpacing your dataloader, and it's time to start thinking about ways to speed up your data pipeline. All things considered, though, "my model is too fast" isn't the worst problem to have, and it's often easy enough to fix.

Benchmarking#

If your dataloader is too slow, the first step is to measure its current performance so you have a solid baseline to iterate from. We've exposed a simple tool to do this easily while reducing the boilerplate code required.

import cerebras_pytorch as cstorch

def make_dataloader():
    dataset = ShardedIterableTextDataset("/path/to/data_dir", 128, seed=0)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=16, drop_last=True)
    return dataloader

cstorch.utils.benchmark.benchmark_dataloader(
    input_fn=make_dataloader,
    num_epochs=1,
    steps_per_epoch=1000,
)

Note

To execute code using cerebras_pytorch, use the Cerebras virtual environment. Instructions to set it up can be found here.

The cerebras_pytorch.utils.benchmark.benchmark_dataloader function takes a function that creates a new dataloader, along with arguments specifying how many epochs and steps per epoch to run. The output will look something like this:

##################################################################
################## Dataloader Benchmark Results ##################
##################################################################

Overall statistics
------------------
  Dataloader creation time: 178.2ms
  Total iteration time: 141.7ms
  Total steps: 41
  Total throughput: 289.28 steps/s (4628.52 samples/s)


Epoch statistics
----------------
          Steps   Sample Points   Iterator Creation   Epoch Iteration   Local steps/s        Global steps/s
                                                                        min    avg    max    min    avg    max
Epoch 1   41      1               166.6us             141.3ms          33.52  33.52  33.52  33.27  33.27  33.27


Unique Batch Specs
-------------------
  First occurence: epoch=1, epoch step=1
  Total number of occurences: 41
  PyTree:
    { 'attention_mask': 'TensorSpec(shape=(16, 128), dtype=torch.int32)',
      'input_ids': 'TensorSpec(shape=(16, 128), dtype=torch.int32)',
      'labels': 'TensorSpec(shape=(16, 128), dtype=torch.int32)'}

The dataloader benchmark results include:

  • The Overall statistics summary, the first place to look for useful information on the performance of your dataloader, including:

    • The Total throughput line, which gives a view of the average speed.

    • The Dataloader creation time, which tells you how long you'll have to wait for your dataloader to start up when you first launch your run.

  • The Epoch statistics section, which shows a more detailed breakdown of the runtime.

  • The Unique Batch Specs section, which helps you sanity check the shape and data type of the batches that get fed to the model.

Benchmarking with toy datasets#

You may notice that this report shows the profiling job ran for only 41 steps even though we specified 1000. This is because the example profiling job above was run on a small toy dataset rather than something production-scale, so there simply aren't enough samples to go further. This isn't uncommon in real-world profiling scenarios, and it leads nicely into our next topic.

One common pitfall of profiling i/o bound dataloaders using toy datasets is caching. Your OS is constantly trying to be smart and speed up your workloads as much as possible. When you write a small amount of data to disk, the OS will often cache that data in memory so that if you access it again soon, it can serve the read from memory instead of doing an expensive disk read. Most of the time this is great, since it makes your code invisibly faster.

However, the OS doesn't know when you're running a benchmark, so it will happily speed up your benchmarking code too. These optimizations can make the benchmark unrepresentative of the workload you're ultimately trying to learn about. The details of how to work around this type of issue are beyond the scope of this tutorial.

Note

If your disk performance seems too good to be true for a toy dataset, it probably is. It might be time to dig deeper into what’s actually happening (e.g. using a tool like iostat).

Common tricks#

Now you have a handle on how fast your dataloader is running. If you decide that this speed isn't fast enough, there are a few common tricks worth having in mind to supplement the other speedup methods you might be considering.

  • Increase parallelism for compute bound workloads. You can do this either by increasing the number of worker processes used by your PyTorch dataloader itself or by running the dataloader across more worker nodes at the same time (see the sketch after this list). With compute bound workloads this can easily increase your throughput several fold, but it may not do much if your workload is i/o bound.

  • Move as much of the work as possible to pre-processing. For example, the toy datasets above tokenize the data at runtime. You could instead tokenize the data at pre-processing time and read the already tokenized data from disk at runtime. You might even consider pre-processing into one of the formats expected by our dataloaders implemented in ModelZoo if you're interested in saving some time writing extra code.

  • Be mindful of where you shard. As mentioned above, it's usually worthwhile to shard your data between workers as early in the pipeline as possible. This reduces repeated work between your parallel processes.

  • Optimize disk access patterns for i/o bound workloads. If your data pipeline requires frequently reading small chunks of data from random locations on disk, this can reduce your overall read throughput. In many applications this won't end up mattering, but if your profiling suggests that disk access is your primary bottleneck, then you might consider optimizing your disk access patterns. Sequential reads are much faster than random reads, so any tricks (like shuffling your data at preprocessing time rather than at runtime) that make your disk access pattern more regular can often speed up your i/o performance substantially.
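
As a sketch of the first trick above, increasing the number of dataloader worker processes is often a one-line change to the DataLoader construction (the specific values here are made up and should be tuned for your workload):

dataloader = torch.utils.data.DataLoader(
    ShardedIterableTextDataset("/path/to/data_dir", 128, seed=0),
    batch_size=16,
    drop_last=True,
    num_workers=8,            # spawn 8 dataloader processes on each worker node
    prefetch_factor=4,        # each process keeps a few batches prepared ahead of time
    persistent_workers=True,  # keep the processes alive between epochs
)

Note that with an iterable-style dataset like this one, each of these processes also needs its own slice of the node's data (for example via torch.utils.data.get_worker_info() inside __iter__, or a worker_init_fn), otherwise every process will stream the same samples.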