Load
upsert
upsert(table_or_uri: str | DeltaTable, df: PolarsFrame, primary_key_columns: str | list[str], update_exclusion_columns: str | list[str] | None = None, predicate_exclusion_columns: str | list[str] | None = None) -> dict[str, str]
Upserts dataframe into a Delta table using the provided primary key columns.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`table_or_uri` | `str \| DeltaTable` | The URI of the target Delta table, or a `DeltaTable` object. | required |
`df` | `PolarsFrame` | The dataframe to upsert. | required |
`primary_key_columns` | `str \| list[str]` | Primary key column(s) for the upsert. | required |
`update_exclusion_columns` | `str \| list[str] \| None` | Columns that will not be updated in the merge. | `None` |
`predicate_exclusion_columns` | `str \| list[str] \| None` | Columns to exclude from change detection in the upsert. A difference between source and target in these columns will not trigger an update; however, if any other column differs, the row will also be updated with the | `None` |
Returns:
Type | Description |
---|---|
`dict[str, str]` | Result of the merge operation. |
Example
from msfabricutils.etl import upsert
import polars as pl

data = pl.DataFrame({...})

# Rows are matched on "id": existing rows are updated, new rows are inserted.
upsert(
    "path/to/delta_table",
    data,
    primary_key_columns=["id"],
)
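To make the roles of the three column parameters concrete, here is a minimal, library-free sketch of merge-style upsert semantics over plain lists of dicts. It is not the `msfabricutils` implementation: the helper `upsert_rows`, its `inserted`/`updated` counters, and the exact treatment of excluded columns are assumptions made for illustration. In particular, it assumes that predicate-excluded columns are still written once some other column has changed.

```python
from __future__ import annotations

from typing import Any

Row = dict[str, Any]


def upsert_rows(
    target: list[Row],
    source: list[Row],
    primary_key_columns: list[str],
    update_exclusion_columns: list[str] | None = None,
    predicate_exclusion_columns: list[str] | None = None,
) -> dict[str, int]:
    """Hypothetical in-memory upsert; mutates `target` and returns counters."""
    keys = list(primary_key_columns)
    no_update = set(update_exclusion_columns or [])
    no_compare = set(predicate_exclusion_columns or [])

    # Index the target rows by their primary key for direct lookup.
    index = {tuple(row[k] for k in keys): row for row in target}
    inserted = updated = 0

    for src in source:
        key = tuple(src[k] for k in keys)
        tgt = index.get(key)

        if tgt is None:
            # No match on the primary key: insert the whole source row.
            new_row = dict(src)
            target.append(new_row)
            index[key] = new_row
            inserted += 1
            continue

        # ASSUMPTION: only columns that are neither keys nor excluded
        # take part in deciding whether the row has changed.
        compare_cols = [
            c for c in src
            if c not in keys and c not in no_update and c not in no_compare
        ]
        if any(src[c] != tgt.get(c) for c in compare_cols):
            # ASSUMPTION: predicate-excluded columns are written too;
            # update-excluded columns always keep their target value.
            for c in src:
                if c not in keys and c not in no_update:
                    tgt[c] = src[c]
            updated += 1

    return {"inserted": inserted, "updated": updated}
```

Under these assumptions, passing `update_exclusion_columns=["created"]` keeps the original `created` value on every matched row, while passing `predicate_exclusion_columns=["loaded_at"]` means a source row that differs only in `loaded_at` leaves the target row untouched.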
overwrite
overwrite(table_or_uri: str | DeltaTable, df: PolarsFrame) -> None
Overwrites the entire Delta table with the provided dataframe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`table_or_uri` | `str \| DeltaTable` | The URI of the target Delta table, or a `DeltaTable` object. | required |
`df` | `PolarsFrame` | The dataframe to write to the Delta table. | required |
Example
from msfabricutils.etl import overwrite
import polars as pl
data = pl.DataFrame({...})
overwrite("path/to/delta_table", data)
replace_range
replace_range(table_or_uri: str | DeltaTable, df: PolarsFrame, range_column: str) -> None
Replaces a range of data in the Delta table based on a specified column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`table_or_uri` | `str \| DeltaTable` | The URI of the target Delta table, or a `DeltaTable` object. | required |
`df` | `PolarsFrame` | The dataframe to write to the Delta table. | required |
`range_column` | `str` | The column used to determine the range of data to replace. This replaces the data in the range of the | required |
Example
from msfabricutils.etl import replace_range
import polars as pl
data = pl.DataFrame({...})
replace_range("path/to/delta_table", data, range_column="date")
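The idea of replacing a range can be sketched without Delta Lake at all. The snippet below is an illustration under stated assumptions, not the library's code: it assumes the range is taken from the minimum and maximum of `range_column` in the incoming dataframe, that both bounds are inclusive, and that every target row inside that range is dropped before the new rows are written. The helper name `replace_range_rows` is hypothetical.

```python
from __future__ import annotations

from typing import Any

Row = dict[str, Any]


def replace_range_rows(
    target: list[Row], source: list[Row], range_column: str
) -> list[Row]:
    """Return a new table with the source's value range swapped out."""
    if not source:
        # No incoming rows means no range can be derived; keep the target.
        return [dict(row) for row in target]

    values = [row[range_column] for row in source]
    low, high = min(values), max(values)

    # ASSUMPTION: bounds are inclusive, so every target row whose value
    # lies in [low, high] is removed, even if the source has no row for it.
    kept = [
        dict(row) for row in target
        if not (low <= row[range_column] <= high)
    ]
    return kept + [dict(row) for row in source]
```

One consequence of this reading is worth noting: if the target holds rows for 1 to 5 January and the source only carries 2 and 4 January, the target row for 3 January is removed as well, because it falls inside the replaced range.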
append
append(table_or_uri: str | DeltaTable, df: PolarsFrame) -> None
Appends the provided dataframe to the Delta table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`table_or_uri` | `str \| DeltaTable` | The URI of the target Delta table, or a `DeltaTable` object. | required |
`df` | `PolarsFrame` | The dataframe to append to the Delta table. | required |
Example
from msfabricutils.etl import append
import polars as pl
data = pl.DataFrame({...})
append("path/to/delta_table", data)
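When choosing between `append` and `overwrite`, the practical difference shows up on re-runs. The sketch below models both on plain lists of dicts; the helpers `append_rows` and `overwrite_rows` are hypothetical stand-ins, not library functions, and they ignore everything Delta-specific such as schema checks and transaction logs.

```python
from __future__ import annotations

from typing import Any

Row = dict[str, Any]


def append_rows(target: list[Row], source: list[Row]) -> list[Row]:
    """Keep every existing row and add the new ones after them."""
    return [dict(row) for row in target] + [dict(row) for row in source]


def overwrite_rows(target: list[Row], source: list[Row]) -> list[Row]:
    """Discard the existing rows; the result is exactly the new data."""
    return [dict(row) for row in source]


batch = [{"id": 1}, {"id": 2}]

# Loading the same batch twice with append duplicates it ...
appended_twice = append_rows(append_rows([], batch), batch)

# ... whereas overwrite gives the same table no matter how often it runs.
overwritten_twice = overwrite_rows(overwrite_rows([], batch), batch)
```

In this model, re-running an append-based load after a failure needs some deduplication strategy, while an overwrite-based load can simply be run again.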
incremental
incremental(table_or_uri: str | DeltaTable, df: PolarsFrame, incremental_column: str) -> None
Appends new data to the Delta table based on an incremental column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`table_or_uri` | `str \| DeltaTable` | The URI of the target Delta table, or a `DeltaTable` object. | required |
`df` | `PolarsFrame` | The dataframe to append to the Delta table. | required |
`incremental_column` | `str` | The column used to determine new data to append. The source dataframe will only append rows where the value of the | required |
Example
from msfabricutils.etl import incremental
import polars as pl
data = pl.DataFrame({...})
incremental("path/to/delta_table", data, incremental_column="timestamp")
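A common way to implement this kind of load is a high-water mark, and the following sketch shows that pattern on plain lists of dicts. It rests on assumptions that the text above does not confirm: that "new" means strictly greater than the current maximum of `incremental_column` in the target, and that an empty target accepts every source row. The helper `incremental_rows` is hypothetical.

```python
from __future__ import annotations

from typing import Any

Row = dict[str, Any]


def incremental_rows(
    target: list[Row], source: list[Row], incremental_column: str
) -> list[Row]:
    """Return the target plus only those source rows past its high-water mark."""
    existing = [dict(row) for row in target]
    if not existing:
        # ASSUMPTION: with no existing data, every source row counts as new.
        return [dict(row) for row in source]

    high_water_mark = max(row[incremental_column] for row in existing)

    # ASSUMPTION: strictly greater than the mark, so rows that tie with
    # the latest loaded value are treated as already present.
    new_rows = [
        dict(row) for row in source
        if row[incremental_column] > high_water_mark
    ]
    return existing + new_rows
```

Under the strict comparison assumed here, late-arriving rows that share the latest timestamp already in the target would be skipped, which is the usual trade-off of a high-water-mark load.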