@@ -9,61 +9,21 @@ import com.google.cloud.bigquery.{
TableId
}
import com.google.cloud.spark.bigquery.BigQueryCatalog
import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog

import java.util
import scala.jdk.CollectionConverters._
import scala.util.Try

/** A table that delegates all operations to an internal table, but with additional properties.
* This is mostly for enriching Spark tables with metadata that cannot be accessed by Spark directly.
* For example, we can use a BigQuery client to fetch table metadata / properties and then hydrate the Spark table
* with that information before passing it back to the Spark compute engine.
*
* Down the line, we could also support custom partition management.
*/
class DelegatingTable(internalTable: Table,
additionalProperties: Map[String, String],
partitioning: Option[Array[Transform]] = None)
extends Table
with SupportsRead
with SupportsWrite {

override def name(): String = internalTable.name

override def schema(): StructType = internalTable.schema

override def capabilities(): util.Set[TableCapability] = internalTable.capabilities()

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)

override def properties(): util.Map[String, String] =
(internalTable.properties().asScala ++ additionalProperties).asJava

override def partitioning(): Array[Transform] = partitioning.getOrElse(internalTable.partitioning())

}

object DelegatingTable {
def apply(table: Table, additionalProperties: Map[String, String] = Map.empty): Table =
new DelegatingTable(table, additionalProperties = additionalProperties)
}
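
The wrapper removed above had one observable behaviour worth keeping in mind while reading the rest of this diff: it merged the wrapped table's properties with the extra map, with the extra entries winning on key collisions. A minimal sketch of the pre-PR usage, assuming someResolvedTable is any Table already loaded from another catalog (the name is illustrative):

// Pre-PR usage sketch (illustrative): tag a resolved table with provider metadata
// so downstream code could dispatch on TableCatalog.PROP_PROVIDER.
val wrapped: Table = DelegatingTable(
  someResolvedTable,
  additionalProperties = Map(TableCatalog.PROP_PROVIDER -> "ICEBERG"))
// Additional properties override the wrapped table's own entries on collision.
assert(wrapped.properties().get(TableCatalog.PROP_PROVIDER) == "ICEBERG")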
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import scala.util.{Failure, Success, Try}

/** Galactus catalog that allows us to interact with the BigQuery metastore as a Spark catalog. This allows for
* querying a variety of table types directly in Spark SQL or the DataFrame API.
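
A minimal registration sketch for this catalog, assuming it is exposed under the Spark catalog name bq and that gcp_project is the option forwarded as PROPERTIES_KEY_GCP_PROJECT (the catalog name and option key are illustrative assumptions; the class name matches this diff):

import org.apache.spark.sql.SparkSession

// Register the delegating catalog under a Spark catalog name and query through it.
val spark = SparkSession
  .builder()
  .appName("bq-metastore-demo")
  .config("spark.sql.catalog.bq",
          "ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog")
  .config("spark.sql.catalog.bq.gcp_project", "my-gcp-project") // option key is an assumption
  .getOrCreate()

// Iceberg, native BigQuery, and external Parquet tables all resolve through the
// same three-part identifier: <catalog>.<dataset>.<table>.
spark.sql("SELECT * FROM bq.my_dataset.my_table LIMIT 10").show()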
@@ -118,13 +78,10 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNamespaces {

override def loadTable(identNoCatalog: Identifier): Table = {
Try {
val icebergSparkTable = icebergCatalog.loadTable(identNoCatalog)
DelegatingTable(icebergSparkTable,
additionalProperties =
Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "ICEBERG"))
icebergCatalog.loadTable(identNoCatalog)
}
.recover {
case _ => {
case noIcebergTableEx: NoSuchTableException => {
val project =
catalogProps.getOrElse(BigQueryMetastoreCatalog.PROPERTIES_KEY_GCP_PROJECT, bqOptions.getProjectId)
val tId = identNoCatalog.namespace().toList match {
@@ -134,7 +91,9 @@
throw new IllegalArgumentException(
s"Table identifier namespace ${identNoCatalog} must have at least one part.")
}
val table = bigQueryClient.getTable(tId)
val table = scala
.Option(bigQueryClient.getTable(tId))
.getOrElse(throw new NoSuchTableException(s"BigQuery table $identNoCatalog not found."))
table.getDefinition.asInstanceOf[TableDefinition] match {
case externalTable: ExternalTableDefinition => {
val uris = externalTable.getSourceUris.asScala
@@ -146,33 +105,36 @@
uris.head.replaceAll("/\\*\\.parquet$", "")
}

val fileBasedTable = ParquetTable(tId.toString,
SparkSession.active,
CaseInsensitiveStringMap.empty(),
List(uri),
None,
classOf[ParquetFileFormat])
DelegatingTable(fileBasedTable,
Map(TableCatalog.PROP_EXTERNAL -> "true",
TableCatalog.PROP_LOCATION -> uri,
TableCatalog.PROP_PROVIDER -> "PARQUET"))
val fileBasedTable = ParquetTable(
tId.toString,
SparkSession.active,
new CaseInsensitiveStringMap(
Map(TableCatalog.PROP_EXTERNAL -> "true",
TableCatalog.PROP_LOCATION -> uri,
TableCatalog.PROP_PROVIDER -> "PARQUET").asJava),
List(uri),
None,
classOf[ParquetFileFormat]
)
fileBasedTable
}
case _: StandardTableDefinition => {
//todo(tchow): Support partitioning

// Hack because there's a bug in the BigQueryCatalog where it ignores the projectId.
// See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1340
val connectorTable = connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
// ideally it should be the below:
// val connectorTable = connectorCatalog.loadTable(ident)
DelegatingTable(connectorTable,
Map(TableCatalog.PROP_EXTERNAL -> "false", TableCatalog.PROP_PROVIDER -> "BIGQUERY"))
connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
}
case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getDefinition}")
}
}
}
.getOrElse(throw new NoSuchTableException(f"Table: ${identNoCatalog} not found in bigquery catalog."))
case other: Throwable => throw other
} match {
case Success(table) => table
case Failure(exception) => throw exception
}
}
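
The rewritten loadTable is a resolution chain: try the Iceberg catalog first, then fall back to the raw BigQuery table definition, returning the connector or Parquet table directly instead of a DelegatingTable wrapper. A hedged sketch of calling it programmatically (identifier values are illustrative, and catalog is assumed to be an initialized DelegatingBigQueryMetastoreCatalog):

import org.apache.spark.sql.connector.catalog.Identifier

// Resolution order implemented above:
//   1. Iceberg table in the BigQuery metastore    -> Iceberg SparkTable
//   2. ExternalTableDefinition over Parquet URIs  -> ParquetTable
//   3. StandardTableDefinition                    -> BigQuery connector table
// A missing table surfaces as NoSuchTableException; any other definition fails
// with IllegalStateException.
val ident = Identifier.of(Array("my_dataset"), "my_table") // illustrative names
val resolved = catalog.loadTable(ident)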

override def createTable(ident: Identifier,
@@ -1,13 +1,15 @@
package ai.chronon.integrations.cloud_gcp
import ai.chronon.spark.format.{DefaultFormatProvider, Format, Iceberg}
import com.google.cloud.bigquery._
import com.google.cloud.spark.bigquery.v2.Spark31BigQueryTable
import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
import org.apache.iceberg.spark.SparkCatalog
import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable

import scala.jdk.CollectionConverters._
import scala.util.Try
import scala.util.{Failure, Success, Try}

class GcpFormatProvider(override val sparkSession: SparkSession) extends DefaultFormatProvider(sparkSession) {

@@ -26,18 +28,17 @@ class GcpFormatProvider(override val sparkSession: SparkSession) extends DefaultFormatProvider(sparkSession) {
cat match {
case delegating: DelegatingBigQueryMetastoreCatalog =>
Try {
delegating
.loadTable(identifier)
.properties
.asScala
.getOrElse(TableCatalog.PROP_PROVIDER, "")
.toUpperCase match {
case "ICEBERG" => Iceberg
case "BIGQUERY" => BigQueryNative
case "PARQUET" => BigQueryExternal
val tbl = delegating.loadTable(identifier)
tbl match {
case iceberg: SparkTable => Iceberg
case bigquery: Spark31BigQueryTable => BigQueryNative
case parquet: ParquetTable => BigQueryExternal
case unsupported => throw new IllegalStateException(s"Unsupported provider type: ${unsupported}")
}
}.toOption
} match {
case s @ Success(_) => s.toOption
case Failure(exception) => throw exception
}
case iceberg: SparkCatalog if (iceberg.icebergCatalog().isInstanceOf[BigQueryMetastoreCatalog]) =>
scala.Option(Iceberg)
case _ => super.readFormat(tableName)
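
Because the format decision now keys off the concrete Table subclass rather than a PROP_PROVIDER property, a short usage sketch (the table name is illustrative, and the Option-shaped return is inferred from the .toOption / scala.Option branches above):

// SparkTable           -> Iceberg
// Spark31BigQueryTable -> BigQueryNative
// ParquetTable         -> BigQueryExternal
val provider = new GcpFormatProvider(spark) // `spark`: an active SparkSession
val format = provider.readFormat("my_dataset.my_table") // illustrative name; yields an Option of Format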