@@ -1,20 +1,15 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
Contributor

Did this not match the rest of the files in the project? I'd prefer not to reformat headers in a different PR if we can avoid it.

Contributor

My understanding is that this is just a proof-of-concept PR, to compare with the other idea demonstrated here: #1421

I think once one is decided upon (if either of the versions is decided upon), then @RussellSpitzer will be updating it to be properly formatted etc. Please feel free to correct me if I'm wrong though, Russell 🙂

Member Author

Yep, sorry, I was just trying to get to a prototype real fast here.

*/

package org.apache.iceberg;
@@ -25,7 +20,7 @@
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

-interface ManifestEntry<F extends ContentFile<F>> {
+public interface ManifestEntry<F extends ContentFile<F>> {
Contributor

We've really tried to avoid making ManifestEntry public because exposing it makes the API much harder to use.

Contributor

Can you expand on why making ManifestEntry public makes the API harder to use? Is it just because we then have to account for the fact that this is a public API?

I personally am more fond of this proof of concept as opposed to the other one, so I'd love to get insight into your comment here.

If it's just because we would then have to treat this API as public, is it possible something could be done to mark it as private to Iceberg, equivalent to Scala's private[iceberg]?
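
For reference, the closest built-in Java has is package-private visibility (no modifier), which is what the declaration used before this change; unlike Scala's private[iceberg], it covers only the single org.apache.iceberg package, not subpackages or the rest of the project. A rough sketch:

  // Package-private: visible only inside org.apache.iceberg itself.
  // There is no Java modifier that scopes to "all of Iceberg".
  interface ManifestEntry<F extends ContentFile<F>> {
  }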

enum Status {
EXISTING(0),
ADDED(1),
33 changes: 33 additions & 0 deletions api/src/main/java/org/apache/iceberg/ManifestProcessor.java
@@ -0,0 +1,33 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg;

import java.io.Serializable;
import java.util.function.BiFunction;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;

public abstract class ManifestProcessor implements Serializable {
  public abstract <T extends ContentFile<T>> Iterable<CloseableIterable<ManifestEntry<T>>> readManifests(
      final Iterable<ManifestFile> fromIterable,
      BiFunction<ManifestFile, FileIO, CloseableIterable<ManifestEntry<T>>> reader);

  /**
   * A helper interface for converting lambdas into the correct type for the ManifestProcessor.
   *
   * @param <T> the ManifestEntry type being read from manifest files
   */
  public interface Func<T extends ContentFile<T>> extends
      BiFunction<ManifestFile, FileIO, CloseableIterable<ManifestEntry<T>>>, Serializable {}
Contributor

I know this is a WIP, so I'm partially commenting to follow along.

But I'm not a huge fan of this interface being named Func. Its usage also feels somewhat clunky to me, to be honest, but I'm more of a Scala developer than a Java developer by day, so that could just be my own bias showing.

Overall this is shaping up to be a great addition though.

Member Author

Check out the other PR too :) And if you have another approach I'd be glad to look into that as well. This is, in my opinion, definitely the more kludgy approach in Java. The only reason we need this interface is that otherwise you have to write (all of that type info) in front of the lambda; the interface is basically a type alias Java will use to convert the lambda into a real function.
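
Roughly, without the alias a caller would have to spell out the whole intersection type to get a serializable lambda, something like this (sketch; the reader body is just for illustration):

  BiFunction<ManifestFile, FileIO, CloseableIterable<ManifestEntry<DataFile>>> reader =
      (BiFunction<ManifestFile, FileIO, CloseableIterable<ManifestEntry<DataFile>>> & Serializable)
          (manifest, io) -> ManifestFiles.read(manifest, io).entries();

With the alias, all of that collapses to one name:

  ManifestProcessor.Func<DataFile> reader =
      (manifest, io) -> ManifestFiles.read(manifest, io).entries();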

Contributor

Ah. Ok. Thank you so much! As I've mentioned, in my day job I'm more of a Scala developer, so I'm much more used to working with all of the well-enriched type machinery that's available there. Thankfully I've finally stood all of this up on my own K8s cluster, so I can continue to get more experience with practical usage of Iceberg (especially between query systems).

When you explain it that way though, it makes perfect sense. :)

As for the approach being kludgy: if it gets the job done, speeds up manifest reading, and makes things like exception stack traces more readable than a very large number of lambda functions, I'm not necessarily opposed at all. As a person who spends a lot of time in their current position as a sort of developer advocate, helping people with the more esoteric parts of Spark and Flink etc., I can absolutely get behind simpler type info and less concern with the serializability of functions, especially if the lambdas get raised to real functions so that they have more readable stack traces. :)

I'll be sure to check out the other approach. And if I can come up with any approaches or modifications to this one that might be cleaner, I'll definitely let you know. I think the javadoc comment on the interface is likely sufficient to explain Func. It's possible that the relatively generic name Func is what tripped me up, but I can't think of a better name that doesn't conflict with other current Iceberg concepts (e.g. Transformers, etc.).
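
Just to check my understanding of the extension point: a custom processor would look something like this, right? (Untested sketch; the class name and threading policy are hypothetical, and I'm not proposing it for this PR.)

  package org.apache.iceberg;

  import java.util.List;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Future;
  import java.util.function.BiFunction;
  import org.apache.iceberg.io.CloseableIterable;
  import org.apache.iceberg.io.FileIO;
  import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
  import org.apache.iceberg.relocated.com.google.common.collect.Lists;

  // A hypothetical processor that opens every manifest eagerly on a thread pool
  // instead of lazily on the calling thread, as LocalManifestProcessor does.
  public class ParallelManifestProcessor extends ManifestProcessor {
    private final FileIO io;
    // transient: an ExecutorService cannot be serialized and shipped anywhere.
    private final transient ExecutorService pool;

    public ParallelManifestProcessor(FileIO io, ExecutorService pool) {
      this.io = io;
      this.pool = pool;
    }

    @Override
    public <T extends ContentFile<T>> Iterable<CloseableIterable<ManifestEntry<T>>> readManifests(
        Iterable<ManifestFile> fromIterable,
        BiFunction<ManifestFile, FileIO, CloseableIterable<ManifestEntry<T>>> reader) {
      // Submit one read per manifest, then expose the futures as a plain Iterable.
      List<Future<CloseableIterable<ManifestEntry<T>>>> futures = Lists.newArrayList();
      for (ManifestFile manifest : fromIterable) {
        futures.add(pool.submit(() -> reader.apply(manifest, io)));
      }
      return Iterables.transform(futures, future -> {
        try {
          return future.get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        } catch (ExecutionException e) {
          throw new RuntimeException(e.getCause());
        }
      });
    }
  }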


}
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/TableScan.java
@@ -94,6 +94,11 @@ public interface TableScan {
*/
TableScan includeColumnStats();

+  /**
+   * Doc doc doc
+   */
+  TableScan withManifestProcessor(ManifestProcessor processor);
Contributor

I see what you did here to get past the linter ;-)

If we decide to use this approach, let's be sure not to merge in Doc doc doc but instead a more descriptive comment, or even a TODO comment. However, since this is a WIP it's not worth your time to explain a pretty self-documenting method name just to appease the linter. Leaving this comment so we don't forget to update it if we do merge this in. :)

Member Author

Of course :)
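
For what it's worth, the intended usage is just something like this (rough sketch; the filter is only for illustration):

  TableScan scan = table.newScan()
      .withManifestProcessor(new LocalManifestProcessor(table.io()))
      .filter(Expressions.equal("date", "2020-09-01"));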


/**
* Create a new {@link TableScan} from this that will read the given data columns. This produces
* an expected schema that includes all fields that are either selected or used by this scan's
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -54,6 +54,7 @@ abstract class BaseTableScan implements TableScan {
private final Table table;
private final Schema schema;
private final TableScanContext context;
+  protected ManifestProcessor manifestProcessor;

protected BaseTableScan(TableOperations ops, Table table, Schema schema) {
this(ops, table, schema, new TableScanContext());
@@ -64,6 +65,7 @@ protected BaseTableScan(TableOperations ops, Table table, Schema schema, TableSc
this.table = table;
this.schema = schema;
this.context = context;
+    this.manifestProcessor = new LocalManifestProcessor(table.io());
}

protected TableOperations tableOps() {
@@ -131,6 +133,9 @@ public TableScan useSnapshot(long scanSnapshotId) {
ops, table, schema, context.useSnapshotId(scanSnapshotId));
}




Contributor

Nit: unnecessary whitespace change

@Override
public TableScan asOfTime(long timestampMillis) {
Preconditions.checkArgument(context.snapshotId() == null,
@@ -275,6 +280,12 @@ public String toString() {
.toString();
}

+  @Override
+  public TableScan withManifestProcessor(ManifestProcessor processor) {
+    this.manifestProcessor = processor;
+    return this;
+  }

/**
* To be able to make refinements {@link #select(Collection)} and {@link #caseSensitive(boolean)} in any order,
* we resolve the schema to be projected lazily here.
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -86,7 +86,7 @@ public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot s
manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
}

-    return manifestGroup.planFiles();
+    return manifestGroup.withProcessor(this.manifestProcessor).planFiles();
}

@Override
@@ -19,14 +19,15 @@

package org.apache.iceberg;

+import java.io.Serializable;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.types.Types;

class GenericManifestEntry<F extends ContentFile<F>>
-    implements ManifestEntry<F>, IndexedRecord, SpecificData.SchemaConstructable, StructLike {
+    implements ManifestEntry<F>, IndexedRecord, SpecificData.SchemaConstructable, StructLike, Serializable {
private final org.apache.avro.Schema schema;
private Status status = Status.EXISTING;
private Long snapshotId = null;
35 changes: 35 additions & 0 deletions core/src/main/java/org/apache/iceberg/LocalManifestProcessor.java
@@ -0,0 +1,35 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg;

import java.util.function.BiFunction;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;

public class LocalManifestProcessor extends ManifestProcessor {
  private final FileIO io;

  public LocalManifestProcessor(FileIO io) {
    this.io = io;
  }

  @Override
  public <T extends ContentFile<T>> Iterable<CloseableIterable<ManifestEntry<T>>> readManifests(
      Iterable<ManifestFile> fromIterable,
      BiFunction<ManifestFile, FileIO, CloseableIterable<ManifestEntry<T>>> reader) {
    return CloseableIterable.transform(
        CloseableIterable.withNoopClose(fromIterable), manifestFile -> reader.apply(manifestFile, io));
  }
}
103 changes: 72 additions & 31 deletions core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -25,7 +25,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Evaluator;
@@ -36,6 +35,7 @@
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Function;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -61,6 +61,7 @@ class ManifestGroup {
private List<String> columns;
private boolean caseSensitive;
private ExecutorService executorService;
+  private ManifestProcessor manifestProcessor;

ManifestGroup(FileIO io, Iterable<ManifestFile> manifests) {
this(io,
@@ -82,6 +83,12 @@ class ManifestGroup {
this.caseSensitive = true;
this.manifestPredicate = m -> true;
this.manifestEntryPredicate = e -> true;
+    this.manifestProcessor = new LocalManifestProcessor(io);
  }

+  ManifestGroup withProcessor(ManifestProcessor processor) {
+    this.manifestProcessor = processor;
+    return this;
+  }

ManifestGroup specsById(Map<Integer, PartitionSpec> newSpecsById) {
@@ -169,20 +176,26 @@ public CloseableIterable<FileScanTask> planFiles() {
select(Streams.concat(columns.stream(), ManifestReader.STATS_COLUMNS.stream()).collect(Collectors.toList()));
}

-    Iterable<CloseableIterable<FileScanTask>> tasks = entries((manifest, entries) -> {
-      int specId = manifest.partitionSpecId();
-      PartitionSpec spec = specsById.get(specId);
-      String schemaString = SchemaParser.toJson(spec.schema());
-      String specString = PartitionSpecParser.toJson(spec);
-      ResidualEvaluator residuals = residualCache.get(specId);
-      if (dropStats) {
-        return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
-            e.file().copyWithoutStats(), deleteFiles.forEntry(e), schemaString, specString, residuals));
-      } else {
-        return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
-            e.file().copy(), deleteFiles.forEntry(e), schemaString, specString, residuals));
-      }
-    });
+    LoadingCache<Integer, SpecCacheEntry> specCache = Caffeine.newBuilder().build(
+        specId -> {
+          PartitionSpec spec = specsById.get(specId);
+          return new SpecCacheEntry(SchemaParser.toJson(spec.schema()), PartitionSpecParser.toJson(spec),
+              residualCache.get(specId));
+        });
+
+    // TODO: Make this cleaner (maybe go back to the old method of two traversals? Make an API for BaseFileScanTask?)
+    // This will have different performance characteristics than the old version, since we do a cache lookup for
+    // every entry; but it will probably end up being essentially a no-op with branch prediction, since we look up
+    // the same thing over and over in order.
+    Iterable<CloseableIterable<FileScanTask>> tasks = entries(entries ->
+        CloseableIterable.transform(entries, e -> {
+          SpecCacheEntry cached = specCache.get(e.file().specId());
+          DataFile file = dropStats ? e.file().copyWithoutStats() : e.file().copy();
+          return new BaseFileScanTask(file, deleteFiles.forEntry(e), cached.schemaString, cached.specString,
+              cached.residuals);
+        }));

if (executorService != null) {
return new ParallelIterable<>(tasks, executorService);
@@ -200,11 +213,32 @@
* @return a CloseableIterable of manifest entries.
*/
public CloseableIterable<ManifestEntry<DataFile>> entries() {
-    return CloseableIterable.concat(entries((manifest, entries) -> entries));
+    return CloseableIterable.concat(entries(entry -> entry));
}

+  /*
+   * Generating the lambda in a static method keeps it from capturing a reference to the enclosing
+   * ManifestGroup, so we can ignore the serializability of this class.
+   */
+  private static ManifestProcessor.Func<DataFile> generateManifestProcessorFunc(
+      Map<Integer, PartitionSpec> specsById, Expression dataFilter, Expression partitionFilter,
+      boolean caseSensitive, List<String> columns, boolean ignoreDeleted) {
+    return (manifest, processorIO) -> {
+      ManifestReader<DataFile> reader = ManifestFiles.read(manifest, processorIO, specsById)
+          .filterRows(dataFilter)
+          .filterPartitions(partitionFilter)
+          .caseSensitive(caseSensitive)
+          .select(columns);
+
+      CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
+      if (ignoreDeleted) {
+        entries = reader.liveEntries();
+      }
+      return entries;
+    };
+  }

private <T> Iterable<CloseableIterable<T>> entries(
-      BiFunction<ManifestFile, CloseableIterable<ManifestEntry<DataFile>>, CloseableIterable<T>> entryFn) {
+      Function<CloseableIterable<ManifestEntry<DataFile>>, CloseableIterable<T>> entryFn) {

LoadingCache<Integer, ManifestEvaluator> evalCache = specsById == null ?
null : Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
@@ -236,20 +270,13 @@ private <T> Iterable<CloseableIterable<T>> entries(

matchingManifests = Iterables.filter(matchingManifests, manifestPredicate::test);

-    return Iterables.transform(
-        matchingManifests,
-        manifest -> {
-          ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io, specsById)
-              .filterRows(dataFilter)
-              .filterPartitions(partitionFilter)
-              .caseSensitive(caseSensitive)
-              .select(columns);
-
-          CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
-          if (ignoreDeleted) {
-            entries = reader.liveEntries();
-          }
+    Iterable<CloseableIterable<ManifestEntry<DataFile>>> fileReader =
+        manifestProcessor.readManifests(matchingManifests, generateManifestProcessorFunc(specsById, dataFilter,
+            partitionFilter, caseSensitive, columns, ignoreDeleted));
+
+    return Iterables.transform(
+        fileReader,
+        entries -> {
          if (ignoreExisting) {
            entries = CloseableIterable.filter(entries,
                entry -> entry.status() != ManifestEntry.Status.EXISTING);
@@ -261,7 +288,21 @@ private <T> Iterable<CloseableIterable<T>> entries(
          }

          entries = CloseableIterable.filter(entries, manifestEntryPredicate);
-          return entryFn.apply(manifest, entries);
+          return entryFn.apply(entries);
        });
}


+  // Static so instances don't capture a reference to ManifestGroup.
+  private static class SpecCacheEntry {
+    private final String schemaString;
+    private final String specString;
+    private final ResidualEvaluator residuals;
+
+    SpecCacheEntry(String schemaString, String specString, ResidualEvaluator residuals) {
+      this.schemaString = schemaString;
+      this.specString = specString;
+      this.residuals = residuals;
+    }
+  }

}
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.util;

import java.io.Serializable;
import java.util.Objects;
import java.util.function.Predicate;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public interface SerializablePredicate<T> extends Predicate<T>, Serializable {

  @Override
  default SerializablePredicate<T> and(Predicate<? super T> other) {
    Objects.requireNonNull(other);
    Preconditions.checkArgument(other instanceof SerializablePredicate);
    return (T x) -> this.test(x) && other.test(x);
  }
}
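
As a quick illustration of what the overridden and preserves (hypothetical usage, not part of this diff): composing two serializable predicates yields a predicate that is itself still Serializable, so it can be shipped to remote workers along with the manifest-reading function.

  SerializablePredicate<ManifestEntry<DataFile>> notDeleted =
      entry -> entry.status() != ManifestEntry.Status.DELETED;
  SerializablePredicate<ManifestEntry<DataFile>> added =
      entry -> entry.status() == ManifestEntry.Status.ADDED;
  SerializablePredicate<ManifestEntry<DataFile>> both = notDeleted.and(added);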