Blueprint

Create a definition for how to compute a blueprint.

A blueprint is a function that takes zero or more blueprints as input and returns a dataframe. In addition, the blueprint registers information about how to write that dataframe to a target table.
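The following toy sketch illustrates that contract without using blueno. It assumes, as the examples on this page suggest, that a blueprint's parameters name the upstream blueprints it depends on. `toy_blueprint`, `REGISTRY`, and `run` are hypothetical names. Lists of dicts stand in for dataframes, and the toy does not pass the `self` argument that the real example receives.

```python
import inspect
from typing import Any, Callable, Dict, List, Optional

# Hypothetical toy registry mapping a blueprint name to its function (not blueno's internals).
REGISTRY: Dict[str, Callable[..., Any]] = {}


def toy_blueprint(func: Callable[..., Any]) -> Callable[..., Any]:
    """Register func under its own name, like a blueprint without an explicit name."""
    REGISTRY[func.__name__] = func
    return func


def run(name: str, cache: Optional[Dict[str, Any]] = None) -> Any:
    """Compute a blueprint after computing the upstream blueprints named by its parameters."""
    cache = {} if cache is None else cache
    if name not in cache:
        func = REGISTRY[name]
        # Each parameter name is looked up as an upstream blueprint (no cycle detection).
        upstream = {param: run(param, cache) for param in inspect.signature(func).parameters}
        cache[name] = func(**upstream)
    return cache[name]


@toy_blueprint
def bronze_customer() -> List[dict]:
    # Zero upstream blueprints: this one acts as a source.
    return [{"customer_id": 1}, {"customer_id": 1}, {"customer_id": 2}]


@toy_blueprint
def stage_customer(bronze_customer: List[dict]) -> List[dict]:
    # One upstream blueprint, resolved by parameter name; keep the first row per customer_id.
    seen, rows = set(), []
    for row in bronze_customer:
        if row["customer_id"] not in seen:
            seen.add(row["customer_id"])
            rows.append(row)
    return rows


print(run("stage_customer"))  # → [{'customer_id': 1}, {'customer_id': 2}]
```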

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
name | Optional[str] | The name of the blueprint. If not provided, the name of the function is used. The name must be unique across all blueprints. | None
table_uri | Optional[str] | The URI of the target table. If not provided, the blueprint is not stored as a table. | None
schema | Optional[Schema] | The schema of the output dataframe. If provided, the output of the transformation function is validated against this schema. | None
primary_keys | Optional[List[str]] | The primary keys of the target table. Required for the upsert and scd2 write modes. | None
partition_by | Optional[List[str]] | The columns to partition the target table by. | None
incremental_column | Optional[str] | The incremental column of the target table. Required for the incremental write mode. | None
valid_from_column | Optional[str] | The name of the valid-from column. Required for the scd2 write mode. | None
valid_to_column | Optional[str] | The name of the valid-to column. Required for the scd2 write mode. | None
write_mode | str | The write method to use. Defaults to overwrite. Options are: append, overwrite, upsert, incremental, replace_range, and scd2. | 'overwrite'
format | str | The format to use. Defaults to dataframe. Options are: delta, parquet, and dataframe. If dataframe is used, the blueprint is kept in memory and not written to a target table. | 'dataframe'
priority | int | Determines the execution order among activities ready to run. Higher values indicate higher scheduling preference, but dependencies and concurrency limits are still respected. | 100
max_concurrency | Optional[int] | Maximum number of parallel executions allowed while this blueprint is running. When set, it limits the global concurrency for as long as this blueprint runs, which is useful for blueprints with high CPU or memory requirements. For example, setting max_concurrency=1 ensures nothing else runs alongside this blueprint, while other jobs can still run in parallel when it is not running. When a concurrency limit is reached, higher-priority jobs are scheduled first. | None
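The requirements spread across the parameter table (which write modes need primary_keys, incremental_column, valid_from_column, and valid_to_column) can be summarized as a small check. `check_write_config` is a hypothetical helper written for illustration only; it is not part of blueno's API, and this page does not say when or whether blueno itself performs these checks.

```python
from typing import List, Optional

# Options as listed in the parameter table.
WRITE_MODES = {"append", "overwrite", "upsert", "incremental", "replace_range", "scd2"}
FORMATS = {"delta", "parquet", "dataframe"}


def check_write_config(
    write_mode: str = "overwrite",
    format: str = "dataframe",
    primary_keys: Optional[List[str]] = None,
    incremental_column: Optional[str] = None,
    valid_from_column: Optional[str] = None,
    valid_to_column: Optional[str] = None,
) -> List[str]:
    """Return a list of problems; an empty list means the combination looks valid."""
    problems: List[str] = []
    if write_mode not in WRITE_MODES:
        problems.append(f"unknown write_mode: {write_mode!r}")
    if format not in FORMATS:
        problems.append(f"unknown format: {format!r}")
    # upsert and scd2 need primary keys to match existing rows.
    if write_mode in ("upsert", "scd2") and not primary_keys:
        problems.append(f"{write_mode} requires primary_keys")
    if write_mode == "incremental" and incremental_column is None:
        problems.append("incremental requires incremental_column")
    if write_mode == "scd2":
        if valid_from_column is None:
            problems.append("scd2 requires valid_from_column")
        if valid_to_column is None:
            problems.append("scd2 requires valid_to_column")
    return problems


print(check_write_config(write_mode="scd2", primary_keys=["customer_id"]))
# → ['scd2 requires valid_from_column', 'scd2 requires valid_to_column']
```

Returning a list rather than raising on the first problem reports every missing setting at once.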
Example
from blueno import blueprint, Blueprint, DataFrameType


@blueprint(
    table_uri="/path/to/stage/customer",
    primary_keys=["customer_id"],
    write_mode="overwrite",
)
def stage_customer(self: Blueprint, bronze_customer: DataFrameType) -> DataFrameType:
    # Deduplicate customers on the primary keys (the parameter is bronze_customer, singular)
    df = bronze_customer.unique(subset=self.primary_keys)

    return df

Task

Create a definition for a task.

A task can run any code and does not need to return an output.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
name | Optional[str] | The name of the task. If not provided, the name of the function is used. The name must be unique across all blueprints. | None
priority | int | Determines the execution order among activities ready to run. Higher values indicate higher scheduling preference, but dependencies and concurrency limits are still respected. | 100
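The priority parameter only orders activities that are already ready to run. The toy simulation below makes this concrete; it is not blueno's scheduler. It uses the standard-library `graphlib.TopologicalSorter` to track dependencies, treats `workers` as a hypothetical global concurrency limit, assumes every activity in a round finishes before the next round starts, and applies the default priority of 100 to activities without an explicit value. `schedule` and the activity names are illustrative.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

DEFAULT_PRIORITY = 100  # default from the parameter table


def schedule(deps: Dict[str, Set[str]], priority: Dict[str, int], workers: int) -> List[List[str]]:
    """Simulate execution rounds; each round runs at most `workers` ready activities."""
    if workers < 1:
        raise ValueError("workers must be at least 1")
    sorter = TopologicalSorter(deps)  # deps maps each activity to its upstream activities
    sorter.prepare()  # raises graphlib.CycleError if the dependencies contain a cycle
    waiting: List[str] = []
    rounds: List[List[str]] = []
    while sorter.is_active():
        # Activities whose dependencies have all finished become ready.
        waiting.extend(sorter.get_ready())
        # Higher priority first; ties are broken by name so the output is deterministic.
        waiting.sort(key=lambda name: (-priority.get(name, DEFAULT_PRIORITY), name))
        batch, waiting = waiting[:workers], waiting[workers:]
        rounds.append(batch)
        # Mark the whole round as finished so its dependents can become ready.
        sorter.done(*batch)
    return rounds


deps = {
    "bronze": set(),
    "silver_a": {"bronze"},
    "silver_b": {"bronze"},
    "gold": {"silver_a", "silver_b"},
    "notify_end": {"gold"},
}
print(schedule(deps, priority={"silver_b": 200}, workers=1))
# → [['bronze'], ['silver_b'], ['silver_a'], ['gold'], ['notify_end']]
```

Here silver_b runs before silver_a because of its higher priority, but gold still waits for both, and notify_end waits for gold.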
Example

Creates a task, notify_end, that depends on the gold_metrics blueprint.

from blueno import blueprint, Blueprint, task
import logging

logger = logging.getLogger(__name__)


@task
def notify_end(gold_metrics: Blueprint) -> None:
    logger.info("Gold metrics ran successfully")

    # Send message on Slack