Quick start¶
If you just want to get started, you can download the download an example and run it.
Installation¶
First install the tool:
$ pip install blueno
---> 100%
Successfully installed blueno
$ uv add blueno
---> 100%
Successfully installed blueno
Create a project¶
Copy the example and save it to a folder, e.g. blueprints/example.py
.
Using the medallion architecture the example consists of:
- three mocked bronze source tables
- three silver tables
- and a single gold table.
import random
import time
import polars as pl
from blueno import Blueprint, DataFrameType, blueprint
RAND_SIZE = 10
@blueprint(table_uri="lakehouse/bronze/product")
def bronze_product() -> DataFrameType:
df = pl.DataFrame(
{
"product_id": [1, 2, 3],
"product_name": ["ball", "bat", "tent"],
"price": [4.99, 9.99, 29.99],
}
)
time.sleep(random.random() * RAND_SIZE)
return df
@blueprint(table_uri="lakehouse/bronze/transaction")
def bronze_transaction() -> DataFrameType:
df = pl.DataFrame(
{
"product_id": [3, 2, 1, 1],
"transaction_date": ["2025-01-01", "2025-01-02", "2025-01-03", "2025-01-04"],
"quantity": [2, 1, 1, 0],
}
)
time.sleep(random.random() * RAND_SIZE)
return df
@blueprint(
table_uri="lakehouse/silver/product",
primary_keys=["product_id"],
)
def silver_product(self: Blueprint, bronze_product: DataFrameType) -> DataFrameType:
df = bronze_product.unique(subset=self.primary_keys)
time.sleep(random.random() * RAND_SIZE)
return df
@blueprint(
table_uri="lakehouse/silver/transaction",
primary_keys=["product_id"],
)
def silver_transaction(bronze_transaction: DataFrameType) -> DataFrameType:
df = bronze_transaction.filter(pl.col("quantity") > 0)
time.sleep(random.random() * RAND_SIZE)
return df
@blueprint(
table_uri="lakehouse/gold/sales_metric",
write_mode="incremental",
incremental_column="transaction_date",
)
def gold_sales_metric(
silver_transaction: DataFrameType,
silver_product: DataFrameType,
) -> DataFrameType:
df = (
silver_transaction
.join(silver_product, on="product_id", how="left")
.group_by(
"transaction_date",
"product_id",
)
.agg(
pl.sum("quantity").alias("total_quantity"),
(pl.col("quantity") * pl.col("price")).sum().alias("total_sales"),
)
)
time.sleep(random.random() * RAND_SIZE)
return df
Run the blueprints¶
Run using one of the options below by pointing the project directory to the folder you saved the example.py
to.
blueno run --project-dir ./blueprints --concurrency 2
Create another python file, i.e. main.py
, and run it.
from blueno.cli import run
run(project_dir="./blueprints", concurrency=2)
You should see a table with the run status of the blueprints in the DAG.
Result¶
Once complete, you can navigate to the created lakehouse folder and discover what the was created.
The final folder structure should look something like this
.
├── blueno.log
├── blueprints
│ └── example.py
├── lakehouse
│ ├── bronze
│ │ ├── product
│ │ │ ├── _delta_log
│ │ │ │ ├── 00000000000000000000.json
│ │ │ │ └── 00000000000000000001.json
│ │ │ └── part-00001-00885a2d-23b9-46cb-9d78-fd0e73200e00-c000.snappy.parquet
│ │ └── transaction
│ │ ├── _delta_log
│ │ │ ├── 00000000000000000000.json
│ │ │ └── 00000000000000000001.json
│ │ └── part-00001-13268cd8-b475-4d54-a52f-37d86c1858e2-c000.snappy.parquet
│ ├── gold
│ │ └── sales_metric
│ │ ├── _delta_log
│ │ │ ├── 00000000000000000000.json
│ │ │ └── 00000000000000000001.json
│ │ └── part-00001-0e8a37a1-63c0-4a43-abc8-748518ad7463-c000.snappy.parquet
│ └── silver
│ ├── product
│ │ ├── _delta_log
│ │ │ ├── 00000000000000000000.json
│ │ │ └── 00000000000000000001.json
│ │ └── part-00001-aa5dba8e-426a-438e-b5a0-7e1c05208b31-c000.snappy.parquet
│ └── transaction
│ ├── _delta_log
│ │ ├── 00000000000000000000.json
│ │ └── 00000000000000000001.json
│ └── part-00001-aa0a9af5-860d-4c10-8fad-362eff6c8415-c000.snappy.parquet
└── main.py