diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala
index cd3f7fbf3d..d5556d0599 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala
@@ -12,7 +12,7 @@ case object BigQueryNative extends Format {
   private val bqFormat = classOf[Spark35BigQueryTableProvider].getName
   private lazy val bqOptions = BigQueryOptions.getDefaultInstance
 
-  private val internalBQCol = "__chronon_internal_bq_col__"
+  private val internalBQPartitionCol = "__chronon_internal_bq_partition_col__"
 
   // TODO(tchow): use the cache flag
   override def table(tableName: String, partitionFilters: String, cacheDf: Boolean = false)(implicit
@@ -50,13 +50,14 @@ case object BigQueryNative extends Format {
     // Next, we query the BQ table using the requested partitionFilter to grab all the distinct partition values that match the filter.
     val partitionWheres = if (partitionFilters.nonEmpty) s"WHERE ${partitionFilters}" else partitionFilters
     val partitionFormat = TableUtils(sparkSession).partitionFormat
-    val select = s"SELECT distinct(${partColName}) AS ${internalBQCol} FROM ${bqFriendlyName} ${partitionWheres}"
+    val select =
+      s"SELECT distinct(${partColName}) AS ${internalBQPartitionCol} FROM ${bqFriendlyName} ${partitionWheres}"
     val selectedParts = sparkSession.read
       .format(bqFormat)
       .option("viewsEnabled", true)
       .option("materializationDataset", bqTableId.getDataset)
       .load(select)
-      .select(date_format(col(internalBQCol), partitionFormat))
+      .select(date_format(col(internalBQPartitionCol), partitionFormat))
       .as[String]
       .collect
       .toList
diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala
index 317a74e3e9..a926139729 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala
@@ -5,6 +5,7 @@ import com.google.cloud.bigquery.{
   BigQueryOptions,
   ExternalTableDefinition,
   StandardTableDefinition,
+  ViewDefinition,
   TableDefinition,
   TableId
 }
@@ -95,6 +96,9 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
       .Option(bigQueryClient.getTable(tId))
       .getOrElse(throw new NoSuchTableException(s"BigQuery table $identNoCatalog not found."))
     table.getDefinition.asInstanceOf[TableDefinition] match {
+      case view: ViewDefinition => {
+        connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
+      }
       case externalTable: ExternalTableDefinition => {
        val uris = externalTable.getSourceUris.asScala
        val uri = scala
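
The substantive change in the second file is the new ViewDefinition branch: a BigQuery view has no source URIs, so rather than resolving it to storage paths the way the ExternalTableDefinition branch does, loading is delegated to the Spark BigQuery connector catalog. Below is a minimal standalone sketch of that dispatch pattern, assuming only the google-cloud-bigquery client types; describeDefinition is a hypothetical helper for illustration, not part of this PR.

import com.google.cloud.bigquery.{ExternalTableDefinition, StandardTableDefinition, TableDefinition, ViewDefinition}

import scala.jdk.CollectionConverters._

// Hypothetical helper: pattern-match on the TableDefinition subtype to decide
// how a table should be loaded, mirroring the match in the diff above.
def describeDefinition(defn: TableDefinition): String = defn match {
  case _: ViewDefinition =>
    // Views carry no source URIs; the diff delegates these to the connector catalog.
    "view: delegate loading to the Spark BigQuery connector catalog"
  case ext: ExternalTableDefinition =>
    // External tables expose the storage URIs they are backed by.
    s"external table backed by ${ext.getSourceUris.asScala.mkString(", ")}"
  case _: StandardTableDefinition =>
    "native BigQuery table"
  case other =>
    s"unsupported definition type: ${other.getType}"
}

Because ViewDefinition, ExternalTableDefinition, and StandardTableDefinition are disjoint subtypes, adding the view case as a new branch is safe: existing external and native table handling is untouched, and only tables that previously fell through (or failed) as views gain a code path.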