
DuckDB Utilities

Quick Start

from msfabricutils import FabricDuckDBConnection, get_onelake_access_token

# Initialize connection
access_token = get_onelake_access_token()
conn = FabricDuckDBConnection(access_token=access_token)

# Register lakehouses from different workspaces
conn.register_workspace_lakehouses(
    workspace_id='12345678-1234-5678-1234-567812345678', # Sales workspace
    lakehouses=['sales']
)

conn.register_workspace_lakehouses(
    workspace_id='87654321-8765-4321-8765-432187654321', # Marketing workspace
    lakehouses=['marketing']
)

# Query across workspaces using fully qualified names
df = conn.sql("""
    SELECT
        c.customer_id,
        c.name,
        c.region,
        s.segment,
        s.lifetime_value
    FROM sales_workspace.sales.main.customers c
    JOIN marketing_workspace.marketing.main.customer_segments s
    ON c.customer_id = s.customer_id
    WHERE c.region = 'EMEA'
""").df()

Table Name Formats

The connection wrapper automatically resolves table names to their fully qualified form, provided there is no ambiguity between registered tables.

  • 4-part: workspace.lakehouse.schema.table
  • 3-part: lakehouse.schema.table
  • 2-part: lakehouse.table or schema.table
  • 1-part: table
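
The resolution rule can be pictured with a small stand-alone sketch. This is a hypothetical helper, not the library's own code: a short name is expanded to its 4-part form only when exactly one registered table matches.

```python
# Hypothetical sketch of the resolution rule; the registry and helper are
# illustrative, not part of msfabricutils.
REGISTERED = [
    # (workspace, lakehouse, schema, table)
    ("sales_workspace", "sales", "main", "customers"),
    ("marketing_workspace", "marketing", "main", "customer_segments"),
]

def resolve(name: str) -> str:
    """Expand a 1- to 4-part table name to its fully qualified form."""
    parts = name.split(".")
    matches = []
    for ws, lh, sch, tbl in REGISTERED:
        full = [ws, lh, sch, tbl]
        # Suffix match covers table / schema.table / lakehouse.schema.table / 4-part.
        if full[4 - len(parts):] == parts:
            matches.append(".".join(full))
        # A 2-part name may also mean lakehouse.table.
        elif len(parts) == 2 and [lh, tbl] == parts:
            matches.append(".".join(full))
    if len(matches) != 1:
        raise ValueError(f"{name!r} is unknown or ambiguous: {matches}")
    return matches[0]

print(resolve("customers"))        # sales_workspace.sales.main.customers
print(resolve("sales.customers"))  # resolved via the lakehouse.table rule
```

If two registered tables share a short name, only a more qualified form (or the full 4-part name) disambiguates them.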

FabricDuckDBConnection

A DuckDB connection wrapper for Microsoft Fabric Lakehouses.

Provides a seamless interface between DuckDB and Microsoft Fabric Lakehouses, enabling SQL queries across multiple lakehouses (even in different workspaces) with automatic table registration.

Features
  • Automatic table registration from Fabric lakehouses
  • Cross-lakehouse and cross-workspace querying
  • Token-based authentication
  • Delta Lake table support

Parameters:

  • access_token (str, required): The Microsoft Fabric access token for authentication. In a notebook, obtain it with notebookutils.credentials.getToken('storage').
  • config (dict, optional): DuckDB configuration options. Defaults to {}.

Example:

# Initialize connection
access_token = notebookutils.credentials.getToken('storage')
conn = FabricDuckDBConnection(access_token=access_token)

# Register lakehouses from different workspaces
conn.register_workspace_lakehouses(
    workspace_id='12345678-1234-5678-1234-567812345678',
    lakehouses=['sales', 'marketing']
)
conn.register_workspace_lakehouses(
    workspace_id='87654321-8765-4321-8765-432187654321',
    lakehouses=['marketing']
)

# Query across workspaces using fully qualified names
df = conn.sql("""
    SELECT
        c.customer_id,
        c.name,
        c.region,
        s.segment,
        s.lifetime_value
    FROM sales_workspace.sales.main.customers c
    JOIN marketing_workspace.marketing.main.customer_segments s
        ON c.customer_id = s.customer_id
    WHERE c.region = 'EMEA'
""").df()

refresh_access_token

refresh_access_token(access_token: str)

Refresh the access token for all registered lakehouses.

Parameters:

  • access_token (str, required): The new access token to use.

Example:

# Initialize connection
conn = FabricDuckDBConnection(access_token='old_token')

# When token expires, refresh it
new_token = notebookutils.credentials.getToken('storage')
conn.refresh_access_token(new_token)

register_workspace_lakehouses

register_workspace_lakehouses(workspace_id: str, lakehouses: str | list[str] = None)

Register one or more lakehouses from a workspace for querying.

Parameters:

  • workspace_id (str, required): The ID of the Microsoft Fabric workspace.
  • lakehouses (str | list[str], optional): Name(s) of the lakehouse(s) to register. Defaults to None.

Raises:

  • Exception: If a lakehouse uses the schema-enabled preview feature.

Example:

# Initialize connection with access token
access_token = notebookutils.credentials.getToken('storage')
conn = FabricDuckDBConnection(access_token=access_token)

# Register a single lakehouse
conn.register_workspace_lakehouses(
    workspace_id='12345678-1234-5678-1234-567812345678',
    lakehouses='sales_lakehouse'
)

# Register multiple lakehouses
conn.register_workspace_lakehouses(
    workspace_id='12345678-1234-5678-1234-567812345678',
    lakehouses=['sales_lakehouse', 'marketing_lakehouse']
)

print_lakehouse_catalog

print_lakehouse_catalog()

Print a hierarchical view of all registered lakehouses, schemas, and tables.

Example:

conn.print_lakehouse_catalog()
📁 Database: workspace1.sales_lakehouse
└─📂 Schema: main
    ├─📄 customers
    ├─📄 orders
    └─📄 products
📁 Database: workspace1.marketing_lakehouse
└─📂 Schema: main
    ├─📄 campaigns
    └─📄 customer_segments

write

write(df: Any, full_table_name: str, workspace_id: str = None, workspace_name: str = None, *args: Any, **kwargs: Any)

Write a DataFrame to a Fabric Lakehouse table.

Parameters:

  • df (Any, required): The DataFrame to write.
  • full_table_name (str, required): Two-part table name (e.g. 'lakehouse.table' or 'schema.table').
  • workspace_id (str, optional): The workspace ID. Required if multiple workspaces are registered. Defaults to None.
  • workspace_name (str, optional): The workspace name; an alternative to workspace_id. Defaults to None.
  • *args (Any): Additional positional arguments passed to write_deltalake.
  • **kwargs (Any): Additional keyword arguments passed to write_deltalake. Commonly used: mode (str), either 'error' or 'overwrite', defaults to 'error'; partition_by (list[str]), columns to partition by.

Raises:

  • Exception: If full_table_name has an invalid format or the workspace cannot be determined.
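
Example (a usage sketch; the DataFrame contents and names are illustrative, and a registered connection is assumed):

```python
import pandas as pd

def load_customers(conn) -> None:
    # Illustrative sketch: overwrite the 'customers' table, partitioned by region.
    df = pd.DataFrame({
        "customer_id": [1, 2],
        "region": ["EMEA", "AMER"],
    })
    conn.write(
        df,
        "sales.customers",                 # two-part table name
        workspace_name="sales_workspace",  # disambiguates when several workspaces are registered
        mode="overwrite",                  # forwarded to write_deltalake
        partition_by=["region"],           # forwarded to write_deltalake
    )
```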