4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
@@ -31,8 +31,8 @@ public class DateTimeUtil {
private DateTimeUtil() {
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
public static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
public static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

public static LocalDate dateFromDays(int daysFromEpoch) {
return ChronoUnit.DAYS.addTo(EPOCH_DAY, daysFromEpoch);
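Note: widening `EPOCH` and `EPOCH_DAY` to public lets code outside this class (presumably the ORC metrics conversion added in this PR) anchor its micros-from-epoch arithmetic on the same constants. A minimal sketch of the roundtrip this enables; `-1_900_300L` is the below-epoch micros value the updated tests use:

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import org.apache.iceberg.util.DateTimeUtil;

class EpochRoundtripDemo {
  public static void main(String[] args) {
    // micros-from-epoch -> LocalDateTime -> micros-from-epoch
    LocalDateTime belowEpoch = DateTimeUtil.timestampFromMicros(-1_900_300L);
    // belowEpoch is 1969-12-31T23:59:58.099700
    long micros = ChronoUnit.MICROS.between(DateTimeUtil.EPOCH.toLocalDateTime(), belowEpoch);
    System.out.println(micros);  // -1900300
  }
}
```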
71 changes: 51 additions & 20 deletions core/src/test/java/org/apache/iceberg/TestMetrics.java
@@ -87,21 +87,24 @@ public abstract class TestMetrics {
required(7, "stringCol", StringType.get()),
optional(8, "dateCol", DateType.get()),
required(9, "timeCol", TimeType.get()),
required(10, "timestampCol", TimestampType.withoutZone()),
required(10, "timestampColAboveEpoch", TimestampType.withoutZone()),
required(11, "fixedCol", FixedType.ofLength(4)),
required(12, "binaryCol", BinaryType.get())
required(12, "binaryCol", BinaryType.get()),
required(13, "timestampColBelowEpoch", TimestampType.withoutZone())
);

private final byte[] fixed = "abcd".getBytes(StandardCharsets.UTF_8);

public abstract FileFormat fileFormat();

public abstract Metrics getMetrics(InputFile file);

public abstract InputFile writeRecords(Schema schema, Record... records) throws IOException;

public abstract InputFile writeRecordsWithSmallRowGroups(Schema schema, Record... records)
throws IOException;

public abstract int splitCount(InputFile parquetFile) throws IOException;
public abstract int splitCount(InputFile inputFile) throws IOException;

public boolean supportsSmallRowGroups() {
return false;
@@ -119,9 +122,10 @@ public void testMetricsForRepeatedValues() throws IOException {
firstRecord.setField("stringCol", "AAA");
firstRecord.setField("dateCol", DateTimeUtil.dateFromDays(1500));
firstRecord.setField("timeCol", DateTimeUtil.timeFromMicros(2000L));
firstRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(0L));
firstRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(0L));
firstRecord.setField("fixedCol", fixed);
firstRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes()));
firstRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(0L));
Record secondRecord = GenericRecord.create(SIMPLE_SCHEMA);
secondRecord.setField("booleanCol", true);
secondRecord.setField("intCol", 3);
@@ -132,9 +136,10 @@ public void testMetricsForRepeatedValues() throws IOException {
secondRecord.setField("stringCol", "AAA");
secondRecord.setField("dateCol", DateTimeUtil.dateFromDays(1500));
secondRecord.setField("timeCol", DateTimeUtil.timeFromMicros(2000L));
secondRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(0L));
secondRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(0L));
secondRecord.setField("fixedCol", fixed);
secondRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes()));
secondRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(0L));

InputFile recordsFile = writeRecords(SIMPLE_SCHEMA, firstRecord, secondRecord);

@@ -152,6 +157,7 @@ public void testMetricsForRepeatedValues() throws IOException {
assertCounts(10, 2L, 0L, metrics);
assertCounts(11, 2L, 0L, metrics);
assertCounts(12, 2L, 0L, metrics);
assertCounts(13, 2L, 0L, metrics);
}

@Test
@@ -166,9 +172,10 @@ public void testMetricsForTopLevelFields() throws IOException {
firstRecord.setField("stringCol", "AAA");
firstRecord.setField("dateCol", DateTimeUtil.dateFromDays(1500));
firstRecord.setField("timeCol", DateTimeUtil.timeFromMicros(2000L));
firstRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(0L));
firstRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(0L));
firstRecord.setField("fixedCol", fixed);
firstRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes()));
firstRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(-1_900_300L));
Record secondRecord = GenericRecord.create(SIMPLE_SCHEMA);
secondRecord.setField("booleanCol", false);
secondRecord.setField("intCol", Integer.MIN_VALUE);
@@ -179,9 +186,10 @@ public void testMetricsForTopLevelFields() throws IOException {
secondRecord.setField("stringCol", "ZZZ");
secondRecord.setField("dateCol", null);
secondRecord.setField("timeCol", DateTimeUtil.timeFromMicros(3000L));
secondRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(1000L));
secondRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(900L));
secondRecord.setField("fixedCol", fixed);
secondRecord.setField("binaryCol", ByteBuffer.wrap("W".getBytes()));
secondRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(0L));

InputFile recordsFile = writeRecords(SIMPLE_SCHEMA, firstRecord, secondRecord);

@@ -206,13 +214,27 @@ public void testMetricsForTopLevelFields() throws IOException {
assertCounts(9, 2L, 0L, metrics);
assertBounds(9, TimeType.get(), 2000L, 3000L, metrics);
assertCounts(10, 2L, 0L, metrics);
assertBounds(10, TimestampType.withoutZone(), 0L, 1000L, metrics);
if (fileFormat() == FileFormat.ORC) {
// ORC-611: ORC statistics only keep millisecond precision, so the upper bound is widened by 1 millisecond
assertBounds(10, TimestampType.withoutZone(), 0L, 1000L, metrics);
} else {
assertBounds(10, TimestampType.withoutZone(), 0L, 900L, metrics);
}
assertCounts(11, 2L, 0L, metrics);
assertBounds(11, FixedType.ofLength(4),
ByteBuffer.wrap(fixed), ByteBuffer.wrap(fixed), metrics);
assertCounts(12, 2L, 0L, metrics);
assertBounds(12, BinaryType.get(),
ByteBuffer.wrap("S".getBytes()), ByteBuffer.wrap("W".getBytes()), metrics);
if (fileFormat() == FileFormat.ORC) {
// TODO: tighten these bounds once ORC-342 is fixed (ORC-342: ORC produces inaccurate
// timestamp statistics for values below the epoch).
// ORC-611: ORC statistics only keep millisecond precision, so bounds are widened by up to
// 1 millisecond; e.g. assertBounds(13, TimestampType.withoutZone(), -1000L, 1000L, metrics)
// would fail for a value in the range [1970-01-01 00:00:00.000, 1970-01-01 00:00:00.999].
assertBounds(13, TimestampType.withoutZone(), -1_901_000L, 1000L, metrics);
} else {
assertBounds(13, TimestampType.withoutZone(), -1_900_300L, 0L, metrics);
}
}
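The ORC branches above imply a consistent widening rule; a sketch of the arithmetic (inferred from the expected values in this test, not taken from ORC's source):

```java
class OrcMillisWideningDemo {
  public static void main(String[] args) {
    // Field 13's exact bounds are [-1_900_300, 0] micros; ORC statistics keep
    // only millisecond precision, so the bounds relax to the enclosing
    // millisecond window [-1_901_000, 1_000].
    long orcLower = Math.floorDiv(-1_900_300L, 1000L) * 1000L;  // -1901000
    long orcUpper = (Math.floorDiv(0L, 1000L) + 1L) * 1000L;    // 1000
    System.out.println(orcLower + " " + orcUpper);
  }
}
```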

@Test
@@ -292,14 +314,22 @@ public void testMetricsForListAndMapElements() throws IOException {

Metrics metrics = getMetrics(recordsFile);
Assert.assertEquals(1L, (long) metrics.recordCount());
assertCounts(1, 1, 0, metrics);
if (fileFormat() != FileFormat.ORC) {
assertCounts(1, 1L, 0L, metrics);
assertCounts(2, 1L, 0L, metrics);
assertCounts(4, 3L, 0L, metrics);
assertCounts(6, 1L, 0L, metrics);
} else {
assertCounts(1, null, null, metrics);
assertCounts(2, null, null, metrics);
assertCounts(4, null, null, metrics);
assertCounts(6, null, null, metrics);
}
assertBounds(1, IntegerType.get(), null, null, metrics);
assertCounts(2, 1, 0, metrics);
assertBounds(2, StringType.get(), null, null, metrics);
assertCounts(4, 3, 0, metrics);
assertBounds(4, IntegerType.get(), null, null, metrics);
assertCounts(6, 1, 0, metrics);
assertBounds(6, StringType.get(), null, null, metrics);
assertBounds(7, structType, null, null, metrics);
}

@Test
@@ -316,7 +346,7 @@ public void testMetricsForNullColumns() throws IOException {

Metrics metrics = getMetrics(recordsFile);
Assert.assertEquals(2L, (long) metrics.recordCount());
assertCounts(1, 2, 2, metrics);
assertCounts(1, 2L, 2L, metrics);
assertBounds(1, IntegerType.get(), null, null, metrics);
}

@@ -338,13 +368,14 @@ public void testMetricsForTopLevelWithMultipleRowGroup() throws Exception {
newRecord.setField("stringCol", "AAA");
newRecord.setField("dateCol", DateTimeUtil.dateFromDays(i + 1));
newRecord.setField("timeCol", DateTimeUtil.timeFromMicros(i + 1L));
newRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(i + 1L));
newRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(i + 1L));
newRecord.setField("fixedCol", fixed);
newRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes()));
newRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros((i + 1L) * -1L));
records.add(newRecord);
}

// create parquet file with multiple row groups. by using smaller number of bytes
// create a file with multiple row groups by using a smaller number of bytes
InputFile recordsFile = writeRecordsWithSmallRowGroups(SIMPLE_SCHEMA, records.toArray(new Record[0]));

Assert.assertNotNull(recordsFile);
@@ -387,7 +418,7 @@ public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOException {
records.add(newRecord);
}

// create parquet file with multiple row groups. by using smaller number of bytes
// create a file with multiple row groups by using a smaller number of bytes
InputFile recordsFile = writeRecordsWithSmallRowGroups(NESTED_SCHEMA, records.toArray(new Record[0]));

Assert.assertNotNull(recordsFile);
@@ -407,14 +438,14 @@ public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOException {
ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
}

private void assertCounts(int fieldId, long valueCount, long nullValueCount, Metrics metrics) {
private void assertCounts(int fieldId, Long valueCount, Long nullValueCount, Metrics metrics) {
Map<Integer, Long> valueCounts = metrics.valueCounts();
Map<Integer, Long> nullValueCounts = metrics.nullValueCounts();
Assert.assertEquals(valueCount, (long) valueCounts.get(fieldId));
Assert.assertEquals(nullValueCount, (long) nullValueCounts.get(fieldId));
Assert.assertEquals(valueCount, valueCounts.get(fieldId));
Assert.assertEquals(nullValueCount, nullValueCounts.get(fieldId));
}

private <T> void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) {
protected <T> void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) {
Map<Integer, ByteBuffer> lowerBounds = metrics.lowerBounds();
Map<Integer, ByteBuffer> upperBounds = metrics.upperBounds();

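The `assertCounts` signature change from primitive `long` to boxed `Long` is what lets the ORC branch in `testMetricsForListAndMapElements` assert "no statistics recorded": with the old `(long)` casts, a missing map entry would unbox `null` and throw `NullPointerException` before JUnit could report a useful failure. A small sketch of the distinction, using a hypothetical field id:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class BoxedCountDemo {
  public static void main(String[] args) {
    Map<Integer, Long> valueCounts = new HashMap<>();  // ORC: no stats for nested fields
    // Old style: long count = (long) valueCounts.get(4);  // would throw NullPointerException
    // New style: boxed comparison makes "absent" an assertable expectation.
    Long expected = null;
    System.out.println(Objects.equals(expected, valueCounts.get(4)));  // true
  }
}
```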
109 changes: 109 additions & 0 deletions data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.orc;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TestMetrics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

/**
* Test Metrics for ORC.
*/
public class TestOrcMetrics extends TestMetrics {

static final ImmutableSet<Type.TypeID> BINARY_TYPES = ImmutableSet.of(Type.TypeID.BINARY,
Type.TypeID.FIXED, Type.TypeID.UUID);

@Rule
public TemporaryFolder temp = new TemporaryFolder();

@Override
public FileFormat fileFormat() {
return FileFormat.ORC;
}

@Override
public Metrics getMetrics(InputFile file) {
return OrcMetrics.fromInputFile(file);
}

@Override
public InputFile writeRecordsWithSmallRowGroups(Schema schema, Record... records) throws IOException {
throw new UnsupportedOperationException("supportsSmallRowGroups = " + supportsSmallRowGroups());
}

@Override
public InputFile writeRecords(Schema schema, Record... records) throws IOException {
return writeRecords(schema, ImmutableMap.of(), records);
}

private InputFile writeRecords(Schema schema, Map<String, String> properties, Record... records) throws IOException {
File tmpFolder = temp.newFolder("orc");
String filename = UUID.randomUUID().toString();
OutputFile file = Files.localOutput(new File(tmpFolder, FileFormat.ORC.addExtension(filename)));
try (FileAppender<Record> writer = ORC.write(file)
.schema(schema)
.setAll(properties)
.createWriterFunc(GenericOrcWriter::buildWriter)
.build()) {
writer.addAll(Lists.newArrayList(records));
}
return file.toInputFile();
}

@Override
public int splitCount(InputFile inputFile) throws IOException {
return 0;
}

private boolean isBinaryType(Type type) {
return BINARY_TYPES.contains(type.typeId());
}

@Override
protected <T> void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) {
if (isBinaryType(type)) {
Assert.assertFalse("ORC binary field should not have lower bounds.",
metrics.lowerBounds().containsKey(fieldId));
Assert.assertFalse("ORC binary field should not have upper bounds.",
metrics.upperBounds().containsKey(fieldId));
return;
}
super.assertBounds(fieldId, type, lowerBound, upperBound, metrics);
}
}
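A condensed sketch of what this suite exercises end to end; `tmpDir`, `schema`, and `record` are illustrative stand-ins for the fixtures defined in `TestMetrics`:

```java
// Write a record to a local ORC file, then derive Iceberg metrics from it.
// Assumes tmpDir (File), schema (Schema), and record (Record) are in scope.
OutputFile out = Files.localOutput(
    new File(tmpDir, FileFormat.ORC.addExtension(UUID.randomUUID().toString())));
try (FileAppender<Record> appender = ORC.write(out)
    .schema(schema)
    .createWriterFunc(GenericOrcWriter::buildWriter)
    .build()) {
  appender.add(record);
}
Metrics metrics = OrcMetrics.fromInputFile(out.toInputFile());
System.out.println(metrics.recordCount());  // 1
```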
@@ -50,6 +50,11 @@ public class TestParquetMetrics extends TestMetrics {
@Rule
public TemporaryFolder temp = new TemporaryFolder();

@Override
public FileFormat fileFormat() {
return FileFormat.PARQUET;
}

@Override
public Metrics getMetrics(InputFile file) {
return ParquetUtil.fileMetrics(file);
@@ -80,8 +85,8 @@ private InputFile writeRecords(Schema schema, Map<String, String> properties, Record... records) throws IOException {
}

@Override
public int splitCount(InputFile parquetFile) throws IOException {
try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(parquetFile))) {
public int splitCount(InputFile inputFile) throws IOException {
try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile))) {
return reader.getRowGroups().size();
}
}
@@ -377,7 +377,7 @@ private static boolean isSameType(TypeDescription orcType, Type icebergType) {
}
}

private static Optional<Integer> icebergID(TypeDescription orcType) {
static Optional<Integer> icebergID(TypeDescription orcType) {
return Optional.ofNullable(orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE))
.map(Integer::parseInt);
}
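`icebergID` is widened from `private` to package-private, presumably so the ORC metrics code in the same `org.apache.iceberg.orc` package can map ORC columns back to Iceberg field ids. A hypothetical sketch of such a mapping (not the PR's actual implementation):

```java
import java.util.Map;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.TypeDescription;

class OrcColumnMappingSketch {  // hypothetical; would live in org.apache.iceberg.orc
  static void recordValueCount(TypeDescription orcCol, ColumnStatistics stats,
                               Map<Integer, Long> valueCounts) {
    // Only columns carrying an Iceberg id attribute contribute to the metrics map.
    ORCSchemaUtil.icebergID(orcCol)
        .ifPresent(id -> valueCounts.put(id, stats.getNumberOfValues()));
  }
}
```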
2 changes: 2 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -91,6 +91,8 @@ public void add(D datum) {

@Override
public Metrics metrics() {
Preconditions.checkState(isClosed,
"Cannot return metrics while appending to an open file.");
return OrcMetrics.fromWriter(writer);
}
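The new precondition makes the contract explicit: ORC finalizes column statistics in the file footer on close, so `metrics()` on a still-open appender would be incomplete. A sketch of the usage the check enforces (`outputFile`, `schema`, and `records` assumed in scope):

```java
FileAppender<Record> appender = ORC.write(outputFile)
    .schema(schema)
    .createWriterFunc(GenericOrcWriter::buildWriter)
    .build();
try {
  appender.addAll(records);
} finally {
  appender.close();  // writes the ORC footer and finalizes column statistics
}
Metrics metrics = appender.metrics();  // safe now; throws IllegalStateException before close()
```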
