-
Notifications
You must be signed in to change notification settings - Fork 3k
Flink: Add FlinkWriterFactory #2924
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,248 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg.flink.sink; | ||
|
|
||
| import java.io.Serializable; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
| import org.apache.flink.table.data.RowData; | ||
| import org.apache.flink.table.data.StringData; | ||
| import org.apache.flink.table.types.logical.RowType; | ||
| import org.apache.iceberg.FileFormat; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.SortOrder; | ||
| import org.apache.iceberg.Table; | ||
| import org.apache.iceberg.avro.Avro; | ||
| import org.apache.iceberg.data.BaseWriterFactory; | ||
| import org.apache.iceberg.flink.FlinkSchemaUtil; | ||
| import org.apache.iceberg.flink.data.FlinkAvroWriter; | ||
| import org.apache.iceberg.flink.data.FlinkOrcWriter; | ||
| import org.apache.iceberg.flink.data.FlinkParquetWriters; | ||
| import org.apache.iceberg.io.DeleteSchemaUtil; | ||
| import org.apache.iceberg.orc.ORC; | ||
| import org.apache.iceberg.parquet.Parquet; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
|
|
||
| import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME; | ||
| import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; | ||
| import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; | ||
| import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; | ||
|
|
||
| class FlinkWriterFactory extends BaseWriterFactory<RowData> implements Serializable { | ||
| private RowType dataFlinkType; | ||
| private RowType equalityDeleteFlinkType; | ||
| private RowType positionDeleteFlinkType; | ||
|
|
||
| FlinkWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema, RowType dataFlinkType, | ||
| SortOrder dataSortOrder, FileFormat deleteFileFormat, | ||
| int[] equalityFieldIds, Schema equalityDeleteRowSchema, RowType equalityDeleteFlinkType, | ||
| SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema, | ||
| RowType positionDeleteFlinkType) { | ||
|
|
||
| super(table, dataFileFormat, dataSchema, dataSortOrder, deleteFileFormat, equalityFieldIds, | ||
| equalityDeleteRowSchema, equalityDeleteSortOrder, positionDeleteRowSchema); | ||
|
|
||
| this.dataFlinkType = dataFlinkType; | ||
| this.equalityDeleteFlinkType = equalityDeleteFlinkType; | ||
| this.positionDeleteFlinkType = positionDeleteFlinkType; | ||
| } | ||
|
|
||
| static Builder builderFor(Table table) { | ||
| return new Builder(table); | ||
| } | ||
|
|
||
| @Override | ||
| protected void configureDataWrite(Avro.DataWriteBuilder builder) { | ||
| builder.createWriterFunc(ignore -> new FlinkAvroWriter(dataFlinkType())); | ||
| } | ||
|
|
||
| @Override | ||
| protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) { | ||
| builder.createWriterFunc(ignored -> new FlinkAvroWriter(equalityDeleteFlinkType())); | ||
| } | ||
|
|
||
| @Override | ||
| protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { | ||
| int rowFieldIndex = positionDeleteFlinkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME); | ||
| if (rowFieldIndex >= 0) { | ||
| // FlinkAvroWriter accepts just the Flink type of the row ignoring the path and pos | ||
| RowType positionDeleteRowFlinkType = (RowType) positionDeleteFlinkType().getTypeAt(rowFieldIndex); | ||
| builder.createWriterFunc(ignored -> new FlinkAvroWriter(positionDeleteRowFlinkType)); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| protected void configureDataWrite(Parquet.DataWriteBuilder builder) { | ||
| builder.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(dataFlinkType(), msgType)); | ||
| } | ||
|
|
||
| @Override | ||
| protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) { | ||
| builder.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(equalityDeleteFlinkType(), msgType)); | ||
| } | ||
|
|
||
| @Override | ||
| protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) { | ||
| builder.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(positionDeleteFlinkType(), msgType)); | ||
| builder.transformPaths(path -> StringData.fromString(path.toString())); | ||
| } | ||
|
|
||
| @Override | ||
| protected void configureDataWrite(ORC.DataWriteBuilder builder) { | ||
| builder.createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(dataFlinkType(), iSchema)); | ||
| } | ||
|
|
||
| private RowType dataFlinkType() { | ||
| if (dataFlinkType == null) { | ||
| Preconditions.checkNotNull(dataSchema(), "Data schema must not be null"); | ||
| this.dataFlinkType = FlinkSchemaUtil.convert(dataSchema()); | ||
| } | ||
|
|
||
| return dataFlinkType; | ||
| } | ||
|
|
||
| private RowType equalityDeleteFlinkType() { | ||
| if (equalityDeleteFlinkType == null) { | ||
| Preconditions.checkNotNull(equalityDeleteRowSchema(), "Equality delete schema must not be null"); | ||
| this.equalityDeleteFlinkType = FlinkSchemaUtil.convert(equalityDeleteRowSchema()); | ||
| } | ||
|
|
||
| return equalityDeleteFlinkType; | ||
| } | ||
|
|
||
| private RowType positionDeleteFlinkType() { | ||
| if (positionDeleteFlinkType == null) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. similar comment as above
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one is similar to data files. The value is optional and may not be needed. |
||
| // wrap the optional row schema into the position delete schema that contains path and position | ||
| Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema()); | ||
| this.positionDeleteFlinkType = FlinkSchemaUtil.convert(positionDeleteSchema); | ||
| } | ||
|
|
||
| return positionDeleteFlinkType; | ||
| } | ||
|
|
||
| static class Builder { | ||
| private final Table table; | ||
| private FileFormat dataFileFormat; | ||
| private Schema dataSchema; | ||
| private RowType dataFlinkType; | ||
| private SortOrder dataSortOrder; | ||
| private FileFormat deleteFileFormat; | ||
| private int[] equalityFieldIds; | ||
| private Schema equalityDeleteRowSchema; | ||
| private RowType equalityDeleteFlinkType; | ||
| private SortOrder equalityDeleteSortOrder; | ||
| private Schema positionDeleteRowSchema; | ||
| private RowType positionDeleteFlinkType; | ||
|
|
||
| Builder(Table table) { | ||
| this.table = table; | ||
|
|
||
| Map<String, String> properties = table.properties(); | ||
|
|
||
| String dataFileFormatName = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); | ||
| this.dataFileFormat = FileFormat.valueOf(dataFileFormatName.toUpperCase(Locale.ENGLISH)); | ||
|
|
||
| String deleteFileFormatName = properties.getOrDefault(DELETE_DEFAULT_FILE_FORMAT, dataFileFormatName); | ||
| this.deleteFileFormat = FileFormat.valueOf(deleteFileFormatName.toUpperCase(Locale.ENGLISH)); | ||
| } | ||
|
|
||
| Builder dataFileFormat(FileFormat newDataFileFormat) { | ||
| this.dataFileFormat = newDataFileFormat; | ||
| return this; | ||
| } | ||
|
|
||
| Builder dataSchema(Schema newDataSchema) { | ||
| this.dataSchema = newDataSchema; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets a Flink type for data. | ||
| * <p> | ||
| * If not set, the value is derived from the provided Iceberg schema. | ||
| */ | ||
| Builder dataFlinkType(RowType newDataFlinkType) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this an optional param? In the other comment, I saw we are converting Maybe add Javadoc to mark all the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added docs to optional methods. |
||
| this.dataFlinkType = newDataFlinkType; | ||
| return this; | ||
| } | ||
|
|
||
| Builder dataSortOrder(SortOrder newDataSortOrder) { | ||
| this.dataSortOrder = newDataSortOrder; | ||
| return this; | ||
| } | ||
|
|
||
| Builder deleteFileFormat(FileFormat newDeleteFileFormat) { | ||
| this.deleteFileFormat = newDeleteFileFormat; | ||
| return this; | ||
| } | ||
|
|
||
| Builder equalityFieldIds(int[] newEqualityFieldIds) { | ||
| this.equalityFieldIds = newEqualityFieldIds; | ||
| return this; | ||
| } | ||
|
|
||
| Builder equalityDeleteRowSchema(Schema newEqualityDeleteRowSchema) { | ||
| this.equalityDeleteRowSchema = newEqualityDeleteRowSchema; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets a Flink type for equality deletes. | ||
| * <p> | ||
| * If not set, the value is derived from the provided Iceberg schema. | ||
| */ | ||
| Builder equalityDeleteFlinkType(RowType newEqualityDeleteFlinkType) { | ||
| this.equalityDeleteFlinkType = newEqualityDeleteFlinkType; | ||
| return this; | ||
| } | ||
|
|
||
| Builder equalityDeleteSortOrder(SortOrder newEqualityDeleteSortOrder) { | ||
| this.equalityDeleteSortOrder = newEqualityDeleteSortOrder; | ||
| return this; | ||
| } | ||
|
|
||
| Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) { | ||
| this.positionDeleteRowSchema = newPositionDeleteRowSchema; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets a Flink type for position deletes. | ||
| * <p> | ||
| * If not set, the value is derived from the provided Iceberg schema. | ||
| */ | ||
| Builder positionDeleteFlinkType(RowType newPositionDeleteFlinkType) { | ||
| this.positionDeleteFlinkType = newPositionDeleteFlinkType; | ||
| return this; | ||
| } | ||
|
|
||
| FlinkWriterFactory build() { | ||
| boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null; | ||
| boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null; | ||
| Preconditions.checkArgument(noEqualityDeleteConf || fullEqualityDeleteConf, | ||
| "Equality field IDs and equality delete row schema must be set together"); | ||
|
|
||
| return new FlinkWriterFactory( | ||
| table, dataFileFormat, dataSchema, dataFlinkType, dataSortOrder, deleteFileFormat, | ||
| equalityFieldIds, equalityDeleteRowSchema, equalityDeleteFlinkType, equalityDeleteSortOrder, | ||
| positionDeleteRowSchema, positionDeleteFlinkType); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg.flink.sink; | ||
|
|
||
| import java.util.List; | ||
| import org.apache.flink.table.data.RowData; | ||
| import org.apache.flink.table.types.logical.RowType; | ||
| import org.apache.iceberg.FileFormat; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.flink.FlinkSchemaUtil; | ||
| import org.apache.iceberg.flink.RowDataWrapper; | ||
| import org.apache.iceberg.flink.SimpleDataUtil; | ||
| import org.apache.iceberg.io.TestWriterFactory; | ||
| import org.apache.iceberg.io.WriterFactory; | ||
| import org.apache.iceberg.util.ArrayUtil; | ||
| import org.apache.iceberg.util.StructLikeSet; | ||
|
|
||
| public class TestFlinkWriterFactory extends TestWriterFactory<RowData> { | ||
|
|
||
| public TestFlinkWriterFactory(FileFormat fileFormat, boolean partitioned) { | ||
| super(fileFormat, partitioned); | ||
| } | ||
|
|
||
| @Override | ||
| protected WriterFactory<RowData> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds, | ||
| Schema equalityDeleteRowSchema, Schema positionDeleteRowSchema) { | ||
| return FlinkWriterFactory.builderFor(table) | ||
| .dataSchema(table.schema()) | ||
| .dataFileFormat(format()) | ||
| .deleteFileFormat(format()) | ||
| .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds)) | ||
| .equalityDeleteRowSchema(equalityDeleteRowSchema) | ||
| .positionDeleteRowSchema(positionDeleteRowSchema) | ||
| .build(); | ||
| } | ||
|
|
||
| @Override | ||
| protected RowData toRow(Integer id, String data) { | ||
| return SimpleDataUtil.createRowData(id, data); | ||
| } | ||
|
|
||
| @Override | ||
| protected StructLikeSet toSet(Iterable<RowData> rows) { | ||
| StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); | ||
| RowType flinkType = FlinkSchemaUtil.convert(table.schema()); | ||
| for (RowData row : rows) { | ||
| RowDataWrapper wrapper = new RowDataWrapper(flinkType, table.schema().asStruct()); | ||
| set.add(wrapper.wrap(row)); | ||
| } | ||
| return set; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we move this logic to constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or maybe move this part to the Builder#build() method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are optional and may not be set/needed unless we actually write data files.
Pure DELETEs will not need it, for example.