Parallel Component

Overview

ParallelComponent is a kind of component to run ParallelRunStep. Similar to ParallelRunStep, ParallelComponent is suitable for the scenarios that the input includes a large amount of data. The typical scenarios include the following:

  • Image processing, text processing, and other tasks to process a large number of files;

  • Use a trained model to do batch inference on a large amount of input data;

Here are some more documents about ParallelRunStep:

How to write a Parallel Component

Basically, there are three steps to write a Parallel Component and use it for batch inference:

  1. Write a Parallel Component yaml spec which defines the input/output interface, code reference and environment of your component.

  2. Write a Entry Script that would be invoked by Parallel Component (ParallelRunStep) runtime.

  3. Authoring a pipeline with Component SDK to submit jobs to AML.

Samples

Follow how to access instructions if you meet 404 error when accessing the samples.

Limitation

Compared to ParallelRunStep, currently ParallelComponent has following known limitations:

  • Input dataset type supports both FileDataset and TabularDataset. But when consuming TabularDataset, the input mode need to be set to direct. Please check this sample for more details.

  • Output action only supports summary_only. append_row is not supported.

    Notice: with summary_only output action, run() method of entry script should return a list of elements to indicate the count of items that successfully processed in this mini batch.

  • On Linux, command line argument “–output” cannot be used to get a correct output directory in entry script. For more details, please see here.

Other behaviors and the limitations are the same as ParallelRunStep in AzureML

How to write Parallel Component yaml spec

Please refer to ParallelComponent spec doc.

Please refer to ParallelComponent Schema.

Example yaml:

$schema: https://componentsdk.azureedge.net/jsonschema/ParallelComponent.json
name: microsoft.com.azureml.samples.parallel_score_image
version: 0.0.1
display_name: Parallel Score Image Classification Model
type: ParallelComponent
description: Score images with trained image classification model.

inputs:
  trained_model:
    type: path
    description: Trained image classification model.
    optional: false
  images_to_score:
    type: path
    description: Images to score.
    optional: false
outputs:
  scored_dataset:
    type: path
    description: Scored dataframes stored in parquet files.

environment:
  docker:
    image: mcr.microsoft.com/azureml/openmpi3.1.2-cuda10.2-cudnn8-ubuntu18.04
  conda:
    conda_dependencies:
      name: project_environment
      channels:
      - defaults
      dependencies:
      - psutil
      - python=3.6.8
      - pip:
        - torch
        - torchvision
        # The following dependencies are used for parallel run driver.
        - azureml-defaults
        - azureml-dataprep>=1.6
  os: Linux

parallel:
  input_data: inputs.images_to_score
  output_data: outputs.scored_dataset
  entry: score_parallel.py
  args: >-
    --model {inputs.trained_model} --output {outputs.scored_dataset}

See more component spec examples.

How to write an Entry Script

The entry script of Parallel Component should contain two functions:

  • init(): this function should be used for any costly or common preparation for subsequent inferences, e.g., deserializing and loading the model into a global object.

  • run(mini_batch): The method to be parallelized. Each invocation will have one minibatch.

    • mini_batch: Batch inference will invoke run method and pass either a list or Pandas DataFrame as an argument to the method. Each entry in mini batch will be - a filepath if input is a FileDataset, a Pandas DataFrame if input is a TabularDataset.

    • return value: run() method should return a an array. Each returned output element in returned array indicates one successful inference of input element in the input mini-batch. The contents of the elements are ignored.

def init():
    parser = argparse.ArgumentParser()
    parser.add_argument("--output", default='output')
    parser.add_argument("--model", default='model')

    global args
    # Parallel Component would add some predefined arguments in commandline
    # that would be consumed by Parallel Component runtime. 
    # Please always use "parse_known_args" to avoid exceptions.
    args, _ = parser.parse_known_args()
    os.makedirs(args.output, exist_ok=True)

    global model
    model = load_my_model(args.model)


def run(mini_batch):
    # Input mini_batch is a list of files
    print('Input batch size:', len(mini_batch))

    results = []
    for input_file in mini_batch:
        input_data = load_my_data(input_file)

        # Use the "model" object that initialized in init() method
        pred_output = model.predict(input_data)
        write_my_pred_output(args.output, pred_output)

        # "results" only indicates the success of processed items.
        # The content would be ignored.
        results.append(True)

    print("Batch done")
    return results

See more entry script examples.

