15 changes: 15 additions & 0 deletions site/docs/spark.md
@@ -325,6 +325,21 @@ spark.read

Time travel is not yet supported by Spark's SQL syntax.

### Table names and paths

Paths and table names can be loaded through Spark 3's DataFrame interface. How a path or table is loaded depends on how
the identifier is specified. When using `spark.read().format("iceberg").load(table)` or `spark.table(table)`, the `table`
variable can take a number of forms, as listed below:

* `file:/path/to/table` -> loads a HadoopTable at the given path
* `tablename` -> loads `currentCatalog.currentNamespace.tablename`
* `catalog.tablename` -> loads `tablename` from the specified catalog
* `namespace.tablename` -> loads `namespace.tablename` from the current catalog
* `catalog.namespace.tablename` -> loads `namespace.tablename` from the specified catalog
* `namespace1.namespace2.tablename` -> loads `namespace1.namespace2.tablename` from the current catalog

The above list is in order of priority. For example, a matching catalog will take priority over any namespace resolution, as the sketch below illustrates.
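
For example, a minimal sketch of the main forms (assumes `spark` is the active `SparkSession`; the warehouse path, catalog, and table names are illustrative, not taken from a real deployment):

```java
// Load by path: resolved as a HadoopTable at that location
Dataset<Row> byPath = spark.read().format("iceberg").load("file:/tmp/warehouse/db/events");

// Load by bare name: resolved as currentCatalog.currentNamespace.events
Dataset<Row> byName = spark.read().format("iceberg").load("events");

// Fully qualified: catalog, then namespace, then table name
Dataset<Row> qualified = spark.read().format("iceberg").load("prod_catalog.db.events");
```

Because catalog resolution wins, `prod_catalog.db.events` is read from the catalog named `prod_catalog` if one is configured; only when no such catalog exists is `prod_catalog.db` treated as a namespace in the current catalog.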

### Spark 2.4

Spark 2.4 requires using the DataFrame reader with `iceberg` as a format, because 2.4 does not support catalogs:
@@ -33,23 +33,21 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public abstract class TestIdentityPartitionData {
public abstract class TestIdentityPartitionData extends SparkTestBase {
private static final Configuration CONF = new Configuration();
private static final HadoopTables TABLES = new HadoopTables(CONF);

@@ -72,20 +70,6 @@ public TestIdentityPartitionData(String format, boolean vectorized) {
this.vectorized = vectorized;
}

private static SparkSession spark = null;

@BeforeClass
public static void startSpark() {
TestIdentityPartitionData.spark = SparkSession.builder().master("local[2]").getOrCreate();
}

@AfterClass
public static void stopSpark() {
SparkSession currentSpark = TestIdentityPartitionData.spark;
TestIdentityPartitionData.spark = null;
currentSpark.stop();
}

private static final Schema LOG_SCHEMA = new Schema(
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "date", Types.StringType.get()),
@@ -34,6 +34,7 @@
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkValueConverter;
@@ -81,6 +82,14 @@ public TestSparkReadProjection(String format, boolean vectorized) {
@BeforeClass
public static void startSpark() {
TestSparkReadProjection.spark = SparkSession.builder().master("local[2]").getOrCreate();
ImmutableMap<String, String> config = ImmutableMap.of(
"type", "hive",
"default-namespace", "default",
"parquet-enabled", "true",
"cache-enabled", "false"
);
spark.conf().set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.source.TestSparkCatalog");
config.forEach((key, value) -> spark.conf().set("spark.sql.catalog.spark_catalog." + key, value));
}

@AfterClass
10 changes: 10 additions & 0 deletions spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -32,6 +32,8 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.BoundPredicate;
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.Term;
@@ -643,6 +645,10 @@ public static CatalogAndIdentifier catalogAndIdentifier(SparkSession spark, Stri
return catalogAndIdentifier(spark, javaMultiPartIdentifier, defaultCatalog);
}

public static CatalogAndIdentifier catalogAndIdentifier(String description, SparkSession spark, String name) {
return catalogAndIdentifier(description, spark, name, spark.sessionState().catalogManager().currentCatalog());
}

public static CatalogAndIdentifier catalogAndIdentifier(String description, SparkSession spark,
String name, CatalogPlugin defaultCatalog) {
try {
Expand Down Expand Up @@ -716,4 +722,8 @@ public Identifier identifier() {
return identifier;
}
}

public static TableIdentifier identifierToTableIdentifier(Identifier identifier) {
return TableIdentifier.of(Namespace.of(identifier.namespace()), identifier.name());
}
}
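
For reference, a hedged usage sketch of the two helpers added above (the table name and the active-session setup are illustrative, not part of this PR):

```java
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.Identifier;

SparkSession spark = SparkSession.active();

// New overload: resolve a multi-part name against the session's current catalog
Spark3Util.CatalogAndIdentifier catalogAndIdent =
    Spark3Util.catalogAndIdentifier("path or identifier", spark, "db.events");
Identifier sparkIdent = catalogAndIdent.identifier();

// New helper: convert the Spark Identifier into an Iceberg TableIdentifier
TableIdentifier icebergIdent = Spark3Util.identifierToTableIdentifier(sparkIdent);
```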
@@ -131,7 +131,7 @@ protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap opti
* @return an Iceberg identifier
*/
protected TableIdentifier buildIdentifier(Identifier identifier) {
return TableIdentifier.of(Namespace.of(identifier.namespace()), identifier.name());
return Spark3Util.identifierToTableIdentifier(identifier);
}

@Override
@@ -20,21 +20,43 @@
package org.apache.iceberg.spark.source;

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveCatalogs;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.PathIdentifier;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.CatalogManager;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsCatalogOptions;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class IcebergSource implements DataSourceRegister, TableProvider {
/**
* The IcebergSource loads/writes tables with format "iceberg". It can load paths and tables.
*
* How paths/tables are loaded when using spark.read().format("iceberg").load(table):
*
* table = "file:/path/to/table" -> loads a HadoopTable at given path
[Review thread]
Member: Maybe important to note that these are used with this priority as well, i.e. if "catalog.namespace.table" is valid it will be read and "namespace.namespace.table" will be ignored?

Contributor (author): Very good point @RussellSpitzer, I have included a note to that effect.

* table = "tablename" -> loads currentCatalog.currentNamespace.tablename
* table = "catalog.tablename" -> load "tablename" from the specified catalog.
* table = "namespace.tablename" -> load "namespace.tablename" from current catalog
* table = "catalog.namespace.tablename" -> "namespace.tablename" from the specified catalog.
* table = "namespace1.namespace2.tablename" -> load "namespace1.namespace2.tablename" from current catalog
*
* The above list is in order of priority. For example, a matching catalog will take priority over any namespace
* resolution.
*/
public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions {
private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
private static final String DEFAULT_CATALOG = "spark.sql.catalog." + DEFAULT_CATALOG_NAME;

@Override
public String shortName() {
return "iceberg";
@@ -56,48 +78,72 @@ public boolean supportsExternalMetadata() {
}

@Override
public SparkTable getTable(StructType schema, Transform[] partitioning, Map<String, String> options) {
// Get Iceberg table from options
Configuration conf = SparkSession.active().sessionState().newHadoopConf();
Table icebergTable = getTableAndResolveHadoopConfiguration(options, conf);

// Build Spark table based on Iceberg table, and return it
// Eagerly refresh the table before reading to ensure views containing this table show up-to-date data
return new SparkTable(icebergTable, schema, true);
public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> options) {
[Review thread]
Contributor: When implementing SupportsCatalogOptions, I don't think this will ever be called. Should we remove it and throw UnsupportedOperationException instead?

Contributor (author): inferPartitioning method uses it :-( Can we return Transform[0] and have Spark do the work?
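
For context on that reply: Spark's TableProvider drives inferPartitioning through getTable, which is why the method stays reachable even when SupportsCatalogOptions is implemented. A paraphrase of the default method (an assumption about Spark 3.0.x internals recalled from memory, not something shown in this PR):

```java
// Approximate default in org.apache.spark.sql.connector.catalog.TableProvider
default Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
  // delegates to getTable, so getTable cannot simply throw UnsupportedOperationException
  return getTable(inferSchema(options), new Transform[0], options.asCaseSensitiveMap()).partitioning();
}
```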

Spark3Util.CatalogAndIdentifier catalogIdentifier = catalogAndIdentifier(new CaseInsensitiveStringMap(options));
CatalogPlugin catalog = catalogIdentifier.catalog();
Identifier ident = catalogIdentifier.identifier();

try {
if (catalog instanceof TableCatalog) {
return ((TableCatalog) catalog).loadTable(ident);
}
} catch (NoSuchTableException e) {
// throwing an Iceberg NoSuchTableException because the Spark one is typed and can't be thrown from this interface
throw new org.apache.iceberg.exceptions.NoSuchTableException(e, "Cannot find table for %s.", ident);
}

// throwing an Iceberg NoSuchTableException because the Spark one is typed and can't be thrown from this interface
throw new org.apache.iceberg.exceptions.NoSuchTableException("Cannot find table for %s.", ident);
[Review thread]
Contributor: I think the correct exception is the Spark exception since this is going to be called from Spark code.

Contributor (author): The Spark NoSuchTableException is typed, which would change the interface. It appears there is no well-defined way to return from this function without a table being found. It NPEs if you return null, so an untyped exception seems to be the expected way to denote no table found. I thought the Iceberg NoSuchTableException was the best compromise here.

Contributor: Got it, I agree in that case. Thanks for explaining! You may want to add a comment here to explain in the code as well.

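To unpack "typed" here: the Spark exception is checked while the Iceberg one is unchecked, so only the latter can escape getTable without changing the interface signature. A small sketch of the distinction (the class hierarchies below are assumptions about the versions in use, not something stated in this diff):

```java
// Checked: org.apache.spark.sql.catalyst.analysis.NoSuchTableException extends AnalysisException,
// a java.lang.Exception, so it would have to be declared in getTable's throws clause.
// Unchecked: org.apache.iceberg.exceptions.NoSuchTableException extends RuntimeException,
// so it can be thrown from getTable(...) as-is, with no signature change:
static void failUnchecked(Identifier ident) {
  throw new org.apache.iceberg.exceptions.NoSuchTableException("Cannot find table for %s.", ident);
}
```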
}

protected Table findTable(Map<String, String> options, Configuration conf) {
private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStringMap options) {
Preconditions.checkArgument(options.containsKey("path"), "Cannot open table: path is not set");
SparkSession spark = SparkSession.active();
setupDefaultSparkCatalog(spark);
String path = options.get("path");
CatalogManager catalogManager = spark.sessionState().catalogManager();

if (path.contains("/")) {
HadoopTables tables = new HadoopTables(conf);
return tables.load(path);
} else {
HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf);
TableIdentifier tableIdentifier = TableIdentifier.parse(path);
return hiveCatalog.loadTable(tableIdentifier);
// contains a path. Return iceberg default catalog and a PathIdentifier
return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
new PathIdentifier(path));
}
}

private Table getTableAndResolveHadoopConfiguration(Map<String, String> options, Configuration conf) {
// Overwrite configurations from the Spark Context with configurations from the options.
mergeIcebergHadoopConfs(conf, options);
final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
"path or identifier", spark, path);

Table table = findTable(options, conf);

// Set confs from table properties
mergeIcebergHadoopConfs(conf, table.properties());
if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
!(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
// catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
catalogAndIdentifier.identifier());
} else {
return catalogAndIdentifier;
}
}

// Re-overwrite values set in options and table properties but were not in the environment.
mergeIcebergHadoopConfs(conf, options);
@Override
public Identifier extractIdentifier(CaseInsensitiveStringMap options) {
return catalogAndIdentifier(options).identifier();
}

return table;
@Override
public String extractCatalog(CaseInsensitiveStringMap options) {
return catalogAndIdentifier(options).catalog().name();
}

private static void mergeIcebergHadoopConfs(Configuration baseConf, Map<String, String> options) {
options.keySet().stream()
.filter(key -> key.startsWith("hadoop."))
.forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
private static void setupDefaultSparkCatalog(SparkSession spark) {
if (spark.conf().contains(DEFAULT_CATALOG)) {
return;
}
ImmutableMap<String, String> config = ImmutableMap.of(
"type", "hive",
"default-namespace", "default",
"parquet-enabled", "true",
"cache-enabled", "false" // the source should not use a cache
);
String catalogName = "org.apache.iceberg.spark.SparkCatalog";
spark.conf().set(DEFAULT_CATALOG, catalogName);
config.forEach((key, value) -> spark.conf().set(DEFAULT_CATALOG + "." + key, value));
}
}
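
For orientation, setupDefaultSparkCatalog above amounts to the following session conf calls when the default catalog is not already configured (an illustration derived from the constants and map in the diff, not additional PR code):

```java
// Equivalent effect of setupDefaultSparkCatalog(spark), with DEFAULT_CATALOG_NAME = "default_iceberg"
spark.conf().set("spark.sql.catalog.default_iceberg", "org.apache.iceberg.spark.SparkCatalog");
spark.conf().set("spark.sql.catalog.default_iceberg.type", "hive");
spark.conf().set("spark.sql.catalog.default_iceberg.default-namespace", "default");
spark.conf().set("spark.sql.catalog.default_iceberg.parquet-enabled", "true");
spark.conf().set("spark.sql.catalog.default_iceberg.cache-enabled", "false"); // the source should not use a cache
```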
@@ -131,6 +131,10 @@ public void testSparkSessionCatalogHadoopTable() throws Exception {
.olderThan(System.currentTimeMillis() + 1000).execute();
Assert.assertTrue("trash file should be removed",
results.contains("file:" + location + "/data/trashfile"));
// reset spark_catalog to default
spark.conf().unset("spark.sql.catalog.spark_catalog");
spark.conf().unset("spark.sql.catalog.spark_catalog.type");
spark.conf().unset("spark.sql.catalog.spark_catalog.warehouse");
}

@Test
@@ -156,6 +160,9 @@ public void testSparkSessionCatalogHiveTable() throws Exception {
.olderThan(System.currentTimeMillis() + 1000).execute();
Assert.assertTrue("trash file should be removed",
results.contains("file:" + location + "/data/trashfile"));
// reset spark_catalog to default
spark.conf().unset("spark.sql.catalog.spark_catalog");
spark.conf().unset("spark.sql.catalog.spark_catalog.type");
}

}
@@ -19,9 +19,10 @@

package org.apache.iceberg.spark.source;

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class TestIcebergSource extends IcebergSource {
@Override
@@ -30,7 +31,14 @@ public String shortName() {
}

@Override
protected Table findTable(Map<String, String> options, Configuration conf) {
return TestTables.load(options.get("iceberg.table.name"));
public Identifier extractIdentifier(CaseInsensitiveStringMap options) {
TableIdentifier ti = TableIdentifier.parse(options.get("iceberg.table.name"));
return Identifier.of(ti.namespace().levels(), ti.name());
}

@Override
public String extractCatalog(CaseInsensitiveStringMap options) {
return SparkSession.active().sessionState().catalogManager().currentCatalog().name();
}

}
@@ -19,5 +19,19 @@

package org.apache.iceberg.spark.source;

public class TestSparkSchema3 extends TestSparkSchema {
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;

public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces> extends SparkSessionCatalog<T> {

@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
TestTables.TestTable table = TestTables.load(Spark3Util.identifierToTableIdentifier(ident).toString());
return new SparkTable(table, false);
}
}