From fceb666b150edbf9269f010e30900e39cee8d9ab Mon Sep 17 00:00:00 2001 From: Simo Tumelius Date: Sun, 13 Feb 2022 15:32:19 +0200 Subject: [PATCH] Feat/demo data catalog changes (#44) * Change explore_nodes now takes a list of nodes as arg * Add CatalogExploreCommand abstraction * Rename DbtCloudBaseModel as CLIBaseModel * Refactor explore method * Rename CLIBaseModel as ClickBaseModel Co-authored-by: Simo Tumelius --- README.md | 6 +- dbt_cloud/command/command.py | 4 +- dbt_cloud/command/job/create.py | 12 +- dbt_cloud/demo/catalog.py | 190 +++++++++++++++++--------------- 4 files changed, 113 insertions(+), 99 deletions(-) diff --git a/README.md b/README.md index d43de36..5415a75 100644 --- a/README.md +++ b/README.md @@ -1035,9 +1035,9 @@ An interactive CLI application for exploring `catalog.json` artifacts. ## ### -[?] Select attribute to explore: sources - > sources - nodes +[?] Select node type to explore: source + > source + node ``` ## Acknowledgements diff --git a/dbt_cloud/command/command.py b/dbt_cloud/command/command.py index b1ca35e..55dc3b4 100644 --- a/dbt_cloud/command/command.py +++ b/dbt_cloud/command/command.py @@ -18,7 +18,7 @@ def translate_click_options(**kwargs) -> dict: return kwargs_translated -class DbtCloudBaseModel(BaseModel): +class ClickBaseModel(BaseModel): @classmethod def click_options(cls, function, key_prefix: str = ""): for key, field in reversed(cls.__fields__.items()): @@ -73,7 +73,7 @@ def get_description(cls) -> str: return cls.__doc__.strip() -class DbtCloudCommand(DbtCloudBaseModel): +class DbtCloudCommand(ClickBaseModel): api_token: str = API_TOKEN_FIELD account_id: int = ACCOUNT_ID_FIELD dbt_cloud_host: str = DBT_CLOUD_HOST_FIELD diff --git a/dbt_cloud/command/job/create.py b/dbt_cloud/command/job/create.py index c426b98..8b3dc79 100644 --- a/dbt_cloud/command/job/create.py +++ b/dbt_cloud/command/job/create.py @@ -2,7 +2,7 @@ from enum import Enum from typing import Optional, List from pydantic import Field -from dbt_cloud.command.command import DbtCloudCommand, DbtCloudBaseModel +from dbt_cloud.command.command import DbtCloudCommand, ClickBaseModel class DateTypeEnum(Enum): @@ -16,13 +16,13 @@ class TimeTypeEnum(Enum): AT_EXACT_HOURS = "at_exact_hours" -class DbtCloudJobTriggers(DbtCloudBaseModel): +class DbtCloudJobTriggers(ClickBaseModel): github_webhook: bool = Field(default=False) schedule: bool = Field(default=False) custom_branch_only: bool = Field(default=False) -class DbtCloudJobSettings(DbtCloudBaseModel): +class DbtCloudJobSettings(ClickBaseModel): threads: int = Field( default=1, description="The maximum number of models to run in parallel in a single dbt run.", @@ -33,16 +33,16 @@ class DbtCloudJobSettings(DbtCloudBaseModel): ) -class DbtCloudJobScheduleDate(DbtCloudBaseModel): +class DbtCloudJobScheduleDate(ClickBaseModel): type: DateTypeEnum = Field(default="every_day", description=None) -class DbtCloudJobScheduleTime(DbtCloudBaseModel): +class DbtCloudJobScheduleTime(ClickBaseModel): type: TimeTypeEnum = Field(default="every_hour", description=None) interval: int = Field(default=1) -class DbtCloudJobSchedule(DbtCloudBaseModel): +class DbtCloudJobSchedule(ClickBaseModel): cron: str = Field( default="0 * * * *", description="Cron-syntax schedule for the job." ) diff --git a/dbt_cloud/demo/catalog.py b/dbt_cloud/demo/catalog.py index a9441e5..cbe4366 100644 --- a/dbt_cloud/demo/catalog.py +++ b/dbt_cloud/demo/catalog.py @@ -1,10 +1,12 @@ import click +from enum import Enum +from pathlib import Path from typing import Optional, Dict, Any -from pydantic import Field, BaseModel -from dbt_cloud.command.command import DbtCloudBaseModel +from pydantic import BaseModel, Field +from dbt_cloud.command.command import ClickBaseModel -class Stats(DbtCloudBaseModel): +class Stats(BaseModel): """Represent node stats in the Catalog.""" id: str @@ -17,7 +19,7 @@ def __str__(self): return f"{self.label}: {self.value}" -class Column(DbtCloudBaseModel): +class Column(BaseModel): """Represents a column in the Catalog.""" type: str @@ -29,7 +31,7 @@ def __str__(self): return f"{self.name} (type: {self.type}, index: {self.index}, comment: {self.comment})" -class Node(DbtCloudBaseModel): +class Node(BaseModel): """Represents a node in the Catalog.""" unique_id: str @@ -53,21 +55,17 @@ def schema(self): def type(self): return self.metadata["type"] - @property - def owner(self): - return self.metadata.get("owner") - - def __str__(self): - return f"{self.name} (type: {self.type}, schema: {self.schema}, database: {self.database})" - def __gt__(self, other): return self.name > other.name def __lt__(self, other): return self.name < other.name + def __str__(self): + return f"{self.name} (type: {self.type}, schema: {self.schema}, database: {self.database})" + -class Catalog(DbtCloudBaseModel): +class Catalog(BaseModel): """Represents a dbt catalog.json artifact.""" metadata: Dict @@ -76,78 +74,94 @@ class Catalog(DbtCloudBaseModel): errors: Optional[Dict] -def explore_nodes(nodes: Dict[str, Node], node_type: str = "node"): - import inquirer - - while True: - databases = sorted(set([node.database for node in nodes.values()])) - database_options = [ - inquirer.List("database", message="Select database", choices=databases) - ] - database = inquirer.prompt(database_options)["database"] - nodes_filtered = { - node_name: node - for node_name, node in nodes.items() - if node.database == database - } - - schemas = sorted(set([node.schema for node in nodes_filtered.values()])) - schema_options = [ - inquirer.List("schema", message="Select schema", choices=schemas) - ] - schema = inquirer.prompt(schema_options)["schema"] - nodes_filtered = { - node_name: node - for node_name, node in nodes_filtered.items() - if node.schema == schema - } - - node_options = [ - inquirer.List( - "node", message="Select node", choices=sorted(nodes_filtered.values()) - ) - ] - node = inquirer.prompt(node_options)["node"] - click.echo(f"{node.name} columns:") - for column in node.columns.values(): - click.echo(f"- {column}") - click.echo("") - for stats in node.stats.values(): - if stats.id == "has_stats": - continue - click.echo(stats) - if not click.confirm(f"Explore another {node_type}?"): - break - - -@click.command(help="An inteactive application for exploring catalog artifacts.") -@click.option( - "-f", - "--file", - default="catalog.json", - type=str, - help="Catalog file path.", -) -def data_catalog(file): - import inquirer - from art import tprint - - catalog = Catalog.parse_file(file) - nodes = {node.name: node for node in catalog.nodes.values()} - tprint("Data Catalog", font="rand-large") - while True: - attribute_options = [ - inquirer.List( - "attribute", - message="Select attribute to explore", - choices=["sources", "nodes"], - ) - ] - attribute = inquirer.prompt(attribute_options)["attribute"] - - if attribute == "nodes": - explore_nodes(nodes) - elif attribute == "sources": - explore_nodes(catalog.sources, node_type="source") - if not click.confirm("Explore another attribute?"): - break +class NodeType(Enum): + SOURCE = "source" + NODE = "node" + + +class CatalogExploreCommand(ClickBaseModel): + """An inteactive application for exploring catalog artifacts.""" + + file: Path = Field(default="catalog.json", description="Catalog file path.") + title: str = Field( + default="Data Catalog", description="ASCII art title for the app." + ) + title_font: str = Field( + default="rand-large", + description="ASCII art title font (see https://github.com/sepandhaghighi/art#try-art-in-your-browser for a list of available fonts)", + ) + + def get_catalog(self) -> Catalog: + return Catalog.parse_file(self.file) + + def print_title(self): + from art import tprint + + tprint(self.title, font=self.title_font) + + def execute(self): + import inquirer + + self.print_title() + + while True: + node_type_options = [ + inquirer.List( + "node_type", + message="Select node type to explore", + choices=[node_type.value for node_type in NodeType], + ) + ] + node_type = NodeType(inquirer.prompt(node_type_options)["node_type"]) + self.explore(node_type=node_type) + if not click.confirm("Explore another node type?"): + break + + def explore(self, node_type: NodeType): + """Interactive exploration of nodes to explore and display their metadata""" + import inquirer + + catalog = self.get_catalog() + if node_type == NodeType.SOURCE: + nodes = list(catalog.sources.values()) + else: + nodes = list(catalog.nodes.values()) + + while True: + databases = sorted(set(map(lambda x: x.database, nodes))) + database_options = [ + inquirer.List("database", message="Select database", choices=databases) + ] + database = inquirer.prompt(database_options)["database"] + nodes_filtered = list(filter(lambda x: x.database == database, nodes)) + + schemas = sorted(set(map(lambda x: x.schema, nodes_filtered))) + schema_options = [ + inquirer.List("schema", message="Select schema", choices=schemas) + ] + schema = inquirer.prompt(schema_options)["schema"] + nodes_filtered = list(filter(lambda x: x.schema == schema, nodes_filtered)) + + node_options = [ + inquirer.List( + "node", message="Select node", choices=sorted(nodes_filtered) + ) + ] + node = inquirer.prompt(node_options)["node"] + click.echo(f"{node.name} columns:") + for column in node.columns.values(): + click.echo(f"- {column}") + click.echo("") + for stats in node.stats.values(): + if stats.id == "has_stats": + continue + click.echo(stats) + if not click.confirm(f"Explore another {node_type.value}?"): + break + + +@click.command(help=CatalogExploreCommand.get_description()) +@CatalogExploreCommand.click_options +def data_catalog(**kwargs): + command = CatalogExploreCommand.from_click_options(**kwargs) + command.execute()