From 874f0acfd5ff69dae63d28d98152899e42e7eef0 Mon Sep 17 00:00:00 2001 From: Nikhil Simha Date: Mon, 10 Mar 2025 19:02:27 -0700 Subject: [PATCH 01/10] stub: physical graph for workflow submission + column lineage --- api/py/ai/chronon/cli/entrypoint.py | 37 +++++------ api/py/ai/chronon/cli/plan/physical_index.py | 64 ++++++++++++++++++++ api/thrift/api.thrift | 3 + api/thrift/lineage.thrift | 54 +++++++++++++++++ 4 files changed, 141 insertions(+), 17 deletions(-) create mode 100644 api/py/ai/chronon/cli/plan/physical_index.py create mode 100644 api/thrift/lineage.thrift diff --git a/api/py/ai/chronon/cli/entrypoint.py b/api/py/ai/chronon/cli/entrypoint.py index bd62142b3e..d5fbc11c06 100644 --- a/api/py/ai/chronon/cli/entrypoint.py +++ b/api/py/ai/chronon/cli/entrypoint.py @@ -1,7 +1,15 @@ +from dataclasses import dataclass +from typing import Dict, List import click from datetime import datetime, timedelta -from ai.chronon.cli.compile.compiler import Compiler +from ai.chronon.api.common.ttypes import ConfigType +from ai.chronon.cli.plan.physical_index import ( + PhysicalIndex, + get_backfill_physical_graph, + submit_physical_graph, +) +from ai.chronon.cli.compile.compiler import CompileResult, Compiler from ai.chronon.cli.compile.compile_context import CompileContext from ai.chronon.cli.git_utils import get_current_branch @@ -12,14 +20,11 @@ def cli(): pass -@cli.command() -@click.option("--branch", default=get_current_branch, help="Branch to sync") -def sync(branch): - """Sync data for the specified branch""" - click.echo(f"\nSyncing data for branch \u001b[32m{branch}\u001b[0m") +def compile() -> Dict[ConfigType, CompileResult]: compile_context = CompileContext() compiler = Compiler(compile_context) - compiler.compile(compile_context) + # TODO(orc): add column lineage to objects + return compiler.compile(compile_context) @cli.command() @@ -34,17 +39,15 @@ def sync(branch): default=datetime.now().strftime("%Y-%m-%d"), help="End date for backfill (YYYY-MM-DD)", ) -@click.option( - "--scope", - type=click.Choice(["upstream", "self", "downstream"]), - default="upstream", - help="Scope of configs to backfill", -) -def backfill(conf: str, start_date: str, end_date: str, scope: str): - """Backfill data between start and end dates""" - click.echo( - f"Backfilling with scope {scope} for config {conf} from {start_date} to {end_date}" +@click.option("--branch", default=get_current_branch, help="Branch to sync") +def backfill(conf: str, start_date: str, end_date: str): + # compile + compile_result = compile() + physical_index = PhysicalIndex.from_compiled_obj(compile_result) + physical_graph = get_backfill_physical_graph( + conf, physical_index, start_date, end_date ) + submit_physical_graph(physical_graph) @cli.command() diff --git a/api/py/ai/chronon/cli/plan/physical_index.py b/api/py/ai/chronon/cli/plan/physical_index.py new file mode 100644 index 0000000000..72dabaab54 --- /dev/null +++ b/api/py/ai/chronon/cli/plan/physical_index.py @@ -0,0 +1,64 @@ +from dataclasses import dataclass +from typing import Dict, List +from ai.chronon.api.common.ttypes import ConfigType +from ai.chronon.cli.compile.compile_context import CompiledObj +from ai.chronon.cli.compile.compiler import CompileResult +from ai.chronon.lineage.ttypes import Column, ColumnLineage +from ai.chronon.types import TableDependency + + +@dataclass +class PhysicalNode: + name: str + node_type: str # NodeType + conf: CompiledObj + table_dependencies: List[TableDependency] + output_columns: List[str] + output_table: str + # TODO: online deps + + def construct_lineage(self): + pass + + +def get_physical_nodes(obj: CompiledObj) -> List[PhysicalNode]: + pass + + +@dataclass +class PhysicalGraph: + node: PhysicalNode + dependencies: List["PhysicalGraph"] + start_date: str + end_date: str + + +@dataclass +class PhysicalIndex: + table_to_physical: Dict[str, PhysicalNode] + + # TODO incorporate stage column lineage + column_lineage: Dict[Column, ColumnLineage] + + @classmethod + def from_compiled_obj( + cls, compiled_obj: Dict[ConfigType, CompileResult] + ) -> "PhysicalIndex": + pass + + +def get_backfill_physical_graph( + conf: str, index: PhysicalIndex, start_date: str, end_date: str +) -> PhysicalGraph: + pass + + +def get_deploy_physical_graph(conf: str, index: PhysicalIndex) -> PhysicalGraph: + pass + + +def submit_physical_graph(physical_graph: PhysicalGraph) -> str: + # 1. get physical nodes to submit + # 2. find missing nodes from server + # 3. + pass diff --git a/api/thrift/api.thrift b/api/thrift/api.thrift index 965db610fe..1c21d4a056 100644 --- a/api/thrift/api.thrift +++ b/api/thrift/api.thrift @@ -3,6 +3,7 @@ namespace java ai.chronon.api include "common.thrift" include "observability.thrift" +include "lineage.thrift" // cd /path/to/chronon // thrift --gen py -out api/py/ api/thrift/api.thrift @@ -263,6 +264,8 @@ struct MetaData { // column -> tag_key -> tag_value 21: optional map> columnTags + 8: optional list stageColumnLineages + // marking this as true means that the conf can be served online // once marked online, a conf cannot be changed - compiling the conf won't be allowed 100: optional bool online diff --git a/api/thrift/lineage.thrift b/api/thrift/lineage.thrift new file mode 100644 index 0000000000..341f2039b5 --- /dev/null +++ b/api/thrift/lineage.thrift @@ -0,0 +1,54 @@ +namespace py ai.chronon.lineage +namespace java ai.chronon.lineage + +struct Column { + 1: optional string logicalNodeName + 2: optional string stageName + 3: optional string columnName +} + +enum ColumnLineageStage { + SELECT, + AGG, + DERIVE, + JOIN +} + + +/** +* NOTE: Staging Query will only be purely output columns. +* stage = SELECT, output = col_name, expression = null, input_cols = null, type = DIRECT +**/ +enum ColumnLineageType { + WHERE, + JOIN_KEY, + GROUP_BY_KEY, + DERIVE, + TIMESTAMP, + DIRECT + + /** + * select x+y as a, c, json_extract(payload, "$.meta.user") as user where b = 10 + * group_by user + * agg - count(a, window=7d, bucket=c) + * + * Stage = SELECT, output = a, expression = x + y, input_columns = [x, y], type = DIRECT + * expression = "b = 10", input_columns = [b], type = WHERE + * output=user, expression = 'json_extract(payload, "$.meta.user")', input_columns = [payload], type = GROUP_BY_KEY + * + * Stage = AGG, output = a_count_7d_by_c, expression = 'count(a, window=7d, bucket=c)', input_columns = [a, c], type = DIRECT + * output=user, input_columns = [user], type = GROUP_BY_KEY + **/ +} + +struct ColumnLineage { + 1: optional list inputColumns + 2: optional string expression + 3: optional Column outputColumn + 4: optional ColumnLineageType lineageType // not present means direct +} + +struct StageColumnLineage { + 1: optional ColumnLineageStage stage + 2: optional list lineage +} \ No newline at end of file From 5d68f396cfede8957146a62768da8fefa107be2c Mon Sep 17 00:00:00 2001 From: Nikhil Simha Date: Tue, 11 Mar 2025 19:08:13 -0700 Subject: [PATCH 02/10] object oriented --- .../ai/chronon/cli/plan/controller_iface.py | 17 +++ api/py/ai/chronon/cli/plan/physical_graph.py | 23 +++ api/py/ai/chronon/cli/plan/physical_index.py | 80 +++++----- api/py/ai/chronon/cli/plan/physical_node.py | 33 +++++ api/thrift/common.thrift | 10 +- api/thrift/orchestration.thrift | 138 +++++++----------- 6 files changed, 176 insertions(+), 125 deletions(-) create mode 100644 api/py/ai/chronon/cli/plan/controller_iface.py create mode 100644 api/py/ai/chronon/cli/plan/physical_graph.py create mode 100644 api/py/ai/chronon/cli/plan/physical_node.py diff --git a/api/py/ai/chronon/cli/plan/controller_iface.py b/api/py/ai/chronon/cli/plan/controller_iface.py new file mode 100644 index 0000000000..be95826ba6 --- /dev/null +++ b/api/py/ai/chronon/cli/plan/controller_iface.py @@ -0,0 +1,17 @@ +from abc import ABC, abstractmethod +from typing import Dict, List + +from ai.chronon.cli.plan.physical_index import PhysicalNode + + +class OrchestratorIface(ABC): + def __init__(self): + pass + + @abstractmethod + def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> List[str]: + pass + + @abstractmethod + def upload_conf(self, name: str, hash: str, content: str) -> None: + pass diff --git a/api/py/ai/chronon/cli/plan/physical_graph.py b/api/py/ai/chronon/cli/plan/physical_graph.py new file mode 100644 index 0000000000..b016311723 --- /dev/null +++ b/api/py/ai/chronon/cli/plan/physical_graph.py @@ -0,0 +1,23 @@ +from dataclasses import dataclass +from typing import Dict, List + +from ai.chronon.cli.plan.physical_index import PhysicalNode + + +@dataclass +class PhysicalGraph: + node: PhysicalNode + dependencies: List["PhysicalGraph"] + start_date: str + end_date: str + + def flatten(self) -> Dict[str, PhysicalNode]: + # recursively find hashes of all nodes in the physical graph + + result = {self.node.name: self.node} + + for sub_graph in self.dependencies: + sub_hashes = sub_graph.flatten() + result.update(sub_hashes) + + return result diff --git a/api/py/ai/chronon/cli/plan/physical_index.py b/api/py/ai/chronon/cli/plan/physical_index.py index 72dabaab54..907998229a 100644 --- a/api/py/ai/chronon/cli/plan/physical_index.py +++ b/api/py/ai/chronon/cli/plan/physical_index.py @@ -1,36 +1,12 @@ from dataclasses import dataclass from typing import Dict, List + from ai.chronon.api.common.ttypes import ConfigType -from ai.chronon.cli.compile.compile_context import CompiledObj from ai.chronon.cli.compile.compiler import CompileResult +from ai.chronon.cli.plan.controller_iface import ControllerIface +from ai.chronon.cli.plan.physical_graph import PhysicalGraph from ai.chronon.lineage.ttypes import Column, ColumnLineage -from ai.chronon.types import TableDependency - - -@dataclass -class PhysicalNode: - name: str - node_type: str # NodeType - conf: CompiledObj - table_dependencies: List[TableDependency] - output_columns: List[str] - output_table: str - # TODO: online deps - - def construct_lineage(self): - pass - - -def get_physical_nodes(obj: CompiledObj) -> List[PhysicalNode]: - pass - - -@dataclass -class PhysicalGraph: - node: PhysicalNode - dependencies: List["PhysicalGraph"] - start_date: str - end_date: str +from ai.chronon.cli.plan.physical_node import PhysicalNode @dataclass @@ -39,6 +15,23 @@ class PhysicalIndex: # TODO incorporate stage column lineage column_lineage: Dict[Column, ColumnLineage] + controller: ControllerIface + + def __init__( + self, + physical_nodes: List[PhysicalNode], + controller: ControllerIface, + branch: str, + ): + self.controller = controller + self.table_to_physical = {} + self.column_lineage = {} + self.branch = branch + self.populate_index(physical_nodes) + + # TODO: populate index + def populate_index(self, physical_nodes: List[PhysicalNode]): + pass @classmethod def from_compiled_obj( @@ -46,19 +39,28 @@ def from_compiled_obj( ) -> "PhysicalIndex": pass + def get_backfill_physical_graph( + self, conf_name: str, start_date: str, end_date: str + ) -> PhysicalGraph: + pass + + def get_deploy_physical_graph(self, conf_name: str, date: str) -> PhysicalGraph: + pass -def get_backfill_physical_graph( - conf: str, index: PhysicalIndex, start_date: str, end_date: str -) -> PhysicalGraph: - pass + def submit_physical_graph(self, physical_graph: PhysicalGraph) -> str: + node_to_physical: Dict[str, PhysicalNode] = physical_graph.flatten() -def get_deploy_physical_graph(conf: str, index: PhysicalIndex) -> PhysicalGraph: - pass + node_to_hash = {name: node.conf_hash for name, node in node_to_physical.items()} + missing_conf_names = self.controller.fetch_missing_confs(node_to_hash) + missing_physical_nodes = [ + node_to_physical[conf_name] for conf_name in missing_conf_names + ] -def submit_physical_graph(physical_graph: PhysicalGraph) -> str: - # 1. get physical nodes to submit - # 2. find missing nodes from server - # 3. - pass + # upload missing confs + for physical_node in missing_physical_nodes: + hash = physical_node.conf_hash + json = physical_node.conf.tjson + name = physical_node.name + self.controller.upload_conf(name, hash, json) diff --git a/api/py/ai/chronon/cli/plan/physical_node.py b/api/py/ai/chronon/cli/plan/physical_node.py new file mode 100644 index 0000000000..e5fa8dea59 --- /dev/null +++ b/api/py/ai/chronon/cli/plan/physical_node.py @@ -0,0 +1,33 @@ +from dataclasses import dataclass +from typing import List + +from ai.chronon.api.common.ttypes import TableDependency +from ai.chronon.api.ttypes import GroupBy, Join, Model, StagingQuery +from ai.chronon.cli.compile.compile_context import CompiledObj + + +@dataclass +class PhysicalNode: + name: str + node_type: str # NodeType + conf: CompiledObj + conf_hash: str + table_dependencies: List[TableDependency] + output_columns: List[str] + output_table: str + + @classmethod + def from_group_by(cls, group_by: GroupBy) -> List["PhysicalNode"]: + pass + + @classmethod + def from_join(cls, join: Join) -> List["PhysicalNode"]: + pass + + @classmethod + def from_staging_query(cls, staging_query: StagingQuery) -> List["PhysicalNode"]: + pass + + @classmethod + def from_model(cls, model: Model) -> List["PhysicalNode"]: + pass diff --git a/api/thrift/common.thrift b/api/thrift/common.thrift index 02c9bfc3ef..73ccc33302 100644 --- a/api/thrift/common.thrift +++ b/api/thrift/common.thrift @@ -67,14 +67,19 @@ struct TableDependency { // fully qualified table name 1: optional string table - // params to select the partitions of the table for any query range - // logic is: [max(query.start - startOffset, startCutOff), min(query.end - endOffset, endCutOff)] + // DEPENDENCY_RANGE_LOGIC + // 1. get final start_partition, end_partition + // 2. break into step ranges + // 3. for each dependency + // a. dependency_start: max(query.start - startOffset, startCutOff) + // b. dependency_end: min(query.end - endOffset, endCutOff) 2: optional Window startOffset 3: optional Window endOffset 4: optional string startCutOff 5: optional string endCutOff # if not present we will pull from defaults + // needed to enumerate what partitions are in a range 100: optional string partitionColumn 101: optional string partitionFormat 102: optional Window partitionInterval @@ -116,6 +121,7 @@ struct ExecutionInfo { 4: optional i64 healthCheckIntervalMillis # relevant for batch jobs + # temporal workflow nodes maintain their own cron schedule 10: optional string scheduleCron 11: optional i32 stepDays 12: optional bool historicalBackfill diff --git a/api/thrift/orchestration.thrift b/api/thrift/orchestration.thrift index e2b033726c..5900358952 100644 --- a/api/thrift/orchestration.thrift +++ b/api/thrift/orchestration.thrift @@ -30,6 +30,7 @@ union LogicalNode { 5: TabularData tabularData } + enum LogicalType { GROUP_BY = 1, JOIN = 2, @@ -38,60 +39,7 @@ enum LogicalType { TABULAR_DATA = 5 } -struct NodeKey { - 1: optional string name - - 2: optional LogicalType logicalType - 3: optional PhysicalNodeType physicalType - - /** - * represents the computation of the node including the computation of all its parents - * direct and indirect changes that change output will affect lineage hash - **/ - 10: optional string lineageHash -} - -struct NodeInfo { - /** - * represents the computation that a node does - * direct changes to conf that change output will affect semantic hash - * changing spark params etc shouldn't affect this - **/ - 11: optional string semanticHash - - /** - * simple hash of the entire conf (that is TSimpleJsonProtocol serialized), - * computed by cli and used to check if new conf_json need to be pushed from user's machine - **/ - 12: optional string confHash - - /** - * when new/updated conf's are pushed the branch is also set from the cli - * upon merging the branch will be unset - **/ - 20: optional string branch - - /** - * will be set to the author of the last semantic change to node - * (non-semantic changes like code-mods or spark params don't affect this) - **/ - 21: optional string author - - /** - * contents of the conf itself - **/ - 30: optional LogicalNode conf -} - -struct NodeConnections { - 1: optional list parents - 2: optional list children -} -struct NodeGraph { - 1: optional map connections - 2: optional map infoMap -} @@ -149,36 +97,6 @@ union PhysicalNodeType { // ====================== End of physical node types ====================== -/** -* Multiple logical nodes could share the same physical node -* For that reason we don't have a 1-1 mapping between logical and physical nodes -**/ -struct PhysicalNodeKey { - 1: optional string name - 2: optional PhysicalNodeType nodeType - - /** - * parentLineageHashes[] + semanticHash of the portion of compute this node does - **/ - 20: optional string lineageHash - -} - -struct PhysicalNode { - 1: optional string name - 2: optional PhysicalNodeType nodeType - - /** - * parentLineageHashes[] + semanticHash of the portion of compute this node does - **/ - 20: optional string version - 3: optional LogicalNode config - - 21: optional string branch - - // null means ad-hoc, zero means continuously running - 40: optional common.Window scheduleInterval -} struct SourceWithFilter { 1: optional api.Source source @@ -260,4 +178,56 @@ struct JoinPartJobArgs { * -- Phase 4 Plan -- model training * Model::training - [source.table] >> training * Model::bulk_inference - [source.table] >> bulk_inference -**/ \ No newline at end of file +**/ + + +/** +* physical node -> workflow id +* +* +* ad-hoc -> graph +* we will trigger the root node with the right start_date and end_date +* +* +* Global Scheduler Workflow: +* 1. wakeup more frequently 15 minutes +* 2. scan database for unscheduled workflows +* 3. trigger unscheduled but required statuses +* +* +* Workflow is always triggered externally: +* +* node.trigger(start_date?, end_date, branch, is_scheduled): +* +* # activity - 1 +* (missing_start, missing_end) = partition_dao.find_missing(start_date?, end_date) +* missing_steps = compute_steps(missing_start, missing_end, branch_dao.get_step_days(this)) +* +* foreach_par missing_step in missing_steps: +* foreach_par dependency in dependencies: +* +* if dependency.is_internal: +* +* (dep_start, dep_end) = dependency.compute_range(missing_step.start, missing_step.end) +* # activity - 2 +* dependency.trigger_and_wait(dep_start, dep_end, branch) +* +* else: +* +* # activity - 3 +* if is_scheduled: +* dependency.wait(dep_start, dep_end) +* else: +* dependency.fail_if_absent(dep_start, dep_end) +* +* # activity - 4 +* node.submit_work_and_wait(missing_start, missing_end, branch_dao.get_conf(this)) +* +* return +* +* +* +* +* sync(physical_graph): +* +**/ From 450c9d061939baac924ded94af0a56af8253ea61 Mon Sep 17 00:00:00 2001 From: Nikhil Simha Date: Tue, 11 Mar 2025 19:15:59 -0700 Subject: [PATCH 03/10] Update api/py/ai/chronon/cli/plan/physical_node.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- api/py/ai/chronon/cli/plan/physical_node.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/api/py/ai/chronon/cli/plan/physical_node.py b/api/py/ai/chronon/cli/plan/physical_node.py index e5fa8dea59..905a9838d3 100644 --- a/api/py/ai/chronon/cli/plan/physical_node.py +++ b/api/py/ai/chronon/cli/plan/physical_node.py @@ -18,16 +18,16 @@ class PhysicalNode: @classmethod def from_group_by(cls, group_by: GroupBy) -> List["PhysicalNode"]: - pass + raise NotImplementedError("Method not yet implemented") @classmethod def from_join(cls, join: Join) -> List["PhysicalNode"]: - pass + raise NotImplementedError("Method not yet implemented") @classmethod def from_staging_query(cls, staging_query: StagingQuery) -> List["PhysicalNode"]: - pass + raise NotImplementedError("Method not yet implemented") @classmethod def from_model(cls, model: Model) -> List["PhysicalNode"]: - pass + raise NotImplementedError("Method not yet implemented") From 74ae75c5c22113e056b20b1de8e94ac77a82421b Mon Sep 17 00:00:00 2001 From: Nikhil Simha Date: Tue, 11 Mar 2025 19:16:14 -0700 Subject: [PATCH 04/10] Update api/py/ai/chronon/cli/entrypoint.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- api/py/ai/chronon/cli/entrypoint.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/api/py/ai/chronon/cli/entrypoint.py b/api/py/ai/chronon/cli/entrypoint.py index d5fbc11c06..512963927d 100644 --- a/api/py/ai/chronon/cli/entrypoint.py +++ b/api/py/ai/chronon/cli/entrypoint.py @@ -1,5 +1,4 @@ -from dataclasses import dataclass -from typing import Dict, List +from typing import Dict import click from datetime import datetime, timedelta From 2b0e37e27bc3155b073422535fd0056b88398d31 Mon Sep 17 00:00:00 2001 From: Nikhil Simha Date: Tue, 11 Mar 2025 19:17:02 -0700 Subject: [PATCH 05/10] Update api/py/ai/chronon/cli/plan/physical_index.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- api/py/ai/chronon/cli/plan/physical_index.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/api/py/ai/chronon/cli/plan/physical_index.py b/api/py/ai/chronon/cli/plan/physical_index.py index 907998229a..a99adf8c35 100644 --- a/api/py/ai/chronon/cli/plan/physical_index.py +++ b/api/py/ai/chronon/cli/plan/physical_index.py @@ -31,22 +31,21 @@ def __init__( # TODO: populate index def populate_index(self, physical_nodes: List[PhysicalNode]): - pass + raise NotImplementedError("Method not yet implemented") @classmethod def from_compiled_obj( cls, compiled_obj: Dict[ConfigType, CompileResult] ) -> "PhysicalIndex": - pass + raise NotImplementedError("Method not yet implemented") def get_backfill_physical_graph( self, conf_name: str, start_date: str, end_date: str ) -> PhysicalGraph: - pass + raise NotImplementedError("Method not yet implemented") def get_deploy_physical_graph(self, conf_name: str, date: str) -> PhysicalGraph: - pass - + raise NotImplementedError("Method not yet implemented") def submit_physical_graph(self, physical_graph: PhysicalGraph) -> str: node_to_physical: Dict[str, PhysicalNode] = physical_graph.flatten() From 25f65ec38d612cc51dc7a3a36e0eeb4aeadf8ef9 Mon Sep 17 00:00:00 2001 From: Nikhil Simha Date: Tue, 11 Mar 2025 20:44:21 -0700 Subject: [PATCH 06/10] ci fix and workflow methods --- .github/workflows/test_python.yaml | 2 ++ .../ai/chronon/cli/plan/controller_iface.py | 36 ++++++++++++++++--- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test_python.yaml b/.github/workflows/test_python.yaml index c1dac3deef..7297f581bc 100644 --- a/.github/workflows/test_python.yaml +++ b/.github/workflows/test_python.yaml @@ -60,6 +60,8 @@ jobs: thrift --gen py -out api/py/ api/thrift/common.thrift thrift --gen py -out api/py/ api/thrift/observability.thrift thrift --gen py -out api/py/ api/thrift/api.thrift + thrift --gen py -out api/py/ api/thrift/lineage.thrift + thrift --gen py -out api/py/ api/thrift/orchestration.thrift cd api/py pip3 install -r requirements/dev.txt pip3 install tox diff --git a/api/py/ai/chronon/cli/plan/controller_iface.py b/api/py/ai/chronon/cli/plan/controller_iface.py index be95826ba6..1980c068f0 100644 --- a/api/py/ai/chronon/cli/plan/controller_iface.py +++ b/api/py/ai/chronon/cli/plan/controller_iface.py @@ -1,12 +1,15 @@ from abc import ABC, abstractmethod -from typing import Dict, List +from typing import Dict, List, Optional +from ai.chronon.cli.plan.physical_graph import PhysicalGraph from ai.chronon.cli.plan.physical_index import PhysicalNode -class OrchestratorIface(ABC): - def __init__(self): - pass +class ControllerIface(ABC): + """ + Class used to make the rest of the planner code agnostic to the underlying orchestrator. + Mainly used to mock out the orchestrator for testing. + """ @abstractmethod def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> List[str]: @@ -15,3 +18,28 @@ def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> List[str]: @abstractmethod def upload_conf(self, name: str, hash: str, content: str) -> None: pass + + @abstractmethod + def create_workflow( + self, physical_graph: PhysicalGraph, start_date: str, end_date: str + ) -> str: + """ + Submit a physical graph to the orchestrator and return workflow id + """ + pass + + @abstractmethod + def get_workflow_status(self, workflow_id: str) -> str: + """ + Get the status of a workflow + """ + pass + + @abstractmethod + def get_active_workflows( + self, branch: Optional[str] = None, user: Optional[str] = None + ) -> List[str]: + """ + List all active workflows + """ + pass From 290a696b43940d66f0780baa9d320df7b4da12a9 Mon Sep 17 00:00:00 2001 From: ezvz Date: Tue, 18 Mar 2025 17:40:54 -0700 Subject: [PATCH 07/10] WIP --- api/py/ai/chronon/cli/plan/physical_node.py | 32 +++++++-------------- api/thrift/orchestration.thrift | 11 +++++++ 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/api/py/ai/chronon/cli/plan/physical_node.py b/api/py/ai/chronon/cli/plan/physical_node.py index 905a9838d3..e3407c60c0 100644 --- a/api/py/ai/chronon/cli/plan/physical_node.py +++ b/api/py/ai/chronon/cli/plan/physical_node.py @@ -4,30 +4,20 @@ from ai.chronon.api.common.ttypes import TableDependency from ai.chronon.api.ttypes import GroupBy, Join, Model, StagingQuery from ai.chronon.cli.compile.compile_context import CompiledObj +from ai.chronon.orchestration.ttypes import PhysicalNode -@dataclass -class PhysicalNode: - name: str - node_type: str # NodeType - conf: CompiledObj - conf_hash: str - table_dependencies: List[TableDependency] - output_columns: List[str] - output_table: str +def from_group_by(cls, group_by: GroupBy) -> List[PhysicalNode]: + raise NotImplementedError("Method not yet implemented") - @classmethod - def from_group_by(cls, group_by: GroupBy) -> List["PhysicalNode"]: - raise NotImplementedError("Method not yet implemented") - @classmethod - def from_join(cls, join: Join) -> List["PhysicalNode"]: - raise NotImplementedError("Method not yet implemented") +def from_join(cls, join: Join) -> List[PhysicalNode]: + raise NotImplementedError("Method not yet implemented") - @classmethod - def from_staging_query(cls, staging_query: StagingQuery) -> List["PhysicalNode"]: - raise NotImplementedError("Method not yet implemented") - @classmethod - def from_model(cls, model: Model) -> List["PhysicalNode"]: - raise NotImplementedError("Method not yet implemented") +def from_staging_query(cls, staging_query: StagingQuery) -> List[PhysicalNode]: + raise NotImplementedError("Method not yet implemented") + + +def from_model(cls, model: Model) -> List[PhysicalNode]: + raise NotImplementedError("Method not yet implemented") diff --git a/api/thrift/orchestration.thrift b/api/thrift/orchestration.thrift index 5900358952..85fe4856ad 100644 --- a/api/thrift/orchestration.thrift +++ b/api/thrift/orchestration.thrift @@ -95,6 +95,17 @@ union PhysicalNodeType { 5: TableNodeType tableNodeType } +struct PhysicalNode { + 1: required string name, + 2: required PhysicalNodeType nodeType, + 3: required LogicalNode logicalNode, + 4: required string confHash, + 5: required list tableDependencies, + 6: required list outputColumns, + 7: required string output_table +} + + // ====================== End of physical node types ====================== From b8326d82d6455d7a9bac6faa2a3c81b6d7202473 Mon Sep 17 00:00:00 2001 From: ezvz Date: Tue, 18 Mar 2025 17:43:59 -0700 Subject: [PATCH 08/10] renaming thrift names --- api/thrift/api.thrift | 4 +++- api/thrift/lineage.thrift | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/api/thrift/api.thrift b/api/thrift/api.thrift index 1c21d4a056..b259b7f9e5 100644 --- a/api/thrift/api.thrift +++ b/api/thrift/api.thrift @@ -264,7 +264,9 @@ struct MetaData { // column -> tag_key -> tag_value 21: optional map> columnTags - 8: optional list stageColumnLineages + // A stage is a "sub-transformation" of a given node. For example a `GroupBy` can consist of selects (with SQL expressions), filters (in the form of where clauses), followed by aggregations defined in the Zipline DSL. + // Each of this is a `stage` with its own column level lineage. + 8: optional list stagesWithLineage // marking this as true means that the conf can be served online // once marked online, a conf cannot be changed - compiling the conf won't be allowed diff --git a/api/thrift/lineage.thrift b/api/thrift/lineage.thrift index 341f2039b5..72f74cd9d9 100644 --- a/api/thrift/lineage.thrift +++ b/api/thrift/lineage.thrift @@ -48,7 +48,7 @@ struct ColumnLineage { 4: optional ColumnLineageType lineageType // not present means direct } -struct StageColumnLineage { +struct StageWithLineage { 1: optional ColumnLineageStage stage 2: optional list lineage } \ No newline at end of file From f5cde30bf50fa9726dc49e01e08d869a46a023e9 Mon Sep 17 00:00:00 2001 From: ezvz Date: Tue, 18 Mar 2025 17:46:44 -0700 Subject: [PATCH 09/10] WIP --- api/thrift/lineage.thrift | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/api/thrift/lineage.thrift b/api/thrift/lineage.thrift index 72f74cd9d9..9c0363756e 100644 --- a/api/thrift/lineage.thrift +++ b/api/thrift/lineage.thrift @@ -3,7 +3,7 @@ namespace java ai.chronon.lineage struct Column { 1: optional string logicalNodeName - 2: optional string stageName + 2: optional ColumnLineageStage stageName 3: optional string columnName } @@ -16,18 +16,9 @@ enum ColumnLineageStage { /** -* NOTE: Staging Query will only be purely output columns. -* stage = SELECT, output = col_name, expression = null, input_cols = null, type = DIRECT -**/ -enum ColumnLineageType { - WHERE, - JOIN_KEY, - GROUP_BY_KEY, - DERIVE, - TIMESTAMP, - DIRECT - - /** + * NOTE: Staging Query will only be purely output columns. + * stage = SELECT, output = col_name, expression = null, input_cols = null, type = DIRECT + * * select x+y as a, c, json_extract(payload, "$.meta.user") as user where b = 10 * group_by user * agg - count(a, window=7d, bucket=c) @@ -39,6 +30,13 @@ enum ColumnLineageType { * Stage = AGG, output = a_count_7d_by_c, expression = 'count(a, window=7d, bucket=c)', input_columns = [a, c], type = DIRECT * output=user, input_columns = [user], type = GROUP_BY_KEY **/ +enum ColumnLineageType { + WHERE, + JOIN_KEY, + GROUP_BY_KEY, + DERIVE, + TIMESTAMP, + DIRECT } struct ColumnLineage { From 27785eea1fc0f895ba4139ddf94c9c451950dde8 Mon Sep 17 00:00:00 2001 From: ezvz Date: Tue, 18 Mar 2025 18:17:51 -0700 Subject: [PATCH 10/10] WIP --- api/thrift/orchestration.thrift | 65 ++++++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/api/thrift/orchestration.thrift b/api/thrift/orchestration.thrift index 85fe4856ad..5ab81c0173 100644 --- a/api/thrift/orchestration.thrift +++ b/api/thrift/orchestration.thrift @@ -39,7 +39,60 @@ enum LogicalType { TABULAR_DATA = 5 } +struct NodeKey { + 1: optional string name + 2: optional LogicalType logicalType + 3: optional PhysicalNodeType physicalType + + /** + * represents the computation of the node including the computation of all its parents + * direct and indirect changes that change output will affect lineage hash + **/ + 10: optional string lineageHash +} + +struct NodeInfo { + /** + * represents the computation that a node does + * direct changes to conf that change output will affect semantic hash + * changing spark params etc shouldn't affect this + **/ + 11: optional string semanticHash + + /** + * simple hash of the entire conf (that is TSimpleJsonProtocol serialized), + * computed by cli and used to check if new conf_json need to be pushed from user's machine + **/ + 12: optional string confHash + + /** + * when new/updated conf's are pushed the branch is also set from the cli + * upon merging the branch will be unset + **/ + 20: optional string branch + + /** + * will be set to the author of the last semantic change to node + * (non-semantic changes like code-mods or spark params don't affect this) + **/ + 21: optional string author + + /** + * contents of the conf itself + **/ + 30: optional LogicalNode conf +} + +struct NodeConnections { + 1: optional list parents + 2: optional list children +} + +struct NodeGraph { + 1: optional map connections + 2: optional map infoMap +} @@ -96,12 +149,12 @@ union PhysicalNodeType { } struct PhysicalNode { - 1: required string name, - 2: required PhysicalNodeType nodeType, - 3: required LogicalNode logicalNode, - 4: required string confHash, - 5: required list tableDependencies, - 6: required list outputColumns, + 1: required string name + 2: required PhysicalNodeType nodeType + 3: required LogicalNode logicalNode + 4: required string confHash + 5: required list tableDependencies + 6: required list outputColumns 7: required string output_table }