From 76d15eed923281032fc87818d546740f5511bd18 Mon Sep 17 00:00:00 2001
From: nikhil-zlai
Date: Sat, 28 Dec 2024 12:14:48 -0800
Subject: [PATCH 01/10] branch logic write up

---
 api/thrift/orchestration.thrift |  14 +-
 orchestration/README.md         | 278 ++++++++++++++++++++++++++++++++
 2 files changed, 279 insertions(+), 13 deletions(-)
 create mode 100644 orchestration/README.md

diff --git a/api/thrift/orchestration.thrift b/api/thrift/orchestration.thrift
index 1e33729ea7..fe55dba784 100644
--- a/api/thrift/orchestration.thrift
+++ b/api/thrift/orchestration.thrift
@@ -82,18 +82,6 @@ struct NodeInfo {
     30: optional LogicalNode conf
 }
 
-
-/** First Pass
-* NodeInstance::(name, type, conf_hash) -> #[parent_nodes]
-* Node::(name, type) -> #[conf_hash]
-
-* Second Pass
-* Node::(name, type, compute_hash) -> #[parent_nodes]
-
-* different file_hashes but same lineage_hash should all go into the same orchestrator workflow
-* Node::(name, type, lineage_hash)
-**/
-
 struct NodeConnections {
     1: optional list parents
     2: optional list children
 }
@@ -272,7 +260,7 @@ struct TableDependency {
   * JoinParts could use data from batch backfills or upload tables when available
   * When not available they shouldn't force computation of the backfills and upload tables.
   **/
-    21: optional bool forceComputae
+    21: optional bool forceCompute
 }
 
 union Dependency {
diff --git a/orchestration/README.md b/orchestration/README.md
new file mode 100644
index 0000000000..0b35f732ab
--- /dev/null
+++ b/orchestration/README.md
@@ -0,0 +1,278 @@

# Branch support

We want to support "branches" that allow users to run pipelines and services with two
critical goals:

- don't pollute production datasets and end-points
- are cheap, by reusing as many existing datasets as possible

## Scenarios within experimentation

While developing on a branch - users could
- make semantic updates - that change output data - e.g., logic within where-s, selects, aggregations etc.
- make non-semantic updates - that don't change output data - e.g., spark exec memory, # of pods / workers etc.
  - *note that everything stored in the metadata field within our APIs is a non-semantic field*
- add new nodes
- delete existing nodes
- make changes to several compute nodes at once
- decide to merge the branch into master

With this context, the goal of this document is to develop and describe a representation that handles the above user workflows.


## Motivating Example

Legend:
```
"sq" stands for StagingQuery    "j" for Join
"t" stands for table            "m" for Model
"gb" for GroupBy
```

Nodes will be numbered - `gb4`, `m2` etc.

Semantic changes to a node are notated using a plus "+".
E.g., Join `J3` becomes `J3+`

Non-semantic changes with an asterisk "*" - `J3*`


```mermaid
---
title: Initial state of the example
---

graph TD;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1;
    t2-->gb2;
    gb2-->j1;
    j1-->m1;
    gb1-->j2;
```

### Semantic updates

Say that `sq1` changes semantically to `sq1+`. This changes the output of all
nodes downstream of it.
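One way to detect this ripple mechanically - the approach taken by `computeGlobalHash` in `RepoIndex.scala` later in this series - is to define each node's effective hash recursively over its parents. A minimal, self-contained sketch (the `Node` shape and helper names here are illustrative, not the actual API):

```scala
// Sketch: a node's "global" hash combines its own semantic (local) hash with
// the global hashes of its parents, so a semantic change to sq1 ripples to
// every transitive descendant - the "+" propagation described above.
object SemanticPropagationSketch {

  case class Node(name: String, localHash: String, parents: Seq[String])

  def globalHashes(nodes: Seq[Node]): Map[String, String] = {
    val byName = nodes.map(n => n.name -> n).toMap
    val memo = scala.collection.mutable.Map.empty[String, String]

    // memoized recursion over the DAG of parents
    def compute(name: String): String = memo.getOrElseUpdate(
      name, {
        val node = byName(name)
        val parentHashes = node.parents.map(p => s"$p:${compute(p)}").mkString(",")
        s"node=$name:${node.localHash}|parents=$parentHashes".hashCode.toHexString
      }
    )

    nodes.map(n => n.name -> compute(n.name)).toMap
  }

  def main(args: Array[String]): Unit = {
    val before = Seq(
      Node("sq1", "v1", Seq.empty),
      Node("gb1", "v1", Seq("sq1")),
      Node("j1", "v1", Seq("gb1"))
    )
    // only sq1's local hash changes ...
    val after = before.map(n => if (n.name == "sq1") n.copy(localHash = "v2") else n)

    // ... but the global hashes of gb1 and j1 change as well
    println(globalHashes(before))
    println(globalHashes(after))
  }
}
```

The diagram below shows the resulting propagation for the example graph.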
```mermaid
---
title: sq1 is updated semantically
---

graph TD;
    sq1+-->t1+;
    t1+-->gb1+;
    gb1+-->j1+;
    t2-->gb2;
    gb2-->j1+;
    j1+-->m1+;
    gb1+-->j2+;

    style sq1+ fill:wheat,color:black,stroke:#333
    style t1+ fill:wheat,color:black,stroke:#333
    style gb1+ fill:wheat,color:black,stroke:#333
    style j1+ fill:wheat,color:black,stroke:#333
    style j2+ fill:wheat,color:black,stroke:#333
    style m1+ fill:wheat,color:black,stroke:#333
```

> A major concern here is that, if the local repository of the user is behind remote,
> we will pick up many more changes than the user intends.

One approach to mitigate this is to make the CLI pick up only the files listed as edited
by commits to the git branch.

Another approach is to force the user to rebase on any change to the repo. However, this does not
guarantee that changes made while the job is running are accounted for.

### Non-semantic updates

Instead, if `sq1` changes non-semantically to `sq1*`, none of the downstream nodes would change.

```mermaid
---
title: sq1 is updated non-semantically
---

graph TD;
    sq1*-->t1;
    t1-->gb1;
    gb1-->j1;
    t2-->gb2;
    gb2-->j1;
    j1-->m1;
    gb1-->j2;

    style sq1* fill:lavender,color:black,stroke:#333
```

Depending on who is running the job, we need to decide which version of the node to use:
- if the branch author is causing the node to be computed, we need to use `sq1*` instead of `sq1`
- if the prod flow, or other authors who haven't updated `sq1`, are causing the compute, we should use `sq1`
- if another branch is also updating `sq1` non-semantically to `sq1**`, we need to use that instead.

### Adding new nodes

Adding new leaf nodes will not impact any of the existing nodes.

```mermaid
---
title: m2 is added
---

graph TD;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1;
    t2-->gb2;
    gb2-->j1;
    j1-->m1;
    gb1-->j2;
    j2-->m2;

    style m2 fill:lightgreen,color:black,stroke:#333
```


But adding a non-leaf node -
as a parent of an existing node - would almost always cause semantic updates to the nodes downstream.

```mermaid
---
title: gb3 is added
---

graph TD;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1+;
    t2-->gb2;
    t3-->gb3;
    gb2-->j1+;
    gb3-->j1+;
    j1+-->m1+;
    gb1-->j2;

    style t3 fill:lightgreen,color:black,stroke:#333
    style gb3 fill:lightgreen,color:black,stroke:#333
    style j1+ fill:wheat,color:black,stroke:#333
    style m1+ fill:wheat,color:black,stroke:#333
```

One interesting case here is migrating SQL from an external system into a StagingQuery that writes an
already-used table. Even though this is not a leaf node, treating it the same as a leaf-node change
would be the right thing to do.


```mermaid
---
title: sq2 is added
---

graph TD;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1;
    t2-->gb2;
    gb2-->j1;
    j1-->m1;
    gb1-->j2;
    sq2-->t2;

    style sq2 fill:lightgreen,color:black,stroke:#333
```

### Deleting existing nodes

Deleting leaf nodes is straightforward. We just need to program a cleanup mechanism
to remove data and pipelines generated by that node.

```mermaid
---
title: m1 is deleted
---

graph TD;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1;
    t2-->gb2;
    gb2-->j1;
    j1-->m1;
    gb1-->j2;
    sq2-->t2;

    style m1 fill:coral,color:black,stroke:#333
```

Indirectly connected components - via table references - shouldn't be allowed to be deleted
as long as there are nodes that depend on the table. We will fail this during the sync step.
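A minimal sketch of that sync-step guard, assuming we can look up which nodes read each table (the `Node` shape and `blockers` helper are hypothetical, for illustration only):

```scala
// Sketch of the sync-step guard: reject deleting a node while any surviving
// node still reads one of its output tables. Shapes here are hypothetical.
object DeletionGuardSketch {

  case class Node(name: String, inputTables: Seq[String], outputTables: Seq[String])

  // names of surviving nodes that still depend on tables produced by `deleted`
  def blockers(deleted: Node, remaining: Seq[Node]): Seq[String] =
    remaining
      .filter(n => n.inputTables.exists(t => deleted.outputTables.contains(t)))
      .map(_.name)

  def main(args: Array[String]): Unit = {
    val sq1 = Node("sq1", Seq.empty, Seq("t1"))
    val gb1 = Node("gb1", Seq("t1"), Seq.empty)

    // deleting sq1 must fail during sync: gb1 still reads t1
    val offenders = blockers(sq1, remaining = Seq(gb1))
    if (offenders.nonEmpty)
      println(s"sync failed: sq1 is still referenced by ${offenders.mkString(", ")}")
  }
}
```

The diagram below shows such a disallowed deletion.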
```mermaid
---
title: sq1 is deleted (not-allowed)
---

graph TD;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1;
    t2-->gb2;
    gb2-->j1;
    j1-->m1;
    gb1-->j2;
    sq2-->t2;

    style sq1 fill:coral,color:black,stroke:#333
```

When a directly connected parent is deleted, the child node must be updated - otherwise compilation
would fail. In these cases it would be ideal to garbage collect the upstream chain of the deleted node.

```mermaid
---
title: gb2 is deleted, j1 is updated
---

graph TD;
    sq2-->t2;
    sq1-->t1;
    t1-->gb1;
    gb1-->j1+;
    t2-->gb2;
    gb2-->j1+;
    j1+-->m1+;
    gb1-->j2;

    style sq2 fill:coral,color:black,stroke:#333
    style t2 fill:coral,color:black,stroke:#333
    style gb2 fill:coral,color:black,stroke:#333
    style j1+ fill:wheat,color:black,stroke:#333
    style m1+ fill:wheat,color:black,stroke:#333
```

### Isolating the changed assets

While development on a branch is in progress, we need to create temporary data assets for
the semantically changed nodes - shown in yellow above. Adds, deletes & semantic updates could
trigger this flow.


#### Logic to achieve isolation

Make a new copy of the conf & update the name (file name & metadata.name) -

`new_name = old_name + '_' + branch_name`

This needs to be followed by changing references in the downstream nodes -
all tables and nodes downstream will have the branch suffix.

### Merging changes into `main`

- Deletes should actually trigger asset and pipeline clean-up.
- Updates should trigger asset renaming.
- Adds can work as-is - we are not suffixing adds.

From ad0c7c4477da4d97274fad2b72c513e20910aa27 Mon Sep 17 00:00:00 2001
From: nikhil-zlai
Date: Fri, 3 Jan 2025 17:35:41 -0800
Subject: [PATCH 02/10] [WIP] Branch versioning logic - RepoIndex

---
 orchestration/README.md                       |  14 +-
 .../chronon/orchestration/LineageIndex.scala  |  90 ---------
 .../ai/chronon/orchestration/LogicalSet.scala |  25 ---
 .../scala/ai/chronon/orchestration/README.md  |  50 -----
 .../ai/chronon/orchestration/RepoIndex.scala  | 184 ++++++++++++++++++
 .../ai/chronon/orchestration/RepoNodes.scala  |  25 +++
 .../ai/chronon/orchestration/RepoParser.scala |  58 +++---
 .../chronon/orchestration/VersionUpdate.scala |  44 +++++
 .../logical/GroupByNodeImpl.scala             |   2 +-
 .../orchestration/logical/JoinNodeImpl.scala  |   2 +-
 .../logical/LogicalNodeImpl.scala             |   1 +
 .../orchestration/logical/ModelNodeImpl.scala |   2 +-
 .../logical/StagingQueryNodeImpl.scala        |   2 +-
 .../logical/TabularDataNodeImpl.scala         |   2 +-
 .../orchestration/utils/SequenceMap.scala     |  71 +++++++
 .../orchestration/utils/TablePrinter.scala    |  63 ++++++
 .../orchestration/test/RepoIndexSpec.scala    | 119 +++++++++++
 .../orchestration/test/SequenceMapSpec.scala  | 134 +++++++++++++
 18 files changed, 689 insertions(+), 199 deletions(-)
 delete mode 100644 orchestration/src/main/scala/ai/chronon/orchestration/LineageIndex.scala
 delete mode 100644 orchestration/src/main/scala/ai/chronon/orchestration/LogicalSet.scala
 delete mode 100644 orchestration/src/main/scala/ai/chronon/orchestration/README.md
 create mode 100644 orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala
 create mode 100644 orchestration/src/main/scala/ai/chronon/orchestration/RepoNodes.scala
 create mode 100644 orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala
 create mode 100644 orchestration/src/main/scala/ai/chronon/orchestration/utils/SequenceMap.scala
 create mode 100644 orchestration/src/main/scala/ai/chronon/orchestration/utils/TablePrinter.scala
 create mode 100644
orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala create mode 100644 orchestration/src/test/scala/ai/chronon/orchestration/test/SequenceMapSpec.scala diff --git a/orchestration/README.md b/orchestration/README.md index 0b35f732ab..1701665839 100644 --- a/orchestration/README.md +++ b/orchestration/README.md @@ -270,9 +270,21 @@ Make a new copy of the conf & update the name (file name & metadata.name) - This needs to be followed by changing references in the downstream nodes - all tables and nodes downstream will have the branch suffix. +``` +# 1. cli sends file_hash_map to remote + +local_file_map = repo.compiled.file_hash_map +remote_file_map = remote.file_map +deleted = remote_file_map - local_file_map +added = local_file_map - remote_file_map +updated = [k in intersect(local_file_map, remote_file_map)] +# 2. remote marks the changed files it needs + +(node, lineage_hash) => +``` + ### Merging changes into `main` - Deletes should actually trigger asset and pipeline clean up. - Updates should trigger asset renaming - Adds can work as is - we are not suffixing adds. - diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/LineageIndex.scala b/orchestration/src/main/scala/ai/chronon/orchestration/LineageIndex.scala deleted file mode 100644 index 999954c360..0000000000 --- a/orchestration/src/main/scala/ai/chronon/orchestration/LineageIndex.scala +++ /dev/null @@ -1,90 +0,0 @@ -package ai.chronon.orchestration - -import ai.chronon.orchestration.logical.GroupByNodeImpl -import ai.chronon.orchestration.logical.JoinNodeImpl -import ai.chronon.orchestration.logical.LogicalNodeImpl -import ai.chronon.orchestration.logical.ModelNodeImpl -import ai.chronon.orchestration.logical.StagingQueryNodeImpl -import ai.chronon.orchestration.logical.TabularDataNodeImpl -import ai.chronon.orchestration.utils.Config.getType - -import scala.collection.mutable - -object Builders { - def nodeKey(name: String, logicalType: LogicalType, lineageHash: String = null): NodeKey = { - val result = new NodeKey() - result.setName(name) - result.setLogicalType(logicalType) - result.setLineageHash(lineageHash) - result - } -} - -case class NodeInfo(inputConfigs: Seq[LogicalNode], config: LogicalNode, outputTables: Seq[String]) - -case class LineageIndex(tableToParent: mutable.Map[String, mutable.Buffer[NodeKey]] = mutable.HashMap.empty, - toChildConfig: mutable.Map[NodeKey, mutable.Buffer[NodeKey]] = mutable.HashMap.empty, - toInfo: mutable.Map[NodeKey, NodeInfo] = mutable.HashMap.empty) { - // node, parent_nodes, child_nodes, input_tables, output_tables -} - -object LineageIndex { - - def apply(logicalSet: LogicalSet): LineageIndex = { - val index = LineageIndex() - logicalSet.toLogicalNodes.foreach(update(_, index)) - index - } - - def update(node: LogicalNodeImpl, index: LineageIndex): Unit = { - - val nodeKey = toKey(node) - if (index.toInfo.contains(nodeKey)) return - - // update nodeInfo - val parents = node.parents - val info = NodeInfo(parents, node.toConfig, node.outputTables) - index.toInfo.put(nodeKey, info) - - // register parents of tables - node.outputTables.foreach { t => - val tabularNode = new TabularData() - tabularNode.setTable(t) - tabularNode.setType(node.tabularDataType) - - toKey(TabularDataNodeImpl(tabularNode)) - - index.tableToParent - .getOrElseUpdate(t, mutable.ArrayBuffer.empty) - .append(nodeKey) - } - - // register children of parents - parents.foreach { parent => - val parentNode = configToNode(parent) - val parentKey = toKey(parentNode) - - 
index.toChildConfig - .getOrElseUpdate(parentKey, mutable.ArrayBuffer.empty) - .append(nodeKey) - - update(parentNode, index) - } - } - - private def configToNode(config: LogicalNode): LogicalNodeImpl = { - config match { - case c if c.isSetJoin => JoinNodeImpl(c.getJoin) - case c if c.isSetGroupBy => GroupByNodeImpl(c.getGroupBy) - case c if c.isSetStagingQuery => StagingQueryNodeImpl(c.getStagingQuery) - case c if c.isSetModel => ModelNodeImpl(c.getModel) - case c if c.isSetTabularData => TabularDataNodeImpl(c.getTabularData) - } - } - - private def toKey(node: LogicalNodeImpl): NodeKey = { - val nodeType = getType(node.toConfig) - Builders.nodeKey(node.name, nodeType) - } - -} diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/LogicalSet.scala b/orchestration/src/main/scala/ai/chronon/orchestration/LogicalSet.scala deleted file mode 100644 index 2cd61f158f..0000000000 --- a/orchestration/src/main/scala/ai/chronon/orchestration/LogicalSet.scala +++ /dev/null @@ -1,25 +0,0 @@ -package ai.chronon.orchestration - -import ai.chronon.api._ -import ai.chronon.orchestration.logical._ - -// Main LogicalSet class -case class LogicalSet(joins: Seq[Join] = Seq.empty, - groupBys: Seq[GroupBy] = Seq.empty, - stagingQueries: Seq[StagingQuery] = Seq.empty, - models: Seq[Model] = Seq.empty) { - - def toLogicalNodes: Seq[LogicalNodeImpl] = - joins.map(JoinNodeImpl) ++ - groupBys.map(GroupByNodeImpl) ++ - stagingQueries.map(StagingQueryNodeImpl) ++ - models.map(ModelNodeImpl) - - def :+(join: Join): LogicalSet = copy(joins = joins :+ join) - - def :+(groupBy: GroupBy): LogicalSet = copy(groupBys = groupBys :+ groupBy) - - def :+(stagingQuery: StagingQuery): LogicalSet = copy(stagingQueries = stagingQueries :+ stagingQuery) - - def :+(model: Model): LogicalSet = copy(models = models :+ model) -} diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/README.md b/orchestration/src/main/scala/ai/chronon/orchestration/README.md deleted file mode 100644 index 603db705c1..0000000000 --- a/orchestration/src/main/scala/ai/chronon/orchestration/README.md +++ /dev/null @@ -1,50 +0,0 @@ -# Global planner for chronon workloads - -We will currently focus only on batch workloads - -## Batch - -We have a few stages of logic in this pipeline. - -> NOTE: `>>` represents dependency operator. `a >> b` means b depends on a. - -1. parse confs - 1. walk the directory structure of chronon and parse configs => logical nodes - -2. convert logical nodes to physical nodes - 1. Foreach groupBy - if coalescing is enabled - 1. insert groupBy's into `coalesced = index[conf.source + conf.key]` - 2. compare previous coalesced to new coalesced and do tetris backfill - 2. Foreach groupBy - 1. if fct + batch - 1. logical nodes are - partition_ir >> snapshot (== backfill) >> upload - 2. if fct + realtime - 1. logical nodes are - partition_ir (collapsed) + raw (tails) >> sawtooth_ir - 2. sawtooth_ir >> uploads (if online = True) - 3. sawtooth_ir >> snapshots (if backfill = true) - 3. if dim + batch - 1. can't cache - raw >> snapshot, snapshot >> upload - 4. if dim + realtime - 1. raw >> sawtooth_ir - 5. scd works similar to fct - 3. For each join - 1. bootstrap_source(s) >> joinPart tetris - 2. (source_hash + group_by) >> joinPart - - 3. left-fct - 1. gb-case::(dim/fct batch) snapshot >> joinPart - 2. gb-case::(fct realtime) sawtooth + ds_raw >> joinPart - 3. gb-case::(dim realtime) sawtooth + ds_mutations >> joinPart - 4. left-dim - 1. gb-case::dim/fct snapshot >> joinPart - 5. 
left-scd is sub-optimal - we ask to use staging query to convert to fct - - 6. bootstrap >> missingPartRanges >> joinParts >> final join - 7. join_raw >> derivations (view?) - 8. label source + left >> label_part - 9. label_part + join_raw >> final join - -3. workflow - 1. trigger staging query submissions on submit - 2. trigger gb_snapshot & sawtooth_ir computations on submit - 3. trigger join computations on schedule or on-demand diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala new file mode 100644 index 0000000000..984532dc39 --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala @@ -0,0 +1,184 @@ +package ai.chronon.orchestration + +import ai.chronon.orchestration.RepoIndex._ +import ai.chronon.orchestration.RepoTypes._ +import ai.chronon.orchestration.utils.CollectionExtensions.IteratorExtensions +import ai.chronon.orchestration.utils.SequenceMap + +import scala.collection.mutable + +class RepoIndex[T](proc: ConfProcessor[T]) { + + // first pass updates + private val branchToFileHash: TriMap[Branch, Name, FileHash] = mutable.Map.empty + private val fileHashToContent: TriMap[Name, FileHash, NodeContent[T]] = mutable.Map.empty + + // second pass updates + private val branchVersionIndex: TriMap[Branch, Name, Version] = mutable.Map.empty + private val versionSequencer: SequenceMap[Name, GlobalHash] = new SequenceMap[Name, GlobalHash] + + def addNodes(fileHashes: mutable.Map[Name, FileHash], + nodes: Seq[T], + branch: Branch, + dryRun: Boolean = true): Seq[VersionUpdate] = { + + val newContents = nodes.map { node => + val data = proc.toLocalData(node) + val nodeContent = NodeContent(data, node) + + require(data.fileHash == fileHashes(data.name), s"File hash mismatch for ${data.name}") + + data.name -> (data.fileHash -> nodeContent) + + }.toMap + + def getContents(name: Name, fileHash: FileHash): NodeContent[T] = { + + val incomingContents = newContents.get(name).map(_._2) + + lazy val existingContents = fileHashToContent + .get(name) + .flatMap(_.get(fileHash)) + + incomingContents.orElse(existingContents).get + } + + val globalHashes = mutable.Map.empty[Name, GlobalHash] + + // memoizes into globalHashes and recursively computes global hash from parents + def computeGlobalHash(name: Name): GlobalHash = { + + if (globalHashes.contains(name)) return globalHashes(name) + + val fileHash = fileHashes(name) + val content = getContents(name, fileHash) + + val localHash = content.localData.localHash + val parents = content.localData.inputs + + // recursively compute parent hashes + val parentHashes = parents + .map { parent => + val parentHash = globalHashes.getOrElse(parent, computeGlobalHash(parent)).hash + s"${parent.name}:$parentHash" + + } + .mkString(",") + + // combine parent hashcode with local hash + val codeString = s"node=${name.name}:$localHash|parents=$parentHashes" + val globalHash = GlobalHash(codeString.hashCode().toHexString) + + globalHashes.update(name, globalHash) + globalHash + } + + val newVersions = mutable.Map.empty[Name, Version] + + fileHashes.foreach { + case (name, _) => + val globalHash = computeGlobalHash(name) + + val versionIndex = versionSequencer.potentialIndex(name, globalHash) + newVersions.update(name, Version("v" + versionIndex.toString)) + } + + val existingVersions = branchVersionIndex.getOrElse(branch, mutable.Map.empty) + val mainVersions = branchVersionIndex.getOrElse(Branch.main, mutable.Map.empty) + + val 
versionUpdates = VersionUpdate.join(newVersions, existingVersions, mainVersions) + VersionUpdate.print(versionUpdates) + + if (!dryRun) { + + newContents.foreach { + case (name, (fileHash, content)) => update(fileHashToContent, name, fileHash, content) + } + + branchToFileHash.update(branch, fileHashes) + branchVersionIndex.update(branch, newVersions) + + } + + versionUpdates + } + + // returns the contents of the files not present in the index + def diff(incomingFileHashes: mutable.Map[Name, FileHash]): Seq[Name] = { + + incomingFileHashes + .filter { + case (name, incomingHash) => + val fileHashMap = fileHashToContent.get(name) + + lazy val nameAbsentInIndex = fileHashMap.isEmpty + lazy val fileHashAbsentForName = !fileHashMap.get.contains(incomingHash) + + nameAbsentInIndex || fileHashAbsentForName + + } + .keys + .toSeq + } + + def pruneBranch(branch: Branch): Unit = { + + branchToFileHash.remove(branch) + + pruneContents() + } + + private def pruneContents(): Unit = { + + // collect unique hashes per name from every branch + val validHashes: mutable.Map[Name, mutable.HashSet[FileHash]] = innerValues(branchToFileHash) + + fileHashToContent.retain { + case (name, fileHashMap) => + fileHashMap.retain { + + case (fileHash, _) => + validHashes.get(name) match { + case None => false // no branch has this name + case Some(hashes) => hashes.contains(fileHash) // this branch has this fileHash + } + + } + + fileHashMap.nonEmpty + } + } + + def addFiles(fileHashes: mutable.Map[Name, FileHash], updatedFiles: Map[String, String], branch: Branch): Unit = { + + val nodes: Seq[T] = updatedFiles.iterator.flatMap { + case (name, content) => + proc.parse(name, FileContent(content)) + }.distinct + + addNodes(fileHashes, nodes, branch) + } + +} + +object RepoIndex { + + private case class NodeContent[T](localData: LocalData, conf: T) + + private type TriMap[K1, K2, V] = mutable.Map[K1, mutable.Map[K2, V]] + + private def update[K1, K2, V](map: TriMap[K1, K2, V], k1: K1, k2: K2, v: V): Unit = + map.getOrElseUpdate(k1, mutable.Map.empty).update(k2, v) + + private def innerValues[K1, K2, V](map: TriMap[K1, K2, V]): mutable.Map[K2, mutable.HashSet[V]] = { + val result = mutable.Map.empty[K2, mutable.HashSet[V]] + map.values.foreach { innerMap => + innerMap.foreach { + case (k2, v) => + result.getOrElseUpdate(k2, mutable.HashSet.empty).add(v) + } + } + result + } + +} diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoNodes.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoNodes.scala new file mode 100644 index 0000000000..a7a1f3a44f --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoNodes.scala @@ -0,0 +1,25 @@ +package ai.chronon.orchestration + +import ai.chronon.api._ +import ai.chronon.orchestration.logical._ + +// Main LogicalSet class +case class RepoNodes(joins: Seq[Join] = Seq.empty, + groupBys: Seq[GroupBy] = Seq.empty, + stagingQueries: Seq[StagingQuery] = Seq.empty, + models: Seq[Model] = Seq.empty) { + + def toLogicalNodes: Seq[LogicalNodeImpl] = + joins.map(JoinNodeImpl) ++ + groupBys.map(GroupByNodeImpl) ++ + stagingQueries.map(StagingQueryNodeImpl) ++ + models.map(ModelNodeImpl) + + def :+(join: Join): RepoNodes = copy(joins = joins :+ join) + + def :+(groupBy: GroupBy): RepoNodes = copy(groupBys = groupBys :+ groupBy) + + def :+(stagingQuery: StagingQuery): RepoNodes = copy(stagingQueries = stagingQueries :+ stagingQuery) + + def :+(model: Model): RepoNodes = copy(models = models :+ model) +} diff --git 
a/orchestration/src/main/scala/ai/chronon/orchestration/RepoParser.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoParser.scala index e08ee89ee1..06fde7fdfa 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/RepoParser.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoParser.scala @@ -1,9 +1,11 @@ package ai.chronon.orchestration import ai.chronon.api +import ai.chronon.online.MetadataDirWalker import ai.chronon.online.MetadataDirWalker.listFiles import ai.chronon.online.MetadataDirWalker.parse import ai.chronon.online.MetadataDirWalker.relativePath +import ai.chronon.orchestration.RepoTypes._ import org.slf4j.LoggerFactory import java.io.File @@ -15,37 +17,37 @@ object RepoParser { private lazy val logger = LoggerFactory.getLogger(getClass) - private def parseLogicalSet(dir: File): LogicalSet = { - listFiles(dir).getValidFilesAndReport - .foldLeft(LogicalSet()) { (logicalSet, file) => - val filePath = file.getPath - - val logicalSetTry = filePath match { - case value if value.contains("joins/") => parse[api.Join](file).map(logicalSet :+ _) - case value if value.contains("group_bys/") => parse[api.GroupBy](file).map(logicalSet :+ _) - case value if value.contains("staging_queries/") => parse[api.StagingQuery](file).map(logicalSet :+ _) - case value if value.contains("models/") => parse[api.Model](file).map(logicalSet :+ _) - - case _ => Failure(new IllegalArgumentException(s"Unrecognized file path: $filePath")) - } + private def readContent(file: File): FileContent = { + val source = scala.io.Source.fromFile(file) + val content = FileContent(source.mkString) + source.close() + content + } - logicalSetTry match { - case Success(value) => logger.info(s"Parsed: ${relativePath(file)}"); value - case Failure(exception) => logger.error(s"Failed to parse file: ${relativePath(file)}", exception); logicalSet - } + def fileHashes(compileRoot: String): Map[Name, FileHash] = { + val compileDir = new File(compileRoot) + // list all files in compile root + MetadataDirWalker + .listFiles(new File(compileRoot)) + .getValidFilesAndReport + .map { file => + val relativePath = compileDir.toPath.relativize(file.toPath) + val name = Name(relativePath.toString) + val content = readContent(file) + val hash = content.hash + name -> hash } + .toMap } - def main(args: Array[String]): Unit = { - - require(args.length == 1, "Usage: RepoParser ") - - val dir = new File(args(0)) - require(dir.exists() && dir.isDirectory, s"Invalid directory: $dir") - - val logicalSet = parseLogicalSet(dir) - val li = LineageIndex(logicalSet) - - println(li) + def fileContents(compileRoot: String, names: Seq[String]): Map[Name, FileContent] = { + val compileDir = new File(compileRoot) + names.map { file => + val name = Name(file) + val relativePath = compileDir.toPath.resolve(file) + val content = readContent(relativePath.toFile) + name -> content + }.toMap } + } diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala b/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala new file mode 100644 index 0000000000..a144e3d942 --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala @@ -0,0 +1,44 @@ +package ai.chronon.orchestration + +import ai.chronon.orchestration.RepoTypes._ +import ai.chronon.orchestration.utils.TablePrinter + +import scala.collection.mutable + +case class VersionUpdate(name: Name, previous: Option[Version], next: Option[Version], main: Option[Version]) { + + private def 
isChanged: Boolean = next != main || next != previous + + private def toRow: Seq[String] = + Seq( + name.name, + next.map(_.name).getOrElse(""), + previous.map(_.name).getOrElse(""), + main.map(_.name).getOrElse("") + ) +} + +object VersionUpdate { + + def join(next: mutable.Map[Name, Version], + previous: mutable.Map[Name, Version], + main: mutable.Map[Name, Version]): Seq[VersionUpdate] = { + + val allNames = previous.keySet ++ next.keySet ++ main.keySet + + allNames + .map { name => + VersionUpdate(name, previous.get(name), next.get(name), main.get(name)) + } + .filter(_.isChanged) + .toSeq + .sortBy(_.name.name) + } + + def print(versionUpdates: Seq[VersionUpdate]): Unit = { + + val header = Seq("Name", "Next", "Previous", "Main") + TablePrinter.printTypedTable(header, versionUpdates)(_.toRow) + + } +} diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/logical/GroupByNodeImpl.scala b/orchestration/src/main/scala/ai/chronon/orchestration/logical/GroupByNodeImpl.scala index 35451045f7..af51c87965 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/logical/GroupByNodeImpl.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/logical/GroupByNodeImpl.scala @@ -9,7 +9,7 @@ import ai.chronon.orchestration.utils.CollectionExtensions.JListExtension // GroupBy implementation case class GroupByNodeImpl(groupBy: GroupBy) extends LogicalNodeImpl { - override def name: String = groupBy.metaData.name + override def name: String = "group_bys." + groupBy.metaData.name override def outputTables: Seq[String] = Seq(groupBy.metaData.outputTable) diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/logical/JoinNodeImpl.scala b/orchestration/src/main/scala/ai/chronon/orchestration/logical/JoinNodeImpl.scala index a4e48ab528..1eab436909 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/logical/JoinNodeImpl.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/logical/JoinNodeImpl.scala @@ -10,7 +10,7 @@ import ai.chronon.orchestration.utils.TabularDataUtils // Join implementation case class JoinNodeImpl(join: Join) extends LogicalNodeImpl { - override def name: String = join.metaData.name + override def name: String = "joins." 
+ join.metaData.name override def outputTables: Seq[String] = Seq(join.metaData.outputTable, join.metaData.loggedTable) diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/logical/LogicalNodeImpl.scala b/orchestration/src/main/scala/ai/chronon/orchestration/logical/LogicalNodeImpl.scala index a8f82001c2..2f65584e96 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/logical/LogicalNodeImpl.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/logical/LogicalNodeImpl.scala @@ -5,6 +5,7 @@ import ai.chronon.orchestration.TabularDataType // Base trait for common node operations trait LogicalNodeImpl { + // unique name for the node def name: String def outputTables: Seq[String] def toConfig: LogicalNode diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/logical/ModelNodeImpl.scala b/orchestration/src/main/scala/ai/chronon/orchestration/logical/ModelNodeImpl.scala index 0b5fd6476e..08e55e4b78 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/logical/ModelNodeImpl.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/logical/ModelNodeImpl.scala @@ -9,7 +9,7 @@ import ai.chronon.orchestration.utils.TabularDataUtils // Model implementation case class ModelNodeImpl(model: Model) extends LogicalNodeImpl { - override def name: String = model.metaData.name + override def name: String = "models." + model.metaData.name override def outputTables: Seq[String] = Seq(model.metaData.outputTable) diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/logical/StagingQueryNodeImpl.scala b/orchestration/src/main/scala/ai/chronon/orchestration/logical/StagingQueryNodeImpl.scala index c568986c7d..0266b1b293 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/logical/StagingQueryNodeImpl.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/logical/StagingQueryNodeImpl.scala @@ -9,7 +9,7 @@ import ai.chronon.orchestration.utils.CollectionExtensions.JListExtension // StagingQuery implementation case class StagingQueryNodeImpl(stagingQuery: StagingQuery) extends LogicalNodeImpl { - override def name: String = stagingQuery.metaData.name + override def name: String = "staging_queries." + stagingQuery.metaData.name override def outputTables: Seq[String] = Seq(stagingQuery.metaData.outputTable) diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/logical/TabularDataNodeImpl.scala b/orchestration/src/main/scala/ai/chronon/orchestration/logical/TabularDataNodeImpl.scala index 36c8cc6c5d..f0a5fe6866 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/logical/TabularDataNodeImpl.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/logical/TabularDataNodeImpl.scala @@ -5,7 +5,7 @@ import ai.chronon.orchestration.TabularData import ai.chronon.orchestration.TabularDataType case class TabularDataNodeImpl(tabularData: TabularData) extends LogicalNodeImpl { - override def name: String = tabularData.table + override def name: String = "tabular_data." 
+ tabularData.table override def outputTables: Seq[String] = Seq.empty diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/utils/SequenceMap.scala b/orchestration/src/main/scala/ai/chronon/orchestration/utils/SequenceMap.scala new file mode 100644 index 0000000000..cfb0d54f07 --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/utils/SequenceMap.scala @@ -0,0 +1,71 @@ +package ai.chronon.orchestration.utils + +import scala.collection.mutable + +// class that maintains a sequence of unique values for each key +// and returns the index of the value efficiently +// useful for maintaining version +class SequenceMap[K, V] { + private case class SequenceEntry(valueToIndex: mutable.Map[V, Int], indexToValue: mutable.Map[Int, V]) { + + def insert(value: V): Int = { + if (valueToIndex.contains(value)) return valueToIndex(value) + + val newIndex = indexToValue.size + valueToIndex.update(value, newIndex) + indexToValue.update(newIndex, value) + newIndex + } + + def nextIndex: Int = indexToValue.size + + def contains(value: V): Boolean = valueToIndex.contains(value) + } + + private val map: mutable.Map[K, SequenceEntry] = mutable.Map.empty + + def insert(key: K, value: V): Int = { + require(key != null, "Key cannot be null") + require(value != null, "Value cannot be null") + + map.get(key) match { + case Some(entry) => entry.insert(value) + case None => + val entry = SequenceEntry(mutable.Map.empty, mutable.Map.empty) + val newIndex = entry.insert(value) + map.update(key, entry) + newIndex + } + } + + def contains(key: K, value: V): Boolean = { + require(key != null, "Key cannot be null") + require(value != null, "Value cannot be null") + + map.get(key) match { + case Some(entry) => entry.contains(value) + case None => false + } + } + + def potentialIndex(key: K, value: V): Int = { + require(key != null, "Key cannot be null") + require(value != null, "Value cannot be null") + + map.get(key) match { + case Some(entry) => entry.valueToIndex.getOrElse(value, entry.nextIndex) + case None => 0 + } + } + + def get(key: K, index: Int): V = { + require(key != null, "Key cannot be null") + require(index >= 0, "Index cannot be negative") + require(map.contains(key), s"Key $key not found") + + val indexToValue = map(key).indexToValue + require(indexToValue.contains(index), s"Index $index not found") + + indexToValue(index) + } +} diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/utils/TablePrinter.scala b/orchestration/src/main/scala/ai/chronon/orchestration/utils/TablePrinter.scala new file mode 100644 index 0000000000..d154538142 --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/utils/TablePrinter.scala @@ -0,0 +1,63 @@ +package ai.chronon.orchestration.utils + +object TablePrinter { + def printTable[T](headers: Seq[String], data: Seq[Seq[T]]): Unit = { + if (headers.isEmpty || data.isEmpty) { + println("No data to display") + return + } + + // Convert all values to strings and find the maximum width for each column + val stringData = data.map(_.map(_.toString)) + val colWidths = headers.indices.map { i => + val colValues = stringData.map(_(i)) + (colValues :+ headers(i)).map(_.length).max + } + + // Create the format string for each row + val formatStr = colWidths.map(w => s"%-${w}s").mkString(" | ") + + // Print headers + println("+" + colWidths.map(w => "-" * (w + 2)).mkString("+") + "+") + println("| " + String.format(formatStr, headers.map(_.asInstanceOf[AnyRef]): _*) + " |") + println("+" + colWidths.map(w => "-" * (w + 
2)).mkString("+") + "+") + + // Print data + stringData.foreach { row => + println("| " + String.format(formatStr, row.map(_.asInstanceOf[AnyRef]): _*) + " |") + } + println("+" + colWidths.map(w => "-" * (w + 2)).mkString("+") + "+") + } + + // Convenience method for single-type tables + def printTypedTable[T](headers: Seq[String], data: Seq[T])(implicit extractor: T => Seq[Any]): Unit = { + printTable(headers, data.map(extractor)) + } +} + +// Example usage: +object TablePrinterExample extends App { + // Basic usage with raw data + val headers: Seq[String] = Seq("Name", "Age", "City") + val data: Seq[Seq[Any]] = Seq( + Seq("John Doe", 30, "New York"), + Seq("Jane Smith", 25, "Los Angeles"), + Seq("Bob Johnson", 35, "Chicago") + ) + + println("Basic Table Example:") + TablePrinter.printTable(headers, data) + + // Example with case class + case class Person(name: String, age: Int, city: String) + + val people: Seq[Person] = Seq( + Person("John Doe", 30, "New York"), + Person("Jane Smith", 25, "Los Angeles"), + Person("Bob Johnson", 35, "Chicago") + ) + + println("\nTyped Table Example:") + implicit val personExtractor: Person => Seq[Any] = p => Seq(p.name, p.age, p.city) + TablePrinter.printTypedTable(headers, people) +} diff --git a/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala new file mode 100644 index 0000000000..152c6ec079 --- /dev/null +++ b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala @@ -0,0 +1,119 @@ +package ai.chronon.orchestration.test + +import ai.chronon.orchestration.RepoIndex +import ai.chronon.orchestration.RepoTypes._ +import ai.chronon.orchestration.VersionUpdate +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import scala.collection.mutable + +class RepoIndexSpec extends AnyFlatSpec with Matchers { + + case class TestConf(name: String, + queryVersion: String, + params: String, + parents: Seq[String], + outputs: Seq[String] = Seq.empty) + + class TestConfProcessor extends ConfProcessor[TestConf] { + + override def toLocalData(conf: TestConf): LocalData = LocalData( + + name = Name(conf.name), + + fileHash = FileHash(conf.hashCode().toHexString), + localHash = LocalHash(conf.queryVersion.hashCode().toHexString), + + inputs = conf.parents.map(Name), + outputs = conf.outputs.map(Name) + + ) + + + override def parse(name: String, fileContent: FileContent): Seq[TestConf] = ??? 
+ } + + + + object TestWithMain { + def main(args: Array[String]): Unit = { + val ldf = new TestConfProcessor() + val confs = Seq( + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), + TestConf("gb1", "v1", "4g", Seq("t1")), + TestConf("gb2", "v1", "4g", Seq("t2")), + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + ) + val ld = confs.map(ldf.toLocalData) + print(ld) + } + } + + "RepoIndex" should "propagate updates" in { + val confs = Seq( + TestConf("t1", "", "", Seq.empty), + TestConf("t2", "", "", Seq.empty), + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), + TestConf("gb1", "v1", "4g", Seq("t1")), + TestConf("gb2", "v1", "4g", Seq("t2")), + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + ) + + val proc = new TestConfProcessor + val repoIndex = new RepoIndex[TestConf](proc) + + def fileHashes(configs: Seq[TestConf]): mutable.Map[Name, FileHash] = { + val nameHashPairs = configs.map(c => Name(c.name) -> proc.toLocalData(c).fileHash) + mutable.Map(nameHashPairs : _*) + } + + def updateIndex(confs: Seq[TestConf], branch: Branch): Seq[VersionUpdate] = { + val fileHashMap = fileHashes(confs) + val diffNodes = repoIndex.diff(fileHashMap).map(_.name).toSet + + repoIndex.addNodes( + fileHashMap, + confs.filter(c => diffNodes.contains(c.name)), + branch, + dryRun = false) + } + + val fileHashMap = fileHashes(confs) + val fileDiffs = repoIndex.diff(fileHashMap) + + fileDiffs.size shouldBe fileHashMap.size + fileDiffs.toSet shouldBe fileHashMap.keySet + + repoIndex.addNodes(fileHashes(confs), confs, Branch.main, dryRun = false) + + val testBranch = Branch("test") + + val branchConfs1 = Seq( + TestConf("t1", "", "", Seq("sq1")), + TestConf("t2", "", "", Seq.empty), + TestConf("sq1", "v2", "4g", Seq.empty, Seq("t1")), // updated + TestConf("gb1", "v1", "4g", Seq("t1")), + TestConf("gb2", "v1", "4g", Seq("t2")), + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + ) + + updateIndex(branchConfs1, testBranch) + + val branchConfs2 = Seq( + TestConf("t1", "", "", Seq("sq1")), + TestConf("t2", "", "", Seq.empty), + TestConf("sq1", "v3", "4g", Seq.empty, Seq("t1")), // updated + TestConf("gb1", "v1", "4g", Seq("t1")), + TestConf("gb2", "v1", "4g", Seq("t2")), + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + ) + updateIndex(branchConfs2, testBranch) + } + + +} diff --git a/orchestration/src/test/scala/ai/chronon/orchestration/test/SequenceMapSpec.scala b/orchestration/src/test/scala/ai/chronon/orchestration/test/SequenceMapSpec.scala new file mode 100644 index 0000000000..f6489cb390 --- /dev/null +++ b/orchestration/src/test/scala/ai/chronon/orchestration/test/SequenceMapSpec.scala @@ -0,0 +1,134 @@ +package ai.chronon.orchestration.test + +import ai.chronon.orchestration.utils.SequenceMap +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class SequenceMapSpec extends AnyFlatSpec with Matchers { + + "SequenceMap" should "insert and retrieve values with correct indices" in { + val sequenceMap = new SequenceMap[String, Int] + + // Insert values for a single key + sequenceMap.insert("key1", 100) should be(0) + sequenceMap.insert("key1", 200) should be(1) + sequenceMap.insert("key1", 300) should be(2) + + // Verify retrieval + sequenceMap.get("key1", 0) should be(100) 
+ sequenceMap.get("key1", 1) should be(200) + sequenceMap.get("key1", 2) should be(300) + } + + it should "maintain unique indices for duplicate values" in { + val sequenceMap = new SequenceMap[String, Int] + + // Insert same value multiple times + val firstIndex = sequenceMap.insert("key1", 100) + val secondIndex = sequenceMap.insert("key1", 100) + + // Should return same index for duplicate values + firstIndex should be(secondIndex) + firstIndex should be(0) + + // Verify retrieval + sequenceMap.get("key1", 0) should be(100) + } + + it should "handle multiple keys independently" in { + val sequenceMap = new SequenceMap[String, String] + + // Insert values for different keys + sequenceMap.insert("key1", "value1") should be(0) + sequenceMap.insert("key2", "value2") should be(0) + sequenceMap.insert("key1", "value3") should be(1) + + // Verify independent sequences + sequenceMap.get("key1", 0) should be("value1") + sequenceMap.get("key1", 1) should be("value3") + sequenceMap.get("key2", 0) should be("value2") + } + + it should "correctly check contains for existing and non-existing values" in { + val sequenceMap = new SequenceMap[Int, String] + + sequenceMap.insert(1, "test") + + // Check existing combinations + sequenceMap.contains(1, "test") should be(true) + + // Check non-existing combinations + sequenceMap.contains(1, "nonexistent") should be(false) + sequenceMap.contains(2, "test") should be(false) + } + + it should "throw IllegalArgumentException for null keys" in { + val sequenceMap = new SequenceMap[String, String] + + val exception = intercept[IllegalArgumentException] { + sequenceMap.insert(null, "value") + } + exception.getMessage should be("requirement failed: Key cannot be null") + } + + it should "throw IllegalArgumentException for null values" in { + val sequenceMap = new SequenceMap[String, String] + + val exception = intercept[IllegalArgumentException] { + sequenceMap.insert("key", null) + } + exception.getMessage should be("requirement failed: Value cannot be null") + } + + it should "throw IllegalArgumentException for negative indices" in { + val sequenceMap = new SequenceMap[String, String] + sequenceMap.insert("key", "value") + + val exception = intercept[IllegalArgumentException] { + sequenceMap.get("key", -1) + } + exception.getMessage should be("requirement failed: Index cannot be negative") + } + + it should "throw IllegalArgumentException for non-existing keys in get" in { + val sequenceMap = new SequenceMap[String, String] + + val exception = intercept[IllegalArgumentException] { + sequenceMap.get("nonexistent", 0) + } + exception.getMessage should be("requirement failed: Key nonexistent not found") + } + + it should "throw IllegalArgumentException for non-existing indices" in { + val sequenceMap = new SequenceMap[String, String] + sequenceMap.insert("key", "value") + + val exception = intercept[IllegalArgumentException] { + sequenceMap.get("key", 1) + } + exception.getMessage should be("requirement failed: Index 1 not found") + } + + it should "maintain correct sequence for mixed operations" in { + val sequenceMap = new SequenceMap[String, Int] + + // Insert some values + sequenceMap.insert("key", 100) should be(0) + sequenceMap.insert("key", 200) should be(1) + + // Check contains + sequenceMap.contains("key", 100) should be(true) + sequenceMap.contains("key", 200) should be(true) + + // Insert duplicate + sequenceMap.insert("key", 100) should be(0) + + // Insert new value + sequenceMap.insert("key", 300) should be(2) + + // Verify final sequence + 
sequenceMap.get("key", 0) should be(100) + sequenceMap.get("key", 1) should be(200) + sequenceMap.get("key", 2) should be(300) + } +} \ No newline at end of file From 4beeee345e5345043ed74d3eb7603c028b565f4f Mon Sep 17 00:00:00 2001 From: nikhil-zlai Date: Fri, 3 Jan 2025 18:29:14 -0800 Subject: [PATCH 03/10] logging - directly use log4j2-scala-api --- build.sbt | 19 ++++++------------- .../ai/chronon/orchestration/RepoIndex.scala | 11 +++++++---- .../ai/chronon/orchestration/RepoParser.scala | 12 ++---------- 3 files changed, 15 insertions(+), 27 deletions(-) diff --git a/build.sbt b/build.sbt index 3815c6776f..85ee555108 100644 --- a/build.sbt +++ b/build.sbt @@ -391,15 +391,6 @@ lazy val hub = (project in file("hub")) ) -val scala_test = "org.scalatest" %% "scalatest" % "3.2.19" % "test" -val sl4j = "org.slf4j" % "slf4j-api" % slf4jApiVersion -val logback = "ch.qos.logback" % "logback-classic" % logbackClassicVersion -val commonDependencies = Seq( - scala_test, - sl4j, - logback -) - // orchestrator lazy val orchestration = project .dependsOn(online.%("compile->compile;test->test")) @@ -408,6 +399,8 @@ lazy val orchestration = project assembly / mainClass := Some("ai.chronon.orchestration.RepoParser"), Compile / run / mainClass := Some("ai.chronon.orchestration.RepoParser"), + Compile / unmanagedResourceDirectories += baseDirectory.value / "src" / "main" / "resources", + assembly / assemblyMergeStrategy := { case "log4j2.properties" => MergeStrategy.first case "META-INF/log4j-provider.properties" => MergeStrategy.first @@ -415,10 +408,10 @@ lazy val orchestration = project case x => (assembly / assemblyMergeStrategy).value(x) }, - libraryDependencies ++= commonDependencies ++ Seq( - "org.apache.logging.log4j" % "log4j-api" % log4j2_version, - "org.apache.logging.log4j" % "log4j-core" % log4j2_version, - "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4j2_version, + libraryDependencies ++= Seq( + "org.apache.logging.log4j" %% "log4j-api-scala" % "13.1.0", + "org.apache.logging.log4j" % "log4j-core" % "2.20.0", + "org.scalatest" %% "scalatest" % "3.2.19" % "test", ), ) diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala index 984532dc39..55d50419e9 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala @@ -4,10 +4,11 @@ import ai.chronon.orchestration.RepoIndex._ import ai.chronon.orchestration.RepoTypes._ import ai.chronon.orchestration.utils.CollectionExtensions.IteratorExtensions import ai.chronon.orchestration.utils.SequenceMap +import org.apache.logging.log4j.scala.Logging import scala.collection.mutable -class RepoIndex[T](proc: ConfProcessor[T]) { +class RepoIndex[T](proc: ConfProcessor[T]) extends Logging { // first pass updates private val branchToFileHash: TriMap[Branch, Name, FileHash] = mutable.Map.empty @@ -59,14 +60,16 @@ class RepoIndex[T](proc: ConfProcessor[T]) { // recursively compute parent hashes val parentHashes = parents .map { parent => - val parentHash = globalHashes.getOrElse(parent, computeGlobalHash(parent)).hash + val parentHash = computeGlobalHash(parent).hash s"${parent.name}:$parentHash" - } .mkString(",") // combine parent hashcode with local hash - val codeString = s"node=${name.name}:$localHash|parents=$parentHashes" + val codeString = s"node=${name.name}:${localHash.hash}|parents=$parentHashes" + + logger.info(s"codeString: $codeString") + val 
globalHash = GlobalHash(codeString.hashCode().toHexString) globalHashes.update(name, globalHash) diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoParser.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoParser.scala index 06fde7fdfa..be6b52c254 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/RepoParser.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoParser.scala @@ -1,21 +1,13 @@ package ai.chronon.orchestration -import ai.chronon.api import ai.chronon.online.MetadataDirWalker -import ai.chronon.online.MetadataDirWalker.listFiles -import ai.chronon.online.MetadataDirWalker.parse -import ai.chronon.online.MetadataDirWalker.relativePath import ai.chronon.orchestration.RepoTypes._ -import org.slf4j.LoggerFactory +import org.apache.logging.log4j.scala.Logging import java.io.File -import scala.util.Failure -import scala.util.Success // parses a folder of -object RepoParser { - - private lazy val logger = LoggerFactory.getLogger(getClass) +object RepoParser extends App with Logging { private def readContent(file: File): FileContent = { val source = scala.io.Source.fromFile(file) From 866e5c6f4403b17a2cd4b7f05bf53fcdaa9bd9b5 Mon Sep 17 00:00:00 2001 From: nikhil-zlai Date: Fri, 3 Jan 2025 23:39:38 -0800 Subject: [PATCH 04/10] tests --- .../src/main/resources/log4j2.properties | 2 +- .../ai/chronon/orchestration/RepoIndex.scala | 13 +++-- .../orchestration/test/RepoIndexSpec.scala | 47 +++++++++++++++++-- 3 files changed, 55 insertions(+), 7 deletions(-) diff --git a/orchestration/src/main/resources/log4j2.properties b/orchestration/src/main/resources/log4j2.properties index bea9badbab..7df679e8b0 100644 --- a/orchestration/src/main/resources/log4j2.properties +++ b/orchestration/src/main/resources/log4j2.properties @@ -7,7 +7,7 @@ appender.console.type = Console appender.console.name = console appender.console.target = SYSTEM_OUT appender.console.layout.type = PatternLayout -appender.console.layout.pattern = %yellow{%d{yyyy/MM/dd HH:mm:ss}} %highlight{%-5level} %green{%file:%line} - %message%n +appender.console.layout.pattern = %cyan{%d{yyyy/MM/dd HH:mm:ss}} %highlight{%-5level} %magenta{%file:%line} - %message%n # Configure specific logger logger.chronon.name = ai.chronon diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala index 55d50419e9..26cba6d303 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala @@ -78,10 +78,10 @@ class RepoIndex[T](proc: ConfProcessor[T]) extends Logging { val newVersions = mutable.Map.empty[Name, Version] - fileHashes.foreach { - case (name, _) => - val globalHash = computeGlobalHash(name) + fileHashes.foreach { case (name, _) => computeGlobalHash(name) } + globalHashes.foreach { + case (name, globalHash) => val versionIndex = versionSequencer.potentialIndex(name, globalHash) newVersions.update(name, Version("v" + versionIndex.toString)) } @@ -98,6 +98,13 @@ class RepoIndex[T](proc: ConfProcessor[T]) extends Logging { case (name, (fileHash, content)) => update(fileHashToContent, name, fileHash, content) } + val newVersions = globalHashes.map { + case (name, globalHash) => + val versionIndex = versionSequencer.insert(name, globalHash) + val version = Version("v" + versionIndex.toString) + name -> version + } + branchToFileHash.update(branch, fileHashes) branchVersionIndex.update(branch, 
newVersions) diff --git a/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala index 152c6ec079..523fc3f77d 100644 --- a/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala +++ b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala @@ -53,7 +53,7 @@ class RepoIndexSpec extends AnyFlatSpec with Matchers { "RepoIndex" should "propagate updates" in { val confs = Seq( - TestConf("t1", "", "", Seq.empty), + TestConf("t1", "", "", Seq("sq1")), TestConf("t2", "", "", Seq.empty), TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), TestConf("gb1", "v1", "4g", Seq("t1")), @@ -106,13 +106,54 @@ class RepoIndexSpec extends AnyFlatSpec with Matchers { val branchConfs2 = Seq( TestConf("t1", "", "", Seq("sq1")), TestConf("t2", "", "", Seq.empty), - TestConf("sq1", "v3", "4g", Seq.empty, Seq("t1")), // updated + TestConf("sq1", "v2", "4g", Seq.empty, Seq("t1")), TestConf("gb1", "v1", "4g", Seq("t1")), - TestConf("gb2", "v1", "4g", Seq("t2")), + TestConf("gb2", "v1", "8g", Seq("t2")), // non-semantic update TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), ) + updateIndex(branchConfs2, testBranch) + + val branchConfs3 = Seq( + TestConf("t1", "", "", Seq("sq1")), + TestConf("t2", "", "", Seq.empty), + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), // reverted back + TestConf("gb1", "v1", "4g", Seq("t1")), + TestConf("gb2", "v1", "8g", Seq("t2")), + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + ) + updateIndex(branchConfs3, testBranch) + + val branchConfs4 = Seq( + TestConf("t1", "", "", Seq("sq1")), + // TestConf("t2", "", "", Seq.empty), // deleted + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), // reverted back + TestConf("gb1", "v1", "4g", Seq("t1")), + // TestConf("gb2", "v1", "8g", Seq("t2")), // deleted + TestConf("j1", "v1", "4g", Seq("gb1"), Seq("table_j1")), // parent deleted + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + ) + + updateIndex(branchConfs4, testBranch) + + updateIndex(branchConfs4, Branch.main) + + val branchConfs5 = Seq( + TestConf("t1", "", "", Seq("sq1")), + TestConf("t3", "", "", Seq("sq3")), // new + TestConf("t2", "", "", Seq.empty), + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), // reverted back + TestConf("sq3", "v1", "4g", Seq.empty, Seq("t3")), // new + TestConf("gb1", "v1", "4g", Seq("t1")), + TestConf("gb3", "v1", "4g", Seq("t3")), + TestConf("gb2", "v1", "8g", Seq("t2")), + TestConf("j1", "v1", "4g", Seq("gb1", "gb2", "gb3"), Seq("table_j1")), // parent reverted + new + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + ) + + updateIndex(branchConfs5, Branch.main) } From 8f1145dbd7d41e8b2bfd3af374c01b3715955023 Mon Sep 17 00:00:00 2001 From: nikhil-zlai Date: Sat, 4 Jan 2025 10:36:01 -0800 Subject: [PATCH 05/10] untracked files --- api/py/ai/chronon/staging_query.py | 42 +++++++++++++++++++ .../ai/chronon/orchestration/RepoTypes.scala | 33 +++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 api/py/ai/chronon/staging_query.py create mode 100644 orchestration/src/main/scala/ai/chronon/orchestration/RepoTypes.scala diff --git a/api/py/ai/chronon/staging_query.py b/api/py/ai/chronon/staging_query.py new file mode 100644 index 0000000000..3f165d0b7a --- /dev/null +++ b/api/py/ai/chronon/staging_query.py @@ -0,0 +1,42 @@ +from 
ai.chronon.api.ttypes import MetaData, StagingQuery +import inspect +import json + + +# Takes in an conf object class like GroupBy, Join and StagingQuery +# And returns a function that dispatches the arguments correctly to the object class and inner metadata +# Remaining args will end up in object.metaData.customJson +def _metadata_shim(conf_class): + constructor_params = list(inspect.signature(conf_class.__init__).parameters.keys()) + assert ( + constructor_params[0] == "self" + ), "First param should be 'self', found {}".format(constructor_params[0]) + assert ( + constructor_params[1] == "metaData" + ), "Second param should be 'metaData', found {}".format(constructor_params[1]) + outer_params = constructor_params[2:] + metadata_params = list(inspect.signature(MetaData.__init__).parameters.keys())[1:] + intersected_params = set(outer_params) & set(metadata_params) + unioned_params = set(outer_params) | set(metadata_params) + err_msg = "Cannot shim {}, because params: {} are intersecting with MetaData's params".format( + conf_class, intersected_params + ) + assert len(intersected_params) == 0, err_msg + + def shimmed_func(**kwargs): + meta_kwargs = { + key: value for key, value in kwargs.items() if key in metadata_params + } + outer_kwargs = { + key: value for key, value in kwargs.items() if key in outer_params + } + custom_json_args = { + key: value for key, value in kwargs.items() if key not in unioned_params + } + meta = MetaData(customJson=json.dumps(custom_json_args), **meta_kwargs) + return conf_class(metaData=meta, **outer_kwargs) + + return shimmed_func + + +StagingQuery = _metadata_shim(StagingQuery) diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoTypes.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoTypes.scala new file mode 100644 index 0000000000..cf13ebdf34 --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoTypes.scala @@ -0,0 +1,33 @@ +package ai.chronon.orchestration + +object RepoTypes { + + case class Name(name: String) + + case class Branch(name: String) + + case class FileHash(hash: String) + + case class LocalHash(hash: String) + + case class GlobalHash(hash: String) + + case class LocalData(name: Name, fileHash: FileHash, localHash: LocalHash, inputs: Seq[Name], outputs: Seq[Name]) + + object Branch { + val main: Branch = Branch("main") + } + + case class Version(name: String) + + case class FileContent(content: String) { + def hash: FileHash = FileHash(content.hashCode().toHexString) + } + + case class Table(name: String) + + trait ConfProcessor[T] { + def toLocalData(t: T): LocalData + def parse(name: String, fileContent: FileContent): Seq[T] + } +} From 2fa86a408250bc55179fc8e9f52747bd8d7e6302 Mon Sep 17 00:00:00 2001 From: nikhil-zlai Date: Sat, 4 Jan 2025 10:41:42 -0800 Subject: [PATCH 06/10] dead code removal --- api/py/ai/chronon/staging_query.py | 42 ------------------- .../orchestration/test/RepoIndexSpec.scala | 18 -------- 2 files changed, 60 deletions(-) delete mode 100644 api/py/ai/chronon/staging_query.py diff --git a/api/py/ai/chronon/staging_query.py b/api/py/ai/chronon/staging_query.py deleted file mode 100644 index 3f165d0b7a..0000000000 --- a/api/py/ai/chronon/staging_query.py +++ /dev/null @@ -1,42 +0,0 @@ -from ai.chronon.api.ttypes import MetaData, StagingQuery -import inspect -import json - - -# Takes in an conf object class like GroupBy, Join and StagingQuery -# And returns a function that dispatches the arguments correctly to the object class and inner metadata -# Remaining 
args will end up in object.metaData.customJson -def _metadata_shim(conf_class): - constructor_params = list(inspect.signature(conf_class.__init__).parameters.keys()) - assert ( - constructor_params[0] == "self" - ), "First param should be 'self', found {}".format(constructor_params[0]) - assert ( - constructor_params[1] == "metaData" - ), "Second param should be 'metaData', found {}".format(constructor_params[1]) - outer_params = constructor_params[2:] - metadata_params = list(inspect.signature(MetaData.__init__).parameters.keys())[1:] - intersected_params = set(outer_params) & set(metadata_params) - unioned_params = set(outer_params) | set(metadata_params) - err_msg = "Cannot shim {}, because params: {} are intersecting with MetaData's params".format( - conf_class, intersected_params - ) - assert len(intersected_params) == 0, err_msg - - def shimmed_func(**kwargs): - meta_kwargs = { - key: value for key, value in kwargs.items() if key in metadata_params - } - outer_kwargs = { - key: value for key, value in kwargs.items() if key in outer_params - } - custom_json_args = { - key: value for key, value in kwargs.items() if key not in unioned_params - } - meta = MetaData(customJson=json.dumps(custom_json_args), **meta_kwargs) - return conf_class(metaData=meta, **outer_kwargs) - - return shimmed_func - - -StagingQuery = _metadata_shim(StagingQuery) diff --git a/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala index 523fc3f77d..bf4022559a 100644 --- a/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala +++ b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala @@ -30,27 +30,9 @@ class RepoIndexSpec extends AnyFlatSpec with Matchers { ) - override def parse(name: String, fileContent: FileContent): Seq[TestConf] = ??? 
} - - - object TestWithMain { - def main(args: Array[String]): Unit = { - val ldf = new TestConfProcessor() - val confs = Seq( - TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), - TestConf("gb1", "v1", "4g", Seq("t1")), - TestConf("gb2", "v1", "4g", Seq("t2")), - TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), - TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), - ) - val ld = confs.map(ldf.toLocalData) - print(ld) - } - } - "RepoIndex" should "propagate updates" in { val confs = Seq( TestConf("t1", "", "", Seq("sq1")), From a77ee24ec413cd03c641312b6680bf2dd2dea46c Mon Sep 17 00:00:00 2001 From: nikhil-zlai Date: Tue, 7 Jan 2025 21:19:51 -0800 Subject: [PATCH 07/10] working tests --- api/py/ai/chronon/utils.py | 1 + .../ai/chronon/orchestration/RepoIndex.scala | 116 +++++++++++++----- .../ai/chronon/orchestration/RepoTypes.scala | 71 ++++++++++- .../chronon/orchestration/VersionUpdate.scala | 3 - .../utils/StringExtensions.scala | 22 ++++ .../orchestration/test/RepoIndexSpec.scala | 102 ++++++++------- 6 files changed, 234 insertions(+), 81 deletions(-) create mode 100644 orchestration/src/main/scala/ai/chronon/orchestration/utils/StringExtensions.scala diff --git a/api/py/ai/chronon/utils.py b/api/py/ai/chronon/utils.py index ca41033795..c5236b9551 100644 --- a/api/py/ai/chronon/utils.py +++ b/api/py/ai/chronon/utils.py @@ -262,6 +262,7 @@ def join_part_output_table_name(join, jp, full_name: bool = False): def partOutputTable(jp: JoinPart): String = (Seq(join.metaData.outputTable) ++ Option(jp.prefix) :+ jp.groupBy.metaData.cleanName).mkString("_") """ + print(join) if not join.metaData.name and isinstance(join, api.Join): __set_name(join, api.Join, "joins") return "_".join( diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala index 26cba6d303..7640cfa8e5 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala @@ -4,11 +4,12 @@ import ai.chronon.orchestration.RepoIndex._ import ai.chronon.orchestration.RepoTypes._ import ai.chronon.orchestration.utils.CollectionExtensions.IteratorExtensions import ai.chronon.orchestration.utils.SequenceMap +import ai.chronon.orchestration.utils.StringExtensions.StringOps import org.apache.logging.log4j.scala.Logging import scala.collection.mutable -class RepoIndex[T](proc: ConfProcessor[T]) extends Logging { +class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { // first pass updates private val branchToFileHash: TriMap[Branch, Name, FileHash] = mutable.Map.empty @@ -19,30 +20,14 @@ class RepoIndex[T](proc: ConfProcessor[T]) extends Logging { private val versionSequencer: SequenceMap[Name, GlobalHash] = new SequenceMap[Name, GlobalHash] def addNodes(fileHashes: mutable.Map[Name, FileHash], - nodes: Seq[T], + newNodes: Seq[T], branch: Branch, dryRun: Boolean = true): Seq[VersionUpdate] = { - val newContents = nodes.map { node => - val data = proc.toLocalData(node) - val nodeContent = NodeContent(data, node) - - require(data.fileHash == fileHashes(data.name), s"File hash mismatch for ${data.name}") - - data.name -> (data.fileHash -> nodeContent) - - }.toMap - - def getContents(name: Name, fileHash: FileHash): NodeContent[T] = { - - val incomingContents = newContents.get(name).map(_._2) - - lazy val existingContents = fileHashToContent - .get(name) - .flatMap(_.get(fileHash)) - - 
incomingContents.orElse(existingContents).get - } + val newContents = buildContentMap(proc, newNodes, fileHashes) + val enrichedFileHashes = newContents.map { + case (name, content) => name -> content.localData.fileHash + } ++ fileHashes val globalHashes = mutable.Map.empty[Name, GlobalHash] @@ -51,8 +36,27 @@ class RepoIndex[T](proc: ConfProcessor[T]) extends Logging { if (globalHashes.contains(name)) return globalHashes(name) - val fileHash = fileHashes(name) - val content = getContents(name, fileHash) + val fileHash = enrichedFileHashes.get(name) match { + case Some(hash) => hash + + // this could be an artifact related to unchanged files on the branch + // we reach out to content index + // artifacts are just names with no content - so there should be just one entry + case None => + val hashToContent = fileHashToContent(name) + + require(hashToContent.nonEmpty, s"Expected 1 entry for artifact $name, found none") + require(hashToContent.size == 1, s"Expected 1 entry for artifact $name, found ${hashToContent.size}") + + hashToContent.head._1 + } + + val content = if (newContents.contains(name)) { + newContents(name) + } else { + // fetch + fileHashToContent(name)(fileHash) + } val localHash = content.localData.localHash val parents = content.localData.inputs @@ -70,7 +74,7 @@ class RepoIndex[T](proc: ConfProcessor[T]) extends Logging { logger.info(s"codeString: $codeString") - val globalHash = GlobalHash(codeString.hashCode().toHexString) + val globalHash = GlobalHash(codeString.md5) globalHashes.update(name, globalHash) globalHash @@ -90,12 +94,12 @@ class RepoIndex[T](proc: ConfProcessor[T]) extends Logging { val mainVersions = branchVersionIndex.getOrElse(Branch.main, mutable.Map.empty) val versionUpdates = VersionUpdate.join(newVersions, existingVersions, mainVersions) - VersionUpdate.print(versionUpdates) if (!dryRun) { + logger.info("Not a dry run! 
Inserting new nodes into the index into branch: " + branch.name)
+
       newContents.foreach {
-        case (name, (fileHash, content)) => update(fileHashToContent, name, fileHash, content)
+        case (name, content) => update(fileHashToContent, name, content.localData.fileHash, content)
       }
 
       val newVersions = globalHashes.map {
@@ -105,7 +109,7 @@ class RepoIndex[T](proc: ConfProcessor[T]) extends Logging {
           name -> version
       }
 
-      branchToFileHash.update(branch, fileHashes)
+      branchToFileHash.update(branch, enrichedFileHashes)
       branchVersionIndex.update(branch, newVersions)
     }
 
@@ -141,7 +145,7 @@ class RepoIndex[T](proc: ConfProcessor[T]) extends Logging {
   private def pruneContents(): Unit = {
 
     // collect unique hashes per name from every branch
-    val validHashes: mutable.Map[Name, mutable.HashSet[FileHash]] = innerValues(branchToFileHash)
+    val validHashes: mutable.Map[Name, mutable.HashSet[FileHash]] = innerKeyToValueSet(branchToFileHash)
 
     fileHashToContent.retain {
       case (name, fileHashMap) =>
@@ -173,14 +177,12 @@ object RepoIndex {
 
-  private case class NodeContent[T](localData: LocalData, conf: T)
-
   private type TriMap[K1, K2, V] = mutable.Map[K1, mutable.Map[K2, V]]
 
   private def update[K1, K2, V](map: TriMap[K1, K2, V], k1: K1, k2: K2, v: V): Unit =
     map.getOrElseUpdate(k1, mutable.Map.empty).update(k2, v)
 
-  private def innerValues[K1, K2, V](map: TriMap[K1, K2, V]): mutable.Map[K2, mutable.HashSet[V]] = {
+  private def innerKeyToValueSet[K1, K2, V](map: TriMap[K1, K2, V]): mutable.Map[K2, mutable.HashSet[V]] = {
     val result = mutable.Map.empty[K2, mutable.HashSet[V]]
     map.values.foreach { innerMap =>
      innerMap.foreach {
@@ -191,4 +193,54 @@ object RepoIndex {
     result
   }
 
+  /**
+   * Takes data from the repo parser and builds a local content map for the repo index.
+   * We treat inputs and outputs that are not present in fileHashes as artifacts.
+   * For these artifacts we create additional entries in the result.
+   */
+  def buildContentMap[T >: Null](proc: ConfProcessor[T],
+                                 nodes: Seq[T],
+                                 fileHashes: mutable.Map[Name, FileHash]): mutable.Map[Name, NodeContent[T]] = {
+
+    val contentMap = mutable.Map.empty[Name, NodeContent[T]]
+
+    // first pass - update non-artifact contents
+    for (
+      node <- nodes;
+      nodeContent <- proc.nodeContents(node)
+    ) {
+
+      val name = nodeContent.localData.name
+      contentMap.update(name, nodeContent)
+
+      def updateContents(artifactName: Name, isOutput: Boolean): Unit = {
+
+        // artifacts are not present in file hashes
+        if (fileHashes.contains(artifactName)) return
+
+        val existingParents = if (contentMap.contains(artifactName)) {
+          contentMap(artifactName).localData.inputs
+        } else {
+          Seq.empty
+        }
+
+        val newParents = if (isOutput) Seq(name) else Seq.empty
+
+        val parents = (existingParents ++ newParents).distinct
+
+        val artifactData = LocalData.forArtifact(artifactName, parents)
+        val artifactContent = NodeContent[T](artifactData, null)
+
+        contentMap.update(artifactName, artifactContent)
+
+      }
+
+      nodeContent.localData.outputs.foreach { output => updateContents(output, isOutput = true) }
+      nodeContent.localData.inputs.foreach { input => updateContents(input, isOutput = false) }
+
+    }
+
+    contentMap
+  }
+
 }
diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoTypes.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoTypes.scala
index cf13ebdf34..f53d5704f4 100644
--- a/orchestration/src/main/scala/ai/chronon/orchestration/RepoTypes.scala
+++ 
b/orchestration/src/main/scala/ai/chronon/orchestration/RepoTypes.scala
@@ -1,33 +1,96 @@
 package ai.chronon.orchestration
 
+import ai.chronon.orchestration.utils.StringExtensions.StringOps
+
+/**
+ * Types relevant to the orchestration layer.
+ * It is very easy to get raw strings mixed up in the indexing logic.
+ * So we guard them using case classes. For that reason we have a lot of case classes here.
+ */
 object RepoTypes {
 
+  /**
+   * name of the node
+   * example: group_bys..., joins..., staging_queries...
+   * and also tables... - adding a dummy node for the table makes the code easier to write
+   */
   case class Name(name: String)
 
   case class Branch(name: String)
 
+  object Branch {
+    val main: Branch = Branch("main")
+  }
+
+  /**
+   * Takes the file content string and hashes it.
+   * Whenever this changes, the cli will upload the file into the index.
+   */
   case class FileHash(hash: String)
 
+  /**
+   * Local hash represents the computation defined in the file.
+   * In chronon api, any field other than metadata is
+   * considered to impact the computation & consequently the output.
+   */
   case class LocalHash(hash: String)
 
+  /**
+   * Global hash represents the computation defined in the file and all its dependencies.
+   * We recursively scan all the parents of the node to compute the global hash.
+   *
+   * `global_hash(node) = hash(node.local_hash + node.parents.map(global_hash))`
+   */
   case class GlobalHash(hash: String)
 
+  /**
+   * Local data represents relevant information for lineage tracking
+   * that can be computed by parsing a file in *isolation*.
+   */
   case class LocalData(name: Name, fileHash: FileHash, localHash: LocalHash, inputs: Seq[Name], outputs: Seq[Name])
 
-  object Branch {
-    val main: Branch = Branch("main")
+  object LocalData {
+
+    def forArtifact(name: Name, parents: Seq[Name]): LocalData = {
+
+      val nameHash = name.name.md5
+
+      LocalData(
+        name,
+        FileHash(nameHash),
+        LocalHash(nameHash),
+        inputs = parents,
+        outputs = Seq.empty
+      )
+    }
   }
 
+  /**
+   * Version assigned to a node - "v" followed by the sequence number of the node's global hash.
+   */
   case class Version(name: String)
 
+  /**
+   * Content of the compiled file
+   * Currently TSimpleJsonProtocol serialized StagingQuery, Join, GroupBy thrift objects.
+   * Python compile.py will serialize user's python code into these objects and
+   * the [[RepoParser]] will pick them up and sync into [[RepoIndex]]. 
+ */ case class FileContent(content: String) { - def hash: FileHash = FileHash(content.hashCode().toHexString) + def hash: FileHash = FileHash(content.md5) } + case class NodeContent[T](localData: LocalData, conf: T) + case class Table(name: String) + /** + * To make the code testable, we parameterize the Config with `T` + * You can see how this is used in [[RepoIndexSpec]] + */ trait ConfProcessor[T] { - def toLocalData(t: T): LocalData + def nodeContents(t: T): Seq[NodeContent[T]] def parse(name: String, fileContent: FileContent): Seq[T] } } diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala b/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala index a144e3d942..74df60f0eb 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala @@ -7,8 +7,6 @@ import scala.collection.mutable case class VersionUpdate(name: Name, previous: Option[Version], next: Option[Version], main: Option[Version]) { - private def isChanged: Boolean = next != main || next != previous - private def toRow: Seq[String] = Seq( name.name, @@ -30,7 +28,6 @@ object VersionUpdate { .map { name => VersionUpdate(name, previous.get(name), next.get(name), main.get(name)) } - .filter(_.isChanged) .toSeq .sortBy(_.name.name) } diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/utils/StringExtensions.scala b/orchestration/src/main/scala/ai/chronon/orchestration/utils/StringExtensions.scala new file mode 100644 index 0000000000..12a08aa702 --- /dev/null +++ b/orchestration/src/main/scala/ai/chronon/orchestration/utils/StringExtensions.scala @@ -0,0 +1,22 @@ +package ai.chronon.orchestration.utils + +import ai.chronon.api.Constants + +import java.security.MessageDigest + +object StringExtensions { + + lazy val digester: ThreadLocal[MessageDigest] = new ThreadLocal[MessageDigest]() { + override def initialValue(): MessageDigest = MessageDigest.getInstance("MD5") + } + + implicit class StringOps(s: String) { + def md5: String = + digester + .get() + .digest(s.getBytes(Constants.UTF8)) + .map("%02x".format(_)) + .mkString + .take(8) + } +} diff --git a/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala index bf4022559a..7aaa0333d4 100644 --- a/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala +++ b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala @@ -3,12 +3,14 @@ package ai.chronon.orchestration.test import ai.chronon.orchestration.RepoIndex import ai.chronon.orchestration.RepoTypes._ import ai.chronon.orchestration.VersionUpdate +import ai.chronon.orchestration.utils.StringExtensions.StringOps +import org.apache.logging.log4j.scala.Logging import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import scala.collection.mutable -class RepoIndexSpec extends AnyFlatSpec with Matchers { +class RepoIndexSpec extends AnyFlatSpec with Matchers with Logging { case class TestConf(name: String, queryVersion: String, @@ -18,64 +20,89 @@ class RepoIndexSpec extends AnyFlatSpec with Matchers { class TestConfProcessor extends ConfProcessor[TestConf] { - override def toLocalData(conf: TestConf): LocalData = LocalData( + override def nodeContents(conf: TestConf): Seq[NodeContent[TestConf]] = { - name = Name(conf.name), + val ld = LocalData( - fileHash = 
FileHash(conf.hashCode().toHexString), - localHash = LocalHash(conf.queryVersion.hashCode().toHexString), + name = Name(conf.name), - inputs = conf.parents.map(Name), - outputs = conf.outputs.map(Name) + fileHash = FileHash(conf.toString.md5), + localHash = LocalHash(conf.queryVersion.md5), - ) + inputs = conf.parents.map(Name), + outputs = conf.outputs.map(Name) + + ) + + Seq(NodeContent(ld, conf)) + } override def parse(name: String, fileContent: FileContent): Seq[TestConf] = ??? } "RepoIndex" should "propagate updates" in { - val confs = Seq( - TestConf("t1", "", "", Seq("sq1")), - TestConf("t2", "", "", Seq.empty), - TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), - TestConf("gb1", "v1", "4g", Seq("t1")), - TestConf("gb2", "v1", "4g", Seq("t2")), - TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), - TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), - ) + val proc = new TestConfProcessor val repoIndex = new RepoIndex[TestConf](proc) + + def fileHashes(configs: Seq[TestConf]): mutable.Map[Name, FileHash] = { - val nameHashPairs = configs.map(c => Name(c.name) -> proc.toLocalData(c).fileHash) + val nameHashPairs = configs + .flatMap(proc.nodeContents) + .map(nc => nc.localData.name -> nc.localData.fileHash) mutable.Map(nameHashPairs : _*) } - def updateIndex(confs: Seq[TestConf], branch: Branch): Seq[VersionUpdate] = { + def updateIndex(confs: Seq[TestConf], branch: Branch, commitMessage: String): Seq[VersionUpdate] = { + logger.info(s"Updating index branch @${branch.name} and commit - $commitMessage") val fileHashMap = fileHashes(confs) val diffNodes = repoIndex.diff(fileHashMap).map(_.name).toSet - repoIndex.addNodes( + logger.info("incoming files:\n " + fileHashMap.keySet.mkString("\n ")) + logger.info("diff nodes:\n " + diffNodes.mkString("\n ")) + + val updates = repoIndex.addNodes( fileHashMap, confs.filter(c => diffNodes.contains(c.name)), branch, dryRun = false) + + VersionUpdate.print(updates) + logger.info(s"Finished adding commit: $commitMessage\n\n") + updates } + val confs = Seq( + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), + TestConf("gb1", "v1", "4g", Seq("t1")), + TestConf("gb2", "v1", "4g", Seq("t2")), + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + ) + val fileHashMap = fileHashes(confs) + + // check artifact nodes are present + val map = RepoIndex.buildContentMap(proc, confs, fileHashMap) + map.get(Name("t1")) shouldNot be(None) + map.get(Name("t2")) shouldNot be(None) + + + + logger.info(s"fileHashMap: $fileHashMap") + val fileDiffs = repoIndex.diff(fileHashMap) fileDiffs.size shouldBe fileHashMap.size fileDiffs.toSet shouldBe fileHashMap.keySet - repoIndex.addNodes(fileHashes(confs), confs, Branch.main, dryRun = false) + updateIndex(confs, Branch.main, "initial commit") val testBranch = Branch("test") val branchConfs1 = Seq( - TestConf("t1", "", "", Seq("sq1")), - TestConf("t2", "", "", Seq.empty), TestConf("sq1", "v2", "4g", Seq.empty, Seq("t1")), // updated TestConf("gb1", "v1", "4g", Seq("t1")), TestConf("gb2", "v1", "4g", Seq("t2")), @@ -83,11 +110,9 @@ class RepoIndexSpec extends AnyFlatSpec with Matchers { TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), ) - updateIndex(branchConfs1, testBranch) + updateIndex(branchConfs1, testBranch, "semantically updated sq1") val branchConfs2 = Seq( - TestConf("t1", "", "", Seq("sq1")), - TestConf("t2", "", "", Seq.empty), TestConf("sq1", "v2", "4g", Seq.empty, Seq("t1")), TestConf("gb1", "v1", "4g", Seq("t1")), 
TestConf("gb2", "v1", "8g", Seq("t2")), // non-semantic update @@ -95,47 +120,40 @@ class RepoIndexSpec extends AnyFlatSpec with Matchers { TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), ) - updateIndex(branchConfs2, testBranch) + updateIndex(branchConfs2, testBranch, "non semantically updated gb2") val branchConfs3 = Seq( - TestConf("t1", "", "", Seq("sq1")), - TestConf("t2", "", "", Seq.empty), TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), // reverted back TestConf("gb1", "v1", "4g", Seq("t1")), TestConf("gb2", "v1", "8g", Seq("t2")), TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), ) - updateIndex(branchConfs3, testBranch) + updateIndex(branchConfs3, testBranch, "reverted back semantic update to sq1") val branchConfs4 = Seq( - TestConf("t1", "", "", Seq("sq1")), - // TestConf("t2", "", "", Seq.empty), // deleted - TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), // reverted back + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), TestConf("gb1", "v1", "4g", Seq("t1")), // TestConf("gb2", "v1", "8g", Seq("t2")), // deleted TestConf("j1", "v1", "4g", Seq("gb1"), Seq("table_j1")), // parent deleted TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), ) - updateIndex(branchConfs4, testBranch) + updateIndex(branchConfs4, testBranch, "deleted gb2 (depends on t2)") - updateIndex(branchConfs4, Branch.main) + updateIndex(branchConfs4, Branch.main, "updated main with change in test branch") val branchConfs5 = Seq( - TestConf("t1", "", "", Seq("sq1")), - TestConf("t3", "", "", Seq("sq3")), // new - TestConf("t2", "", "", Seq.empty), - TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), // reverted back + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), TestConf("sq3", "v1", "4g", Seq.empty, Seq("t3")), // new TestConf("gb1", "v1", "4g", Seq("t1")), - TestConf("gb3", "v1", "4g", Seq("t3")), - TestConf("gb2", "v1", "8g", Seq("t2")), + TestConf("gb3", "v1", "4g", Seq("t3")), // new + TestConf("gb2", "v1", "8g", Seq("t2")), // gb2 added back TestConf("j1", "v1", "4g", Seq("gb1", "gb2", "gb3"), Seq("table_j1")), // parent reverted + new TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), ) - updateIndex(branchConfs5, Branch.main) + updateIndex(branchConfs5, Branch.main, "new sq3 and gb3, un-deleted gb2") } From 1f14c98460ed1fe6827800d295353169f6156116 Mon Sep 17 00:00:00 2001 From: nikhil-zlai Date: Wed, 8 Jan 2025 15:57:33 -0800 Subject: [PATCH 08/10] test checks and comments --- .../ai/chronon/orchestration/RepoIndex.scala | 110 +++++++++++++----- .../chronon/orchestration/VersionUpdate.scala | 8 ++ .../orchestration/utils/SequenceMap.scala | 14 ++- .../orchestration/test/RepoIndexSpec.scala | 78 +++++++------ 4 files changed, 144 insertions(+), 66 deletions(-) diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala index 7640cfa8e5..fa1ff8f1f5 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala @@ -9,16 +9,55 @@ import org.apache.logging.log4j.scala.Logging import scala.collection.mutable +/** + * Indexer to store and assign versions to nodes in the repo based on their lineage. + * This also manages versions and contents across multiple branches. + * + * Careful consideration has been given to avoid O(N*N) operations in this class. 
+ *
+ * We use a generic conf type T - to make it easy to test complex cases.
+ *
+ * We expect the cli tool to
+ *   first, call [[diff]] to see which files are not already present in the index.
+ *   second, call [[addFiles]] to add updated/new nodes to the index along with their branch name.
+ *   you can also dry-run the [[addFiles]] method to see potential versions that will be assigned.
+ *   upon closing of a branch, call [[pruneBranch]] to remove the unused contents of the branch from the index.
+ *
+ * merging into master is simply calling [[addNodes]] with the new contents of master post merge.
+ *
+ * NOTE: This class is not thread safe. Only one thread should access this index at a time.
+ */
 class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging {
 
-  // first pass updates
+  /**
+   * We don't duplicate contents of files across branches and commits in the index.
+   * We just store contents by fileHash, and the branch to fileHash mapping separately.
+   */
   private val branchToFileHash: TriMap[Branch, Name, FileHash] = mutable.Map.empty
 
   private val fileHashToContent: TriMap[Name, FileHash, NodeContent[T]] = mutable.Map.empty
 
-  // second pass updates
+  /**
+   * Versions are globally unique - we compute global hashes for each node based on lineage
+   * and then assign a version to it. We store these versions per (branch, node) in the [[branchVersionIndex]].
+   */
   private val branchVersionIndex: TriMap[Branch, Name, Version] = mutable.Map.empty
+
+  /**
+   * We use a [[SequenceMap]] to take a node's global hash and assign a version to it.
+   * Version entries in the sequencer are never deleted, so that
+   * upon reverting changes we can refer to the older version instead
+   * of bumping it unnecessarily. This avoids re-computation of assets.
+   */
   private val versionSequencer: SequenceMap[Name, GlobalHash] = new SequenceMap[Name, GlobalHash]
 
+  /**
+   * @param fileHashes map of all the file names to their hashes in the branch.
+   * @param newNodes nodes that aren't present in the index's [[fileHashToContent]] table. New nodes typically.
+   * @param branch branch on which the user is making the changes.
+   * @param dryRun when true shows potential versions that will be assigned to nodes without modifying the index.
+   * @return
+   *   - a list of version updates that will be applied to the index. 
+ */ def addNodes(fileHashes: mutable.Map[Name, FileHash], newNodes: Seq[T], branch: Branch, @@ -29,9 +68,13 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { case (name, content) => name -> content.localData.fileHash } ++ fileHashes + /** + * we turn local hash into global hash by combining it with parent hashes recursively + * global_hash(node) = hash(node.local_hash + node.parents.map(global_hash)) + * + * we use memoization to avoid recomputing global hashes via the [[globalHashes]] map + */ val globalHashes = mutable.Map.empty[Name, GlobalHash] - - // memoizes into globalHashes and recursively computes global hash from parents def computeGlobalHash(name: Name): GlobalHash = { if (globalHashes.contains(name)) return globalHashes(name) @@ -40,13 +83,14 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { case Some(hash) => hash // this could be an artifact related to unchanged files on the branch - // we reach out to content index + // so we reach out to content index // artifacts are just names with no content - so there should be just one entry case None => val hashToContent = fileHashToContent(name) - require(hashToContent.nonEmpty, s"Expected 1 entry for artifact $name, found none") require(hashToContent.size == 1, s"Expected 1 entry for artifact $name, found ${hashToContent.size}") + require(hashToContent.head._2.localData.fileHash.hash == name.name.md5, + s"Expected artifact $name to have no inputs") hashToContent.head._1 } @@ -54,7 +98,6 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { val content = if (newContents.contains(name)) { newContents(name) } else { - // fetch fileHashToContent(name)(fileHash) } @@ -72,7 +115,7 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { // combine parent hashcode with local hash val codeString = s"node=${name.name}:${localHash.hash}|parents=$parentHashes" - logger.info(s"codeString: $codeString") + logger.info(s"Global Hash elements: $codeString") val globalHash = GlobalHash(codeString.md5) @@ -80,24 +123,15 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { globalHash } - val newVersions = mutable.Map.empty[Name, Version] - fileHashes.foreach { case (name, _) => computeGlobalHash(name) } - globalHashes.foreach { - case (name, globalHash) => - val versionIndex = versionSequencer.potentialIndex(name, globalHash) - newVersions.update(name, Version("v" + versionIndex.toString)) - } - val existingVersions = branchVersionIndex.getOrElse(branch, mutable.Map.empty) val mainVersions = branchVersionIndex.getOrElse(Branch.main, mutable.Map.empty) - val versionUpdates = VersionUpdate.join(newVersions, existingVersions, mainVersions) - if (!dryRun) { logger.info("Not a dry run! 
Inserting new nodes into the index into branch: " + branch.name) + newContents.foreach { case (name, content) => update(fileHashToContent, name, content.localData.fileHash, content) } @@ -112,16 +146,37 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { branchToFileHash.update(branch, enrichedFileHashes) branchVersionIndex.update(branch, newVersions) + return VersionUpdate.join(newVersions, existingVersions, mainVersions) + } + + // dry run - don't insert into any members of the index + val newVersions = mutable.Map.empty[Name, Version] + globalHashes.foreach { + case (name, globalHash) => + val versionIndex = versionSequencer.potentialIndex(name, globalHash) + newVersions.update(name, Version("v" + versionIndex.toString)) } + val versionUpdates = VersionUpdate.join(newVersions, existingVersions, mainVersions) versionUpdates } + def addFiles(fileHashes: mutable.Map[Name, FileHash], updatedFiles: Map[String, String], branch: Branch): Unit = { + + val nodes: Seq[T] = updatedFiles.iterator.flatMap { + case (name, content) => + proc.parse(name, FileContent(content)) + }.distinct + + addNodes(fileHashes, nodes, branch) + } + // returns the contents of the files not present in the index def diff(incomingFileHashes: mutable.Map[Name, FileHash]): Seq[Name] = { incomingFileHashes .filter { + case (name, incomingHash) => val fileHashMap = fileHashToContent.get(name) @@ -135,13 +190,21 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { .toSeq } + /** + * Removes contents and index entries related to a branch + */ def pruneBranch(branch: Branch): Unit = { branchToFileHash.remove(branch) + branchVersionIndex.remove(branch) pruneContents() + } + /** + * Removes contents that are not referred to via [[FileHash]] in any branch + */ private def pruneContents(): Unit = { // collect unique hashes per name from every branch @@ -162,17 +225,6 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { fileHashMap.nonEmpty } } - - def addFiles(fileHashes: mutable.Map[Name, FileHash], updatedFiles: Map[String, String], branch: Branch): Unit = { - - val nodes: Seq[T] = updatedFiles.iterator.flatMap { - case (name, content) => - proc.parse(name, FileContent(content)) - }.distinct - - addNodes(fileHashes, nodes, branch) - } - } object RepoIndex { diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala b/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala index 74df60f0eb..57cb414645 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/VersionUpdate.scala @@ -32,6 +32,14 @@ object VersionUpdate { .sortBy(_.name.name) } + def toMap(versionUpdates: Seq[VersionUpdate]): Map[String, String] = { + + versionUpdates.map { versionUpdate => + versionUpdate.name.name -> versionUpdate.next.map(_.name).getOrElse("") + }.toMap + + } + def print(versionUpdates: Seq[VersionUpdate]): Unit = { val header = Seq("Name", "Next", "Previous", "Main") diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/utils/SequenceMap.scala b/orchestration/src/main/scala/ai/chronon/orchestration/utils/SequenceMap.scala index cfb0d54f07..e8968647a3 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/utils/SequenceMap.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/utils/SequenceMap.scala @@ -2,10 +2,17 @@ package ai.chronon.orchestration.utils import scala.collection.mutable -// class that maintains a sequence 
of unique values for each key -// and returns the index of the value efficiently -// useful for maintaining version +/** + * [[SequenceMap]] maintains a sequence of unique values for each key + * while preserving the first insertion order and returning the index of the first-insertion. + * + * [[RepoIndex]] needs to compute versions of the whole repo on every call to its [[addNodes]] method. + * This map ensures that version assignment is fast. + * + * NOTE: Methods of this class are not thread safe. Only one thread should access this map at a time. + */ class SequenceMap[K, V] { + private case class SequenceEntry(valueToIndex: mutable.Map[V, Int], indexToValue: mutable.Map[Int, V]) { def insert(value: V): Int = { @@ -20,6 +27,7 @@ class SequenceMap[K, V] { def nextIndex: Int = indexToValue.size def contains(value: V): Boolean = valueToIndex.contains(value) + } private val map: mutable.Map[K, SequenceEntry] = mutable.Map.empty diff --git a/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala index 7aaa0333d4..ab40274417 100644 --- a/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala +++ b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala @@ -55,8 +55,12 @@ class RepoIndexSpec extends AnyFlatSpec with Matchers with Logging { mutable.Map(nameHashPairs : _*) } - def updateIndex(confs: Seq[TestConf], branch: Branch, commitMessage: String): Seq[VersionUpdate] = { + def updateIndex(confsWithExpectedVersions: Seq[(TestConf, String)], branch: Branch, commitMessage: String): Seq[VersionUpdate] = { logger.info(s"Updating index branch @${branch.name} and commit - $commitMessage") + + val expectedVersions = confsWithExpectedVersions.map(c => c._1.name -> c._2).toMap + + val confs = confsWithExpectedVersions.map(_._1) val fileHashMap = fileHashes(confs) val diffNodes = repoIndex.diff(fileHashMap).map(_.name).toSet @@ -69,23 +73,29 @@ class RepoIndexSpec extends AnyFlatSpec with Matchers with Logging { branch, dryRun = false) + val actualVersions = VersionUpdate.toMap(updates) + + expectedVersions.foreach { case (name, expectedVersion) => + actualVersions.get(name) shouldBe Some(expectedVersion) + } + VersionUpdate.print(updates) logger.info(s"Finished adding commit: $commitMessage\n\n") updates } val confs = Seq( - TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), - TestConf("gb1", "v1", "4g", Seq("t1")), - TestConf("gb2", "v1", "4g", Seq("t2")), - TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), - TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")) -> "v0", + TestConf("gb1", "v1", "4g", Seq("t1")) -> "v0", + TestConf("gb2", "v1", "4g", Seq("t2")) -> "v0", + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")) -> "v0", + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v0", ) - val fileHashMap = fileHashes(confs) + val fileHashMap = fileHashes(confs.map(_._1)) // check artifact nodes are present - val map = RepoIndex.buildContentMap(proc, confs, fileHashMap) + val map = RepoIndex.buildContentMap(proc, confs.map(_._1), fileHashMap) map.get(Name("t1")) shouldNot be(None) map.get(Name("t2")) shouldNot be(None) @@ -103,40 +113,40 @@ class RepoIndexSpec extends AnyFlatSpec with Matchers with Logging { val testBranch = Branch("test") val branchConfs1 = Seq( - TestConf("sq1", "v2", "4g", Seq.empty, Seq("t1")), // updated - TestConf("gb1", "v1", "4g", 
Seq("t1")), - TestConf("gb2", "v1", "4g", Seq("t2")), - TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), - TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + TestConf("sq1", "v2", "4g", Seq.empty, Seq("t1")) -> "v1", // updated + TestConf("gb1", "v1", "4g", Seq("t1")) -> "v1", + TestConf("gb2", "v1", "4g", Seq("t2")) -> "v0", + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")) -> "v1", + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v1", ) updateIndex(branchConfs1, testBranch, "semantically updated sq1") val branchConfs2 = Seq( - TestConf("sq1", "v2", "4g", Seq.empty, Seq("t1")), - TestConf("gb1", "v1", "4g", Seq("t1")), - TestConf("gb2", "v1", "8g", Seq("t2")), // non-semantic update - TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), - TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + TestConf("sq1", "v2", "4g", Seq.empty, Seq("t1")) -> "v1", + TestConf("gb1", "v1", "4g", Seq("t1")) -> "v1", + TestConf("gb2", "v1", "8g", Seq("t2")) -> "v0", // non-semantic update + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")) -> "v1", + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v1", ) updateIndex(branchConfs2, testBranch, "non semantically updated gb2") val branchConfs3 = Seq( - TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), // reverted back - TestConf("gb1", "v1", "4g", Seq("t1")), - TestConf("gb2", "v1", "8g", Seq("t2")), - TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")), - TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")) -> "v0", // reverted back + TestConf("gb1", "v1", "4g", Seq("t1")) -> "v0", + TestConf("gb2", "v1", "8g", Seq("t2")) -> "v0", + TestConf("j1", "v1", "4g", Seq("gb1", "gb2"), Seq("table_j1")) -> "v0", + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v0", ) updateIndex(branchConfs3, testBranch, "reverted back semantic update to sq1") val branchConfs4 = Seq( - TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), - TestConf("gb1", "v1", "4g", Seq("t1")), + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")) -> "v0", + TestConf("gb1", "v1", "4g", Seq("t1")) -> "v0", // TestConf("gb2", "v1", "8g", Seq("t2")), // deleted - TestConf("j1", "v1", "4g", Seq("gb1"), Seq("table_j1")), // parent deleted - TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + TestConf("j1", "v1", "4g", Seq("gb1"), Seq("table_j1")) -> "v2", // parent deleted + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v2", ) updateIndex(branchConfs4, testBranch, "deleted gb2 (depends on t2)") @@ -144,13 +154,13 @@ class RepoIndexSpec extends AnyFlatSpec with Matchers with Logging { updateIndex(branchConfs4, Branch.main, "updated main with change in test branch") val branchConfs5 = Seq( - TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")), - TestConf("sq3", "v1", "4g", Seq.empty, Seq("t3")), // new - TestConf("gb1", "v1", "4g", Seq("t1")), - TestConf("gb3", "v1", "4g", Seq("t3")), // new - TestConf("gb2", "v1", "8g", Seq("t2")), // gb2 added back - TestConf("j1", "v1", "4g", Seq("gb1", "gb2", "gb3"), Seq("table_j1")), // parent reverted + new - TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")), + TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")) -> "v0", + TestConf("sq3", "v1", "4g", Seq.empty, Seq("t3")) -> "v0", // new + TestConf("gb1", "v1", "4g", Seq("t1")) -> "v0", + TestConf("gb3", "v1", "4g", Seq("t3")) -> "v0", // new + TestConf("gb2", "v1", "8g", Seq("t2")) -> "v0", // gb2 added back + TestConf("j1", "v1", "4g", 
Seq("gb1", "gb2", "gb3"), Seq("table_j1")) -> "v3", // parent reverted + new + TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v3", ) updateIndex(branchConfs5, Branch.main, "new sq3 and gb3, un-deleted gb2") From ce411b4e3c1e62034b74e1dae4bb4d49646e5435 Mon Sep 17 00:00:00 2001 From: nikhil-zlai Date: Wed, 8 Jan 2025 18:43:39 -0800 Subject: [PATCH 09/10] comments - functional global hash computation + comments fixes --- .../ai/chronon/orchestration/RepoIndex.scala | 38 ++++++++++--------- .../orchestration/test/RepoIndexSpec.scala | 13 +++++++ 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala index fa1ff8f1f5..d49b8cee81 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala @@ -52,7 +52,7 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { /** * @param fileHashes map of all the file names to their hashes in the branch. - * @param newNodes nodes that aren't present in the index's [[fileHashToContent]] table. New nodes typically. + * @param newNodes nodes that aren't already present in the index. De-duped across branches. * @param branch branch on which the user is making the changes. * @param dryRun when true shows potential versions that will be assigned to nodes without modifying the index. * @return @@ -74,8 +74,7 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { * * we use memoization to avoid recomputing global hashes via the [[globalHashes]] map */ - val globalHashes = mutable.Map.empty[Name, GlobalHash] - def computeGlobalHash(name: Name): GlobalHash = { + def computeGlobalHash(name: Name, globalHashes: mutable.Map[Name, GlobalHash]): GlobalHash = { if (globalHashes.contains(name)) return globalHashes(name) @@ -107,7 +106,7 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { // recursively compute parent hashes val parentHashes = parents .map { parent => - val parentHash = computeGlobalHash(parent).hash + val parentHash = computeGlobalHash(parent, globalHashes).hash s"${parent.name}:$parentHash" } .mkString(",") @@ -123,7 +122,9 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { globalHash } - fileHashes.foreach { case (name, _) => computeGlobalHash(name) } + val globalHashes = mutable.Map.empty[Name, GlobalHash] + // this line fills global hashes + fileHashes.foreach { case (name, _) => computeGlobalHash(name, globalHashes) } val existingVersions = branchVersionIndex.getOrElse(branch, mutable.Map.empty) val mainVersions = branchVersionIndex.getOrElse(Branch.main, mutable.Map.empty) @@ -146,19 +147,20 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging { branchToFileHash.update(branch, enrichedFileHashes) branchVersionIndex.update(branch, newVersions) - return VersionUpdate.join(newVersions, existingVersions, mainVersions) - } + VersionUpdate.join(newVersions, existingVersions, mainVersions) - // dry run - don't insert into any members of the index - val newVersions = mutable.Map.empty[Name, Version] - globalHashes.foreach { - case (name, globalHash) => - val versionIndex = versionSequencer.potentialIndex(name, globalHash) - newVersions.update(name, Version("v" + versionIndex.toString)) - } + } else { + + // dry run - don't insert into any members of the index + val newVersions = mutable.Map.empty[Name, Version] + 
globalHashes.foreach {
+        case (name, globalHash) =>
+          val versionIndex = versionSequencer.potentialIndex(name, globalHash)
+          newVersions.update(name, Version("v" + versionIndex.toString))
+      }
 
-    val versionUpdates = VersionUpdate.join(newVersions, existingVersions, mainVersions)
-    versionUpdates
+
+      VersionUpdate.join(newVersions, existingVersions, mainVersions)
+    }
   }
 
   def addFiles(fileHashes: mutable.Map[Name, FileHash], updatedFiles: Map[String, String], branch: Branch): Unit = {
diff --git a/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala
index ab40274417..99b196d810 100644
--- a/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala
+++ b/orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala
@@ -164,6 +164,19 @@ class RepoIndexSpec extends AnyFlatSpec with Matchers with Logging {
     )
 
     updateIndex(branchConfs5, Branch.main, "new sq3 and gb3, un-deleted gb2")
+
+    val branchConfs6 = Seq(
+      TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")) -> "v0",
+      TestConf("sq3", "v1", "4g", Seq.empty, Seq("t3")) -> "v0",
+      TestConf("gb1", "v1", "4g", Seq("t1")) -> "v0",
+      TestConf("gb3", "v1", "4g", Seq("t3")) -> "v0",
+      TestConf("gb2", "v1", "8g", Seq("t2")) -> "v0",
+      TestConf("j1", "v1", "4g", Seq("gb1", "gb2", "gb3"), Seq("table_j1")) -> "v3",
+      TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v3",
+      TestConf("m2", "v1", "4g", Seq("j1"), Seq("table_m2")) -> "v0",
+    )
+
+    updateIndex(branchConfs6, Branch.main, "m2 is added")
   }

From 9c58f340e06f9e36706ee8aa33ab3a46c3867a8a Mon Sep 17 00:00:00 2001
From: nikhil-zlai
Date: Wed, 8 Jan 2025 19:15:18 -0800
Subject: [PATCH 10/10] slf4j in api for thrift - or it fails downstream

---
 build.sbt | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/build.sbt b/build.sbt
index b70aee8541..bb12cb1787 100644
--- a/build.sbt
+++ b/build.sbt
@@ -125,7 +125,9 @@ lazy val api = project
       "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0",
       "com.novocode" % "junit-interface" % "0.11" % "test",
       "org.scalatest" %% "scalatest" % "3.2.19" % "test",
-      "org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test"
+      "org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test",
+      // needed by thrift
+      "org.slf4j" % "slf4j-api" % slf4jApiVersion,
     )
   )
@@ -408,6 +410,7 @@ lazy val orchestration = project
     libraryDependencies ++= Seq(
      "org.apache.logging.log4j" %% "log4j-api-scala" % "13.1.0",
      "org.apache.logging.log4j" % "log4j-core" % "2.20.0",
+//      "org.slf4j" % "slf4j-api" % slf4jApiVersion,
      "org.scalatest" %% "scalatest" % "3.2.19" % "test",
    ),
  )
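
A note on the hashing scheme the patches above converge on: the recursion documented in the RepoIndex scaladoc and implemented in `computeGlobalHash` is `global_hash(node) = hash(node.local_hash + node.parents.map(global_hash))`. Below is a minimal, self-contained sketch of that recursion. It is illustrative only - `Node`, `nodes`, and the plain-`Map` lookup are stand-ins for the real `NodeContent` and branch indexes - but the `md5` helper mirrors `StringExtensions` (first 8 hex chars of an MD5 digest), and the memoization matches the `globalHashes` map threaded through `computeGlobalHash`.

```scala
import java.security.MessageDigest
import scala.collection.mutable

object GlobalHashSketch {

  // mirrors StringExtensions.StringOps.md5: first 8 hex chars of the MD5 digest
  private def md5(s: String): String =
    MessageDigest
      .getInstance("MD5")
      .digest(s.getBytes("UTF-8"))
      .map("%02x".format(_))
      .mkString
      .take(8)

  // illustrative stand-in for NodeContent: a local hash plus parent names
  case class Node(localHash: String, parents: Seq[String])

  // global_hash(node) = hash(node.local_hash + node.parents.map(global_hash)),
  // memoized so every node is hashed exactly once per pass
  def globalHash(name: String, nodes: Map[String, Node], memo: mutable.Map[String, String]): String =
    memo.getOrElseUpdate(
      name, {
        val node = nodes(name)
        val parentHashes = node.parents
          .map(p => s"$p:${globalHash(p, nodes, memo)}")
          .mkString(",")
        md5(s"node=$name:${node.localHash}|parents=$parentHashes")
      }
    )

  def main(args: Array[String]): Unit = {
    val nodes = Map(
      "sq1" -> Node(md5("v1"), Seq.empty),
      "t1"  -> Node(md5("t1"), Seq("sq1")), // artifact node: hashed off its own name
      "gb1" -> Node(md5("v1"), Seq("t1"))
    )
    val memo = mutable.Map.empty[String, String]
    // changing sq1's local hash would ripple into t1's and gb1's global hashes
    println(globalHash("gb1", nodes, memo))
  }
}
```

Reverting `sq1` to its old query version reproduces the old code string byte-for-byte, which is why the spec expects `v0` again after the revert commit.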
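That revert behaviour rests on `SequenceMap` never deleting entries: per key, each distinct value keeps the index of its first insertion. A simplified, `LinkedHashMap`-based stand-in (not the real class, which also keeps an index-to-value map) makes the contract visible:

```scala
import scala.collection.mutable

// simplified stand-in for orchestration.utils.SequenceMap
class SequenceSketch[K, V] {
  private val map = mutable.Map.empty[K, mutable.LinkedHashMap[V, Int]]

  // returns the first-insertion index of `value` under `key`,
  // assigning the next index only if the value has never been seen
  def insert(key: K, value: V): Int = {
    val entry = map.getOrElseUpdate(key, mutable.LinkedHashMap.empty)
    entry.getOrElseUpdate(value, entry.size)
  }
}

object SequenceSketchDemo {
  def main(args: Array[String]): Unit = {
    val versions = new SequenceSketch[String, String]
    println(versions.insert("sq1", "hashA")) // 0 -> "v0"
    println(versions.insert("sq1", "hashB")) // 1 -> "v1" (semantic update)
    println(versions.insert("sq1", "hashA")) // 0 -> the revert re-uses "v0"
  }
}
```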
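Likewise, the artifact handling in `buildContentMap` - synthesizing dummy nodes for names such as `t1` that appear only as inputs or outputs - reduces to a small accumulation over the confs. `Conf` and `artifactParents` below are hypothetical stand-ins; the point is that a table's parents end up being exactly the confs that list it as an output:

```scala
import scala.collection.mutable

object ArtifactSketch {
  // hypothetical, trimmed-down conf: just a name and its input/output names
  case class Conf(name: String, inputs: Seq[String], outputs: Seq[String])

  // names absent from fileNames are artifacts; a conf that lists an artifact
  // as an output becomes one of that artifact's parents
  def artifactParents(confs: Seq[Conf], fileNames: Set[String]): Map[String, Seq[String]] = {
    val parents = mutable.Map.empty[String, Seq[String]].withDefaultValue(Seq.empty)
    for {
      conf <- confs
      io   <- conf.inputs ++ conf.outputs
      if !fileNames.contains(io) // not a real file => artifact
    } {
      val extra = if (conf.outputs.contains(io)) Seq(conf.name) else Seq.empty
      parents(io) = (parents(io) ++ extra).distinct
    }
    parents.toMap
  }

  def main(args: Array[String]): Unit = {
    val confs = Seq(
      Conf("sq1", Seq.empty, Seq("t1")),
      Conf("gb1", Seq("t1"), Seq.empty)
    )
    println(artifactParents(confs, fileNames = Set("sq1", "gb1"))) // Map(t1 -> List(sq1))
  }
}
```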