@@ -1,7 +1,7 @@
package ai.chronon.integrations.cloud_gcp
import ai.chronon.api.Builders.MetaData
import ai.chronon.spark.submission.{JobSubmitter, JobType, FlinkJob => TypeFlinkJob, SparkJob => TypeSparkJob}
import ai.chronon.spark.submission.JobSubmitterConstants._
import ai.chronon.spark.submission.{JobSubmitter, JobType, FlinkJob => TypeFlinkJob, SparkJob => TypeSparkJob}
import com.google.api.gax.rpc.ApiException
import com.google.cloud.dataproc.v1._
import com.google.protobuf.util.JsonFormat
8 changes: 4 additions & 4 deletions spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala
@@ -17,23 +17,23 @@
package ai.chronon.spark

import ai.chronon.api
import ai.chronon.api._
import ai.chronon.api.Extensions._
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api._
import ai.chronon.online.OnlineDerivationUtil.timeFields
import ai.chronon.online._
import ai.chronon.online.OnlineDerivationUtil.timeFields
import ai.chronon.online.metrics._
import ai.chronon.online.serde._
import ai.chronon.spark.Extensions.{StructTypeOps, _}
import ai.chronon.spark.catalog.TableUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.{Logger, LoggerFactory}

import java.util.Base64
import scala.collection.{Seq, mutable}
import scala.collection.{mutable, Seq}
import scala.util.{Failure, Success, Try}

/** Purpose of LogFlattenerJob is to unpack serialized Avro data from online requests and flatten each field
@@ -88,11 +88,10 @@ class BatchNodeRunner(node: Node, tableUtils: TableUtils) extends NodeRunner {
logger.info(s"Running staging query for '${metaData.name}'")
val stagingQueryConf = stagingQuery.stagingQuery
val sq = new StagingQuery(stagingQueryConf, range.end, tableUtils)
sq.computeStagingQuery(
stepDays = Option(metaData.executionInfo.stepDays),
enableAutoExpand = Some(true),
overrideStartPartition = Option(range.start),
forceOverwrite = true
sq.compute(
range,
Option(stagingQuery.stagingQuery.setups).map(_.asScala).getOrElse(Seq.empty),
Option(true)
)

logger.info(s"Successfully completed staging query for '${metaData.name}'")
21 changes: 14 additions & 7 deletions spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala
@@ -23,7 +23,8 @@ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tab

   private val partitionCols: Seq[String] =
     Seq(tableUtils.partitionColumn) ++
-      (Option(stagingQueryConf.metaData.additionalOutputPartitionColumns.toScala)
+      (Option(stagingQueryConf.metaData.additionalOutputPartitionColumns)
+        .map(_.toScala)
         .getOrElse(Seq.empty))

   def computeStagingQuery(stepDays: Option[Int] = None,
@@ -69,12 +70,7 @@ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tab
       stepRanges.zipWithIndex.foreach { case (range, index) =>
         val progress = s"| [${index + 1}/${stepRanges.size}]"
         logger.info(s"Computing staging query for range: $range $progress")
-        val renderedQuery =
-          StagingQuery.substitute(tableUtils, stagingQueryConf.query, range.start, range.end, endPartition)
-        logger.info(s"Rendered Staging Query to run is:\n$renderedQuery")
-        val df = tableUtils.sql(renderedQuery)
-        df.save(outputTable, tableProps, partitionCols, autoExpand = enableAutoExpand.get)
-        logger.info(s"Wrote to table $outputTable, into partitions: $range $progress")
+        compute(range, Seq.empty[String], enableAutoExpand)
       }
       logger.info(s"Finished writing Staging Query data to $outputTable")
     } catch {
@@ -94,6 +90,17 @@ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tab
       }
     }
   }
+
+  def compute(range: PartitionRange, setups: Seq[String], enableAutoExpand: Option[Boolean]): Unit = {
+    Option(setups).foreach(_.foreach(tableUtils.sql))
+    val renderedQuery =
+      StagingQuery.substitute(tableUtils, stagingQueryConf.query, range.start, range.end, endPartition)
+    logger.info(s"Rendered Staging Query to run is:\n$renderedQuery")
+    val df = tableUtils.sql(renderedQuery)
+    df.save(outputTable, tableProps, partitionCols, autoExpand = enableAutoExpand.get)
+    logger.info(s"Wrote to table $outputTable, into partitions: $range")
+    logger.info(s"Finished writing Staging Query data to $outputTable")
+  }
 }

 class Args(args: Seq[String]) extends ScallopConf(args) {
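For reference, below is a minimal sketch of how the new single-range entry point might be driven directly, which is essentially what BatchNodeRunner now does above: build a `StagingQuery` for the conf, gather any setup statements, and call `compute` for one partition range. Only the `StagingQuery` constructor and the `compute(range, setups, enableAutoExpand)` signature come from this diff; the wrapper object, the `PartitionRange` import path, and the null-safe `setups` extraction are illustrative assumptions, not part of the PR.

```scala
import ai.chronon.api
import ai.chronon.api.PartitionRange // assumption: PartitionRange is importable from ai.chronon.api
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.spark.batch.StagingQuery
import ai.chronon.spark.catalog.TableUtils

// Hypothetical driver, not part of this PR.
object StagingQuerySingleRangeExample {

  def runSingleRange(tableUtils: TableUtils,
                     stagingQueryConf: api.StagingQuery,
                     range: PartitionRange): Unit = {
    // endPartition caps template substitution at the end of the requested range.
    val sq = new StagingQuery(stagingQueryConf, range.end, tableUtils)

    // Null-safe extraction of SETUP statements from the thrift conf, following the same
    // Option(...).map(...).getOrElse(Seq.empty) pattern used elsewhere in this diff.
    val setups: Seq[String] =
      Option(stagingQueryConf.setups).map(_.toScala).getOrElse(Seq.empty)

    // compute() runs the setups, renders the query for [range.start, range.end],
    // writes the output table, and auto-expands the schema when Some(true) is passed.
    sq.compute(range, setups, enableAutoExpand = Some(true))
  }
}
```

With this refactor, `computeStagingQuery` keeps the step-range iteration and delegates each step to `compute`, while callers such as BatchNodeRunner can invoke `compute` directly for a single range.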
@@ -2,33 +2,16 @@ package ai.chronon.spark.test.fetcher

import ai.chronon.aggregator.test.Column
import ai.chronon.api
import ai.chronon.api.Builders.Derivation
import ai.chronon.api.Extensions._
import ai.chronon.api.Constants.MetadataDataset
import ai.chronon.api.Extensions.{JoinOps, MetadataOps}
import ai.chronon.api.{
Accuracy,
BooleanType,
Builders,
DoubleType,
IntType,
ListType,
LongType,
Operation,
StringType,
StructField,
StructType,
TimeUnit,
TsUtils,
Window,
Constants
}
import ai.chronon.api.Builders.Derivation
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.online._
import ai.chronon.online.{fetcher, _}
import ai.chronon.api._
import ai.chronon.online.fetcher.FetchContext
import ai.chronon.spark.Extensions._
import ai.chronon.online.fetcher.Fetcher.{Request, Response}
import ai.chronon.online.serde.{SparkConversions, _}
import ai.chronon.online.serde.SparkConversions
import ai.chronon.online._
import ai.chronon.spark.Extensions._
import ai.chronon.spark.catalog.TableUtils
import ai.chronon.spark.stats.ConsistencyJob
import ai.chronon.spark.test.{DataFrameGen, OnlineUtils, SchemaEvolutionUtils}
@@ -39,7 +22,6 @@ import org.apache.spark.sql.functions.{avg, col, lit}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.slf4j.{Logger, LoggerFactory}

import java.util.TimeZone
import java.util.concurrent.Executors
import java.{lang, util}
import scala.collection.Seq