Skip to content

Load

upsert

upsert(table_or_uri: str | DeltaTable, df: PolarsFrame, primary_key_columns: str | list[str], update_exclusion_columns: str | list[str] | None = None, predicate_exclusion_columns: str | list[str] | None = None) -> dict[str, str]

Upserts dataframe into a Delta table using the provided primary key columns.

Parameters:

Name Type Description Default
table_or_uri str

The URI of the target Delta table.

required
df PolarsFrame

The dataframe to upsert.

required
primary_key_columns str | list[str]

Primary key column(s) for the upsert.

required
update_exclusion_columns str | list[str] | None

Columns which will not be updated in the merge.

None
predicate_exclusion_columns str | list[str] | None

Columns to exclude from the upsert. Difference between source and target of these columns will not trigger an update, however if there is a difference in the other columns, the row will be also be updated with the excluded_columns.

None

Returns:

Type Description
dict[str, str]

Result of the merge operation.

Example
from msfabricutils.etl import upsert
import polars as pl


config = get_default_config()
data = pl.DataFrame({...})

upsert(
    "path/to/delta_table",
    data,
    primary_key_columns=["id"],
)

overwrite

overwrite(table_or_uri: str | DeltaTable, df: PolarsFrame) -> None

Overwrites the entire Delta table with the provided dataframe.

Parameters:

Name Type Description Default
table_or_uri str

The URI of the target Delta table.

required
df PolarsFrame

The dataframe to write to the Delta table.

required
Example
from msfabricutils.etl import overwrite
import polars as pl

data = pl.DataFrame({...})

overwrite("path/to/delta_table", data)

replace_range

replace_range(table_or_uri: str | DeltaTable, df: PolarsFrame, range_column: str) -> None

Replaces a range of data in the Delta table based on a specified column.

Parameters:

Name Type Description Default
table_or_uri str

The URI of the target Delta table.

required
df PolarsFrame

The dataframe to write to the Delta table.

required
range_column str

The column used to determine the range of data to replace. This replaces the data in the range of the range_column in the delta table based off the min and max values of the range_column in the dataframe.

required
Example
from msfabricutils.etl import replace_range
import polars as pl

data = pl.DataFrame({...})

replace_range("path/to/delta_table", data, range_column="date")

append

append(table_or_uri: str | DeltaTable, df: PolarsFrame) -> None

Appends the provided dataframe to the Delta table.

Parameters:

Name Type Description Default
table_or_uri str

The URI of the target Delta table.

required
df PolarsFrame

The dataframe to append to the Delta table.

required
Example
from msfabricutils.etl import append
import polars as pl

data = pl.DataFrame({...})

append("path/to/delta_table", data)

incremental

incremental(table_or_uri: str | DeltaTable, df: PolarsFrame, incremental_column: str) -> None

Appends new data to the Delta table based on an incremental column.

Parameters:

Name Type Description Default
table_or_uri str

The URI of the target Delta table.

required
df PolarsFrame

The dataframe to append to the Delta table.

required
incremental_column str

The column used to determine new data to append. The source dataframe will only append rows where the value of the incremental_column is greater than the max value of the incremental_column in the delta table.

required
Example
from msfabricutils.etl import incremental
import polars as pl

data = pl.DataFrame({...})

incremental("path/to/delta_table", data, incremental_column="timestamp")