# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import math
import torch
from modelzoo.transformers.pytorch.input_utils import cluster_config
class CBSampler(torch.utils.data.Sampler):
    """
    A sampler to handle sharding, batching, and skipping of map style datasets
    intended for use on CSX. Sharding is performed in such a way that data order
    is independent of the number of systems being used and the number of workers
    per system.
    """

    def __init__(
        self,
        data_source,
        shuffle=True,
        seed=None,
        start_index=0,
        shard=True,
        batch_size=None,
        drop_last=True,
        num_samples=None,
    ):
        """
        Create a sampler to handle shuffling in a deterministic and restartable
        way as well as sharding.

        Args:
            data_source (torch.utils.data.Dataset): dataset to sample from
            shuffle (bool): whether or not to shuffle the dataset
            seed (int): The seed used to make shuffling deterministic
            start_index (int): The index of the first sample to yield
            shard (bool): Whether or not to shard the dataset across Cerebras
                data streamer nodes
            batch_size (int): The batch size to use to compute sharded indices
                and group samples into batches. If `None`, no batching will be
                performed. When running on worker nodes, this should be the
                per-system batch size rather than the global batch size or the
                microbatch size. The per-system batch size is defined as
                `global_batch_size / num_csx` and can be found using the
                `modelzoo.common.pytorch.input_utils.get_streaming_batch_size`
                function. When running on the coordinator node, this should
                be the global batch size. Again, the `get_streaming_batch_size`
                function will return the appropriate result.
            num_samples (int): The number of samples to shuffle over. In multi-
                epoch training, it is common to set this to the total number
                of samples that you plan to see in your training run to get
                smoother loss curves and improved convergence.

        Raises:
            ValueError: if `drop_last=False` while running on more than one
                CSX system.
        """
        cluster_spec, _ = cluster_config()
        _num_systems = cluster_spec.num_csx
        if _num_systems > 1 and not drop_last:
            raise ValueError(
                "`drop_last=False` is only supported on GPU. Please re-run "
                "with `drop_last=True`."
            )
        # Compose the pipeline: shuffle/skip -> optional batching -> optional
        # sharding. Order matters: batching before sharding is what makes the
        # shards independent of the worker count (see Sharder).
        self.sampler = BaseSampler(
            data_source,
            shuffle=shuffle,
            seed=seed,
            start_index=start_index,
            num_samples=num_samples,
        )
        if batch_size is not None:
            self.sampler = BatchSampler(self.sampler, batch_size, drop_last)
        if shard:
            self.sampler = Sharder(self.sampler)
        # Saved so set_state() can rebuild an identical sampler with a new
        # start_index.
        self.kwargs = {
            "data_source": data_source,
            "shuffle": shuffle,
            "seed": seed,
            "shard": shard,
            "batch_size": batch_size,
            "drop_last": drop_last,
            # BUG FIX: `num_samples` was previously omitted here, so
            # set_state() rebuilt the sampler with num_samples=None and
            # changed the shuffle order on restart.
            "num_samples": num_samples,
        }

    def __iter__(self):
        return iter(self.sampler)

    def __len__(self):
        return len(self.sampler)

    def set_state(self, start_index):
        """
        Sets the state of the sampler to continue deterministically from a prior
        run.

        Args:
            start_index: the total number of samples streamed globally across
                all workers from a previous run.
        """
        self.__init__(**self.kwargs, start_index=start_index)
