diff --git a/.github/workflows/test_python.yaml b/.github/workflows/test_python.yaml
index c1dac3deef..7297f581bc 100644
--- a/.github/workflows/test_python.yaml
+++ b/.github/workflows/test_python.yaml
@@ -60,6 +60,8 @@ jobs:
           thrift --gen py -out api/py/ api/thrift/common.thrift
           thrift --gen py -out api/py/ api/thrift/observability.thrift
           thrift --gen py -out api/py/ api/thrift/api.thrift
+          thrift --gen py -out api/py/ api/thrift/lineage.thrift
+          thrift --gen py -out api/py/ api/thrift/orchestration.thrift
           cd api/py
           pip3 install -r requirements/dev.txt
           pip3 install tox
diff --git a/api/py/ai/chronon/cli/entrypoint.py b/api/py/ai/chronon/cli/entrypoint.py
index bd62142b3e..512963927d 100644
--- a/api/py/ai/chronon/cli/entrypoint.py
+++ b/api/py/ai/chronon/cli/entrypoint.py
@@ -1,7 +1,14 @@
+from typing import Dict
 import click
 from datetime import datetime, timedelta

-from ai.chronon.cli.compile.compiler import Compiler
+from ai.chronon.api.common.ttypes import ConfigType
+from ai.chronon.cli.plan.physical_index import PhysicalIndex
+from ai.chronon.cli.compile.compiler import CompileResult, Compiler
 from ai.chronon.cli.compile.compile_context import CompileContext
 from ai.chronon.cli.git_utils import get_current_branch

@@ -12,14 +19,11 @@ def cli():
     pass


-@cli.command()
-@click.option("--branch", default=get_current_branch, help="Branch to sync")
-def sync(branch):
-    """Sync data for the specified branch"""
-    click.echo(f"\nSyncing data for branch \u001b[32m{branch}\u001b[0m")
+def compile() -> Dict[ConfigType, CompileResult]:
     compile_context = CompileContext()
     compiler = Compiler(compile_context)
-    compiler.compile(compile_context)
+    # TODO(orc): add column lineage to objects
+    return compiler.compile(compile_context)


 @cli.command()
@@ -34,17 +38,15 @@ def sync(branch):
     default=datetime.now().strftime("%Y-%m-%d"),
     help="End date for backfill (YYYY-MM-DD)",
 )
-@click.option(
-    "--scope",
-    type=click.Choice(["upstream", "self", "downstream"]),
-    default="upstream",
-    help="Scope of configs to backfill",
-)
-def backfill(conf: str, start_date: str, end_date: str, scope: str):
-    """Backfill data between start and end dates"""
-    click.echo(
-        f"Backfilling with scope {scope} for config {conf} from {start_date} to {end_date}"
+@click.option("--branch", default=get_current_branch, help="Branch to backfill")
+def backfill(conf: str, start_date: str, end_date: str, branch: str):
+    """Backfill data between start and end dates"""
+    compile_result = compile()
+    physical_index = PhysicalIndex.from_compiled_obj(compile_result)
+    physical_graph = physical_index.get_backfill_physical_graph(
+        conf, start_date, end_date
     )
+    physical_index.submit_physical_graph(physical_graph)


 @cli.command()
diff --git a/api/py/ai/chronon/cli/plan/controller_iface.py b/api/py/ai/chronon/cli/plan/controller_iface.py
new file mode 100644
index 0000000000..1980c068f0
--- /dev/null
+++ b/api/py/ai/chronon/cli/plan/controller_iface.py
@@ -0,0 +1,45 @@
+from abc import ABC, abstractmethod
+from typing import Dict, List, Optional
+
+from ai.chronon.cli.plan.physical_graph import PhysicalGraph
+
+
+class ControllerIface(ABC):
+    """
+    Class used to make the rest of the planner code agnostic to the underlying orchestrator.
+    Mainly used to mock out the orchestrator for testing.
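+
+    A minimal in-memory stub for tests might look like this (illustrative only;
+    the class name and return values below are hypothetical):
+
+        class DummyController(ControllerIface):
+            def fetch_missing_confs(self, node_to_hash):
+                return []  # pretend the orchestrator already has every conf
+            def upload_conf(self, name, conf_hash, content):
+                pass
+            def create_workflow(self, physical_graph, start_date, end_date):
+                return "workflow-0"
+            def get_workflow_status(self, workflow_id):
+                return "SUCCESS"
+            def get_active_workflows(self, branch=None, user=None):
+                return []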
+ """ + + @abstractmethod + def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> List[str]: + pass + + @abstractmethod + def upload_conf(self, name: str, hash: str, content: str) -> None: + pass + + @abstractmethod + def create_workflow( + self, physical_graph: PhysicalGraph, start_date: str, end_date: str + ) -> str: + """ + Submit a physical graph to the orchestrator and return workflow id + """ + pass + + @abstractmethod + def get_workflow_status(self, workflow_id: str) -> str: + """ + Get the status of a workflow + """ + pass + + @abstractmethod + def get_active_workflows( + self, branch: Optional[str] = None, user: Optional[str] = None + ) -> List[str]: + """ + List all active workflows + """ + pass diff --git a/api/py/ai/chronon/cli/plan/physical_graph.py b/api/py/ai/chronon/cli/plan/physical_graph.py new file mode 100644 index 0000000000..b016311723 --- /dev/null +++ b/api/py/ai/chronon/cli/plan/physical_graph.py @@ -0,0 +1,23 @@ +from dataclasses import dataclass +from typing import Dict, List + +from ai.chronon.cli.plan.physical_index import PhysicalNode + + +@dataclass +class PhysicalGraph: + node: PhysicalNode + dependencies: List["PhysicalGraph"] + start_date: str + end_date: str + + def flatten(self) -> Dict[str, PhysicalNode]: + # recursively find hashes of all nodes in the physical graph + + result = {self.node.name: self.node} + + for sub_graph in self.dependencies: + sub_hashes = sub_graph.flatten() + result.update(sub_hashes) + + return result diff --git a/api/py/ai/chronon/cli/plan/physical_index.py b/api/py/ai/chronon/cli/plan/physical_index.py new file mode 100644 index 0000000000..a99adf8c35 --- /dev/null +++ b/api/py/ai/chronon/cli/plan/physical_index.py @@ -0,0 +1,65 @@ +from dataclasses import dataclass +from typing import Dict, List + +from ai.chronon.api.common.ttypes import ConfigType +from ai.chronon.cli.compile.compiler import CompileResult +from ai.chronon.cli.plan.controller_iface import ControllerIface +from ai.chronon.cli.plan.physical_graph import PhysicalGraph +from ai.chronon.lineage.ttypes import Column, ColumnLineage +from ai.chronon.cli.plan.physical_node import PhysicalNode + + +@dataclass +class PhysicalIndex: + table_to_physical: Dict[str, PhysicalNode] + + # TODO incorporate stage column lineage + column_lineage: Dict[Column, ColumnLineage] + controller: ControllerIface + + def __init__( + self, + physical_nodes: List[PhysicalNode], + controller: ControllerIface, + branch: str, + ): + self.controller = controller + self.table_to_physical = {} + self.column_lineage = {} + self.branch = branch + self.populate_index(physical_nodes) + + # TODO: populate index + def populate_index(self, physical_nodes: List[PhysicalNode]): + raise NotImplementedError("Method not yet implemented") + + @classmethod + def from_compiled_obj( + cls, compiled_obj: Dict[ConfigType, CompileResult] + ) -> "PhysicalIndex": + raise NotImplementedError("Method not yet implemented") + + def get_backfill_physical_graph( + self, conf_name: str, start_date: str, end_date: str + ) -> PhysicalGraph: + raise NotImplementedError("Method not yet implemented") + + def get_deploy_physical_graph(self, conf_name: str, date: str) -> PhysicalGraph: + raise NotImplementedError("Method not yet implemented") + def submit_physical_graph(self, physical_graph: PhysicalGraph) -> str: + + node_to_physical: Dict[str, PhysicalNode] = physical_graph.flatten() + + node_to_hash = {name: node.conf_hash for name, node in node_to_physical.items()} + + missing_conf_names = 
+        node_to_physical: Dict[str, PhysicalNode] = physical_graph.flatten()
+
+        node_to_hash = {name: node.confHash for name, node in node_to_physical.items()}
+
+        missing_conf_names = self.controller.fetch_missing_confs(node_to_hash)
+        missing_physical_nodes = [
+            node_to_physical[conf_name] for conf_name in missing_conf_names
+        ]
+
+        # upload missing confs
+        for physical_node in missing_physical_nodes:
+            conf_hash = physical_node.confHash
+            # serialize the logical conf as json - assumes thrift's standard
+            # simple-json serializer
+            conf_json = serialize(
+                physical_node.logicalNode,
+                protocol_factory=TSimpleJSONProtocolFactory(),
+            ).decode("utf-8")
+            self.controller.upload_conf(physical_node.name, conf_hash, conf_json)
+
+        return self.controller.create_workflow(
+            physical_graph, physical_graph.start_date, physical_graph.end_date
+        )
diff --git a/api/py/ai/chronon/cli/plan/physical_node.py b/api/py/ai/chronon/cli/plan/physical_node.py
new file mode 100644
index 0000000000..e3407c60c0
--- /dev/null
+++ b/api/py/ai/chronon/cli/plan/physical_node.py
@@ -0,0 +1,23 @@
+from typing import List
+
+from ai.chronon.api.ttypes import GroupBy, Join, Model, StagingQuery
+from ai.chronon.orchestration.ttypes import PhysicalNode
+
+
+def from_group_by(group_by: GroupBy) -> List[PhysicalNode]:
+    raise NotImplementedError("Method not yet implemented")
+
+
+def from_join(join: Join) -> List[PhysicalNode]:
+    raise NotImplementedError("Method not yet implemented")
+
+
+def from_staging_query(staging_query: StagingQuery) -> List[PhysicalNode]:
+    raise NotImplementedError("Method not yet implemented")
+
+
+def from_model(model: Model) -> List[PhysicalNode]:
+    raise NotImplementedError("Method not yet implemented")
diff --git a/api/thrift/api.thrift b/api/thrift/api.thrift
index 965db610fe..b259b7f9e5 100644
--- a/api/thrift/api.thrift
+++ b/api/thrift/api.thrift
@@ -3,6 +3,7 @@ namespace java ai.chronon.api

 include "common.thrift"
 include "observability.thrift"
+include "lineage.thrift"

 // cd /path/to/chronon
 // thrift --gen py -out api/py/ api/thrift/api.thrift
@@ -263,6 +264,10 @@ struct MetaData {
     // column -> tag_key -> tag_value
     21: optional map<string, map<string, string>> columnTags

+    // A stage is a "sub-transformation" of a given node. For example, a `GroupBy` can consist of selects (with SQL expressions) and filters (in the form of where clauses), followed by aggregations defined in the Zipline DSL.
+    // Each of these is a `stage` with its own column-level lineage.
+    8: optional list<lineage.StageWithLineage> stagesWithLineage
+
     // marking this as true means that the conf can be served online
     // once marked online, a conf cannot be changed - compiling the conf won't be allowed
     100: optional bool online
diff --git a/api/thrift/common.thrift b/api/thrift/common.thrift
index 02c9bfc3ef..73ccc33302 100644
--- a/api/thrift/common.thrift
+++ b/api/thrift/common.thrift
@@ -67,14 +67,19 @@ struct TableDependency {
     // fully qualified table name
     1: optional string table

-    // params to select the partitions of the table for any query range
-    // logic is: [max(query.start - startOffset, startCutOff), min(query.end - endOffset, endCutOff)]
+    // DEPENDENCY_RANGE_LOGIC
+    // 1. get the final start_partition and end_partition
+    // 2. break the full range into step ranges
+    // 3. for each step range, compute the range of each dependency:
+    //    a. dependency_start: max(query.start - startOffset, startCutOff)
+    //    b. dependency_end: min(query.end - endOffset, endCutOff)
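+    //
+    // Worked example (illustrative values): with startOffset = 1d, endOffset = 0d,
+    // startCutOff = "2023-01-01", and endCutOff unset, a query range
+    // [2023-03-10, 2023-03-15] resolves to the dependency range [2023-03-09, 2023-03-15].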
     2: optional Window startOffset
     3: optional Window endOffset
     4: optional string startCutOff
     5: optional string endCutOff

     # if not present we will pull from defaults
+    // needed to enumerate which partitions are in a range
     100: optional string partitionColumn
     101: optional string partitionFormat
     102: optional Window partitionInterval
@@ -116,6 +121,7 @@ struct ExecutionInfo {
     4: optional i64 healthCheckIntervalMillis

     # relevant for batch jobs
+    # Temporal workflow nodes maintain their own cron schedule
     10: optional string scheduleCron
     11: optional i32 stepDays
     12: optional bool historicalBackfill
diff --git a/api/thrift/lineage.thrift b/api/thrift/lineage.thrift
new file mode 100644
index 0000000000..9c0363756e
--- /dev/null
+++ b/api/thrift/lineage.thrift
@@ -0,0 +1,52 @@
+namespace py ai.chronon.lineage
+namespace java ai.chronon.lineage
+
+enum ColumnLineageStage {
+    SELECT,
+    AGG,
+    DERIVE,
+    JOIN
+}
+
+struct Column {
+    1: optional string logicalNodeName
+    2: optional ColumnLineageStage stageName
+    3: optional string columnName
+}
+
+/**
+ * NOTE: a StagingQuery's lineage will consist purely of output columns:
+ * stage = SELECT, output = col_name, expression = null, input_cols = null, type = DIRECT
+ *
+ * For the query below:
+ *
+ *   select x + y as a, c, json_extract(payload, "$.meta.user") as user where b = 10
+ *   group_by user
+ *   agg - count(a, window=7d, bucket=c)
+ *
+ * Stage = SELECT, output = a, expression = 'x + y', input_columns = [x, y], type = DIRECT
+ *                 expression = 'b = 10', input_columns = [b], type = WHERE
+ *                 output = user, expression = 'json_extract(payload, "$.meta.user")', input_columns = [payload], type = GROUP_BY_KEY
+ *
+ * Stage = AGG, output = a_count_7d_by_c, expression = 'count(a, window=7d, bucket=c)', input_columns = [a, c], type = DIRECT
+ *              output = user, input_columns = [user], type = GROUP_BY_KEY
+ **/
+enum ColumnLineageType {
+    WHERE,
+    JOIN_KEY,
+    GROUP_BY_KEY,
+    DERIVE,
+    TIMESTAMP,
+    DIRECT
+}
+
+struct ColumnLineage {
+    1: optional list<Column> inputColumns
+    2: optional string expression
+    3: optional Column outputColumn
+    4: optional ColumnLineageType lineageType // absent means DIRECT
+}
+
+struct StageWithLineage {
+    1: optional ColumnLineageStage stage
+    2: optional list<ColumnLineage> lineage
+}
\ No newline at end of file
diff --git a/api/thrift/orchestration.thrift b/api/thrift/orchestration.thrift
index e2b033726c..5ab81c0173 100644
--- a/api/thrift/orchestration.thrift
+++ b/api/thrift/orchestration.thrift
@@ -30,6 +30,7 @@ union LogicalNode {
     5: TabularData tabularData
 }

+
 enum LogicalType {
     GROUP_BY = 1,
     JOIN = 2,
@@ -147,38 +148,19 @@ union PhysicalNodeType {
     5: TableNodeType tableNodeType
 }

-// ====================== End of physical node types ======================
-
-/**
-* Multiple logical nodes could share the same physical node
-* For that reason we don't have a 1-1 mapping between logical and physical nodes
-**/
-struct PhysicalNodeKey {
-    1: optional string name
-    2: optional PhysicalNodeType nodeType
-
-    /**
-    * parentLineageHashes[] + semanticHash of the portion of compute this node does
-    **/
-    20: optional string lineageHash
-
-}
-
 struct PhysicalNode {
-    1: optional string name
-    2: optional PhysicalNodeType nodeType
+    1: required string name
+    2: required PhysicalNodeType nodeType
+    3: required LogicalNode logicalNode
+    4: required string confHash
+    5: required list<common.TableDependency> tableDependencies
+    6: required list<string> outputColumns
+    7: required string outputTable
+}
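+
+// Illustrative example (all values hypothetical): a GroupBy backfill node might carry
+//   name = "user_features__backfill", confHash = "ab12cd",
+//   tableDependencies = [its source tables], outputTable = "db.user_features"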
-    /**
-    * parentLineageHashes[] + semanticHash of the portion of compute this node does
-    **/
-    20: optional string version
-    3: optional LogicalNode config
-    21: optional string branch
+// ====================== End of physical node types ======================

-    // null means ad-hoc, zero means continuously running
-    40: optional common.Window scheduleInterval
-}

 struct SourceWithFilter {
     1: optional api.Source source
@@ -260,4 +242,56 @@ struct JoinPartJobArgs {
  * -- Phase 4 Plan -- model training
  * Model::training - [source.table] >> training
  * Model::bulk_inference - [source.table] >> bulk_inference
-**/
\ No newline at end of file
+**/
+
+
+/**
+* physical node -> workflow id
+*
+* ad-hoc -> graph
+* we will trigger the root node with the right start_date and end_date
+*
+*
+* Global Scheduler Workflow:
+* 1. wake up every 15 minutes
+* 2. scan the database for unscheduled workflows
+* 3. trigger workflows that are required but not yet scheduled
+*
+*
+* A workflow is always triggered externally:
+*
+* node.trigger(start_date?, end_date, branch, is_scheduled):
+*
+*     # activity - 1
+*     (missing_start, missing_end) = partition_dao.find_missing(start_date?, end_date)
+*     missing_steps = compute_steps(missing_start, missing_end, branch_dao.get_step_days(this))
+*
+*     foreach_par missing_step in missing_steps:
+*         foreach_par dependency in dependencies:
+*
+*             (dep_start, dep_end) = dependency.compute_range(missing_step.start, missing_step.end)
+*
+*             if dependency.is_internal:
+*                 # activity - 2
+*                 dependency.trigger_and_wait(dep_start, dep_end, branch)
+*
+*             else:
+*                 # activity - 3
+*                 if is_scheduled:
+*                     dependency.wait(dep_start, dep_end)
+*                 else:
+*                     dependency.fail_if_absent(dep_start, dep_end)
+*
+*     # activity - 4
+*     node.submit_work_and_wait(missing_start, missing_end, branch_dao.get_conf(this))
+*
+*     return
+*
+*
+* sync(physical_graph):
+*
+**/
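+
+/**
+* Sketch of the compute_steps helper referenced above (hypothetical; shown as
+* Python-style pseudocode, assuming inclusive date partitions):
+*
+*   def compute_steps(start, end, step_days):
+*       steps = []
+*       cursor = start
+*       while cursor <= end:
+*           step_end = min(cursor + timedelta(days=step_days - 1), end)
+*           steps.append((cursor, step_end))
+*           cursor = step_end + timedelta(days=1)
+*       return steps
+*
+* e.g. compute_steps(2023-01-01, 2023-01-07, step_days=3)
+*      -> [(01-01, 01-03), (01-04, 01-06), (01-07, 01-07)]
+**/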