diff --git a/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala b/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala
index ddb8325e64..fa176df38b 100644
--- a/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala
+++ b/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala
@@ -5,7 +5,7 @@ import ai.chronon.api.PartitionRange
 import ai.chronon.planner.NodeContent
 
 trait NodeRunner {
-  val TablePartitionsDataset = "TABLE_PARTITIONS"
+  val DefaultTablePartitionsDataset = "TABLE_PARTITIONS"
 
   def run(metadata: api.MetaData, conf: NodeContent, range: Option[PartitionRange]): Unit
 }
diff --git a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
index c188b579e5..541f44fb9f 100644
--- a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
+++ b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
@@ -6,6 +6,7 @@ import ai.chronon.api.{MetaData, PartitionRange, PartitionSpec, ThriftJsonCodec}
 import ai.chronon.online.Api
 import ai.chronon.online.KVStore.PutRequest
 import ai.chronon.planner._
+import ai.chronon.spark.batch.BatchNodeRunner.DefaultTablePartitionsDataset
 import ai.chronon.spark.catalog.TableUtils
 import ai.chronon.spark.join.UnionJoin
 import ai.chronon.spark.submission.SparkSessionBuilder
@@ -39,6 +40,10 @@ class BatchNodeRunnerArgs(args: Array[String]) extends ScallopConf(args) {
 
   val apiProps: Map[String, String] = props[String]('Z', descr = "Props to configure API Store")
 
+  val tablePartitionsDataset = opt[String](required = true,
+                                           descr = "Name of table in kv store to use to keep track of partitions",
+                                           default = Option(DefaultTablePartitionsDataset))
+
   verify()
 }
 
@@ -177,7 +182,11 @@ object BatchNodeRunner extends NodeRunner {
     run(metadata, conf, range.get, createTableUtils(metadata.name))
   }
 
-  def runFromArgs(api: Api, confPath: String, startDs: String, endDs: String): Try[Unit] = {
+  def runFromArgs(api: Api,
+                  confPath: String,
+                  startDs: String,
+                  endDs: String,
+                  tablePartitionsDataset: String): Try[Unit] = {
     Try {
       val node = ThriftJsonCodec.fromJsonFile[Node](confPath, check = true)
       val metadata = node.metaData
@@ -218,7 +227,7 @@ object BatchNodeRunner extends NodeRunner {
       }
       val kvStoreUpdates = kvStore.multiPut(allInputTablePartitions.map { case (tableName, allPartitions) =>
         val partitionsJson = PartitionRange.collapsedPrint(allPartitions)(range.partitionSpec)
-        PutRequest(tableName.getBytes, partitionsJson.getBytes, TablePartitionsDataset)
+        PutRequest(tableName.getBytes, partitionsJson.getBytes, tablePartitionsDataset)
       }.toSeq)
 
       Await.result(kvStoreUpdates, Duration.Inf)
@@ -250,7 +259,7 @@ object BatchNodeRunner extends NodeRunner {
       val outputTablePartitionsJson = PartitionRange.collapsedPrint(allOutputTablePartitions)(range.partitionSpec)
       val putRequest = PutRequest(metadata.executionInfo.outputTableInfo.table.getBytes,
                                   outputTablePartitionsJson.getBytes,
-                                  TablePartitionsDataset)
+                                  tablePartitionsDataset)
       val kvStoreUpdates = kvStore.put(putRequest)
       Await.result(kvStoreUpdates, Duration.Inf)
       logger.info(s"Successfully completed batch node runner for '${metadata.name}'")
@@ -270,7 +279,11 @@ object BatchNodeRunner extends NodeRunner {
     try {
       val batchArgs = new BatchNodeRunnerArgs(args)
       val api = instantiateApi(batchArgs.onlineClass(), batchArgs.apiProps)
-      runFromArgs(api, batchArgs.confPath(), batchArgs.startDs(), batchArgs.endDs()) match {
+      runFromArgs(api,
+                  batchArgs.confPath(),
+                  batchArgs.startDs(),
+                  batchArgs.endDs(),
+                  batchArgs.tablePartitionsDataset()) match {
         case Success(_) =>
           logger.info("Batch node runner completed successfully")
           System.exit(0)
@@ -284,4 +297,6 @@ object BatchNodeRunner extends NodeRunner {
         System.exit(1)
     }
   }
+
+  // override def tablePartitionsDataset(): String = tablePartitionsDataset
 }
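
Reviewer note: a minimal sketch of the new flag at the CLI boundary. The --conf-path, --start-ds, --end-ds, and --online-class option names are assumed from Scallop field names defined outside this diff, and the values below are hypothetical. Omitting --table-partitions-dataset falls back to DefaultTablePartitionsDataset, so existing invocations keep writing to TABLE_PARTITIONS.

    // Sketch only, under the assumptions above; "MY_PARTITIONS" is hypothetical.
    val parsed = new BatchNodeRunnerArgs(
      Array("--conf-path", "join_node.json",
            "--start-ds", "2024-01-01",
            "--end-ds", "2024-01-07",
            "--online-class", "com.example.MyApi",
            "--table-partitions-dataset", "MY_PARTITIONS"))
    assert(parsed.tablePartitionsDataset() == "MY_PARTITIONS")
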
diff --git a/spark/src/main/scala/ai/chronon/spark/kv_store/KVUploadNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/kv_store/KVUploadNodeRunner.scala
index ea0ac28662..65b127afca 100644
--- a/spark/src/main/scala/ai/chronon/spark/kv_store/KVUploadNodeRunner.scala
+++ b/spark/src/main/scala/ai/chronon/spark/kv_store/KVUploadNodeRunner.scala
@@ -7,6 +7,7 @@ import ai.chronon.api._
 import ai.chronon.online.Api
 import ai.chronon.online.fetcher.{FetchContext, MetadataStore}
 import ai.chronon.planner.{Node, NodeContent}
+import ai.chronon.spark.batch.BatchNodeRunner.DefaultTablePartitionsDataset
 import org.rogach.scallop.ScallopConf
 import org.slf4j.{Logger, LoggerFactory}
 
@@ -80,6 +81,8 @@ class KVUploadNodeRunner(api: Api) extends NodeRunner {
         throw e
     }
   }
+
+  // override def tablePartitionsDataset(): String = tablePartitionsDataset
 }
 
 object KVUploadNodeRunner {
@@ -91,6 +94,9 @@ object KVUploadNodeRunner {
       descr = "Fully qualified Online.Api based class. We expect the jar to be on the class path")
 
     val apiProps: Map[String, String] = props[String]('Z', descr = "Props to configure API Store")
 
+    val tablePartitionsDataset = opt[String](required = true,
+                                             descr = "Name of table in kv store to use to keep track of partitions",
+                                             default = Option(DefaultTablePartitionsDataset))
     verify()
   }
diff --git a/spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala b/spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala
index 9f0f9af059..cab18e09c4 100644
--- a/spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala
+++ b/spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala
@@ -220,6 +220,8 @@ object JobSubmitterConstants {
   val GcpDataprocClusterNameEnvVar = "GCP_DATAPROC_CLUSTER_NAME"
   val GcpEnableUploadKVClientEnvVar = "ENABLE_UPLOAD_CLIENTS"
 
+  val TablePartitionsDatasetNameArgKeyword = "--table-partitions-dataset"
+
   val CheckIfJobIsRunning = "check-if-job-is-running"
   val StreamingDeploy = "deploy"
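
The new constant gives submitters a single source of truth for the flag name instead of a repeated string literal. A minimal sketch of the intended wiring, assuming a caller that assembles runner arguments; the helper below is illustrative and not part of this diff.

    import ai.chronon.spark.submission.JobSubmitterConstants.TablePartitionsDatasetNameArgKeyword

    // Illustrative helper: appends the dataset flag only when a non-default
    // dataset is requested; otherwise the runner-side default applies.
    def withPartitionsDataset(baseArgs: Seq[String], dataset: Option[String]): Seq[String] =
      dataset.fold(baseArgs)(ds => baseArgs ++ Seq(TablePartitionsDatasetNameArgKeyword, ds))
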
diff --git a/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala b/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala
index 64f2d1a5b8..6ef99d8d26 100644
--- a/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala
+++ b/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala
@@ -22,6 +22,7 @@ import ai.chronon.api.planner.{MetaDataUtils, TableDependencies}
 import ai.chronon.online.KVStore.PutRequest
 import ai.chronon.planner.{ExternalSourceSensorNode, MonolithJoinNode, Node, NodeContent}
 import ai.chronon.spark.batch.BatchNodeRunner
+import ai.chronon.spark.batch.BatchNodeRunner.DefaultTablePartitionsDataset
 import ai.chronon.spark.submission.SparkSessionBuilder
 import ai.chronon.spark.test.{MockKVStore, TableTestUtils}
 import ai.chronon.spark.utils.MockApi
@@ -210,7 +211,7 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
 
     val configPath = createTestConfigFile(twoDaysAgo, yesterday)
 
-    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday)
+    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday, DefaultTablePartitionsDataset)
 
     result match {
       case Success(_) =>
@@ -227,7 +228,7 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
 
         // Verify dataset name
         assertTrue("Should use TABLE_PARTITIONS dataset",
-                   mockKVStore.putRequests.forall(_.dataset == "TABLE_PARTITIONS"))
+                   mockKVStore.putRequests.forall(_.dataset == DefaultTablePartitionsDataset))
 
       case Failure(exception) =>
         fail(s"runFromArgs should have succeeded but failed with: ${exception.getMessage}")
@@ -238,7 +239,7 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
 
     val configPath = createTestConfigFile(twoDaysAgo, today) // today's partition doesn't exist
 
-    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, today)
+    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, today, DefaultTablePartitionsDataset)
 
     result match {
       case Success(_) =>
@@ -271,7 +272,7 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
 
     val configPath = createTestConfigFile(twoDaysAgo, yesterday)
 
-    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday)
+    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday, DefaultTablePartitionsDataset)
 
     result match {
       case Success(_) =>
@@ -299,7 +300,8 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
 
     val configPath = createTestConfigFile(futureDate1, futureDate2)
 
-    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, futureDate1, futureDate2)
+    val result =
+      BatchNodeRunner.runFromArgs(mockApi, configPath, futureDate1, futureDate2, DefaultTablePartitionsDataset)
 
     result match {
       case Success(_) =>
@@ -333,7 +335,7 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
     val threeDaysAgo = tableUtils.partitionSpec.before(twoDaysAgo)
     val configPath = createTestConfigFile(threeDaysAgo, today)
 
-    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, threeDaysAgo, today)
+    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, threeDaysAgo, today, DefaultTablePartitionsDataset)
 
     result match {
       case Success(_) =>
@@ -415,7 +417,7 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
       partitionFormat = "yyyyMMdd"
     )
 
-    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday)
+    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday, DefaultTablePartitionsDataset)
 
     result match {
       case Success(_) =>
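
Reviewer note: every updated call site still passes DefaultTablePartitionsDataset, so the override path itself is not exercised. A sketch of the missing case, reusing the existing mockApi/mockKVStore fixtures from this test class; "CUSTOM_PARTITIONS" is a hypothetical dataset name.

    // Sketch: a caller-supplied dataset name should reach the KV store puts.
    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday, "CUSTOM_PARTITIONS")
    assertTrue("Should use the overridden dataset",
               mockKVStore.putRequests.forall(_.dataset == "CUSTOM_PARTITIONS"))
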