Load
upsert
upsert(table_or_uri: str | DeltaTable, df: PolarsFrame, primary_key_columns: str | list[str], update_exclusion_columns: str | list[str] | None = None, predicate_exclusion_columns: str | list[str] | None = None, delta_merge_options: dict[str, Any] | None = None) -> dict[str, str]
Upserts dataframe into a Delta table using the provided primary key columns.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
table_or_uri
|
str
|
The URI of the target Delta table. |
required |
df
|
PolarsFrame
|
The dataframe to upsert. |
required |
primary_key_columns
|
str | list[str]
|
Primary key column(s) for the upsert. |
required |
update_exclusion_columns
|
str | list[str] | None
|
Columns which will not be updated in the merge. |
None
|
predicate_exclusion_columns
|
str | list[str] | None
|
Columns to exclude from the upsert. Difference between source and target of these columns will not trigger an update, however if there is a difference in the other columns, the row will be also be updated with the |
None
|
delta_merge_options
|
dict[str, Any] | None
|
Additional keyword arguments while merging to the Delta Table |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, str]
|
Result of the merge operation. |
Example
from msfabricutils.etl import upsert
import polars as pl
data = pl.DataFrame({...})
upsert(
"path/to/delta_table",
data,
primary_key_columns=["id"],
)
upsert_cdf
upsert_cdf(table_or_uri: str, df: PolarsFrame, primary_key_columns: list[str], change_type_column: str = '_change_type', change_type_insert: str = 'insert', change_type_update: str = 'update', change_type_delete: str = 'delete', delta_merge_options: dict[str, Any] | None = None)
Upserts dataframe into a Delta table using the provided primary key columns using Change Data Feed (CDF).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
table_or_uri
|
str
|
The URI of the target Delta table. |
required |
df
|
PolarsFrame
|
The dataframe to upsert. |
required |
primary_key_columns
|
str | list[str]
|
Primary key column(s) for the upsert. |
required |
change_type_column
|
str
|
The column in the dataframe containing the change type, i.e. |
'_change_type'
|
change_type_insert
|
str
|
The value of the |
'insert'
|
change_type_update
|
str
|
The value of the |
'update'
|
change_type_delete
|
str
|
The value of the |
'delete'
|
delta_merge_options
|
dict[str, Any] | None
|
Additional keyword arguments while writing a Delta lake Table. |
None
|
Returns:
| Type | Description |
|---|---|
|
Result of the merge operation. |
Example
from msfabricutils.etl import upsert_cdf
import polars as pl
data = pl.DataFrame({...})
upsert_cdf(
"path/to/delta_table",
data,
primary_key_columns=["id"],
)
overwrite
overwrite(table_or_uri: str | DeltaTable, df: PolarsFrame, delta_write_options: dict[str, Any] | None = None) -> None
Overwrites the entire Delta table with the provided dataframe.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
table_or_uri
|
str
|
The URI of the target Delta table. |
required |
df
|
PolarsFrame
|
The dataframe to write to the Delta table. |
required |
delta_write_options
|
dict[str, Any] | None
|
Additional keyword arguments while writing a Delta lake Table. |
None
|
Example
from msfabricutils.etl import overwrite
import polars as pl
data = pl.DataFrame({...})
overwrite("path/to/delta_table", data)
replace_range
replace_range(table_or_uri: str | DeltaTable, df: PolarsFrame, range_column: str, delta_write_options: dict[str, Any] | None = None) -> None
Replaces a range of data in the Delta table based on a specified column.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
table_or_uri
|
str
|
The URI of the target Delta table. |
required |
df
|
PolarsFrame
|
The dataframe to write to the Delta table. |
required |
range_column
|
str
|
The column used to determine the range of data to replace. This replaces the data in the range of the |
required |
delta_write_options
|
dict[str, Any] | None
|
Additional keyword arguments while writing a Delta lake Table. |
None
|
Example
from msfabricutils.etl import replace_range
import polars as pl
data = pl.DataFrame({...})
replace_range("path/to/delta_table", data, range_column="date")
append
append(table_or_uri: str | DeltaTable, df: PolarsFrame, delta_write_options: dict[str, Any] | None = None) -> None
Appends the provided dataframe to the Delta table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
table_or_uri
|
str
|
The URI of the target Delta table. |
required |
df
|
PolarsFrame
|
The dataframe to append to the Delta table. |
required |
delta_write_options
|
dict[str, Any] | None
|
Additional keyword arguments while writing a Delta lake Table. |
None
|
Example
from msfabricutils.etl import append
import polars as pl
data = pl.DataFrame({...})
append("path/to/delta_table", data)
incremental
incremental(table_or_uri: str | DeltaTable, df: PolarsFrame, incremental_column: str, delta_write_options: dict[str, Any] | None = None) -> None
Appends new data to the Delta table based on an incremental column.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
table_or_uri
|
str
|
The URI of the target Delta table. |
required |
df
|
PolarsFrame
|
The dataframe to append to the Delta table. |
required |
incremental_column
|
str
|
The column used to determine new data to append. The source dataframe will only append rows where the value of the |
required |
delta_write_options
|
dict[str, Any] | None
|
Additional keyword arguments while writing a Delta lake Table. |
None
|
Example
from msfabricutils.etl import incremental
import polars as pl
data = pl.DataFrame({...})
incremental("path/to/delta_table", data, incremental_column="timestamp")