Source code for cerebras.modelzoo.data_preparation.nlp.slimpajama.preprocessing.datasets

# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import os
import pickle
import random
from glob import glob
from multiprocessing import Manager, Process

from lm_dataformat import Reader
from tqdm import tqdm

from cerebras.modelzoo.data_preparation.nlp.slimpajama.utils import (
    cycle_documents,
    utf8len,
)


class Dataset(abc.ABC):
    """Base class for a corpus stored as a set of jsonl shard files.

    ``documents`` relies on two hooks: ``dir_path`` (a glob pattern matching
    the shard files) and ``stem_dir_path`` (a path prefix stripped from each
    shard path to form the key used in the filter mappings). The latter is
    not defined on this base class; concrete subclasses must provide it.
    """

    def dir_path(self):
        """Path to the directory"""

    def short_documents_path(self):
        """Path to the file with short documents"""

    def name(self):
        """Human-readable name of the dataset"""

    def documents(self, process_id=0, n_process=1, dup_sh=None, short_sh=None):
        """A generator producing all documents in the dataset.

        Shard files are visited in a shuffled order (module-level ``random``
        state). Within a shard, a document is handled by this call only when
        ``doc_id % n_process == process_id``; documents whose index appears
        in the duplicate or short set of that shard are skipped.

        Args:
            process_id: Index of the calling worker. Defaults to 0.
            n_process: Total number of workers sharing the shards. Defaults
                to 1, which together with ``process_id=0`` selects every
                document.
            dup_sh: Mapping from shard name (shard path with
                ``stem_dir_path()`` removed) to a collection of document
                indices to drop as duplicates. ``None`` means no filtering.
            short_sh: Same layout as ``dup_sh``, for short documents.
                ``None`` means no filtering.

        Yields:
            dict: ``{"doc": <text>, "meta": {}}`` for every kept document.
        """
        # The defaults let size() / num_docs() walk the whole dataset without
        # having to build worker ids or filter mappings.
        if dup_sh is None:
            dup_sh = {}
        if short_sh is None:
            short_sh = {}

        filtered = 0
        total_count = 0
        files = glob(self.dir_path())
        random.shuffle(files)
        for file_path in files:
            reader = Reader(file_path)
            # Filter mappings are keyed by the path relative to the stem dir.
            file_name = file_path.replace(self.stem_dir_path(), "")
            duplicates_set = dup_sh.get(file_name, set())
            short_set = short_sh.get(file_name, set())
            for doc_id, doc in enumerate(reader._stream_data(jsonl_key="text")):
                if doc_id % n_process == process_id:
                    if doc_id not in short_set and doc_id not in duplicates_set:
                        total_count += 1
                        yield {"doc": doc, "meta": {}}
                    else:
                        filtered += 1
        print(
            f"Total number of documents: {total_count}",
            f"Filtered documents: {filtered}",
        )

    def size(self):
        """Return an estimate of the dataset size.

        Implementations may use a faster, less accurate estimate. This
        default sums ``utf8len`` over the text of every document.
        """
        # num_docs() only feeds the progress bar; subclasses that override it
        # with a constant avoid a second full pass over the data.
        return sum(
            utf8len(item["doc"])
            for item in tqdm(self.documents(), total=self.num_docs())
        )

    def num_docs(self):
        """Return the number of documents by iterating over all of them."""
        # No ``total=`` here: the total is exactly what is being computed, and
        # asking num_docs() for it would recurse without end.
        return sum(1 for _ in tqdm(self.documents()))

    def already_shuffled(self):
        """Datasets where the source is already shuffled should override this to return True so that it isn't shuffled again."""
        return False
