Blueprint
Create a decorator for the Blueprint.
A blueprint is a function that takes any number of blueprints (including zero) as input and returns a dataframe. In addition, the blueprint registers information about how to write that dataframe to a target table.
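The sketch below shows one way a decorator could register functions and record their upstream blueprints. It assumes, as the examples in this section suggest, that upstream blueprints are matched by parameter name and that a leading `self` parameter is not a dependency. `register`, `REGISTRY` and `RegisteredStep` are hypothetical names, not blueno's API, and plain lists of dicts stand in for dataframes.

```python
import inspect
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class RegisteredStep:
    # Hypothetical registry entry; not blueno's internal data model.
    name: str
    func: Callable
    depends_on: List[str]  # upstream blueprint names, taken from parameter names
    table_uri: Optional[str] = None  # where the output would be written, if anywhere


REGISTRY: Dict[str, RegisteredStep] = {}


def register(name: Optional[str] = None, table_uri: Optional[str] = None):
    """Hypothetical decorator that records a function and its upstream dependencies."""
    def decorator(func: Callable) -> Callable:
        step_name = name or func.__name__  # fall back to the function name
        if step_name in REGISTRY:
            raise ValueError(f"Blueprint name must be unique: {step_name!r}")
        upstream = [p for p in inspect.signature(func).parameters if p != "self"]
        REGISTRY[step_name] = RegisteredStep(step_name, func, upstream, table_uri)
        return func
    return decorator


@register(table_uri="/path/to/bronze/customer")
def bronze_customer():
    # Zero upstream blueprints; plain dicts stand in for dataframe rows.
    return [{"customer_id": 1}, {"customer_id": 1}, {"customer_id": 2}]


@register(table_uri="/path/to/stage/customer")
def stage_customer(bronze_customer):
    # Keep the first row seen for each customer_id.
    unique_rows = {}
    for row in bronze_customer:
        unique_rows.setdefault(row["customer_id"], row)
    return list(unique_rows.values())


print(REGISTRY["stage_customer"].depends_on)  # ['bronze_customer']
print(stage_customer(bronze_customer()))  # [{'customer_id': 1}, {'customer_id': 2}]
```

Recording dependencies at decoration time means the full graph is known before anything runs, which is what allows a runner to order and schedule blueprints.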
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | Optional[str] | The name of the blueprint. If not provided, the name of the function will be used. The name must be unique across all blueprints. | None |
table_uri | Optional[str] | The URI of the target table. If not provided, the blueprint will not be stored as a table. | None |
schema | Optional[Schema] | The schema of the output dataframe. If provided, the output of the transformation function is validated against this schema. | None |
primary_keys | Optional[List[str]] | The primary keys of the target table. Is required for | None |
partition_by | Optional[List[str]] | The columns to partition the target table by. | None |
incremental_column | Optional[str] | The incremental column for the target table. Is required for | None |
scd2_column | Optional[str] | The name of the sequence column used for SCD2. Is required for | None |
write_mode | str | The write method to use. Defaults to 'overwrite'. | 'overwrite' |
format | str | The format to use. Defaults to 'dataframe'. | 'dataframe' |
tags | Optional[Dict[str, str]] | A dictionary of tags to apply to the blueprint. Tags can be used to group related blueprints and to run a subset of blueprints. | None |
post_transforms | Optional[List[str]] | Optional list of post-transformation functions to apply after the main transformation. Options are: | None |
deduplication_order_columns | Optional[List[str]] | Optional list of columns to use for deduplication when | None |
priority | int | Determines the execution order among activities that are ready to run. Higher values indicate higher scheduling preference, but dependencies and concurrency limits are still respected. | 100 |
max_concurrency | Optional[int] | Maximum number of parallel executions allowed while this job is running. When set, it limits global concurrency while this blueprint is running, which is useful for blueprints with high CPU or memory requirements. For example, setting max_concurrency=1 ensures this job runs on its own, while other jobs can still run in parallel when it is not running. Higher-priority jobs are scheduled first when concurrency limits are reached. | None |
freshness | Optional[timedelta] | Optional freshness threshold for the blueprint. Only applicable if the format is | None |
schedule | Optional[str] | Optional cron-style schedule. If blueno runs at a time that matches the schedule, the blueprint is run; otherwise it is skipped. If not provided, the blueprint is always executed. Freshness is still respected. For instance | None |
maintenance_schedule | Optional[str] | Optional cron-style schedule for table maintenance, which compacts and vacuums the Delta table. If not provided, no maintenance is executed. Maintenance runs at most once per cron interval. Setting this value to | None |
**kwargs | | Additional keyword arguments to pass to the blueprint. This is used when extending the blueprint with custom attributes or methods. | {} |
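The priority and max_concurrency rows describe scheduling only in prose. The sketch below is one possible reading of those two rules, under stated assumptions: ready jobs are considered in descending priority, and a job's max_concurrency caps the total number of running jobs while it runs. `Job`, `effective_limit` and `pick_next` are hypothetical names, not blueno's scheduler.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Job:
    # Hypothetical stand-in for a registered blueprint or task.
    name: str
    priority: int = 100  # same default as the `priority` parameter
    max_concurrency: Optional[int] = None


def effective_limit(running: List[Job], candidate: Job, global_limit: int) -> int:
    """Tightest concurrency cap that would apply if `candidate` started now."""
    caps = [global_limit]
    caps += [j.max_concurrency for j in running + [candidate] if j.max_concurrency is not None]
    return min(caps)


def pick_next(ready: List[Job], running: List[Job], global_limit: int) -> Optional[Job]:
    """Return the highest-priority ready job that fits under every active cap, or None."""
    # sorted() is stable, so jobs with equal priority keep their original order.
    for job in sorted(ready, key=lambda j: j.priority, reverse=True):
        if len(running) + 1 <= effective_limit(running, job, global_limit):
            return job
    return None


ready = [Job("a"), Job("b", priority=110), Job("heavy", priority=120, max_concurrency=1)]
print(pick_next(ready, running=[Job("x")], global_limit=4).name)  # b ("heavy" must run alone)
print(pick_next(ready, running=[], global_limit=4).name)  # heavy
```

Letting a lower-priority job start while a higher-priority one is blocked by its cap is a design choice of this sketch; the parameter descriptions do not say whether blueno does the same.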
Simple example
```python
from blueno import Blueprint, DataFrameType


@Blueprint.register(
    table_uri="/path/to/stage/customer",
    format="delta",
    primary_keys=["customer_id"],
    write_mode="overwrite",
)
def stage_customer(self: Blueprint, bronze_customer: DataFrameType) -> DataFrameType:
    # Deduplicate customers on the primary keys
    df = bronze_customer.unique(subset=self.primary_keys)
    return df
```
Full example
```python
from datetime import timedelta

from blueno import Blueprint, DataFrameType


@Blueprint.register(
    name="gold_customer",
    table_uri="/path/to/gold/customer",
    primary_keys=["customer_id", "site_id"],
    partition_by=["year", "month", "day"],
    incremental_column="order_timestamp",
    scd2_column="modified_timestamp",
    write_mode="upsert",
    format="delta",
    tags={
        "owner": "Alice Wonderlands",
        "project": "customer_360",
        "pii": "true",
        "business_unit": "finance",
    },
    post_transforms=[
        "deduplicate",
        "add_audit_columns",
        "apply_scd2_by_column",
    ],
    deduplication_order_columns=["modified_timestamp"],
    priority=110,
    max_concurrency=2,
    # Will only run if the last materialization is more than 1 day ago.
    freshness=timedelta(days=1),
    # Runs if blueno runs Monday through Friday, but at most once a day due to the `freshness` setting.
    schedule="* * * * 1-5",
    # Runs maintenance if blueno runs between 22:00 and 22:59 on Saturdays (day-of-week field = 6).
    maintenance_schedule="* 22 * * 6",
)
def gold_customer(self: Blueprint, silver_customer: DataFrameType) -> DataFrameType:
    # Some advanced business logic
    df = silver_customer.with_columns(...)
    return df
```
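To see how the `schedule`, `maintenance_schedule` and `freshness` settings in this example interact, here is a minimal sketch of a cron check combined with a freshness check. It is a simplified stand-in, not blueno's scheduling code: `field_matches`, `cron_matches` and `should_run` are hypothetical, only `*`, single numbers, `a-b` ranges and comma lists are supported, and all five fields are combined with AND (classic cron uses OR when both day-of-month and day-of-week are restricted).

```python
from datetime import datetime, timedelta
from typing import Optional


def field_matches(spec: str, value: int) -> bool:
    """Match one cron field. Supports '*', 'n', 'a-b' and comma lists; no '/' steps."""
    for part in spec.split(","):
        if part == "*":
            return True
        if "-" in part:
            low, high = (int(x) for x in part.split("-"))
            if low <= value <= high:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr: str, when: datetime) -> bool:
    """Check `when` against a 5-field cron expression (all fields combined with AND)."""
    minute, hour, day, month, weekday = expr.split()
    cron_weekday = (when.weekday() + 1) % 7  # cron counts Sunday=0 ... Saturday=6
    return (
        field_matches(minute, when.minute)
        and field_matches(hour, when.hour)
        and field_matches(day, when.day)
        and field_matches(month, when.month)
        and field_matches(weekday, cron_weekday)
    )


def should_run(schedule: Optional[str], freshness: Optional[timedelta],
               last_run: Optional[datetime], now: datetime) -> bool:
    """Run only inside the schedule and only if the last run is older than `freshness`."""
    if schedule is not None and not cron_matches(schedule, now):
        return False
    if freshness is not None and last_run is not None and now - last_run <= freshness:
        return False
    return True


monday_9am = datetime(2024, 1, 1, 9, 0)  # 1 January 2024 was a Monday
print(should_run("* * * * 1-5", timedelta(days=1), None, monday_9am))  # True
print(should_run("* * * * 1-5", timedelta(days=1), monday_9am, monday_9am + timedelta(hours=6)))  # False
print(cron_matches("* 22 * * 6", datetime(2024, 1, 6, 22, 30)))  # True (Saturday 22:30)
```

The cron field order is minute, hour, day of month, month, day of week, so "Saturdays at 22" is written `* 22 * * 6`; the expression `* 22 * 6 *` would instead match 22:00 to 22:59 on every day in June.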
Task
Create a definition for a task.
A task can contain arbitrary logic and does not need to return an output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | Optional[str] | The name of the task. If not provided, the name of the function will be used. The name must be unique across all jobs. | None |
tags | Optional[Dict[str, str]] | A dictionary of tags to apply to the task. Tags can be used to group related jobs and to run a subset of jobs. | None |
priority | int | Determines the execution order among activities that are ready to run. Higher values indicate higher scheduling preference, but dependencies and concurrency limits are still respected. | 100 |
Simple example
Creates a task, notify_end_task, which depends on a gold blueprint.
```python
import logging

from blueno import Blueprint, Task

logger = logging.getLogger(__name__)


@Task.register()
def notify_end_task(gold_metrics: Blueprint) -> None:
    logger.info("Gold metrics ran successfully")
    # Send message on Slack
```
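Because a task can depend on blueprints, a runner has to order it after them. The sketch below illustrates that ordering with the standard library's `graphlib.TopologicalSorter` (Python 3.9+), assuming dependencies are read from parameter names as in the example. The functions are hypothetical stand-ins, and this sketch passes each upstream output to the next step; since the example annotates `gold_metrics` as a `Blueprint`, blueno may pass the blueprint object instead.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Callable, Dict, List


# Hypothetical pipeline: two blueprints and one task, linked only by parameter names.
def silver_metrics():
    return [1, 2, 3]


def gold_metrics(silver_metrics):
    return sum(silver_metrics)


def notify_end_task(gold_metrics) -> None:
    # A task may return nothing; it only has to run after its upstream blueprint.
    print(f"Gold metrics ran successfully: {gold_metrics}")


def execution_order(steps: Dict[str, Callable]) -> List[str]:
    """Order steps so every step runs after the steps named by its parameters."""
    graph = {name: set(inspect.signature(f).parameters) for name, f in steps.items()}
    return list(TopologicalSorter(graph).static_order())


def run_all(steps: Dict[str, Callable]) -> Dict[str, object]:
    """Run steps in dependency order, feeding each one its upstream results."""
    results: Dict[str, object] = {}
    for name in execution_order(steps):
        func = steps[name]
        args = [results[p] for p in inspect.signature(func).parameters]
        results[name] = func(*args)
    return results


STEPS = {f.__name__: f for f in (notify_end_task, gold_metrics, silver_metrics)}
print(execution_order(STEPS))  # ['silver_metrics', 'gold_metrics', 'notify_end_task']
run_all(STEPS)  # prints "Gold metrics ran successfully: 6"
```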