Source code for cerebras.modelzoo.data_preparation.nlp.chunk_data_processing.lm_data_token_generator

# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
LMDataTokenGenerator Module

This module provides the LMDataTokenGenerator class which is designed to process
text data and create features suitable for language modeling tasks.

Usage:
    tokenizer = LMDataTokenGenerator(dataset_params,max_sequence_length,tokenizer)
    tokenized_features = tokenizer.encode("Sample text for processing.")
"""

import random
from typing import Any, Dict, List, Tuple

import ftfy
import numpy as np

from cerebras.modelzoo.data_preparation.nlp.hdf5_preprocessing.utils import (
    split_text_and_tokenize,
    validate_tokens,
    wikitext_detokenizer,
)


[docs]def create_features_auto_lm( token_ids, max_sequence_length, short_seq_prob=0, inverted_mask=False, pad_id=0, min_len=10, input_ids_dtype="int32", input_mask_dtype="int32", labels_dtype="int32", rng=None, ): """Given a list of token_ids, generate input sequence and labels. Args: token_ids (sequence): List containing token ids for creating features, labels and input mask from. max_sequence_length (int): Maximum sequence length for data writes. short_seq_prob (float): Probability of generating short sequences from data. Defaults to `0`. inverted_mask (bool): Invert mask if specified for runtime execution. Defaults to `False`. min_len (int): Minimum length of token_ids to be considered a valid sequence. pad_id (int): Id for pad token. Defaults to `0`. input_ids_dtype (str): Dtype as string for input ids. Defaults to `int32`. input_mask_dtype (str): Dtype as string for input mask. Defaults to `int32`. labels_dtype (str): Dtype as string for labels. Defaults to `int32`. rng (random.Random obj): Instance of random object, with states set. Defaults to `None`. Returns: Tuple containing features, labels and number of pad tokens """ if not validate_tokens(token_ids, min_len=min_len): return [] if rng.random() < short_seq_prob: token_ids = token_ids[0 : rng.randint(2, max_sequence_length - 1)] input_ids = token_ids[:-1] labels = token_ids[1:] input_mask = [1] * len(input_ids) # padding num_pad = max_sequence_length - len(input_ids) padding = [pad_id] * num_pad input_ids.extend(padding) labels.extend(padding) input_mask.extend([0] * num_pad) # assertions to ensure correct output shapes assert ( len(input_ids) == max_sequence_length and len(labels) == max_sequence_length and len(input_mask) == max_sequence_length ), "Wrong sequence length" # create feature dict features = dict() features["input_ids"] = getattr(np, input_ids_dtype)(input_ids) features["input_mask"] = getattr(np, input_mask_dtype)(input_mask) if inverted_mask: features["input_mask"] = np.equal(features["input_mask"], 0).astype( features["input_mask"].dtype ) labels = getattr(np, labels_dtype)(labels) return np.stack([features["input_ids"], features["input_mask"], labels])
[docs]class LMDataTokenGenerator:
[docs] def __init__(self, params, tokenizer, eos_id, pad_id): """ Initialize the LMDataTokenGenerator class. Args: vocab_file (str): Path to the vocabulary file. encoder_file (str): Path to the encoder file. max_seq_length (int, optional): Maximum sequence length. Defaults to 2048. """ dataset_params = params["dataset"] processing_params = params["processing"] self.tokenizer = tokenizer self.use_ftfy = dataset_params.pop("use_ftfy", False) self.ftfy_normalizer = dataset_params.pop("ftfy_normalizer", "NFC") self.wikitext_detokenize = dataset_params.pop( "wikitext_detokenize", False ) self.pack_sequences = dataset_params.pop("pack_sequences", True) self.min_sequence_len = dataset_params.pop("min_sequence_len", 10) self.input_ids_dtype = dataset_params.pop("input_ids_dtype", "int32") self.input_mask_dtype = dataset_params.pop("input_mask_dtype", "int32") self.inverted_mask = dataset_params.pop("inverted_mask", False) self.inverted_mask = dataset_params.pop("inverted_mask", False) self.max_seq_length = processing_params.pop("max_seq_length", 2048) self.short_seq_prob = processing_params.pop("short_seq_prob", 0.0) self.seed = processing_params.pop("seed", 0) self.files_per_record = processing_params.pop( "files_per_record", 50000 ) ## redundant param self.split_text_to_tokenize = processing_params.pop( "split_text_to_tokenize", False ) if self.split_text_to_tokenize: self.chunk_len_to_split = processing_params.pop( "chunk_len_to_split", 2000 ) self.remove_bos_in_chunks = processing_params.pop( "remove_bos_in_chunks", False ) self.eos_id = eos_id self.pad_id = pad_id self.rng = random.Random() self.rng.seed(self.seed) self.prefix = []
[docs] def tokenize_text(self, text: str) -> List[int]: """ Tokenize the provided text. Args: text (str): Text to tokenize. Returns: List[int]: List of token IDs. """ return self.tokenizer.encode(text)
[docs] def process_chunks( self, tokenized_text_chunks: List[List[int]] ) -> Tuple[List[Any], Dict]: """ Processes chunks of tokenized text and returns processed features along with the total padding added. Args: tokenized_text_chunks (List[List[int]]): A list of tokenized text chunks, where each chunk is represented as a list of integers. Returns: Tuple[List[Any], Dict]: A tuple containing a list of processed results and dataset stats. """ results = [] # List to store the processed results stats = { "loss_valid_tokens": 0, "num_tokens": 0, "num_pad_tokens": 0, "non_pad_tokens": 0, "num_masked_tokens": 0, "empty_chunks": 0, } # Iterate over each chunk in the tokenized text chunks for chunk in tokenized_text_chunks: # Process the chunk and get the processed result and number of padding tokens added processed = create_features_auto_lm( chunk, self.max_seq_length, short_seq_prob=self.short_seq_prob, inverted_mask=self.inverted_mask, pad_id=self.pad_id, min_len=self.min_sequence_len, input_ids_dtype=self.input_ids_dtype, input_mask_dtype=self.input_mask_dtype, labels_dtype=self.input_ids_dtype, rng=self.rng, ) # If the processed chunk is not empty, add the results to the list and update the total padding if processed != []: loss_valid_tokens = int(processed[1, :].sum()) num_pad = int((processed[0, :] == self.pad_id).sum()) stats["num_pad_tokens"] += num_pad stats["non_pad_tokens"] += self.max_seq_length - num_pad stats["num_masked_tokens"] += ( self.max_seq_length - loss_valid_tokens ) stats["loss_valid_tokens"] += loss_valid_tokens stats["num_tokens"] += len(processed[0]) results.append(processed) else: stats["empty_chunks"] += 1 # Return the list of processed results and data stats return results, stats
[docs] def tokenize_text_auto_lm(self, text: str) -> Tuple[List[np.ndarray], Dict]: """ Tokenize the text and create features for auto-regressive language modeling. Args: text (str): Text to tokenize. Returns: Tuple[List[np.ndarray], Dict]: Tuple of encoded features for auto-regressive language modeling and dataset stats. """ discarded_files = 0 num_false_discards = 0 # tokenize text if self.split_text_to_tokenize: # TODO: implement a better fix for this by updating the tokenizer # normalization rules. This is a temporary fix and it may # cause issues with the spacing tokens being repeated. tokenized_text = split_text_and_tokenize( text, self.tokenizer, max_tok_len=self.chunk_len_to_split, remove_bos_in_chunks=self.remove_bos_in_chunks, ) else: tokenized_text = self.tokenize_text(text) if self.eos_id is not None: tokenized_text += [self.eos_id] all_text = self.prefix + tokenized_text tokenized_text_chunks = [ all_text[i : i + self.max_seq_length + 1] for i in range(0, len(all_text), self.max_seq_length) ] # reset prefix self.prefix = [] # update prefix if last chunk is < max_seq_length num_tokens_last_chunk = len(tokenized_text_chunks[-1]) if self.pack_sequences: if num_tokens_last_chunk < self.max_seq_length + 1: last_chunk = tokenized_text_chunks.pop(-1) self.prefix.extend(last_chunk) if tokenized_text_chunks == []: num_false_discards = 1 elif num_tokens_last_chunk < 2: _ = tokenized_text_chunks.pop(-1) discarded_files += 1 results, stats = self.process_chunks(tokenized_text_chunks) stats["discarded_files"] = discarded_files - num_false_discards return results, stats
[docs] def encode(self, data: str) -> Tuple[List[np.ndarray], Dict]: """ Tokenize and encode the data for auto-regressive language modeling. Args: data (str): Text data to encode. Returns: Tuple[List[np.ndarray], Dict]: Tuple of encoded features for auto-regressive language modeling and dataset stats. """ raw_chars_count = len(data) raw_bytes_count = len(data.encode("utf-8")) files_processed = 1 discarded_files = 0 normalized_chars_count = raw_chars_count normalized_bytes_count = raw_bytes_count if self.use_ftfy: data = ftfy.fix_text(data, normalization=self.ftfy_normalizer) normalized_chars_count = len(data) normalized_bytes_count = len(data.encode("utf-8")) if self.wikitext_detokenize: data = wikitext_detokenizer(data) normalized_chars_count = len(data) normalized_bytes_count = len(data.encode("utf-8")) sample, stats = self.tokenize_text_auto_lm(data) discarded_files = stats["discarded_files"] if sample == []: discarded_files += 1 data_stats = { "discarded": discarded_files, "processed": files_processed, "successful": files_processed - discarded_files, "raw_chars_count": raw_chars_count, "raw_bytes_count": raw_bytes_count, "num_pad_tokens": stats["num_pad_tokens"], "non_pad_tokens": stats["non_pad_tokens"], "num_masked_tokens": stats["num_masked_tokens"], "loss_valid_tokens": stats["loss_valid_tokens"], "num_tokens": stats["num_tokens"], "normalized_chars_count": normalized_chars_count, "normalized_bytes_count": normalized_bytes_count, } return sample, data_stats
[docs] def encode_leftover_prefix( self, prefix: List[np.ndarray] ) -> Tuple[List[np.ndarray], Dict]: """ Processes the leftover prefix which is a list of ndarray tokens into chunks based on max sequence length. The last chunk is handled specifically if it's shorter than the max sequence length. If the last chunk has less than two tokens, it's discarded. Args: prefix (List[np.ndarray]): The prefix list of token arrays to process. Returns: Tuple[List[np.ndarray], Dict]: A tuple containing the processed token chunks as a list of ndarrays and the dataset stats. """ tokenized_text_chunks = [ prefix[i : i + self.max_seq_length + 1] for i in range(0, len(prefix), self.max_seq_length) ] # Handle last chunk if shorter than max_seq_length num_tokens_last_chunk = len(tokenized_text_chunks[-1]) if num_tokens_last_chunk < self.max_seq_length + 1: _ = tokenized_text_chunks.pop(-1) elif num_tokens_last_chunk < 2: _ = tokenized_text_chunks.pop(-1) # Assuming process_chunks is a method that processes each tokenized chunk results, stats = self.process_chunks(tokenized_text_chunks) return results, stats
[docs] def get_token_id(self, token: str) -> int: """ Get the token ID for the given token. Args: token (str): Token for which the ID is needed. Returns: int: Token ID. """ return self.tokenizer.get_token_id(token)