-
Notifications
You must be signed in to change notification settings - Fork 3k
Spark: Use snapshot schema when reading snapshot #3722
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,11 +19,13 @@ | |
|
|
||
| package org.apache.iceberg.spark.source; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.Map; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; | ||
| import org.apache.iceberg.spark.PathIdentifier; | ||
| import org.apache.iceberg.spark.Spark3Util; | ||
| import org.apache.iceberg.spark.SparkReadOptions; | ||
| import org.apache.iceberg.spark.SparkSessionCatalog; | ||
| import org.apache.spark.sql.SparkSession; | ||
| import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; | ||
|
|
@@ -56,6 +58,8 @@ | |
| public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions { | ||
| private static final String DEFAULT_CATALOG_NAME = "default_iceberg"; | ||
| private static final String DEFAULT_CATALOG = "spark.sql.catalog." + DEFAULT_CATALOG_NAME; | ||
| private static final String AT_TIMESTAMP = "at_timestamp_"; | ||
| private static final String SNAPSHOT_ID = "snapshot_id_"; | ||
|
|
||
| @Override | ||
| public String shortName() { | ||
|
|
@@ -101,24 +105,52 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri | |
| SparkSession spark = SparkSession.active(); | ||
| setupDefaultSparkCatalog(spark); | ||
| String path = options.get("path"); | ||
|
|
||
| Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID); | ||
| Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP); | ||
| Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null, | ||
| "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp); | ||
|
|
||
| String selector = null; | ||
|
|
||
| if (snapshotId != null) { | ||
| selector = SNAPSHOT_ID + snapshotId; | ||
| } | ||
|
|
||
| if (asOfTimestamp != null) { | ||
| selector = AT_TIMESTAMP + asOfTimestamp; | ||
| } | ||
|
|
||
| CatalogManager catalogManager = spark.sessionState().catalogManager(); | ||
|
|
||
| if (path.contains("/")) { | ||
| // contains a path. Return iceberg default catalog and a PathIdentifier | ||
| String newPath = (selector == null) ? path : path + "#" + selector; | ||
| return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME), | ||
| new PathIdentifier(path)); | ||
| new PathIdentifier(newPath)); | ||
| } | ||
|
|
||
| final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier( | ||
| "path or identifier", spark, path); | ||
|
|
||
| Identifier ident = identifierWithSelector(catalogAndIdentifier.identifier(), selector); | ||
| if (catalogAndIdentifier.catalog().name().equals("spark_catalog") && | ||
| !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) { | ||
| // catalog is a session catalog but does not support Iceberg. Use Iceberg instead. | ||
| return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME), | ||
| catalogAndIdentifier.identifier()); | ||
| return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME), ident); | ||
| } else { | ||
| return catalogAndIdentifier; | ||
| return new Spark3Util.CatalogAndIdentifier(catalogAndIdentifier.catalog(), ident); | ||
| } | ||
| } | ||
|
|
||
| private Identifier identifierWithSelector(Identifier ident, String selector) { | ||
| if (selector == null) { | ||
| return ident; | ||
| } else { | ||
| String[] namespace = ident.namespace(); | ||
| String[] ns = Arrays.copyOf(namespace, namespace.length + 1); | ||
| ns[namespace.length] = ident.name(); | ||
| return Identifier.of(ns, selector); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -132,6 +164,15 @@ public String extractCatalog(CaseInsensitiveStringMap options) { | |
| return catalogAndIdentifier(options).catalog().name(); | ||
| } | ||
|
|
||
| private static Long propertyAsLong(CaseInsensitiveStringMap options, String property) { | ||
| String value = options.get(property); | ||
| if (value != null) { | ||
| return Long.parseLong(value); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: newline after if
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will add a blank line.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added a blank line.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, agreed. I think these are mostly general code style rules the community follows; maybe we should just put them into checkstyle instead of being human linters. |
||
|
|
||
| return null; | ||
| } | ||
|
|
||
| private static void setupDefaultSparkCatalog(SparkSession spark) { | ||
| if (spark.conf().contains(DEFAULT_CATALOG)) { | ||
| return; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.