diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java index ad09079a727b..0835795119f8 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java @@ -20,12 +20,17 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.NullType; public class FlinkRowData { private FlinkRowData() {} public static RowData.FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) { + if (fieldType instanceof NullType) { + return rowData -> null; + } + RowData.FieldGetter flinkFieldGetter = RowData.createFieldGetter(fieldType, fieldPos); return rowData -> { // Be sure to check for null values, even if the field is required. Flink diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java index f8f1b74b1ceb..72a646991456 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java @@ -31,6 +31,7 @@ import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.NullType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimeType; import org.apache.flink.table.types.logical.TimestampType; @@ -85,6 +86,8 @@ public LogicalType map(Types.MapType map, LogicalType keyResult, LogicalType val @Override public LogicalType primitive(Type.PrimitiveType primitive) { switch (primitive.typeId()) { + case UNKNOWN: + return new NullType(); case BOOLEAN: return new BooleanType(); case INTEGER: @@ -113,6 +116,15 @@ public LogicalType primitive(Type.PrimitiveType primitive) { // MICROS return new TimestampType(6); } + case TIMESTAMP_NANO: + Types.TimestampNanoType timestamp9 = (Types.TimestampNanoType) primitive; + if (timestamp9.shouldAdjustToUTC()) { + // NANOS + return new LocalZonedTimestampType(9); + } else { + // NANOS + return new TimestampType(9); + } case STRING: return new VarCharType(VarCharType.MAX_LENGTH); case UUID: diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java index 873e65783119..66ed95792e62 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java @@ -119,6 +119,9 @@ public ValueWriter primitive(LogicalType type, Schema primitive) { case "timestamp-micros": return FlinkValueWriters.timestampMicros(); + case "timestamp-nanos": + return FlinkValueWriters.timestampNanos(); + case "decimal": LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; return FlinkValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index fc407fe2a1a8..5c3581aef3ec 100644 --- 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -21,8 +21,6 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; -import java.time.Instant; -import java.time.ZoneOffset; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -274,17 +272,11 @@ public Optional> visit( public Optional> visit( LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { - if (timestampLogicalType.isAdjustedToUTC()) { - return Optional.of(new MillisToTimestampTzReader(desc)); - } else { - return Optional.of(new MillisToTimestampReader(desc)); - } + return Optional.of(new MillisToTimestampReader(desc)); } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { - if (timestampLogicalType.isAdjustedToUTC()) { - return Optional.of(new MicrosToTimestampTzReader(desc)); - } else { - return Optional.of(new MicrosToTimestampReader(desc)); - } + return Optional.of(new MicrosToTimestampReader(desc)); + } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.NANOS) { + return Optional.of(new NanosToTimestampReader(desc)); } return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType); @@ -412,25 +404,17 @@ public DecimalData read(DecimalData ignored) { } } - private static class MicrosToTimestampTzReader + private static class NanosToTimestampReader extends ParquetValueReaders.UnboxedReader { - MicrosToTimestampTzReader(ColumnDescriptor desc) { + NanosToTimestampReader(ColumnDescriptor desc) { super(desc); } @Override public TimestampData read(TimestampData ignored) { long value = readLong(); - return TimestampData.fromLocalDateTime( - Instant.ofEpochSecond( - Math.floorDiv(value, 1000_000L), Math.floorMod(value, 1000_000L) * 1000L) - .atOffset(ZoneOffset.UTC) - .toLocalDateTime()); - } - - @Override - public long readLong() { - return column.nextLong(); + return TimestampData.fromEpochMillis( + Math.floorDiv(value, 1_000_000L), Math.floorMod(value, 1_000_000)); } } @@ -442,15 +426,9 @@ private static class MicrosToTimestampReader @Override public TimestampData read(TimestampData ignored) { - long value = readLong(); - return TimestampData.fromInstant( - Instant.ofEpochSecond( - Math.floorDiv(value, 1000_000L), Math.floorMod(value, 1000_000L) * 1000L)); - } - - @Override - public long readLong() { - return column.nextLong(); + long micros = readLong(); + return TimestampData.fromEpochMillis( + Math.floorDiv(micros, 1000L), Math.floorMod(micros, 1000) * 1000); } } @@ -465,30 +443,6 @@ public TimestampData read(TimestampData ignored) { long millis = readLong(); return TimestampData.fromEpochMillis(millis); } - - @Override - public long readLong() { - return column.nextLong(); - } - } - - private static class MillisToTimestampTzReader - extends ParquetValueReaders.UnboxedReader { - MillisToTimestampTzReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public TimestampData read(TimestampData ignored) { - long millis = readLong(); - return TimestampData.fromLocalDateTime( - Instant.ofEpochMilli(millis).atOffset(ZoneOffset.UTC).toLocalDateTime()); - } - - @Override - public long readLong() { - return column.nextLong(); - } } private static class StringReader extends ParquetValueReaders.PrimitiveReader { diff --git 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index 30ec5069ae09..0a9ad475f733 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Optional; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.MapData; @@ -46,7 +47,17 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.BsonLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.EnumLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.JsonLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -134,43 +145,15 @@ private ParquetValueWriter newOption(Type fieldType, ParquetValueWriter wr public ParquetValueWriter primitive(LogicalType fType, PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); - if (primitive.getOriginalType() != null) { - switch (primitive.getOriginalType()) { - case ENUM: - case JSON: - case UTF8: - return strings(desc); - case DATE: - case INT_8: - case INT_16: - case INT_32: - return ints(fType, desc); - case INT_64: - return ParquetValueWriters.longs(desc); - case TIME_MICROS: - return timeMicros(desc); - case TIMESTAMP_MICROS: - return timestamps(desc); - case DECIMAL: - DecimalLogicalTypeAnnotation decimal = - (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation(); - switch (primitive.getPrimitiveTypeName()) { - case INT32: - return decimalAsInteger(desc, decimal.getPrecision(), decimal.getScale()); - case INT64: - return decimalAsLong(desc, decimal.getPrecision(), decimal.getScale()); - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return decimalAsFixed(desc, decimal.getPrecision(), decimal.getScale()); - default: - throw new UnsupportedOperationException( - "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); - } - case BSON: - return byteArrays(desc); - default: - throw new UnsupportedOperationException( - "Unsupported logical type: " + primitive.getOriginalType()); + LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation(); + if (annotation != null) { + Optional> writer = + annotation.accept(new LogicalTypeWriterBuilder(fType, desc)); + if (writer.isPresent()) { + return writer.get(); + } else { + throw new UnsupportedOperationException( + 
"Unsupported logical type: " + primitive.getOriginalType()); } } @@ -194,8 +177,104 @@ public ParquetValueWriter primitive(LogicalType fType, PrimitiveType primitiv } } - private static ParquetValueWriters.PrimitiveWriter ints( - LogicalType type, ColumnDescriptor desc) { + private static class LogicalTypeWriterBuilder + implements LogicalTypeAnnotationVisitor> { + private final LogicalType flinkType; + private final ColumnDescriptor desc; + + private LogicalTypeWriterBuilder(LogicalType flinkType, ColumnDescriptor desc) { + this.flinkType = flinkType; + this.desc = desc; + } + + @Override + public Optional> visit(StringLogicalTypeAnnotation strings) { + return Optional.of(strings(desc)); + } + + @Override + public Optional> visit(EnumLogicalTypeAnnotation enums) { + return Optional.of(strings(desc)); + } + + @Override + public Optional> visit(DecimalLogicalTypeAnnotation decimal) { + ParquetValueWriter writer; + switch (desc.getPrimitiveType().getPrimitiveTypeName()) { + case INT32: + writer = decimalAsInteger(desc, decimal.getPrecision(), decimal.getScale()); + break; + case INT64: + writer = decimalAsLong(desc, decimal.getPrecision(), decimal.getScale()); + break; + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + writer = decimalAsFixed(desc, decimal.getPrecision(), decimal.getScale()); + break; + default: + throw new UnsupportedOperationException( + "Unsupported base type for decimal: " + + desc.getPrimitiveType().getPrimitiveTypeName()); + } + return Optional.of(writer); + } + + @Override + public Optional> visit(DateLogicalTypeAnnotation dates) { + return Optional.of(ints(flinkType, desc)); + } + + @Override + public Optional> visit(TimeLogicalTypeAnnotation times) { + Preconditions.checkArgument( + LogicalTypeAnnotation.TimeUnit.MICROS.equals(times.getUnit()), + "Cannot write time in %s, only MICROS is supported", + times.getUnit()); + return Optional.of(timeMicros(desc)); + } + + @Override + public Optional> visit(TimestampLogicalTypeAnnotation timestamps) { + ParquetValueWriter writer; + switch (timestamps.getUnit()) { + case NANOS: + writer = timestampNanos(desc); + break; + case MICROS: + writer = timestamps(desc); + break; + default: + throw new UnsupportedOperationException("Unsupported timestamp type: " + timestamps); + } + + return Optional.of(writer); + } + + @Override + public Optional> visit(IntLogicalTypeAnnotation type) { + Preconditions.checkArgument(type.isSigned(), "Cannot write unsigned integer type: %s", type); + ParquetValueWriter writer; + if (type.getBitWidth() < 64) { + writer = ints(flinkType, desc); + } else { + writer = ParquetValueWriters.longs(desc); + } + + return Optional.of(writer); + } + + @Override + public Optional> visit(JsonLogicalTypeAnnotation ignored) { + return Optional.of(strings(desc)); + } + + @Override + public Optional> visit(BsonLogicalTypeAnnotation ignored) { + return Optional.of(byteArrays(desc)); + } + } + + private static ParquetValueWriter ints(LogicalType type, ColumnDescriptor desc) { if (type instanceof TinyIntType) { return ParquetValueWriters.tinyints(desc); } else if (type instanceof SmallIntType) { @@ -204,15 +283,15 @@ private static ParquetValueWriters.PrimitiveWriter ints( return ParquetValueWriters.ints(desc); } - private static ParquetValueWriters.PrimitiveWriter strings(ColumnDescriptor desc) { + private static ParquetValueWriter strings(ColumnDescriptor desc) { return new StringDataWriter(desc); } - private static ParquetValueWriters.PrimitiveWriter timeMicros(ColumnDescriptor desc) { + private static ParquetValueWriter 
timeMicros(ColumnDescriptor desc) { return new TimeMicrosWriter(desc); } - private static ParquetValueWriters.PrimitiveWriter decimalAsInteger( + private static ParquetValueWriter decimalAsInteger( ColumnDescriptor desc, int precision, int scale) { Preconditions.checkArgument( precision <= 9, @@ -222,7 +301,7 @@ private static ParquetValueWriters.PrimitiveWriter decimalAsInteger return new IntegerDecimalWriter(desc, precision, scale); } - private static ParquetValueWriters.PrimitiveWriter decimalAsLong( + private static ParquetValueWriter decimalAsLong( ColumnDescriptor desc, int precision, int scale) { Preconditions.checkArgument( precision <= 18, @@ -232,17 +311,20 @@ private static ParquetValueWriters.PrimitiveWriter decimalAsLong( return new LongDecimalWriter(desc, precision, scale); } - private static ParquetValueWriters.PrimitiveWriter decimalAsFixed( + private static ParquetValueWriter decimalAsFixed( ColumnDescriptor desc, int precision, int scale) { return new FixedDecimalWriter(desc, precision, scale); } - private static ParquetValueWriters.PrimitiveWriter timestamps( - ColumnDescriptor desc) { + private static ParquetValueWriter timestamps(ColumnDescriptor desc) { return new TimestampDataWriter(desc); } - private static ParquetValueWriters.PrimitiveWriter byteArrays(ColumnDescriptor desc) { + private static ParquetValueWriter timestampNanos(ColumnDescriptor desc) { + return new TimestampNanoDataWriter(desc); + } + + private static ParquetValueWriter byteArrays(ColumnDescriptor desc) { return new ByteArrayWriter(desc); } @@ -362,6 +444,19 @@ public void write(int repetitionLevel, TimestampData value) { } } + private static class TimestampNanoDataWriter + extends ParquetValueWriters.PrimitiveWriter { + private TimestampNanoDataWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, TimestampData value) { + column.writeLong( + repetitionLevel, value.getMillisecond() * 1_000_000L + value.getNanoOfMillisecond()); + } + } + private static class ByteArrayWriter extends ParquetValueWriters.PrimitiveWriter { private ByteArrayWriter(ColumnDescriptor desc) { super(desc); diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java index b7a81752d4a0..edc7041a4d04 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java @@ -97,7 +97,8 @@ public ValueReader record(Type partner, Schema record, List> f Types.StructType expected = partner.asStructType(); List>> readPlan = - ValueReaders.buildReadPlan(expected, record, fieldReaders, idToConstant); + ValueReaders.buildReadPlan( + expected, record, fieldReaders, idToConstant, RowDataUtil::convertConstant); // TODO: should this pass expected so that struct.get can reuse containers? 
return FlinkValueReaders.struct(readPlan, expected.fields().size()); @@ -142,6 +143,9 @@ public ValueReader primitive(Type partner, Schema primitive) { case "timestamp-micros": return FlinkValueReaders.timestampMicros(); + case "timestamp-nanos": + return FlinkValueReaders.timestampNanos(); + case "decimal": LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; return FlinkValueReaders.decimal( diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java index 0c6ff2411160..80b36d939ece 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java @@ -70,6 +70,10 @@ static ValueReader timestampMicros() { return TimestampMicrosReader.INSTANCE; } + static ValueReader timestampNanos() { + return TimestampNanosReader.INSTANCE; + } + static ValueReader decimal( ValueReader unscaledReader, int precision, int scale) { return new DecimalReader(unscaledReader, precision, scale); @@ -176,16 +180,24 @@ private static class TimestampMicrosReader implements ValueReader @Override public TimestampData read(Decoder decoder, Object reuse) throws IOException { long micros = decoder.readLong(); - long mills = micros / 1000; - int nanos = ((int) (micros % 1000)) * 1000; - if (nanos < 0) { - nanos += 1_000_000; - mills -= 1; - } + long mills = Math.floorDiv(micros, 1000); + int nanos = Math.floorMod(micros, 1000) * 1000; return TimestampData.fromEpochMillis(mills, nanos); } } + private static class TimestampNanosReader implements ValueReader { + private static final TimestampNanosReader INSTANCE = new TimestampNanosReader(); + + @Override + public TimestampData read(Decoder decoder, Object reuse) throws IOException { + long nanos = decoder.readLong(); + long mills = Math.floorDiv(nanos, 1_000_000); + int leftover = Math.floorMod(nanos, 1_000_000); + return TimestampData.fromEpochMillis(mills, leftover); + } + } + private static class ArrayReader implements ValueReader { private final ValueReader elementReader; private final List reusedList = Lists.newArrayList(); diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java index e82ab2648146..f87e63704965 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java @@ -51,6 +51,10 @@ static ValueWriter timestampMicros() { return TimestampMicrosWriter.INSTANCE; } + static ValueWriter timestampNanos() { + return TimestampNanosWriter.INSTANCE; + } + static ValueWriter decimal(int precision, int scale) { return new DecimalWriter(precision, scale); } @@ -130,6 +134,17 @@ public void write(TimestampData timestampData, Encoder encoder) throws IOExcepti } } + private static class TimestampNanosWriter implements ValueWriter { + private static final TimestampNanosWriter INSTANCE = new TimestampNanosWriter(); + + @Override + public void write(TimestampData timestampData, Encoder encoder) throws IOException { + long nanos = + timestampData.getMillisecond() * 1_000_000 + timestampData.getNanoOfMillisecond(); + encoder.writeLong(nanos); + } + } + private static class ArrayWriter implements ValueWriter { private final ValueWriter elementWriter; private final 
ArrayData.ElementGetter elementGetter; diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java index 33feb2e32118..6bb2693a0986 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType.RowField; @@ -173,18 +174,24 @@ private static T visitField( private static List visitFields( RowType struct, GroupType group, ParquetWithFlinkSchemaVisitor visitor) { List sFields = struct.getFields(); - Preconditions.checkArgument( - sFields.size() == group.getFieldCount(), "Structs do not match: %s and %s", struct, group); List results = Lists.newArrayListWithExpectedSize(group.getFieldCount()); - for (int i = 0; i < sFields.size(); i += 1) { - Type field = group.getFields().get(i); - RowType.RowField sField = sFields.get(i); + + int pos = 0; + for (RowField sField : sFields) { + if (sField.getType().getTypeRoot() == LogicalTypeRoot.NULL) { + // skip null types that are not in the Parquet schema + continue; + } + + Type field = group.getFields().get(pos); Preconditions.checkArgument( field.getName().equals(AvroSchemaUtil.makeCompatibleName(sField.getName())), "Structs do not match: field %s != %s", field.getName(), sField.getName()); results.add(visitField(sField, field, visitor)); + + pos += 1; } return results; diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java index 4bd85bbd97b4..f23a7ee3d0d3 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java @@ -20,6 +20,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.util.UUID; import org.apache.avro.generic.GenericData; import org.apache.avro.util.Utf8; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -34,6 +35,7 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.UUIDUtil; public class RowDataUtil { @@ -67,6 +69,8 @@ public static Object convertConstant(Type type, Object value) { return (int) ((Long) value / 1000); case TIMESTAMP: // TimestampData return TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromMicros((Long) value)); + case UUID: + return UUIDUtil.convert((UUID) value); default: } return value; diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java index e532fb62615c..a79406b75cf2 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java @@ -87,11 +87,9 @@ private static Object convert(Type type, Object object) { LocalTime localTime = (LocalTime) object; 
return (int) TimeUnit.NANOSECONDS.toMillis(localTime.toNanoOfDay()); case TIMESTAMP: - if (((Types.TimestampType) type).shouldAdjustToUTC()) { - return TimestampData.fromInstant(((OffsetDateTime) object).toInstant()); - } else { - return TimestampData.fromLocalDateTime((LocalDateTime) object); - } + return convertTimestamp(object, ((Types.TimestampType) type).shouldAdjustToUTC()); + case TIMESTAMP_NANO: + return convertTimestamp(object, ((Types.TimestampNanoType) type).shouldAdjustToUTC()); case STRING: return StringData.fromString((String) object); case UUID: @@ -132,4 +130,16 @@ private static Object convert(Type type, Object object) { throw new UnsupportedOperationException("Not a supported type: " + type); } } + + private static TimestampData convertTimestamp(Object timestamp, boolean shouldAdjustToUTC) { + if (shouldAdjustToUTC) { + return TimestampData.fromEpochMillis( + ((OffsetDateTime) timestamp).toInstant().toEpochMilli(), + ((OffsetDateTime) timestamp).getNano() % 1_000_000); + } else { + return TimestampData.fromEpochMillis( + ((LocalDateTime) timestamp).toInstant(ZoneOffset.UTC).toEpochMilli(), + ((LocalDateTime) timestamp).getNano() % 1_000_000); + } + } } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java index 6f8fc518a8f9..05e32555b0a5 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java @@ -59,6 +59,7 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.GenericDataUtil; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.data.RowDataUtil; import org.apache.iceberg.flink.source.FlinkInputFormat; @@ -187,11 +188,38 @@ public static void assertRowData( types.add(field.type()); } - for (int i = 0; i < types.size(); i += 1) { - LogicalType logicalType = ((RowType) rowType).getTypeAt(i); - Object expected = expectedRecord.get(i, Object.class); - Object actual = FlinkRowData.createFieldGetter(logicalType, i).getFieldOrNull(actualRowData); - assertEquals(types.get(i), logicalType, expected, actual); + if (expectedRecord instanceof Record) { + Record expected = (Record) expectedRecord; + Types.StructType expectedType = expected.struct(); + int pos = 0; + for (Types.NestedField field : structType.fields()) { + Types.NestedField expectedField = expectedType.field(field.fieldId()); + LogicalType logicalType = ((RowType) rowType).getTypeAt(pos); + Object actualValue = + FlinkRowData.createFieldGetter(logicalType, pos).getFieldOrNull(actualRowData); + if (expectedField != null) { + assertEquals( + field.type(), logicalType, expected.getField(expectedField.name()), actualValue); + } else { + // convert the initial value to generic because that is the data model used to generate + // the expected records + assertEquals( + field.type(), + logicalType, + GenericDataUtil.internalToGeneric(field.type(), field.initialDefault()), + actualValue); + } + pos += 1; + } + + } else { + for (int i = 0; i < types.size(); i += 1) { + LogicalType logicalType = ((RowType) rowType).getTypeAt(i); + Object expected = expectedRecord.get(i, Object.class); + Object actual = + FlinkRowData.createFieldGetter(logicalType, i).getFieldOrNull(actualRowData); + assertEquals(types.get(i), logicalType, expected, actual); + } } } @@ -256,6 +284,25 @@ private static void assertEquals( 
.isEqualTo(ts); } break; + case TIMESTAMP_NANO: + if (((Types.TimestampNanoType) type).shouldAdjustToUTC()) { + assertThat(expected) + .as("Should expect a OffsetDataTime") + .isInstanceOf(OffsetDateTime.class); + OffsetDateTime ts = (OffsetDateTime) expected; + assertThat(((TimestampData) actual).toLocalDateTime()) + .as("OffsetDataTime should be equal") + .isEqualTo(ts.toLocalDateTime()); + } else { + assertThat(expected) + .as("Should expect a LocalDataTime") + .isInstanceOf(LocalDateTime.class); + LocalDateTime ts = (LocalDateTime) expected; + assertThat(((TimestampData) actual).toLocalDateTime()) + .as("LocalDataTime should be equal") + .isEqualTo(ts); + } + break; case BINARY: assertThat(ByteBuffer.wrap((byte[]) actual)) .as("Should expect a ByteBuffer") diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java deleted file mode 100644 index fc5c688321c1..000000000000 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg.flink.data; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.File; -import java.io.IOException; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.util.Iterator; -import java.util.List; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.Files; -import org.apache.iceberg.Schema; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.DataTest; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.RandomGenericData; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.avro.DataReader; -import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.flink.FlinkSchemaUtil; -import org.apache.iceberg.flink.TestHelpers; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.DateTimeUtil; -import org.junit.jupiter.api.Test; - -public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest { - - private static final int NUM_RECORDS = 100; - - private static final Schema SCHEMA_NUM_TYPE = - new Schema( - Types.NestedField.optional(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "int", Types.IntegerType.get()), - Types.NestedField.optional(3, "float", Types.FloatType.get()), - Types.NestedField.optional(4, "double", Types.DoubleType.get()), - Types.NestedField.optional(5, "date", Types.DateType.get()), - Types.NestedField.optional(6, "time", Types.TimeType.get()), - Types.NestedField.optional(7, "timestamp", Types.TimestampType.withoutZone()), - Types.NestedField.optional(8, "bigint", Types.LongType.get()), - Types.NestedField.optional(9, "decimal", Types.DecimalType.of(4, 2))); - - @Override - protected void writeAndValidate(Schema schema) throws IOException { - List expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L); - writeAndValidate(schema, expectedRecords); - } - - protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema); - - @Override - protected void writeAndValidate(Schema schema, List expectedRecords) throws IOException { - RowType flinkSchema = FlinkSchemaUtil.convert(schema); - List expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords)); - - File recordsFile = File.createTempFile("junit", null, temp.toFile()); - assertThat(recordsFile.delete()).isTrue(); - - // Write the expected records into AVRO file, then read them into RowData and assert with the - // expected Record list. 
- try (FileAppender writer = - Avro.write(Files.localOutput(recordsFile)) - .schema(schema) - .createWriterFunc(DataWriter::create) - .build()) { - writer.addAll(expectedRecords); - } - - try (CloseableIterable reader = createAvroReadBuilder(recordsFile, schema).build()) { - Iterator expected = expectedRecords.iterator(); - Iterator rows = reader.iterator(); - for (int i = 0; i < expectedRecords.size(); i++) { - assertThat(rows).hasNext(); - TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next()); - } - assertThat(rows).isExhausted(); - } - - File rowDataFile = File.createTempFile("junit", null, temp.toFile()); - assertThat(rowDataFile.delete()).isTrue(); - - // Write the expected RowData into AVRO file, then read them into Record and assert with the - // expected RowData list. - try (FileAppender writer = - Avro.write(Files.localOutput(rowDataFile)) - .schema(schema) - .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) - .build()) { - writer.addAll(expectedRows); - } - - try (CloseableIterable reader = - Avro.read(Files.localInput(rowDataFile)) - .project(schema) - .createReaderFunc(DataReader::create) - .build()) { - Iterator expected = expectedRows.iterator(); - Iterator records = reader.iterator(); - for (int i = 0; i < expectedRecords.size(); i += 1) { - assertThat(records).hasNext(); - TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next()); - } - assertThat(records).isExhausted(); - } - } - - private Record recordNumType( - int id, - int intV, - float floatV, - double doubleV, - long date, - long time, - long timestamp, - long bigint, - double decimal) { - Record record = GenericRecord.create(SCHEMA_NUM_TYPE); - record.setField("id", id); - record.setField("int", intV); - record.setField("float", floatV); - record.setField("double", doubleV); - record.setField( - "date", DateTimeUtil.dateFromDays((int) new Date(date).toLocalDate().toEpochDay())); - record.setField("time", new Time(time).toLocalTime()); - record.setField("timestamp", DateTimeUtil.timestampFromMicros(timestamp * 1000)); - record.setField("bigint", bigint); - record.setField("decimal", BigDecimal.valueOf(decimal)); - return record; - } - - @Test - public void testNumericTypes() throws IOException { - List expected = - ImmutableList.of( - recordNumType( - 2, - Integer.MAX_VALUE, - Float.MAX_VALUE, - Double.MAX_VALUE, - Long.MAX_VALUE, - 1643811742000L, - 1643811742000L, - 1643811742000L, - 10.24d), - recordNumType( - 2, - Integer.MIN_VALUE, - Float.MIN_VALUE, - Double.MIN_VALUE, - Long.MIN_VALUE, - 1643811742000L, - 1643811742000L, - 1643811742000L, - 10.24d)); - - writeAndValidate(SCHEMA_NUM_TYPE, expected); - } -} diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java deleted file mode 100644 index 102a26a94784..000000000000 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.flink.data; - -import java.io.File; -import org.apache.iceberg.Files; -import org.apache.iceberg.Schema; -import org.apache.iceberg.avro.Avro; - -public class TestFlinkAvroPlannedReaderWriter extends AbstractTestFlinkAvroReaderWriter { - - @Override - protected Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema) { - return Avro.read(Files.localInput(recordsFile)) - .project(schema) - .createResolvingReader(FlinkPlannedAvroReader::create); - } -} diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java new file mode 100644 index 000000000000..7000536c2688 --- /dev/null +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.data; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.DataTest; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataWriter; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class TestFlinkAvroReaderWriter extends DataTest { + + private static final int NUM_RECORDS = 100; + + @Override + protected boolean supportsDefaultValues() { + return true; + } + + @Override + protected boolean supportsUnknown() { + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; + } + + @Override + protected void writeAndValidate(Schema schema) throws IOException { + List expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L); + writeAndValidate(schema, expectedRecords); + } + + @Override + protected void writeAndValidate(Schema schema, List expectedRecords) throws IOException { + writeAndValidate(schema, schema, expectedRecords); + } + + @Override + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { + List expectedRecords = RandomGenericData.generate(writeSchema, NUM_RECORDS, 1991L); + writeAndValidate(writeSchema, expectedSchema, expectedRecords); + } + + protected void writeAndValidate( + Schema writeSchema, Schema expectedSchema, List expectedRecords) throws IOException { + List expectedRows = + Lists.newArrayList(RandomRowData.convert(writeSchema, expectedRecords)); + + OutputFile outputFile = new InMemoryOutputFile(); + + // Write the expected records into AVRO file, then read them into RowData and assert with the + // expected Record list. + try (FileAppender writer = + Avro.write(outputFile).schema(writeSchema).createWriterFunc(DataWriter::create).build()) { + writer.addAll(expectedRecords); + } + + RowType flinkSchema = FlinkSchemaUtil.convert(expectedSchema); + + try (CloseableIterable reader = + Avro.read(outputFile.toInputFile()) + .project(expectedSchema) + .createResolvingReader(FlinkPlannedAvroReader::create) + .build()) { + Iterator expected = expectedRecords.iterator(); + Iterator rows = reader.iterator(); + for (int i = 0; i < expectedRecords.size(); i++) { + assertThat(rows).hasNext(); + TestHelpers.assertRowData( + expectedSchema.asStruct(), flinkSchema, expected.next(), rows.next()); + } + assertThat(rows).isExhausted(); + } + + OutputFile file = new InMemoryOutputFile(); + + // Write the expected RowData into AVRO file, then read them into Record and assert with the + // expected RowData list. 
+ try (FileAppender writer = + Avro.write(file) + .schema(writeSchema) + .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) + .build()) { + writer.addAll(expectedRows); + } + + try (CloseableIterable reader = + Avro.read(file.toInputFile()) + .project(expectedSchema) + .createResolvingReader(FlinkPlannedAvroReader::create) + .build()) { + Iterator expected = expectedRecords.iterator(); + Iterator rows = reader.iterator(); + for (int i = 0; i < expectedRecords.size(); i += 1) { + assertThat(rows).hasNext(); + TestHelpers.assertRowData( + expectedSchema.asStruct(), flinkSchema, expected.next(), rows.next()); + } + assertThat(rows).isExhausted(); + } + } +} diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index 341b38793616..7c2df72270bf 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -64,6 +64,16 @@ protected boolean supportsDefaultValues() { return true; } + @Override + protected boolean supportsUnknown() { + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; + } + @Test public void testBuildReader() { MessageType fileSchema = diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java index 657a2da542a5..f21f1fadd7ec 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java @@ -20,7 +20,6 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.io.File; import java.io.IOException; import java.nio.file.Path; import java.util.Iterator; @@ -29,7 +28,6 @@ import org.apache.flink.table.data.binary.BinaryRowData; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.logical.LogicalType; -import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.data.DataTest; import org.apache.iceberg.data.RandomGenericData; @@ -38,8 +36,10 @@ import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.inmemory.InMemoryOutputFile; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.jupiter.api.io.TempDir; @@ -49,14 +49,23 @@ public class TestFlinkParquetWriter extends DataTest { @TempDir private Path temp; + @Override + protected boolean supportsUnknown() { + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; + } + private void writeAndValidate(Iterable iterable, Schema schema) throws IOException { - File testFile = File.createTempFile("junit", null, temp.toFile()); - assertThat(testFile.delete()).isTrue(); + OutputFile outputFile = new InMemoryOutputFile(); LogicalType logicalType = FlinkSchemaUtil.convert(schema); try (FileAppender writer = - Parquet.write(Files.localOutput(testFile)) + Parquet.write(outputFile) .schema(schema) 
.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(logicalType, msgType)) .build()) { @@ -64,7 +73,7 @@ private void writeAndValidate(Iterable iterable, Schema schema) throws } try (CloseableIterable reader = - Parquet.read(Files.localInput(testFile)) + Parquet.read(outputFile.toInputFile()) .project(schema) .createReaderFunc(msgType -> GenericParquetReaders.buildReader(schema, msgType)) .build()) { diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java index ad09079a727b..0835795119f8 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java @@ -20,12 +20,17 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.NullType; public class FlinkRowData { private FlinkRowData() {} public static RowData.FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) { + if (fieldType instanceof NullType) { + return rowData -> null; + } + RowData.FieldGetter flinkFieldGetter = RowData.createFieldGetter(fieldType, fieldPos); return rowData -> { // Be sure to check for null values, even if the field is required. Flink diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java index f8f1b74b1ceb..72a646991456 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java @@ -31,6 +31,7 @@ import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.NullType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimeType; import org.apache.flink.table.types.logical.TimestampType; @@ -85,6 +86,8 @@ public LogicalType map(Types.MapType map, LogicalType keyResult, LogicalType val @Override public LogicalType primitive(Type.PrimitiveType primitive) { switch (primitive.typeId()) { + case UNKNOWN: + return new NullType(); case BOOLEAN: return new BooleanType(); case INTEGER: @@ -113,6 +116,15 @@ public LogicalType primitive(Type.PrimitiveType primitive) { // MICROS return new TimestampType(6); } + case TIMESTAMP_NANO: + Types.TimestampNanoType timestamp9 = (Types.TimestampNanoType) primitive; + if (timestamp9.shouldAdjustToUTC()) { + // NANOS + return new LocalZonedTimestampType(9); + } else { + // NANOS + return new TimestampType(9); + } case STRING: return new VarCharType(VarCharType.MAX_LENGTH); case UUID: diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java index 873e65783119..66ed95792e62 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java @@ -119,6 +119,9 @@ public ValueWriter primitive(LogicalType type, Schema primitive) { case "timestamp-micros": return FlinkValueWriters.timestampMicros(); + case "timestamp-nanos": + return FlinkValueWriters.timestampNanos(); + case "decimal": LogicalTypes.Decimal 
decimal = (LogicalTypes.Decimal) logicalType; return FlinkValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index fc407fe2a1a8..5c3581aef3ec 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -21,8 +21,6 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; -import java.time.Instant; -import java.time.ZoneOffset; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -274,17 +272,11 @@ public Optional> visit( public Optional> visit( LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { - if (timestampLogicalType.isAdjustedToUTC()) { - return Optional.of(new MillisToTimestampTzReader(desc)); - } else { - return Optional.of(new MillisToTimestampReader(desc)); - } + return Optional.of(new MillisToTimestampReader(desc)); } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { - if (timestampLogicalType.isAdjustedToUTC()) { - return Optional.of(new MicrosToTimestampTzReader(desc)); - } else { - return Optional.of(new MicrosToTimestampReader(desc)); - } + return Optional.of(new MicrosToTimestampReader(desc)); + } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.NANOS) { + return Optional.of(new NanosToTimestampReader(desc)); } return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType); @@ -412,25 +404,17 @@ public DecimalData read(DecimalData ignored) { } } - private static class MicrosToTimestampTzReader + private static class NanosToTimestampReader extends ParquetValueReaders.UnboxedReader { - MicrosToTimestampTzReader(ColumnDescriptor desc) { + NanosToTimestampReader(ColumnDescriptor desc) { super(desc); } @Override public TimestampData read(TimestampData ignored) { long value = readLong(); - return TimestampData.fromLocalDateTime( - Instant.ofEpochSecond( - Math.floorDiv(value, 1000_000L), Math.floorMod(value, 1000_000L) * 1000L) - .atOffset(ZoneOffset.UTC) - .toLocalDateTime()); - } - - @Override - public long readLong() { - return column.nextLong(); + return TimestampData.fromEpochMillis( + Math.floorDiv(value, 1_000_000L), Math.floorMod(value, 1_000_000)); } } @@ -442,15 +426,9 @@ private static class MicrosToTimestampReader @Override public TimestampData read(TimestampData ignored) { - long value = readLong(); - return TimestampData.fromInstant( - Instant.ofEpochSecond( - Math.floorDiv(value, 1000_000L), Math.floorMod(value, 1000_000L) * 1000L)); - } - - @Override - public long readLong() { - return column.nextLong(); + long micros = readLong(); + return TimestampData.fromEpochMillis( + Math.floorDiv(micros, 1000L), Math.floorMod(micros, 1000) * 1000); } } @@ -465,30 +443,6 @@ public TimestampData read(TimestampData ignored) { long millis = readLong(); return TimestampData.fromEpochMillis(millis); } - - @Override - public long readLong() { - return column.nextLong(); - } - } - - private static class MillisToTimestampTzReader - extends ParquetValueReaders.UnboxedReader { - MillisToTimestampTzReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public TimestampData read(TimestampData ignored) { 
- long millis = readLong(); - return TimestampData.fromLocalDateTime( - Instant.ofEpochMilli(millis).atOffset(ZoneOffset.UTC).toLocalDateTime()); - } - - @Override - public long readLong() { - return column.nextLong(); - } } private static class StringReader extends ParquetValueReaders.PrimitiveReader { diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index 30ec5069ae09..0a9ad475f733 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Optional; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.MapData; @@ -46,7 +47,17 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.BsonLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.EnumLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.JsonLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -134,43 +145,15 @@ private ParquetValueWriter newOption(Type fieldType, ParquetValueWriter wr public ParquetValueWriter primitive(LogicalType fType, PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); - if (primitive.getOriginalType() != null) { - switch (primitive.getOriginalType()) { - case ENUM: - case JSON: - case UTF8: - return strings(desc); - case DATE: - case INT_8: - case INT_16: - case INT_32: - return ints(fType, desc); - case INT_64: - return ParquetValueWriters.longs(desc); - case TIME_MICROS: - return timeMicros(desc); - case TIMESTAMP_MICROS: - return timestamps(desc); - case DECIMAL: - DecimalLogicalTypeAnnotation decimal = - (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation(); - switch (primitive.getPrimitiveTypeName()) { - case INT32: - return decimalAsInteger(desc, decimal.getPrecision(), decimal.getScale()); - case INT64: - return decimalAsLong(desc, decimal.getPrecision(), decimal.getScale()); - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return decimalAsFixed(desc, decimal.getPrecision(), decimal.getScale()); - default: - throw new UnsupportedOperationException( - "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); - } - case BSON: - return byteArrays(desc); - default: - throw new UnsupportedOperationException( - "Unsupported logical type: " + 
primitive.getOriginalType()); + LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation(); + if (annotation != null) { + Optional> writer = + annotation.accept(new LogicalTypeWriterBuilder(fType, desc)); + if (writer.isPresent()) { + return writer.get(); + } else { + throw new UnsupportedOperationException( + "Unsupported logical type: " + primitive.getOriginalType()); } } @@ -194,8 +177,104 @@ public ParquetValueWriter primitive(LogicalType fType, PrimitiveType primitiv } } - private static ParquetValueWriters.PrimitiveWriter ints( - LogicalType type, ColumnDescriptor desc) { + private static class LogicalTypeWriterBuilder + implements LogicalTypeAnnotationVisitor> { + private final LogicalType flinkType; + private final ColumnDescriptor desc; + + private LogicalTypeWriterBuilder(LogicalType flinkType, ColumnDescriptor desc) { + this.flinkType = flinkType; + this.desc = desc; + } + + @Override + public Optional> visit(StringLogicalTypeAnnotation strings) { + return Optional.of(strings(desc)); + } + + @Override + public Optional> visit(EnumLogicalTypeAnnotation enums) { + return Optional.of(strings(desc)); + } + + @Override + public Optional> visit(DecimalLogicalTypeAnnotation decimal) { + ParquetValueWriter writer; + switch (desc.getPrimitiveType().getPrimitiveTypeName()) { + case INT32: + writer = decimalAsInteger(desc, decimal.getPrecision(), decimal.getScale()); + break; + case INT64: + writer = decimalAsLong(desc, decimal.getPrecision(), decimal.getScale()); + break; + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + writer = decimalAsFixed(desc, decimal.getPrecision(), decimal.getScale()); + break; + default: + throw new UnsupportedOperationException( + "Unsupported base type for decimal: " + + desc.getPrimitiveType().getPrimitiveTypeName()); + } + return Optional.of(writer); + } + + @Override + public Optional> visit(DateLogicalTypeAnnotation dates) { + return Optional.of(ints(flinkType, desc)); + } + + @Override + public Optional> visit(TimeLogicalTypeAnnotation times) { + Preconditions.checkArgument( + LogicalTypeAnnotation.TimeUnit.MICROS.equals(times.getUnit()), + "Cannot write time in %s, only MICROS is supported", + times.getUnit()); + return Optional.of(timeMicros(desc)); + } + + @Override + public Optional> visit(TimestampLogicalTypeAnnotation timestamps) { + ParquetValueWriter writer; + switch (timestamps.getUnit()) { + case NANOS: + writer = timestampNanos(desc); + break; + case MICROS: + writer = timestamps(desc); + break; + default: + throw new UnsupportedOperationException("Unsupported timestamp type: " + timestamps); + } + + return Optional.of(writer); + } + + @Override + public Optional> visit(IntLogicalTypeAnnotation type) { + Preconditions.checkArgument(type.isSigned(), "Cannot write unsigned integer type: %s", type); + ParquetValueWriter writer; + if (type.getBitWidth() < 64) { + writer = ints(flinkType, desc); + } else { + writer = ParquetValueWriters.longs(desc); + } + + return Optional.of(writer); + } + + @Override + public Optional> visit(JsonLogicalTypeAnnotation ignored) { + return Optional.of(strings(desc)); + } + + @Override + public Optional> visit(BsonLogicalTypeAnnotation ignored) { + return Optional.of(byteArrays(desc)); + } + } + + private static ParquetValueWriter ints(LogicalType type, ColumnDescriptor desc) { if (type instanceof TinyIntType) { return ParquetValueWriters.tinyints(desc); } else if (type instanceof SmallIntType) { @@ -204,15 +283,15 @@ private static ParquetValueWriters.PrimitiveWriter ints( return 
ParquetValueWriters.ints(desc); } - private static ParquetValueWriters.PrimitiveWriter strings(ColumnDescriptor desc) { + private static ParquetValueWriter strings(ColumnDescriptor desc) { return new StringDataWriter(desc); } - private static ParquetValueWriters.PrimitiveWriter timeMicros(ColumnDescriptor desc) { + private static ParquetValueWriter timeMicros(ColumnDescriptor desc) { return new TimeMicrosWriter(desc); } - private static ParquetValueWriters.PrimitiveWriter decimalAsInteger( + private static ParquetValueWriter decimalAsInteger( ColumnDescriptor desc, int precision, int scale) { Preconditions.checkArgument( precision <= 9, @@ -222,7 +301,7 @@ private static ParquetValueWriters.PrimitiveWriter decimalAsInteger return new IntegerDecimalWriter(desc, precision, scale); } - private static ParquetValueWriters.PrimitiveWriter decimalAsLong( + private static ParquetValueWriter decimalAsLong( ColumnDescriptor desc, int precision, int scale) { Preconditions.checkArgument( precision <= 18, @@ -232,17 +311,20 @@ private static ParquetValueWriters.PrimitiveWriter decimalAsLong( return new LongDecimalWriter(desc, precision, scale); } - private static ParquetValueWriters.PrimitiveWriter decimalAsFixed( + private static ParquetValueWriter decimalAsFixed( ColumnDescriptor desc, int precision, int scale) { return new FixedDecimalWriter(desc, precision, scale); } - private static ParquetValueWriters.PrimitiveWriter timestamps( - ColumnDescriptor desc) { + private static ParquetValueWriter timestamps(ColumnDescriptor desc) { return new TimestampDataWriter(desc); } - private static ParquetValueWriters.PrimitiveWriter byteArrays(ColumnDescriptor desc) { + private static ParquetValueWriter timestampNanos(ColumnDescriptor desc) { + return new TimestampNanoDataWriter(desc); + } + + private static ParquetValueWriter byteArrays(ColumnDescriptor desc) { return new ByteArrayWriter(desc); } @@ -362,6 +444,19 @@ public void write(int repetitionLevel, TimestampData value) { } } + private static class TimestampNanoDataWriter + extends ParquetValueWriters.PrimitiveWriter { + private TimestampNanoDataWriter(ColumnDescriptor desc) { + super(desc); + } + + @Override + public void write(int repetitionLevel, TimestampData value) { + column.writeLong( + repetitionLevel, value.getMillisecond() * 1_000_000L + value.getNanoOfMillisecond()); + } + } + private static class ByteArrayWriter extends ParquetValueWriters.PrimitiveWriter { private ByteArrayWriter(ColumnDescriptor desc) { super(desc); diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java index b7a81752d4a0..edc7041a4d04 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java @@ -97,7 +97,8 @@ public ValueReader record(Type partner, Schema record, List> f Types.StructType expected = partner.asStructType(); List>> readPlan = - ValueReaders.buildReadPlan(expected, record, fieldReaders, idToConstant); + ValueReaders.buildReadPlan( + expected, record, fieldReaders, idToConstant, RowDataUtil::convertConstant); // TODO: should this pass expected so that struct.get can reuse containers? 
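The Parquet write path added above packs a Flink TimestampData into a single epoch-nanosecond long, and the matching nanos readers split that long back apart with Math.floorDiv/Math.floorMod so pre-1970 values stay correct. A minimal round-trip sketch of that arithmetic, for illustration only (the class and helper names below are invented, not part of the patch):

import org.apache.flink.table.data.TimestampData;

public class TimestampNanosRoundTrip {
  // Mirrors TimestampNanoDataWriter: epoch millis * 1_000_000 plus nanos-of-millisecond.
  static long toEpochNanos(TimestampData ts) {
    return ts.getMillisecond() * 1_000_000L + ts.getNanoOfMillisecond();
  }

  // Mirrors the nanos readers: floorDiv/floorMod keep negative (pre-epoch) values correct.
  static TimestampData fromEpochNanos(long nanos) {
    return TimestampData.fromEpochMillis(
        Math.floorDiv(nanos, 1_000_000L), (int) Math.floorMod(nanos, 1_000_000L));
  }

  public static void main(String[] args) {
    long nanos = -1_234_567_891L; // a pre-epoch instant with sub-millisecond precision
    TimestampData ts = fromEpochNanos(nanos);
    System.out.println(toEpochNanos(ts) == nanos); // prints true
  }
}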
return FlinkValueReaders.struct(readPlan, expected.fields().size()); @@ -142,6 +143,9 @@ public ValueReader primitive(Type partner, Schema primitive) { case "timestamp-micros": return FlinkValueReaders.timestampMicros(); + case "timestamp-nanos": + return FlinkValueReaders.timestampNanos(); + case "decimal": LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; return FlinkValueReaders.decimal( diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java index 0c6ff2411160..80b36d939ece 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java @@ -70,6 +70,10 @@ static ValueReader timestampMicros() { return TimestampMicrosReader.INSTANCE; } + static ValueReader timestampNanos() { + return TimestampNanosReader.INSTANCE; + } + static ValueReader decimal( ValueReader unscaledReader, int precision, int scale) { return new DecimalReader(unscaledReader, precision, scale); @@ -176,16 +180,24 @@ private static class TimestampMicrosReader implements ValueReader @Override public TimestampData read(Decoder decoder, Object reuse) throws IOException { long micros = decoder.readLong(); - long mills = micros / 1000; - int nanos = ((int) (micros % 1000)) * 1000; - if (nanos < 0) { - nanos += 1_000_000; - mills -= 1; - } + long mills = Math.floorDiv(micros, 1000); + int nanos = Math.floorMod(micros, 1000) * 1000; return TimestampData.fromEpochMillis(mills, nanos); } } + private static class TimestampNanosReader implements ValueReader { + private static final TimestampNanosReader INSTANCE = new TimestampNanosReader(); + + @Override + public TimestampData read(Decoder decoder, Object reuse) throws IOException { + long nanos = decoder.readLong(); + long mills = Math.floorDiv(nanos, 1_000_000); + int leftover = Math.floorMod(nanos, 1_000_000); + return TimestampData.fromEpochMillis(mills, leftover); + } + } + private static class ArrayReader implements ValueReader { private final ValueReader elementReader; private final List reusedList = Lists.newArrayList(); diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java index e82ab2648146..f87e63704965 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java @@ -51,6 +51,10 @@ static ValueWriter timestampMicros() { return TimestampMicrosWriter.INSTANCE; } + static ValueWriter timestampNanos() { + return TimestampNanosWriter.INSTANCE; + } + static ValueWriter decimal(int precision, int scale) { return new DecimalWriter(precision, scale); } @@ -130,6 +134,17 @@ public void write(TimestampData timestampData, Encoder encoder) throws IOExcepti } } + private static class TimestampNanosWriter implements ValueWriter { + private static final TimestampNanosWriter INSTANCE = new TimestampNanosWriter(); + + @Override + public void write(TimestampData timestampData, Encoder encoder) throws IOException { + long nanos = + timestampData.getMillisecond() * 1_000_000 + timestampData.getNanoOfMillisecond(); + encoder.writeLong(nanos); + } + } + private static class ArrayWriter implements ValueWriter { private final ValueWriter elementWriter; private final 
ArrayData.ElementGetter elementGetter; diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java index 33feb2e32118..6bb2693a0986 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType.RowField; @@ -173,18 +174,24 @@ private static T visitField( private static List visitFields( RowType struct, GroupType group, ParquetWithFlinkSchemaVisitor visitor) { List sFields = struct.getFields(); - Preconditions.checkArgument( - sFields.size() == group.getFieldCount(), "Structs do not match: %s and %s", struct, group); List results = Lists.newArrayListWithExpectedSize(group.getFieldCount()); - for (int i = 0; i < sFields.size(); i += 1) { - Type field = group.getFields().get(i); - RowType.RowField sField = sFields.get(i); + + int pos = 0; + for (RowField sField : sFields) { + if (sField.getType().getTypeRoot() == LogicalTypeRoot.NULL) { + // skip null types that are not in the Parquet schema + continue; + } + + Type field = group.getFields().get(pos); Preconditions.checkArgument( field.getName().equals(AvroSchemaUtil.makeCompatibleName(sField.getName())), "Structs do not match: field %s != %s", field.getName(), sField.getName()); results.add(visitField(sField, field, visitor)); + + pos += 1; } return results; diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java index 4bd85bbd97b4..f23a7ee3d0d3 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java @@ -20,6 +20,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.util.UUID; import org.apache.avro.generic.GenericData; import org.apache.avro.util.Utf8; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -34,6 +35,7 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.UUIDUtil; public class RowDataUtil { @@ -67,6 +69,8 @@ public static Object convertConstant(Type type, Object value) { return (int) ((Long) value / 1000); case TIMESTAMP: // TimestampData return TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromMicros((Long) value)); + case UUID: + return UUIDUtil.convert((UUID) value); default: } return value; diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java index e532fb62615c..a79406b75cf2 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java @@ -87,11 +87,9 @@ private static Object convert(Type type, Object object) { LocalTime localTime = (LocalTime) object; 
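The visitFields change above walks the Flink struct and the Parquet group with two separate indexes because a column of Iceberg's unknown type becomes a Flink NullType field but never produces a Parquet column. A small standalone sketch of that alignment (field names and types here are made up for illustration; this is not code from the patch):

import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.NullType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class NullFieldAlignment {
  public static void main(String[] args) {
    RowType flinkType =
        RowType.of(
            new LogicalType[] {new IntType(), new NullType(), new VarCharType(10)},
            new String[] {"id", "unknown_col", "name"});

    int pos = 0; // position in the (shorter) Parquet group
    for (RowType.RowField field : flinkType.getFields()) {
      if (field.getType().getTypeRoot() == LogicalTypeRoot.NULL) {
        continue; // unknown columns have no Parquet column to visit
      }
      System.out.println(field.getName() + " -> parquet field #" + pos);
      pos += 1;
    }
  }
}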
return (int) TimeUnit.NANOSECONDS.toMillis(localTime.toNanoOfDay()); case TIMESTAMP: - if (((Types.TimestampType) type).shouldAdjustToUTC()) { - return TimestampData.fromInstant(((OffsetDateTime) object).toInstant()); - } else { - return TimestampData.fromLocalDateTime((LocalDateTime) object); - } + return convertTimestamp(object, ((Types.TimestampType) type).shouldAdjustToUTC()); + case TIMESTAMP_NANO: + return convertTimestamp(object, ((Types.TimestampNanoType) type).shouldAdjustToUTC()); case STRING: return StringData.fromString((String) object); case UUID: @@ -132,4 +130,16 @@ private static Object convert(Type type, Object object) { throw new UnsupportedOperationException("Not a supported type: " + type); } } + + private static TimestampData convertTimestamp(Object timestamp, boolean shouldAdjustToUTC) { + if (shouldAdjustToUTC) { + return TimestampData.fromEpochMillis( + ((OffsetDateTime) timestamp).toInstant().toEpochMilli(), + ((OffsetDateTime) timestamp).getNano() % 1_000_000); + } else { + return TimestampData.fromEpochMillis( + ((LocalDateTime) timestamp).toInstant(ZoneOffset.UTC).toEpochMilli(), + ((LocalDateTime) timestamp).getNano() % 1_000_000); + } + } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java index 6f8fc518a8f9..05e32555b0a5 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java @@ -59,6 +59,7 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.GenericDataUtil; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.data.RowDataUtil; import org.apache.iceberg.flink.source.FlinkInputFormat; @@ -187,11 +188,38 @@ public static void assertRowData( types.add(field.type()); } - for (int i = 0; i < types.size(); i += 1) { - LogicalType logicalType = ((RowType) rowType).getTypeAt(i); - Object expected = expectedRecord.get(i, Object.class); - Object actual = FlinkRowData.createFieldGetter(logicalType, i).getFieldOrNull(actualRowData); - assertEquals(types.get(i), logicalType, expected, actual); + if (expectedRecord instanceof Record) { + Record expected = (Record) expectedRecord; + Types.StructType expectedType = expected.struct(); + int pos = 0; + for (Types.NestedField field : structType.fields()) { + Types.NestedField expectedField = expectedType.field(field.fieldId()); + LogicalType logicalType = ((RowType) rowType).getTypeAt(pos); + Object actualValue = + FlinkRowData.createFieldGetter(logicalType, pos).getFieldOrNull(actualRowData); + if (expectedField != null) { + assertEquals( + field.type(), logicalType, expected.getField(expectedField.name()), actualValue); + } else { + // convert the initial value to generic because that is the data model used to generate + // the expected records + assertEquals( + field.type(), + logicalType, + GenericDataUtil.internalToGeneric(field.type(), field.initialDefault()), + actualValue); + } + pos += 1; + } + + } else { + for (int i = 0; i < types.size(); i += 1) { + LogicalType logicalType = ((RowType) rowType).getTypeAt(i); + Object expected = expectedRecord.get(i, Object.class); + Object actual = + FlinkRowData.createFieldGetter(logicalType, i).getFieldOrNull(actualRowData); + assertEquals(types.get(i), logicalType, expected, actual); + } } } @@ -256,6 +284,25 @@ private static void assertEquals( 
.isEqualTo(ts); } break; + case TIMESTAMP_NANO: + if (((Types.TimestampNanoType) type).shouldAdjustToUTC()) { + assertThat(expected) + .as("Should expect an OffsetDateTime") + .isInstanceOf(OffsetDateTime.class); + OffsetDateTime ts = (OffsetDateTime) expected; + assertThat(((TimestampData) actual).toLocalDateTime()) + .as("OffsetDateTime should be equal") + .isEqualTo(ts.toLocalDateTime()); + } else { + assertThat(expected) + .as("Should expect a LocalDateTime") + .isInstanceOf(LocalDateTime.class); + LocalDateTime ts = (LocalDateTime) expected; + assertThat(((TimestampData) actual).toLocalDateTime()) + .as("LocalDateTime should be equal") + .isEqualTo(ts); + } + break; case BINARY: assertThat(ByteBuffer.wrap((byte[]) actual)) .as("Should expect a ByteBuffer") diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java deleted file mode 100644 index fc5c688321c1..000000000000 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg.flink.data; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.File; -import java.io.IOException; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.util.Iterator; -import java.util.List; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.Files; -import org.apache.iceberg.Schema; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.DataTest; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.RandomGenericData; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.avro.DataReader; -import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.flink.FlinkSchemaUtil; -import org.apache.iceberg.flink.TestHelpers; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.DateTimeUtil; -import org.junit.jupiter.api.Test; - -public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest { - - private static final int NUM_RECORDS = 100; - - private static final Schema SCHEMA_NUM_TYPE = - new Schema( - Types.NestedField.optional(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "int", Types.IntegerType.get()), - Types.NestedField.optional(3, "float", Types.FloatType.get()), - Types.NestedField.optional(4, "double", Types.DoubleType.get()), - Types.NestedField.optional(5, "date", Types.DateType.get()), - Types.NestedField.optional(6, "time", Types.TimeType.get()), - Types.NestedField.optional(7, "timestamp", Types.TimestampType.withoutZone()), - Types.NestedField.optional(8, "bigint", Types.LongType.get()), - Types.NestedField.optional(9, "decimal", Types.DecimalType.of(4, 2))); - - @Override - protected void writeAndValidate(Schema schema) throws IOException { - List expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L); - writeAndValidate(schema, expectedRecords); - } - - protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema); - - @Override - protected void writeAndValidate(Schema schema, List expectedRecords) throws IOException { - RowType flinkSchema = FlinkSchemaUtil.convert(schema); - List expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords)); - - File recordsFile = File.createTempFile("junit", null, temp.toFile()); - assertThat(recordsFile.delete()).isTrue(); - - // Write the expected records into AVRO file, then read them into RowData and assert with the - // expected Record list. 
- try (FileAppender writer = - Avro.write(Files.localOutput(recordsFile)) - .schema(schema) - .createWriterFunc(DataWriter::create) - .build()) { - writer.addAll(expectedRecords); - } - - try (CloseableIterable reader = createAvroReadBuilder(recordsFile, schema).build()) { - Iterator expected = expectedRecords.iterator(); - Iterator rows = reader.iterator(); - for (int i = 0; i < expectedRecords.size(); i++) { - assertThat(rows).hasNext(); - TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next()); - } - assertThat(rows).isExhausted(); - } - - File rowDataFile = File.createTempFile("junit", null, temp.toFile()); - assertThat(rowDataFile.delete()).isTrue(); - - // Write the expected RowData into AVRO file, then read them into Record and assert with the - // expected RowData list. - try (FileAppender writer = - Avro.write(Files.localOutput(rowDataFile)) - .schema(schema) - .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) - .build()) { - writer.addAll(expectedRows); - } - - try (CloseableIterable reader = - Avro.read(Files.localInput(rowDataFile)) - .project(schema) - .createReaderFunc(DataReader::create) - .build()) { - Iterator expected = expectedRows.iterator(); - Iterator records = reader.iterator(); - for (int i = 0; i < expectedRecords.size(); i += 1) { - assertThat(records).hasNext(); - TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next()); - } - assertThat(records).isExhausted(); - } - } - - private Record recordNumType( - int id, - int intV, - float floatV, - double doubleV, - long date, - long time, - long timestamp, - long bigint, - double decimal) { - Record record = GenericRecord.create(SCHEMA_NUM_TYPE); - record.setField("id", id); - record.setField("int", intV); - record.setField("float", floatV); - record.setField("double", doubleV); - record.setField( - "date", DateTimeUtil.dateFromDays((int) new Date(date).toLocalDate().toEpochDay())); - record.setField("time", new Time(time).toLocalTime()); - record.setField("timestamp", DateTimeUtil.timestampFromMicros(timestamp * 1000)); - record.setField("bigint", bigint); - record.setField("decimal", BigDecimal.valueOf(decimal)); - return record; - } - - @Test - public void testNumericTypes() throws IOException { - List expected = - ImmutableList.of( - recordNumType( - 2, - Integer.MAX_VALUE, - Float.MAX_VALUE, - Double.MAX_VALUE, - Long.MAX_VALUE, - 1643811742000L, - 1643811742000L, - 1643811742000L, - 10.24d), - recordNumType( - 2, - Integer.MIN_VALUE, - Float.MIN_VALUE, - Double.MIN_VALUE, - Long.MIN_VALUE, - 1643811742000L, - 1643811742000L, - 1643811742000L, - 10.24d)); - - writeAndValidate(SCHEMA_NUM_TYPE, expected); - } -} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java deleted file mode 100644 index 102a26a94784..000000000000 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.flink.data; - -import java.io.File; -import org.apache.iceberg.Files; -import org.apache.iceberg.Schema; -import org.apache.iceberg.avro.Avro; - -public class TestFlinkAvroPlannedReaderWriter extends AbstractTestFlinkAvroReaderWriter { - - @Override - protected Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema) { - return Avro.read(Files.localInput(recordsFile)) - .project(schema) - .createResolvingReader(FlinkPlannedAvroReader::create); - } -} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java new file mode 100644 index 000000000000..7000536c2688 --- /dev/null +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.data; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.DataTest; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataWriter; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class TestFlinkAvroReaderWriter extends DataTest { + + private static final int NUM_RECORDS = 100; + + @Override + protected boolean supportsDefaultValues() { + return true; + } + + @Override + protected boolean supportsUnknown() { + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; + } + + @Override + protected void writeAndValidate(Schema schema) throws IOException { + List expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L); + writeAndValidate(schema, expectedRecords); + } + + @Override + protected void writeAndValidate(Schema schema, List expectedRecords) throws IOException { + writeAndValidate(schema, schema, expectedRecords); + } + + @Override + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { + List expectedRecords = RandomGenericData.generate(writeSchema, NUM_RECORDS, 1991L); + writeAndValidate(writeSchema, expectedSchema, expectedRecords); + } + + protected void writeAndValidate( + Schema writeSchema, Schema expectedSchema, List expectedRecords) throws IOException { + List expectedRows = + Lists.newArrayList(RandomRowData.convert(writeSchema, expectedRecords)); + + OutputFile outputFile = new InMemoryOutputFile(); + + // Write the expected records into AVRO file, then read them into RowData and assert with the + // expected Record list. + try (FileAppender writer = + Avro.write(outputFile).schema(writeSchema).createWriterFunc(DataWriter::create).build()) { + writer.addAll(expectedRecords); + } + + RowType flinkSchema = FlinkSchemaUtil.convert(expectedSchema); + + try (CloseableIterable reader = + Avro.read(outputFile.toInputFile()) + .project(expectedSchema) + .createResolvingReader(FlinkPlannedAvroReader::create) + .build()) { + Iterator expected = expectedRecords.iterator(); + Iterator rows = reader.iterator(); + for (int i = 0; i < expectedRecords.size(); i++) { + assertThat(rows).hasNext(); + TestHelpers.assertRowData( + expectedSchema.asStruct(), flinkSchema, expected.next(), rows.next()); + } + assertThat(rows).isExhausted(); + } + + OutputFile file = new InMemoryOutputFile(); + + // Write the expected RowData into AVRO file, then read them into Record and assert with the + // expected RowData list. 
+ try (FileAppender writer = + Avro.write(file) + .schema(writeSchema) + .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) + .build()) { + writer.addAll(expectedRows); + } + + try (CloseableIterable reader = + Avro.read(file.toInputFile()) + .project(expectedSchema) + .createResolvingReader(FlinkPlannedAvroReader::create) + .build()) { + Iterator expected = expectedRecords.iterator(); + Iterator rows = reader.iterator(); + for (int i = 0; i < expectedRecords.size(); i += 1) { + assertThat(rows).hasNext(); + TestHelpers.assertRowData( + expectedSchema.asStruct(), flinkSchema, expected.next(), rows.next()); + } + assertThat(rows).isExhausted(); + } + } +} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index 341b38793616..7c2df72270bf 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -64,6 +64,16 @@ protected boolean supportsDefaultValues() { return true; } + @Override + protected boolean supportsUnknown() { + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; + } + @Test public void testBuildReader() { MessageType fileSchema = diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java index 657a2da542a5..f21f1fadd7ec 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java @@ -20,7 +20,6 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.io.File; import java.io.IOException; import java.nio.file.Path; import java.util.Iterator; @@ -29,7 +28,6 @@ import org.apache.flink.table.data.binary.BinaryRowData; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.logical.LogicalType; -import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.data.DataTest; import org.apache.iceberg.data.RandomGenericData; @@ -38,8 +36,10 @@ import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.inmemory.InMemoryOutputFile; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.jupiter.api.io.TempDir; @@ -49,14 +49,23 @@ public class TestFlinkParquetWriter extends DataTest { @TempDir private Path temp; + @Override + protected boolean supportsUnknown() { + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; + } + private void writeAndValidate(Iterable iterable, Schema schema) throws IOException { - File testFile = File.createTempFile("junit", null, temp.toFile()); - assertThat(testFile.delete()).isTrue(); + OutputFile outputFile = new InMemoryOutputFile(); LogicalType logicalType = FlinkSchemaUtil.convert(schema); try (FileAppender writer = - Parquet.write(Files.localOutput(testFile)) + Parquet.write(outputFile) .schema(schema) 
.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(logicalType, msgType)) .build()) { @@ -64,7 +73,7 @@ private void writeAndValidate(Iterable iterable, Schema schema) throws } try (CloseableIterable reader = - Parquet.read(Files.localInput(testFile)) + Parquet.read(outputFile.toInputFile()) .project(schema) .createReaderFunc(msgType -> GenericParquetReaders.buildReader(schema, msgType)) .build()) {
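The test changes above replace JUnit temp files with Iceberg's in-memory file IO. A condensed sketch of the same write/read round trip using the generic data model, for illustration under assumed schema and values (this is the general pattern, not code taken from the patch):

import java.io.IOException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.inmemory.InMemoryOutputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;

public class InMemoryParquetRoundTrip {
  public static void main(String[] args) throws IOException {
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    Record record = GenericRecord.create(schema);
    record.setField("id", 1L);

    OutputFile outputFile = new InMemoryOutputFile(); // no temp directory needed
    try (FileAppender<Record> writer =
        Parquet.write(outputFile)
            .schema(schema)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .build()) {
      writer.add(record);
    }

    try (CloseableIterable<Record> reader =
        Parquet.read(outputFile.toInputFile())
            .project(schema)
            .createReaderFunc(fileType -> GenericParquetReaders.buildReader(schema, fileType))
            .build()) {
      reader.forEach(r -> System.out.println(r.getField("id")));
    }
  }
}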