# DuckDB Utilities

## Quick Start
```python
from msfabricutils import FabricDuckDBConnection, get_onelake_access_token

# Initialize connection
access_token = get_onelake_access_token()
conn = FabricDuckDBConnection(access_token=access_token)

# Register lakehouses from different workspaces
conn.register_workspace_lakehouses(
    workspace_id='12345678-1234-5678-1234-567812345678',  # Sales workspace
    lakehouses=['sales']
)
conn.register_workspace_lakehouses(
    workspace_id='87654321-8765-4321-8765-432187654321',  # Marketing workspace
    lakehouses=['marketing']
)

# Query across workspaces using fully qualified names
df = conn.sql("""
    SELECT
        c.customer_id,
        c.name,
        c.region,
        s.segment,
        s.lifetime_value
    FROM sales_workspace.sales.main.customers c
    JOIN marketing_workspace.marketing.main.customer_segments s
        ON c.customer_id = s.customer_id
    WHERE c.region = 'EMEA'
""").df()
```
## Table Name Formats

The connection wrapper automatically resolves a table name to its fully qualified form, provided there is no ambiguity among the registered tables. The following formats are supported:
- 4-part: `workspace.lakehouse.schema.table`
- 3-part: `lakehouse.schema.table`
- 2-part: `lakehouse.table` or `schema.table`
- 1-part: `table`
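The resolution rule can be illustrated with a small standalone sketch (an illustration of the idea only, not the wrapper's actual code; the registry and helper names here are made up): a shorter name resolves only when exactly one registered table matches it.

```python
# Illustrative sketch of unambiguous name resolution (NOT the wrapper's
# actual implementation). Registered tables are kept as 4-part tuples:
# (workspace, lakehouse, schema, table).
REGISTERED = [
    ("sales_workspace", "sales", "main", "customers"),
    ("marketing_workspace", "marketing", "main", "customer_segments"),
]

def _matches(reg: tuple, parts: tuple) -> bool:
    if len(parts) == 4:                       # workspace.lakehouse.schema.table
        return reg == parts
    if len(parts) == 3:                       # lakehouse.schema.table
        return reg[1:] == parts
    if len(parts) == 2:                       # lakehouse.table OR schema.table
        return (reg[1], reg[3]) == parts or reg[2:] == parts
    return reg[3] == parts[0]                 # table

def resolve(name: str) -> str:
    """Resolve a 1- to 4-part name; raise if unknown or ambiguous."""
    parts = tuple(name.split("."))
    candidates = [reg for reg in REGISTERED if _matches(reg, parts)]
    if len(candidates) != 1:
        raise ValueError(f"{name!r} is unknown or ambiguous")
    return ".".join(candidates[0])
```

With this registry, `resolve("customers")` yields the 4-part `sales_workspace.sales.main.customers`; registering a second table named `customers` would make the 1-part form ambiguous and force a longer name.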
## FabricDuckDBConnection

A DuckDB connection wrapper for Microsoft Fabric Lakehouses.

Provides a seamless interface between DuckDB and Microsoft Fabric Lakehouses, allowing SQL queries across multiple lakehouses (even across workspaces) with automatic table registration.
### Features
- Automatic table registration from Fabric lakehouses
- Cross-lakehouse and cross-workspace querying
- Token-based authentication
- Delta Lake table support
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `access_token` | `str` | The Microsoft Fabric access token for authentication. In a notebook, use `notebookutils.credentials.getToken('storage')`. | *required* |
| `config` | `dict` | DuckDB configuration options. | `{}` |
Example:

```python
# Initialize connection
access_token = notebookutils.credentials.getToken('storage')
conn = FabricDuckDBConnection(access_token=access_token)

# Register lakehouses from different workspaces
conn.register_workspace_lakehouses(
    workspace_id='12345678-1234-5678-1234-567812345678',
    lakehouses=['sales', 'marketing']
)
conn.register_workspace_lakehouses(
    workspace_id='87654321-8765-4321-8765-432187654321',
    lakehouses=['marketing']
)

# Query across workspaces using fully qualified names
df = conn.sql("""
    SELECT
        c.customer_id,
        c.name,
        c.region,
        s.segment,
        s.lifetime_value
    FROM sales_workspace.sales.main.customers c
    JOIN marketing_workspace.marketing.main.customer_segments s
        ON c.customer_id = s.customer_id
    WHERE c.region = 'EMEA'
""").df()
```
### refresh_access_token

```python
refresh_access_token(access_token: str)
```
Refresh the access token for all registered lakehouses.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `access_token` | `str` | The new access token to use. | *required* |
Example:

```python
# Initialize connection
conn = FabricDuckDBConnection(access_token='old_token')

# When the token expires, refresh it
new_token = notebookutils.credentials.getToken('storage')
conn.refresh_access_token(new_token)
```
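Access tokens are short-lived, so rather than waiting for a failure it can help to refresh proactively. One common pattern (a sketch using only the standard library; the helper name is made up, and the decode below is for inspection only, with no signature validation) is to read the standard JWT `exp` claim from the token:

```python
import base64
import json
import time

def token_expires_soon(access_token: str, margin_seconds: int = 300) -> bool:
    """Return True if the JWT's `exp` claim is within `margin_seconds` of now."""
    payload_b64 = access_token.split(".")[1]
    # JWT segments are base64url without padding; re-pad before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    claims = json.loads(base64.urlsafe_b64decode(payload_b64))
    return claims["exp"] - time.time() < margin_seconds

# Usage sketch (in a notebook):
# if token_expires_soon(access_token):
#     conn.refresh_access_token(notebookutils.credentials.getToken('storage'))
```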
### register_workspace_lakehouses

```python
register_workspace_lakehouses(workspace_id: str, lakehouses: str | list[str] = None)
```
Register one or more lakehouses from a workspace for querying.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `workspace_id` | `str` | The ID of the Microsoft Fabric workspace. | *required* |
| `lakehouses` | `str \| list[str]` | Name(s) of the lakehouse(s) to register. | `None` |
Raises:

| Type | Description |
|---|---|
| `Exception` | If a lakehouse uses the schema-enabled preview feature. |
Example:

```python
# Initialize connection with access token
access_token = notebookutils.credentials.getToken('storage')
conn = FabricDuckDBConnection(access_token=access_token)

# Register a single lakehouse
conn.register_workspace_lakehouses(
    workspace_id='12345678-1234-5678-1234-567812345678',
    lakehouses='sales_lakehouse'
)

# Register multiple lakehouses
conn.register_workspace_lakehouses(
    workspace_id='12345678-1234-5678-1234-567812345678',
    lakehouses=['sales_lakehouse', 'marketing_lakehouse']
)
```
### print_lakehouse_catalog

```python
print_lakehouse_catalog()
```
Print a hierarchical view of all registered lakehouses, schemas, and tables.
Example:

```python
conn.print_lakehouse_catalog()
```

```
📁 Database: workspace1.sales_lakehouse
└─📂 Schema: main
  ├─📄 customers
  ├─📄 orders
  └─📄 products
📁 Database: workspace1.marketing_lakehouse
└─📂 Schema: main
  ├─📄 campaigns
  └─📄 customer_segments
```
### write

```python
write(df: Any, full_table_name: str, workspace_id: str = None, workspace_name: str = None, *args: Any, **kwargs: Any)
```
Write a DataFrame to a Fabric Lakehouse table.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `Any` | The DataFrame to write. | *required* |
| `full_table_name` | `str` | The target table name; see Table Name Formats above for the accepted formats. | *required* |
| `workspace_id` | `str` | The workspace ID. Required if multiple workspaces are registered. | `None` |
| `workspace_name` | `str` | The workspace name; an alternative to `workspace_id`. | `None` |
| `*args` | `Any` | Additional positional arguments passed to `write_deltalake`. | `()` |
| `**kwargs` | `Any` | Additional keyword arguments passed to `write_deltalake`. Commonly used: `mode` (`str`): `'error'` or `'overwrite'`, defaults to `'error'`; `partition_by` (`list[str]`): columns to partition by. | `{}` |
Raises:

| Type | Description |
|---|---|
| `Exception` | If the table name format is invalid or the workspace cannot be determined. |