Spark Component
Overview
A Spark Component is a component that executes a Spark job in Azure Machine Learning (AML). It will support attached Synapse Spark and hobo Spark.
Note: SDK 1.5 support for Spark is in Private Preview.
We strongly encourage users to use the v2 Spark component directly, which is in Public Preview.
Learn more: v2 spark sample.
Scenarios
Use Apache Spark to train your model or analyze your data. Supported frameworks:
PySpark
Limitation
HDInsight (HDI) and Azure Databricks (ADB) are not supported right now.
How to write Spark Component yaml spec
Please refer to SparkComponent spec doc.
Example yaml:
$schema: https://componentsdk.azureedge.net/jsonschema/SparkComponent.json
name: spark_test
type: spark
version: 1
display_name: Aml Spark dataset test module
description: Aml Spark dataset test module
# Please use hdfs mode for input and output data for type path
inputs:
  file_input1:
    type: path
    datastore_mode: hdfs
    description: The data to be read.
  file_input2:
    type: path
    datastore_mode: hdfs
    description: The data to be read.
outputs:
  output:
    type: path
    datastore_mode: hdfs
entry:
  file: entry.py # file path of the entry file relative to the code root folder
pyFiles: utils.zip
jars: scalaproj.jar
args: >-
  --file_input1 ${{inputs.file_input1}}
  --file_input2 ${{inputs.file_input2}}
  --output ${{outputs.output}}
environment:
  # note: Spark uses a different syntax compared to other component types in SDK 1.5
  conda_file: conda.yaml
conda.yaml
name: project_environment
channels:
  - anaconda
  - conda-forge
dependencies:
  - python=3.8.12
  - pip:
    - azureml-defaults
    - pillow
    - numpy==1.18.0
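The spec above names entry.py as the entry file and passes it three arguments through the args field. The source does not show that script, so the following is only a minimal sketch of what it might look like. The CSV format, the union step, and the function names parse_args and main are assumptions made for illustration.

```python
import argparse


def parse_args(argv=None):
    """Parse the three arguments listed in the `args` field of the spec."""
    parser = argparse.ArgumentParser(description="Entry script sketch for spark_test")
    parser.add_argument("--file_input1", required=True, help="Path of the first input")
    parser.add_argument("--file_input2", required=True, help="Path of the second input")
    parser.add_argument("--output", required=True, help="Path of the output folder")
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)

    # Imported here so the argument parsing can be tried without Spark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("spark_test").getOrCreate()

    # Assumption: both inputs hold CSV files with a header row.
    df1 = spark.read.csv(args.file_input1, header=True)
    df2 = spark.read.csv(args.file_input2, header=True)

    # Hypothetical processing step: stack the two inputs, matching columns by name.
    result = df1.unionByName(df2)
    result.write.mode("overwrite").csv(args.output, header=True)

    spark.stop()


# A real entry.py would end by calling main().
```

Because the inputs and the output use hdfs mode, the script receives plain paths and can hand them straight to the Spark reader and writer.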
How to use the Spark Component to submit a job
from azureml.core import Dataset, Workspace
from azure.ml.component import Component, dsl, Pipeline

# connect to the workspace (assumption: a config.json is available locally)
ws = Workspace.from_config()

# load component
spark_comp_func = Component.from_yaml(ws, yaml_file='spec.yaml')

# get the registered input datasets and the datastore for the output
file_data1 = Dataset.get_by_name(ws, name="file_data1")
file_data2 = Dataset.get_by_name(ws, name="file_data2")
my_datastore = ws.get_default_datastore()


@dsl.pipeline(
    name='Submit_Spark_Job_from_sdk',
    description='submit a spark job using component sdk',
    default_datastore="myblobdatastore")
def test_pipeline() -> Pipeline:
    spark_comp = spark_comp_func(
        file_input1=file_data1,
        file_input2=file_data2
    )
    spark_comp.outputs.output.configure(
        datastore=my_datastore,
        path_on_datastore="azureml/component/outputs/result_dataset/")
    spark_comp.runsettings.target = "SynapseCompute"
    spark_comp.runsettings.spark.configure(
        driver_cores=2,
        driver_memory='1g',
        executor_cores=1,
        executor_memory='1g',
        number_executors=1)
    # for the office scenario, specify the managed identity type
    spark_comp.runsettings.spark_identity.configure(
        type='managed'
    )
    spark_comp.runsettings.spark.configure(
        conf={"spark.driver.supervise": False})


# build, validate and submit the pipeline
pipeline = test_pipeline()
pipeline.validate()
run = pipeline.submit(regenerate_outputs=True)
run.wait_for_completion()
Differences with other component types
The Spark component follows the dpv2 YAML schema, so it differs slightly from other SDK 1.5 component types:
The Spark component doesn’t support the types float and enum; change float to number and enum to string.
For the “args” field in a Spark component, use double braces prefixed with “$”, like ${{inputs.input1}}. For the “command” field in other SDK 1.5 component types, use single braces, like {inputs.input1}.
For an optional input, use double brackets prefixed with “$”, like $[[${{inputs.optional_input}}]], in the “args” field of a Spark component. In the “command” field of other SDK 1.5 component types, use single brackets, like [{inputs.optional_input}].
For the “environment” field in a Spark component, use “environment:conda_file” when using a conda environment. For other SDK 1.5 component types, use “environment:conda:conda_dependencies_file”.
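The syntax differences listed above are mechanical, so they can be sketched as a small conversion helper. This is not part of the SDK. The names to_spark_type and to_spark_args are hypothetical, and the regular expressions assume a "command" string written in the single-brace form with no nested brackets.

```python
import re

# Type names that must be renamed for a Spark component, per the list above.
TYPE_RENAMES = {"float": "number", "enum": "string"}

# [ ... ] optional blocks in an SDK 1.5 "command" string (no nesting assumed).
_OPTIONAL_BLOCK = re.compile(r"\[([^\[\]]*)\]")
# {inputs.name} / {outputs.name} placeholders written with single braces.
_PLACEHOLDER = re.compile(r"\{((?:inputs|outputs)\.\w+)\}")


def to_spark_type(type_name: str) -> str:
    """Map a parameter type to the one accepted by the Spark component."""
    return TYPE_RENAMES.get(type_name, type_name)


def to_spark_args(command: str) -> str:
    """Rewrite single-brace "command" syntax into Spark "args" syntax.

    Assumes the input is still in single-brace form; running it on an
    already converted string would wrap the placeholders a second time.
    """
    # Wrap optional blocks first: [ ... ] -> $[[ ... ]]
    converted = _OPTIONAL_BLOCK.sub(lambda m: "$[[" + m.group(1) + "]]", command)
    # Then rewrite placeholders: {inputs.x} -> ${{inputs.x}}
    return _PLACEHOLDER.sub(lambda m: "${{" + m.group(1) + "}}", converted)
```

For example, to_spark_args("[{inputs.optional_input}]") yields $[[${{inputs.optional_input}}]], matching the form described for optional inputs.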
Example yaml:
$schema: https://componentsdk.azureedge.net/jsonschema/SparkComponent.json
name: spark_test
type: spark
version: 1
display_name: Aml Spark dataset test module
description: Aml Spark dataset test module
# Please use hdfs mode for input and output data for type path
inputs:
  file_input1:
    type: path
    datastore_mode: hdfs
    description: The data to be read.
  number_parameter:
    type: number
    description: number parameter.
    optional: true
    default: 0
  string_parameter:
    type: string
    description: string parameter.
    optional: true
    default: "string"
  optional_param:
    type: string
    optional: true
outputs:
  output:
    type: path
    datastore_mode: hdfs
entry:
  file: entry.py # file path of the entry file relative to the code root folder
pyFiles: utils.zip
jars: scalaproj.jar
args: >-
  --file_input1 ${{inputs.file_input1}}
  --number_parameter ${{inputs.number_parameter}}
  --string_parameter ${{inputs.string_parameter}}
  $[[--optional_param ${{inputs.optional_param}}]]
  --output ${{outputs.output}}
environment:
  # note: Spark uses a different syntax compared to other component types in SDK 1.5
  conda_file: conda.yaml
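To make the placeholder syntax in the args field above more concrete, here is a small sketch of how such a template could be turned into a command line. It is an illustration only, not the service's implementation. The function render_args is hypothetical, inputs and outputs share one dictionary for simplicity, and the sketch assumes that a $[[...]] block is dropped entirely when its input is not supplied.

```python
import re

# $[[ ... ]] optional blocks.
_OPTIONAL = re.compile(r"\$\[\[(.*?)\]\]")
# ${{inputs.name}} / ${{outputs.name}} placeholders; group 1 is the name.
_PLACEHOLDER = re.compile(r"\$\{\{(?:inputs|outputs)\.(\w+)\}\}")


def render_args(template: str, values: dict) -> str:
    """Fill placeholders from `values`, dropping unset optional blocks."""

    def keep_or_drop(match):
        body = match.group(1)
        names = [m.group(1) for m in _PLACEHOLDER.finditer(body)]
        # Assumption: keep the block only if every value it references is set.
        if all(values.get(name) is not None for name in names):
            return body
        return ""

    rendered = _OPTIONAL.sub(keep_or_drop, template)
    # A missing required value raises KeyError here.
    rendered = _PLACEHOLDER.sub(lambda m: str(values[m.group(1)]), rendered)
    # Collapse the whitespace left behind by dropped blocks.
    return " ".join(rendered.split())


template = (
    "--file_input1 ${{inputs.file_input1}} "
    "--number_parameter ${{inputs.number_parameter}} "
    "--string_parameter ${{inputs.string_parameter}} "
    "$[[--optional_param ${{inputs.optional_param}}]] "
    "--output ${{outputs.output}}"
)
values = {
    "file_input1": "/in",
    "number_parameter": 0,
    "string_parameter": "string",
    "output": "/out",
}
without_optional = render_args(template, values)
with_optional = render_args(template, {**values, "optional_param": "x"})
```

With the values shown, without_optional contains no --optional_param flag, while with_optional includes "--optional_param x" before the output argument.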