cerebras.modelzoo.data_preparation.data_preprocessing.data_preprocessor.DataPreprocessor#

class cerebras.modelzoo.data_preparation.data_preprocessing.data_preprocessor.DataPreprocessor(params)[source]#

Bases: object

Initialize the class with given parameters. :param params: Configuration parameters. :type params: dict

Methods

append_buffer_to_hdf5

Appends an entire buffer of chunk to multiple HDF5 files.

average_chars_and_bytes

calculate_total_chunks

Calculate the total number of chunks based on the given total size and the predefined max chunk size.

calculate_total_size

Calculate the total size of all input files, taking compression factors into consideration.

check_unused_params

Check for any unused parameters and log them as warnings.

estimate_queue_size

Estimates an optimal queue size based on the max_chunk_size and a fraction of available system memory.

file_split_process_dataset

Process the dataset by splitting files across multiple processes.

get_output_dir

Retrieve the output directory path.

get_params_file

Retrieve the path to the JSON parameters file.

get_vocab_size

Get tokenizer vocabulary size :returns: text to tokenize :rtype: vocab_size (int)

handle_input_files

Handle input files based on provided configuration.

human_readable_size

Convert a size in bytes to a human-readable format (e.g., KB, MB, GB).

initialize_customtokenizer

Initialize custom tokenizer.

initialize_gpt2tokenizer

Initialize GPT-2 tokenizer.

initialize_huggingfacetokenizer

Initialize Hugging Face tokenizer.

initialize_miscellaneous_attributes

Initialize miscellaneous attributes.

initialize_neoxtokenizer

Initialize Neox tokenizer.

initialize_tokenizer

Initialize tokenizer based on the provided tokenizer_type parameter.

load_dataset

Loads a dataset from a specified source and saves it in a specified format in the given directory, potentially within a subdirectory denoted by a 'split'.

load_format_hook_fn

process_dataset

Process the dataset either through file split or task split methods.

process_dataset_params

Process dataset specific parameters.

process_files

Process the given files, tokenize the data chunks, and save to HDF5 format.

process_params

Process parameters by calling various initialization methods.

process_processing_params

Process the processing parameters and initialize relevant class attributes.

process_setup_params

Setup the number of processes based on provided configuration.

read_checkpoint

This function reads the checkpoint args from the created checkpoint file.

reader_process

Reads data from input files and distributes them to the tokenizer queues.

save_buffer_to_hdf5

setup_output_directory

Set up the output directory based on provided configuration.

shuffle_second_pass

This function performs the second pass of shuffling.

split_shuffle_second_pass

This function divides the output hdf5 files into different processes and prepares them for the second pass of shuffling.

stats_collation

This function collates the stats obtained from the different writer processes into a combined final stats :param num_writer_processes: Number of writer processes

task_split_process_dataset

Split the dataset processing tasks across multiple processes.

tokenizer_process

Tokenizes data and forwards the tokenized data to the writer queue.

update_checkpoint

write_remaining_prefix

This function writes the prefix remaining after processing LMData when pack_sequences is set to true.

writer_process

Process that writes tokenized data to HDF5 format.

load_dataset(input_data_params)[source]#

Loads a dataset from a specified source and saves it in a specified format in the given directory, potentially within a subdirectory denoted by a ‘split’.

Args: input_data_params (Dict[str, Optional[str]]): Parameters for dataset loading

including ‘source’, ‘split’ (optional), and ‘format’.

Returns: str: The directory where the dataset has been saved.

Raises: ValueError: If the specified format is not supported.

process_params()[source]#

Process parameters by calling various initialization methods.

setup_output_directory()[source]#

Set up the output directory based on provided configuration.

handle_input_files()[source]#

Handle input files based on provided configuration.

process_setup_params()[source]#

Setup the number of processes based on provided configuration.

check_unused_params()[source]#

Check for any unused parameters and log them as warnings.

process_dataset_params()[source]#

Process dataset specific parameters.

estimate_queue_size(fraction_of_memory=0.5)[source]#

Estimates an optimal queue size based on the max_chunk_size and a fraction of available system memory.

Args: - fraction_of_memory: Fraction of available system memory to be used for queues.

Returns: - An integer representing the optimal queue size.

process_processing_params()[source]#

Process the processing parameters and initialize relevant class attributes.

initialize_tokenizer(processing_params)[source]#

Initialize tokenizer based on the provided tokenizer_type parameter.

Parameters

processing_params (Dict[str, Any]) – Dictionary of processing parameters.

initialize_gpt2tokenizer(tokenizer_params)[source]#

Initialize GPT-2 tokenizer.

Parameters

processing_params (Dict[str, Any]) – Dictionary of processing parameters.

initialize_neoxtokenizer(tokenizer_params)[source]#

Initialize Neox tokenizer.

