10 changes: 2 additions & 8 deletions core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -40,7 +40,6 @@
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.common.DynConstructors;
-import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import static java.util.Collections.emptyIterator;
@@ -580,10 +579,9 @@ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Ma
List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
for (int pos = 0; pos < fields.size(); pos += 1) {
Types.NestedField field = fields.get(pos);
-      Object constant = idToConstant.get(field.fieldId());
-      if (constant != null) {
+      if (idToConstant.containsKey(field.fieldId())) {
        positionList.add(pos);
-        constantList.add(prepareConstant(field.type(), constant));
+        constantList.add(idToConstant.get(field.fieldId()));
}
}

@@ -597,10 +595,6 @@ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Ma

protected abstract void set(S struct, int pos, Object value);

-  protected Object prepareConstant(Type type, Object value) {
-    return value;
-  }

Author comment on this removal: "I'm moving this out of Avro and adding a callback to convert the constants to PartitionUtil.constantsMap. That way, Spark can supply a conversion function and use it in both places, instead of duplicating the conversion in Avro and Parquet readers." (A sketch of that callback follows this file's diff.)

public ValueReader<?> reader(int pos) {
return readers[pos];
}
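To make the callback shape concrete, here is a minimal sketch of an engine-side converter plugged into the new `PartitionUtil.constantsMap` overload. The class and the string handling are hypothetical; only the `BiFunction<Type, Object, Object>` signature comes from this change.

```java
import java.util.Map;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.PartitionUtil;

// Hypothetical engine-side helper: the engine converts partition constants
// to its own runtime representation once, and both the Avro and Parquet
// readers receive the already-converted values.
class EngineConstants {
  private static Object convert(Type type, Object value) {
    if (value == null) {
      return null;
    }
    switch (type.typeId()) {
      case STRING:
        // a Spark reader would return its own string type here, for example
        return value.toString();
      default:
        return value;
    }
  }

  static Map<Integer, ?> constantsFor(FileScanTask task) {
    return PartitionUtil.constantsMap(task, EngineConstants::convert);
  }
}
```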
16 changes: 13 additions & 3 deletions core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -22,26 +22,36 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.BiFunction;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class PartitionUtil {
private PartitionUtil() {
}

  public static Map<Integer, ?> constantsMap(FileScanTask task) {
-    return constantsMap(task.spec(), task.file().partition());
+    return constantsMap(task, (type, constant) -> constant);
  }

-  private static Map<Integer, ?> constantsMap(PartitionSpec spec, StructLike partitionData) {
+  public static Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> convertConstant) {
+    return constantsMap(task.spec(), task.file().partition(), convertConstant);
+  }
+
+  private static Map<Integer, ?> constantsMap(PartitionSpec spec, StructLike partitionData,
+                                              BiFunction<Type, Object, Object> convertConstant) {
// use java.util.HashMap because partition data may contain null values
Map<Integer, Object> idToConstant = new HashMap<>();
+    List<Types.NestedField> partitionFields = spec.partitionType().fields();
List<PartitionField> fields = spec.fields();
for (int pos = 0; pos < fields.size(); pos += 1) {
PartitionField field = fields.get(pos);
-      idToConstant.put(field.sourceId(), partitionData.get(pos, Object.class));
+      Object converted = convertConstant.apply(partitionFields.get(pos).type(), partitionData.get(pos, Object.class));
+      idToConstant.put(field.sourceId(), converted);
}
return idToConstant;
}
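For reference, a short sketch of how the two overloads relate: the single-argument form keeps its old behavior by passing an identity converter, and the returned map is keyed by each partition field's source column id. The class and method names here are illustrative only.

```java
import java.util.Map;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.util.PartitionUtil;

class ConstantsMapUsage {
  static void example(FileScanTask task) {
    // Same result: the one-argument overload delegates with (type, constant) -> constant.
    Map<Integer, ?> raw = PartitionUtil.constantsMap(task);
    Map<Integer, ?> alsoRaw = PartitionUtil.constantsMap(task, (type, constant) -> constant);

    // Keys are source column ids, so a reader can look up the constant for a
    // projected column and skip reading that column from the data file.
  }
}
```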
52 changes: 52 additions & 0 deletions data/src/main/java/org/apache/iceberg/data/DateTimeUtil.java
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.data;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

public class DateTimeUtil {
private DateTimeUtil() {
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

public static LocalDate dateFromDays(int daysFromEpoch) {
return ChronoUnit.DAYS.addTo(EPOCH_DAY, daysFromEpoch);
}

public static LocalTime timeFromMicros(long microFromMidnight) {
return LocalTime.ofNanoOfDay(microFromMidnight * 1000);
}

public static LocalDateTime timestampFromMicros(long microsFromEpoch) {
return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch).toLocalDateTime();
}

public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) {
return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch);
}
}
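Iceberg stores dates as days from the Unix epoch and times/timestamps as microseconds, so these helpers are thin wrappers over `java.time`. A quick sketch with hand-checked values:

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import org.apache.iceberg.data.DateTimeUtil;

class DateTimeUtilExample {
  public static void main(String[] args) {
    // 48 years of 365 days plus 12 leap days puts day 17532 at 2018-01-01.
    LocalDate date = DateTimeUtil.dateFromDays(17532);        // 2018-01-01

    // Time values count microseconds from midnight.
    LocalTime time = DateTimeUtil.timeFromMicros(1_000_000L); // 00:00:01

    // Timestamp values count microseconds from the epoch.
    LocalDateTime ts = DateTimeUtil.timestampFromMicros(0L);  // 1970-01-01T00:00
  }
}
```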
38 changes: 36 additions & 2 deletions data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
+import org.apache.avro.generic.GenericData;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HasTableOperations;
@@ -46,6 +47,8 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;

class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
@@ -76,7 +79,7 @@ public Iterator<Record> iterator() {

private CloseableIterable<Record> open(FileScanTask task) {
InputFile input = ops.io().newInputFile(task.file().path().toString());
-    Map<Integer, ?> partition = PartitionUtil.constantsMap(task);
+    Map<Integer, ?> partition = PartitionUtil.constantsMap(task, TableScanIterable::convertConstant);

// TODO: join to partition data from the manifest file
switch (task.file().format()) {
@@ -96,7 +99,7 @@ private CloseableIterable<Record> open(FileScanTask task) {
case PARQUET:
Parquet.ReadBuilder parquet = Parquet.read(input)
.project(projection)
-            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema))
+            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema, partition))
.split(task.start(), task.length());

if (reuseContainers) {
Expand Down Expand Up @@ -185,4 +188,35 @@ public void close() throws IOException {
}
}
}

+  /**
+   * Conversions from generic Avro values to Iceberg generic values.
+   */
+  private static Object convertConstant(Type type, Object value) {
+    if (value == null) {
+      return null;
+    }
+
+    switch (type.typeId()) {
+      case STRING:
+        return value.toString();
+      case TIME:
+        return DateTimeUtil.timeFromMicros((Long) value);
+      case DATE:
+        return DateTimeUtil.dateFromDays((Integer) value);
+      case TIMESTAMP:
+        if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+          return DateTimeUtil.timestamptzFromMicros((Long) value);
+        } else {
+          return DateTimeUtil.timestampFromMicros((Long) value);
+        }
+      case FIXED:
+        if (value instanceof GenericData.Fixed) {
+          return ((GenericData.Fixed) value).bytes();
+        }
+        return value;
+      default:
+    }
+    return value;
+  }
}
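The `convertConstant` method above exists because Avro's generic representation differs from Iceberg's generics: strings decode as `org.apache.avro.util.Utf8` and fixed values as `GenericData.Fixed`. A small sketch of the two unwrapping cases (the 3-byte fixed schema is made up for illustration):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;

class AvroUnwrapSketch {
  public static void main(String[] args) {
    // Avro decodes string columns as Utf8; toString() yields a java.lang.String.
    Utf8 avroString = new Utf8("us-east-1");
    String asString = avroString.toString(); // "us-east-1"

    // Avro fixed values arrive wrapped in GenericData.Fixed; bytes() unwraps them.
    Schema fixedSchema = Schema.createFixed("example_fixed", null, null, 3);
    GenericData.Fixed fixed = new GenericData.Fixed(fixedSchema, new byte[] {1, 2, 3});
    byte[] unwrapped = fixed.bytes(); // {1, 2, 3}
  }
}
```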
@@ -20,18 +20,16 @@
package org.apache.iceberg.data.avro;

import java.io.IOException;
-import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import org.apache.avro.io.Decoder;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
+import org.apache.iceberg.data.DateTimeUtil;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types.StructType;
@@ -60,9 +58,6 @@ static ValueReader<Record> struct(StructType struct, List<ValueReader<?>> reader
return new GenericRecordReader(readers, struct, idToConstant);
}

-  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

private static class DateReader implements ValueReader<LocalDate> {
private static final DateReader INSTANCE = new DateReader();

@@ -71,7 +66,7 @@ private DateReader() {

@Override
public LocalDate read(Decoder decoder, Object reuse) throws IOException {
-      return EPOCH_DAY.plusDays(decoder.readInt());
+      return DateTimeUtil.dateFromDays(decoder.readInt());
}
}

@@ -83,7 +78,7 @@ private TimeReader() {

@Override
public LocalTime read(Decoder decoder, Object reuse) throws IOException {
-      return LocalTime.ofNanoOfDay(decoder.readLong() * 1000);
+      return DateTimeUtil.timeFromMicros(decoder.readLong());
}
}

@@ -95,7 +90,7 @@ private TimestampReader() {

@Override
public LocalDateTime read(Decoder decoder, Object reuse) throws IOException {
-      return EPOCH.plus(decoder.readLong(), ChronoUnit.MICROS).toLocalDateTime();
+      return DateTimeUtil.timestampFromMicros(decoder.readLong());
}
}

@@ -107,7 +102,7 @@ private TimestamptzReader() {

@Override
public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException {
-      return EPOCH.plus(decoder.readLong(), ChronoUnit.MICROS);
+      return DateTimeUtil.timestamptzFromMicros(decoder.readLong());
}
}

@@ -20,6 +20,7 @@
package org.apache.iceberg.data.parquet;

import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.time.Instant;
@@ -65,23 +66,28 @@ public class GenericParquetReaders {
private GenericParquetReaders() {
}

-  @SuppressWarnings("unchecked")
  public static ParquetValueReader<GenericRecord> buildReader(Schema expectedSchema,
                                                              MessageType fileSchema) {
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+  }
+
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<GenericRecord> buildReader(Schema expectedSchema,
+                                                              MessageType fileSchema,
+                                                              Map<Integer, ?> idToConstant) {
if (ParquetSchemaUtil.hasIds(fileSchema)) {
return (ParquetValueReader<GenericRecord>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new ReadBuilder(fileSchema));
+              new ReadBuilder(fileSchema, idToConstant));
} else {
return (ParquetValueReader<GenericRecord>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new FallbackReadBuilder(fileSchema));
+              new FallbackReadBuilder(fileSchema, idToConstant));
}
}

private static class FallbackReadBuilder extends ReadBuilder {
-    FallbackReadBuilder(MessageType type) {
-      super(type);
+    FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      super(type, idToConstant);
}

@Override
@@ -112,9 +118,11 @@ public ParquetValueReader<?> struct(StructType expected, GroupType struct,

private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
private final MessageType type;
+    private final Map<Integer, ?> idToConstant;

-    ReadBuilder(MessageType type) {
+    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
      this.type = type;
+      this.idToConstant = idToConstant;
}

@Override
Expand Down Expand Up @@ -145,13 +153,19 @@ public ParquetValueReader<?> struct(StructType expected, GroupType struct,
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
-        ParquetValueReader<?> reader = readersById.get(id);
-        if (reader != null) {
-          reorderedFields.add(reader);
-          types.add(typesById.get(id));
-        } else {
-          reorderedFields.add(ParquetValueReaders.nulls());
-          types.add(null);
-        }
+        if (idToConstant.containsKey(id)) {
+          // containsKey is used because the constant may be null
+          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          types.add(null);
+        } else {
+          ParquetValueReader<?> reader = readersById.get(id);
+          if (reader != null) {
+            reorderedFields.add(reader);
+            types.add(typesById.get(id));
+          } else {
+            reorderedFields.add(ParquetValueReaders.nulls());
+            types.add(null);
+          }
+        }
}

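The `containsKey` check above (and the matching one in ValueReaders.java) matters because a partition constant can legitimately be null, so a null from `Map.get` alone cannot distinguish "no constant for this field" from "constant is null". A tiny standalone sketch:

```java
import java.util.HashMap;
import java.util.Map;

class NullConstantSketch {
  public static void main(String[] args) {
    Map<Integer, Object> idToConstant = new HashMap<>();
    idToConstant.put(1, null); // field 1 has a null partition value

    System.out.println(idToConstant.get(1));         // null
    System.out.println(idToConstant.get(2));         // also null, so get() is ambiguous
    System.out.println(idToConstant.containsKey(1)); // true  -> use the (null) constant
    System.out.println(idToConstant.containsKey(2)); // false -> build a normal reader
  }
}
```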