universal_transfer_operator.data_providers.database.base

Module Contents

Classes

DatabaseDataProvider

DatabaseDataProvider represents all DataProvider interactions with databases.

class universal_transfer_operator.data_providers.database.base.DatabaseDataProvider(dataset, transfer_mode, transfer_params=attr.field(factory=TransferIntegrationOptions, converter=lambda val: ...))

Bases: universal_transfer_operator.data_providers.base.DataProviders[universal_transfer_operator.datasets.table.Table]

DatabaseDataProvider represents all DataProvider interactions with databases.

Parameters:
  • dataset (universal_transfer_operator.datasets.table.Table) – Table dataset handled by this provider

  • transfer_mode – Mode in which the transfer should be performed

  • transfer_params (TransferIntegrationOptions) – Options used during the transfer; defaults to TransferIntegrationOptions()

abstract property sql_type
abstract property hook: airflow.hooks.dbapi.DbApiHook

Return an instance of the database-specific Airflow hook.

Return type:

airflow.hooks.dbapi.DbApiHook

property connection: sqlalchemy.engine.base.Connection

Return a SQLAlchemy connection object for the given database.

Return type:

sqlalchemy.engine.base.Connection

property sqlalchemy_engine: sqlalchemy.engine.base.Engine

Return the SQLAlchemy engine for the given database.

Return type:

sqlalchemy.engine.base.Engine
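
A minimal sketch of how these database-specific properties might be used together. It assumes provider is an instance of a concrete DatabaseDataProvider subclass (the base class is abstract); the helper function name is illustrative.

    from sqlalchemy.engine import Engine

    from universal_transfer_operator.data_providers.database.base import DatabaseDataProvider


    def describe_backend(provider: DatabaseDataProvider) -> None:
        # sql_type and hook are database-specific abstract properties.
        print("SQL type:", provider.sql_type)
        print("Airflow hook:", type(provider.hook).__name__)

        # The SQLAlchemy engine can be used directly for ad-hoc inspection.
        engine: Engine = provider.sqlalchemy_engine
        with engine.connect() as connection:
            print("dialect:", connection.dialect.name)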

property transport_params: dict | None

Get credentials required by smart_open to access files.

Return type:

dict | None

abstract property openlineage_dataset_namespace: str

Returns the OpenLineage dataset namespace as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md

Return type:

str

abstract property openlineage_dataset_name: str

Returns the OpenLineage dataset name as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md

Return type:

str

abstract property openlineage_dataset_uri: str

Returns the OpenLineage dataset URI as per https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md

Return type:

str

abstract property default_metadata: universal_transfer_operator.datasets.table.Metadata

Extract the metadata available within the Airflow connection associated with self.dataset.conn_id.

Returns:

a Metadata instance

Return type:

universal_transfer_operator.datasets.table.Metadata

illegal_column_name_chars: list[str] = []
illegal_column_name_chars_replacement: list[str] = []
IGNORE_HANDLER_IN_RUN_RAW_SQL: bool = False
NATIVE_PATHS: dict[Any, Any]
DEFAULT_SCHEMA
run_sql(sql='', parameters=None, handler=None, **kwargs)

Return the result of running a SQL statement.

Whenever possible, this method should be implemented using Airflow Hooks, since this will simplify the integration with Async operators.

Parameters:
  • sql (str | ClauseElement) – SQL query to be run against the database

  • parameters (dict | None) – Optional parameters used to render the query

  • autocommit – Optional autocommit flag

  • handler (Callable | None) – Optional callable used to process the result of the SQL statement

Return type:

sqlalchemy.engine.cursor.CursorResult
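
A small usage sketch for run_sql, assuming provider is a concrete subclass instance; the table name, column, and parameter style are illustrative and depend on the backing database and its driver.

    from universal_transfer_operator.data_providers.database.base import DatabaseDataProvider


    def count_recent_rows(provider: DatabaseDataProvider) -> int:
        # run_sql returns a SQLAlchemy CursorResult; scalar() reads the single value.
        result = provider.run_sql(
            sql="SELECT COUNT(*) FROM my_table WHERE created_at >= :cutoff",
            parameters={"cutoff": "2023-01-01"},
        )
        return result.scalar()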

columns_exist(table, columns)

Check that a list of columns exist in the given table.

Parameters:
  • table (universal_transfer_operator.datasets.table.Table) – The table to check

  • columns – List of column names whose existence should be checked

Returns:

whether the columns exist in the table or not.

Return type:

bool

get_sqla_table(table)

Return a SQLAlchemy table instance.

Parameters:

table (universal_transfer_operator.datasets.table.Table) – Astro Table to be converted to SQLAlchemy table instance

Return type:

sqlalchemy.sql.schema.Table

table_exists(table)

Check if a table exists in the database.

Parameters:

table (universal_transfer_operator.datasets.table.Table) – Details of the table whose existence we want to check

Return type:

bool
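
A sketch combining table_exists and columns_exist for a pre-flight check. provider is assumed to be a concrete subclass instance, and the column names are purely illustrative.

    from universal_transfer_operator.data_providers.database.base import DatabaseDataProvider
    from universal_transfer_operator.datasets.table import Table


    def validate_target(provider: DatabaseDataProvider, table: Table) -> None:
        # Fail fast if the target table or the expected columns are missing.
        if not provider.table_exists(table):
            raise ValueError(f"Table {provider.get_table_qualified_name(table)} does not exist")
        if not provider.columns_exist(table, ["id", "created_at"]):
            raise ValueError("Expected columns are missing from the target table")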

check_if_transfer_supported(source_dataset)

Checks if the transfer is supported from source to destination based on source_dataset.

Parameters:

source_dataset (universal_transfer_operator.datasets.table.Table) – Table present in the source location

Return type:

bool

read()

Read the Table and return its contents as an iterator of Pandas DataFrames.

Return type:

Iterator[pandas.DataFrame]

write(source_ref)

Write the data from a local reference location or a dataframe to the database or filesystem dataset.

Parameters:

source_ref (DataStream | pd.DataFrame) – Stream of data to be loaded into output table or a pandas dataframe.

Return type:

str
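
A sketch of moving data between two providers with read() and write(). read() yields pandas DataFrames in chunks and write() accepts a DataFrame, so the loop below streams the data chunk by chunk; both provider objects are assumed to be concrete subclass instances bound to their datasets.

    from universal_transfer_operator.data_providers.database.base import DatabaseDataProvider


    def copy_table(source: DatabaseDataProvider, destination: DatabaseDataProvider) -> None:
        # read() returns Iterator[pandas.DataFrame]; write() accepts a DataFrame.
        for chunk in source.read():
            destination.write(source_ref=chunk)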

static get_table_qualified_name(table)

Return the table's qualified name. This is database-specific: in SQLite it is just the table name, whereas in Snowflake it is the database, schema and table name.

Parameters:

table (universal_transfer_operator.datasets.table.Table) – The table we want to retrieve the qualified name for.

Return type:

str

populate_metadata()

Given a table, check if the table has metadata. If the metadata is missing and the database has metadata, assign it to the table. If the table schema was not defined by the end user, retrieve the user-configured default schema. This method performs the changes in-place and also returns the table.

Parameters:

table – Table whose metadata may be changed

Returns:

the modified table

create_table_using_columns(table)

Create a SQL table using the table columns.

Parameters:

table (universal_transfer_operator.datasets.table.Table) – The table to be created.

Return type:

None

is_native_autodetect_schema_available(file)

Check if native auto detection of schema is available.

Parameters:

file (universal_transfer_operator.datasets.file.base.File) – File whose type is checked to decide whether native schema auto-detection is available for it.

Return type:

bool

abstract create_table_using_native_schema_autodetection(table, file)

Create a SQL table, automatically inferring the schema using the given file via native database support.

Parameters:
  • table (universal_transfer_operator.datasets.table.Table) – The table to be created

  • file (universal_transfer_operator.datasets.file.base.File) – File used to infer the table schema via the database's native auto-detection

Return type:

None

create_table_using_schema_autodetection(table, file=None, dataframe=None, columns_names_capitalization='original')

Create a SQL table, automatically inferring the schema using the given file.

Parameters:
  • table (universal_transfer_operator.datasets.table.Table) – The table to be created.

  • file (File | None) – File used to infer the new table columns.

  • dataframe (pd.DataFrame | None) – Dataframe used to infer the new table columns if there is no file

  • columns_names_capitalization (universal_transfer_operator.constants.ColumnCapitalization) – determines whether to convert all columns to lowercase/uppercase in the resulting dataframe

Return type:

None

create_table(table, file=None, dataframe=None, columns_names_capitalization='original', use_native_support=True)

Create a table either using its explicitly defined columns or inferring its columns from a given file.

Parameters:
  • table (universal_transfer_operator.datasets.table.Table) – The table to be created

  • file (File | None) – (optional) File used to infer the table columns.

  • dataframe (pd.DataFrame | None) – (optional) Dataframe used to infer the new table columns if there is no file

  • columns_names_capitalization (universal_transfer_operator.constants.ColumnCapitalization) – determines whether to convert all columns to lowercase/uppercase in the resulting dataframe

  • use_native_support (bool) –

Return type:

None
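
A sketch of create_table where the columns are inferred from a file because the Table object does not declare them explicitly. The provider, table, and file objects are assumed to be constructed elsewhere.

    from universal_transfer_operator.data_providers.database.base import DatabaseDataProvider
    from universal_transfer_operator.datasets.file.base import File
    from universal_transfer_operator.datasets.table import Table


    def create_from_file(provider: DatabaseDataProvider, table: Table, file: File) -> None:
        # Falls back to schema autodetection (native or pandas-based) when the
        # Table has no explicitly defined columns.
        provider.create_table(
            table=table,
            file=file,
            columns_names_capitalization="original",
            use_native_support=True,
        )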

create_table_from_select_statement(statement, target_table, parameters=None)

Export the result rows of a query statement into another table.

Parameters:
  • statement (str) – SQL query statement

  • target_table (universal_transfer_operator.datasets.table.Table) – Destination table where results will be recorded.

  • parameters (dict | None) – (Optional) parameters to be used to render the SQL query

Return type:

None
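
A sketch of materializing a query result into another table with create_table_from_select_statement; the SQL text, parameter style, and target table are illustrative, and provider is a concrete subclass instance.

    from universal_transfer_operator.data_providers.database.base import DatabaseDataProvider
    from universal_transfer_operator.datasets.table import Table


    def snapshot_large_orders(provider: DatabaseDataProvider, target_table: Table) -> None:
        # The rows returned by the SELECT are written into target_table.
        provider.create_table_from_select_statement(
            statement="SELECT id, total FROM orders WHERE total > :minimum",
            target_table=target_table,
            parameters={"minimum": 100},
        )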

drop_table(table)

Delete a SQL table, if it exists.

Parameters:

table (universal_transfer_operator.datasets.table.Table) – The table to be deleted.

Return type:

None

create_schema_and_table_if_needed(table, file, normalize_config=None, columns_names_capitalization='original', if_exists='replace', use_native_support=True)

Checks whether native schema autodetection is available; otherwise creates the schema and table.

Parameters:
  • table – Table to create

  • file – File path and conn_id for object stores

  • normalize_config – pandas json_normalize params config

  • columns_names_capitalization – determines whether to convert all columns to lowercase/uppercase

  • if_exists – Overwrite the table if it exists

  • use_native_support – Use native support for data transfer if available on the destination

create_schema_and_table_if_needed_from_dataframe(table, dataframe, columns_names_capitalization='original', if_exists='replace', use_native_support=True)

Creates the schema and table from a dataframe.

Parameters:
  • table (universal_transfer_operator.datasets.table.Table) – Table to create

  • dataframe (pandas.DataFrame) – dataframe object to be used as a source of data

  • columns_names_capitalization (universal_transfer_operator.constants.ColumnCapitalization) – determines whether to convert all columns to lowercase/uppercase

  • if_exists (universal_transfer_operator.constants.LoadExistStrategy) – Strategy to use if the target table already exists

  • use_native_support (bool) – Use native support for data transfer if available on the destination

fetch_all_rows(table, row_limit=-1)

Fetches all rows for a table and returns them as a list. This is needed because some databases have different cursors that require different methods to fetch rows.

Parameters:
  • table (universal_transfer_operator.datasets.table.Table) – The table whose rows should be fetched

  • row_limit (int) – Limit on the number of rows to fetch (defaults to -1)

Returns:

a list of rows

Return type:

list
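
A sketch of fetch_all_rows with a row limit, assuming provider and table are created elsewhere; useful for previewing data without pulling the whole table.

    from universal_transfer_operator.data_providers.database.base import DatabaseDataProvider
    from universal_transfer_operator.datasets.table import Table


    def preview(provider: DatabaseDataProvider, table: Table, limit: int = 10) -> None:
        # row_limit=-1 (the default) would fetch every row instead.
        for row in provider.fetch_all_rows(table, row_limit=limit):
            print(row)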

load_file_to_table(input_file, output_table, normalize_config=None, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE, columns_names_capitalization='original', **kwargs)

Load the content of multiple files into output_table. The files are sourced from the file path, which can also be a path pattern.

Parameters:
  • input_file (universal_transfer_operator.datasets.file.base.File) – File path and conn_id for object stores

  • output_table (universal_transfer_operator.datasets.table.Table) – Table to create

  • if_exists (universal_transfer_operator.constants.LoadExistStrategy) – Strategy to use if the target table already exists

  • chunk_size (int) – Specify the number of records in each batch to be written at a time

  • use_native_support – Use native support for data transfer if available on the destination

  • normalize_config (dict | None) – pandas json_normalize params config

  • native_support_kwargs – kwargs to be used by method involved in native support flow

  • columns_names_capitalization (universal_transfer_operator.constants.ColumnCapitalization) – determines whether to convert all columns to lowercase/uppercase in the resulting dataframe

  • enable_native_fallback – Use enable_native_fallback=True to fall back to default transfer

Return type:

str
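
A sketch of loading a file (or a path pattern matching several files) into a table, assuming the File and Table objects are constructed elsewhere and provider is a concrete subclass instance.

    from universal_transfer_operator.data_providers.database.base import DatabaseDataProvider
    from universal_transfer_operator.datasets.file.base import File
    from universal_transfer_operator.datasets.table import Table


    def load_files(provider: DatabaseDataProvider, input_file: File, output_table: Table) -> str:
        # The input File path may also be a pattern matching multiple files.
        return provider.load_file_to_table(
            input_file=input_file,
            output_table=output_table,
            if_exists="replace",
            columns_names_capitalization="original",
        )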

load_dataframe_to_table(input_dataframe, output_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE, columns_names_capitalization='original')

Load content of dataframe in output_table.

Parameters:
  • input_dataframe (pandas.DataFrame) – dataframe

  • output_table (universal_transfer_operator.datasets.table.Table) – Table to create

  • if_exists (universal_transfer_operator.constants.LoadExistStrategy) – Strategy to use if the target table already exists

  • chunk_size (int) – Specify the number of records in each batch to be written at a time

  • normalize_config – pandas json_normalize params config

  • columns_names_capitalization (universal_transfer_operator.constants.ColumnCapitalization) – determines whether to convert all columns to lowercase/uppercase in the resulting dataframe

Return type:

str

load_file_to_table_using_pandas(input_file, output_table, normalize_config=None, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)

Load the content of a file into output_table using pandas.

Parameters:
  • input_file (universal_transfer_operator.datasets.file.base.File) – File path and conn_id for object stores

  • output_table (universal_transfer_operator.datasets.table.Table) – Table to create

  • normalize_config (dict | None) – pandas json_normalize params config

  • if_exists (universal_transfer_operator.constants.LoadExistStrategy) – Strategy to use if the target table already exists

  • chunk_size (int) – Specify the number of records in each batch to be written at a time

load_pandas_dataframe_to_table(source_dataframe, target_table, if_exists='replace', chunk_size=DEFAULT_CHUNK_SIZE)

Create a table with the dataframe’s contents. If the table already exists, append or replace the content, depending on the value of if_exists.

Parameters:
  • source_dataframe (pandas.DataFrame) – Pandas dataframe containing the data to be loaded

  • target_table (universal_transfer_operator.datasets.table.Table) – Table in which the file will be loaded

  • if_exists (universal_transfer_operator.constants.LoadExistStrategy) – Strategy to be used in case the target table already exists.

  • chunk_size (int) – Specify the number of rows in each batch to be written at a time.

Return type:

None
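
A sketch of appending an in-memory dataframe to an existing table with load_pandas_dataframe_to_table; provider and target_table are assumed to exist already, and the sample data is illustrative.

    import pandas as pd

    from universal_transfer_operator.data_providers.database.base import DatabaseDataProvider
    from universal_transfer_operator.datasets.table import Table


    def append_dataframe(provider: DatabaseDataProvider, target_table: Table) -> None:
        df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})  # illustrative data
        provider.load_pandas_dataframe_to_table(
            source_dataframe=df,
            target_table=target_table,
            if_exists="append",
            chunk_size=1000,
        )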

static get_dataframe_from_file(file)

Get a pandas dataframe from the file. We need export_to_dataframe() for BigQuery, Snowflake and Redshift, but not for Postgres. For Postgres we override this method and use export_to_dataframe_via_byte_stream(), which copies the file into a buffer and then uses that buffer to ingest the data. This approach gives a significant performance boost for Postgres.

Parameters:

file (universal_transfer_operator.datasets.file.base.File) – File path and conn_id for object stores
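
A sketch of the static helper, assuming file is a File object pointing at a supported format; Postgres-backed providers override this with the byte-stream variant described above.

    from universal_transfer_operator.data_providers.database.base import DatabaseDataProvider
    from universal_transfer_operator.datasets.file.base import File


    def file_to_dataframe(file: File):
        # Static method: no provider instance is required.
        # Returns a pandas DataFrame built from the file contents.
        return DatabaseDataProvider.get_dataframe_from_file(file)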

check_schema_autodetection_is_supported(source_file)

Checks if schema autodetection is handled natively by the database. Returns False by default.

Parameters:

source_file (universal_transfer_operator.datasets.file.base.File) – File from which we need to transfer data

Return type:

bool

check_file_pattern_based_schema_autodetection_is_supported(source_file)

Checks if schema autodetection is handled natively by the database for file patterns and prefixes. Returns False by default.

Parameters:

source_file (universal_transfer_operator.datasets.file.base.File) – File from which we need to transfer data

Return type:

bool

row_count(table)

Returns the number of rows in a table.

Parameters:

table (universal_transfer_operator.datasets.table.Table) – table to count

Returns:

The number of rows in the table

create_schema_if_needed(schema)

This function checks if the expected schema exists in the database. If the schema does not exist, it will attempt to create it.

Parameters:

schema (str | None) – DB schema: a namespace that contains named objects (tables, functions, etc.)

Return type:

None
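
A sketch of ensuring a schema exists before creating tables in it. The schema name is illustrative, provider is a concrete subclass instance, and create_schema_if_needed performs its own existence check, so calling it directly is enough.

    from universal_transfer_operator.data_providers.database.base import DatabaseDataProvider


    def ensure_schema(provider: DatabaseDataProvider, schema: str = "analytics") -> None:
        # No-op when the schema already exists; otherwise attempts to create it.
        provider.create_schema_if_needed(schema)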

abstract schema_exists(schema)

Checks if a schema exists in the database

Parameters:

schema (str) – DB schema: a namespace that contains named objects (tables, functions, etc.)

Return type:

bool

export_table_to_pandas_dataframe()

Copy the content of a table to an in-memory Pandas dataframe.

Return type:

pandas.DataFrame
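
A closing sketch that pulls a whole table into memory; suitable only for tables that fit in memory, and assuming provider is bound to the dataset to export.

    from universal_transfer_operator.data_providers.database.base import DatabaseDataProvider


    def table_shape(provider: DatabaseDataProvider) -> tuple[int, int]:
        # Copies the full table content into an in-memory pandas DataFrame.
        df = provider.export_table_to_pandas_dataframe()
        return df.shape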