From 14c028ba1cbe1ea7e3c4b7935fa00566a8e13d5d Mon Sep 17 00:00:00 2001 From: huzheng Date: Thu, 10 Dec 2020 16:57:26 +0800 Subject: [PATCH 1/7] Flink: Commit both data files and delete files to iceberg transaction. --- .../org/apache/iceberg/io/WriteResult.java | 5 + .../iceberg/flink/sink/DeltaManifests.java | 60 +++++++ .../flink/sink/DeltaManifestsSerializer.java | 94 +++++++++++ .../iceberg/flink/sink/FlinkManifestUtil.java | 58 ++++++- .../apache/iceberg/flink/sink/FlinkSink.java | 3 +- .../flink/sink/IcebergFilesCommitter.java | 122 +++++++++----- .../flink/sink/IcebergStreamWriter.java | 20 +-- .../iceberg/flink/sink/TestFlinkManifest.java | 159 ++++++++++++++---- .../flink/sink/TestIcebergFilesCommitter.java | 69 ++++---- .../flink/sink/TestIcebergStreamWriter.java | 66 +++++--- 10 files changed, 514 insertions(+), 142 deletions(-) create mode 100644 flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java diff --git a/core/src/main/java/org/apache/iceberg/io/WriteResult.java b/core/src/main/java/org/apache/iceberg/io/WriteResult.java index 681370878ca6..7bb008bed6c1 100644 --- a/core/src/main/java/org/apache/iceberg/io/WriteResult.java +++ b/core/src/main/java/org/apache/iceberg/io/WriteResult.java @@ -76,6 +76,11 @@ public Builder add(WriteResult result) { return this; } + public Builder add(Iterable results) { + results.forEach(this::add); + return this; + } + public Builder addDataFiles(DataFile... files) { Collections.addAll(dataFiles, files); return this; diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java new file mode 100644 index 000000000000..d58b856c7da0 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.sink; + +import java.util.Iterator; +import java.util.List; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.jetbrains.annotations.NotNull; + +class DeltaManifests implements Iterable { + + private final ManifestFile dataManifest; + private final ManifestFile deleteManifest; + + DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) { + this.dataManifest = dataManifest; + this.deleteManifest = deleteManifest; + } + + ManifestFile dataManifest() { + return dataManifest; + } + + ManifestFile deleteManifest() { + return deleteManifest; + } + + @NotNull + @Override + public Iterator iterator() { + List manifests = Lists.newArrayListWithCapacity(2); + if (dataManifest != null) { + manifests.add(dataManifest); + } + + if (deleteManifest != null) { + manifests.add(deleteManifest); + } + + return manifests.iterator(); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java new file mode 100644 index 000000000000..d99964b30796 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.sink; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +class DeltaManifestsSerializer implements SimpleVersionedSerializer { + private static final int VERSION_NUM = 1; + private static final byte[] EMPTY_BINARY = new byte[0]; + + static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer(); + + @Override + public int getVersion() { + return VERSION_NUM; + } + + @Override + public byte[] serialize(DeltaManifests deltaManifests) throws IOException { + Preconditions.checkNotNull(deltaManifests, "DeltaManifests to be serialized should not be null"); + + ByteArrayOutputStream binaryOut = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(binaryOut); + + byte[] dataManifestBinary = EMPTY_BINARY; + if (deltaManifests.dataManifest() != null) { + dataManifestBinary = ManifestFiles.encode(deltaManifests.dataManifest()); + } + + out.writeInt(dataManifestBinary.length); + out.write(dataManifestBinary); + + byte[] deleteManifestBinary = EMPTY_BINARY; + if (deltaManifests.deleteManifest() != null) { + deleteManifestBinary = ManifestFiles.encode(deltaManifests.deleteManifest()); + } + + out.writeInt(deleteManifestBinary.length); + out.write(deleteManifestBinary); + + return binaryOut.toByteArray(); + } + + @Override + public DeltaManifests deserialize(int version, byte[] serialized) throws IOException { + ByteArrayInputStream binaryIn = new ByteArrayInputStream(serialized); + DataInputStream in = new DataInputStream(binaryIn); + + ManifestFile dataManifest = null; + int dataManifestSize = in.readInt(); + if (dataManifestSize > 0) { + byte[] dataManifestBinary = new byte[dataManifestSize]; + Preconditions.checkState(in.read(dataManifestBinary) == dataManifestSize); + + dataManifest = ManifestFiles.decode(dataManifestBinary); + } + + ManifestFile deleteManifest = null; + int deleteManifestSize = in.readInt(); + if (deleteManifestSize > 0) { + byte[] deleteManifestBinary = new byte[deleteManifestSize]; + Preconditions.checkState(in.read(deleteManifestBinary) == deleteManifestSize); + + deleteManifest = ManifestFiles.decode(deleteManifestBinary); + } + + return new DeltaManifests(dataManifest, deleteManifest); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index aa36c7344e67..61401e16dc04 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.util.List; +import java.util.function.Supplier; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; @@ -32,18 +34,19 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.Lists; class FlinkManifestUtil { - private static final int 
ICEBERG_FORMAT_VERSION = 2; + private static final int FORMAT_V2 = 2; private static final Long DUMMY_SNAPSHOT_ID = 0L; private FlinkManifestUtil() { } - static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, List dataFiles) + private static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, List dataFiles) throws IOException { - ManifestWriter writer = ManifestFiles.write(ICEBERG_FORMAT_VERSION, spec, outputFile, DUMMY_SNAPSHOT_ID); + ManifestWriter writer = ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID); try (ManifestWriter closeableWriter = writer) { closeableWriter.addAll(dataFiles); @@ -63,4 +66,53 @@ static ManifestOutputFileFactory createOutputFileFactory(Table table, String fli TableOperations ops = ((HasTableOperations) table).operations(); return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber); } + + static DeltaManifests writeCompletedFiles(WriteResult result, + Supplier outputFileSupplier, + PartitionSpec spec) throws IOException { + + ManifestFile dataManifest = null; + ManifestFile deleteManifest = null; + + // Write the completed data files into a newly created data manifest file. + if (result.dataFiles() != null && result.dataFiles().length > 0) { + dataManifest = writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles())); + } + + // Write the completed delete files into a newly created delete manifest file. + if (result.deleteFiles() != null && result.deleteFiles().length > 0) { + OutputFile deleteManifestFile = outputFileSupplier.get(); + + ManifestWriter deleteManifestWriter = ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, + deleteManifestFile, DUMMY_SNAPSHOT_ID); + try (ManifestWriter writer = deleteManifestWriter) { + for (DeleteFile deleteFile : result.deleteFiles()) { + writer.add(deleteFile); + } + } + + deleteManifest = deleteManifestWriter.toManifestFile(); + } + + return new DeltaManifests(dataManifest, deleteManifest); + } + + static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io) throws IOException { + WriteResult.Builder builder = WriteResult.builder(); + + // Read the completed data files from persisted data manifest file. + if (deltaManifests.dataManifest() != null) { + builder.addDataFiles(readDataFiles(deltaManifests.dataManifest(), io)); + } + + // Read the completed delete files from persisted delete manifests file. + if (deltaManifests.deleteManifest() != null) { + try (CloseableIterable deleteFiles = ManifestFiles + .readDeleteManifest(deltaManifests.deleteManifest(), io, null)) { + builder.addDeleteFiles(deleteFiles); + } + } + + return builder.build(); + } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 98f3b5ba3dbd..5ca68c4713b2 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -42,6 +42,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.PropertyUtil; @@ -189,7 +190,7 @@ public DataStreamSink build() { this.writeParallelism = writeParallelism == null ? 
rowDataInput.getParallelism() : writeParallelism; DataStream returnStream = rowDataInput - .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter) + .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter) .setParallelism(writeParallelism) .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter) .setParallelism(1) diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index ad890613a073..d76047a3e593 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -40,16 +40,19 @@ import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.RowDelta; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; import org.apache.iceberg.flink.TableLoader; -import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Strings; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Comparators; @@ -58,7 +61,7 @@ import org.slf4j.LoggerFactory; class IcebergFilesCommitter extends AbstractStreamOperator - implements OneInputStreamOperator, BoundedOneInput { + implements OneInputStreamOperator, BoundedOneInput { private static final long serialVersionUID = 1L; private static final long INITIAL_CHECKPOINT_ID = -1L; @@ -85,9 +88,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator // iceberg table when the next checkpoint happen. private final NavigableMap dataFilesPerCheckpoint = Maps.newTreeMap(); - // The data files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the + // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the // 'dataFilesPerCheckpoint'. - private final List dataFilesOfCurrentCheckpoint = Lists.newArrayList(); + private final List writeResultsOfCurrentCkpt = Lists.newArrayList(); // It will have an unique identifier for one job. private transient String flinkJobId; @@ -165,7 +168,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception { jobIdState.add(flinkJobId); // Clear the local buffer for current checkpoint. 
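The sorted map checkpointed here associates each in-flight checkpoint id with the serialized DeltaManifests written for it, or with EMPTY_MANIFEST_DATA when that checkpoint interval produced no files. A rough, illustrative sketch of the per-subtask state (checkpoint ids hypothetical):

    // dataFilesPerCheckpoint : SortedMap<Long, byte[]>
    //   3L -> serialized DeltaManifests (data manifest, plus delete manifest if any)
    //   4L -> EMPTY_MANIFEST_DATA   // nothing was written since checkpoint 3
    // jobIdState : the Flink job id that produced this state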
- dataFilesOfCurrentCheckpoint.clear(); + writeResultsOfCurrentCkpt.clear(); } @Override @@ -184,36 +187,36 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { } } - private void commitUpToCheckpoint(NavigableMap manifestsMap, + private void commitUpToCheckpoint(NavigableMap deltaManifestsMap, String newFlinkJobId, long checkpointId) throws IOException { - NavigableMap pendingManifestMap = manifestsMap.headMap(checkpointId, true); + NavigableMap pendingMap = deltaManifestsMap.headMap(checkpointId, true); - List manifestFiles = Lists.newArrayList(); - List pendingDataFiles = Lists.newArrayList(); - for (byte[] manifestData : pendingManifestMap.values()) { - if (Arrays.equals(EMPTY_MANIFEST_DATA, manifestData)) { + List deltaManifestsList = Lists.newArrayList(); + NavigableMap pendingResults = Maps.newTreeMap(); + for (Map.Entry e : pendingMap.entrySet()) { + if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) { // Skip the empty flink manifest. continue; } - ManifestFile manifestFile = - SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, manifestData); + DeltaManifests deltaManifests = + SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue()); + deltaManifestsList.add(deltaManifests); - manifestFiles.add(manifestFile); - pendingDataFiles.addAll(FlinkManifestUtil.readDataFiles(manifestFile, table.io())); + pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io())); } if (replacePartitions) { - replacePartitions(pendingDataFiles, newFlinkJobId, checkpointId); + replacePartitions(pendingResults, newFlinkJobId, checkpointId); } else { - append(pendingDataFiles, newFlinkJobId, checkpointId); + commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId); } - pendingManifestMap.clear(); + pendingMap.clear(); // Delete the committed manifests and clear the committed data files from dataFilesPerCheckpoint. - for (ManifestFile manifestFile : manifestFiles) { + for (ManifestFile manifestFile : Iterables.concat(deltaManifestsList)) { try { table.io().deleteFile(manifestFile.path()); } catch (Exception e) { @@ -229,33 +232,71 @@ private void commitUpToCheckpoint(NavigableMap manifestsMap, } } - private void replacePartitions(List dataFiles, String newFlinkJobId, long checkpointId) { + private void replacePartitions(NavigableMap pendingResults, String newFlinkJobId, + long checkpointId) { + // Merge all the pending results into a single write result. + WriteResult result = WriteResult.builder().add(pendingResults.values()).build(); + + // Partition overwrite does not support delete files. + Preconditions.checkArgument(result.deleteFiles().length == 0, + "Cannot overwrite partitions with delete files."); ReplacePartitions dynamicOverwrite = table.newReplacePartitions(); + // Commit the overwrite transaction. int numFiles = 0; - for (DataFile file : dataFiles) { + for (DataFile file : result.dataFiles()) { numFiles += 1; dynamicOverwrite.addFile(file); } - commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite", newFlinkJobId, checkpointId); + commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId); } - private void append(List dataFiles, String newFlinkJobId, long checkpointId) { - AppendFiles appendFiles = table.newAppend(); + private void commitDeltaTxn(NavigableMap pendingResults, String newFlinkJobId, long checkpointId) { + // Merge all pending results into a single write result. 
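The body below merges the pending results only to pick a code path: with no delete files it falls back to a plain append (compatible with format v1 tables), otherwise it commits one RowDelta per checkpoint. A worked illustration of why delete-bearing checkpoints cannot be collapsed into a single commit (rows hypothetical; it assumes equality deletes only apply to data files committed in earlier snapshots):

    // Checkpoint 1 writes:  data-1 containing row (id=1, "a")
    // Checkpoint 2 writes:  eq-delete(id=1) plus data-2 containing row (id=1, "b")
    //
    // Committed as two sequential RowDeltas (what commitDeltaTxn does):
    //   snapshot #1: data-1
    //   snapshot #2: eq-delete(id=1), data-2
    //   -> the delete lands after data-1, the old row is removed, a scan sees only (1, "b").
    //
    // Committed as one merged RowDelta instead:
    //   snapshot #1: data-1, data-2, eq-delete(id=1)
    //   -> the delete would no longer apply to data-1, so a scan could see both
    //      (1, "a") and (1, "b"), breaking the intended update semantics.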
+ WriteResult mergedResult = WriteResult.builder().add(pendingResults.values()).build(); - int numFiles = 0; - for (DataFile file : dataFiles) { - numFiles += 1; - appendFiles.appendFile(file); - } + if (mergedResult.deleteFiles().length < 1) { + // To be compatible with iceberg format V1. + AppendFiles appendFiles = table.newAppend(); + + int numFiles = 0; + for (DataFile file : mergedResult.dataFiles()) { + numFiles += 1; + appendFiles.appendFile(file); + } + + commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId); + } else { + // To be compatible with iceberg format V2. + for (Map.Entry e : pendingResults.entrySet()) { + // We don't commit the merged result into a single transaction because for the sequential transaction txn1 and + // txn2, the equality-delete files of txn2 are required to be applied to data files from txn1. Committing the + // merged one will lead to the incorrect delete semantic. + WriteResult result = e.getValue(); + RowDelta rowDelta = table.newRowDelta(); + + int numDataFiles = 0; + for (DataFile file : result.dataFiles()) { + numDataFiles += 1; + rowDelta.addRows(file); + } + + int numDeleteFiles = 0; + for (DeleteFile file : result.deleteFiles()) { + numDeleteFiles += 1; + rowDelta.addDeletes(file); + } - commitOperation(appendFiles, numFiles, "append", newFlinkJobId, checkpointId); + commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, checkpointId); + } + } } - private void commitOperation(SnapshotUpdate operation, int numFiles, String description, + private void commitOperation(SnapshotUpdate operation, int numDataFiles, int numDeleteFiles, String description, String newFlinkJobId, long checkpointId) { - LOG.info("Committing {} with {} files to table {}", description, numFiles, table); + LOG.info("Committing {} with {} data files and {} delete files to table {}", description, numDataFiles, + numDeleteFiles, table); operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId)); operation.set(FLINK_JOB_ID, newFlinkJobId); @@ -266,8 +307,8 @@ private void commitOperation(SnapshotUpdate operation, int numFiles, String d } @Override - public void processElement(StreamRecord element) { - this.dataFilesOfCurrentCheckpoint.add(element.getValue()); + public void processElement(StreamRecord element) { + this.writeResultsOfCurrentCkpt.add(element.getValue()); } @Override @@ -275,7 +316,7 @@ public void endInput() throws IOException { // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly. long currentCheckpointId = Long.MAX_VALUE; dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId)); - dataFilesOfCurrentCheckpoint.clear(); + writeResultsOfCurrentCkpt.clear(); commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId); } @@ -284,14 +325,15 @@ public void endInput() throws IOException { * Write all the complete data files to a newly created manifest file and return the manifest's avro serialized bytes. 
*/ private byte[] writeToManifest(long checkpointId) throws IOException { - if (dataFilesOfCurrentCheckpoint.isEmpty()) { + if (writeResultsOfCurrentCkpt.isEmpty()) { return EMPTY_MANIFEST_DATA; } - OutputFile manifestOutputFile = manifestOutputFileFactory.create(checkpointId); - ManifestFile manifestFile = - FlinkManifestUtil.writeDataFiles(manifestOutputFile, table.spec(), dataFilesOfCurrentCheckpoint); - return SimpleVersionedSerialization.writeVersionAndSerialize(FlinkManifestSerializer.INSTANCE, manifestFile); + WriteResult result = WriteResult.builder().add(writeResultsOfCurrentCkpt).build(); + DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result, + () -> manifestOutputFileFactory.create(checkpointId), table.spec()); + + return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests); } @Override diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java index ef8b93619d19..6d12310dd1dc 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java @@ -25,12 +25,12 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.iceberg.DataFile; import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; -class IcebergStreamWriter extends AbstractStreamOperator - implements OneInputStreamOperator, BoundedOneInput { +class IcebergStreamWriter extends AbstractStreamOperator + implements OneInputStreamOperator, BoundedOneInput { private static final long serialVersionUID = 1L; @@ -62,9 +62,7 @@ public void open() { @Override public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { // close all open files and emit files to downstream committer operator - for (DataFile dataFile : writer.dataFiles()) { - emit(dataFile); - } + emit(writer.complete()); this.writer = taskWriterFactory.create(); } @@ -86,10 +84,8 @@ public void dispose() throws Exception { @Override public void endInput() throws IOException { // For bounded stream, it may don't enable the checkpoint mechanism so we'd better to emit the remaining - // data files to downstream before closing the writer so that we won't miss any of them. - for (DataFile dataFile : writer.dataFiles()) { - emit(dataFile); - } + // completed files to downstream before closing the writer so that we won't miss any of them. 
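Both here and in prepareSnapshotPreBarrier above, the writer now emits a single WriteResult per flush instead of one record per DataFile. A hedged sketch of what an upsert-style v2 task writer might hand to the committer for one checkpoint (file names illustrative):

    // WriteResult emitted by one subtask for one checkpoint:
    //   dataFiles   = [ data-00000-1.parquet ]
    //   deleteFiles = [ eq-deletes-00000-1.parquet, pos-deletes-00000-1.parquet ]
    //
    // An append-only writer simply produces an empty deleteFiles array, which keeps
    // the committer on the v1-compatible AppendFiles path.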
+ emit(writer.complete()); } @Override @@ -101,7 +97,7 @@ public String toString() { .toString(); } - private void emit(DataFile dataFile) { - output.collect(new StreamRecord<>(dataFile)); + private void emit(WriteResult result) { + output.collect(new StreamRecord<>(result)); } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java index 9fd15070d7fa..404ead447d6e 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java @@ -29,15 +29,25 @@ import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.table.data.RowData; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.SimpleDataUtil; -import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.hadoop.HadoopOutputFile; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.Pair; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -54,7 +64,8 @@ public class TestFlinkManifest { private String tablePath; private Table table; - private final AtomicInteger dataFileCount = new AtomicInteger(0); + private FileAppenderFactory appenderFactory; + private final AtomicInteger fileCount = new AtomicInteger(0); @Before public void before() throws IOException { @@ -66,6 +77,13 @@ public void before() throws IOException { // Construct the iceberg table. 
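The setup that follows builds a FlinkAppenderFactory whose equality field ids cover both columns of the simple (id, data) schema, so the test can write equality and position delete files alongside data files. Roughly, and with hypothetical values:

    // An equality delete record (id=1, data="aaa") marks as deleted every earlier
    // data row whose id AND data columns both match those values.
    // A position delete record ("data-file-1", 0L) instead deletes row 0 of the
    // named data file, regardless of the row's column values.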
table = SimpleDataUtil.createTable(tablePath, ImmutableMap.of(), false); + + int[] equalityFieldIds = new int[] { + table.schema().findField("id").fieldId(), + table.schema().findField("data").fieldId() + }; + this.appenderFactory = new FlinkAppenderFactory(table.schema(), FlinkSchemaUtil.convert(table.schema()), + table.properties(), table.spec(), equalityFieldIds, table.schema(), null); } @@ -75,16 +93,30 @@ public void testIO() throws IOException { for (long checkpointId = 1; checkpointId <= 3; checkpointId++) { ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1); - OutputFile manifestOutputFile = factory.create(checkpointId); - - List expectedDataFiles = generateDataFiles(10); - ManifestFile manifestFile = FlinkManifestUtil.writeDataFiles(manifestOutputFile, table.spec(), expectedDataFiles); + final long curCkpId = checkpointId; - List actualDataFiles = FlinkManifestUtil.readDataFiles(manifestFile, table.io()); + List dataFiles = generateDataFiles(10); + List eqDeleteFiles = generateEqDeleteFiles(5); + List posDeleteFiles = generatePosDeleteFiles(5); + DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles( + WriteResult.builder() + .addDataFiles(dataFiles) + .addDeleteFiles(eqDeleteFiles) + .addDeleteFiles(posDeleteFiles) + .build(), + () -> factory.create(curCkpId), table.spec()); - Assert.assertEquals("Size of data file list are not equal.", expectedDataFiles.size(), actualDataFiles.size()); - for (int i = 0; i < expectedDataFiles.size(); i++) { - checkDataFile(expectedDataFiles.get(i), actualDataFiles.get(i)); + WriteResult result = FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()); + Assert.assertEquals("Size of data file list are not equal.", 10, result.deleteFiles().length); + for (int i = 0; i < dataFiles.size(); i++) { + checkContentFile(dataFiles.get(i), result.dataFiles()[i]); + } + Assert.assertEquals("Size of delete file list are not equal.", 10, result.dataFiles().length); + for (int i = 0; i < 5; i++) { + checkContentFile(eqDeleteFiles.get(i), result.deleteFiles()[i]); + } + for (int i = 0; i < 5; i++) { + checkContentFile(posDeleteFiles.get(i), result.deleteFiles()[5 + i]); } } } @@ -99,18 +131,27 @@ public void testUserProvidedManifestLocation() throws IOException { ((HasTableOperations) table).operations(), table.io(), props, flinkJobId, 1, 1); - OutputFile outputFile = factory.create(checkpointId); - List expectedDataFiles = generateDataFiles(5); - ManifestFile manifestFile = FlinkManifestUtil.writeDataFiles(outputFile, table.spec(), expectedDataFiles); + List dataFiles = generateDataFiles(5); + DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles( + WriteResult.builder() + .addDataFiles(dataFiles) + .build(), + () -> factory.create(checkpointId), + table.spec()); + Assert.assertNotNull("Data manifest shouldn't be null", deltaManifests.dataManifest()); + Assert.assertNull("Delete manifest should be null", deltaManifests.deleteManifest()); Assert.assertEquals("The newly created manifest file should be located under the user provided directory", - userProvidedFolder.toPath(), Paths.get(manifestFile.path()).getParent()); + userProvidedFolder.toPath(), Paths.get(deltaManifests.dataManifest().path()).getParent()); - List actualDataFiles = FlinkManifestUtil.readDataFiles(manifestFile, table.io()); + WriteResult result = FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()); - Assert.assertEquals("Size of data file list are not equal.", expectedDataFiles.size(), 
actualDataFiles.size()); - for (int i = 0; i < expectedDataFiles.size(); i++) { - checkDataFile(expectedDataFiles.get(i), actualDataFiles.get(i)); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(5, result.dataFiles().length); + + Assert.assertEquals("Size of data file list are not equal.", dataFiles.size(), result.dataFiles().length); + for (int i = 0; i < dataFiles.size(); i++) { + checkContentFile(dataFiles.get(i), result.dataFiles()[i]); } } @@ -119,19 +160,27 @@ public void testVersionedSerializer() throws IOException { long checkpointId = 1; String flinkJobId = newFlinkJobId(); ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1); - OutputFile outputFile = factory.create(checkpointId); - List expectedDataFiles = generateDataFiles(10); - ManifestFile expected = FlinkManifestUtil.writeDataFiles(outputFile, table.spec(), expectedDataFiles); + List dataFiles = generateDataFiles(10); + List eqDeleteFiles = generateEqDeleteFiles(10); + List posDeleteFiles = generatePosDeleteFiles(10); + DeltaManifests expected = FlinkManifestUtil.writeCompletedFiles( + WriteResult.builder() + .addDataFiles(dataFiles) + .addDeleteFiles(eqDeleteFiles) + .addDeleteFiles(posDeleteFiles) + .build(), + () -> factory.create(checkpointId), table.spec()); byte[] versionedSerializeData = - SimpleVersionedSerialization.writeVersionAndSerialize(FlinkManifestSerializer.INSTANCE, expected); - ManifestFile actual = SimpleVersionedSerialization - .readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, versionedSerializeData); - checkManifestFile(expected, actual); + SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, expected); + DeltaManifests actual = SimpleVersionedSerialization + .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, versionedSerializeData); + checkManifestFile(expected.dataManifest(), actual.dataManifest()); + checkManifestFile(expected.deleteManifest(), actual.deleteManifest()); byte[] versionedSerializeData2 = - SimpleVersionedSerialization.writeVersionAndSerialize(FlinkManifestSerializer.INSTANCE, actual); + SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, actual); Assert.assertArrayEquals(versionedSerializeData, versionedSerializeData2); } @@ -140,21 +189,70 @@ private DataFile writeDataFile(String filename, List rows) throws IOExc tablePath, FileFormat.PARQUET.addExtension(filename), rows); } + private DeleteFile writeEqDeleteFile(String filename, List deletes) throws IOException { + EncryptedOutputFile outputFile = + table.encryption().encrypt(HadoopOutputFile.fromPath(new Path(tablePath, filename), CONF)); + + EqualityDeleteWriter eqWriter = appenderFactory.newEqDeleteWriter(outputFile, FileFormat.PARQUET, null); + try (EqualityDeleteWriter writer = eqWriter) { + writer.deleteAll(deletes); + } + return eqWriter.toDeleteFile(); + } + + private DeleteFile writePosDeleteFile(String filename, List> positions) + throws IOException { + EncryptedOutputFile outputFile = + table.encryption().encrypt(HadoopOutputFile.fromPath(new Path(tablePath, filename), CONF)); + + PositionDeleteWriter posWriter = appenderFactory.newPosDeleteWriter(outputFile, FileFormat.PARQUET, null); + try (PositionDeleteWriter writer = posWriter) { + for (Pair p : positions) { + writer.delete(p.first(), p.second()); + } + } + return posWriter.toDeleteFile(); + } + private List generateDataFiles(int fileNum) throws IOException { List rowDataList = 
Lists.newArrayList(); List dataFiles = Lists.newArrayList(); for (int i = 0; i < fileNum; i++) { rowDataList.add(SimpleDataUtil.createRowData(i, "a" + i)); - dataFiles.add(writeDataFile("data-file-" + dataFileCount.incrementAndGet(), rowDataList)); + dataFiles.add(writeDataFile("data-file-" + fileCount.incrementAndGet(), rowDataList)); } return dataFiles; } + private List generateEqDeleteFiles(int fileNum) throws IOException { + List rowDataList = Lists.newArrayList(); + List deleteFiles = Lists.newArrayList(); + for (int i = 0; i < fileNum; i++) { + rowDataList.add(SimpleDataUtil.createDelete(i, "a" + i)); + deleteFiles.add(writeEqDeleteFile("eq-delete-file-" + fileCount.incrementAndGet(), rowDataList)); + } + return deleteFiles; + } + + private List generatePosDeleteFiles(int fileNum) throws IOException { + List> positions = Lists.newArrayList(); + List deleteFiles = Lists.newArrayList(); + for (int i = 0; i < fileNum; i++) { + positions.add(Pair.of("data-file-1", (long) i)); + deleteFiles.add(writePosDeleteFile("pos-delete-file-" + fileCount.incrementAndGet(), positions)); + } + return deleteFiles; + } + private static String newFlinkJobId() { return UUID.randomUUID().toString(); } private static void checkManifestFile(ManifestFile expected, ManifestFile actual) { + if (expected == actual) { + return; + } + Assert.assertTrue("Should not be null.", expected != null && actual != null); Assert.assertEquals("Path must match", expected.path(), actual.path()); Assert.assertEquals("Length must match", expected.length(), actual.length()); Assert.assertEquals("Spec id must match", expected.partitionSpecId(), actual.partitionSpecId()); @@ -174,11 +272,11 @@ private static void checkManifestFile(ManifestFile expected, ManifestFile actual Assert.assertEquals("PartitionFieldSummary must match", expected.partitions(), actual.partitions()); } - static void checkDataFile(DataFile expected, DataFile actual) { + static void checkContentFile(ContentFile expected, ContentFile actual) { if (expected == actual) { return; } - Assert.assertTrue("Shouldn't have null DataFile.", expected != null && actual != null); + Assert.assertTrue("Shouldn't be null.", expected != null && actual != null); Assert.assertEquals("SpecId", expected.specId(), actual.specId()); Assert.assertEquals("Content", expected.content(), actual.content()); Assert.assertEquals("Path", expected.path(), actual.path()); @@ -193,7 +291,6 @@ static void checkDataFile(DataFile expected, DataFile actual) { Assert.assertEquals("Upper bounds", expected.upperBounds(), actual.upperBounds()); Assert.assertEquals("Key metadata", expected.keyMetadata(), actual.keyMetadata()); Assert.assertEquals("Split offsets", expected.splitOffsets(), actual.splitOffsets()); - Assert.assertNull(actual.equalityFieldIds()); - Assert.assertNull(expected.equalityFieldIds()); + Assert.assertEquals("Equality field id list", actual.equalityFieldIds(), expected.equalityFieldIds()); } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 583b6f18e336..b3a1743f5208 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -49,6 +49,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.io.WriteResult; import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -113,7 +114,7 @@ public void testCommitTxnWithoutDataFiles() throws Exception { long checkpointId = 0; long timestamp = 0; JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -136,6 +137,10 @@ public void testCommitTxnWithoutDataFiles() throws Exception { } } + private WriteResult of(DataFile dataFile) { + return WriteResult.builder().addDataFiles(dataFile).build(); + } + @Test public void testCommitTxn() throws Exception { // Test with 3 continues checkpoints: @@ -148,7 +153,7 @@ public void testCommitTxn() throws Exception { long timestamp = 0; JobID jobID = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobID)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobID)) { harness.setup(); harness.open(); assertSnapshotSize(0); @@ -157,7 +162,7 @@ public void testCommitTxn() throws Exception { for (int i = 1; i <= 3; i++) { RowData rowData = SimpleDataUtil.createRowData(i, "hello" + i); DataFile dataFile = writeDataFile("data-" + i, ImmutableList.of(rowData)); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); rows.add(rowData); harness.snapshot(i, ++timestamp); @@ -183,7 +188,7 @@ public void testOrderedEventsBetweenCheckpoints() throws Exception { long timestamp = 0; JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -192,7 +197,7 @@ public void testOrderedEventsBetweenCheckpoints() throws Exception { RowData row1 = SimpleDataUtil.createRowData(1, "hello"); DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1)); - harness.processElement(dataFile1, ++timestamp); + harness.processElement(of(dataFile1), ++timestamp); assertMaxCommittedCheckpointId(jobId, -1L); // 1. snapshotState for checkpoint#1 @@ -202,7 +207,7 @@ public void testOrderedEventsBetweenCheckpoints() throws Exception { RowData row2 = SimpleDataUtil.createRowData(2, "world"); DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2)); - harness.processElement(dataFile2, ++timestamp); + harness.processElement(of(dataFile2), ++timestamp); assertMaxCommittedCheckpointId(jobId, -1L); // 2. snapshotState for checkpoint#2 @@ -234,7 +239,7 @@ public void testDisorderedEventsBetweenCheckpoints() throws Exception { long timestamp = 0; JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -243,7 +248,7 @@ public void testDisorderedEventsBetweenCheckpoints() throws Exception { RowData row1 = SimpleDataUtil.createRowData(1, "hello"); DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1)); - harness.processElement(dataFile1, ++timestamp); + harness.processElement(of(dataFile1), ++timestamp); assertMaxCommittedCheckpointId(jobId, -1L); // 1. 
snapshotState for checkpoint#1 @@ -253,7 +258,7 @@ public void testDisorderedEventsBetweenCheckpoints() throws Exception { RowData row2 = SimpleDataUtil.createRowData(2, "world"); DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2)); - harness.processElement(dataFile2, ++timestamp); + harness.processElement(of(dataFile2), ++timestamp); assertMaxCommittedCheckpointId(jobId, -1L); // 2. snapshotState for checkpoint#2 @@ -283,7 +288,7 @@ public void testRecoveryFromValidSnapshot() throws Exception { OperatorSubtaskState snapshot; JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -294,7 +299,7 @@ public void testRecoveryFromValidSnapshot() throws Exception { expectedRows.add(row); DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row)); - harness.processElement(dataFile1, ++timestamp); + harness.processElement(of(dataFile1), ++timestamp); snapshot = harness.snapshot(++checkpointId, ++timestamp); assertFlinkManifests(1); @@ -307,7 +312,7 @@ public void testRecoveryFromValidSnapshot() throws Exception { } // Restore from the given snapshot - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.initializeState(snapshot); harness.open(); @@ -319,7 +324,7 @@ public void testRecoveryFromValidSnapshot() throws Exception { RowData row = SimpleDataUtil.createRowData(2, "world"); expectedRows.add(row); DataFile dataFile = writeDataFile("data-2", ImmutableList.of(row)); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); harness.snapshot(++checkpointId, ++timestamp); assertFlinkManifests(1); @@ -342,7 +347,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except OperatorSubtaskState snapshot; List expectedRows = Lists.newArrayList(); JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -352,7 +357,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except RowData row = SimpleDataUtil.createRowData(1, "hello"); expectedRows.add(row); DataFile dataFile = writeDataFile("data-1", ImmutableList.of(row)); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); snapshot = harness.snapshot(++checkpointId, ++timestamp); SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of()); @@ -360,7 +365,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except assertFlinkManifests(1); } - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.initializeState(snapshot); harness.open(); @@ -385,7 +390,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except RowData row = SimpleDataUtil.createRowData(2, "world"); expectedRows.add(row); DataFile dataFile = writeDataFile("data-2", ImmutableList.of(row)); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); snapshot = harness.snapshot(++checkpointId, ++timestamp); assertFlinkManifests(1); @@ -393,7 +398,7 @@ public void 
testRecoveryFromSnapshotWithoutCompletedNotification() throws Except // Redeploying flink job from external checkpoint. JobID newJobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(newJobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(newJobId)) { harness.setup(); harness.initializeState(snapshot); harness.open(); @@ -409,7 +414,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except RowData row = SimpleDataUtil.createRowData(3, "foo"); expectedRows.add(row); DataFile dataFile = writeDataFile("data-3", ImmutableList.of(row)); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); harness.snapshot(++checkpointId, ++timestamp); assertFlinkManifests(1); @@ -431,7 +436,7 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { List tableRows = Lists.newArrayList(); JobID oldJobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(oldJobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(oldJobId)) { harness.setup(); harness.open(); @@ -443,7 +448,7 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { tableRows.addAll(rows); DataFile dataFile = writeDataFile(String.format("data-%d", i), rows); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); harness.snapshot(++checkpointId, ++timestamp); assertFlinkManifests(1); @@ -460,7 +465,7 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { checkpointId = 0; timestamp = 0; JobID newJobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(newJobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(newJobId)) { harness.setup(); harness.open(); @@ -472,7 +477,7 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { tableRows.addAll(rows); DataFile dataFile = writeDataFile("data-new-1", rows); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); harness.snapshot(++checkpointId, ++timestamp); assertFlinkManifests(1); @@ -494,7 +499,7 @@ public void testMultipleJobsWriteSameTable() throws Exception { int jobIndex = i % 3; int checkpointId = i / 3; JobID jobId = jobs[jobIndex]; - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -505,7 +510,7 @@ public void testMultipleJobsWriteSameTable() throws Exception { tableRows.addAll(rows); DataFile dataFile = writeDataFile(String.format("data-%d", i), rows); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); harness.snapshot(checkpointId + 1, ++timestamp); assertFlinkManifests(1); @@ -521,7 +526,7 @@ public void testMultipleJobsWriteSameTable() throws Exception { @Test public void testBoundedStream() throws Exception { JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -532,7 +537,7 @@ public void testBoundedStream() throws Exception { List tableRows = Lists.newArrayList(SimpleDataUtil.createRowData(1, "word-1")); DataFile dataFile = writeDataFile("data-1", tableRows); - harness.processElement(dataFile, 1); + harness.processElement(of(dataFile), 1); 
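The endInput call that follows exercises the committer's bounded-input path shown earlier: buffered results are written to a manifest under a synthetic checkpoint id of Long.MAX_VALUE and committed immediately, so the test can assert table rows without ever completing a Flink checkpoint. In outline (statements taken from the patch):

    // IcebergFilesCommitter.endInput():
    //   dataFilesPerCheckpoint.put(Long.MAX_VALUE, writeToManifest(Long.MAX_VALUE));
    //   writeResultsOfCurrentCkpt.clear();
    //   commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, Long.MAX_VALUE);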
((BoundedOneInput) harness.getOneInputOperator()).endInput(); assertFlinkManifests(0); @@ -548,7 +553,7 @@ public void testFlinkManifests() throws Exception { final long checkpoint = 10; JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -557,7 +562,7 @@ public void testFlinkManifests() throws Exception { RowData row1 = SimpleDataUtil.createRowData(1, "hello"); DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1)); - harness.processElement(dataFile1, ++timestamp); + harness.processElement(of(dataFile1), ++timestamp); assertMaxCommittedCheckpointId(jobId, -1L); // 1. snapshotState for checkpoint#1 @@ -570,7 +575,7 @@ public void testFlinkManifests() throws Exception { // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io()); Assert.assertEquals(1, dataFiles.size()); - TestFlinkManifest.checkDataFile(dataFile1, dataFiles.get(0)); + TestFlinkManifest.checkContentFile(dataFile1, dataFiles.get(0)); // 3. notifyCheckpointComplete for checkpoint#1 harness.notifyOfCompletedCheckpoint(checkpoint); @@ -609,7 +614,7 @@ private void assertSnapshotSize(int expectedSnapshotSize) { Assert.assertEquals(expectedSnapshotSize, Lists.newArrayList(table.snapshots()).size()); } - private OneInputStreamOperatorTestHarness createStreamSink(JobID jobID) + private OneInputStreamOperatorTestHarness createStreamSink(JobID jobID) throws Exception { TestOperatorFactory factory = TestOperatorFactory.of(tablePath); return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID)); @@ -629,7 +634,7 @@ private static MockEnvironment createEnvironment(JobID jobID) { } private static class TestOperatorFactory extends AbstractStreamOperatorFactory - implements OneInputStreamOperatorFactory { + implements OneInputStreamOperatorFactory { private final String tablePath; private TestOperatorFactory(String tablePath) { diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java index 80771a4d233f..3419916cddfa 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; @@ -47,6 +48,7 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -101,7 +103,7 @@ public void before() throws IOException { @Test public void testWritingTable() throws Exception { long checkpointId = 1L; - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { // The first checkpoint testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), 1); @@ 
-109,7 +111,9 @@ public void testWritingTable() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); long expectedDataFiles = partitioned ? 2 : 1; - Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + WriteResult result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(expectedDataFiles, result.dataFiles().length); checkpointId = checkpointId + 1; @@ -119,11 +123,13 @@ public void testWritingTable() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); expectedDataFiles = partitioned ? 4 : 2; - Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(expectedDataFiles, result.dataFiles().length); // Commit the iceberg transaction. AppendFiles appendFiles = table.newAppend(); - testHarness.extractOutputValues().forEach(appendFiles::appendFile); + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); appendFiles.commit(); // Assert the table records. @@ -141,31 +147,36 @@ public void testWritingTable() throws Exception { public void testSnapshotTwice() throws Exception { long checkpointId = 1; long timestamp = 1; - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), timestamp++); testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), timestamp); testHarness.prepareSnapshotPreBarrier(checkpointId++); long expectedDataFiles = partitioned ? 2 : 1; - Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + WriteResult result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(expectedDataFiles, result.dataFiles().length); // snapshot again immediately. for (int i = 0; i < 5; i++) { testHarness.prepareSnapshotPreBarrier(checkpointId++); - Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + + result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(expectedDataFiles, result.dataFiles().length); } } } @Test public void testTableWithoutSnapshot() throws Exception { - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { Assert.assertEquals(0, testHarness.extractOutputValues().size()); } // Even if we closed the iceberg stream writer, there's no orphan data file. Assert.assertEquals(0, scanDataFiles().size()); - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); // Still not emit the data file yet, because there is no checkpoint. 
Assert.assertEquals(0, testHarness.extractOutputValues().size()); @@ -197,7 +208,7 @@ private Set scanDataFiles() throws IOException { @Test public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception { - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), 2); @@ -205,11 +216,16 @@ public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception { ((BoundedOneInput) testHarness.getOneInputOperator()).endInput(); long expectedDataFiles = partitioned ? 2 : 1; - Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + WriteResult result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(expectedDataFiles, result.dataFiles().length); // invoke endInput again. ((BoundedOneInput) testHarness.getOneInputOperator()).endInput(); - Assert.assertEquals(expectedDataFiles * 2, testHarness.extractOutputValues().size()); + + result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(expectedDataFiles * 2, result.dataFiles().length); } } @@ -233,23 +249,25 @@ public void testTableWithTargetFileSize() throws Exception { } } - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { for (RowData row : rows) { testHarness.processElement(row, 1); } // snapshot the operator. testHarness.prepareSnapshotPreBarrier(1); - Assert.assertEquals(8, testHarness.extractOutputValues().size()); + WriteResult result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(8, result.dataFiles().length); // Assert that the data file have the expected records. - for (DataFile serDataFile : testHarness.extractOutputValues()) { - Assert.assertEquals(1000, serDataFile.recordCount()); + for (DataFile dataFile : result.dataFiles()) { + Assert.assertEquals(1000, dataFile.recordCount()); } // Commit the iceberg transaction. AppendFiles appendFiles = table.newAppend(); - testHarness.extractOutputValues().forEach(appendFiles::appendFile); + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); appendFiles.commit(); } @@ -294,31 +312,33 @@ public void testPromotedFlinkDataType() throws Exception { record.copy(ImmutableMap.of("tinyint", 3, "smallint", 32767, "int", 103)) ); - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter(icebergTable, + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter(icebergTable, flinkSchema)) { for (RowData row : rows) { testHarness.processElement(row, 1); } testHarness.prepareSnapshotPreBarrier(1); - Assert.assertEquals(partitioned ? 3 : 1, testHarness.extractOutputValues().size()); + WriteResult result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(partitioned ? 3 : 1, result.dataFiles().length); // Commit the iceberg transaction. 
AppendFiles appendFiles = icebergTable.newAppend(); - testHarness.extractOutputValues().forEach(appendFiles::appendFile); + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); appendFiles.commit(); } SimpleDataUtil.assertTableRecords(location, expected); } - private OneInputStreamOperatorTestHarness createIcebergStreamWriter() throws Exception { + private OneInputStreamOperatorTestHarness createIcebergStreamWriter() throws Exception { return createIcebergStreamWriter(table, SimpleDataUtil.FLINK_SCHEMA); } - private OneInputStreamOperatorTestHarness createIcebergStreamWriter( + private OneInputStreamOperatorTestHarness createIcebergStreamWriter( Table icebergTable, TableSchema flinkSchema) throws Exception { IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkSchema); - OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>( + OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>( streamWriter, 1, 1, 0); harness.setup(); From b5d2b09be04560167ccf2c5dd9cc10f3ba0d05a2 Mon Sep 17 00:00:00 2001 From: huzheng Date: Wed, 16 Dec 2020 20:57:20 +0800 Subject: [PATCH 2/7] Minor changes. --- .../flink/sink/FlinkManifestSerializer.java | 48 ----- .../flink/sink/IcebergFilesCommitter.java | 4 +- .../apache/iceberg/flink/SimpleDataUtil.java | 53 ++++- .../iceberg/flink/sink/TestFlinkManifest.java | 26 +-- .../flink/sink/TestIcebergFilesCommitter.java | 193 +++++++++++++----- 5 files changed, 199 insertions(+), 125 deletions(-) delete mode 100644 flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java deleted file mode 100644 index bec4e65d0cad..000000000000 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.flink.sink; - -import java.io.IOException; -import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.ManifestFiles; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; - -class FlinkManifestSerializer implements SimpleVersionedSerializer { - private static final int VERSION_NUM = 1; - static final FlinkManifestSerializer INSTANCE = new FlinkManifestSerializer(); - - @Override - public int getVersion() { - return VERSION_NUM; - } - - @Override - public byte[] serialize(ManifestFile manifestFile) throws IOException { - Preconditions.checkNotNull(manifestFile, "ManifestFile to be serialized should not be null"); - - return ManifestFiles.encode(manifestFile); - } - - @Override - public ManifestFile deserialize(int version, byte[] serialized) throws IOException { - return ManifestFiles.decode(serialized); - } -} diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index d76047a3e593..7d3ac1bb8f48 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -215,7 +215,7 @@ private void commitUpToCheckpoint(NavigableMap deltaManifestsMap, pendingMap.clear(); - // Delete the committed manifests and clear the committed data files from dataFilesPerCheckpoint. + // Delete the committed manifests and clear the committed files from dataFilesPerCheckpoint. for (ManifestFile manifestFile : Iterables.concat(deltaManifestsList)) { try { table.io().deleteFile(manifestFile.path()); @@ -288,7 +288,7 @@ private void commitDeltaTxn(NavigableMap pendingResults, Stri rowDelta.addDeletes(file); } - commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, checkpointId); + commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, e.getKey()); } } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index dc6528a289ee..1035a6f3603c 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -40,8 +41,12 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.flink.sink.FlinkAppenderFactory; import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; @@ -51,6 +56,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; import org.junit.Assert; 
import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath; @@ -133,14 +139,51 @@ public static DataFile writeFile(Schema schema, PartitionSpec spec, Configuratio .build(); } - public static void assertTableRows(String tablePath, List expected) throws IOException { - List expectedRecords = Lists.newArrayList(); - for (RowData row : expected) { + public static DeleteFile writeEqDeleteFile(Table table, FileFormat format, String tablePath, String filename, + FileAppenderFactory appenderFactory, + List deletes) throws IOException { + EncryptedOutputFile outputFile = + table.encryption().encrypt(HadoopOutputFile.fromPath(new Path(tablePath, filename), new Configuration())); + + EqualityDeleteWriter eqWriter = appenderFactory.newEqDeleteWriter(outputFile, format, null); + try (EqualityDeleteWriter writer = eqWriter) { + writer.deleteAll(deletes); + } + return eqWriter.toDeleteFile(); + } + + public static DeleteFile writePosDeleteFile(Table table, FileFormat format, String tablePath, + String filename, + FileAppenderFactory appenderFactory, + List> positions) throws IOException { + EncryptedOutputFile outputFile = + table.encryption().encrypt(HadoopOutputFile.fromPath(new Path(tablePath, filename), new Configuration())); + + PositionDeleteWriter posWriter = appenderFactory.newPosDeleteWriter(outputFile, format, null); + try (PositionDeleteWriter writer = posWriter) { + for (Pair p : positions) { + writer.delete(p.first(), p.second()); + } + } + return posWriter.toDeleteFile(); + } + + private static List convertToRecords(List rows) { + List records = Lists.newArrayList(); + for (RowData row : rows) { Integer id = row.isNullAt(0) ? null : row.getInt(0); String data = row.isNullAt(1) ? null : row.getString(1).toString(); - expectedRecords.add(createRecord(id, data)); + records.add(createRecord(id, data)); } - assertTableRecords(tablePath, expectedRecords); + return records; + } + + public static void assertTableRows(String tablePath, List expected) throws IOException { + assertTableRecords(tablePath, convertToRecords(expected)); + } + + public static void assertTableRows(Table table, List expected) throws IOException { + assertTableRecords(table, convertToRecords(expected)); } public static void assertTableRecords(Table table, List expected) throws IOException { diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java index 404ead447d6e..345b835e909a 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java @@ -29,7 +29,6 @@ import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.table.data.RowData; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -37,12 +36,8 @@ import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.Table; -import org.apache.iceberg.deletes.EqualityDeleteWriter; -import org.apache.iceberg.deletes.PositionDeleteWriter; -import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.SimpleDataUtil; -import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.WriteResult; import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -190,28 +185,13 @@ private DataFile writeDataFile(String filename, List rows) throws IOExc } private DeleteFile writeEqDeleteFile(String filename, List deletes) throws IOException { - EncryptedOutputFile outputFile = - table.encryption().encrypt(HadoopOutputFile.fromPath(new Path(tablePath, filename), CONF)); - - EqualityDeleteWriter eqWriter = appenderFactory.newEqDeleteWriter(outputFile, FileFormat.PARQUET, null); - try (EqualityDeleteWriter writer = eqWriter) { - writer.deleteAll(deletes); - } - return eqWriter.toDeleteFile(); + return SimpleDataUtil.writeEqDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes); } private DeleteFile writePosDeleteFile(String filename, List> positions) throws IOException { - EncryptedOutputFile outputFile = - table.encryption().encrypt(HadoopOutputFile.fromPath(new Path(tablePath, filename), CONF)); - - PositionDeleteWriter posWriter = appenderFactory.newPosDeleteWriter(outputFile, FileFormat.PARQUET, null); - try (PositionDeleteWriter writer = posWriter) { - for (Pair p : positions) { - writer.delete(p.first(), p.second()); - } - } - return posWriter.toDeleteFile(); + return SimpleDataUtil + .writePosDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, positions); } private List generateDataFiles(int fileNum) throws IOException { diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index b3a1743f5208..4b40b86181b0 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -25,7 +25,6 @@ import java.nio.file.Path; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.stream.Collectors; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; @@ -42,22 +41,26 @@ import org.apache.flink.table.data.RowData; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.GenericManifestFile; import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; +import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.TestTables; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -65,48 +68,47 @@ import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION; @RunWith(Parameterized.class) -public class TestIcebergFilesCommitter { +public class TestIcebergFilesCommitter extends TableTestBase { private static final Configuration CONF = new 
Configuration(); - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - private String tablePath; - private Table table; private File flinkManifestFolder; private final FileFormat format; - @Parameterized.Parameters(name = "format = {0}") + @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion={1}") public static Object[][] parameters() { return new Object[][] { - new Object[] {"avro"}, - new Object[] {"orc"}, - new Object[] {"parquet"} + new Object[] {"avro", 1}, + new Object[] {"avro", 2}, + new Object[] {"parquet", 1}, + new Object[] {"parquet", 2}, + new Object[] {"orc", 1}, }; } - public TestIcebergFilesCommitter(String format) { + public TestIcebergFilesCommitter(String format, int formatVersion) { + super(formatVersion); this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); } @Before - public void before() throws IOException { - File folder = tempFolder.newFolder(); - flinkManifestFolder = tempFolder.newFolder(); - String warehouse = folder.getAbsolutePath(); + public void setupTable() throws IOException { + flinkManifestFolder = temp.newFolder(); + + this.tableDir = temp.newFolder(); + this.metadataDir = new File(tableDir, "metadata"); + Assert.assertTrue(tableDir.delete()); - tablePath = warehouse.concat("/test"); - Assert.assertTrue("Should create the table directory correctly.", new File(tablePath).mkdir()); + tablePath = tableDir.getAbsolutePath(); // Construct the iceberg table. - Map props = ImmutableMap.of( - // file format. - DEFAULT_FILE_FORMAT, format.name(), - // temporary flink manifests location. - FLINK_MANIFEST_LOCATION, flinkManifestFolder.getAbsolutePath() - ); - table = SimpleDataUtil.createTable(tablePath, props, false); + table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned()); + + table.updateProperties() + .set(DEFAULT_FILE_FORMAT, format.name()) + .set(FLINK_MANIFEST_LOCATION, flinkManifestFolder.getAbsolutePath()) + .commit(); } @Test @@ -118,7 +120,7 @@ public void testCommitTxnWithoutDataFiles() throws Exception { harness.setup(); harness.open(); - SimpleDataUtil.assertTableRows(tablePath, Lists.newArrayList()); + SimpleDataUtil.assertTableRows(table, Lists.newArrayList()); assertSnapshotSize(0); assertMaxCommittedCheckpointId(jobId, -1L); @@ -171,7 +173,7 @@ public void testCommitTxn() throws Exception { harness.notifyOfCompletedCheckpoint(i); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.copyOf(rows)); + SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows)); assertSnapshotSize(i); assertMaxCommittedCheckpointId(jobID, i); } @@ -217,13 +219,13 @@ public void testOrderedEventsBetweenCheckpoints() throws Exception { // 3. notifyCheckpointComplete for checkpoint#1 harness.notifyOfCompletedCheckpoint(firstCheckpointId); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1)); assertMaxCommittedCheckpointId(jobId, firstCheckpointId); assertFlinkManifests(1); // 4. notifyCheckpointComplete for checkpoint#2 harness.notifyOfCompletedCheckpoint(secondCheckpointId); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2)); assertMaxCommittedCheckpointId(jobId, secondCheckpointId); assertFlinkManifests(0); } @@ -268,13 +270,13 @@ public void testDisorderedEventsBetweenCheckpoints() throws Exception { // 3. 
notifyCheckpointComplete for checkpoint#2 harness.notifyOfCompletedCheckpoint(secondCheckpointId); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2)); assertMaxCommittedCheckpointId(jobId, secondCheckpointId); assertFlinkManifests(0); // 4. notifyCheckpointComplete for checkpoint#1 harness.notifyOfCompletedCheckpoint(firstCheckpointId); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2)); assertMaxCommittedCheckpointId(jobId, secondCheckpointId); assertFlinkManifests(0); } @@ -306,7 +308,7 @@ public void testRecoveryFromValidSnapshot() throws Exception { harness.notifyOfCompletedCheckpoint(checkpointId); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row)); assertSnapshotSize(1); assertMaxCommittedCheckpointId(jobId, checkpointId); } @@ -317,7 +319,7 @@ public void testRecoveryFromValidSnapshot() throws Exception { harness.initializeState(snapshot); harness.open(); - SimpleDataUtil.assertTableRows(tablePath, expectedRows); + SimpleDataUtil.assertTableRows(table, expectedRows); assertSnapshotSize(1); assertMaxCommittedCheckpointId(jobId, checkpointId); @@ -332,7 +334,7 @@ public void testRecoveryFromValidSnapshot() throws Exception { harness.notifyOfCompletedCheckpoint(checkpointId); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, expectedRows); + SimpleDataUtil.assertTableRows(table, expectedRows); assertSnapshotSize(2); assertMaxCommittedCheckpointId(jobId, checkpointId); } @@ -360,7 +362,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except harness.processElement(of(dataFile), ++timestamp); snapshot = harness.snapshot(++checkpointId, ++timestamp); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of()); + SimpleDataUtil.assertTableRows(table, ImmutableList.of()); assertMaxCommittedCheckpointId(jobId, -1L); assertFlinkManifests(1); } @@ -373,7 +375,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except // All flink manifests should be cleaned because it has committed the unfinished iceberg transaction. 
assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, expectedRows); + SimpleDataUtil.assertTableRows(table, expectedRows); assertMaxCommittedCheckpointId(jobId, checkpointId); harness.snapshot(++checkpointId, ++timestamp); @@ -383,7 +385,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except harness.notifyOfCompletedCheckpoint(checkpointId); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, expectedRows); + SimpleDataUtil.assertTableRows(table, expectedRows); assertSnapshotSize(2); assertMaxCommittedCheckpointId(jobId, checkpointId); @@ -408,7 +410,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except assertMaxCommittedCheckpointId(newJobId, -1); assertMaxCommittedCheckpointId(jobId, checkpointId); - SimpleDataUtil.assertTableRows(tablePath, expectedRows); + SimpleDataUtil.assertTableRows(table, expectedRows); assertSnapshotSize(3); RowData row = SimpleDataUtil.createRowData(3, "foo"); @@ -422,7 +424,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except harness.notifyOfCompletedCheckpoint(checkpointId); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, expectedRows); + SimpleDataUtil.assertTableRows(table, expectedRows); assertSnapshotSize(4); assertMaxCommittedCheckpointId(newJobId, checkpointId); } @@ -455,7 +457,7 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { harness.notifyOfCompletedCheckpoint(checkpointId); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, tableRows); + SimpleDataUtil.assertTableRows(table, tableRows); assertSnapshotSize(i); assertMaxCommittedCheckpointId(oldJobId, checkpointId); } @@ -483,7 +485,7 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { harness.notifyOfCompletedCheckpoint(checkpointId); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, tableRows); + SimpleDataUtil.assertTableRows(table, tableRows); assertSnapshotSize(4); assertMaxCommittedCheckpointId(newJobId, checkpointId); } @@ -516,7 +518,7 @@ public void testMultipleJobsWriteSameTable() throws Exception { harness.notifyOfCompletedCheckpoint(checkpointId + 1); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, tableRows); + SimpleDataUtil.assertTableRows(table, tableRows); assertSnapshotSize(i + 1); assertMaxCommittedCheckpointId(jobId, checkpointId + 1); } @@ -541,7 +543,7 @@ public void testBoundedStream() throws Exception { ((BoundedOneInput) harness.getOneInputOperator()).endInput(); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, tableRows); + SimpleDataUtil.assertTableRows(table, tableRows); assertSnapshotSize(1); assertMaxCommittedCheckpointId(jobId, Long.MAX_VALUE); } @@ -579,12 +581,86 @@ public void testFlinkManifests() throws Exception { // 3. 
notifyCheckpointComplete for checkpoint#1 harness.notifyOfCompletedCheckpoint(checkpoint); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1)); + assertMaxCommittedCheckpointId(jobId, checkpoint); + assertFlinkManifests(0); + } + } + + @Test + public void testDeleteFiles() throws Exception { + Assume.assumeFalse("Support equality-delete in format v2.", formatVersion < 2); + + long timestamp = 0; + long checkpoint = 10; + + JobID jobId = new JobID(); + + int[] equalityFieldIds = new int[] { + table.schema().findField("id").fieldId(), + table.schema().findField("data").fieldId() + }; + FileAppenderFactory appenderFactory = new FlinkAppenderFactory(table.schema(), + FlinkSchemaUtil.convert(table.schema()), table.properties(), table.spec(), equalityFieldIds, + table.schema(), null); + + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + harness.setup(); + harness.open(); + + assertMaxCommittedCheckpointId(jobId, -1L); + + RowData row1 = SimpleDataUtil.createInsert(1, "aaa"); + DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(row1)); + WriteResult result = WriteResult.builder().addDataFiles(dataFile1).build(); + harness.processElement(result, ++timestamp); + assertMaxCommittedCheckpointId(jobId, -1L); + + // 1. snapshotState for checkpoint#1 + harness.snapshot(checkpoint, ++timestamp); + List manifestPaths = assertFlinkManifests(1); + Path manifestPath = manifestPaths.get(0); + Assert.assertEquals("File name should have the expected pattern.", + String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString()); + + // 2. Read the data files from manifests and assert. + List dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io()); + Assert.assertEquals(1, dataFiles.size()); + TestFlinkManifest.checkContentFile(dataFile1, dataFiles.get(0)); + + // 3. notifyCheckpointComplete for checkpoint#1 + harness.notifyOfCompletedCheckpoint(checkpoint); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1)); + assertMaxCommittedCheckpointId(jobId, checkpoint); + assertFlinkManifests(0); + + // 4. process both data files and delete files. + RowData row2 = SimpleDataUtil.createInsert(2, "bbb"); + DataFile dataFile2 = writeDataFile("data-file-2", ImmutableList.of(row2)); + + RowData delete1 = SimpleDataUtil.createDelete(1, "aaa"); + DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory, "delete-file-1", ImmutableList.of(delete1)); + result = WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile1).build(); + harness.processElement(result, ++timestamp); + assertMaxCommittedCheckpointId(jobId, checkpoint); + + // 5. snapshotState for checkpoint#2 + harness.snapshot(++checkpoint, ++timestamp); + assertFlinkManifests(2); + + // 6. 
notifyCheckpointComplete for checkpoint#2 + harness.notifyOfCompletedCheckpoint(checkpoint); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2)); assertMaxCommittedCheckpointId(jobId, checkpoint); assertFlinkManifests(0); } } + private DeleteFile writeEqDeleteFile(FileAppenderFactory appenderFactory, + String filename, List deletes) throws IOException { + return SimpleDataUtil.writeEqDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes); + } + private ManifestFile createTestingManifestFile(Path manifestPath) { return new GenericManifestFile(manifestPath.toAbsolutePath().toString(), manifestPath.toFile().length(), 0, ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null); @@ -648,7 +724,7 @@ private static TestOperatorFactory of(String tablePath) { @Override @SuppressWarnings("unchecked") public > T createStreamOperator(StreamOperatorParameters param) { - IcebergFilesCommitter committer = new IcebergFilesCommitter(TableLoader.fromHadoopTable(tablePath), false); + IcebergFilesCommitter committer = new IcebergFilesCommitter(new TestTableLoader(tablePath), false); committer.setup(param.getContainingTask(), param.getStreamConfig(), param.getOutput()); return (T) committer; } @@ -658,4 +734,27 @@ public Class getStreamOperatorClass(ClassLoader classL return IcebergFilesCommitter.class; } } + + private static class TestTableLoader implements TableLoader { + private File dir = null; + + TestTableLoader(String dir) { + this.dir = new File(dir); + } + + @Override + public void open() { + + } + + @Override + public Table loadTable() { + return TestTables.load(dir, "test"); + } + + @Override + public void close() { + + } + } } From c00716509e57423ba3e7cf21212a0f674004a2c7 Mon Sep 17 00:00:00 2001 From: huzheng Date: Thu, 17 Dec 2020 15:13:42 +0800 Subject: [PATCH 3/7] Addressing the comments. 
--- .../java/org/apache/iceberg/io/WriteResult.java | 2 +- .../iceberg/flink/sink/DeltaManifests.java | 2 -- .../flink/sink/IcebergFilesCommitter.java | 6 +++--- .../flink/sink/TestIcebergStreamWriter.java | 16 ++++++++-------- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/WriteResult.java b/core/src/main/java/org/apache/iceberg/io/WriteResult.java index 7bb008bed6c1..af1528142d7f 100644 --- a/core/src/main/java/org/apache/iceberg/io/WriteResult.java +++ b/core/src/main/java/org/apache/iceberg/io/WriteResult.java @@ -76,7 +76,7 @@ public Builder add(WriteResult result) { return this; } - public Builder add(Iterable results) { + public Builder addAll(Iterable results) { results.forEach(this::add); return this; } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java index d58b856c7da0..d46dff3d3151 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java @@ -23,7 +23,6 @@ import java.util.List; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.jetbrains.annotations.NotNull; class DeltaManifests implements Iterable { @@ -43,7 +42,6 @@ ManifestFile deleteManifest() { return deleteManifest; } - @NotNull @Override public Iterator iterator() { List manifests = Lists.newArrayListWithCapacity(2); diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index 7d3ac1bb8f48..5a04208c08ae 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -235,7 +235,7 @@ private void commitUpToCheckpoint(NavigableMap deltaManifestsMap, private void replacePartitions(NavigableMap pendingResults, String newFlinkJobId, long checkpointId) { // Merge all the pending results into a single write result. - WriteResult result = WriteResult.builder().add(pendingResults.values()).build(); + WriteResult result = WriteResult.builder().addAll(pendingResults.values()).build(); // Partition overwrite does not support delete files. Preconditions.checkArgument(result.deleteFiles().length == 0, @@ -254,7 +254,7 @@ private void replacePartitions(NavigableMap pendingResults, S private void commitDeltaTxn(NavigableMap pendingResults, String newFlinkJobId, long checkpointId) { // Merge all pending results into a single write result. - WriteResult mergedResult = WriteResult.builder().add(pendingResults.values()).build(); + WriteResult mergedResult = WriteResult.builder().addAll(pendingResults.values()).build(); if (mergedResult.deleteFiles().length < 1) { // To be compatible with iceberg format V1. 
@@ -329,7 +329,7 @@ private byte[] writeToManifest(long checkpointId) throws IOException { return EMPTY_MANIFEST_DATA; } - WriteResult result = WriteResult.builder().add(writeResultsOfCurrentCkpt).build(); + WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build(); DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result, () -> manifestOutputFileFactory.create(checkpointId), table.spec()); diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java index 3419916cddfa..28db89456f7e 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java @@ -111,7 +111,7 @@ public void testWritingTable() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); long expectedDataFiles = partitioned ? 2 : 1; - WriteResult result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); Assert.assertEquals(0, result.deleteFiles().length); Assert.assertEquals(expectedDataFiles, result.dataFiles().length); @@ -123,7 +123,7 @@ public void testWritingTable() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); expectedDataFiles = partitioned ? 4 : 2; - result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); Assert.assertEquals(0, result.deleteFiles().length); Assert.assertEquals(expectedDataFiles, result.dataFiles().length); @@ -153,7 +153,7 @@ public void testSnapshotTwice() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId++); long expectedDataFiles = partitioned ? 2 : 1; - WriteResult result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); Assert.assertEquals(0, result.deleteFiles().length); Assert.assertEquals(expectedDataFiles, result.dataFiles().length); @@ -161,7 +161,7 @@ public void testSnapshotTwice() throws Exception { for (int i = 0; i < 5; i++) { testHarness.prepareSnapshotPreBarrier(checkpointId++); - result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); Assert.assertEquals(0, result.deleteFiles().length); Assert.assertEquals(expectedDataFiles, result.dataFiles().length); } @@ -216,14 +216,14 @@ public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception { ((BoundedOneInput) testHarness.getOneInputOperator()).endInput(); long expectedDataFiles = partitioned ? 2 : 1; - WriteResult result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); Assert.assertEquals(0, result.deleteFiles().length); Assert.assertEquals(expectedDataFiles, result.dataFiles().length); // invoke endInput again. 
((BoundedOneInput) testHarness.getOneInputOperator()).endInput(); - result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); Assert.assertEquals(0, result.deleteFiles().length); Assert.assertEquals(expectedDataFiles * 2, result.dataFiles().length); } @@ -256,7 +256,7 @@ public void testTableWithTargetFileSize() throws Exception { // snapshot the operator. testHarness.prepareSnapshotPreBarrier(1); - WriteResult result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); Assert.assertEquals(0, result.deleteFiles().length); Assert.assertEquals(8, result.dataFiles().length); @@ -318,7 +318,7 @@ public void testPromotedFlinkDataType() throws Exception { testHarness.processElement(row, 1); } testHarness.prepareSnapshotPreBarrier(1); - WriteResult result = WriteResult.builder().add(testHarness.extractOutputValues()).build(); + WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); Assert.assertEquals(0, result.deleteFiles().length); Assert.assertEquals(partitioned ? 3 : 1, result.dataFiles().length); From 1eae61a63938e85eb40eee9c27f39889b9bf2e07 Mon Sep 17 00:00:00 2001 From: huzheng Date: Thu, 17 Dec 2020 15:53:08 +0800 Subject: [PATCH 4/7] Add unit tests to address the state compatibility issues. --- .../flink/sink/DeltaManifestsSerializer.java | 25 ++++++++--- .../iceberg/flink/sink/FlinkManifestUtil.java | 2 +- .../flink/sink/IcebergFilesCommitter.java | 17 ++++--- .../iceberg/flink/sink/TestFlinkManifest.java | 44 +++++++++++++++++++ 4 files changed, 73 insertions(+), 15 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java index d99964b30796..b37009b13f5d 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java @@ -30,14 +30,15 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; class DeltaManifestsSerializer implements SimpleVersionedSerializer { - private static final int VERSION_NUM = 1; + private static final int VERSION_1 = 1; + private static final int VERSION_2 = 2; private static final byte[] EMPTY_BINARY = new byte[0]; static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer(); @Override public int getVersion() { - return VERSION_NUM; + return VERSION_2; } @Override @@ -68,10 +69,26 @@ public byte[] serialize(DeltaManifests deltaManifests) throws IOException { @Override public DeltaManifests deserialize(int version, byte[] serialized) throws IOException { + if (version == VERSION_1) { + return deserializeV1(serialized); + } else if (version == VERSION_2) { + return deserializeV2(serialized); + } else { + throw new RuntimeException("Unknown serialize version: " + version); + } + } + + private DeltaManifests deserializeV1(byte[] serialized) throws IOException { + return new DeltaManifests(ManifestFiles.decode(serialized), null); + } + + private DeltaManifests deserializeV2(byte[] serialized) throws IOException { + ManifestFile dataManifest = null; + ManifestFile deleteManifest = null; + ByteArrayInputStream binaryIn = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(binaryIn); - ManifestFile 
dataManifest = null; int dataManifestSize = in.readInt(); if (dataManifestSize > 0) { byte[] dataManifestBinary = new byte[dataManifestSize]; @@ -80,7 +97,6 @@ public DeltaManifests deserialize(int version, byte[] serialized) throws IOExcep dataManifest = ManifestFiles.decode(dataManifestBinary); } - ManifestFile deleteManifest = null; int deleteManifestSize = in.readInt(); if (deleteManifestSize > 0) { byte[] deleteManifestBinary = new byte[deleteManifestSize]; @@ -88,7 +104,6 @@ public DeltaManifests deserialize(int version, byte[] serialized) throws IOExcep deleteManifest = ManifestFiles.decode(deleteManifestBinary); } - return new DeltaManifests(dataManifest, deleteManifest); } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index 61401e16dc04..70827e16e83d 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -44,7 +44,7 @@ class FlinkManifestUtil { private FlinkManifestUtil() { } - private static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, List dataFiles) + static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, List dataFiles) throws IOException { ManifestWriter writer = ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID); diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index 5a04208c08ae..52c5ea0c60d6 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -192,7 +192,7 @@ private void commitUpToCheckpoint(NavigableMap deltaManifestsMap, long checkpointId) throws IOException { NavigableMap pendingMap = deltaManifestsMap.headMap(checkpointId, true); - List deltaManifestsList = Lists.newArrayList(); + List manifests = Lists.newArrayList(); NavigableMap pendingResults = Maps.newTreeMap(); for (Map.Entry e : pendingMap.entrySet()) { if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) { @@ -200,11 +200,10 @@ private void commitUpToCheckpoint(NavigableMap deltaManifestsMap, continue; } - DeltaManifests deltaManifests = - SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue()); - deltaManifestsList.add(deltaManifests); - + DeltaManifests deltaManifests = SimpleVersionedSerialization + .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue()); pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io())); + Iterables.addAll(manifests, deltaManifests); } if (replacePartitions) { @@ -215,16 +214,16 @@ private void commitUpToCheckpoint(NavigableMap deltaManifestsMap, pendingMap.clear(); - // Delete the committed manifests and clear the committed files from dataFilesPerCheckpoint. - for (ManifestFile manifestFile : Iterables.concat(deltaManifestsList)) { + // Delete the committed manifests. + for (ManifestFile manifest : manifests) { try { - table.io().deleteFile(manifestFile.path()); + table.io().deleteFile(manifest.path()); } catch (Exception e) { // The flink manifests cleaning failure shouldn't abort the completed checkpoint. 
String details = MoreObjects.toStringHelper(this) .add("flinkJobId", newFlinkJobId) .add("checkpointId", checkpointId) - .add("manifestPath", manifestFile.path()) + .add("manifestPath", manifest.path()) .toString(); LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}", details, e); diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java index 345b835e909a..b8f121e4f63d 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java @@ -27,6 +27,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.table.data.RowData; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.ContentFile; @@ -35,6 +36,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.Table; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.SimpleDataUtil; @@ -179,6 +181,48 @@ public void testVersionedSerializer() throws IOException { Assert.assertArrayEquals(versionedSerializeData, versionedSerializeData2); } + @Test + public void testCompatibility() throws IOException { + // The v2 deserializer should be able to deserialize the v1 binary. + long checkpointId = 1; + String flinkJobId = newFlinkJobId(); + ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1); + + List dataFiles = generateDataFiles(10); + ManifestFile manifest = FlinkManifestUtil.writeDataFiles(factory.create(checkpointId), table.spec(), dataFiles); + byte[] dataV1 = SimpleVersionedSerialization.writeVersionAndSerialize(new V1Serializer(), manifest); + + DeltaManifests delta = + SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, dataV1); + Assert.assertNull("Serialization v1 don't include delete files.", delta.deleteManifest()); + Assert.assertNotNull("Serialization v1 should not have null data manifest.", delta.dataManifest()); + checkManifestFile(manifest, delta.dataManifest()); + + List actualFiles = FlinkManifestUtil.readDataFiles(delta.dataManifest(), table.io()); + Assert.assertEquals(10, actualFiles.size()); + for (int i = 0; i < 10; i++) { + checkContentFile(dataFiles.get(i), actualFiles.get(i)); + } + } + + private static class V1Serializer implements SimpleVersionedSerializer { + + @Override + public int getVersion() { + return 1; + } + + @Override + public byte[] serialize(ManifestFile m) throws IOException { + return ManifestFiles.encode(m); + } + + @Override + public ManifestFile deserialize(int version, byte[] serialized) throws IOException { + return ManifestFiles.decode(serialized); + } + } + private DataFile writeDataFile(String filename, List rows) throws IOException { return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF, tablePath, FileFormat.PARQUET.addExtension(filename), rows); From 0fdec6764e2b8d4962bc027512895581ba31b89f Mon Sep 17 00:00:00 2001 From: huzheng Date: Thu, 17 Dec 2020 17:27:27 +0800 Subject: [PATCH 5/7] Minor changes. 
--- .../flink/sink/IcebergFilesCommitter.java | 43 +++++++------------ .../flink/sink/TestIcebergFilesCommitter.java | 12 +++--- 2 files changed, 23 insertions(+), 32 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index 52c5ea0c60d6..389d33392cac 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -39,8 +39,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo; import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.RowDelta; @@ -233,36 +231,33 @@ private void commitUpToCheckpoint(NavigableMap deltaManifestsMap, private void replacePartitions(NavigableMap pendingResults, String newFlinkJobId, long checkpointId) { - // Merge all the pending results into a single write result. - WriteResult result = WriteResult.builder().addAll(pendingResults.values()).build(); - // Partition overwrite does not support delete files. - Preconditions.checkArgument(result.deleteFiles().length == 0, - "Cannot overwrite partitions with delete files."); - ReplacePartitions dynamicOverwrite = table.newReplacePartitions(); + int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum(); + Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions with delete files."); // Commit the overwrite transaction. + ReplacePartitions dynamicOverwrite = table.newReplacePartitions(); + int numFiles = 0; - for (DataFile file : result.dataFiles()) { - numFiles += 1; - dynamicOverwrite.addFile(file); + for (WriteResult result : pendingResults.values()) { + numFiles += result.dataFiles().length; + Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile); } commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId); } private void commitDeltaTxn(NavigableMap pendingResults, String newFlinkJobId, long checkpointId) { - // Merge all pending results into a single write result. - WriteResult mergedResult = WriteResult.builder().addAll(pendingResults.values()).build(); + int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum(); - if (mergedResult.deleteFiles().length < 1) { + if (deleteFilesNum == 0) { // To be compatible with iceberg format V1. 
AppendFiles appendFiles = table.newAppend(); int numFiles = 0; - for (DataFile file : mergedResult.dataFiles()) { - numFiles += 1; - appendFiles.appendFile(file); + for (WriteResult result : pendingResults.values()) { + numFiles += result.dataFiles().length; + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); } commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId); @@ -275,17 +270,11 @@ private void commitDeltaTxn(NavigableMap pendingResults, Stri WriteResult result = e.getValue(); RowDelta rowDelta = table.newRowDelta(); - int numDataFiles = 0; - for (DataFile file : result.dataFiles()) { - numDataFiles += 1; - rowDelta.addRows(file); - } + int numDataFiles = result.dataFiles().length; + Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); - int numDeleteFiles = 0; - for (DeleteFile file : result.deleteFiles()) { - numDeleteFiles += 1; - rowDelta.addDeletes(file); - } + int numDeleteFiles = result.deleteFiles().length; + Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, e.getKey()); } diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 4b40b86181b0..ecc7c3875f13 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -589,7 +589,7 @@ public void testFlinkManifests() throws Exception { @Test public void testDeleteFiles() throws Exception { - Assume.assumeFalse("Support equality-delete in format v2.", formatVersion < 2); + Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); long timestamp = 0; long checkpoint = 10; @@ -612,8 +612,7 @@ public void testDeleteFiles() throws Exception { RowData row1 = SimpleDataUtil.createInsert(1, "aaa"); DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(row1)); - WriteResult result = WriteResult.builder().addDataFiles(dataFile1).build(); - harness.processElement(result, ++timestamp); + harness.processElement(of(dataFile1), ++timestamp); assertMaxCommittedCheckpointId(jobId, -1L); // 1. snapshotState for checkpoint#1 @@ -640,8 +639,11 @@ public void testDeleteFiles() throws Exception { RowData delete1 = SimpleDataUtil.createDelete(1, "aaa"); DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory, "delete-file-1", ImmutableList.of(delete1)); - result = WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile1).build(); - harness.processElement(result, ++timestamp); + harness.processElement(WriteResult.builder() + .addDataFiles(dataFile2) + .addDeleteFiles(deleteFile1) + .build(), + ++timestamp); assertMaxCommittedCheckpointId(jobId, checkpoint); // 5. snapshotState for checkpoint#2 From 4e769b2c522c8ab7df7bf10eb267c758361e0a01 Mon Sep 17 00:00:00 2001 From: huzheng Date: Thu, 17 Dec 2020 18:07:18 +0800 Subject: [PATCH 6/7] Add unit tests: addressing the case that commits two failed checkpoints in the latest successful checkpoint.
--- .../apache/iceberg/flink/SimpleDataUtil.java | 5 +- .../flink/sink/TestIcebergFilesCommitter.java | 71 ++++++++++++++++--- 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index 1035a6f3603c..da064eb057b5 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -46,7 +46,6 @@ import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.flink.sink.FlinkAppenderFactory; import org.apache.iceberg.hadoop.HadoopInputFile; -import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; @@ -143,7 +142,7 @@ public static DeleteFile writeEqDeleteFile(Table table, FileFormat format, Strin FileAppenderFactory appenderFactory, List deletes) throws IOException { EncryptedOutputFile outputFile = - table.encryption().encrypt(HadoopOutputFile.fromPath(new Path(tablePath, filename), new Configuration())); + table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration())); EqualityDeleteWriter eqWriter = appenderFactory.newEqDeleteWriter(outputFile, format, null); try (EqualityDeleteWriter writer = eqWriter) { @@ -157,7 +156,7 @@ public static DeleteFile writePosDeleteFile(Table table, FileFormat format, Stri FileAppenderFactory appenderFactory, List> positions) throws IOException { EncryptedOutputFile outputFile = - table.encryption().encrypt(HadoopOutputFile.fromPath(new Path(tablePath, filename), new Configuration())); + table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration())); PositionDeleteWriter posWriter = appenderFactory.newPosDeleteWriter(outputFile, format, null); try (PositionDeleteWriter writer = posWriter) { diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index ecc7c3875f13..d9e191d7fc82 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -595,14 +595,7 @@ public void testDeleteFiles() throws Exception { long checkpoint = 10; JobID jobId = new JobID(); - - int[] equalityFieldIds = new int[] { - table.schema().findField("id").fieldId(), - table.schema().findField("data").fieldId() - }; - FileAppenderFactory appenderFactory = new FlinkAppenderFactory(table.schema(), - FlinkSchemaUtil.convert(table.schema()), table.properties(), table.spec(), equalityFieldIds, - table.schema(), null); + FileAppenderFactory appenderFactory = createDeletableAppenderFactory(); try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); @@ -658,11 +651,73 @@ public void testDeleteFiles() throws Exception { } } + @Test + public void testCommitTwoCheckpointsInSingleTxn() throws Exception { + Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); + + long timestamp = 0; + long checkpoint = 10; + + JobID jobId = new JobID(); + FileAppenderFactory appenderFactory = createDeletableAppenderFactory(); + + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + harness.setup(); + harness.open(); + + assertMaxCommittedCheckpointId(jobId, -1L); + 
+ RowData insert1 = SimpleDataUtil.createInsert(1, "aaa"); + RowData insert2 = SimpleDataUtil.createInsert(2, "bbb"); + RowData delete3 = SimpleDataUtil.createDelete(3, "ccc"); + DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(insert1, insert2)); + DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory, "delete-file-1", ImmutableList.of(delete3)); + harness.processElement(WriteResult.builder() + .addDataFiles(dataFile1) + .addDeleteFiles(deleteFile1) + .build(), + ++timestamp); + + // The 1th snapshotState. + harness.snapshot(checkpoint, ++timestamp); + + RowData insert4 = SimpleDataUtil.createInsert(4, "ddd"); + RowData delete2 = SimpleDataUtil.createDelete(2, "bbb"); + DataFile dataFile2 = writeDataFile("data-file-2", ImmutableList.of(insert4)); + DeleteFile deleteFile2 = writeEqDeleteFile(appenderFactory, "delete-file-2", ImmutableList.of(delete2)); + harness.processElement(WriteResult.builder() + .addDataFiles(dataFile2) + .addDeleteFiles(deleteFile2) + .build(), + ++timestamp); + + // The 2nd snapshotState. + harness.snapshot(++checkpoint, ++timestamp); + + // Notify the 2nd snapshot to complete. + harness.notifyOfCompletedCheckpoint(checkpoint); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4)); + assertMaxCommittedCheckpointId(jobId, checkpoint); + assertFlinkManifests(0); + Assert.assertEquals("Should have committed 2 txn.", 2, ImmutableList.copyOf(table.snapshots()).size()); + } + } + private DeleteFile writeEqDeleteFile(FileAppenderFactory appenderFactory, String filename, List deletes) throws IOException { return SimpleDataUtil.writeEqDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes); } + private FileAppenderFactory createDeletableAppenderFactory() { + int[] equalityFieldIds = new int[] { + table.schema().findField("id").fieldId(), + table.schema().findField("data").fieldId() + }; + return new FlinkAppenderFactory(table.schema(), + FlinkSchemaUtil.convert(table.schema()), table.properties(), table.spec(), equalityFieldIds, + table.schema(), null); + } + private ManifestFile createTestingManifestFile(Path manifestPath) { return new GenericManifestFile(manifestPath.toAbsolutePath().toString(), manifestPath.toFile().length(), 0, ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null); From 0c6d008515af75c9f2cc13864c6fed4ead6b4915 Mon Sep 17 00:00:00 2001 From: huzheng Date: Fri, 18 Dec 2020 11:58:53 +0800 Subject: [PATCH 7/7] Address the comments and add more unit tests. 
--- .../org/apache/iceberg/io/WriteResult.java | 1 + .../iceberg/flink/sink/DeltaManifests.java | 23 ++++-- .../flink/sink/DeltaManifestsSerializer.java | 15 +++- .../iceberg/flink/sink/FlinkManifestUtil.java | 5 +- .../flink/sink/IcebergFilesCommitter.java | 12 +++- .../flink/sink/TestIcebergFilesCommitter.java | 70 +++++++++++++++++++ 6 files changed, 115 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/WriteResult.java b/core/src/main/java/org/apache/iceberg/io/WriteResult.java index af1528142d7f..e620a6164340 100644 --- a/core/src/main/java/org/apache/iceberg/io/WriteResult.java +++ b/core/src/main/java/org/apache/iceberg/io/WriteResult.java @@ -72,6 +72,7 @@ private Builder() { public Builder add(WriteResult result) { addDataFiles(result.dataFiles); addDeleteFiles(result.deleteFiles); + addReferencedDataFiles(result.referencedDataFiles); return this; } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java index d46dff3d3151..866b785d7e1e 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java @@ -19,19 +19,29 @@ package org.apache.iceberg.flink.sink; -import java.util.Iterator; import java.util.List; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -class DeltaManifests implements Iterable { +class DeltaManifests { + + private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0]; private final ManifestFile dataManifest; private final ManifestFile deleteManifest; + private final CharSequence[] referencedDataFiles; DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) { + this(dataManifest, deleteManifest, EMPTY_REF_DATA_FILES); + } + + DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest, CharSequence[] referencedDataFiles) { + Preconditions.checkNotNull(referencedDataFiles, "Referenced data files shouldn't be null."); + this.dataManifest = dataManifest; this.deleteManifest = deleteManifest; + this.referencedDataFiles = referencedDataFiles; } ManifestFile dataManifest() { @@ -42,8 +52,11 @@ ManifestFile deleteManifest() { return deleteManifest; } - @Override - public Iterator iterator() { + CharSequence[] referencedDataFiles() { + return referencedDataFiles; + } + + List manifests() { List manifests = Lists.newArrayListWithCapacity(2); if (dataManifest != null) { manifests.add(dataManifest); @@ -53,6 +66,6 @@ public Iterator iterator() { manifests.add(deleteManifest); } - return manifests.iterator(); + return manifests; } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java index b37009b13f5d..859f97940116 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java @@ -64,6 +64,12 @@ public byte[] serialize(DeltaManifests deltaManifests) throws IOException { out.writeInt(deleteManifestBinary.length); out.write(deleteManifestBinary); + CharSequence[] referencedDataFiles = deltaManifests.referencedDataFiles(); + out.writeInt(referencedDataFiles.length); + for (int i = 0; i < referencedDataFiles.length; i++) { + 
+      out.writeUTF(referencedDataFiles[i].toString());
+    }
+
     return binaryOut.toByteArray();
   }

@@ -104,6 +110,13 @@ private DeltaManifests deserializeV2(byte[] serialized) throws IOException {
       deleteManifest = ManifestFiles.decode(deleteManifestBinary);
     }

-    return new DeltaManifests(dataManifest, deleteManifest);
+
+    int referenceDataFileNum = in.readInt();
+    CharSequence[] referencedDataFiles = new CharSequence[referenceDataFileNum];
+    for (int i = 0; i < referenceDataFileNum; i++) {
+      referencedDataFiles[i] = in.readUTF();
+    }
+
+    return new DeltaManifests(dataManifest, deleteManifest, referencedDataFiles);
   }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 70827e16e83d..b00018b3b770 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -94,7 +94,7 @@ static DeltaManifests writeCompletedFiles(WriteResult result,
       deleteManifest = deleteManifestWriter.toManifestFile();
     }

-    return new DeltaManifests(dataManifest, deleteManifest);
+    return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
   }

   static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io) throws IOException {
@@ -113,6 +113,7 @@ static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io)
       }
     }

-    return builder.build();
+    return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles())
+        .build();
   }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 389d33392cac..c1d3440db41b 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -50,7 +50,7 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Comparators;
@@ -201,7 +201,7 @@ private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
       DeltaManifests deltaManifests = SimpleVersionedSerialization
           .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
       pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
-      Iterables.addAll(manifests, deltaManifests);
+      manifests.addAll(deltaManifests.manifests());
     }

     if (replacePartitions) {
@@ -240,6 +240,8 @@ private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, S

     int numFiles = 0;
     for (WriteResult result : pendingResults.values()) {
+      Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files.");
+
       numFiles += result.dataFiles().length;
       Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
     }
@@ -256,6 +258,8 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, Stri

       int numFiles = 0;
       for (WriteResult result : pendingResults.values()) {
+        Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files.");
+
         numFiles += result.dataFiles().length;
         Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
       }
@@ -268,7 +272,9 @@ private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, Stri
         // txn2, the equality-delete files of txn2 are required to be applied to data files from txn1. Committing the
         // merged one will lead to the incorrect delete semantic.
         WriteResult result = e.getValue();
-        RowDelta rowDelta = table.newRowDelta();
+        RowDelta rowDelta = table.newRowDelta()
+            .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
+            .validateDeletedFiles();

         int numDataFiles = result.dataFiles().length;
         Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index d9e191d7fc82..5ec246b7af07 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
@@ -50,6 +51,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableTestBase;
 import org.apache.iceberg.TestTables;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TableLoader;
@@ -57,6 +59,7 @@ import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
@@ -651,6 +654,66 @@ public void testDeleteFiles() throws Exception {
     }
   }

+  @Test
+  public void testValidateDataFileExist() throws Exception {
+    Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2);
+    long timestamp = 0;
+    long checkpoint = 10;
+    JobID jobId = new JobID();
+    FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
+
+    RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+    DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(insert1));
+
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      // Txn#1: insert the row <1, 'aaa'>
+      harness.processElement(WriteResult.builder()
+              .addDataFiles(dataFile1)
+              .build(),
+          ++timestamp);
+      harness.snapshot(checkpoint, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpoint);
+
+      // Txn#2: Overwrite the committed data-file-1
+      RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+      DataFile dataFile2 = writeDataFile("data-file-2", ImmutableList.of(insert2));
+      new TestTableLoader(tablePath)
+          .loadTable()
+          .newOverwrite()
+          .addFile(dataFile2)
+          .deleteFile(dataFile1)
+          .commit();
+    }
+
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      // Txn#3: position-delete the <1, 'aaa'> (NOT committed).
+      DeleteFile deleteFile1 = writePosDeleteFile(appenderFactory,
+          "pos-delete-file-1",
+          ImmutableList.of(Pair.of(dataFile1.path(), 0L)));
+      harness.processElement(WriteResult.builder()
+              .addDeleteFiles(deleteFile1)
+              .addReferencedDataFiles(dataFile1.path())
+              .build(),
+          ++timestamp);
+      harness.snapshot(++checkpoint, ++timestamp);
+
+      // Txn#3: validation should fail when committing.
+      final long currentCheckpointId = checkpoint;
+      AssertHelpers.assertThrows("Validation should fail because of non-existent data files.",
+          ValidationException.class, "Cannot commit, missing data files",
+          () -> {
+            harness.notifyOfCompletedCheckpoint(currentCheckpointId);
+            return null;
+          });
+    }
+  }
+
   @Test
   public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2);
@@ -708,6 +771,13 @@ private DeleteFile writeEqDeleteFile(FileAppenderFactory<RowData> appenderFactor
     return SimpleDataUtil.writeEqDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes);
   }

+  private DeleteFile writePosDeleteFile(FileAppenderFactory<RowData> appenderFactory,
+                                        String filename,
+                                        List<Pair<CharSequence, Long>> positions) throws IOException {
+    return SimpleDataUtil.writePosDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory,
+        positions);
+  }
+
   private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
     int[] equalityFieldIds = new int[] {
         table.schema().findField("id").fieldId(),