class RedPajamaBooksDataset(Dataset):
    """RedPajama book subset, read from ``<input_dir>/book/*.jsonl``.

    The size and count methods return hard-coded constants instead of
    scanning the shard files.
    NOTE(review): the size figures are presumably UTF-8 byte counts, matching
    the base-class ``size`` computation; confirm.
    """

    def __init__(self, input_dir):
        """Store the corpus root and derive this subset's shard glob.

        Args:
            input_dir: Root directory that contains the ``book`` folder.
        """
        shard_glob = os.path.join(input_dir, "book/*.jsonl")
        self.dir_path_ = shard_glob
        self.stem_dir_path_ = input_dir

    def dir_path(self):
        """Return the glob pattern matching this subset's shard files."""
        return self.dir_path_

    def stem_dir_path(self):
        """Return the corpus root directory given at construction."""
        return self.stem_dir_path_

    def name(self):
        """Return the subset name."""
        return "RedPajamaBook"

    def size(self):
        """Return the hard-coded size of this subset."""
        return 102851843814

    def size_duplicate_docs(self):
        """Return the hard-coded size of the duplicate documents."""
        return 2106014751

    def size_short_docs(self):
        """Return the hard-coded size of the short documents."""
        return 0

    def num_docs(self):
        """Return the hard-coded number of documents."""
        return 200242

    def num_duplicate_docs(self):
        """Return the hard-coded number of duplicate documents."""
        return 5502

    def num_short_docs(self):
        """Return the hard-coded number of short documents."""
        return 0
class RedPajamaArXivDataset(Dataset):
    """RedPajama arXiv subset, read from ``<input_dir>/arxiv/*.jsonl``.

    The size and count methods return hard-coded constants instead of
    scanning the shard files.
    NOTE(review): the size figures are presumably UTF-8 byte counts, matching
    the base-class ``size`` computation; confirm.
    """

    def __init__(self, input_dir):
        """Store the corpus root and derive this subset's shard glob.

        Args:
            input_dir: Root directory that contains the ``arxiv`` folder.
        """
        shard_glob = os.path.join(input_dir, "arxiv/*.jsonl")
        self.dir_path_ = shard_glob
        self.stem_dir_path_ = input_dir

    def dir_path(self):
        """Return the glob pattern matching this subset's shard files."""
        return self.dir_path_

    def stem_dir_path(self):
        """Return the corpus root directory given at construction."""
        return self.stem_dir_path_

    def name(self):
        """Return the subset name."""
        return "RedPajamaArXiv"

    def size(self):
        """Return the hard-coded size of this subset."""
        return 89018875739

    def size_duplicate_docs(self):
        """Return the hard-coded size of the duplicate documents."""
        return 54749418

    def size_short_docs(self):
        """Return the hard-coded size of the short documents."""
        return 574293

    def num_docs(self):
        """Return the hard-coded number of documents."""
        return 1546641

    def num_duplicate_docs(self):
        """Return the hard-coded number of duplicate documents."""
        return 1979

    def num_short_docs(self):
        """Return the hard-coded number of short documents."""
        return 9686
class RedPajamaCommonCrawlDataset(Dataset):
    """RedPajama Common Crawl subset.

    Shards are read from ``<input_dir>/common_crawl/*/*.jsonl.zst``, i.e. one
    directory level below ``common_crawl``. The size and count methods return
    hard-coded constants instead of scanning the shard files.
    NOTE(review): the size figures are presumably UTF-8 byte counts, matching
    the base-class ``size`` computation; confirm.
    """

    def __init__(self, input_dir):
        """Store the corpus root and derive this subset's shard glob.

        Args:
            input_dir: Root directory that contains the ``common_crawl``
                folder.
        """
        shard_glob = os.path.join(input_dir, "common_crawl/*/*.jsonl.zst")
        self.dir_path_ = shard_glob
        self.stem_dir_path_ = input_dir

    def dir_path(self):
        """Return the glob pattern matching this subset's shard files."""
        return self.dir_path_

    def stem_dir_path(self):
        """Return the corpus root directory given at construction."""
        return self.stem_dir_path_

    def name(self):
        """Return the subset name."""
        return "RedPajamaCommonCrawl"

    def size(self):
        """Return the hard-coded size of this subset."""
        return 1384835073956

    def size_duplicate_docs(self):
        """Return the hard-coded size of the duplicate documents."""
        return 2436638659265

    def size_short_docs(self):
        """Return the hard-coded size of the short documents."""
        return 6867259

    def num_docs(self):
        """Return the hard-coded number of documents."""
        return 187084822

    def num_duplicate_docs(self):
        """Return the hard-coded number of duplicate documents."""
        return 289100390

    def num_short_docs(self):
        """Return the hard-coded number of short documents."""
        return 90807
