torch.distributed
Overview
The torch.distributed launcher is used for multi-process parallelism across several computation nodes running on one or more machines.
It does similar work to the launch utility in torch.distributed.launch.
One difference is that the Azure ML torch.distributed launcher is a per-process launcher, while torch.distributed.launch is a per-node launcher.
With the Azure ML torch.distributed launcher, the user only needs to provide instance_count and process_count_per_node to determine the process group size.
The launcher automatically sets the environment variables that torch distributed requires, such as MASTER_ADDR, MASTER_PORT, RANK, and LOCAL_RANK, and then runs the script file once per process.
Note:
The torch.distributed launcher adopts the NCCL backend, and it is recommended to use TCP initialization to initialize a process group.
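To make the process group size concrete, the sketch below enumerates the processes a job would have for given instance_count and process_count_per_node values. It assumes the common layout in which the global rank equals node_index * process_count_per_node + local_rank; the names rank_layout and ProcessInfo are hypothetical, and the actual assignment is made by the launcher.

```python
from typing import List, NamedTuple


class ProcessInfo(NamedTuple):
    node_index: int   # which instance the process runs on
    local_rank: int   # LOCAL_RANK: index of the process within its node
    rank: int         # RANK: index of the process within the whole job


def rank_layout(instance_count: int, process_count_per_node: int) -> List[ProcessInfo]:
    """Enumerate processes, assuming ranks are assigned node by node."""
    if instance_count < 1 or process_count_per_node < 1:
        raise ValueError("both counts must be positive")
    return [
        ProcessInfo(node, local, node * process_count_per_node + local)
        for node in range(instance_count)
        for local in range(process_count_per_node)
    ]


if __name__ == "__main__":
    layout = rank_layout(instance_count=2, process_count_per_node=2)
    # The process group size (WORLD_SIZE) is the product of the two settings.
    print("WORLD_SIZE =", len(layout))
    for proc in layout:
        print(proc)
```

Under this assumption, the process group size is simply instance_count multiplied by process_count_per_node.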
How to define a torch.distributed DistributedComponent yaml spec
Please refer to DistributedComponent spec doc for spec definition.
Please refer to DistributedComponent Schema.
The only part of a DistributedComponent yaml that differs from a CommandComponent is the launcher section. Below is an example yaml for the torch.distributed launcher type:
launcher:
  type: torch.distributed
  additional_arguments: >-
    python train.py --data-dir {inputs.data_dir} --num-epochs {inputs.num_epochs} --learning-rate {inputs.learning_rate} --momentum {inputs.momentum} --output-dir {outputs.output_dir}
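The command above passes five flags to train.py. A minimal sketch of how the script might declare them with argparse follows; the flag names come from the command line above, while the types, defaults, and the helper names build_parser and parse_args are assumptions for illustration.

```python
import argparse
from typing import List, Optional


def build_parser() -> argparse.ArgumentParser:
    """Declare the flags used in the launcher command (types are assumed)."""
    parser = argparse.ArgumentParser(description="Training script arguments")
    parser.add_argument("--data-dir", type=str, required=True)
    parser.add_argument("--num-epochs", type=int, default=1)
    parser.add_argument("--learning-rate", type=float, default=0.01)
    parser.add_argument("--momentum", type=float, default=0.9)
    parser.add_argument("--output-dir", type=str, required=True)
    return parser


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    # argv=None makes argparse read sys.argv[1:], as in a real run.
    return build_parser().parse_args(argv)


if __name__ == "__main__":
    demo = parse_args(["--data-dir", "/tmp/data", "--output-dir", "/tmp/out",
                       "--num-epochs", "3"])
    print(demo.data_dir, demo.num_epochs, demo.learning_rate)
```

argparse converts the dashes in flag names to underscores, so --data-dir becomes args.data_dir.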
How to consume a torch distributed component
Below is an example pipeline that configures the instance_count and process_count_per_node runsettings:
@dsl.pipeline(default_compute_target=cluster_name)
def torch_pipeline(dataset, num_epochs) -> Pipeline:
    torch_train_component = torch_train_func(data_dir=dataset, num_epochs=num_epochs)
    torch_train_component.runsettings.resource_layout.configure(instance_count=2, process_count_per_node=2)
    return torch_train_component.outputs
Samples
Follow the how-to-access instructions if you get a 404 error when accessing the samples.
Fowl classification with PyTorch distributed training - Demonstrates how to perform torch distributed training.
[!NOTE] If you get an unsupported-cluster error such as “Pytorch jobs are not enabled in this cluster” when running the sample on an ITP cluster, contact the ITP team to enable the feature.
Reference
Environment Variables from PyTorch distributed
When running in a PyTorch distributed environment, AzureML sets the following environment variables for each process launched:
MASTER_ADDR - IP address of the master node (rank 0).
MASTER_PORT - free port on the master node (rank 0) to use for communication during distributed training.
RANK - rank of the current process.
LOCAL_RANK - relative rank within the node, also known as the GPU index.
WORLD_SIZE - number of processes participating in the job.
The following code snippets show where you may modify your training script to use torch distributed training. See torch_distributed_train.py for the whole script.
import argparse
import datetime
import os

import torch
import torch.distributed as dist


def parse_args():
    parser = argparse.ArgumentParser()
    args = parser.parse_args()
    # The launcher sets these variables; they are absent in a non-distributed run.
    master_addr = os.environ.get("MASTER_ADDR", None)
    master_port = os.environ.get("MASTER_PORT", None)
    # TCP initialization URL pointing at the rank 0 process.
    args.dist_url = f"tcp://{master_addr}:{master_port}" if master_addr else None
    args.distributed = args.dist_url is not None and torch.cuda.is_available()
    args.rank = int(os.environ["RANK"]) if args.distributed else None
    args.world_size = int(os.environ["WORLD_SIZE"]) if args.distributed else None
    args.local_rank = int(os.environ["LOCAL_RANK"]) if args.distributed else None
    args.dist_backend = 'nccl'
    return args
def train(data_dir,
          model,
          ...
          distributed=False,
          local_rank=None):
    ...
    # Set distributed sampler for distributed training.
    if distributed:
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_set)
    else:
        train_sampler = None
    train_loader = torch.utils.data.DataLoader(train_set,
                                               shuffle=train_sampler is None,
                                               sampler=train_sampler,
                                               pin_memory=torch.cuda.is_available(),
                                               num_workers=0)
    if distributed:
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])
    ...
def main():
    ...
    if args.distributed:
        torch.cuda.set_device(args.local_rank)
        os.environ['NCCL_BLOCKING_WAIT'] = '1'
        dist.init_process_group(backend=args.dist_backend,
                                init_method=args.dist_url,
                                world_size=args.world_size,
                                rank=args.rank,
                                timeout=datetime.timedelta(seconds=5))
    ...
    model = train(args.data_dir,
                  model,
                  ...
                  distributed=args.distributed,
                  local_rank=args.local_rank)
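To illustrate why a distributed sampler is used, the sketch below mimics in plain Python how sample indices can be split so that each rank works on its own slice of the dataset. It is a simplified illustration (no shuffling, padding by repeating indices from the start), not the implementation of torch.utils.data.distributed.DistributedSampler; the function name shard_indices is hypothetical.

```python
import math
from typing import List


def shard_indices(dataset_size: int, world_size: int, rank: int) -> List[int]:
    """Return the sample indices one rank would process (illustrative only)."""
    if dataset_size < 1 or world_size < 1:
        raise ValueError("dataset_size and world_size must be positive")
    if not 0 <= rank < world_size:
        raise ValueError("rank must be in [0, world_size)")
    per_rank = math.ceil(dataset_size / world_size)
    total = per_rank * world_size
    base = list(range(dataset_size))
    # Pad by repeating indices from the start so every rank gets the same count.
    indices = (base * math.ceil(total / dataset_size))[:total]
    # Strided slice: rank r takes positions r, r + world_size, r + 2 * world_size, ...
    return indices[rank:total:world_size]


if __name__ == "__main__":
    for r in range(4):
        print(f"rank {r}:", shard_indices(dataset_size=10, world_size=4, rank=r))
```

Each rank receives the same number of samples, which keeps the processes in step during gradient synchronization; a few samples are repeated when the dataset size is not divisible by the number of processes.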
Tip:
To see the list of environment variables provided by AzureML, print os.environ in your training script.
import os
print(os.environ)
Note: The environment variable MASTER_ADDR above works in AMLCompute. For other compute types, the environment variable names may be different; for example, ITP uses MASTER_IP. Learn more: Distributed training job using PyTorch.
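Because the variable name can differ between compute types, a training script may want to look up the master address under more than one name. The sketch below tries MASTER_ADDR first and then falls back to MASTER_IP; the helper name resolve_master_addr and the lookup order are assumptions, and only these two variable names come from the text above.

```python
import os
from typing import Mapping, Optional, Sequence


def resolve_master_addr(
    environ: Mapping[str, str] = os.environ,
    names: Sequence[str] = ("MASTER_ADDR", "MASTER_IP"),
) -> Optional[str]:
    """Return the first non-empty master address found, or None."""
    for name in names:
        value = environ.get(name)
        if value:
            return value
    return None


if __name__ == "__main__":
    # Prints None when neither variable is set, e.g. in a local run.
    print(resolve_master_addr())
```

Passing the environment as a parameter keeps the helper easy to test with a plain dictionary.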