25 changes: 17 additions & 8 deletions flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java
@@ -22,7 +22,8 @@
import java.util.Locale;
import java.util.Map;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -39,22 +40,30 @@ class IcebergSinkUtil {
private IcebergSinkUtil() {
}

static IcebergStreamWriter<Row> createStreamWriter(Table table, TableSchema tableSchema) {
static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema) {
Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");

if (tableSchema != null) {
Schema writeSchema = FlinkSchemaUtil.convert(tableSchema);
// Reassign ids to match the existing table schema.
writeSchema = TypeUtil.reassignIds(writeSchema, table.schema());
RowType flinkSchema;
if (requestedSchema != null) {
// Convert the Flink schema to an Iceberg schema first, then reassign ids to match the existing Iceberg schema.
Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), table.schema());
TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true);

// We use this Flink schema to read values from RowData. Flink's TINYINT and SMALLINT are promoted to Iceberg's
// INTEGER, so reading a TINYINT (backed by 1 byte) through the Iceberg table schema would read 4 bytes instead
// of 1 and corrupt the byte layout in BinaryRowData. So we must use the Flink schema here.
flinkSchema = (RowType) requestedSchema.toRowDataType().getLogicalType();
} else {
flinkSchema = FlinkSchemaUtil.convert(table.schema());
}

Map<String, String> props = table.properties();
long targetFileSize = getTargetFileSizeBytes(props);
FileFormat fileFormat = getFileFormat(props);

TaskWriterFactory<Row> taskWriterFactory = new RowTaskWriterFactory(table.schema(), table.spec(),
table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props);
TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkSchema,
table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props);

return new IcebergStreamWriter<>(table.toString(), taskWriterFactory);
}
flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java
@@ -22,14 +22,16 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.flink.types.Row;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.flink.data.FlinkParquetWriters;
import org.apache.iceberg.flink.data.FlinkAvroWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -38,37 +40,39 @@
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class RowTaskWriterFactory implements TaskWriterFactory<Row> {
class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
private final Schema schema;
private final RowType flinkSchema;
private final PartitionSpec spec;
private final LocationProvider locations;
private final FileIO io;
private final EncryptionManager encryptionManager;
private final long targetFileSizeBytes;
private final FileFormat format;
private final FileAppenderFactory<Row> appenderFactory;
private final FileAppenderFactory<RowData> appenderFactory;

private OutputFileFactory outputFileFactory;

RowTaskWriterFactory(Schema schema,
PartitionSpec spec,
LocationProvider locations,
FileIO io,
EncryptionManager encryptionManager,
long targetFileSizeBytes,
FileFormat format,
Map<String, String> tableProperties) {
RowDataTaskWriterFactory(Schema schema,
RowType flinkSchema,
PartitionSpec spec,
LocationProvider locations,
FileIO io,
EncryptionManager encryptionManager,
long targetFileSizeBytes,
FileFormat format,
Map<String, String> tableProperties) {
this.schema = schema;
this.flinkSchema = flinkSchema;
this.spec = spec;
this.locations = locations;
this.io = io;
this.encryptionManager = encryptionManager;
this.targetFileSizeBytes = targetFileSizeBytes;
this.format = format;
this.appenderFactory = new FlinkFileAppenderFactory(schema, tableProperties);
this.appenderFactory = new FlinkFileAppenderFactory(schema, flinkSchema, tableProperties);
}

@Override
@@ -77,62 +81,63 @@ public void initialize(int taskId, int attemptId) {
}

@Override
public TaskWriter<Row> create() {
public TaskWriter<RowData> create() {
Preconditions.checkNotNull(outputFileFactory,
"The outputFileFactory shouldn't be null if we have invoked the initialize().");

if (spec.fields().isEmpty()) {
return new UnpartitionedWriter<>(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes);
} else {
return new RowPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
io, targetFileSizeBytes, schema);
return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
io, targetFileSizeBytes, schema, flinkSchema);
}
}

private static class RowPartitionedFanoutWriter extends PartitionedFanoutWriter<Row> {
private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter<RowData> {

private final PartitionKey partitionKey;
private final RowWrapper rowWrapper;
private final RowDataWrapper rowDataWrapper;

RowPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<Row> appenderFactory,
OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema) {
RowDataPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<RowData> appenderFactory,
OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema,
RowType flinkSchema) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.partitionKey = new PartitionKey(spec, schema);
this.rowWrapper = new RowWrapper(schema.asStruct());
this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
}

@Override
protected PartitionKey partition(Row row) {
partitionKey.partition(rowWrapper.wrap(row));
protected PartitionKey partition(RowData row) {
partitionKey.partition(rowDataWrapper.wrap(row));
return partitionKey;
}
}

private static class FlinkFileAppenderFactory implements FileAppenderFactory<Row> {
private static class FlinkFileAppenderFactory implements FileAppenderFactory<RowData> {
private final Schema schema;
private final RowType flinkSchema;
private final Map<String, String> props;

private FlinkFileAppenderFactory(Schema schema, Map<String, String> props) {
private FlinkFileAppenderFactory(Schema schema, RowType flinkSchema, Map<String, String> props) {
this.schema = schema;
this.flinkSchema = flinkSchema;
this.props = props;
}

@Override
public FileAppender<Row> newAppender(OutputFile outputFile, FileFormat format) {
public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
// TODO MetricsConfig will be used for building parquet RowData writer.
MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
try {
switch (format) {
case PARQUET:
return Parquet.write(outputFile)
.createWriterFunc(FlinkParquetWriters::buildWriter)
case AVRO:
return Avro.write(outputFile)
.createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema))
.setAll(props)
.metricsConfig(metricsConfig)
.schema(schema)
.overwrite()
.build();

case AVRO:
// TODO add the Avro writer building once RowDataWrapper is ready.
case PARQUET:
case ORC:
default:
throw new UnsupportedOperationException("Cannot write unknown file format: " + format);
67 changes: 33 additions & 34 deletions flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
@@ -85,52 +85,51 @@ private interface PositionalGetter<T> {
}

private static PositionalGetter<?> buildGetter(LogicalType logicalType, Type type) {
switch (type.typeId()) {
case STRING:
switch (logicalType.getTypeRoot()) {
Member Author: I changed the switch from the Iceberg type to Flink's logical type here because the value of a TINYINT or SMALLINT column is a byte or short, and casting that byte or short directly to Integer throws a ClassCastException. Switching on the logical type lets us widen it to an int the right way.

case TINYINT:
return (row, pos) -> (int) row.getByte(pos);
case SMALLINT:
return (row, pos) -> (int) row.getShort(pos);
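
To illustrate the comment above, here is a small hypothetical sketch (not part of this PR; the class name TinyIntPromotionDemo is made up) using Flink's GenericRowData. Widening the primitive byte/short matches Iceberg's INTEGER, while casting the boxed value to Integer would fail:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

// Hypothetical demo of the TINYINT/SMALLINT promotion discussed above; not part of this PR.
public class TinyIntPromotionDemo {
  public static void main(String[] args) {
    // A row with a TINYINT (byte) column and a SMALLINT (short) column.
    RowData row = GenericRowData.of((byte) 7, (short) 42);

    // Widening the primitive values to int matches Iceberg's INTEGER type.
    int fromTinyInt = row.getByte(0);    // 7
    int fromSmallInt = row.getShort(1);  // 42

    // Casting the boxed value directly would fail at runtime:
    // Integer broken = (Integer) (Object) row.getByte(0);  // ClassCastException: Byte cannot be cast to Integer
    System.out.println(fromTinyInt + ", " + fromSmallInt);
  }
}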
case CHAR:
case VARCHAR:
return (row, pos) -> row.getString(pos).toString();

case FIXED:
case BINARY:
return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos));

case UUID:
return (row, pos) -> {
ByteBuffer bb = ByteBuffer.wrap(row.getBinary(pos));
long mostSigBits = bb.getLong();
long leastSigBits = bb.getLong();
return new UUID(mostSigBits, leastSigBits);
};
case VARBINARY:
if (Type.TypeID.UUID.equals(type.typeId())) {
Contributor: I think an identity check would be okay since this is an enum symbol, but either way is fine.

return (row, pos) -> {
ByteBuffer bb = ByteBuffer.wrap(row.getBinary(pos));
long mostSigBits = bb.getLong();
long leastSigBits = bb.getLong();
return new UUID(mostSigBits, leastSigBits);
Contributor: Looks like another area where we should have a util method to convert (though it shouldn't block this commit).

Member Author: It's true, we could have a separate pull request for this.

};
} else {
return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos));
}
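
As a follow-up to the util-method suggestion in the thread above, a hypothetical helper (not part of this PR; the class name UuidBytes is made up) might look like this:

import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical utility of the kind suggested above; not part of this PR.
public final class UuidBytes {
  private UuidBytes() {
  }

  // Converts a 16-byte big-endian representation into a java.util.UUID.
  public static UUID fromBytes(byte[] bytes) {
    ByteBuffer bb = ByteBuffer.wrap(bytes);
    return new UUID(bb.getLong(), bb.getLong());
  }
}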

case DECIMAL:
DecimalType decimalType = (DecimalType) logicalType;
return (row, pos) -> row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale()).toBigDecimal();

case TIME:
case TIME_WITHOUT_TIME_ZONE:
// Time in RowData is in milliseconds (Integer), while iceberg's time is microseconds (Long).
return (row, pos) -> ((long) row.getInt(pos)) * 1_000;

case TIMESTAMP:
switch (logicalType.getTypeRoot()) {
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) logicalType;
return (row, pos) -> {
LocalDateTime localDateTime = row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime();
return DateTimeUtil.microsFromTimestamp(localDateTime);
};

case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType lzTs = (LocalZonedTimestampType) logicalType;
return (row, pos) -> {
TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision());
return timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000;
};

default:
throw new IllegalArgumentException("Unhandled iceberg type: " + type + " corresponding flink type: " +
logicalType);
}
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) logicalType;
return (row, pos) -> {
LocalDateTime localDateTime = row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime();
return DateTimeUtil.microsFromTimestamp(localDateTime);
Contributor: Why not use the same logic here that is used for the other timestamp type? Both values are TimestampData returned by getTimestamp, and converting directly to a microsecond value seems better than going through LocalDateTime.

Member Author: Yeah, converting the TimestampData to a long would work the same way for both. I separated them because the timestamp types are different, and we depend on TimestampType.getPrecision() or LocalZonedTimestampType.getPrecision() to get the precision (we could use the constant 6 here, but it is better to use the type's precision getter).

};

case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType lzTs = (LocalZonedTimestampType) logicalType;
return (row, pos) -> {
TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision());
return timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000;
};
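
To make the thread above concrete, here is a small hypothetical sketch (not part of this PR; the class name TimestampMicrosDemo is made up) showing that converting TimestampData directly and going through LocalDateTime yield the same microsecond value:

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import org.apache.flink.table.data.TimestampData;

// Hypothetical comparison of the two conversions discussed above; not part of this PR.
public class TimestampMicrosDemo {
  public static void main(String[] args) {
    TimestampData ts = TimestampData.fromLocalDateTime(
        LocalDateTime.of(2020, 7, 1, 12, 30, 45, 123_456_000));

    // Direct conversion, as in the TIMESTAMP_WITH_LOCAL_TIME_ZONE branch above.
    long directMicros = ts.getMillisecond() * 1000 + ts.getNanoOfMillisecond() / 1000;

    // Via LocalDateTime, roughly what DateTimeUtil.microsFromTimestamp does in the
    // TIMESTAMP_WITHOUT_TIME_ZONE branch.
    long viaLocalDateTime = ChronoUnit.MICROS.between(
        LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC), ts.toLocalDateTime());

    System.out.println(directMicros + " == " + viaLocalDateTime);
  }
}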

case STRUCT:
case ROW:
RowType rowType = (RowType) logicalType;
Types.StructType structType = (Types.StructType) type;

105 changes: 0 additions & 105 deletions flink/src/main/java/org/apache/iceberg/flink/RowWrapper.java

This file was deleted.
