ETL

Extract

read_delta

read_delta(
    table_uri: str, eager: bool = False
) -> DataFrameType

Reads a Delta table from the specified abfss URI. Authentication with OneLake is handled automatically.

Parameters:

    table_uri (str, required): The abfss URI of the Delta table to read.
    eager (bool, default False): If True, reads the table eagerly; otherwise, returns a lazy frame.

Returns:

    DataFrameType: The data from the Delta table.

Example
from blueno.etl import read_delta

workspace_id = "12345678-1234-1234-1234-123456789012"
lakehouse_id = "beefbeef-beef-beef-beef-beefbeefbeef"
table_name = "my-delta-table"
table_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{table_name}"

df = read_delta(table_uri, eager=True)
lazy_df = read_delta(table_uri, eager=False)
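
Since eager=False returns a lazy frame, transformations can be chained before any data is materialized. A minimal sketch, assuming the lazy frame is a standard polars.LazyFrame and using a hypothetical "value" column:

import polars as pl

# Nothing is read until .collect(); the filter can be pushed down to the scan.
filtered_df = lazy_df.filter(pl.col("value") > 100).collect()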

read_parquet

read_parquet(
    table_uri: str, eager: bool = False
) -> DataFrameType

Reads a Parquet file from the specified abfss URI. Authentication with OneLake is handled automatically.

Parameters:

    table_uri (str, required): The abfss URI of the Parquet file to read. Supports globbing.
    eager (bool, default False): If True, reads the file eagerly; otherwise, returns a lazy frame.

Returns:

    DataFrameType: The data from the Parquet file.

Example

Reading a single file

from blueno.etl import read_parquet

workspace_id = "12345678-1234-1234-1234-123456789012"
lakehouse_id = "beefbeef-beef-beef-beef-beefbeefbeef"

file_path = "my-parquet-file.parquet"
folder_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/"

df = read_parquet(folder_uri + file_path, eager=True)

Reading all Parquet files in a folder

from blueno.etl import read_parquet

workspace_id = "12345678-1234-1234-1234-123456789012"
lakehouse_id = "beefbeef-beef-beef-beef-beefbeefbeef"

folder_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/"
glob_df = read_parquet(folder_uri + "**/*.parquet", eager=True)

Transforms

add_audit_columns

add_audit_columns(
    df: DataFrameType, audit_columns: list[Column]
) -> DataFrameType

Adds audit columns to the given DataFrame or LazyFrame based on the configuration.

Parameters:

    df (DataFrameType, required): The DataFrame or LazyFrame to which audit columns will be added.
    audit_columns (list[Column], required): A list of audit columns to add to the DataFrame.

Returns:

    DataFrameType: The DataFrame or LazyFrame with the added audit columns.

Example
from blueno.etl import add_audit_columns, Column
import polars as pl
from datetime import datetime, timezone

audit_columns = [
    Column("created_at", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC")))
]
df = pl.DataFrame({"data": [1, 2, 3]})
updated_df = add_audit_columns(df, audit_columns)
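
Several audit columns can be attached in one call, following the Column(name, expression) pattern shown above. A sketch adding a hypothetical batch identifier alongside the timestamp:

audit_columns = [
    Column("created_at", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC"))),
    Column("batch_id", pl.lit("batch-001")),  # hypothetical literal identifier
]
updated_df = add_audit_columns(df, audit_columns)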

deduplicate

deduplicate(
    df: DataFrameType,
    key_columns: Optional[List[str]] = None,
    deduplication_order_columns: Optional[List[str]] = None,
    deduplication_order_descending: bool = True,
) -> DataFrameType

Removes duplicate rows from the DataFrame based on primary key columns.

Parameters:

    df (DataFrameType, required): The DataFrame or LazyFrame from which duplicates will be removed.
    key_columns (Optional[List[str]], default None): The columns to use as primary keys for deduplication.
    deduplication_order_columns (Optional[List[str]], default None): The columns that determine the order of rows for deduplication.
    deduplication_order_descending (bool, default True): Whether to sort the deduplication order in descending order.

Returns:

    DataFrameType: The DataFrame or LazyFrame with duplicates removed.

Example
import polars as pl
from blueno.etl import deduplicate

df = pl.DataFrame({"id": [1, 2, 2, 3], "value": ["a", "b", "b", "c"]})
deduped_df = deduplicate(df, key_columns=["id"])
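
When several rows share a key, the ordering parameters pick the survivor. A sketch, assuming the descending sort keeps the row with the latest updated_at per id (updated_at is a hypothetical column):

df = pl.DataFrame({
    "id": [1, 1, 2],
    "updated_at": ["2024-01-01", "2024-06-01", "2024-01-01"],
    "value": ["old", "new", "only"],
})
# Keeps the newest row per id: (1, "new") and (2, "only").
latest_df = deduplicate(
    df,
    key_columns=["id"],
    deduplication_order_columns=["updated_at"],
    deduplication_order_descending=True,
)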

normalize_column_names

normalize_column_names(
    df: DataFrameType,
    normalization_strategy: Callable[[str], str],
) -> DataFrameType

Normalizes the column names of the DataFrame using a provided normalization strategy.

Parameters:

    df (DataFrameType, required): The DataFrame or LazyFrame whose column names will be normalized.
    normalization_strategy (Callable[[str], str], required): A callable which takes a string and returns a modified string.

Returns:

    DataFrameType: The DataFrame or LazyFrame with normalized column names.

Example
import polars as pl
from blueno.etl import normalize_column_names


def my_strategy(old_column_name: str) -> str:
    new_name = old_column_name.replace(" ", "_").lower()
    return new_name


df = pl.DataFrame({"First Name": [1, 2], "Last Name": [3, 4]})
normalized_df = normalize_column_names(df, my_strategy)
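
Applied to the frame above, the strategy lowercases the names and replaces spaces with underscores:

# Expected: ["first_name", "last_name"]
print(normalized_df.columns)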

reorder_columns_by_suffix

reorder_columns_by_suffix(
    df: DataFrameType,
    suffix_order: List[str],
    sort_alphabetically_within_group: bool = True,
) -> DataFrameType

Reorders DataFrame columns based on their suffixes according to the provided order.

Parameters:

    df (DataFrameType, required): The DataFrame or LazyFrame whose columns will be reordered.
    suffix_order (List[str], required): List of suffixes in the desired order.
    sort_alphabetically_within_group (bool, default True): Whether to sort columns alphabetically within each suffix group.

Returns:

    DataFrameType: The DataFrame or LazyFrame with reordered columns.

Example
import polars as pl
from blueno.etl import reorder_columns_by_suffix


df = pl.DataFrame(
    {
        "name_key": ["a", "b"],
        "age_key": [1, 2],
        "name_value": ["x", "y"],
        "age_value": [10, 20],
    }
)
reordered_df = reorder_columns_by_suffix(df, suffix_order=["_key", "_value"])
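
With this suffix order and the default alphabetical sort within each group, the "_key" columns come first, then the "_value" columns:

# Expected (assuming alphabetical sorting within each suffix group):
# ["age_key", "name_key", "age_value", "name_value"]
print(reordered_df.columns)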

reorder_columns_by_prefix

reorder_columns_by_prefix(
    df: DataFrameType,
    prefix_order: List[str],
    sort_alphabetically_within_group: bool = True,
) -> DataFrameType

Reorders DataFrame columns based on their prefixes according to the provided order.

Parameters:

    df (DataFrameType, required): The DataFrame or LazyFrame whose columns will be reordered.
    prefix_order (List[str], required): List of prefixes in the desired order.
    sort_alphabetically_within_group (bool, default True): Whether to sort columns alphabetically within each prefix group.

Returns:

    DataFrameType: The DataFrame or LazyFrame with reordered columns.

Example
import polars as pl
from blueno.etl import reorder_columns_by_prefix

df = pl.DataFrame(
    {
        "dim_name": ["a", "b"],
        "dim_age": [1, 2],
        "fact_sales": [100, 200],
        "fact_quantity": [5, 10],
    }
)
reordered_df = reorder_columns_by_prefix(df, prefix_order=["dim_", "fact_"])

apply_scd_type_2

apply_scd_type_2(
    source_df: DataFrameType,
    target_df: DataFrameType,
    primary_key_columns: List[str],
    valid_from_column: str,
    valid_to_column: str,
) -> DataFrameType

Applies Slowly Changing Dimension (SCD) Type 2 logic to merge source and target DataFrames.

SCD Type 2 maintains historical records by creating new rows for changed data while preserving the history through valid_from and valid_to dates.

The result maintains the full history of changes while ensuring proper date ranges for overlapping records.

Parameters:

    source_df (DataFrameType, required): The new/source DataFrame containing updated records.
    target_df (DataFrameType, required): The existing/target DataFrame containing current records.
    primary_key_columns (List[str], required): Column(s) that uniquely identify each entity.
    valid_from_column (str, required): Column name containing the validity start date.
    valid_to_column (str, required): Column name containing the validity end date.

Returns:

    DataFrameType: A DataFrame containing both current and historical records with updated validity periods.

Example
import polars as pl
from blueno.etl import apply_scd_type_2
from datetime import datetime

# Create sample source and target dataframes
source_df = pl.DataFrame({
    "customer_id": [1, 2],
    "name": ["John Updated", "Jane Updated"],
    "valid_from": [datetime(2024, 1, 1), datetime(2024, 1, 1)],
})

target_df = pl.DataFrame({
    "customer_id": [1, 2],
    "name": ["John", "Jane"],
    "valid_from": [datetime(2023, 1, 1), datetime(2023, 1, 1)],
    "valid_to": [None, None]
})

# Apply SCD Type 2
result_df = apply_scd_type_2(
    source_df=source_df,
    target_df=target_df,
    primary_key_columns="customer_id",
    valid_from_column="valid_from",
    valid_to_column="valid_to"
)

print(result_df.sort("customer_id", "valid_from"))

"""
shape: (4, 4)
┌─────────────┬──────────────┬─────────────────────┬─────────────────────┐
│ customer_id ┆ name         ┆ valid_from          ┆ valid_to            │
│ ---         ┆ ---          ┆ ---                 ┆ ---                 │
│ i64         ┆ str          ┆ datetime[μs]        ┆ datetime[μs]        │
╞═════════════╪══════════════╪═════════════════════╪═════════════════════╡
│ 1           ┆ John         ┆ 2023-01-01 00:00:00 ┆ 2024-01-01 00:00:00 │
│ 1           ┆ John Updated ┆ 2024-01-01 00:00:00 ┆ null                │
│ 2           ┆ Jane         ┆ 2023-01-01 00:00:00 ┆ 2024-01-01 00:00:00 │
│ 2           ┆ Jane Updated ┆ 2024-01-01 00:00:00 ┆ null                │
└─────────────┴──────────────┴─────────────────────┴─────────────────────┘
"""
Notes
  • The function handles overlapping date ranges by adjusting valid_to dates
  • NULL in valid_to indicates a currently active record
  • Records in source_df will create new versions if they differ from target_df
  • Historical records are preserved with appropriate valid_to dates

Load

Delta

upsert

upsert(
    table_or_uri: Union[str, DeltaTable],
    df: DataFrameType,
    key_columns: List[str],
    update_exclusion_columns: Optional[List[str]] = None,
    predicate_exclusion_columns: Optional[List[str]] = None,
) -> Dict[str, str]

Updates existing records and inserts new records into a Delta table.

Parameters:

    table_or_uri (Union[str, DeltaTable], required): Path to the Delta table or a DeltaTable instance.
    df (DataFrameType, required): Data to upsert as a Polars DataFrame or LazyFrame.
    key_columns (List[str], required): Column(s) that uniquely identify each record.
    update_exclusion_columns (Optional[List[str]], default None): Columns that should never be updated (e.g., created_at).
    predicate_exclusion_columns (Optional[List[str]], default None): Columns to ignore when checking for changes.

Returns:

    Dict[str, str]: Dict containing merge operation statistics.

Example
from blueno.etl import upsert
import polars as pl

# Create sample data
data = pl.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})

# Upsert data using 'id' as the key column
upsert("path/to/upsert_delta_table", data, key_columns=["id"])

overwrite

overwrite(
    table_or_uri: str | DeltaTable, df: DataFrameType
) -> None

Replaces all data in a Delta table with new data.

Parameters:

    table_or_uri (str | DeltaTable, required): Path to the Delta table or a DeltaTable instance.
    df (DataFrameType, required): Data to write as a Polars DataFrame or LazyFrame.

Example
from blueno.etl import overwrite
import polars as pl

# Create sample data
data = pl.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})

# Replace entire table with new data
overwrite("path/to/overwrite_delta_table", data)

replace_range

replace_range(
    table_or_uri: str | DeltaTable,
    df: DataFrameType,
    range_column: str,
) -> None

Replaces data within a specific range in the Delta table.

Parameters:

    table_or_uri (str | DeltaTable, required): Path to the Delta table or a DeltaTable instance.
    df (DataFrameType, required): Data to write as a Polars DataFrame or LazyFrame.
    range_column (str, required): Column used to define the range. Records in the table with values between the min and max of this column in df will be replaced.

Example
from blueno.etl import replace_range
import polars as pl

# Create sample data for dates 2024-01-01 to 2024-01-31
data = pl.DataFrame({"date": ["2024-01-01", "2024-01-31"], "value": [100, 200]})

# Replace all records between Jan 1-31
replace_range("path/to/replace_range_delta_table", data, range_column="date")

append

append(
    table_or_uri: str | DeltaTable, df: DataFrameType
) -> None

Appends the provided dataframe to the Delta table.

Parameters:

    table_or_uri (str | DeltaTable, required): The URI of the target Delta table or a DeltaTable instance.
    df (DataFrameType, required): The dataframe to append to the Delta table.

Example
from blueno.etl import append
import polars as pl

# Create sample data
data = pl.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})

# Append data to table
append("path/to/append_delta_table", data)

incremental

incremental(
    table_or_uri: str | DeltaTable,
    df: DataFrameType,
    incremental_column: str,
) -> None

Appends only new records based on an incremental column value.

Parameters:

    table_or_uri (str | DeltaTable, required): Path to the Delta table or a DeltaTable instance.
    df (DataFrameType, required): Data to append as a Polars DataFrame or LazyFrame.
    incremental_column (str, required): Column used to identify new records. Only records where this column's value is greater than the maximum value in the existing table will be appended.

Example
from blueno.etl import incremental
import polars as pl

# Create sample data
data = pl.DataFrame({"timestamp": ["2024-05-24T10:00:00"], "value": [100]})

# Append only records newer than existing data
incremental("path/to/incremental_delta_table", data, incremental_column="timestamp")

DuckLake

Parquet

write_parquet

write_parquet(
    uri: str,
    df: DataFrameType,
    partition_by: Optional[List[str]] = None,
) -> None

Overwrites the entire parquet file or directory (if using partition_by) with the provided dataframe.

Parameters:

    uri (str, required): The file or directory URI to write to. This should be a directory path if using partition_by.
    df (DataFrameType, required): The dataframe to write.
    partition_by (Optional[List[str]], default None): Column(s) to partition by.

Example
from blueno.etl import write_parquet
import polars as pl

# Create sample data with dates
data = pl.DataFrame(
    {"year": [2024, 2024, 2024], "month": [1, 2, 3], "value": [100, 200, 300]}
)

# Write data partitioned by year and month
write_parquet(uri="path/to/parquet", df=data, partition_by=["year", "month"])