diff --git a/api/py/ai/chronon/utils.py b/api/py/ai/chronon/utils.py index ca41033795..c5236b9551 100644 --- a/api/py/ai/chronon/utils.py +++ b/api/py/ai/chronon/utils.py @@ -262,6 +262,7 @@ def join_part_output_table_name(join, jp, full_name: bool = False): def partOutputTable(jp: JoinPart): String = (Seq(join.metaData.outputTable) ++ Option(jp.prefix) :+ jp.groupBy.metaData.cleanName).mkString("_") """ + print(join) if not join.metaData.name and isinstance(join, api.Join): __set_name(join, api.Join, "joins") return "_".join( diff --git a/api/thrift/orchestration.thrift b/api/thrift/orchestration.thrift index 1e33729ea7..fe55dba784 100644 --- a/api/thrift/orchestration.thrift +++ b/api/thrift/orchestration.thrift @@ -82,18 +82,6 @@ struct NodeInfo { 30: optional LogicalNode conf } - -/** First Pass -* NodeInstance::(name, type, conf_hash) -> #[parent_nodes] -* Node::(name, type) -> #[conf_hash] - -* Second Pass -* Node::(name, type, compute_hash) -> #[parent_nodes] - -* different file_hashes but same lineage_hash should all go into the same orchestrator workflow -* Node::(name, type, lineage_hash) -**/ - struct NodeConnections { 1: optional list parents 2: optional list children @@ -272,7 +260,7 @@ struct TableDependency { * JoinParts could use data from batch backfills or upload tables when available * When not available they shouldn't force computation of the backfills and upload tables. **/ - 21: optional bool forceComputae + 21: optional bool forceCompute } union Dependency { diff --git a/build.sbt b/build.sbt index 9d36260dc0..53710f505f 100644 --- a/build.sbt +++ b/build.sbt @@ -125,7 +125,9 @@ lazy val api = project "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0", "com.novocode" % "junit-interface" % "0.11" % "test", "org.scalatest" %% "scalatest" % "3.2.19" % "test", - "org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test" + "org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test", + // needed by thrift + "org.slf4j" % "slf4j-api" % slf4jApiVersion, ) ) @@ -394,33 +396,22 @@ lazy val hub = (project in file("hub")) } ) -val scala_test = "org.scalatest" %% "scalatest" % "3.2.19" % "test" -val sl4j = "org.slf4j" % "slf4j-api" % slf4jApiVersion -val logback = "ch.qos.logback" % "logback-classic" % logbackClassicVersion -val commonDependencies = Seq( - scala_test, - sl4j, - logback -) // orchestrator lazy val orchestration = project .dependsOn(online.%("compile->compile;test->test")) .settings( assembly / mainClass := Some("ai.chronon.orchestration.RepoParser"), + Compile / run / mainClass := Some("ai.chronon.orchestration.RepoParser"), - assembly / assemblyMergeStrategy := { - case "log4j2.properties" => MergeStrategy.first - case "META-INF/log4j-provider.properties" => MergeStrategy.first - case PathList("org", "apache", "logging", "log4j", "core", "config", "plugins", "Log4j2Plugins.dat") => - MergeStrategy.first - case x => (assembly / assemblyMergeStrategy).value(x) - }, - libraryDependencies ++= commonDependencies ++ Seq( - "org.apache.logging.log4j" % "log4j-api" % log4j2_version, - "org.apache.logging.log4j" % "log4j-core" % log4j2_version, - "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4j2_version - ) + Compile / unmanagedResourceDirectories += baseDirectory.value / "src" / "main" / "resources", + + libraryDependencies ++= Seq( + "org.apache.logging.log4j" %% "log4j-api-scala" % "13.1.0", + "org.apache.logging.log4j" % "log4j-core" % "2.20.0", +// "org.slf4j" % "slf4j-api" % slf4jApiVersion, + "org.scalatest" %% "scalatest" % "3.2.19" % "test", + 
), ) ThisBuild / assemblyMergeStrategy := { diff --git a/orchestration/README.md b/orchestration/README.md new file mode 100644 index 0000000000..1701665839 --- /dev/null +++ b/orchestration/README.md @@ -0,0 +1,290 @@ + + +# Branch support + +We want to support "branches" that allow users to run pipelines and services with two +critical goals: + +- don't pollute production datasets and end-points +- stay cheap, by re-using as much of the existing datasets as possible + +## Scenarios within experimentation + +While developing on a branch - users could +- make semantic updates - that change output data - e.g., logic within where-s, selects, aggregations etc +- make non-semantic updates - that don't change output data - e.g., spark exec memory, # of pods / workers etc + - *note that everything stored in the metadata field within our APIs is a non-semantic field* +- add new nodes +- delete existing nodes +- make changes to several compute nodes at once +- decide to merge the branch into master + +With this context, the goal of this document is to develop / describe a representation to handle the above user workflows. + + +## Motivating Example + +Legend: +``` +"sq" stands for StagingQuery "j" for Join +"t" stands for table "m" for Model +"gb" for GroupBy +``` + +Nodes will be numbered - `gb4`, `m2` etc + +Semantic changes to a node are notated using a plus "+". +E.g., Join `J3` becomes `J3+` + +Non-semantic changes are notated with an asterisk "*" - `J3*` + + +```mermaid +--- +title: Initial state of the example +--- + +graph TD; + sq1-->t1; + t1-->gb1; + gb1-->j1; + t2-->gb2; + gb2-->j1; + j1-->m1; + gb1-->j2; +``` + +### Semantic updates + +Say that `sq1` changes semantically to `sq1+`. It is going to change the output of all +nodes downstream of it. + +```mermaid +--- +title: sq1 is updated semantically +--- + +graph TD; + sq1+-->t1+; + t1+-->gb1+; + gb1+-->j1+; + t2-->gb2; + gb2-->j1+; + j1+-->m1+; + gb1+-->j2+; + + style sq1+ fill:wheat,color:black,stroke:#333 + style t1+ fill:wheat,color:black,stroke:#333 + style gb1+ fill:wheat,color:black,stroke:#333 + style j1+ fill:wheat,color:black,stroke:#333 + style j2+ fill:wheat,color:black,stroke:#333 + style m1+ fill:wheat,color:black,stroke:#333 +``` + +> A major concern here is that, if the local repository of the user is behind remote, +> we will pick up a lot more changes than the user intends. + +One approach to mitigate this is to make the CLI pick up only the files listed as edited by +commits to the git branch. + +Another approach is to force the user to rebase on any change to the repo. However, this does not +guarantee that changes made while a job is running are accounted for. + +### Non Semantic updates +Instead, if `sq1` changes non-semantically to `sq1*`, none of the downstream nodes would change. + +```mermaid +--- +title: sq1 is updated non-semantically +--- + + graph TD; + sq1*-->t1; + t1-->gb1; + gb1-->j1; + t2-->gb2; + gb2-->j1; + j1-->m1; + gb1-->j2; + + style sq1* fill:lavender,color:black,stroke:#333 +``` + +Depending on who is running the job, we need to decide which version of the node to use +- if the branch author is causing the node to be computed, we need to use `sq1*` instead of `sq1` +- if the prod flow or other authors who haven't updated `sq1` are causing the compute, we should use `sq1` +- if another branch is also updating `sq1` non-semantically to `sq1**`, we need to use that instead. + +### Adding new nodes + +Adding new leaf nodes will not impact any of the existing nodes.
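+This follows from how the lineage hash is computed (see `computeGlobalHash` in `RepoIndex.scala` later in this diff): a node's hash folds in only its own spec and its parents' hashes, never its children's. A minimal sketch of that property - illustrative only, with hypothetical names, not the actual implementation:
+
+```scala
+// simplified, hypothetical model of lineage hashing - not the RepoIndex code
+object LeafAddSketch extends App {
+  case class Node(name: String, spec: String, parents: Seq[Node])
+
+  // the hash folds in the node's own spec and its parents' hashes, never its children's
+  def globalHash(n: Node): Int = (n.spec, n.parents.map(globalHash)).hashCode
+
+  val gb1 = Node("gb1", "select a from t1", Nil)
+  val j2  = Node("j2", "left join on user_id", Seq(gb1))
+
+  val before = globalHash(j2)
+  val m2     = Node("m2", "train model on j2", Seq(j2)) // new leaf reading from j2
+
+  assert(globalHash(j2) == before) // existing nodes keep their hashes and versions
+  println(s"j2 unchanged: ${globalHash(j2) == before}, m2 hash: ${globalHash(m2)}")
+}
+```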
+ +```mermaid +--- +title: m2 is added +--- + + graph TD; + sq1-->t1; + t1-->gb1; + gb1-->j1; + t2-->gb2; + gb2-->j1; + j1-->m1; + gb1-->j2; + j2-->m2; + + style m2 fill:lightgreen,color:black,stroke:#333 +``` + + +But adding a non-leaf node - +as a parent to an existing node - would almost always cause semantic updates to nodes downstream. + +```mermaid +--- +title: gb3 is added +--- + + graph TD; + sq1-->t1; + t1-->gb1; + gb1-->j1+; + t2-->gb2; + t3-->gb3; + gb2-->j1+; + gb3-->j1+; + j1+-->m1+; + gb1-->j2; + + style t3 fill:lightgreen,color:black,stroke:#333 + style gb3 fill:lightgreen,color:black,stroke:#333 + style j1+ fill:wheat,color:black,stroke:#333 + style m1+ fill:wheat,color:black,stroke:#333 +``` + +One interesting case here is migrating SQL from an external system into a StagingQuery that produces an +already used table. Even though this is not a leaf node, treating it the same as a leaf-node change +would be the right thing to do. + + +```mermaid +--- +title: sq2 is added +--- + + graph TD; + sq1-->t1; + t1-->gb1; + gb1-->j1; + t2-->gb2; + gb2-->j1; + j1-->m1; + gb1-->j2; + sq2-->t2; + + style sq2 fill:lightgreen,color:black,stroke:#333 +``` + +### Deleting existing nodes + +Deleting leaf nodes is straightforward. We just need to program a cleanup mechanism +to remove data and pipelines generated by that node. + +```mermaid +--- +title: m1 is deleted +--- + + graph TD; + sq1-->t1; + t1-->gb1; + gb1-->j1; + t2-->gb2; + gb2-->j1; + j1-->m1; + gb1-->j2; + sq2-->t2; + + style m1 fill:coral,color:black,stroke:#333 +``` + +Indirectly connected components - via table references - shouldn't be allowed to be deleted +as long as there are nodes that depend on the table. We will fail this during the sync step. + +```mermaid +--- +title: sq1 is deleted (not-allowed) +--- + + graph TD; + sq1-->t1; + t1-->gb1; + gb1-->j1; + t2-->gb2; + gb2-->j1; + j1-->m1; + gb1-->j2; + sq2-->t2; + + style sq1 fill:coral,color:black,stroke:#333 +``` + +When a directly connected parent is deleted, the child node must be updated as well - otherwise the compilation +would fail. In these cases it would be ideal to garbage collect the upstream chain of the deleted node. + +```mermaid +--- +title: gb2 is deleted, j1 is updated +--- + +graph TD; + sq2-->t2; + sq1-->t1; + t1-->gb1; + gb1-->j1+; + t2-->gb2; + gb2-->j1+; + j1+-->m1+; + gb1-->j2; + + style sq2 fill:coral,color:black,stroke:#333 + style t2 fill:coral,color:black,stroke:#333 + style gb2 fill:coral,color:black,stroke:#333 + style j1+ fill:wheat,color:black,stroke:#333 + style m1+ fill:wheat,color:black,stroke:#333 +``` + +### Isolating the changed assets + +While development on a branch is in progress, we need to create temporary data assets for +the semantically changed nodes - shown in yellow above. Adds, Deletes & Semantic Updates could +trigger this flow. + + +#### Logic to achieve isolation +Make a new copy of the conf & update the name (file name & metadata.name) - + +`new_name = old_name + '_' + branch_name` + +This needs to be followed by changing references in the downstream nodes - +all tables and nodes downstream will have the branch suffix. + +``` +# 1. cli sends file_hash_map to remote + +local_file_map = repo.compiled.file_hash_map +remote_file_map = remote.file_map +deleted = remote_file_map - local_file_map +added = local_file_map - remote_file_map +updated = [k for k in intersect(local_file_map, remote_file_map) if local_file_map[k] != remote_file_map[k]] +# 2.
remote marks the changed files it needs + +(node, lineage_hash) => +``` + +### Merging changes into `main` + +- Deletes should actually trigger asset and pipeline clean up. +- Updates should trigger asset renaming +- Adds can work as is - we are not suffixing adds. diff --git a/orchestration/src/main/resources/log4j2.properties b/orchestration/src/main/resources/log4j2.properties index bea9badbab..7df679e8b0 100644 --- a/orchestration/src/main/resources/log4j2.properties +++ b/orchestration/src/main/resources/log4j2.properties @@ -7,7 +7,7 @@ appender.console.type = Console appender.console.name = console appender.console.target = SYSTEM_OUT appender.console.layout.type = PatternLayout -appender.console.layout.pattern = %yellow{%d{yyyy/MM/dd HH:mm:ss}} %highlight{%-5level} %green{%file:%line} - %message%n +appender.console.layout.pattern = %cyan{%d{yyyy/MM/dd HH:mm:ss}} %highlight{%-5level} %magenta{%file:%line} - %message%n # Configure specific logger logger.chronon.name = ai.chronon diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/LineageIndex.scala b/orchestration/src/main/scala/ai/chronon/orchestration/LineageIndex.scala deleted file mode 100644 index 999954c360..0000000000 --- a/orchestration/src/main/scala/ai/chronon/orchestration/LineageIndex.scala +++ /dev/null @@ -1,90 +0,0 @@ -package ai.chronon.orchestration - -import ai.chronon.orchestration.logical.GroupByNodeImpl -import ai.chronon.orchestration.logical.JoinNodeImpl -import ai.chronon.orchestration.logical.LogicalNodeImpl -import ai.chronon.orchestration.logical.ModelNodeImpl -import ai.chronon.orchestration.logical.StagingQueryNodeImpl -import ai.chronon.orchestration.logical.TabularDataNodeImpl -import ai.chronon.orchestration.utils.Config.getType - -import scala.collection.mutable - -object Builders { - def nodeKey(name: String, logicalType: LogicalType, lineageHash: String = null): NodeKey = { - val result = new NodeKey() - result.setName(name) - result.setLogicalType(logicalType) - result.setLineageHash(lineageHash) - result - } -} - -case class NodeInfo(inputConfigs: Seq[LogicalNode], config: LogicalNode, outputTables: Seq[String]) - -case class LineageIndex(tableToParent: mutable.Map[String, mutable.Buffer[NodeKey]] = mutable.HashMap.empty, - toChildConfig: mutable.Map[NodeKey, mutable.Buffer[NodeKey]] = mutable.HashMap.empty, - toInfo: mutable.Map[NodeKey, NodeInfo] = mutable.HashMap.empty) { - // node, parent_nodes, child_nodes, input_tables, output_tables -} - -object LineageIndex { - - def apply(logicalSet: LogicalSet): LineageIndex = { - val index = LineageIndex() - logicalSet.toLogicalNodes.foreach(update(_, index)) - index - } - - def update(node: LogicalNodeImpl, index: LineageIndex): Unit = { - - val nodeKey = toKey(node) - if (index.toInfo.contains(nodeKey)) return - - // update nodeInfo - val parents = node.parents - val info = NodeInfo(parents, node.toConfig, node.outputTables) - index.toInfo.put(nodeKey, info) - - // register parents of tables - node.outputTables.foreach { t => - val tabularNode = new TabularData() - tabularNode.setTable(t) - tabularNode.setType(node.tabularDataType) - - toKey(TabularDataNodeImpl(tabularNode)) - - index.tableToParent - .getOrElseUpdate(t, mutable.ArrayBuffer.empty) - .append(nodeKey) - } - - // register children of parents - parents.foreach { parent => - val parentNode = configToNode(parent) - val parentKey = toKey(parentNode) - - index.toChildConfig - .getOrElseUpdate(parentKey, mutable.ArrayBuffer.empty) - .append(nodeKey) - - update(parentNode, index) 
- } - } - - private def configToNode(config: LogicalNode): LogicalNodeImpl = { - config match { - case c if c.isSetJoin => JoinNodeImpl(c.getJoin) - case c if c.isSetGroupBy => GroupByNodeImpl(c.getGroupBy) - case c if c.isSetStagingQuery => StagingQueryNodeImpl(c.getStagingQuery) - case c if c.isSetModel => ModelNodeImpl(c.getModel) - case c if c.isSetTabularData => TabularDataNodeImpl(c.getTabularData) - } - } - - private def toKey(node: LogicalNodeImpl): NodeKey = { - val nodeType = getType(node.toConfig) - Builders.nodeKey(node.name, nodeType) - } - -} diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/LogicalSet.scala b/orchestration/src/main/scala/ai/chronon/orchestration/LogicalSet.scala deleted file mode 100644 index 2cd61f158f..0000000000 --- a/orchestration/src/main/scala/ai/chronon/orchestration/LogicalSet.scala +++ /dev/null @@ -1,25 +0,0 @@ -package ai.chronon.orchestration - -import ai.chronon.api._ -import ai.chronon.orchestration.logical._ - -// Main LogicalSet class -case class LogicalSet(joins: Seq[Join] = Seq.empty, - groupBys: Seq[GroupBy] = Seq.empty, - stagingQueries: Seq[StagingQuery] = Seq.empty, - models: Seq[Model] = Seq.empty) { - - def toLogicalNodes: Seq[LogicalNodeImpl] = - joins.map(JoinNodeImpl) ++ - groupBys.map(GroupByNodeImpl) ++ - stagingQueries.map(StagingQueryNodeImpl) ++ - models.map(ModelNodeImpl) - - def :+(join: Join): LogicalSet = copy(joins = joins :+ join) - - def :+(groupBy: GroupBy): LogicalSet = copy(groupBys = groupBys :+ groupBy) - - def :+(stagingQuery: StagingQuery): LogicalSet = copy(stagingQueries = stagingQueries :+ stagingQuery) - - def :+(model: Model): LogicalSet = copy(models = models :+ model) -} diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/README.md b/orchestration/src/main/scala/ai/chronon/orchestration/README.md deleted file mode 100644 index 603db705c1..0000000000 --- a/orchestration/src/main/scala/ai/chronon/orchestration/README.md +++ /dev/null @@ -1,50 +0,0 @@ -# Global planner for chronon workloads - -We will currently focus only on batch workloads - -## Batch - -We have a few stages of logic in this pipeline. - -> NOTE: `>>` represents dependency operator. `a >> b` means b depends on a. - -1. parse confs - 1. walk the directory structure of chronon and parse configs => logical nodes - -2. convert logical nodes to physical nodes - 1. Foreach groupBy - if coalescing is enabled - 1. insert groupBy's into `coalesced = index[conf.source + conf.key]` - 2. compare previous coalesced to new coalesced and do tetris backfill - 2. Foreach groupBy - 1. if fct + batch - 1. logical nodes are - partition_ir >> snapshot (== backfill) >> upload - 2. if fct + realtime - 1. logical nodes are - partition_ir (collapsed) + raw (tails) >> sawtooth_ir - 2. sawtooth_ir >> uploads (if online = True) - 3. sawtooth_ir >> snapshots (if backfill = true) - 3. if dim + batch - 1. can't cache - raw >> snapshot, snapshot >> upload - 4. if dim + realtime - 1. raw >> sawtooth_ir - 5. scd works similar to fct - 3. For each join - 1. bootstrap_source(s) >> joinPart tetris - 2. (source_hash + group_by) >> joinPart - - 3. left-fct - 1. gb-case::(dim/fct batch) snapshot >> joinPart - 2. gb-case::(fct realtime) sawtooth + ds_raw >> joinPart - 3. gb-case::(dim realtime) sawtooth + ds_mutations >> joinPart - 4. left-dim - 1. gb-case::dim/fct snapshot >> joinPart - 5. left-scd is sub-optimal - we ask to use staging query to convert to fct - - 6. bootstrap >> missingPartRanges >> joinParts >> final join - 7. 
join_raw >> derivations (view?) - 8. label source + left >> label_part - 9. label_part + join_raw >> final join - -3. workflow - 1. trigger staging query submissions on submit - 2. trigger gb_snapshot & sawtooth_ir computations on submit - 3. trigger join computations on schedule or on-demand diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala new file mode 100644 index 0000000000..d49b8cee81 --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala @@ -0,0 +1,302 @@ +package ai.chronon.orchestration + +import ai.chronon.orchestration.RepoIndex._ +import ai.chronon.orchestration.RepoTypes._ +import ai.chronon.orchestration.utils.CollectionExtensions.IteratorExtensions +import ai.chronon.orchestration.utils.SequenceMap +import ai.chronon.orchestration.utils.StringExtensions.StringOps +import org.apache.logging.log4j.scala.Logging + +import scala.collection.mutable + +/** + * Indexer to store and assign versions to nodes in the repo based on their lineage. + * This also manages versions and contents across multiple branches. + * + * Careful consideration has been given to avoid O(N*N) operations in this class. + * + * We use a generic conf type T - to make it easy to test complex cases. + * + * We expect the cli tool to + * first, call [[diff]] to see which files are not already present in the index. + * second, call [[addFiles]] to add updated/new nodes to the index along with their branch name. + * you can also dry-run the [[addFiles]] method to see potential versions that will be assigned. + * upon closing of a branch, call [[pruneBranch]] to remove the unused contents of the branch from the index. + * + * merging into master - is simply calling [[addNodes]] with the new contents of master post merge. + * + * NOTE: This class is not thread safe. Only one thread should access this index at a time. + */ +class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { + + /** + * We don't duplicate the contents of files across branches and commits in the index. + * We just store contents by fileHash, and the branch-to-fileHash mapping separately. + */ + private val branchToFileHash: TriMap[Branch, Name, FileHash] = mutable.Map.empty + private val fileHashToContent: TriMap[Name, FileHash, NodeContent[T]] = mutable.Map.empty + + /** + * Versions are globally unique; we compute global hashes for each node based on lineage + * and then assign a version to it. We store these versions per (branch, node) in the [[branchVersionIndex]]. + */ + private val branchVersionIndex: TriMap[Branch, Name, Version] = mutable.Map.empty + + /** + * We then use a [[SequenceMap]] to take this global hash and assign a version to it. + * Version entries in the sequencer are never deleted, so that + * upon reverting changes we can refer to the older version instead + * of bumping it unnecessarily. This will avoid re-computation of assets. + */ + private val versionSequencer: SequenceMap[Name, GlobalHash] = new SequenceMap[Name, GlobalHash] + + /** + * @param fileHashes map of all the file names to their hashes in the branch. + * @param newNodes nodes that aren't already present in the index. De-duped across branches. + * @param branch branch on which the user is making the changes. + * @param dryRun when true shows potential versions that will be assigned to nodes without modifying the index. + * @return + * - a list of version updates that will be applied to the index.
+ */ + def addNodes(fileHashes: mutable.Map[Name, FileHash], + newNodes: Seq[T], + branch: Branch, + dryRun: Boolean = true): Seq[VersionUpdate] = { + + val newContents = buildContentMap(proc, newNodes, fileHashes) + val enrichedFileHashes = newContents.map { + case (name, content) => name -> content.localData.fileHash + } ++ fileHashes + + /** + * we turn local hash into global hash by combining it with parent hashes recursively + * global_hash(node) = hash(node.local_hash + node.parents.map(global_hash)) + * + * we use memoization to avoid recomputing global hashes via the [[globalHashes]] map + */ + def computeGlobalHash(name: Name, globalHashes: mutable.Map[Name, GlobalHash]): GlobalHash = { + + if (globalHashes.contains(name)) return globalHashes(name) + + val fileHash = enrichedFileHashes.get(name) match { + case Some(hash) => hash + + // this could be an artifact related to unchanged files on the branch + // so we reach out to content index + // artifacts are just names with no content - so there should be just one entry + case None => + val hashToContent = fileHashToContent(name) + + require(hashToContent.size == 1, s"Expected 1 entry for artifact $name, found ${hashToContent.size}") + require(hashToContent.head._2.localData.fileHash.hash == name.name.md5, + s"Expected artifact $name to have no inputs") + + hashToContent.head._1 + } + + val content = if (newContents.contains(name)) { + newContents(name) + } else { + fileHashToContent(name)(fileHash) + } + + val localHash = content.localData.localHash + val parents = content.localData.inputs + + // recursively compute parent hashes + val parentHashes = parents + .map { parent => + val parentHash = computeGlobalHash(parent, globalHashes).hash + s"${parent.name}:$parentHash" + } + .mkString(",") + + // combine parent hashcode with local hash + val codeString = s"node=${name.name}:${localHash.hash}|parents=$parentHashes" + + logger.info(s"Global Hash elements: $codeString") + + val globalHash = GlobalHash(codeString.md5) + + globalHashes.update(name, globalHash) + globalHash + } + + val globalHashes = mutable.Map.empty[Name, GlobalHash] + // this line fills global hashes + fileHashes.foreach { case (name, _) => computeGlobalHash(name, globalHashes) } + + val existingVersions = branchVersionIndex.getOrElse(branch, mutable.Map.empty) + val mainVersions = branchVersionIndex.getOrElse(Branch.main, mutable.Map.empty) + + if (!dryRun) { + + logger.info("Not a dry run! 
Inserting new nodes into the index into branch: " + branch.name) + + newContents.foreach { + case (name, content) => update(fileHashToContent, name, content.localData.fileHash, content) + } + + val newVersions = globalHashes.map { + case (name, globalHash) => + val versionIndex = versionSequencer.insert(name, globalHash) + val version = Version("v" + versionIndex.toString) + name -> version + } + + branchToFileHash.update(branch, enrichedFileHashes) + branchVersionIndex.update(branch, newVersions) + + VersionUpdate.join(newVersions, existingVersions, mainVersions) + + } else { + + // dry run - don't insert into any members of the index + val newVersions = mutable.Map.empty[Name, Version] + globalHashes.foreach { + case (name, globalHash) => + val versionIndex = versionSequencer.potentialIndex(name, globalHash) + newVersions.update(name, Version("v" + versionIndex.toString)) + } + + VersionUpdate.join(newVersions, existingVersions, mainVersions) + } + } + + def addFiles(fileHashes: mutable.Map[Name, FileHash], updatedFiles: Map[String, String], branch: Branch): Unit = { + + val nodes: Seq[T] = updatedFiles.iterator.flatMap { + case (name, content) => + proc.parse(name, FileContent(content)) + }.distinct + + addNodes(fileHashes, nodes, branch) + } + + /** + * returns the names of the files whose contents haven't been found in the index across any of the versions + */ + def diff(incomingFileHashes: mutable.Map[Name, FileHash]): Seq[Name] = { + + incomingFileHashes + .filter { + + case (name, incomingHash) => + val fileHashMap = fileHashToContent.get(name) + + lazy val nameAbsentInIndex = fileHashMap.isEmpty + lazy val fileHashAbsentForName = !fileHashMap.get.contains(incomingHash) + + nameAbsentInIndex || fileHashAbsentForName + + } + .keys + .toSeq + } + + /** + * Removes contents and index entries related to a branch + */ + def pruneBranch(branch: Branch): Unit = { + + branchToFileHash.remove(branch) + branchVersionIndex.remove(branch) + + pruneContents() + + } + + /** + * Removes contents that are not referred to via [[FileHash]] in any branch + */ + private def pruneContents(): Unit = { + + // collect unique hashes per name from every branch + val validHashes: mutable.Map[Name, mutable.HashSet[FileHash]] = innerKeyToValueSet(branchToFileHash) + + fileHashToContent.retain { + case (name, fileHashMap) => + fileHashMap.retain { + + case (fileHash, _) => + validHashes.get(name) match { + case None => false // no branch has this name + case Some(hashes) => hashes.contains(fileHash) // this branch has this fileHash + } + + } + + fileHashMap.nonEmpty + } + } +} + +object RepoIndex { + + private type TriMap[K1, K2, V] = mutable.Map[K1, mutable.Map[K2, V]] + + private def update[K1, K2, V](map: TriMap[K1, K2, V], k1: K1, k2: K2, v: V): Unit = + map.getOrElseUpdate(k1, mutable.Map.empty).update(k2, v) + + private def innerKeyToValueSet[K1, K2, V](map: TriMap[K1, K2, V]): mutable.Map[K2, mutable.HashSet[V]] = { + val result = mutable.Map.empty[K2, mutable.HashSet[V]] + map.values.foreach { innerMap => + innerMap.foreach { + case (k2, v) => + result.getOrElseUpdate(k2, mutable.HashSet.empty).add(v) + } + } + result + } + + /** + * Takes data from repo parser and builds a local index for the repo parser + * We treat inputs and outputs that are not present in FileHashes as artifacts + * For these artifacts we create additional entries in the result + */ + def buildContentMap[T >: Null](proc: ConfProcessor[T], + nodes: Seq[T], + fileHashes: mutable.Map[Name, FileHash]): mutable.Map[Name, NodeContent[T]] 
= { + + val contentMap = mutable.Map.empty[Name, NodeContent[T]] + + // first pass - update non-artifact contents + for ( + node <- nodes; + nodeContent <- proc.nodeContents(node) + ) { + + val name = nodeContent.localData.name + contentMap.update(name, nodeContent) + + def updateContents(artifactName: Name, isOutput: Boolean): Unit = { + + // artifacts are not present in file hashes + if (fileHashes.contains(artifactName)) return + + val existingParents = if (contentMap.contains(artifactName)) { + contentMap(artifactName).localData.inputs + } else { + Seq.empty + } + + val newParents = if (isOutput) Seq(name) else Seq.empty + + val parents = (existingParents ++ newParents).distinct + + val artifactData = LocalData.forArtifact(artifactName, parents) + val artifactContent = NodeContent[T](artifactData, null) + + contentMap.update(artifactName, artifactContent) + + } + + nodeContent.localData.outputs.foreach { output => updateContents(output, isOutput = true) } + nodeContent.localData.inputs.foreach { input => updateContents(input, isOutput = false) } + + } + + contentMap + } + +} diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoNodes.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoNodes.scala new file mode 100644 index 0000000000..a7a1f3a44f --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoNodes.scala @@ -0,0 +1,25 @@ +package ai.chronon.orchestration + +import ai.chronon.api._ +import ai.chronon.orchestration.logical._ + +// Main LogicalSet class +case class RepoNodes(joins: Seq[Join] = Seq.empty, + groupBys: Seq[GroupBy] = Seq.empty, + stagingQueries: Seq[StagingQuery] = Seq.empty, + models: Seq[Model] = Seq.empty) { + + def toLogicalNodes: Seq[LogicalNodeImpl] = + joins.map(JoinNodeImpl) ++ + groupBys.map(GroupByNodeImpl) ++ + stagingQueries.map(StagingQueryNodeImpl) ++ + models.map(ModelNodeImpl) + + def :+(join: Join): RepoNodes = copy(joins = joins :+ join) + + def :+(groupBy: GroupBy): RepoNodes = copy(groupBys = groupBys :+ groupBy) + + def :+(stagingQuery: StagingQuery): RepoNodes = copy(stagingQueries = stagingQueries :+ stagingQuery) + + def :+(model: Model): RepoNodes = copy(models = models :+ model) +} diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoParser.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoParser.scala index e08ee89ee1..be6b52c254 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/RepoParser.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoParser.scala @@ -1,51 +1,45 @@ package ai.chronon.orchestration -import ai.chronon.api -import ai.chronon.online.MetadataDirWalker.listFiles -import ai.chronon.online.MetadataDirWalker.parse -import ai.chronon.online.MetadataDirWalker.relativePath -import org.slf4j.LoggerFactory +import ai.chronon.online.MetadataDirWalker +import ai.chronon.orchestration.RepoTypes._ +import org.apache.logging.log4j.scala.Logging import java.io.File -import scala.util.Failure -import scala.util.Success // parses a folder of -object RepoParser { +object RepoParser extends App with Logging { - private lazy val logger = LoggerFactory.getLogger(getClass) - - private def parseLogicalSet(dir: File): LogicalSet = { - listFiles(dir).getValidFilesAndReport - .foldLeft(LogicalSet()) { (logicalSet, file) => - val filePath = file.getPath - - val logicalSetTry = filePath match { - case value if value.contains("joins/") => parse[api.Join](file).map(logicalSet :+ _) - case value if value.contains("group_bys/") => 
parse[api.GroupBy](file).map(logicalSet :+ _) - case value if value.contains("staging_queries/") => parse[api.StagingQuery](file).map(logicalSet :+ _) - case value if value.contains("models/") => parse[api.Model](file).map(logicalSet :+ _) - - case _ => Failure(new IllegalArgumentException(s"Unrecognized file path: $filePath")) - } + private def readContent(file: File): FileContent = { + val source = scala.io.Source.fromFile(file) + val content = FileContent(source.mkString) + source.close() + content + } - logicalSetTry match { - case Success(value) => logger.info(s"Parsed: ${relativePath(file)}"); value - case Failure(exception) => logger.error(s"Failed to parse file: ${relativePath(file)}", exception); logicalSet - } + def fileHashes(compileRoot: String): Map[Name, FileHash] = { + val compileDir = new File(compileRoot) + // list all files in compile root + MetadataDirWalker + .listFiles(new File(compileRoot)) + .getValidFilesAndReport + .map { file => + val relativePath = compileDir.toPath.relativize(file.toPath) + val name = Name(relativePath.toString) + val content = readContent(file) + val hash = content.hash + name -> hash } + .toMap } - def main(args: Array[String]): Unit = { - - require(args.length == 1, "Usage: RepoParser ") - - val dir = new File(args(0)) - require(dir.exists() && dir.isDirectory, s"Invalid directory: $dir") - - val logicalSet = parseLogicalSet(dir) - val li = LineageIndex(logicalSet) - - println(li) + def fileContents(compileRoot: String, names: Seq[String]): Map[Name, FileContent] = { + val compileDir = new File(compileRoot) + names.map { file => + val name = Name(file) + val relativePath = compileDir.toPath.resolve(file) + val content = readContent(relativePath.toFile) + name -> content + }.toMap } + } diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoTypes.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoTypes.scala new file mode 100644 index 0000000000..f53d5704f4 --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoTypes.scala @@ -0,0 +1,96 @@ +package ai.chronon.orchestration + +import ai.chronon.orchestration.utils.StringExtensions.StringOps + +/** + * Types relevant to the orchestration layer. + * It is very easy to get raw strings mixed up in the indexing logic. + * So we guard them using case classes. For that reason we have a lot of case classes here. + */ +object RepoTypes { + + /** + * name of the node + * example: group_bys..., joins..., staging_queries... + * and also table.. - adding a dummy node for the table makes the code easier to write + */ + case class Name(name: String) + + case class Branch(name: String) + + object Branch { + val main: Branch = Branch("main") + } + + /** + * Take the file content string and hashes it + * Whenever this changes the cli will upload the file into the index. + */ + case class FileHash(hash: String) + + /** + * Local hash represents the computation defined in the file. + * In chronon api, anything field other than metadata is + * considered to impact the computation & consequently the output. + */ + case class LocalHash(hash: String) + + /** + * Global hash represents the computation defined in the file and all its dependencies. + * We recursively scan all the parents of the node to compute the global hash. 
+ * + * `global_hash(node) = hash(node.local_hash + node.parents.map(global_hash))` + */ + case class GlobalHash(hash: String) + + /** + * Local data represents relevant information for lineage tracking + * that can be computed by parsing a file in *isolation*. + */ + case class LocalData(name: Name, fileHash: FileHash, localHash: LocalHash, inputs: Seq[Name], outputs: Seq[Name]) + + object LocalData { + + def forArtifact(name: Name, parents: Seq[Name]): LocalData = { + + val nameHash = name.name.md5 + + LocalData( + name, + FileHash(nameHash), + LocalHash(nameHash), + inputs = parents, + outputs = Seq.empty + ) + } + } + + /** + * Node content represents the actual data that is stored in the index. + * It is a combination of local data and the actual data that is stored in the index. + */ + case class Version(name: String) + + /** + * Content of the compiled file + * Currently TSimpleJsonProtocol serialized StagingQuery, Join, GroupBy thrift objects. + * Python compile.py will serialize user's python code into these objects and + * the [[RepoParser]] will pick them up and sync into [[RepoIndex]]. + */ + case class FileContent(content: String) { + def hash: FileHash = FileHash(content.md5) + } + + case class NodeContent[T](localData: LocalData, conf: T) + + case class Table(name: String) + + /** + * To make the code testable, we parameterize the Config with `T` + * You can see how this is used in [[RepoIndexSpec]] + */ + trait ConfProcessor[T] { + def nodeContents(t: T): Seq[NodeContent[T]] + def parse(name: String, fileContent: FileContent): Seq[T] + } +} diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala b/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala new file mode 100644 index 0000000000..57cb414645 --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala @@ -0,0 +1,49 @@ +package ai.chronon.orchestration + +import ai.chronon.orchestration.RepoTypes._ +import ai.chronon.orchestration.utils.TablePrinter + +import scala.collection.mutable + +case class VersionUpdate(name: Name, previous: Option[Version], next: Option[Version], main: Option[Version]) { + + private def toRow: Seq[String] = + Seq( + name.name, + next.map(_.name).getOrElse(""), + previous.map(_.name).getOrElse(""), + main.map(_.name).getOrElse("") + ) +} + +object VersionUpdate { + + def join(next: mutable.Map[Name, Version], + previous: mutable.Map[Name, Version], + main: mutable.Map[Name, Version]): Seq[VersionUpdate] = { + + val allNames = previous.keySet ++ next.keySet ++ main.keySet + + allNames + .map { name => + VersionUpdate(name, previous.get(name), next.get(name), main.get(name)) + } + .toSeq + .sortBy(_.name.name) + } + + def toMap(versionUpdates: Seq[VersionUpdate]): Map[String, String] = { + + versionUpdates.map { versionUpdate => + versionUpdate.name.name -> versionUpdate.next.map(_.name).getOrElse("") + }.toMap + + } + + def print(versionUpdates: Seq[VersionUpdate]): Unit = { + + val header = Seq("Name", "Next", "Previous", "Main") + TablePrinter.printTypedTable(header, versionUpdates)(_.toRow) + + } +} diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/logical/GroupByNodeImpl.scala b/orchestration/src/main/scala/ai/chronon/orchestration/logical/GroupByNodeImpl.scala index 35451045f7..af51c87965 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/logical/GroupByNodeImpl.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/logical/GroupByNodeImpl.scala @@ -9,7 +9,7 @@ 
import ai.chronon.orchestration.utils.CollectionExtensions.JListExtension // GroupBy implementation case class GroupByNodeImpl(groupBy: GroupBy) extends LogicalNodeImpl { - override def name: String = groupBy.metaData.name + override def name: String = "group_bys." + groupBy.metaData.name override def outputTables: Seq[String] = Seq(groupBy.metaData.outputTable) diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/logical/JoinNodeImpl.scala b/orchestration/src/main/scala/ai/chronon/orchestration/logical/JoinNodeImpl.scala index a4e48ab528..1eab436909 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/logical/JoinNodeImpl.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/logical/JoinNodeImpl.scala @@ -10,7 +10,7 @@ import ai.chronon.orchestration.utils.TabularDataUtils // Join implementation case class JoinNodeImpl(join: Join) extends LogicalNodeImpl { - override def name: String = join.metaData.name + override def name: String = "joins." + join.metaData.name override def outputTables: Seq[String] = Seq(join.metaData.outputTable, join.metaData.loggedTable) diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/logical/LogicalNodeImpl.scala b/orchestration/src/main/scala/ai/chronon/orchestration/logical/LogicalNodeImpl.scala index a8f82001c2..2f65584e96 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/logical/LogicalNodeImpl.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/logical/LogicalNodeImpl.scala @@ -5,6 +5,7 @@ import ai.chronon.orchestration.TabularDataType // Base trait for common node operations trait LogicalNodeImpl { + // unique name for the node def name: String def outputTables: Seq[String] def toConfig: LogicalNode diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/logical/ModelNodeImpl.scala b/orchestration/src/main/scala/ai/chronon/orchestration/logical/ModelNodeImpl.scala index 0b5fd6476e..08e55e4b78 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/logical/ModelNodeImpl.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/logical/ModelNodeImpl.scala @@ -9,7 +9,7 @@ import ai.chronon.orchestration.utils.TabularDataUtils // Model implementation case class ModelNodeImpl(model: Model) extends LogicalNodeImpl { - override def name: String = model.metaData.name + override def name: String = "models." + model.metaData.name override def outputTables: Seq[String] = Seq(model.metaData.outputTable) diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/logical/StagingQueryNodeImpl.scala b/orchestration/src/main/scala/ai/chronon/orchestration/logical/StagingQueryNodeImpl.scala index c568986c7d..0266b1b293 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/logical/StagingQueryNodeImpl.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/logical/StagingQueryNodeImpl.scala @@ -9,7 +9,7 @@ import ai.chronon.orchestration.utils.CollectionExtensions.JListExtension // StagingQuery implementation case class StagingQueryNodeImpl(stagingQuery: StagingQuery) extends LogicalNodeImpl { - override def name: String = stagingQuery.metaData.name + override def name: String = "staging_queries." 
+ stagingQuery.metaData.name override def outputTables: Seq[String] = Seq(stagingQuery.metaData.outputTable) diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/logical/TabularDataNodeImpl.scala b/orchestration/src/main/scala/ai/chronon/orchestration/logical/TabularDataNodeImpl.scala index 36c8cc6c5d..f0a5fe6866 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/logical/TabularDataNodeImpl.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/logical/TabularDataNodeImpl.scala @@ -5,7 +5,7 @@ import ai.chronon.orchestration.TabularData import ai.chronon.orchestration.TabularDataType case class TabularDataNodeImpl(tabularData: TabularData) extends LogicalNodeImpl { - override def name: String = tabularData.table + override def name: String = "tabular_data." + tabularData.table override def outputTables: Seq[String] = Seq.empty diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/utils/SequenceMap.scala b/orchestration/src/main/scala/ai/chronon/orchestration/utils/SequenceMap.scala new file mode 100644 index 0000000000..e8968647a3 --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/utils/SequenceMap.scala @@ -0,0 +1,79 @@ +package ai.chronon.orchestration.utils + +import scala.collection.mutable + +/** + * [[SequenceMap]] maintains a sequence of unique values for each key + * while preserving the first insertion order and returning the index of the first-insertion. + * + * [[RepoIndex]] needs to compute versions of the whole repo on every call to its [[addNodes]] method. + * This map ensures that version assignment is fast. + * + * NOTE: Methods of this class are not thread safe. Only one thread should access this map at a time. + */ +class SequenceMap[K, V] { + + private case class SequenceEntry(valueToIndex: mutable.Map[V, Int], indexToValue: mutable.Map[Int, V]) { + + def insert(value: V): Int = { + if (valueToIndex.contains(value)) return valueToIndex(value) + + val newIndex = indexToValue.size + valueToIndex.update(value, newIndex) + indexToValue.update(newIndex, value) + newIndex + } + + def nextIndex: Int = indexToValue.size + + def contains(value: V): Boolean = valueToIndex.contains(value) + + } + + private val map: mutable.Map[K, SequenceEntry] = mutable.Map.empty + + def insert(key: K, value: V): Int = { + require(key != null, "Key cannot be null") + require(value != null, "Value cannot be null") + + map.get(key) match { + case Some(entry) => entry.insert(value) + case None => + val entry = SequenceEntry(mutable.Map.empty, mutable.Map.empty) + val newIndex = entry.insert(value) + map.update(key, entry) + newIndex + } + } + + def contains(key: K, value: V): Boolean = { + require(key != null, "Key cannot be null") + require(value != null, "Value cannot be null") + + map.get(key) match { + case Some(entry) => entry.contains(value) + case None => false + } + } + + def potentialIndex(key: K, value: V): Int = { + require(key != null, "Key cannot be null") + require(value != null, "Value cannot be null") + + map.get(key) match { + case Some(entry) => entry.valueToIndex.getOrElse(value, entry.nextIndex) + case None => 0 + } + } + + def get(key: K, index: Int): V = { + require(key != null, "Key cannot be null") + require(index >= 0, "Index cannot be negative") + require(map.contains(key), s"Key $key not found") + + val indexToValue = map(key).indexToValue + require(indexToValue.contains(index), s"Index $index not found") + + indexToValue(index) + } +} diff --git 
a/orchestration/src/main/scala/ai/chronon/orchestration/utils/StringExtensions.scala b/orchestration/src/main/scala/ai/chronon/orchestration/utils/StringExtensions.scala new file mode 100644 index 0000000000..12a08aa702 --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/utils/StringExtensions.scala @@ -0,0 +1,22 @@ +package ai.chronon.orchestration.utils + +import ai.chronon.api.Constants + +import java.security.MessageDigest + +object StringExtensions { + + lazy val digester: ThreadLocal[MessageDigest] = new ThreadLocal[MessageDigest]() { + override def initialValue(): MessageDigest = MessageDigest.getInstance("MD5") + } + + implicit class StringOps(s: String) { + def md5: String = + digester + .get() + .digest(s.getBytes(Constants.UTF8)) + .map("%02x".format(_)) + .mkString + .take(8) + } +} diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/utils/TablePrinter.scala b/orchestration/src/main/scala/ai/chronon/orchestration/utils/TablePrinter.scala new file mode 100644 index 0000000000..d154538142 --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/utils/TablePrinter.scala @@ -0,0 +1,63 @@ +package ai.chronon.orchestration.utils + +object TablePrinter { + def printTable[T](headers: Seq[String], data: Seq[Seq[T]]): Unit = { + if (headers.isEmpty || data.isEmpty) { + println("No data to display") + return + } + + // Convert all values to strings and find the maximum width for each column + val stringData = data.map(_.map(_.toString)) + val colWidths = headers.indices.map { i => + val colValues = stringData.map(_(i)) + (colValues :+ headers(i)).map(_.length).max + } + + // Create the format string for each row + val formatStr = colWidths.map(w => s"%-${w}s").mkString(" | ") + + // Print headers + println("+" + colWidths.map(w => "-" * (w + 2)).mkString("+") + "+") + println("| " + String.format(formatStr, headers.map(_.asInstanceOf[AnyRef]): _*) + " |") + println("+" + colWidths.map(w => "-" * (w + 2)).mkString("+") + "+") + + // Print data + stringData.foreach { row => + println("| " + String.format(formatStr, row.map(_.asInstanceOf[AnyRef]): _*) + " |") + } + println("+" + colWidths.map(w => "-" * (w + 2)).mkString("+") + "+") + } + + // Convenience method for single-type tables + def printTypedTable[T](headers: Seq[String], data: Seq[T])(implicit extractor: T => Seq[Any]): Unit = { + printTable(headers, data.map(extractor)) + } +} + +// Example usage: +object TablePrinterExample extends App { + // Basic usage with raw data + val headers: Seq[String] = Seq("Name", "Age", "City") + val data: Seq[Seq[Any]] = Seq( + Seq("John Doe", 30, "New York"), + Seq("Jane Smith", 25, "Los Angeles"), + Seq("Bob Johnson", 35, "Chicago") + ) + + println("Basic Table Example:") + TablePrinter.printTable(headers, data) + + // Example with case class + case class Person(name: String, age: Int, city: String) + + val people: Seq[Person] = Seq( + Person("John Doe", 30, "New York"), + Person("Jane Smith", 25, "Los Angeles"), + Person("Bob Johnson", 35, "Chicago") + ) + + println("\nTyped Table Example:") + implicit val personExtractor: Person => Seq[Any] = p => Seq(p.name, p.age, p.city) + TablePrinter.printTypedTable(headers, people) +} diff --git a/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala new file mode 100644 index 0000000000..99b196d810 --- /dev/null +++ b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala @@ 
-0,0 +1,183 @@ +package ai.chronon.orchestration.test + +import ai.chronon.orchestration.RepoIndex +import ai.chronon.orchestration.RepoTypes._ +import ai.chronon.orchestration.VersionUpdate +import ai.chronon.orchestration.utils.StringExtensions.StringOps +import org.apache.logging.log4j.scala.Logging +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import scala.collection.mutable + +class RepoIndexSpec extends AnyFlatSpec with Matchers with Logging { + + case class TestConf(name: String, + queryVersion: String, + params: String, + parents: Seq[String], + outputs: Seq[String] = Seq.empty) + + class TestConfProcessor extends ConfProcessor[TestConf] { + + override def nodeContents(conf: TestConf): Seq[NodeContent[TestConf]] = { + + val ld = LocalData( + + name = Name(conf.name), + + fileHash = FileHash(conf.toString.md5), + localHash = LocalHash(conf.queryVersion.md5), + + inputs = conf.parents.map(Name), + outputs = conf.outputs.map(Name) + + ) + + Seq(NodeContent(ld, conf)) + } + + override def parse(name: String, fileContent: FileContent): Seq[TestConf] = ??? + } + + "RepoIndex" should "propagate updates" in { + + + val proc = new TestConfProcessor + val repoIndex = new RepoIndex[TestConf](proc) + + + + def fileHashes(configs: Seq[TestConf]): mutable.Map[Name, FileHash] = { + val nameHashPairs = configs + .flatMap(proc.nodeContents) + .map(nc => nc.localData.name -> nc.localData.fileHash) + mutable.Map(nameHashPairs : _*) + } + + def updateIndex(confsWithExpectedVersions: Seq[(TestConf, String)], branch: Branch, commitMessage: String): Seq[VersionUpdate] = { + logger.info(s"Updating index branch @${branch.name} and commit - $commitMessage") + + val expectedVersions = confsWithExpectedVersions.map(c => c._1.name -> c._2).toMap + + val confs = confsWithExpectedVersions.map(_._1) + val fileHashMap = fileHashes(confs) + val diffNodes = repoIndex.diff(fileHashMap).map(_.name).toSet + + logger.info("incoming files:\n " + fileHashMap.keySet.mkString("\n ")) + logger.info("diff nodes:\n " + diffNodes.mkString("\n ")) + + val updates = repoIndex.addNodes( + fileHashMap, + confs.filter(c => diffNodes.contains(c.name)), + branch, + dryRun = false) + + val actualVersions = VersionUpdate.toMap(updates) + + expectedVersions.foreach { case (name, expectedVersion) => + actualVersions.get(name) shouldBe Some(expectedVersion) + } + + VersionUpdate.print(updates) + logger.info(s"Finished adding commit: $commitMessage\n\n") + updates + } + + val confs = Seq( + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")) -> "v0", + TestConf("gb1", "v1", "4g", Seq("t1")) -> "v0", + TestConf("gb2", "v1", "4g", Seq("t2")) -> "v0", + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")) -> "v0", + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v0", + ) + + val fileHashMap = fileHashes(confs.map(_._1)) + + // check artifact nodes are present + val map = RepoIndex.buildContentMap(proc, confs.map(_._1), fileHashMap) + map.get(Name("t1")) shouldNot be(None) + map.get(Name("t2")) shouldNot be(None) + + + + logger.info(s"fileHashMap: $fileHashMap") + + val fileDiffs = repoIndex.diff(fileHashMap) + + fileDiffs.size shouldBe fileHashMap.size + fileDiffs.toSet shouldBe fileHashMap.keySet + + updateIndex(confs, Branch.main, "initial commit") + + val testBranch = Branch("test") + + val branchConfs1 = Seq( + TestConf("sq1", "v2", "4g", Seq.empty, Seq("t1")) -> "v1", // updated + TestConf("gb1", "v1", "4g", Seq("t1")) -> "v1", + TestConf("gb2", "v1", "4g", Seq("t2")) -> 
"v0", + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")) -> "v1", + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v1", + ) + + updateIndex(branchConfs1, testBranch, "semantically updated sq1") + + val branchConfs2 = Seq( + TestConf("sq1", "v2", "4g", Seq.empty, Seq("t1")) -> "v1", + TestConf("gb1", "v1", "4g", Seq("t1")) -> "v1", + TestConf("gb2", "v1", "8g", Seq("t2")) -> "v0", // non-semantic update + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")) -> "v1", + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v1", + ) + + updateIndex(branchConfs2, testBranch, "non semantically updated gb2") + + val branchConfs3 = Seq( + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")) -> "v0", // reverted back + TestConf("gb1", "v1", "4g", Seq("t1")) -> "v0", + TestConf("gb2", "v1", "8g", Seq("t2")) -> "v0", + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")) -> "v0", + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v0", + ) + updateIndex(branchConfs3, testBranch, "reverted back semantic update to sq1") + + val branchConfs4 = Seq( + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")) -> "v0", + TestConf("gb1", "v1", "4g", Seq("t1")) -> "v0", + // TestConf("gb2", "v1", "8g", Seq("t2")), // deleted + TestConf("j1", "v1", "4g", Seq("gb1"), Seq("table_j1")) -> "v2", // parent deleted + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v2", + ) + + updateIndex(branchConfs4, testBranch, "deleted gb2 (depends on t2)") + + updateIndex(branchConfs4, Branch.main, "updated main with change in test branch") + + val branchConfs5 = Seq( + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")) -> "v0", + TestConf("sq3", "v1", "4g", Seq.empty, Seq("t3")) -> "v0", // new + TestConf("gb1", "v1", "4g", Seq("t1")) -> "v0", + TestConf("gb3", "v1", "4g", Seq("t3")) -> "v0", // new + TestConf("gb2", "v1", "8g", Seq("t2")) -> "v0", // gb2 added back + TestConf("j1", "v1", "4g", Seq("gb1", "gb2", "gb3"), Seq("table_j1")) -> "v3", // parent reverted + new + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v3", + ) + + updateIndex(branchConfs5, Branch.main, "new sq3 and gb3, un-deleted gb2") + + val branchConfs6 = Seq( + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")) -> "v0", + TestConf("sq3", "v1", "4g", Seq.empty, Seq("t3")) -> "v0", + TestConf("gb1", "v1", "4g", Seq("t1")) -> "v0", + TestConf("gb3", "v1", "4g", Seq("t3")) -> "v0", + TestConf("gb2", "v1", "8g", Seq("t2")) -> "v0", + TestConf("j1", "v1", "4g", Seq("gb1", "gb2", "gb3"), Seq("table_j1")) -> "v3", + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v3", + TestConf("m2", "v1", "4g", Seq("j1"), Seq("table_m2")) -> "v0", + ) + + updateIndex(branchConfs6, Branch.main, "m2 is added") + } + + +} diff --git a/orchestration/src/test/scala/ai/chronon/orchestration/test/SequenceMapSpec.scala b/orchestration/src/test/scala/ai/chronon/orchestration/test/SequenceMapSpec.scala new file mode 100644 index 0000000000..f6489cb390 --- /dev/null +++ b/orchestration/src/test/scala/ai/chronon/orchestration/test/SequenceMapSpec.scala @@ -0,0 +1,134 @@ +package ai.chronon.orchestration.test + +import ai.chronon.orchestration.utils.SequenceMap +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class SequenceMapSpec extends AnyFlatSpec with Matchers { + + "SequenceMap" should "insert and retrieve values with correct indices" in { + val sequenceMap = new SequenceMap[String, Int] + + // Insert values for a single key + sequenceMap.insert("key1", 100) 
should be(0) + sequenceMap.insert("key1", 200) should be(1) + sequenceMap.insert("key1", 300) should be(2) + + // Verify retrieval + sequenceMap.get("key1", 0) should be(100) + sequenceMap.get("key1", 1) should be(200) + sequenceMap.get("key1", 2) should be(300) + } + + it should "maintain unique indices for duplicate values" in { + val sequenceMap = new SequenceMap[String, Int] + + // Insert same value multiple times + val firstIndex = sequenceMap.insert("key1", 100) + val secondIndex = sequenceMap.insert("key1", 100) + + // Should return same index for duplicate values + firstIndex should be(secondIndex) + firstIndex should be(0) + + // Verify retrieval + sequenceMap.get("key1", 0) should be(100) + } + + it should "handle multiple keys independently" in { + val sequenceMap = new SequenceMap[String, String] + + // Insert values for different keys + sequenceMap.insert("key1", "value1") should be(0) + sequenceMap.insert("key2", "value2") should be(0) + sequenceMap.insert("key1", "value3") should be(1) + + // Verify independent sequences + sequenceMap.get("key1", 0) should be("value1") + sequenceMap.get("key1", 1) should be("value3") + sequenceMap.get("key2", 0) should be("value2") + } + + it should "correctly check contains for existing and non-existing values" in { + val sequenceMap = new SequenceMap[Int, String] + + sequenceMap.insert(1, "test") + + // Check existing combinations + sequenceMap.contains(1, "test") should be(true) + + // Check non-existing combinations + sequenceMap.contains(1, "nonexistent") should be(false) + sequenceMap.contains(2, "test") should be(false) + } + + it should "throw IllegalArgumentException for null keys" in { + val sequenceMap = new SequenceMap[String, String] + + val exception = intercept[IllegalArgumentException] { + sequenceMap.insert(null, "value") + } + exception.getMessage should be("requirement failed: Key cannot be null") + } + + it should "throw IllegalArgumentException for null values" in { + val sequenceMap = new SequenceMap[String, String] + + val exception = intercept[IllegalArgumentException] { + sequenceMap.insert("key", null) + } + exception.getMessage should be("requirement failed: Value cannot be null") + } + + it should "throw IllegalArgumentException for negative indices" in { + val sequenceMap = new SequenceMap[String, String] + sequenceMap.insert("key", "value") + + val exception = intercept[IllegalArgumentException] { + sequenceMap.get("key", -1) + } + exception.getMessage should be("requirement failed: Index cannot be negative") + } + + it should "throw IllegalArgumentException for non-existing keys in get" in { + val sequenceMap = new SequenceMap[String, String] + + val exception = intercept[IllegalArgumentException] { + sequenceMap.get("nonexistent", 0) + } + exception.getMessage should be("requirement failed: Key nonexistent not found") + } + + it should "throw IllegalArgumentException for non-existing indices" in { + val sequenceMap = new SequenceMap[String, String] + sequenceMap.insert("key", "value") + + val exception = intercept[IllegalArgumentException] { + sequenceMap.get("key", 1) + } + exception.getMessage should be("requirement failed: Index 1 not found") + } + + it should "maintain correct sequence for mixed operations" in { + val sequenceMap = new SequenceMap[String, Int] + + // Insert some values + sequenceMap.insert("key", 100) should be(0) + sequenceMap.insert("key", 200) should be(1) + + // Check contains + sequenceMap.contains("key", 100) should be(true) + sequenceMap.contains("key", 200) should be(true) + + // 
Insert duplicate + sequenceMap.insert("key", 100) should be(0) + + // Insert new value + sequenceMap.insert("key", 300) should be(2) + + // Verify final sequence + sequenceMap.get("key", 0) should be(100) + sequenceMap.get("key", 1) should be(200) + sequenceMap.get("key", 2) should be(300) + } +} \ No newline at end of file