2 changes: 2 additions & 0 deletions .github/workflows/test_python.yaml
@@ -60,6 +60,8 @@ jobs:
thrift --gen py -out api/py/ api/thrift/common.thrift
thrift --gen py -out api/py/ api/thrift/observability.thrift
thrift --gen py -out api/py/ api/thrift/api.thrift
thrift --gen py -out api/py/ api/thrift/lineage.thrift
thrift --gen py -out api/py/ api/thrift/orchestration.thrift
cd api/py
pip3 install -r requirements/dev.txt
pip3 install tox
36 changes: 19 additions & 17 deletions api/py/ai/chronon/cli/entrypoint.py
@@ -1,7 +1,14 @@
from typing import Dict
import click
from datetime import datetime, timedelta

from ai.chronon.cli.compile.compiler import Compiler
from ai.chronon.api.common.ttypes import ConfigType
from ai.chronon.cli.plan.physical_index import (
PhysicalIndex,
get_backfill_physical_graph,
submit_physical_graph,
)
from ai.chronon.cli.compile.compiler import CompileResult, Compiler
from ai.chronon.cli.compile.compile_context import CompileContext
from ai.chronon.cli.git_utils import get_current_branch

@@ -12,14 +19,11 @@ def cli():
pass


@cli.command()
@click.option("--branch", default=get_current_branch, help="Branch to sync")
def sync(branch):
"""Sync data for the specified branch"""
click.echo(f"\nSyncing data for branch \u001b[32m{branch}\u001b[0m")
def compile() -> Dict[ConfigType, CompileResult]:
compile_context = CompileContext()
compiler = Compiler(compile_context)
compiler.compile(compile_context)
# TODO(orc): add column lineage to objects
return compiler.compile(compile_context)


@cli.command()
@@ -34,17 +38,15 @@ def sync(branch):
default=datetime.now().strftime("%Y-%m-%d"),
help="End date for backfill (YYYY-MM-DD)",
)
@click.option(
"--scope",
type=click.Choice(["upstream", "self", "downstream"]),
default="upstream",
help="Scope of configs to backfill",
)
def backfill(conf: str, start_date: str, end_date: str, scope: str):
"""Backfill data between start and end dates"""
click.echo(
f"Backfilling with scope {scope} for config {conf} from {start_date} to {end_date}"
@click.option("--branch", default=get_current_branch, help="Branch to sync")
def backfill(conf: str, start_date: str, end_date: str):
Comment on lines +41 to +42 (Contributor):
⚠️ Potential issue

Fix mismatch in function signature.
@click.option provides branch but the function has no branch parameter.

Apply this diff:

-def backfill(conf: str, start_date: str, end_date: str):
+def backfill(conf: str, start_date: str, end_date: str, branch: str):

# compile
Comment (Contributor):
nit: redundant comment? (could expand a bit more if you think that's helpful)

compile_result = compile()
physical_index = PhysicalIndex.from_compiled_obj(compile_result)
physical_graph = get_backfill_physical_graph(
conf, physical_index, start_date, end_date
)
submit_physical_graph(physical_graph)


@cli.command()
45 changes: 45 additions & 0 deletions api/py/ai/chronon/cli/plan/controller_iface.py
@@ -0,0 +1,45 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Optional

from ai.chronon.cli.plan.physical_graph import PhysicalGraph
from ai.chronon.cli.plan.physical_index import PhysicalNode

Comment on lines +1 to +6 (Contributor):
⚠️ Potential issue

Remove unused import.

Static analysis correctly identified that PhysicalNode is imported but never used.

from abc import ABC, abstractmethod
from typing import Dict, List, Optional

from ai.chronon.cli.plan.physical_graph import PhysicalGraph
-from ai.chronon.cli.plan.physical_index import PhysicalNode
🧰 Tools: 🪛 Ruff (0.8.2), F401 (line 5): ai.chronon.cli.plan.physical_index.PhysicalNode imported but unused.


class ControllerIface(ABC):
"""
Class used to make the rest of the planner code agnostic to the underlying orchestrator.
Mainly used to mock out the orchestrator for testing.
"""

@abstractmethod
def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> List[str]:
pass

@abstractmethod
def upload_conf(self, name: str, hash: str, content: str) -> None:
pass

@abstractmethod
def create_workflow(
self, physical_graph: PhysicalGraph, start_date: str, end_date: str
) -> str:
"""
Submit a physical graph to the orchestrator and return workflow id
"""
pass

@abstractmethod
def get_workflow_status(self, workflow_id: str) -> str:
"""
Get the status of a workflow
"""
pass

@abstractmethod
def get_active_workflows(
self, branch: Optional[str] = None, user: Optional[str] = None
) -> List[str]:
"""
List all active workflows
"""
pass
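Since the docstring above calls out mocking the orchestrator for tests, here is a minimal sketch of what an in-memory implementation might look like. It is illustrative only: the InMemoryController name, the workflow-id scheme, and the status strings are assumptions, not part of this PR.

from typing import Dict, List, Optional

from ai.chronon.cli.plan.controller_iface import ControllerIface
from ai.chronon.cli.plan.physical_graph import PhysicalGraph


class InMemoryController(ControllerIface):
    """Hypothetical in-memory stand-in for the orchestrator, intended for unit tests."""

    def __init__(self):
        self._confs: Dict[str, str] = {}      # conf name -> hash already "uploaded"
        self._workflows: Dict[str, str] = {}  # workflow id -> status

    def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> List[str]:
        # A conf is "missing" if we have never seen it or its hash has changed.
        return [name for name, h in node_to_hash.items() if self._confs.get(name) != h]

    def upload_conf(self, name: str, hash: str, content: str) -> None:
        self._confs[name] = hash

    def create_workflow(
        self, physical_graph: PhysicalGraph, start_date: str, end_date: str
    ) -> str:
        workflow_id = f"wf-{len(self._workflows)}"
        self._workflows[workflow_id] = "RUNNING"
        return workflow_id

    def get_workflow_status(self, workflow_id: str) -> str:
        return self._workflows.get(workflow_id, "UNKNOWN")

    def get_active_workflows(
        self, branch: Optional[str] = None, user: Optional[str] = None
    ) -> List[str]:
        return [wf for wf, status in self._workflows.items() if status == "RUNNING"]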
23 changes: 23 additions & 0 deletions api/py/ai/chronon/cli/plan/physical_graph.py
@@ -0,0 +1,23 @@
from dataclasses import dataclass
from typing import Dict, List

from ai.chronon.cli.plan.physical_index import PhysicalNode


@dataclass
class PhysicalGraph:
node: PhysicalNode
dependencies: List["PhysicalGraph"]
start_date: str
end_date: str

def flatten(self) -> Dict[str, PhysicalNode]:
# recursively find hashes of all nodes in the physical graph

result = {self.node.name: self.node}

for sub_graph in self.dependencies:
sub_hashes = sub_graph.flatten()
result.update(sub_hashes)

return result
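To illustrate the dedup behaviour of flatten(), here is a self-contained sketch with a stand-in node type (FakeNode and FakeGraph are hypothetical, used only so the example runs without the rest of the package): a shared dependency appears exactly once in the result because the dict is keyed by node name.

from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FakeNode:
    name: str


@dataclass
class FakeGraph:
    node: FakeNode
    dependencies: List["FakeGraph"]

    def flatten(self) -> Dict[str, FakeNode]:
        # same traversal as PhysicalGraph.flatten above
        result = {self.node.name: self.node}
        for sub_graph in self.dependencies:
            result.update(sub_graph.flatten())
        return result


# diamond: a join depends on two group_bys that share one staging query
sq = FakeGraph(FakeNode("staging_query"), [])
gb1 = FakeGraph(FakeNode("group_by_1"), [sq])
gb2 = FakeGraph(FakeNode("group_by_2"), [sq])
join = FakeGraph(FakeNode("join"), [gb1, gb2])

assert set(join.flatten()) == {"join", "group_by_1", "group_by_2", "staging_query"}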
65 changes: 65 additions & 0 deletions api/py/ai/chronon/cli/plan/physical_index.py
@@ -0,0 +1,65 @@
from dataclasses import dataclass
from typing import Dict, List

from ai.chronon.api.common.ttypes import ConfigType
from ai.chronon.cli.compile.compiler import CompileResult
from ai.chronon.cli.plan.controller_iface import ControllerIface
from ai.chronon.cli.plan.physical_graph import PhysicalGraph
from ai.chronon.lineage.ttypes import Column, ColumnLineage
from ai.chronon.cli.plan.physical_node import PhysicalNode


@dataclass
class PhysicalIndex:
table_to_physical: Dict[str, PhysicalNode]

# TODO incorporate stage column lineage
column_lineage: Dict[Column, ColumnLineage]
controller: ControllerIface

def __init__(
self,
physical_nodes: List[PhysicalNode],
controller: ControllerIface,
branch: str,
):
self.controller = controller
self.table_to_physical = {}
self.column_lineage = {}
self.branch = branch
self.populate_index(physical_nodes)

# TODO: populate index
def populate_index(self, physical_nodes: List[PhysicalNode]):
raise NotImplementedError("Method not yet implemented")

@classmethod
def from_compiled_obj(
cls, compiled_obj: Dict[ConfigType, CompileResult]
) -> "PhysicalIndex":
raise NotImplementedError("Method not yet implemented")

def get_backfill_physical_graph(
self, conf_name: str, start_date: str, end_date: str
) -> PhysicalGraph:
raise NotImplementedError("Method not yet implemented")

def get_deploy_physical_graph(self, conf_name: str, date: str) -> PhysicalGraph:
raise NotImplementedError("Method not yet implemented")

def submit_physical_graph(self, physical_graph: PhysicalGraph) -> str:
node_to_physical: Dict[str, PhysicalNode] = physical_graph.flatten()

node_to_hash = {name: node.conf_hash for name, node in node_to_physical.items()}

missing_conf_names = self.controller.fetch_missing_confs(node_to_hash)
missing_physical_nodes = [
node_to_physical[conf_name] for conf_name in missing_conf_names
]

# upload missing confs
for physical_node in missing_physical_nodes:
hash = physical_node.conf_hash
json = physical_node.conf.tjson
name = physical_node.name
self.controller.upload_conf(name, hash, json)
23 changes: 23 additions & 0 deletions api/py/ai/chronon/cli/plan/physical_node.py
@@ -0,0 +1,23 @@
from dataclasses import dataclass
from typing import List

from ai.chronon.api.common.ttypes import TableDependency
from ai.chronon.api.ttypes import GroupBy, Join, Model, StagingQuery
from ai.chronon.cli.compile.compile_context import CompiledObj
from ai.chronon.orchestration.ttypes import PhysicalNode

Comment on lines +1 to +8 (Contributor):
⚠️ Potential issue

Unused imports detected.

Several imports are not used in the code.

-from dataclasses import dataclass
from typing import List

-from ai.chronon.api.common.ttypes import TableDependency
from ai.chronon.api.ttypes import GroupBy, Join, Model, StagingQuery
-from ai.chronon.cli.compile.compile_context import CompiledObj
from ai.chronon.orchestration.ttypes import PhysicalNode
🧰 Tools: 🪛 Ruff (0.8.2), F401: dataclasses.dataclass (line 1), ai.chronon.api.common.ttypes.TableDependency (line 4), and ai.chronon.cli.compile.compile_context.CompiledObj (line 6) imported but unused.


def from_group_by(cls, group_by: GroupBy) -> List[PhysicalNode]:
raise NotImplementedError("Method not yet implemented")

Comment on lines +10 to +12 (Contributor):
⚠️ Potential issue

Missing class definition for methods.

Functions have cls parameter indicating they should be class methods.

-def from_group_by(cls, group_by: GroupBy) -> List[PhysicalNode]:
+@classmethod
+def from_group_by(cls, group_by: GroupBy) -> List[PhysicalNode]:


def from_join(cls, join: Join) -> List[PhysicalNode]:
raise NotImplementedError("Method not yet implemented")

Comment on lines +14 to +16 (Contributor):
⚠️ Potential issue

Missing class definition for methods (2).

Same issue as previous method.

-def from_join(cls, join: Join) -> List[PhysicalNode]:
+@classmethod
+def from_join(cls, join: Join) -> List[PhysicalNode]:

def from_staging_query(cls, staging_query: StagingQuery) -> List[PhysicalNode]:
raise NotImplementedError("Method not yet implemented")

Comment on lines +18 to +20 (Contributor):
⚠️ Potential issue

Missing class definition for methods (3).

Same issue as previous method.

-def from_staging_query(cls, staging_query: StagingQuery) -> List[PhysicalNode]:
+@classmethod
+def from_staging_query(cls, staging_query: StagingQuery) -> List[PhysicalNode]:

def from_model(cls, model: Model) -> List[PhysicalNode]:
raise NotImplementedError("Method not yet implemented")
Comment on lines +22 to +23 (Contributor):
⚠️ Potential issue

Missing class definition for methods (4).

Same issue as previous method.

-def from_model(cls, model: Model) -> List[PhysicalNode]:
+@classmethod
+def from_model(cls, model: Model) -> List[PhysicalNode]:
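Pulling the four review suggestions together, a hedged sketch of how this module might look once the missing class is added; the PhysicalNodeBuilder name is illustrative only (the functions could equally stay module-level with the cls parameter dropped).

from typing import List

from ai.chronon.api.ttypes import GroupBy, Join, Model, StagingQuery
from ai.chronon.orchestration.ttypes import PhysicalNode


class PhysicalNodeBuilder:
    """Hypothetical home for the alternate constructors flagged in the review."""

    @classmethod
    def from_group_by(cls, group_by: GroupBy) -> List[PhysicalNode]:
        raise NotImplementedError("Method not yet implemented")

    @classmethod
    def from_join(cls, join: Join) -> List[PhysicalNode]:
        raise NotImplementedError("Method not yet implemented")

    @classmethod
    def from_staging_query(cls, staging_query: StagingQuery) -> List[PhysicalNode]:
        raise NotImplementedError("Method not yet implemented")

    @classmethod
    def from_model(cls, model: Model) -> List[PhysicalNode]:
        raise NotImplementedError("Method not yet implemented")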

5 changes: 5 additions & 0 deletions api/thrift/api.thrift
@@ -3,6 +3,7 @@ namespace java ai.chronon.api

include "common.thrift"
include "observability.thrift"
include "lineage.thrift"

// cd /path/to/chronon
// thrift --gen py -out api/py/ api/thrift/api.thrift
@@ -263,6 +264,10 @@ struct MetaData {
// column -> tag_key -> tag_value
21: optional map<string, map<string, string>> columnTags

Comment (Collaborator):
Suggested change:

// A stage is a "sub-transformation" of a given node. For example, a `GroupBy` can consist of selects (with SQL expressions), filters (in the form of where clauses), followed by aggregations defined in the Zipline DSL.
// Each of these is a `stage` with its own column-level lineage.
8: optional list<lineage.StageWithLineage> stagesWithLineage

// marking this as true means that the conf can be served online
// once marked online, a conf cannot be changed - compiling the conf won't be allowed
100: optional bool online
10 changes: 8 additions & 2 deletions api/thrift/common.thrift
@@ -67,14 +67,19 @@ struct TableDependency {
// fully qualified table name
1: optional string table

// params to select the partitions of the table for any query range
// logic is: [max(query.start - startOffset, startCutOff), min(query.end - endOffset, endCutOff)]
// DEPENDENCY_RANGE_LOGIC
// 1. get final start_partition, end_partition
// 2. break into step ranges
// 3. for each dependency
// a. dependency_start: max(query.start - startOffset, startCutOff)
// b. dependency_end: min(query.end - endOffset, endCutOff)
2: optional Window startOffset
3: optional Window endOffset
4: optional string startCutOff
5: optional string endCutOff

# if not present we will pull from defaults
// needed to enumerate what partitions are in a range
100: optional string partitionColumn
101: optional string partitionFormat
102: optional Window partitionInterval
@@ -116,6 +121,7 @@ struct ExecutionInfo {
4: optional i64 healthCheckIntervalMillis

# relevant for batch jobs
# temporal workflow nodes maintain their own cron schedule
10: optional string scheduleCron
11: optional i32 stepDays
12: optional bool historicalBackfill
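A small sketch of the DEPENDENCY_RANGE_LOGIC described in the TableDependency comment above, assuming ISO yyyy-MM-dd partitions and day-granularity offsets (the real Window type also carries a time unit, so this is a simplification).

from datetime import datetime, timedelta
from typing import Optional, Tuple


def shift_days(date_str: str, offset_days: int) -> str:
    return (datetime.strptime(date_str, "%Y-%m-%d") - timedelta(days=offset_days)).strftime("%Y-%m-%d")


def dependency_range(
    query_start: str,
    query_end: str,
    start_offset_days: int = 0,
    end_offset_days: int = 0,
    start_cut_off: Optional[str] = None,
    end_cut_off: Optional[str] = None,
) -> Tuple[str, str]:
    # dependency_start: max(query.start - startOffset, startCutOff)
    start = shift_days(query_start, start_offset_days)
    if start_cut_off:
        start = max(start, start_cut_off)
    # dependency_end: min(query.end - endOffset, endCutOff)
    end = shift_days(query_end, end_offset_days)
    if end_cut_off:
        end = min(end, end_cut_off)
    return start, end


# e.g. a 3-day end offset with a hard start cut-off
print(dependency_range("2024-01-10", "2024-01-31", end_offset_days=3, start_cut_off="2024-01-15"))
# -> ('2024-01-15', '2024-01-28')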
52 changes: 52 additions & 0 deletions api/thrift/lineage.thrift
@@ -0,0 +1,52 @@
namespace py ai.chronon.lineage
namespace java ai.chronon.lineage

struct Column {
1: optional string logicalNodeName
2: optional ColumnLineageStage stageName
3: optional string columnName
}

enum ColumnLineageStage {
SELECT,
AGG,
DERIVE,
JOIN
}


/**
* NOTE: Staging Query will only be purely output columns.
* stage = SELECT, output = col_name, expression = null, input_cols = null, type = DIRECT
*
* select x+y as a, c, json_extract(payload, "$.meta.user") as user where b = 10
* group_by user
* agg - count(a, window=7d, bucket=c)
Comment (Contributor):
should we include timestamp in this example too?

Reply (Contributor Author):
Good question - I think all input columns in an events source and temporal entities will automatically need time as an indirect lineage.

*
* Stage = SELECT, output = a, expression = x + y, input_columns = [x, y], type = DIRECT
* expression = "b = 10", input_columns = [b], type = WHERE
* output=user, expression = 'json_extract(payload, "$.meta.user")', input_columns = [payload], type = GROUP_BY_KEY
*
* Stage = AGG, output = a_count_7d_by_c, expression = 'count(a, window=7d, bucket=c)', input_columns = [a, c], type = DIRECT
* output=user, input_columns = [user], type = GROUP_BY_KEY
**/
enum ColumnLineageType {
WHERE,
JOIN_KEY,
GROUP_BY_KEY,
DERIVE,
TIMESTAMP,
DIRECT
}

struct ColumnLineage {
1: optional list<Column> inputColumns
2: optional string expression
3: optional Column outputColumn
4: optional ColumnLineageType lineageType // not present means direct
}

struct StageWithLineage {
1: optional ColumnLineageStage stage
2: optional list<ColumnLineage> lineage
}
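To make the worked example in the comment above concrete, here is a sketch of how the SELECT-stage entries might be encoded with these structs via the thrift-generated Python classes; the "my_group_by" logical node name and the exact constructor keywords are assumptions based on the field names above.

from ai.chronon.lineage.ttypes import (
    Column,
    ColumnLineage,
    ColumnLineageStage,
    ColumnLineageType,
    StageWithLineage,
)


def col(name: str) -> Column:
    # "my_group_by" is a hypothetical logical node name
    return Column(logicalNodeName="my_group_by", stageName=ColumnLineageStage.SELECT, columnName=name)


# Stage = SELECT, output = a, expression = x + y, input_columns = [x, y], type = DIRECT
select_a = ColumnLineage(
    inputColumns=[col("x"), col("y")],
    expression="x + y",
    outputColumn=col("a"),
    lineageType=ColumnLineageType.DIRECT,
)

# expression = "b = 10", input_columns = [b], type = WHERE
where_b = ColumnLineage(
    inputColumns=[col("b")],
    expression="b = 10",
    lineageType=ColumnLineageType.WHERE,
)

select_stage = StageWithLineage(
    stage=ColumnLineageStage.SELECT,
    lineage=[select_a, where_b],
)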