class BaseSampler(torch.utils.data.Sampler):
    """
    Handle deterministic shuffling and skipping of sample indices.

    Yields integer indices into ``data_source``. Shuffling is seeded per
    epoch (``seed + epoch``) so that a run can be resumed mid-epoch via
    ``start_index`` and reproduce the exact same order.
    """

    def __init__(
        self,
        data_source,
        num_samples=None,
        shuffle=True,
        seed=None,
        start_index=0,
    ):
        """
        Args:
            data_source: dataset (or anything with ``__len__``) to index into
            num_samples (int): number of samples per shuffle window; defaults
                to ``len(data_source)``. May exceed the dataset length for
                multi-epoch shuffling.
            shuffle (bool): whether to shuffle indices each epoch
            seed (int): base seed for deterministic shuffling; ``None`` falls
                back to 0
            start_index (int): global sample offset to resume from

        Raises:
            ValueError: if the effective ``num_samples`` is not a positive int.
        """
        self.data_source = data_source
        self._num_samples = num_samples
        if not isinstance(self.num_samples, int) or self.num_samples <= 0:
            raise ValueError(
                "num_samples should be a positive integer "
                "value, but got num_samples={}".format(self.num_samples)
            )
        # Remember the length at construction time so we can detect a data
        # source whose size changes between epochs (see __iter__).
        self._num_samples_frozen = self.num_samples
        self.shuffle = shuffle
        # BUG FIX: seed defaults to None, and `None + self.epoch` raised a
        # TypeError the first time a shuffled epoch was generated. Fall back
        # to a fixed seed of 0 to keep shuffling deterministic.
        self.seed = seed if seed is not None else 0
        # Decompose the global start_index into (epoch, offset-within-epoch).
        self.epoch = start_index // self.num_samples
        self.start_index = start_index - self.num_samples * self.epoch

    @property
    def num_samples(self):
        # Fall back to the dataset length when not explicitly overridden.
        if self._num_samples is None:
            return len(self.data_source)
        return self._num_samples

    def __iter__(self):
        if self.num_samples != self._num_samples_frozen:
            raise RuntimeError(
                f"Data source passed into Sampler must have the same length "
                f"every epoch. Original length was {self._num_samples_frozen}, "
                f"new length is {self.num_samples}"
            )
        if self.shuffle:
            # Per-epoch seeding: identical (seed, epoch) always yields the
            # identical permutation, which is what makes resume deterministic.
            gen = torch.Generator()
            gen.manual_seed(self.seed + self.epoch)
            if self.num_samples > len(self.data_source):
                # Shuffle window spans multiple passes over the dataset:
                # repeat the index range enough times, randomize the final
                # partial pass, truncate, then permute the whole window.
                epochs = math.ceil(self.num_samples / len(self.data_source))
                perm = torch.cat(
                    [
                        torch.arange(len(self.data_source))
                        for _ in range(epochs - 1)
                    ]
                )
                perm = torch.cat(
                    (perm, torch.randperm(len(self.data_source), generator=gen))
                )
                perm = perm[: self.num_samples]
                indices = torch.randperm(self.num_samples, generator=gen)
                perm = perm[indices]
            else:
                perm = torch.randperm(len(self.data_source), generator=gen)
                perm = perm[: self.num_samples]
            perm = perm[self.start_index :]
            yield from perm.tolist()
        else:
            # NOTE(review): when num_samples > len(data_source) and
            # shuffle=False, indices beyond the dataset length are yielded —
            # presumably the consuming dataset wraps indices; confirm.
            yield from range(self.start_index, self.num_samples)
        # A skip offset only applies to the epoch being resumed.
        self.epoch += 1
        self.start_index = 0

    def __len__(self):
        return self.num_samples - self.start_index
class Sharder(torch.utils.data.Sampler):
    """
    Round-robin shard an iterable of samples (or batches) across all worker
    tasks in the cluster so each task streams a disjoint slice of the data.
    """

    def __init__(self, data_source):
        self.data_source = data_source
        cluster_spec, worker_spec = cluster_config()
        # This task's global rank among all streaming workers.
        self.task_id = (
            worker_spec.local_rank * cluster_spec.num_csx + worker_spec.wse_id
        )
        self.num_tasks = cluster_spec.num_workers_per_csx * cluster_spec.num_csx
        # Rotated each epoch so samples left unassigned at the end of one
        # epoch go to the correct task in the next (shard-invariance).
        self.first_task = 0

    def __iter__(self):
        total = len(self.data_source)
        offset = (self.task_id - self.first_task) % self.num_tasks
        # Emit every num_tasks-th element starting at this task's offset.
        yield from itertools.islice(
            iter(self.data_source), offset, None, self.num_tasks
        )
        self.first_task = (
            self.first_task + total % self.num_tasks
        ) % self.num_tasks

    def __len__(self):
        offset = (self.task_id - self.first_task) % self.num_tasks
        whole, remainder = divmod(len(self.data_source), self.num_tasks)
        # Tasks whose offset falls within the remainder get one extra sample.
        return whole + 1 if remainder > offset else whole
class BatchSampler(torch.utils.data.Sampler):
    """
    A slight modification of the PyTorch batch sampler such that any samples not
    yielded at the end of an epoch when `drop_last=True` will be yielded at the
    start of the next epoch. This is necessary for shard-invariance.
    Adapted from the PyTorch batch sampler
    """

    def __init__(self, sampler, batch_size, drop_last):
        # bool is a subclass of int, so reject it explicitly.
        if (
            isinstance(batch_size, bool)
            or not isinstance(batch_size, int)
            or batch_size <= 0
        ):
            raise ValueError(
                "batch_size should be a positive integer value, "
                "but got batch_size={}".format(batch_size)
            )
        if not isinstance(drop_last, bool):
            raise ValueError(
                "drop_last should be a boolean value, but got "
                "drop_last={}".format(drop_last)
            )
        self.sampler = sampler
        self.batch_size = batch_size
        self.drop_last = drop_last
        # Partial batch held back at the end of one epoch and emitted at the
        # start of the next (only populated when drop_last=True).
        self.leftover_samples = []

    def __iter__(self):
        if self.drop_last:
            # Prepend last epoch's stragglers, emit only full batches, and
            # stash whatever is left for the following epoch.
            stream = itertools.chain(self.leftover_samples, self.sampler)
            while True:
                chunk = list(itertools.islice(stream, self.batch_size))
                if len(chunk) < self.batch_size:
                    self.leftover_samples = chunk
                    return
                yield chunk
        else:
            pending = []
            for sample in self.sampler:
                pending.append(sample)
                if len(pending) == self.batch_size:
                    yield pending
                    pending = []
            if pending:
                # Trailing short batch is kept when drop_last is False.
                yield pending

    def __len__(self):
        if self.drop_last:
            total = len(self.sampler) + len(self.leftover_samples)
            return total // self.batch_size
        # Ceiling division without going through floats.
        return -(-len(self.sampler) // self.batch_size)