
Transform

add_audit_columns

add_audit_columns(df: PolarsFrame, audit_columns: list[Column]) -> PolarsFrame

Adds the audit columns defined in audit_columns to the given DataFrame or LazyFrame.

Parameters:

  • df (PolarsFrame, required): The DataFrame or LazyFrame to which audit columns will be added.
  • audit_columns (list[Column], required): A list of audit columns to add to the DataFrame.

Returns:

  • PolarsFrame: The DataFrame or LazyFrame with the added audit columns.

Example
from datetime import datetime, timezone

import polars as pl
from msfabricutils.etl import add_audit_columns
from msfabricutils.etl import Column  # assumption: Column is exported here as well

audit_columns = [
    Column(
        "created_at", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC"))
    )
]
df = pl.DataFrame({"data": [1, 2, 3]})
updated_df = add_audit_columns(df, audit_columns)

deduplicate

deduplicate(df: PolarsFrame, primary_key_columns: str | list[str] | None = None, deduplication_order_columns: str | list[str] | None = None, deduplication_order_descending: bool | list[bool] = True) -> PolarsFrame

Removes duplicate rows from the DataFrame based on primary key columns.

Parameters:

  • df (PolarsFrame, required): The DataFrame or LazyFrame from which duplicates will be removed.
  • primary_key_columns (str | list[str] | None, default None): The column or columns to use as primary keys for deduplication.
  • deduplication_order_columns (str | list[str] | None, default None): The column or columns that determine the order of rows during deduplication.
  • deduplication_order_descending (bool | list[bool], default True): Whether to sort by deduplication_order_columns in descending order; a list sets the direction for each column.

Returns:

  • PolarsFrame: The DataFrame or LazyFrame with duplicates removed.

Example
import polars as pl
from msfabricutils.etl import deduplicate

df = pl.DataFrame({
    "id": [1, 2, 2, 3],
    "value": ["a", "b", "b", "c"]
})
deduped_df = deduplicate(df, primary_key_columns=["id"])

normalize_column_names

normalize_column_names(df: PolarsFrame, normalization_strategy: Callable[[str], str]) -> PolarsFrame

Normalizes the column names of the DataFrame using a provided normalization strategy.

Parameters:

  • df (PolarsFrame, required): The DataFrame or LazyFrame whose column names will be normalized.
  • normalization_strategy (Callable[[str], str], required): A callable that takes a column name and returns its normalized name.

Returns:

  • PolarsFrame: The DataFrame or LazyFrame with normalized column names.

Example
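import polars as pl
from msfabricutils.etl import normalize_column_names

def my_strategy(old_column_name: str) -> str:
    # Replace spaces with underscores and lowercase, e.g. "First Name" -> "first_name"
    new_name = old_column_name.replace(" ", "_").lower()
    return new_name

df = pl.DataFrame({"First Name": [1, 2], "Last Name": [3, 4]})
normalized_df = normalize_column_names(df, my_strategy)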

reorder_columns_by_suffix

reorder_columns_by_suffix(df: PolarsFrame, suffix_order: list[str], sort_alphabetically_within_group: bool = True) -> PolarsFrame

Reorders DataFrame columns based on their suffixes according to the provided order.

Parameters:

  • df (PolarsFrame, required): The DataFrame or LazyFrame whose columns will be reordered.
  • suffix_order (list[str], required): List of suffixes in the desired order.
  • sort_alphabetically_within_group (bool, default True): Whether to sort columns alphabetically within each suffix group.

Returns:

  • PolarsFrame: The DataFrame or LazyFrame with reordered columns.

Example
import polars as pl
from msfabricutils.etl import reorder_columns_by_suffix

df = pl.DataFrame({
    "name_key": ["a", "b"],
    "age_key": [1, 2],
    "name_value": ["x", "y"],
    "age_value": [10, 20]
})
reordered_df = reorder_columns_by_suffix(df, suffix_order=["_key", "_value"])

reorder_columns_by_prefix

reorder_columns_by_prefix(df: PolarsFrame, prefix_order: list[str], sort_alphabetically_within_group: bool = True) -> PolarsFrame

Reorders DataFrame columns based on their prefixes according to the provided order.

Parameters:

  • df (PolarsFrame, required): The DataFrame or LazyFrame whose columns will be reordered.
  • prefix_order (list[str], required): List of prefixes in the desired order.
  • sort_alphabetically_within_group (bool, default True): Whether to sort columns alphabetically within each prefix group.

Returns:

  • PolarsFrame: The DataFrame or LazyFrame with reordered columns.

Example
import polars as pl
from msfabricutils.etl import reorder_columns_by_prefix

df = pl.DataFrame({
    "dim_name": ["a", "b"],
    "dim_age": [1, 2],
    "fact_sales": [100, 200],
    "fact_quantity": [5, 10]
})
reordered_df = reorder_columns_by_prefix(df, prefix_order=["dim_", "fact_"])

apply_scd_type_2

apply_scd_type_2(source_df: PolarsFrame, target_df: PolarsFrame, primary_key_columns: str | list[str], valid_from_column: str, valid_to_column: str) -> PolarsFrame

Applies Slowly Changing Dimension (SCD) Type 2 logic to merge source and target DataFrames. SCD Type 2 maintains historical records by creating new rows for changed data while preserving the history through valid_from and valid_to dates.

The result keeps the full history of changes and closes the validity period of superseded records, so that date ranges for the same key do not overlap.

Parameters:

  • source_df (PolarsFrame, required): The new (source) DataFrame containing updated records.
  • target_df (PolarsFrame, required): The existing (target) DataFrame containing current records.
  • primary_key_columns (str | list[str], required): The column or columns that uniquely identify each entity.
  • valid_from_column (str, required): Name of the column containing the validity start date.
  • valid_to_column (str, required): Name of the column containing the validity end date.

Returns:

  • PolarsFrame: A DataFrame containing both current and historical records with updated validity periods.

Example
import polars as pl
from datetime import datetime
from msfabricutils.etl import apply_scd_type_2

# Create sample source and target dataframes
source_df = pl.DataFrame({
    "customer_id": [1, 2],
    "name": ["John Updated", "Jane Updated"],
    "valid_from": [datetime(2024, 1, 1), datetime(2024, 1, 1)]
})

target_df = pl.DataFrame(
    {
        "customer_id": [1, 2],
        "name": ["John", "Jane"],
        "valid_from": [datetime(2023, 1, 1), datetime(2023, 1, 1)],
        "valid_to": [None, None]
    },
    # Give the all-null column an explicit type; otherwise Polars infers the Null dtype
    schema_overrides={"valid_to": pl.Datetime("us")},
)

# Apply SCD Type 2
result_df = apply_scd_type_2(
    source_df=source_df,
    target_df=target_df,
    primary_key_columns="customer_id",
    valid_from_column="valid_from",
    valid_to_column="valid_to"
)

print(result_df.sort("customer_id", "valid_from"))

shape: (4, 4)
┌─────────────┬──────────────┬─────────────────────┬─────────────────────┐
│ customer_id ┆ name         ┆ valid_from          ┆ valid_to            │
│ ---         ┆ ---          ┆ ---                 ┆ ---                 │
│ i64         ┆ str          ┆ datetime[μs]        ┆ datetime[μs]        │
╞═════════════╪══════════════╪═════════════════════╪═════════════════════╡
│ 1           ┆ John         ┆ 2023-01-01 00:00:00 ┆ 2024-01-01 00:00:00 │
│ 1           ┆ John Updated ┆ 2024-01-01 00:00:00 ┆ null                │
│ 2           ┆ Jane         ┆ 2023-01-01 00:00:00 ┆ 2024-01-01 00:00:00 │
│ 2           ┆ Jane Updated ┆ 2024-01-01 00:00:00 ┆ null                │
└─────────────┴──────────────┴─────────────────────┴─────────────────────┘

Notes
  • The function handles overlapping date ranges by adjusting valid_to dates
  • NULL in valid_to indicates a currently active record
  • Records in source_df will create new versions if they differ from target_df
  • Historical records are preserved with appropriate valid_to dates