Multi-Worker Input Pipeline

Because the CS system can train a neural network at very high speed, your input pipeline must be fast enough to keep the CS system supplied with input data. When you train a neural network on the CS system, low input data throughput becomes a bottleneck and prevents high CS system utilization. Achieving high input data throughput often requires running your input pipeline on multiple worker processes across many CPU nodes at once.

For this reason, when you use TensorFlow, it is important to configure the input pipeline to work correctly across these multiple worker processes. This section presents a few guidelines that apply to a multi-worker CS system configuration.

Dataset for input workers

The following are the common ways to set up your dataset for multiple input worker nodes:

Dataset in NFS storage

The complete dataset is placed on a shared network file system such as NFS. Each input worker node will then access this shared NFS storage.

Copy the dataset

Another approach is to copy the entire dataset into the local memory of each input worker CPU node. Each input worker node then has its own copy of the full dataset and independently and asynchronously sends input data to the CS system. To ensure that the model does not receive duplicate samples, or samples in the same order, from different workers, each worker must shuffle its copy randomly and reshuffle it after each iteration. For more on this, see Configuring multi-worker input pipeline below.

Shard the dataset

Alternatively, you can shard the dataset across the N input worker nodes. This is a recommended option when your dataset is very large or the local memory available on your CPU nodes is limited.

In this approach, you split your dataset into multiple smaller, distinct files and distribute them across the worker nodes so that each worker has a different slice of the dataset. Sharding can make the dataset easier to manage. It also makes shuffling more practical, because the shuffle buffer only needs to cover a shard rather than the entire dataset. See Sharding for the CS system and the section Sharding dataset in Best Practices.

The rest of this document presents how to configure multiple workers without using sharding.

Configuring multi-worker input pipeline

In a multi-worker CS system environment, the Slurm orchestrator launches multiple, concurrent data worker processes that stream data to the CS system (see Train, Eval, and Predict).

On each worker, an environment variable is defined that contains both that worker's unique ID and the total number of workers running concurrently. In TensorFlow this environment variable is TF_CONFIG, and it can be used to split the dataset across workers and to provide a unique random seed to each worker. See Setting the runtime configuration.
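The snippet below is a minimal sketch of reading a worker's ID and the worker count from TF_CONFIG. It assumes the standard TensorFlow TF_CONFIG JSON layout (a "cluster" dictionary with a "worker" list, and a "task" entry with an "index"); the exact contents on the CS system may differ. The function names and the seed-offset scheme are hypothetical.

```python
import json
import os


def get_worker_info(env=None):
    """Return (worker_id, num_workers) parsed from a TF_CONFIG-style variable.

    Assumes the standard TensorFlow layout, for example:
    {"cluster": {"worker": ["host1:port", ...]}, "task": {"type": "worker", "index": 0}}
    """
    env = os.environ if env is None else env
    raw = env.get("TF_CONFIG")
    if not raw:
        # No TF_CONFIG set: behave like a single worker.
        return 0, 1
    config = json.loads(raw)
    workers = config.get("cluster", {}).get("worker", [])
    num_workers = max(len(workers), 1)
    worker_id = int(config.get("task", {}).get("index", 0))
    return worker_id, num_workers


def worker_seed(base_seed, worker_id):
    """Hypothetical scheme: offset a shared base seed so each worker differs."""
    return base_seed + worker_id
```

The worker ID and count can then be passed to Dataset.shard, and the derived seed to Dataset.shuffle.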

Determinism

In a multi-worker CS system environment, each worker runs asynchronously, streaming data to the CS system. There is no synchronization across the worker processes, and the data streaming is non-deterministic. The order in which the input data is streamed cannot be reproduced from training session to training session when more than one worker process is involved.

For determinism, the number of workers should be limited to one. However, single-worker mode is typically used only for debugging, because data throughput and latency are sub-optimal in this mode.

Tip

When determinism in data loading is required, use a single-worker setup. Configure Slurm for the chief and worker as follows:

--nodes=1 --tasks-per-node=1

where only the worker node streams input data to the training process.

Furthermore, to ensure determinism with a single worker setup:

  • All random operations in the input function, such as shuffling, should be seeded.

  • Any parallelism in calls like interleave or map should be removed by setting num_parallel_calls to 1.
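A minimal sketch of these two rules in tf.data, assuming TensorFlow 2.x in eager mode. The doubling map and the shuffle buffer size are placeholders, not part of the original guidance. Building the pipeline twice with the same seed should produce the same batch order.

```python
import tensorflow as tf


def deterministic_pipeline(dataset, batch_size, seed=1234):
    """Single-worker pipeline whose output order is reproducible across runs."""
    # Every random operation gets an explicit seed.
    dataset = dataset.shuffle(10 * batch_size, seed=seed)
    # num_parallel_calls=1 removes multi-threaded execution from map.
    dataset = dataset.map(lambda x: x * 2, num_parallel_calls=1)
    return dataset.batch(batch_size, drop_remainder=True)


# Two independent builds with the same seed yield the same batches.
run1 = [b.tolist() for b in deterministic_pipeline(tf.data.Dataset.range(50), 4).as_numpy_iterator()]
run2 = [b.tolist() for b in deterministic_pipeline(tf.data.Dataset.range(50), 4).as_numpy_iterator()]
```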

Shuffling buffers

Proper shuffling of the data samples is critical for model convergence, especially when the input data is streamed from multiple workers. Data is often written in a non-random order, so it can contain correlations that adversely affect training (for example, an MNIST file that stores all the zeros first, then all the ones, and so on).

In addition, if the data is not shuffled randomly across workers, each worker may stream identical data. This is a serious issue when the data is not sharded, because the data streamed by different workers could then be highly correlated.

Moreover, the data should be reshuffled after each iteration so that the training data arrives in a different random order in each epoch.

TensorFlow provides a built-in method, Dataset.shuffle, for shuffling the data:

dataset.shuffle(
 buffer_size,
 seed=None,
 reshuffle_each_iteration=None)

See Shuffling dataset in Best Practices and the TensorFlow documentation.
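For the copy-the-dataset approach, a sketch of shuffling each worker's full copy with its own seed might look like the following. It assumes TensorFlow 2.x in eager mode; the base_seed + worker_id scheme and the function name are hypothetical, and the worker ID would come from TF_CONFIG.

```python
import tensorflow as tf


def shuffle_full_copy(dataset, buffer_size, base_seed, worker_id):
    """Shuffle a worker's full copy of the dataset with a worker-specific seed."""
    return dataset.shuffle(
        buffer_size,
        seed=base_seed + worker_id,      # hypothetical per-worker seed scheme
        reshuffle_each_iteration=True)   # a new order on every pass (epoch)


full_copy = tf.data.Dataset.range(100)
worker0 = [int(x) for x in shuffle_full_copy(full_copy, 100, 10, 0).as_numpy_iterator()]
worker1 = [int(x) for x in shuffle_full_copy(full_copy, 100, 10, 1).as_numpy_iterator()]
```

Both workers still see every sample, but in different orders, so they do not stream identical sequences to the CS system.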

Optimizing Input Pipeline

Parallelism

In addition to the parallelism introduced by the multi-worker setup, each worker process can also take advantage of the parallel processing built into the TensorFlow data loader (tf.data) on that process itself.

The following guidelines apply.

  • For most applications, it is best to set buffer_size (such as the buffer_size of Dataset.prefetch) to CS_AUTOTUNE so that the buffer size is tuned dynamically at runtime for optimal results. See The CS_AUTOTUNE.

  • Set num_parallel_calls=CS_AUTOTUNE, which dynamically determines the number of threads at runtime.
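A minimal sketch of per-process parallelism in tf.data, assuming TensorFlow 2.4 or later. CS_AUTOTUNE is specific to the CS system software and is not used here; TensorFlow's own tf.data.AUTOTUNE stands in for it. The preprocess function is hypothetical.

```python
import tensorflow as tf

# Stand-in for CS_AUTOTUNE: TensorFlow's own autotune constant (TF 2.4+).
AUTOTUNE = tf.data.AUTOTUNE


def preprocess(x):
    """Hypothetical per-example preprocessing step."""
    return tf.cast(x, tf.float32) * 2.0


def parallel_pipeline(dataset, batch_size):
    # Run the per-example map on a dynamically sized thread pool.
    dataset = dataset.map(preprocess, num_parallel_calls=AUTOTUNE)
    dataset = dataset.batch(batch_size, drop_remainder=True)
    # Let the runtime choose how many batches to prefetch.
    return dataset.prefetch(buffer_size=AUTOTUNE)


batches = [b.tolist() for b in parallel_pipeline(tf.data.Dataset.range(10), 4).as_numpy_iterator()]
```

By default a parallel map preserves the input order, so the batches match those of a sequential map; only throughput changes.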

Follow these TensorFlow recommendations to achieve best performance on the CS system:

Caching

To avoid executing time-consuming operations, such as a map function or opening a file, in every iteration, cache the dataset in memory or in a local file using TensorFlow's Dataset.cache.

All operations before the cache operation are executed only once, during the first iteration; subsequent iterations read the elements from the cache, which speeds up the input pipeline.

Use the cache operation when the dataset still fits in local storage or memory. If a user-defined function increases the size of the dataset, apply the cache operation before that function.

Note

The cache produces the same sequence of elements in every iteration, so apply Dataset.shuffle after the cache operation to randomize the data in the input pipeline.

See below for a recommended sequence of operations when using Dataset.cache:

dataset = dataset.map(time_consuming_mapping)
dataset = dataset.cache()
dataset = dataset.shuffle(shuffle_buffer_size)
dataset = dataset.map(memory_consuming_mapping)
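The following runnable sketch fills in the sequence above with hypothetical mapping functions, assuming TensorFlow 2.x in eager mode. A Python-side counter, wrapped in tf.py_function purely for illustration, shows that the step before the cache runs only during the first pass.

```python
import tensorflow as tf

# Hypothetical counter, used only to show how often the expensive step runs.
CALLS = {"count": 0}


def _slow_double(x):
    # Stand-in for an expensive step such as decoding a file.
    CALLS["count"] += 1
    return x * 2


def time_consuming_mapping(x):
    y = tf.py_function(_slow_double, [x], tf.int64)
    y.set_shape([])  # py_function drops static shape information
    return y


def memory_consuming_mapping(x):
    # Expands each scalar into a vector, so caching after it would need more memory.
    return tf.fill([4], x)


def build_dataset(num_elements=8, shuffle_buffer_size=8, seed=1234):
    dataset = tf.data.Dataset.range(num_elements)
    dataset = dataset.map(time_consuming_mapping)   # runs only in the first pass
    dataset = dataset.cache()                        # in memory; pass a filename to cache to a file
    dataset = dataset.shuffle(shuffle_buffer_size, seed=seed, reshuffle_each_iteration=True)
    dataset = dataset.map(memory_consuming_mapping)  # runs after the cache on every pass
    return dataset


dataset = build_dataset()
epoch1 = [int(v[0]) for v in dataset]
epoch2 = [int(v[0]) for v in dataset]
```

After two full passes the counter is still equal to the number of elements, because the second pass reads from the cache.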


Summary of best practices

Data format

Use TFRecord files for best performance.

Sharding
  • Shard before using any randomizing operation.

  • Shard as early in the pipeline as possible.

  • Keep individual data files from getting too large: keep each TFRecord file under 2 GB, ideally around 100 MB.

  • When sharding, ensure there are enough shards such that each worker has at least one file.
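A minimal sketch of file-level sharding that follows these rules, assuming TensorFlow 2.4 or later and data stored as TFRecord files. The file names, cycle_length value and function names are hypothetical, and tf.data.AUTOTUNE stands in for CS_AUTOTUNE. Sharding happens on the file list, before any shuffle.

```python
import tensorflow as tf


def shard_filenames(filenames, num_workers, worker_id):
    """Return this worker's slice of the file list, before any shuffling."""
    if len(filenames) < num_workers:
        raise ValueError("Need at least one file per worker")
    files = tf.data.Dataset.from_tensor_slices(filenames)
    # shard() keeps every num_workers-th element, starting at worker_id.
    return files.shard(num_workers, worker_id)


def make_worker_dataset(filenames, num_workers, worker_id, seed):
    files = shard_filenames(filenames, num_workers, worker_id)
    # Randomize the file order only after sharding, with a worker-specific seed.
    files = files.shuffle(len(filenames), seed=seed, reshuffle_each_iteration=True)
    # Read several TFRecord files concurrently (files are opened lazily).
    return files.interleave(
        tf.data.TFRecordDataset,
        cycle_length=4,
        num_parallel_calls=tf.data.AUTOTUNE)
```

Because each worker keeps a disjoint subset of the files, no two workers stream the same records.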

Shuffling and shuffle buffer
  • A larger shuffle buffer gives better randomization, but it consumes more memory and takes longer to fill, which can slow down the pipeline.

  • In theory, a shuffle buffer as large as the dataset produces the best randomization of the samples, but such a buffer is usually too large to use in practice.

Tip

Use a shuffle buffer of size at least ten times the batch size.

  • Randomizing the order of the data during datafile generation will help.

  • Dataset.shuffle does not signal the end of an epoch until the shuffle buffer is empty. Hence, a shuffle placed before a repeat shows every element of one epoch before moving on to the next, whereas a repeat placed before a shuffle mixes the epoch boundaries together.
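The epoch-boundary behavior described above can be checked with a small sketch, assuming TensorFlow 2.x in eager mode; the function names are illustrative only.

```python
import tensorflow as tf


def shuffle_then_repeat(dataset, buffer_size, seed, epochs):
    # Epoch boundaries are preserved: epoch k is fully emitted before epoch k+1.
    return dataset.shuffle(buffer_size, seed=seed).repeat(epochs)


def repeat_then_shuffle(dataset, buffer_size, seed, epochs):
    # Epoch boundaries are blurred: elements of adjacent epochs can interleave.
    return dataset.repeat(epochs).shuffle(buffer_size, seed=seed)


data = tf.data.Dataset.range(5)
preserved = [int(x) for x in shuffle_then_repeat(data, 5, seed=7, epochs=2).as_numpy_iterator()]
mixed = [int(x) for x in repeat_then_shuffle(data, 5, seed=7, epochs=2).as_numpy_iterator()]
```

In preserved, the first five elements are always one complete permutation of the data; in mixed, an element may appear twice before another has appeared once.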

Prefetch
  • Always add prefetch as the last step of the data input pipeline.

  • Set buffer_size to CS_AUTOTUNE.

  • Prefetch operates on the output of the previous stage in the pipeline. If the data is batched before prefetching, prefetch buffers batches rather than single samples.

Parallel calls
  • Use the num_parallel_calls option of Dataset.map and Dataset.interleave to process elements on multiple threads in parallel.

  • Set num_parallel_calls=CS_AUTOTUNE to determine the number of threads in the threadpool dynamically.

Batching
  • Keeping the incomplete last batch is currently not supported, so set drop_remainder to True.
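The snippet below shows what drop_remainder changes in tf.data, assuming TensorFlow 2.x: the incomplete last batch is discarded, and the batch dimension becomes static.

```python
import tensorflow as tf

data = tf.data.Dataset.range(10)

kept = data.batch(4)                          # last batch has only 2 elements
dropped = data.batch(4, drop_remainder=True)  # incomplete last batch is discarded

# With drop_remainder=True the batch dimension is static (4 instead of None).
kept_shape = kept.element_spec.shape.as_list()
dropped_shape = dropped.element_spec.shape.as_list()
kept_sizes = [len(b) for b in kept.as_numpy_iterator()]
dropped_sizes = [len(b) for b in dropped.as_numpy_iterator()]
```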

Repeat
  • Leave the number of times to repeat the data loader at the default setting (in Dataset.repeat, count=None, which repeats the dataset indefinitely).

Cache
  • Cache the dataset after a time-consuming operation and before a memory-consuming operation. For example:

    dataset.map(time_consuming_mapping).cache().shuffle(shuffle_buffer).map(memory_consuming_mapping)

  • Shuffle the dataset after cache to sufficiently randomize the data.

Order of operations
  • Follow the below suggested order of operations in a pipeline:

    • Dataset_generator → shard → shuffle → batch → map → repeat → prefetch.

    • The order of map and batch depends on the map operation. When the map function is applied to each individual example, map should come before batch. When the map function can process batches of data, it is better to batch first and then map, because the vectorized map speeds up the pipeline.

  • Batch before repeat gives clear epoch separation, whereas batch after repeat does not.
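Putting the suggested order together, a minimal per-worker pipeline might look like the sketch below. It assumes TensorFlow 2.4 or later, uses tf.data.AUTOTUNE in place of CS_AUTOTUNE, and uses a hypothetical vectorized scale_batch map; in a real setup, worker_id and num_workers would come from TF_CONFIG.

```python
import tensorflow as tf


def scale_batch(batch):
    """Hypothetical vectorized map: operates on a whole batch at once."""
    return tf.cast(batch, tf.float32) / 100.0


def build_input_pipeline(dataset, num_workers, worker_id, batch_size, seed):
    dataset = dataset.shard(num_workers, worker_id)           # shard first, before randomizing
    dataset = dataset.shuffle(10 * batch_size, seed=seed)     # at least 10x the batch size
    dataset = dataset.batch(batch_size, drop_remainder=True)  # no partial last batch
    dataset = dataset.map(scale_batch, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.repeat()                                # default count: repeat indefinitely
    return dataset.prefetch(tf.data.AUTOTUNE)                 # always the last step


pipeline = build_input_pipeline(tf.data.Dataset.range(103), num_workers=2, worker_id=0,
                                batch_size=8, seed=42)
batches = list(pipeline.take(12).as_numpy_iterator())
```

Because the pipeline repeats indefinitely, take() is used here only to pull a finite number of batches for inspection.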