spark/src/main/scala/ai/chronon/spark/Driver.scala (39 changes: 26 additions & 13 deletions)
@@ -17,6 +17,7 @@
 package ai.chronon.spark
 
 import ai.chronon.api
+import ai.chronon.api.Constants
 import ai.chronon.api.Constants.MetadataDataset
 import ai.chronon.api.Extensions.{GroupByOps, JoinPartOps, MetadataOps, SourceOps}
 import ai.chronon.api.planner.RelevantLeftForJoinPart
@@ -27,30 +28,39 @@ import ai.chronon.online.{Api, MetadataDirWalker, MetadataEndPoint, TopicChecker
 import ai.chronon.orchestration.{JoinMergeNode, JoinPartNode}
 import ai.chronon.spark.batch._
 import ai.chronon.spark.format.Format
-import ai.chronon.spark.stats.drift.{Summarizer, SummaryPacker, SummaryUploader}
-import ai.chronon.spark.stats.{CompareBaseJob, CompareJob, ConsistencyJob}
+import ai.chronon.spark.stats.CompareBaseJob
+import ai.chronon.spark.stats.CompareJob
+import ai.chronon.spark.stats.ConsistencyJob
+import ai.chronon.spark.stats.drift.Summarizer
+import ai.chronon.spark.stats.drift.SummaryPacker
+import ai.chronon.spark.stats.drift.SummaryUploader
 import ai.chronon.spark.streaming.JoinSourceRunner
 import org.apache.commons.io.FileUtils
 import org.apache.spark.SparkFiles
 import org.apache.spark.sql.streaming.StreamingQueryListener
-import org.apache.spark.sql.streaming.StreamingQueryListener.{
-  QueryProgressEvent,
-  QueryStartedEvent,
-  QueryTerminatedEvent
-}
-import org.apache.spark.sql.{DataFrame, SparkSession, SparkSessionExtensions}
+import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
+import org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent
+import org.apache.spark.sql.streaming.StreamingQueryListener.QueryTerminatedEvent
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.SparkSessionExtensions
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
-import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand}
-import org.slf4j.{Logger, LoggerFactory}
+import org.rogach.scallop.ScallopConf
+import org.rogach.scallop.ScallopOption
+import org.rogach.scallop.Subcommand
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 import org.yaml.snakeyaml.Yaml
 
 import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.duration.DurationInt
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Await
+import scala.concurrent.Future
 import scala.reflect.ClassTag
 import scala.reflect.internal.util.ScalaClassLoader
 
@@ -990,7 +1000,10 @@ object Driver {
     val partitionNames = args.partitionNames()
     val tablesToPartitionSpec = partitionNames.map((p) =>
       p.split("/").toList match {
-        case fullTableName :: partitionSpec :: Nil => (fullTableName, Format.parseHiveStylePartition(partitionSpec))
+        case fullTableName :: partitionParts if partitionParts.nonEmpty =>
+          // Join all partition parts with "/" and parse as one combined partition spec.
+          val partitionSpec = partitionParts.mkString("/")
+          (fullTableName, Format.parseHiveStylePartition(partitionSpec))
         case fullTableName :: Nil =>
           throw new IllegalArgumentException(
             s"A partition spec must be specified for ${fullTableName}. ${helpNamingConvention}")
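For context, the behavioral change in the last hunk lets a single partition-names entry carry a multi-key Hive-style spec such as db.table/ds=2024-01-01/hr=23: everything after the table name is re-joined with "/" and parsed as one spec, where previously exactly one key=value part was accepted. Below is a minimal, self-contained sketch of that parsing logic. The simplified parseHiveStylePartition stand-in (returning key/value pairs) and the object and method names are assumptions for illustration, not Chronon's actual Format.parseHiveStylePartition API.

// Sketch only: parseHiveStylePartition here is a hypothetical stand-in,
// assumed to split "k1=v1/k2=v2" into (key, value) pairs.
object PartitionArgParseSketch {
  def parseHiveStylePartition(spec: String): Seq[(String, String)] =
    spec.split("/").toSeq.map { part =>
      val Array(k, v) = part.split("=", 2)
      (k, v)
    }

  // Mirrors the new match arm: all parts after the table name form one spec.
  def parsePartitionNameArg(p: String): (String, Seq[(String, String)]) =
    p.split("/").toList match {
      case fullTableName :: partitionParts if partitionParts.nonEmpty =>
        (fullTableName, parseHiveStylePartition(partitionParts.mkString("/")))
      case fullTableName :: Nil =>
        throw new IllegalArgumentException(s"A partition spec must be specified for $fullTableName.")
      case _ =>
        throw new IllegalArgumentException(s"Invalid partition name argument: $p")
    }

  def main(args: Array[String]): Unit = {
    // Before this change only "table/ds=2024-01-01" parsed; multi-key specs now work too.
    println(parsePartitionNameArg("data.sample_table/ds=2024-01-01/hr=23"))
    // prints: (data.sample_table,List((ds,2024-01-01), (hr,23)))
  }
}

Note that the fullTableName :: Nil arm stays reachable: when the argument has no "/" at all, the guarded arm above it fails its nonEmpty check, so the missing-spec error still fires.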