MPI
Overview
The MPI launcher type uses mpirun to launch a given number of processes on each node.
In this model, the cluster consists of a number of processes (workers) running on the nodes (number of processes = number of nodes * process count per node). Each process is assigned a global rank and a local rank. All processes are equal, and once the cluster has been formed, MPI executes the same command in each of them. The application then decides, based on the rank, what work each process should do. Rank 0 is often given a special task, such as accumulating results and writing out the model, logs, and metrics.
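As a minimal sketch of the arithmetic above, the following enumerates the workers of a cluster and shows the usual rank-0 gating. The helper names worker_layout and is_main_process are hypothetical, and the assumption that global ranks are filled node by node is for illustration only; the actual placement depends on how mpirun maps processes to nodes.

```python
from typing import List, Tuple


def worker_layout(node_count: int, process_count_per_node: int) -> List[Tuple[int, int, int]]:
    """Return (global_rank, node_index, local_rank) for every worker.

    Assumption: ranks are filled node by node, which is only one possible mapping.
    """
    layout = []
    for global_rank in range(node_count * process_count_per_node):
        node_index, local_rank = divmod(global_rank, process_count_per_node)
        layout.append((global_rank, node_index, local_rank))
    return layout


def is_main_process(global_rank: int) -> bool:
    """Rank 0 conventionally writes out the model, logs, and metrics."""
    return global_rank == 0


if __name__ == "__main__":
    # 2 nodes with 2 processes each give 4 workers in total.
    for global_rank, node_index, local_rank in worker_layout(2, 2):
        role = "main" if is_main_process(global_rank) else "worker"
        print(f"rank={global_rank} node={node_index} local_rank={local_rank} role={role}")
```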
Tip:
The role of mpirun as a launcher is often confused with MPI as a communication backend, so users may think that AzureML mpirun cannot run distributed jobs with the nccl or gloo backend. Unfortunately, the difference is not well explained in the official AzureML documentation. AzureML uses mpirun only as a launcher utility: it starts the processes for the user's training script on the distributed nodes and sets up all the environment variables that the script can use to initialize the process group. The actual communication backend is whatever the user sets in the training script.
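To illustrate that the backend is chosen in the training script rather than by the launcher, here is a minimal sketch. The helper names choose_backend and init_process_group_from_env are hypothetical, and the rule "nccl when CUDA is available, otherwise gloo" is an assumption for illustration. The sketch also assumes that RANK, WORLD_SIZE, MASTER_ADDR, and MASTER_PORT are already present in the environment, which the env:// init method requires.

```python
def choose_backend(cuda_available: bool) -> str:
    """Pick a torch.distributed backend (assumed rule: nccl on GPU, gloo on CPU)."""
    return "nccl" if cuda_available else "gloo"


def init_process_group_from_env() -> str:
    """Initialize the process group from environment variables.

    torch is imported lazily so that this module can be loaded without it.
    """
    import torch
    import torch.distributed as dist

    backend = choose_backend(torch.cuda.is_available())
    # env:// reads RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT from os.environ.
    dist.init_process_group(backend=backend, init_method="env://")
    return backend
```

Whichever backend the script passes to init_process_group is the one used for communication; mpirun only started the processes.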
Note:
This document is mainly based on PyTorch concepts.
How to define mpi DistributedComponent yaml spec
The only part of a DistributedComponent YAML spec that differs from a CommandComponent is the launcher section. Below is an example YAML for the mpi launcher type:
launcher:
  type: mpi
  additional_arguments: >-
    python mpi_train.py --training_data {inputs.training_data} --max_epochs {inputs.max_epochs} --learning_rate {inputs.learning_rate} --model_output {outputs.model_output}
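The command line in additional_arguments implies that mpi_train.py accepts four arguments. The sketch below shows one way such a script might parse them. The argument types and default values are assumptions, since the spec above only gives the argument names.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the arguments passed by the launcher (types and defaults are assumed)."""
    parser = argparse.ArgumentParser(description="Hypothetical mpi_train.py entry point")
    parser.add_argument("--training_data", type=str, required=True)
    parser.add_argument("--max_epochs", type=int, default=1)
    parser.add_argument("--learning_rate", type=float, default=0.01)
    parser.add_argument("--model_output", type=str, required=True)
    return parser.parse_args(argv)


if __name__ == "__main__":
    # Example invocation with placeholder paths.
    args = parse_args(["--training_data", "data", "--model_output", "out"])
    print(args)
```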
How to consume mpi distributed component
Below is an example pipeline that configures the node_count and process_count_per_node runsettings:
from azure.ml.component import Component, dsl, Pipeline

# Load the MPI train component from its component YAML, which is workspace independent.
# cpu_compute_target, arg_0, arg_1, and ws are assumed to be defined earlier.
train_component_func = Component.from_yaml(yaml_file='components/imagecnn-train/entry.spec.yaml')

# Define the pipeline.
@dsl.pipeline(name='mpi_train_pipeline', description='mpi train pipeline', default_compute_target=cpu_compute_target)
def train_pipeline() -> Pipeline:
    train = train_component_func(script_arg_0=arg_0, script_arg_1=arg_1)
    # below runsettings give the equivalent command as:
    train.runsettings.resource_layout.configure(
        node_count=2,  # use 2 nodes to train the model
        process_count_per_node=2)  # use 2 processes per node

# Submit the pipeline.
pipeline = train_pipeline()
pipeline.submit(experiment_name='mpi-sample-experiment', workspace=ws)
How to use MPI in user script
When the MPI job is submitted to the AzureML compute, the user script will be launched by mpirun and the MPI environment will be initialized.
To get MPI information in the user script, we recommend using mpi4py.
Add mpi4py to the conda dependencies or to the custom image.
Here is a simple example script:
from mpi4py import MPI
comm = MPI.COMM_WORLD
world_size = comm.Get_size()  # total number of launched processes
rank = comm.Get_rank()  # this process's rank among all processes
Note:
mpi4py works well in both Intel MPI images and OpenMPI images.
In a distributed training job, we usually need local_rank to allocate GPUs on the node, but mpi4py doesn’t expose it directly. In this case, you may use an OpenMPI image and get the rank from environment variables; see the reference below.
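Assuming an OpenMPI image, one way to turn the local rank into a device name is sketched below. The helper names are hypothetical. Falling back to local rank 0 when the variable is absent, and wrapping around when there are more processes than GPUs, are assumptions made so the sketch also runs outside mpirun.

```python
import os
from typing import Mapping


def local_rank_from_env(env: Mapping[str, str] = os.environ) -> int:
    """Read the local rank set by OpenMPI; default to 0 when not launched by mpirun."""
    return int(env.get("OMPI_COMM_WORLD_LOCAL_RANK", "0"))


def device_for_local_rank(local_rank: int, gpu_count: int) -> str:
    """Map a local rank to a device string, one GPU per local process."""
    if gpu_count <= 0:
        return "cpu"
    # Wrap around if there are more local processes than GPUs (assumed policy).
    return f"cuda:{local_rank % gpu_count}"


if __name__ == "__main__":
    # gpu_count=4 is a placeholder; a real script would query the GPU count.
    print(device_for_local_rank(local_rank_from_env(), gpu_count=4))
```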
Samples
If you get a 404 error when accessing the samples, follow the how-to-access instructions.
Image classification with MPI training - Demonstrates how to perform a distributed mpi training using PyTorch.
Train Tensorflow model with horovod - Demonstrates how to create a distributed component for Horovod script to train a Tensorflow model.
Reference
Please refer to DistributedComponent spec doc for spec definition.
Please refer to DistributedComponent Schema for the schema.
Environment Variables from OpenMPI
When running mpirun with OpenMPI images, AzureML sets the following environment variables for each launched process:
OMPI_COMM_WORLD_RANK - the rank of the process
OMPI_COMM_WORLD_SIZE - the world size
AZ_BATCH_MASTER_NODE - the master node address with port, in the form MASTER_ADDR:MASTER_PORT
OMPI_COMM_WORLD_LOCAL_RANK - the local rank of the process on the node
OMPI_COMM_WORLD_LOCAL_SIZE - number of processes on the node
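The variables listed above can be read into one typed object, as in the minimal sketch below. The class name OpenMpiEnv is hypothetical, and the single-process defaults used when a variable is missing are an assumption that lets the script also run locally without mpirun.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class OpenMpiEnv:
    rank: int
    world_size: int
    local_rank: int
    local_size: int
    master_node: Optional[str]  # "MASTER_ADDR:MASTER_PORT", or None if unset

    @classmethod
    def from_environ(cls, env: Mapping[str, str] = os.environ) -> "OpenMpiEnv":
        """Build the object from a mapping; defaults describe a single local process."""
        return cls(
            rank=int(env.get("OMPI_COMM_WORLD_RANK", "0")),
            world_size=int(env.get("OMPI_COMM_WORLD_SIZE", "1")),
            local_rank=int(env.get("OMPI_COMM_WORLD_LOCAL_RANK", "0")),
            local_size=int(env.get("OMPI_COMM_WORLD_LOCAL_SIZE", "1")),
            master_node=env.get("AZ_BATCH_MASTER_NODE"),
        )


if __name__ == "__main__":
    print(OpenMpiEnv.from_environ())
```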
Caution:
Despite the name, the environment variable OMPI_COMM_WORLD_NODE_RANK does not correspond to the NODE_RANK. To use a per-node launcher, simply set process_count_per_node=1 and use OMPI_COMM_WORLD_RANK as the NODE_RANK.
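The per-node-launcher setup described in this caution can be sketched as follows. The function name is hypothetical, and raising an error when more than one process runs on the node is an added safety check, not something AzureML does.

```python
from typing import Mapping


def node_rank_for_per_node_launch(env: Mapping[str, str]) -> int:
    """Return the node rank when exactly one process is launched per node."""
    local_size = int(env["OMPI_COMM_WORLD_LOCAL_SIZE"])
    if local_size != 1:
        # With several processes per node the global rank is not a node rank.
        raise ValueError(
            "expected process_count_per_node=1, found {} processes on this node".format(local_size)
        )
    return int(env["OMPI_COMM_WORLD_RANK"])
```

In a training script this would typically be called with os.environ as the argument.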
The following code maps the OpenMPI environment variables to the names PyTorch expects. For most PyTorch scripts, simply call set_environment_variables_for_nccl_backend() before the script calls torch.distributed.init_process_group. If your script receives information such as local_rank or rank as script arguments, remove those arguments and use the provided helper functions get_local_rank() and get_rank() instead.
import os


def set_environment_variables_for_nccl_backend(master_port=6105, verbose=True):
    """Map the OpenMPI/AzureML environment variables to the names PyTorch expects."""
    os.environ["RANK"] = os.environ["OMPI_COMM_WORLD_RANK"]
    os.environ["WORLD_SIZE"] = os.environ["OMPI_COMM_WORLD_SIZE"]
    # The job is single-node when all processes run on the same node.
    single_node = int(os.environ["OMPI_COMM_WORLD_LOCAL_SIZE"]) == int(
        os.environ["WORLD_SIZE"]
    )
    if not single_node:
        master_node_params = os.environ["AZ_BATCH_MASTER_NODE"].split(":")
        os.environ["MASTER_ADDR"] = master_node_params[0]
        # Do not overwrite master port with that defined in AZ_BATCH_MASTER_NODE
        if "MASTER_PORT" not in os.environ:
            os.environ["MASTER_PORT"] = str(master_port)
    else:
        os.environ["MASTER_ADDR"] = os.environ["AZ_BATCHAI_MPI_MASTER_NODE"]
        os.environ["MASTER_PORT"] = "54965"
    # Use .get so that an unset variable prints None instead of raising KeyError.
    print(
        "NCCL_SOCKET_IFNAME original value = {}".format(
            os.environ.get("NCCL_SOCKET_IFNAME")
        )
    )
    os.environ["NCCL_SOCKET_IFNAME"] = "^docker0,lo"
    if verbose:
        print("RANK = {}".format(os.environ["RANK"]))
        print("WORLD_SIZE = {}".format(os.environ["WORLD_SIZE"]))
        print("MASTER_ADDR = {}".format(os.environ["MASTER_ADDR"]))
        print("MASTER_PORT = {}".format(os.environ["MASTER_PORT"]))
        print(
            "NCCL_SOCKET_IFNAME new value = {}".format(os.environ["NCCL_SOCKET_IFNAME"])
        )


def get_rank():
    return int(os.environ["OMPI_COMM_WORLD_RANK"])


def get_local_rank():
    return int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"])


def get_global_size():
    return int(os.environ["OMPI_COMM_WORLD_SIZE"])


def get_local_size():
    return int(os.environ["OMPI_COMM_WORLD_LOCAL_SIZE"])


def get_world_size():
    return int(os.environ["OMPI_COMM_WORLD_SIZE"])
Tip:
To see the list of environment variables provided by AzureML, just print os.environ in your training script.
import os
print(os.environ)
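Printing the whole environment can be noisy, so a narrower variant might keep only the variables relevant to the launcher. In this sketch the function name select_env and the prefix list are assumptions, chosen from the variable names that appear in this document.

```python
import os
from typing import Dict, Mapping, Tuple


def select_env(
    env: Mapping[str, str],
    prefixes: Tuple[str, ...] = ("OMPI_", "AZ_", "NCCL_"),
) -> Dict[str, str]:
    """Return the variables whose names start with one of the prefixes, sorted by name."""
    return {name: env[name] for name in sorted(env) if name.startswith(prefixes)}


if __name__ == "__main__":
    for name, value in select_env(os.environ).items():
        print("{} = {}".format(name, value))
```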