diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/api/src/main/java/org/apache/iceberg/ManifestEntry.java similarity index 68% rename from core/src/main/java/org/apache/iceberg/ManifestEntry.java rename to api/src/main/java/org/apache/iceberg/ManifestEntry.java index e07b125a8c5c..f90e9f75556d 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java +++ b/api/src/main/java/org/apache/iceberg/ManifestEntry.java @@ -1,20 +1,15 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.iceberg; @@ -25,7 +20,7 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; -interface ManifestEntry> { +public interface ManifestEntry> { enum Status { EXISTING(0), ADDED(1), diff --git a/api/src/main/java/org/apache/iceberg/ManifestProcessor.java b/api/src/main/java/org/apache/iceberg/ManifestProcessor.java new file mode 100644 index 000000000000..3412b202450c --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/ManifestProcessor.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg; + +import java.io.Serializable; +import java.util.function.BiFunction; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; + +public abstract class ManifestProcessor implements Serializable { + public abstract > Iterable>> readManifests(final Iterable fromIterable, + BiFunction>> reader); + + /** + * A Helper interface for making lambdas transform into the correct type for the ManfiestProcessor + * @param The ManifestEntry Type being read from Manifest files + */ + public interface Func> extends BiFunction>>, Serializable {} + +} diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java index 4537c7d81ca7..73aeedddecae 100644 --- a/api/src/main/java/org/apache/iceberg/TableScan.java +++ b/api/src/main/java/org/apache/iceberg/TableScan.java @@ -94,6 +94,11 @@ public interface TableScan { */ TableScan includeColumnStats(); + /** + * Doc doc doc + */ + TableScan withManifestProcessor(ManifestProcessor processor); + /** * Create a new {@link TableScan} from this that will read the given data columns. This produces * an expected schema that includes all fields that are either selected or used by this scan's diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java index da1ff6326aa0..28e473425c59 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java @@ -54,6 +54,7 @@ abstract class BaseTableScan implements TableScan { private final Table table; private final Schema schema; private final TableScanContext context; + protected ManifestProcessor manifestProcessor; protected BaseTableScan(TableOperations ops, Table table, Schema schema) { this(ops, table, schema, new TableScanContext()); @@ -64,6 +65,7 @@ protected BaseTableScan(TableOperations ops, Table table, Schema schema, TableSc this.table = table; this.schema = schema; this.context = context; + this.manifestProcessor = new LocalManifestProcessor(table.io()); } protected TableOperations tableOps() { @@ -131,6 +133,9 @@ public TableScan useSnapshot(long scanSnapshotId) { ops, table, schema, context.useSnapshotId(scanSnapshotId)); } + + + @Override public TableScan asOfTime(long timestampMillis) { Preconditions.checkArgument(context.snapshotId() == null, @@ -275,6 +280,12 @@ public String toString() { .toString(); } + @Override + public TableScan withManifestProcessor(ManifestProcessor processor) { + this.manifestProcessor = processor; + return this; + } + /** * To be able to make refinements {@link #select(Collection)} and {@link #caseSensitive(boolean)} in any order, * we resolve the schema to be projected lazily here. diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java index 9a6b85f652b8..e63d37a61fcc 100644 --- a/core/src/main/java/org/apache/iceberg/DataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java @@ -86,7 +86,7 @@ public CloseableIterable planFiles(TableOperations ops, Snapshot s manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool()); } - return manifestGroup.planFiles(); + return manifestGroup.withProcessor(this.manifestProcessor).planFiles(); } @Override diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java index dfc9475074dc..18092640edff 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java @@ -19,6 +19,7 @@ package org.apache.iceberg; +import java.io.Serializable; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.specific.SpecificData; import org.apache.iceberg.avro.AvroSchemaUtil; @@ -26,7 +27,7 @@ import org.apache.iceberg.types.Types; class GenericManifestEntry> - implements ManifestEntry, IndexedRecord, SpecificData.SchemaConstructable, StructLike { + implements ManifestEntry, IndexedRecord, SpecificData.SchemaConstructable, StructLike, Serializable { private final org.apache.avro.Schema schema; private Status status = Status.EXISTING; private Long snapshotId = null; diff --git a/core/src/main/java/org/apache/iceberg/LocalManifestProcessor.java b/core/src/main/java/org/apache/iceberg/LocalManifestProcessor.java new file mode 100644 index 000000000000..331ad5f344a3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/LocalManifestProcessor.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg; + +import java.util.function.BiFunction; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; + +public class LocalManifestProcessor extends ManifestProcessor { + private final FileIO io; + + public LocalManifestProcessor(FileIO io){ + this.io = io; + } + + @Override + public > Iterable>> readManifests( + Iterable fromIterable, + BiFunction>> reader) { + return CloseableIterable.transform( + CloseableIterable.withNoopClose(fromIterable), manifestFile -> reader.apply(manifestFile, io)); + } +} diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java index 54c809c419e5..c5e30095ea0d 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java +++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.iceberg.expressions.Evaluator; @@ -36,6 +35,7 @@ import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Function; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -61,6 +61,7 @@ class ManifestGroup { private List columns; private boolean caseSensitive; private ExecutorService executorService; + private ManifestProcessor manifestProcessor; ManifestGroup(FileIO io, Iterable manifests) { this(io, @@ -82,6 +83,12 @@ class ManifestGroup { this.caseSensitive = true; this.manifestPredicate = m -> true; this.manifestEntryPredicate = e -> true; + this.manifestProcessor = new LocalManifestProcessor(io); + } + + ManifestGroup withProcessor(ManifestProcessor processor){ + this.manifestProcessor = processor; + return this; } ManifestGroup specsById(Map newSpecsById) { @@ -169,20 +176,26 @@ public CloseableIterable planFiles() { select(Streams.concat(columns.stream(), ManifestReader.STATS_COLUMNS.stream()).collect(Collectors.toList())); } - Iterable> tasks = entries((manifest, entries) -> { - int specId = manifest.partitionSpecId(); - PartitionSpec spec = specsById.get(specId); - String schemaString = SchemaParser.toJson(spec.schema()); - String specString = PartitionSpecParser.toJson(spec); - ResidualEvaluator residuals = residualCache.get(specId); - if (dropStats) { - return CloseableIterable.transform(entries, e -> new BaseFileScanTask( - e.file().copyWithoutStats(), deleteFiles.forEntry(e), schemaString, specString, residuals)); - } else { - return CloseableIterable.transform(entries, e -> new BaseFileScanTask( - e.file().copy(), deleteFiles.forEntry(e), schemaString, specString, residuals)); - } - }); + LoadingCache specCache = Caffeine.newBuilder().build( + specId -> { + PartitionSpec spec = specsById.get(specId); + return new SpecCacheEntry(SchemaParser.toJson(spec.schema()), PartitionSpecParser.toJson(spec), + residualCache.get(specId)); + } + ); + + //Todo Make this cleaner (maybe go back to old method of two traversals? Make api for BaseFileScanTask? + //This will have different performance characteristics than the old version since we are doing a look for every + // entry, but I think this will probably end up being essentially a noop with branch prediction since we look up + // the same thing over and over in order. + Iterable> tasks = entries(entries -> + CloseableIterable.transform(entries, e -> + { + SpecCacheEntry cached = specCache.get(e.file().specId()); + DataFile file = (dropStats) ? e.file().copyWithoutStats() : e.file().copy(); + return new BaseFileScanTask(file, deleteFiles.forEntry(e), cached.schemaString, cached.specString, + cached.residuals); + })); if (executorService != null) { return new ParallelIterable<>(tasks, executorService); @@ -200,11 +213,32 @@ public CloseableIterable planFiles() { * @return a CloseableIterable of manifest entries. */ public CloseableIterable> entries() { - return CloseableIterable.concat(entries((manifest, entries) -> entries)); + return CloseableIterable.concat(entries(entry -> entry)); + } + + /* + Generating the lambda in a static context allows us to ignore the serializability of this class + */ + private static ManifestProcessor.Func generateManifestProcessorFunc(Map specsById, + Expression dataFilter, Expression partitionFilter, boolean caseSensitive, List columns, boolean ignoreDeleted) { + return (ManifestProcessor.Func) (manifest, processorIO) -> { + ManifestReader reader = ManifestFiles.read(manifest, processorIO, specsById) + .filterRows(dataFilter) + .filterPartitions(partitionFilter) + .caseSensitive(caseSensitive) + .select(columns); + + CloseableIterable> entries = reader.entries(); + if (ignoreDeleted) { + entries = reader.liveEntries(); + } + return entries; + }; } private Iterable> entries( - BiFunction>, CloseableIterable> entryFn) { + Function>, CloseableIterable> entryFn) { + LoadingCache evalCache = specsById == null ? null : Caffeine.newBuilder().build(specId -> { PartitionSpec spec = specsById.get(specId); @@ -236,20 +270,13 @@ private Iterable> entries( matchingManifests = Iterables.filter(matchingManifests, manifestPredicate::test); - return Iterables.transform( - matchingManifests, - manifest -> { - ManifestReader reader = ManifestFiles.read(manifest, io, specsById) - .filterRows(dataFilter) - .filterPartitions(partitionFilter) - .caseSensitive(caseSensitive) - .select(columns); - - CloseableIterable> entries = reader.entries(); - if (ignoreDeleted) { - entries = reader.liveEntries(); - } + Iterable< CloseableIterable>> fileReader = + manifestProcessor.readManifests(matchingManifests, generateManifestProcessorFunc(specsById, dataFilter, + partitionFilter, caseSensitive, columns, ignoreDeleted)); + return Iterables.transform( + fileReader, + entries -> { if (ignoreExisting) { entries = CloseableIterable.filter(entries, entry -> entry.status() != ManifestEntry.Status.EXISTING); @@ -261,7 +288,21 @@ private Iterable> entries( } entries = CloseableIterable.filter(entries, manifestEntryPredicate); - return entryFn.apply(manifest, entries); + return entryFn.apply(entries); }); } + + + private class SpecCacheEntry { + private final String schemaString; + private final String specString; + private final ResidualEvaluator residuals; + + SpecCacheEntry(String schemaString, String specString, ResidualEvaluator residuals) { + this.schemaString = schemaString; + this.specString = specString; + this.residuals = residuals; + } + } + } diff --git a/core/src/main/java/org/apache/iceberg/util/SerializablePredicate.java b/core/src/main/java/org/apache/iceberg/util/SerializablePredicate.java new file mode 100644 index 000000000000..80e06e8a57e9 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/SerializablePredicate.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.util; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Predicate; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public interface SerializablePredicate extends Predicate, Serializable { + + @Override + default SerializablePredicate and(Predicate other) { + Objects.requireNonNull(other); + Preconditions.checkArgument(other instanceof SerializablePredicate); + return (T x) -> this.test(x) && other.test(x); + } +} \ No newline at end of file diff --git a/spark/src/main/java/org/apache/iceberg/spark/DistributedManifestProcessor.java b/spark/src/main/java/org/apache/iceberg/spark/DistributedManifestProcessor.java new file mode 100644 index 000000000000..c0aa404d2ece --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/DistributedManifestProcessor.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.spark; + +import java.util.List; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ManifestEntry; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestProcessor; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; + +public class DistributedManifestProcessor extends ManifestProcessor { + transient private final SparkSession spark; + private final Broadcast ioBroadcast; + + public DistributedManifestProcessor(SparkSession spark, FileIO io) { + this.spark = spark; + this.ioBroadcast = JavaSparkContext.fromSparkContext(spark.sparkContext()).broadcast(io); + } + + @Override + public > Iterable>> readManifests( + Iterable fromIterable, + BiFunction>> reader) { + List>> results = + JavaSparkContext.fromSparkContext(spark.sparkContext()) + .parallelize(Lists.newArrayList(fromIterable)) + .map(file -> { + CloseableIterable> closeable = reader.apply(file, ioBroadcast.getValue()); + Iterable> realizedEntries = Lists.newArrayList(CloseableIterable.transform(closeable, + ManifestEntry::copy)); + closeable.close(); + return realizedEntries; + }).collect(); + return results.stream().map(list -> CloseableIterable.withNoopClose(list)).collect(Collectors.toList()); + } +} diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java index 564f8f2168ff..deacb4148a9d 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -47,8 +47,10 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.DistributedManifestProcessor; import org.apache.iceberg.spark.SparkFilters; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.SparkSession; @@ -378,7 +380,9 @@ private List tasks() { } } - try (CloseableIterable tasksIterable = scan.planTasks()) { + FileIO io = SparkUtil.serializableFileIO(table); + try (CloseableIterable tasksIterable = + scan.withManifestProcessor(new DistributedManifestProcessor(SparkSession.active(), io)).planTasks()) { this.tasks = Lists.newArrayList(tasksIterable); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close table scan: %s", scan); diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index 6cd166b72758..2ad4900fc22e 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -43,10 +43,13 @@ import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.DistributedManifestProcessor; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.InputPartition; @@ -246,7 +249,11 @@ private List tasks() { } } - try (CloseableIterable tasksIterable = scan.planTasks()) { + SparkSession spark = SparkSession.active(); + FileIO io = SparkUtil.serializableFileIO(table); + + try (CloseableIterable tasksIterable = + scan.withManifestProcessor(new DistributedManifestProcessor(spark, io)).planTasks()) { this.tasks = Lists.newArrayList(tasksIterable); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);