SparkCatalog.java
@@ -19,14 +19,18 @@

package org.apache.iceberg.spark;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
@@ -39,14 +43,17 @@
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.StagedSparkTable;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -84,6 +91,9 @@
*/
public class SparkCatalog extends BaseCatalog {
private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
private static final Splitter COMMA = Splitter.on(",");
private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");

private String catalogName = null;
private Catalog icebergCatalog = null;
@@ -122,8 +132,8 @@ protected TableIdentifier buildIdentifier(Identifier identifier) {
@Override
public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
try {
Table icebergTable = load(ident);
return new SparkTable(icebergTable, !cacheEnabled);
Pair<Table, Long> icebergTable = load(ident);
return new SparkTable(icebergTable.first(), icebergTable.second(), !cacheEnabled);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
@@ -224,7 +234,7 @@ public SparkTable alterTable(Identifier ident, TableChange... changes) throws No
}

try {
Table table = load(ident);
Table table = load(ident).first();
commitChanges(table, setLocation, setSnapshotId, pickSnapshotId, propertyChanges, schemaChanges);
return new SparkTable(table, true /* refreshEagerly */);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
@@ -259,7 +269,7 @@ public void renameTable(Identifier from, Identifier to) throws NoSuchTableExcept
@Override
public void invalidateTable(Identifier ident) {
try {
load(ident).refresh();
load(ident).first().refresh();
} catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
// ignore if the table doesn't exist, it is not cached
}
@@ -471,10 +481,97 @@ private static void checkNotPathIdentifier(Identifier identifier, String method)
}
}

private Table load(Identifier ident) {
return isPathIdentifier(ident) ?
tables.load(((PathIdentifier) ident).location()) :
icebergCatalog.loadTable(buildIdentifier(ident));
private Pair<Table, Long> load(Identifier ident) {
if (isPathIdentifier(ident)) {
return loadFromPathIdentifier((PathIdentifier) ident);
}

try {
return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);

} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
// if the original load didn't work, the identifier may be extended and include a snapshot selector
TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
Table table;
try {
table = icebergCatalog.loadTable(namespaceAsIdent);
} catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
// the namespace does not identify a table, so it cannot be a table with a snapshot selector
// throw the original exception
throw e;
}

// loading the namespace as a table worked, check the name to see if it is a valid selector
Matcher at = AT_TIMESTAMP.matcher(ident.name());
if (at.matches()) {
long asOfTimestamp = Long.parseLong(at.group(1));
return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
}

Matcher id = SNAPSHOT_ID.matcher(ident.name());
if (id.matches()) {
long snapshotId = Long.parseLong(id.group(1));
return Pair.of(table, snapshotId);
}

// the name wasn't a valid snapshot selector. throw the original exception
throw e;
}
}
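
The fallback above is what makes selector-suffixed identifiers such as db.events.at_timestamp_1620000000000 resolve: the direct load fails, db.events is loaded instead, and the trailing name is matched against the selector patterns. A minimal, hedged check of that selector grammar (the table name and values here are invented):

import java.util.regex.Pattern;

class SelectorGrammarCheck {
  // Mirrors the AT_TIMESTAMP and SNAPSHOT_ID patterns declared at the top of SparkCatalog.
  private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
  private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");

  public static void main(String[] args) {
    // true: group(1) is "1620000000000", interpreted as millis for snapshotIdAsOfTime
    System.out.println(AT_TIMESTAMP.matcher("at_timestamp_1620000000000").matches());
    // true: group(1) is "10963874102873", used directly as the snapshot id
    System.out.println(SNAPSHOT_ID.matcher("snapshot_id_10963874102873").matches());
    // false: the selector accepts digits only, so load() rethrows the original
    // NoSuchTableException for a name like this
    System.out.println(SNAPSHOT_ID.matcher("snapshot_id_latest").matches());
  }
}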

private Pair<String, List<String>> parseLocationString(String location) {
int hashIndex = location.lastIndexOf('#');
if (hashIndex != -1 && !location.endsWith("#")) {
String baseLocation = location.substring(0, hashIndex);
List<String> metadata = COMMA.splitToList(location.substring(hashIndex + 1));
return Pair.of(baseLocation, metadata);
} else {
return Pair.of(location, ImmutableList.of());
}
}
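
For path identifiers, the split above turns everything after the last '#' into a comma-separated list of metadata-table names and snapshot selectors. A small sketch that restates the same split rule only to show concrete inputs and outputs; the paths are invented:

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.util.Pair;

class LocationFragmentSketch {
  private static final Splitter COMMA = Splitter.on(",");

  // Same rule as parseLocationString: split on the last '#', unless it is trailing.
  static Pair<String, List<String>> parse(String location) {
    int hashIndex = location.lastIndexOf('#');
    if (hashIndex != -1 && !location.endsWith("#")) {
      return Pair.of(location.substring(0, hashIndex),
          COMMA.splitToList(location.substring(hashIndex + 1)));
    }
    return Pair.of(location, ImmutableList.of());
  }

  public static void main(String[] args) {
    // prints: /warehouse/db/events -> []
    //         /warehouse/db/events -> [history]
    //         /warehouse/db/events -> [history, snapshot_id_10963874102873]
    for (String location : new String[] {
        "/warehouse/db/events",
        "/warehouse/db/events#history",
        "/warehouse/db/events#history,snapshot_id_10963874102873"}) {
      Pair<String, List<String>> parsed = parse(location);
      System.out.println(parsed.first() + " -> " + parsed.second());
    }
  }
}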

private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
Pair<String, List<String>> parsed = parseLocationString(ident.location());

String metadataTableName = null;
Long asOfTimestamp = null;
Long snapshotId = null;
for (String meta : parsed.second()) {
if (MetadataTableType.from(meta) != null) {
metadataTableName = meta;
continue;
}

Matcher at = AT_TIMESTAMP.matcher(meta);
if (at.matches()) {
asOfTimestamp = Long.parseLong(at.group(1));
continue;
}

Matcher id = SNAPSHOT_ID.matcher(meta);
if (id.matches()) {
snapshotId = Long.parseLong(id.group(1));
}
}

Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id and as-of-timestamp: %s", ident.location());

Table table = tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : ""));

if (snapshotId != null) {
return Pair.of(table, snapshotId);
} else if (asOfTimestamp != null) {
return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
} else {
return Pair.of(table, null);
}
}
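
Taken together with the DSv2 path handling in IcebergSource below, this lets a Hadoop-located table carry a snapshot selector or a metadata table name in its load path. A usage sketch, assuming an Iceberg runtime on the classpath; the warehouse path, snapshot id, and timestamp are invented:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class PathSelectorUsage {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Current state of a Hadoop-located table.
    Dataset<Row> current = spark.read().format("iceberg")
        .load("/warehouse/db/events");

    // A specific snapshot, selected through the '#' fragment parsed above.
    Dataset<Row> bySnapshot = spark.read().format("iceberg")
        .load("/warehouse/db/events#snapshot_id_10963874102873");

    // The snapshot that was current at a timestamp, in millis.
    Dataset<Row> byTime = spark.read().format("iceberg")
        .load("/warehouse/db/events#at_timestamp_1620000000000");

    // A metadata table of the same location still resolves through MetadataTableType.
    Dataset<Row> history = spark.read().format("iceberg")
        .load("/warehouse/db/events#history");

    current.show();
  }
}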

private Identifier namespaceToIdentifier(String[] namespace) {
String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
String name = namespace[ns.length];
return Identifier.of(ns, name);
}

private Catalog.TableBuilder newBuilder(Identifier ident, Schema schema) {
IcebergSource.java
@@ -19,11 +19,13 @@

package org.apache.iceberg.spark.source;

import java.util.Arrays;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.PathIdentifier;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -56,6 +58,8 @@
public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions {
private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
private static final String DEFAULT_CATALOG = "spark.sql.catalog." + DEFAULT_CATALOG_NAME;
private static final String AT_TIMESTAMP = "at_timestamp_";
private static final String SNAPSHOT_ID = "snapshot_id_";

@Override
public String shortName() {
@@ -101,24 +105,52 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
SparkSession spark = SparkSession.active();
setupDefaultSparkCatalog(spark);
String path = options.get("path");

Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp);

String selector = null;

if (snapshotId != null) {
selector = SNAPSHOT_ID + snapshotId;
}

if (asOfTimestamp != null) {
selector = AT_TIMESTAMP + asOfTimestamp;
}

CatalogManager catalogManager = spark.sessionState().catalogManager();

if (path.contains("/")) {
// contains a path. Return iceberg default catalog and a PathIdentifier
String newPath = (selector == null) ? path : path + "#" + selector;
return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
new PathIdentifier(path));
new PathIdentifier(newPath));
}

final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
"path or identifier", spark, path);

Identifier ident = identifierWithSelector(catalogAndIdentifier.identifier(), selector);
if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
!(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
// catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
catalogAndIdentifier.identifier());
return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME), ident);
} else {
return catalogAndIdentifier;
return new Spark3Util.CatalogAndIdentifier(catalogAndIdentifier.catalog(), ident);
}
}
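
For catalog tables the selector never appears in user code; it is built from the read options and folded into the identifier that the catalog sees. A hedged sketch of the intended call pattern, assuming the option keys behind SparkReadOptions.SNAPSHOT_ID and AS_OF_TIMESTAMP are "snapshot-id" and "as-of-timestamp"; the table name and values are invented:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class ReadOptionSelectorUsage {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.active();

    // Reaches the catalog as identifier db.events.snapshot_id_10963874102873.
    Dataset<Row> bySnapshot = spark.read().format("iceberg")
        .option("snapshot-id", 10963874102873L)
        .load("db.events");

    // Reaches the catalog as identifier db.events.at_timestamp_1620000000000.
    Dataset<Row> byTime = spark.read().format("iceberg")
        .option("as-of-timestamp", 1620000000000L)
        .load("db.events");

    bySnapshot.show();
  }
}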

private Identifier identifierWithSelector(Identifier ident, String selector) {
if (selector == null) {
return ident;
} else {
String[] namespace = ident.namespace();
String[] ns = Arrays.copyOf(namespace, namespace.length + 1);
ns[namespace.length] = ident.name();
return Identifier.of(ns, selector);
}
}
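
The selector rides along as one extra name component, which SparkCatalog.namespaceToIdentifier later peels off when the direct load fails. A small sketch of that round trip, with an invented table and snapshot id:

import org.apache.spark.sql.connector.catalog.Identifier;

class SelectorIdentifierSketch {
  public static void main(String[] args) {
    // identifierWithSelector(db.events, "snapshot_id_10963874102873") pushes the table
    // name into the namespace and uses the selector as the new name component:
    Identifier withSelector =
        Identifier.of(new String[] {"db", "events"}, "snapshot_id_10963874102873");

    // namespaceToIdentifier(withSelector.namespace()) recovers db.events on the catalog
    // side, and the name is matched against the SNAPSHOT_ID / AT_TIMESTAMP patterns.
    System.out.println(String.join(".", withSelector.namespace()) + "." + withSelector.name());
  }
}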

@@ -132,6 +164,15 @@ public String extractCatalog(CaseInsensitiveStringMap options) {
return catalogAndIdentifier(options).catalog().name();
}

private static Long propertyAsLong(CaseInsensitiveStringMap options, String property) {
String value = options.get(property);
if (value != null) {
return Long.parseLong(value);
}

return null;
}

private static void setupDefaultSparkCatalog(SparkSession spark) {
if (spark.conf().contains(DEFAULT_CATALOG)) {
return;
SparkScanBuilder.java
@@ -62,24 +62,17 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
private Filter[] pushedFilters = NO_FILTERS;
private boolean ignoreResiduals = false;

SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
SparkScanBuilder(SparkSession spark, Table table, Schema schema, CaseInsensitiveStringMap options) {
this.spark = spark;
this.table = table;
this.schema = schema;
this.readConf = new SparkReadConf(spark, table, options);
this.options = options;
this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
}

private Schema lazySchema() {
if (schema == null) {
if (requestedProjection != null) {
// the projection should include all columns that will be returned, including those only used in filters
this.schema = SparkSchemaUtil.prune(table.schema(), requestedProjection, filterExpression(), caseSensitive);
} else {
this.schema = table.schema();
}
}
return schema;
SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
this(spark, table, table.schema(), options);
}

private Expression filterExpression() {
@@ -108,7 +101,7 @@ public Filter[] pushFilters(Filter[] filters) {
Expression expr = SparkFilters.convert(filter);
if (expr != null) {
try {
Binder.bind(table.schema().asStruct(), expr, caseSensitive);
Binder.bind(schema.asStruct(), expr, caseSensitive);
expressions.add(expr);
pushed.add(filter);
} catch (ValidationException e) {
@@ -136,6 +129,9 @@ public void pruneColumns(StructType requestedSchema) {
.filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
.toArray(StructField[]::new));

// the projection should include all columns that will be returned, including those only used in filters
this.schema = SparkSchemaUtil.prune(schema, requestedProjection, filterExpression(), caseSensitive);

Stream.of(requestedSchema.fields())
.map(StructField::name)
.filter(MetadataColumns::isMetadataColumn)
@@ -157,7 +153,7 @@ private Schema schemaWithMetadataColumns() {
Schema meta = new Schema(fields);

// schema of rows returned by readers
return TypeUtil.join(lazySchema(), meta);
return TypeUtil.join(schema, meta);
}
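
schemaWithMetadataColumns joins the read schema (pruned in pruneColumns when Spark pushes a projection) with the requested metadata columns. A minimal sketch of TypeUtil.join on hand-built schemas; the field ids and names are invented and are not Iceberg's reserved metadata columns:

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

class JoinSchemasSketch {
  public static void main(String[] args) {
    Schema data = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "event", Types.StringType.get()));

    // Stand-in for the metadata-column schema built above.
    Schema meta = new Schema(
        Types.NestedField.required(1000, "_example_meta", Types.StringType.get()));

    // Columns of rows returned by readers: data columns followed by metadata columns.
    Schema joined = TypeUtil.join(data, meta);
    System.out.println(joined);
  }
}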

@Override