diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index 1b8092639b9b..09956293d5fd 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -80,7 +80,8 @@ static StructType getType(StructType partitionType) { UPPER_BOUNDS, KEY_METADATA, SPLIT_OFFSETS, - EQUALITY_IDS + EQUALITY_IDS, + ManifestFile.SPEC_ID.asOptional() ); } diff --git a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java index 7bad98ccc9f4..16515ebd504a 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java +++ b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java @@ -19,6 +19,7 @@ package org.apache.iceberg.expressions; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Comparator; @@ -45,7 +46,7 @@ * rows and false if the file cannot contain matching rows. Files may be skipped if and only if the * return value of {@code eval} is false. */ -public class InclusiveMetricsEvaluator { +public class InclusiveMetricsEvaluator implements Serializable { private final Expression expr; public InclusiveMetricsEvaluator(Schema schema, Expression unbound) { diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index 1d746717d3ab..b407c9f61b73 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -100,6 +100,10 @@ public PartitionData copy() { found = true; fromProjectionPos[i] = j; } + if (fields.get(i).fieldId() == ManifestFile.SPEC_ID.fieldId()) { + found = true; + fromProjectionPos[i] = 14; + } } if (!found) { @@ -255,6 +259,9 @@ public void put(int i, Object value) { case 13: this.equalityIds = ArrayUtil.toIntArray((List) value); return; + case 14: + this.partitionSpecId = (value != null) ? 
(Integer) value : -1; + return; default: // ignore the object, it must be from a newer version of the format } @@ -301,6 +308,8 @@ public Object get(int i) { return splitOffsets(); case 13: return equalityFieldIds(); + case 14: + return partitionSpecId; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java index db0c18f217d6..86134a96bfa4 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java @@ -19,10 +19,6 @@ package org.apache.iceberg; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -47,7 +43,6 @@ abstract class BaseTableScan implements TableScan { private static final Logger LOG = LoggerFactory.getLogger(TableScan.class); - private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); private final TableOperations ops; private final Table table; @@ -65,6 +60,10 @@ protected BaseTableScan(TableOperations ops, Table table, Schema schema, TableSc this.context = context; } + protected TableScanContext tableScanContext() { + return context; + } + protected TableOperations tableOps() { return ops; } @@ -85,7 +84,7 @@ protected Collection selectedColumns() { return context.selectedColumns(); } - protected Map options() { + public Map options() { return context.options(); } @@ -105,6 +104,14 @@ protected abstract CloseableIterable planFiles( TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats); + public Long fromSnapshotId() { + return context().fromSnapshotId(); + } + + public Long toSnapshotId() { + return context.toSnapshotId(); + } + @Override public Table table() { return table; @@ -145,7 +152,7 @@ public TableScan asOfTime(long timestampMillis) { // the snapshot ID could be null if no entries were older than the requested time. in that case, // there is no valid snapshot to read. 
Preconditions.checkArgument(lastSnapshotId != null, - "Cannot find a snapshot older than %s", formatTimestampMillis(timestampMillis)); + "Cannot find a snapshot older than %s", TableScanUtil.formatTimestampMillis(timestampMillis)); return useSnapshot(lastSnapshotId); } @@ -202,7 +209,7 @@ public CloseableIterable planFiles() { Snapshot snapshot = snapshot(); if (snapshot != null) { LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table, - snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()), + snapshot.snapshotId(), TableScanUtil.formatTimestampMillis(snapshot.timestampMillis()), context.rowFilter()); Listeners.notifyAll( @@ -307,8 +314,4 @@ private Schema lazyColumnProjection() { return schema; } - - private static String formatTimestampMillis(long millis) { - return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.systemDefault())); - } } diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java index 9a6b85f652b8..1d7d8e43babf 100644 --- a/core/src/main/java/org/apache/iceberg/DataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java @@ -94,4 +94,9 @@ protected long targetSplitSize(TableOperations ops) { return ops.current().propertyAsLong( TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT); } + + @Override + public TableScanContext tableScanContext() { + return super.tableScanContext(); + } } diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java index 7605e1c3548b..a84f2fed98d7 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -62,7 +62,8 @@ * Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(long, DataFile)} or * {@link #forEntry(ManifestEntry)} to get the the delete files to apply to a given data file. 
*/ -class DeleteFileIndex { +public class DeleteFileIndex { + private static final DeleteFile[] EMPTY_DELETES = new DeleteFile[0]; private final Map specsById; private final Map partitionTypeById; private final Map> wrapperById; @@ -100,7 +101,10 @@ DeleteFile[] forEntry(ManifestEntry entry) { return forDataFile(entry.sequenceNumber(), entry.file()); } - DeleteFile[] forDataFile(long sequenceNumber, DataFile file) { + public DeleteFile[] forDataFile(long sequenceNumber, DataFile file) { + if (isEmpty()) { + return EMPTY_DELETES; + } Pair partition = partition(file.specId(), file.partition()); Pair partitionDeletes = sortedDeletesByPartition.get(partition); @@ -305,11 +309,11 @@ private static Stream limitBySequenceNumber(long sequenceNumber, lon return Arrays.stream(files, start, files.length); } - static Builder builderFor(FileIO io, Iterable deleteManifests) { + public static Builder builderFor(FileIO io, Iterable deleteManifests) { return new Builder(io, Sets.newHashSet(deleteManifests)); } - static class Builder { + public static class Builder { private final FileIO io; private final Set deleteManifests; private Map specsById = null; @@ -318,37 +322,37 @@ static class Builder { private boolean caseSensitive = true; private ExecutorService executorService = null; - Builder(FileIO io, Set deleteManifests) { + public Builder(FileIO io, Set deleteManifests) { this.io = io; this.deleteManifests = Sets.newHashSet(deleteManifests); } - Builder specsById(Map newSpecsById) { + public Builder specsById(Map newSpecsById) { this.specsById = newSpecsById; return this; } - Builder filterData(Expression newDataFilter) { + public Builder filterData(Expression newDataFilter) { this.dataFilter = Expressions.and(dataFilter, newDataFilter); return this; } - Builder filterPartitions(Expression newPartitionFilter) { + public Builder filterPartitions(Expression newPartitionFilter) { this.partitionFilter = Expressions.and(partitionFilter, newPartitionFilter); return this; } - Builder caseSensitive(boolean newCaseSensitive) { + public Builder caseSensitive(boolean newCaseSensitive) { this.caseSensitive = newCaseSensitive; return this; } - Builder planWith(ExecutorService newExecutorService) { + public Builder planWith(ExecutorService newExecutorService) { this.executorService = newExecutorService; return this; } - DeleteFileIndex build() { + public DeleteFileIndex build() { // read all of the matching delete manifests in parallel and accumulate the matching files in a queue Queue> deleteEntries = new ConcurrentLinkedQueue<>(); Tasks.foreach(deleteManifestReaders()) diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java index 46756b0830a3..ad20f5c45917 100644 --- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java @@ -25,7 +25,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.ThreadPools; @@ -34,7 +33,7 @@ class IncrementalDataTableScan extends DataTableScan { IncrementalDataTableScan(TableOperations ops, Table table, Schema schema, 
TableScanContext context) { super(ops, table, schema, context.useSnapshotId(null)); - validateSnapshotIds(table, context.fromSnapshotId(), context.toSnapshotId()); + SnapshotUtil.validateSnapshotIds(table, context.fromSnapshotId(), context.toSnapshotId()); } @Override @@ -53,7 +52,7 @@ public TableScan useSnapshot(long scanSnapshotId) { @Override public TableScan appendsBetween(long newFromSnapshotId, long newToSnapshotId) { - validateSnapshotIdsRefinement(newFromSnapshotId, newToSnapshotId); + SnapshotUtil.validateSnapshotIdsRefinement(newFromSnapshotId, newToSnapshotId, table(), context()); return new IncrementalDataTableScan(tableOps(), table(), schema(), context().fromSnapshotId(newFromSnapshotId).toSnapshotId(newToSnapshotId)); } @@ -69,7 +68,7 @@ public TableScan appendsAfter(long newFromSnapshotId) { @Override public CloseableIterable planFiles() { // TODO publish an incremental appends scan event - List snapshots = snapshotsWithin(table(), + List snapshots = SnapshotUtil.snapshotsWithin(table(), context().fromSnapshotId(), context().toSnapshotId()); Set snapshotIds = Sets.newHashSet(Iterables.transform(snapshots, Snapshot::snapshotId)); Set manifests = FluentIterable @@ -106,45 +105,5 @@ protected TableScan newRefinedScan(TableOperations ops, Table table, Schema sche return new IncrementalDataTableScan(ops, table, schema, context); } - private static List snapshotsWithin(Table table, long fromSnapshotId, long toSnapshotId) { - List snapshotIds = SnapshotUtil.snapshotIdsBetween(table, fromSnapshotId, toSnapshotId); - List snapshots = Lists.newArrayList(); - for (Long snapshotId : snapshotIds) { - Snapshot snapshot = table.snapshot(snapshotId); - // for now, incremental scan supports only appends - if (snapshot.operation().equals(DataOperations.APPEND)) { - snapshots.add(snapshot); - } else if (snapshot.operation().equals(DataOperations.OVERWRITE)) { - throw new UnsupportedOperationException( - String.format("Found %s operation, cannot support incremental data in snapshots (%s, %s]", - DataOperations.OVERWRITE, fromSnapshotId, toSnapshotId)); - } - } - return snapshots; - } - private void validateSnapshotIdsRefinement(long newFromSnapshotId, long newToSnapshotId) { - Set snapshotIdsRange = Sets.newHashSet( - SnapshotUtil.snapshotIdsBetween(table(), context().fromSnapshotId(), context().toSnapshotId())); - // since snapshotIdsBetween return ids in range (fromSnapshotId, toSnapshotId] - snapshotIdsRange.add(context().fromSnapshotId()); - Preconditions.checkArgument( - snapshotIdsRange.contains(newFromSnapshotId), - "from snapshot id %s not in existing snapshot ids range (%s, %s]", - newFromSnapshotId, context().fromSnapshotId(), newToSnapshotId); - Preconditions.checkArgument( - snapshotIdsRange.contains(newToSnapshotId), - "to snapshot id %s not in existing snapshot ids range (%s, %s]", - newToSnapshotId, context().fromSnapshotId(), context().toSnapshotId()); - } - - private static void validateSnapshotIds(Table table, long fromSnapshotId, long toSnapshotId) { - Preconditions.checkArgument(fromSnapshotId != toSnapshotId, "from and to snapshot ids cannot be the same"); - Preconditions.checkArgument( - table.snapshot(fromSnapshotId) != null, "from snapshot %s does not exist", fromSnapshotId); - Preconditions.checkArgument( - table.snapshot(toSnapshotId) != null, "to snapshot %s does not exist", toSnapshotId); - Preconditions.checkArgument(SnapshotUtil.ancestorOf(table, toSnapshotId, fromSnapshotId), - "from snapshot %s is not an ancestor of to snapshot %s", fromSnapshotId, 
toSnapshotId); - } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java index 262cd92223fa..855494e442fd 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java @@ -102,8 +102,8 @@ protected long targetSplitSize(TableOperations ops) { protected CloseableIterable planFiles( TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats) { - // return entries from both data and delete manifests - CloseableIterable manifests = CloseableIterable.withNoopClose(snapshot.allManifests()); + // Only return data manifests for now; TODO: handle delete manifests + CloseableIterable manifests = CloseableIterable.withNoopClose(snapshot.dataManifests()); Type fileProjection = schema().findType("data_file"); Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema(); String schemaString = SchemaParser.toJson(schema()); diff --git a/core/src/main/java/org/apache/iceberg/ScanTasks.java b/core/src/main/java/org/apache/iceberg/ScanTasks.java new file mode 100644 index 000000000000..42e434b3a942 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ScanTasks.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import org.apache.iceberg.expressions.ResidualEvaluator; + +public class ScanTasks { + + /** + * Utility class; no public constructor + */ + private ScanTasks() { + } + + public static BaseFileScanTask createBaseFileScanTask(DataFile file, DeleteFile[] deletes, String schemaString, + String specString, ResidualEvaluator residuals) { + return new BaseFileScanTask(file, deletes, schemaString, specString, residuals); + } +} diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java index dcf7d9753530..a2a49262df4e 100644 --- a/core/src/main/java/org/apache/iceberg/TableScanContext.java +++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java @@ -29,7 +29,7 @@ /** * Context object with optional arguments for a TableScan. 
*/ -final class TableScanContext { +public final class TableScanContext { private final Long snapshotId; private final Expression rowFilter; private final boolean ignoreResiduals; @@ -41,7 +41,7 @@ final class TableScanContext { private final Long fromSnapshotId; private final Long toSnapshotId; - TableScanContext() { + public TableScanContext() { this.snapshotId = null; this.rowFilter = Expressions.alwaysTrue(); this.ignoreResiduals = false; @@ -70,7 +70,7 @@ private TableScanContext(Long snapshotId, Expression rowFilter, boolean ignoreRe this.toSnapshotId = toSnapshotId; } - Long snapshotId() { + public Long snapshotId() { return snapshotId; } @@ -79,7 +79,7 @@ TableScanContext useSnapshotId(Long scanSnapshotId) { caseSensitive, colStats, projectedSchema, selectedColumns, options, fromSnapshotId, toSnapshotId); } - Expression rowFilter() { + public Expression rowFilter() { return rowFilter; } @@ -88,7 +88,7 @@ TableScanContext filterRows(Expression filter) { caseSensitive, colStats, projectedSchema, selectedColumns, options, fromSnapshotId, toSnapshotId); } - boolean ignoreResiduals() { + public boolean ignoreResiduals() { return ignoreResiduals; } @@ -97,7 +97,7 @@ TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) { caseSensitive, colStats, projectedSchema, selectedColumns, options, fromSnapshotId, toSnapshotId); } - boolean caseSensitive() { + public boolean caseSensitive() { return caseSensitive; } @@ -106,7 +106,7 @@ TableScanContext setCaseSensitive(boolean isCaseSensitive) { isCaseSensitive, colStats, projectedSchema, selectedColumns, options, fromSnapshotId, toSnapshotId); } - boolean returnColumnStats() { + public boolean returnColumnStats() { return colStats; } @@ -115,7 +115,7 @@ TableScanContext shouldReturnColumnStats(boolean returnColumnStats) { caseSensitive, returnColumnStats, projectedSchema, selectedColumns, options, fromSnapshotId, toSnapshotId); } - Collection selectedColumns() { + public Collection selectedColumns() { return selectedColumns; } @@ -125,7 +125,7 @@ TableScanContext selectColumns(Collection columns) { caseSensitive, colStats, null, columns, options, fromSnapshotId, toSnapshotId); } - Schema projectedSchema() { + public Schema projectedSchema() { return projectedSchema; } @@ -135,7 +135,7 @@ TableScanContext project(Schema schema) { caseSensitive, colStats, schema, null, options, fromSnapshotId, toSnapshotId); } - Map options() { + public Map options() { return options; } @@ -147,7 +147,7 @@ TableScanContext withOption(String property, String value) { caseSensitive, colStats, projectedSchema, selectedColumns, builder.build(), fromSnapshotId, toSnapshotId); } - Long fromSnapshotId() { + public Long fromSnapshotId() { return fromSnapshotId; } @@ -156,7 +156,7 @@ TableScanContext fromSnapshotId(long id) { caseSensitive, colStats, projectedSchema, selectedColumns, options, id, toSnapshotId); } - Long toSnapshotId() { + public Long toSnapshotId() { return toSnapshotId; } diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index 85f08e057340..a3836e5397a9 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -20,13 +20,18 @@ package org.apache.iceberg.util; import java.util.List; +import java.util.Set; import java.util.function.Function; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataOperations; import org.apache.iceberg.Snapshot; import 
org.apache.iceberg.Table; +import org.apache.iceberg.TableScanContext; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; public class SnapshotUtil { private SnapshotUtil() { @@ -107,4 +112,70 @@ public static List newFiles(Long baseSnapshotId, long latestSnapshotId return newFiles; } + + public static List snapshotsWithin(Table table, long fromSnapshotId, long toSnapshotId) { + List snapshotIds = SnapshotUtil.snapshotIdsBetween(table, fromSnapshotId, toSnapshotId); + List snapshots = Lists.newArrayList(); + for (Long snapshotId : snapshotIds) { + Snapshot snapshot = table.snapshot(snapshotId); + // for now, incremental scan supports only appends + if (snapshot.operation().equals(DataOperations.APPEND)) { + snapshots.add(snapshot); + } else if (snapshot.operation().equals(DataOperations.OVERWRITE)) { + throw new UnsupportedOperationException( + String.format("Found %s operation, cannot support incremental data in snapshots (%s, %s]", + DataOperations.OVERWRITE, fromSnapshotId, toSnapshotId)); + } + } + return snapshots; + } + + /** + * Checks that the given snapshot id bounds both reside within the table in question and that a valid set of + * snapshots exists between them. Uses the passed-in scan context to verify that this range also lies within any + * previously defined snapshot range in the scan. Throws an exception if the arguments cannot be used to produce a + * legitimate snapshot range within the previously defined range. + * <p> + * Used for validating incremental scan parameters. + * + * @param newFromSnapshotId beginning of snapshot range + * @param newToSnapshotId end of snapshot range + * @param table containing the snapshots we are building a range for + * @param context containing current scan restrictions + */ + public static void validateSnapshotIdsRefinement(long newFromSnapshotId, long newToSnapshotId, Table table, + TableScanContext context) { + Set snapshotIdsRange = Sets.newHashSet( + SnapshotUtil.snapshotIdsBetween(table, context.fromSnapshotId(), context.toSnapshotId())); + // since snapshotIdsBetween returns ids in range (fromSnapshotId, toSnapshotId] + snapshotIdsRange.add(context.fromSnapshotId()); + Preconditions.checkArgument( + snapshotIdsRange.contains(newFromSnapshotId), + "from snapshot id %s not in existing snapshot ids range (%s, %s]", + newFromSnapshotId, context.fromSnapshotId(), newToSnapshotId); + Preconditions.checkArgument( + snapshotIdsRange.contains(newToSnapshotId), + "to snapshot id %s not in existing snapshot ids range (%s, %s]", + newToSnapshotId, context.fromSnapshotId(), context.toSnapshotId()); + } + + /** + * Validates whether two snapshots represent the beginning and end of a continuous range of snapshots in a given + * table. Throws an exception if this is not the case. 
+ * <p> + * Used for validating incremental scan parameters. + * + * @param table containing snapshots + * @param fromSnapshotId beginning of snapshot range + * @param toSnapshotId end of snapshot range + */ + public static void validateSnapshotIds(Table table, long fromSnapshotId, long toSnapshotId) { + Preconditions.checkArgument(fromSnapshotId != toSnapshotId, "from and to snapshot ids cannot be the same"); + Preconditions.checkArgument( + table.snapshot(fromSnapshotId) != null, "from snapshot %s does not exist", fromSnapshotId); + Preconditions.checkArgument( + table.snapshot(toSnapshotId) != null, "to snapshot %s does not exist", toSnapshotId); + Preconditions.checkArgument(SnapshotUtil.ancestorOf(table, toSnapshotId, fromSnapshotId), + "from snapshot %s is not an ancestor of to snapshot %s", fromSnapshotId, toSnapshotId); + } } diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java index a9c60df32cdd..c55d13397c8a 100644 --- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java @@ -19,6 +19,10 @@ package org.apache.iceberg.util; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.function.Function; import org.apache.iceberg.BaseCombinedScanTask; import org.apache.iceberg.CombinedScanTask; @@ -57,4 +61,10 @@ public static CloseableIterable planTasks(CloseableIterable> { + private static final Logger LOG = LoggerFactory.getLogger(PlanScanAction.class); + + public enum PlanMode { + LOCAL, + DISTRIBUTED + } + + public static final String ICEBERG_PLAN_MODE = "plan_mode"; + public static final String ICEBERG_TEST_PLAN_MODE = "test_plan_mode"; + + public static PlanMode parsePlanMode(String mode) { + try { + return PlanMode.valueOf(mode.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException(String.format("Cannot use planning mode %s. Available modes are: %s", mode, + Arrays.toString(PlanMode.values()))); + } + } + + private final Table table; + private final SparkSession spark; + private final JavaSparkContext sparkContext; + private final TableOperations ops; + private final Schema schema; + + private TableScanContext context; + private Snapshot lazySnapshot; + + public PlanScanAction(SparkSession spark, Table table) { + this.table = table; + this.spark = spark; + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.schema = table.schema(); + this.ops = ((HasTableOperations) table).operations(); + this.context = new TableScanContext(); + } + + public PlanScanAction withContext(TableScanContext newContext) { + this.context = newContext; + return this; + } + + @Override + protected Table table() { + return table; + } + + @Override + public CloseableIterable execute() { + LOG.info("Preparing distributed planning of scan for {} snapshot {} created at {} with filter {}", + table, snapshot().snapshotId(), TableScanUtil.formatTimestampMillis(snapshot().timestampMillis()), + context.rowFilter()); + long start = System.currentTimeMillis(); + CloseableIterable result = planTasks(); + long elapsed = System.currentTimeMillis() - start; + LOG.info("Planning complete. 
Took {} ms", elapsed); + return result; + } + + protected CloseableIterable planTasks() { + Map options = context.options(); + long splitSize; + if (options.containsKey(TableProperties.SPLIT_SIZE)) { + splitSize = Long.parseLong(options.get(TableProperties.SPLIT_SIZE)); + } else { + splitSize = ops.current().propertyAsLong(TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT); + } + int lookback; + if (options.containsKey(TableProperties.SPLIT_LOOKBACK)) { + lookback = Integer.parseInt(options.get(TableProperties.SPLIT_LOOKBACK)); + } else { + lookback = ops.current().propertyAsInt( + TableProperties.SPLIT_LOOKBACK, TableProperties.SPLIT_LOOKBACK_DEFAULT); + } + long openFileCost; + if (options.containsKey(TableProperties.SPLIT_OPEN_FILE_COST)) { + openFileCost = Long.parseLong(options.get(TableProperties.SPLIT_OPEN_FILE_COST)); + } else { + openFileCost = ops.current().propertyAsLong( + TableProperties.SPLIT_OPEN_FILE_COST, TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT); + } + + CloseableIterable fileScanTasks = planFiles(); + CloseableIterable splitFiles = TableScanUtil.splitFiles(fileScanTasks, splitSize); + return TableScanUtil.planTasks(splitFiles, splitSize, lookback, openFileCost); + } + + private Snapshot snapshot() { + if (lazySnapshot == null) { + lazySnapshot = context.snapshotId() != null ? + ops.current().snapshot(context.snapshotId()) : + ops.current().currentSnapshot(); + } + return lazySnapshot; + } + + public CloseableIterable planFiles() { + // Create a dataframe of all DataFile entries + String dataFilesMetadataTable = metadataTableName(MetadataTableType.ENTRIES); + Dataset manifestEntries = + spark.read() + .format("iceberg") + .option("snapshot-id", snapshot().snapshotId()) + .load(dataFilesMetadataTable); + + // Todo pushdown filters to ManifestEntriesTable + // Read entries which are not deleted and are datafiles and not delete files + Dataset dataFileEntries = manifestEntries + .filter(manifestEntries.col("data_file").getField(DataFile.CONTENT.name()).equalTo(0)) // Only DataFiles + .filter(manifestEntries.col("status").notEqual(2)); // No Deleted Files + + dataFileEntries = handleIncrementalScan(dataFileEntries); + + // Build up evaluators and filters for Metrics and Partition values + Expression scanFilter = context.rowFilter(); + boolean isCaseSensitive = context.caseSensitive(); + + // Build cache of partition evaluators + Broadcast> broadcastPartitionEvaluators = buildPartitionEvaluators(); + + // Build metric evaluators + Broadcast broadcastMetricsEvaluator = sparkContext.broadcast( + new InclusiveMetricsEvaluator(schema, scanFilter, isCaseSensitive)); + + // Cache residual information and Partition spec information + Types.StructType partitionStruct = DataFile.getType(table().spec().partitionType()); + StructType dataFileSchema = (StructType) dataFileEntries.schema().apply("data_file").dataType(); + + // Evaluate all files based on their partition info and collect the rows back locally + Dataset scanTaskDataset = dataFileEntries + .mapPartitions( + (MapPartitionsFunction) it -> { + SparkDataFile container = new SparkDataFile(partitionStruct, dataFileSchema); + return Streams.stream(it) + .filter(row -> { + Row dataFile = row.getAs("data_file"); + SparkDataFile file = container.wrap(dataFile); + return broadcastPartitionEvaluators.getValue().get(file.specId()).eval(file.partition()) && + broadcastMetricsEvaluator.getValue().eval(file); + }).iterator(); + }, RowEncoder.apply(dataFileEntries.schema())); + + LoadingCache specCache = buildSpecCache(); + + 
// Build delete index locally + DeleteFileIndex deleteFileIndex = buildDeleteFileIndex(); + + SparkDataFile container = new SparkDataFile(partitionStruct, dataFileSchema); + List tasks = scanTaskDataset.collectAsList().stream().map(row -> { + Row dataFile = row.getAs("data_file"); + SparkDataFile file = container.wrap(dataFile); + DeleteFile[] deletes = + deleteFileIndex.forDataFile(row.getAs("sequence_number"), file); + SpecCacheEntry cached = specCache.get(file.specId()); + return (FileScanTask) ScanTasks + .createBaseFileScanTask(file.copy(), deletes, cached.schemaString, cached.specString, cached.residuals); + }).collect(Collectors.toList()); + + return CloseableIterable.withNoopClose(tasks); + } + + private Dataset handleIncrementalScan(Dataset dataFileEntries) { + if (context.fromSnapshotId() != null) { + LOG.debug("Planning incremental scan from {} to {}", context.fromSnapshotId(), context.toSnapshotId()); + List snapshots = SnapshotUtil.snapshotsWithin(table, context.fromSnapshotId(), context.toSnapshotId()); + List validSnapshotIds = snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toList()); + return dataFileEntries + .filter(dataFileEntries.col("snapshot_id").isin(validSnapshotIds.toArray())) + .filter(dataFileEntries.col("status").equalTo(1)); // Added files only + } else { + return dataFileEntries; + } + } + + private LoadingCache buildSpecCache() { + return Caffeine.newBuilder().build((CacheLoader & Serializable) specId -> { + PartitionSpec spec = table().specs().get(specId); + Expression filter = context.ignoreResiduals() ? Expressions.alwaysTrue() : context.rowFilter(); + return new SpecCacheEntry(SchemaParser.toJson(spec.schema()), PartitionSpecParser.toJson(spec), + ResidualEvaluator.of(spec, filter, context.caseSensitive())); + }); + } + + private Broadcast> buildPartitionEvaluators() { + ImmutableMap.Builder evalMapBuilder = ImmutableMap.builder(); + boolean caseSensitive = context.caseSensitive(); + Expression filter = context.rowFilter(); + table.specs().entrySet().forEach(entry -> + evalMapBuilder.put(entry.getKey(), + new Evaluator(entry.getValue().partitionType(), + Projections.inclusive(entry.getValue(), caseSensitive).project(filter)))); + + Map partitionEvaluatorsById = evalMapBuilder.build(); + return sparkContext.broadcast(partitionEvaluatorsById); + } + + + private DeleteFileIndex buildDeleteFileIndex() { + // Build delete index locally + List deleteManifests = snapshot().deleteManifests(); + DeleteFileIndex.Builder deleteFileIndexBuilder = DeleteFileIndex.builderFor(table.io(), deleteManifests); + deleteFileIndexBuilder.caseSensitive(context.caseSensitive()); + deleteFileIndexBuilder.specsById(table.specs()); + deleteFileIndexBuilder.filterData(context.rowFilter()); + return deleteFileIndexBuilder.build(); + } + + private static class SpecCacheEntry implements Serializable { + private final String schemaString; + private final String specString; + private final ResidualEvaluator residuals; + + SpecCacheEntry(String schemaString, String specString, ResidualEvaluator residuals) { + this.schemaString = schemaString; + this.specString = specString; + this.residuals = residuals; + } + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java index 48dd00124273..a0f6c2c4c6e8 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java @@ -19,6 +19,7 @@ package 
org.apache.iceberg.spark; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.List; import java.util.Locale; @@ -32,7 +33,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.types.StructType; -public class SparkDataFile implements DataFile { +public class SparkDataFile implements DataFile, Serializable { private final int filePathPosition; private final int fileFormatPosition; @@ -46,18 +47,38 @@ public class SparkDataFile implements DataFile { private final int upperBoundsPosition; private final int keyMetadataPosition; private final int splitOffsetsPosition; + private final int specIdPosition; private final Type lowerBoundsType; private final Type upperBoundsType; private final Type keyMetadataType; private final SparkStructLike wrappedPartition; + private final Types.StructType partitionStruct; private Row wrapped; + private static final StructLike EMPTY_PARTITION_INFO = new StructLike() { + @Override + public int size() { + return 0; + } + + @Override + public T get(int pos, Class javaClass) { + throw new UnsupportedOperationException("Cannot get a value from an empty partition"); + } + + @Override + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot set a value in an empty partition"); + } + }; + public SparkDataFile(Types.StructType type, StructType sparkType) { this.lowerBoundsType = type.fieldType("lower_bounds"); this.upperBoundsType = type.fieldType("upper_bounds"); this.keyMetadataType = type.fieldType("key_metadata"); - this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType()); + this.partitionStruct = type.fieldType("partition").asStructType(); + this.wrappedPartition = new SparkStructLike(partitionStruct); Map positions = Maps.newHashMap(); type.fields().forEach(field -> { @@ -77,19 +98,47 @@ public SparkDataFile(Types.StructType type, StructType sparkType) { upperBoundsPosition = positions.get("upper_bounds"); keyMetadataPosition = positions.get("key_metadata"); splitOffsetsPosition = positions.get("split_offsets"); + specIdPosition = positions.get("partition_spec_id"); + } + + private SparkDataFile(SparkDataFile other) { + this.lowerBoundsType = other.lowerBoundsType; + this.upperBoundsType = other.upperBoundsType; + this.keyMetadataType = other.keyMetadataType; + this.wrappedPartition = new SparkStructLike(other.partitionStruct); + this.filePathPosition = other.filePathPosition; + this.fileFormatPosition = other.fileFormatPosition; + this.partitionPosition = other.partitionPosition; + this.recordCountPosition = other.recordCountPosition; + this.fileSizeInBytesPosition = other.fileSizeInBytesPosition; + this.columnSizesPosition = other.columnSizesPosition; + this.valueCountsPosition = other.valueCountsPosition; + this.nullValueCountsPosition = other.nullValueCountsPosition; + this.lowerBoundsPosition = other.lowerBoundsPosition; + this.upperBoundsPosition = other.upperBoundsPosition; + this.keyMetadataPosition = other.keyMetadataPosition; + this.splitOffsetsPosition = other.splitOffsetsPosition; + this.specIdPosition = other.specIdPosition; + this.partitionStruct = other.partitionStruct; + this.wrap(other.wrapped.copy()); } public SparkDataFile wrap(Row row) { this.wrapped = row; if (wrappedPartition.size() > 0) { - this.wrappedPartition.wrap(row.getAs(partitionPosition)); + Row partition = row.getAs(partitionPosition); + this.wrappedPartition.wrap(partition); } return this; } @Override public int specId() { - return -1; + if (wrappedPartition.size() > 0) { + return 
wrapped.getAs(specIdPosition); + } else { + return 0; + } } @Override @@ -105,7 +154,11 @@ public FileFormat format() { @Override public StructLike partition() { - return wrappedPartition; + if (wrappedPartition.size() > 0) { + return wrappedPartition; + } else { + return EMPTY_PARTITION_INFO; + } } @Override @@ -152,7 +205,7 @@ public ByteBuffer keyMetadata() { @Override public DataFile copy() { - throw new UnsupportedOperationException("Not implemented: copy"); + return new SparkDataFile(this); } @Override diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkStructLike.java b/spark/src/main/java/org/apache/iceberg/spark/SparkStructLike.java index 30509e3381dc..7ba7a0956a9d 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkStructLike.java +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkStructLike.java @@ -19,11 +19,12 @@ package org.apache.iceberg.spark; +import java.io.Serializable; import org.apache.iceberg.StructLike; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Row; -public class SparkStructLike implements StructLike { +public class SparkStructLike implements StructLike, Serializable { private final Types.StructType type; private Row wrapped; diff --git a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java index 9ac098cae376..da2e678bc7b5 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java +++ b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java @@ -23,6 +23,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.hive.HiveCatalog; @@ -57,6 +58,7 @@ public static void startMetastoreAndSpark() { .master("local[2]") .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") .config("spark.hadoop." 
+ METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) + .config(PlanScanAction.ICEBERG_TEST_PLAN_MODE, "true") .enableHiveSupport() .getOrCreate(); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index cf1b8c9b59a7..289b581e3ba5 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -31,6 +31,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -47,9 +48,12 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.iceberg.types.Types.NestedField.optional; +@RunWith(Parameterized.class) public abstract class TestDataSourceOptions { private static final Configuration CONF = new Configuration(); @@ -59,12 +63,26 @@ public abstract class TestDataSourceOptions { ); private static SparkSession spark = null; + private final PlanScanAction.PlanMode planMode; + + @Parameterized.Parameters(name = "Plan Mode = {0}") + public static Object[] parameters() { + return new Object[] {PlanScanAction.PlanMode.LOCAL, PlanScanAction.PlanMode.DISTRIBUTED}; + } + + public TestDataSourceOptions(PlanScanAction.PlanMode planMode) { + this.planMode = planMode; + } + @Rule public TemporaryFolder temp = new TemporaryFolder(); @BeforeClass public static void startSpark() { - TestDataSourceOptions.spark = SparkSession.builder().master("local[2]").getOrCreate(); + TestDataSourceOptions.spark = SparkSession + .builder() + .config(PlanScanAction.ICEBERG_TEST_PLAN_MODE, "true") + .master("local[2]").getOrCreate(); } @AfterClass @@ -163,6 +181,7 @@ public void testHadoopOptions() throws IOException { Dataset resultDf = spark.read() .format("iceberg") .option("hadoop.fs.default.name", "file:///") + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation); List resultRecords = resultDf.orderBy("id") .as(Encoders.bean(SimpleRecord.class)) @@ -197,6 +216,7 @@ public void testSplitOptionsOverridesTableProperties() throws IOException { Dataset resultDf = spark.read() .format("iceberg") .option("split-size", String.valueOf(611)) // 611 bytes is the size of SimpleRecord(1,"a") + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation); Assert.assertEquals("Spark partitions should match", 2, resultDf.javaRDD().getNumPartitions()); @@ -236,6 +256,7 @@ public void testIncrementalScanOptions() throws IOException { .format("iceberg") .option("snapshot-id", snapshotIds.get(3).toString()) .option("start-snapshot-id", snapshotIds.get(3).toString()) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation).explain(); }); @@ -249,6 +270,7 @@ public void testIncrementalScanOptions() throws IOException { .format("iceberg") .option("as-of-timestamp", Long.toString(table.snapshot(snapshotIds.get(3)).timestampMillis())) .option("end-snapshot-id", snapshotIds.get(2).toString()) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation).explain(); }); @@ -261,6 +283,7 @@ public void 
testIncrementalScanOptions() throws IOException { spark.read() .format("iceberg") .option("end-snapshot-id", snapshotIds.get(2).toString()) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation).explain(); }); @@ -268,6 +291,7 @@ public void testIncrementalScanOptions() throws IOException { List result = spark.read() .format("iceberg") .option("start-snapshot-id", snapshotIds.get(3).toString()) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation) .orderBy("id") .as(Encoders.bean(SimpleRecord.class)) @@ -279,6 +303,7 @@ public void testIncrementalScanOptions() throws IOException { .format("iceberg") .option("start-snapshot-id", snapshotIds.get(2).toString()) .option("end-snapshot-id", snapshotIds.get(1).toString()) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation) .orderBy("id") .as(Encoders.bean(SimpleRecord.class)) @@ -322,6 +347,7 @@ public void testMetadataSplitSizeOptionOverrideTableProperties() throws IOExcept Dataset entriesDf = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation + "#entries"); Assert.assertEquals("Num partitions must match", 2, entriesDf.javaRDD().getNumPartitions()); @@ -329,6 +355,7 @@ public void testMetadataSplitSizeOptionOverrideTableProperties() throws IOExcept entriesDf = spark.read() .format("iceberg") .option("split-size", String.valueOf(128 * 1024 * 1024)) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation + "#entries"); Assert.assertEquals("Num partitions must match", 1, entriesDf.javaRDD().getNumPartitions()); } @@ -359,6 +386,7 @@ public void testDefaultMetadataSplitSize() throws IOException { Dataset metadataDf = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(tableLocation + "#entries"); int partitionNum = metadataDf.javaRDD().getNumPartitions(); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java index b869d7f8dc9d..b63adbdec4bc 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java @@ -24,6 +24,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopTables; import org.junit.Before; @@ -35,6 +36,10 @@ public abstract class TestIcebergSourceHadoopTables extends TestIcebergSourceTab File tableDir = null; String tableLocation = null; + public TestIcebergSourceHadoopTables(PlanScanAction.PlanMode planMode) { + super(planMode); + } + @Before public void setupTable() throws Exception { this.tableDir = temp.newFolder(); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java index 70e920c8bd34..8493bb14bd13 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java @@ -25,6 +25,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import 
org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.junit.After; @@ -34,6 +35,10 @@ public abstract class TestIcebergSourceHiveTables extends TestIcebergSourceTable private static TableIdentifier currentIdentifier; + public TestIcebergSourceHiveTables(PlanScanAction.PlanMode planMode) { + super(planMode); + } + @BeforeClass public static void start() { Namespace db = Namespace.of("db"); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 257042179c9b..2d74f3c219c0 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -32,6 +32,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.actions.Actions; +import org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.catalog.TableIdentifier; @@ -53,10 +54,13 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +@RunWith(Parameterized.class) public abstract class TestIcebergSourceTablesBase extends SparkTestBase { private static final Schema SCHEMA = new Schema( @@ -64,6 +68,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase { optional(2, "data", Types.StringType.get()) ); + private final PlanScanAction.PlanMode planMode; + + @Parameterized.Parameters(name = "Plan Mode = {0}") + public static Object[] parameters() { + return new Object[] {PlanScanAction.PlanMode.LOCAL, PlanScanAction.PlanMode.DISTRIBUTED}; + } + + public TestIcebergSourceTablesBase(PlanScanAction.PlanMode planMode) { + this.planMode = planMode; + } + @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -93,6 +108,7 @@ public synchronized void testTablesSupport() { Dataset resultDf = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier)); List actualRecords = resultDf.orderBy("id") .as(Encoders.bean(SimpleRecord.class)) @@ -119,6 +135,7 @@ public void testEntriesTable() throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "entries")) .collectAsList(); @@ -134,6 +151,7 @@ public void testEntriesTable() throws Exception { row.put(2, 0L); GenericData.Record file = (GenericData.Record) row.get("data_file"); file.put(0, FileContent.DATA.id()); + file.put(13, 0); // SpecId expected.add(row); }); } @@ -171,6 +189,7 @@ public void testAllEntriesTable() throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "all_entries")) .orderBy("snapshot_id") .collectAsList(); @@ -184,6 +203,7 @@ public void testAllEntriesTable() throws Exception { row.put(2, 0L); GenericData.Record file = (GenericData.Record) row.get("data_file"); file.put(0, FileContent.DATA.id()); + file.put(13, 0); // SpecId expected.add(row); 
}); } @@ -215,11 +235,15 @@ public void testCountEntriesTable() { // count entries Assert.assertEquals("Count should return " + expectedEntryCount, - expectedEntryCount, spark.read().format("iceberg").load(loadLocation(tableIdentifier, "entries")).count()); + expectedEntryCount, spark.read().format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) + .load(loadLocation(tableIdentifier, "entries")).count()); // count all_entries Assert.assertEquals("Count should return " + expectedEntryCount, - expectedEntryCount, spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_entries")).count()); + expectedEntryCount, spark.read().format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) + .load(loadLocation(tableIdentifier, "all_entries")).count()); } @Test @@ -248,6 +272,7 @@ public void testFilesTable() throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "files")) .collectAsList(); @@ -259,6 +284,7 @@ public void testFilesTable() throws Exception { if ((Integer) record.get("status") < 2 /* added or existing */) { GenericData.Record file = (GenericData.Record) record.get("data_file"); file.put(0, FileContent.DATA.id()); + file.put(14, 0); // SpecId expected.add(file); } } @@ -305,6 +331,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "files")) .collectAsList(); @@ -315,6 +342,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception { for (GenericData.Record record : rows) { GenericData.Record file = (GenericData.Record) record.get("data_file"); file.put(0, FileContent.DATA.id()); + file.put(14, 0); // SpecId expected.add(file); } } @@ -364,6 +392,7 @@ public void testEntriesTableWithSnapshotIdInheritance() throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "entries")) .select("sequence_number", "snapshot_id", "data_file") .collectAsList(); @@ -411,6 +440,7 @@ public void testFilesUnpartitionedTable() throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "files")) .collectAsList(); @@ -422,6 +452,7 @@ public void testFilesUnpartitionedTable() throws Exception { if ((Integer) record.get("status") < 2 /* added or existing */) { GenericData.Record file = (GenericData.Record) record.get("data_file"); file.put(0, FileContent.DATA.id()); + file.put(13, 0); // SpecId expected.add(file); } } @@ -456,16 +487,19 @@ public void testAllMetadataTablesWithStagedCommits() throws Exception { List actualAllData = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "all_data_files")) .collectAsList(); List actualAllManifests = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "all_manifests")) .collectAsList(); List actualAllEntries = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "all_entries")) .collectAsList(); @@ -506,6 +540,7 @@ public void testAllDataFilesTable() 
throws Exception { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "all_data_files")) .orderBy("file_path") .collectAsList(); @@ -519,6 +554,7 @@ public void testAllDataFilesTable() throws Exception { if ((Integer) record.get("status") < 2 /* added or existing */) { GenericData.Record file = (GenericData.Record) record.get("data_file"); file.put(0, FileContent.DATA.id()); + file.put(14, 0); // SpecId expected.add(file); } } @@ -576,6 +612,7 @@ public void testHistoryTable() { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "history")) .collectAsList(); @@ -639,6 +676,7 @@ public void testSnapshotsTable() { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "snapshots")) .collectAsList(); @@ -691,6 +729,7 @@ public void testManifestsTable() { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "manifests")) .collectAsList(); @@ -744,6 +783,7 @@ public void testAllManifestsTable() { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "all_manifests")) .orderBy("path") .collectAsList(); @@ -810,6 +850,7 @@ public void testUnpartitionedPartitionsTable() { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "partitions")) .collectAsList(); @@ -841,6 +882,7 @@ public void testPartitionsTable() { List actual = spark.read() .format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "partitions")) .orderBy("partition.id") .collectAsList(); @@ -871,6 +913,7 @@ public void testPartitionsTable() { List actualAfterFirstCommit = spark.read() .format("iceberg") .option("snapshot-id", String.valueOf(firstCommitId)) + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) .load(loadLocation(tableIdentifier, "partitions")) .orderBy("partition.id") .collectAsList(); @@ -913,7 +956,9 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException { .execute(); Assert.assertEquals("Should delete 1 data file", 1, result2.size()); - Dataset resultDF = spark.read().format("iceberg").load(loadLocation(tableIdentifier)); + Dataset resultDF = spark.read().format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, this.planMode.name()) + .load(loadLocation(tableIdentifier)); List actualRecords = resultDF .as(Encoders.bean(SimpleRecord.class)) .collectAsList(); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java index 9159563db12f..ac0d82fc0ced 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java @@ -42,6 +42,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.actions.PlanScanAction; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.hadoop.HadoopTables; import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -75,23 +76,27 @@ public abstract class TestPartitionPruning { private static final Configuration CONF = new Configuration(); private static final HadoopTables TABLES = new HadoopTables(CONF); - @Parameterized.Parameters(name = "format = {0}, vectorized = {1}") + @Parameterized.Parameters(name = "File format {0} - Vectorized Read {1} - Plan Mode {2}") public static Object[][] parameters() { return new Object[][] { - { "parquet", false }, - { "parquet", true }, - { "avro", false }, - { "orc", false }, - { "orc", true } + new Object[] { "parquet", false, PlanScanAction.PlanMode.LOCAL }, + new Object[] { "parquet", false, PlanScanAction.PlanMode.DISTRIBUTED }, + new Object[] { "parquet", true, PlanScanAction.PlanMode.LOCAL }, + new Object[] { "avro", false, PlanScanAction.PlanMode.DISTRIBUTED }, + new Object[] { "orc", false, PlanScanAction.PlanMode.LOCAL }, + new Object[] { "orc", false, PlanScanAction.PlanMode.DISTRIBUTED }, + new Object[] { "orc", true, PlanScanAction.PlanMode.LOCAL}, }; } private final String format; private final boolean vectorized; + private final PlanScanAction.PlanMode planMode; - public TestPartitionPruning(String format, boolean vectorized) { + public TestPartitionPruning(String format, boolean vectorized, PlanScanAction.PlanMode planMode) { this.format = format; this.vectorized = vectorized; + this.planMode = planMode; } private static SparkSession spark = null; @@ -109,6 +114,7 @@ public static void startSpark() { String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme); CONF.set(optionKey, CountOpenLocalFileSystem.class.getName()); spark.conf().set(optionKey, CountOpenLocalFileSystem.class.getName()); + spark.conf().set(PlanScanAction.ICEBERG_TEST_PLAN_MODE, "true"); spark.conf().set("spark.sql.session.timeZone", "UTC"); spark.udf().register("bucket3", (Integer num) -> bucketTransform.apply(num), DataTypes.IntegerType); spark.udf().register("truncate5", (String str) -> truncateTransform.apply(str), DataTypes.StringType); @@ -255,6 +261,7 @@ private void runTest(String filterCond, Predicate partCondition) { List actual = spark.read() .format("iceberg") .option("vectorization-enabled", String.valueOf(vectorized)) + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) .load(table.location()) .select("id", "date", "level", "message", "timestamp") .filter(filterCond) @@ -315,7 +322,9 @@ private void assertAccessOnDataFiles(File originTableLocation, Table table, Pred .stream().filter(path -> path.startsWith(originTableLocation.getAbsolutePath())) .collect(Collectors.toSet()); - List files = spark.read().format("iceberg").load(table.location() + "#files").collectAsList(); + List files = spark.read().format("iceberg") + .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name()) + .load(table.location() + "#files").collectAsList(); Set filesToRead = extractFilePathsMatchingConditionOnPartition(files, partCondition); Set filesToNotRead = extractFilePathsNotIn(files, filesToRead); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index 9ea7d261911e..965c5e67a216 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -26,6 +26,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; 
+import org.apache.iceberg.actions.PlanScanAction;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -40,25 +41,45 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
+@RunWith(Parameterized.class)
 public abstract class TestSnapshotSelection {
 
+  @Parameterized.Parameters(name = "Plan Mode {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {PlanScanAction.PlanMode.LOCAL},
+        new Object[] {PlanScanAction.PlanMode.DISTRIBUTED},
+    };
+  }
+
   private static final Configuration CONF = new Configuration();
   private static final Schema SCHEMA = new Schema(
       optional(1, "id", Types.IntegerType.get()),
       optional(2, "data", Types.StringType.get())
   );
 
+  private final PlanScanAction.PlanMode planMode;
+
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
   private static SparkSession spark = null;
 
+  public TestSnapshotSelection(PlanScanAction.PlanMode planMode) {
+    this.planMode = planMode;
+  }
+
   @BeforeClass
   public static void startSpark() {
-    TestSnapshotSelection.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    TestSnapshotSelection.spark = SparkSession
+        .builder()
+        .config(PlanScanAction.ICEBERG_TEST_PLAN_MODE, "true")
+        .master("local[2]").getOrCreate();
   }
 
   @AfterClass
@@ -99,6 +120,7 @@ public void testSnapshotSelectionById() throws IOException {
     // verify records in the current snapshot
     Dataset currentSnapshotResult = spark.read()
         .format("iceberg")
+        .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name())
         .load(tableLocation);
     List currentSnapshotRecords = currentSnapshotResult.orderBy("id")
         .as(Encoders.bean(SimpleRecord.class))
@@ -114,6 +136,7 @@ public void testSnapshotSelectionById() throws IOException {
     Dataset previousSnapshotResult = spark.read()
         .format("iceberg")
         .option("snapshot-id", parentSnapshotId)
+        .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name())
         .load(tableLocation);
     List previousSnapshotRecords = previousSnapshotResult.orderBy("id")
         .as(Encoders.bean(SimpleRecord.class))
@@ -155,6 +178,7 @@ public void testSnapshotSelectionByTimestamp() throws IOException {
     // verify records in the current snapshot
     Dataset currentSnapshotResult = spark.read()
         .format("iceberg")
+        .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name())
         .load(tableLocation);
     List currentSnapshotRecords = currentSnapshotResult.orderBy("id")
         .as(Encoders.bean(SimpleRecord.class))
@@ -168,6 +192,7 @@ public void testSnapshotSelectionByTimestamp() throws IOException {
     Dataset previousSnapshotResult = spark.read()
         .format("iceberg")
         .option("as-of-timestamp", firstSnapshotTimestamp)
+        .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name())
         .load(tableLocation);
     List previousSnapshotRecords = previousSnapshotResult.orderBy("id")
         .as(Encoders.bean(SimpleRecord.class))
@@ -186,6 +211,7 @@ public void testSnapshotSelectionByInvalidSnapshotId() throws IOException {
     Dataset df = spark.read()
         .format("iceberg")
         .option("snapshot-id", -10)
+        .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name())
         .load(tableLocation);
 
     df.collectAsList();
@@ -203,6 +229,7 @@ public void testSnapshotSelectionByInvalidTimestamp() throws IOException {
     Dataset df = spark.read()
         .format("iceberg")
         .option("as-of-timestamp", timestamp)
+        .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name())
         .load(tableLocation);
 
     df.collectAsList();
@@ -230,6 +257,7 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
         .format("iceberg")
         .option("snapshot-id", snapshotId)
         .option("as-of-timestamp", timestamp)
+        .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name())
         .load(tableLocation);
 
     df.collectAsList();
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 229a361f6924..8baf86a154e3 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.PlanScanAction;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.DeleteReadTests;
@@ -41,15 +42,32 @@
 import org.apache.spark.sql.internal.SQLConf;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 
+@RunWith(Parameterized.class)
 public abstract class TestSparkReaderDeletes extends DeleteReadTests {
 
   private static TestHiveMetastore metastore = null;
   protected static SparkSession spark = null;
   protected static HiveCatalog catalog = null;
 
+  @Parameterized.Parameters(name = "Plan Mode {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {PlanScanAction.PlanMode.LOCAL},
+        new Object[] {PlanScanAction.PlanMode.DISTRIBUTED},
+    };
+  }
+
+  private final PlanScanAction.PlanMode planMode;
+
+  public TestSparkReaderDeletes(PlanScanAction.PlanMode planMode) {
+    this.planMode = planMode;
+  }
+
   @BeforeClass
   public static void startMetastoreAndSpark() {
     metastore = new TestHiveMetastore();
@@ -58,6 +76,7 @@ public static void startMetastoreAndSpark() {
 
     spark = SparkSession.builder()
         .master("local[2]")
+        .config(PlanScanAction.ICEBERG_TEST_PLAN_MODE, "true")
         .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
         .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
         .enableHiveSupport()
@@ -101,6 +120,7 @@ protected void dropTable(String name) {
 
   public StructLikeSet rowSet(String name, Table table, String... columns) {
     Dataset df = spark.read()
         .format("iceberg")
+        .option(PlanScanAction.ICEBERG_PLAN_MODE, planMode.name())
         .load(TableIdentifier.of("default", name).toString())
         .selectExpr(columns);
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 826c4508381b..9e7258d05704 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataTableScan;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
@@ -36,6 +37,9 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
+import org.apache.iceberg.actions.Actions;
+import org.apache.iceberg.actions.PlanScanAction;
+import org.apache.iceberg.actions.PlanScanAction.PlanMode;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -96,6 +100,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   private final boolean localityPreferred;
   private final boolean batchReadsEnabled;
   private final int batchSize;
+  private final PlanMode planMode;
 
   // lazy variables
   private Schema schema = null;
@@ -108,6 +113,8 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     this.table = table;
     this.snapshotId = options.get("snapshot-id").map(Long::parseLong).orElse(null);
     this.asOfTimestamp = options.get("as-of-timestamp").map(Long::parseLong).orElse(null);
+    this.planMode = options.get(PlanScanAction.ICEBERG_PLAN_MODE).map(PlanScanAction::parsePlanMode)
+        .orElse(PlanMode.LOCAL);
     if (snapshotId != null && asOfTimestamp != null) {
       throw new IllegalArgumentException(
           "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
@@ -385,14 +392,40 @@ private List tasks() {
         }
       }
 
-      try (CloseableIterable tasksIterable = scan.planTasks()) {
-        this.tasks = Lists.newArrayList(tasksIterable);
-      } catch (IOException e) {
-        throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
+      this.tasks = planScan(scan);
+    }
+    return tasks;
+  }
+
+  private List planScan(TableScan scan) {
+    // TODO Add Automatic mode for determining when to do Distributed Planning
+    if (planMode == PlanMode.DISTRIBUTED && scan instanceof DataTableScan) {
+      return planDistributedScan((DataTableScan) scan);
+    } else {
+      return planLocalScan(scan);
+    }
+  }
+
+  private List planDistributedScan(DataTableScan scan) {
+    List result;
+    try {
+      result = Lists.newArrayList(Actions.forTable(table).planScan().withContext(scan.tableScanContext()).execute());
+    } catch (Exception e) {
+      if (SparkSession.active().conf().get(PlanScanAction.ICEBERG_TEST_PLAN_MODE).equals("true")) {
+        throw e;
       }
+      LOG.error("Cannot run distributed job planning, falling back to local planning.", e);
+      result = planLocalScan(scan);
     }
+    return result;
+  }
 
-    return tasks;
+  private List planLocalScan(TableScan scan) {
+    try (CloseableIterable tasksIterable = scan.planTasks()) {
+      return Lists.newArrayList(tasksIterable);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
+    }
   }
 
   @Override
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions24.java
index 26af76e52614..d73187d51970 100644
--- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions24.java
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions24.java
@@ -19,5 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
+import org.apache.iceberg.actions.PlanScanAction;
+
 public class TestDataSourceOptions24 extends TestDataSourceOptions {
+  public TestDataSourceOptions24(PlanScanAction.PlanMode distributedPlanning) {
+    super(distributedPlanning);
+  }
 }
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables24.java
index 1252f3427973..4275dfc61074 100644
--- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables24.java
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables24.java
@@ -19,5 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
+import org.apache.iceberg.actions.PlanScanAction;
+
 public class TestIcebergSourceHadoopTables24 extends TestIcebergSourceHadoopTables {
+  public TestIcebergSourceHadoopTables24(PlanScanAction.PlanMode distributedPlanning) {
+    super(distributedPlanning);
+  }
 }
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables24.java
index 7ba46cb90aca..a338208e8018 100644
--- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables24.java
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables24.java
@@ -19,5 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
+import org.apache.iceberg.actions.PlanScanAction;
+
 public class TestIcebergSourceHiveTables24 extends TestIcebergSourceHiveTables {
+  public TestIcebergSourceHiveTables24(PlanScanAction.PlanMode distributedPlanning) {
+    super(distributedPlanning);
+  }
 }
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning24.java
index 4b3889e8af21..e5232f5640df 100644
--- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning24.java
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning24.java
@@ -19,8 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
+import org.apache.iceberg.actions.PlanScanAction;
+
 public class TestPartitionPruning24 extends TestPartitionPruning {
-  public TestPartitionPruning24(String format, boolean vectorized) {
-    super(format, vectorized);
+  public TestPartitionPruning24(String format, boolean vectorized, PlanScanAction.PlanMode distributedPlanning) {
+    super(format, vectorized, distributedPlanning);
   }
 }
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection24.java
index c003845fba49..0f9488935ab3 100644
--- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection24.java
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection24.java
@@ -19,5 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
+import org.apache.iceberg.actions.PlanScanAction;
+
 public class TestSnapshotSelection24 extends TestSnapshotSelection {
+  public TestSnapshotSelection24(PlanScanAction.PlanMode distributedPlanning) {
+    super(distributedPlanning);
+  }
 }
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java
index 5b13bffca166..3d22ff3fcd68 100644
--- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java
@@ -19,5 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
+import org.apache.iceberg.actions.PlanScanAction;
+
 public class TestSparkReaderDeletes24 extends TestSparkReaderDeletes {
+  public TestSparkReaderDeletes24(PlanScanAction.PlanMode distributedPlanning) {
+    super(distributedPlanning);
+  }
 }
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index 13b85ad2135a..8a2fc757350a 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -26,6 +26,7 @@
 import java.util.Objects;
 import java.util.stream.Collectors;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataTableScan;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
@@ -34,6 +35,9 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
+import org.apache.iceberg.actions.Actions;
+import org.apache.iceberg.actions.PlanScanAction;
+import org.apache.iceberg.actions.PlanScanAction.PlanMode;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
@@ -49,6 +53,7 @@
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.InputPartition;
@@ -82,6 +87,7 @@ class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
   private final Broadcast encryptionManager;
   private final boolean batchReadsEnabled;
   private final int batchSize;
+  private final PlanMode planMode;
 
   // lazy variables
   private StructType readSchema = null;
@@ -97,6 +103,8 @@ class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
     this.filterExpressions = filters;
     this.snapshotId = Spark3Util.propertyAsLong(options, "snapshot-id", null);
     this.asOfTimestamp = Spark3Util.propertyAsLong(options, "as-of-timestamp", null);
+    this.planMode = PlanScanAction.parsePlanMode(
+        options.getOrDefault(PlanScanAction.ICEBERG_PLAN_MODE, PlanMode.LOCAL.name()));
 
     if (snapshotId != null && asOfTimestamp != null) {
       throw new IllegalArgumentException(
@@ -280,15 +288,41 @@ private List tasks() {
           scan = scan.filter(filter);
         }
       }
+      this.tasks = planScan(scan);
+    }
+    return tasks;
+  }
 
-      try (CloseableIterable tasksIterable = scan.planTasks()) {
-        this.tasks = Lists.newArrayList(tasksIterable);
-      } catch (IOException e) {
-        throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
+  private List planScan(TableScan scan) {
+    // TODO Need to only use distributed planner for supported implementations and add some heuristics about when
+    // to use
+    if (planMode == PlanMode.DISTRIBUTED && scan instanceof DataTableScan) {
+      return planDistributedScan((DataTableScan) scan);
+    } else {
+      return planLocalScan(scan);
+    }
+  }
+
+  private List planDistributedScan(DataTableScan scan) {
+    List result;
+    try {
+      result = Lists.newArrayList(Actions.forTable(table).planScan().withContext(scan.tableScanContext()).execute());
+    } catch (Exception e) {
+      if (SparkSession.active().conf().get(PlanScanAction.ICEBERG_TEST_PLAN_MODE).equals("true")) {
+        throw e;
       }
+      LOG.error("Cannot run distributed job planning, falling back to local planning.", e);
+      result = planLocalScan(scan);
     }
+    return result;
+  }
 
-    return tasks;
+  private List planLocalScan(TableScan scan) {
+    try (CloseableIterable tasksIterable = scan.planTasks()) {
+      return Lists.newArrayList(tasksIterable);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
+    }
   }
 
   @Override
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions3.java
index 8dbf08c3caf6..5f8071944f83 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions3.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions3.java
@@ -19,5 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
+import org.apache.iceberg.actions.PlanScanAction;
+
 public class TestDataSourceOptions3 extends TestDataSourceOptions {
+  public TestDataSourceOptions3(PlanScanAction.PlanMode distributedPlanning) {
+    super(distributedPlanning);
+  }
 }
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables3.java
index 166a2bef0979..5a3d229d6545 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables3.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables3.java
@@ -19,5 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
+import org.apache.iceberg.actions.PlanScanAction;
+
 public class TestIcebergSourceHadoopTables3 extends TestIcebergSourceHadoopTables {
+  public TestIcebergSourceHadoopTables3(PlanScanAction.PlanMode distributedPlanning) {
+    super(distributedPlanning);
+  }
 }
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables3.java
index c69f3bb8dd56..68062f0c86e3 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables3.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables3.java
@@ -19,5 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
+import org.apache.iceberg.actions.PlanScanAction;
+
 public class TestIcebergSourceHiveTables3 extends TestIcebergSourceHiveTables {
+  public TestIcebergSourceHiveTables3(PlanScanAction.PlanMode distributedPlanning) {
+    super(distributedPlanning);
+  }
 }
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning3.java
index 2d09c8287ea1..56acf7125e53 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning3.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning3.java
@@ -19,8 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
+import org.apache.iceberg.actions.PlanScanAction;
+
 public class TestPartitionPruning3 extends TestPartitionPruning {
-  public TestPartitionPruning3(String format, boolean vectorized) {
-    super(format, vectorized);
+  public TestPartitionPruning3(String format, boolean vectorized, PlanScanAction.PlanMode distributedPlanning) {
+    super(format, vectorized, distributedPlanning);
   }
 }
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection3.java
index 4a08f844ce48..e40ca5c6f585 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection3.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection3.java
@@ -19,5 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
+import org.apache.iceberg.actions.PlanScanAction;
+
 public class TestSnapshotSelection3 extends TestSnapshotSelection {
+  public TestSnapshotSelection3(PlanScanAction.PlanMode distributedPlanning) {
+    super(distributedPlanning);
+  }
 }
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java
index 82835d695106..ca10bb467d36 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java
@@ -19,5 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
+import org.apache.iceberg.actions.PlanScanAction;
+
 public class TestSparkReaderDeletes3 extends TestSparkReaderDeletes {
+  public TestSparkReaderDeletes3(PlanScanAction.PlanMode distributedPlanning) {
+    super(distributedPlanning);
+  }
 }
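
Usage sketch (illustrative, not part of the patch): the snippet below shows how a Spark job would opt into the distributed planning path that Reader and SparkBatchScan now expose. It assumes the patched modules are on the classpath, uses only the PlanScanAction constants introduced above, and treats tableLocation as a hypothetical placeholder for an existing Iceberg table.

// Sketch only: request distributed scan planning for a single read.
// The reader falls back to local planning if the distributed plan fails,
// unless PlanScanAction.ICEBERG_TEST_PLAN_MODE is set to "true" in the session conf.
import org.apache.iceberg.actions.PlanScanAction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DistributedPlanningExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("distributed-planning-example")
        .getOrCreate();

    String tableLocation = args[0]; // placeholder: path to an existing Iceberg table

    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option(PlanScanAction.ICEBERG_PLAN_MODE, PlanScanAction.PlanMode.DISTRIBUTED.name())
        .load(tableLocation);

    df.show();
  }
}

Leaving the option unset keeps the previous behavior: both readers default to PlanMode.LOCAL, as seen in the orElse(PlanMode.LOCAL) and getOrDefault(..., PlanMode.LOCAL.name()) calls in this patch.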