Load

upsert

upsert(table_or_uri: str | DeltaTable, df: PolarsFrame, primary_key_columns: str | list[str], update_exclusion_columns: str | list[str] | None = None, predicate_exclusion_columns: str | list[str] | None = None, delta_merge_options: dict[str, Any] | None = None) -> dict[str, str]

Upserts a dataframe into a Delta table, matching rows on the provided primary key columns.

Parameters:

Name Type Description Default
table_or_uri str | DeltaTable

The URI of the target Delta table, or a DeltaTable instance.

required
df PolarsFrame

The dataframe to upsert.

required
primary_key_columns str | list[str]

Primary key column(s) for the upsert.

required
update_exclusion_columns str | list[str] | None

Columns which will not be updated in the merge.

None
predicate_exclusion_columns str | list[str] | None

Columns to exclude from change detection. A difference between source and target in these columns alone will not trigger an update; however, if any other column differs, the row is updated, including the values of these excluded columns.

None
delta_merge_options dict[str, Any] | None

Additional keyword arguments passed when merging into the Delta table.

None

Returns:

Type Description
dict[str, str]

Result of the merge operation.

Example
from msfabricutils.etl import upsert
import polars as pl

data = pl.DataFrame({...})

upsert(
    "path/to/delta_table",
    data,
    primary_key_columns=["id"],
)
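
The interaction between update_exclusion_columns and predicate_exclusion_columns can be easier to follow in plain Python. The sketch below is not the library's implementation: it mimics the documented behaviour for a single matched row using dictionaries. The helper name plan_row_update is hypothetical, and leaving update-excluded columns out of the comparison is an assumption.

```python
def plan_row_update(target, source, primary_key_columns,
                    update_exclusion_columns=(), predicate_exclusion_columns=()):
    """Return the column values that would be written for one matched row.

    Hypothetical helper that mirrors the documented semantics only.
    """
    keys = set(primary_key_columns)
    # Columns whose differences may trigger an update. Assumption: columns
    # that are never written (update-excluded) are not compared either.
    compared = [
        c for c in source
        if c not in keys
        and c not in predicate_exclusion_columns
        and c not in update_exclusion_columns
    ]
    if not any(source[c] != target.get(c) for c in compared):
        return {}  # nothing relevant changed, so the row is left alone
    # Once an update is triggered, every column is written except the keys
    # and the columns listed in update_exclusion_columns.
    return {
        c: v for c, v in source.items()
        if c not in keys and c not in update_exclusion_columns
    }


target = {"id": 1, "name": "a", "loaded_at": "2024-01-01", "created_at": "2023-12-31"}

# Only the predicate-excluded column differs: no update is planned.
same = {"id": 1, "name": "a", "loaded_at": "2024-02-01", "created_at": "2023-12-31"}
no_change = plan_row_update(target, same, ["id"], ["created_at"], ["loaded_at"])

# Another column differs: the row is updated, loaded_at included,
# while created_at keeps its value in the target.
changed = {"id": 1, "name": "b", "loaded_at": "2024-02-01", "created_at": "2024-02-01"}
update = plan_row_update(target, changed, ["id"], ["created_at"], ["loaded_at"])
```

In this sketch a typical use would be a load timestamp in predicate_exclusion_columns (refreshed only when the row really changes) and a creation timestamp in update_exclusion_columns (never overwritten).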

upsert_cdf

upsert_cdf(table_or_uri: str, df: PolarsFrame, primary_key_columns: list[str], change_type_column: str = '_change_type', change_type_insert: str = 'insert', change_type_update: str = 'update', change_type_delete: str = 'delete', delta_merge_options: dict[str, Any] | None = None)

Upserts a dataframe of Change Data Feed (CDF) rows into a Delta table, matching rows on the provided primary key columns.

Parameters:

Name Type Description Default
table_or_uri str

The URI of the target Delta table.

required
df PolarsFrame

The dataframe to upsert.

required
primary_key_columns list[str]

Primary key column(s) for the upsert.

required
change_type_column str

The column in the dataframe containing the change type, i.e. insert, update or delete.

'_change_type'
change_type_insert str

The value of the insert change type in the dataframe.

'insert'
change_type_update str

The value of the update change type in the dataframe.

'update'
change_type_delete str

The value of the delete change type in the dataframe.

'delete'
delta_merge_options dict[str, Any] | None

Additional keyword arguments passed when merging into the Delta table.

None

Returns:

Type Description

Result of the merge operation.

Example
from msfabricutils.etl import upsert_cdf
import polars as pl

data = pl.DataFrame({...})

upsert_cdf(
    "path/to/delta_table",
    data,
    primary_key_columns=["id"],
)
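
To illustrate how the change type column drives the merge, here is a minimal plain-Python sketch. It is not the library's implementation: the table is modelled as a dictionary keyed by primary key, the helper name apply_changes is hypothetical, and treating insert and update as the same write, dropping the change type column from stored rows, and raising on unknown change types are all assumptions.

```python
def apply_changes(table, changes, primary_key_columns,
                  change_type_column="_change_type",
                  change_type_insert="insert",
                  change_type_update="update",
                  change_type_delete="delete"):
    """Apply change rows to `table`, a dict keyed by primary-key tuple.

    Hypothetical helper that mirrors the documented parameters only.
    """
    result = dict(table)
    for row in changes:
        key = tuple(row[c] for c in primary_key_columns)
        change_type = row[change_type_column]
        # Assumption: the change type column itself is not stored.
        payload = {c: v for c, v in row.items() if c != change_type_column}
        if change_type == change_type_delete:
            result.pop(key, None)
        elif change_type in (change_type_insert, change_type_update):
            result[key] = payload
        else:
            raise ValueError(f"unknown change type: {change_type!r}")
    return result


table = {(1,): {"id": 1, "name": "a"}, (2,): {"id": 2, "name": "b"}}
changes = [
    {"id": 1, "name": "a2", "_change_type": "update"},
    {"id": 2, "name": "b", "_change_type": "delete"},
    {"id": 3, "name": "c", "_change_type": "insert"},
]
merged = apply_changes(table, changes, ["id"])
```

If the source system labels changes differently, the change_type_insert, change_type_update and change_type_delete parameters map those labels onto the three operations.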

overwrite

overwrite(table_or_uri: str | DeltaTable, df: PolarsFrame, delta_write_options: dict[str, Any] | None = None) -> None

Overwrites the entire Delta table with the provided dataframe.

Parameters:

Name Type Description Default
table_or_uri str | DeltaTable

The URI of the target Delta table, or a DeltaTable instance.

required
df PolarsFrame

The dataframe to write to the Delta table.

required
delta_write_options dict[str, Any] | None

Additional keyword arguments passed when writing to the Delta table.

None

Example
from msfabricutils.etl import overwrite
import polars as pl

data = pl.DataFrame({...})

overwrite("path/to/delta_table", data)

replace_range

replace_range(table_or_uri: str | DeltaTable, df: PolarsFrame, range_column: str, delta_write_options: dict[str, Any] | None = None) -> None

Replaces a range of data in the Delta table based on a specified column.

Parameters:

Name Type Description Default
table_or_uri str | DeltaTable

The URI of the target Delta table, or a DeltaTable instance.

required
df PolarsFrame

The dataframe to write to the Delta table.

required
range_column str

The column used to determine the range of data to replace. Rows in the Delta table whose range_column value falls between the minimum and maximum range_column values in the dataframe are replaced with the dataframe's rows.

required
delta_write_options dict[str, Any] | None

Additional keyword arguments passed when writing to the Delta table.

None

Example
from msfabricutils.etl import replace_range
import polars as pl

data = pl.DataFrame({...})

replace_range("path/to/delta_table", data, range_column="date")
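
The range replacement described for range_column can be sketched in plain Python on lists of rows. This is an illustration of the documented behaviour, not the library's implementation: the helper name replace_range_rows is hypothetical, and treating both range bounds as inclusive and leaving the table untouched for an empty dataframe are assumptions.

```python
def replace_range_rows(table_rows, new_rows, range_column):
    """Replace the rows of `table_rows` that fall inside the range spanned
    by `new_rows` on `range_column`.

    Hypothetical helper that mirrors the documented semantics only.
    """
    if not new_rows:
        return list(table_rows)  # assumption: nothing to replace
    low = min(r[range_column] for r in new_rows)
    high = max(r[range_column] for r in new_rows)
    # Assumption: both bounds are inclusive.
    kept = [r for r in table_rows if not (low <= r[range_column] <= high)]
    return kept + list(new_rows)


existing = [
    {"date": "2024-01-01", "value": 1},
    {"date": "2024-01-02", "value": 2},
    {"date": "2024-01-03", "value": 3},
    {"date": "2024-01-04", "value": 4},
]
incoming = [
    {"date": "2024-01-02", "value": 20},
    {"date": "2024-01-04", "value": 40},
]
result = replace_range_rows(existing, incoming, "date")
```

Note that in this sketch the existing row for 2024-01-03 disappears even though the incoming data has no row for that date, because it lies inside the replaced range.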

append

append(table_or_uri: str | DeltaTable, df: PolarsFrame, delta_write_options: dict[str, Any] | None = None) -> None

Appends the provided dataframe to the Delta table.

Parameters:

Name Type Description Default
table_or_uri str | DeltaTable

The URI of the target Delta table, or a DeltaTable instance.

required
df PolarsFrame

The dataframe to append to the Delta table.

required
delta_write_options dict[str, Any] | None

Additional keyword arguments passed when writing to the Delta table.

None

Example
from msfabricutils.etl import append
import polars as pl

data = pl.DataFrame({...})

append("path/to/delta_table", data)

incremental

incremental(table_or_uri: str | DeltaTable, df: PolarsFrame, incremental_column: str, delta_write_options: dict[str, Any] | None = None) -> None

Appends new data to the Delta table based on an incremental column.

Parameters:

Name Type Description Default
table_or_uri str | DeltaTable

The URI of the target Delta table, or a DeltaTable instance.

required
df PolarsFrame

The dataframe to append to the Delta table.

required
incremental_column str

The column used to determine which rows are new. Only rows in the source dataframe whose incremental_column value is greater than the maximum incremental_column value in the Delta table are appended.

required
delta_write_options dict[str, Any] | None

Additional keyword arguments passed when writing to the Delta table.

None

Example
from msfabricutils.etl import incremental
import polars as pl

data = pl.DataFrame({...})

incremental("path/to/delta_table", data, incremental_column="timestamp")
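
The filtering rule for incremental_column amounts to a high-water-mark comparison, sketched below in plain Python. This is an illustration rather than the library's implementation: the helper name select_new_rows is hypothetical, and appending every row when the table is empty is an assumption.

```python
def select_new_rows(table_rows, new_rows, incremental_column):
    """Return the rows of `new_rows` that would be appended.

    Hypothetical helper that mirrors the documented semantics only.
    """
    if not table_rows:
        # Assumption: with no existing data, every row counts as new.
        return list(new_rows)
    high_water_mark = max(r[incremental_column] for r in table_rows)
    # Strictly greater than the current maximum, as documented.
    return [r for r in new_rows if r[incremental_column] > high_water_mark]


existing = [{"timestamp": 10, "value": "a"}, {"timestamp": 20, "value": "b"}]
incoming = [
    {"timestamp": 15, "value": "late"},
    {"timestamp": 20, "value": "duplicate"},
    {"timestamp": 25, "value": "new"},
]
to_append = select_new_rows(existing, incoming, "timestamp")
```

Because the comparison is strict, late-arriving rows with a value at or below the current maximum are skipped in this sketch; replace_range or upsert may suit such data better.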