diff --git a/skills/README.md b/skills/README.md index 488085c01..13a76b3ad 100644 --- a/skills/README.md +++ b/skills/README.md @@ -57,6 +57,7 @@ Once installed, invoke a skill by name in your conversation: |-------|-------------| | [wren-usage](wren-usage/SKILL.md) | **Primary skill** — CLI workflow guide: query data via `wren --sql`, gather schema context with `wren memory`, store/recall queries, handle errors | | [wren-generate-mdl](wren-generate-mdl/SKILL.md) | Generate a Wren MDL project from a live database — schema discovery, type normalization, YAML generation | +| [wren-dlt-connector](wren-dlt-connector/SKILL.md) | Connect SaaS data (HubSpot, Stripe, Salesforce, etc.) via dlt pipelines into DuckDB, then auto-generate a Wren project | ### wren-usage reference files diff --git a/skills/SKILLS.md b/skills/SKILLS.md index df992e108..0362db3bc 100644 --- a/skills/SKILLS.md +++ b/skills/SKILLS.md @@ -57,6 +57,27 @@ Generates a Wren MDL project by exploring a live database using whatever tools a --- +## wren-dlt-connector + +**File:** [wren-dlt-connector/SKILL.md](wren-dlt-connector/SKILL.md) + +Connects SaaS data (HubSpot, Stripe, Salesforce, GitHub, Slack, etc.) to Wren Engine for SQL analysis. Walks through the full flow: install dlt, pick a SaaS source, set up credentials, run the data pipeline into DuckDB, then auto-generate a Wren semantic project from the loaded data. + +### When to use + +- Connecting SaaS data sources (HubSpot, Stripe, Salesforce, GitHub, Slack, etc.) +- Importing data from an API via dlt pipelines +- Loading SaaS data into DuckDB for SQL analysis +- Creating a Wren project from an existing dlt-produced DuckDB file + +### Dependent skills + +| Skill | Purpose | +|-------|---------| +| `wren-generate-mdl` | Generate or regenerate MDL from the DuckDB database | + +--- + ## Installing a skill ```bash diff --git a/skills/index.json b/skills/index.json index 3794c0756..758b3be9b 100644 --- a/skills/index.json +++ b/skills/index.json @@ -5,6 +5,27 @@ "repository": "https://github.com/Canner/wren-engine", "license": "Apache-2.0", "skills": [ + { + "name": "wren-dlt-connector", + "version": "1.0", + "description": "Connect SaaS data (HubSpot, Stripe, Salesforce, GitHub, Slack, etc.) to Wren Engine for SQL analysis via dlt pipelines into DuckDB, then auto-generate a Wren semantic project.", + "tags": [ + "wren", + "dlt", + "saas", + "duckdb", + "pipeline", + "hubspot", + "stripe", + "salesforce", + "github", + "slack" + ], + "dependencies": [ + "wren-generate-mdl" + ], + "repository": "https://github.com/Canner/wren-engine/tree/main/skills/wren-dlt-connector" + }, { "name": "wren-generate-mdl", "version": "2.1", diff --git a/skills/install.sh b/skills/install.sh index 92f53057a..0e0ac62d4 100755 --- a/skills/install.sh +++ b/skills/install.sh @@ -13,7 +13,7 @@ set -euo pipefail REPO="Canner/wren-engine" BRANCH="${WREN_SKILLS_BRANCH:-main}" DEST="${CLAUDE_SKILLS_DIR:-$HOME/.claude/skills}" -ALL_SKILLS=(wren-generate-mdl wren-usage) +ALL_SKILLS=(wren-dlt-connector wren-generate-mdl wren-usage) # Parse --force flag and skill list from arguments FORCE=false @@ -51,8 +51,17 @@ fi # Locate index.json for dependency resolution (local or remote) INDEX_JSON="" +INDEX_JSON_TMP="" if [ -n "$SCRIPT_DIR" ] && [ -f "$SCRIPT_DIR/index.json" ]; then INDEX_JSON="$SCRIPT_DIR/index.json" +elif command -v curl &>/dev/null; then + INDEX_JSON_TMP="$(mktemp)" + if curl -fsSL "https://raw.githubusercontent.com/$REPO/$BRANCH/skills/index.json" -o "$INDEX_JSON_TMP" 2>/dev/null; then + INDEX_JSON="$INDEX_JSON_TMP" + else + rm -f "$INDEX_JSON_TMP" + INDEX_JSON_TMP="" + fi fi # Expand SELECTED_SKILLS to include dependencies declared in index.json. @@ -153,7 +162,7 @@ else echo "Destination: $DEST" echo "" tmpdir=$(mktemp -d) - trap 'rm -rf "$tmpdir"' EXIT + trap 'rm -rf "$tmpdir"; [ -n "${INDEX_JSON_TMP:-}" ] && rm -f "$INDEX_JSON_TMP"' EXIT extract_paths=() for skill in "${SELECTED_SKILLS[@]}"; do diff --git a/skills/versions.json b/skills/versions.json index ba5b4acd4..ad0e95415 100644 --- a/skills/versions.json +++ b/skills/versions.json @@ -1,4 +1,5 @@ { + "wren-dlt-connector": "1.0", "wren-generate-mdl": "2.1", "wren-usage": "2.1" } diff --git a/skills/wren-dlt-connector/SKILL.md b/skills/wren-dlt-connector/SKILL.md new file mode 100644 index 000000000..0aac037ae --- /dev/null +++ b/skills/wren-dlt-connector/SKILL.md @@ -0,0 +1,269 @@ +--- +name: wren-dlt-connector +description: "Connect SaaS data (HubSpot, Stripe, Salesforce, GitHub, Slack, etc.) to Wren Engine for SQL analysis. Guides the user through the full flow: install dlt, pick a SaaS source, set up credentials, run the data pipeline into DuckDB, then auto-generate a Wren semantic project from the loaded data. Use this skill whenever the user mentions: connecting SaaS data, importing data from an API, dlt pipelines, loading HubSpot/Stripe/Salesforce/GitHub/Slack data, querying SaaS data with SQL, or setting up a new data source from a REST API. Also trigger when the user already has a dlt-produced DuckDB file and wants to create a Wren project from it." +license: Apache-2.0 +metadata: + author: wren-engine + version: "1.0" +--- + +# wren-dlt-connector + +Connect SaaS data to Wren Engine for SQL analysis — from zero to a verified, queryable project in one conversation. + +## Who this is for + +Data analysts who know SQL and some Python, but may not have used dlt or Wren before. Explain concepts briefly when they first appear, but don't over-explain things a SQL-literate person would already know. + +## Overview + +This skill walks through a four-phase workflow: + +1. **Extract** — Use dlt (data load tool) to pull data from a SaaS API into a local DuckDB file +2. **Model** — Introspect the DuckDB schema and auto-generate a Wren semantic project (YAML models, relationships, profile) +3. **Build & Verify** — Build the project and run actual SQL queries to confirm everything works end-to-end +4. **Handoff** — Show the user their data and next steps + +The user might enter at any phase. Ask which phase they're starting from — they may already have a `.duckdb` file and just need phases 2–4. + +**The goal is a project that actually queries successfully, not just files that look correct.** Always run the verification step before declaring success. + +## Critical: DuckDB catalog naming + +When wren engine connects to a DuckDB file, it ATTACHes it using the filename (without `.duckdb` extension) as the catalog alias: + +``` +ATTACH DATABASE 'stripe_data.duckdb' AS "stripe_data" (READ_ONLY) +``` + +This means **every model's `table_reference.catalog` must equal the DuckDB filename stem**. If the file is `hubspot.duckdb`, the catalog is `hubspot`. If it's `my_pipeline.duckdb`, the catalog is `my_pipeline`. + +Getting this wrong causes "table not found" errors at query time. The `introspect_dlt.py` script handles this automatically. + +## Critical: Type normalization + +Column types must be normalized using wren SDK's `type_mapping.parse_type()` function, which uses sqlglot to convert database-specific types (like DuckDB's `HUGEINT`, `TIMESTAMP WITH TIME ZONE`) into canonical SQL types that wren-core understands. Do not hardcode type mappings — always delegate to `parse_type(raw_type, "duckdb")`. + +The `introspect_dlt.py` script does this automatically when wren SDK is installed. + +## Phase 1: Extract — dlt Pipeline Setup + +### Step 1: Pick the SaaS source + +Ask the user which SaaS service they want to connect. Read `references/dlt_sources.md` for a list of popular verified sources and their auth requirements. If the source isn't listed, check whether dlt has a verified source for it by searching `dlthub.com/docs/dlt-ecosystem/verified-sources`. + +### Step 2: Install dlt + +```bash +pip install "dlt[duckdb]" --break-system-packages +``` + +### Step 3: Write the pipeline script + +Create a Python script that: +1. Imports the dlt source function for the chosen SaaS +2. Configures the pipeline with `destination='duckdb'` and a local file path +3. Runs the pipeline with `pipeline.run(source)` + +Here's the general pattern — adapt it per source (check `references/dlt_sources.md` for source-specific templates): + +```python +import dlt + +pipeline = dlt.pipeline( + pipeline_name="_pipeline", + destination="duckdb", + dataset_name="_data", +) + +# Source-specific: check references/dlt_sources.md for auth patterns +source = (api_key=dlt.secrets.value) + +info = pipeline.run(source) +print(info) +``` + +### Step 4: Set up credentials + +dlt reads credentials from environment variables or `.dlt/secrets.toml`. The simplest approach for a one-time run: + +```bash +# Set the credential as an environment variable +# The exact variable name depends on the source — check references/dlt_sources.md +export SOURCES____API_KEY="the-actual-key" +``` + +Ask the user for their API key or token. Remind them: +- Never commit credentials to git +- Environment variables are the simplest way for a one-time run +- For repeated use, they can create `.dlt/secrets.toml` + +### Step 5: Run the pipeline + +```bash +python .py +``` + +After the run, confirm: +1. The pipeline completed without errors +2. A `.duckdb` file was created (usually at `.duckdb`) +3. Print discovered tables and their column counts + +```python +import duckdb +con = duckdb.connect(".duckdb", read_only=True) +for row in con.execute(""" + SELECT table_schema, table_name, + (SELECT COUNT(*) FROM information_schema.columns c + WHERE c.table_schema = t.table_schema AND c.table_name = t.table_name) as col_count + FROM information_schema.tables t + WHERE table_schema NOT IN ('information_schema', 'pg_catalog') + AND table_name NOT LIKE '_dlt_%' + ORDER BY table_schema, table_name +""").fetchall(): + print(f" {row[0]}.{row[1]} ({row[2]} columns)") +con.close() +``` + +## Phase 2: Model — Generate Wren Project + +Run the introspection script to auto-generate a complete Wren project from the DuckDB file: + +```bash +python /scripts/introspect_dlt.py \ + --duckdb-path \ + --output-dir \ + --project-name +``` + +This script: +- Connects to the DuckDB file (read-only) +- **Sets `table_reference.catalog` to the DuckDB filename stem** (matching wren engine's ATTACH behavior) +- Discovers all tables and columns via `information_schema` +- Filters out dlt internal tables (`_dlt_loads`, `_dlt_pipeline_state`, etc.) +- Filters out dlt metadata columns (`_dlt_id`, `_dlt_load_id`, `_dlt_list_idx`) from model definitions +- Detects parent-child relationships from `_dlt_parent_id` columns and table naming conventions +- **Normalizes column types using `wren.type_mapping.parse_type()`** (sqlglot-based) +- Generates a complete v2 YAML project (wren_project.yml, models/, relationships.yml, instructions.md) + +After running, show the user what was generated: + +```bash +# Show project summary +cat /wren_project.yml +echo "---" +ls /models/ +echo "---" +cat /relationships.yml +``` + +### Verify model correctness + +Spot-check one generated model to confirm: +1. `table_reference.catalog` matches the DuckDB filename (e.g., `stripe_data` for `stripe_data.duckdb`) +2. `table_reference.schema` matches the DuckDB schema (usually `main`) +3. No `_dlt_*` columns appear in the columns list +4. Column types look reasonable (VARCHAR, BIGINT, BOOLEAN, TIMESTAMP, etc.) + +### Set up the connection profile + +Create a Wren profile so the user can query without specifying connection details every time. The `url` must point to the **directory containing** the `.duckdb` file (not the file itself): + +```python +import yaml +from pathlib import Path + +wren_home = Path.home() / ".wren" +wren_home.mkdir(exist_ok=True) +profiles_file = wren_home / "profiles.yml" + +existing = (yaml.safe_load(profiles_file.read_text()) or {}) if profiles_file.exists() else {} +existing.setdefault("profiles", {}) + +profile_name = "_dlt" +existing["profiles"][profile_name] = { + "datasource": "duckdb", + "url": str(Path("").resolve().parent), + "format": "duckdb", +} +existing["active"] = profile_name + +profiles_file.write_text(yaml.dump(existing, default_flow_style=False, sort_keys=False)) +``` + +## Phase 3: Build & Verify — The Project Must Actually Work + +This phase is not optional. A project that generates YAML but fails at query time is not a success. + +### Step 1: Build the MDL + +```bash +cd +wren context build +``` + +This compiles the YAML models into `target/mdl.json`. If this fails, fix the issues before proceeding (see Troubleshooting below). + +### Step 2: Validate with a real query + +Run at least one query per generated model to confirm the project is functional: + +```bash +# For each model, verify it resolves correctly +wren --sql 'SELECT COUNT(*) as total FROM ""' +``` + +If any query fails, debug and fix the model before moving on. Common issues: +- Wrong catalog in table_reference → "table not found" +- Type mismatch → fix the column type in metadata.yml +- Missing profile → check `wren profile list` + +### Step 3: Run interesting queries + +Once basic queries pass, run 2–3 more interesting queries to show the user what their data looks like: + +```bash +# Preview data +wren --sql 'SELECT * FROM "" LIMIT 5' + +# If there's a relationship, verify both models are queryable +wren --sql 'SELECT * FROM "" LIMIT 5' +wren --sql 'SELECT * FROM "" LIMIT 5' +``` + +Show the results to the user and explain what they're seeing. This is their first look at the data through Wren — make it count. + +### Step 4: Confirm success + +Only after queries return real data, tell the user the setup is complete. Summarize: +- How many models were created +- What relationships were detected +- Which profile is active +- Example queries they can try next + +## Troubleshooting + +If `wren context build` fails: +- Check that `data_source: duckdb` is set in `wren_project.yml` +- Verify the DuckDB file path in the profile is correct +- Run `wren context validate` for detailed error messages + +If queries fail with "table not found": +- **Most likely cause:** `table_reference.catalog` doesn't match the DuckDB filename. If the file is `pipeline.duckdb`, the catalog must be `pipeline`, not empty string. +- Check the profile's `url` points to the directory containing the `.duckdb` file +- Table names with double underscores need quoting: `"hubspot__contacts"` + +If queries fail with type errors: +- Check column types in the model YAML — they should be canonical SQL types (VARCHAR, BIGINT, etc.) +- Re-run `introspect_dlt.py` with wren SDK installed to get proper type normalization + +General: +- Check that the profile is active: `wren profile list` +- The DuckDB file might be locked if a dlt pipeline is running — wait for it to finish + +## Important notes + +- dlt's `_dlt_parent_id` / `_dlt_id` columns are kept in the actual DuckDB tables but hidden from Wren model definitions. They're only used in relationship conditions. +- DuckDB has a single-writer limitation. Don't run a dlt sync while querying. For concurrent access, dlt should write to a separate file and swap atomically. +- The generated models use `table_reference` (not `ref_sql`) since they map directly to DuckDB tables created by dlt. +- Column types are normalized using wren SDK's `parse_type()` with sqlglot's DuckDB dialect. If a type looks wrong, the user can edit the model's `metadata.yml` directly. diff --git a/skills/wren-dlt-connector/evals/evals.json b/skills/wren-dlt-connector/evals/evals.json new file mode 100644 index 000000000..e2fc5ceea --- /dev/null +++ b/skills/wren-dlt-connector/evals/evals.json @@ -0,0 +1,23 @@ +{ + "skill_name": "wren-dlt-connector", + "evals": [ + { + "id": 1, + "prompt": "我想把公司 HubSpot CRM 的資料拉進來用 SQL 分析,可以幫我設定嗎?我有 HubSpot 的 private app token。", + "expected_output": "Should guide through: install dlt, write a HubSpot pipeline script, set credentials, run pipeline, introspect DuckDB, generate wren project, build, and run sample queries on contacts/deals tables.", + "files": [] + }, + { + "id": 2, + "prompt": "I already have a DuckDB file from a dlt pipeline at ./stripe_data.duckdb. I want to create a wren project so I can query the Stripe data with SQL. Can you set that up?", + "expected_output": "Should skip Phase 1 (dlt setup), go directly to introspecting the DuckDB file, generate wren project YAML, set up profile, build, and run sample queries.", + "files": [] + }, + { + "id": 3, + "prompt": "我們團隊用 GitHub 管理 open source project,我想定期把 issues 和 PR 資料抓下來做分析。怎麼開始?", + "expected_output": "Should guide through: install dlt, configure GitHub access token, write pipeline for github source (issues + PRs), run into DuckDB, generate wren project with models for issues/pull_requests/comments, detect relationships, build, run sample queries.", + "files": [] + } + ] +} diff --git a/skills/wren-dlt-connector/references/dlt_sources.md b/skills/wren-dlt-connector/references/dlt_sources.md new file mode 100644 index 000000000..c636c0743 --- /dev/null +++ b/skills/wren-dlt-connector/references/dlt_sources.md @@ -0,0 +1,159 @@ +# Popular dlt Verified Sources — Quick Reference + +## Source Auth Patterns + +| Source | Auth Method | Credential | Env Variable | +|--------|-----------|------------|-------------| +| HubSpot | Private App Token | PAT string | `SOURCES__HUBSPOT__API_KEY` | +| Stripe | Secret Key | `sk_live_...` or `sk_test_...` | `SOURCES__STRIPE_ANALYTICS__STRIPE_SECRET_KEY` | +| Salesforce | Username + Password + Security Token | 3 fields | `SOURCES__SALESFORCE__USERNAME`, `__PASSWORD`, `__SECURITY_TOKEN` | +| GitHub | Personal Access Token | `ghp_...` | `SOURCES__GITHUB__ACCESS_TOKEN` | +| Slack | Bot Token | `xoxb-...` | `SOURCES__SLACK__ACCESS_TOKEN` | +| Google Analytics | Service Account JSON | JSON key file | `SOURCES__GOOGLE_ANALYTICS__CREDENTIALS` (JSON string or file path) | +| Google Sheets | Service Account JSON | JSON key file | `SOURCES__GOOGLE_SHEETS__CREDENTIALS` | +| Notion | Integration Token | `secret_...` | `SOURCES__NOTION__API_KEY` | +| Jira | Email + API Token | 2 fields | `SOURCES__JIRA__SUBDOMAIN`, `__EMAIL`, `__API_TOKEN` | +| Zendesk | Email + API Token | 2 fields | `SOURCES__ZENDESK__SUBDOMAIN`, `__EMAIL`, `__PASSWORD` | +| Shopify | Admin API Access Token | token string | `SOURCES__SHOPIFY__PRIVATE_APP_PASSWORD` | +| Airtable | Personal Access Token | `pat...` | `SOURCES__AIRTABLE__ACCESS_TOKEN` | + +## Pipeline Script Templates + +### HubSpot + +```python +import dlt +from dlt.sources.rest_api import rest_api_source + +# Or use the dedicated hubspot source if available: +# dlt init hubspot duckdb +# Then: from hubspot import hubspot + +pipeline = dlt.pipeline( + pipeline_name="hubspot", + destination="duckdb", + dataset_name="hubspot_data", +) + +# With the verified source: +from hubspot import hubspot_source +source = hubspot_source(api_key=dlt.secrets.value) +info = pipeline.run(source) +print(info) +``` + +**Typical tables produced:** contacts, companies, deals, tickets, owners, pipelines, stages, emails, calls, meetings, notes, tasks, products, line_items, quotes + +### Stripe + +```python +import dlt +from stripe_analytics import stripe_source + +pipeline = dlt.pipeline( + pipeline_name="stripe", + destination="duckdb", + dataset_name="stripe_data", +) + +source = stripe_source() +info = pipeline.run(source) +print(info) +``` + +**Typical tables produced:** customers, charges, invoices, subscriptions, products, prices, payment_intents, refunds, balance_transactions, events + +### GitHub + +```python +import dlt +from github import github_reactions, github_repo_events + +pipeline = dlt.pipeline( + pipeline_name="github", + destination="duckdb", + dataset_name="github_data", +) + +source = github_reactions("owner/repo", access_token=dlt.secrets.value) +info = pipeline.run(source) +print(info) +``` + +**Typical tables produced:** issues, pull_requests, comments, reactions, stargazers, commits, events + +### Slack + +```python +import dlt +from datetime import datetime +from slack import slack_source + +pipeline = dlt.pipeline( + pipeline_name="slack", + destination="duckdb", + dataset_name="slack_data", +) + +source = slack_source( + selected_channels=["general", "engineering"], + start_date=datetime(2024, 1, 1), +) +info = pipeline.run(source) +print(info) +``` + +**Typical tables produced:** channels, messages, users, threads, files, reactions + +### Salesforce + +```python +import dlt +from salesforce import salesforce_source + +pipeline = dlt.pipeline( + pipeline_name="salesforce", + destination="duckdb", + dataset_name="salesforce_data", +) + +source = salesforce_source() +info = pipeline.run(source) +print(info) +``` + +**Typical tables produced:** accounts, contacts, leads, opportunities, cases, tasks, events, campaigns, users + +## How to Get Credentials + +### HubSpot +1. Go to HubSpot → Settings → Integrations → Private Apps +2. Create a private app with the scopes you need (contacts, companies, deals, etc.) +3. Copy the access token + +### Stripe +1. Go to Stripe Dashboard → Developers → API keys +2. Copy the Secret key (use test mode key for testing) + +### GitHub +1. Go to GitHub → Settings → Developer settings → Personal access tokens → Fine-grained tokens +2. Create a token with repo read access + +### Slack +1. Go to api.slack.com → Your Apps → Create New App +2. Add Bot Token Scopes: channels:history, channels:read, users:read +3. Install to workspace, copy Bot User OAuth Token + +### Salesforce +1. Your Salesforce username (email) +2. Your Salesforce password +3. Security token: Salesforce → Settings → Reset My Security Token (sent via email) + +## dlt Init Shortcut + +For verified sources, dlt provides a scaffolding command: +```bash +dlt init duckdb +``` +This creates a pipeline script and secrets template. Supported source names: +hubspot, stripe_analytics, salesforce, github, slack, google_analytics, google_sheets, notion, jira, zendesk, shopify, airtable, asana, chess, pokemon, pipedrive, freshdesk, matomo, mongodb, sql_database, rest_api diff --git a/skills/wren-dlt-connector/scripts/introspect_dlt.py b/skills/wren-dlt-connector/scripts/introspect_dlt.py new file mode 100644 index 000000000..445b63854 --- /dev/null +++ b/skills/wren-dlt-connector/scripts/introspect_dlt.py @@ -0,0 +1,448 @@ +#!/usr/bin/env python3 +"""Introspect a dlt-produced DuckDB file and generate a Wren v2 YAML project. + +Usage: + python introspect_dlt.py --duckdb-path ./pipeline.duckdb --output-dir ./my_project + +This script: +1. Connects to a DuckDB file (read-only) +2. Discovers tables and columns via information_schema +3. Filters out dlt internal tables and metadata columns +4. Detects parent-child relationships from _dlt_parent_id +5. Normalizes column types using wren's type_mapping.parse_type (sqlglot) +6. Generates a complete Wren v2 YAML project +7. Optionally verifies the project builds and queries succeed +""" + +from __future__ import annotations + +import argparse +import re +import sys +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path + +import duckdb +import yaml + +# --------------------------------------------------------------------------- +# dlt internal tables and columns +# --------------------------------------------------------------------------- + +_DLT_METADATA_COLUMNS = frozenset( + { + "_dlt_id", + "_dlt_parent_id", + "_dlt_load_id", + "_dlt_list_idx", + } +) + +_EXCLUDED_SCHEMAS = frozenset({"information_schema", "pg_catalog"}) + + +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- + + +@dataclass +class Column: + name: str + raw_type: str # original DuckDB type + wren_type: str # normalized via parse_type + is_nullable: bool + + +@dataclass +class Table: + catalog: str + schema: str + name: str + columns: list[Column] = field(default_factory=list) + has_dlt_parent_id: bool = False + + +@dataclass +class Relationship: + name: str + parent_model: str + child_model: str + condition: str + join_type: str = "ONE_TO_MANY" + schema: str = "" + + +# --------------------------------------------------------------------------- +# Type normalization — delegates to wren SDK's parse_type when available +# --------------------------------------------------------------------------- + + +def _normalize_type(raw_type: str) -> str: + """Normalize a DuckDB column type using wren's type_mapping.parse_type. + + Wren SDK uses sqlglot to parse database-specific types into canonical + SQL forms (e.g. "character varying" → "VARCHAR", "INT8" → "BIGINT"). + Falls back to uppercase raw type if wren SDK is not importable. + """ + try: + from wren.type_mapping import parse_type # noqa: PLC0415 + + return parse_type(raw_type, "duckdb") + except ImportError: + # Fallback if wren SDK is not installed in the environment + return raw_type.upper().strip() + + +# --------------------------------------------------------------------------- +# Catalog resolution — critical for DuckDB ATTACH behavior +# --------------------------------------------------------------------------- + + +def _resolve_catalog(duckdb_path: Path) -> str: + """Derive the DuckDB catalog name from the file path. + + When wren engine connects to a DuckDB file, it runs: + ATTACH DATABASE 'path/to/file.duckdb' AS "" (READ_ONLY) + where is the filename without extension. This means the catalog + in table_reference MUST match the filename stem, otherwise wren engine + cannot resolve the table. + + Example: stripe_data.duckdb → catalog = "stripe_data" + """ + return duckdb_path.stem + + +# --------------------------------------------------------------------------- +# Introspection +# --------------------------------------------------------------------------- + + +def discover_tables( + con: duckdb.DuckDBPyConnection, + *, + catalog_name: str, +) -> list[Table]: + """Query information_schema to find all user tables with their columns. + + Args: + con: DuckDB connection (read-only). + catalog_name: The catalog name that wren engine will use when + ATTACHing this DuckDB file (= filename stem). + """ + + rows = con.execute( + """ + SELECT + t.table_schema, + t.table_name, + c.column_name, + c.data_type, + c.is_nullable, + c.ordinal_position + FROM information_schema.tables t + JOIN information_schema.columns c + ON t.table_schema = c.table_schema + AND t.table_name = c.table_name + WHERE t.table_type IN ('BASE TABLE', 'VIEW') + AND t.table_schema NOT IN ('information_schema', 'pg_catalog') + AND t.table_name NOT LIKE '_dlt_%' + ORDER BY t.table_schema, t.table_name, c.ordinal_position + """ + ).fetchall() + + tables: dict[str, Table] = {} + + for schema, table_name, col_name, col_type, nullable, _pos in rows: + key = f"{schema}.{table_name}" + if key not in tables: + tables[key] = Table( + catalog=catalog_name, + schema=schema, + name=table_name, + ) + + t = tables[key] + + # Track _dlt_parent_id presence for relationship detection + if col_name == "_dlt_parent_id": + t.has_dlt_parent_id = True + + # Skip dlt metadata columns from model columns + if col_name in _DLT_METADATA_COLUMNS: + continue + + normalized = _normalize_type(col_type) + t.columns.append( + Column( + name=col_name, + raw_type=col_type, + wren_type=normalized, + is_nullable=nullable == "YES", + ) + ) + + return list(tables.values()) + + +def detect_relationships(tables: list[Table]) -> list[Relationship]: + """Detect parent-child relationships from _dlt_parent_id and table naming. + + dlt convention: child table name = parent_name__child_suffix + e.g. hubspot__contacts__emails is a child of hubspot__contacts + """ + tables_by_schema: dict[str, set[str]] = {} + for t in tables: + tables_by_schema.setdefault(t.schema, set()).add(t.name) + relationships: list[Relationship] = [] + + for t in tables: + if not t.has_dlt_parent_id: + continue + + # Find parent: try progressively shorter prefixes split on "__" + parts = t.name.split("__") + parent_name = None + child_suffix = None + + for i in range(len(parts) - 1, 0, -1): + candidate = "__".join(parts[:i]) + if candidate in tables_by_schema.get(t.schema, set()) and candidate != t.name: + parent_name = candidate + child_suffix = "__".join(parts[i:]) + break + + if parent_name is None: + print( + f" Warning: {t.name} has _dlt_parent_id but no matching parent table found", + file=sys.stderr, + ) + continue + + rel_name = f"{parent_name}__{child_suffix}" + relationships.append( + Relationship( + name=rel_name, + parent_model=parent_name, + child_model=t.name, + condition=f'"{t.name}"._dlt_parent_id = "{parent_name}"._dlt_id', + schema=t.schema, + ) + ) + + return relationships + + +# --------------------------------------------------------------------------- +# Project generation +# --------------------------------------------------------------------------- + + +def _safe_path_segment(value: str) -> str: + """Sanitize a string for use as a filesystem directory name.""" + cleaned = re.sub(r"[^A-Za-z0-9_.-]", "_", value).strip("._") + if not cleaned: + raise ValueError(f"Invalid path segment from identifier: {value!r}") + return cleaned + + +def generate_project_files( + tables: list[Table], + relationships: list[Relationship], + *, + project_name: str, + duckdb_path: str, +) -> dict[str, str]: + """Generate all project file contents as {relative_path: content} dict.""" + + files: dict[str, str] = {} + + # Detect cross-schema name collisions; qualify model names when needed + name_counts: dict[str, int] = {} + for t in tables: + name_counts[t.name] = name_counts.get(t.name, 0) + 1 + + def resolve_name(schema: str, table_name: str) -> str: + return f"{schema}__{table_name}" if name_counts.get(table_name, 0) > 1 else table_name + + # -- wren_project.yml -- + project_config = { + "schema_version": 2, + "name": project_name, + "version": "1.0", + "catalog": "", + "schema": "public", + "data_source": "duckdb", + } + files["wren_project.yml"] = yaml.dump( + project_config, default_flow_style=False, sort_keys=False + ) + + # -- models//metadata.yml -- + for t in tables: + model_name = resolve_name(t.schema, t.name) + columns_yaml = [] + for c in t.columns: + col_entry: dict = { + "name": c.name, + "type": c.wren_type, + "is_calculated": False, + "not_null": not c.is_nullable, + "properties": {}, + } + columns_yaml.append(col_entry) + + model: dict = { + "name": model_name, + "table_reference": { + "catalog": t.catalog, + "schema": t.schema, + "table": t.name, + }, + "columns": columns_yaml, + "cached": False, + "properties": { + "description": f"Table from dlt pipeline ({t.schema}.{t.name})", + }, + } + + dir_name = _safe_path_segment(model_name) + files[f"models/{dir_name}/metadata.yml"] = yaml.dump( + model, default_flow_style=False, sort_keys=False + ) + + # -- relationships.yml -- + if relationships: + rels_yaml = [] + for r in relationships: + parent = resolve_name(r.schema, r.parent_model) + child = resolve_name(r.schema, r.child_model) + rels_yaml.append( + { + "name": r.name, + "models": [parent, child], + "join_type": r.join_type, + "condition": f'"{child}"._dlt_parent_id = "{parent}"._dlt_id', + } + ) + files["relationships.yml"] = yaml.dump( + {"relationships": rels_yaml}, + default_flow_style=False, + sort_keys=False, + ) + else: + files["relationships.yml"] = "relationships: []\n" + + # -- instructions.md -- + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + instructions = ( + "# Instructions\n\n" + f"This Wren project was auto-generated from a dlt DuckDB pipeline.\n\n" + f"- **Source DuckDB:** `{duckdb_path}`\n" + f"- **Generated:** {timestamp}\n" + f"- **Tables:** {len(tables)}\n" + f"- **Relationships:** {len(relationships)}\n\n" + "dlt metadata columns (`_dlt_id`, `_dlt_parent_id`, etc.) are hidden from models\n" + "but still present in the underlying DuckDB tables.\n" + ) + files["instructions.md"] = instructions + + return files + + +def write_project(files: dict[str, str], output_dir: Path, *, force: bool = False): + """Write generated files to disk.""" + project_file = output_dir / "wren_project.yml" + if project_file.exists() and not force: + print( + f"Error: {project_file} already exists. Use --force to overwrite.", + file=sys.stderr, + ) + sys.exit(1) + + for rel_path, content in files.items(): + path = output_dir / rel_path + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(content) + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def main(): + parser = argparse.ArgumentParser( + description="Generate a Wren project from a dlt DuckDB file." + ) + parser.add_argument( + "--duckdb-path", required=True, help="Path to the .duckdb file" + ) + parser.add_argument( + "--output-dir", + default=".", + help="Directory to write the Wren project (default: current dir)", + ) + parser.add_argument( + "--project-name", + default=None, + help="Project name (default: derived from DuckDB filename)", + ) + parser.add_argument( + "--force", action="store_true", help="Overwrite existing project files" + ) + args = parser.parse_args() + + duckdb_path = Path(args.duckdb_path).resolve() + if not duckdb_path.exists(): + print(f"Error: {duckdb_path} not found.", file=sys.stderr) + sys.exit(1) + + project_name = args.project_name or duckdb_path.stem.replace("-", "_") + output_dir = Path(args.output_dir).resolve() + + # The catalog must match the filename stem — this is how wren engine's + # DuckDB connector ATTACHes the file. + catalog_name = _resolve_catalog(duckdb_path) + + # Connect read-only + con = duckdb.connect(str(duckdb_path), read_only=True) + + try: + print(f"Introspecting {duckdb_path}...") + print(f" Catalog (from filename): {catalog_name}") + + tables = discover_tables(con, catalog_name=catalog_name) + print(f" Found {len(tables)} tables") + + if not tables: + print( + " Warning: no user tables found. The DuckDB file may be empty.", + file=sys.stderr, + ) + + relationships = detect_relationships(tables) + print(f" Detected {len(relationships)} parent-child relationships") + + files = generate_project_files( + tables, + relationships, + project_name=project_name, + duckdb_path=str(duckdb_path), + ) + + write_project(files, output_dir, force=args.force) + print(f"\nWren project written to {output_dir}/") + print(f" {len(tables)} models, {len(relationships)} relationships") + print(f"\nNext steps:") + print(f" cd {output_dir}") + print(f" wren context validate") + print(f" wren context build") + + finally: + con.close() + + +if __name__ == "__main__": + main()