Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions spark/src/main/scala/ai/chronon/spark/Driver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,35 @@
package ai.chronon.spark

import ai.chronon.api
import ai.chronon.api.{Constants, DateRange, RelevantLeftForJoinPart, ThriftJsonCodec}
import ai.chronon.api.Constants
import ai.chronon.api.Constants.MetadataDataset
import ai.chronon.api.Extensions.{GroupByOps, JoinPartOps, MetadataOps, SourceOps}
import ai.chronon.api.DateRange
import ai.chronon.api.Extensions.GroupByOps
import ai.chronon.api.Extensions.JoinPartOps
import ai.chronon.api.Extensions.MetadataOps
import ai.chronon.api.Extensions.SourceOps
import ai.chronon.api.RelevantLeftForJoinPart
import ai.chronon.api.ThriftJsonCodec
import ai.chronon.api.thrift.TBase
import ai.chronon.spark.batch._
import ai.chronon.online.Api
import ai.chronon.online.MetadataDirWalker
import ai.chronon.online.MetadataEndPoint
import ai.chronon.online.TopicChecker
import ai.chronon.online.fetcher.{ConfPathOrName, FetchContext, FetcherMain, MetadataStore}
import ai.chronon.orchestration.{JoinMergeNode, JoinPartNode}
import ai.chronon.online.fetcher.ConfPathOrName
import ai.chronon.online.fetcher.FetchContext
import ai.chronon.online.fetcher.FetcherMain
import ai.chronon.online.fetcher.MetadataStore
import ai.chronon.orchestration.JoinMergeNode
import ai.chronon.orchestration.JoinPartNode
import ai.chronon.spark.batch._
import ai.chronon.spark.format.Format
import ai.chronon.spark.stats.CompareBaseJob
import ai.chronon.spark.stats.CompareJob
import ai.chronon.spark.stats.ConsistencyJob
import ai.chronon.spark.stats.drift.Summarizer
import ai.chronon.spark.stats.drift.SummaryPacker
import ai.chronon.spark.stats.drift.SummaryUploader
import ai.chronon.spark.streaming.JoinSourceRunner
import ai.chronon.spark.format.Format
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkFiles
import org.apache.spark.sql.DataFrame
Expand Down Expand Up @@ -998,7 +1008,10 @@ object Driver {
val partitionNames = args.partitionNames()
val tablesToPartitionSpec = partitionNames.map((p) =>
p.split("/").toList match {
case fullTableName :: partitionSpec :: Nil => (fullTableName, Format.parseHiveStylePartition(partitionSpec))
case fullTableName :: partitionParts if partitionParts.nonEmpty =>
// Join all partition parts with "/" and parse as one combined partition spec
val partitionSpec = partitionParts.mkString("/")
(fullTableName, Format.parseHiveStylePartition(partitionSpec))
case fullTableName :: Nil =>
throw new IllegalArgumentException(
s"A partition spec must be specified for ${fullTableName}. ${helpNamingConvention}")
Expand Down