diff --git a/api/src/main/java/org/apache/iceberg/BaseScanTaskGroup.java b/api/src/main/java/org/apache/iceberg/BaseScanTaskGroup.java new file mode 100644 index 000000000000..1d85077bca3f --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/BaseScanTaskGroup.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.util.Collection; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; + +public class BaseScanTaskGroup implements ScanTaskGroup { + private final Object[] tasks; + private transient volatile List taskList; + + public BaseScanTaskGroup(Collection tasks) { + Preconditions.checkNotNull(tasks, "tasks cannot be null"); + this.tasks = tasks.toArray(); + } + + @Override + @SuppressWarnings("unchecked") + public Collection tasks() { + if (taskList == null) { + synchronized (this) { + if (taskList == null) { + ImmutableList.Builder listBuilder = ImmutableList.builderWithExpectedSize(tasks.length); + for (Object task : tasks) { + listBuilder.add((T) task); + } + taskList = listBuilder.build(); + } + } + } + + return taskList; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("tasks", Joiner.on(", ").join(tasks)) + .toString(); + } +} diff --git a/api/src/main/java/org/apache/iceberg/CombinedScanTask.java b/api/src/main/java/org/apache/iceberg/CombinedScanTask.java index 87a5495ba7da..956fc333d7f3 100644 --- a/api/src/main/java/org/apache/iceberg/CombinedScanTask.java +++ b/api/src/main/java/org/apache/iceberg/CombinedScanTask.java @@ -24,13 +24,18 @@ /** * A scan task made of several ranges from files. */ -public interface CombinedScanTask extends ScanTask { +public interface CombinedScanTask extends ScanTaskGroup { /** * Return the {@link FileScanTask tasks} in this combined task. * @return a Collection of FileScanTask instances. 
*/ Collection files(); + @Override + default Collection tasks() { + return files(); + } + @Override default CombinedScanTask asCombinedScanTask() { return this; diff --git a/api/src/main/java/org/apache/iceberg/ContentScanTask.java b/api/src/main/java/org/apache/iceberg/ContentScanTask.java new file mode 100644 index 000000000000..0077c4c7813d --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/ContentScanTask.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import org.apache.iceberg.expressions.Expression; + +/** + * A scan task over a range of bytes in a content file. + * + * @param the Java class of the content file + */ +public interface ContentScanTask> extends ScanTask { + /** + * The {@link ContentFile file} to scan. + * + * @return the file to scan + */ + F file(); + + /** + * The {@link PartitionSpec spec} used to store this file. + * + * @return the partition spec from this file's manifest + */ + PartitionSpec spec(); + + /** + * The starting position of this scan range in the file. 
+ * + * @return the start position of this scan range + */ + long start(); + + /** + * The number of bytes to scan from the {@link #start()} position in the file. + * + * @return the length of this scan range in bytes + */ + long length(); + + /** + * Returns the residual expression that should be applied to rows in this file scan. + *

+ * The residual expression for a file is a filter expression created by partially evaluating the scan's filter + * using the file's partition data. + * + * @return a residual expression to apply to rows from this scan + */ + Expression residual(); +} diff --git a/api/src/main/java/org/apache/iceberg/FileScanTask.java b/api/src/main/java/org/apache/iceberg/FileScanTask.java index b390066a09e1..5cc91747a733 100644 --- a/api/src/main/java/org/apache/iceberg/FileScanTask.java +++ b/api/src/main/java/org/apache/iceberg/FileScanTask.java @@ -20,19 +20,11 @@ package org.apache.iceberg; import java.util.List; -import org.apache.iceberg.expressions.Expression; /** - * A scan task over a range of a single file. + * A scan task over a range of bytes in a single data file. */ -public interface FileScanTask extends ScanTask { - /** - * The {@link DataFile file} to scan. - * - * @return the file to scan - */ - DataFile file(); - +public interface FileScanTask extends ContentScanTask, SplittableScanTask { /** * A list of {@link DeleteFile delete files} to apply when reading the task's data file. * @@ -40,44 +32,15 @@ public interface FileScanTask extends ScanTask { */ List deletes(); - /** - * The {@link PartitionSpec spec} used to store this file. - * - * @return the partition spec from this file's manifest - */ - PartitionSpec spec(); - - /** - * The starting position of this scan range in the file. - * - * @return the start position of this scan range - */ - long start(); - - /** - * The number of bytes to scan from the {@link #start()} position in the file. - * - * @return the length of this scan range in bytes - */ - long length(); - - /** - * Returns the residual expression that should be applied to rows in this file scan. - *

- * The residual expression for a file is a filter expression created from the scan's filter, inclusive - * any predicates that are true or false for the entire file removed, based on the file's - * partition data. - * - * @return a residual expression to apply to rows from this scan - */ - Expression residual(); + @Override + default long sizeBytes() { + return length() + deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(); + } - /** - * Splits this scan task into component {@link FileScanTask scan tasks}, each of {@code splitSize} size - * @param splitSize The size of a component scan task - * @return an Iterable of {@link FileScanTask scan tasks} - */ - Iterable split(long splitSize); + @Override + default int filesCount() { + return 1 + deletes().size(); + } @Override default boolean isFileScanTask() { diff --git a/api/src/main/java/org/apache/iceberg/IncrementalAppendScan.java b/api/src/main/java/org/apache/iceberg/IncrementalAppendScan.java index d651967bac88..2817a16b435c 100644 --- a/api/src/main/java/org/apache/iceberg/IncrementalAppendScan.java +++ b/api/src/main/java/org/apache/iceberg/IncrementalAppendScan.java @@ -23,7 +23,7 @@ /** * API for configuring an incremental table scan for appends only snapshots */ -public interface IncrementalAppendScan extends Scan { +public interface IncrementalAppendScan extends Scan { /** * Refine the incremental scan with the start snapshot inclusive. diff --git a/api/src/main/java/org/apache/iceberg/MergeableScanTask.java b/api/src/main/java/org/apache/iceberg/MergeableScanTask.java new file mode 100644 index 000000000000..e1f30a630f95 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/MergeableScanTask.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +/** + * A scan task that can be potentially merged with other scan tasks. + * + * @param the child Java API class + */ +public interface MergeableScanTask extends ScanTask { + /** + * Checks if this task can merge with a given task. + * + * @param other another task + * @return whether the tasks can be merged + */ + boolean canMerge(ScanTask other); + + /** + * Merges this task with a given task. + *

+ * Note this method will be called only if {@link #canMerge(ScanTask)} returns true. + * + * @param other another task + * @return a new merged task + */ + ThisT merge(ScanTask other); +} diff --git a/api/src/main/java/org/apache/iceberg/Scan.java b/api/src/main/java/org/apache/iceberg/Scan.java index 26125a6bf57d..72e005649093 100644 --- a/api/src/main/java/org/apache/iceberg/Scan.java +++ b/api/src/main/java/org/apache/iceberg/Scan.java @@ -28,8 +28,12 @@ /** * Scan objects are immutable and can be shared between threads. Refinement methods, like * {@link #select(Collection)} and {@link #filter(Expression)}, create new TableScan instances. + * + * @param the child Java API class, returned by method chaining + * @param the Java type of tasks produces by this scan + * @param the Java type of task groups produces by this scan */ -public interface Scan> { +public interface Scan> { /** * Create a new scan from this scan's configuration that will override the {@link Table}'s behavior based * on the incoming pair. Unknown properties will be ignored. @@ -38,7 +42,7 @@ public interface Scan> { * @param value value to override with * @return a new scan based on this with overridden behavior */ - T option(String property, String value); + ThisT option(String property, String value); /** * Create a new scan from this with the schema as its projection. @@ -46,7 +50,7 @@ public interface Scan> { * @param schema a projection schema * @return a new scan based on this with the given projection */ - T project(Schema schema); + ThisT project(Schema schema); /** * Create a new scan from this that, if data columns where selected @@ -55,7 +59,7 @@ public interface Scan> { * * @return a new scan based on this with case sensitivity as stated */ - T caseSensitive(boolean caseSensitive); + ThisT caseSensitive(boolean caseSensitive); /** * Create a new scan from this that loads the column stats with each data file. 
@@ -64,7 +68,7 @@ public interface Scan> { * * @return a new scan based on this that loads column stats. */ - T includeColumnStats(); + ThisT includeColumnStats(); /** * Create a new scan from this that will read the given data columns. This produces @@ -74,7 +78,7 @@ public interface Scan> { * @param columns column names from the table's schema * @return a new scan based on this with the given projection columns */ - T select(Collection columns); + ThisT select(Collection columns); /** * Create a new scan from the results of this filtered by the {@link Expression}. @@ -82,14 +86,14 @@ public interface Scan> { * @param expr a filter expression * @return a new scan based on this with results filtered by the expression */ - T filter(Expression expr); + ThisT filter(Expression expr); /** * Create a new scan from this that applies data filtering to files but not to rows in those files. * * @return a new scan based on this that does not filter rows in files. */ - T ignoreResiduals(); + ThisT ignoreResiduals(); /** * Create a new scan to use a particular executor to plan. The default worker pool will be @@ -98,7 +102,7 @@ public interface Scan> { * @param executorService the provided executor * @return a table scan that uses the provided executor to access manifests */ - T planWith(ExecutorService executorService); + ThisT planWith(ExecutorService executorService); /** * Returns this scan's projection {@link Schema}. @@ -113,26 +117,23 @@ public interface Scan> { Schema schema(); /** - * Plan the {@link FileScanTask files} that will be read by this scan. - *

- * Each file has a residual expression that should be applied to filter the file's rows. + * Plan tasks for this scan where each task reads a single file. *

- * This simple plan returns file scans for each file from position 0 to the file's length. For - * planning that will combine small files, split large files, and attempt to balance work, use - * {@link #planTasks()} instead. + * Use {@link #planTasks()} for planning balanced tasks where each task will read either a single file, + * a part of a file, or multiple files. * - * @return an Iterable of file tasks that are required by this scan + * @return an Iterable of tasks scanning entire files required by this scan */ - CloseableIterable planFiles(); + CloseableIterable planFiles(); /** - * Plan the {@link CombinedScanTask tasks} for this scan. + * Plan balanced task groups for this scan by splitting large and combining small tasks. *

- Tasks created by this method may read partial input files, multiple input files, or both. + Task groups created by this method may read partial input files, multiple input files or both. - * - * @return an Iterable of tasks for this scan + * @return an Iterable of balanced task groups required by this scan */ - CloseableIterable planTasks(); + CloseableIterable planTasks(); /** * Returns the target split size for this scan. diff --git a/api/src/main/java/org/apache/iceberg/ScanTask.java b/api/src/main/java/org/apache/iceberg/ScanTask.java index 8863e50bddfc..1b202f506a2b 100644 --- a/api/src/main/java/org/apache/iceberg/ScanTask.java +++ b/api/src/main/java/org/apache/iceberg/ScanTask.java @@ -25,6 +25,24 @@ * A scan task. */ public interface ScanTask extends Serializable { + /** + * The number of bytes that should be read by this scan task. + * + * @return the total number of bytes to read + */ + default long sizeBytes() { + return 4 * 1024 * 1024; // 4 MB + } + + /** + * The number of files that will be opened by this scan task. + * + * @return the number of files to open + */ + default int filesCount() { + return 1; + } + /** * Returns true if this is a {@link FileScanTask}, false otherwise. */ diff --git a/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java b/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java new file mode 100644 index 000000000000..71a2d3fa43c3 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/ScanTaskGroup.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.Collection; + +/** + * A scan task that may include partial input files, multiple input files or both. + * + * @param the type of scan tasks + */ +public interface ScanTaskGroup extends ScanTask { + /** + * Returns scan tasks in this group. + */ + Collection tasks(); + + @Override + default long sizeBytes() { + return tasks().stream().mapToLong(ScanTask::sizeBytes).sum(); + } + + @Override + default int filesCount() { + return tasks().stream().mapToInt(ScanTask::filesCount).sum(); + } +} diff --git a/api/src/main/java/org/apache/iceberg/SplittableScanTask.java b/api/src/main/java/org/apache/iceberg/SplittableScanTask.java new file mode 100644 index 000000000000..0385e55972b9 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/SplittableScanTask.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +/** + * A scan task that can be split into smaller scan tasks. + * + * @param the child Java API class + */ +public interface SplittableScanTask extends ScanTask { + /** + * Attempts to split this scan task into several smaller scan tasks, each close to {@code splitSize} size. + *

+ * Note the target split size is just guidance and the actual split size may be either smaller or larger. + * File formats like Parquet may leverage the row group offset information while splitting tasks. + * + * @param targetSplitSize the target size of each new scan task in bytes + * @return an Iterable of smaller tasks + */ + Iterable split(long targetSplitSize); +} diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java index b3ca8fb3a7c0..fedeb05203f1 100644 --- a/api/src/main/java/org/apache/iceberg/TableScan.java +++ b/api/src/main/java/org/apache/iceberg/TableScan.java @@ -25,7 +25,7 @@ /** * API for configuring a table scan. */ -public interface TableScan extends Scan { +public interface TableScan extends Scan { /** * Returns the {@link Table} from which this scan loads data. * diff --git a/core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java b/core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java index e300476c826c..aa65bf7c4ce8 100644 --- a/core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java +++ b/core/src/main/java/org/apache/iceberg/BaseCombinedScanTask.java @@ -25,6 +25,7 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.util.TableScanUtil; public class BaseCombinedScanTask implements CombinedScanTask { private final FileScanTask[] tasks; @@ -36,7 +37,7 @@ public BaseCombinedScanTask(FileScanTask... 
tasks) { public BaseCombinedScanTask(List tasks) { Preconditions.checkNotNull(tasks, "tasks cannot be null"); - this.tasks = BaseFileScanTask.combineAdjacentTasks(tasks).stream().toArray(FileScanTask[]::new); + this.tasks = TableScanUtil.mergeTasks(tasks).toArray(new FileScanTask[0]); } @Override diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java index 3f701b881882..8167ed82f508 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java +++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java @@ -171,7 +171,7 @@ public FileScanTask next() { } } - private static final class SplitScanTask implements FileScanTask { + private static final class SplitScanTask implements FileScanTask, MergeableScanTask { private final long len; private final long offset; private final FileScanTask fileScanTask; @@ -217,50 +217,20 @@ public Iterable split(long splitSize) { throw new UnsupportedOperationException("Cannot split a task which is already split"); } - public boolean isAdjacent(SplitScanTask other) { - return other != null && - this.file().equals(other.file()) && - this.offset + this.len == other.offset; - } - } - - static List combineAdjacentTasks(List tasks) { - if (tasks.isEmpty()) { - return tasks; - } - - List combinedScans = Lists.newArrayList(); - SplitScanTask lastSplit = null; - - for (FileScanTask fileScanTask : tasks) { - if (!(fileScanTask instanceof SplitScanTask)) { - // Return any tasks not produced by split un-modified - combinedScans.add(fileScanTask); + @Override + public boolean canMerge(ScanTask other) { + if (other instanceof SplitScanTask) { + SplitScanTask that = (SplitScanTask) other; + return file().equals(that.file()) && offset + len == that.start(); } else { - SplitScanTask split = (SplitScanTask) fileScanTask; - if (lastSplit != null) { - if (lastSplit.isAdjacent(split)) { - // Merge with the last split - lastSplit = new SplitScanTask( - 
lastSplit.offset, - lastSplit.len + split.len, - lastSplit.fileScanTask); - } else { - // Last split is not adjacent, add it to finished adjacent groups - combinedScans.add(lastSplit); - lastSplit = split; - } - } else { - // First split - lastSplit = split; - } + return false; } } - if (lastSplit != null) { - combinedScans.add(lastSplit); + @Override + public SplitScanTask merge(ScanTask other) { + SplitScanTask that = (SplitScanTask) other; + return new SplitScanTask(offset, len + that.length(), fileScanTask); } - - return combinedScans; } } diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java index c86682e86578..cb98fdcd8daf 100644 --- a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java @@ -32,7 +32,9 @@ import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.TableScanUtil; -class BaseIncrementalAppendScan extends BaseScan implements IncrementalAppendScan { +class BaseIncrementalAppendScan + extends BaseScan + implements IncrementalAppendScan { BaseIncrementalAppendScan(TableOperations ops, Table table) { this(ops, table, table.schema(), new TableScanContext()); diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java index 44e8d38eb533..7cf88f5b1c48 100644 --- a/core/src/main/java/org/apache/iceberg/BaseScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseScan.java @@ -30,7 +30,7 @@ import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.PropertyUtil; -abstract class BaseScan> implements Scan { +abstract class BaseScan> implements Scan { private final TableOperations ops; private final Table table; private final Schema schema; @@ -59,46 +59,46 @@ protected TableScanContext context() { return context; } - protected abstract T newRefinedScan( + protected abstract ThisT 
newRefinedScan( TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext); @Override - public T option(String property, String value) { + public ThisT option(String property, String value) { return newRefinedScan(ops, table, schema, context.withOption(property, value)); } @Override - public T project(Schema projectedSchema) { + public ThisT project(Schema projectedSchema) { return newRefinedScan(ops, table, schema, context.project(projectedSchema)); } @Override - public T caseSensitive(boolean caseSensitive) { + public ThisT caseSensitive(boolean caseSensitive) { return newRefinedScan(ops, table, schema, context.setCaseSensitive(caseSensitive)); } @Override - public T includeColumnStats() { + public ThisT includeColumnStats() { return newRefinedScan(ops, table, schema, context.shouldReturnColumnStats(true)); } @Override - public T select(Collection columns) { + public ThisT select(Collection columns) { return newRefinedScan(ops, table, schema, context.selectColumns(columns)); } @Override - public T filter(Expression expr) { + public ThisT filter(Expression expr) { return newRefinedScan(ops, table, schema, context.filterRows(Expressions.and(context.rowFilter(), expr))); } @Override - public T ignoreResiduals() { + public ThisT ignoreResiduals() { return newRefinedScan(ops, table, schema, context.ignoreResiduals(true)); } @Override - public T planWith(ExecutorService executorService) { + public ThisT planWith(ExecutorService executorService) { return newRefinedScan(ops, table, schema, context.planWith(executorService)); } diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java index 68862d466c7e..f2a2f0d472b8 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java @@ -37,7 +37,7 @@ /** * Base class for {@link TableScan} implementations. 
*/ -abstract class BaseTableScan extends BaseScan implements TableScan { +abstract class BaseTableScan extends BaseScan implements TableScan { private static final Logger LOG = LoggerFactory.getLogger(BaseTableScan.class); protected BaseTableScan(TableOperations ops, Table table, Schema schema) { diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java index d8d2d68ba7dc..dc62d9f5a4f2 100644 --- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java @@ -19,15 +19,23 @@ package org.apache.iceberg.util; +import java.util.List; import java.util.function.Function; import org.apache.iceberg.BaseCombinedScanTask; +import org.apache.iceberg.BaseScanTaskGroup; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.ContentFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MergeableScanTask; +import org.apache.iceberg.ScanTask; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.SplittableScanTask; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; public class TableScanUtil { @@ -78,4 +86,65 @@ public static CloseableIterable planTasks(CloseableIterable CloseableIterable> planTaskGroups(CloseableIterable tasks, + long splitSize, int lookback, + long openFileCost) { + + Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize); + Preconditions.checkArgument(lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback); + Preconditions.checkArgument(openFileCost >= 0, "Invalid file 
open cost (negative): %s", openFileCost); + + // capture manifests which can be closed after scan planning + CloseableIterable splitTasks = CloseableIterable.combine( + FluentIterable.from(tasks).transformAndConcat(task -> { + if (task instanceof SplittableScanTask) { + return ((SplittableScanTask) task).split(splitSize); + } else { + return ImmutableList.of(task); + } + }), + tasks); + + Function weightFunc = task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost); + + return CloseableIterable.transform( + CloseableIterable.combine( + new BinPacking.PackingIterable<>(splitTasks, splitSize, lookback, weightFunc, true), + splitTasks), + combinedTasks -> new BaseScanTaskGroup<>(mergeTasks(combinedTasks))); + } + + @SuppressWarnings("unchecked") + public static List mergeTasks(List tasks) { + List mergedTasks = Lists.newArrayList(); + + T lastTask = null; + + for (T task : tasks) { + if (lastTask != null) { + if (lastTask instanceof MergeableScanTask) { + MergeableScanTask mergeableLastTask = (MergeableScanTask) lastTask; + if (mergeableLastTask.canMerge(task)) { + lastTask = mergeableLastTask.merge(task); + } else { + mergedTasks.add(lastTask); + lastTask = task; + } + } else { + mergedTasks.add(lastTask); + lastTask = task; + } + } else { + lastTask = task; + } + } + + if (lastTask != null) { + mergedTasks.add(lastTask); + } + + return mergedTasks; + } } diff --git a/core/src/test/java/org/apache/iceberg/ScanTestBase.java b/core/src/test/java/org/apache/iceberg/ScanTestBase.java index a800e4032997..1b7b5ac421e2 100644 --- a/core/src/test/java/org/apache/iceberg/ScanTestBase.java +++ b/core/src/test/java/org/apache/iceberg/ScanTestBase.java @@ -36,7 +36,7 @@ import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) -public abstract class ScanTestBase> extends TableTestBase { +public abstract class ScanTestBase> extends TableTestBase { @Parameterized.Parameters(name = "formatVersion = {0}") public static Object[] parameters() { return new 
Object[] { 1, 2 }; diff --git a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java index b595ac170cba..55c98cde3a7d 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java +++ b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java @@ -28,8 +28,14 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MergeableScanTask; import org.apache.iceberg.MockFileScanTask; +import org.apache.iceberg.ScanTask; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.SplittableScanTask; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.Assert; import org.junit.Test; @@ -88,4 +94,109 @@ public void testPlanTaskWithDeleteFiles() { expectedCombinedTasks.get(i).files(), combinedScanTasks.get(i).files()); } } + + @Test + public void testTaskGroupPlanning() { + List tasks = ImmutableList.of( + new ChildTask1(64), + new ChildTask1(32), + new ChildTask3(64), + new ChildTask3(32), + new ChildTask2(128), + new ChildTask3(32), + new ChildTask3(32) + ); + + CloseableIterable> taskGroups = TableScanUtil.planTaskGroups( + CloseableIterable.withNoopClose(tasks), 128, 10, 4); + + Assert.assertEquals("Must have 3 task groups", 3, Iterables.size(taskGroups)); + } + + @Test + public void testTaskMerging() { + List tasks = ImmutableList.of( + new ChildTask1(64), + new ChildTask1(64), + new ChildTask2(128), + new ChildTask3(32), + new ChildTask3(32) + ); + List mergedTasks = TableScanUtil.mergeTasks(tasks); + Assert.assertEquals("Appropriate tasks should be merged", 3, mergedTasks.size()); + } + + private interface ParentTask extends ScanTask { + } + 
+ private static class ChildTask1 implements ParentTask, SplittableScanTask, MergeableScanTask { + private final long sizeBytes; + + ChildTask1(long sizeBytes) { + this.sizeBytes = sizeBytes; + } + + @Override + public Iterable split(long targetSplitSize) { + return ImmutableList.of(new ChildTask1(sizeBytes / 2), new ChildTask1(sizeBytes / 2)); + } + + @Override + public boolean canMerge(ScanTask other) { + return other instanceof ChildTask1; + } + + @Override + public ChildTask1 merge(ScanTask other) { + ChildTask1 that = (ChildTask1) other; + return new ChildTask1(sizeBytes + that.sizeBytes); + } + + @Override + public long sizeBytes() { + return sizeBytes; + } + } + + private static class ChildTask2 implements ParentTask, SplittableScanTask { + private final long sizeBytes; + + ChildTask2(long sizeBytes) { + this.sizeBytes = sizeBytes; + } + + @Override + public Iterable split(long targetSplitSize) { + return ImmutableList.of(new ChildTask2(sizeBytes / 2), new ChildTask2(sizeBytes / 2)); + } + + @Override + public long sizeBytes() { + return sizeBytes; + } + } + + private static class ChildTask3 implements ParentTask, MergeableScanTask { + private final long sizeBytes; + + ChildTask3(long sizeBytes) { + this.sizeBytes = sizeBytes; + } + + @Override + public boolean canMerge(ScanTask other) { + return other instanceof ChildTask3; + } + + @Override + public ChildTask3 merge(ScanTask other) { + ChildTask3 that = (ChildTask3) other; + return new ChildTask3(sizeBytes + that.sizeBytes); + } + + @Override + public long sizeBytes() { + return sizeBytes; + } + } } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java index 2e50243240f6..d1628fbca794 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java 
@@ -25,6 +25,7 @@
 import java.util.concurrent.ExecutorService;
 import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.IncrementalAppendScan;
 import org.apache.iceberg.Scan;
 import org.apache.iceberg.Table;
@@ -124,7 +125,7 @@ private static ScanMode checkScanMode(ScanContext context) {
   /**
    * refine scan with common configs
    */
-  private static <T extends Scan<T>> T refineScanWithBaseConfigs(
+  private static <T extends Scan<T, FileScanTask, CombinedScanTask>> T refineScanWithBaseConfigs(
       T scan, ScanContext context, ExecutorService workerPool) {
     T refinedScan = scan
         .caseSensitive(context.caseSensitive())
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index 2e50243240f6..d1628fbca794 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.ExecutorService;
 import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.IncrementalAppendScan;
 import org.apache.iceberg.Scan;
 import org.apache.iceberg.Table;
@@ -124,7 +125,7 @@ private static ScanMode checkScanMode(ScanContext context) {
   /**
    * refine scan with common configs
    */
-  private static <T extends Scan<T>> T refineScanWithBaseConfigs(
+  private static <T extends Scan<T, FileScanTask, CombinedScanTask>> T refineScanWithBaseConfigs(
      T scan, ScanContext context, ExecutorService workerPool) {
     T refinedScan = scan
         .caseSensitive(context.caseSensitive())
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/TaskCheckHelper.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TaskCheckHelper.java
index 99396647ee3e..dd187c0b6bf0 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/TaskCheckHelper.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TaskCheckHelper.java
@@ -28,7 +28,7 @@ public final class TaskCheckHelper {
   private TaskCheckHelper() {
   }
 
-  public static void assertEquals(BaseCombinedScanTask expected, BaseCombinedScanTask actual) {
+  public static void assertEquals(ScanTaskGroup<FileScanTask> expected, ScanTaskGroup<FileScanTask> actual) {
     List<FileScanTask> expectedTasks = getFileScanTasksInFilePathOrder(expected);
     List<FileScanTask> actualTasks = getFileScanTasksInFilePathOrder(actual);
 
@@ -84,8 +84,8 @@ public static void assertEquals(DataFile expected, DataFile actual) {
         expected.keyMetadata(), actual.keyMetadata());
   }
 
-  private static List<FileScanTask> getFileScanTasksInFilePathOrder(BaseCombinedScanTask task) {
-    return task.files().stream()
+  private static List<FileScanTask> getFileScanTasksInFilePathOrder(ScanTaskGroup<FileScanTask> taskGroup) {
+    return taskGroup.tasks().stream()
         // use file path + start position to differentiate the tasks
         .sorted(Comparator.comparing(o -> o.file().path().toString() + "##" + o.start()))
         .collect(Collectors.toList());
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestScanTaskSerialization.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestScanTaskSerialization.java
index e234ee2617aa..1fe736b51aad 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestScanTaskSerialization.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestScanTaskSerialization.java
@@ -29,11 +29,13 @@
 import java.io.FileOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.nio.file.Files;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkTestBase;
@@ -107,7 +109,60 @@ public void testBaseCombinedScanTaskJavaSerialization() throws Exception {
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testBaseScanTaskGroupKryoSerialization() throws Exception {
+    BaseScanTaskGroup<FileScanTask> taskGroup = prepareBaseScanTaskGroupForSerDeTest();
+
+    Assert.assertTrue("Task group can't be empty", taskGroup.tasks().size() > 0);
+
+    File data = temp.newFile();
+    Assert.assertTrue(data.delete());
+    Kryo kryo = new KryoSerializer(new SparkConf()).newKryo();
+
+    try (Output out = new Output(Files.newOutputStream(data.toPath()))) {
+      kryo.writeClassAndObject(out, taskGroup);
+    }
+
+    try (Input in = new Input(Files.newInputStream(data.toPath()))) {
+      Object obj = kryo.readClassAndObject(in);
+      Assertions.assertThat(obj).as("should be a BaseScanTaskGroup").isInstanceOf(BaseScanTaskGroup.class);
+      TaskCheckHelper.assertEquals(taskGroup, (BaseScanTaskGroup<FileScanTask>) obj);
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testBaseScanTaskGroupJavaSerialization() throws Exception {
+    BaseScanTaskGroup<FileScanTask> taskGroup = prepareBaseScanTaskGroupForSerDeTest();
+
+    Assert.assertTrue("Task group can't be empty", taskGroup.tasks().size() > 0);
+
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+      out.writeObject(taskGroup);
+    }
+
+    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+      Object obj = in.readObject();
+      Assertions.assertThat(obj).as("should be a BaseScanTaskGroup").isInstanceOf(BaseScanTaskGroup.class);
+      TaskCheckHelper.assertEquals(taskGroup, (BaseScanTaskGroup<FileScanTask>) obj);
+    }
+  }
+
   private BaseCombinedScanTask prepareBaseCombinedScanTaskForSerDeTest() {
+    Table table = initTable();
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    return new BaseCombinedScanTask(Lists.newArrayList(tasks));
+  }
+
+  private BaseScanTaskGroup<FileScanTask> prepareBaseScanTaskGroupForSerDeTest() {
+    Table table = initTable();
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    return new BaseScanTaskGroup<>(ImmutableList.copyOf(tasks));
+  }
+
+  private Table initTable() {
     PartitionSpec spec = PartitionSpec.unpartitioned();
     Map<String, String> options = Maps.newHashMap();
     Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
@@ -126,8 +181,7 @@ private BaseCombinedScanTask prepareBaseCombinedScanTaskForSerDeTest() {
 
     table.refresh();
 
-    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
-    return new BaseCombinedScanTask(Lists.newArrayList(tasks));
+    return table;
   }
 
   private void writeRecords(List<Record> records) {