class RedPajamaC4Dataset(Dataset):
    """RedPajama C4 subset, read from ``<input_dir>/c4/*.jsonl``.

    The size and count methods return hard-coded constants instead of
    scanning the shard files.
    NOTE(review): the size figures are presumably UTF-8 byte counts, matching
    the base-class ``size`` computation; confirm.
    """

    def __init__(self, input_dir):
        """Store the corpus root and derive this subset's shard glob.

        Args:
            input_dir: Root directory that contains the ``c4`` folder.
        """
        shard_glob = os.path.join(input_dir, "c4/*.jsonl")
        self.dir_path_ = shard_glob
        self.stem_dir_path_ = input_dir

    def dir_path(self):
        """Return the glob pattern matching this subset's shard files."""
        return self.dir_path_

    def stem_dir_path(self):
        """Return the corpus root directory given at construction."""
        return self.stem_dir_path_

    def name(self):
        """Return the subset name."""
        return "RedPajamaC4"

    def size(self):
        """Return the hard-coded size of this subset."""
        return 734903985384

    def size_duplicate_docs(self):
        """Return the hard-coded size of the duplicate documents."""
        return 53403692569

    def size_short_docs(self):
        """Return the hard-coded size of the short documents."""
        return 664163266

    def num_docs(self):
        """Return the hard-coded number of documents."""
        return 324686115

    def num_duplicate_docs(self):
        """Return the hard-coded number of duplicate documents."""
        return 23015691

    def num_short_docs(self):
        """Return the hard-coded number of short documents."""
        return 17167086
class RedPajamaWikipediaDataset(Dataset):
    """RedPajama Wikipedia subset, read from ``<input_dir>/wikipedia/*.jsonl``.

    The size and count methods return hard-coded constants instead of
    scanning the shard files.
    NOTE(review): the size figures are presumably UTF-8 byte counts, matching
    the base-class ``size`` computation; confirm.
    """

    def __init__(self, input_dir):
        """Store the corpus root and derive this subset's shard glob.

        Args:
            input_dir: Root directory that contains the ``wikipedia`` folder.
        """
        shard_glob = os.path.join(input_dir, "wikipedia/*.jsonl")
        self.dir_path_ = shard_glob
        self.stem_dir_path_ = input_dir

    def dir_path(self):
        """Return the glob pattern matching this subset's shard files."""
        return self.dir_path_

    def stem_dir_path(self):
        """Return the corpus root directory given at construction."""
        return self.stem_dir_path_

    def name(self):
        """Return the subset name."""
        return "RedPajamaWikipedia"

    def size(self):
        """Return the hard-coded size of this subset."""
        return 78649866316

    def size_duplicate_docs(self):
        """Return the hard-coded size of the duplicate documents."""
        return 1798885899

    def size_short_docs(self):
        """Return the hard-coded size of the short documents."""
        return 0

    def num_docs(self):
        """Return the hard-coded number of documents."""
        return 26967854

    def num_duplicate_docs(self):
        """Return the hard-coded number of duplicate documents."""
        return 2866317

    def num_short_docs(self):
        """Return the hard-coded number of short documents."""
        return 0
