1 change: 1 addition & 0 deletions cloud_gcp/BUILD.bazel
@@ -33,6 +33,7 @@ shared_deps = [
maven_artifact("ch.qos.reload4j:reload4j"),
maven_artifact("org.threeten:threetenbp"),
maven_artifact("org.apache.kafka:kafka-clients"),
maven_artifact_with_suffix("org.apache.spark:spark-mllib"),
Contributor: wait what!

Collaborator (Author): Sadly required by the Spark BigQuery connector to map types accordingly.

maven_artifact("com.google.cloud.spark:spark-3.5-bigquery"),
maven_artifact_with_suffix("org.apache.iceberg:iceberg-spark-runtime-3.5"),
maven_artifact("org.objenesis:objenesis"),
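For context on the thread above: the type mapping in question surfaces later in this PR through the connector's SchemaConverters. A minimal sketch of that conversion path, assuming the connector and its repackaged BigQuery classes are on the classpath (the object name is made up):

import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Schema
import com.google.cloud.spark.bigquery.{SchemaConverters, SchemaConvertersConfiguration}
import org.apache.spark.sql.types.StructType

// Sketch only: the connector's SchemaConverters performs the BigQuery <-> Spark type
// mapping; per the thread above, that mapping is what pulls in spark-mllib
// (for Spark ML vector types, which is an assumption here, not something this diff states).
object SchemaMappingSketch {
  private val converter = SchemaConverters.from(SchemaConvertersConfiguration.createDefault())

  def toSpark(bqSchema: Schema): StructType = converter.toSpark(bqSchema)

  def toBigQuery(sparkSchema: StructType): Schema = converter.toBigQuerySchema(sparkSchema)
}
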
@@ -1,16 +1,21 @@
package ai.chronon.integrations.cloud_gcp

import com.google.cloud.bigquery.{
import ai.chronon.api.Extensions.StringOps
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.{
BigQuery,
BigQueryOptions,
ExternalTableDefinition,
FormatOptions,
Schema,
StandardTableDefinition,
TableDefinition,
TableId
TableId,
TableInfo
}
import com.google.cloud.spark.bigquery.BigQueryCatalog
import com.google.cloud.spark.bigquery.{BigQueryCatalog, SchemaConverters, SchemaConvertersConfiguration}
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Transform
@@ -20,7 +25,6 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException

import java.util
import scala.jdk.CollectionConverters._
@@ -35,14 +39,15 @@ import scala.util.Try
*/
class DelegatingTable(internalTable: Table,
additionalProperties: Map[String, String],
schema: Option[StructType] = None,
partitioning: Option[Array[Transform]] = None)
extends Table
with SupportsRead
with SupportsWrite {

override def name(): String = internalTable.name

override def schema(): StructType = internalTable.schema
override def schema(): StructType = schema.getOrElse(internalTable.schema)

override def capabilities(): util.Set[TableCapability] = internalTable.capabilities()

@@ -136,11 +141,14 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
uris.head.replaceAll("/\\*\\.parquet$", "")
}

val schemaConverter = SchemaConverters.from(SchemaConvertersConfiguration.createDefault())
val sparkSchema = schemaConverter.toSpark(externalTable.getSchema)

val fileBasedTable = ParquetTable(tId.toString,
SparkSession.active,
CaseInsensitiveStringMap.empty(),
List(uri),
None,
Option(sparkSchema),
classOf[ParquetFileFormat])
DelegatingTable(fileBasedTable,
Map(TableCatalog.PROP_EXTERNAL -> "true", TableCatalog.PROP_LOCATION -> uri))
@@ -167,10 +175,61 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
val provider = properties.get(TableCatalog.PROP_PROVIDER)
if (provider.toUpperCase != "ICEBERG") {
throw new UnsupportedOperationException("Only creating iceberg tables supported.")
provider.toUpperCase match {
case "ICEBERG" => icebergCatalog.createTable(ident, schema, partitions, properties)
Contributor: should we make these constants somewhere?

Collaborator (Author): will do, probably a good time now

case "PARQUET" => {

val rootLocation = properties.get(TableCatalog.PROP_LOCATION)

val fullTableName = SparkBQUtils.toTableString(ident)
val tableId = SparkBQUtils.toTableId(ident)
val uri = rootLocation.stripSuffix("/") + f"/${fullTableName.sanitize}"
val glob = uri.stripSuffix("/") + "/*.parquet"

val schemaConverter = SchemaConverters.from(SchemaConvertersConfiguration.createDefault())

val bqSchema: Schema = schemaConverter.toBigQuerySchema(schema)
val baseTableDef = ExternalTableDefinition
.newBuilder(glob, bqSchema, FormatOptions.parquet())
.setAutodetect(false)

if (partitions.nonEmpty) {
assert(
partitions.forall(_.name.equals("identity")),
s"Only identity partitioning is supported for external tables. Received unsupported partition spec: ${partitions}"
)
// Disable this as it requires data to already exist at the source URI. All we need to do is create the table. We don't need to use bigquery for anything else.
// val identityPartitions = partitions.map((t) => t.toString).toList
// val explicitPartitionedUri = uri + identityPartitions.map((p) => f"${p}")
// val hivePartitions = HivePartitioningOptions
// .newBuilder()
// .setFields(identityPartitions.asJava)
// .setSourceUriPrefix(uri)
// .setMode("CUSTOM")
// .build()
// baseTableDef.setHivePartitioningOptions(hivePartitions)
}

val shadedTableId = scala
.Option(tableId.getProject)
.map(TableId.of(_, tableId.getDataset, tableId.getTable))
.getOrElse(TableId.of(tableId.getDataset, tableId.getTable))

val tableInfo = TableInfo.newBuilder(shadedTableId, baseTableDef.build).build()

// Create this out of band, and then pass a parquet table back to spark.
bigQueryClient.create(tableInfo)

val fileBasedTable = ParquetTable(tableId.toString,
SparkSession.active,
CaseInsensitiveStringMap.empty(),
List(uri),
Option(schema),
classOf[ParquetFileFormat])
DelegatingTable(fileBasedTable, Map(TableCatalog.PROP_EXTERNAL -> "true", TableCatalog.PROP_LOCATION -> uri))
}
case unsupported => throw new UnsupportedOperationException(s"Unsupported format: ${unsupported}")
}
icebergCatalog.createTable(ident, schema, partitions, properties)
}

override def alterTable(ident: Identifier, changes: TableChange*): Table = {
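Following up on the constants thread above, a minimal sketch of what hoisting the provider strings into one place could look like (the object and member names are hypothetical, not part of this PR):

// Hypothetical home for the provider names matched on in createTable.
object TableFormats {
  val Iceberg: String = "ICEBERG"
  val Parquet: String = "PARQUET"
}

// createTable could then match on TableFormats.Iceberg / TableFormats.Parquet
// rather than the raw string literals.
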
@@ -2,12 +2,25 @@ package ai.chronon.integrations.cloud_gcp
import com.google.cloud.bigquery.connector.common.BigQueryUtil
import org.apache.spark.sql.SparkSession
import com.google.cloud.bigquery.TableId
import org.apache.spark.sql.connector.catalog.Identifier

object SparkBQUtils {

def toTableString(ident: Identifier): String = {
(ident.namespace() :+ ident.name).mkString(".")
}
def toTableId(tableName: String)(implicit spark: SparkSession): TableId = {
val parseIdentifier = spark.sessionState.sqlParser.parseMultipartIdentifier(tableName)
val shadedTid = BigQueryUtil.parseTableId(parseIdentifier.mkString("."))
parseBQIdentifier(parseIdentifier.mkString("."))
}

def toTableId(tableIdentifier: Identifier): TableId = {
val stringified = (tableIdentifier.namespace :+ tableIdentifier.name).mkString(".")
parseBQIdentifier(stringified)
}

private def parseBQIdentifier(parsedIdentifier: String) = {
val shadedTid = BigQueryUtil.parseTableId(parsedIdentifier)
scala
.Option(shadedTid.getProject)
.map(TableId.of(_, shadedTid.getDataset, shadedTid.getTable))
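For reference, a small usage sketch of the new Identifier-based helpers; the identifier values below are made up:

import ai.chronon.integrations.cloud_gcp.SparkBQUtils
import org.apache.spark.sql.connector.catalog.Identifier

object SparkBQUtilsUsageSketch {
  // Hypothetical identifier: namespace ["data"], table name "tchow_external_parquet".
  val ident: Identifier = Identifier.of(Array("data"), "tchow_external_parquet")

  // Yields "data.tchow_external_parquet".
  val fullName: String = SparkBQUtils.toTableString(ident)

  // Yields a (shaded) TableId with dataset "data" and table "tchow_external_parquet";
  // a project is set only when the parsed identifier carries one.
  val tableId = SparkBQUtils.toTableId(ident)
}
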
@@ -51,6 +51,8 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
// "spark.sql.defaultUrlStreamHandlerFactory.enabled" -> false.toString,
//
// "spark.sql.catalog.default_bigquery" -> classOf[BigQueryCatalog].getName,
// "spark.chronon.table_write.format" -> "PARQUET",
// "spark.chronon.table_write.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
))
)
lazy val tableUtils: TableUtils = TableUtils(spark)
@@ -113,6 +115,16 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
println(allParts)
}

it should "create external parquet table" ignore {
val externalTable = "default_iceberg.data.tchow_external_parquet"

val testDf = spark.createDataFrame(Seq((1, "2021-01-01"))).toDF("id", "ds")
val table = tableUtils.createTable(testDf, externalTable, List("ds"), Map.empty[String, String], "PARQUET")
tableUtils.insertPartitions(testDf, externalTable, Map.empty[String, String], List("ds"))
val roundTripped = tableUtils.loadTable(externalTable)
println(roundTripped)
}

it should "integration testing bigquery partitions" ignore {
// TODO(tchow): This test is ignored because it requires a running BigQuery instance. Need to figure out stubbing locally.
// to run, set `GOOGLE_APPLICATION_CREDENTIALS=<path_to_application_default_credentials.json>