diff --git a/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala b/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala
index fa176df38b..2b6914b8d9 100644
--- a/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala
+++ b/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala
@@ -5,11 +5,13 @@ import ai.chronon.api.PartitionRange
 import ai.chronon.planner.NodeContent

 trait NodeRunner {
-  val DefaultTablePartitionsDataset = "TABLE_PARTITIONS"
-
   def run(metadata: api.MetaData, conf: NodeContent, range: Option[PartitionRange]): Unit
 }

+object NodeRunner {
+  val DefaultTablePartitionsDataset = "TABLE_PARTITIONS"
+}
+
 object LineageOfflineRunner {
   def readFiles(folderPath: String): Seq[Any] = {
     // read files from folder using metadata
diff --git a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
index baf2aacf8f..5c3e859dcf 100644
--- a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
+++ b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
@@ -6,7 +6,6 @@ import ai.chronon.api.{MetaData, PartitionRange, PartitionSpec, ThriftJsonCodec}
 import ai.chronon.online.Api
 import ai.chronon.online.KVStore.PutRequest
 import ai.chronon.planner._
-import ai.chronon.spark.batch.BatchNodeRunner.DefaultTablePartitionsDataset
 import ai.chronon.spark.catalog.TableUtils
 import ai.chronon.spark.join.UnionJoin
 import ai.chronon.spark.submission.SparkSessionBuilder
@@ -42,19 +41,15 @@ class BatchNodeRunnerArgs(args: Array[String]) extends ScallopConf(args) {

   val tablePartitionsDataset = opt[String](required = true,
                                            descr = "Name of table in kv store to use to keep track of partitions",
-                                           default = Option(DefaultTablePartitionsDataset))
+                                           default = Option(NodeRunner.DefaultTablePartitionsDataset))

   verify()
 }

-object BatchNodeRunner extends NodeRunner {
-
+class BatchNodeRunner(node: Node, tableUtils: TableUtils) extends NodeRunner {
   @transient private lazy val logger: Logger = LoggerFactory.getLogger(getClass)

-  def checkPartitions(conf: ExternalSourceSensorNode,
-                      metadata: MetaData,
-                      tableUtils: TableUtils,
-                      range: PartitionRange): Try[Unit] = {
+  def checkPartitions(conf: ExternalSourceSensorNode, metadata: MetaData, range: PartitionRange): Try[Unit] = {
     val tableName = conf.sourceName
     val retryCount = if (conf.isSetRetryCount) conf.retryCount else 3L
     val retryIntervalMin = if (conf.isSetRetryIntervalMin) conf.retryIntervalMin else 3L
@@ -88,47 +83,7 @@ object BatchNodeRunner extends NodeRunner {
     retry(0)
   }

-  private def createTableUtils(name: String): TableUtils = {
-    val spark = SparkSessionBuilder.build(s"batch-node-runner-${name}")
-    TableUtils(spark)
-  }
-
-  private[batch] def run(metadata: MetaData, conf: NodeContent, range: PartitionRange, tableUtils: TableUtils): Unit = {
-
-    conf.getSetField match {
-      case NodeContent._Fields.MONOLITH_JOIN =>
-        runMonolithJoin(metadata, conf.getMonolithJoin, range, tableUtils)
-      case NodeContent._Fields.GROUP_BY_UPLOAD =>
-        runGroupByUpload(metadata, conf.getGroupByUpload, range, tableUtils)
-      case NodeContent._Fields.GROUP_BY_BACKFILL =>
-        logger.info(s"Running groupBy backfill for '${metadata.name}' for range: [${range.start}, ${range.end}]")
-        GroupBy.computeBackfill(
-          conf.getGroupByBackfill.groupBy,
-          range.end,
-          tableUtils,
-          overrideStartPartition = Option(range.start)
-        )
-        logger.info(s"Successfully completed groupBy backfill for '${metadata.name}'")
-      case NodeContent._Fields.STAGING_QUERY =>
-        runStagingQuery(metadata, conf.getStagingQuery, range, tableUtils)
-      case NodeContent._Fields.EXTERNAL_SOURCE_SENSOR => {
-
-        checkPartitions(conf.getExternalSourceSensor, metadata, tableUtils, range) match {
-          case Success(_) => System.exit(0)
-          case Failure(exception) =>
-            logger.error(s"ExternalSourceSensor check failed.", exception)
-            System.exit(1)
-        }
-      }
-      case _ =>
-        throw new UnsupportedOperationException(s"Unsupported NodeContent type: ${conf.getSetField}")
-    }
-  }
-
-  private def runStagingQuery(metaData: MetaData,
-                              stagingQuery: StagingQueryNode,
-                              range: PartitionRange,
-                              tableUtils: TableUtils): Unit = {
+  private def runStagingQuery(metaData: MetaData, stagingQuery: StagingQueryNode, range: PartitionRange): Unit = {
     require(stagingQuery.isSetStagingQuery, "StagingQueryNode must have a stagingQuery set")
     logger.info(s"Running staging query for '${metaData.name}'")
     val stagingQueryConf = stagingQuery.stagingQuery
@@ -143,22 +98,16 @@ object BatchNodeRunner extends NodeRunner {
     logger.info(s"Successfully completed staging query for '${metaData.name}'")
   }

-  private def runGroupByUpload(metadata: MetaData,
-                               groupByUpload: GroupByUploadNode,
-                               range: PartitionRange,
-                               tableUtils: TableUtils): Unit = {
+  private def runGroupByUpload(metadata: MetaData, groupByUpload: GroupByUploadNode, range: PartitionRange): Unit = {
     require(groupByUpload.isSetGroupBy, "GroupByUploadNode must have a groupBy set")
     val groupBy = groupByUpload.groupBy
     logger.info(s"Running groupBy upload for '${metadata.name}' for day: ${range.end}")

-    GroupByUpload.run(groupBy, range.end, Some(tableUtils))
+    GroupByUpload.run(groupBy, range.end, Option(tableUtils))
     logger.info(s"Successfully completed groupBy upload for '${metadata.name}' for day: ${range.end}")
   }

-  private def runMonolithJoin(metadata: MetaData,
-                              monolithJoin: MonolithJoinNode,
-                              range: PartitionRange,
-                              tableUtils: TableUtils): Unit = {
+  private def runMonolithJoin(metadata: MetaData, monolithJoin: MonolithJoinNode, range: PartitionRange): Unit = {
     require(monolithJoin.isSetJoin, "MonolithJoinNode must have a join set")

     val joinConf = monolithJoin.join
@@ -185,21 +134,47 @@ object BatchNodeRunner extends NodeRunner {
     }
   }

-  override def run(metadata: MetaData, conf: NodeContent, range: Option[PartitionRange]): Unit = {
-    require(range.isDefined, "Partition range must be defined for batch node runner")
+  override def run(metadata: MetaData, conf: NodeContent, maybeRange: Option[PartitionRange]): Unit = {
+    require(maybeRange.isDefined, "Partition range must be defined for batch node runner")
+    val range = maybeRange.get
+    conf.getSetField match {
+      case NodeContent._Fields.MONOLITH_JOIN =>
+        runMonolithJoin(metadata, conf.getMonolithJoin, range)
+      case NodeContent._Fields.GROUP_BY_UPLOAD =>
+        runGroupByUpload(metadata, conf.getGroupByUpload, range)
+      case NodeContent._Fields.GROUP_BY_BACKFILL =>
+        logger.info(s"Running groupBy backfill for '${metadata.name}' for range: [${range.start}, ${range.end}]")
+        GroupBy.computeBackfill(
+          conf.getGroupByBackfill.groupBy,
+          range.end,
+          tableUtils,
+          overrideStartPartition = Option(range.start)
+        )
+        logger.info(s"Successfully completed groupBy backfill for '${metadata.name}'")
+      case NodeContent._Fields.STAGING_QUERY =>
+        runStagingQuery(metadata, conf.getStagingQuery, range)
+      case NodeContent._Fields.EXTERNAL_SOURCE_SENSOR => {

-    run(metadata, conf, range.get, createTableUtils(metadata.name))
+        checkPartitions(conf.getExternalSourceSensor, metadata, range) match {
+          case Success(_) => System.exit(0)
+          case Failure(exception) =>
+            logger.error(s"ExternalSourceSensor check failed.", exception)
+            System.exit(1)
+        }
+      }
+      case _ =>
+        throw new UnsupportedOperationException(s"Unsupported NodeContent type: ${conf.getSetField}")
+    }
   }

-  def runFromArgs(api: Api,
-                  confPath: String,
-                  startDs: String,
-                  endDs: String,
-                  tablePartitionsDataset: String): Try[Unit] = {
+  def runFromArgs(
+      api: Api,
+      startDs: String,
+      endDs: String,
+      tablePartitionsDataset: String
+  ): Try[Unit] = {
     Try {
-      val node = ThriftJsonCodec.fromJsonFile[Node](confPath, check = true)
       val metadata = node.metaData
-      val tableUtils = createTableUtils(metadata.name)
       val range = PartitionRange(startDs, endDs)(PartitionSpec.daily)
       val kvStore = api.genKvStore

@@ -275,37 +250,35 @@ object BatchNodeRunner extends NodeRunner {
       }
     }
   }
+}

-  def instantiateApi(onlineClass: String, props: Map[String, String]): Api = {
-    val cl = Thread.currentThread().getContextClassLoader
-    val cls = cl.loadClass(onlineClass)
-    val constructor = cls.getConstructors.apply(0)
-    val onlineImpl = constructor.newInstance(props)
-    onlineImpl.asInstanceOf[Api]
-  }
+object BatchNodeRunner {

   def main(args: Array[String]): Unit = {
-    try {
-      val batchArgs = new BatchNodeRunnerArgs(args)
-      val api = instantiateApi(batchArgs.onlineClass(), batchArgs.apiProps)
-      runFromArgs(api,
-                  batchArgs.confPath(),
-                  batchArgs.startDs(),
-                  batchArgs.endDs(),
-                  batchArgs.tablePartitionsDataset()) match {
+    val batchArgs = new BatchNodeRunnerArgs(args)
+    val node = ThriftJsonCodec.fromJsonFile[Node](batchArgs.confPath(), check = true)
+    val tableUtils = TableUtils(SparkSessionBuilder.build(s"batch-node-runner-${node.metaData.name}"))
+    val runner = new BatchNodeRunner(node, tableUtils)
+    val api = instantiateApi(batchArgs.onlineClass(), batchArgs.apiProps)
+    val exitCode = {
+      runner.runFromArgs(api, batchArgs.startDs(), batchArgs.endDs(), batchArgs.tablePartitionsDataset()) match {
         case Success(_) =>
-          logger.info("Batch node runner completed successfully")
-          System.exit(0)
+          println("Batch node runner succeeded")
+          0
         case Failure(exception) =>
-          logger.error("Batch node runner failed", exception)
-          System.exit(1)
+          println("Batch node runner failed", exception)
+          1
       }
-    } catch {
-      case e: Exception =>
-        logger.error("Failed to parse arguments or initialize runner", e)
-        System.exit(1)
     }
+    tableUtils.sparkSession.stop()
+    System.exit(exitCode)
   }

-  // override def tablePartitionsDataset(): String = tablePartitionsDataset
+  def instantiateApi(onlineClass: String, props: Map[String, String]): Api = {
+    val cl = Thread.currentThread().getContextClassLoader
+    val cls = cl.loadClass(onlineClass)
+    val constructor = cls.getConstructors.apply(0)
+    val onlineImpl = constructor.newInstance(props)
+    onlineImpl.asInstanceOf[Api]
+  }
 }
diff --git a/spark/src/main/scala/ai/chronon/spark/kv_store/KVUploadNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/kv_store/KVUploadNodeRunner.scala
index 65b127afca..d1b216f2fb 100644
--- a/spark/src/main/scala/ai/chronon/spark/kv_store/KVUploadNodeRunner.scala
+++ b/spark/src/main/scala/ai/chronon/spark/kv_store/KVUploadNodeRunner.scala
@@ -2,12 +2,11 @@ package ai.chronon.spark.kv_store

 import ai.chronon.api.Constants.MetadataDataset
 import ai.chronon.api.Extensions.MetadataOps
-import ai.chronon.api.planner.NodeRunner
 import ai.chronon.api._
+import ai.chronon.api.planner.NodeRunner
 import ai.chronon.online.Api
 import ai.chronon.online.fetcher.{FetchContext, MetadataStore}
 import ai.chronon.planner.{Node, NodeContent}
-import ai.chronon.spark.batch.BatchNodeRunner.DefaultTablePartitionsDataset
 import org.rogach.scallop.ScallopConf
 import org.slf4j.{Logger, LoggerFactory}

@@ -82,7 +81,6 @@ class KVUploadNodeRunner(api: Api) extends NodeRunner {
     }
   }

-  // override def tablePartitionsDataset(): String = tablePartitionsDataset
 }

 object KVUploadNodeRunner {
@@ -96,7 +94,7 @@ object KVUploadNodeRunner {
     val apiProps: Map[String, String] = props[String]('Z', descr = "Props to configure API Store")
     val tablePartitionsDataset = opt[String](required = true,
                                              descr = "Name of table in kv store to use to keep track of partitions",
-                                             default = Option(DefaultTablePartitionsDataset))
+                                             default = Option(NodeRunner.DefaultTablePartitionsDataset))

     verify()
   }
diff --git a/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala b/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala
index 6ef99d8d26..5cf866a588 100644
--- a/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala
+++ b/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala
@@ -18,11 +18,10 @@ package ai.chronon.spark.test.batch

 import ai.chronon.api.Extensions._
 import ai.chronon.api._
-import ai.chronon.api.planner.{MetaDataUtils, TableDependencies}
+import ai.chronon.api.planner.{MetaDataUtils, NodeRunner, TableDependencies}
 import ai.chronon.online.KVStore.PutRequest
 import ai.chronon.planner.{ExternalSourceSensorNode, MonolithJoinNode, Node, NodeContent}
 import ai.chronon.spark.batch.BatchNodeRunner
-import ai.chronon.spark.batch.BatchNodeRunner.DefaultTablePartitionsDataset
 import ai.chronon.spark.submission.SparkSessionBuilder
 import ai.chronon.spark.test.{MockKVStore, TableTestUtils}
 import ai.chronon.spark.utils.MockApi
@@ -155,17 +154,27 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
     // Insert test data for available partitions
     spark.sql(
       s"""
-         |INSERT INTO test_db.input_table VALUES
-         |(1, 'value1', '$yesterday'),
-         |(2, 'value2', '$twoDaysAgo')
+         |INSERT OVERWRITE test_db.input_table PARTITION (ds='$yesterday') VALUES
+         |(1, 'value1')
+         |""".stripMargin
+    )
+    spark.sql(
+      s"""
+         |INSERT OVERWRITE test_db.input_table PARTITION (ds='$twoDaysAgo') VALUES
+         |(2, 'value2')
          |""".stripMargin
     )

     spark.sql(
       s"""
-         |INSERT INTO test_db.left_table VALUES
-         |(1, '$yesterday'),
-         |(2, '$twoDaysAgo')
+         |INSERT OVERWRITE test_db.left_table PARTITION (ds='$yesterday') VALUES
+         |(1)
+         |""".stripMargin
+    )
+    spark.sql(
+      s"""
+         |INSERT OVERWRITE test_db.left_table PARTITION (ds='$twoDaysAgo') VALUES
+         |(2)
          |""".stripMargin
     )
   }
@@ -210,8 +219,10 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before

   "BatchNodeRunner.runFromArgs" should "calculate input table partitions and write them to kvStore" in {
     val configPath = createTestConfigFile(twoDaysAgo, yesterday)
+    val node = ThriftJsonCodec.fromJsonFile[Node](configPath, check = true)
+    val runner = new BatchNodeRunner(node, tableUtils)

-    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday, DefaultTablePartitionsDataset)
+    val result = runner.runFromArgs(mockApi, twoDaysAgo, yesterday, NodeRunner.DefaultTablePartitionsDataset)

     result match {
       case Success(_) =>
@@ -228,7 +239,7 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before

         // Verify dataset name
         assertTrue("Should use TABLE_PARTITIONS dataset",
-                   mockKVStore.putRequests.forall(_.dataset == DefaultTablePartitionsDataset))
+                   mockKVStore.putRequests.forall(_.dataset == NodeRunner.DefaultTablePartitionsDataset))

       case Failure(exception) =>
         fail(s"runFromArgs should have succeeded but failed with: ${exception.getMessage}")
@@ -238,8 +249,10 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before

   it should "short circuit and throw exception when missing partitions are present" in {
     val configPath = createTestConfigFile(twoDaysAgo, today) // today's partition doesn't exist
+    val node = ThriftJsonCodec.fromJsonFile[Node](configPath, check = true)
+    val runner = new BatchNodeRunner(node, tableUtils)

-    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, today, DefaultTablePartitionsDataset)
+    val result = runner.runFromArgs(mockApi, twoDaysAgo, today, NodeRunner.DefaultTablePartitionsDataset)

     result match {
       case Success(_) =>
@@ -264,15 +277,22 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
     // Insert some data into output table to simulate successful execution
     spark.sql(
       s"""
-         |INSERT INTO test_db.output_table VALUES
-         |(1, 'output1', '$yesterday'),
-         |(2, 'output2', '$twoDaysAgo')
+         |INSERT OVERWRITE test_db.output_table PARTITION (ds='$yesterday') VALUES
+         |(1, 'output1')
+         |""".stripMargin
+    )
+    spark.sql(
+      s"""
+         |INSERT OVERWRITE test_db.output_table PARTITION (ds='$twoDaysAgo') VALUES
+         |(2, 'output2')
          |""".stripMargin
     )

     val configPath = createTestConfigFile(twoDaysAgo, yesterday)
+    val node = ThriftJsonCodec.fromJsonFile[Node](configPath, check = true)
+    val runner = new BatchNodeRunner(node, tableUtils)

-    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday, DefaultTablePartitionsDataset)
+    val result = runner.runFromArgs(mockApi, twoDaysAgo, yesterday, NodeRunner.DefaultTablePartitionsDataset)

     result match {
       case Success(_) =>
@@ -299,9 +319,10 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
     val futureDate2 = tableUtils.partitionSpec.after(futureDate1)

     val configPath = createTestConfigFile(futureDate1, futureDate2)
+    val node = ThriftJsonCodec.fromJsonFile[Node](configPath, check = true)
+    val runner = new BatchNodeRunner(node, tableUtils)

-    val result =
-      BatchNodeRunner.runFromArgs(mockApi, configPath, futureDate1, futureDate2, DefaultTablePartitionsDataset)
+    val result = runner.runFromArgs(mockApi, futureDate1, futureDate2, NodeRunner.DefaultTablePartitionsDataset)

     result match {
       case Success(_) =>
@@ -317,15 +338,15 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
     // Add one more partition to test partial availability
     spark.sql(
       s"""
-         |INSERT INTO test_db.input_table VALUES
-         |(3, 'value3', '$today')
+         |INSERT OVERWRITE test_db.input_table PARTITION (ds='$today') VALUES
+         |(3, 'value3')
          |""".stripMargin
     )

     spark.sql(
       s"""
-         |INSERT INTO test_db.left_table VALUES
-         |(3, '$today')
+         |INSERT OVERWRITE test_db.left_table PARTITION (ds='$today') VALUES
+         |(3)
          |""".stripMargin
     )

@@ -334,8 +355,10 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
     // Request a range that includes both available and missing partitions
     val threeDaysAgo = tableUtils.partitionSpec.before(twoDaysAgo)
     val configPath = createTestConfigFile(threeDaysAgo, today)
+    val node = ThriftJsonCodec.fromJsonFile[Node](configPath, check = true)
+    val runner = new BatchNodeRunner(node, tableUtils)

-    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, threeDaysAgo, today, DefaultTablePartitionsDataset)
+    val result = runner.runFromArgs(mockApi, threeDaysAgo, today, NodeRunner.DefaultTablePartitionsDataset)

     result match {
       case Success(_) =>
@@ -393,17 +416,27 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
     // Insert data with different partition format
     spark.sql(
       s"""
-         |INSERT INTO test_db.input_table_alt VALUES
-         |(1, 'value1', '$yesterdayAlt'),
-         |(2, 'value2', '$twoDaysAgoAlt')
+         |INSERT OVERWRITE test_db.input_table_alt PARTITION (partition_date='$yesterdayAlt') VALUES
+         |(1, 'value1')
+         |""".stripMargin
+    )
+    spark.sql(
+      s"""
+         |INSERT OVERWRITE test_db.input_table_alt PARTITION (partition_date='$twoDaysAgoAlt') VALUES
+         |(2, 'value2')
          |""".stripMargin
     )

     spark.sql(
       s"""
-         |INSERT INTO test_db.left_table_alt VALUES
-         |(1, '$yesterdayAlt'),
-         |(2, '$twoDaysAgoAlt')
+         |INSERT OVERWRITE test_db.left_table_alt PARTITION (partition_date='$yesterdayAlt') VALUES
+         |(1)
+         |""".stripMargin
+    )
+    spark.sql(
+      s"""
+         |INSERT OVERWRITE test_db.left_table_alt PARTITION (partition_date='$twoDaysAgoAlt') VALUES
+         |(2)
          |""".stripMargin
     )

@@ -416,8 +449,10 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before
       partitionColumn = "partition_date",
       partitionFormat = "yyyyMMdd"
     )
+    val node = ThriftJsonCodec.fromJsonFile[Node](configPath, check = true)
+    val runner = new BatchNodeRunner(node, tableUtils)

-    val result = BatchNodeRunner.runFromArgs(mockApi, configPath, twoDaysAgo, yesterday, DefaultTablePartitionsDataset)
+    val result = runner.runFromArgs(mockApi, twoDaysAgo, yesterday, NodeRunner.DefaultTablePartitionsDataset)

     result match {
       case Success(_) =>
@@ -454,8 +489,11 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before

     val metadata = createTestMetadata("test_db.input_table", "test_db.output_table")
     val range = PartitionRange(twoDaysAgo, yesterday)(tableUtils.partitionSpec)
+    val configPath = createTestConfigFile(twoDaysAgo, yesterday)
+    val node = ThriftJsonCodec.fromJsonFile[Node](configPath, check = true)
+    val runner = new BatchNodeRunner(node, tableUtils)

-    val result = BatchNodeRunner.checkPartitions(sensorNode, metadata, tableUtils, range)
+    val result = runner.checkPartitions(sensorNode, metadata, range)

     result match {
       case Success(_) =>
@@ -473,8 +511,11 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before

     val metadata = createTestMetadata("test_db.external_table", "test_db.output_table")
     val range = PartitionRange(today, today)(tableUtils.partitionSpec) // today's partition doesn't exist
+    val configPath = createTestConfigFile(today, today, "test_db.external_table")
+    val node = ThriftJsonCodec.fromJsonFile[Node](configPath, check = true)
+    val runner = new BatchNodeRunner(node, tableUtils)

-    val result = BatchNodeRunner.checkPartitions(sensorNode, metadata, tableUtils, range)
+    val result = runner.checkPartitions(sensorNode, metadata, range)

     result match {
       case Success(_) =>
@@ -493,14 +534,17 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before

     val metadata = createTestMetadata("test_db.external_table", "test_db.output_table")
     val range = PartitionRange(today, today)(tableUtils.partitionSpec) // today's partition doesn't exist
+    val configPath = createTestConfigFile(today, today, "test_db.external_table")
+    val node = ThriftJsonCodec.fromJsonFile[Node](configPath, check = true)
+    val runner = new BatchNodeRunner(node, tableUtils)

-    val result = BatchNodeRunner.checkPartitions(sensorNode, metadata, tableUtils, range)
+    val result = runner.checkPartitions(sensorNode, metadata, range)

     result match {
       case Success(_) =>
         fail("checkPartitions should have failed due to missing partitions")
       case Failure(exception) =>
-        // Should fail immediately with default retry count of 0
+        // Should fail with default retry count (3) and retry interval (3 minutes)
         assertTrue("Exception should mention missing partitions", exception.getMessage.contains("missing partitions"))
     }
   }
@@ -513,9 +557,12 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before

     val metadata = createTestMetadata("test_db.external_table", "test_db.output_table")
     val range = PartitionRange(today, today)(tableUtils.partitionSpec) // today's partition doesn't exist
+    val configPath = createTestConfigFile(today, today, "test_db.external_table")
+    val node = ThriftJsonCodec.fromJsonFile[Node](configPath, check = true)
+    val runner = new BatchNodeRunner(node, tableUtils)

     val startTime = System.currentTimeMillis()
-    val result = BatchNodeRunner.checkPartitions(sensorNode, metadata, tableUtils, range)
+    val result = runner.checkPartitions(sensorNode, metadata, range)
     val endTime = System.currentTimeMillis()

     result match {
@@ -536,8 +583,11 @@ class BatchNodeRunnerTest extends AnyFlatSpec with BeforeAndAfterAll with Before

     val metadata = createTestMetadata("test_db.nonexistent_table", "test_db.output_table")
     val range = PartitionRange(yesterday, yesterday)(tableUtils.partitionSpec)
+    val configPath = createTestConfigFile(yesterday, yesterday, "test_db.nonexistent_table")
+    val node = ThriftJsonCodec.fromJsonFile[Node](configPath, check = true)
+    val runner = new BatchNodeRunner(node, tableUtils)

-    val result = BatchNodeRunner.checkPartitions(sensorNode, metadata, tableUtils, range)
+    val result = runner.checkPartitions(sensorNode, metadata, range)

     result match {
       case Success(_) =>
diff --git a/spark/src/test/scala/ai/chronon/spark/test/batch/ShortNamesTest.scala b/spark/src/test/scala/ai/chronon/spark/test/batch/ShortNamesTest.scala
index 171fd0246e..756f4c0b45 100644
--- a/spark/src/test/scala/ai/chronon/spark/test/batch/ShortNamesTest.scala
+++ b/spark/src/test/scala/ai/chronon/spark/test/batch/ShortNamesTest.scala
@@ -5,16 +5,16 @@ import ai.chronon.api
 import ai.chronon.api.Extensions._
 import ai.chronon.api.ScalaJavaConversions.MapOps
 import ai.chronon.api._
+import ai.chronon.planner._
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.batch._
 import ai.chronon.spark.submission.SparkSessionBuilder
 import ai.chronon.spark.test.{DataFrameGen, TableTestUtils}
-import ai.chronon.spark.{GroupBy, Join, _}
+import ai.chronon.spark.{Join, _}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.{col, rand}
 import org.junit.Assert.assertEquals
 import org.scalatest.flatspec.AnyFlatSpec
-import ai.chronon.planner.{JoinBootstrapNode, JoinDerivationNode, JoinMergeNode, JoinPartNode, SourceWithFilterNode}
 import org.slf4j.LoggerFactory

 class ShortNamesTest extends AnyFlatSpec {