21 changes: 5 additions & 16 deletions spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
@@ -2,7 +2,7 @@ package ai.chronon.spark.batch
 
 import ai.chronon.api.{MetaData, PartitionRange, PartitionSpec, ThriftJsonCodec}
 import ai.chronon.api.planner.NodeRunner
-import ai.chronon.planner.{GroupByUploadNode, MonolithJoinNode, NodeContent, StagingQueryNode}
+import ai.chronon.planner.{GroupByUploadNode, MonolithJoinNode, Node, NodeContent, StagingQueryNode}
 import ai.chronon.spark.{GroupByUpload, Join}
 import ai.chronon.spark.catalog.TableUtils
 import ai.chronon.spark.join.UnionJoin
@@ -120,27 +120,16 @@ object BatchNodeRunner extends NodeRunner {
     run(metadata, conf, range.get, createTableUtils(metadata.name))
   }
 
-  private[batch] def loadNodeContent(confPath: String): (MetaData, NodeContent) = {
-    val nodeContent = ThriftJsonCodec.fromJsonFile[NodeContent](confPath, check = true)
-    (nodeContent.getSetField match {
-       case NodeContent._Fields.MONOLITH_JOIN => nodeContent.getMonolithJoin.join.metaData
-       case NodeContent._Fields.STAGING_QUERY => nodeContent.getStagingQuery.stagingQuery.metaData
-       case NodeContent._Fields.GROUP_BY_UPLOAD => nodeContent.getGroupByUpload.groupBy.metaData
-       case other => throw new UnsupportedOperationException(s"NodeContent type ${other} not supported")
-     },
-     nodeContent)
-  }
-
   def runFromArgs(api: Api, confPath: String, startDs: String, endDs: String): Try[Unit] = {
     Try {
       val range = PartitionRange(startDs, endDs)(PartitionSpec.daily)
       // TODO(tchow): implement partition listing
      val kvStore = api.genKvStore
-      val (metadata, nodeContent) = loadNodeContent(confPath)
+      val node = ThriftJsonCodec.fromJsonFile[Node](confPath, check = true)
 
-      logger.info(s"Starting batch node runner for '${metadata.name}'")
-      run(metadata, nodeContent, Option(range))
-      logger.info(s"Successfully completed batch node runner for '${metadata.name}'")
+      logger.info(s"Starting batch node runner for '${node.metaData.name}'")
+      run(node.metaData, node.content, Option(range))
+      logger.info(s"Successfully completed batch node runner for '${node.metaData.name}'")
     }
   }
 
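For context, a minimal sketch of the Node-based loading pattern this file converges on. `Node`, its `metaData` and `content` accessors, and `ThriftJsonCodec.fromJsonFile` are taken from the diff above; the wrapper object and the println are illustrative placeholders, not part of the PR:

```scala
// Hypothetical sketch, not part of this PR: shows the envelope-first loading
// style that replaces the removed per-NodeContent metadata extraction.
import ai.chronon.api.ThriftJsonCodec
import ai.chronon.planner.Node

object NodeLoadingSketch {
  def load(confPath: String): Node = {
    // check = true validates the deserialized struct, mirroring the diff.
    val node = ThriftJsonCodec.fromJsonFile[Node](confPath, check = true)
    // Metadata now lives on the Node envelope, so no getSetField matching
    // is needed to locate it.
    println(s"Loaded node '${node.metaData.name}'")
    node
  }
}
```

Because `metaData` is read off the `Node` envelope, adding a new `NodeContent` variant no longer requires a new extraction case; the deleted `loadNodeContent` match threw `UnsupportedOperationException` for any unlisted type.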
@@ -6,7 +6,7 @@ import ai.chronon.api.planner.NodeRunner
 import ai.chronon.api._
 import ai.chronon.online.Api
 import ai.chronon.online.fetcher.{FetchContext, MetadataStore}
-import ai.chronon.planner.NodeContent
+import ai.chronon.planner.{Node, NodeContent}
 import org.rogach.scallop.ScallopConf
 import org.slf4j.{Logger, LoggerFactory}
 
@@ -124,31 +124,16 @@ object KVUploadNodeRunner {
 
   def runFromArgs(confPath: String, endDs: String, onlineClass: String, props: Map[String, String]): Try[Unit] = {
     Try {
-      val nodeContent = loadNodeContent(confPath)
-      val metadata = extractMetadata(nodeContent)
+      val node = ThriftJsonCodec.fromJsonFile[Node](confPath, check = true)
+      val metadata = node.metaData
 
       val api = instantiateApi(onlineClass, props)
 
       implicit val partitionSpec: PartitionSpec = PartitionSpec.daily
       val range = Some(PartitionRange(null, endDs))
 
       val runner = new KVUploadNodeRunner(api)
-      runner.run(metadata, nodeContent, range)
-    }
-  }
-
-  private def loadNodeContent(confPath: String): NodeContent = {
-    ThriftJsonCodec.fromJsonFile[NodeContent](confPath, check = true)
-  }
-
-  private def extractMetadata(nodeContent: NodeContent): MetaData = {
-    nodeContent.getSetField match {
-      case NodeContent._Fields.GROUP_BY_UPLOAD_TO_KV =>
-        nodeContent.getGroupByUploadToKV.groupBy.metaData
-      case NodeContent._Fields.JOIN_METADATA_UPLOAD =>
-        nodeContent.getJoinMetadataUpload.join.metaData
-      case other =>
-        throw new IllegalArgumentException(s"Unsupported node type: $other")
+      runner.run(metadata, node.content, range)
     }
   }
 }
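A hedged usage sketch of the refactored entry point: the `runFromArgs` signature and its `Try[Unit]` return type come from the diff, while every concrete value below is a placeholder:

```scala
// Hypothetical caller, not part of this PR. Argument names match the
// runFromArgs signature in the diff; the values are made up for illustration.
import scala.util.{Failure, Success}

object KVUploadSketch {
  def main(args: Array[String]): Unit = {
    val result = KVUploadNodeRunner.runFromArgs(
      confPath = "/path/to/node_conf.json", // placeholder: serialized Node JSON
      endDs = "2024-01-31",                 // upload partitions up to this day
      onlineClass = "com.example.MyApi",    // placeholder Api implementation
      props = Map("host" -> "localhost")    // placeholder online props
    )
    result match {
      case Success(_)  => println("KV upload completed")
      case Failure(ex) => println(s"KV upload failed: ${ex.getMessage}")
    }
  }
}
```

Note that inside `runFromArgs` the range is built as `PartitionRange(null, endDs)`, leaving the start open-ended, so the run covers everything up to `endDs`.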