Spark Component

Overview

A Spark Component is a component that runs a Spark job in Azure Machine Learning (AML). It will support attached Synapse Spark and hobo Spark.

Note: SDK 1.5 support for Spark is in Private Preview.

We strongly encourage users to use the v2 Spark component directly, which is in Public Preview.

Learn more: v2 spark sample.

Scenarios

Use Apache Spark to train your model or analyze your data. The following frameworks are supported:

  • PySpark

Limitation

HDInsight (HDI) and Azure Databricks (ADB) are not supported right now.

How to write a Spark Component yaml spec

Please refer to SparkComponent spec doc.

Example yaml:

$schema: https://componentsdk.azureedge.net/jsonschema/SparkComponent.json
name: spark_test
type: spark
version: 1
display_name: Aml Spark dataset test module
description: Aml Spark dataset test module

# Please use hdfs mode for input and output data for type path
inputs:
  file_input1:
    type: path
    datastore_mode: hdfs
    description: The data to be read.
  file_input2:
    type: path
    datastore_mode: hdfs
    description: The data to be read.

outputs:
  output:
    type: path
    datastore_mode: hdfs

entry:
  file: entry.py # file path of the entry file relative to the code root folder

pyFiles: utils.zip
jars: scalaproj.jar

args: >-
  --file_input1 ${{inputs.file_input1}}
  --file_input2 ${{inputs.file_input2}}
  --output ${{outputs.output}}

environment:
  # note: Spark uses a different syntax compared to other component types in SDK 1.5
  conda_file: conda.yaml

conda.yaml

name: project_environment
channels:
  - anaconda
  - conda-forge
dependencies:
  - python=3.8.12
  - pip:
      - azureml-defaults
      - pillow
  - numpy==1.18.0

How to use the Spark Component to submit a job


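from azureml.core import Dataset, Workspace
from azure.ml.component import Component, dsl, Pipeline

# Assumption: the workspace is loaded from a local config.json; replace with however you obtain `ws`
ws = Workspace.from_config()

# load the component from its yaml spec
spark_comp_func = Component.from_yaml(ws, yaml_file='spec.yaml')

# registered datasets used as the component inputs
file_data1 = Dataset.get_by_name(ws, name="file_data1")
file_data2 = Dataset.get_by_name(ws, name="file_data2")

# datastore that receives the component output
my_datastore = ws.get_default_datastore()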

@dsl.pipeline(
    name='Submit_Spark_Job_from_sdk',
    description='submit a spark job using component sdk',
    default_datastore="myblobdatastore")
def test_pipeline() -> Pipeline:
    spark_comp = spark_comp_func(
        file_input1=file_data1,
        file_input2=file_data2
    )

    spark_comp.outputs.output.configure(
        datastore=my_datastore,
        path_on_datastore="azureml/component/outputs/result_dataset/")

    spark_comp.runsettings.target = "SynapseCompute"
    spark_comp.runsettings.spark.configure(
        driver_cores=2,
        driver_memory='1g',
        executor_cores=1,
        executor_memory='1g',
        number_executors=1)

    # for the office scenario, specify the managed identity type
    spark_comp.runsettings.spark_identity.configure(
        type='managed'
    )
    spark_comp.runsettings.spark.configure(
        conf={"spark.driver.supervise":False})

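# build the pipeline under a separate name so the pipeline function is not shadowed
pipeline = test_pipeline()
pipeline.validate()

# submit the pipeline and block until the run finishes
run = pipeline.submit(regenerate_outputs=True)
run.wait_for_completion()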

Differences with other component types

The Spark component follows the dpv2 yaml schema, so it differs slightly from other SDK 1.5 component types.

  • The Spark component doesn’t support the float and enum types; change float to number and enum to string.

  • In the “args” field of a Spark component, wrap placeholders in double braces prefixed with “$”, like ${{inputs.input1}}. In the “command” field of other SDK 1.5 component types, use single braces, like {inputs.input1}.

  • For an optional input, use double brackets prefixed with “$” in the “args” field of a Spark component, like $[[${{inputs.optional_input}}]]. In the “command” field of other SDK 1.5 component types, use single brackets, like [{inputs.optional_input}].

  • To use a conda environment in a Spark component, set “environment:conda_file”. Other SDK 1.5 component types use “environment:conda:conda_dependencies_file”.
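
The rules in this list can be sketched as a small migration helper. This is an illustration only: the functions to_spark_type, to_spark_args and to_spark_environment are hypothetical and not part of any SDK. They assume the input is a not-yet-converted SDK 1.5 “command” string without nested brackets, and an “environment” section already loaded as a dict.

```python
import re

# Single-brace placeholder used by other SDK 1.5 component types, e.g. {inputs.file_input1}.
_PLACEHOLDER = re.compile(r"\{((?:inputs|outputs)\.\w+)\}")
# Optional group in single brackets, e.g. [--optional_param {inputs.optional_param}].
_OPTIONAL_GROUP = re.compile(r"\[([^\[\]]*)\]")

# Input types the Spark component does not support, mapped to their replacements.
_TYPE_MAP = {"float": "number", "enum": "string"}


def to_spark_type(sdk15_type: str) -> str:
    """Return the Spark component type for an SDK 1.5 input type."""
    return _TYPE_MAP.get(sdk15_type, sdk15_type)


def to_spark_args(command: str) -> str:
    """Rewrite a single-brace "command" string into the "args" syntax."""
    # [ ... ] becomes $[[ ... ]]
    converted = _OPTIONAL_GROUP.sub(r"$[[\1]]", command)
    # {inputs.x} becomes ${{inputs.x}}
    return _PLACEHOLDER.sub(r"${{\1}}", converted)


def to_spark_environment(environment: dict) -> dict:
    """Move environment:conda:conda_dependencies_file to environment:conda_file."""
    return {"conda_file": environment["conda"]["conda_dependencies_file"]}


# Hypothetical SDK 1.5 style command string used to exercise the helper.
command = (
    "--file_input1 {inputs.file_input1} "
    "[--optional_param {inputs.optional_param}] "
    "--output {outputs.output}"
)
print(to_spark_args(command))
print(to_spark_type("float"), to_spark_type("enum"))
print(to_spark_environment({"conda": {"conda_dependencies_file": "conda.yaml"}}))
```

The helper is not idempotent: running to_spark_args on a string that already uses the double-brace syntax would wrap the placeholders a second time.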

Example yaml:

$schema: https://componentsdk.azureedge.net/jsonschema/SparkComponent.json
name: spark_test
type: spark
version: 1
display_name: Aml Spark dataset test module
description: Aml Spark dataset test module

# Please use hdfs mode for input and output data for type path
inputs:
  file_input1:
    type: path
    datastore_mode: hdfs
    description: The data to be read.
  number_parameter:
    type: number
    description: number parameter.
    optional: true
    default: 0
  string_parameter:
    type: string
    description: string parameter.
    optional: true
    default: "string"
  optional_param:
    type: string
    optional: true

outputs:
  output:
    type: path
    datastore_mode: hdfs

entry:
  file: entry.py # file path of the entry file relative to the code root folder

pyFiles: utils.zip
jars: scalaproj.jar

args: >-
  --file_input1 ${{inputs.file_input1}}
  --number_parameter ${{inputs.number_parameter}}
  --string_parameter ${{inputs.string_parameter}}
  $[[--optional_param ${{inputs.optional_param}}]]
  --output ${{outputs.output}}

environment:
  # note: Spark uses a different syntax compared to other component types in SDK 1.5
  conda_file: conda.yaml

References