diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 8975c7f32db1..d084c06b2b7c 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -38,6 +38,7 @@
 import org.apache.spark.sql.connector.catalog.CatalogManager;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SessionConfigSupport;
 import org.apache.spark.sql.connector.catalog.SupportsCatalogOptions;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -61,7 +62,8 @@
  *
  * <p>The above list is in order of priority. For example: a matching catalog will take priority
  * over any namespace resolution.
  */
-public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions {
+public class IcebergSource
+    implements DataSourceRegister, SupportsCatalogOptions, SessionConfigSupport {
   private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
   private static final String DEFAULT_CACHE_CATALOG_NAME = "default_cache_iceberg";
   private static final String DEFAULT_CATALOG = "spark.sql.catalog." + DEFAULT_CATALOG_NAME;
@@ -80,6 +82,11 @@ public String shortName() {
     return "iceberg";
   }
 
+  @Override
+  public String keyPrefix() {
+    return shortName();
+  }
+
   @Override
   public StructType inferSchema(CaseInsensitiveStringMap options) {
     return null;
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 29216150d362..21afd7460ec6 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -2210,6 +2210,52 @@ private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier)
     assertThat(actual).as("Rows must match").containsExactlyInAnyOrderElementsOf(expected);
   }
 
+  @Test
+  public void testSessionConfigSupport() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "session_config_table");
+    Table table = createTable(tableIdentifier, SCHEMA, spec);
+
+    List<SimpleRecord> initialRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+
+    Dataset<Row> df = spark.createDataFrame(initialRecords, SimpleRecord.class);
+
+    df.select("id", "data")
+        .write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    long s1 = table.currentSnapshot().snapshotId();
+
+    withSQLConf(
+        // set write option through session configuration
+        ImmutableMap.of("spark.datasource.iceberg.snapshot-property.foo", "bar"),
+        () -> {
+          df.select("id", "data")
+              .write()
+              .format("iceberg")
+              .mode(SaveMode.Append)
+              .save(loadLocation(tableIdentifier));
+        });
+
+    table.refresh();
+    assertThat(table.currentSnapshot().summary()).containsEntry("foo", "bar");
+
+    withSQLConf(
+        // set read option through session configuration
+        ImmutableMap.of("spark.datasource.iceberg.snapshot-id", String.valueOf(s1)),
+        () -> {
+          Dataset<Row> result = spark.read().format("iceberg").load(loadLocation(tableIdentifier));
+          List<SimpleRecord> actual = result.as(Encoders.bean(SimpleRecord.class)).collectAsList();
+          assertThat(actual)
+              .as("Rows must match")
+              .containsExactlyInAnyOrderElementsOf(initialRecords);
+        });
+  }
+
   private GenericData.Record manifestRecord(
       Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
     GenericRecordBuilder builder =
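
Reviewer note: with Spark's `SessionConfigSupport`, the DSv2 framework propagates every session conf of the form `spark.datasource.<keyPrefix>.<key>` into the option map of each read/write against that source. Since `keyPrefix()` here returns `shortName()`, i.e. `iceberg`, the effective prefix is `spark.datasource.iceberg.`, which is exactly what the new test exercises. A minimal usage sketch of what this enables; the class name, table path, and snapshot id below are placeholders, not part of this PR:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SessionConfigSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
    String tablePath = "/tmp/warehouse/db/tbl"; // placeholder table location

    // Without session configs, a time-travel read needs a per-operation option
    // (the snapshot id "123" is a placeholder for a real snapshot id):
    Dataset<Row> viaOption =
        spark.read().format("iceberg").option("snapshot-id", "123").load(tablePath);

    // With SessionConfigSupport, the same option can be set once for the session;
    // Spark strips the "spark.datasource.iceberg." prefix and injects
    // "snapshot-id" into the options of every subsequent Iceberg read:
    spark.conf().set("spark.datasource.iceberg.snapshot-id", "123");
    Dataset<Row> viaSessionConf = spark.read().format("iceberg").load(tablePath);

    System.out.println(viaOption.count() == viaSessionConf.count()); // same snapshot
  }
}

As I understand Spark's DSv2 behavior, explicitly passed `.option(...)` values still take precedence over session configs, so per-operation overrides keep working.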