Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/TypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ public static Type find(Schema schema, Predicate<Type> predicate) {
return visit(schema, new FindTypeVisitor(predicate));
}

/**
 * Searches a type tree for the first type matching the given predicate.
 *
 * <p>Overload of the {@code find(Schema, Predicate)} variant above, starting the
 * {@code FindTypeVisitor} traversal from a bare {@link Type} instead of a schema.
 *
 * @param type the type tree to search
 * @param predicate a predicate used to select the matching type
 * @return the type selected by the visitor (presumably null when nothing matches,
 *     mirroring the Schema overload — confirm against FindTypeVisitor)
 */
public static Type find(Type type, Predicate<Type> predicate) {
  return visit(type, new FindTypeVisitor(predicate));
}

public static boolean isPromotionAllowed(Type from, Type.PrimitiveType to) {
// Warning! Before changing this function, make sure that the type change doesn't introduce
// compatibility problems in partitioning.
Expand Down
4 changes: 4 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) {
return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch);
}

/**
 * Converts a count of nanoseconds from the epoch into a timestamptz value.
 *
 * @param nanosFromEpoch nanoseconds elapsed since the epoch constant {@code EPOCH}
 * @return the {@link OffsetDateTime} that many nanoseconds after {@code EPOCH}
 */
public static OffsetDateTime timestamptzFromNanos(long nanosFromEpoch) {
  // Temporal.plus(amount, unit) is exactly what ChronoUnit.addTo delegates to.
  return EPOCH.plus(nanosFromEpoch, ChronoUnit.NANOS);
}

