Transform
add_audit_columns
add_audit_columns(df: PolarsFrame, audit_columns: list[Column]) -> PolarsFrame
Adds the audit columns in `audit_columns` to the given DataFrame or LazyFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | PolarsFrame | The DataFrame or LazyFrame to which audit columns will be added. | required |
audit_columns | list[Column] | A list of audit columns to add to the DataFrame. | required |
Returns:
Type | Description |
---|---|
PolarsFrame | The DataFrame or LazyFrame with the added audit columns. |
Example
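from datetime import datetime, timezone
import polars as pl
from msfabricutils.etl import add_audit_columns
# `Column` is the audit-column type from msfabricutils; import it from the module
# that exposes it in your installed version.
audit_columns = [
    Column(
        "created_at", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC"))
    )
]
df = pl.DataFrame({"data": [1, 2, 3]})
updated_df = add_audit_columns(df, audit_columns)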
deduplicate
deduplicate(df: PolarsFrame, primary_key_columns: str | list[str] | None = None, deduplication_order_columns: str | list[str] | None = None, deduplication_order_descending: bool | list[bool] = True) -> PolarsFrame
Removes duplicate rows from the DataFrame based on primary key columns.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | PolarsFrame | The DataFrame or LazyFrame from which duplicates will be removed. | required |
primary_key_columns | str \| list[str] \| None | The column or columns to use as the primary key for deduplication. | None |
deduplication_order_columns | str \| list[str] \| None | The column or columns used to order rows before duplicates are removed. | None |
deduplication_order_descending | bool \| list[bool] | Whether to sort by the deduplication order columns in descending order; pass a list to set the direction per column. | True |
Returns:
Type | Description |
---|---|
PolarsFrame | The DataFrame or LazyFrame with duplicates removed. |
Example
import polars as pl
from msfabricutils.etl import deduplicate
df = pl.DataFrame({
    "id": [1, 2, 2, 3],
    "value": ["a", "b", "b", "c"]
})
# The two identical rows with id 2 collapse into one
deduped_df = deduplicate(df, primary_key_columns=["id"])
normalize_column_names
normalize_column_names(df: PolarsFrame, normalization_strategy: Callable[[str], str]) -> PolarsFrame
Normalizes the column names of the DataFrame using a provided normalization strategy.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | PolarsFrame | The DataFrame or LazyFrame whose column names will be normalized. | required |
normalization_strategy | Callable[[str], str] | A callable that takes a column name and returns the normalized name. | required |
Returns:
Type | Description |
---|---|
PolarsFrame | The DataFrame or LazyFrame with normalized column names. |
Example
import polars as pl
from msfabricutils.etl import normalize_column_names
def my_strategy(old_column_name: str) -> str:
    # Replace spaces with underscores and lowercase the result
    new_name = old_column_name.replace(" ", "_").lower()
    return new_name
df = pl.DataFrame({"First Name": [1, 2], "Last Name": [3, 4]})
normalized_df = normalize_column_names(df, my_strategy)
reorder_columns_by_suffix
reorder_columns_by_suffix(df: PolarsFrame, suffix_order: list[str], sort_alphabetically_within_group: bool = True) -> PolarsFrame
Reorders DataFrame columns based on their suffixes according to the provided order.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | PolarsFrame | The DataFrame or LazyFrame whose columns will be reordered. | required |
suffix_order | list[str] | List of suffixes in the desired order. | required |
sort_alphabetically_within_group | bool | Whether to sort columns alphabetically within each suffix group. | True |
Returns:
Type | Description |
---|---|
PolarsFrame | The DataFrame or LazyFrame with reordered columns. |
Example
import polars as pl
from msfabricutils.etl import reorder_columns_by_suffix
df = pl.DataFrame({
    "name_key": ["a", "b"],
    "age_key": [1, 2],
    "name_value": ["x", "y"],
    "age_value": [10, 20]
})
reordered_df = reorder_columns_by_suffix(df, suffix_order=["_key", "_value"])
reorder_columns_by_prefix
reorder_columns_by_prefix(df: PolarsFrame, prefix_order: list[str], sort_alphabetically_within_group: bool = True) -> PolarsFrame
Reorders DataFrame columns based on their prefixes according to the provided order.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | PolarsFrame | The DataFrame or LazyFrame whose columns will be reordered. | required |
prefix_order | list[str] | List of prefixes in the desired order. | required |
sort_alphabetically_within_group | bool | Whether to sort columns alphabetically within each prefix group. | True |
Returns:
Type | Description |
---|---|
PolarsFrame | The DataFrame or LazyFrame with reordered columns. |
Example
import polars as pl
from msfabricutils.etl import reorder_columns_by_prefix
df = pl.DataFrame({
    "dim_name": ["a", "b"],
    "dim_age": [1, 2],
    "fact_sales": [100, 200],
    "fact_quantity": [5, 10]
})
reordered_df = reorder_columns_by_prefix(df, prefix_order=["dim_", "fact_"])
apply_scd_type_2
apply_scd_type_2(source_df: PolarsFrame, target_df: PolarsFrame, primary_key_columns: str | list[str], valid_from_column: str, valid_to_column: str) -> PolarsFrame
Applies Slowly Changing Dimension (SCD) Type 2 logic to merge source and target DataFrames. SCD Type 2 preserves history by adding a new row for each changed record and recording when each version was valid in the valid_from and valid_to columns.
The result contains the full history of changes, with valid_to dates adjusted so that the validity ranges of a record's versions do not overlap.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source_df | PolarsFrame | The new/source DataFrame containing updated records. | required |
target_df | PolarsFrame | The existing/target DataFrame containing current records. | required |
primary_key_columns | str \| list[str] | Column(s) that uniquely identify each entity. | required |
valid_from_column | str | Column name containing the validity start date. | required |
valid_to_column | str | Column name containing the validity end date. | required |
Returns:
Type | Description |
---|---|
PolarsFrame | A DataFrame containing both current and historical records with updated validity periods. |
Example
import polars as pl
from datetime import datetime
from msfabricutils.etl import apply_scd_type_2
# Create sample source and target dataframes
source_df = pl.DataFrame({
    "customer_id": [1, 2],
    "name": ["John Updated", "Jane Updated"],
    "valid_from": [datetime(2024, 1, 1), datetime(2024, 1, 1)]
})
target_df = pl.DataFrame({
    "customer_id": [1, 2],
    "name": ["John", "Jane"],
    "valid_from": [datetime(2023, 1, 1), datetime(2023, 1, 1)],
    # Declare the all-null column as datetime so it matches valid_from
    "valid_to": pl.Series([None, None], dtype=pl.Datetime("us"))
})
# Apply SCD Type 2
result_df = apply_scd_type_2(
    source_df=source_df,
    target_df=target_df,
    primary_key_columns="customer_id",
    valid_from_column="valid_from",
    valid_to_column="valid_to"
)
print(result_df.sort("customer_id", "valid_from"))
shape: (4, 4)
┌─────────────┬──────────────┬─────────────────────┬─────────────────────┐
│ customer_id ┆ name         ┆ valid_from          ┆ valid_to            │
│ ---         ┆ ---          ┆ ---                 ┆ ---                 │
│ i64         ┆ str          ┆ datetime[μs]        ┆ datetime[μs]        │
╞═════════════╪══════════════╪═════════════════════╪═════════════════════╡
│ 1           ┆ John         ┆ 2023-01-01 00:00:00 ┆ 2024-01-01 00:00:00 │
│ 1           ┆ John Updated ┆ 2024-01-01 00:00:00 ┆ null                │
│ 2           ┆ Jane         ┆ 2023-01-01 00:00:00 ┆ 2024-01-01 00:00:00 │
│ 2           ┆ Jane Updated ┆ 2024-01-01 00:00:00 ┆ null                │
└─────────────┴──────────────┴─────────────────────┴─────────────────────┘
Notes
- The function handles overlapping date ranges by adjusting valid_to dates
- NULL in valid_to indicates a currently active record
- Records in source_df will create new versions if they differ from target_df
- Historical records are preserved with appropriate valid_to dates