When to use the universal_transfer_operator operator

The universal_transfer_operator operator transfers data between any supported source Dataset and destination Dataset. It offers a consistent, source- and destination-agnostic interface, so users do not need a provider-specific operator for each kind of transfer.

This ensures a consistent set of Data Providers that can read from and write to Datasets. The Universal Transfer Operator uses the matching Data Providers for the source and the destination of a transfer. It also takes advantage of fast, direct endpoints where they exist, such as Snowflake's built-in COPY INTO command for loading S3 files efficiently into Snowflake.

The Universal Transfer Operator also supports transfers through third-party platforms such as Fivetran.

There are three modes for transferring data with the universal_transfer_operator:

  1. Non-native transfer

  2. Native transfer (avoiding bottlenecks by using direct, optimized load paths)

  3. Transfer using a third-party tool

More details on how transfers work can be found in How Universal Transfer Operator Works.

Case 1: Transfer using the non-native approach

The following example shows a non-native transfer from Google Cloud Storage to SQLite:

    # Imports as in the universal_transfer_operator package; gcs_bucket is defined elsewhere in the DAG.
    from universal_transfer_operator.constants import FileType, TransferMode
    from universal_transfer_operator.datasets.file.base import File
    from universal_transfer_operator.datasets.table import Metadata, Table
    from universal_transfer_operator.universal_transfer_operator import UniversalTransferOperator

    transfer_non_native_gs_to_sqlite = UniversalTransferOperator(
        task_id="transfer_non_native_gs_to_sqlite",
        source_dataset=File(
            path=f"{gcs_bucket}/example_uto/csv_files/", conn_id="google_cloud_default", filetype=FileType.CSV
        ),
        destination_dataset=Table(name="uto_gs_to_sqlite_table", conn_id="sqlite_default"),
    )

Case 2: Transfer using the native approach

In a native transfer, the Universal Transfer Operator hands the work to a fast, direct endpoint on the destination side, such as Snowflake's built-in COPY INTO command for loading files from S3.
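
No native example is shown in this snippet. A minimal sketch of what one might look like, assuming the TransferMode.NATIVE constant and placeholder bucket, table, and connection names:

    # Sketch only: bucket, table, and connection IDs below are placeholders.
    transfer_native_s3_to_snowflake = UniversalTransferOperator(
        task_id="transfer_native_s3_to_snowflake",
        source_dataset=File(
            path=f"{s3_bucket}/uto/csv_files/", conn_id="aws_default", filetype=FileType.CSV
        ),
        destination_dataset=Table(name="uto_s3_to_snowflake_native", conn_id="snowflake_default"),
        transfer_mode=TransferMode.NATIVE,
    )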

Case 3: Transfer using a third-party platform

Here is an example that uses Fivetran for the transfer:

    # Also requires: from universal_transfer_operator.integrations.fivetran import FiveTranOptions
    transfer_fivetran_with_connector_id = UniversalTransferOperator(
        task_id="transfer_fivetran_with_connector_id",
        source_dataset=File(path=f"{s3_bucket}/uto/", conn_id="aws_default"),
        destination_dataset=Table(name="fivetran_test", conn_id="snowflake_default"),
        transfer_mode=TransferMode.THIRDPARTY,
        transfer_params=FiveTranOptions(conn_id="fivetran_default", connector_id="filing_muppet"),
    )
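
Here, TransferMode.THIRDPARTY delegates the transfer to Fivetran: connector_id identifies the existing Fivetran connector that performs the sync, and conn_id points to the Airflow connection holding the Fivetran credentials.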

Cross-database transfers

The Universal Transfer Operator can also transfer data between databases. For example:

    transfer_non_native_bigquery_to_snowflake = UniversalTransferOperator(
        task_id="transfer_non_native_bigquery_to_snowflake",
        source_dataset=Table(
            name="uto_s3_to_bigquery_table",
            conn_id="google_cloud_default",
            metadata=Metadata(schema="astro"),
        ),
        destination_dataset=Table(
            name="uto_bigquery_to_snowflake_table",
            conn_id="snowflake_conn",
        ),
    )
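
The optional Metadata object qualifies a table with a schema (and, where supported, a database) rather than relying on the connection's defaults; for BigQuery, the schema maps to a dataset.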

Comparison with traditional transfer operators

  1. File to File transfers

    The following example transfers data from S3 to GCS using the Universal Transfer Operator:

        uto_transfer_non_native_s3_to_gs = UniversalTransferOperator(
            task_id="uto_transfer_non_native_s3_to_gs",
            source_dataset=File(path=f"{s3_bucket}/uto/csv_files/", conn_id="aws_default"),
            destination_dataset=File(
                path=f"{gcs_bucket}/uto/csv_files/",
                conn_id="google_cloud_default",
            ),
        )
    

    The following example performs the same S3-to-GCS transfer using the traditional S3ToGCSOperator:

        # Requires: from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
        traditional_s3_to_gcs_transfer = S3ToGCSOperator(
            task_id="traditional_s3_to_gcs_transfer",
            bucket="astro-sdk-test",
            prefix="uto/csv_files/",
            aws_conn_id="aws_default",
            gcp_conn_id="google_cloud_default",
            dest_gcs=f"{gcs_bucket}/uto/csv_files/",
            replace=False,
        )
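
    Note the asymmetry: S3ToGCSOperator takes store-specific parameters (bucket, prefix, dest_gcs, and one connection ID per side), whereas the Universal Transfer Operator describes both sides uniformly as File datasets, so the task shape does not change when the stores do.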
    
  2. File to Table transfers

    The following example transfers data from S3 to Snowflake using the Universal Transfer Operator:

        uto_transfer_non_native_s3_to_snowflake = UniversalTransferOperator(
            task_id="uto_transfer_non_native_s3_to_snowflake",
            source_dataset=File(
                path="s3://astro-sdk-test/uto/csv_files/homes2.csv", conn_id="aws_default", filetype=FileType.CSV
            ),
            destination_dataset=Table(name="uto_s3_table_to_snowflake", conn_id="snowflake_conn"),
        )
    

    The following example performs the same transfer using the traditional S3ToSnowflakeOperator. The table and stage need to be created before the transfer; S3ToSnowflakeOperator does not handle this, so it is done with separate SnowflakeOperator tasks.

        # Requires: from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
        snowflake_create_table = SnowflakeOperator(
            task_id="snowflake_create_table",
            sql=create_table,  # SQL string defined elsewhere in the DAG (a sketch appears below)
            params={"table_name": "s3_to_snowflake_table"},
            snowflake_conn_id="snowflake_conn",
        )
    
        snowflake_create_stage = SnowflakeOperator(
            task_id="snowflake_create_stage", sql=create_stage, snowflake_conn_id="snowflake_conn"
        )
    
        # Requires: from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
        traditional_copy_from_s3_to_snowflake = S3ToSnowflakeOperator(
            task_id="traditional_copy_from_s3_to_snowflake",
            snowflake_conn_id="snowflake_conn",
            s3_keys=["s3://astro-sdk-test/uto/csv_files/homes2.csv"],  # expects a list of keys
            table="s3_to_snowflake_table",
            stage="WORKSPACE_STAGE_ONE",
            file_format="(type = 'CSV',field_delimiter = ';')",
        )
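
    The create_table and create_stage SQL strings are defined elsewhere in the DAG and are not shown above. A hypothetical sketch of what they might contain (the column list, stage URL, and credentials are placeholders, not values from this guide):

        # Hypothetical SQL for the snippet above; columns, URL, and credentials are placeholders.
        create_table = """
            CREATE TABLE IF NOT EXISTS {{ params.table_name }} (
                sell INTEGER,
                list INTEGER,
                rooms INTEGER
            );
        """

        create_stage = """
            CREATE STAGE IF NOT EXISTS WORKSPACE_STAGE_ONE
                URL = 's3://astro-sdk-test/uto/'
                CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
                FILE_FORMAT = (TYPE = 'CSV', FIELD_DELIMITER = ';');
        """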