Tips:

  • There is a known issue of argument “–output” when running on Linux. Please check [FAQ](#How-to-get output-directory-correctly-when-running-on-Linux) section for more details.

  • There are some predefined arguments that would be used by Parallel Component runtime. Please check FAQ to get predefined arguments list.

Advanced features

Mini batch level failure control

Currently, Parallel Component provides an error_threshold parameter to have the failure control on items level.

  • For Tabular dataset, items are rows of input Pandas dataframe.

  • For File dataset, items are files of input file list.

If a mini batch is failed or timeout, items of this mini batch would be counted. Once the total failed item count exceeds the error_threshold, job would be marked as failed.

However, in some edge cases (e.g. process crashed or dataset expired) items of a mini batch cannot be extracted successfully, the error_threshold parameter won’t work as expected anymore.

To mitigate this issue, we introduced two new parameters –allowed_failed_count and –allowed_failed_percent to allow users to have control the failures on mini batch level. Users can simply add them as arguments to command line to enable this feature.

Argument Description
--error_threshold The number of allowed failed items that processed in mini batches.
--allowed_failed_count The number of allowed failed mini batches.
--allowed_failed_percent The percent of allowed failed mini batches.

Smart download on Windows

Smart download is a feature in Parallel Component that designed for handling large volume data on Windows. Currently, the only dataset consumption way on Windows is download, which means all input data would be downloaded to local disk. Downloading all data to local disk is unscalable and low efficiency.

So, Parallel Component provides a smart download mode to download data on mini batch level. More details of smart download feature can be found here.

The way to enable it is specifying the input mode to direct in pipeline. Please see this sample for more details.

Use partitioned datasets as inputs

Currently, Parallel Component generates tasks by mini_batch_size field in runsettings. If mini_batch_size is 5, there would be 5 files in each mini batch task.

When using a partitioned dataset as input, Parallel Component would generate tasks by partition info of the input dataset. For example, if a dataset is partitioned by “country” and “state”, users can use “country” or “country + state” to split the input dataset for each task.

country1/state1/file1.txt
country1/state1/file2.txt
country1/state2/file1.txt
country1/state2/file2.txt
country1/state3/file1.txt
country1/state4/file1.txt
country2/state1/file1.txt
country2/state2/file1.txt
country2/state3/file1.txt
country2/state3/file2.txt

Here is an example that shows how to use parallel component with partitioned dataset as inputs.

FAQ

Predefined arguments in Parallel Component

There are some predefined arguments in command line that for passing run level settings to Parallel Component runtime.

Command Argument Description
--client_sdk_version Specifies the version of AzureML SDK. Currently, it's a hard code string in Parallel Component.
--scoring_module_name Specifies the user script which will be run in parallel on multiple nodes.
--output_action Current Parallel component only support “summary-only”.
--create_snapshot_at_runtime Specifies if creates a snapshot at runtime, default is True.
--input_fds Name references of input datasets.
--process_count_per_node Number of processes executed on each node.
--error_threshold The number of record failures for TabularDataset and file failures for FileDataset that should be ignored during processing.
--mini_batch_size For FileDataset input this field is the number of files a user script can process in one run() call.
For TabularDataset input this field is the approximate size of data the user script can process in one run() call.
--logging_level A string of the logging level name, which is defined in 'logging'.
--run_invocation_timeout Timeout in seconds for each invocation of the run() method.
--run_max_try The number of maximum tries for a failed or timeout mini batch.

How to get output directory correctly when running on Linux

Outputs of all component types have been enabled with “output as dataset” feature for better connectivity and productivity gains. However, Parallel Component is running in a legacy working mode on Linux, which cannot resolve the output path correctly on each node.

Consequently, the value that parsed from command line argument “–output” is only valid on one node. Output files on other nodes wouldn’t be uploaded to the final storage.

Here is a short term work around for this issue. Basically, customers need to change their entry script code to get the output folder from “EntryScript().output_dir” or environment variable “AZUREML_BI_OUTPUT_PATH”.

def run(input_files):
    # Option 1: get output_dir from EntryScript().output_dir
    from azureml_user.parallel_run import EntryScript
    output_dir = EntryScript().output_dir
    print('output_dir from EntryScript: {}'.format(output_dir))

    # Option 2: get output_dir from AZUREML_BI_OUTPUT_PATH
    output_dir = os.getenv("AZUREML_BI_OUTPUT_PATH")
    print('output_dir from AZUREML_BI_OUTPUT_PATH: {}'.format(output_dir))

In long term, Parallel Component on Linux would upgrade to the new working mode (Parallel Task mode) which is same as on Windows. The output directory limitation would be resolved as well.

Appendix

In legacy module concept, this maps to parallel jobType Module.