2 changes: 1 addition & 1 deletion api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala
@@ -5,7 +5,7 @@ import ai.chronon.api.PartitionRange
import ai.chronon.planner.NodeContent
trait NodeRunner {

val TablePartitionsDataset = "TABLE_PARTITIONS"
val DefaultTablePartitionsDataset = "TABLE_PARTITIONS"

def run(metadata: api.MetaData, conf: NodeContent, range: Option[PartitionRange]): Unit
}
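
The rename makes the intent explicit: "TABLE_PARTITIONS" is now only the default, and concrete runners can accept an override and fall back to it. A minimal sketch of that pattern, with a hypothetical runner class that is not part of this PR:

import ai.chronon.api
import ai.chronon.api.PartitionRange
import ai.chronon.api.planner.NodeRunner
import ai.chronon.planner.NodeContent

// Hypothetical runner: resolve the partitions dataset from an optional override,
// falling back to the trait's DefaultTablePartitionsDataset ("TABLE_PARTITIONS").
class LoggingNodeRunner(datasetOverride: Option[String] = None) extends NodeRunner {
  private val partitionsDataset: String = datasetOverride.getOrElse(DefaultTablePartitionsDataset)

  override def run(metadata: api.MetaData, conf: NodeContent, range: Option[PartitionRange]): Unit =
    println(s"would track partitions for '${metadata.name}' in dataset '$partitionsDataset'")
}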
23 changes: 19 additions & 4 deletions spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
@@ -6,6 +6,7 @@ import ai.chronon.api.{MetaData, PartitionRange, PartitionSpec, ThriftJsonCodec}
import ai.chronon.online.Api
import ai.chronon.online.KVStore.PutRequest
import ai.chronon.planner._
import ai.chronon.spark.batch.BatchNodeRunner.DefaultTablePartitionsDataset
import ai.chronon.spark.catalog.TableUtils
import ai.chronon.spark.join.UnionJoin
import ai.chronon.spark.submission.SparkSessionBuilder
@@ -39,6 +40,10 @@ class BatchNodeRunnerArgs(args: Array[String]) extends ScallopConf(args) {

val apiProps: Map[String, String] = props[String]('Z', descr = "Props to configure API Store")

val tablePartitionsDataset = opt[String](required = true,
descr = "Name of table in kv store to use to keep track of partitions",
default = Option(DefaultTablePartitionsDataset))

verify()
}
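
For illustration, this is roughly how the new option resolves. The flag name matches the TablePartitionsDatasetNameArgKeyword constant added later in this PR; the other flag names and values below are assumptions based on the runner's existing options, which are not shown in this diff:

// Hypothetical parse: when --table-partitions-dataset is omitted, the option
// falls back to the scallop default, DefaultTablePartitionsDataset ("TABLE_PARTITIONS").
val defaultArgs = new BatchNodeRunnerArgs(Array(
  "--conf-path", "/tmp/node.json",
  "--start-ds", "2024-01-01",
  "--end-ds", "2024-01-02",
  "--online-class", "com.example.MyApiImpl"))
println(defaultArgs.tablePartitionsDataset()) // TABLE_PARTITIONS

// With an explicit value, the override wins.
val customArgs = new BatchNodeRunnerArgs(Array(
  "--conf-path", "/tmp/node.json",
  "--start-ds", "2024-01-01",
  "--end-ds", "2024-01-02",
  "--online-class", "com.example.MyApiImpl",
  "--table-partitions-dataset", "CUSTOM_TABLE_PARTITIONS"))
println(customArgs.tablePartitionsDataset()) // CUSTOM_TABLE_PARTITIONS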

@@ -177,7 +182,11 @@ object BatchNodeRunner {
run(metadata, conf, range.get, createTableUtils(metadata.name))
}

def runFromArgs(api: Api, confPath: String, startDs: String, endDs: String): Try[Unit] = {
def runFromArgs(api: Api,
confPath: String,
startDs: String,
endDs: String,
tablePartitionsDataset: String): Try[Unit] = {
Try {
val node = ThriftJsonCodec.fromJsonFile[Node](confPath, check = true)
val metadata = node.metaData
@@ -218,7 +227,7 @@
}
val kvStoreUpdates = kvStore.multiPut(allInputTablePartitions.map { case (tableName, allPartitions) =>
val partitionsJson = PartitionRange.collapsedPrint(allPartitions)(range.partitionSpec)
PutRequest(tableName.getBytes, partitionsJson.getBytes, TablePartitionsDataset)
PutRequest(tableName.getBytes, partitionsJson.getBytes, tablePartitionsDataset)
}.toSeq)

Await.result(kvStoreUpdates, Duration.Inf)
@@ -250,7 +259,7 @@
val outputTablePartitionsJson = PartitionRange.collapsedPrint(allOutputTablePartitions)(range.partitionSpec)
val putRequest = PutRequest(metadata.executionInfo.outputTableInfo.table.getBytes,
outputTablePartitionsJson.getBytes,
TablePartitionsDataset)
tablePartitionsDataset)
val kvStoreUpdates = kvStore.put(putRequest)
Await.result(kvStoreUpdates, Duration.Inf)
logger.info(s"Successfully completed batch node runner for '${metadata.name}'")
@@ -270,7 +279,11 @@
try {
val batchArgs = new BatchNodeRunnerArgs(args)
val api = instantiateApi(batchArgs.onlineClass(), batchArgs.apiProps)
runFromArgs(api, batchArgs.confPath(), batchArgs.startDs(), batchArgs.endDs()) match {
runFromArgs(api,
batchArgs.confPath(),
batchArgs.startDs(),
batchArgs.endDs(),
batchArgs.tablePartitionsDataset()) match {
case Success(_) =>
logger.info("Batch node runner completed successfully")
System.exit(0)
@@ -284,4 +297,6 @@
System.exit(1)
}
}

// override def tablePartitionsDataset(): String = tablePartitionsDataset
}
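
Callers that invoke runFromArgs directly (as the tests below do) now supply the dataset name explicitly. A hedged sketch of a direct call; the wrapper function, conf path, and dates are placeholders, not values taken from this PR:

import ai.chronon.online.Api
import ai.chronon.spark.batch.BatchNodeRunner
import scala.util.{Failure, Success}

// Hypothetical wrapper: run a conf over a fixed range, recording input and output
// partitions in a caller-chosen dataset instead of the default.
def runWithCustomDataset(api: Api, confPath: String): Unit =
  BatchNodeRunner.runFromArgs(api, confPath, "2024-01-01", "2024-01-02", "CUSTOM_TABLE_PARTITIONS") match {
    case Success(_)  => println("partitions recorded in CUSTOM_TABLE_PARTITIONS")
    case Failure(ex) => println(s"batch node run failed: ${ex.getMessage}")
  }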
KVUploadNodeRunner.scala
@@ -7,6 +7,7 @@ import ai.chronon.api._
import ai.chronon.online.Api
import ai.chronon.online.fetcher.{FetchContext, MetadataStore}
import ai.chronon.planner.{Node, NodeContent}
import ai.chronon.spark.batch.BatchNodeRunner.DefaultTablePartitionsDataset
import org.rogach.scallop.ScallopConf
import org.slf4j.{Logger, LoggerFactory}

@@ -80,6 +81,8 @@ class KVUploadNodeRunner(api: Api) extends NodeRunner {
throw e
}
}

// override def tablePartitionsDataset(): String = tablePartitionsDataset
}

object KVUploadNodeRunner {
@@ -91,6 +94,9 @@
descr =
"Fully qualified Online.Api based class. We expect the jar to be on the class path")
val apiProps: Map[String, String] = props[String]('Z', descr = "Props to configure API Store")
val tablePartitionsDataset = opt[String](required = true,
descr = "Name of table in kv store to use to keep track of partitions",
default = Option(DefaultTablePartitionsDataset))
verify()
}

JobSubmitterConstants
@@ -220,6 +220,8 @@ object JobSubmitterConstants {
val GcpDataprocClusterNameEnvVar = "GCP_DATAPROC_CLUSTER_NAME"
val GcpEnableUploadKVClientEnvVar = "ENABLE_UPLOAD_CLIENTS"

val TablePartitionsDatasetNameArgKeyword = "--table-partitions-dataset"

val CheckIfJobIsRunning = "check-if-job-is-running"
val StreamingDeploy = "deploy"

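
The new keyword mirrors the scallop flag added above. A sketch of how a submitter might thread an optional dataset name into the runner's argument list, assuming the constant is in scope; the buildRunnerArgs helper and the values shown are illustrative, not part of this PR:

// Hypothetical helper: append "--table-partitions-dataset <name>" only when an
// override is supplied; otherwise the runner falls back to its default.
def buildRunnerArgs(baseArgs: Seq[String], datasetOverride: Option[String]): Seq[String] =
  datasetOverride match {
    case Some(name) => baseArgs ++ Seq(TablePartitionsDatasetNameArgKeyword, name)
    case None       => baseArgs
  }

// e.g. buildRunnerArgs(Seq("--conf-path", "/tmp/node.json"), Some("CUSTOM_TABLE_PARTITIONS"))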
BatchNodeRunnerTest.scala
@@ -22,6 +22,7 @@ import ai.chronon.api.planner.{MetaDataUtils, TableDependencies}
import ai.chronon.online.KVStore.PutRequest
import ai.chronon.planner.{ExternalSourceSensorNode, MonolithJoinNode, Node, NodeContent}
import ai.chronon.spark.batch.BatchNodeRunner
import ai.chronon.spark.batch.BatchNodeRunner.DefaultTablePartitionsDataset
import ai.chronon.spark.submission.SparkSessionBuilder
import ai.chronon.spark.test.{MockKVStore, TableTestUtils}
import ai.chronon.spark.utils.MockApi
@@ -210,7 +211,7 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before

val configPath = createTestConfigFile(twoDaysAgo, yesterday)

val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday)
val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday, DefaultTablePartitionsDataset)

result match {
case Success(_) =>
@@ -227,7 +228,7 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before

// Verify dataset name
assertTrue("Should use TABLE_PARTITIONS dataset",
mockKVStore.putRequests.forall(_.dataset == "TABLE_PARTITIONS"))
mockKVStore.putRequests.forall(_.dataset == DefaultTablePartitionsDataset))

case Failure(exception) =>
fail(s"runFromArgs should have succeeded but failed with: ${exception.getMessage}")
@@ -238,7 +239,7 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before

val configPath = createTestConfigFile(twoDaysAgo, today) // today's partition doesn't exist

val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, today)
val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, today, DefaultTablePartitionsDataset)

result match {
case Success(_) =>
@@ -271,7 +272,7 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before

val configPath = createTestConfigFile(twoDaysAgo, yesterday)

val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday)
val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday, DefaultTablePartitionsDataset)

result match {
case Success(_) =>
@@ -299,7 +300,8 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before

val configPath = createTestConfigFile(futureDate1, futureDate2)

val result = BatchNodeRunner.runFromArgs(mockApi, configPath, futureDate1, futureDate2)
val result =
BatchNodeRunner.runFromArgs(mockApi, configPath, futureDate1, futureDate2, DefaultTablePartitionsDataset)

result match {
case Success(_) =>
@@ -333,7 +335,7 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
val threeDaysAgo = tableUtils.partitionSpec.before(twoDaysAgo)
val configPath = createTestConfigFile(threeDaysAgo, today)

val result = BatchNodeRunner.runFromArgs(mockApi, configPath, threeDaysAgo, today)
val result = BatchNodeRunner.runFromArgs(mockApi, configPath, threeDaysAgo, today, DefaultTablePartitionsDataset)

result match {
case Success(_) =>
@@ -415,7 +417,7 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
partitionFormat = "yyyyMMdd"
)

val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday)
val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday, DefaultTablePartitionsDataset)

result match {
case Success(_) =>
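
A natural follow-up test, not included in this PR, would pass a non-default dataset and assert that it propagates to the KV store writes. A sketch using the fixtures visible above:

// Hypothetical extra test: a custom dataset name should show up on every PutRequest.
it should "write partition metadata to a custom dataset when one is provided" in {
  val configPath = createTestConfigFile(twoDaysAgo, yesterday)
  val customDataset = "CUSTOM_TABLE_PARTITIONS"

  BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday, customDataset) match {
    case Success(_) =>
      assertTrue("Should use the custom dataset",
                 mockKVStore.putRequests.forall(_.dataset == customDataset))
    case Failure(exception) =>
      fail(s"runFromArgs should have succeeded but failed with: ${exception.getMessage}")
  }
}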