ETL¶
Extract¶
read_delta ¶
read_delta(
table_uri: str, eager: bool = False
) -> DataFrameType
Reads a Delta table from the specified abfss URI. Automatically handles authentication with OneLake.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table_uri | str | The abfss URI of the Delta table to read. | required |
eager | bool | If True, reads the table eagerly; otherwise, returns a lazy frame. Defaults to False. | False |
Returns:
Type | Description |
---|---|
DataFrameType | The data from the Delta table. |
Example
from blueno.etl import read_delta
workspace_id = "12345678-1234-1234-1234-123456789012"
lakehouse_id = "beefbeef-beef-beef-beef-beefbeefbeef"
table_name = "my-delta-table"
table_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{table_name}"
df = read_delta(table_uri, eager=True)
lazy_df = read_delta(table_uri, eager=False)
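Because the lazy variant defers execution, filters can be applied before materializing any data. A minimal sketch, assuming the lazy result is a Polars LazyFrame and that the table has a hypothetical value column:
import polars as pl
from blueno.etl import read_delta
workspace_id = "12345678-1234-1234-1234-123456789012"
lakehouse_id = "beefbeef-beef-beef-beef-beefbeefbeef"
table_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/my-delta-table"
# Build a lazy query; only the filtered rows are materialized by collect()
lazy_df = read_delta(table_uri, eager=False)
filtered_df = lazy_df.filter(pl.col("value") > 100).collect()  # "value" is a hypothetical column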
read_parquet ¶
read_parquet(
table_uri: str, eager: bool = False
) -> DataFrameType
Reads a Parquet file from the specified abfss URI. Automatically handles authentication with OneLake.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table_uri | str | The abfss URI of the Parquet file to read. Supports globbing. | required |
eager | bool | If True, reads the file eagerly; otherwise, returns a lazy frame. Defaults to False. | False |
Returns:
Type | Description |
---|---|
DataFrameType | The data from the Parquet file. |
Example
Reading a single file
from blueno.etl import read_parquet
workspace_id = "12345678-1234-1234-1234-123456789012"
lakehouse_id = "beefbeef-beef-beef-beef-beefbeefbeef"
file_path = "my-parquet-file.parquet"
folder_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/"
df = read_parquet(folder_uri + file_path, eager=True)
Reading all Parquet files in a folder
from blueno.etl import read_parquet
workspace_id = "12345678-1234-1234-1234-123456789012"
lakehouse_id = "beefbeef-beef-beef-beef-beefbeefbeef"
folder_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/"
glob_df = read_parquet(folder_uri + "**/*.parquet", eager=True)
Transforms¶
add_audit_columns ¶
add_audit_columns(
df: DataFrameType, audit_columns: list[Column]
) -> DataFrameType
Adds the specified audit columns to the given DataFrame or LazyFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | DataFrameType | The DataFrame or LazyFrame to which audit columns will be added. | required |
audit_columns | list[Column] | A list of audit columns to add to the DataFrame. | required |
Returns:
Type | Description |
---|---|
DataFrameType | The DataFrame or LazyFrame with the added audit columns. |
Example
from blueno.etl import add_audit_columns, Column
import polars as pl
from datetime import datetime, timezone
audit_columns = [
    Column("created_at", pl.lit(datetime.now(timezone.utc)).cast(pl.Datetime("us", "UTC")))
]
df = pl.DataFrame({"data": [1, 2, 3]})
updated_df = add_audit_columns(df, audit_columns)
deduplicate ¶
deduplicate(
df: DataFrameType,
key_columns: Optional[List[str]] = None,
deduplication_order_columns: Optional[List[str]] = None,
deduplication_order_descending: bool = True,
) -> DataFrameType
Removes duplicate rows from the DataFrame based on primary key columns.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | DataFrameType | The DataFrame or LazyFrame from which duplicates will be removed. | required |
key_columns | Optional[List[str]] | The columns to use as primary keys for deduplication. | None |
deduplication_order_columns | Optional[List[str]] | The columns to determine the order of rows for deduplication. | None |
deduplication_order_descending | bool | Whether to sort the deduplication order in descending order. | True |
Returns:
Type | Description |
---|---|
DataFrameType | The DataFrame or LazyFrame with duplicates removed. |
Example
import polars as pl
from blueno.etl import deduplicate
df = pl.DataFrame({"id": [1, 2, 2, 3], "value": ["a", "b", "b", "c"]})
deduped_df = deduplicate(df, key_columns=["id"])
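When several rows share a key, the ordering parameters control which row is kept. A minimal sketch, assuming a hypothetical updated_at column where the most recent row per id should win:
import polars as pl
from blueno.etl import deduplicate
# Two rows share id 1; keep the one with the latest updated_at
events = pl.DataFrame({
    "id": [1, 1, 2],
    "updated_at": ["2024-01-01", "2024-02-01", "2024-01-15"],
    "value": ["old", "new", "only"],
})
latest_df = deduplicate(
    events,
    key_columns=["id"],
    deduplication_order_columns=["updated_at"],
    deduplication_order_descending=True,
)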
normalize_column_names ¶
normalize_column_names(
df: DataFrameType,
normalization_strategy: Callable[[str], str],
) -> DataFrameType
Normalizes the column names of the DataFrame using a provided normalization strategy.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | DataFrameType | The DataFrame or LazyFrame whose column names will be normalized. | required |
normalization_strategy | Callable[[str], str] | A callable which takes a string and returns a modified string. | required |
Returns:
Type | Description |
---|---|
DataFrameType | The DataFrame or LazyFrame with normalized column names. |
Example
import polars as pl
from blueno.etl import normalize_column_names
def my_strategy(old_column_name: str) -> str:
    new_name = old_column_name.replace(" ", "_").lower()
    return new_name
df = pl.DataFrame({"First Name": [1, 2], "Last Name": [3, 4]})
normalized_df = normalize_column_names(df, my_strategy)
reorder_columns_by_suffix ¶
reorder_columns_by_suffix(
df: DataFrameType,
suffix_order: List[str],
sort_alphabetically_within_group: bool = True,
) -> DataFrameType
Reorders DataFrame columns based on their suffixes according to the provided order.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | DataFrameType | The DataFrame or LazyFrame whose columns will be reordered. | required |
suffix_order | List[str] | List of suffixes in the desired order. | required |
sort_alphabetically_within_group | bool | Whether to sort columns alphabetically within each suffix group. | True |
Returns:
Type | Description |
---|---|
DataFrameType | The DataFrame or LazyFrame with reordered columns. |
Example
import polars as pl
from blueno.etl import reorder_columns_by_suffix
df = pl.DataFrame(
    {
        "name_key": ["a", "b"],
        "age_key": [1, 2],
        "name_value": ["x", "y"],
        "age_value": [10, 20],
    }
)
reordered_df = reorder_columns_by_suffix(df, suffix_order=["_key", "_value"])
reorder_columns_by_prefix ¶
reorder_columns_by_prefix(
df: DataFrameType,
prefix_order: List[str],
sort_alphabetically_within_group: bool = True,
) -> DataFrameType
Reorders DataFrame columns based on their prefixes according to the provided order.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | DataFrameType | The DataFrame or LazyFrame whose columns will be reordered. | required |
prefix_order | List[str] | List of prefixes in the desired order. | required |
sort_alphabetically_within_group | bool | Whether to sort columns alphabetically within each prefix group. | True |
Returns:
Type | Description |
---|---|
DataFrameType | The DataFrame or LazyFrame with reordered columns. |
Example
import polars as pl
from blueno.etl import reorder_columns_by_prefix
df = pl.DataFrame(
    {
        "dim_name": ["a", "b"],
        "dim_age": [1, 2],
        "fact_sales": [100, 200],
        "fact_quantity": [5, 10],
    }
)
reordered_df = reorder_columns_by_prefix(df, prefix_order=["dim_", "fact_"])
apply_scd_type_2 ¶
apply_scd_type_2(
source_df: DataFrameType,
target_df: DataFrameType,
primary_key_columns: List[str],
valid_from_column: str,
valid_to_column: str,
) -> DataFrameType
Applies Slowly Changing Dimension (SCD) Type 2 logic to merge source and target DataFrames.
SCD Type 2 maintains historical records by creating new rows for changed data while preserving the history through valid_from and valid_to dates.
The result maintains the full history of changes while ensuring proper date ranges for overlapping records.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source_df | DataFrameType | The new/source DataFrame containing updated records. | required |
target_df | DataFrameType | The existing/target DataFrame containing current records. | required |
primary_key_columns | List[str] | Column(s) that uniquely identify each entity. | required |
valid_from_column | str | Column name containing the validity start date. | required |
valid_to_column | str | Column name containing the validity end date. | required |
Returns:
Type | Description |
---|---|
DataFrameType | A DataFrame containing both current and historical records with updated validity periods. |
Example
import polars as pl
from blueno.etl import apply_scd_type_2
from datetime import datetime
# Create sample source and target dataframes
source_df = pl.DataFrame({
    "customer_id": [1, 2],
    "name": ["John Updated", "Jane Updated"],
    "valid_from": [datetime(2024, 1, 1), datetime(2024, 1, 1)],
})
target_df = pl.DataFrame({
    "customer_id": [1, 2],
    "name": ["John", "Jane"],
    "valid_from": [datetime(2023, 1, 1), datetime(2023, 1, 1)],
    "valid_to": [None, None],
})
# Apply SCD Type 2
result_df = apply_scd_type_2(
    source_df=source_df,
    target_df=target_df,
    primary_key_columns=["customer_id"],
    valid_from_column="valid_from",
    valid_to_column="valid_to",
)
print(result_df.sort("customer_id", "valid_from"))
"""
shape: (4, 4)
┌─────────────┬──────────────┬─────────────────────┬─────────────────────┐
│ customer_id ┆ name ┆ valid_from ┆ valid_to │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ datetime[μs] ┆ datetime[μs] │
╞═════════════╪══════════════╪═════════════════════╪═════════════════════╡
│ 1 ┆ John ┆ 2023-01-01 00:00:00 ┆ 2024-01-01 00:00:00 │
│ 1 ┆ John Updated ┆ 2024-01-01 00:00:00 ┆ null │
│ 2 ┆ Jane ┆ 2023-01-01 00:00:00 ┆ 2024-01-01 00:00:00 │
│ 2 ┆ Jane Updated ┆ 2024-01-01 00:00:00 ┆ null │
└─────────────┴──────────────┴─────────────────────┴─────────────────────┘
"""
Notes
- The function handles overlapping date ranges by adjusting valid_to dates
- NULL in valid_to indicates a currently active record
- Records in source_df will create new versions if they differ from target_df
- Historical records are preserved with appropriate valid_to dates
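Since NULL in valid_to marks the active record, the current version of each entity can be selected with a simple filter. A minimal sketch applied to the result above:
import polars as pl
# Keep only the currently active version of each customer
current_df = result_df.filter(pl.col("valid_to").is_null())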
Load¶
Delta¶
upsert ¶
upsert(
table_or_uri: Union[str, DeltaTable],
df: DataFrameType,
key_columns: List[str],
update_exclusion_columns: Optional[List[str]] = None,
predicate_exclusion_columns: Optional[List[str]] = None,
) -> Dict[str, str]
Updates existing records and inserts new records into a Delta table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table_or_uri | Union[str, DeltaTable] | Path to the Delta table or a DeltaTable instance | required |
df | DataFrameType | Data to upsert as a Polars DataFrame or LazyFrame | required |
key_columns | List[str] | Column(s) that uniquely identify each record | required |
update_exclusion_columns | Optional[List[str]] | Columns that should never be updated (e.g., created_at) | None |
predicate_exclusion_columns | Optional[List[str]] | Columns to ignore when checking for changes | None |
Returns:
Type | Description |
---|---|
Dict[str, str] | Dict containing merge operation statistics |
Example
from blueno.etl import upsert
import polars as pl
# Create sample data
data = pl.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
# Upsert data using 'id' as the key column
upsert("path/to/upsert_delta_table", data, key_columns=["id"])
overwrite ¶
overwrite(
table_or_uri: str | DeltaTable, df: DataFrameType
) -> None
Replaces all data in a Delta table with new data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table_or_uri | str \| DeltaTable | Path to the Delta table or a DeltaTable instance | required |
df | DataFrameType | Data to write as a Polars DataFrame or LazyFrame | required |
Example
from blueno.etl import overwrite
import polars as pl
# Create sample data
data = pl.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
# Replace entire table with new data
overwrite("path/to/overwrite_delta_table", data)
replace_range ¶
replace_range(
table_or_uri: str | DeltaTable,
df: DataFrameType,
range_column: str,
) -> None
Replaces data within a specific range in the Delta table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table_or_uri | str \| DeltaTable | Path to the Delta table or a DeltaTable instance | required |
df | DataFrameType | Data to write as a Polars DataFrame or LazyFrame | required |
range_column | str | Column used to define the range. Records in the table with values between the min and max of this column in df will be replaced. | required |
Example
from blueno.etl import replace_range
import polars as pl
# Create sample data for dates 2024-01-01 to 2024-01-31
data = pl.DataFrame({"date": ["2024-01-01", "2024-01-31"], "value": [100, 200]})
# Replace all records between Jan 1-31
replace_range("path/to/replace_range_delta_table", data, range_column="date")
append ¶
append(
table_or_uri: str | DeltaTable, df: DataFrameType
) -> None
Appends the provided dataframe to the Delta table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table_or_uri | str \| DeltaTable | Path to the Delta table or a DeltaTable instance | required |
df | DataFrameType | The dataframe to append to the Delta table | required |
Example
from blueno.etl import append
import polars as pl
# Create sample data
data = pl.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
# Append data to table
append("path/to/append_delta_table", data)
incremental ¶
incremental(
table_or_uri: str | DeltaTable,
df: DataFrameType,
incremental_column: str,
) -> None
Appends only new records based on an incremental column value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table_or_uri | str \| DeltaTable | Path to the Delta table or a DeltaTable instance | required |
df | DataFrameType | Data to append as a Polars DataFrame or LazyFrame | required |
incremental_column | str | Column used to identify new records. Only records where this column's value is greater than the maximum value in the existing table will be appended. | required |
Example
from blueno.etl import incremental
import polars as pl
# Create sample data
data = pl.DataFrame({"timestamp": ["2024-05-24T10:00:00"], "value": [100]})
# Append only records newer than existing data
incremental("path/to/incremental_delta_table", data, incremental_column="timestamp")
DuckLake¶
Parquet¶
write_parquet ¶
write_parquet(
uri: str,
df: DataFrameType,
partition_by: Optional[List[str]] = None,
) -> None
Overwrites the entire Parquet file or directory (if using partition_by) with the provided dataframe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uri | str | The file or directory URI to write to. This should be a directory path if using partition_by. | required |
df | DataFrameType | The dataframe to write | required |
partition_by | Optional[List[str]] | Column(s) to partition by | None |
Example
from blueno.etl import write_parquet
import polars as pl
# Create sample data with dates
data = pl.DataFrame(
    {"year": [2024, 2024, 2024], "month": [1, 2, 3], "value": [100, 200, 300]}
)
# Write data partitioned by year and month
write_parquet(uri="path/to/parquet", df=data, partition_by=["year", "month"])