68 changes: 68 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/AvroIO.java
@@ -19,15 +19,29 @@

package org.apache.iceberg.avro;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.avro.InvalidAvroMagicException;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.DelegatingInputStream;
import org.apache.iceberg.io.SeekableInputStream;

class AvroIO {
private static final byte[] AVRO_MAGIC = new byte[] { 'O', 'b', 'j', 1 };
private static final ValueReader<byte[]> MAGIC_READER = ValueReaders.fixed(AVRO_MAGIC.length);
private static final ValueReader<Map<String, String>> META_READER = ValueReaders.map(
ValueReaders.strings(), ValueReaders.strings());
private static final ValueReader<byte[]> SYNC_READER = ValueReaders.fixed(16);

private AvroIO() {
}

@@ -131,4 +145,58 @@ public boolean markSupported() {
return stream.markSupported();
}
}

static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
long totalRows = 0;
try (SeekableInputStream in = open.get()) {
// use a direct decoder that will not buffer so the position of the input stream is accurate
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);

// an Avro file's layout looks like this:
// header|block|block|...
// the header contains:
// magic|string-map|sync
// each block consists of:
// row-count|compressed-size-in-bytes|block-bytes|sync

// it is necessary to read the header here because this is the only way to get the expected file sync bytes
byte[] magic = MAGIC_READER.read(decoder, null);
if (!Arrays.equals(AVRO_MAGIC, magic)) {
throw new InvalidAvroMagicException("Not an Avro file");
}

META_READER.read(decoder, null); // ignore the file metadata, it isn't needed
byte[] fileSync = SYNC_READER.read(decoder, null);

// the while loop reads row counts and seeks past the block bytes until the next sync pos is >= start, which
// indicates that the next sync is the start of the split.
byte[] blockSync = new byte[16];
long nextSyncPos = in.getPos();

while (nextSyncPos < start) {
shardulm94 (Contributor) commented on Jul 23, 2020:

Not sure if this is a valid start state, but I think there can be an edge case here:

row-count|compressed-size-in-bytes|block-bytes|sync|EOF
                                               ^
                                               |
                                             start

In this case it will seek and read the sync and then continue to read the next block's row count even after EOF.

So it should probably be while (nextSyncPos + 16 < start)?

Author (Contributor) replied:

The loop condition is correct with respect to start because a block starts at the beginning of a sync. You're right about the EOF behavior, though. I'll take a look at that.

shardulm94 (Contributor) replied:

Thanks for the insightful comment! (before it went away 😃) I was not familiar with Avro's split reading behavior.

Author (Contributor) replied:

Sorry, I should have edited, not deleted. I missed the part about it being just before the EOF.

Author (Contributor) replied:

The fix is to catch EOFException and return the number of rows.

if (nextSyncPos != in.getPos()) {
in.seek(nextSyncPos);
SYNC_READER.read(decoder, blockSync);

if (!Arrays.equals(fileSync, blockSync)) {
throw new RuntimeIOException("Invalid sync at %s", nextSyncPos);
}
}

long rowCount = decoder.readLong();
long compressedBlockSize = decoder.readLong();

totalRows += rowCount;
nextSyncPos = in.getPos() + compressedBlockSize;
}

return totalRows;

} catch (EOFException e) {
return totalRows;

} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to read stream while finding starting row position");
}
}
}
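
For intuition, the loop above can be replayed over pre-decoded block metadata. The sketch below is a hypothetical model (the class, names, and byte offsets are illustrative, not part of this PR): a block's rows are skipped exactly when the sync preceding it begins before the split start, and an early EOF simply ends the scan with the rows counted so far.

import java.util.List;

// Hypothetical model of the findStartingRowPos loop; in the real code the
// row counts and sync positions are decoded from the stream itself.
class SplitStartSketch {
  // precedingSyncPos: where the sync before this block begins; for the first
  // block this is the position immediately after the header's own sync.
  record Block(long rowCount, long precedingSyncPos) {}

  static long startingRowPos(List<Block> blocks, long start) {
    long totalRows = 0;
    for (Block block : blocks) {
      if (block.precedingSyncPos() >= start) {
        break; // this block begins the split, so its rows are not skipped
      }
      totalRows += block.rowCount();
    }
    return totalRows;
  }

  public static void main(String[] args) {
    // header ends at byte 100; the sync before block 2 begins at byte 4100
    List<Block> blocks = List.of(new Block(500, 100), new Block(300, 4100));
    System.out.println(startingRowPos(blocks, 3000)); // 500: split starts at block 2
  }
}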
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
@@ -77,7 +77,13 @@ public CloseableIterator<D> iterator() {
FileReader<D> fileReader = initMetadata(newFileReader());

if (start != null) {
if (reader instanceof SupportsRowPosition) {
((SupportsRowPosition) reader).setRowPositionSupplier(
() -> AvroIO.findStartingRowPos(file::newStream, start));
}
fileReader = new AvroRangeIterator<>(fileReader, start, end);
} else if (reader instanceof SupportsRowPosition) {
((SupportsRowPosition) reader).setRowPositionSupplier(() -> 0L);
}

addCloseable(fileReader);
core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
@@ -24,6 +24,7 @@
import java.util.function.Supplier;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -48,6 +49,7 @@ class BuildAvroProjection extends AvroCustomOrderSchemaVisitor<Schema, Schema.Fi
}

@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public Schema record(Schema record, List<String> names, Iterable<Schema.Field> schemaIterable) {
Preconditions.checkArgument(
current.isNestedType() && current.asNestedType().isStructType(),
@@ -93,7 +95,9 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
updatedFields.add(avroField);

} else {
Preconditions.checkArgument(field.isOptional(), "Missing required field: %s", field.name());
Preconditions.checkArgument(
field.isOptional() || field.fieldId() == MetadataColumns.ROW_POSITION.fieldId(),
"Missing required field: %s", field.name());
// Create a field that will be defaulted to null. We assign a unique suffix to the field
// to make sure that even if records in the file have the field it is not projected.
Schema.Field newField = new Schema.Field(
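
Context for the carve-out above: _pos can never be present in a data file because it is synthesized at read time (see the PositionReader added to ValueReaders below), so without exempting MetadataColumns.ROW_POSITION, any projection that includes _pos would fail the missing-required-field check.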
core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java
@@ -23,14 +23,15 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.types.TypeUtil;

public class ProjectionDatumReader<D> implements DatumReader<D> {
public class ProjectionDatumReader<D> implements DatumReader<D>, SupportsRowPosition {
private final Function<Schema, DatumReader<?>> getReader;
private final org.apache.iceberg.Schema expectedSchema;
private final Map<String, String> renames;
@@ -49,6 +50,13 @@ public ProjectionDatumReader(Function<Schema, DatumReader<?>> getReader,
this.nameMapping = nameMapping;
}

@Override
public void setRowPositionSupplier(Supplier<Long> posSupplier) {
if (wrapped instanceof SupportsRowPosition) {
((SupportsRowPosition) wrapped).setRowPositionSupplier(posSupplier);
}
}

@Override
public void setSchema(Schema newFileSchema) {
this.fileSchema = newFileSchema;
29 changes: 29 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/SupportsRowPosition.java
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.avro;

import java.util.function.Supplier;

/**
* Interface for readers that accept a callback to determine the starting row position of an Avro split.
*/
public interface SupportsRowPosition {
void setRowPositionSupplier(Supplier<Long> posSupplier);
}
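
As a minimal sketch of the delegation idiom this interface enables (the wrapper class is hypothetical; ProjectionDatumReader, DataReader, and StructReader in this PR all follow the same pattern): forward the supplier only when the wrapped reader can use it.

package org.apache.iceberg.avro;

import java.util.function.Supplier;
import org.apache.avro.io.DatumReader;

// Hypothetical wrapper: pass the row-position callback through to the
// delegate if it supports positions; otherwise drop it silently.
class ForwardingReader implements SupportsRowPosition {
  private final DatumReader<?> wrapped;

  ForwardingReader(DatumReader<?> wrapped) {
    this.wrapped = wrapped;
  }

  @Override
  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
    if (wrapped instanceof SupportsRowPosition) {
      ((SupportsRowPosition) wrapped).setRowPositionSupplier(posSupplier);
    }
  }
}

Passing a Supplier<Long> rather than a plain long keeps the work lazy: findStartingRowPos rescans the file header and blocks, and StructReader only invokes posSupplier.get() when a _pos field is actually projected, so readers that never need the position never pay for the scan.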
42 changes: 41 additions & 1 deletion core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -31,12 +31,14 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -560,10 +562,11 @@ public Map<K, V> read(Decoder decoder, Object reuse) throws IOException {
}
}

public abstract static class StructReader<S> implements ValueReader<S> {
public abstract static class StructReader<S> implements ValueReader<S>, SupportsRowPosition {
private final ValueReader<?>[] readers;
private final int[] positions;
private final Object[] constants;
private int posField = -1;

protected StructReader(List<ValueReader<?>> readers) {
this.readers = readers.toArray(new ValueReader[0]);
@@ -582,13 +585,36 @@ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Ma
if (idToConstant.containsKey(field.fieldId())) {
positionList.add(pos);
constantList.add(idToConstant.get(field.fieldId()));
} else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
// track where the _pos field is located for setRowPositionSupplier
this.posField = pos;
}
}

this.positions = positionList.stream().mapToInt(Integer::intValue).toArray();
this.constants = constantList.toArray();
}

@Override
public void setRowPositionSupplier(Supplier<Long> posSupplier) {
if (posField >= 0) {
long startingPos = posSupplier.get();
this.readers[posField] = new PositionReader(startingPos);
for (ValueReader<?> reader : readers) {
if (reader instanceof SupportsRowPosition) {
((SupportsRowPosition) reader).setRowPositionSupplier(() -> startingPos);
}
}

} else {
for (ValueReader<?> reader : readers) {
if (reader instanceof SupportsRowPosition) {
((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
}
}
}
}

protected abstract S reuseOrCreate(Object reuse);

protected abstract Object get(S struct, int pos);
@@ -687,4 +713,18 @@ protected void set(R struct, int pos, Object value) {
struct.put(pos, value);
}
}

static class PositionReader implements ValueReader<Long> {
private long currentPosition;

PositionReader(long rowPosition) {
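// start one before the first row: read() increments before returning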
this.currentPosition = rowPosition - 1;
}

@Override
public Long read(Decoder ignored, Object reuse) throws IOException {
currentPosition += 1;
return currentPosition;
}
}
}
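
Why the PositionReader constructor subtracts one: read() increments before returning, so the first row of a split reports exactly the starting position. A sketch of the expected behavior (hypothetical same-package check, since PositionReader is package-private; the decoder argument is ignored by design, so null is safe here):

package org.apache.iceberg.avro;

import java.io.IOException;

class PositionReaderCheck {
  public static void main(String[] args) throws IOException {
    // a split whose first row is file row 500
    ValueReaders.PositionReader posReader = new ValueReaders.PositionReader(500L);
    System.out.println(posReader.read(null, null)); // 500: first row of the split
    System.out.println(posReader.read(null, null)); // 501: advances once per row
  }
}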
11 changes: 10 additions & 1 deletion core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
@@ -32,6 +33,7 @@
import org.apache.avro.io.ResolvingDecoder;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
import org.apache.iceberg.avro.SupportsRowPosition;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -40,7 +42,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class DataReader<T> implements DatumReader<T> {
public class DataReader<T> implements DatumReader<T>, SupportsRowPosition {

private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>> DECODER_CACHES =
ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
@@ -78,6 +80,13 @@ public T read(T reuse, Decoder decoder) throws IOException {
return value;
}

@Override
public void setRowPositionSupplier(Supplier<Long> posSupplier) {
if (reader instanceof SupportsRowPosition) {
((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
}
}

private ResolvingDecoder resolve(Decoder decoder) throws IOException {
Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
Map<Schema, ResolvingDecoder> fileSchemaToResolver = cache
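
Taken together, DataReader and ProjectionDatumReader forward the supplier down to the StructReader that materializes _pos. As a hypothetical walkthrough of the values implied by the code above: for a file of two 500-row blocks split inside the second block, findStartingRowPos returns 500 and the first record of the split reads _pos = 500.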