torch.distributed

Overview

The torch.distributed launcher runs multi-process parallel jobs across computation nodes on one or more machines. It does similar work to the launch utility torch.distributed.launch, with one difference: the Azure ML torch.distributed launcher is a per-process launcher (it starts each training process directly), while torch.distributed.launch is a per-node launcher (it runs once per node and spawns the processes on that node). With the Azure ML launcher, you only provide instance_count and process_count_per_node; together they determine the process group size (instance_count × process_count_per_node processes). The launcher automatically sets the environment variables torch.distributed requires, such as MASTER_ADDR, MASTER_PORT, RANK and LOCAL_RANK, and then runs the script file once per process.
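To make the process group size concrete, the sketch below enumerates the processes for a given instance_count and process_count_per_node. It assumes ranks are assigned node by node (rank = node_index × process_count_per_node + local_rank), which is the usual convention but is not confirmed here for AzureML; process_layout is a hypothetical helper.

```python
from typing import List, Tuple


def process_layout(instance_count: int, process_count_per_node: int) -> List[Tuple[int, int, int]]:
    """Return (node_index, local_rank, rank) for every process the launcher would start.

    Assumption: ranks are assigned node by node,
    rank = node_index * process_count_per_node + local_rank.
    """
    layout = []
    for node_index in range(instance_count):
        for local_rank in range(process_count_per_node):
            rank = node_index * process_count_per_node + local_rank
            layout.append((node_index, local_rank, rank))
    return layout


if __name__ == "__main__":
    procs = process_layout(instance_count=2, process_count_per_node=2)
    world_size = len(procs)  # instance_count * process_count_per_node
    for node_index, local_rank, rank in procs:
        print(f"node={node_index} LOCAL_RANK={local_rank} RANK={rank} WORLD_SIZE={world_size}")
```

For instance_count=2 and process_count_per_node=2 this yields four processes, with RANK 0 to 3 overall and LOCAL_RANK 0 or 1 on each node.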

Note:

The torch.distributed launcher uses the NCCL backend, and TCP initialization (an init_method of the form tcp://MASTER_ADDR:MASTER_PORT) is recommended for initializing the process group.
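As a quick local check of TCP initialization, the sketch below forms a single-process group on one machine. It swaps in the gloo backend instead of NCCL so that it runs without GPUs, and find_free_port and tcp_init_smoke_test are hypothetical helper names. In an AzureML job the address and port would come from MASTER_ADDR and MASTER_PORT rather than 127.0.0.1 and a locally chosen port.

```python
import datetime
import socket

import torch.distributed as dist


def find_free_port() -> int:
    # Bind to port 0 so the OS picks an unused port, then release it.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def tcp_init_smoke_test() -> int:
    """Form a one-process group over TCP and return its world size."""
    init_method = f"tcp://127.0.0.1:{find_free_port()}"
    # gloo instead of nccl so this runs on a CPU-only machine.
    dist.init_process_group(backend="gloo",
                            init_method=init_method,
                            world_size=1,
                            rank=0,
                            timeout=datetime.timedelta(seconds=30))
    try:
        return dist.get_world_size()
    finally:
        # Tear the group down so the process can initialize again later.
        dist.destroy_process_group()


if __name__ == "__main__":
    print(tcp_init_smoke_test())
```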

How to author and consume

Follow these steps:

  1. Use a suitable PyTorch AzureML environment. AzureML-PyTorch-1.6-GPU, the latest version at the time of writing, is recommended for advanced distributed features.

  2. Prepare the user script and map the environment variables that AzureML sets up to what your script expects. See the detailed instructions and examples in the Reference section.

  3. Define a DistributedComponent with the torch.distributed launcher type.

  4. Submit the component in a pipeline with the correct runsettings: instance_count and process_count_per_node.

How to define a torch.distributed DistributedComponent yaml spec

See the DistributedComponent spec doc for the spec definition and the DistributedComponent Schema for the schema.

The only difference between a DistributedComponent yaml and a CommandComponent yaml is the launcher section. Below is an example launcher section for the torch.distributed launcher type:

launcher:
  type: torch.distributed
  additional_arguments: >-
    python train.py --data-dir {inputs.data_dir} --num-epochs {inputs.num_epochs} --learning-rate {inputs.learning_rate} --momentum {inputs.momentum} --output-dir {outputs.output_dir}

How to consume a torch distributed component

The example pipeline below sets the instance_count and process_count_per_node runsettings:

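@dsl.pipeline(default_compute_target=cluster_name)
def torch_pipeline(dataset, num_epochs) -> Pipeline:
    # torch_train_func: component function loaded from the DistributedComponent yaml (loading code not shown).
    torch_train_component = torch_train_func(data_dir=dataset, num_epochs=num_epochs)
    # 2 nodes x 2 processes per node: the launcher starts 4 processes in total.
    torch_train_component.runsettings.resource_layout.configure(instance_count=2, process_count_per_node=2)
    return torch_train_component.outputs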

Samples

If you get a 404 error when accessing the samples, follow the how-to-access instructions.

Note: If you get an unsupported-cluster error such as “Pytorch jobs are not enabled in this cluster” when running the sample on an ITP cluster, contact the ITP team to enable the feature.

Reference

Environment Variables from PyTorch distributed

When running in a PyTorch distributed environment, AzureML sets the following environment variables for each launched process:

  1. MASTER_ADDR - IP address of the master node (rank 0).

  2. MASTER_PORT - a free port on the master node (rank 0), used for communication during distributed training.

  3. RANK - global rank of the current process, from 0 to WORLD_SIZE - 1.

  4. LOCAL_RANK - rank of the process within its node, typically used as the GPU index.

  5. WORLD_SIZE - total number of processes participating in the job.
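A script can read these variables in one place and fail early if they are inconsistent. The sketch below does that; DistEnv and read_dist_env are hypothetical names, and the values used when trying it out (such as 10.0.0.4:6105) are made up.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class DistEnv:
    master_addr: str
    master_port: int
    rank: int
    local_rank: int
    world_size: int

    @property
    def init_method(self) -> str:
        # TCP init URL suitable for torch.distributed.init_process_group.
        return f"tcp://{self.master_addr}:{self.master_port}"


def read_dist_env(environ: Mapping[str, str] = os.environ) -> Optional[DistEnv]:
    """Read the launcher-provided variables; return None when not running distributed."""
    names = ("MASTER_ADDR", "MASTER_PORT", "RANK", "LOCAL_RANK", "WORLD_SIZE")
    if not all(name in environ for name in names):
        return None
    env = DistEnv(
        master_addr=environ["MASTER_ADDR"],
        master_port=int(environ["MASTER_PORT"]),
        rank=int(environ["RANK"]),
        local_rank=int(environ["LOCAL_RANK"]),
        world_size=int(environ["WORLD_SIZE"]),
    )
    # Sanity checks: ranks are 0-based and bounded by the world size.
    if not 0 <= env.rank < env.world_size:
        raise ValueError(f"RANK={env.rank} is outside [0, {env.world_size})")
    if not 0 <= env.local_rank < env.world_size:
        raise ValueError(f"LOCAL_RANK={env.local_rank} is outside [0, {env.world_size})")
    return env
```

The returned init_method, world_size and rank map directly onto the arguments of dist.init_process_group.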

The following snippets show where you may need to modify your training script to use torch distributed training. See torch_distributed_train.py for the full script.

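import argparse
import datetime
import os

import torch
import torch.distributed as dist


def parse_args():
    parser = argparse.ArgumentParser()
    # Script arguments passed through the launcher's additional_arguments.
    parser.add_argument("--data-dir", type=str)
    parser.add_argument("--num-epochs", type=int)
    parser.add_argument("--learning-rate", type=float)
    parser.add_argument("--momentum", type=float)
    parser.add_argument("--output-dir", type=str)
    args = parser.parse_args()
    # Environment variables set by the AzureML torch.distributed launcher.
    master_addr = os.environ.get("MASTER_ADDR")
    master_port = os.environ.get("MASTER_PORT")
    args.dist_url = f"tcp://{master_addr}:{master_port}" if master_addr and master_port else None
    # Train distributed only if the launcher provided an address and a GPU is available (NCCL needs GPUs).
    args.distributed = args.dist_url is not None and torch.cuda.is_available()
    args.rank = int(os.environ["RANK"]) if args.distributed else None
    args.world_size = int(os.environ["WORLD_SIZE"]) if args.distributed else None
    args.local_rank = int(os.environ["LOCAL_RANK"]) if args.distributed else None
    args.dist_backend = 'nccl'
    return args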


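def train(data_dir,
          model,
          ...
          distributed=False,
          local_rank=None):
    ...
    # Set distributed sampler for distributed training. DistributedSampler reads the
    # rank and world size from the default process group, so init_process_group must run first.
    if distributed:
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_set)
    else:
        train_sampler = None
    # When a sampler is set it does the shuffling, so DataLoader's own shuffle must be off.
    train_loader = torch.utils.data.DataLoader(train_set,
                                               shuffle=train_sampler is None,
                                               sampler=train_sampler,
                                               pin_memory=torch.cuda.is_available(),
                                               num_workers=0)
    if distributed:
        # DDP with device_ids expects the module to already be on that GPU.
        model = model.cuda(local_rank)
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])
    # In the epoch loop, call train_sampler.set_epoch(epoch) so each epoch is shuffled differently.
    ...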


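def main():
    ...
    if args.distributed:
        # Pin this process to its own GPU before creating the NCCL process group.
        torch.cuda.set_device(args.local_rank)
        # With NCCL_BLOCKING_WAIT=1, a collective that exceeds `timeout` raises an error
        # instead of hanging. 5 seconds is short; consider a larger value for real workloads.
        os.environ['NCCL_BLOCKING_WAIT'] = '1'
        dist.init_process_group(backend=args.dist_backend,
                                init_method=args.dist_url,
                                world_size=args.world_size,
                                rank=args.rank,
                                timeout=datetime.timedelta(seconds=5))
    ...
    model = train(args.data_dir,
                  model,
                  ...
                  distributed=args.distributed,
                  local_rank=args.local_rank)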
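DistributedSampler gives each rank a disjoint shard of the dataset. The sketch below shows the sharding without a process group by passing num_replicas and rank explicitly; shard_indices is a hypothetical helper, and a plain list stands in for train_set.

```python
from typing import List

from torch.utils.data.distributed import DistributedSampler


def shard_indices(dataset_len: int, world_size: int, rank: int, epoch: int,
                  shuffle: bool = True) -> List[int]:
    """Indices that `rank` would read in `epoch`, computed without a process group."""
    dataset = list(range(dataset_len))  # the sampler only needs len(dataset)
    # Passing num_replicas and rank explicitly avoids calling into torch.distributed.
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=shuffle)
    # The shuffle order is seeded with the epoch, so every rank must use the same epoch.
    sampler.set_epoch(epoch)
    return list(sampler)


if __name__ == "__main__":
    for r in range(2):
        print(r, shard_indices(10, world_size=2, rank=r, epoch=0, shuffle=False))
```

In the training loop, call train_sampler.set_epoch(epoch) at the start of each epoch when train_sampler is not None, so shuffling changes between epochs while the shards stay disjoint across ranks.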

Tip:

To see the list of environment variables provided by AzureML, print os.environ in your training script.

import os
print(os.environ)

Note: The environment variable names above, such as MASTER_ADDR, apply to AMLCompute. On other compute types the names may differ; for example, ITP uses MASTER_IP. Learn more: Distributed training job using PyTorch.
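If the same script has to run on both AMLCompute and ITP, one option is to try both names. The sketch below is an assumption-based fallback (MASTER_ADDR first, then MASTER_IP); master_address is a hypothetical helper, and other variables, such as the port, may also be named differently on ITP, which this sketch does not handle.

```python
import os
from typing import Mapping, Optional


def master_address(environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the master node address, trying MASTER_ADDR and then MASTER_IP.

    The fallback order is an assumption for scripts shared between AMLCompute and ITP.
    """
    for name in ("MASTER_ADDR", "MASTER_IP"):
        value = environ.get(name)
        if value:
            return value
    return None
```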