Source code for

# Copyright 2022 Cerebras Systems.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

Processor for PyTorch T5 training.
import csv
import os
import random
import sys
from functools import partial

import numpy as np
import torch

from cerebras.modelzoo.common.input_utils import (
from cerebras.modelzoo.common.registry import registry
from import (
from import build_vocab
from import (

[docs]@registry.register_datasetprocessor("T5DynamicDataProcessor") class T5DynamicDataProcessor( """ Reads text files containing the input text tokens, adds extra ids for language modeling task on the fly. :param str src_vocab_file: Path to file containing tokens of vocabulary, one token per line. :param str src_data_dir: Path to directory containing the output of, with all the files of tokenized data. :param int batch_size: Number of sequences per batch. Note that it is different between systems. :param bool shuffle, optional: If true the data will be shuffled before passing into the model. Recommended for training. Can be set to False for debugging. :param int shuffle_seed, optional: Sets random seed for the order of data shuffling. Allows for reproducibility while still shuffling data. :param int shuffle_buffer: Size of buffer used to store data before shuffling :param int extra_ids, optional: Number of sentinel tokens for T5 objective :param int src_max_sequence_length, optional: Largest possible sequence length for the input. If longer it will be truncated. All other sequences padded to this length. :param int tgt_max_sequence_length, optional: Largest possible sequence length for the labels. If longer it will be truncated. All other sequences padded to this length. :param int num_workers, optional: Number of processes that move data to the accelerator system, so that the system doesn't process data faster than it receives it. :param bool drop_last, optional: If the last batch is not the full size, i.e. the dataset could not divide evenly into the batch-size, do not use the last batch. :param int prefetch_factor, optional: Number of batch loaded in advance by each worker. :param bool persistent_workers, optional: If set, workers will not be shutdown after going through the dataset once. :param bool do_lower, optional: If set, will lowercase all tokens in vocabulary. T5's vocabulary is cased so this is not recommended. :param list buckets, optional: A list of boundaries for sequence lengths to bucket together in order to speed up VTS/VSL. :param bool dynamic_loss_weight, optional: If set, will divide the loss for a token by the length of the sequence that the token comes from. :param bool pack_sequences, optional: If set, will concatenate sequences so that computation is performed on real data rather than padding :param int num_documents_to_concatenate, optional: Specifies how many documents to pack together :param str oov_token, optional: Token for out-of-vocabulary words/sub-words :param str sos_token, optional: Token for start-of-sequence :param str eos_token, optional: Token for end-of-sequence :param str pad_token, optional: Token for padding :param int labels_pad_id, optional: Can set specific padding for labels :param int input_pad_id, optional: Can set specific padding for inputs :param bool mixed_precision, optional: If set, will use float16 rather than float32 when possible """
[docs] def __init__(self, params): super(T5DynamicDataProcessor, self).__init__() # Input params. self.meta_data = self.get_meta_data(params["src_data_dir"]) self.meta_data_values = list(self.meta_data.values()) self.meta_data_filenames = list(self.meta_data.keys()) # Please note the appending of [0]. self.meta_data_values_cum_sum = np.cumsum([0] + self.meta_data_values) self.num_examples = sum(map(int, self.meta_data.values())) self.batch_size = get_streaming_batch_size(params["batch_size"]) self.num_batches = self.num_examples // self.batch_size assert ( self.num_batches > 0 ), "Dataset does not contain enough samples for one batch. Please choose a smaller batch size" self.num_tasks = num_tasks() self.task_id = task_id() self.num_batch_per_task = self.num_batches // self.num_tasks assert ( self.num_batch_per_task > 0 ), "Dataset cannot be evenly distributed across the given tasks. Please choose fewer tasks to run with." self.num_examples_per_task = self.num_batch_per_task * self.batch_size self.files_in_task = get_data_for_task( self.task_id, self.meta_data_values_cum_sum, self.num_examples_per_task, self.meta_data_values, self.meta_data_filenames, ) self.shuffle = params.get("shuffle", True) self.shuffle_seed = params.get("shuffle_seed", None) self.np_rng = np.random.default_rng(self.shuffle_seed) self.shuffle_buffer = params.get("shuffle_buffer", 10 * self.batch_size) self.do_lower = params.get("do_lower", False) # buckets must be a list of boundaries or None to disable. Supplying an # integer for uniform bucketing is not currently supported. self.buckets = params.get("buckets", None) assert self.buckets is None or ( self.buckets and isinstance(self.buckets, list) ), f"buckets may be None or a non-empty list of boundaries. Got {self.buckets}" # Compute a loss weight for every batch to use in the averaging of # per-token loss across the batch. self.dynamic_loss_weight = params.get("dynamic_loss_weight") self.pack_sequences = params.get("pack_sequences", False) self.num_documents_to_concatenate = params.get( "num_documents_to_concatenate", 128 ) # Multi-processing params. self.num_workers = params.get("num_workers", 0) self.drop_last = params.get("drop_last", True) self.prefetch_factor = params.get("prefetch_factor", 10) self.persistent_workers = params.get("persistent_workers", True) # Get special tokens and tokens that should not be masked. self.special_tokens = { "oov_token": params.get("oov_token", "<unk>"), "sos_token": params.get("sos_token", "<s>"), "eos_token": params.get("eos_token", "</s>"), "pad_token": params.get("pad_token", "<pad>"), } if self.do_lower: self.special_tokens = { key: value.lower() for key, value in self.special_tokens.items() } # Get vocab file and size. self.src_vocab_file = params["src_vocab_file"] self.src_vocab_file, self.src_vocab_size = build_vocab( self.src_vocab_file, self.do_lower, self.special_tokens["oov_token"] ) # Updating input and model params to account extra ids # for T5 Language Modeling task. extra_ids = params.get("extra_ids", 0) self.src_vocab_size += extra_ids # Init tokenizer. self.src_tokenize = self.src_vocab_file.forward # Getting indices for special tokens. self.special_tokens_indices = { key: self.src_tokenize([value])[0] for key, value in self.special_tokens.items() } # Padding indices. # See self.labels_pad_id = params.get( "labels_pad_id", self.special_tokens_indices["pad_token"] ) self.input_pad_id = params.get( "input_pad_id", self.special_tokens_indices["pad_token"] ) self.attn_mask_pad_id = 0 assert all( pad >= 0 for pad in [ self.labels_pad_id, self.input_pad_id, self.attn_mask_pad_id, ] ), ( f"All padding must be non-negative, got" f" `labels_pad_id` = {self.labels_pad_id}, `input_pad_id` = {self.input_pad_id}," f" `attn_mask_pad_id` = {self.attn_mask_pad_id}." ) # Max sequence lengths size params. self.src_max_sequence_length = params["src_max_sequence_length"] self.tgt_max_sequence_length = params["tgt_max_sequence_length"] # Store params. self.data_buffer = [] self.text_files_per_task_per_worker = [] self.processed_buffers = 0
[docs] def get_meta_data(self, data_dir): """ Read data from meta files. :param str data_dir: Path to the input directory. :return: Processed meta data. """ if not isinstance(data_dir, list): data_dir = [data_dir] meta_data = {} for file_name in data_dir: meta_file = os.path.join(file_name, "meta.dat") assert os.path.exists( meta_file ), f"Meta file is missing in the input directory: {data_dir}." with open(meta_file, "r") as fin: for line in fin.readlines(): line = line.strip().split() # Format: src_file_path src_file_size meta_data[os.path.join(file_name, line[0])] = int(line[1]) return meta_data
[docs] def load_buffer(self): """ Generator to read samples of data. :returns: Yields data samples, one at a time. """ self.processed_buffers = 0 # The csv file might contain very huge fields, therefore increase the `field_size_limit`. csv.field_size_limit(sys.maxsize) while self.processed_buffers < len(self.text_files_per_task_per_worker): ( current_file_path, num_examples, start_id, ) = self.text_files_per_task_per_worker[self.processed_buffers] with open(current_file_path, "r", newline="") as fin: # Some fields contain zero bytes. data_reader = csv.reader( (x.replace("\0", "") for x in fin), delimiter="\n" ) for row_id, row in enumerate(data_reader): if row_id < start_id: continue if row_id >= start_id + num_examples: break yield row self.processed_buffers += 1
[docs] def get_single_item(self): """ Iterating over the data to construct input features. :return: A dict with training features: * np.array[int.32] input_ids: Numpy array with encoder input token indices. Shape: (`src_max_sequence_length`). * np.array[int.32] decoder_input_ids: Numpy array with decoder input token indices. Shape: (`tgt_max_sequence_length`). * np.array[int.32] attention_mask: Numpy array with attention mask for encoder. Shape: (`src_max_sequence_length`). * np.array[int.32] decoder_attention_mask: Numpy array with attention mask for decoder. Shape: (`tgt_max_sequence_length`). * np.array[int.32] labels: Numpy array with labels for teacher forcing mode. Shape: (`tgt_max_sequence_length`). """ # Shard the data across multiple processes. max_raw_sequence_len, max_target_len = get_raw_sequence_lengths( self.src_max_sequence_length ) if max_target_len > self.tgt_max_sequence_length: raise ValueError( f"Actual target sequence length must be less than max target " f"sequence length. Got {max_target_len} > " f"{self.tgt_max_sequence_length}. Please increase the max " f"target sequence length." ) dataset = self.load_buffer() # parse, tokenize, and select chunks from the input documents dataset = map( lambda x: parse_text("".join(x), do_lower=self.do_lower), dataset ) dataset = map(self.src_tokenize, dataset) dataset = map(partial(select_random_chunk, rng=self.np_rng), dataset) # pack sequences to reduce padding if self.pack_sequences: if self.shuffle: # shuffle before concatenation so that you get variety in which # documents get concatenated dataset = shuffle( dataset, self.shuffle_buffer // self.batch_size, self.rng ) dataset = concatenate_documents( dataset, num_to_concatenate=self.num_documents_to_concatenate, pad_id=self.input_pad_id, ) # split documents into sequences and format for input to T5 model dataset = flat_map( partial(split_sequences, length=max_raw_sequence_len), dataset ) dataset = filter(len, dataset) if self.shuffle: # shuffle after `split_sequences` so that sequences from the same # document aren't always consecutive dataset = shuffle(dataset, self.shuffle_buffer, self.rng) dataset = map( partial( construct_denoising_objective, vocab_size=self.src_vocab_size, sos_token=self.special_tokens_indices["sos_token"], eos_token=self.special_tokens_indices["eos_token"], rng=self.np_rng, ), dataset, ) dataset = map( lambda features: pad_t5_input_features( src_max_sequence_length=self.src_max_sequence_length, tgt_max_sequence_length=self.tgt_max_sequence_length, input_pad_id=self.input_pad_id, attn_mask_pad_id=self.attn_mask_pad_id, labels_pad_id=self.labels_pad_id, features=features, ), dataset, ) return dataset
[docs] def element_length_fn(self, features): """ Takes a single sample and returns the sequence length of that sample to be used for VTS bucketing. """ return np.sum(features["attention_mask"])
def __iter__(self): batched_dataset = bucketed_batch( self.get_single_item(), self.batch_size, buckets=self.buckets, element_length_fn=self.element_length_fn, drop_last=self.drop_last, seed=self.shuffle_seed, ) for batch in batched_dataset: if self.dynamic_loss_weight: scale = self.batch_size / torch.sum( batch["decoder_attention_mask"] ) batch["loss_weight"] = scale.expand(self.batch_size, 1) yield batch def _worker_init_fn(self, worker_id): worker_info = if worker_info is not None: worker_id = num_workers = worker_info.num_workers else: # Single-process worker_id = 0 num_workers = 1 self.processed_buffers = 0 if self.shuffle_seed is not None: self.shuffle_seed += worker_id + 1 self.rng = random.Random(self.shuffle_seed) self.np_rng = np.random.default_rng(self.shuffle_seed) # Shard the data across multiple processes. self.text_files_per_task_per_worker = shard_list_interleaved( self.files_in_task, worker_id, num_workers ) if self.shuffle: self.rng.shuffle(self.text_files_per_task_per_worker)
[docs] def create_dataloader(self): """ Classmethod to create the dataloader object. """ if not self.num_workers: self.prefetch_factor = None self.persistent_workers = False dataloader = self, batch_size=None, num_workers=self.num_workers, prefetch_factor=self.prefetch_factor, persistent_workers=self.persistent_workers, worker_init_fn=self._worker_init_fn, ) if self.num_workers == 0: self._worker_init_fn(0) return dataloader