class RedPajamaGithubDataset(Dataset):
    """RedPajama GitHub subset, read from ``<input_dir>/github/*.jsonl``.

    The size and count methods return hard-coded constants instead of
    scanning the shard files.
    NOTE(review): the size figures are presumably UTF-8 byte counts, matching
    the base-class ``size`` computation; confirm.
    """

    def __init__(self, input_dir):
        """Store the corpus root and derive this subset's shard glob.

        Args:
            input_dir: Root directory that contains the ``github`` folder.
        """
        shard_glob = os.path.join(input_dir, "github/*.jsonl")
        self.dir_path_ = shard_glob
        self.stem_dir_path_ = input_dir

    def dir_path(self):
        """Return the glob pattern matching this subset's shard files."""
        return self.dir_path_

    def stem_dir_path(self):
        """Return the corpus root directory given at construction."""
        return self.stem_dir_path_

    def name(self):
        """Return the subset name."""
        return "RedPajamaGithub"

    def size(self):
        """Return the hard-coded size of this subset."""
        return 105581774510

    def size_duplicate_docs(self):
        """Return the hard-coded size of the duplicate documents."""
        return 90515346113

    def size_short_docs(self):
        """Return the hard-coded size of the short documents."""
        return 0

    def num_docs(self):
        """Return the hard-coded number of documents."""
        return 21232084

    def num_duplicate_docs(self):
        """Return the hard-coded number of duplicate documents."""
        return 7561228

    def num_short_docs(self):
        """Return the hard-coded number of short documents."""
        return 0
class RedPajamaStackExchangeDataset(Dataset):
    """RedPajama StackExchange subset.

    Shards are read from ``<input_dir>/stackexchange/*.jsonl``. The size and
    count methods return hard-coded constants instead of scanning the shard
    files.
    NOTE(review): the size figures are presumably UTF-8 byte counts, matching
    the base-class ``size`` computation; confirm.
    """

    def __init__(self, input_dir):
        """Store the corpus root and derive this subset's shard glob.

        Args:
            input_dir: Root directory that contains the ``stackexchange``
                folder.
        """
        shard_glob = os.path.join(input_dir, "stackexchange/*.jsonl")
        self.dir_path_ = shard_glob
        self.stem_dir_path_ = input_dir

    def dir_path(self):
        """Return the glob pattern matching this subset's shard files."""
        return self.dir_path_

    def stem_dir_path(self):
        """Return the corpus root directory given at construction."""
        return self.stem_dir_path_

    def name(self):
        """Return the subset name."""
        return "RedPajamaStackExchange"

    def size(self):
        """Return the hard-coded size of this subset."""
        return 71278349386

    def size_duplicate_docs(self):
        """Return the hard-coded size of the duplicate documents."""
        return 139373830

    def size_short_docs(self):
        """Return the hard-coded size of the short documents."""
        return 3987870

    def num_docs(self):
        """Return the hard-coded number of documents."""
        return 29702946

    def num_duplicate_docs(self):
        """Return the hard-coded number of duplicate documents."""
        return 25975

    def num_short_docs(self):
        """Return the hard-coded number of short documents."""
        return 96165
