diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala
index 194388c148..b9746977fa 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala
@@ -8,7 +8,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 
-case class GCS(project: String, sourceUri: String, fileFormat: String) extends Format {
+case class GCS(sourceUri: String, fileFormat: String) extends Format {
 
   override def name: String = fileFormat
 
diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala
index 59a5856ecd..3abadff4b7 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala
@@ -15,6 +15,8 @@ import com.google.cloud.bigquery.connector.common.BigQueryUtil
 import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableId
 import org.apache.spark.sql.SparkSession
 
+import scala.jdk.CollectionConverters._
+
 case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider {
 
   /**
@@ -30,8 +32,8 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
 
   override def resolveTableName(tableName: String): String =
     format(tableName) match {
-      case GCS(_, uri, _) => uri
-      case _              => tableName
+      case GCS(uri, _) => uri
+      case _           => tableName
     }
 
   override def readFormat(tableName: String): Format = format(tableName)
@@ -66,11 +68,12 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
         val uri = Option(externalTable.getHivePartitioningOptions)
           .map(_.getSourceUriPrefix)
           .getOrElse {
-            val uris = externalTable.getSourceUris
+            val uris = externalTable.getSourceUris.asScala
             require(uris.size == 1, s"External table ${table} can be backed by only one URI.")
-            uris.get(0).replaceAll("/\\*\\.parquet$", "")
+            uris.head.replaceAll("/\\*\\.parquet$", "")
           }
-        GCS(table.getTableId.getProject, uri, formatOptions.getType)
+
+        GCS(uri, formatOptions.getType)
 
       case _: StandardTableDefinition =>
         BigQueryFormat(table.getTableId.getProject, Map.empty)
diff --git a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GCSFormatTest.scala b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GCSFormatTest.scala
index 45251bf256..c3ad4fd930 100644
--- a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GCSFormatTest.scala
+++ b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GCSFormatTest.scala
@@ -33,7 +33,7 @@ class GCSFormatTest extends AnyFlatSpec {
     val df = spark.createDataFrame(testData).toDF("ds", "first", "second")
     df.write.partitionBy("ds").format("parquet").mode(SaveMode.Overwrite).save(dir.getAbsolutePath)
 
-    val gcsFormat = GCS(project = "test-project", sourceUri = dir.getAbsolutePath, fileFormat = "parquet")
+    val gcsFormat = GCS(sourceUri = dir.getAbsolutePath, fileFormat = "parquet")
     val partitions = gcsFormat.partitions("unused_table")(spark)
 
     assertEquals(Set(Map("ds" -> "20241223"), Map("ds" -> "20241224"), Map("ds" -> "20241225")), partitions.toSet)
@@ -53,7 +53,7 @@ class GCSFormatTest extends AnyFlatSpec {
     val df =
       spark.createDataFrame(testData).toDF("ds", "first", "second")
     df.write.format("parquet").mode(SaveMode.Overwrite).save(dir.getAbsolutePath)
 
-    val gcsFormat = GCS(project = "test-project", sourceUri = dir.getAbsolutePath, fileFormat = "parquet")
+    val gcsFormat = GCS(sourceUri = dir.getAbsolutePath, fileFormat = "parquet")
     val partitions = gcsFormat.partitions("unused_table")(spark)
 
     assertEquals(Set.empty, partitions.toSet)
@@ -83,7 +83,7 @@ class GCSFormatTest extends AnyFlatSpec {
       .toDF("ds", "first", "second")
       .select(to_date(col("ds"), "yyyy-MM-dd").as("ds"), col("first"), col("second"))
     df.write.format("parquet").partitionBy("ds").mode(SaveMode.Overwrite).save(dir.getAbsolutePath)
-    val gcsFormat = GCS(project = "test-project", sourceUri = dir.getAbsolutePath, fileFormat = "parquet")
+    val gcsFormat = GCS(sourceUri = dir.getAbsolutePath, fileFormat = "parquet")
     val partitions = gcsFormat.partitions("unused_table")(spark)
 
     assertEquals(Set(Map("ds" -> "2024-12-23"), Map("ds" -> "2024-12-24"), Map("ds" -> "2024-12-25")), partitions.toSet)
diff --git a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProviderTest.scala b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProviderTest.scala
index 8b3d9017c9..9ee38ccab2 100644
--- a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProviderTest.scala
+++ b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProviderTest.scala
@@ -34,7 +34,6 @@ class GcpFormatProviderTest extends AnyFlatSpec with MockitoSugar {
 
     val gcsFormat = gcpFormatProvider.getFormat(mockTable).asInstanceOf[GCS]
     assert(gcsFormat.sourceUri == tableName)
-    assert(gcsFormat.project == "project")
     assert(gcsFormat.fileFormat == "PARQUET")
   }
 }
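
Reviewer note: the third hunk in GcpFormatProvider.scala replaces Java-collection access (uris.get(0)) with idiomatic Scala via scala.jdk.CollectionConverters, since ExternalTableDefinition.getSourceUris returns a java.util.List. Below is a minimal, self-contained sketch of that conversion pattern under Scala 2.13; deriveUriPrefix and fakeUris are hypothetical names for illustration and are not part of this change.

import java.util.{Arrays => JArrays, List => JList}
import scala.jdk.CollectionConverters._

object UriPrefixSketch {
  // Mirrors the .getOrElse branch above: require exactly one backing URI,
  // then strip the trailing "/*.parquet" glob to recover the prefix.
  def deriveUriPrefix(sourceUris: JList[String]): String = {
    val uris = sourceUris.asScala // java.util.List -> scala mutable.Buffer
    require(uris.size == 1, "External table can be backed by only one URI.")
    uris.head.replaceAll("/\\*\\.parquet$", "")
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for externalTable.getSourceUris.
    val fakeUris: JList[String] = JArrays.asList("gs://bucket/warehouse/events/*.parquet")
    println(deriveUriPrefix(fakeUris)) // prints gs://bucket/warehouse/events
  }
}

With the unused project field dropped from the GCS case class, the derived prefix would feed the two-argument constructor, e.g. GCS(deriveUriPrefix(fakeUris), "PARQUET").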