diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
index b38782feb2..8e75e4136a 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
@@ -1,7 +1,7 @@
 package ai.chronon.integrations.cloud_gcp
 import ai.chronon.api.Builders.MetaData
-import ai.chronon.spark.submission.{JobSubmitter, JobType, FlinkJob => TypeFlinkJob, SparkJob => TypeSparkJob}
 import ai.chronon.spark.submission.JobSubmitterConstants._
+import ai.chronon.spark.submission.{JobSubmitter, JobType, FlinkJob => TypeFlinkJob, SparkJob => TypeSparkJob}
 import com.google.api.gax.rpc.ApiException
 import com.google.cloud.dataproc.v1._
 import com.google.protobuf.util.JsonFormat
diff --git a/spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala b/spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala
index 4d148548d8..2d5b013dbd 100644
--- a/spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala
+++ b/spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala
@@ -17,23 +17,23 @@
 package ai.chronon.spark
 
 import ai.chronon.api
+import ai.chronon.api._
 import ai.chronon.api.Extensions._
 import ai.chronon.api.ScalaJavaConversions._
-import ai.chronon.api._
-import ai.chronon.online.OnlineDerivationUtil.timeFields
 import ai.chronon.online._
+import ai.chronon.online.OnlineDerivationUtil.timeFields
 import ai.chronon.online.metrics._
 import ai.chronon.online.serde._
 import ai.chronon.spark.Extensions.{StructTypeOps, _}
 import ai.chronon.spark.catalog.TableUtils
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
 
 import java.util.Base64
-import scala.collection.{Seq, mutable}
+import scala.collection.{mutable, Seq}
 import scala.util.{Failure, Success, Try}
 
 /** Purpose of LogFlattenerJob is to unpack serialized Avro data from online requests and flatten each field
diff --git a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
index 5c3e859dcf..b72afbedda 100644
--- a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
+++ b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
@@ -88,11 +88,10 @@ class BatchNodeRunner(node: Node, tableUtils: TableUtils) extends NodeRunner {
       logger.info(s"Running staging query for '${metaData.name}'")
       val stagingQueryConf = stagingQuery.stagingQuery
       val sq = new StagingQuery(stagingQueryConf, range.end, tableUtils)
-      sq.computeStagingQuery(
-        stepDays = Option(metaData.executionInfo.stepDays),
-        enableAutoExpand = Some(true),
-        overrideStartPartition = Option(range.start),
-        forceOverwrite = true
+      sq.compute(
+        range,
+        Option(stagingQuery.stagingQuery.setups).map(_.asScala).getOrElse(Seq.empty),
+        Option(true)
       )
 
       logger.info(s"Successfully completed staging query for '${metaData.name}'")
diff --git a/spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala b/spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala
index 3486457aee..84b0a0341e 100644
--- a/spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala
+++ b/spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala
@@ -23,7 +23,8 @@ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tab
 
   private val partitionCols: Seq[String] =
     Seq(tableUtils.partitionColumn) ++
-      (Option(stagingQueryConf.metaData.additionalOutputPartitionColumns.toScala)
+      (Option(stagingQueryConf.metaData.additionalOutputPartitionColumns)
+        .map(_.toScala)
         .getOrElse(Seq.empty))
 
   def computeStagingQuery(stepDays: Option[Int] = None,
@@ -69,12 +70,7 @@ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tab
       stepRanges.zipWithIndex.foreach { case (range, index) =>
         val progress = s"| [${index + 1}/${stepRanges.size}]"
         logger.info(s"Computing staging query for range: $range $progress")
-        val renderedQuery =
-          StagingQuery.substitute(tableUtils, stagingQueryConf.query, range.start, range.end, endPartition)
-        logger.info(s"Rendered Staging Query to run is:\n$renderedQuery")
-        val df = tableUtils.sql(renderedQuery)
-        df.save(outputTable, tableProps, partitionCols, autoExpand = enableAutoExpand.get)
-        logger.info(s"Wrote to table $outputTable, into partitions: $range $progress")
+        compute(range, Seq.empty[String], enableAutoExpand)
       }
       logger.info(s"Finished writing Staging Query data to $outputTable")
     } catch {
@@ -94,6 +90,17 @@ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tab
       }
     }
   }
+
+  def compute(range: PartitionRange, setups: Seq[String], enableAutoExpand: Option[Boolean]): Unit = {
+    Option(setups).foreach(_.foreach(tableUtils.sql))
+    val renderedQuery =
+      StagingQuery.substitute(tableUtils, stagingQueryConf.query, range.start, range.end, endPartition)
+    logger.info(s"Rendered Staging Query to run is:\n$renderedQuery")
+    val df = tableUtils.sql(renderedQuery)
+    df.save(outputTable, tableProps, partitionCols, autoExpand = enableAutoExpand.get)
+    logger.info(s"Wrote to table $outputTable, into partitions: $range")
+    logger.info(s"Finished writing Staging Query data to $outputTable")
+  }
 }
 
 class Args(args: Seq[String]) extends ScallopConf(args) {
diff --git a/spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala b/spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala
index c4caa9c060..72703c65c7 100644
--- a/spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala
+++ b/spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala
@@ -2,33 +2,16 @@ package ai.chronon.spark.test.fetcher
 
 import ai.chronon.aggregator.test.Column
 import ai.chronon.api
-import ai.chronon.api.Builders.Derivation
+import ai.chronon.api.Extensions._
 import ai.chronon.api.Constants.MetadataDataset
-import ai.chronon.api.Extensions.{JoinOps, MetadataOps}
-import ai.chronon.api.{
-  Accuracy,
-  BooleanType,
-  Builders,
-  DoubleType,
-  IntType,
-  ListType,
-  LongType,
-  Operation,
-  StringType,
-  StructField,
-  StructType,
-  TimeUnit,
-  TsUtils,
-  Window,
-  Constants
-}
+import ai.chronon.api.Builders.Derivation
 import ai.chronon.api.ScalaJavaConversions._
-import ai.chronon.online._
-import ai.chronon.online.{fetcher, _}
+import ai.chronon.api._
 import ai.chronon.online.fetcher.FetchContext
-import ai.chronon.spark.Extensions._
 import ai.chronon.online.fetcher.Fetcher.{Request, Response}
-import ai.chronon.online.serde.{SparkConversions, _}
+import ai.chronon.online.serde.SparkConversions
+import ai.chronon.online._
+import ai.chronon.spark.Extensions._
 import ai.chronon.spark.catalog.TableUtils
 import ai.chronon.spark.stats.ConsistencyJob
 import ai.chronon.spark.test.{DataFrameGen, OnlineUtils, SchemaEvolutionUtils}
@@ -39,7 +22,6 @@ import org.apache.spark.sql.functions.{avg, col, lit}
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.util.TimeZone
 import java.util.concurrent.Executors
 import java.{lang, util}
 import scala.collection.Seq
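
Reviewer note (illustrative, not part of the patch): the refactor extracts the per-range body of computeStagingQuery into a single-shot compute(range, setups, enableAutoExpand), which BatchNodeRunner now calls directly with the conf's setup statements instead of re-deriving step ranges. A minimal sketch of the two call paths, assuming a constructed TableUtils and a parsed api.StagingQuery conf; the helper names, the literal step size, and the PartitionRange import path are assumptions, not code from this PR.

    import ai.chronon.api
    import ai.chronon.api.PartitionRange // assumed location of PartitionRange
    import ai.chronon.api.ScalaJavaConversions._
    import ai.chronon.spark.batch.StagingQuery
    import ai.chronon.spark.catalog.TableUtils

    // Hypothetical helper mirroring the BatchNodeRunner change: run exactly one
    // partition range, executing the conf's setup SQL before the rendered query.
    def runOnce(conf: api.StagingQuery, range: PartitionRange, tableUtils: TableUtils): Unit = {
      val sq = new StagingQuery(conf, range.end, tableUtils)
      // setups is a nullable java.util.List on the thrift conf, hence the Option guard.
      val setups = Option(conf.setups).map(_.toScala).getOrElse(Seq.empty)
      sq.compute(range, setups, enableAutoExpand = Option(true))
    }

    // The stepped path is unchanged for existing callers: computeStagingQuery still
    // walks [start, endPartition] in stepDays-sized ranges, but each step now
    // delegates to compute(range, Seq.empty, enableAutoExpand) internally.
    def runStepped(conf: api.StagingQuery, endPartition: String, tableUtils: TableUtils): Unit = {
      val sq = new StagingQuery(conf, endPartition, tableUtils)
      sq.computeStagingQuery(stepDays = Some(30), enableAutoExpand = Some(true))
    }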