public static long microsFromTimestamptz(OffsetDateTime dateTime) {
return ChronoUnit.MICROS.between(EPOCH, dateTime);
}
Expand Down
9 changes: 9 additions & 0 deletions api/src/test/java/org/apache/iceberg/util/RandomUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public static Object generatePrimitive(Type.PrimitiveType primitive, Random rand
int choice = random.nextInt(20);

switch (primitive.typeId()) {
case UNKNOWN:
return null;

case BOOLEAN:
return choice < 10;

Expand Down Expand Up @@ -126,6 +129,9 @@ public static Object generatePrimitive(Type.PrimitiveType primitive, Random rand
case TIMESTAMP:
return random.nextLong() % FIFTY_YEARS_IN_MICROS;

case TIMESTAMP_NANO:
return random.nextLong() % ABOUT_TEN_YEARS_IN_NANOS;

case STRING:
return randomString(random);

Expand Down Expand Up @@ -161,6 +167,8 @@ public static Object generateDictionaryEncodablePrimitive(
Type.PrimitiveType primitive, Random random) {
int value = random.nextInt(3);
switch (primitive.typeId()) {
case UNKNOWN:
return null;
case BOOLEAN:
return true; // doesn't really matter for booleans since they are not dictionary encoded
case INTEGER:
Expand Down Expand Up @@ -201,6 +209,7 @@ public static Object generateDictionaryEncodablePrimitive(

private static final long FIFTY_YEARS_IN_MICROS =
(50L * (365 * 3 + 366) * 24 * 60 * 60 * 1_000_000) / 4;
private static final long ABOUT_TEN_YEARS_IN_NANOS = 10L * 365 * 24 * 60 * 60 * 1_000_000_000;
private static final int ABOUT_380_YEARS_IN_DAYS = 380 * 365;
private static final long ONE_DAY_IN_MICROS = 24 * 60 * 60 * 1_000_000L;
private static final String CHARS =
Expand Down
22 changes: 15 additions & 7 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ public static Schema applyNameMapping(Schema fileSchema, NameMapping nameMapping
public static boolean isTimestamptz(Schema schema) {
LogicalType logicalType = schema.getLogicalType();
if (logicalType instanceof LogicalTypes.TimestampMillis
|| logicalType instanceof LogicalTypes.TimestampMicros) {
|| logicalType instanceof LogicalTypes.TimestampMicros
|| logicalType instanceof LogicalTypes.TimestampNanos) {
// timestamptz is adjusted to UTC
Object value = schema.getObjectProp(ADJUST_TO_UTC_PROP);

Expand All @@ -172,6 +173,10 @@ public static boolean isTimestamptz(Schema schema) {
return false;
}

/**
 * Returns whether an Avro schema permits null values.
 *
 * <p>True for an option union (see {@code isOptionSchema}) and for the bare
 * {@code NULL} schema itself.
 *
 * @param schema an Avro schema to check
 * @return true if the schema admits null, false otherwise
 */
public static boolean isOptional(Schema schema) {
  if (schema.getType() == Schema.Type.NULL) {
    return true;
  }

  return isOptionSchema(schema);
}

public static boolean isOptionSchema(Schema schema) {
if (schema.getType() == UNION && schema.getTypes().size() == 2) {
if (schema.getTypes().get(0).getType() == Schema.Type.NULL) {
Expand All @@ -184,12 +189,15 @@ public static boolean isOptionSchema(Schema schema) {
}

static Schema toOption(Schema schema) {
if (schema.getType() == UNION) {
Preconditions.checkArgument(
isOptionSchema(schema), "Union schemas are not supported: %s", schema);
return schema;
} else {
return Schema.createUnion(NULL, schema);
switch (schema.getType()) {
case UNION:
Preconditions.checkArgument(
isOptionSchema(schema), "Union schemas are not supported: %s", schema);
return schema;
case NULL:
return schema;
default:
return Schema.createUnion(NULL, schema);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.avro.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.Pair;

/**
Expand Down Expand Up @@ -103,16 +104,19 @@ private static <P, T> T visitUnion(
}
}
} else {
boolean encounteredNull = false;
boolean encounteredNullWithoutUnknown = false;
for (int i = 0; i < types.size(); i++) {
// For a union-type (a, b, NULL, c) and the corresponding struct type (tag, a, b, c), the
// types match according to the following pattern:
// Before NULL, branch type i in the union maps to struct field i + 1.
// After NULL, branch type i in the union maps to struct field i.
int structFieldIndex = encounteredNull ? i : i + 1;
int structFieldIndex = encounteredNullWithoutUnknown ? i : i + 1;
if (types.get(i).getType() == Schema.Type.NULL) {
visit(visitor.nullType(), types.get(i), visitor);
encounteredNull = true;
Pair<String, P> nameAndType = visitor.fieldNameAndType(type, structFieldIndex);
if (((Type) nameAndType.second()).typeId() != Type.TypeID.UNKNOWN) {
encounteredNullWithoutUnknown = true;
}
} else {
options.add(
visit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public ValueWriter<?> primitive(Schema primitive) {
case "timestamp-micros":
return ValueWriters.longs();

case "timestamp-nanos":
return ValueWriters.longs();

case "decimal":
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ public ValueReader<?> primitive(Type partner, Schema primitive) {
return (ValueReader<Long>) (decoder, ignored) -> longs.read(decoder, null) * 1000L;

case "timestamp-micros":
// Spark uses the same representation
case "timestamp-nanos":
// both are handled in memory as long values, using the type to track units
return ValueReaders.longs();

case "decimal":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ public ValueReader<?> primitive(Pair<Integer, Type> partner, Schema primitive) {
return (ValueReader<Long>) (decoder, ignored) -> longs.read(decoder, null) * 1000L;

case "timestamp-micros":
case "timestamp-nanos":
// both are handled in memory as long values, using the type to track units
return ValueReaders.longs();

case "decimal":
Expand Down
70 changes: 43 additions & 27 deletions core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
Type fieldType = fieldTypes.get(i);
int fieldId = getId(field);

if (AvroSchemaUtil.isOptionSchema(field.schema())) {
if (AvroSchemaUtil.isOptional(field.schema())) {
newFields.add(Types.NestedField.optional(fieldId, field.name(), fieldType, field.doc()));
} else {
newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType, field.doc()));
Expand All @@ -105,7 +105,7 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
@Override
public Type union(Schema union, List<Type> options) {
if (AvroSchemaUtil.isOptionSchema(union)) {
if (options.get(0) == null) {
if (union.getTypes().get(0).getType() == Schema.Type.NULL) {
return options.get(1);
} else {
return options.get(0);
Expand Down Expand Up @@ -165,7 +165,7 @@ public Type map(Schema map, Type valueType) {
int keyId = getKeyId(map);
int valueId = getValueId(map);

if (AvroSchemaUtil.isOptionSchema(valueSchema)) {
if (AvroSchemaUtil.isOptional(valueSchema)) {
return Types.MapType.ofOptional(keyId, valueId, Types.StringType.get(), valueType);
} else {
return Types.MapType.ofRequired(keyId, valueId, Types.StringType.get(), valueType);
Expand All @@ -177,34 +177,50 @@ public Type variant(Schema variant, Type metadataType, Type valueType) {
return Types.VariantType.get();
}

/**
 * Converts a supported Avro logical type to the corresponding Iceberg type.
 *
 * <p>Returns null when the logical type is not one this converter recognizes, so the
 * caller can fall back to converting the underlying primitive type.
 *
 * @param primitive the Avro schema carrying the logical type (used to read the
 *     adjust-to-utc property for timestamps)
 * @param logical the Avro logical type to convert
 * @return the matching Iceberg type, or null if the logical type is unsupported
 */
public Type logicalType(Schema primitive, LogicalType logical) {
  String name = logical.getName();
  if (logical instanceof LogicalTypes.Decimal) {
    return Types.DecimalType.of(
        ((LogicalTypes.Decimal) logical).getPrecision(),
        ((LogicalTypes.Decimal) logical).getScale());

  } else if (logical instanceof LogicalTypes.Date) {
    return Types.DateType.get();

  } else if (logical instanceof LogicalTypes.TimeMillis
      || logical instanceof LogicalTypes.TimeMicros) {
    // Iceberg has a single time type; millis vs micros is not preserved here
    return Types.TimeType.get();

  } else if (logical instanceof LogicalTypes.TimestampMillis
      || logical instanceof LogicalTypes.TimestampMicros) {
    // adjust-to-utc=true distinguishes timestamptz from plain timestamp
    if (AvroSchemaUtil.isTimestamptz(primitive)) {
      return Types.TimestampType.withZone();
    } else {
      return Types.TimestampType.withoutZone();
    }

  } else if (logical instanceof LogicalTypes.TimestampNanos) {
    // nanosecond precision maps to the separate TimestampNanoType
    if (AvroSchemaUtil.isTimestamptz(primitive)) {
      return Types.TimestampNanoType.withZone();
    } else {
      return Types.TimestampNanoType.withoutZone();
    }

  } else if (LogicalTypes.uuid().getName().equals(name)) {
    // UUID is matched by name rather than by class
    return Types.UUIDType.get();
  }

  // unrecognized logical type: let the caller handle the raw primitive
  return null;
}

@Override
public Type primitive(Schema primitive) {
// first check supported logical types
LogicalType logical = primitive.getLogicalType();
if (logical != null) {
String name = logical.getName();
if (logical instanceof LogicalTypes.Decimal) {
return Types.DecimalType.of(
((LogicalTypes.Decimal) logical).getPrecision(),
((LogicalTypes.Decimal) logical).getScale());

} else if (logical instanceof LogicalTypes.Date) {
return Types.DateType.get();

} else if (logical instanceof LogicalTypes.TimeMillis
|| logical instanceof LogicalTypes.TimeMicros) {
return Types.TimeType.get();

} else if (logical instanceof LogicalTypes.TimestampMillis
|| logical instanceof LogicalTypes.TimestampMicros) {
if (AvroSchemaUtil.isTimestamptz(primitive)) {
return Types.TimestampType.withZone();
} else {
return Types.TimestampType.withoutZone();
}

} else if (LogicalTypes.uuid().getName().equals(name)) {
return Types.UUIDType.get();
Type result = logicalType(primitive, logical);
if (result != null) {
return result;
}
}

Expand All @@ -227,7 +243,7 @@ public Type primitive(Schema primitive) {
case BYTES:
return Types.BinaryType.get();
case NULL:
return null;
return Types.UnknownType.get();
}

throw new UnsupportedOperationException("Unsupported primitive type: " + primitive);
Expand Down
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iceberg.types.Types;

abstract class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
private static final Schema BOOLEAN_SCHEMA = Schema.create(Schema.Type.BOOLEAN);
private static final Schema INTEGER_SCHEMA = Schema.create(Schema.Type.INT);
private static final Schema LONG_SCHEMA = Schema.create(Schema.Type.LONG);
Expand All @@ -45,6 +46,10 @@ abstract class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
private static final Schema TIMESTAMPTZ_SCHEMA =
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
private static final Schema TIMESTAMP_NANO_SCHEMA =
LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));
private static final Schema TIMESTAMPTZ_NANO_SCHEMA =
LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));
private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
private static final Schema UUID_SCHEMA =
LogicalTypes.uuid().addToSchema(Schema.createFixed("uuid_fixed", null, null, 16));
Expand All @@ -53,6 +58,8 @@ abstract class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
static {
TIMESTAMP_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false);
TIMESTAMPTZ_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, true);
TIMESTAMP_NANO_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false);
TIMESTAMPTZ_NANO_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, true);
}

private final Deque<Integer> fieldIds = Lists.newLinkedList();
Expand Down Expand Up @@ -206,6 +213,9 @@ public Schema variant(Types.VariantType variant) {
public Schema primitive(Type.PrimitiveType primitive) {
Schema primitiveSchema;
switch (primitive.typeId()) {
case UNKNOWN:
primitiveSchema = NULL_SCHEMA;
break;
case BOOLEAN:
primitiveSchema = BOOLEAN_SCHEMA;
break;
Expand Down Expand Up @@ -234,6 +244,13 @@ public Schema primitive(Type.PrimitiveType primitive) {
primitiveSchema = TIMESTAMP_SCHEMA;
}
break;
case TIMESTAMP_NANO:
if (((Types.TimestampNanoType) primitive).shouldAdjustToUTC()) {
primitiveSchema = TIMESTAMPTZ_NANO_SCHEMA;
} else {
primitiveSchema = TIMESTAMP_NANO_SCHEMA;
}
break;
case STRING:
primitiveSchema = STRING_SCHEMA;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ public ValueReader<?> primitive(Type.PrimitiveType ignored, Schema primitive) {
}
return GenericReaders.timestamps();

case "timestamp-nanos":
if (AvroSchemaUtil.isTimestamptz(primitive)) {
return GenericReaders.timestamptzNanos();
}
return GenericReaders.timestampNanos();

case "decimal":
return ValueReaders.decimal(
ValueReaders.decimalBytesReader(primitive),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ public ValueWriter<?> primitive(Schema primitive) {
}
return GenericWriters.timestamps();

case "timestamp-nanos":
if (AvroSchemaUtil.isTimestamptz(primitive)) {
return GenericWriters.timestamptzNanos();
}
return GenericWriters.timestampNanos();

case "decimal":
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ static ValueReader<OffsetDateTime> timestamptz() {
return TimestamptzReader.INSTANCE;
}

/** Returns the shared reader for timestamp(ns) values as {@link LocalDateTime}. */
static ValueReader<LocalDateTime> timestampNanos() {
  return TimestampNanoReader.INSTANCE;
}

/** Returns the shared reader for timestamptz(ns) values as {@link OffsetDateTime}. */
static ValueReader<OffsetDateTime> timestamptzNanos() {
  return TimestamptzNanoReader.INSTANCE;
}

static ValueReader<Record> struct(
List<Pair<Integer, ValueReader<?>>> readPlan, StructType struct) {
return new PlannedRecordReader(readPlan, struct);
Expand Down Expand Up @@ -107,6 +115,28 @@ public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException {
}
}

/**
 * Reads an Avro long as a timestamp(ns) value, decoded via
 * {@code DateTimeUtil.timestampFromNanos} into a {@link LocalDateTime}.
 */
private static class TimestampNanoReader implements ValueReader<LocalDateTime> {
  // stateless singleton, exposed through timestampNanos()
  private static final TimestampNanoReader INSTANCE = new TimestampNanoReader();

  private TimestampNanoReader() {}

  @Override
  public LocalDateTime read(Decoder decoder, Object reuse) throws IOException {
    // reuse is ignored; LocalDateTime is immutable
    return DateTimeUtil.timestampFromNanos(decoder.readLong());
  }
}

/**
 * Reads an Avro long as a timestamptz(ns) value, decoded via
 * {@code DateTimeUtil.timestamptzFromNanos} into an {@link OffsetDateTime}.
 */
private static class TimestamptzNanoReader implements ValueReader<OffsetDateTime> {
  // stateless singleton, exposed through timestamptzNanos()
  private static final TimestamptzNanoReader INSTANCE = new TimestamptzNanoReader();

  private TimestamptzNanoReader() {}

  @Override
  public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException {
    // reuse is ignored; OffsetDateTime is immutable
    return DateTimeUtil.timestamptzFromNanos(decoder.readLong());
  }
}

private static class PlannedRecordReader extends ValueReaders.PlannedStructReader<Record> {
private final StructType structType;

Expand Down
Loading