diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index a02e0eff55a2..83d5939755cf 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; @@ -632,4 +633,70 @@ public String toString() { .add("content_size_in_bytes", contentSizeInBytes == null ? "null" : contentSizeInBytes) .toString(); } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + @Override + public boolean equals(Object other) { + if (other == null || getClass() != other.getClass()) { + return false; + } + BaseFile baseFile = (BaseFile) other; + return partitionSpecId == baseFile.partitionSpecId + && fileSizeInBytes == baseFile.fileSizeInBytes + && Objects.equals(partitionType, baseFile.partitionType) + && Objects.equals(fileOrdinal, baseFile.fileOrdinal) + && Objects.equals(manifestLocation, baseFile.manifestLocation) + && content == baseFile.content + && Objects.equals(filePath, baseFile.filePath) + && format == baseFile.format + && Objects.equals(partitionData, baseFile.partitionData) + && Objects.equals(recordCount, baseFile.recordCount) + && Objects.equals(dataSequenceNumber, baseFile.dataSequenceNumber) + && Objects.equals(fileSequenceNumber, baseFile.fileSequenceNumber) + && Objects.equals(columnSizes, baseFile.columnSizes) + && Objects.equals(valueCounts, baseFile.valueCounts) + && Objects.equals(nullValueCounts, baseFile.nullValueCounts) + && Objects.equals(nanValueCounts, baseFile.nanValueCounts) + && Objects.equals(lowerBounds, baseFile.lowerBounds) + && Objects.equals(upperBounds, baseFile.upperBounds) + && Objects.deepEquals(splitOffsets, baseFile.splitOffsets) + && Objects.deepEquals(equalityIds, baseFile.equalityIds) + && Objects.deepEquals(keyMetadata, baseFile.keyMetadata) + && Objects.equals(sortOrderId, baseFile.sortOrderId) + && Objects.equals(firstRowId, baseFile.firstRowId) + && Objects.equals(referencedDataFile, baseFile.referencedDataFile) + && Objects.equals(contentOffset, baseFile.contentOffset) + && Objects.equals(contentSizeInBytes, baseFile.contentSizeInBytes); + } + + @Override + public int hashCode() { + return Objects.hash( + partitionType, + fileOrdinal, + manifestLocation, + partitionSpecId, + content, + filePath, + format, + partitionData, + recordCount, + fileSizeInBytes, + dataSequenceNumber, + fileSequenceNumber, + columnSizes, + valueCounts, + nullValueCounts, + nanValueCounts, + lowerBounds, + upperBounds, + Arrays.hashCode(splitOffsets), + Arrays.hashCode(equalityIds), + Arrays.hashCode(keyMetadata), + sortOrderId, + firstRowId, + referencedDataFile, + contentOffset, + contentSizeInBytes); + } } diff --git a/core/src/main/java/org/apache/iceberg/ContentFileAvroEncoder.java b/core/src/main/java/org/apache/iceberg/ContentFileAvroEncoder.java new file mode 100644 index 000000000000..dce618dd5138 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ContentFileAvroEncoder.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.lang.reflect.Array; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.inmemory.InMemoryInputFile; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; +import org.apache.iceberg.types.Types; + +/** + * A utility class to encode {@link ContentFile} implementations as Avro in a backwards compatible + * way. It uses the same Avro encoding mechanism as {@link ManifestWriter} and {@link + * ManifestReader}. * + */ +public class ContentFileAvroEncoder { + private ContentFileAvroEncoder() {} + + public static byte[] encode(ContentFile[] files) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream view = new DataOutputStream(out); + + Map>> filesByPartitionType = Maps.newLinkedHashMap(); + for (ContentFile dataFile : files) { + Types.StructType partitionType = ((PartitionData) dataFile.partition()).getPartitionType(); + filesByPartitionType + .computeIfAbsent(partitionType, ignoredSpec -> Lists.newArrayList()) + .add(dataFile); + } + // Number of unique partition types + view.writeInt(filesByPartitionType.size()); + + for (Map.Entry>> entry : + filesByPartitionType.entrySet()) { + Types.StructType partitionType = entry.getKey(); + List> dataFiles = entry.getValue(); + Schema fileSchema = new Schema(DataFile.getType(partitionType).fields()); + + String partitionSchema = SchemaParser.toJson(partitionType.asSchema()); + view.writeUTF(partitionSchema); + + InMemoryOutputFile outputFile = new InMemoryOutputFile(); + try (FileAppender> fileAppender = + InternalData.write(FileFormat.AVRO, outputFile).schema(fileSchema).build()) { + fileAppender.addAll(dataFiles); + } + + byte[] serialisedFiles = outputFile.toByteArray(); + view.writeInt(serialisedFiles.length); + view.write(serialisedFiles); + } + + return out.toByteArray(); + } + + public static DataFile[] decodeDataFiles(byte[] serialized) throws IOException { + return decode(serialized, GenericDataFile.class); + } + + public static DeleteFile[] decodeDeleteFiles(byte[] serialized) throws IOException { + return decode(serialized, GenericDeleteFile.class); + } + + private static T[] decode(byte[] serialized, Class fileClass) + throws IOException { + DataInputStream view = new DataInputStream(new ByteArrayInputStream(serialized)); + List files = Lists.newArrayList(); + + int uniqueSpecTypes = view.readInt(); + for (int i = 0; i < uniqueSpecTypes; i++) { + Schema partitionSchema = 
SchemaParser.fromJson(view.readUTF()); + Schema fileSchema = new Schema(DataFile.getType(partitionSchema.asStruct()).fields()); + + byte[] fileBuffer = new byte[view.readInt()]; + ByteStreams.readFully(view, fileBuffer); + + try (CloseableIterable reader = + InternalData.read(FileFormat.AVRO, new InMemoryInputFile(fileBuffer)) + .project(fileSchema) + .setRootType(fileClass) + .setCustomType(DataFile.PARTITION_ID, PartitionData.class) + .build()) { + reader.forEach(files::add); + } + } + + return files.toArray((T[]) Array.newInstance(fileClass, files.size())); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/WriteResult.java b/core/src/main/java/org/apache/iceberg/io/WriteResult.java index 39efaec7d34a..2ac9f0777cec 100644 --- a/core/src/main/java/org/apache/iceberg/io/WriteResult.java +++ b/core/src/main/java/org/apache/iceberg/io/WriteResult.java @@ -19,10 +19,13 @@ package org.apache.iceberg.io; import java.io.Serializable; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.CharSequenceSet; @@ -134,4 +137,35 @@ public WriteResult build() { return new WriteResult(dataFiles, deleteFiles, referencedDataFiles, rewrittenDeleteFiles); } } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("dataFiles", dataFiles) + .add("deleteFiles", deleteFiles) + .add("referencedDataFiles", referencedDataFiles) + .add("rewrittenDeleteFiles", rewrittenDeleteFiles) + .toString(); + } + + @Override + public boolean equals(Object other) { + if (other == null || getClass() != other.getClass()) { + return false; + } + WriteResult that = (WriteResult) other; + return Objects.deepEquals(dataFiles, that.dataFiles) + && Objects.deepEquals(deleteFiles, that.deleteFiles) + && Objects.deepEquals(referencedDataFiles, that.referencedDataFiles) + && Objects.deepEquals(rewrittenDeleteFiles, that.rewrittenDeleteFiles); + } + + @Override + public int hashCode() { + return Objects.hash( + Arrays.hashCode(dataFiles), + Arrays.hashCode(deleteFiles), + Arrays.hashCode(referencedDataFiles), + Arrays.hashCode(rewrittenDeleteFiles)); + } } diff --git a/core/src/test/java/org/apache/iceberg/io/TestWriteResult.java b/core/src/test/java/org/apache/iceberg/io/TestWriteResult.java new file mode 100644 index 000000000000..dd5e782cec65 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/TestWriteResult.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import static org.apache.iceberg.TestBase.SPEC; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.TestHelpers; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +public class TestWriteResult { + static final DataFile FILE_A = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") // easy way to set partition data for now + .withRecordCount(1) + .build(); + static final DeleteFile FILE_A_DELETES = + FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath("/path/to/data-a-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") // easy way to set partition data for now + .withRecordCount(1) + .build(); + private static final WriteResult WRITE_RESULT = + WriteResult.builder().addDataFiles(FILE_A).addDeleteFiles(FILE_A_DELETES).build(); + + @ParameterizedTest + @MethodSource("org.apache.iceberg.TestHelpers#serializers") + public void serialization(TestHelpers.RoundTripSerializer roundTripSerializer) + throws IOException, ClassNotFoundException { + assertThat(roundTripSerializer.apply(WRITE_RESULT)).isEqualTo(WRITE_RESULT); + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java index 1b786e46452f..40ab84c6bb64 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java @@ -19,7 +19,6 @@ package org.apache.iceberg.flink.sink; import java.util.Arrays; -import java.util.List; import java.util.NavigableMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.flink.annotation.Internal; @@ -43,11 +42,7 @@ public CommitSummary(NavigableMap pendingResults) { pendingResults.values().forEach(this::addWriteResult); } - public void addAll(NavigableMap> pendingResults) { - pendingResults.values().forEach(writeResults -> writeResults.forEach(this::addWriteResult)); - } - - private void addWriteResult(WriteResult writeResult) { + public void addWriteResult(WriteResult writeResult) { dataFilesCount.addAndGet(writeResult.dataFiles().length); Arrays.stream(writeResult.dataFiles()) .forEach( diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java index 40a3ce0cb846..e7e5c3f746c5 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java @@ -18,14 +18,18 @@ */ package org.apache.iceberg.flink.sink; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import org.apache.flink.annotation.Internal; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.core.memory.DataInputDeserializer; -import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.util.InstantiationUtil; +import org.apache.iceberg.ContentFileAvroEncoder; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; @Internal public class WriteResultSerializer implements SimpleVersionedSerializer { @@ -39,25 +43,55 @@ public int getVersion() { @Override public byte[] serialize(WriteResult writeResult) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); - byte[] result = InstantiationUtil.serializeObject(writeResult); - view.write(result); + DataOutputStream view = new DataOutputStream(out); + + writeByteArray(ContentFileAvroEncoder.encode(writeResult.dataFiles()), view); + writeByteArray(ContentFileAvroEncoder.encode(writeResult.deleteFiles()), view); + + view.writeInt(writeResult.referencedDataFiles().length); + for (CharSequence referencedDataFile : writeResult.referencedDataFiles()) { + view.writeUTF(referencedDataFile.toString()); + } + + writeByteArray(ContentFileAvroEncoder.encode(writeResult.rewrittenDeleteFiles()), view); + return out.toByteArray(); } @Override public WriteResult deserialize(int version, byte[] serialized) throws IOException { - if (version == 1) { - DataInputDeserializer view = new DataInputDeserializer(serialized); - byte[] resultBuf = new byte[serialized.length]; - view.read(resultBuf); - try { - return InstantiationUtil.deserializeObject( - resultBuf, IcebergCommittableSerializer.class.getClassLoader()); - } catch (ClassNotFoundException cnc) { - throw new IOException("Could not deserialize the WriteResult object", cnc); - } + if (version != 1) { + throw new IOException("Unrecognized version or corrupt state: " + version); + } + DataInputStream view = new DataInputStream(new ByteArrayInputStream(serialized)); + + DataFile[] dataFiles = ContentFileAvroEncoder.decodeDataFiles(readByteArray(view)); + DeleteFile[] deleteFiles = ContentFileAvroEncoder.decodeDeleteFiles(readByteArray(view)); + + CharSequence[] referencedDataFiles = new CharSequence[view.readInt()]; + for (int i = 0; i < referencedDataFiles.length; i++) { + referencedDataFiles[i] = view.readUTF(); } - throw new IOException("Unrecognized version or corrupt state: " + version); + + DeleteFile[] rewrittenDeleteFiles = + ContentFileAvroEncoder.decodeDeleteFiles(readByteArray(view)); + + return WriteResult.builder() + .addDataFiles(dataFiles) + .addDeleteFiles(deleteFiles) + .addReferencedDataFiles(referencedDataFiles) + .addRewrittenDeleteFiles(rewrittenDeleteFiles) + .build(); + } + + private static void writeByteArray(byte[] buffer, DataOutputStream view) throws IOException { + view.writeInt(buffer.length); + view.write(buffer); + } + + private static byte[] readByteArray(DataInputStream inputStream) throws IOException { + byte[] buffer = new byte[inputStream.readInt()]; + ByteStreams.readFully(inputStream, buffer); + return buffer; } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java index 33edefe71eb0..9287f894e444 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java @@ 
-19,42 +19,41 @@ package org.apache.iceberg.flink.sink.dynamic; import java.io.Serializable; -import java.util.Arrays; import java.util.Objects; -import org.apache.iceberg.flink.sink.DeltaManifests; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; /** * The aggregated results of a single checkpoint which should be committed. Containing the - * serialized {@link DeltaManifests} file - which contains the commit data, and the jobId, - * operatorId, checkpointId triplet to identify the specific commit. + * serialized {@link org.apache.iceberg.io.WriteResult} and the jobId, operatorId, checkpointId + * triplet to identify the specific commit. * *
<p>
{@link DynamicCommittableSerializer} is used to serialize {@link DynamicCommittable} between * the {@link DynamicWriter} and the {@link DynamicWriteResultAggregator}. */ class DynamicCommittable implements Serializable { - private final WriteTarget key; - private final byte[] manifest; + private final TableKey key; + private final WriteResult writeResult; private final String jobId; private final String operatorId; private final long checkpointId; DynamicCommittable( - WriteTarget key, byte[] manifest, String jobId, String operatorId, long checkpointId) { + TableKey key, WriteResult writeResult, String jobId, String operatorId, long checkpointId) { this.key = key; - this.manifest = manifest; + this.writeResult = writeResult; this.jobId = jobId; this.operatorId = operatorId; this.checkpointId = checkpointId; } - WriteTarget key() { + TableKey key() { return key; } - byte[] manifest() { - return manifest; + WriteResult writeResult() { + return writeResult; } String jobId() { @@ -78,27 +77,24 @@ public boolean equals(Object o) { DynamicCommittable that = (DynamicCommittable) o; return checkpointId == that.checkpointId && Objects.equals(key, that.key) - && Objects.deepEquals(manifest, that.manifest) + && Objects.equals(writeResult, that.writeResult) && Objects.equals(jobId, that.jobId) && Objects.equals(operatorId, that.operatorId); } @Override public int hashCode() { - return Objects.hash(key, Arrays.hashCode(manifest), jobId, operatorId, checkpointId); + return Objects.hash(key, writeResult, jobId, operatorId, checkpointId); } @Override public String toString() { return MoreObjects.toStringHelper(this) .add("key", key) + .add("writeResult", writeResult) .add("jobId", jobId) .add("checkpointId", checkpointId) .add("operatorId", operatorId) .toString(); } - - public WriteTarget writeTarget() { - return key; - } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java index 4aadcf1f3620..6c757a0811ce 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java @@ -23,6 +23,8 @@ import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.iceberg.flink.sink.WriteResultSerializer; +import org.apache.iceberg.io.WriteResult; /** * This serializer is used for serializing the {@link DynamicCommittable} objects between the {@link @@ -32,6 +34,7 @@ class DynamicCommittableSerializer implements SimpleVersionedSerializer { private static final int VERSION = 1; + private static final WriteResultSerializer WRITE_RESULT_SERIALIZER = new WriteResultSerializer(); @Override public int getVersion() { @@ -46,8 +49,10 @@ public byte[] serialize(DynamicCommittable committable) throws IOException { view.writeUTF(committable.jobId()); view.writeUTF(committable.operatorId()); view.writeLong(committable.checkpointId()); - view.writeInt(committable.manifest().length); - view.write(committable.manifest()); + + byte[] result = WRITE_RESULT_SERIALIZER.serialize(committable.writeResult()); + view.write(result); + return out.toByteArray(); } @@ -55,15 +60,16 @@ public byte[] serialize(DynamicCommittable committable) throws IOException { public DynamicCommittable 
deserialize(int version, byte[] serialized) throws IOException { if (version == 1) { DataInputDeserializer view = new DataInputDeserializer(serialized); - WriteTarget key = WriteTarget.deserializeFrom(view); + TableKey key = TableKey.deserializeFrom(view); String jobId = view.readUTF(); String operatorId = view.readUTF(); long checkpointId = view.readLong(); - int manifestLen = view.readInt(); - byte[] manifestBuf; - manifestBuf = new byte[manifestLen]; - view.read(manifestBuf); - return new DynamicCommittable(key, manifestBuf, jobId, operatorId, checkpointId); + + byte[] resultBuf = new byte[view.available()]; + view.read(resultBuf); + WriteResult writeResult = WRITE_RESULT_SERIALIZER.deserialize(version, resultBuf); + + return new DynamicCommittable(key, writeResult, jobId, operatorId, checkpointId); } throw new IOException("Unrecognized version or corrupt state: " + version); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index e58066aac6ca..ff690c147387 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -19,36 +19,25 @@ package org.apache.iceberg.flink.sink.dynamic; import java.io.IOException; -import java.io.Serializable; import java.util.Arrays; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Committer; -import org.apache.flink.core.io.SimpleVersionedSerialization; -import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.RowDelta; import org.apache.iceberg.Snapshot; -import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.sink.CommitSummary; -import org.apache.iceberg.flink.sink.DeltaManifests; -import org.apache.iceberg.flink.sink.DeltaManifestsSerializer; -import org.apache.iceberg.flink.sink.FlinkManifestUtil; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.ThreadPools; @@ -72,12 +61,6 @@ class DynamicCommitter implements Committer { private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id"; private static final Logger LOG = LoggerFactory.getLogger(DynamicCommitter.class); - private static final byte[] EMPTY_MANIFEST_DATA = new byte[0]; - private static final WriteResult EMPTY_WRITE_RESULT = - WriteResult.builder() - .addDataFiles(Lists.newArrayList()) - .addDeleteFiles(Lists.newArrayList()) - .build(); private static final long INITIAL_CHECKPOINT_ID = -1L; @@ -114,39 +97,45 @@ class DynamicCommitter implements Committer { 
@Override public void commit(Collection> commitRequests) - throws IOException, InterruptedException { + throws InterruptedException { if (commitRequests.isEmpty()) { return; } - // For every table and every checkpoint, we store the list of to-be-committed - // DynamicCommittable. + // For every table and every checkpoint, we store a to-be-committed DynamicCommittable. // There may be DynamicCommittable from previous checkpoints which have not been committed yet. - Map>>> commitRequestMap = + Map>> commitRequestMap = Maps.newHashMap(); for (CommitRequest request : commitRequests) { - NavigableMap>> committables = - commitRequestMap.computeIfAbsent( - new TableKey(request.getCommittable()), unused -> Maps.newTreeMap()); - committables - .computeIfAbsent(request.getCommittable().checkpointId(), unused -> Lists.newArrayList()) - .add(request); + TableKey tableKey = new TableKey(request.getCommittable()); + NavigableMap> committables = + commitRequestMap.computeIfAbsent(tableKey, unused -> Maps.newTreeMap()); + + CommitRequest previousRequest = + committables.put(request.getCommittable().checkpointId(), request); + if (previousRequest != null) { + throw new IllegalStateException( + String.format( + "Duplicate commit request per tableKey=%s and checkpointId=%s: %s", + tableKey, request.getCommittable().checkpointId(), request)); + } } - for (Map.Entry>>> entry : + for (Map.Entry>> entry : commitRequestMap.entrySet()) { Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName())); - DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable(); + DynamicCommittable last = entry.getValue().lastEntry().getValue().getCommittable(); long maxCommittedCheckpointId = getMaxCommittedCheckpointId( table, last.jobId(), last.operatorId(), entry.getKey().branch()); // Mark the already committed FilesCommittable(s) as finished + // TODO: Add more logging for visibility on skipped commit requests entry .getValue() .headMap(maxCommittedCheckpointId, true) .values() - .forEach(list -> list.forEach(CommitRequest::signalAlreadyCommitted)); - NavigableMap>> uncommitted = + .forEach(CommitRequest::signalAlreadyCommitted); + NavigableMap> uncommitted = entry.getValue().tailMap(maxCommittedCheckpointId, false); if (!uncommitted.isEmpty()) { commitPendingRequests( @@ -155,6 +144,7 @@ public void commit(Collection> commitRequests) } } + /** TODO: Reuse {@link org.apache.iceberg.flink.sink.SinkUtil#getMaxCommittedCheckpointId} * */ private static long getMaxCommittedCheckpointId( Table table, String flinkJobId, String operatorId, String branch) { Snapshot snapshot = table.snapshot(branch); @@ -180,62 +170,18 @@ private static long getMaxCommittedCheckpointId( return lastCommittedCheckpointId; } - /** - * Commits the data to the Iceberg table by reading the file data from the {@link DeltaManifests} - * ordered by the checkpointId, and writing the new snapshot to the Iceberg table. The {@link - * SnapshotSummary} will contain the jobId, snapshotId, checkpointId so in case of job restart we - * can identify which changes are committed, and which are still waiting for the commit. 
- * - * @param commitRequestMap The checkpointId to {@link CommitRequest} map of the changes to commit - * @param newFlinkJobId The jobId to store in the {@link SnapshotSummary} - * @param operatorId The operatorId to store in the {@link SnapshotSummary} - * @throws IOException On commit failure - */ private void commitPendingRequests( Table table, String branch, - NavigableMap>> commitRequestMap, + NavigableMap> commitRequestMap, String newFlinkJobId, - String operatorId) - throws IOException { - long checkpointId = commitRequestMap.lastKey(); - List manifests = Lists.newArrayList(); - NavigableMap> pendingResults = Maps.newTreeMap(); - for (Map.Entry>> e : commitRequestMap.entrySet()) { - for (CommitRequest committable : e.getValue()) { - if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifest())) { - pendingResults - .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList()) - .add(EMPTY_WRITE_RESULT); - } else { - DeltaManifests deltaManifests = - SimpleVersionedSerialization.readVersionAndDeSerialize( - DeltaManifestsSerializer.INSTANCE, committable.getCommittable().manifest()); - pendingResults - .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList()) - .add(FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs())); - manifests.addAll(deltaManifests.manifests()); - } - } - } - + String operatorId) { + // TODO: Fix aggregated commit summary logged multiple times per each each checkpoint commit CommitSummary summary = new CommitSummary(); - summary.addAll(pendingResults); - commitPendingResult(table, branch, pendingResults, summary, newFlinkJobId, operatorId); - if (committerMetrics != null) { - committerMetrics.updateCommitSummary(table.name(), summary); - } - - FlinkManifestUtil.deleteCommittedManifests(table, manifests, newFlinkJobId, checkpointId); - } + commitRequestMap.forEach( + (checkpointId, commitRequest) -> + summary.addWriteResult(commitRequest.getCommittable().writeResult())); - private void commitPendingResult( - Table table, - String branch, - NavigableMap> pendingResults, - CommitSummary summary, - String newFlinkJobId, - String operatorId) { long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount(); TableKey key = new TableKey(table.name(), branch); int continuousEmptyCheckpoints = @@ -253,76 +199,80 @@ private void commitPendingResult( continuousEmptyCheckpoints = totalFiles == 0 ? 
continuousEmptyCheckpoints + 1 : 0; if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) { if (replacePartitions) { - replacePartitions(table, branch, pendingResults, summary, newFlinkJobId, operatorId); + replacePartitions(table, branch, commitRequestMap, summary, newFlinkJobId, operatorId); } else { - commitDeltaTxn(table, branch, pendingResults, summary, newFlinkJobId, operatorId); + commitDeltaTxn(table, branch, commitRequestMap, summary, newFlinkJobId, operatorId); } continuousEmptyCheckpoints = 0; } else { - long checkpointId = pendingResults.lastKey(); + long checkpointId = commitRequestMap.lastKey(); LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId); } continuousEmptyCheckpointsMap.put(key, continuousEmptyCheckpoints); + + if (committerMetrics != null) { + committerMetrics.updateCommitSummary(table.name(), summary); + } } private void replacePartitions( Table table, String branch, - NavigableMap> pendingResults, + NavigableMap> pendingRequests, CommitSummary summary, String newFlinkJobId, String operatorId) { - for (Map.Entry> e : pendingResults.entrySet()) { + for (Map.Entry> e : pendingRequests.entrySet()) { // We don't commit the merged result into a single transaction because for the sequential // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied // to data files from txn1. Committing the merged one will lead to the incorrect delete // semantic. - for (WriteResult result : e.getValue()) { - ReplacePartitions dynamicOverwrite = - table.newReplacePartitions().scanManifestsWith(workerPool); - Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile); - commitOperation( - table, - branch, - dynamicOverwrite, - summary, - "dynamic partition overwrite", - newFlinkJobId, - operatorId, - e.getKey()); - } + WriteResult writeResult = e.getValue().getCommittable().writeResult(); + + ReplacePartitions dynamicOverwrite = + table.newReplacePartitions().scanManifestsWith(workerPool); + Arrays.stream(writeResult.dataFiles()).forEach(dynamicOverwrite::addFile); + commitOperation( + table, + branch, + dynamicOverwrite, + summary, + "dynamic partition overwrite", + newFlinkJobId, + operatorId, + e.getKey()); } } private void commitDeltaTxn( Table table, String branch, - NavigableMap> pendingResults, + NavigableMap> pendingRequests, CommitSummary summary, String newFlinkJobId, String operatorId) { - for (Map.Entry> e : pendingResults.entrySet()) { + for (Map.Entry> e : pendingRequests.entrySet()) { // We don't commit the merged result into a single transaction because for the sequential // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied // to data files from txn1. Committing the merged one will lead to the incorrect delete // semantic. - for (WriteResult result : e.getValue()) { - // Row delta validations are not needed for streaming changes that write equality deletes. - // Equality deletes are applied to data in all previous sequence numbers, so retries may - // push deletes further in the future, but do not affect correctness. Position deletes - // committed to the table in this path are used only to delete rows from data files that are - // being added in this commit. There is no way for data files added along with the delete - // files to be concurrently removed, so there is no need to validate the files referenced by - // the position delete files that are being committed. 
- RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); - - Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); - Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); - commitOperation( - table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey()); - } + WriteResult result = e.getValue().getCommittable().writeResult(); + + // Row delta validations are not needed for streaming changes that write equality deletes. + // Equality deletes are applied to data in all previous sequence numbers, so retries may + // push deletes further in the future, but do not affect correctness. Position deletes + // committed to the table in this path are used only to delete rows from data files that are + // being added in this commit. There is no way for data files added along with the delete + // files to be concurrently removed, so there is no need to validate the files referenced by + // the position delete files that are being committed. + RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); + + Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); + Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); + commitOperation( + table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey()); } } @@ -371,54 +321,4 @@ void commitOperation( public void close() throws IOException { workerPool.shutdown(); } - - private static class TableKey implements Serializable { - private String tableName; - private String branch; - - TableKey(String tableName, String branch) { - this.tableName = tableName; - this.branch = branch; - } - - TableKey(DynamicCommittable committable) { - this.tableName = committable.key().tableName(); - this.branch = committable.key().branch(); - } - - String tableName() { - return tableName; - } - - String branch() { - return branch; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other == null || getClass() != other.getClass()) { - return false; - } - - TableKey that = (TableKey) other; - return tableName.equals(that.tableName) && branch.equals(that.branch); - } - - @Override - public int hashCode() { - return Objects.hash(tableName, branch); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("tableName", tableName) - .add("branch", branch) - .toString(); - } - } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 2715a01608d6..a05066f15550 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -168,7 +168,7 @@ public DataStream> addPreCommitTopology( .transform( prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"), typeInformation, - new DynamicWriteResultAggregator(catalogLoader)) + new DynamicWriteResultAggregator()) .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology")); } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java index 85806f932ad5..549fe5db6825 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java +++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java @@ -22,15 +22,15 @@ class DynamicWriteResult { - private final WriteTarget key; + private final TableKey key; private final WriteResult writeResult; - DynamicWriteResult(WriteTarget key, WriteResult writeResult) { + DynamicWriteResult(TableKey key, WriteResult writeResult) { this.key = key; this.writeResult = writeResult; } - WriteTarget key() { + TableKey key() { return key; } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java index 58ba183dfcd4..2b2fefe61653 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java @@ -18,30 +18,17 @@ */ package org.apache.iceberg.flink.sink.dynamic; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import java.io.IOException; -import java.time.Duration; import java.util.Collection; +import java.util.List; import java.util.Map; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.flink.CatalogLoader; -import org.apache.iceberg.flink.sink.DeltaManifests; -import org.apache.iceberg.flink.sink.DeltaManifestsSerializer; -import org.apache.iceberg.flink.sink.FlinkManifestUtil; -import org.apache.iceberg.flink.sink.ManifestOutputFileFactory; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; @@ -49,63 +36,45 @@ /** * Operator which aggregates the individual {@link WriteResult} objects to a single {@link - * DynamicCommittable} per checkpoint (storing the serialized {@link DeltaManifests}, jobId, - * operatorId, checkpointId) + * DynamicCommittable} per checkpoint. 
*/ class DynamicWriteResultAggregator extends AbstractStreamOperator> implements OneInputStreamOperator< CommittableMessage, CommittableMessage> { private static final Logger LOG = LoggerFactory.getLogger(DynamicWriteResultAggregator.class); - private static final byte[] EMPTY_MANIFEST_DATA = new byte[0]; - private static final Duration CACHE_EXPIRATION_DURATION = Duration.ofMinutes(1); - private final CatalogLoader catalogLoader; - private transient Map> results; - private transient Cache> specs; - private transient Cache outputFileFactories; + private transient Map> results; private transient String flinkJobId; private transient String operatorId; private transient int subTaskId; - private transient int attemptId; - private transient Catalog catalog; - - DynamicWriteResultAggregator(CatalogLoader catalogLoader) { - this.catalogLoader = catalogLoader; - } @Override public void open() throws Exception { this.flinkJobId = getContainingTask().getEnvironment().getJobID().toString(); this.operatorId = getOperatorID().toString(); this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); - this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber(); this.results = Maps.newHashMap(); - this.specs = - Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build(); - this.outputFileFactories = - Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build(); - this.catalog = catalogLoader.loadCatalog(); } @Override - public void finish() throws IOException { + public void finish() { prepareSnapshotPreBarrier(Long.MAX_VALUE); } @Override - public void prepareSnapshotPreBarrier(long checkpointId) throws IOException { + public void prepareSnapshotPreBarrier(long checkpointId) { Collection> committables = Sets.newHashSetWithExpectedSize(results.size()); int count = 0; - for (Map.Entry> entries : results.entrySet()) { + for (Map.Entry> entries : results.entrySet()) { committables.add( new CommittableWithLineage<>( new DynamicCommittable( entries.getKey(), - writeToManifest(entries.getKey(), entries.getValue(), checkpointId), - getContainingTask().getEnvironment().getJobID().toString(), - getRuntimeContext().getOperatorUniqueID(), + aggregate(entries.getValue()), + flinkJobId, + operatorId, checkpointId), checkpointId, count)); @@ -124,32 +93,6 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException { results.clear(); } - /** - * Write all the completed data files to a newly created manifest file and return the manifest's - * avro serialized bytes. 
- */ - @VisibleForTesting - byte[] writeToManifest( - WriteTarget key, Collection writeResults, long checkpointId) - throws IOException { - if (writeResults.isEmpty()) { - return EMPTY_MANIFEST_DATA; - } - - WriteResult.Builder builder = WriteResult.builder(); - writeResults.forEach(w -> builder.add(w.writeResult())); - WriteResult result = builder.build(); - - DeltaManifests deltaManifests = - FlinkManifestUtil.writeCompletedFiles( - result, - () -> outputFileFactory(key.tableName()).create(checkpointId), - spec(key.tableName(), key.specId())); - - return SimpleVersionedSerialization.writeVersionAndSerialize( - DeltaManifestsSerializer.INSTANCE, deltaManifests); - } - @Override public void processElement(StreamRecord> element) throws Exception { @@ -157,32 +100,16 @@ public void processElement(StreamRecord> if (element.isRecord() && element.getValue() instanceof CommittableWithLineage) { DynamicWriteResult result = ((CommittableWithLineage) element.getValue()).getCommittable(); - WriteTarget key = result.key(); - results.computeIfAbsent(key, unused -> Sets.newHashSet()).add(result); + Collection resultsPerTableKey = + results.computeIfAbsent(result.key(), unused -> Lists.newArrayList()); + resultsPerTableKey.add(result.writeResult()); + LOG.debug("Added {}, totalResults={}", result, resultsPerTableKey.size()); } } - private ManifestOutputFileFactory outputFileFactory(String tableName) { - return outputFileFactories.get( - tableName, - unused -> { - Table table = catalog.loadTable(TableIdentifier.parse(tableName)); - specs.put(tableName, table.specs()); - return FlinkManifestUtil.createOutputFileFactory( - () -> table, table.properties(), flinkJobId, operatorId, subTaskId, attemptId); - }); - } - - private PartitionSpec spec(String tableName, int specId) { - Map knownSpecs = specs.getIfPresent(tableName); - if (knownSpecs != null) { - PartitionSpec spec = knownSpecs.get(specId); - if (spec != null) { - return spec; - } - } - - Table table = catalog.loadTable(TableIdentifier.parse(tableName)); - return table.specs().get(specId); + private static WriteResult aggregate(List writeResults) { + WriteResult.Builder builder = WriteResult.builder(); + writeResults.forEach(builder::add); + return builder.build(); } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java index cf5f423fd7ff..60765673004a 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java @@ -50,7 +50,7 @@ public byte[] serialize(DynamicWriteResult writeResult) throws IOException { public DynamicWriteResult deserialize(int version, byte[] serialized) throws IOException { if (version == 1) { DataInputDeserializer view = new DataInputDeserializer(serialized); - WriteTarget key = WriteTarget.deserializeFrom(view); + TableKey key = TableKey.deserializeFrom(view); byte[] resultBuf = new byte[view.available()]; view.read(resultBuf); WriteResult writeResult = WRITE_RESULT_SERIALIZER.deserialize(version, resultBuf); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java index ae24efafa6af..40acda419495 100644 --- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java @@ -187,7 +187,8 @@ public Collection prepareCommit() throws IOException { writeResult.dataFiles().length, writeResult.deleteFiles().length); - result.add(new DynamicWriteResult(writeTarget, writeResult)); + TableKey tableKey = new TableKey(writeTarget.tableName(), writeTarget.branch()); + result.add(new DynamicWriteResult(tableKey, writeResult)); } writers.clear(); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java index 90b6c7295cb7..d8a14287725d 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/PartitionSpecEvolution.java @@ -34,9 +34,9 @@ public class PartitionSpecEvolution { private PartitionSpecEvolution() {} /** - * Checks whether two PartitionSpecs are compatible with each other. Less strict than {@code - * PartitionSpec#compatible} in the sense that it tolerates differently named partition fields, as - * long as their transforms and field names corresponding to their source ids match. + * Checks whether two PartitionSpecs are compatible with each other. Less strict than {@link + * PartitionSpec#compatibleWith} in the sense that it tolerates differently named partition + * fields, as long as their transforms and field names corresponding to their source ids match. */ public static boolean checkCompatibility(PartitionSpec spec1, PartitionSpec spec2) { if (spec1.equals(spec2)) { diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java new file mode 100644 index 000000000000..fbb61fba56fb --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Objects; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +class TableKey implements Serializable { + private final String tableName; + private final String branch; + + TableKey(String tableName, String branch) { + this.tableName = tableName; + this.branch = branch; + } + + TableKey(DynamicCommittable committable) { + this.tableName = committable.key().tableName(); + this.branch = committable.key().branch(); + } + + String tableName() { + return tableName; + } + + String branch() { + return branch; + } + + void serializeTo(DataOutputView view) throws IOException { + view.writeUTF(tableName); + view.writeUTF(branch); + } + + static TableKey deserializeFrom(DataInputView view) throws IOException { + return new TableKey(view.readUTF(), view.readUTF()); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + TableKey that = (TableKey) other; + return tableName.equals(that.tableName) && branch.equals(that.branch); + } + + @Override + public int hashCode() { + return Objects.hash(tableName, branch); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("tableName", tableName) + .add("branch", branch) + .toString(); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestWriteResultSerializer.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestWriteResultSerializer.java new file mode 100644 index 000000000000..72588380e10d --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestWriteResultSerializer.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.TestBase.SCHEMA; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.io.WriteResult; +import org.junit.jupiter.api.Test; + +public class TestWriteResultSerializer { + private static final PartitionSpec SPEC_1 = + PartitionSpec.builderFor(SCHEMA).withSpecId(0).bucket("data", 2).build(); + private static final PartitionSpec SPEC_2 = + PartitionSpec.builderFor(SCHEMA).withSpecId(1).bucket("data", 5).build(); + + private static final DataFile FILE_1 = + DataFiles.builder(SPEC_1) + .withPath("/path/to/data-1.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .build(); + private static final DataFile FILE_2 = + DataFiles.builder(SPEC_2) + .withPath("/path/to/data-2.parquet") + .withFileSizeInBytes(11) + .withPartitionPath("data_bucket=3") + .withRecordCount(1) + .build(); + + private static final DeleteFile FILE_1_DELETES = + FileMetadata.deleteFileBuilder(SPEC_1) + .ofPositionDeletes() + .withPath("/path/to/data-1-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .build(); + + private static final WriteResult WRITE_RESULT = + WriteResult.builder() + .addDataFiles(FILE_1, FILE_2) + .addDeleteFiles(FILE_1_DELETES) + .addReferencedDataFiles("foo", "bar") + .addRewrittenDeleteFiles(FILE_1_DELETES) + .build(); + + @Test + public void testRoundTripSerialize() throws IOException { + WriteResultSerializer serializer = new WriteResultSerializer(); + + WriteResult copy = + serializer.deserialize(serializer.getVersion(), serializer.serialize(WRITE_RESULT)); + assertThat(copy).isEqualTo(WRITE_RESULT); + } + + @Test + void testUnsupportedVersion() { + WriteResultSerializer serializer = new WriteResultSerializer(); + + assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(WRITE_RESULT))) + .hasMessage("Unrecognized version or corrupt state: -1") + .isInstanceOf(IOException.class); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java index 13a06d362717..702ece14aea6 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java @@ -18,44 +18,45 @@ */ package org.apache.iceberg.flink.sink.dynamic; +import static org.apache.iceberg.TestBase.SPEC; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.io.WriteResult; import org.junit.jupiter.api.Test; class TestDynamicCommittableSerializer { + private static final DynamicCommittable COMMITTABLE = + new DynamicCommittable( + new TableKey("table", "branch"), + 
WriteResult.builder() + .addDataFiles( + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .build()) + .build(), + JobID.generate().toHexString(), + new OperatorID().toHexString(), + 5); @Test void testRoundtrip() throws IOException { - DynamicCommittable committable = - new DynamicCommittable( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - new byte[] {3, 4}, - JobID.generate().toHexString(), - new OperatorID().toHexString(), - 5); - DynamicCommittableSerializer serializer = new DynamicCommittableSerializer(); - assertThat(serializer.deserialize(serializer.getVersion(), serializer.serialize(committable))) - .isEqualTo(committable); + assertThat(serializer.deserialize(serializer.getVersion(), serializer.serialize(COMMITTABLE))) + .isEqualTo(COMMITTABLE); } @Test - void testUnsupportedVersion() throws IOException { - DynamicCommittable committable = - new DynamicCommittable( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - new byte[] {3, 4}, - JobID.generate().toHexString(), - new OperatorID().toHexString(), - 5); - + void testUnsupportedVersion() { DynamicCommittableSerializer serializer = new DynamicCommittableSerializer(); - assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(committable))) + assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(COMMITTABLE))) .hasMessage("Unrecognized version or corrupt state: -1") .isInstanceOf(IOException.class); } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java index 99a546536208..de64b976f3db 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java @@ -27,7 +27,6 @@ import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.Metrics; @@ -73,6 +72,8 @@ class TestDynamicCommitter { ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds )) .build(); + private static final WriteResult WRITE_RESULT = + WriteResult.builder().addDataFiles(DATA_FILE).build(); @BeforeEach void before() { @@ -104,40 +105,9 @@ void testCommit() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget1 = - new WriteTarget(TABLE1, "branch", 42, 0, true, Sets.newHashSet(1, 2)); - WriteTarget writeTarget2 = - new WriteTarget(TABLE1, "branch2", 43, 0, true, Sets.newHashSet(1, 2)); - WriteTarget writeTarget3 = - new WriteTarget(TABLE2, "branch2", 43, 0, true, Sets.newHashSet(1, 2)); - - DynamicWriteResultAggregator aggregator = - new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader()); - OneInputStreamOperatorTestHarness aggregatorHarness = - new OneInputStreamOperatorTestHarness(aggregator); - aggregatorHarness.open(); - - byte[] deltaManifest1 = - aggregator.writeToManifest( - writeTarget1, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget1, WriteResult.builder().addDataFiles(DATA_FILE).build())), - 0); - byte[] deltaManifest2 = - 
aggregator.writeToManifest( - writeTarget2, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget2, WriteResult.builder().addDataFiles(DATA_FILE).build())), - 0); - byte[] deltaManifest3 = - aggregator.writeToManifest( - writeTarget3, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget3, WriteResult.builder().addDataFiles(DATA_FILE).build())), - 0); + TableKey tableKey1 = new TableKey(TABLE1, "branch"); + TableKey tableKey2 = new TableKey(TABLE1, "branch2"); + TableKey tableKey3 = new TableKey(TABLE2, "branch2"); final String jobId = JobID.generate().toHexString(); final String operatorId = new OperatorID().toHexString(); @@ -145,15 +115,15 @@ void testCommit() throws Exception { CommitRequest commitRequest1 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey1, WRITE_RESULT, jobId, operatorId, checkpointId)); CommitRequest commitRequest2 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey2, WRITE_RESULT, jobId, operatorId, checkpointId)); CommitRequest commitRequest3 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey3, WRITE_RESULT, jobId, operatorId, checkpointId)); dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2, commitRequest3)); @@ -219,7 +189,7 @@ void testCommit() throws Exception { } @Test - void testAlreadyCommitted() throws Exception { + void testSkipsCommitRequestsForPreviousCheckpoints() throws Exception { Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1)); assertThat(table1.snapshots()).isEmpty(); @@ -237,37 +207,21 @@ void testAlreadyCommitted() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget = - new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2)); - - DynamicWriteResultAggregator aggregator = - new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader()); - OneInputStreamOperatorTestHarness aggregatorHarness = - new OneInputStreamOperatorTestHarness(aggregator); - aggregatorHarness.open(); + TableKey tableKey = new TableKey(TABLE1, "branch"); final String jobId = JobID.generate().toHexString(); final String operatorId = new OperatorID().toHexString(); final int checkpointId = 10; - byte[] deltaManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), - checkpointId); - CommitRequest commitRequest = new MockCommitRequest<>( - new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey, WRITE_RESULT, jobId, operatorId, checkpointId)); dynamicCommitter.commit(Sets.newHashSet(commitRequest)); CommitRequest oldCommitRequest = new MockCommitRequest<>( - new DynamicCommittable( - writeTarget, deltaManifest, jobId, operatorId, checkpointId - 1)); + new DynamicCommittable(tableKey, WRITE_RESULT, jobId, operatorId, checkpointId - 1)); // Old commits requests shouldn't affect the result dynamicCommitter.commit(Sets.newHashSet(oldCommitRequest)); @@ -314,45 +268,21 @@ void testReplacePartitions() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget = - new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2)); - - DynamicWriteResultAggregator aggregator = - new 
DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader()); - OneInputStreamOperatorTestHarness aggregatorHarness = - new OneInputStreamOperatorTestHarness(aggregator); - aggregatorHarness.open(); + TableKey tableKey = new TableKey(TABLE1, "branch"); final String jobId = JobID.generate().toHexString(); final String operatorId = new OperatorID().toHexString(); final int checkpointId = 10; - byte[] deltaManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), - checkpointId); - CommitRequest commitRequest = new MockCommitRequest<>( - new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey, WRITE_RESULT, jobId, operatorId, checkpointId)); dynamicCommitter.commit(Sets.newHashSet(commitRequest)); - byte[] overwriteManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), - checkpointId + 1); - CommitRequest overwriteRequest = new MockCommitRequest<>( - new DynamicCommittable( - writeTarget, overwriteManifest, jobId, operatorId, checkpointId + 1)); + new DynamicCommittable(tableKey, WRITE_RESULT, jobId, operatorId, checkpointId + 1)); dynamicCommitter.commit(Sets.newHashSet(overwriteRequest)); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index b61e297cc140..fe9d84263455 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -56,6 +56,8 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; @@ -74,6 +76,7 @@ import org.apache.iceberg.inmemory.InMemoryInputFile; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -424,6 +427,26 @@ void testPartitionSpecEvolution() throws Exception { new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec2)); runTest(rows); + + // Validate the table has expected partition specs + Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of(DATABASE, "t1")); + + Map tableSpecs = table.specs(); + List expectedSpecs = List.of(spec1, spec2, PartitionSpec.unpartitioned()); + + assertThat(tableSpecs).hasSize(expectedSpecs.size()); + expectedSpecs.forEach( + expectedSpec -> + assertThat( + tableSpecs.values().stream() + // TODO: Fix PartitionSpecEvolution#evolve to re-use PartitionField names of + // the target spec, + // which would allow us to compare specs with + // PartitionSpec#compatibleWith here. 
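Expanding on the TODO above, a hedged illustration of the distinction being worked around (assuming checkCompatibility compares source columns and transforms while ignoring the generated PartitionField names that PartitionSpecEvolution#evolve currently produces):

    PartitionSpec expected =
        PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build();
    PartitionSpec evolved =
        PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10, "id_bucket_renamed").build();
    // compatibleWith also compares partition field names, so the renamed field is rejected
    boolean byName = expected.compatibleWith(evolved);                                  // false
    boolean byStructure = PartitionSpecEvolution.checkCompatibility(expected, evolved); // expected: true
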
+ .anyMatch( + spec -> PartitionSpecEvolution.checkCompatibility(spec, expectedSpec))) + .withFailMessage("Table spec not found: %s.", expectedSpec) + .isTrue()); } @Test @@ -569,6 +592,71 @@ void testCommitConcurrency() throws Exception { executeDynamicSink(rows, env, true, 1, commitHook); } + @Test + void testProducesSingleSnapshotPerTableBranchAndCheckpoint() throws Exception { + String tableName = "t1"; + String branch = SnapshotRef.MAIN_BRANCH; + PartitionSpec spec1 = PartitionSpec.unpartitioned(); + PartitionSpec spec2 = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build(); + Set equalityFields = Sets.newHashSet("id"); + + List inputRecords = + Lists.newArrayList( + // Two schemas + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, tableName, branch, spec1), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA2, tableName, branch, spec1), + // Two specs + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, tableName, branch, spec1), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, tableName, branch, spec2), + // Some upserts + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, tableName, branch, spec1, true, equalityFields, false), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, tableName, branch, spec1, true, equalityFields, true)); + + executeDynamicSink(inputRecords, env, true, 1, null); + + List actualRecords; + try (CloseableIterable iterable = + IcebergGenerics.read( + CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1"))) + .build()) { + actualRecords = Lists.newArrayList(iterable); + } + + int expectedRecords = inputRecords.size() - 1; // 1 duplicate + assertThat(actualRecords).hasSize(expectedRecords); + + for (int i = 0; i < expectedRecords; i++) { + Record actual = actualRecords.get(0); + assertThat(inputRecords) + .anySatisfy( + inputRecord -> { + assertThat(actual.get(0)).isEqualTo(inputRecord.rowProvided.getField(0)); + assertThat(actual.get(1)).isEqualTo(inputRecord.rowProvided.getField(1)); + if (inputRecord.schemaProvided.equals(SimpleDataUtil.SCHEMA2)) { + assertThat(actual.get(2)).isEqualTo(inputRecord.rowProvided.getField(2)); + } + // There is an additional _pos field which gets added + }); + } + + TableIdentifier tableIdentifier = TableIdentifier.of("default", tableName); + Table table = CATALOG_EXTENSION.catalog().loadTable(tableIdentifier); + List snapshots = Lists.newArrayList(table.snapshots().iterator()); + + assertThat(snapshots).hasSize(2); // Table creation + data changes + assertThat(snapshots.get(1).summary()) + .containsAllEntriesOf( + ImmutableMap.builder() + .put("added-records", "6") + .put("changed-partition-count", "2") + .put("total-equality-deletes", "1") + .put("total-position-deletes", "1") + .put("total-records", "6") + .build()); + } + interface CommitHook extends Serializable { void beforeCommit(); @@ -764,7 +852,7 @@ static class CommitHookEnabledDynamicCommitter extends DynamicCommitter { @Override public void commit(Collection> commitRequests) - throws IOException, InterruptedException { + throws InterruptedException { commitHook.beforeCommit(); super.commit(commitRequests); commitHook.afterCommit(); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java index 713c67da170a..044391f8fca4 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java +++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java @@ -20,42 +20,33 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.List; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.hadoop.util.Sets; -import org.apache.iceberg.Schema; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.io.WriteResult; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; class TestDynamicWriteResultAggregator { - - @RegisterExtension - static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("db", "table"); - @Test - void testAggregator() throws Exception { - CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new Schema()); - CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table2"), new Schema()); - - DynamicWriteResultAggregator aggregator = - new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader()); + void testAggregatesWriteResultsForTwoTables() throws Exception { try (OneInputStreamOperatorTestHarness< CommittableMessage, CommittableMessage> - testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) { + testHarness = new OneInputStreamOperatorTestHarness<>(new DynamicWriteResultAggregator())) { testHarness.open(); - WriteTarget writeTarget1 = new WriteTarget("table", "branch", 42, 0, true, Sets.newHashSet()); + TableKey tableKey1 = new TableKey("table", "branch"); DynamicWriteResult dynamicWriteResult1 = - new DynamicWriteResult(writeTarget1, WriteResult.builder().build()); - WriteTarget writeTarget2 = - new WriteTarget("table2", "branch", 42, 0, true, Sets.newHashSet(1, 2)); + new DynamicWriteResult(tableKey1, WriteResult.builder().build()); + TableKey tableKey2 = new TableKey("table2", "branch"); DynamicWriteResult dynamicWriteResult2 = - new DynamicWriteResult(writeTarget2, WriteResult.builder().build()); + new DynamicWriteResult(tableKey2, WriteResult.builder().build()); CommittableWithLineage committable1 = new CommittableWithLineage<>(dynamicWriteResult1, 0, 0); @@ -79,4 +70,82 @@ void testAggregator() throws Exception { assertThat(testHarness.getRecordOutput()).hasSize(4); } } + + @Test + void testAggregatesWriteResultsForOneTable() throws Exception { + long checkpointId = 1L; + + try (OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + testHarness = new OneInputStreamOperatorTestHarness<>(new DynamicWriteResultAggregator())) { + testHarness.open(); + + TableKey tableKey = new TableKey("table", "branch"); + DataFile dataFile1 = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/data-1.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + DataFile dataFile2 = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/data-2.parquet") + .withFileSizeInBytes(20) + .withRecordCount(2) + .build(); + + 
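The assertions that follow rely on the aggregator folding every incoming WriteResult for the same TableKey into a single committable per checkpoint. A rough sketch of that folding, not the actual DynamicWriteResultAggregator implementation; it assumes DynamicWriteResult exposes key()/writeResult() accessors (as DynamicCommittable does later in this test) and that WriteResult.Builder#add merges data files, delete files, and referenced data files:

    private final java.util.Map<TableKey, WriteResult.Builder> pendingByKey =
        new java.util.HashMap<>();

    void collect(DynamicWriteResult incoming) {
      // one builder per table/branch key; repeated results for the same key are merged
      pendingByKey
          .computeIfAbsent(incoming.key(), ignored -> WriteResult.builder())
          .add(incoming.writeResult());
    }

    // At the checkpoint barrier each entry becomes exactly one DynamicCommittable, which is
    // why dataFile1 and dataFile2 above are expected to end up in a single merged WriteResult.
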
testHarness.processElement(createRecord(tableKey, checkpointId, dataFile1)); + testHarness.processElement(createRecord(tableKey, checkpointId, dataFile2)); + + assertThat(testHarness.getOutput()).isEmpty(); + + testHarness.prepareSnapshotPreBarrier(checkpointId); + + List> outputValues = testHarness.extractOutputValues(); + // Contains a CommittableSummary + DynamicCommittable + assertThat(outputValues).hasSize(2); + + SinkV2Assertions.assertThat(extractAndAssertCommittableSummary(outputValues.get(0))) + .hasOverallCommittables(1) + .hasFailedCommittables(0) + .hasCheckpointId(checkpointId); + + CommittableWithLineage committable = + extractAndAssertCommittableWithLineage(outputValues.get(1)); + + SinkV2Assertions.assertThat(committable).hasCheckpointId(checkpointId); + + DynamicCommittable dynamicCommittable = committable.getCommittable(); + + assertThat(dynamicCommittable.writeResult()) + .isEqualTo(WriteResult.builder().addDataFiles(dataFile1, dataFile2).build()); + assertThat(dynamicCommittable.key()).isEqualTo(tableKey); + assertThat(dynamicCommittable.checkpointId()).isEqualTo(checkpointId); + assertThat(dynamicCommittable.jobId()) + .isEqualTo(testHarness.getEnvironment().getJobID().toString()); + assertThat(dynamicCommittable.operatorId()) + .isEqualTo(testHarness.getOperator().getOperatorID().toString()); + } + } + + private static StreamRecord> createRecord( + TableKey tableKey, long checkpointId, DataFile... dataFiles) { + return new StreamRecord<>( + new CommittableWithLineage<>( + new DynamicWriteResult(tableKey, WriteResult.builder().addDataFiles(dataFiles).build()), + checkpointId, + 0)); + } + + static CommittableSummary extractAndAssertCommittableSummary( + CommittableMessage message) { + assertThat(message).isInstanceOf(CommittableSummary.class); + return (CommittableSummary) message; + } + + static CommittableWithLineage extractAndAssertCommittableWithLineage( + CommittableMessage message) { + assertThat(message).isInstanceOf(CommittableWithLineage.class); + return (CommittableWithLineage) message; + } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java index a3a9691107eb..49647c7f30b3 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.hadoop.util.Sets; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.Metrics; @@ -48,13 +47,12 @@ class TestDynamicWriteResultSerializer { ImmutableMap.of(1, ByteBuffer.allocate(1)), ImmutableMap.of(1, ByteBuffer.allocate(1)))) .build(); + private static final TableKey TABLE_KEY = new TableKey("table", "branch"); @Test void testRoundtrip() throws IOException { DynamicWriteResult dynamicWriteResult = - new DynamicWriteResult( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - WriteResult.builder().addDataFiles(DATA_FILE).build()); + new DynamicWriteResult(TABLE_KEY, WriteResult.builder().addDataFiles(DATA_FILE).build()); DynamicWriteResultSerializer serializer = new DynamicWriteResultSerializer(); DynamicWriteResult copy = @@ -68,11 +66,9 @@ void testRoundtrip() throws IOException { } @Test - void testUnsupportedVersion() 
throws IOException { + void testUnsupportedVersion() { DynamicWriteResult dynamicWriteResult = - new DynamicWriteResult( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - WriteResult.builder().addDataFiles(DATA_FILE).build()); + new DynamicWriteResult(TABLE_KEY, WriteResult.builder().addDataFiles(DATA_FILE).build()); DynamicWriteResultSerializer serializer = new DynamicWriteResultSerializer(); assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(dynamicWriteResult)))