2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -153,7 +153,7 @@ public void add(DataFile addedFile) {
addEntry(reused.wrapAppend(snapshotId, addedFile));
}

-public void add(ManifestEntry entry) {
+void add(ManifestEntry entry) {
addEntry(reused.wrapAppend(snapshotId, entry.file()));
}

36 changes: 33 additions & 3 deletions core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -40,6 +40,8 @@
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import static java.util.Collections.emptyIterator;

@@ -561,12 +563,32 @@ public Map<K, V> read(Decoder decoder, Object reuse) throws IOException {

public abstract static class StructReader<S> implements ValueReader<S> {
private final ValueReader<?>[] readers;
private final int[] positions;
private final Object[] constants;

protected StructReader(List<ValueReader<?>> readers) {
-this.readers = new ValueReader[readers.size()];
-for (int i = 0; i < this.readers.length; i += 1) {
-this.readers[i] = readers.get(i);
+this.readers = readers.toArray(new ValueReader[0]);
+this.positions = new int[0];
+this.constants = new Object[0];
}

protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
this.readers = readers.toArray(new ValueReader[0]);

List<Types.NestedField> fields = struct.fields();
List<Integer> positionList = Lists.newArrayListWithCapacity(fields.size());
List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
for (int pos = 0; pos < fields.size(); pos += 1) {
Types.NestedField field = fields.get(pos);
Object constant = idToConstant.get(field.fieldId());
if (constant != null) {
positionList.add(pos);
constantList.add(prepareConstant(field.type(), constant));
}
}

this.positions = positionList.stream().mapToInt(Integer::intValue).toArray();
this.constants = constantList.toArray();
}

protected abstract S reuseOrCreate(Object reuse);
@@ -575,6 +597,10 @@ protected StructReader(List<ValueReader<?>> readers) {

protected abstract void set(S struct, int pos, Object value);

protected Object prepareConstant(Type type, Object value) {
return value;
}

public ValueReader<?> reader(int pos) {
return readers[pos];
}
@@ -597,6 +623,10 @@ public S read(Decoder decoder, Object reuse) throws IOException {
}
}

for (int i = 0; i < positions.length; i += 1) {
set(struct, positions[i], constants[i]);
}

return struct;
}
}
@@ -41,7 +41,7 @@ public static byte[] toByteArray(ByteBuffer buffer) {
}
} else {
byte[] bytes = new byte[buffer.remaining()];
-buffer.get(bytes);
+buffer.asReadOnlyBuffer().get(bytes);
Contributor Author: I did a quick check on this method to make sure the incoming buffer is not modified, and found that it is modified for off-heap buffers. This fixes the problem, but isn't really related. If anyone prefers, I can move this to a separate PR.

return bytes;
}
}
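A minimal sketch (not part of this PR) of the behavior described in the review comment above: a relative get() consumes the source ByteBuffer by advancing its position, which mutates the caller's buffer, while reading through asReadOnlyBuffer() copies from an independent duplicate and leaves the original position untouched. The class name below is made up for illustration.

import java.nio.ByteBuffer;

public class ByteBufferCopyDemo {
  public static void main(String[] args) {
    // Off-heap (direct) buffer, i.e. the case where toByteArray cannot use a backing array.
    ByteBuffer direct = ByteBuffer.allocateDirect(8);
    for (int i = 0; i < 8; i++) {
      direct.put((byte) i);
    }
    direct.flip(); // position = 0, limit = 8, ready to read

    // Relative get on the buffer itself advances its position to the limit.
    byte[] copy1 = new byte[direct.remaining()];
    direct.get(copy1);
    System.out.println("after direct.get(): position = " + direct.position()); // prints 8

    direct.rewind(); // reset position to 0 for the second copy

    // Copying through a read-only duplicate advances only the duplicate's position.
    byte[] copy2 = new byte[direct.remaining()];
    direct.asReadOnlyBuffer().get(copy2);
    System.out.println("after asReadOnlyBuffer().get(): position = " + direct.position()); // prints 0
  }
}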
48 changes: 48 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.util;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

public class PartitionUtil {
private PartitionUtil() {
}

public static Map<Integer, ?> constantsMap(FileScanTask task) {
return constantsMap(task.spec(), task.file().partition());
}

private static Map<Integer, ?> constantsMap(PartitionSpec spec, StructLike partitionData) {
// use java.util.HashMap because partition data may contain null values
Contributor: Is this true only for identity partitioning or in general?

Contributor Author: In general. All partition functions are required to return null when applied to null.

Map<Integer, Object> idToConstant = new HashMap<>();
List<PartitionField> fields = spec.fields();
for (int pos = 0; pos < fields.size(); pos += 1) {
PartitionField field = fields.get(pos);
idToConstant.put(field.sourceId(), partitionData.get(pos, Object.class));
}
return idToConstant;
}
}
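A small sketch (not from the PR) of why the comment in constantsMap calls for java.util.HashMap: every partition transform maps null to null, so partition data can contain null values, and Guava's ImmutableMap rejects null values at construction time while HashMap accepts them. The class name and values below are made up for illustration.

import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;

public class NullPartitionValueDemo {
  public static void main(String[] args) {
    // HashMap stores a null partition value, e.g. for rows whose source column was null.
    Map<Integer, Object> idToConstant = new HashMap<>();
    idToConstant.put(1, "2019-07-01"); // hypothetical identity-partition value
    idToConstant.put(2, null);         // any transform applied to a null source value yields null
    System.out.println(idToConstant);  // {1=2019-07-01, 2=null}

    // Guava's ImmutableMap throws NullPointerException for null values, so it cannot hold this data.
    try {
      ImmutableMap.of(1, "2019-07-01", 2, null);
    } catch (NullPointerException e) {
      System.out.println("ImmutableMap rejects null values");
    }
  }
}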
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
@@ -45,6 +46,7 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.util.PartitionUtil;

class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
private final TableOperations ops;
@@ -74,13 +76,15 @@ public Iterator<Record> iterator() {

private CloseableIterable<Record> open(FileScanTask task) {
InputFile input = ops.io().newInputFile(task.file().path().toString());
Map<Integer, ?> partition = PartitionUtil.constantsMap(task);

// TODO: join to partition data from the manifest file
switch (task.file().format()) {
case AVRO:
Avro.ReadBuilder avro = Avro.read(input)
.project(projection)
-.createReaderFunc(DataReader::create)
+.createReaderFunc(
+avroSchema -> DataReader.create(projection, avroSchema, partition))
.split(task.start(), task.length());

if (reuseContainers) {
19 changes: 14 additions & 5 deletions data/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -19,6 +19,7 @@

package org.apache.iceberg.data.avro;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import java.io.IOException;
import java.util.HashMap;
@@ -45,17 +46,23 @@ public class DataReader<T> implements DatumReader<T> {
ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());

public static <D> DataReader<D> create(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
-return new DataReader<>(expectedSchema, readSchema);
+return create(expectedSchema, readSchema, ImmutableMap.of());
}

public static <D> DataReader<D> create(org.apache.iceberg.Schema expectedSchema, Schema readSchema,
Map<Integer, ?> idToConstant) {
return new DataReader<>(expectedSchema, readSchema, idToConstant);
}

private final Schema readSchema;
private final ValueReader<T> reader;
private Schema fileSchema = null;

@SuppressWarnings("unchecked")
-private DataReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
+private DataReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> idToConstant) {
this.readSchema = readSchema;
-this.reader = (ValueReader<T>) AvroSchemaWithTypeVisitor.visit(expectedSchema, readSchema, new ReadBuilder());
+this.reader = (ValueReader<T>) AvroSchemaWithTypeVisitor
+.visit(expectedSchema, readSchema, new ReadBuilder(idToConstant));
}

@Override
@@ -96,14 +103,16 @@ private ResolvingDecoder newResolver() {
}

private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
private final Map<Integer, ?> idToConstant;

-private ReadBuilder() {
+private ReadBuilder(Map<Integer, ?> idToConstant) {
+this.idToConstant = idToConstant;
}

@Override
public ValueReader<?> record(Types.StructType struct, Schema record,
List<String> names, List<ValueReader<?>> fields) {
-return GenericReaders.struct(struct, fields);
+return GenericReaders.struct(struct, fields, idToConstant);
}

@Override
@@ -28,6 +28,7 @@
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import org.apache.avro.io.Decoder;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
@@ -55,8 +56,8 @@ static ValueReader<OffsetDateTime> timestamptz() {
return TimestamptzReader.INSTANCE;
}

-static ValueReader<Record> struct(StructType struct, List<ValueReader<?>> readers) {
-return new GenericRecordReader(readers, struct);
+static ValueReader<Record> struct(StructType struct, List<ValueReader<?>> readers, Map<Integer, ?> idToConstant) {
+return new GenericRecordReader(readers, struct, idToConstant);
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
@@ -113,8 +114,8 @@ public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException {
private static class GenericRecordReader extends ValueReaders.StructReader<Record> {
private final StructType structType;

-private GenericRecordReader(List<ValueReader<?>> readers, StructType struct) {
-super(readers);
+private GenericRecordReader(List<ValueReader<?>> readers, StructType struct, Map<Integer, ?> idToConstant) {
+super(readers, struct, idToConstant);
this.structType = struct;
}

@@ -19,6 +19,7 @@

package org.apache.iceberg.spark.data;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import java.io.IOException;
import java.util.HashMap;
@@ -31,10 +32,12 @@
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.ResolvingDecoder;
-import org.apache.iceberg.avro.AvroSchemaVisitor;
+import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;


@@ -47,10 +50,15 @@ public class SparkAvroReader implements DatumReader<InternalRow> {
private final ValueReader<InternalRow> reader;
private Schema fileSchema = null;

public SparkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
this(expectedSchema, readSchema, ImmutableMap.of());
}

@SuppressWarnings("unchecked")
-public SparkAvroReader(Schema readSchema) {
+public SparkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> constants) {
this.readSchema = readSchema;
-this.reader = (ValueReader<InternalRow>) AvroSchemaVisitor.visit(readSchema, new ReadBuilder());
+this.reader = (ValueReader<InternalRow>) AvroSchemaWithTypeVisitor
+.visit(expectedSchema, readSchema, new ReadBuilder(constants));
}

@Override
@@ -90,38 +98,42 @@ private ResolvingDecoder newResolver() {
}
}

-private static class ReadBuilder extends AvroSchemaVisitor<ValueReader<?>> {
-private ReadBuilder() {
+private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
+private final Map<Integer, ?> idToConstant;

+private ReadBuilder(Map<Integer, ?> idToConstant) {
+this.idToConstant = idToConstant;
}

@Override
-public ValueReader<?> record(Schema record, List<String> names, List<ValueReader<?>> fields) {
-return SparkValueReaders.struct(fields);
+public ValueReader<?> record(Types.StructType expected, Schema record, List<String> names,
+List<ValueReader<?>> fields) {
+return SparkValueReaders.struct(fields, expected, idToConstant);
}

@Override
-public ValueReader<?> union(Schema union, List<ValueReader<?>> options) {
+public ValueReader<?> union(Type expected, Schema union, List<ValueReader<?>> options) {
return ValueReaders.union(options);
}

@Override
-public ValueReader<?> array(Schema array, ValueReader<?> elementReader) {
-LogicalType logical = array.getLogicalType();
-if (logical != null && "map".equals(logical.getName())) {
-ValueReader<?>[] keyValueReaders = ((SparkValueReaders.StructReader) elementReader).readers();
-return SparkValueReaders.arrayMap(keyValueReaders[0], keyValueReaders[1]);
-}

+public ValueReader<?> array(Types.ListType expected, Schema array, ValueReader<?> elementReader) {
return SparkValueReaders.array(elementReader);
}

@Override
-public ValueReader<?> map(Schema map, ValueReader<?> valueReader) {
+public ValueReader<?> map(Types.MapType expected, Schema map,
+ValueReader<?> keyReader, ValueReader<?> valueReader) {
+return SparkValueReaders.arrayMap(keyReader, valueReader);
+}

+@Override
+public ValueReader<?> map(Types.MapType expected, Schema map, ValueReader<?> valueReader) {
return SparkValueReaders.map(SparkValueReaders.strings(), valueReader);
}

@Override
-public ValueReader<?> primitive(Schema primitive) {
+public ValueReader<?> primitive(Type.PrimitiveType expected, Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();
if (logicalType != null) {
switch (logicalType.getName()) {