@@ -30,7 +30,7 @@
 import org.apache.flink.table.types.logical.YearMonthIntervalType;
 import org.apache.flink.table.types.logical.ZonedTimestampType;
 
-abstract class FlinkTypeVisitor<T> implements LogicalTypeVisitor<T> {
+public abstract class FlinkTypeVisitor<T> implements LogicalTypeVisitor<T> {
Contributor:
Does this need to be public? The only reference to FlinkTypeVisitor that I see in this PR is here, so I'm not sure why this is needed.

Contributor (Author):
No need. I originally thought the read and write paths would rely on FlinkTypeVisitor.


 // ------------------------- Unsupported types ------------------------------
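As background for the visibility question above, a minimal sketch, assuming FlinkTypeVisitor sits in org.apache.iceberg.flink while the Avro read/write classes sit in org.apache.iceberg.flink.data. With Java's default (package-private) modifier, the class cannot be referenced or extended from another package, so public only matters if some other package depends on it, which per the review nothing in this PR does.

// Hypothetical subclasses, for illustration only; neither exists in the PR.

// In org.apache.iceberg.flink: compiles whether or not FlinkTypeVisitor is public.
abstract class SamePackageVisitor<T> extends FlinkTypeVisitor<T> {
}

// In org.apache.iceberg.flink.data: compiles only when FlinkTypeVisitor is public.
// abstract class OtherPackageVisitor<T> extends FlinkTypeVisitor<T> {
// }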
@@ -28,8 +28,6 @@
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.flink.data.FlinkAvroWriter;
 import org.apache.iceberg.flink.data.FlinkParquetWriters;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileAppenderFactory;
@@ -102,13 +100,7 @@ public FileAppender<Row> newAppender(OutputFile outputFile, FileFormat format) {
           .build();
 
       case AVRO:
-        return Avro.write(outputFile)
-            .createWriterFunc(FlinkAvroWriter::new)
-            .setAll(props)
-            .schema(schema)
-            .overwrite()
-            .build();
-
+        // TODO add AVRO once the RowDataWrapper are ready.
       case ORC:
       default:
         throw new UnsupportedOperationException("Cannot write unknown file format: " + format);
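For reference, a sketch of what the AVRO branch might look like once a RowData-capable Avro writer is available; the FlinkAvroWriter below is the hypothetical RowData replacement, not the Row-based class whose imports were removed above:

      case AVRO:
        // Hypothetical: assumes a future FlinkAvroWriter that writes RowData.
        return Avro.write(outputFile)
            .createWriterFunc(FlinkAvroWriter::new)
            .setAll(props)
            .schema(schema)
            .overwrite()
            .build();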
73 changes: 73 additions & 0 deletions flink/src/main/java/org/apache/iceberg/flink/data/AvroWithFlinkSchemaVisitor.java
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.data;

import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.NullType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.avro.AvroWithPartnerByStructureVisitor;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.Pair;

public abstract class AvroWithFlinkSchemaVisitor<T> extends AvroWithPartnerByStructureVisitor<LogicalType, T> {

@Override
protected boolean isStringType(LogicalType logicalType) {
return logicalType.getTypeRoot().getFamilies().contains(LogicalTypeFamily.CHARACTER_STRING);
}

@Override
protected boolean isMapType(LogicalType logicalType) {
return logicalType instanceof MapType;
}

@Override
protected LogicalType arrayElementType(LogicalType arrayType) {
Preconditions.checkArgument(arrayType instanceof ArrayType, "Invalid array: %s is not an array", arrayType);
return ((ArrayType) arrayType).getElementType();
}

@Override
protected LogicalType mapKeyType(LogicalType mapType) {
Preconditions.checkArgument(isMapType(mapType), "Invalid map: %s is not a map", mapType);
return ((MapType) mapType).getKeyType();
}

@Override
protected LogicalType mapValueType(LogicalType mapType) {
Preconditions.checkArgument(isMapType(mapType), "Invalid map: %s is not a map", mapType);
return ((MapType) mapType).getValueType();
}

@Override
protected Pair<String, LogicalType> fieldNameAndType(LogicalType structType, int pos) {
Preconditions.checkArgument(structType instanceof RowType, "Invalid struct: %s is not a struct", structType);
RowType.RowField field = ((RowType) structType).getFields().get(pos);
return Pair.of(field.getName(), field.getType());
}

@Override
protected LogicalType nullType() {
return new NullType();
}
}
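To make the partner-type helpers above concrete, here is a small, self-contained demo (plain Flink type APIs, independent of this PR) of the decomposition the visitor performs on a RowType; the class name and field names are illustrative only:

import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class PartnerTypeDemo {
  public static void main(String[] args) {
    // ROW<id INT, props MAP<STRING, STRING>>
    RowType row = RowType.of(
        new LogicalType[] {
            new IntType(),
            new MapType(new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH))
        },
        new String[] {"id", "props"});

    // What fieldNameAndType(row, 1) pairs up: the field name and its LogicalType.
    RowType.RowField field = row.getFields().get(1);
    System.out.println(field.getName() + " -> " + field.getType());

    // What mapKeyType/mapValueType peel off for the visitor's recursion.
    MapType map = (MapType) field.getType();
    System.out.println(map.getKeyType() + " / " + map.getValueType());
  }
}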
132 changes: 109 additions & 23 deletions flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
@@ -19,54 +19,140 @@
 
 package org.apache.iceberg.flink.data;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.flink.types.Row;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
-import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DecoderResolver;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
-public class FlinkAvroReader extends DataReader<Row> {
+public class FlinkAvroReader implements DatumReader<RowData> {
 
+  private final Schema readSchema;
+  private final ValueReader<RowData> reader;
+  private Schema fileSchema = null;
+
   public FlinkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
-    super(expectedSchema, readSchema, ImmutableMap.of());
+    this(expectedSchema, readSchema, ImmutableMap.of());
   }
 
+  @SuppressWarnings("unchecked")
+  public FlinkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> constants) {
+    this.readSchema = readSchema;
+    this.reader = (ValueReader<RowData>) AvroSchemaWithTypeVisitor
+        .visit(expectedSchema, readSchema, new ReadBuilder(constants));
+  }
+
   @Override
-  protected ValueReader<?> createStructReader(Types.StructType struct,
-                                              List<ValueReader<?>> fields,
-                                              Map<Integer, ?> idToConstant) {
-    return new RowReader(fields, struct, idToConstant);
+  public void setSchema(Schema newFileSchema) {
+    this.fileSchema = Schema.applyAliases(newFileSchema, readSchema);
   }
 
-  private static class RowReader extends ValueReaders.StructReader<Row> {
-    private final Types.StructType structType;
+  @Override
+  public RowData read(RowData reuse, Decoder decoder) throws IOException {
+    return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
+  }
+
+  private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
+    private final Map<Integer, ?> idToConstant;
 
-    private RowReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
-      super(readers, struct, idToConstant);
-      this.structType = struct;
+    private ReadBuilder(Map<Integer, ?> idToConstant) {
+      this.idToConstant = idToConstant;
     }
 
     @Override
-    protected Row reuseOrCreate(Object reuse) {
-      if (reuse instanceof Row) {
-        return (Row) reuse;
-      } else {
-        return new Row(structType.fields().size());
-      }
+    public ValueReader<?> record(Types.StructType expected, Schema record, List<String> names,
+                                 List<ValueReader<?>> fields) {
+      return FlinkValueReaders.struct(fields, expected.asStructType(), idToConstant);
+    }
+
+    @Override
+    public ValueReader<?> union(Type expected, Schema union, List<ValueReader<?>> options) {
+      return ValueReaders.union(options);
+    }
+
+    @Override
+    public ValueReader<?> array(Types.ListType expected, Schema array, ValueReader<?> elementReader) {
+      return FlinkValueReaders.array(elementReader);
     }
 
     @Override
-    protected Object get(Row row, int pos) {
-      return row.getField(pos);
+    public ValueReader<?> map(Types.MapType expected, Schema map,
+                              ValueReader<?> keyReader, ValueReader<?> valueReader) {
+      return FlinkValueReaders.arrayMap(keyReader, valueReader);
     }
 
     @Override
-    protected void set(Row row, int pos, Object value) {
-      row.setField(pos, value);
+    public ValueReader<?> map(Types.MapType expected, Schema map, ValueReader<?> valueReader) {
+      return FlinkValueReaders.map(FlinkValueReaders.strings(), valueReader);
+    }
+
+    @Override
+    public ValueReader<?> primitive(Type.PrimitiveType expected, Schema primitive) {
+      LogicalType logicalType = primitive.getLogicalType();
+      if (logicalType != null) {
+        switch (logicalType.getName()) {
+          case "date":
+            return ValueReaders.ints();
+
+          case "time-micros":
+            return FlinkValueReaders.timeMicros();
+
+          case "timestamp-millis":
+            return FlinkValueReaders.timestampMills();
+
+          case "timestamp-micros":
+            return FlinkValueReaders.timestampMicros();
+
+          case "decimal":
+            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+            return FlinkValueReaders.decimal(
+                ValueReaders.decimalBytesReader(primitive),
+                decimal.getPrecision(),
+                decimal.getScale());
+
+          case "uuid":
+            return FlinkValueReaders.uuids();
+
+          default:
+            throw new IllegalArgumentException("Unknown logical type: " + logicalType);
+        }
+      }
+
+      switch (primitive.getType()) {
+        case NULL:
+          return ValueReaders.nulls();
+        case BOOLEAN:
+          return ValueReaders.booleans();
+        case INT:
+          return ValueReaders.ints();
+        case LONG:
+          return ValueReaders.longs();
+        case FLOAT:
+          return ValueReaders.floats();
+        case DOUBLE:
+          return ValueReaders.doubles();
+        case STRING:
+          return FlinkValueReaders.strings();
+        case FIXED:
+          return ValueReaders.fixed(primitive.getFixedSize());
+        case BYTES:
+          return ValueReaders.bytes();
+        case ENUM:
+          return FlinkValueReaders.enums(primitive.getEnumSymbols());
+        default:
+          throw new IllegalArgumentException("Unsupported type: " + primitive);
+      }
     }
   }
 }
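A usage sketch for the new reader, with assumed variable names, mirroring how Iceberg's generic Avro read builder accepts a reader function: the function receives the resolved Avro read schema, while the projected Iceberg schema is captured from the enclosing scope.

import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.flink.data.FlinkAvroReader;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;

// A sketch, not code from this PR: inputFile and projectedSchema are assumed to
// be supplied by the caller.
CloseableIterable<RowData> readAvro(InputFile inputFile, Schema projectedSchema) {
  return Avro.read(inputFile)
      .project(projectedSchema)
      .createReaderFunc(readSchema -> new FlinkAvroReader(projectedSchema, readSchema))
      .build();
}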