diff --git a/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java b/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java deleted file mode 100644 index 8d80342a16a4..000000000000 --- a/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.flink; - -import java.util.Locale; -import java.util.Map; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.util.PropertyUtil; - -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; -import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; - -class IcebergSinkUtil { - private IcebergSinkUtil() { - } - - static IcebergStreamWriter createStreamWriter(Table table, TableSchema requestedSchema) { - Preconditions.checkArgument(table != null, "Iceberg table should't be null"); - - RowType flinkSchema; - if (requestedSchema != null) { - // Convert the flink schema to iceberg schema firstly, then reassign ids to match the existing iceberg schema. - Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), table.schema()); - TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true); - - // We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will be promoted to - // iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT (backend by 1 'byte'), we will - // read 4 bytes rather than 1 byte, it will mess up the byte array in BinaryRowData. So here we must use flink - // schema. 
- flinkSchema = (RowType) requestedSchema.toRowDataType().getLogicalType(); - } else { - flinkSchema = FlinkSchemaUtil.convert(table.schema()); - } - - Map props = table.properties(); - long targetFileSize = getTargetFileSizeBytes(props); - FileFormat fileFormat = getFileFormat(props); - - TaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkSchema, - table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props); - - return new IcebergStreamWriter<>(table.toString(), taskWriterFactory); - } - - private static FileFormat getFileFormat(Map properties) { - String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); - return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH)); - } - - private static long getTargetFileSizeBytes(Map properties) { - return PropertyUtil.propertyAsLong(properties, - WRITE_TARGET_FILE_SIZE_BYTES, - WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); - } -} diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index 1639a44b2917..c91b659b632c 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -51,7 +51,7 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -class FlinkParquetWriters { +public class FlinkParquetWriters { private FlinkParquetWriters() { } @@ -207,6 +207,7 @@ private static ParquetValueWriters.PrimitiveWriter decimalAsInteger " wrong precision %s", precision); return new IntegerDecimalWriter(desc, precision, scale); } + private static ParquetValueWriters.PrimitiveWriter decimalAsLong(ColumnDescriptor desc, int precision, int scale) { Preconditions.checkArgument(precision <= 18, "Cannot write decimal value as long with precision larger than 18, " + diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java new file mode 100644 index 000000000000..96ce0ba6d661 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
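The createStreamWriter logic removed from IcebergSinkUtil above is carried over into the new FlinkSink below; the part worth calling out is that it keeps the requested Flink RowType for reading RowData instead of re-deriving one from the Iceberg schema. A minimal sketch of why, using a hypothetical single-column schema that is not part of this patch:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;

class SchemaPromotionSketch {
  static RowType writeType() {
    // Hypothetical requested schema with a single TINYINT column.
    TableSchema requested = TableSchema.builder().field("b", DataTypes.TINYINT()).build();

    // Round-tripping through Iceberg promotes TINYINT to INTEGER, so a reader derived from
    // the Iceberg schema would call RowData#getInt on a field that BinaryRowData backs with
    // a single byte, corrupting the read.
    Schema icebergSchema = FlinkSchemaUtil.convert(requested);

    // Keeping the requested Flink type preserves TINYINT, so field getters stay on getByte.
    return (RowType) requested.toRowDataType().getLogicalType();
  }
}
```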
+ */ + +package org.apache.iceberg.flink.sink; + +import java.util.Locale; +import java.util.Map; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PropertyUtil; + +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +public class FlinkSink { + + private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName(); + private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName(); + + private FlinkSink() { + } + + /** + * Initialize a {@link Builder} to export the data from generic input data stream into iceberg table. We use + * {@link RowData} inside the sink connector, so users need to provide a mapper function and a + * {@link TypeInformation} to convert those generic records to a RowData DataStream. + * + * @param input the generic source input data stream. + * @param mapper function to convert the generic data to {@link RowData} + * @param outputType to define the {@link TypeInformation} for the input data. + * @param the data type of records. + * @return {@link Builder} to connect the iceberg table. + */ + public static Builder builderFor(DataStream input, + MapFunction mapper, + TypeInformation outputType) { + DataStream dataStream = input.map(mapper, outputType); + return forRowData(dataStream); + } + + /** + * Initialize a {@link Builder} to export the data from input data stream with {@link Row}s into iceberg table. We use + * {@link RowData} inside the sink connector, so users need to provide a {@link TableSchema} for builder to convert + * those {@link Row}s to a {@link RowData} DataStream. + * + * @param input the source input data stream with {@link Row}s. + * @param tableSchema defines the {@link TypeInformation} for input data. + * @return {@link Builder} to connect the iceberg table. 
+ */ + public static Builder forRow(DataStream input, TableSchema tableSchema) { + RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType(); + DataType[] fieldDataTypes = tableSchema.getFieldDataTypes(); + + DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes); + return builderFor(input, rowConverter::toInternal, RowDataTypeInfo.of(rowType)) + .tableSchema(tableSchema); + } + + /** + * Initialize a {@link Builder} to export the data from an input data stream of {@link RowData}s into an iceberg table. + * + * @param input the source input data stream with {@link RowData}s. + * @return {@link Builder} to connect the iceberg table. + */ + public static Builder forRowData(DataStream input) { + return new Builder().forRowData(input); + } + + public static class Builder { + private DataStream rowDataInput = null; + private TableLoader tableLoader; + private Configuration hadoopConf; + private Table table; + private TableSchema tableSchema; + + private Builder() { + } + + private Builder forRowData(DataStream newRowDataInput) { + this.rowDataInput = newRowDataInput; + return this; + } + + /** + * This iceberg {@link Table} instance is used for initializing {@link IcebergStreamWriter}, which will write all + * the records into {@link DataFile}s and emit them to the downstream operator. Providing a table here avoids + * loading the table again from each separate task. + * + * @param newTable the loaded iceberg table instance. + * @return {@link Builder} to connect the iceberg table. + */ + public Builder table(Table newTable) { + this.table = newTable; + return this; + } + + /** + * The table loader is used for loading tables in {@link IcebergFilesCommitter} lazily; we need this loader because + * {@link Table} is not serializable, so the table loaded via Builder#table cannot simply be shipped to the remote + * task managers. + * + * @param newTableLoader to load the iceberg table inside tasks. + * @return {@link Builder} to connect the iceberg table. 
+ */ + public Builder tableLoader(TableLoader newTableLoader) { + this.tableLoader = newTableLoader; + return this; + } + + public Builder hadoopConf(Configuration newHadoopConf) { + this.hadoopConf = newHadoopConf; + return this; + } + + public Builder tableSchema(TableSchema newTableSchema) { + this.tableSchema = newTableSchema; + return this; + } + + @SuppressWarnings("unchecked") + public DataStreamSink build() { + Preconditions.checkArgument(rowDataInput != null, + "Please use forRowData() to initialize the input DataStream."); + Preconditions.checkNotNull(table, "Table shouldn't be null"); + Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null"); + Preconditions.checkNotNull(hadoopConf, "Hadoop configuration shouldn't be null"); + + IcebergStreamWriter streamWriter = createStreamWriter(table, tableSchema); + IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, hadoopConf); + + DataStream returnStream = rowDataInput + .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter) + .setParallelism(rowDataInput.getParallelism()) + .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter) + .setParallelism(1) + .setMaxParallelism(1); + + return returnStream.addSink(new DiscardingSink()) + .name(String.format("IcebergSink %s", table.toString())) + .setParallelism(1); + } + } + + static IcebergStreamWriter createStreamWriter(Table table, TableSchema requestedSchema) { + Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null"); + + RowType flinkSchema; + if (requestedSchema != null) { + // Convert the flink schema to an iceberg schema first, then reassign ids to match the existing iceberg schema. + Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), table.schema()); + TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true); + + // We use this flink schema to read values from RowData. Flink's TINYINT and SMALLINT are promoted to + // iceberg INTEGER, which means that if we used iceberg's table schema to read a TINYINT (backed by 1 byte), we would + // read 4 bytes rather than 1 byte and mess up the byte layout in BinaryRowData. So here we must use the flink + // schema. 
+ flinkSchema = (RowType) requestedSchema.toRowDataType().getLogicalType(); + } else { + flinkSchema = FlinkSchemaUtil.convert(table.schema()); + } + + Map props = table.properties(); + long targetFileSize = getTargetFileSizeBytes(props); + FileFormat fileFormat = getFileFormat(props); + + TaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkSchema, + table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props); + + return new IcebergStreamWriter<>(table.toString(), taskWriterFactory); + } + + private static FileFormat getFileFormat(Map properties) { + String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH)); + } + + private static long getTargetFileSizeBytes(Map properties) { + return PropertyUtil.propertyAsLong(properties, + WRITE_TARGET_FILE_SIZE_BYTES, + WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java new file mode 100644 index 000000000000..e0cfbafff765 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
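Before moving on to the committer, a minimal end-to-end use of the builder above may help; the warehouse path, schema, sample rows and job name here are illustrative and not part of the patch:

```java
import java.util.Arrays;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopTables;

public class FlinkSinkExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(100);   // the committer only commits on completed checkpoints

    TableSchema schema = TableSchema.builder()
        .field("id", DataTypes.INT())
        .field("data", DataTypes.STRING())
        .build();
    RowTypeInfo rowTypeInfo = new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
    DataStream<Row> rows = env.fromCollection(
        Arrays.asList(Row.of(1, "hello"), Row.of(2, "world")), rowTypeInfo);

    // Assumes an existing Hadoop table at this (hypothetical) location.
    String tablePath = "file:///tmp/warehouse/test";
    Configuration hadoopConf = new Configuration();
    Table table = new HadoopTables(hadoopConf).load(tablePath);

    FlinkSink.forRow(rows, schema)
        .table(table)                                         // avoids reloading the table while building the writer
        .tableLoader(TableLoader.fromHadoopTable(tablePath))  // lets the committer (re)load the table inside tasks
        .hadoopConf(hadoopConf)
        .build();

    env.execute("iceberg-sink-example");
  }
}
```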
+ */ + +package org.apache.iceberg.flink.sink; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.SortedMap; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.hadoop.SerializableConfiguration; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class IcebergFilesCommitter extends AbstractStreamOperator + implements OneInputStreamOperator, BoundedOneInput { + + private static final long serialVersionUID = 1L; + private static final long INITIAL_CHECKPOINT_ID = -1L; + + private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class); + private static final String FLINK_JOB_ID = "flink.job-id"; + + // The max checkpoint id we've committed to the iceberg table. Since flink's checkpoint ids are always increasing, we can + // safely commit all the data files whose checkpoint id is greater than the max committed one to the iceberg table, and + // avoid committing the same data files twice. This id will be attached to iceberg's metadata when committing the + // iceberg transaction. + private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id"; + + // TableLoader to load the iceberg table lazily. + private final TableLoader tableLoader; + private final SerializableConfiguration hadoopConf; + + // A sorted map to maintain the completed data files for each pending checkpoint id (which have not been committed + // to the iceberg table). We need a sorted map here because it's possible that a few checkpoint snapshots fail, for + // example: the 1st checkpoint has 2 data files <1, [file1, file2]>, the 2nd checkpoint has 1 data file + // <2, [file3]>. If the snapshot for checkpoint#1 is interrupted by a network/disk failure etc., we still don't expect + // any data loss in the iceberg table. So we keep the finished files <1, [file1, file2]> in memory and retry committing them to the + // iceberg table when the next checkpoint happens. + private final NavigableMap> dataFilesPerCheckpoint = Maps.newTreeMap(); + + // The data files cache for the current checkpoint. Once the snapshot barrier is received, it will be flushed to + // 'dataFilesPerCheckpoint'. 
+ private final List dataFilesOfCurrentCheckpoint = Lists.newArrayList(); + + // It will have an unique identifier for one job. + private transient String flinkJobId; + private transient Table table; + private transient long maxCommittedCheckpointId; + + // All pending checkpoints states for this function. + private static final ListStateDescriptor>> STATE_DESCRIPTOR = buildStateDescriptor(); + private transient ListState>> checkpointsState; + + IcebergFilesCommitter(TableLoader tableLoader, Configuration hadoopConf) { + this.tableLoader = tableLoader; + this.hadoopConf = new SerializableConfiguration(hadoopConf); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + this.flinkJobId = getContainingTask().getEnvironment().getJobID().toString(); + + // Open the table loader and load the table. + this.tableLoader.open(hadoopConf.get()); + this.table = tableLoader.loadTable(); + this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID; + + this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR); + if (context.isRestored()) { + this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId); + // In the restoring path, it should have one valid snapshot for current flink job at least, so the max committed + // checkpoint id should be positive. If it's not positive, that means someone might have removed or expired the + // iceberg snapshot, in that case we should throw an exception in case of committing duplicated data files into + // the iceberg table. + Preconditions.checkState(maxCommittedCheckpointId != INITIAL_CHECKPOINT_ID, + "There should be an existing iceberg snapshot for current flink job: %s", flinkJobId); + + SortedMap> restoredDataFiles = checkpointsState.get().iterator().next(); + // Only keep the uncommitted data files in the cache. + this.dataFilesPerCheckpoint.putAll(restoredDataFiles.tailMap(maxCommittedCheckpointId + 1)); + } + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + long checkpointId = context.getCheckpointId(); + LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId); + + // Update the checkpoint state. + dataFilesPerCheckpoint.put(checkpointId, ImmutableList.copyOf(dataFilesOfCurrentCheckpoint)); + + // Reset the snapshot state to the latest state. + checkpointsState.clear(); + checkpointsState.add(dataFilesPerCheckpoint); + + // Clear the local buffer for current checkpoint. + dataFilesOfCurrentCheckpoint.clear(); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + super.notifyCheckpointComplete(checkpointId); + // It's possible that we have the following events: + // 1. snapshotState(ckpId); + // 2. snapshotState(ckpId+1); + // 3. notifyCheckpointComplete(ckpId+1); + // 4. notifyCheckpointComplete(ckpId); + // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files, + // Besides, we need to maintain the max-committed-checkpoint-id to be increasing. 
+ if (checkpointId > maxCommittedCheckpointId) { + commitUpToCheckpoint(checkpointId); + this.maxCommittedCheckpointId = checkpointId; + } + } + + private void commitUpToCheckpoint(long checkpointId) { + NavigableMap> pendingFileMap = dataFilesPerCheckpoint.headMap(checkpointId, true); + + List pendingDataFiles = Lists.newArrayList(); + for (List dataFiles : pendingFileMap.values()) { + pendingDataFiles.addAll(dataFiles); + } + + AppendFiles appendFiles = table.newAppend(); + pendingDataFiles.forEach(appendFiles::appendFile); + appendFiles.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId)); + appendFiles.set(FLINK_JOB_ID, flinkJobId); + appendFiles.commit(); + + // Clear the committed data files from dataFilesPerCheckpoint. + pendingFileMap.clear(); + } + + @Override + public void processElement(StreamRecord element) { + this.dataFilesOfCurrentCheckpoint.add(element.getValue()); + } + + @Override + public void endInput() { + // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly. + dataFilesPerCheckpoint.put(Long.MAX_VALUE, ImmutableList.copyOf(dataFilesOfCurrentCheckpoint)); + dataFilesOfCurrentCheckpoint.clear(); + + commitUpToCheckpoint(Long.MAX_VALUE); + } + + @Override + public void dispose() throws Exception { + if (tableLoader != null) { + tableLoader.close(); + } + } + + private static ListStateDescriptor>> buildStateDescriptor() { + Comparator longComparator = Comparators.forType(Types.LongType.get()); + // Construct a ListTypeInfo. + ListTypeInfo dataFileListTypeInfo = new ListTypeInfo<>(TypeInformation.of(DataFile.class)); + // Construct a SortedMapTypeInfo. + SortedMapTypeInfo> sortedMapTypeInfo = new SortedMapTypeInfo<>( + BasicTypeInfo.LONG_TYPE_INFO, dataFileListTypeInfo, longComparator + ); + return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo); + } + + static long getMaxCommittedCheckpointId(Table table, String flinkJobId) { + Snapshot snapshot = table.currentSnapshot(); + long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID; + + while (snapshot != null) { + Map summary = snapshot.summary(); + String snapshotFlinkJobId = summary.get(FLINK_JOB_ID); + if (flinkJobId.equals(snapshotFlinkJobId)) { + String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID); + if (value != null) { + lastCommittedCheckpointId = Long.parseLong(value); + break; + } + } + Long parentSnapshotId = snapshot.parentId(); + snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null; + } + + return lastCommittedCheckpointId; + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java similarity index 94% rename from flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java rename to flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java index e18c29311496..95daa9656c68 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java @@ -17,11 +17,12 @@ * under the License. 
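To summarize the committer above: pending files are grouped per checkpoint id, everything at or below a completed checkpoint id is appended in one Iceberg transaction tagged with the job id and checkpoint id, and a restarted job reads those markers back out of the snapshot summaries. A condensed sketch, with placeholder table/file/job-id values and the same imports as the committer:

```java
// Placeholder arguments: a loaded Table and already-completed DataFiles.
static void commitSketch(Table table, DataFile file1, DataFile file2, DataFile file3) {
  NavigableMap<Long, List<DataFile>> pending = Maps.newTreeMap();
  pending.put(1L, ImmutableList.of(file1, file2));   // checkpoint #1, not yet committed
  pending.put(2L, ImmutableList.of(file3));          // checkpoint #2, not yet committed

  // notifyCheckpointComplete(2) drains everything up to and including checkpoint 2.
  NavigableMap<Long, List<DataFile>> readyToCommit = pending.headMap(2L, true);
  AppendFiles append = table.newAppend();
  readyToCommit.values().forEach(files -> files.forEach(append::appendFile));
  append.set("flink.max-committed-checkpoint-id", "2");    // MAX_COMMITTED_CHECKPOINT_ID
  append.set("flink.job-id", "placeholder-flink-job-id");  // FLINK_JOB_ID
  append.commit();
  readyToCommit.clear();   // the head-map view is backed by `pending`, so this drops the committed entries

  // On recovery, getMaxCommittedCheckpointId finds these properties again by walking the
  // snapshot history; for the current snapshot the lookup is simply:
  Map<String, String> summary = table.currentSnapshot().summary();
  long maxCommitted = Long.parseLong(summary.get("flink.max-committed-checkpoint-id"));   // 2
}
```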
*/ -package org.apache.iceberg.flink; +package org.apache.iceberg.flink.sink; import java.io.IOException; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.iceberg.DataFile; @@ -34,8 +35,8 @@ class IcebergStreamWriter extends AbstractStreamOperator private static final long serialVersionUID = 1L; private final String fullTableName; + private final TaskWriterFactory taskWriterFactory; - private transient TaskWriterFactory taskWriterFactory; private transient TaskWriter writer; private transient int subTaskId; private transient int attemptId; @@ -43,6 +44,7 @@ class IcebergStreamWriter extends AbstractStreamOperator IcebergStreamWriter(String fullTableName, TaskWriterFactory taskWriterFactory) { this.fullTableName = fullTableName; this.taskWriterFactory = taskWriterFactory; + setChainingStrategy(ChainingStrategy.ALWAYS); } @Override diff --git a/flink/src/main/java/org/apache/iceberg/flink/PartitionedFanoutWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java similarity index 98% rename from flink/src/main/java/org/apache/iceberg/flink/PartitionedFanoutWriter.java rename to flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java index 78c29c5d112b..ad846974adcf 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/PartitionedFanoutWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iceberg.flink; +package org.apache.iceberg.flink.sink; import java.io.IOException; import java.util.Map; diff --git a/flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java similarity index 80% rename from flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java rename to flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index e8c5301824ce..f50f432756ce 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -17,9 +17,10 @@ * under the License. 
*/ -package org.apache.iceberg.flink; +package org.apache.iceberg.flink.sink; import java.io.IOException; +import java.io.Serializable; import java.io.UncheckedIOException; import java.util.Map; import org.apache.flink.table.data.RowData; @@ -33,6 +34,7 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.flink.data.FlinkAvroWriter; import org.apache.iceberg.flink.data.FlinkOrcWriter; +import org.apache.iceberg.flink.data.FlinkParquetWriters; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; @@ -42,9 +44,10 @@ import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.UnpartitionedWriter; import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -class RowDataTaskWriterFactory implements TaskWriterFactory { +public class RowDataTaskWriterFactory implements TaskWriterFactory { private final Schema schema; private final RowType flinkSchema; private final PartitionSpec spec; @@ -55,17 +58,17 @@ class RowDataTaskWriterFactory implements TaskWriterFactory { private final FileFormat format; private final FileAppenderFactory appenderFactory; - private OutputFileFactory outputFileFactory; - - RowDataTaskWriterFactory(Schema schema, - RowType flinkSchema, - PartitionSpec spec, - LocationProvider locations, - FileIO io, - EncryptionManager encryptionManager, - long targetFileSizeBytes, - FileFormat format, - Map tableProperties) { + private transient OutputFileFactory outputFileFactory; + + public RowDataTaskWriterFactory(Schema schema, + RowType flinkSchema, + PartitionSpec spec, + LocationProvider locations, + FileIO io, + EncryptionManager encryptionManager, + long targetFileSizeBytes, + FileFormat format, + Map tableProperties) { this.schema = schema; this.flinkSchema = flinkSchema; this.spec = spec; @@ -115,12 +118,12 @@ protected PartitionKey partition(RowData row) { } } - private static class FlinkFileAppenderFactory implements FileAppenderFactory { + public static class FlinkFileAppenderFactory implements FileAppenderFactory, Serializable { private final Schema schema; private final RowType flinkSchema; private final Map props; - private FlinkFileAppenderFactory(Schema schema, RowType flinkSchema, Map props) { + public FlinkFileAppenderFactory(Schema schema, RowType flinkSchema, Map props) { this.schema = schema; this.flinkSchema = flinkSchema; this.props = props; @@ -128,7 +131,6 @@ private FlinkFileAppenderFactory(Schema schema, RowType flinkSchema, Map newAppender(OutputFile outputFile, FileFormat format) { - // TODO MetricsConfig will be used for building parquet RowData writer. 
MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); try { switch (format) { @@ -149,6 +151,14 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat forma .build(); case PARQUET: + return Parquet.write(outputFile) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType)) + .setAll(props) + .metricsConfig(metricsConfig) + .schema(schema) + .overwrite() + .build(); + default: throw new UnsupportedOperationException("Cannot write unknown file format: " + format); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataWrapper.java similarity index 99% rename from flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java rename to flink/src/main/java/org/apache/iceberg/flink/sink/RowDataWrapper.java index cf0c09c4dcd4..27e97ae6ec4a 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataWrapper.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iceberg.flink; +package org.apache.iceberg.flink.sink; import java.lang.reflect.Array; import java.nio.ByteBuffer; diff --git a/flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java similarity index 97% rename from flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java rename to flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java index c47da24951be..6ed769638109 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java @@ -17,7 +17,7 @@ * under the License. 
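With FlinkFileAppenderFactory now public and serializable, and a Parquet branch wired to FlinkParquetWriters, RowData can also be written to a data file outside of a task writer; the test helper added to SimpleDataUtil below does essentially this. A sketch with an illustrative schema and output path:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

class AppenderSketch {
  static void writeOneParquetFile() throws java.io.IOException {
    Schema schema = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));
    RowType rowType = FlinkSchemaUtil.convert(schema);

    FileAppenderFactory<RowData> factory =
        new RowDataTaskWriterFactory.FlinkFileAppenderFactory(schema, rowType, ImmutableMap.of());

    // Hypothetical output location.
    OutputFile out = HadoopOutputFile.fromPath(
        new Path("file:///tmp/warehouse/test/data-1.parquet"), new Configuration());

    try (FileAppender<RowData> appender = factory.newAppender(out, FileFormat.PARQUET)) {
      appender.add(GenericRowData.of(1, StringData.fromString("hello")));
    }
  }
}
```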
*/ -package org.apache.iceberg.flink; +package org.apache.iceberg.flink.sink; import java.io.Serializable; import org.apache.iceberg.io.TaskWriter; diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index 82c2a89e33c1..b377e54cde07 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -22,44 +22,58 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.junit.Assert; +import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath; + public class SimpleDataUtil { private SimpleDataUtil() { } - static final Schema SCHEMA = new Schema( + public static final Schema SCHEMA = new Schema( Types.NestedField.optional(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "data", Types.StringType.get()) ); - static final TableSchema FLINK_SCHEMA = TableSchema.builder() + public static final TableSchema FLINK_SCHEMA = TableSchema.builder() .field("id", DataTypes.INT()) .field("data", DataTypes.STRING()) .build(); - static final Record RECORD = GenericRecord.create(SCHEMA); + public static final RowType ROW_TYPE = (RowType) FLINK_SCHEMA.toRowDataType().getLogicalType(); + + public static final Record RECORD = GenericRecord.create(SCHEMA); - static Table createTable(String path, Map properties, boolean partitioned) { + public static Table createTable(String path, Map properties, boolean partitioned) { PartitionSpec spec; if (partitioned) { spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); @@ -69,34 +83,55 @@ static Table createTable(String path, Map properties, boolean pa return new HadoopTables().create(SCHEMA, spec, properties, path); } - static Record createRecord(Integer id, String data) { + public static Record createRecord(Integer id, String data) { Record record = RECORD.copy(); record.setField("id", id); record.setField("data", data); return record; } - static RowData createRowData(Integer id, String data) { + public static RowData createRowData(Integer id, String data) { return 
GenericRowData.of(id, StringData.fromString(data)); } - static void assertTableRows(String tablePath, List rows) throws IOException { - List records = Lists.newArrayList(); - for (RowData row : rows) { + public static DataFile writeFile(Schema schema, PartitionSpec spec, Configuration conf, + String location, String filename, List rows) + throws IOException { + Path path = new Path(location, filename); + FileFormat fileFormat = FileFormat.fromFileName(filename); + Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename); + + RowType flinkSchema = FlinkSchemaUtil.convert(schema); + FileAppenderFactory appenderFactory = new RowDataTaskWriterFactory.FlinkFileAppenderFactory(schema, + flinkSchema, ImmutableMap.of()); + + FileAppender appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat); + try (FileAppender closeableAppender = appender) { + closeableAppender.addAll(rows); + } + + return DataFiles.builder(spec) + .withInputFile(HadoopInputFile.fromPath(path, conf)) + .withMetrics(appender.metrics()) + .build(); + } + + public static void assertTableRows(String tablePath, List expected) throws IOException { + List expectedRecords = Lists.newArrayList(); + for (RowData row : expected) { Integer id = row.isNullAt(0) ? null : row.getInt(0); String data = row.isNullAt(1) ? null : row.getString(1).toString(); - records.add(createRecord(id, data)); + expectedRecords.add(createRecord(id, data)); } - assertTableRecords(tablePath, records); + assertTableRecords(tablePath, expectedRecords); } - static void assertTableRecords(String tablePath, List expected) throws IOException { + public static void assertTableRecords(String tablePath, List expected) throws IOException { Preconditions.checkArgument(expected != null, "expected records shouldn't be null"); Table newTable = new HadoopTables().load(tablePath); - Set resultSet; - try (CloseableIterable iterable = (CloseableIterable) IcebergGenerics.read(newTable).build()) { - resultSet = Sets.newHashSet(iterable); + try (CloseableIterable iterable = IcebergGenerics.read(newTable).build()) { + Assert.assertEquals("Should produce the expected record", + Sets.newHashSet(expected), Sets.newHashSet(iterable)); } - Assert.assertEquals("Should produce the expected record", resultSet, Sets.newHashSet(expected)); } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java new file mode 100644 index 000000000000..0aabfda4ca9c --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
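The committer tests that follow lean on this new writeFile helper roughly as below; the table, configuration, path and rows are stand-ins for each test's fixtures, and the imports are the ones SimpleDataUtil already declares:

```java
// Writes the rows to a single data file under the table location and commits it, so that
// SimpleDataUtil.assertTableRecords can read it back. All arguments are placeholders.
static void appendOneFile(Table table, Configuration conf, String tablePath, List<RowData> rows)
    throws IOException {
  DataFile dataFile = SimpleDataUtil.writeFile(
      table.schema(), table.spec(), conf, tablePath, FileFormat.PARQUET.addExtension("data-1"), rows);
  table.newAppend().appendFile(dataFile).commit();
}
```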
+ */ + +package org.apache.iceberg.flink.sink; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.FiniteTestSource; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestFlinkIcebergSink extends AbstractTestBase { + private static final Configuration CONF = new Configuration(); + private static final TypeInformation ROW_TYPE_INFO = new RowTypeInfo( + SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + private static final DataFormatConverters.RowConverter CONVERTER = new DataFormatConverters.RowConverter( + SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + private String tablePath; + private Table table; + private StreamExecutionEnvironment env; + private TableLoader tableLoader; + + private final FileFormat format; + private final int parallelism; + private final boolean partitioned; + + @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {"avro", 1, true}, + new Object[] {"avro", 1, false}, + new Object[] {"avro", 2, true}, + new Object[] {"avro", 2, false}, + new Object[] {"orc", 1, true}, + new Object[] {"orc", 1, false}, + new Object[] {"orc", 2, true}, + new Object[] {"orc", 2, false}, + new Object[] {"parquet", 1, true}, + new Object[] {"parquet", 1, false}, + new Object[] {"parquet", 2, true}, + new Object[] {"parquet", 2, false} + }; + } + + public TestFlinkIcebergSink(String format, int parallelism, boolean partitioned) { + this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); + this.parallelism = parallelism; + this.partitioned = partitioned; + } + + @Before + public void before() throws IOException { + File folder = tempFolder.newFolder(); + String warehouse = folder.getAbsolutePath(); + + tablePath = warehouse.concat("/test"); + Assert.assertTrue("Should create the table path correctly.", new File(tablePath).mkdir()); + + Map props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()); + table = SimpleDataUtil.createTable(tablePath, props, partitioned); + + env = 
StreamExecutionEnvironment.getExecutionEnvironment() + .enableCheckpointing(100) + .setParallelism(parallelism) + .setMaxParallelism(parallelism); + + tableLoader = TableLoader.fromHadoopTable(tablePath); + } + + private List convertToRowData(List rows) { + return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList()); + } + + @Test + public void testWriteRowData() throws Exception { + List rows = Lists.newArrayList( + Row.of(1, "hello"), + Row.of(2, "world"), + Row.of(3, "foo") + ); + DataStream dataStream = env.addSource(new FiniteTestSource<>(rows), ROW_TYPE_INFO) + .map(CONVERTER::toInternal, RowDataTypeInfo.of(SimpleDataUtil.ROW_TYPE)); + + FlinkSink.forRowData(dataStream) + .table(table) + .tableLoader(tableLoader) + .hadoopConf(CONF) + .build(); + + // Execute the program. + env.execute("Test Iceberg DataStream"); + + // Assert the iceberg table's records. NOTICE: the FiniteTestSource will checkpoint the same rows twice, so it will + // commit the same row list into iceberg twice. + List expectedRows = Lists.newArrayList(Iterables.concat(convertToRowData(rows), convertToRowData(rows))); + SimpleDataUtil.assertTableRows(tablePath, expectedRows); + } + + private void testWriteRow(TableSchema tableSchema) throws Exception { + List rows = Lists.newArrayList( + Row.of(4, "bar"), + Row.of(5, "apache") + ); + DataStream dataStream = env.addSource(new FiniteTestSource<>(rows), ROW_TYPE_INFO); + + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) + .table(table) + .tableLoader(tableLoader) + .tableSchema(tableSchema) + .hadoopConf(CONF) + .build(); + + // Execute the program. + env.execute("Test Iceberg DataStream."); + + List expectedRows = Lists.newArrayList(Iterables.concat(convertToRowData(rows), convertToRowData(rows))); + SimpleDataUtil.assertTableRows(tablePath, expectedRows); + } + + @Test + public void testWriteRow() throws Exception { + testWriteRow(null); + } + + @Test + public void testWriteRowWithTableSchema() throws Exception { + testWriteRow(SimpleDataUtil.FLINK_SCHEMA); + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java new file mode 100644 index 000000000000..e5a9a8d4766b --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.sink; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestIcebergFilesCommitter { + private static final Configuration CONF = new Configuration(); + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + private String tablePath; + private Table table; + + private final FileFormat format; + + @Parameterized.Parameters(name = "format = {0}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {"avro"}, + new Object[] {"orc"}, + new Object[] {"parquet"} + }; + } + + public TestIcebergFilesCommitter(String format) { + this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); + } + + @Before + public void before() throws IOException { + File folder = tempFolder.newFolder(); + String warehouse = folder.getAbsolutePath(); + + tablePath = warehouse.concat("/test"); + Assert.assertTrue("Should create the table directory correctly.", new File(tablePath).mkdir()); + + // Construct the iceberg table. + Map props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()); + table = SimpleDataUtil.createTable(tablePath, props, false); + } + + @Test + public void testCommitTxnWithoutDataFiles() throws Exception { + long checkpointId = 0; + long timestamp = 0; + JobID jobId = new JobID(); + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + harness.setup(); + harness.open(); + + SimpleDataUtil.assertTableRows(tablePath, Lists.newArrayList()); + assertSnapshotSize(0); + assertMaxCommittedCheckpointId(jobId, -1L); + + // It's better to advance the max-committed-checkpoint-id in iceberg snapshot, so that the future flink job + // failover won't fail. 
+ for (int i = 1; i <= 3; i++) { + harness.snapshot(++checkpointId, ++timestamp); + harness.notifyOfCompletedCheckpoint(checkpointId); + assertSnapshotSize(i); + assertMaxCommittedCheckpointId(jobId, checkpointId); + } + } + } + + @Test + public void testCommitTxn() throws Exception { + // Test with 3 continues checkpoints: + // 1. snapshotState for checkpoint#1 + // 2. notifyCheckpointComplete for checkpoint#1 + // 3. snapshotState for checkpoint#2 + // 4. notifyCheckpointComplete for checkpoint#2 + // 5. snapshotState for checkpoint#3 + // 6. notifyCheckpointComplete for checkpoint#3 + long timestamp = 0; + + JobID jobID = new JobID(); + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobID)) { + harness.setup(); + harness.open(); + assertSnapshotSize(0); + + List rows = Lists.newArrayListWithExpectedSize(3); + for (int i = 1; i <= 3; i++) { + RowData rowData = SimpleDataUtil.createRowData(i, "hello" + i); + DataFile dataFile = writeDataFile("data-" + i, ImmutableList.of(rowData)); + harness.processElement(dataFile, ++timestamp); + rows.add(rowData); + + harness.snapshot(i, ++timestamp); + + harness.notifyOfCompletedCheckpoint(i); + + SimpleDataUtil.assertTableRows(tablePath, ImmutableList.copyOf(rows)); + assertSnapshotSize(i); + assertMaxCommittedCheckpointId(jobID, i); + } + } + } + + @Test + public void testOrderedEventsBetweenCheckpoints() throws Exception { + // It's possible that two checkpoints happen in the following orders: + // 1. snapshotState for checkpoint#1; + // 2. snapshotState for checkpoint#2; + // 3. notifyCheckpointComplete for checkpoint#1; + // 4. notifyCheckpointComplete for checkpoint#2; + long timestamp = 0; + + JobID jobId = new JobID(); + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + harness.setup(); + harness.open(); + + assertMaxCommittedCheckpointId(jobId, -1L); + + RowData row1 = SimpleDataUtil.createRowData(1, "hello"); + DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1)); + + harness.processElement(dataFile1, ++timestamp); + assertMaxCommittedCheckpointId(jobId, -1L); + + // 1. snapshotState for checkpoint#1 + long firstCheckpointId = 1; + harness.snapshot(firstCheckpointId, ++timestamp); + + RowData row2 = SimpleDataUtil.createRowData(2, "world"); + DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2)); + harness.processElement(dataFile2, ++timestamp); + assertMaxCommittedCheckpointId(jobId, -1L); + + // 2. snapshotState for checkpoint#2 + long secondCheckpointId = 2; + harness.snapshot(secondCheckpointId, ++timestamp); + + // 3. notifyCheckpointComplete for checkpoint#1 + harness.notifyOfCompletedCheckpoint(firstCheckpointId); + SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1)); + assertMaxCommittedCheckpointId(jobId, firstCheckpointId); + + // 4. notifyCheckpointComplete for checkpoint#2 + harness.notifyOfCompletedCheckpoint(secondCheckpointId); + SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2)); + assertMaxCommittedCheckpointId(jobId, secondCheckpointId); + } + } + + @Test + public void testDisorderedEventsBetweenCheckpoints() throws Exception { + // It's possible that the two checkpoints happen in the following orders: + // 1. snapshotState for checkpoint#1; + // 2. snapshotState for checkpoint#2; + // 3. notifyCheckpointComplete for checkpoint#2; + // 4. 
notifyCheckpointComplete for checkpoint#1;
+    long timestamp = 0;
+
+    JobID jobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      RowData row1 = SimpleDataUtil.createRowData(1, "hello");
+      DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
+
+      harness.processElement(dataFile1, ++timestamp);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      // 1. snapshotState for checkpoint#1
+      long firstCheckpointId = 1;
+      harness.snapshot(firstCheckpointId, ++timestamp);
+
+      RowData row2 = SimpleDataUtil.createRowData(2, "world");
+      DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
+      harness.processElement(dataFile2, ++timestamp);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      // 2. snapshotState for checkpoint#2
+      long secondCheckpointId = 2;
+      harness.snapshot(secondCheckpointId, ++timestamp);
+
+      // 3. notifyCheckpointComplete for checkpoint#2
+      harness.notifyOfCompletedCheckpoint(secondCheckpointId);
+      SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
+      assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+
+      // 4. notifyCheckpointComplete for checkpoint#1
+      harness.notifyOfCompletedCheckpoint(firstCheckpointId);
+      SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
+      assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+    }
+  }
+
+  @Test
+  public void testRecoveryFromInvalidSnapshot() throws Exception {
+    long checkpointId = 0;
+    long timestamp = 0;
+    OperatorSubtaskState snapshot;
+
+    JobID jobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      assertSnapshotSize(0);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      RowData row = SimpleDataUtil.createRowData(1, "hello");
+      DataFile dataFile = writeDataFile("data-1", ImmutableList.of(row));
+
+      harness.processElement(dataFile, ++timestamp);
+      snapshot = harness.snapshot(++checkpointId, ++timestamp);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+      SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of());
+    }
+
+    // Restore from the given snapshot
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      AssertHelpers.assertThrows("Could not restore because there's no valid snapshot.",
+          IllegalStateException.class,
+          "There should be an existing iceberg snapshot for current flink job",
+          () -> {
+            harness.initializeState(snapshot);
+            return null;
+          });
+    }
+  }
+
+  @Test
+  public void testRecoveryFromValidSnapshot() throws Exception {
+    long checkpointId = 0;
+    long timestamp = 0;
+    List<RowData> expectedRows = Lists.newArrayList();
+    OperatorSubtaskState snapshot;
+
+    JobID jobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      assertSnapshotSize(0);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      RowData row = SimpleDataUtil.createRowData(1, "hello");
+      expectedRows.add(row);
+      DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row));
+
+      harness.processElement(dataFile1, ++timestamp);
+      snapshot = harness.snapshot(++checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+      SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row));
+      assertSnapshotSize(1);
+      assertMaxCommittedCheckpointId(jobId, checkpointId);
+    }
+
+    // Restore from the given snapshot
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.initializeState(snapshot);
+      harness.open();
+
+      SimpleDataUtil.assertTableRows(tablePath, expectedRows);
+      assertSnapshotSize(1);
+      assertMaxCommittedCheckpointId(jobId, checkpointId);
+
+      RowData row = SimpleDataUtil.createRowData(2, "world");
+      expectedRows.add(row);
+      DataFile dataFile = writeDataFile("data-2", ImmutableList.of(row));
+      harness.processElement(dataFile, ++timestamp);
+
+      harness.snapshot(++checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+      SimpleDataUtil.assertTableRows(tablePath, expectedRows);
+      assertSnapshotSize(2);
+      assertMaxCommittedCheckpointId(jobId, checkpointId);
+    }
+  }
+
+  @Test
+  public void testStartAnotherJobToWriteSameTable() throws Exception {
+    long checkpointId = 0;
+    long timestamp = 0;
+    List<RowData> rows = Lists.newArrayList();
+    List<RowData> tableRows = Lists.newArrayList();
+
+    JobID oldJobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(oldJobId)) {
+      harness.setup();
+      harness.open();
+
+      assertSnapshotSize(0);
+      assertMaxCommittedCheckpointId(oldJobId, -1L);
+
+      for (int i = 1; i <= 3; i++) {
+        rows.add(SimpleDataUtil.createRowData(i, "hello" + i));
+        tableRows.addAll(rows);
+
+        DataFile dataFile = writeDataFile(String.format("data-%d", i), rows);
+        harness.processElement(dataFile, ++timestamp);
+        harness.snapshot(++checkpointId, ++timestamp);
+
+        harness.notifyOfCompletedCheckpoint(checkpointId);
+        SimpleDataUtil.assertTableRows(tablePath, tableRows);
+        assertSnapshotSize(i);
+        assertMaxCommittedCheckpointId(oldJobId, checkpointId);
+      }
+    }
+
+    // The newly started job will start with checkpoint = 1 again.
+    checkpointId = 0;
+    timestamp = 0;
+    JobID newJobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(newJobId)) {
+      harness.setup();
+      harness.open();
+
+      assertSnapshotSize(3);
+      assertMaxCommittedCheckpointId(oldJobId, 3);
+      assertMaxCommittedCheckpointId(newJobId, -1);
+
+      rows.add(SimpleDataUtil.createRowData(2, "world"));
+      tableRows.addAll(rows);
+
+      DataFile dataFile = writeDataFile("data-new-1", rows);
+      harness.processElement(dataFile, ++timestamp);
+      harness.snapshot(++checkpointId, ++timestamp);
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+      SimpleDataUtil.assertTableRows(tablePath, tableRows);
+      assertSnapshotSize(4);
+      assertMaxCommittedCheckpointId(newJobId, checkpointId);
+    }
+  }
+
+  @Test
+  public void testMultipleJobsWriteSameTable() throws Exception {
+    long timestamp = 0;
+    List<RowData> tableRows = Lists.newArrayList();
+
+    JobID[] jobs = new JobID[] {new JobID(), new JobID(), new JobID()};
+    for (int i = 0; i < 20; i++) {
+      int jobIndex = i % 3;
+      int checkpointId = i / 3;
+      JobID jobId = jobs[jobIndex];
+      try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
+        harness.setup();
+        harness.open();
+
+        assertSnapshotSize(i);
+        assertMaxCommittedCheckpointId(jobId, checkpointId == 0 ? -1 : checkpointId);
+
+        List<RowData> rows = Lists.newArrayList(SimpleDataUtil.createRowData(i, "word-" + i));
+        tableRows.addAll(rows);
+
+        DataFile dataFile = writeDataFile(String.format("data-%d", i), rows);
+        harness.processElement(dataFile, ++timestamp);
+        harness.snapshot(checkpointId + 1, ++timestamp);
+
+        harness.notifyOfCompletedCheckpoint(checkpointId + 1);
+        SimpleDataUtil.assertTableRows(tablePath, tableRows);
+        assertSnapshotSize(i + 1);
+        assertMaxCommittedCheckpointId(jobId, checkpointId + 1);
+      }
+    }
+  }
+
+  @Test
+  public void testBoundedStream() throws Exception {
+    JobID jobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      assertSnapshotSize(0);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      List<RowData> tableRows = Lists.newArrayList(SimpleDataUtil.createRowData(1, "word-1"));
+
+      DataFile dataFile = writeDataFile("data-1", tableRows);
+      harness.processElement(dataFile, 1);
+      ((BoundedOneInput) harness.getOneInputOperator()).endInput();
+
+      SimpleDataUtil.assertTableRows(tablePath, tableRows);
+      assertSnapshotSize(1);
+      assertMaxCommittedCheckpointId(jobId, Long.MAX_VALUE);
+    }
+  }
+
+  private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
+    return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
+  }
+
+  private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
+    table.refresh();
+    long actualId = IcebergFilesCommitter.getMaxCommittedCheckpointId(table, jobID.toString());
+    Assert.assertEquals(expectedId, actualId);
+  }
+
+  private void assertSnapshotSize(int expectedSnapshotSize) {
+    table.refresh();
+    Assert.assertEquals(expectedSnapshotSize, Lists.newArrayList(table.snapshots()).size());
+  }
+
+  private OneInputStreamOperatorTestHarness<DataFile, Void> createStreamSink(JobID jobID)
+      throws Exception {
+    TestOperatorFactory factory = TestOperatorFactory.of(tablePath);
+    return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID));
+  }
+
+  private static MockEnvironment createEnvironment(JobID jobID) {
+    return new MockEnvironmentBuilder()
+        .setTaskName("test task")
+        .setManagedMemorySize(32 * 1024)
+        .setInputSplitProvider(new MockInputSplitProvider())
+        .setBufferSize(256)
+        .setTaskConfiguration(new org.apache.flink.configuration.Configuration())
+        .setExecutionConfig(new ExecutionConfig())
+        .setMaxParallelism(16)
+        .setJobID(jobID)
+        .build();
+  }
+
+  private static class TestOperatorFactory extends AbstractStreamOperatorFactory<Void>
+      implements OneInputStreamOperatorFactory<DataFile, Void> {
+    private final String tablePath;
+
+    private TestOperatorFactory(String tablePath) {
+      this.tablePath = tablePath;
+    }
+
+    private static TestOperatorFactory of(String tablePath) {
+      return new TestOperatorFactory(tablePath);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends StreamOperator<Void>> T createStreamOperator(StreamOperatorParameters<Void> param) {
+      IcebergFilesCommitter committer = new IcebergFilesCommitter(TableLoader.fromHadoopTable(tablePath), CONF);
+      committer.setup(param.getContainingTask(), param.getStreamConfig(), param.getOutput());
+      return (T) committer;
+    }
+
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+      return IcebergFilesCommitter.class;
+    }
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
similarity index 97%
rename from flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java
rename to flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index c6eee4635615..f4e10d56c330 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
 
 import java.io.File;
 import java.io.IOException;
@@ -45,6 +45,7 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -70,14 +71,15 @@ public class TestIcebergStreamWriter {
   private final FileFormat format;
   private final boolean partitioned;
 
-  // TODO add Parquet unit test once the readers and writers are ready.
   @Parameterized.Parameters(name = "format = {0}, partitioned = {1}")
   public static Object[][] parameters() {
     return new Object[][] {
         new Object[] {"avro", true},
         new Object[] {"avro", false},
         new Object[] {"orc", true},
-        new Object[] {"orc", false}
+        new Object[] {"orc", false},
+        new Object[] {"parquet", true},
+        new Object[] {"parquet", false}
     };
   }
 
@@ -315,7 +317,7 @@ private OneInputStreamOperatorTestHarness createIcebergStream
 
   private OneInputStreamOperatorTestHarness createIcebergStreamWriter(
       Table icebergTable, TableSchema flinkSchema) throws Exception {
-    IcebergStreamWriter streamWriter = IcebergSinkUtil.createStreamWriter(icebergTable, flinkSchema);
+    IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkSchema);
     OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(
         streamWriter, 1, 1, 0);
 
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
similarity index 99%
rename from flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java
rename to flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
index 22a8654e9401..d8c0de00f61c 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -31,6 +31,7 @@
 import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.data.RandomRowData;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
similarity index 97%
rename from flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java
rename to flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index bb3841efcb8f..c4c697974c9c 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
 
 import java.io.File;
 import java.io.IOException;
@@ -35,6 +35,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.data.RandomRowData;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -55,14 +56,15 @@ public class TestTaskWriters {
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();
 
-  // TODO add Parquet unit test once the readers and writers are ready.
   @Parameterized.Parameters(name = "format = {0}, partitioned = {1}")
   public static Object[][] parameters() {
     return new Object[][] {
         new Object[] {"avro", true},
         new Object[] {"avro", false},
         new Object[] {"orc", true},
-        new Object[] {"orc", false}
+        new Object[] {"orc", false},
+        new Object[] {"parquet", true},
+        new Object[] {"parquet", false}
     };
   }