How to Interact with Multiple GPUs using TensorFlow
Tensorflow module has known issues
After the June 2024 maintenance, the module tensorflow/rocm5.6-tf2.12 has shown some problems. If you are facing issues using this module, please raise a ticket to our helpdesk and, in the meantime, try the following temporary fix: before loading the tensorflow module, first swap the pawseyenv module to the previous software stack, as explained in June 2024 Software Update - Important Information, then load the tensorflow module.
In this tutorial, you are going to see how to write a Horovod-powered distributed TensorFlow computation. More specifically, the final goal is to train different models in parallel by assigning each of them to a different GPU. The discussion is organised in two sections: the first illustrates Horovod's basic concepts and its usage coupled with TensorFlow; the second uses the MNIST classification task as a test case.
Example: Horovod and TensorFlow Basics
Horovod is a distributed training framework for TensorFlow, Keras, PyTorch, and Apache MXNet. Its computational model is inspired by the MPI specification (and uses MPI for its implementation too). When Horovod is used in a Python program, which in turn is launched using srun
, it enables the various spawned processes to be aware of each other and to communicate among themselves. For this communication to happen, the init
function must be called at the beginning of the program (similar to the MPI_Init
call). Let's see how it works by building a Python program example. Its purpose will be to create n processes, distributed across m machines, and to assign to each of them a GPU to work with.
import tensorflow as tf
import horovod.tensorflow as hvd
import logging

# Show our log messages
logging.basicConfig(level=logging.INFO)
# ...but disable tensorflow's ones except for errors
logging.getLogger("tensorflow").setLevel(logging.ERROR)

# initialize horovod - this call must always be done at the beginning of our scripts
hvd.init()
In the first three lines, the tensorflow
, horovod.tensorflow
(which provides functionality to enhance TensorFlow's capabilities) and logging
modules are imported. The last one is used by us to log information during execution, but mostly to stop TensorFlow from printing a very verbose log (only error messages are allowed to show up). Finally, the init
function is invoked. So far, so good.
One of the main concepts of MPI is the process rank. Horovod, too, exposes functionalities for a process to know its rank within the set of processes created by the srun
command. More specifically,
- the rank of a process is its index within a given ordering of all processes participating in the distributed computation
- the local rank of a process is its index within a given ordering of all processes participating in the distributed computation that are running on the same node as the process in question
Moreover, each process can invoke functions to query the size and local size of the computation. As you might expect, these are the total number of processes and the number of processes on the caller's node, respectively.
As an example, consider a distributed computation involving 4 processes (size = 4) distributed across 2 nodes, each node running 2 of the 4 processes (both local_size values are equal to 2). The rank of a process can take values in [0, 1, 2, 3]
, while the local rank can take the value of either 0
or 1
. These values can be used to control which processes execute which instructions, as in the sketch below.
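The following is a minimal sketch (assuming the imports and the hvd.init() call from the snippet above) of how the rank and local rank can gate which process executes a given block of code:
# only the process with global rank 0 runs this, e.g. a one-off task for the whole computation
if hvd.rank() == 0:
    logging.info("Executed once for the whole computation, by the global rank 0 process.")
# one process per node (local rank 0) runs this, e.g. node-local preparation work
if hvd.local_rank() == 0:
    logging.info("Executed once per node, by the process with local rank 0.")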
With this information in mind, let's continue with the Python program.
# retrieve and print the process's global and local ranks
rank = hvd.rank()
local_rank = hvd.local_rank()
size = hvd.size()
local_size = hvd.local_size()
logging.info(f"This is process with rank {rank} and local rank {local_rank}")

# each process retrieves the list of gpus available on its node
gpus = tf.config.experimental.list_physical_devices('GPU')
if local_rank == 0:
    logging.info(f"This is process with rank {rank} and local rank {local_rank}: gpus available are: {gpus}")

# each process selects a gpu (if any gpu is available)
if local_rank >= len(gpus):
    raise Exception("Not enough gpus.")
tf.config.experimental.set_visible_devices(gpus[local_rank], 'GPU')
# From now on each process has its own gpu to use.
The first two lines are convenient function calls to retrieve the rank and local rank of the process, which are then logged for demonstration purposes. Next, each process retrieves the list of GPUs that are available on the node it is running on. Of course, processes on the same node will retrieve the same list, whereas any two processes running on different nodes will have different, non-overlapping sets of GPUs. In the latter case, resource contention is structurally impossible; it is in the former case that the local rank concept comes in handy. Each process uses its local rank as an index to select a GPU in the gpus
list and will not share it with any other process because:
- other processes on other nodes will select a GPU from a different list
- other processes on the same node will select a different GPU because the local rank is unique within the same node
The caveat is that there must be enough GPUs on each node. Another approach is to allow two or more processes to share the same GPU, taking care to assign processes to GPUs as evenly as possible (for instance with a round-robin policy, sketched below).
The last function call sets the GPU that TensorFlow will use within each process.
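As an illustration of the round-robin alternative (an assumption for this sketch, not part of the tutorial script, and reusing the gpus and local_rank names from the snippet above), the selection line can wrap the index with a modulo so that processes cycle through the available GPUs:
# round-robin sketch: processes whose local rank exceeds the number of GPUs
# wrap around and share a device with another process on the same node
if not gpus:
    raise Exception("No GPUs available on this node.")
tf.config.experimental.set_visible_devices(gpus[local_rank % len(gpus)], 'GPU')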
To test what we have written so far, use the batch job script runTensorflow.sh
provided in the previous page as a template for submitting the job. You will need to adapt it: remove the exclusive
option, request 2 GPUs per node in the resource request, update the srun
command accordingly, and point it to the Python script (01_horovod_mnist.py
) containing the two parts described above. The adapted lines of the batch job script should look like:
#SBATCH --nodes=2 #2 nodes in this example
#SBATCH --gres=gpu:2 #2 GPUS per node
.
.
PYTHON_SCRIPT=$PYTHON_SCRIPT_DIR/01_horovod_mnist.py
.
.
srun -N 2 -n 4 -c 8 --gres=gpu:2 python3 $PYTHON_SCRIPT
(Note that the resource request for GPU nodes is different from the usual Slurm allocation requests, as are the parameters given to the srun
command; with the request above, Horovod should report size = 4 and local_size = 2, matching the example discussed earlier. Please refer to the page Example Slurm Batch Scripts for Setonix on GPU Compute Nodes for a detailed explanation of resource allocation on GPU nodes.)
You should see an output similar to the following one:
INFO:root:This is process with rank 1 and local rank 1
INFO:root:This is process with rank 0 and local rank 0
INFO:root:This is process with rank 3 and local rank 1
INFO:root:This is process with rank 2 and local rank 0
INFO:root:This is process with rank 0 and local rank 0: gpus available are: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'), PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU')]
INFO:root:This is process with rank 2 and local rank 0: gpus available are: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'), PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU')]
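As an optional sanity check (not part of the scripts above, and assuming the rank, local_rank and logging names from the earlier snippet), each process can also log the hostname of the node it is running on, to confirm that the four ranks are indeed spread across the two nodes:
import socket

# hostnames will differ between the two nodes, while local ranks repeat on each node
logging.info(f"Rank {rank} (local rank {local_rank}) is running on {socket.gethostname()}")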
Now we are ready to work with multiple models in parallel!
Example: The MNIST classification task
MNIST is an acronym that stands for the Modified National Institute of Standards and Technology dataset. It comprises 60,000 small square 28x28 pixel grayscale images of handwritten single digits between 0 and 9. The task is to classify a given image of a handwritten digit into one of 10 classes representing the integer values from 0 to 9, inclusive.
Starting from the Python program developed above, let us add the code needed to complete the task. In particular, suppose we want to train 4 models with the same architecture but different training data, each of them on a different GPU. Here is the code to do that.
# From now on each process has its own gpu to use.
# We will now train the same model on each gpu independently, each on a different
# portion of the training data, and evaluate each of them on the same test set.
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10)
])
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'])

# We will partition the training set evenly among processes so that the same model
# is trained by each process on different data.
dataset_size = len(x_train)
from math import ceil
# samples per model - number of samples to train each model with
spm = ceil(dataset_size / size)
model.fit(x_train[rank*spm:(rank+1)*spm], y_train[rank*spm:(rank+1)*spm], epochs=15)

print(model.evaluate(x_test, y_test, verbose=2))
First, the dataset is loaded through TensorFlow (being a standard dataset for test cases, TensorFlow provides a convenient function to retrieve it) and then split into two parts, one for training and the other for testing. What follows is the definition of the model and the loss function. Up to this point, every process executes the same code. They diverge when the model.fit
function is called. Indeed, the training dataset is implicitly partitioned using the size of the computation and the rank of the process: each process gets a different portion of samples because the rank is unique among all processes. For example, with 60,000 training samples and size = 4, each process trains on 15,000 samples, the rank 2 process using samples 30,000 to 44,999. Therefore, each trained model differs from the others. To prove this, each model is evaluated on the same test set through the model.evaluate
call. If you run the Python program with this last part added, you should see that the accuracy reported by every task is slightly different. You can use the rank and size values in if
statements to train completely different models and, in general, make each process follow a different execution path, as in the sketch below.
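For instance, here is a minimal, hypothetical sketch (not part of the tutorial code, and assuming the rank variable and imports from the snippets above) in which each process uses its rank to choose a different number of hidden units, so that four differently sized models are trained in parallel:
# hypothetical variation: the hidden layer width depends on the process rank
hidden_units = [64, 128, 256, 512][rank % 4]
model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(hidden_units, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10)
])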
Related pages
- Running TensorFlow on Setonix
- How to Use Horovod for Distributed Training in Parallel using TensorFlow
- Example Slurm Batch Scripts for Setonix on GPU Compute Nodes