MPI

Overview

With the MPI launcher type, AzureML uses mpirun to launch a given number of processes on each node.

In this model, a cluster consists of a number of processes (workers) running on the nodes, where number of processes = number of nodes * process count per node. Each process is assigned a global rank and a local rank. All processes are equal, and after the cluster has been formed, mpirun executes the same command in each of them. The application then decides, based on the rank, what work each process should do. Rank 0 is often given a special task, such as accumulating results and writing out the model, logs, and metrics.
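The rank arithmetic above can be illustrated with a small sketch. The helper names (world_size, global_rank, is_chief) are hypothetical, and the global rank formula assumes that ranks are handed out node by node, which is an assumption for illustration rather than something the launcher is documented to guarantee.

```python
def world_size(node_count: int, process_count_per_node: int) -> int:
    """Total number of processes in the cluster."""
    return node_count * process_count_per_node


def global_rank(node_index: int, local_rank: int, process_count_per_node: int) -> int:
    """Global rank, ASSUMING ranks are assigned node by node."""
    return node_index * process_count_per_node + local_rank


def is_chief(rank: int) -> bool:
    """Rank 0 conventionally writes out the model, logs and metrics."""
    return rank == 0


# Enumerate the processes of a 2-node cluster with 2 processes per node.
nodes, per_node = 2, 2
for node in range(nodes):
    for local in range(per_node):
        rank = global_rank(node, local, per_node)
        role = "chief" if is_chief(rank) else "worker"
        print(f"node={node} local_rank={local} rank={rank} role={role}")
```

In a real job the ranks are not computed by the script; they are read from the environment, as described in the Reference section.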

Tip:

The mpirun launcher is often confused with MPI as a communication backend, so users may think that AzureML mpirun cannot run distributed jobs with the nccl or gloo backend. Unfortunately, the difference is not well explained in the official AzureML documentation. AzureML uses mpirun only as a launcher utility: it starts the processes for the user's training script on the distributed nodes and sets up the environment variables that the script can use to initialize the process group. The actual communication backend is whatever the user sets in the training script.
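To make the launcher/backend distinction concrete, here is a minimal sketch. The function name process_group_kwargs is hypothetical; the keyword names it returns (backend, init_method, rank, world_size) are those accepted by torch.distributed.init_process_group. The sketch only handles three backends and assumes the OMPI_* variables listed in the Reference section are present.

```python
import os


def process_group_kwargs(backend: str, env=os.environ) -> dict:
    """Collect arguments for torch.distributed.init_process_group.

    mpirun (the launcher) only supplies rank and world size through
    OMPI_* variables; the backend is chosen by the training script.
    """
    if backend not in ("nccl", "gloo", "mpi"):
        raise ValueError(f"backend not handled by this sketch: {backend}")
    return {
        "backend": backend,
        "init_method": "env://",  # rendezvous via MASTER_ADDR / MASTER_PORT
        "rank": int(env["OMPI_COMM_WORLD_RANK"]),
        "world_size": int(env["OMPI_COMM_WORLD_SIZE"]),
    }


# In a training script (requires PyTorch, with MASTER_ADDR / MASTER_PORT set):
#   import torch.distributed as dist
#   dist.init_process_group(**process_group_kwargs("nccl"))
```

The same launched processes could equally pass "gloo" here; nothing about the mpirun launch changes.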

Note:

This document is mainly based on PyTorch concepts.

How to author and consume

Users can adopt this approach to run distributed training with either a per-process-launcher or a per-node-launcher: set process_count_per_node to 1 for a per-node-launcher, or to the number of devices/GPUs per node for a per-process-launcher. Currently AzureML does not expose cluster hostfiles, so users cannot run a head-node-launcher such as the DeepSpeed launcher. AzureML runs mpirun behind the scenes.
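The choice between the two launch styles comes down to one runsetting. The helper below is a hypothetical illustration, not part of any AzureML API; the style names "per-node" and "per-process" are labels chosen for this sketch.

```python
def process_count_per_node(launch_style: str, gpus_per_node: int) -> int:
    """Return the process_count_per_node value for a launch style."""
    if launch_style == "per-node":
        # One process per node; that process is expected to start the
        # per-GPU workers itself.
        return 1
    if launch_style == "per-process":
        # One process per device, each started directly by mpirun.
        return gpus_per_node
    raise ValueError(f"unknown launch style: {launch_style}")


print(process_count_per_node("per-node", gpus_per_node=4))
print(process_count_per_node("per-process", gpus_per_node=4))
```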

Whichever launch style is chosen, follow these steps:

  1. Use an AzureML environment with the preferred deep learning framework and MPI. AzureML provides curated environments for popular frameworks.

  2. Prepare the user script. See the detailed instructions and examples.

  3. Define a distributed component with the mpi launcher.

  4. Submit the component in a pipeline with the correct runsettings: node_count and process_count_per_node.

Caution:

To use AzureML mpirun, the base docker image used by the run needs to have an MPI library installed, such as Open MPI. An MPI library is included in all AzureML default GPU base images. When using a custom base image, the user is responsible for installing one. AzureML also provides curated environments for popular frameworks.

How to define mpi DistributedComponent yaml spec

The only part of a DistributedComponent yaml that differs from a CommandComponent is the launcher section. Below is an example yaml for the mpi launch type:

launcher:
  type: mpi
  additional_arguments: >-
    python mpi_train.py --training_data {inputs.training_data} --max_epochs {inputs.max_epochs} --learning_rate {inputs.learning_rate} --model_output {outputs.model_output}
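On the script side, the command line above implies four arguments. The following is a sketch of how a hypothetical mpi_train.py might parse them; the argument types and default values are assumptions, since the original script is not shown.

```python
import argparse


def parse_args(argv=None) -> argparse.Namespace:
    """Parse the arguments passed through additional_arguments."""
    parser = argparse.ArgumentParser(description="hypothetical mpi_train.py")
    parser.add_argument("--training_data", type=str, required=True)
    parser.add_argument("--max_epochs", type=int, default=1)  # assumed default
    parser.add_argument("--learning_rate", type=float, default=0.01)  # assumed default
    parser.add_argument("--model_output", type=str, required=True)
    return parser.parse_args(argv)


# Example invocation with explicit values standing in for the
# {inputs.*} and {outputs.*} placeholders.
args = parse_args([
    "--training_data", "/tmp/data",
    "--max_epochs", "5",
    "--learning_rate", "0.001",
    "--model_output", "/tmp/model",
])
print(args.max_epochs, args.learning_rate)
```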

How to consume mpi distributed component

Below is an example pipeline that configures the runsettings node_count and process_count_per_node:

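from azureml.core import Workspace
from azure.ml.component import Component, dsl, Pipeline

# Assumption: the workspace is loaded from a local config file. The names
# cpu_compute_target, arg_0 and arg_1 used below must be defined by the caller.
ws = Workspace.from_config()

# Load the mpi train component from its component yaml, which is workspace independent.
train_component_func = Component.from_yaml(yaml_file='components/imagecnn-train/entry.spec.yaml')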

# define pipeline
@dsl.pipeline(name='mpi_train_pipeline', description='mpi train pipeline', default_compute_target=cpu_compute_target)
def train_pipeline() -> Pipeline:
    train = train_component_func(script_arg_0=arg_0, script_arg_1=arg_1)

    # The runsettings below launch 2 nodes x 2 processes per node = 4 processes in total.
    train.runsettings.resource_layout.configure(
        node_count=2,  # use 2 nodes to train the model
        process_count_per_node=2)  # use 2 processes per node

# submit the pipeline
pipeline = train_pipeline()
pipeline.submit(experiment_name='mpi-sample-experiment', workspace=ws)

How to use MPI in user script

When the MPI job is submitted to AzureML compute, the user script is launched by mpirun and the MPI environment is initialized. To read MPI information in the user script, we recommend mpi4py, which should be added to the conda dependencies or to the custom image.

Here is a simple example script:

from mpi4py import MPI
comm = MPI.COMM_WORLD
world_size = comm.Get_size()  # total number of launched processes
rank = comm.Get_rank()  # rank of this process among all processes

Note:

  • mpi4py works well with both Intel MPI and Open MPI images;

  • In a distributed training job, local_rank is usually needed to assign GPUs within a node, but mpi4py does not provide it directly. In this case, use an Open MPI image and read the local rank from the environment variables; see the Reference section below.
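As a sketch of the GPU assignment mentioned in the note, the local rank can be read from OMPI_COMM_WORLD_LOCAL_RANK and mapped to a device. Both function names are hypothetical, and the fallback to local rank 0 when the variable is missing is an assumption made so the script also runs outside mpirun.

```python
import os


def local_rank_from_env(env=os.environ) -> int:
    """Local rank set by Open MPI; falls back to 0 outside mpirun (assumption)."""
    return int(env.get("OMPI_COMM_WORLD_LOCAL_RANK", "0"))


def device_for_local_rank(local_rank: int, gpu_count: int) -> str:
    """Map a local rank to a device string in PyTorch's naming style."""
    if gpu_count <= 0:
        return "cpu"
    # Wrap around if more processes than GPUs were launched on the node.
    return f"cuda:{local_rank % gpu_count}"


print(device_for_local_rank(local_rank_from_env(), gpu_count=4))
```

With PyTorch, the resulting index is what one would typically pass to torch.cuda.set_device.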

Samples

If you get a 404 error when accessing the samples, follow the how-to-access instructions.

Reference

Please refer to the DistributedComponent spec doc for the spec definition.

Please refer to the DistributedComponent Schema for the schema.

Environment Variables from OpenMPI

When running mpirun with Open MPI images, the following environment variables are set for each launched process:

  1. OMPI_COMM_WORLD_RANK - the rank of the process

  2. OMPI_COMM_WORLD_SIZE - the world size

  3. AZ_BATCH_MASTER_NODE - master address with port, MASTER_ADDR:MASTER_PORT

  4. OMPI_COMM_WORLD_LOCAL_RANK - the local rank of the process on the node

  5. OMPI_COMM_WORLD_LOCAL_SIZE - number of processes on the node

Caution:

Despite its name, the environment variable OMPI_COMM_WORLD_NODE_RANK does not correspond to NODE_RANK. For a per-node-launcher, simply set process_count_per_node=1 and use OMPI_COMM_WORLD_RANK as the NODE_RANK.
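For a per-node-launcher, the single process on each node typically starts the per-GPU workers itself. The sketch below derives the node rank as described in the caution above and builds a torchrun command line from it. The function names are hypothetical, the sketch assumes a multi-node job where AZ_BATCH_MASTER_NODE is set, and it uses a separate master port (6105, an arbitrary choice) instead of the port contained in AZ_BATCH_MASTER_NODE.

```python
import os


def node_launch_info(env=os.environ, master_port: int = 6105) -> dict:
    """Derive per-node launch parameters from the Open MPI variables."""
    if int(env["OMPI_COMM_WORLD_LOCAL_SIZE"]) != 1:
        raise RuntimeError("per-node launch expects process_count_per_node=1")
    # AZ_BATCH_MASTER_NODE has the form MASTER_ADDR:MASTER_PORT; only the
    # address part is reused here.
    master_addr = env["AZ_BATCH_MASTER_NODE"].split(":")[0]
    return {
        "node_rank": int(env["OMPI_COMM_WORLD_RANK"]),  # one process per node
        "nnodes": int(env["OMPI_COMM_WORLD_SIZE"]),
        "master_addr": master_addr,
        "master_port": master_port,
    }


def torchrun_command(info: dict, nproc_per_node: int, script: str) -> list:
    """Build a torchrun argument list that starts the per-GPU workers."""
    return [
        "torchrun",
        f"--nnodes={info['nnodes']}",
        f"--node_rank={info['node_rank']}",
        f"--nproc_per_node={nproc_per_node}",
        f"--master_addr={info['master_addr']}",
        f"--master_port={info['master_port']}",
        script,
    ]
```

The returned list could be handed to subprocess.run from the per-node process; train.py in such a call would be the user's own training script.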

The following code maps the Open MPI environment variables to the PyTorch style. For most PyTorch scripts, simply call set_environment_variables_for_nccl_backend() before the script calls torch.distributed.init_process_group. If the script receives information such as local_rank or rank as script arguments, remove those arguments and use the provided helper functions get_local_rank() and get_rank() instead.

import os

def set_environment_variables_for_nccl_backend(master_port=6105, verbose=True):
    os.environ["RANK"] = os.environ["OMPI_COMM_WORLD_RANK"]
    os.environ["WORLD_SIZE"] = os.environ["OMPI_COMM_WORLD_SIZE"]
    single_node = int(os.environ["OMPI_COMM_WORLD_LOCAL_SIZE"]) == int(
        os.environ["WORLD_SIZE"]
    )
    if not single_node:
        master_node_params = os.environ["AZ_BATCH_MASTER_NODE"].split(":")
        os.environ["MASTER_ADDR"] = master_node_params[0]
        # Do not overwrite master port with that defined in AZ_BATCH_MASTER_NODE
        if "MASTER_PORT" not in os.environ:
            os.environ["MASTER_PORT"] = str(master_port)
    else:
        os.environ["MASTER_ADDR"] = os.environ["AZ_BATCHAI_MPI_MASTER_NODE"]
        os.environ["MASTER_PORT"] = "54965"
    # NCCL_SOCKET_IFNAME may be unset, so use .get() to avoid a KeyError.
    print(
        "NCCL_SOCKET_IFNAME original value = {}".format(
            os.environ.get("NCCL_SOCKET_IFNAME")
        )
    )
    os.environ["NCCL_SOCKET_IFNAME"] = "^docker0,lo"
    if verbose:
        print("RANK = {}".format(os.environ["RANK"]))
        print("WORLD_SIZE = {}".format(os.environ["WORLD_SIZE"]))
        print("MASTER_ADDR = {}".format(os.environ["MASTER_ADDR"]))
        print("MASTER_PORT = {}".format(os.environ["MASTER_PORT"]))
        print(
            "NCCL_SOCKET_IFNAME new value = {}".format(os.environ["NCCL_SOCKET_IFNAME"])
        )

def get_rank():
    return int(os.environ["OMPI_COMM_WORLD_RANK"])

def get_local_rank():
    return int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"])

def get_global_size():
    return int(os.environ["OMPI_COMM_WORLD_SIZE"])

def get_local_size():
    return int(os.environ["OMPI_COMM_WORLD_LOCAL_SIZE"])

def get_world_size():
    return int(os.environ["OMPI_COMM_WORLD_SIZE"])
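To try rank-dependent logic outside of mpirun, for example on a development machine, the OMPI_* variables can be faked temporarily. This is a sketch for local debugging only; fake_ompi_env and FAKE_OMPI_ENV are hypothetical names, and the values are made up.

```python
import os
from unittest import mock

# Made-up defaults describing a single-process "cluster".
FAKE_OMPI_ENV = {
    "OMPI_COMM_WORLD_RANK": "0",
    "OMPI_COMM_WORLD_SIZE": "1",
    "OMPI_COMM_WORLD_LOCAL_RANK": "0",
    "OMPI_COMM_WORLD_LOCAL_SIZE": "1",
}


def fake_ompi_env(**overrides):
    """Return a context manager that temporarily sets OMPI_* variables."""
    values = dict(FAKE_OMPI_ENV)
    values.update({key: str(value) for key, value in overrides.items()})
    return mock.patch.dict(os.environ, values)


# Pretend to be rank 3 of 4; os.environ is restored when the block exits.
with fake_ompi_env(OMPI_COMM_WORLD_RANK=3, OMPI_COMM_WORLD_SIZE=4):
    rank = int(os.environ["OMPI_COMM_WORLD_RANK"])
    size = int(os.environ["OMPI_COMM_WORLD_SIZE"])
    print(f"rank {rank} of {size}")
```

Inside the with block, getters such as get_rank() and get_world_size() would see the faked values.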

Tip:

To see the list of environment variables provided by AzureML, just print os.environ in your training script.

import os
print(os.environ)
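Since os.environ is usually long, it can help to print only the launcher-related variables. The sketch below filters by the prefixes that appear in this document (OMPI_ and AZ_); the function name is hypothetical and the prefix list is an assumption that may miss other variables.

```python
import os


def launcher_variables(env=os.environ, prefixes=("OMPI_", "AZ_")) -> dict:
    """Return the variables whose names start with one of the prefixes, sorted by name."""
    return {
        name: value
        for name, value in sorted(env.items())
        if name.startswith(prefixes)
    }


for name, value in launcher_variables().items():
    print(f"{name}={value}")
```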