Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions api/py/ai/chronon/cli/plan/controller_iface.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Optional

from ai.chronon.cli.plan.physical_graph import PhysicalGraph
from ai.chronon.orchestration.ttypes import (
BranchMappingRequest,
DiffResponse,
NodeInfo,
UploadPhysicalNodesResponse,
)


class ControllerIface(ABC):
Expand All @@ -11,20 +16,15 @@ class ControllerIface(ABC):
"""

@abstractmethod
def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> List[str]:
def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> DiffResponse:
# req = DiffRequest(namesToHashes=node_to_hash)
# TODO -- call API
pass
Comment on lines +18 to 21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Implement method with proper API call

Placeholder needs implementation.

     def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> DiffResponse:
-        # req = DiffRequest(namesToHashes=node_to_hash)
-        # TODO -- call API
+        from ai.chronon.orchestration.ttypes import DiffRequest
+        request = DiffRequest(namesToHashes=node_to_hash)
+        # TODO -- Make actual API call to backend service
         pass
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> DiffResponse:
# req = DiffRequest(namesToHashes=node_to_hash)
# TODO -- call API
pass
def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> DiffResponse:
from ai.chronon.orchestration.ttypes import DiffRequest
request = DiffRequest(namesToHashes=node_to_hash)
# TODO -- Make actual API call to backend service
pass


@abstractmethod
def upload_conf(self, name: str, hash: str, content: str) -> None:
pass

@abstractmethod
def create_workflow(
self, physical_graph: PhysicalGraph, start_date: str, end_date: str
) -> str:
"""
Submit a physical graph to the orchestrator and return workflow id
"""
def upload_branch_mappsing(self, node_info: List[NodeInfo], branch: str):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: upload_branch_mapping

# TODO
BranchMappingRequest()
pass

@abstractmethod
Expand Down
23 changes: 0 additions & 23 deletions api/py/ai/chronon/cli/plan/physical_graph.py

This file was deleted.

3 changes: 2 additions & 1 deletion api/py/ai/chronon/cli/plan/physical_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from ai.chronon.cli.compile.compiler import CompileResult
from ai.chronon.cli.plan.controller_iface import ControllerIface
from ai.chronon.cli.plan.physical_graph import PhysicalGraph
from ai.chronon.cli.plan.physical_node import PhysicalNode
from ai.chronon.lineage.ttypes import Column, ColumnLineage
from ai.chronon.orchestration.ttypes import PhysicalNode


@dataclass
Expand Down Expand Up @@ -46,6 +46,7 @@ def get_backfill_physical_graph(

def get_deploy_physical_graph(self, conf_name: str, date: str) -> PhysicalGraph:
raise NotImplementedError("Method not yet implemented")

def submit_physical_graph(self, physical_graph: PhysicalGraph) -> str:

node_to_physical: Dict[str, PhysicalNode] = physical_graph.flatten()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ai.chronon.orchestration.utils
package ai.chronon.api

import scala.collection.Seq

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package ai.chronon.orchestration.utils
package ai.chronon.api

import ai.chronon.api.Constants
import ai.chronon.api.Query
import ai.chronon.orchestration.utils.CollectionExtensions.JMapExtension
import ai.chronon.api.CollectionExtensions.JMapExtension

case class ColumnExpression(column: String, expression: Option[String]) {
def render: String =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package ai.chronon.orchestration.utils
package ai.chronon.api

import ai.chronon.api.Extensions.GroupByOps
import ai.chronon.api.Extensions.JoinPartOps
import ai.chronon.api.Extensions.SourceOps
import ai.chronon.api.Extensions.StringOps
import ai.chronon.api.Extensions.{GroupByOps, JoinPartOps, SourceOps, StringOps}
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api._
import ai.chronon.orchestration.utils.CollectionExtensions.JMapExtension
import ai.chronon.orchestration.utils.ColumnExpression.getTimeExpression
import CollectionExtensions.JMapExtension
import ai.chronon.api.ColumnExpression.getTimeExpression

// TODO(phase-2): This is not wired into the planner yet
// computes subset of the left source that is relevant for a join part
Expand Down Expand Up @@ -50,18 +46,21 @@ object RelevantLeftForJoinPart {

val combinedHash = HashUtils.md5Hex(relevantLeft.render + joinPart.groupBy.semanticHash).toLowerCase

// removing ns to keep the table name short, hash is enough to differentiate
val leftTable = removeNamespace(relevantLeft.leftTable)

s"${groupByName}__${leftTable}__$combinedHash"
}

def fullPartTableName(join: Join, joinPart: JoinPart): String = {
// POLICY: caches are computed per team / namespace.
// we have four options here
// - use right namespace. other teams typically won't have perms.
// - use a common cache namespace, but this could a way to leak information outside ACLs
// - use right input table namespace, also suffers from perm issue.
// - use the join namespace, this could create duplicate tables, but safest.
val outputNamespace = join.metaData.outputNamespace

// removing ns to keep the table name short, hash is enough to differentiate
val leftTable = removeNamespace(relevantLeft.leftTable)

s"$outputNamespace.${groupByName}__${leftTable}__$combinedHash"
s"$outputNamespace.${partTableName(join, joinPart)}"
}

// changing the left side shouldn't always change the joinPart table
Expand Down
6 changes: 6 additions & 0 deletions api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package ai.chronon.api

import ai.chronon.api.Extensions.StringsOps
import ai.chronon.api.HashUtils.md5Bytes
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api.thrift.TBase
import ai.chronon.api.thrift.TDeserializer
Expand Down Expand Up @@ -74,6 +75,11 @@ object ThriftJsonCodec {
HashUtils.md5Base64(ThriftJsonCodec.toJsonStr(obj).getBytes(Constants.UTF8))
}

def hexDigest[T <: TBase[_, _]: Manifest](obj: T, length: Int = 6): String = {
// Get the MD5 hash bytes
md5Bytes(serializer.serialize(obj)).map("%02x".format(_)).mkString.take(length)
}

def md5Digest[T <: TBase[_, _]: Manifest](obj: util.List[T]): String = {
HashUtils.md5Base64(ThriftJsonCodec.toJsonList(obj).getBytes(Constants.UTF8))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ai.chronon.orchestration.test
package ai.chronon.api.test

import ai.chronon.orchestration.utils.CollectionExtensions._
import ai.chronon.api.CollectionExtensions._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package ai.chronon.orchestration.test
package ai.chronon.api.test

import ai.chronon.api
import ai.chronon.api.Builders._
import ai.chronon.orchestration.utils.RelevantLeftForJoinPart
import ai.chronon.api.RelevantLeftForJoinPart
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

Expand Down Expand Up @@ -98,8 +98,8 @@ class RelevantLeftForJoinPartSpec extends AnyFlatSpec with Matchers {
)
)

val baseTableName = RelevantLeftForJoinPart.partTableName(baseJoin, joinPart)
val extraSelectsTableName = RelevantLeftForJoinPart.partTableName(joinWithExtraSelects, joinPart)
val baseTableName = RelevantLeftForJoinPart.fullPartTableName(baseJoin, joinPart)
val extraSelectsTableName = RelevantLeftForJoinPart.fullPartTableName(joinWithExtraSelects, joinPart)

baseTableName shouldEqual extraSelectsTableName
}
Expand All @@ -117,8 +117,8 @@ class RelevantLeftForJoinPartSpec extends AnyFlatSpec with Matchers {
leftStart = "2024-02-01" // Different start date
)

val baseTableName = RelevantLeftForJoinPart.partTableName(baseJoin, joinPart)
val differentDateTableName = RelevantLeftForJoinPart.partTableName(joinWithDifferentDate, joinPart)
val baseTableName = RelevantLeftForJoinPart.fullPartTableName(baseJoin, joinPart)
val differentDateTableName = RelevantLeftForJoinPart.fullPartTableName(joinWithDifferentDate, joinPart)

baseTableName shouldEqual differentDateTableName
}
Expand All @@ -137,8 +137,8 @@ class RelevantLeftForJoinPartSpec extends AnyFlatSpec with Matchers {
val (baseJoin, baseJoinPart) = createBasicJoin(groupBy = baseGroupBy)
val (modifiedJoin, modifiedJoinPart) = createBasicJoin(groupBy = modifiedGroupBy)

val baseTableName = RelevantLeftForJoinPart.partTableName(baseJoin, baseJoinPart)
val modifiedTableName = RelevantLeftForJoinPart.partTableName(modifiedJoin, modifiedJoinPart)
val baseTableName = RelevantLeftForJoinPart.fullPartTableName(baseJoin, baseJoinPart)
val modifiedTableName = RelevantLeftForJoinPart.fullPartTableName(modifiedJoin, modifiedJoinPart)

baseTableName should not equal modifiedTableName
}
Expand All @@ -157,8 +157,8 @@ class RelevantLeftForJoinPartSpec extends AnyFlatSpec with Matchers {
val (baseJoin, baseJoinPart) = createBasicJoin(groupBy = baseGroupBy)
val (modifiedJoin, modifiedJoinPart) = createBasicJoin(groupBy = modifiedGroupBy)

val baseTableName = RelevantLeftForJoinPart.partTableName(baseJoin, baseJoinPart)
val modifiedTableName = RelevantLeftForJoinPart.partTableName(modifiedJoin, modifiedJoinPart)
val baseTableName = RelevantLeftForJoinPart.fullPartTableName(baseJoin, baseJoinPart)
val modifiedTableName = RelevantLeftForJoinPart.fullPartTableName(modifiedJoin, modifiedJoinPart)

baseTableName should not equal modifiedTableName
}
Expand All @@ -177,8 +177,8 @@ class RelevantLeftForJoinPartSpec extends AnyFlatSpec with Matchers {
val (baseJoin, baseJoinPart) = createBasicJoin(groupBy = baseGroupBy)
val (modifiedJoin, modifiedJoinPart) = createBasicJoin(groupBy = modifiedGroupBy)

val baseTableName = RelevantLeftForJoinPart.partTableName(baseJoin, baseJoinPart)
val modifiedTableName = RelevantLeftForJoinPart.partTableName(modifiedJoin, modifiedJoinPart)
val baseTableName = RelevantLeftForJoinPart.fullPartTableName(baseJoin, baseJoinPart)
val modifiedTableName = RelevantLeftForJoinPart.fullPartTableName(modifiedJoin, modifiedJoinPart)

baseTableName should not equal modifiedTableName
}
Expand All @@ -196,8 +196,8 @@ class RelevantLeftForJoinPartSpec extends AnyFlatSpec with Matchers {
joinName = "test_join_2" // Different join name
)

val tableName1 = RelevantLeftForJoinPart.partTableName(join1, joinPart)
val tableName2 = RelevantLeftForJoinPart.partTableName(join2, joinPart)
val tableName1 = RelevantLeftForJoinPart.fullPartTableName(join1, joinPart)
val tableName2 = RelevantLeftForJoinPart.fullPartTableName(join2, joinPart)

tableName1 shouldEqual tableName2
}
Expand All @@ -214,8 +214,8 @@ class RelevantLeftForJoinPartSpec extends AnyFlatSpec with Matchers {
groupBy = groupBy
)

val tableNameWithPrefix = RelevantLeftForJoinPart.partTableName(joinWithPrefix, joinPartWithPrefix)
val tableNameWithoutPrefix = RelevantLeftForJoinPart.partTableName(joinWithoutPrefix, joinPartWithoutPrefix)
val tableNameWithPrefix = RelevantLeftForJoinPart.fullPartTableName(joinWithPrefix, joinPartWithPrefix)
val tableNameWithoutPrefix = RelevantLeftForJoinPart.fullPartTableName(joinWithoutPrefix, joinPartWithoutPrefix)

tableNameWithPrefix should not equal tableNameWithoutPrefix
tableNameWithPrefix should include("test_prefix__")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package ai.chronon.orchestration.test
package ai.chronon.api.test

import ai.chronon.api.Query
import ai.chronon.orchestration.utils.ColumnExpression
import ai.chronon.orchestration.utils.ColumnExpression.getTimeExpression
import ai.chronon.api.ColumnExpression.getTimeExpression
import ai.chronon.api.{ColumnExpression, Query}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

Expand Down
Loading
Loading