
Blueprint

Create a decorator that registers a function as a blueprint.

A blueprint is a function that takes zero or more blueprints as inputs and returns a dataframe. Registering it also records the information needed to write that dataframe to a target table.
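The examples on this page suggest that a blueprint's parameters are named after the upstream blueprints it consumes (for example, `silver_customer` feeding `gold_customer`). Assuming that convention, a dependency graph can be derived from function signatures and executed in topological order. The sketch below is a hypothetical illustration using only the standard library: `build_run_order`, `run_all`, and the list-of-dicts "dataframes" are not part of blueno's API, and the `self` parameter that real blueprints receive is omitted.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Callable

# Hypothetical stand-ins for blueprint functions; lists of dicts play the role of dataframes.
def bronze_customer() -> list[dict]:
    return [{"customer_id": 1}, {"customer_id": 1}, {"customer_id": 2}]

def stage_customer(bronze_customer: list[dict]) -> list[dict]:
    # Deduplicate on customer_id, keeping the first occurrence.
    seen, out = set(), []
    for row in bronze_customer:
        if row["customer_id"] not in seen:
            seen.add(row["customer_id"])
            out.append(row)
    return out

def build_run_order(funcs: dict[str, Callable]) -> list[str]:
    """Assumption: each parameter name refers to an upstream blueprint."""
    graph = {name: set(inspect.signature(fn).parameters) for name, fn in funcs.items()}
    # static_order() yields every node after all of its predecessors.
    return list(TopologicalSorter(graph).static_order())

def run_all(funcs: dict[str, Callable]) -> dict[str, list[dict]]:
    results: dict[str, list[dict]] = {}
    for name in build_run_order(funcs):
        params = inspect.signature(funcs[name]).parameters
        # Pass each upstream result in under its parameter name.
        results[name] = funcs[name](**{p: results[p] for p in params})
    return results

blueprints = {"stage_customer": stage_customer, "bronze_customer": bronze_customer}
print(build_run_order(blueprints))  # ['bronze_customer', 'stage_customer']
```

Deriving edges from signatures keeps the dependency declaration in one place: adding a parameter is enough to add an upstream input.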

Parameters:

Name Type Description Default
name Optional[str]

The name of the blueprint. If not provided, the name of the function will be used. The name must be unique across all blueprints.

None
table_uri Optional[str]

The URI of the target table. If not provided, the blueprint will not be stored as a table.

None
schema Optional[Schema]

The schema of the output dataframe. If provided, the output of the transformation function is validated against this schema.

None
primary_keys Optional[List[str]]

The primary keys of the target table. Required for the upsert, naive_upsert, and scd2_by_column write modes.

None
partition_by Optional[List[str]]

The columns to partition the target table by.

None
incremental_column Optional[str]

The incremental column for the target table. Required for the incremental and safe_append write modes.

None
scd2_column Optional[str]

The name of the sequence column used for SCD2. Required for the scd2_by_column and scd2_by_time write modes.

None
write_mode str

The write method to use. Defaults to overwrite. Options are: append, safe_append, overwrite, upsert, naive_upsert, incremental, replace_range, and scd2_by_column.

- append: Appends all records from the source dataframe to the target.
- safe_append: Filters the source dataframe on existing primary keys and incremental_column values, then appends.
- overwrite: Replaces the contents of the target table with the source dataframe.
- upsert: Updates target rows with existing primary keys if they have changed, and inserts records whose primary keys do not exist in the target table.
- naive_upsert: Same as upsert, but skips the change check and performs a blind update.
- incremental: Filters the source dataframe against the maximum incremental_column value in the target, then appends.
- replace_range: Overwrites the range of the target table between the minimum and maximum incremental_column values in the source dataframe.
- scd2_by_column: Performs an SCD Type 2 upsert, where validity periods are created from the scd2_column column.

'overwrite'
format str

The format to use. Options are: delta, parquet, and dataframe. Defaults to dataframe, in which case the blueprint is kept in memory and not written to a target table.

'dataframe'
tags Optional[Dict[str, str]]

A dictionary of tags to apply to the blueprint. Tags can be used to group related blueprints and to run a subset of blueprints selected by tag.

None
post_transforms Optional[List[str]]

Optional list of post-transformation functions to apply after the main transformation. Options are: deduplicate, add_audit_columns, add_identity_column. These functions will be applied in the order they are provided.

None
deduplication_order_columns Optional[List[str]]

Optional list of columns used to order rows during deduplication when post_transforms includes deduplicate.

None
priority int

Determines the execution order among activities ready to run. Higher values indicate higher scheduling preference, but dependencies and concurrency limits are still respected.

100
max_concurrency Optional[int]

Maximum number of parallel executions allowed while this blueprint is running. When set, it limits global concurrency for as long as this blueprint runs, which is useful for blueprints with high CPU or memory requirements. For example, setting max_concurrency=1 ensures that nothing else runs alongside this job, while other jobs can still run in parallel before and after it. Higher-priority jobs are scheduled first when concurrency limits are reached.

None
freshness Optional[timedelta]

Optional freshness threshold for the blueprint. Only applicable if the format is delta. If set, the blueprint is only processed if the delta table's last modification time is older than the freshness threshold. For example, setting this to timedelta(hours=1) ensures the blueprint runs at most once an hour.

None
schedule Optional[str]

Optional cron-style schedule. If blueno runs at a time that falls within the schedule, the blueprint is run; otherwise it is skipped. If not provided, the blueprint is always executed. The freshness threshold is still respected. For instance, * * * * 1-5 runs the blueprint Monday through Friday.

None
maintenance_schedule Optional[str]

Optional cron-style schedule for table maintenance, which compacts and vacuums the delta table. If not provided, no maintenance is executed. Maintenance runs at most once per cron interval. For example, * 0-8 * * 6 runs maintenance on the first run on Saturdays between hours 0 and 8.

None
**kwargs

Additional keyword arguments to pass to the blueprint. This is used when extending the blueprint with custom attributes or methods.

{}
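To make the write modes concrete, the following sketch models them on plain lists of dictionaries instead of dataframes. It illustrates the described semantics under stated assumptions and is not blueno's implementation: all function names are hypothetical, the `safe_append` filter reflects one reading of its one-line description, and `upsert` returns an update count only to show how it differs from `naive_upsert`.

```python
from typing import Any

Row = dict[str, Any]

def key_of(row: Row, cols: list[str]) -> tuple:
    return tuple(row[c] for c in cols)

def append(target: list[Row], source: list[Row]) -> list[Row]:
    # append: add every source record to the target.
    return target + source

def safe_append(target: list[Row], source: list[Row], keys: list[str], inc_col: str) -> list[Row]:
    # safe_append (assumed reading): skip source rows whose primary key and
    # incremental_column value already exist in the target, then append.
    cols = [*keys, inc_col]
    existing = {key_of(r, cols) for r in target}
    return target + [r for r in source if key_of(r, cols) not in existing]

def incremental(target: list[Row], source: list[Row], inc_col: str) -> list[Row]:
    # incremental: append only source rows above the target's maximum value.
    if not target:
        return target + source
    high = max(r[inc_col] for r in target)
    return target + [r for r in source if r[inc_col] > high]

def upsert(target: list[Row], source: list[Row], keys: list[str], naive: bool = False) -> tuple[list[Row], int]:
    # upsert: update changed rows on existing keys and insert new keys.
    # naive=True mimics naive_upsert: matching keys are overwritten without comparing.
    result = list(target)
    index = {key_of(r, keys): i for i, r in enumerate(result)}
    updated = 0
    for row in source:
        k = key_of(row, keys)
        if k not in index:
            index[k] = len(result)
            result.append(row)
        elif naive or result[index[k]] != row:
            result[index[k]] = row
            updated += 1
    return result, updated

def replace_range(target: list[Row], source: list[Row], inc_col: str) -> list[Row]:
    # replace_range: drop target rows inside the source's [min, max] range, then add the source.
    if not source:
        return list(target)
    values = [r[inc_col] for r in source]
    low, high = min(values), max(values)
    return [r for r in target if not low <= r[inc_col] <= high] + source

target = [{"id": 1, "ts": 1, "v": "a"}, {"id": 2, "ts": 2, "v": "b"}]
source = [{"id": 2, "ts": 2, "v": "b"}, {"id": 3, "ts": 3, "v": "c"}]
print(upsert(target, source, ["id"])[1])              # 0: id 2 is unchanged
print(upsert(target, source, ["id"], naive=True)[1])  # 1: id 2 is rewritten anyway
```

Both upsert variants end in the same table state here; the naive variant simply avoids the comparison cost at the price of rewriting unchanged rows.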
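For the scd2_by_column write mode, the idea of deriving validity periods from scd2_column can be sketched as follows: versions of each primary key are sorted by the sequence column, and each version stays valid until the next one begins. This is a hypothetical illustration that works on a complete history rather than merging into an existing table, and the output column names `valid_from`, `valid_to`, and `is_current` are assumptions, not blueno's actual column names.

```python
from itertools import groupby
from operator import itemgetter
from typing import Any

Row = dict[str, Any]

def scd2_by_column(rows: list[Row], keys: list[str], scd2_column: str) -> list[Row]:
    """Derive validity periods from scd2_column (hypothetical output columns)."""
    key = itemgetter(*keys)
    # Sort by primary key, then by the sequence column within each key.
    ordered = sorted(rows, key=lambda r: (key(r), r[scd2_column]))
    out: list[Row] = []
    for _, group in groupby(ordered, key=key):
        versions = list(group)
        for current, nxt in zip(versions, versions[1:] + [None]):
            out.append({
                **current,
                "valid_from": current[scd2_column],
                # The next version's sequence value closes this period.
                "valid_to": nxt[scd2_column] if nxt is not None else None,
                "is_current": nxt is None,
            })
    return out

history = scd2_by_column(
    [
        {"customer_id": 1, "modified": 3, "name": "C"},
        {"customer_id": 1, "modified": 1, "name": "A"},
        {"customer_id": 2, "modified": 2, "name": "X"},
    ],
    keys=["customer_id"],
    scd2_column="modified",
)
```

To merge into an existing table, one would combine the target's stored history with the incoming rows before deriving the periods.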
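The interplay of schedule and freshness can be sketched as a single decision function: the current time must fall within the cron expression, and the table's last modification must be older than the freshness threshold. This is a hypothetical sketch, not blueno's scheduler. Its cron matcher supports only `*`, single numbers, `a-b` ranges, and comma lists, and it requires every field to match, whereas standard cron treats a restricted day-of-month and day-of-week as alternatives.

```python
from datetime import datetime, timedelta
from typing import Optional

def _field_matches(field: str, value: int) -> bool:
    # Supports "*", single numbers, "a-b" ranges, and comma-separated lists.
    for part in field.split(","):
        if part == "*":
            return True
        if "-" in part:
            low, high = (int(x) for x in part.split("-"))
            if low <= value <= high:
                return True
        elif int(part) == value:
            return True
    return False

def cron_matches(expr: str, at: datetime) -> bool:
    # Field order: minute, hour, day of month, month, day of week.
    minute, hour, day, month, weekday = expr.split()
    # Cron counts weekdays from Sunday=0; Python's weekday() uses Monday=0.
    cron_weekday = (at.weekday() + 1) % 7
    return all([
        _field_matches(minute, at.minute),
        _field_matches(hour, at.hour),
        _field_matches(day, at.day),
        _field_matches(month, at.month),
        _field_matches(weekday, cron_weekday),
    ])

def should_run(
    schedule: Optional[str],
    freshness: Optional[timedelta],
    last_modified: Optional[datetime],
    now: datetime,
) -> bool:
    if schedule is not None and not cron_matches(schedule, now):
        return False  # outside the schedule: skip
    if freshness is not None and last_modified is not None:
        return now - last_modified > freshness  # only run if the table is stale
    return True

monday = datetime(2024, 1, 1, 10, 30)  # 1 January 2024 was a Monday
print(should_run("* * * * 1-5", timedelta(days=1), monday - timedelta(hours=2), monday))  # False
```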

Simple example

from blueno import Blueprint, DataFrameType


@Blueprint.register(
    table_uri="/path/to/stage/customer",
    format="delta",
    primary_keys=["customer_id"],
    write_mode="overwrite",
)
def stage_customer(self: Blueprint, bronze_customer: DataFrameType) -> DataFrameType:
    # Deduplicate customers
    df = bronze_customer.unique(subset=self.primary_keys)

    return df

Full example

from blueno import Blueprint, DataFrameType
from datetime import timedelta


@Blueprint.register(
    name="gold_customer",
    table_uri="/path/to/gold/customer",
    primary_keys=["customer_id", "site_id"],
    partition_by=["year", "month", "day"],
    incremental_column="order_timestamp",
    scd2_column="modified_timestamp",
    write_mode="upsert",
    format="delta",
    tags={
        "owner": "Alice Wonderlands",
        "project": "customer_360",
        "pii": "true",
        "business_unit": "finance",
    },
    post_transforms=[
        "deduplicate",
        "add_audit_columns",
        "add_identity_column",
    ],
    deduplication_order_columns=["modified_timestamp"],
    priority=110,
    max_concurrency=2,
    # Will only run if last materialization is more than 1 day ago.
    freshness=timedelta(days=1),
    # Runs if blueno runs on Mondays through Fridays - but only once a day at maximum due to `freshness` setting.
    schedule="* * * * 1-5",
    # Runs maintenance if blueno runs between 22:00 and 22:59 on Saturdays.
    maintenance_schedule="* 22 * * 6",
)
def gold_customer(self: Blueprint, silver_customer: DataFrameType) -> DataFrameType:
    # Some advanced business logic
    df = silver_customer.with_columns(...)

    return df
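The `deduplicate` post-transform with `deduplication_order_columns=["modified_timestamp"]` can be pictured as keeping one row per primary key. The sketch below assumes the kept row is the one with the largest values in the order columns (the most recently modified); that rule, and the `deduplicate` helper itself, are hypothetical, since this page does not state which row blueno keeps.

```python
from typing import Any

Row = dict[str, Any]

def deduplicate(rows: list[Row], primary_keys: list[str], order_columns: list[str]) -> list[Row]:
    """Keep one row per primary key (assumed: the one with the largest order values)."""
    best: dict[tuple, Row] = {}
    for row in rows:
        key = tuple(row[k] for k in primary_keys)
        rank = tuple(row[c] for c in order_columns)
        if key not in best or rank > tuple(best[key][c] for c in order_columns):
            best[key] = row
    # Dicts preserve insertion order, so keys appear in order of first occurrence.
    return list(best.values())

rows = [
    {"customer_id": 1, "site_id": "A", "modified_timestamp": 1, "v": "old"},
    {"customer_id": 1, "site_id": "A", "modified_timestamp": 5, "v": "new"},
    {"customer_id": 2, "site_id": "A", "modified_timestamp": 3, "v": "only"},
]
print([r["v"] for r in deduplicate(rows, ["customer_id", "site_id"], ["modified_timestamp"])])
# ['new', 'only']
```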

Task

Create a definition for a task.

A task can run arbitrary code and does not need to return an output.

Parameters:

Name Type Description Default
name Optional[str]

The name of the task. If not provided, the name of the function is used. The name must be unique across all jobs.

None
tags Optional[Dict[str, str]]

A dictionary of tags to apply to the task. Tags can be used to group related jobs and to run a subset of jobs selected by tag.

None
priority int

Determines the execution order among activities ready to run. Higher values indicate higher scheduling preference, but dependencies and concurrency limits are still respected.

100
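How priority interacts with dependencies and concurrency limits can be sketched with a priority queue: only jobs whose dependencies have finished are eligible, and among those the highest priorities take the available slots first. This is a hypothetical model, not blueno's scheduler; `schedule_rounds`, the `slots` limit, and the simplification that all jobs started in a round finish together are assumptions made for illustration.

```python
import heapq

def schedule_rounds(jobs: dict[str, tuple[int, set[str]]], slots: int) -> list[list[str]]:
    """jobs maps name -> (priority, dependencies); returns the jobs started per round."""
    done: set[str] = set()
    pending = dict(jobs)
    rounds: list[list[str]] = []
    while pending:
        # Jobs whose dependencies have all completed are ready to run.
        ready = [(-prio, name) for name, (prio, deps) in pending.items() if deps <= done]
        if not ready:
            raise ValueError("cyclic or missing dependency")
        heapq.heapify(ready)  # min-heap on -priority, i.e. highest priority first
        # Start the highest-priority ready jobs, up to the concurrency limit.
        batch = [heapq.heappop(ready)[1] for _ in range(min(slots, len(ready)))]
        for name in batch:
            del pending[name]
        done.update(batch)  # simplification: a round's jobs finish together
        rounds.append(batch)
    return rounds

jobs = {
    "bronze": (100, set()),
    "audit": (50, set()),
    "silver": (100, {"bronze"}),
    "gold": (110, {"silver"}),
    "notify": (100, {"gold"}),
}
print(schedule_rounds(jobs, slots=1))
# [['bronze'], ['silver'], ['gold'], ['notify'], ['audit']]
```

With a single slot, the low-priority `audit` job keeps yielding to higher-priority work as soon as that work becomes ready, yet it never jumps ahead of a dependency.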

Simple example

Creates a task, notify_end_task, which depends on the gold_metrics blueprint.

from blueno import Blueprint, Task
import logging

logger = logging.getLogger(__name__)


@Task.register()
def notify_end_task(gold_metrics: Blueprint) -> None:
    logger.info("Gold metrics ran successfully")

    # Send message on Slack
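One way to fill in the Slack step is a Slack incoming webhook, which accepts an HTTP POST with a JSON body containing a `text` field. The sketch below uses only the standard library; `build_slack_request` and `send_slack_message` are hypothetical helpers, and the webhook URL is something you would create in your own Slack workspace.

```python
import json
import urllib.request

def build_slack_request(webhook_url: str, text: str) -> urllib.request.Request:
    # Slack incoming webhooks accept a JSON body with a "text" field.
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def send_slack_message(webhook_url: str, text: str) -> int:
    # Sends the request and returns the HTTP status code.
    with urllib.request.urlopen(build_slack_request(webhook_url, text), timeout=10) as resp:
        return resp.status
```

Inside notify_end_task, the placeholder comment could then become a call such as `send_slack_message(os.environ["SLACK_WEBHOOK_URL"], "Gold metrics ran successfully")`, where the `SLACK_WEBHOOK_URL` environment variable is a hypothetical name for wherever the webhook URL is stored.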