Parameters

processing_params (Dict[str, Any]) – Dictionary of processing parameters.

initialize_huggingfacetokenizer(hf_tokenizer, tokenizer_params)[source]#

Initialize Hugging Face tokenizer.

Parameters
  • hf_tokenizer (str) – str: HuggingFace tokenizer name.

  • processing_params (Dict[str, Any]) – Dictionary of processing parameters.

initialize_customtokenizer(custom_tokenizer, tokenizer_params)[source]#

Initialize custom tokenizer.

Parameters
  • custom_tokenizer – str: Path to implemenation of custom tokenizer.

  • tokenizer_params (Dict[str, Any]) – (Dict[str, Any]): Dictionary of tokenizer parameters.

initialize_miscellaneous_attributes()[source]#

Initialize miscellaneous attributes.

get_params_file()[source]#

Retrieve the path to the JSON parameters file.

Returns

Path to the JSON parameters file.

Return type

str

get_output_dir()[source]#

Retrieve the output directory path.

Returns

Path to the output directory.

Return type

str

calculate_total_size()[source]#

Calculate the total size of all input files, taking compression factors into consideration.

Returns

The total size of all input files in bytes.

Return type

int

human_readable_size(size, decimal_places=2)[source]#

Convert a size in bytes to a human-readable format (e.g., KB, MB, GB).

Parameters
  • size (int) – Size in bytes.

  • decimal_places (int) – Number of decimal places for rounding.

Returns

Formatted size string.

Return type

str

calculate_total_chunks(total_size)[source]#

Calculate the total number of chunks based on the given total size and the predefined max chunk size.

Parameters

total_size (int) – The total size of the data in bytes.

Returns

Total number of chunks.

Return type

int

read_checkpoint(num_writers)[source]#

This function reads the checkpoint args from the created checkpoint file. :param num_writers: The number of writer processes

write_remaining_prefix(chunk_locks, pid)[source]#

This function writes the prefix remaining after processing LMData when pack_sequences is set to true. :param chunk_locks: List of locks for appending to hdf5 files during shuffling :param pid: Process id of the current process

shuffle_second_pass(file_list, progress_counter, pid)[source]#

This function performs the second pass of shuffling. :param file_list: List of hdf5 file paths to shuffle :param progress_counter: A shared counter to track progress across processes.

split_shuffle_second_pass()[source]#

This function divides the output hdf5 files into different processes and prepares them for the second pass of shuffling.

stats_collation(num_writer_processes)[source]#

This function collates the stats obtained from the different writer processes into a combined final stats :param num_writer_processes: Number of writer processes

process_files(file_paths, process_idx, checkpoint_args, progress_counter, chunk_locks)[source]#

Process the given files, tokenize the data chunks, and save to HDF5 format.

Parameters
  • file_paths – list of file_paths.

  • process_idx – Index of current process among all process spawned for file split

  • checkpoint_args (Tuple[int, int, int]) – File index, doc start index, and hdf5 index.

  • progress_counter (Value[int]) – Shared counter tracking number of processed chunks.

  • chunk_locks – List of locks for appending to hdf5 files during shuffling

file_split_process_dataset()[source]#

Process the dataset by splitting files across multiple processes.

reader_process(process_checkpoints)[source]#

Reads data from input files and distributes them to the tokenizer queues.

Parameters

checkpoint_args (List[Tuple[int, int, int, int, int]]) – List of File index, doc start index, start_chunk_nuber, num_chunks_written, num_sequences_written

tokenizer_process(idx)[source]#

Tokenizes data and forwards the tokenized data to the writer queue.

Parameters

idx (int) – Queue ID to forward tokenized chunks of data.

writer_process(progress_counter, num_sentinels, writer_idx, chunk_locks, process_checkpoints)[source]#

Process that writes tokenized data to HDF5 format.

Parameters
  • progress_counter (Value[int]) – Shared counter tracking number of processed chunks.

  • num_sentinels (int) – Number of sentinels to be received for the current writer process

  • writer_idx (int) – The index of the current writer process

  • chunk_locks (List[multiprocessing.context.BaseContext.Lock]) – List of locks for appending to hdf5 files during shuffling

  • process_checkpoints (Tuple) – Checkpoint for the current process. This is used for resuming from checkpoint.

task_split_process_dataset()[source]#

Split the dataset processing tasks across multiple processes.

process_dataset()[source]#

Process the dataset either through file split or task split methods.

get_vocab_size()[source]#

Get tokenizer vocabulary size :returns: text to tokenize :rtype: vocab_size (int)

append_buffer_to_hdf5(buffer, output_dir, chunk_locks, dtype='i4', compression='gzip')[source]#

Appends an entire buffer of chunk to multiple HDF5 files. This method is called as part of the shuffling process.