PipelineParameter

What is PipelineParameter?

PipelineParameter lets users run pipeline experiments with different inputs and parameters without changing any code.

Supported types

Type      Description
int       Python built-in numeric type
str       Python built-in text sequence type
bool      Python built-in Boolean (truth value) type
float     Python built-in numeric type
Dataset   Azure ML dataset type, such as FileDataset

Use PipelineParameter

PipelineParameter as a component input/parameter

In the sample pipeline function below, pipeline parameters are:

  • input_data - the original dataset

  • replacement_value - a string value used to replace missing values

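# prepare dataset and component function
from azureml.core import Dataset, Workspace
from azure.ml.component import Component, Pipeline, dsl

# load the workspace from a local config.json (adjust to your environment)
workspace = Workspace.from_config()
# create a FileDataset
titanic_dataset = Dataset.File.from_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')
# get 'Clean Missing Data' component function
clean_missing_data_func = Component.load(workspace, name='azureml://Clean Missing Data')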

# define a pipeline function with parameters
@dsl.pipeline(name='sample_pipeline_with_pipeline_parameter', default_compute_target='aml-compute')
def sample_pipeline(input_data, replacement_value) -> Pipeline:
    # clean missing data of our input dataset
    clean_missing_data = clean_missing_data_func(
        dataset=input_data,
        columns_to_be_cleaned='["AllColumns"]',
        minimum_missing_value_ratio=0.0,
        maximum_missing_value_ratio=1.0,
        cleaning_mode='Custom substitution value',
        replacement_value=replacement_value,
        generate_missing_value_indicator_column=False)
    return clean_missing_data.outputs

# create a pipeline instance with input=titanic_dataset and replacement_value='0'
pipeline = sample_pipeline(input_data=titanic_dataset, replacement_value='0')
# submit pipeline run
pipeline.submit()

Note: The parameters of a user function decorated with dsl.pipeline() are converted to the PipelineParameter type. When pipeline decorators are nested, only the parameters of the outermost user function are converted to PipelineParameter.

Open the run link returned by submit(). The run page shows the pipeline parameters: replacement_value with the value ‘0’ and input_data with a link.

image-run-with-default-pipeline-parameter

Click the Clean Missing Data component. The input box of the replacement_value parameter is followed by a question mark, which indicates that a PipelineParameter value has been applied to the component’s parameter.

image-pipeline-parameter-on-component

PipelineParameter as substrings of a component parameter

A PipelineParameter can be used as a substring of a string parameter.

Suppose we have a pipeline with the PipelineParameters param_name and param_value, and we’d like to format them as a JSON string and pass it to the component. The pipeline can be defined as follows.

@dsl.pipeline(name="pipeline")
def pipeline(param_name, param_value) -> Pipeline:
    print_string_component_func(
        string_parameter='{{"name": "{0}", "value": "{1}"}}'.format(param_name, param_value))

print_string_pipeline = pipeline('my_param', '1/25/2021')
print_string_pipeline.submit()

After submitting the pipeline, we can see the two pipeline parameters param_name=my_param and param_value=1/25/2021.

image-pipeline-parameter-as-substring

The component’s string_parameter has been formatted as intended.

image-pipeline-parameter-as-substring-on-component
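The template in the example relies on a detail of Python's str.format: doubled braces produce literal braces, while {0} and {1} are replaced by the arguments. The snippet below applies the same template to plain strings (not PipelineParameter objects, which need the SDK) to show the resulting JSON text.

```python
import json

# Same template as in the pipeline example; '{{' and '}}' become literal braces.
template = '{{"name": "{0}", "value": "{1}"}}'
formatted = template.format('my_param', '1/25/2021')
print(formatted)  # {"name": "my_param", "value": "1/25/2021"}

# The result is valid JSON and can be parsed back into a dict.
parsed = json.loads(formatted)
print(parsed['name'], parsed['value'])  # my_param 1/25/2021
```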

Submit with PipelineParameter

Submit pipeline with PipelineParameter via Component SDK

Submit the pipeline created above with replacement_value set to ‘100’:

# define pipeline parameters with new replacement value
pipeline_parameters = {'replacement_value': '100'}

# submit run with pipeline parameters
pipeline.submit(pipeline_parameters=pipeline_parameters)

replacement_value now displays ‘100’, and input_data is the same as before.

image-run-with-different-pipeline-parameter
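The observed result, where only the overridden parameter changes, resembles merging a dictionary of overrides into a dictionary of defaults. The sketch below shows that idea in plain Python. `resolve_parameters` is a hypothetical helper, not an SDK function, and the string 'titanic_dataset' stands in for the dataset object.

```python
def resolve_parameters(defaults, overrides=None):
    """Return a new dict: defaults, with any overridden values replaced."""
    return {**defaults, **(overrides or {})}


# Values given when the pipeline instance was created.
defaults = {'input_data': 'titanic_dataset', 'replacement_value': '0'}

# Values passed at submit time override the defaults for that run only.
resolved = resolve_parameters(defaults, {'replacement_value': '100'})
print(resolved)  # {'input_data': 'titanic_dataset', 'replacement_value': '100'}
print(defaults)  # unchanged: {'input_data': 'titanic_dataset', 'replacement_value': '0'}
```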

Resubmit run with PipelineParameter

Click the Resubmit button on the pipeline run page. If the run has PipelineParameters, you can set their values before resubmitting.

image-resubmit-run-with-pipeline-parameter

Publish run with PipelineParameter

PipelineParameter also applies to pipeline endpoints: you can set the parameters when you submit a run through an endpoint.

Click the Publish button on the pipeline run page. Any pipeline parameters and their default values are displayed.

image-publish-run-with-pipeline-parameter

PipelineEndpoint SDK support will be publicly available later.

Use Parameter Group

What is ParameterGroup?

ParameterGroup is used to group a set of component parameters and make component configuration easier.

Define ParameterGroup using dsl.parameter_group decorator

When applied to a class, the decorator will:

  • Infer the dsl type of each field in the class from its annotation and save it

  • Generate the __init__ and __repr__ functions

Sample

# A sample: define parameter group
from azure.ml.component import dsl

@dsl.parameter_group
class ComputeSettings:
    cpu_cluster: str
    process_count_per_node: int = 1
    node_count: int = 1

help(ComputeSettings.__init__)

Help on function __init__ in module azure.ml.component.dsl.types:

__init__(self, cpu_cluster:str, process_count_per_node:int=1, node_count:int=1) -> None

Restrictions:

  • Group cannot be empty

  • Parameter name cannot start with “_”

  • Every group member name must be public, and every member MUST have an annotation

  • The class definition must follow Python’s rule that a parameter without a default cannot follow a parameter with a default
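The ordering restriction is the same one Python applies to function signatures. As a rough analogy, the standard-library dataclass decorator also generates __init__ and __repr__ from annotations and enforces that ordering. The sketch below uses dataclass as a stand-in; it is not dsl.parameter_group and does not check the other restrictions listed above.

```python
from dataclasses import dataclass


@dataclass
class ComputeSettings:
    cpu_cluster: str                  # no default, so it must be declared first
    process_count_per_node: int = 1
    node_count: int = 1


settings = ComputeSettings(cpu_cluster='aml-compute', node_count=2)
print(settings)

# Declaring a field without a default after a field with a default is rejected
# when the class is created.
try:
    @dataclass
    class BadSettings:
        node_count: int = 1
        cpu_cluster: str
    ordering_error = None
except TypeError as exc:
    ordering_error = str(exc)

print(ordering_error)
```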

Consume ParameterGroup in PipelineComponent

First, create a pipeline component from the training_pipeline pipeline function.

# create the pipeline component
training_pipeline_component_func = Component.create(training_pipeline, version='0.0.1')
# see the function help doc
help(training_pipeline_component_func)

The pipeline component function definition in help output:

[component] training_pipeline(*, input_data:'path'=None, test_data:'path'=None, learning_rate:'float'=None, compute:'group'={'cpu_cluster': 'aml-compute', 'process_count_per_node': 1, 'node_count': 1})

Consume option 1: create an empty component, set value via inputs

component = training_pipeline_component_func()
component.inputs.compute.cpu_cluster = cluster_name
component.inputs.compute.node_count = 2
component.inputs.compute.process_count_per_node = 2

The component.inputs.compute value:

{'cpu_cluster': 'aml-compute', 'process_count_per_node': 2, 'node_count': 2}

Consume option 2: create component with parameter group value

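# cpu_cluster has no default in ComputeSettings, so it must be passed explicitly
component = training_pipeline_component_func(
    compute=ComputeSettings(cpu_cluster='aml-compute', process_count_per_node=2))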

The component.inputs.compute value:

ComputeSettings(cpu_cluster='aml-compute', process_count_per_node=2, node_count=1)

Note: If you are using a component with group parameters but do not have the original parameter group class, you can define a new parameter group containing all of its fields, based on the component function information, and then create the component with an instance of the new class.

# suppose there is no parameter group class
# define a new one by the component function info
@dsl.parameter_group
class MyComputeSettings:
    cpu_cluster: str = cluster_name
    process_count_per_node: int = 2
    node_count: int = 1

component = training_pipeline_component_func(compute=MyComputeSettings())
component.inputs.compute

The component.inputs.compute value:

MyComputeSettings(cpu_cluster='aml-compute', process_count_per_node=2, node_count=1)

Dynamic PipelineParameter

What is Dynamic PipelineParameter?

Dynamic PipelineParameter lets users declare **kwargs in a dsl.pipeline function. The values are passed in when the user calls the pipeline function, and a static pipeline instance is then created with the parameters determined by that call.
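The idea that the parameter set is fixed at call time can be sketched in plain Python. This is an illustration only: `sketch_dynamic_pipeline` is a hypothetical decorator, not part of azure.ml.component, and strings stand in for datasets.

```python
def sketch_dynamic_pipeline(func):
    """Toy decorator: record the keyword arguments supplied at call time."""

    def wrapper(**kwargs):
        return {
            'parameters': dict(kwargs),  # fixed once the function is called
            'result': func(**kwargs),
        }

    return wrapper


@sketch_dynamic_pipeline
def merge_dataset_pipeline(**kwargs):
    # Stand-in for forwarding the arguments to a component function.
    return sorted(kwargs)


instance = merge_dataset_pipeline(input_dir1='dataset1', input_dir2='dataset2')
print(list(instance['parameters']))  # ['input_dir1', 'input_dir2']
```

Calling the function again with different keyword names would produce a separate instance with a different parameter set, while each instance itself stays static.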

Supported types

Dynamic PipelineParameter supports the same types as PipelineParameter.

Note: Dynamic PipelineParameter exists only on dsl.pipeline functions, which means Component.create does not directly accept a function that uses Dynamic PipelineParameter.

Use Dynamic PipelineParameter

Suppose we use two_in_one_out_component_func in a pipeline and would like to expose its component parameters as pipeline parameters. Dynamic PipelineParameter makes this easy, as the example below shows:

from azure.ml.component import dsl, Pipeline

# Define pipeline func with **kwargs
@dsl.pipeline(default_compute_target='aml-compute')
def merge_dataset_pipeline(**kwargs) -> Pipeline:
    # Pass the kwargs to component func directly
    component = two_in_one_out_component_func(**kwargs)
    return component.outputs

# Create pipeline with component parameters.
# This pipeline will have 2 parameters 'input_dir1', 'input_dir2' and each of them is linked to the corresponding component parameter.
pipeline = merge_dataset_pipeline(input_dir1=dataset1, input_dir2=dataset2)
# Submit pipeline
pipeline.submit()

The graph of pipeline:

dynamic_pipeline_parameter_usage

Non-PipelineParameter

What is Non-PipelineParameter and how does it differ from PipelineParameter?

Non-PipelineParameter is used for building pipelines dynamically. A parameter declared as Non-PipelineParameter must be a parameter of the dsl.pipeline function. It controls how the pipeline is built but does not appear as a PipelineParameter in the final submitted pipeline graph.

Supported types

Non-PipelineParameter is not limited to the types supported by PipelineParameter; it accepts any Python type.

Note: Azure ML currently supports only static graphs. An input port cannot be placed in a Non-PipelineParameter, because that would make the graph dynamic.
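The split between build-time arguments and parameters that stay on the submitted graph can be sketched in plain Python. This is an illustration under stated assumptions: `split_parameters` is a hypothetical helper, not an SDK function, and strings stand in for datasets.

```python
import inspect


def split_parameters(func, non_pipeline_parameters, **call_kwargs):
    """Toy helper: separate build-time arguments from exposed parameters."""
    bound = inspect.signature(func).bind(**call_kwargs)
    build_time = {name: value for name, value in bound.arguments.items()
                  if name in non_pipeline_parameters}
    exposed = {name: value for name, value in bound.arguments.items()
               if name not in non_pipeline_parameters}
    return build_time, exposed


def data_processing(input_data_dict, process_method):
    """Stand-in for a pipeline function; only its signature is used here."""


build_time, exposed = split_parameters(
    data_processing,
    non_pipeline_parameters=['input_data_dict'],
    input_data_dict={'azure_blob': 'blob_data', 'sql_db': 'sql_db_data'},
    process_method='copy',
)

# The build-time value shapes the graph (here: one node per entry) ...
print(len(build_time['input_data_dict']))  # 2
# ... while only the remaining arguments would be adjustable at submit time.
print(exposed)  # {'process_method': 'copy'}
```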

Use Non-PipelineParameter

Non-PipelineParameter is used much like PipelineParameter. It is defined in the dsl function’s signature and declared in dsl.pipeline through non_pipeline_parameters. Below is an example:

from typing import Any, Dict

from azure.ml.component import dsl, Pipeline

# `input_data_dict` is Non-PipelineParameter, `process_method` is PipelineParameter
@dsl.pipeline(non_pipeline_parameters=["input_data_dict"], default_compute_target="aml-compute")
def data_processing_pipeline_with_multi_sources(input_data_dict: Dict[str, Any], process_method) -> Pipeline:
    pipeline_outputs = {}
    for data_type, data in input_data_dict.items():
        component = data_prepare_component_func(
            input_data_type=data_type,
            input_data=data,
            data_process_method=process_method
        )
        # Use "comment" to tag the node
        component.comment = data_type
        pipeline_outputs.update(
            {data_type: component.outputs.output_data}
        )
    return pipeline_outputs

# Create pipeline
# This pipeline will have 2 component nodes; the node count depends on `input_data_dict`
pipeline = data_processing_pipeline_with_multi_sources(
    input_data_dict={
        "azure_blob": blob_data,
        "sql_db": sql_db_data
    },
    process_method="copy"
)

# Submit with a different `process_method`
pipeline.submit(
    experiment_name="data_processing",
    pipeline_parameters={
        "process_method": "delete"
    }
)

The pipeline above looks like this:

data_processing_pipeline_with_multi_sources_non_pipeline_parameter

See the sample notebook here.

More scenarios with Non-PipelineParameter

Pipeline created from dynamic workspace

from azure.ml.component import dsl, Component, Pipeline

@dsl.pipeline(non_pipeline_parameters=["workspace"])
def data_processing_pipeline(workspace, compute_name) -> Pipeline:
    # Create components in workspace
    preprocess_func = Component.from_yaml(workspace=workspace, yaml_file=preprocess_yaml)
    data_process_func = Component.from_yaml(workspace=workspace, yaml_file=data_process_yaml)
    # Consume components
    preprocess_component = preprocess_func()
    preprocess_component.runsettings.target = compute_name
    data_process_component = data_process_func(input_data=preprocess_component.outputs.dataset)
    data_process_component.runsettings.target = compute_name
    return data_process_component.outputs

# Use above pipeline in different workspaces
pipeline1 = data_processing_pipeline(workspace=my_workspace, compute_name=compute_in_my_workspace)
pipeline2 = data_processing_pipeline(workspace=my_another_workspace, compute_name=compute_in_my_another_workspace)

Pipeline with dynamic component

from azure.ml.component import dsl, Pipeline

@dsl.pipeline(non_pipeline_parameters=["training_component_func"])
def training_pipeline(input_data, training_component_func) -> Pipeline:
    data_process_component = data_process_func(input_data=input_data)
    training_component = training_component_func(dataset=data_process_component.outputs.dataset)
    score_component = score_func(model=training_component.outputs.model)
    evaluate_component = eval_func(dataset=score_component.outputs.scored_dataset)
    return evaluate_component.outputs

# Use above pipeline with different training_component_func
pipeline1 = training_pipeline(
    input_data=dataset,
    training_component_func=linear_regression_component_func
)
pipeline2 = training_pipeline(
    input_data=dataset,
    training_component_func=two_class_boosted_decision_tree_component_func
)

Pipeline with recursive sub pipelines

from azure.ml.component import dsl, Pipeline

@dsl.pipeline(non_pipeline_parameters=["depth"], default_compute_target="aml-compute")
def recursive_pipeline(depth) -> Pipeline:
    if depth == 0:
        component = data_process_component_func(input_data=dataset)
        return component.outputs
    else:
        depth -= 1
        return recursive_pipeline(depth).outputs

# Recursive pipeline with depth 5
pipeline = recursive_pipeline(5)

The pipeline above looks like this:

recursive_pipeline_with_non_pipeline_parameter
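Because depth is resolved while the pipeline is being built, the recursion unrolls into a fixed nesting before anything is submitted. The plain-Python sketch below mirrors that structure with nested dictionaries. `build_nested` and `count_levels` are hypothetical helpers, and the dictionaries are only stand-ins for pipeline objects.

```python
def build_nested(depth):
    """Toy stand-in: each level wraps the next; the innermost holds one node."""
    if depth == 0:
        return {'node': 'data_process_component'}
    return {'sub_pipeline': build_nested(depth - 1)}


def count_levels(graph):
    """Count nesting levels, including the innermost one."""
    levels = 1
    while 'sub_pipeline' in graph:
        graph = graph['sub_pipeline']
        levels += 1
    return levels


graph = build_nested(5)
# The outer call plus five recursive calls gives six nested levels.
print(count_levels(graph))  # 6
```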