Iceberg unit tests, support Iceberg + nonhive catalogs, Iceberg Kryo Serializer #993
Changes from 34 commits
SparkSessionBuilder.scala
@@ -23,6 +23,7 @@ import org.apache.spark.SPARK_VERSION
import java.io.File
import java.util.logging.Logger
import scala.util.Properties
import java.util.UUID

object SparkSessionBuilder {
  @transient private lazy val logger = LoggerFactory.getLogger(getClass)

@@ -39,6 +40,8 @@ object SparkSessionBuilder {
      additionalConfig: Option[Map[String, String]] = None,
      enforceKryoSerializer: Boolean = true): SparkSession = {

    val userName = Properties.userName
    val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath)
    // allow us to override the format by specifying env vars. This allows us to not have to worry about interference
    // between Spark sessions created in existing chronon tests that need the hive format and some specific tests
    // that require a format override like delta lake.

@@ -50,6 +53,19 @@ object SparkSessionBuilder {
          "spark.chronon.table_write.format" -> "delta"
        )
        (configMap, "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator")
        (configMap, "ai.chronon.spark.ChrononKryoRegistrator")

      case Some("iceberg") =>
        val configMap = Map(
          "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
          "spark.sql.catalog.spark_catalog" -> "org.apache.iceberg.spark.SparkSessionCatalog",
          "spark.chronon.table_write.format" -> "iceberg",
          "spark.chronon.table_read.format" -> "iceberg",
          "spark.sql.catalog.local" -> "org.apache.iceberg.spark.SparkCatalog",
          "spark.sql.catalog.spark_catalog.type" -> "hadoop",
          "spark.sql.catalog.spark_catalog.warehouse" -> s"$warehouseDir/data"
        )
        // TODO add an iceberg kryo registrator
        (configMap, "ai.chronon.spark.ChrononIcebergKryoRegistrator")
      case _ => (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator")
    }

@@ -60,8 +76,7 @@ object SparkSessionBuilder {
      //required to run spark locally with hive support enabled - for sbt test
      System.setSecurityManager(null)
    }
    val userName = Properties.userName
    val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath)

    var baseBuilder = SparkSession
      .builder()
      .appName(name)
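For reviewers who want to poke at the new Iceberg path locally, here is a rough sketch of a session wired up with the same settings the builder injects for the "iceberg" format override. The warehouse path, app name, and table name are placeholders, not values from this PR.

import org.apache.spark.sql.SparkSession

// Sketch only: mirrors the config map added above for formatOverride == Some("iceberg").
// The warehouse path and app name below are made up for illustration.
val warehouseDir = "/tmp/chronon_iceberg_test"
val spark = SparkSession
  .builder()
  .appName("iceberg-local-sketch")
  .master("local[*]")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hadoop")
  .config("spark.sql.catalog.spark_catalog.warehouse", s"$warehouseDir/data")
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
  .getOrCreate()

// With the extension and catalogs registered, Iceberg tables can then be created
// via plain SQL, e.g. CREATE TABLE ... USING iceberg PARTITIONED BY (ds).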
TableUtils.scala
@@ -239,9 +239,15 @@ case object Iceberg extends Format {
  override def partitions(tableName: String, partitionColumns: Seq[String])(implicit
      sparkSession: SparkSession): Seq[Map[String, String]] = {
    sparkSession.sqlContext
      .sql(s"SHOW PARTITIONS $tableName")
      .sql(s"SELECT partition FROM $tableName" ++ ".partitions")
Review comment: ooc, does this work for regular hive tables?
Reply: This is for Iceberg; Hive support is here.
Author: It should work for hive tables, and the internal target I'm hitting is more or less "regular hive tables". Iceberg abstracts itself from the catalog implementation, so as long as your Iceberg has an interface to your catalog implementation, it will work.
      .collect()
      .map(row => parseHivePartition(row.getString(0)))
      .map { row =>
        val partitionStruct = row.getStruct(0)
        partitionStruct.schema.fieldNames.zipWithIndex.map {
          case (fieldName, idx) =>
            fieldName -> partitionStruct.get(idx).toString
        }.toMap
      }
  }

  private def getIcebergPartitions(tableName: String,
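For context on why the string parsing goes away: SHOW PARTITIONS on a Hive-style table returns rows like "ds=2024-05-01/hr=00", which is what parseHivePartition expects, whereas Iceberg exposes a .partitions metadata table whose partition column is a struct. A rough sketch of the new read path against a hypothetical table (catalog, database, and table names are illustrative):

// Hypothetical Iceberg table; this works the same whether it lives in the session
// catalog or a secondary catalog such as `local`, because .partitions is an Iceberg
// metadata table rather than a catalog-specific SQL command.
val rows = spark.sql("SELECT partition FROM local.db.events.partitions").collect()

val partitionMaps: Seq[Map[String, String]] = rows.toSeq.map { row =>
  // The partition column is a struct; read each field positionally instead of
  // parsing a "k=v/k=v" string.
  val partitionStruct = row.getStruct(0)
  partitionStruct.schema.fieldNames.zipWithIndex.map {
    case (fieldName, idx) => fieldName -> partitionStruct.get(idx).toString
  }.toMap
}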
@@ -395,7 +401,14 @@ case class TableUtils(sparkSession: SparkSession) {
    rdd
  }

  def tableExists(tableName: String): Boolean = sparkSession.catalog.tableExists(tableName)
Collaborator: Curious, does the old method not work for iceberg?
Author: Good question! It does work for non-Iceberg tables IF your underlying catalog supports this operation. This line of code is querying the catalog directly, but the more idiomatic thing to do with Iceberg is to use its built-in partition APIs, which are agnostic to your underlying catalog: https://iceberg.apache.org/docs/latest/spark-queries/#spark-queries (note that this also documents the point I made that Iceberg doesn't support DSv1).
Author: I'm sure most setups use a catalog that works with DSv1, but ours does not. I have read that there's better pushdown in V2 sources, but I can't really be a good source for that benchmark considering my setup doesn't work with V1.
Collaborator: OK, this should be fine, and they are equivalent anyway.
  def tableExists(tableName: String): Boolean = {
    try {
      sparkSession.sql(s"DESCRIBE TABLE $tableName")
      true
    } catch {
      case _: AnalysisException => false
    }
  }

  def loadEntireTable(tableName: String): DataFrame = sparkSession.table(tableName)
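To illustrate the trade-off discussed above, a small sketch under the assumption that an Iceberg table is registered in the secondary `local` catalog configured in SparkSessionBuilder (the table names are hypothetical):

import org.apache.spark.sql.AnalysisException

// Old approach: ask the catalog API directly. Works when the underlying catalog
// implements the lookup (typical for Hive setups), but not for every catalog.
val existsViaCatalog = spark.catalog.tableExists("db.events")

// New approach: let SQL analysis resolve the name, which routes through whatever
// catalogs Spark is configured with, including DSv2 catalogs such as Iceberg's.
def tableExists(tableName: String): Boolean =
  try {
    spark.sql(s"DESCRIBE TABLE $tableName")
    true
  } catch {
    case _: AnalysisException => false
  }

tableExists("local.db.events") // hypothetical Iceberg table in the `local` catalog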
@@ -973,17 +986,38 @@ case class TableUtils(sparkSession: SparkSession) {
      partitions: Seq[String],
      partitionColumn: String = partitionColumn,
      subPartitionFilters: Map[String, String] = Map.empty): Unit = {
    // TODO this is using datasource v1 semantics, which won't be compatible with non-hive catalogs
Collaborator: Could you explain more on DSv1 and DSv2? Do you have a pointer?
Author: Sure! It's largely historical, but the tl;dr is that at some point the way you build a datasource connector was redone to support more sink formats with better performance. This is a good article: https://blog.madhukaraphatak.com/spark-datasource-v2-part-1, and this PDF: https://issues.apache.org/jira/browse/SPARK-15689. The most notable reason this is coming up is that Iceberg is not integrated with DataSourceV1.
    // notably, the unit test iceberg integration uses hadoop because of
    // https://github.com/apache/iceberg/issues/7847
    if (partitions.nonEmpty && tableExists(tableName)) {
      val partitionSpecs = partitions
        .map { partition =>
          val mainSpec = s"$partitionColumn='$partition'"
          val specs = mainSpec +: subPartitionFilters.map {
            case (key, value) => s"${key}='${value}'"
          }.toSeq
          specs.mkString("PARTITION (", ",", ")")
        }
        .mkString(",")
      val dropSql = s"ALTER TABLE $tableName DROP IF EXISTS $partitionSpecs"
      val dropSql = tableFormatProvider.readFormat(tableName) match {
        // really this is Dsv1 vs Dsv2, not hive vs iceberg,
        // but we break this way since only Iceberg is migrated to Dsv2
        case Iceberg =>
          // Build WHERE clause: (ds='2024-05-01' OR ds='2024-05-02') [AND k='v' AND …]
          val mainPred = partitions
            .map(p => s"$partitionColumn='${p}'")
            .mkString("(", " OR ", ")")

          val extraPred = subPartitionFilters
            .map { case (k, v) => s"$k='${v}'" }
            .mkString(" AND ")

          val where = Seq(mainPred, extraPred).filter(_.nonEmpty).mkString(" AND ")

          s"DELETE FROM $tableName WHERE $where"
        case _ =>
          val partitionSpecs = partitions
            .map { partition =>
              val mainSpec = s"$partitionColumn='$partition'"
              val specs = mainSpec +: subPartitionFilters.map {
                case (key, value) => s"${key}='${value}'"
              }.toSeq
              specs.mkString("PARTITION (", ",", ")")
            }
            .mkString(",")
          s"ALTER TABLE $tableName DROP IF EXISTS $partitionSpecs"
      }
      sql(dropSql)
    } else {
      logger.info(s"$tableName doesn't exist, please double check before drop partitions")
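To make the two branches concrete, here is a sketch of the SQL each one would emit for hypothetical inputs (the table name and partition values are made up, not taken from this PR):

object DropPartitionSqlSketch extends App {
  // Hypothetical inputs for illustration only
  val tableName = "db.events"
  val partitionColumn = "ds"
  val partitions = Seq("2024-05-01", "2024-05-02")
  val subPartitionFilters = Map("region" -> "us")

  // Iceberg / DSv2 branch: a row-level DELETE with an OR over the partition values
  val mainPred = partitions.map(p => s"$partitionColumn='$p'").mkString("(", " OR ", ")")
  val extraPred = subPartitionFilters.map { case (k, v) => s"$k='$v'" }.mkString(" AND ")
  val where = Seq(mainPred, extraPred).filter(_.nonEmpty).mkString(" AND ")
  println(s"DELETE FROM $tableName WHERE $where")
  // DELETE FROM db.events WHERE (ds='2024-05-01' OR ds='2024-05-02') AND region='us'

  // Hive / DSv1 branch: a catalog-level partition drop
  val partitionSpecs = partitions
    .map { partition =>
      val specs = s"$partitionColumn='$partition'" +: subPartitionFilters.map {
        case (key, value) => s"$key='$value'"
      }.toSeq
      specs.mkString("PARTITION (", ",", ")")
    }
    .mkString(",")
  println(s"ALTER TABLE $tableName DROP IF EXISTS $partitionSpecs")
  // ALTER TABLE db.events DROP IF EXISTS PARTITION (ds='2024-05-01',region='us'),PARTITION (ds='2024-05-02',region='us')
}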
Collaborator: How is working with Metals relative to IntelliJ? Does the debugger work as well?
Author: It's really good, actually. The debugger worked out of the box, and I found it comparable to IntelliJ overall. I'd recommend it to anyone who has remote dev boxes, since VSCode's integration is far better in my experience; all the tests run a lot faster and I got in way more dev cycles. I'd probably only recommend it over IntelliJ with a dev box, though.