Data Transfer Component
Data Transfer component enables user to move data between different types of storage supported in Azure Machine Learning. For example, your data is currently in an ADLS account and you may want to make it available in a Blob storage.
This component is implemented by provisioning ADF as part of AML workspace and granting ADF managed identity access to storages where data will be copied to/from.
The Data Transfer component currently supports following storage types:
| Data store | Supported as a source | Supported as a sink |
|---|---|---|
| Azure Blob Storage | Yes | Yes |
| Azure Data Lake Storage Gen 1 | Yes | Yes |
| Azure Data Lake Storage Gen 2 | Yes | Yes |
Before using cloud storage, you need to register a datastore in your Azure Machine Learning workspace first. For more information, see How to Access Data.
If your source data changes, you can refresh the dataset and add new data by rerunning Data Transfer component.
Prerequisite
Note:
Data Transfer component can only be run on ADF compute because the component implementation is based on ADF managed identity to meet compliance requirement, and ensure safe and efficient data transference.
Attach Azure Data Factory compute to your workspace. Following code shows how to get or create an ADF compute and attach it to your workspace:
from azureml.core import Workspace
from azureml.core.compute import ComputeTarget, DataFactoryCompute
from azureml.exceptions import ComputeTargetException
def get_or_create_data_factory(workspace, factory_name):
try:
return DataFactoryCompute(workspace, factory_name)
except ComputeTargetException as e:
if 'ComputeTargetNotFound' in e.message:
print('Data factory not found, creating...')
provisioning_config = DataFactoryCompute.provisioning_configuration()
data_factory = ComputeTarget.create(workspace, factory_name, provisioning_config)
data_factory.wait_for_completion()
return data_factory
else:
raise e
ws = Workspace.from_config()
data_factory_name = 'adftest'
data_factory_compute = get_or_create_data_factory(ws, data_factory_name)
print("Setup Azure Data Factory account complete")
How to write Data Transfer component yaml spec
Note:
Data Transfer component is exposed as custom component firstly for customer early testing and evaluation, and you need to create it from a component spec file. It will become a built-in component of Designer in the future, so you can directly load it from workspace by name.
Data Transfer component only support single input/output port.
Below is an example of Data Transfer component yaml. Usually there is no need for customer to change its content.
Example yaml:
$schema: https://componentsdk.azureedge.net/jsonschema/DataTransferComponent.json
name: microsoft.com.azureml.datatransfer
version: 0.0.1
display_name: Data Transfer
type: DataTransferComponent
description: transfer data between common storage types such as Azure Blob Storage and Azure Data Lake.
tags: {category: Component Tutorial, contact: amldesigner@microsoft.com}
is_deterministic: True
inputs:
source_data:
type: path
description: Source data, configured by source data store and path.
outputs:
destination_data:
type: path
description: Destination data, configured by destination data store and path.
See full example in github samples repo.
Follow how to access instructions if you meet 404 error when accessing the samples.
How to consume Data Transfer component in SDK
Below example is a two steps pipeline, which demos data transfer between blob and adls datastore.
from azure.ml.component import Run, Component, dsl, Pipeline
# 1. load the Data Transfer component
data_transfer_func = Component.from_yaml(ws, 'components/data-transfer/datatransfer.yaml')
# 2. get the source dataset on blob store to be transferred, assume there is already such dataset in workspace
blob_file_dataset = Dataset.get_by_name(ws, name='blob_file_dataset')
# 3. define the pipeline with the attached Azure Data Factory compute as default compute target
@dsl.pipeline(name='data_transfer', default_compute_target=data_factory_name)
def data_transfer_pipeline() -> Pipeline:
# Step1: copy from blob to adls
blob_to_adls = data_transfer_func(source_data=blob_file_dataset)
# configure output path
blob_to_adls.outputs.destination_data.configure(
# specify adls store as target datastore
datastore=adls,
# specify destination data path on datastore
path_on_datastore='/local/temp/datatransfer/{run-id}/outputs/{output-name}'
)
# Step2: copy from adls data store to default blob store
adls_to_blob = data_transfer_func(
# no need to specify the output name here, SDK will pick the only one output of data transfer component
source_data=blob_to_adls
)
# 4. create, validate and submit the pipeline
pipeline = data_transfer_pipeline()
pipeline.validate()
pipeline.submit()
See below link for a full sample:
How to use data transfer component - Demonstrates how to use data transfer component to transfer data from one data location to another.