When to use the universal_transfer_operator operator
The universal_transfer_operator operator allows data transfers between any supported source and destination Datasets. It offers a consistent, agnostic interface that simplifies the user experience: users do not need to pick specific providers or operators for each kind of transfer.
This is achieved through a consistent set of Data Providers that can read from and write to Datasets. The Universal Transfer Operator uses the Data Provider of the source Dataset to read the data and the Data Provider of the destination Dataset to write it. It also takes advantage of any existing fast, direct endpoints, such as Snowflake's built-in COPY INTO command, which loads S3 files efficiently into Snowflake.
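To make the Data Provider idea more concrete, here is a toy sketch in plain Python. The `DataProvider` protocol and the `CsvTextProvider`, `SqliteTableProvider` and `transfer()` names are hypothetical and are not the operator's real classes. The sketch only illustrates the shape of the design: each provider knows how to read rows from and write rows to its own kind of dataset, so a single generic loop can pair any source with any destination.

```python
import csv
import io
import sqlite3
from typing import Iterable, Iterator, List, Protocol


class DataProvider(Protocol):
    """Hypothetical interface: every dataset type can be read and written as rows."""

    def read(self) -> Iterator[List[str]]: ...

    def write(self, rows: Iterable[List[str]]) -> None: ...


class CsvTextProvider:
    """Toy provider for CSV text held in memory (a stand-in for a file in S3/GCS)."""

    def __init__(self, text: str = "") -> None:
        self.text = text

    def read(self) -> Iterator[List[str]]:
        # The first row yielded is the CSV header.
        yield from csv.reader(io.StringIO(self.text))

    def write(self, rows: Iterable[List[str]]) -> None:
        buf = io.StringIO()
        writer = csv.writer(buf, lineterminator="\n")
        writer.writerows(rows)
        self.text = buf.getvalue()


class SqliteTableProvider:
    """Toy provider for a SQLite table; the first row read or written is the header."""

    def __init__(self, conn: sqlite3.Connection, table: str) -> None:
        self.conn = conn
        self.table = table

    def read(self) -> Iterator[List[str]]:
        cur = self.conn.execute(f'SELECT * FROM "{self.table}"')
        yield [d[0] for d in cur.description]  # column names as the header row
        for row in cur:
            yield [str(v) for v in row]

    def write(self, rows: Iterable[List[str]]) -> None:
        it = iter(rows)
        header = next(it, None)
        if header is None:
            return  # nothing to write
        cols = ", ".join(f'"{c}"' for c in header)
        # Create the destination table on the fly (untyped columns keep the toy short).
        self.conn.execute(f'CREATE TABLE IF NOT EXISTS "{self.table}" ({cols})')
        marks = ", ".join("?" for _ in header)
        self.conn.executemany(f'INSERT INTO "{self.table}" VALUES ({marks})', it)
        self.conn.commit()


def transfer(source: DataProvider, destination: DataProvider) -> None:
    """Generic path: stream rows out of one provider and into another."""
    destination.write(source.read())
```

For example, `transfer(CsvTextProvider("name,price\nhouse,100\n"), SqliteTableProvider(sqlite3.connect(":memory:"), "homes"))` loads the CSV into a new SQLite table, and swapping the two provider types reverses the direction without touching `transfer()`.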
The Universal Transfer Operator also supports transfers through third-party platforms such as Fivetran.
There are three modes for transferring data with the universal_transfer_operator operator.
More details on how transfers work can be found in How Universal Transfer Operator Works.
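The three modes can be pictured as a dispatch on the `transfer_mode` argument. On this page only Case 3 passes `transfer_mode` (`TransferMode.THIRDPARTY`); the examples that omit it are all named `non_native`, which suggests that non-native is the default, and the sketch below assumes so. The `Mode` enum and the `run_*` functions are hypothetical stand-ins, not the operator's internals, and they return a description instead of moving data.

```python
from enum import Enum
from typing import Callable, Dict, Optional


class Mode(Enum):
    """Hypothetical mirror of the three transfer modes described above."""

    NONNATIVE = "nonnative"
    NATIVE = "native"
    THIRDPARTY = "thirdparty"


def run_non_native(src: str, dst: str) -> str:
    # Data flows through the worker: read with the source provider, write with the destination one.
    return f"worker reads {src} and writes {dst}"


def run_native(src: str, dst: str) -> str:
    # The destination pulls the data itself through a direct endpoint (e.g. COPY INTO).
    return f"destination loads {src} directly into {dst}"


def run_third_party(src: str, dst: str) -> str:
    # An external platform such as Fivetran performs the sync.
    return f"external platform syncs {src} to {dst}"


HANDLERS: Dict[Mode, Callable[[str, str], str]] = {
    Mode.NONNATIVE: run_non_native,
    Mode.NATIVE: run_native,
    Mode.THIRDPARTY: run_third_party,
}


def run_transfer(src: str, dst: str, mode: Optional[Mode] = None) -> str:
    # Assumption: no explicit mode means non-native, matching the example task names.
    chosen = mode if mode is not None else Mode.NONNATIVE
    return HANDLERS[chosen](src, dst)
```

Keeping the mode-specific logic behind one lookup table means adding a new mode only requires a new handler, while the calling code stays unchanged.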
- Case 1: Transfer using non-native approach
The following example shows a non-native transfer from Google Cloud Storage to SQLite:
transfer_non_native_gs_to_sqlite = UniversalTransferOperator(
    task_id="transfer_non_native_gs_to_sqlite",
    source_dataset=File(
        path=f"{gcs_bucket}/example_uto/csv_files/",
        conn_id="google_cloud_default",
        filetype=FileType.CSV,
    ),
    destination_dataset=Table(name="uto_gs_to_sqlite_table", conn_id="sqlite_default"),
)
- Case 2: Transfer using native approach
- Case 3: Transfer using third-party platform
Here is an example of how to use Fivetran for transfers:
transfer_fivetran_with_connector_id = UniversalTransferOperator(
    task_id="transfer_fivetran_with_connector_id",
    source_dataset=File(path=f"{s3_bucket}/uto/", conn_id="aws_default"),
    destination_dataset=Table(name="fivetran_test", conn_id="snowflake_default"),
    transfer_mode=TransferMode.THIRDPARTY,
    transfer_params=FiveTranOptions(conn_id="fivetran_default", connector_id="filing_muppet"),
)
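As described above, the native approach relies on a direct endpoint of the destination, such as Snowflake's COPY INTO, so the destination loads the files itself. The hypothetical helper below only builds such a statement for files that are already reachable through a Snowflake stage; the table, stage and option values are illustrative, and the SQL the operator actually issues may differ.

```python
def build_copy_into(
    table: str,
    stage: str,
    prefix: str = "",
    file_type: str = "CSV",
    skip_header: int = 1,
) -> str:
    """Build a Snowflake COPY INTO statement that loads staged files into `table`."""
    # Files are addressed relative to a named stage, e.g. @MY_STAGE/some/prefix/.
    location = f"@{stage}/{prefix}" if prefix else f"@{stage}"
    options = f"TYPE = '{file_type}'"
    # SKIP_HEADER only makes sense for CSV files.
    if file_type.upper() == "CSV" and skip_header:
        options += f" SKIP_HEADER = {skip_header}"
    return f"COPY INTO {table} FROM {location} FILE_FORMAT = ({options})"
```

For instance, `build_copy_into("uto_s3_table_to_snowflake", "WORKSPACE_STAGE_ONE", "uto/csv_files/")` produces a single statement that Snowflake executes server-side, which is why this path avoids moving the data through the worker.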
Cross-database transfers
The Universal Transfer Operator can also transfer data between databases. For example, the following task transfers a BigQuery table to Snowflake:
transfer_non_native_bigquery_to_snowflake = UniversalTransferOperator(
    task_id="transfer_non_native_bigquery_to_snowflake",
    source_dataset=Table(
        name="uto_s3_to_bigquery_table",
        conn_id="google_cloud_default",
        metadata=Metadata(schema="astro"),
    ),
    destination_dataset=Table(
        name="uto_bigquery_to_snowflake_table",
        conn_id="snowflake_conn",
    ),
)
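As its task name indicates, this cross-database example is a non-native transfer: rows are read from the source database and written to the destination. The sketch below shows that pattern with two in-memory SQLite databases standing in for BigQuery and Snowflake. `copy_table` and its `batch_size` parameter are hypothetical, and the operator's real implementation may stage the data differently (for example, through intermediate files).

```python
import sqlite3


def copy_table(
    src: sqlite3.Connection,
    dst: sqlite3.Connection,
    src_table: str,
    dst_table: str,
    batch_size: int = 1000,
) -> int:
    """Copy every row of src_table into dst_table, batch_size rows at a time."""
    cur = src.execute(f'SELECT * FROM "{src_table}"')
    columns = [d[0] for d in cur.description]
    col_list = ", ".join(f'"{c}"' for c in columns)
    # Create the destination table if needed (untyped columns keep the sketch short).
    dst.execute(f'CREATE TABLE IF NOT EXISTS "{dst_table}" ({col_list})')
    placeholders = ", ".join("?" for _ in columns)
    insert_sql = f'INSERT INTO "{dst_table}" ({col_list}) VALUES ({placeholders})'
    copied = 0
    while True:
        # Bounded batches keep memory use flat regardless of table size.
        batch = cur.fetchmany(batch_size)
        if not batch:
            break
        dst.executemany(insert_sql, batch)
        copied += len(batch)
    dst.commit()
    return copied
```

Reading in fixed-size batches is the main design choice here: the worker never holds the whole table in memory, at the cost of one round trip per batch.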
Comparison with traditional transfer operators
- File to File transfers
The following example transfers data from S3 to GCS using the Universal Transfer Operator:
uto_transfer_non_native_s3_to_gs = UniversalTransferOperator(
    task_id="uto_transfer_non_native_s3_to_gs",
    source_dataset=File(path=f"{s3_bucket}/uto/csv_files/", conn_id="aws_default"),
    destination_dataset=File(
        path=f"{gcs_bucket}/uto/csv_files/",
        conn_id="google_cloud_default",
    ),
)
The following example performs the same S3 to GCS transfer using the traditional S3ToGCSOperator:
traditional_s3_to_gcs_transfer = S3ToGCSOperator(
    task_id="traditional_s3_to_gcs_transfer",
    bucket="astro-sdk-test",
    prefix="uto/csv_files/",
    aws_conn_id="aws_default",
    gcp_conn_id="google_cloud_default",
    dest_gcs=f"{gcs_bucket}/uto/csv_files/",
    replace=False,
)
- File to Table transfers
The following example transfers data from S3 to Snowflake using the Universal Transfer Operator:
uto_transfer_non_native_s3_to_snowflake = UniversalTransferOperator(
    task_id="uto_transfer_non_native_s3_to_snowflake",
    source_dataset=File(
        path="s3://astro-sdk-test/uto/csv_files/homes2.csv",
        conn_id="aws_default",
        filetype=FileType.CSV,
    ),
    destination_dataset=Table(name="uto_s3_table_to_snowflake", conn_id="snowflake_conn"),
)
The following example performs the same transfer using the traditional S3ToSnowflakeOperator. The table and the stage need to be created before the transfer, because S3ToSnowflakeOperator does not handle this:
snowflake_create_table = SnowflakeOperator(
    task_id="snowflake_create_table",
    sql=create_table,
    params={"table_name": "s3_to_snowflake_table"},
    snowflake_conn_id="snowflake_conn",
)

snowflake_create_stage = SnowflakeOperator(
    task_id="snowflake_create_stage",
    sql=create_stage,
    snowflake_conn_id="snowflake_conn",
)

traditional_copy_from_s3_to_snowflake = S3ToSnowflakeOperator(
    task_id="traditional_copy_from_s3_to_snowflake",
    snowflake_conn_id="snowflake_conn",
    s3_keys="s3://astro-sdk-test/uto/csv_files/homes2.csv",
    table="s3_to_snowflake_table",
    stage="WORKSPACE_STAGE_ONE",
    file_format="(type = 'CSV',field_delimiter = ';')",
)
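One practical difference visible in the file-to-file comparison above is how the source location is expressed: the Universal Transfer Operator takes a single path such as f"{s3_bucket}/uto/csv_files/", while S3ToGCSOperator splits the same location into bucket="astro-sdk-test" and prefix="uto/csv_files/". When moving between the two styles, a small helper can derive one from the other. `split_bucket_and_prefix` is a hypothetical name, and the sketch assumes the path is a full URI with a scheme, for instance that `s3_bucket` holds something like s3://astro-sdk-test.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_bucket_and_prefix(path: str) -> Tuple[str, str]:
    """Split an object-store URI such as s3://bucket/some/prefix/ into (bucket, prefix)."""
    parsed = urlparse(path)
    if not parsed.scheme or not parsed.netloc:
        raise ValueError(f"expected a URI like s3://bucket/prefix, got {path!r}")
    # The network location is the bucket; the rest of the path (without its
    # leading slash) is the key prefix.
    return parsed.netloc, parsed.path.lstrip("/")
```

For example, `split_bucket_and_prefix("s3://astro-sdk-test/uto/csv_files/")` returns the same `bucket` and `prefix` values passed to S3ToGCSOperator above, and the helper works unchanged for gs:// paths.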