diff --git a/core/src/main/java/org/apache/iceberg/io/WriteResult.java b/core/src/main/java/org/apache/iceberg/io/WriteResult.java
index 681370878ca6..e620a6164340 100644
--- a/core/src/main/java/org/apache/iceberg/io/WriteResult.java
+++ b/core/src/main/java/org/apache/iceberg/io/WriteResult.java
@@ -72,10 +72,16 @@ private Builder() {
     public Builder add(WriteResult result) {
       addDataFiles(result.dataFiles);
       addDeleteFiles(result.deleteFiles);
+      addReferencedDataFiles(result.referencedDataFiles);
       return this;
     }
 
+    public Builder addAll(Iterable<WriteResult> results) {
+      results.forEach(this::add);
+      return this;
+    }
+
     public Builder addDataFiles(DataFile... files) {
       Collections.addAll(dataFiles, files);
       return this;
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java
new file mode 100644
index 000000000000..866b785d7e1e
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+class DeltaManifests {
+
+  private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0];
+
+  private final ManifestFile dataManifest;
+  private final ManifestFile deleteManifest;
+  private final CharSequence[] referencedDataFiles;
+
+  DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) {
+    this(dataManifest, deleteManifest, EMPTY_REF_DATA_FILES);
+  }
+
+  DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest, CharSequence[] referencedDataFiles) {
+    Preconditions.checkNotNull(referencedDataFiles, "Referenced data files shouldn't be null.");
+
+    this.dataManifest = dataManifest;
+    this.deleteManifest = deleteManifest;
+    this.referencedDataFiles = referencedDataFiles;
+  }
+
+  ManifestFile dataManifest() {
+    return dataManifest;
+  }
+
+  ManifestFile deleteManifest() {
+    return deleteManifest;
+  }
+
+  CharSequence[] referencedDataFiles() {
+    return referencedDataFiles;
+  }
+
+  List<ManifestFile> manifests() {
+    List<ManifestFile> manifests = Lists.newArrayListWithCapacity(2);
+    if (dataManifest != null) {
+      manifests.add(dataManifest);
+    }
+
+    if (deleteManifest != null) {
+      manifests.add(deleteManifest);
+    }
+
+    return manifests;
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java
new file mode 100644
index 000000000000..859f97940116
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
+  private static final int VERSION_1 = 1;
+  private static final int VERSION_2 = 2;
+  private static final byte[] EMPTY_BINARY = new byte[0];
+
+  static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer();
+
+  @Override
+  public int getVersion() {
+    return VERSION_2;
+  }
+
+  @Override
+  public byte[] serialize(DeltaManifests deltaManifests) throws IOException {
+    Preconditions.checkNotNull(deltaManifests, "DeltaManifests to be serialized should not be null");
+
+    ByteArrayOutputStream binaryOut = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(binaryOut);
+
+    byte[] dataManifestBinary = EMPTY_BINARY;
+    if (deltaManifests.dataManifest() != null) {
+      dataManifestBinary = ManifestFiles.encode(deltaManifests.dataManifest());
+    }
+
+    out.writeInt(dataManifestBinary.length);
+    out.write(dataManifestBinary);
+
+    byte[] deleteManifestBinary = EMPTY_BINARY;
+    if (deltaManifests.deleteManifest() != null) {
+      deleteManifestBinary = ManifestFiles.encode(deltaManifests.deleteManifest());
+    }
+
+    out.writeInt(deleteManifestBinary.length);
+    out.write(deleteManifestBinary);
+
+    CharSequence[] referencedDataFiles = deltaManifests.referencedDataFiles();
+    out.writeInt(referencedDataFiles.length);
+    for (int i = 0; i < referencedDataFiles.length; i++) {
+      out.writeUTF(referencedDataFiles[i].toString());
+    }
+
+    return binaryOut.toByteArray();
+  }
+
+  @Override
+  public DeltaManifests deserialize(int version, byte[] serialized) throws IOException {
+    if (version == VERSION_1) {
+      return deserializeV1(serialized);
+    } else if (version == VERSION_2) {
+      return deserializeV2(serialized);
+    } else {
+      throw new RuntimeException("Unknown serialize version: " + version);
+    }
+  }
+
+  private DeltaManifests deserializeV1(byte[] serialized) throws IOException {
+    return new DeltaManifests(ManifestFiles.decode(serialized), null);
+  }
+
+  private DeltaManifests deserializeV2(byte[] serialized) throws IOException {
+    ManifestFile dataManifest = null;
+    ManifestFile deleteManifest = null;
+
+    ByteArrayInputStream binaryIn = new ByteArrayInputStream(serialized);
+    DataInputStream in = new DataInputStream(binaryIn);
+
+    int dataManifestSize = in.readInt();
+    if (dataManifestSize > 0) {
+      byte[] dataManifestBinary = new byte[dataManifestSize];
+      Preconditions.checkState(in.read(dataManifestBinary) == dataManifestSize);
+
+      dataManifest = ManifestFiles.decode(dataManifestBinary);
+    }
+
+    int deleteManifestSize = in.readInt();
+    if (deleteManifestSize > 0) {
+      byte[] deleteManifestBinary = new byte[deleteManifestSize];
+      Preconditions.checkState(in.read(deleteManifestBinary) == deleteManifestSize);
+
+      deleteManifest = ManifestFiles.decode(deleteManifestBinary);
+    }
+
+    int referenceDataFileNum = in.readInt();
+    CharSequence[] referencedDataFiles = new CharSequence[referenceDataFileNum];
+    for (int i = 0; i < referenceDataFileNum; i++) {
+      referencedDataFiles[i] = in.readUTF();
+    }
+
+    return new DeltaManifests(dataManifest, deleteManifest, referencedDataFiles);
+  }
+}
diff
--git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java deleted file mode 100644 index bec4e65d0cad..000000000000 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestSerializer.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.flink.sink; - -import java.io.IOException; -import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.ManifestFiles; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; - -class FlinkManifestSerializer implements SimpleVersionedSerializer { - private static final int VERSION_NUM = 1; - static final FlinkManifestSerializer INSTANCE = new FlinkManifestSerializer(); - - @Override - public int getVersion() { - return VERSION_NUM; - } - - @Override - public byte[] serialize(ManifestFile manifestFile) throws IOException { - Preconditions.checkNotNull(manifestFile, "ManifestFile to be serialized should not be null"); - - return ManifestFiles.encode(manifestFile); - } - - @Override - public ManifestFile deserialize(int version, byte[] serialized) throws IOException { - return ManifestFiles.decode(serialized); - } -} diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index aa36c7344e67..b00018b3b770 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.util.List; +import java.util.function.Supplier; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; @@ -32,10 +34,11 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.Lists; class FlinkManifestUtil { - private static final int ICEBERG_FORMAT_VERSION = 2; + private static final int FORMAT_V2 = 2; private static final Long DUMMY_SNAPSHOT_ID = 0L; private FlinkManifestUtil() { @@ -43,7 +46,7 @@ private FlinkManifestUtil() { static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, List dataFiles) throws IOException { - ManifestWriter writer = ManifestFiles.write(ICEBERG_FORMAT_VERSION, spec, outputFile, DUMMY_SNAPSHOT_ID); + ManifestWriter writer = 
        ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
 
     try (ManifestWriter<DataFile> closeableWriter = writer) {
       closeableWriter.addAll(dataFiles);
@@ -63,4 +66,54 @@ static ManifestOutputFileFactory createOutputFileFactory(Table table, String fli
     TableOperations ops = ((HasTableOperations) table).operations();
     return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
   }
+
+  static DeltaManifests writeCompletedFiles(WriteResult result,
+                                            Supplier<OutputFile> outputFileSupplier,
+                                            PartitionSpec spec) throws IOException {
+
+    ManifestFile dataManifest = null;
+    ManifestFile deleteManifest = null;
+
+    // Write the completed data files into a newly created data manifest file.
+    if (result.dataFiles() != null && result.dataFiles().length > 0) {
+      dataManifest = writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
+    }
+
+    // Write the completed delete files into a newly created delete manifest file.
+    if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
+      OutputFile deleteManifestFile = outputFileSupplier.get();
+
+      ManifestWriter<DeleteFile> deleteManifestWriter = ManifestFiles.writeDeleteManifest(FORMAT_V2, spec,
+          deleteManifestFile, DUMMY_SNAPSHOT_ID);
+      try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
+        for (DeleteFile deleteFile : result.deleteFiles()) {
+          writer.add(deleteFile);
+        }
+      }
+
+      deleteManifest = deleteManifestWriter.toManifestFile();
+    }
+
+    return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
+  }
+
+  static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io) throws IOException {
+    WriteResult.Builder builder = WriteResult.builder();
+
+    // Read the completed data files from the persisted data manifest file.
+    if (deltaManifests.dataManifest() != null) {
+      builder.addDataFiles(readDataFiles(deltaManifests.dataManifest(), io));
+    }
+
+    // Read the completed delete files from the persisted delete manifest file.
+    if (deltaManifests.deleteManifest() != null) {
+      try (CloseableIterable<DeleteFile> deleteFiles = ManifestFiles
+          .readDeleteManifest(deltaManifests.deleteManifest(), io, null)) {
+        builder.addDeleteFiles(deleteFiles);
+      }
+    }
+
+    return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles())
+        .build();
+  }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 98f3b5ba3dbd..5ca68c4713b2 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PropertyUtil;
@@ -189,7 +190,7 @@ public DataStreamSink build() {
     this.writeParallelism = writeParallelism == null ?
rowDataInput.getParallelism() : writeParallelism; DataStream returnStream = rowDataInput - .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter) + .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter) .setParallelism(writeParallelism) .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter) .setParallelism(1) diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index ad890613a073..c1d3440db41b 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -39,17 +39,18 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo; import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.DataFile; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.RowDelta; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; import org.apache.iceberg.flink.TableLoader; -import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Strings; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Comparators; @@ -58,7 +59,7 @@ import org.slf4j.LoggerFactory; class IcebergFilesCommitter extends AbstractStreamOperator - implements OneInputStreamOperator, BoundedOneInput { + implements OneInputStreamOperator, BoundedOneInput { private static final long serialVersionUID = 1L; private static final long INITIAL_CHECKPOINT_ID = -1L; @@ -85,9 +86,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator // iceberg table when the next checkpoint happen. private final NavigableMap dataFilesPerCheckpoint = Maps.newTreeMap(); - // The data files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the + // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the // 'dataFilesPerCheckpoint'. - private final List dataFilesOfCurrentCheckpoint = Lists.newArrayList(); + private final List writeResultsOfCurrentCkpt = Lists.newArrayList(); // It will have an unique identifier for one job. private transient String flinkJobId; @@ -165,7 +166,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception { jobIdState.add(flinkJobId); // Clear the local buffer for current checkpoint. 
- dataFilesOfCurrentCheckpoint.clear(); + writeResultsOfCurrentCkpt.clear(); } @Override @@ -184,44 +185,43 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { } } - private void commitUpToCheckpoint(NavigableMap manifestsMap, + private void commitUpToCheckpoint(NavigableMap deltaManifestsMap, String newFlinkJobId, long checkpointId) throws IOException { - NavigableMap pendingManifestMap = manifestsMap.headMap(checkpointId, true); + NavigableMap pendingMap = deltaManifestsMap.headMap(checkpointId, true); - List manifestFiles = Lists.newArrayList(); - List pendingDataFiles = Lists.newArrayList(); - for (byte[] manifestData : pendingManifestMap.values()) { - if (Arrays.equals(EMPTY_MANIFEST_DATA, manifestData)) { + List manifests = Lists.newArrayList(); + NavigableMap pendingResults = Maps.newTreeMap(); + for (Map.Entry e : pendingMap.entrySet()) { + if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) { // Skip the empty flink manifest. continue; } - ManifestFile manifestFile = - SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, manifestData); - - manifestFiles.add(manifestFile); - pendingDataFiles.addAll(FlinkManifestUtil.readDataFiles(manifestFile, table.io())); + DeltaManifests deltaManifests = SimpleVersionedSerialization + .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue()); + pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io())); + manifests.addAll(deltaManifests.manifests()); } if (replacePartitions) { - replacePartitions(pendingDataFiles, newFlinkJobId, checkpointId); + replacePartitions(pendingResults, newFlinkJobId, checkpointId); } else { - append(pendingDataFiles, newFlinkJobId, checkpointId); + commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId); } - pendingManifestMap.clear(); + pendingMap.clear(); - // Delete the committed manifests and clear the committed data files from dataFilesPerCheckpoint. - for (ManifestFile manifestFile : manifestFiles) { + // Delete the committed manifests. + for (ManifestFile manifest : manifests) { try { - table.io().deleteFile(manifestFile.path()); + table.io().deleteFile(manifest.path()); } catch (Exception e) { // The flink manifests cleaning failure shouldn't abort the completed checkpoint. String details = MoreObjects.toStringHelper(this) .add("flinkJobId", newFlinkJobId) .add("checkpointId", checkpointId) - .add("manifestPath", manifestFile.path()) + .add("manifestPath", manifest.path()) .toString(); LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}", details, e); @@ -229,33 +229,68 @@ private void commitUpToCheckpoint(NavigableMap manifestsMap, } } - private void replacePartitions(List dataFiles, String newFlinkJobId, long checkpointId) { + private void replacePartitions(NavigableMap pendingResults, String newFlinkJobId, + long checkpointId) { + // Partition overwrite does not support delete files. + int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum(); + Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions with delete files."); + + // Commit the overwrite transaction. 
ReplacePartitions dynamicOverwrite = table.newReplacePartitions(); int numFiles = 0; - for (DataFile file : dataFiles) { - numFiles += 1; - dynamicOverwrite.addFile(file); + for (WriteResult result : pendingResults.values()) { + Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files."); + + numFiles += result.dataFiles().length; + Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile); } - commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite", newFlinkJobId, checkpointId); + commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId); } - private void append(List dataFiles, String newFlinkJobId, long checkpointId) { - AppendFiles appendFiles = table.newAppend(); + private void commitDeltaTxn(NavigableMap pendingResults, String newFlinkJobId, long checkpointId) { + int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum(); - int numFiles = 0; - for (DataFile file : dataFiles) { - numFiles += 1; - appendFiles.appendFile(file); - } + if (deleteFilesNum == 0) { + // To be compatible with iceberg format V1. + AppendFiles appendFiles = table.newAppend(); + + int numFiles = 0; + for (WriteResult result : pendingResults.values()) { + Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files."); - commitOperation(appendFiles, numFiles, "append", newFlinkJobId, checkpointId); + numFiles += result.dataFiles().length; + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); + } + + commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId); + } else { + // To be compatible with iceberg format V2. + for (Map.Entry e : pendingResults.entrySet()) { + // We don't commit the merged result into a single transaction because for the sequential transaction txn1 and + // txn2, the equality-delete files of txn2 are required to be applied to data files from txn1. Committing the + // merged one will lead to the incorrect delete semantic. 
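A minimal sketch (not part of this patch) of the ordering concern described in the comment above: each checkpoint's WriteResult is committed as its own RowDelta so that the equality deletes of checkpoint N apply to the data files committed for checkpoint N-1. Only APIs already used in this diff are assumed; the class and method names are illustrative.

    import java.util.Arrays;
    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.WriteResult;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

    class PerCheckpointCommitSketch {
      // Commit the two pending results in checkpoint order, one RowDelta per checkpoint.
      static void commitSequentially(Table table, WriteResult ckpt1, WriteResult ckpt2) {
        commitOne(table, ckpt1);  // data files of checkpoint 1 become visible first
        commitOne(table, ckpt2);  // checkpoint 2's deletes now see checkpoint 1's data files
      }

      private static void commitOne(Table table, WriteResult result) {
        RowDelta rowDelta = table.newRowDelta()
            .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
            .validateDeletedFiles();
        Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
        Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
        rowDelta.commit();
      }
    }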
+ WriteResult result = e.getValue(); + RowDelta rowDelta = table.newRowDelta() + .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles())) + .validateDeletedFiles(); + + int numDataFiles = result.dataFiles().length; + Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); + + int numDeleteFiles = result.deleteFiles().length; + Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); + + commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, e.getKey()); + } + } } - private void commitOperation(SnapshotUpdate operation, int numFiles, String description, + private void commitOperation(SnapshotUpdate operation, int numDataFiles, int numDeleteFiles, String description, String newFlinkJobId, long checkpointId) { - LOG.info("Committing {} with {} files to table {}", description, numFiles, table); + LOG.info("Committing {} with {} data files and {} delete files to table {}", description, numDataFiles, + numDeleteFiles, table); operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId)); operation.set(FLINK_JOB_ID, newFlinkJobId); @@ -266,8 +301,8 @@ private void commitOperation(SnapshotUpdate operation, int numFiles, String d } @Override - public void processElement(StreamRecord element) { - this.dataFilesOfCurrentCheckpoint.add(element.getValue()); + public void processElement(StreamRecord element) { + this.writeResultsOfCurrentCkpt.add(element.getValue()); } @Override @@ -275,7 +310,7 @@ public void endInput() throws IOException { // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly. long currentCheckpointId = Long.MAX_VALUE; dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId)); - dataFilesOfCurrentCheckpoint.clear(); + writeResultsOfCurrentCkpt.clear(); commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId); } @@ -284,14 +319,15 @@ public void endInput() throws IOException { * Write all the complete data files to a newly created manifest file and return the manifest's avro serialized bytes. 
*/ private byte[] writeToManifest(long checkpointId) throws IOException { - if (dataFilesOfCurrentCheckpoint.isEmpty()) { + if (writeResultsOfCurrentCkpt.isEmpty()) { return EMPTY_MANIFEST_DATA; } - OutputFile manifestOutputFile = manifestOutputFileFactory.create(checkpointId); - ManifestFile manifestFile = - FlinkManifestUtil.writeDataFiles(manifestOutputFile, table.spec(), dataFilesOfCurrentCheckpoint); - return SimpleVersionedSerialization.writeVersionAndSerialize(FlinkManifestSerializer.INSTANCE, manifestFile); + WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build(); + DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result, + () -> manifestOutputFileFactory.create(checkpointId), table.spec()); + + return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests); } @Override diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java index ef8b93619d19..6d12310dd1dc 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java @@ -25,12 +25,12 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.iceberg.DataFile; import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; -class IcebergStreamWriter extends AbstractStreamOperator - implements OneInputStreamOperator, BoundedOneInput { +class IcebergStreamWriter extends AbstractStreamOperator + implements OneInputStreamOperator, BoundedOneInput { private static final long serialVersionUID = 1L; @@ -62,9 +62,7 @@ public void open() { @Override public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { // close all open files and emit files to downstream committer operator - for (DataFile dataFile : writer.dataFiles()) { - emit(dataFile); - } + emit(writer.complete()); this.writer = taskWriterFactory.create(); } @@ -86,10 +84,8 @@ public void dispose() throws Exception { @Override public void endInput() throws IOException { // For bounded stream, it may don't enable the checkpoint mechanism so we'd better to emit the remaining - // data files to downstream before closing the writer so that we won't miss any of them. - for (DataFile dataFile : writer.dataFiles()) { - emit(dataFile); - } + // completed files to downstream before closing the writer so that we won't miss any of them. 
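A minimal sketch (not part of this patch) of the handoff this change introduces: each writer subtask emits a single WriteResult per checkpoint from TaskWriter#complete(), and the committer later folds the buffered results with the new WriteResult.Builder#addAll before persisting them as delta manifests. Only APIs already used in this diff are assumed; the class and method names are illustrative.

    import java.io.IOException;
    import java.util.List;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.io.TaskWriter;
    import org.apache.iceberg.io.WriteResult;

    class WriteResultHandoffSketch {
      // Writer side: one WriteResult per checkpoint, carrying data files, delete files
      // and the data files referenced by position deletes.
      static WriteResult completeCheckpoint(TaskWriter<RowData> writer) throws IOException {
        return writer.complete();
      }

      // Committer side: merge everything buffered since the previous checkpoint barrier.
      static WriteResult mergeBuffered(List<WriteResult> writeResultsOfCurrentCkpt) {
        return WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
      }
    }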
+ emit(writer.complete()); } @Override @@ -101,7 +97,7 @@ public String toString() { .toString(); } - private void emit(DataFile dataFile) { - output.collect(new StreamRecord<>(dataFile)); + private void emit(WriteResult result) { + output.collect(new StreamRecord<>(result)); } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index dc6528a289ee..da064eb057b5 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -40,6 +41,9 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.flink.sink.FlinkAppenderFactory; import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopTables; @@ -51,6 +55,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; import org.junit.Assert; import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath; @@ -133,14 +138,51 @@ public static DataFile writeFile(Schema schema, PartitionSpec spec, Configuratio .build(); } - public static void assertTableRows(String tablePath, List expected) throws IOException { - List expectedRecords = Lists.newArrayList(); - for (RowData row : expected) { + public static DeleteFile writeEqDeleteFile(Table table, FileFormat format, String tablePath, String filename, + FileAppenderFactory appenderFactory, + List deletes) throws IOException { + EncryptedOutputFile outputFile = + table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration())); + + EqualityDeleteWriter eqWriter = appenderFactory.newEqDeleteWriter(outputFile, format, null); + try (EqualityDeleteWriter writer = eqWriter) { + writer.deleteAll(deletes); + } + return eqWriter.toDeleteFile(); + } + + public static DeleteFile writePosDeleteFile(Table table, FileFormat format, String tablePath, + String filename, + FileAppenderFactory appenderFactory, + List> positions) throws IOException { + EncryptedOutputFile outputFile = + table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration())); + + PositionDeleteWriter posWriter = appenderFactory.newPosDeleteWriter(outputFile, format, null); + try (PositionDeleteWriter writer = posWriter) { + for (Pair p : positions) { + writer.delete(p.first(), p.second()); + } + } + return posWriter.toDeleteFile(); + } + + private static List convertToRecords(List rows) { + List records = Lists.newArrayList(); + for (RowData row : rows) { Integer id = row.isNullAt(0) ? null : row.getInt(0); String data = row.isNullAt(1) ? 
null : row.getString(1).toString(); - expectedRecords.add(createRecord(id, data)); + records.add(createRecord(id, data)); } - assertTableRecords(tablePath, expectedRecords); + return records; + } + + public static void assertTableRows(String tablePath, List expected) throws IOException { + assertTableRecords(tablePath, convertToRecords(expected)); + } + + public static void assertTableRows(Table table, List expected) throws IOException { + assertTableRecords(table, convertToRecords(expected)); } public static void assertTableRecords(Table table, List expected) throws IOException { diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java index 9fd15070d7fa..b8f121e4f63d 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java @@ -27,17 +27,24 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.table.data.RowData; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.Table; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.SimpleDataUtil; -import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.Pair; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -54,7 +61,8 @@ public class TestFlinkManifest { private String tablePath; private Table table; - private final AtomicInteger dataFileCount = new AtomicInteger(0); + private FileAppenderFactory appenderFactory; + private final AtomicInteger fileCount = new AtomicInteger(0); @Before public void before() throws IOException { @@ -66,6 +74,13 @@ public void before() throws IOException { // Construct the iceberg table. 
table = SimpleDataUtil.createTable(tablePath, ImmutableMap.of(), false); + + int[] equalityFieldIds = new int[] { + table.schema().findField("id").fieldId(), + table.schema().findField("data").fieldId() + }; + this.appenderFactory = new FlinkAppenderFactory(table.schema(), FlinkSchemaUtil.convert(table.schema()), + table.properties(), table.spec(), equalityFieldIds, table.schema(), null); } @@ -75,16 +90,30 @@ public void testIO() throws IOException { for (long checkpointId = 1; checkpointId <= 3; checkpointId++) { ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1); - OutputFile manifestOutputFile = factory.create(checkpointId); - - List expectedDataFiles = generateDataFiles(10); - ManifestFile manifestFile = FlinkManifestUtil.writeDataFiles(manifestOutputFile, table.spec(), expectedDataFiles); + final long curCkpId = checkpointId; - List actualDataFiles = FlinkManifestUtil.readDataFiles(manifestFile, table.io()); + List dataFiles = generateDataFiles(10); + List eqDeleteFiles = generateEqDeleteFiles(5); + List posDeleteFiles = generatePosDeleteFiles(5); + DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles( + WriteResult.builder() + .addDataFiles(dataFiles) + .addDeleteFiles(eqDeleteFiles) + .addDeleteFiles(posDeleteFiles) + .build(), + () -> factory.create(curCkpId), table.spec()); - Assert.assertEquals("Size of data file list are not equal.", expectedDataFiles.size(), actualDataFiles.size()); - for (int i = 0; i < expectedDataFiles.size(); i++) { - checkDataFile(expectedDataFiles.get(i), actualDataFiles.get(i)); + WriteResult result = FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()); + Assert.assertEquals("Size of data file list are not equal.", 10, result.deleteFiles().length); + for (int i = 0; i < dataFiles.size(); i++) { + checkContentFile(dataFiles.get(i), result.dataFiles()[i]); + } + Assert.assertEquals("Size of delete file list are not equal.", 10, result.dataFiles().length); + for (int i = 0; i < 5; i++) { + checkContentFile(eqDeleteFiles.get(i), result.deleteFiles()[i]); + } + for (int i = 0; i < 5; i++) { + checkContentFile(posDeleteFiles.get(i), result.deleteFiles()[5 + i]); } } } @@ -99,18 +128,27 @@ public void testUserProvidedManifestLocation() throws IOException { ((HasTableOperations) table).operations(), table.io(), props, flinkJobId, 1, 1); - OutputFile outputFile = factory.create(checkpointId); - List expectedDataFiles = generateDataFiles(5); - ManifestFile manifestFile = FlinkManifestUtil.writeDataFiles(outputFile, table.spec(), expectedDataFiles); + List dataFiles = generateDataFiles(5); + DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles( + WriteResult.builder() + .addDataFiles(dataFiles) + .build(), + () -> factory.create(checkpointId), + table.spec()); + Assert.assertNotNull("Data manifest shouldn't be null", deltaManifests.dataManifest()); + Assert.assertNull("Delete manifest should be null", deltaManifests.deleteManifest()); Assert.assertEquals("The newly created manifest file should be located under the user provided directory", - userProvidedFolder.toPath(), Paths.get(manifestFile.path()).getParent()); + userProvidedFolder.toPath(), Paths.get(deltaManifests.dataManifest().path()).getParent()); + + WriteResult result = FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()); - List actualDataFiles = FlinkManifestUtil.readDataFiles(manifestFile, table.io()); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(5, 
result.dataFiles().length); - Assert.assertEquals("Size of data file list are not equal.", expectedDataFiles.size(), actualDataFiles.size()); - for (int i = 0; i < expectedDataFiles.size(); i++) { - checkDataFile(expectedDataFiles.get(i), actualDataFiles.get(i)); + Assert.assertEquals("Size of data file list are not equal.", dataFiles.size(), result.dataFiles().length); + for (int i = 0; i < dataFiles.size(); i++) { + checkContentFile(dataFiles.get(i), result.dataFiles()[i]); } } @@ -119,42 +157,126 @@ public void testVersionedSerializer() throws IOException { long checkpointId = 1; String flinkJobId = newFlinkJobId(); ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1); - OutputFile outputFile = factory.create(checkpointId); - List expectedDataFiles = generateDataFiles(10); - ManifestFile expected = FlinkManifestUtil.writeDataFiles(outputFile, table.spec(), expectedDataFiles); + List dataFiles = generateDataFiles(10); + List eqDeleteFiles = generateEqDeleteFiles(10); + List posDeleteFiles = generatePosDeleteFiles(10); + DeltaManifests expected = FlinkManifestUtil.writeCompletedFiles( + WriteResult.builder() + .addDataFiles(dataFiles) + .addDeleteFiles(eqDeleteFiles) + .addDeleteFiles(posDeleteFiles) + .build(), + () -> factory.create(checkpointId), table.spec()); byte[] versionedSerializeData = - SimpleVersionedSerialization.writeVersionAndSerialize(FlinkManifestSerializer.INSTANCE, expected); - ManifestFile actual = SimpleVersionedSerialization - .readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, versionedSerializeData); - checkManifestFile(expected, actual); + SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, expected); + DeltaManifests actual = SimpleVersionedSerialization + .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, versionedSerializeData); + checkManifestFile(expected.dataManifest(), actual.dataManifest()); + checkManifestFile(expected.deleteManifest(), actual.deleteManifest()); byte[] versionedSerializeData2 = - SimpleVersionedSerialization.writeVersionAndSerialize(FlinkManifestSerializer.INSTANCE, actual); + SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, actual); Assert.assertArrayEquals(versionedSerializeData, versionedSerializeData2); } + @Test + public void testCompatibility() throws IOException { + // The v2 deserializer should be able to deserialize the v1 binary. 
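For reference, a short sketch (not part of this patch) of the two payload layouts this compatibility test exercises, as implied by DeltaManifestsSerializer above: a v1 payload is just the Avro-encoded data manifest, while a v2 payload frames the data manifest, the delete manifest and the referenced data file paths with length and count prefixes; both decode through the same versioned entry point. The class name below is illustrative and is assumed to live in org.apache.iceberg.flink.sink so it can see the package-private types.

    package org.apache.iceberg.flink.sink;

    import java.io.IOException;
    import org.apache.flink.core.io.SimpleVersionedSerialization;

    class CheckpointStateRestoreSketch {
      // Restores either a v1 payload (raw data manifest bytes) or a v2 payload
      // ([dataLen][data manifest][deleteLen][delete manifest][count][UTF paths]);
      // the version prefix written by writeVersionAndSerialize selects the branch.
      static DeltaManifests restore(byte[] checkpointedBytes) throws IOException {
        return SimpleVersionedSerialization.readVersionAndDeSerialize(
            DeltaManifestsSerializer.INSTANCE, checkpointedBytes);
      }
    }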
+ long checkpointId = 1; + String flinkJobId = newFlinkJobId(); + ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1); + + List dataFiles = generateDataFiles(10); + ManifestFile manifest = FlinkManifestUtil.writeDataFiles(factory.create(checkpointId), table.spec(), dataFiles); + byte[] dataV1 = SimpleVersionedSerialization.writeVersionAndSerialize(new V1Serializer(), manifest); + + DeltaManifests delta = + SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, dataV1); + Assert.assertNull("Serialization v1 don't include delete files.", delta.deleteManifest()); + Assert.assertNotNull("Serialization v1 should not have null data manifest.", delta.dataManifest()); + checkManifestFile(manifest, delta.dataManifest()); + + List actualFiles = FlinkManifestUtil.readDataFiles(delta.dataManifest(), table.io()); + Assert.assertEquals(10, actualFiles.size()); + for (int i = 0; i < 10; i++) { + checkContentFile(dataFiles.get(i), actualFiles.get(i)); + } + } + + private static class V1Serializer implements SimpleVersionedSerializer { + + @Override + public int getVersion() { + return 1; + } + + @Override + public byte[] serialize(ManifestFile m) throws IOException { + return ManifestFiles.encode(m); + } + + @Override + public ManifestFile deserialize(int version, byte[] serialized) throws IOException { + return ManifestFiles.decode(serialized); + } + } + private DataFile writeDataFile(String filename, List rows) throws IOException { return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF, tablePath, FileFormat.PARQUET.addExtension(filename), rows); } + private DeleteFile writeEqDeleteFile(String filename, List deletes) throws IOException { + return SimpleDataUtil.writeEqDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes); + } + + private DeleteFile writePosDeleteFile(String filename, List> positions) + throws IOException { + return SimpleDataUtil + .writePosDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, positions); + } + private List generateDataFiles(int fileNum) throws IOException { List rowDataList = Lists.newArrayList(); List dataFiles = Lists.newArrayList(); for (int i = 0; i < fileNum; i++) { rowDataList.add(SimpleDataUtil.createRowData(i, "a" + i)); - dataFiles.add(writeDataFile("data-file-" + dataFileCount.incrementAndGet(), rowDataList)); + dataFiles.add(writeDataFile("data-file-" + fileCount.incrementAndGet(), rowDataList)); } return dataFiles; } + private List generateEqDeleteFiles(int fileNum) throws IOException { + List rowDataList = Lists.newArrayList(); + List deleteFiles = Lists.newArrayList(); + for (int i = 0; i < fileNum; i++) { + rowDataList.add(SimpleDataUtil.createDelete(i, "a" + i)); + deleteFiles.add(writeEqDeleteFile("eq-delete-file-" + fileCount.incrementAndGet(), rowDataList)); + } + return deleteFiles; + } + + private List generatePosDeleteFiles(int fileNum) throws IOException { + List> positions = Lists.newArrayList(); + List deleteFiles = Lists.newArrayList(); + for (int i = 0; i < fileNum; i++) { + positions.add(Pair.of("data-file-1", (long) i)); + deleteFiles.add(writePosDeleteFile("pos-delete-file-" + fileCount.incrementAndGet(), positions)); + } + return deleteFiles; + } + private static String newFlinkJobId() { return UUID.randomUUID().toString(); } private static void checkManifestFile(ManifestFile expected, ManifestFile actual) { + if (expected == actual) { + return; + } + Assert.assertTrue("Should not be 
null.", expected != null && actual != null); Assert.assertEquals("Path must match", expected.path(), actual.path()); Assert.assertEquals("Length must match", expected.length(), actual.length()); Assert.assertEquals("Spec id must match", expected.partitionSpecId(), actual.partitionSpecId()); @@ -174,11 +296,11 @@ private static void checkManifestFile(ManifestFile expected, ManifestFile actual Assert.assertEquals("PartitionFieldSummary must match", expected.partitions(), actual.partitions()); } - static void checkDataFile(DataFile expected, DataFile actual) { + static void checkContentFile(ContentFile expected, ContentFile actual) { if (expected == actual) { return; } - Assert.assertTrue("Shouldn't have null DataFile.", expected != null && actual != null); + Assert.assertTrue("Shouldn't be null.", expected != null && actual != null); Assert.assertEquals("SpecId", expected.specId(), actual.specId()); Assert.assertEquals("Content", expected.content(), actual.content()); Assert.assertEquals("Path", expected.path(), actual.path()); @@ -193,7 +315,6 @@ static void checkDataFile(DataFile expected, DataFile actual) { Assert.assertEquals("Upper bounds", expected.upperBounds(), actual.upperBounds()); Assert.assertEquals("Key metadata", expected.keyMetadata(), actual.keyMetadata()); Assert.assertEquals("Split offsets", expected.splitOffsets(), actual.splitOffsets()); - Assert.assertNull(actual.equalityFieldIds()); - Assert.assertNull(expected.equalityFieldIds()); + Assert.assertEquals("Equality field id list", actual.equalityFieldIds(), expected.equalityFieldIds()); } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 583b6f18e336..5ec246b7af07 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -25,7 +25,6 @@ import java.nio.file.Path; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.stream.Collectors; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; @@ -41,22 +40,30 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.GenericManifestFile; import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; +import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.TestTables; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.Pair; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import 
org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -64,48 +71,47 @@ import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION; @RunWith(Parameterized.class) -public class TestIcebergFilesCommitter { +public class TestIcebergFilesCommitter extends TableTestBase { private static final Configuration CONF = new Configuration(); - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - private String tablePath; - private Table table; private File flinkManifestFolder; private final FileFormat format; - @Parameterized.Parameters(name = "format = {0}") + @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion={1}") public static Object[][] parameters() { return new Object[][] { - new Object[] {"avro"}, - new Object[] {"orc"}, - new Object[] {"parquet"} + new Object[] {"avro", 1}, + new Object[] {"avro", 2}, + new Object[] {"parquet", 1}, + new Object[] {"parquet", 2}, + new Object[] {"orc", 1}, }; } - public TestIcebergFilesCommitter(String format) { + public TestIcebergFilesCommitter(String format, int formatVersion) { + super(formatVersion); this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); } @Before - public void before() throws IOException { - File folder = tempFolder.newFolder(); - flinkManifestFolder = tempFolder.newFolder(); - String warehouse = folder.getAbsolutePath(); + public void setupTable() throws IOException { + flinkManifestFolder = temp.newFolder(); + + this.tableDir = temp.newFolder(); + this.metadataDir = new File(tableDir, "metadata"); + Assert.assertTrue(tableDir.delete()); - tablePath = warehouse.concat("/test"); - Assert.assertTrue("Should create the table directory correctly.", new File(tablePath).mkdir()); + tablePath = tableDir.getAbsolutePath(); // Construct the iceberg table. - Map props = ImmutableMap.of( - // file format. - DEFAULT_FILE_FORMAT, format.name(), - // temporary flink manifests location. 
- FLINK_MANIFEST_LOCATION, flinkManifestFolder.getAbsolutePath() - ); - table = SimpleDataUtil.createTable(tablePath, props, false); + table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned()); + + table.updateProperties() + .set(DEFAULT_FILE_FORMAT, format.name()) + .set(FLINK_MANIFEST_LOCATION, flinkManifestFolder.getAbsolutePath()) + .commit(); } @Test @@ -113,11 +119,11 @@ public void testCommitTxnWithoutDataFiles() throws Exception { long checkpointId = 0; long timestamp = 0; JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); - SimpleDataUtil.assertTableRows(tablePath, Lists.newArrayList()); + SimpleDataUtil.assertTableRows(table, Lists.newArrayList()); assertSnapshotSize(0); assertMaxCommittedCheckpointId(jobId, -1L); @@ -136,6 +142,10 @@ public void testCommitTxnWithoutDataFiles() throws Exception { } } + private WriteResult of(DataFile dataFile) { + return WriteResult.builder().addDataFiles(dataFile).build(); + } + @Test public void testCommitTxn() throws Exception { // Test with 3 continues checkpoints: @@ -148,7 +158,7 @@ public void testCommitTxn() throws Exception { long timestamp = 0; JobID jobID = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobID)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobID)) { harness.setup(); harness.open(); assertSnapshotSize(0); @@ -157,7 +167,7 @@ public void testCommitTxn() throws Exception { for (int i = 1; i <= 3; i++) { RowData rowData = SimpleDataUtil.createRowData(i, "hello" + i); DataFile dataFile = writeDataFile("data-" + i, ImmutableList.of(rowData)); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); rows.add(rowData); harness.snapshot(i, ++timestamp); @@ -166,7 +176,7 @@ public void testCommitTxn() throws Exception { harness.notifyOfCompletedCheckpoint(i); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.copyOf(rows)); + SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows)); assertSnapshotSize(i); assertMaxCommittedCheckpointId(jobID, i); } @@ -183,7 +193,7 @@ public void testOrderedEventsBetweenCheckpoints() throws Exception { long timestamp = 0; JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -192,7 +202,7 @@ public void testOrderedEventsBetweenCheckpoints() throws Exception { RowData row1 = SimpleDataUtil.createRowData(1, "hello"); DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1)); - harness.processElement(dataFile1, ++timestamp); + harness.processElement(of(dataFile1), ++timestamp); assertMaxCommittedCheckpointId(jobId, -1L); // 1. snapshotState for checkpoint#1 @@ -202,7 +212,7 @@ public void testOrderedEventsBetweenCheckpoints() throws Exception { RowData row2 = SimpleDataUtil.createRowData(2, "world"); DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2)); - harness.processElement(dataFile2, ++timestamp); + harness.processElement(of(dataFile2), ++timestamp); assertMaxCommittedCheckpointId(jobId, -1L); // 2. snapshotState for checkpoint#2 @@ -212,13 +222,13 @@ public void testOrderedEventsBetweenCheckpoints() throws Exception { // 3. 
notifyCheckpointComplete for checkpoint#1 harness.notifyOfCompletedCheckpoint(firstCheckpointId); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1)); assertMaxCommittedCheckpointId(jobId, firstCheckpointId); assertFlinkManifests(1); // 4. notifyCheckpointComplete for checkpoint#2 harness.notifyOfCompletedCheckpoint(secondCheckpointId); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2)); assertMaxCommittedCheckpointId(jobId, secondCheckpointId); assertFlinkManifests(0); } @@ -234,7 +244,7 @@ public void testDisorderedEventsBetweenCheckpoints() throws Exception { long timestamp = 0; JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -243,7 +253,7 @@ public void testDisorderedEventsBetweenCheckpoints() throws Exception { RowData row1 = SimpleDataUtil.createRowData(1, "hello"); DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1)); - harness.processElement(dataFile1, ++timestamp); + harness.processElement(of(dataFile1), ++timestamp); assertMaxCommittedCheckpointId(jobId, -1L); // 1. snapshotState for checkpoint#1 @@ -253,7 +263,7 @@ public void testDisorderedEventsBetweenCheckpoints() throws Exception { RowData row2 = SimpleDataUtil.createRowData(2, "world"); DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2)); - harness.processElement(dataFile2, ++timestamp); + harness.processElement(of(dataFile2), ++timestamp); assertMaxCommittedCheckpointId(jobId, -1L); // 2. snapshotState for checkpoint#2 @@ -263,13 +273,13 @@ public void testDisorderedEventsBetweenCheckpoints() throws Exception { // 3. notifyCheckpointComplete for checkpoint#2 harness.notifyOfCompletedCheckpoint(secondCheckpointId); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2)); assertMaxCommittedCheckpointId(jobId, secondCheckpointId); assertFlinkManifests(0); // 4. 
notifyCheckpointComplete for checkpoint#1 harness.notifyOfCompletedCheckpoint(firstCheckpointId); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2)); assertMaxCommittedCheckpointId(jobId, secondCheckpointId); assertFlinkManifests(0); } @@ -283,7 +293,7 @@ public void testRecoveryFromValidSnapshot() throws Exception { OperatorSubtaskState snapshot; JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -294,32 +304,32 @@ public void testRecoveryFromValidSnapshot() throws Exception { expectedRows.add(row); DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row)); - harness.processElement(dataFile1, ++timestamp); + harness.processElement(of(dataFile1), ++timestamp); snapshot = harness.snapshot(++checkpointId, ++timestamp); assertFlinkManifests(1); harness.notifyOfCompletedCheckpoint(checkpointId); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row)); assertSnapshotSize(1); assertMaxCommittedCheckpointId(jobId, checkpointId); } // Restore from the given snapshot - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.initializeState(snapshot); harness.open(); - SimpleDataUtil.assertTableRows(tablePath, expectedRows); + SimpleDataUtil.assertTableRows(table, expectedRows); assertSnapshotSize(1); assertMaxCommittedCheckpointId(jobId, checkpointId); RowData row = SimpleDataUtil.createRowData(2, "world"); expectedRows.add(row); DataFile dataFile = writeDataFile("data-2", ImmutableList.of(row)); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); harness.snapshot(++checkpointId, ++timestamp); assertFlinkManifests(1); @@ -327,7 +337,7 @@ public void testRecoveryFromValidSnapshot() throws Exception { harness.notifyOfCompletedCheckpoint(checkpointId); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, expectedRows); + SimpleDataUtil.assertTableRows(table, expectedRows); assertSnapshotSize(2); assertMaxCommittedCheckpointId(jobId, checkpointId); } @@ -342,7 +352,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except OperatorSubtaskState snapshot; List expectedRows = Lists.newArrayList(); JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -352,15 +362,15 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except RowData row = SimpleDataUtil.createRowData(1, "hello"); expectedRows.add(row); DataFile dataFile = writeDataFile("data-1", ImmutableList.of(row)); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); snapshot = harness.snapshot(++checkpointId, ++timestamp); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of()); + SimpleDataUtil.assertTableRows(table, ImmutableList.of()); assertMaxCommittedCheckpointId(jobId, -1L); assertFlinkManifests(1); } - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness 
= createStreamSink(jobId)) { harness.setup(); harness.initializeState(snapshot); harness.open(); @@ -368,7 +378,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except // All flink manifests should be cleaned because it has committed the unfinished iceberg transaction. assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, expectedRows); + SimpleDataUtil.assertTableRows(table, expectedRows); assertMaxCommittedCheckpointId(jobId, checkpointId); harness.snapshot(++checkpointId, ++timestamp); @@ -378,14 +388,14 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except harness.notifyOfCompletedCheckpoint(checkpointId); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, expectedRows); + SimpleDataUtil.assertTableRows(table, expectedRows); assertSnapshotSize(2); assertMaxCommittedCheckpointId(jobId, checkpointId); RowData row = SimpleDataUtil.createRowData(2, "world"); expectedRows.add(row); DataFile dataFile = writeDataFile("data-2", ImmutableList.of(row)); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); snapshot = harness.snapshot(++checkpointId, ++timestamp); assertFlinkManifests(1); @@ -393,7 +403,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except // Redeploying flink job from external checkpoint. JobID newJobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(newJobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(newJobId)) { harness.setup(); harness.initializeState(snapshot); harness.open(); @@ -403,13 +413,13 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except assertMaxCommittedCheckpointId(newJobId, -1); assertMaxCommittedCheckpointId(jobId, checkpointId); - SimpleDataUtil.assertTableRows(tablePath, expectedRows); + SimpleDataUtil.assertTableRows(table, expectedRows); assertSnapshotSize(3); RowData row = SimpleDataUtil.createRowData(3, "foo"); expectedRows.add(row); DataFile dataFile = writeDataFile("data-3", ImmutableList.of(row)); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); harness.snapshot(++checkpointId, ++timestamp); assertFlinkManifests(1); @@ -417,7 +427,7 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except harness.notifyOfCompletedCheckpoint(checkpointId); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, expectedRows); + SimpleDataUtil.assertTableRows(table, expectedRows); assertSnapshotSize(4); assertMaxCommittedCheckpointId(newJobId, checkpointId); } @@ -431,7 +441,7 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { List tableRows = Lists.newArrayList(); JobID oldJobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(oldJobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(oldJobId)) { harness.setup(); harness.open(); @@ -443,14 +453,14 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { tableRows.addAll(rows); DataFile dataFile = writeDataFile(String.format("data-%d", i), rows); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); harness.snapshot(++checkpointId, ++timestamp); assertFlinkManifests(1); harness.notifyOfCompletedCheckpoint(checkpointId); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, tableRows); + 
SimpleDataUtil.assertTableRows(table, tableRows); assertSnapshotSize(i); assertMaxCommittedCheckpointId(oldJobId, checkpointId); } @@ -460,7 +470,7 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { checkpointId = 0; timestamp = 0; JobID newJobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(newJobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(newJobId)) { harness.setup(); harness.open(); @@ -472,13 +482,13 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { tableRows.addAll(rows); DataFile dataFile = writeDataFile("data-new-1", rows); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); harness.snapshot(++checkpointId, ++timestamp); assertFlinkManifests(1); harness.notifyOfCompletedCheckpoint(checkpointId); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, tableRows); + SimpleDataUtil.assertTableRows(table, tableRows); assertSnapshotSize(4); assertMaxCommittedCheckpointId(newJobId, checkpointId); } @@ -494,7 +504,7 @@ public void testMultipleJobsWriteSameTable() throws Exception { int jobIndex = i % 3; int checkpointId = i / 3; JobID jobId = jobs[jobIndex]; - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -505,13 +515,13 @@ public void testMultipleJobsWriteSameTable() throws Exception { tableRows.addAll(rows); DataFile dataFile = writeDataFile(String.format("data-%d", i), rows); - harness.processElement(dataFile, ++timestamp); + harness.processElement(of(dataFile), ++timestamp); harness.snapshot(checkpointId + 1, ++timestamp); assertFlinkManifests(1); harness.notifyOfCompletedCheckpoint(checkpointId + 1); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, tableRows); + SimpleDataUtil.assertTableRows(table, tableRows); assertSnapshotSize(i + 1); assertMaxCommittedCheckpointId(jobId, checkpointId + 1); } @@ -521,7 +531,7 @@ public void testMultipleJobsWriteSameTable() throws Exception { @Test public void testBoundedStream() throws Exception { JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -532,11 +542,11 @@ public void testBoundedStream() throws Exception { List tableRows = Lists.newArrayList(SimpleDataUtil.createRowData(1, "word-1")); DataFile dataFile = writeDataFile("data-1", tableRows); - harness.processElement(dataFile, 1); + harness.processElement(of(dataFile), 1); ((BoundedOneInput) harness.getOneInputOperator()).endInput(); assertFlinkManifests(0); - SimpleDataUtil.assertTableRows(tablePath, tableRows); + SimpleDataUtil.assertTableRows(table, tableRows); assertSnapshotSize(1); assertMaxCommittedCheckpointId(jobId, Long.MAX_VALUE); } @@ -548,7 +558,7 @@ public void testFlinkManifests() throws Exception { final long checkpoint = 10; JobID jobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -557,7 +567,7 @@ public void testFlinkManifests() throws Exception { RowData row1 = SimpleDataUtil.createRowData(1, "hello"); DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1)); - harness.processElement(dataFile1, 
++timestamp); + harness.processElement(of(dataFile1), ++timestamp); assertMaxCommittedCheckpointId(jobId, -1L); // 1. snapshotState for checkpoint#1 @@ -570,16 +580,214 @@ public void testFlinkManifests() throws Exception { // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io()); Assert.assertEquals(1, dataFiles.size()); - TestFlinkManifest.checkDataFile(dataFile1, dataFiles.get(0)); + TestFlinkManifest.checkContentFile(dataFile1, dataFiles.get(0)); // 3. notifyCheckpointComplete for checkpoint#1 harness.notifyOfCompletedCheckpoint(checkpoint); - SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1)); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1)); + assertMaxCommittedCheckpointId(jobId, checkpoint); + assertFlinkManifests(0); + } + } + + @Test + public void testDeleteFiles() throws Exception { + Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); + + long timestamp = 0; + long checkpoint = 10; + + JobID jobId = new JobID(); + FileAppenderFactory appenderFactory = createDeletableAppenderFactory(); + + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + harness.setup(); + harness.open(); + + assertMaxCommittedCheckpointId(jobId, -1L); + + RowData row1 = SimpleDataUtil.createInsert(1, "aaa"); + DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(row1)); + harness.processElement(of(dataFile1), ++timestamp); + assertMaxCommittedCheckpointId(jobId, -1L); + + // 1. snapshotState for checkpoint#1 + harness.snapshot(checkpoint, ++timestamp); + List manifestPaths = assertFlinkManifests(1); + Path manifestPath = manifestPaths.get(0); + Assert.assertEquals("File name should have the expected pattern.", + String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString()); + + // 2. Read the data files from manifests and assert. + List dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io()); + Assert.assertEquals(1, dataFiles.size()); + TestFlinkManifest.checkContentFile(dataFile1, dataFiles.get(0)); + + // 3. notifyCheckpointComplete for checkpoint#1 + harness.notifyOfCompletedCheckpoint(checkpoint); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1)); + assertMaxCommittedCheckpointId(jobId, checkpoint); + assertFlinkManifests(0); + + // 4. process both data files and delete files. + RowData row2 = SimpleDataUtil.createInsert(2, "bbb"); + DataFile dataFile2 = writeDataFile("data-file-2", ImmutableList.of(row2)); + + RowData delete1 = SimpleDataUtil.createDelete(1, "aaa"); + DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory, "delete-file-1", ImmutableList.of(delete1)); + harness.processElement(WriteResult.builder() + .addDataFiles(dataFile2) + .addDeleteFiles(deleteFile1) + .build(), + ++timestamp); + assertMaxCommittedCheckpointId(jobId, checkpoint); + + // 5. snapshotState for checkpoint#2 + harness.snapshot(++checkpoint, ++timestamp); + assertFlinkManifests(2); + + // 6. 
notifyCheckpointComplete for checkpoint#2 + harness.notifyOfCompletedCheckpoint(checkpoint); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2)); + assertMaxCommittedCheckpointId(jobId, checkpoint); + assertFlinkManifests(0); + } + } + + @Test + public void testValidateDataFileExist() throws Exception { + Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); + long timestamp = 0; + long checkpoint = 10; + JobID jobId = new JobID(); + FileAppenderFactory appenderFactory = createDeletableAppenderFactory(); + + RowData insert1 = SimpleDataUtil.createInsert(1, "aaa"); + DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(insert1)); + + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + harness.setup(); + harness.open(); + + // Txn#1: insert the row <1, 'aaa'> + harness.processElement(WriteResult.builder() + .addDataFiles(dataFile1) + .build(), + ++timestamp); + harness.snapshot(checkpoint, ++timestamp); + harness.notifyOfCompletedCheckpoint(checkpoint); + + // Txn#2: Overwrite the committed data-file-1 + RowData insert2 = SimpleDataUtil.createInsert(2, "bbb"); + DataFile dataFile2 = writeDataFile("data-file-2", ImmutableList.of(insert2)); + new TestTableLoader(tablePath) + .loadTable() + .newOverwrite() + .addFile(dataFile2) + .deleteFile(dataFile1) + .commit(); + } + + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + harness.setup(); + harness.open(); + + // Txn#3: position-delete the <1, 'aaa'> (NOT committed). + DeleteFile deleteFile1 = writePosDeleteFile(appenderFactory, + "pos-delete-file-1", + ImmutableList.of(Pair.of(dataFile1.path(), 0L))); + harness.processElement(WriteResult.builder() + .addDeleteFiles(deleteFile1) + .addReferencedDataFiles(dataFile1.path()) + .build(), + ++timestamp); + harness.snapshot(++checkpoint, ++timestamp); + + // Txn#3: validation should fail when committing. + final long currentCheckpointId = checkpoint; + AssertHelpers.assertThrows("Validation should fail because of non-existent data files.", + ValidationException.class, "Cannot commit, missing data files", + () -> { + harness.notifyOfCompletedCheckpoint(currentCheckpointId); + return null; + }); + } + } + + @Test + public void testCommitTwoCheckpointsInSingleTxn() throws Exception { + Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); + + long timestamp = 0; + long checkpoint = 10; + + JobID jobId = new JobID(); + FileAppenderFactory appenderFactory = createDeletableAppenderFactory(); + + try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + harness.setup(); + harness.open(); + + assertMaxCommittedCheckpointId(jobId, -1L); + + RowData insert1 = SimpleDataUtil.createInsert(1, "aaa"); + RowData insert2 = SimpleDataUtil.createInsert(2, "bbb"); + RowData delete3 = SimpleDataUtil.createDelete(3, "ccc"); + DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(insert1, insert2)); + DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory, "delete-file-1", ImmutableList.of(delete3)); + harness.processElement(WriteResult.builder() + .addDataFiles(dataFile1) + .addDeleteFiles(deleteFile1) + .build(), + ++timestamp); + + // The 1st snapshotState.
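+ // Checkpoint #1 is never notified as complete on its own below; only checkpoint #2 is, so the committer is expected to commit the pending results of both checkpoints at that point (the final assertion checks that two table snapshots were produced).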
+ harness.snapshot(checkpoint, ++timestamp); + + RowData insert4 = SimpleDataUtil.createInsert(4, "ddd"); + RowData delete2 = SimpleDataUtil.createDelete(2, "bbb"); + DataFile dataFile2 = writeDataFile("data-file-2", ImmutableList.of(insert4)); + DeleteFile deleteFile2 = writeEqDeleteFile(appenderFactory, "delete-file-2", ImmutableList.of(delete2)); + harness.processElement(WriteResult.builder() + .addDataFiles(dataFile2) + .addDeleteFiles(deleteFile2) + .build(), + ++timestamp); + + // The 2nd snapshotState. + harness.snapshot(++checkpoint, ++timestamp); + + // Notify the 2nd snapshot to complete. + harness.notifyOfCompletedCheckpoint(checkpoint); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4)); assertMaxCommittedCheckpointId(jobId, checkpoint); assertFlinkManifests(0); + Assert.assertEquals("Should have committed 2 txns.", 2, ImmutableList.copyOf(table.snapshots()).size()); } } + private DeleteFile writeEqDeleteFile(FileAppenderFactory appenderFactory, + String filename, List deletes) throws IOException { + return SimpleDataUtil.writeEqDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, deletes); + } + + private DeleteFile writePosDeleteFile(FileAppenderFactory appenderFactory, + String filename, + List> positions) throws IOException { + return SimpleDataUtil.writePosDeleteFile(table, FileFormat.PARQUET, tablePath, filename, appenderFactory, + positions); + } + + private FileAppenderFactory createDeletableAppenderFactory() { + int[] equalityFieldIds = new int[] { + table.schema().findField("id").fieldId(), + table.schema().findField("data").fieldId() + }; + return new FlinkAppenderFactory(table.schema(), + FlinkSchemaUtil.convert(table.schema()), table.properties(), table.spec(), equalityFieldIds, + table.schema(), null); + } + private ManifestFile createTestingManifestFile(Path manifestPath) { return new GenericManifestFile(manifestPath.toAbsolutePath().toString(), manifestPath.toFile().length(), 0, ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null); @@ -609,7 +817,7 @@ private void assertSnapshotSize(int expectedSnapshotSize) { Assert.assertEquals(expectedSnapshotSize, Lists.newArrayList(table.snapshots()).size()); } - private OneInputStreamOperatorTestHarness createStreamSink(JobID jobID) + private OneInputStreamOperatorTestHarness createStreamSink(JobID jobID) throws Exception { TestOperatorFactory factory = TestOperatorFactory.of(tablePath); return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID)); @@ -629,7 +837,7 @@ private static MockEnvironment createEnvironment(JobID jobID) { } private static class TestOperatorFactory extends AbstractStreamOperatorFactory - implements OneInputStreamOperatorFactory { + implements OneInputStreamOperatorFactory { private final String tablePath; private TestOperatorFactory(String tablePath) { @@ -643,7 +851,7 @@ private static TestOperatorFactory of(String tablePath) { @Override @SuppressWarnings("unchecked") public > T createStreamOperator(StreamOperatorParameters param) { - IcebergFilesCommitter committer = new IcebergFilesCommitter(TableLoader.fromHadoopTable(tablePath), false); + IcebergFilesCommitter committer = new IcebergFilesCommitter(new TestTableLoader(tablePath), false); committer.setup(param.getContainingTask(), param.getStreamConfig(), param.getOutput()); return (T) committer; } @@ -653,4 +861,27 @@ public Class getStreamOperatorClass(ClassLoader classL return IcebergFilesCommitter.class; } } + + private static class TestTableLoader implements TableLoader
{ + private File dir = null; + + TestTableLoader(String dir) { + this.dir = new File(dir); + } + + @Override + public void open() { + + } + + @Override + public Table loadTable() { + return TestTables.load(dir, "test"); + } + + @Override + public void close() { + + } + } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java index 80771a4d233f..28db89456f7e 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; @@ -47,6 +48,7 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -101,7 +103,7 @@ public void before() throws IOException { @Test public void testWritingTable() throws Exception { long checkpointId = 1L; - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { // The first checkpoint testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), 1); @@ -109,7 +111,9 @@ public void testWritingTable() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); long expectedDataFiles = partitioned ? 2 : 1; - Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(expectedDataFiles, result.dataFiles().length); checkpointId = checkpointId + 1; @@ -119,11 +123,13 @@ public void testWritingTable() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); expectedDataFiles = partitioned ? 4 : 2; - Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(expectedDataFiles, result.dataFiles().length); // Commit the iceberg transaction. AppendFiles appendFiles = table.newAppend(); - testHarness.extractOutputValues().forEach(appendFiles::appendFile); + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); appendFiles.commit(); // Assert the table records. @@ -141,31 +147,36 @@ public void testWritingTable() throws Exception { public void testSnapshotTwice() throws Exception { long checkpointId = 1; long timestamp = 1; - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), timestamp++); testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), timestamp); testHarness.prepareSnapshotPreBarrier(checkpointId++); long expectedDataFiles = partitioned ? 
2 : 1; - Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(expectedDataFiles, result.dataFiles().length); // snapshot again immediately. for (int i = 0; i < 5; i++) { testHarness.prepareSnapshotPreBarrier(checkpointId++); - Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + + result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(expectedDataFiles, result.dataFiles().length); } } } @Test public void testTableWithoutSnapshot() throws Exception { - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { Assert.assertEquals(0, testHarness.extractOutputValues().size()); } // Even if we closed the iceberg stream writer, there's no orphan data file. Assert.assertEquals(0, scanDataFiles().size()); - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); // Still not emit the data file yet, because there is no checkpoint. Assert.assertEquals(0, testHarness.extractOutputValues().size()); @@ -197,7 +208,7 @@ private Set scanDataFiles() throws IOException { @Test public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception { - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), 2); @@ -205,11 +216,16 @@ public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception { ((BoundedOneInput) testHarness.getOneInputOperator()).endInput(); long expectedDataFiles = partitioned ? 2 : 1; - Assert.assertEquals(expectedDataFiles, testHarness.extractOutputValues().size()); + WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(expectedDataFiles, result.dataFiles().length); // invoke endInput again. ((BoundedOneInput) testHarness.getOneInputOperator()).endInput(); - Assert.assertEquals(expectedDataFiles * 2, testHarness.extractOutputValues().size()); + + result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(expectedDataFiles * 2, result.dataFiles().length); } } @@ -233,23 +249,25 @@ public void testTableWithTargetFileSize() throws Exception { } } - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { for (RowData row : rows) { testHarness.processElement(row, 1); } // snapshot the operator. 
testHarness.prepareSnapshotPreBarrier(1); - Assert.assertEquals(8, testHarness.extractOutputValues().size()); + WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(8, result.dataFiles().length); // Assert that the data file have the expected records. - for (DataFile serDataFile : testHarness.extractOutputValues()) { - Assert.assertEquals(1000, serDataFile.recordCount()); + for (DataFile dataFile : result.dataFiles()) { + Assert.assertEquals(1000, dataFile.recordCount()); } // Commit the iceberg transaction. AppendFiles appendFiles = table.newAppend(); - testHarness.extractOutputValues().forEach(appendFiles::appendFile); + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); appendFiles.commit(); } @@ -294,31 +312,33 @@ public void testPromotedFlinkDataType() throws Exception { record.copy(ImmutableMap.of("tinyint", 3, "smallint", 32767, "int", 103)) ); - try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter(icebergTable, + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter(icebergTable, flinkSchema)) { for (RowData row : rows) { testHarness.processElement(row, 1); } testHarness.prepareSnapshotPreBarrier(1); - Assert.assertEquals(partitioned ? 3 : 1, testHarness.extractOutputValues().size()); + WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + Assert.assertEquals(0, result.deleteFiles().length); + Assert.assertEquals(partitioned ? 3 : 1, result.dataFiles().length); // Commit the iceberg transaction. AppendFiles appendFiles = icebergTable.newAppend(); - testHarness.extractOutputValues().forEach(appendFiles::appendFile); + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); appendFiles.commit(); } SimpleDataUtil.assertTableRecords(location, expected); } - private OneInputStreamOperatorTestHarness createIcebergStreamWriter() throws Exception { + private OneInputStreamOperatorTestHarness createIcebergStreamWriter() throws Exception { return createIcebergStreamWriter(table, SimpleDataUtil.FLINK_SCHEMA); } - private OneInputStreamOperatorTestHarness createIcebergStreamWriter( + private OneInputStreamOperatorTestHarness createIcebergStreamWriter( Table icebergTable, TableSchema flinkSchema) throws Exception { IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkSchema); - OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>( + OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>( streamWriter, 1, 1, 0); harness.setup();