From b6b4da4591f7c9ca38659e3da7df83d972143196 Mon Sep 17 00:00:00 2001 From: aokolnychyi Date: Fri, 2 Sep 2022 14:23:20 -0700 Subject: [PATCH 1/2] Spark 3.3: Add SparkChangelogTable --- .../org/apache/iceberg/ChangelogUtil.java | 49 ++++ .../org/apache/iceberg/MetadataColumns.java | 18 ++ .../java/org/apache/iceberg/hadoop/Util.java | 24 +- .../extensions/TestChangelogBatchReads.java | 248 ++++++++++++++++++ .../apache/iceberg/spark/SparkCatalog.java | 126 +++++---- .../spark/source/ChangelogRowReader.java | 3 +- .../iceberg/spark/source/SparkBatch.java | 11 +- .../spark/source/SparkChangelogBatch.java | 138 ++++++++++ .../spark/source/SparkChangelogScan.java | 163 ++++++++++++ .../spark/source/SparkChangelogTable.java | 102 +++++++ .../spark/source/SparkInputPartition.java | 83 ++++++ .../spark/source/SparkMicroBatchStream.java | 12 +- .../iceberg/spark/source/SparkScan.java | 124 +++------ .../spark/source/SparkScanBuilder.java | 37 ++- .../iceberg/spark/source/SparkTable.java | 8 + .../actions/TestRemoveOrphanFilesAction3.java | 6 +- .../spark/source/TestPathIdentifier.java | 5 +- 17 files changed, 1002 insertions(+), 155 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/ChangelogUtil.java create mode 100644 spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java create mode 100644 spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java create mode 100644 spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java create mode 100644 spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java create mode 100644 spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java diff --git a/core/src/main/java/org/apache/iceberg/ChangelogUtil.java b/core/src/main/java/org/apache/iceberg/ChangelogUtil.java new file mode 100644 index 000000000000..d909c72e2394 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ChangelogUtil.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import static org.apache.iceberg.MetadataColumns.CHANGE_ORDINAL; +import static org.apache.iceberg.MetadataColumns.CHANGE_TYPE; +import static org.apache.iceberg.MetadataColumns.COMMIT_SNAPSHOT_ID; + +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; + +public class ChangelogUtil { + + private static final Schema CHANGELOG_METADATA = + new Schema(CHANGE_TYPE, CHANGE_ORDINAL, COMMIT_SNAPSHOT_ID); + + private static final Set CHANGELOG_METADATA_FIELD_IDS = + CHANGELOG_METADATA.columns().stream() + .map(Types.NestedField::fieldId) + .collect(Collectors.toSet()); + + private ChangelogUtil() {} + + public static Schema changelogSchema(Schema tableSchema) { + return TypeUtil.join(tableSchema, CHANGELOG_METADATA); + } + + public static Schema dropChangelogMetadata(Schema changelogSchema) { + return TypeUtil.selectNot(changelogSchema, CHANGELOG_METADATA_FIELD_IDS); + } +} diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java index dc6de143b0e4..39777c2936ed 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java +++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java @@ -75,6 +75,24 @@ private MetadataColumns() {} public static final String DELETE_FILE_ROW_FIELD_NAME = "row"; public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103; public static final String DELETE_FILE_ROW_DOC = "Deleted row values"; + public static final NestedField CHANGE_TYPE = + NestedField.required( + Integer.MAX_VALUE - 104, + "_change_type", + Types.StringType.get(), + "Record type in changelog"); + public static final NestedField CHANGE_ORDINAL = + NestedField.optional( + Integer.MAX_VALUE - 105, + "_change_ordinal", + Types.IntegerType.get(), + "Change ordinal in changelog"); + public static final NestedField COMMIT_SNAPSHOT_ID = + NestedField.optional( + Integer.MAX_VALUE - 106, + "_commit_snapshot_id", + Types.LongType.get(), + "Commit snapshot ID"); private static final Map META_COLUMNS = ImmutableMap.of( diff --git a/core/src/main/java/org/apache/iceberg/hadoop/Util.java b/core/src/main/java/org/apache/iceberg/hadoop/Util.java index 86ab481a3ec1..8ff792920af4 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/Util.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/Util.java @@ -28,7 +28,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.ContentScanTask; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ScanTask; +import org.apache.iceberg.ScanTaskGroup; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; @@ -69,19 +72,28 @@ public static String[] blockLocations(CombinedScanTask task, Configuration conf) return locationSets.toArray(new String[0]); } - public static String[] blockLocations(FileIO io, CombinedScanTask task) { + public static String[] blockLocations(FileIO io, ScanTaskGroup taskGroup) { Set locations = Sets.newHashSet(); - for (FileScanTask f : task.files()) { - InputFile in = io.newInputFile(f.file().path().toString()); - if (in instanceof HadoopInputFile) { - Collections.addAll( - locations, ((HadoopInputFile) in).getBlockLocations(f.start(), f.length())); + + for (ScanTask task : taskGroup.tasks()) { + if (task instanceof ContentScanTask) { + 
Collections.addAll(locations, blockLocations(io, (ContentScanTask) task)); } } return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE); } + private static String[] blockLocations(FileIO io, ContentScanTask task) { + InputFile inputFile = io.newInputFile(task.file().path().toString()); + if (inputFile instanceof HadoopInputFile) { + HadoopInputFile hadoopInputFile = (HadoopInputFile) inputFile; + return hadoopInputFile.getBlockLocations(task.start(), task.length()); + } else { + return HadoopInputFile.NO_LOCATION_PREFERENCE; + } + } + /** * From Apache Spark * diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java new file mode 100644 index 000000000000..530abe67c96e --- /dev/null +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.extensions; + +import static org.apache.iceberg.TableProperties.FORMAT_VERSION; +import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED; +import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkChangelogTable; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.Row; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized.Parameters; + +public class TestChangelogBatchReads extends SparkExtensionsTestBase { + + @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}") + public static Object[][] parameters() { + return new Object[][] { + { + 1, + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties() + }, + { + 2, + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties() + } + }; + } + + private final int formatVersion; + + public TestChangelogBatchReads( + int formatVersion, String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + this.formatVersion = formatVersion; + } + + @After + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testDataFilters() { + sql( + "CREATE TABLE %s (id INT, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (data) " + + "TBLPROPERTIES ( " + + " '%s' = '%d' " + + ")", + tableName, FORMAT_VERSION, formatVersion); + + sql("INSERT INTO %s VALUES (1, 'a')", tableName); + sql("INSERT INTO %s VALUES (2, 'b')", tableName); + sql("INSERT INTO %s VALUES (3, 'c')", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + + Snapshot snap3 = table.currentSnapshot(); + + sql("DELETE FROM %s WHERE id = 3", tableName); + + table.refresh(); + + Snapshot snap4 = table.currentSnapshot(); + + assertEquals( + "Should have expected rows", + ImmutableList.of( + row(3, "c", "INSERT", 2, snap3.snapshotId()), + row(3, "c", "DELETE", 3, snap4.snapshotId())), + sql("SELECT * FROM %s.changes WHERE id = 3 ORDER BY _change_ordinal, id", tableName)); + } + + @Test + public void testOverwrites() { + sql( + "CREATE TABLE %s (id INT, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (data) " + + "TBLPROPERTIES ( " + + " '%s' = '%d' " + + ")", + tableName, FORMAT_VERSION, formatVersion); + + sql("INSERT INTO %s VALUES (1, 'a')", tableName); + sql("INSERT INTO %s VALUES (2, 'b')", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + + Snapshot snap2 = table.currentSnapshot(); + + sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName); + + table.refresh(); + + Snapshot snap3 = table.currentSnapshot(); + + assertEquals( + "Rows should match", + ImmutableList.of( + row(2, "b", "DELETE", 0, snap3.snapshotId()), + row(-2, "b", "INSERT", 0, snap3.snapshotId())), + changelogRecords(snap2, snap3)); + } + + @Test + public void testMetadataDeletes() { + sql( + "CREATE TABLE %s (id 
INT, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (data) " + + "TBLPROPERTIES ( " + + " '%s' = '%d' " + + ")", + tableName, FORMAT_VERSION, formatVersion); + + sql("INSERT INTO %s VALUES (1, 'a')", tableName); + sql("INSERT INTO %s VALUES (2, 'b')", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + + Snapshot snap2 = table.currentSnapshot(); + + sql("DELETE FROM %s WHERE data = 'a'", tableName); + + table.refresh(); + + Snapshot snap3 = table.currentSnapshot(); + Assert.assertEquals("Operation must match", DataOperations.DELETE, snap3.operation()); + + assertEquals( + "Rows should match", + ImmutableList.of(row(1, "a", "DELETE", 0, snap3.snapshotId())), + changelogRecords(snap2, snap3)); + } + + @Test + public void testExistingEntriesInNewDataManifestsAreIgnored() { + sql( + "CREATE TABLE %s (id INT, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (data) " + + "TBLPROPERTIES ( " + + " '%s' = '%d', " + + " '%s' = '1', " + + " '%s' = 'true' " + + ")", + tableName, FORMAT_VERSION, formatVersion, MANIFEST_MIN_MERGE_COUNT, MANIFEST_MERGE_ENABLED); + + sql("INSERT INTO %s VALUES (1, 'a')", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + + Snapshot snap1 = table.currentSnapshot(); + + sql("INSERT INTO %s VALUES (2, 'b')", tableName); + + table.refresh(); + + Snapshot snap2 = table.currentSnapshot(); + Assert.assertEquals("Manifest number must match", 1, snap2.dataManifests(table.io()).size()); + + assertEquals( + "Rows should match", + ImmutableList.of(row(2, "b", "INSERT", 0, snap2.snapshotId())), + changelogRecords(snap1, snap2)); + } + + @Test + public void testManifestRewritesAreIgnored() { + sql( + "CREATE TABLE %s (id INT, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (data) " + + "TBLPROPERTIES ( " + + " '%s' = '%d' " + + ")", + tableName, FORMAT_VERSION, formatVersion); + + sql("INSERT INTO %s VALUES (1, 'a')", tableName); + sql("INSERT INTO %s VALUES (2, 'b')", tableName); + + sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent); + + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertEquals("Num snapshots must match", 3, Iterables.size(table.snapshots())); + + assertEquals( + "Should have expected rows", + ImmutableList.of(row(1, "INSERT"), row(2, "INSERT")), + sql("SELECT id, _change_type FROM %s.changes ORDER BY id", tableName)); + } + + private List changelogRecords(Snapshot startSnapshot, Snapshot endSnapshot) { + DataFrameReader reader = spark.read(); + + if (startSnapshot != null) { + reader = reader.option(SparkReadOptions.START_SNAPSHOT_ID, startSnapshot.snapshotId()); + } + + if (endSnapshot != null) { + reader = reader.option(SparkReadOptions.END_SNAPSHOT_ID, endSnapshot.snapshotId()); + } + + return rowsToJava(collect(reader)); + } + + private List collect(DataFrameReader reader) { + return reader + .table(tableName + "." 
+ SparkChangelogTable.TABLE_NAME) + .orderBy("_change_ordinal", "_commit_snapshot_id", "_change_type", "id") + .collectAsList(); + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index bad2aca031c8..33b624c60bae 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -36,7 +36,6 @@ import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; @@ -55,6 +54,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.actions.SparkActions; +import org.apache.iceberg.spark.source.SparkChangelogTable; import org.apache.iceberg.spark.source.SparkTable; import org.apache.iceberg.spark.source.StagedSparkTable; import org.apache.iceberg.util.Pair; @@ -68,6 +68,7 @@ import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.NamespaceChange; import org.apache.spark.sql.connector.catalog.StagedTable; +import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.catalog.TableChange; import org.apache.spark.sql.connector.catalog.TableChange.ColumnChange; @@ -138,54 +139,68 @@ protected TableIdentifier buildIdentifier(Identifier identifier) { } @Override - public SparkTable loadTable(Identifier ident) throws NoSuchTableException { + public Table loadTable(Identifier ident) throws NoSuchTableException { try { - Pair icebergTable = load(ident); - return new SparkTable(icebergTable.first(), icebergTable.second(), !cacheEnabled); + return load(ident); } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { throw new NoSuchTableException(ident); } } @Override - public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException { - try { - Pair icebergTable = load(ident); + public Table loadTable(Identifier ident, String version) throws NoSuchTableException { + Table table = loadTable(ident); + + if (table instanceof SparkTable) { + SparkTable sparkTable = (SparkTable) table; + Preconditions.checkArgument( - icebergTable.second() == null, + sparkTable.snapshotId() == null, "Cannot do time-travel based on both table identifier and AS OF"); - return new SparkTable(icebergTable.first(), Long.parseLong(version), !cacheEnabled); - } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { - throw new NoSuchTableException(ident); + + return sparkTable.copyWithSnapshotId(Long.parseLong(version)); + + } else if (table instanceof SparkChangelogTable) { + throw new UnsupportedOperationException("AS OF is not supported for changelogs"); + + } else { + throw new IllegalArgumentException("Unknown Spark table type: " + table.getClass().getName()); } } @Override - public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTableException { - try { - Pair icebergTable = load(ident); - // spark returns timestamp in micro seconds precision, convert it to milliseconds, - // as iceberg snapshot's are stored in millisecond precision. 
- long timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestamp); + public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { + Table table = loadTable(ident); + + if (table instanceof SparkTable) { + SparkTable sparkTable = (SparkTable) table; + Preconditions.checkArgument( - icebergTable.second() == null, + sparkTable.snapshotId() == null, "Cannot do time-travel based on both table identifier and AS OF"); - long snapshotIdAsOfTime = - SnapshotUtil.snapshotIdAsOfTime(icebergTable.first(), timestampMillis); - return new SparkTable(icebergTable.first(), snapshotIdAsOfTime, !cacheEnabled); - } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { - throw new NoSuchTableException(ident); + + // convert the timestamp to milliseconds as Spark passes microseconds + // but Iceberg uses milliseconds for snapshot timestamps + long timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestamp); + long snapshotId = SnapshotUtil.snapshotIdAsOfTime(sparkTable.table(), timestampMillis); + return sparkTable.copyWithSnapshotId(snapshotId); + + } else if (table instanceof SparkChangelogTable) { + throw new UnsupportedOperationException("AS OF is not supported for changelogs"); + + } else { + throw new IllegalArgumentException("Unknown Spark table type: " + table.getClass().getName()); } } @Override - public SparkTable createTable( + public Table createTable( Identifier ident, StructType schema, Transform[] transforms, Map properties) throws TableAlreadyExistsException { Schema icebergSchema = SparkSchemaUtil.convert(schema, useTimestampsWithoutZone); try { Catalog.TableBuilder builder = newBuilder(ident, icebergSchema); - Table icebergTable = + org.apache.iceberg.Table icebergTable = builder .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms)) .withLocation(properties.get("location")) @@ -250,8 +265,7 @@ public StagedTable stageCreateOrReplace( } @Override - public SparkTable alterTable(Identifier ident, TableChange... changes) - throws NoSuchTableException { + public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException { SetProperty setLocation = null; SetProperty setSnapshotId = null; SetProperty pickSnapshotId = null; @@ -284,7 +298,7 @@ public SparkTable alterTable(Identifier ident, TableChange... 
changes) } try { - Table table = load(ident).first(); + org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident)); commitChanges( table, setLocation, setSnapshotId, pickSnapshotId, propertyChanges, schemaChanges); return new SparkTable(table, true /* refreshEagerly */); @@ -301,7 +315,7 @@ public boolean dropTable(Identifier ident) { @Override public boolean purgeTable(Identifier ident) { try { - Table table = load(ident).first(); + org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident)); ValidationException.check( PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT), "Cannot purge table: GC is disabled (deleting files may corrupt other tables)"); @@ -526,7 +540,7 @@ public String name() { } private static void commitChanges( - Table table, + org.apache.iceberg.Table table, SetProperty setLocation, SetProperty setSnapshotId, SetProperty pickSnapshotId, @@ -579,23 +593,24 @@ private static void checkNotPathIdentifier(Identifier identifier, String method) } } - private Pair load(Identifier ident) { + private Table load(Identifier ident) { if (isPathIdentifier(ident)) { return loadFromPathIdentifier((PathIdentifier) ident); } try { - return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null); + org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident)); + return new SparkTable(table, !cacheEnabled); } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { if (ident.namespace().length == 0) { throw e; } - // if the original load didn't work, the identifier may be extended and include a snapshot - // selector + // if the original load didn't work, try using the namespace as an identifier because + // the original identifier may include a snapshot selector or may point to the changelog TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace())); - Table table; + org.apache.iceberg.Table table; try { table = icebergCatalog.loadTable(namespaceAsIdent); } catch (Exception ignored) { @@ -605,19 +620,27 @@ private Pair load(Identifier ident) { } // loading the namespace as a table worked, check the name to see if it is a valid selector + // or if the name points to the changelog + + if (ident.name().equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) { + return new SparkChangelogTable(table, !cacheEnabled); + } + Matcher at = AT_TIMESTAMP.matcher(ident.name()); if (at.matches()) { long asOfTimestamp = Long.parseLong(at.group(1)); - return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp)); + long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); + return new SparkTable(table, snapshotId, !cacheEnabled); } Matcher id = SNAPSHOT_ID.matcher(ident.name()); if (id.matches()) { long snapshotId = Long.parseLong(id.group(1)); - return Pair.of(table, snapshotId); + return new SparkTable(table, snapshotId, !cacheEnabled); } - // the name wasn't a valid snapshot selector. 
throw the original exception + // the name wasn't a valid snapshot selector and did not point to the changelog + // throw the original exception throw e; } } @@ -633,13 +656,21 @@ private Pair> parseLocationString(String location) { } } - private Pair loadFromPathIdentifier(PathIdentifier ident) { + @SuppressWarnings("CyclomaticComplexity") + private Table loadFromPathIdentifier(PathIdentifier ident) { Pair> parsed = parseLocationString(ident.location()); String metadataTableName = null; Long asOfTimestamp = null; Long snapshotId = null; + boolean isChangelog = false; + for (String meta : parsed.second()) { + if (meta.equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) { + isChangelog = true; + continue; + } + if (MetadataTableType.from(meta) != null) { metadataTableName = meta; continue; @@ -662,15 +693,22 @@ private Pair loadFromPathIdentifier(PathIdentifier ident) { "Cannot specify both snapshot-id and as-of-timestamp: %s", ident.location()); - Table table = + Preconditions.checkArgument( + !isChangelog || (snapshotId == null && asOfTimestamp == null), + "Cannot specify snapshot-id and as-of-timestamp for changelogs"); + + org.apache.iceberg.Table table = tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : "")); - if (snapshotId != null) { - return Pair.of(table, snapshotId); + if (isChangelog) { + return new SparkChangelogTable(table, !cacheEnabled); + } else if (asOfTimestamp != null) { - return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp)); + long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp); + return new SparkTable(table, snapshotIdAsOfTime, !cacheEnabled); + } else { - return Pair.of(table, null); + return new SparkTable(table, snapshotId, !cacheEnabled); } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java index 20f0893bcca3..a1111eda123f 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java @@ -23,6 +23,7 @@ import java.util.stream.Stream; import org.apache.iceberg.AddedRowsScanTask; import org.apache.iceberg.ChangelogScanTask; +import org.apache.iceberg.ChangelogUtil; import org.apache.iceberg.ContentFile; import org.apache.iceberg.ContentScanTask; import org.apache.iceberg.DataFile; @@ -48,7 +49,7 @@ class ChangelogRowReader extends BaseRowReader { ScanTaskGroup taskGroup, Schema expectedSchema, boolean caseSensitive) { - super(table, taskGroup, expectedSchema, caseSensitive); + super(table, taskGroup, ChangelogUtil.dropChangelogMetadata(expectedSchema), caseSensitive); } @Override diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 6d8504794310..19a1dce3c95d 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -25,7 +25,6 @@ import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; import org.apache.iceberg.spark.SparkReadConf; -import org.apache.iceberg.spark.source.SparkScan.ReadTask; import org.apache.iceberg.spark.source.SparkScan.ReaderFactory; import org.apache.iceberg.util.TableScanUtil; import org.apache.iceberg.util.Tasks; @@ -62,22 +61,22 @@ 
public InputPartition[] planInputPartitions() { sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); String expectedSchemaString = SchemaParser.toJson(expectedSchema); - InputPartition[] readTasks = new InputPartition[tasks().size()]; + InputPartition[] partitions = new InputPartition[tasks().size()]; - Tasks.range(readTasks.length) + Tasks.range(partitions.length) .stopOnFailure() .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null) .run( index -> - readTasks[index] = - new ReadTask( + partitions[index] = + new SparkInputPartition( tasks().get(index), tableBroadcast, expectedSchemaString, caseSensitive, localityEnabled)); - return readTasks; + return partitions; } protected abstract List tasks(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java new file mode 100644 index 000000000000..77923145d976 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Objects; +import org.apache.iceberg.ChangelogScanTask; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.SparkReadConf; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.read.Batch; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; + +public class SparkChangelogBatch implements Batch { + + private final JavaSparkContext sparkContext; + private final Table table; + private final List> taskGroups; + private final Schema expectedSchema; + private final boolean caseSensitive; + private final boolean localityEnabled; + private final int semanticBatchId; + + SparkChangelogBatch( + SparkSession spark, + Table table, + SparkReadConf readConf, + List> taskGroups, + Schema expectedSchema, + int semanticBatchId) { + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.table = table; + this.taskGroups = taskGroups; + this.expectedSchema = expectedSchema; + this.caseSensitive = readConf.caseSensitive(); + this.localityEnabled = readConf.localityEnabled(); + this.semanticBatchId = semanticBatchId; + } + + @Override + public InputPartition[] planInputPartitions() { + Table serializableTable = SerializableTableWithSize.copyOf(table); + Broadcast tableBroadcast = sparkContext.broadcast(serializableTable); + String expectedSchemaString = SchemaParser.toJson(expectedSchema); + + InputPartition[] partitions = new InputPartition[taskGroups.size()]; + + Tasks.range(partitions.length) + .stopOnFailure() + .executeWith(localityEnabled ? 
ThreadPools.getWorkerPool() : null) + .run( + index -> + partitions[index] = + new SparkInputPartition( + taskGroups.get(index), + tableBroadcast, + expectedSchemaString, + caseSensitive, + localityEnabled)); + + return partitions; + } + + @Override + public PartitionReaderFactory createReaderFactory() { + return new ReaderFactory(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + SparkChangelogBatch that = (SparkChangelogBatch) o; + return table.name().equals(that.table.name()) && semanticBatchId == that.semanticBatchId; + } + + @Override + public int hashCode() { + return Objects.hash(table.name(), semanticBatchId); + } + + private static class ReaderFactory implements PartitionReaderFactory { + @Override + public PartitionReader createReader(InputPartition partition) { + Preconditions.checkArgument( + partition instanceof SparkInputPartition, + "Unknown input partition type: %s", + partition.getClass().getName()); + + return new RowReader((SparkInputPartition) partition); + } + } + + private static class RowReader extends ChangelogRowReader + implements PartitionReader { + + RowReader(SparkInputPartition partition) { + super( + partition.table(), + partition.taskGroup(), + partition.expectedSchema(), + partition.isCaseSensitive()); + } + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java new file mode 100644 index 000000000000..3f7927c6d6c2 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.iceberg.ChangelogScanTask; +import org.apache.iceberg.IncrementalChangelogScan; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadConf; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.read.Batch; +import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.connector.read.Statistics; +import org.apache.spark.sql.connector.read.SupportsReportStatistics; +import org.apache.spark.sql.types.StructType; + +class SparkChangelogScan implements Scan, SupportsReportStatistics { + + private final SparkSession spark; + private final Table table; + private final IncrementalChangelogScan scan; + private final SparkReadConf readConf; + private final Schema expectedSchema; + private final List filters; + private final Long startSnapshotId; + private final Long endSnapshotId; + private final boolean readTimestampWithoutZone; + + // lazy variables + private List> taskGroups = null; + private StructType expectedSparkType = null; + + SparkChangelogScan( + SparkSession spark, + Table table, + IncrementalChangelogScan scan, + SparkReadConf readConf, + Schema expectedSchema, + List filters) { + + SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema); + + this.spark = spark; + this.table = table; + this.scan = scan; + this.readConf = readConf; + this.expectedSchema = expectedSchema; + this.filters = filters != null ? 
filters : Collections.emptyList(); + this.startSnapshotId = readConf.startSnapshotId(); + this.endSnapshotId = readConf.endSnapshotId(); + this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone(); + } + + @Override + public Statistics estimateStatistics() { + long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum(); + long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount); + return new Stats(sizeInBytes, rowsCount); + } + + @Override + public StructType readSchema() { + if (expectedSparkType == null) { + Preconditions.checkArgument( + readTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(expectedSchema), + SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR); + + this.expectedSparkType = SparkSchemaUtil.convert(expectedSchema); + } + + return expectedSparkType; + } + + @Override + public Batch toBatch() { + return new SparkChangelogBatch( + spark, table, readConf, taskGroups(), expectedSchema, hashCode()); + } + + private List> taskGroups() { + if (taskGroups == null) { + try (CloseableIterable> groups = scan.planTasks()) { + this.taskGroups = Lists.newArrayList(groups); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close changelog scan: " + scan, e); + } + } + + return taskGroups; + } + + @Override + public String description() { + return String.format( + "%s [fromSnapshotId=%d, toSnapshotId=%d, filters=%s]", + table, startSnapshotId, endSnapshotId, filtersAsString()); + } + + @Override + public String toString() { + return String.format( + "IcebergChangelogScan(table=%s, type=%s, fromSnapshotId=%d, toSnapshotId=%d, filters=%s)", + table, expectedSchema.asStruct(), startSnapshotId, endSnapshotId, filtersAsString()); + } + + private String filtersAsString() { + return filters.stream().map(Spark3Util::describe).collect(Collectors.joining(", ")); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + SparkChangelogScan that = (SparkChangelogScan) o; + return table.name().equals(that.table.name()) + && readSchema().equals(that.readSchema()) // compare Spark schemas to ignore field IDs + && filters.toString().equals(that.filters.toString()) + && Objects.equals(startSnapshotId, that.startSnapshotId) + && Objects.equals(endSnapshotId, that.endSnapshotId); + } + + @Override + public int hashCode() { + return Objects.hash( + table.name(), readSchema(), filters.toString(), startSnapshotId, endSnapshotId); + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java new file mode 100644 index 000000000000..645a583e5daa --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.Set; +import org.apache.iceberg.ChangelogUtil; +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.SupportsRead; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCapability; +import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.connector.read.ScanBuilder; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +public class SparkChangelogTable implements Table, SupportsRead { + + public static final String TABLE_NAME = "changes"; + + private static final Set CAPABILITIES = + ImmutableSet.of(TableCapability.BATCH_READ); + + private final org.apache.iceberg.Table icebergTable; + private final boolean refreshEagerly; + + private SparkSession lazySpark = null; + private StructType lazyTableSparkType = null; + private Schema lazyChangelogSchema = null; + + public SparkChangelogTable(org.apache.iceberg.Table icebergTable, boolean refreshEagerly) { + this.icebergTable = icebergTable; + this.refreshEagerly = refreshEagerly; + } + + @Override + public String name() { + return icebergTable.name() + "." + TABLE_NAME; + } + + @Override + public StructType schema() { + if (lazyTableSparkType == null) { + this.lazyTableSparkType = SparkSchemaUtil.convert(changelogSchema()); + } + + return lazyTableSparkType; + } + + @Override + public Set capabilities() { + return CAPABILITIES; + } + + @Override + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { + if (refreshEagerly) { + icebergTable.refresh(); + } + + return new SparkScanBuilder(spark(), icebergTable, changelogSchema(), options) { + @Override + public Scan build() { + return buildChangelogScan(); + } + }; + } + + private Schema changelogSchema() { + if (lazyChangelogSchema == null) { + this.lazyChangelogSchema = ChangelogUtil.changelogSchema(icebergTable.schema()); + } + + return lazyChangelogSchema; + } + + private SparkSession spark() { + if (lazySpark == null) { + this.lazySpark = SparkSession.active(); + } + + return lazySpark; + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java new file mode 100644 index 000000000000..4d5164dc0633 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.io.Serializable; +import org.apache.iceberg.ScanTask; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.hadoop.Util; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.connector.read.InputPartition; + +class SparkInputPartition implements InputPartition, Serializable { + private final ScanTaskGroup taskGroup; + private final Broadcast
tableBroadcast;
+  private final String expectedSchemaString;
+  private final boolean caseSensitive;
+
+  private transient Schema expectedSchema = null;
+  private transient String[] preferredLocations = null;
+
+  SparkInputPartition(
+      ScanTaskGroup<?> taskGroup,
+      Broadcast<Table>
tableBroadcast, + String expectedSchemaString, + boolean caseSensitive, + boolean localityPreferred) { + this.taskGroup = taskGroup; + this.tableBroadcast = tableBroadcast; + this.expectedSchemaString = expectedSchemaString; + this.caseSensitive = caseSensitive; + if (localityPreferred) { + Table table = tableBroadcast.value(); + this.preferredLocations = Util.blockLocations(table.io(), taskGroup); + } else { + this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE; + } + } + + @Override + public String[] preferredLocations() { + return preferredLocations; + } + + @SuppressWarnings("unchecked") + public ScanTaskGroup taskGroup() { + return (ScanTaskGroup) taskGroup; + } + + public Table table() { + return tableBroadcast.value(); + } + + public boolean isCaseSensitive() { + return caseSensitive; + } + + public Schema expectedSchema() { + if (expectedSchema == null) { + this.expectedSchema = SchemaParser.fromJson(expectedSchemaString); + } + return expectedSchema; + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index d3c299aa8b2d..972988b6b2c2 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -47,7 +47,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkReadOptions; -import org.apache.iceberg.spark.source.SparkScan.ReadTask; import org.apache.iceberg.spark.source.SparkScan.ReaderFactory; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; @@ -152,22 +151,23 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) { List combinedScanTasks = Lists.newArrayList( TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost)); - InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()]; - Tasks.range(readTasks.length) + InputPartition[] partitions = new InputPartition[combinedScanTasks.size()]; + + Tasks.range(partitions.length) .stopOnFailure() .executeWith(localityPreferred ? 
ThreadPools.getWorkerPool() : null) .run( index -> - readTasks[index] = - new ReadTask( + partitions[index] = + new SparkInputPartition( combinedScanTasks.get(index), tableBroadcast, expectedSchema, caseSensitive, localityPreferred)); - return readTasks; + return partitions; } @Override diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index 8dc2b47d2774..56a9b63a8759 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -18,22 +18,15 @@ */ package org.apache.iceberg.spark.source; -import java.io.Serializable; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; -import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.FileScanTask; import org.apache.iceberg.ScanTaskGroup; import org.apache.iceberg.Schema; -import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.hadoop.HadoopInputFile; -import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkReadConf; @@ -44,7 +37,6 @@ import org.apache.iceberg.spark.source.metrics.TaskNumDeletes; import org.apache.iceberg.spark.source.metrics.TaskNumSplits; import org.apache.iceberg.util.PropertyUtil; -import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.metric.CustomMetric; @@ -178,20 +170,22 @@ static class ReaderFactory implements PartitionReaderFactory { @Override public PartitionReader createReader(InputPartition partition) { - if (partition instanceof ReadTask) { - return new RowReader((ReadTask) partition); - } else { - throw new UnsupportedOperationException("Incorrect input partition type: " + partition); - } + Preconditions.checkArgument( + partition instanceof SparkInputPartition, + "Unknown input partition type: %s", + partition.getClass().getName()); + + return new RowReader((SparkInputPartition) partition); } @Override public PartitionReader createColumnarReader(InputPartition partition) { - if (partition instanceof ReadTask) { - return new BatchReader((ReadTask) partition, batchSize); - } else { - throw new UnsupportedOperationException("Incorrect input partition type: " + partition); - } + Preconditions.checkArgument( + partition instanceof SparkInputPartition, + "Unknown input partition type: %s", + partition.getClass().getName()); + + return new BatchReader((SparkInputPartition) partition, batchSize); } @Override @@ -201,13 +195,19 @@ public boolean supportColumnarReads(InputPartition partition) { } private static class RowReader extends RowDataReader implements PartitionReader { - private long numSplits; + private static final Logger LOG = LoggerFactory.getLogger(RowReader.class); + + private final long numSplits; + + RowReader(SparkInputPartition partition) { + super( + partition.taskGroup(), + partition.table(), + partition.expectedSchema(), + partition.isCaseSensitive()); - RowReader(ReadTask task) { - super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive()); - 
numSplits = task.task.files().size(); - LOG.debug( - "Reading {} file split(s) for table {} using RowReader", numSplits, task.table().name()); + numSplits = partition.taskGroup().tasks().size(); + LOG.debug("Reading {} file split(s) for table {}", numSplits, partition.table().name()); } @Override @@ -220,15 +220,21 @@ public CustomTaskMetric[] currentMetricsValues() { private static class BatchReader extends BatchDataReader implements PartitionReader { - private long numSplits; - - BatchReader(ReadTask task, int batchSize) { - super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(), batchSize); - numSplits = task.task.files().size(); - LOG.debug( - "Reading {} file split(s) for table {} using BatchReader", - numSplits, - task.table().name()); + + private static final Logger LOG = LoggerFactory.getLogger(BatchReader.class); + + private final long numSplits; + + BatchReader(SparkInputPartition partition, int batchSize) { + super( + partition.taskGroup(), + partition.table(), + partition.expectedSchema(), + partition.isCaseSensitive(), + batchSize); + + numSplits = partition.taskGroup().tasks().size(); + LOG.debug("Reading {} file split(s) for table {}", numSplits, partition.table().name()); } @Override @@ -238,56 +244,4 @@ public CustomTaskMetric[] currentMetricsValues() { }; } } - - static class ReadTask implements InputPartition, Serializable { - private final CombinedScanTask task; - private final Broadcast
tableBroadcast;
-    private final String expectedSchemaString;
-    private final boolean caseSensitive;
-
-    private transient Schema expectedSchema = null;
-    private transient String[] preferredLocations = null;
-
-    ReadTask(
-        CombinedScanTask task,
-        Broadcast<Table>
tableBroadcast, - String expectedSchemaString, - boolean caseSensitive, - boolean localityPreferred) { - this.task = task; - this.tableBroadcast = tableBroadcast; - this.expectedSchemaString = expectedSchemaString; - this.caseSensitive = caseSensitive; - if (localityPreferred) { - Table table = tableBroadcast.value(); - this.preferredLocations = Util.blockLocations(table.io(), task); - } else { - this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE; - } - } - - @Override - public String[] preferredLocations() { - return preferredLocations; - } - - public Collection files() { - return task.files(); - } - - public Table table() { - return tableBroadcast.value(); - } - - public boolean isCaseSensitive() { - return caseSensitive; - } - - private Schema expectedSchema() { - if (expectedSchema == null) { - this.expectedSchema = SchemaParser.fromJson(expectedSchemaString); - } - return expectedSchema; - } - } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 21c34ed6f628..b291a8e2679e 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.iceberg.IncrementalChangelogScan; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; @@ -238,6 +239,38 @@ public Scan build() { return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions); } + public Scan buildChangelogScan() { + Preconditions.checkArgument( + readConf.snapshotId() == null && readConf.asOfTimestamp() == null, + "Cannot set neither %s nor %s for changelogs", + SparkReadOptions.SNAPSHOT_ID, + SparkReadOptions.AS_OF_TIMESTAMP); + + Long startSnapshotId = readConf.startSnapshotId(); + Long endSnapshotId = readConf.endSnapshotId(); + + Schema expectedSchema = schemaWithMetadataColumns(); + + IncrementalChangelogScan scan = + table + .newIncrementalChangelogScan() + .caseSensitive(caseSensitive) + .filter(filterExpression()) + .project(expectedSchema); + + if (startSnapshotId != null) { + scan = scan.fromSnapshotExclusive(startSnapshotId); + } + + if (endSnapshotId != null) { + scan = scan.toSnapshot(endSnapshotId); + } + + scan = configureSplitPlanning(scan); + + return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions); + } + public Scan buildMergeOnReadScan() { Preconditions.checkArgument( readConf.snapshotId() == null && readConf.asOfTimestamp() == null, @@ -306,8 +339,8 @@ public Scan buildCopyOnWriteScan() { spark, table, scan, snapshot, readConf, expectedSchema, filterExpressions); } - private TableScan configureSplitPlanning(TableScan scan) { - TableScan configuredScan = scan; + private > T configureSplitPlanning(T scan) { + T configuredScan = scan; Long splitSize = readConf.splitSizeOption(); if (splitSize != null) { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 662d85eaa7dc..8127c057bed3 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ 
-148,6 +148,14 @@ public String name() { return icebergTable.toString(); } + public Long snapshotId() { + return snapshotId; + } + + public SparkTable copyWithSnapshotId(long newSnapshotId) { + return new SparkTable(icebergTable, newSnapshotId, refreshEagerly); + } + private Schema snapshotSchema() { return SnapshotUtil.schemaFor(icebergTable, snapshotId, null); } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java index e3699eaeded1..0abfd79d5ddb 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java @@ -46,7 +46,7 @@ public void testSparkCatalogTable() throws Exception { Map options = Maps.newHashMap(); Transform[] transforms = {}; cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options); - SparkTable table = cat.loadTable(id); + SparkTable table = (SparkTable) cat.loadTable(id); spark.sql("INSERT INTO mycat.default.table VALUES (1,1,1)"); @@ -76,7 +76,7 @@ public void testSparkCatalogNamedHadoopTable() throws Exception { Map options = Maps.newHashMap(); Transform[] transforms = {}; cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options); - SparkTable table = cat.loadTable(id); + SparkTable table = (SparkTable) cat.loadTable(id); spark.sql("INSERT INTO hadoop.default.table VALUES (1,1,1)"); @@ -106,7 +106,7 @@ public void testSparkCatalogNamedHiveTable() throws Exception { Map options = Maps.newHashMap(); Transform[] transforms = {}; cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options); - SparkTable table = cat.loadTable(id); + SparkTable table = (SparkTable) cat.loadTable(id); spark.sql("INSERT INTO hive.default.table VALUES (1,1,1)"); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java index f58451296cef..5baf6071233d 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java @@ -71,8 +71,9 @@ public void after() { @Test public void testPathIdentifier() throws TableAlreadyExistsException, NoSuchTableException { SparkTable table = - sparkCatalog.createTable( - identifier, SparkSchemaUtil.convert(SCHEMA), new Transform[0], ImmutableMap.of()); + (SparkTable) + sparkCatalog.createTable( + identifier, SparkSchemaUtil.convert(SCHEMA), new Transform[0], ImmutableMap.of()); Assert.assertEquals(table.table().location(), tableLocation.getAbsolutePath()); Assertions.assertThat(table.table()).isInstanceOf(BaseTable.class); From ca1993056c4abdf64ffe322c99db5fbddcf22e05 Mon Sep 17 00:00:00 2001 From: aokolnychyi Date: Fri, 7 Oct 2022 08:25:38 -0700 Subject: [PATCH 2/2] Minor things --- .../iceberg/spark/source/SparkChangelogBatch.java | 12 ++++++------ .../iceberg/spark/source/SparkInputPartition.java | 1 + 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java index 77923145d976..38e25e99897f 100644 --- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java @@ -38,7 +38,7 @@ import org.apache.spark.sql.connector.read.PartitionReader; import org.apache.spark.sql.connector.read.PartitionReaderFactory; -public class SparkChangelogBatch implements Batch { +class SparkChangelogBatch implements Batch { private final JavaSparkContext sparkContext; private final Table table; @@ -46,7 +46,7 @@ public class SparkChangelogBatch implements Batch { private final Schema expectedSchema; private final boolean caseSensitive; private final boolean localityEnabled; - private final int semanticBatchId; + private final int scanHashCode; SparkChangelogBatch( SparkSession spark, @@ -54,14 +54,14 @@ public class SparkChangelogBatch implements Batch { SparkReadConf readConf, List> taskGroups, Schema expectedSchema, - int semanticBatchId) { + int scanHashCode) { this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); this.table = table; this.taskGroups = taskGroups; this.expectedSchema = expectedSchema; this.caseSensitive = readConf.caseSensitive(); this.localityEnabled = readConf.localityEnabled(); - this.semanticBatchId = semanticBatchId; + this.scanHashCode = scanHashCode; } @Override @@ -104,12 +104,12 @@ public boolean equals(Object o) { } SparkChangelogBatch that = (SparkChangelogBatch) o; - return table.name().equals(that.table.name()) && semanticBatchId == that.semanticBatchId; + return table.name().equals(that.table.name()) && scanHashCode == that.scanHashCode; } @Override public int hashCode() { - return Objects.hash(table.name(), semanticBatchId); + return Objects.hash(table.name(), scanHashCode); } private static class ReaderFactory implements PartitionReaderFactory { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java index 4d5164dc0633..7786ee0eaf41 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java @@ -78,6 +78,7 @@ public Schema expectedSchema() { if (expectedSchema == null) { this.expectedSchema = SchemaParser.fromJson(expectedSchemaString); } + return expectedSchema; } }
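
For context, a minimal usage sketch of the changelog table added by this series, based on TestChangelogBatchReads above. It assumes an Iceberg catalog named "cat" backing a "db.tbl" table; the catalog, namespace, table name, and snapshot IDs are illustrative placeholders, not part of the patch.

import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ChangelogReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // The ".changes" suffix resolves to SparkChangelogTable and exposes the new metadata
    // columns _change_type, _change_ordinal and _commit_snapshot_id next to the data columns.
    spark
        .sql(
            "SELECT id, data, _change_type, _change_ordinal, _commit_snapshot_id "
                + "FROM cat.db.tbl.changes ORDER BY _change_ordinal, id")
        .show();

    // Incremental read between two snapshots, mirroring changelogRecords() in the test above.
    // Per buildChangelogScan(), the start snapshot is exclusive and the end snapshot inclusive.
    Dataset<Row> changes =
        spark
            .read()
            .option(SparkReadOptions.START_SNAPSHOT_ID, 123L) // illustrative snapshot ID
            .option(SparkReadOptions.END_SNAPSHOT_ID, 456L) // illustrative snapshot ID
            .table("cat.db.tbl.changes");
    changes.show();
  }
}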