class RedPajamaReplication(Dataset):
    """Weighted mixture of several datasets, sampled by worker processes.

    ``documents`` has a different signature from the base class: it takes the
    output queues and returns unstarted worker processes instead of yielding
    documents itself.
    """

    def __init__(self, datasets, duplicates, short_docs):
        """Store the mixture definition and seed the two samplers.

        Args:
            datasets: Iterable of ``(dataset, weight)`` pairs.
            duplicates: Path to a pickle file, loaded in ``documents`` and
                passed to the workers as the duplicate filter.
            short_docs: Path to a pickle file, loaded in ``documents`` and
                passed to the workers as the short-document filter.
        """
        self.datasets = datasets
        self.duplicates = duplicates
        self.short_docs = short_docs
        # Separate fixed-seed generators: one picks the source dataset, the
        # other picks the destination queue.
        self.rnd_docs = random.Random(42)
        self.rnd_queues = random.Random(420)

    def name(self):
        """Return the mixture name."""
        return "RedPajama"

    def size(self):
        """Return the weighted sum of the member dataset sizes as an int."""
        weighted_total = sum(weight * ds.size() for ds, weight in self.datasets)
        return int(weighted_total)

    def num_docs(self):
        """Return an estimate of the dataset number of documents.

        Implementations may use a faster, less accurate estimate. Here it is
        the weighted sum of the member datasets' document counts.
        """
        weighted_total = sum(
            ds.num_docs() * weight for ds, weight in self.datasets
        )
        return int(weighted_total)

    def sample_documents(
        self, weights, k, queues, process_id, n_process, dup_sh, short_sh
    ):
        """Sample documents across all datasets and push them onto queues.

        Each process samples documents in batches of size ``k``; sampling
        happens globally across all available documents.

        Args:
            weights: Per-dataset sampling weights, aligned with
                ``self.datasets``.
            k: Number of documents drawn per chunk.
            queues: Queues to receive the sampled documents; one is picked
                at random for every document.
            process_id: Index of this worker.
            n_process: Total number of workers.
            dup_sh: Duplicate filter forwarded to ``cycle_documents``.
            short_sh: Short-document filter forwarded to ``cycle_documents``.
        """
        # One (name, document iterator) pair per member dataset.
        sources = [
            (
                ds.name(),
                cycle_documents(ds, process_id, n_process, dup_sh, short_sh),
            )
            for ds, _ in self.datasets
        ]

        n_chunks = self.num_docs() // k // n_process
        for j in range(n_chunks):
            if j % 1000 == 0:
                print(f"Sampling chunk of documents {j}")
            picks = self.rnd_docs.choices(
                population=sources,
                weights=weights,
                k=k,
            )
            for set_name, doc_iter in picks:
                record = next(doc_iter)
                text = record["doc"]
                meta = record["meta"]
                # Tag the document with the subset it was drawn from.
                meta["redpajama_set_name"] = set_name
                target = self.rnd_queues.choice(queues)
                target.put({"doc": text, "meta": meta})
        print("Finished sampling documents.")

    def documents(self, queues):
        """Build the worker processes that fill ``queues`` with documents.

        Processes are created here to speed up read and write in
        shuffle_holdout.py; the queues are given by shuffle_holdout to
        populate with documents. The processes are only constructed, not
        started.

        Args:
            queues: Queues the workers will write sampled documents to.

        Returns:
            tuple: ``(procs, manager)``, the list of unstarted ``Process``
            objects and the ``Manager`` that owns the shared filter dicts.
        """
        # Relative weight of each dataset: weight * num_docs, normalized.
        total_weight = sum(
            weight * ds.num_docs() for ds, weight in self.datasets
        )
        weights = [
            weight * ds.num_docs() / total_weight
            for ds, weight in self.datasets
        ]

        with open(self.duplicates, "rb") as fin:
            dup = pickle.load(fin)
        with open(self.short_docs, "rb") as fin:
            short = pickle.load(fin)

        manager = Manager()
        dup_sh = manager.dict(dup)
        short_sh = manager.dict(short)

        n_process = 2 * len(queues)
        k = 1000
        procs = [
            Process(
                target=self.sample_documents,
                args=(
                    weights,
                    k,
                    queues,
                    process_id,
                    n_process,
                    dup_sh,
                    short_sh,
                ),
            )
            for process_id in range(n_process)
        ]
        return procs, manager
def redpj_datasets(input_dir):
    """Build the RedPajama dataset list, each paired with weight 1.0.

    Args:
        input_dir: Corpus root directory passed to every dataset constructor.

    Returns:
        list: ``(dataset, 1.0)`` tuples in a fixed order: Wikipedia, C4,
        CommonCrawl, StackExchange, Books, Github, ArXiv.
    """
    dataset_classes = (
        RedPajamaWikipediaDataset,
        RedPajamaC4Dataset,
        RedPajamaCommonCrawlDataset,
        RedPajamaStackExchangeDataset,
        RedPajamaBooksDataset,
        RedPajamaGithubDataset,
        RedPajamaArXivDataset,
    )
    return [(dataset_cls(input_dir), 1.0) for dataset_cls in dataset_classes]