diff --git a/core/src/main/java/org/apache/iceberg/io/WriteResult.java b/core/src/main/java/org/apache/iceberg/io/WriteResult.java index 39efaec7d34a..2ab2fff3c5ae 100644 --- a/core/src/main/java/org/apache/iceberg/io/WriteResult.java +++ b/core/src/main/java/org/apache/iceberg/io/WriteResult.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.CharSequenceSet; @@ -134,4 +135,14 @@ public WriteResult build() { return new WriteResult(dataFiles, deleteFiles, referencedDataFiles, rewrittenDeleteFiles); } } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("dataFiles", dataFiles) + .add("deleteFiles", deleteFiles) + .add("referencedDataFiles", referencedDataFiles) + .add("rewrittenDeleteFiles", rewrittenDeleteFiles) + .toString(); + } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java index 1b786e46452f..523a76bf7ac1 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java @@ -19,7 +19,6 @@ package org.apache.iceberg.flink.sink; import java.util.Arrays; -import java.util.List; import java.util.NavigableMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.flink.annotation.Internal; @@ -37,17 +36,11 @@ public class CommitSummary { private final AtomicLong deleteFilesRecordCount = new AtomicLong(); private final AtomicLong deleteFilesByteCount = new AtomicLong(); - public CommitSummary() {} - public CommitSummary(NavigableMap<Long, WriteResult> pendingResults) { pendingResults.values().forEach(this::addWriteResult); } - public void addAll(NavigableMap<Long, List<WriteResult>> pendingResults) { - pendingResults.values().forEach(writeResults -> writeResults.forEach(this::addWriteResult)); - } - - private void addWriteResult(WriteResult writeResult) { + public void addWriteResult(WriteResult writeResult) { dataFilesCount.addAndGet(writeResult.dataFiles().length); Arrays.stream(writeResult.dataFiles()) .forEach( diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java index 33edefe71eb0..4f0b68573ff5 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java @@ -26,7 +26,7 @@ /** * The aggregated results of a single checkpoint which should be committed. Containing the - * serialized {@link DeltaManifests} file - which contains the commit data, and the jobId, + * serialized {@link DeltaManifests} files - which contain the commit data, and the jobId, * operatorId, checkpointId triplet to identify the specific commit. * *
<p>
{@link DynamicCommittableSerializer} is used to serialize {@link DynamicCommittable} between @@ -34,27 +34,27 @@ */ class DynamicCommittable implements Serializable { - private final WriteTarget key; - private final byte[] manifest; + private final TableKey key; + private final byte[][] manifests; private final String jobId; private final String operatorId; private final long checkpointId; DynamicCommittable( - WriteTarget key, byte[] manifest, String jobId, String operatorId, long checkpointId) { + TableKey key, byte[][] manifests, String jobId, String operatorId, long checkpointId) { this.key = key; - this.manifest = manifest; + this.manifests = manifests; this.jobId = jobId; this.operatorId = operatorId; this.checkpointId = checkpointId; } - WriteTarget key() { + TableKey key() { return key; } - byte[] manifest() { - return manifest; + byte[][] manifests() { + return manifests; } String jobId() { @@ -78,14 +78,14 @@ public boolean equals(Object o) { DynamicCommittable that = (DynamicCommittable) o; return checkpointId == that.checkpointId && Objects.equals(key, that.key) - && Objects.deepEquals(manifest, that.manifest) + && Arrays.deepEquals(manifests, that.manifests) && Objects.equals(jobId, that.jobId) && Objects.equals(operatorId, that.operatorId); } @Override public int hashCode() { - return Objects.hash(key, Arrays.hashCode(manifest), jobId, operatorId, checkpointId); + return Objects.hash(key, Arrays.deepHashCode(manifests), jobId, operatorId, checkpointId); } @Override @@ -97,8 +97,4 @@ public String toString() { .add("operatorId", operatorId) .toString(); } - - public WriteTarget writeTarget() { - return key; - } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java index 4aadcf1f3620..b4d0ae7861b5 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java @@ -31,11 +31,12 @@ */ class DynamicCommittableSerializer implements SimpleVersionedSerializer { - private static final int VERSION = 1; + private static final int VERSION_1 = 1; + private static final int VERSION_2 = 2; @Override public int getVersion() { - return VERSION; + return VERSION_2; } @Override @@ -46,26 +47,59 @@ public byte[] serialize(DynamicCommittable committable) throws IOException { view.writeUTF(committable.jobId()); view.writeUTF(committable.operatorId()); view.writeLong(committable.checkpointId()); - view.writeInt(committable.manifest().length); - view.write(committable.manifest()); + + view.writeInt(committable.manifests().length); + for (int i = 0; i < committable.manifests().length; i++) { + view.writeInt(committable.manifests()[i].length); + view.write(committable.manifests()[i]); + } + return out.toByteArray(); } @Override public DynamicCommittable deserialize(int version, byte[] serialized) throws IOException { - if (version == 1) { - DataInputDeserializer view = new DataInputDeserializer(serialized); - WriteTarget key = WriteTarget.deserializeFrom(view); - String jobId = view.readUTF(); - String operatorId = view.readUTF(); - long checkpointId = view.readLong(); - int manifestLen = view.readInt(); - byte[] manifestBuf; - manifestBuf = new byte[manifestLen]; - view.read(manifestBuf); - return new DynamicCommittable(key, manifestBuf, jobId, operatorId, checkpointId); + if 
(version == VERSION_1) { + return deserializeV1(serialized); + } else if (version == VERSION_2) { + return deserializeV2(serialized); } throw new IOException("Unrecognized version or corrupt state: " + version); } + + private DynamicCommittable deserializeV1(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + WriteTarget key = WriteTarget.deserializeFrom(view); + String jobId = view.readUTF(); + String operatorId = view.readUTF(); + long checkpointId = view.readLong(); + int manifestLen = view.readInt(); + byte[] manifestBuf; + manifestBuf = new byte[manifestLen]; + view.read(manifestBuf); + return new DynamicCommittable( + new TableKey(key.tableName(), key.branch()), + new byte[][] {manifestBuf}, + jobId, + operatorId, + checkpointId); + } + + private DynamicCommittable deserializeV2(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + TableKey key = TableKey.deserializeFrom(view); + String jobId = view.readUTF(); + String operatorId = view.readUTF(); + long checkpointId = view.readLong(); + + byte[][] manifestsBuf = new byte[view.readInt()][]; + for (int i = 0; i < manifestsBuf.length; i++) { + byte[] manifest = new byte[view.readInt()]; + view.read(manifest); + manifestsBuf[i] = manifest; + } + + return new DynamicCommittable(key, manifestsBuf, jobId, operatorId, checkpointId); + } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index 54d506b66328..fbd53f6366b5 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -19,13 +19,11 @@ package org.apache.iceberg.flink.sink.dynamic; import java.io.IOException; -import java.io.Serializable; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.flink.annotation.Internal; @@ -46,7 +44,6 @@ import org.apache.iceberg.flink.sink.FlinkManifestUtil; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -72,13 +69,7 @@ class DynamicCommitter implements Committer { private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id"; private static final Logger LOG = LoggerFactory.getLogger(DynamicCommitter.class); - private static final byte[] EMPTY_MANIFEST_DATA = new byte[0]; - private static final WriteResult EMPTY_WRITE_RESULT = - WriteResult.builder() - .addDataFiles(Lists.newArrayList()) - .addDeleteFiles(Lists.newArrayList()) - .build(); - + private static final byte[][] EMPTY_MANIFEST_DATA = new byte[0][]; private static final long INITIAL_CHECKPOINT_ID = -1L; @VisibleForTesting @@ -119,6 +110,11 @@ public void commit(Collection> commitRequests) return; } + /* + TODO: Replace List> with a single CommitRequest + per TableKey in the next major release 
(once the WriteResult aggregation fix in the upstream + DynamicWriteResultAggregator becomes public) + */ // For every table and every checkpoint, we store the list of to-be-committed // DynamicCommittable. // There may be DynamicCommittable from previous checkpoints which have not been committed yet. @@ -141,6 +137,7 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) getMaxCommittedCheckpointId( table, last.jobId(), last.operatorId(), entry.getKey().branch()); // Mark the already committed FilesCommittable(s) as finished + // TODO: Add more logging for visibility on skipped commit requests entry .getValue() .headMap(maxCommittedCheckpointId, true) @@ -155,6 +152,7 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) } } + /** TODO: Reuse {@link org.apache.iceberg.flink.sink.SinkUtil#getMaxCommittedCheckpointId} * */ private static long getMaxCommittedCheckpointId( Table table, String flinkJobId, String operatorId, String branch) { Snapshot snapshot = table.snapshot(branch); @@ -200,27 +198,27 @@ private void commitPendingRequests( throws IOException { long checkpointId = commitRequestMap.lastKey(); List<ManifestFile> manifests = Lists.newArrayList(); - NavigableMap<Long, List<WriteResult>> pendingResults = Maps.newTreeMap(); + NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap(); for (Map.Entry<Long, List<CommitRequest<DynamicCommittable>>> e : commitRequestMap.entrySet()) { + WriteResult.Builder builder = WriteResult.builder(); for (CommitRequest<DynamicCommittable> committable : e.getValue()) { - if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifest())) { - pendingResults - .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList()) - .add(EMPTY_WRITE_RESULT); - } else { - DeltaManifests deltaManifests = - SimpleVersionedSerialization.readVersionAndDeSerialize( - DeltaManifestsSerializer.INSTANCE, committable.getCommittable().manifest()); - pendingResults - .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList()) - .add(FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs())); - manifests.addAll(deltaManifests.manifests()); + if (!Arrays.deepEquals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifests())) { + for (byte[] manifest : committable.getCommittable().manifests()) { + DeltaManifests deltaManifests = + SimpleVersionedSerialization.readVersionAndDeSerialize( + DeltaManifestsSerializer.INSTANCE, manifest); + builder.add( + FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs())); + manifests.addAll(deltaManifests.manifests()); + } } + + pendingResults.put(e.getKey(), builder.build()); } } - CommitSummary summary = new CommitSummary(); - summary.addAll(pendingResults); + // TODO: Fix aggregated commit summary logged multiple times per checkpoint commit + CommitSummary summary = new CommitSummary(pendingResults); commitPendingResult(table, branch, pendingResults, summary, newFlinkJobId, operatorId); if (committerMetrics != null) { committerMetrics.updateCommitSummary(table.name(), summary); @@ -232,7 +230,7 @@ private void commitPendingRequests( private void commitPendingResult( Table table, String branch, - NavigableMap<Long, List<WriteResult>> pendingResults, + NavigableMap<Long, WriteResult> pendingResults, CommitSummary summary, String newFlinkJobId, String operatorId) { @@ -270,7 +268,7 @@ private void commitPendingResult( private void replacePartitions( Table table, String branch, - NavigableMap<Long, List<WriteResult>> pendingResults, + NavigableMap<Long, WriteResult> pendingResults, CommitSummary summary, String newFlinkJobId, String operatorId) { @@ -278,10 +276,8 @@ private void replacePartitions( // Hence, we commit everything in one snapshot.
ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool); - for (List writeResults : pendingResults.values()) { - for (WriteResult result : writeResults) { - Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile); - } + for (WriteResult writeResult : pendingResults.values()) { + Arrays.stream(writeResult.dataFiles()).forEach(dynamicOverwrite::addFile); } commitOperation( @@ -298,34 +294,31 @@ private void replacePartitions( private void commitDeltaTxn( Table table, String branch, - NavigableMap> pendingResults, + NavigableMap pendingResults, CommitSummary summary, String newFlinkJobId, String operatorId) { - for (Map.Entry> e : pendingResults.entrySet()) { - long checkpointId = e.getKey(); - List writeResults = e.getValue(); - + for (Map.Entry e : pendingResults.entrySet()) { + // We don't commit the merged result into a single transaction because for the sequential + // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied + // to data files from txn1. Committing the merged one will lead to the incorrect delete + // semantic. + WriteResult result = e.getValue(); + + // Row delta validations are not needed for streaming changes that write equality deletes. + // Equality deletes are applied to data in all previous sequence numbers, so retries may + // push deletes further in the future, but do not affect correctness. Position deletes + // committed to the table in this path are used only to delete rows from data files that are + // being added in this commit. There is no way for data files added along with the delete + // files to be concurrently removed, so there is no need to validate the files referenced by + // the position delete files that are being committed. RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); - for (WriteResult result : writeResults) { - // Row delta validations are not needed for streaming changes that write equality deletes. - // Equality deletes are applied to data in all previous sequence numbers, so retries may - // push deletes further in the future, but do not affect correctness. Position deletes - // committed to the table in this path are used only to delete rows from data files that are - // being added in this commit. There is no way for data files added along with the delete - // files to be concurrently removed, so there is no need to validate the files referenced by - // the position delete files that are being committed. - Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); - Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); - } - // Every Flink checkpoint contains a set of independent changes which can be committed - // together. While it is technically feasible to combine append-only data across checkpoints, - // for the sake of simplicity, we do not implement this (premature) optimization. Multiple - // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint - // intervals or when concurrent checkpointing is enabled. 
+ Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); + Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); + commitOperation( - table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId); + table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey()); } } @@ -374,54 +367,4 @@ void commitOperation( public void close() throws IOException { workerPool.shutdown(); } - - private static class TableKey implements Serializable { - private String tableName; - private String branch; - - TableKey(String tableName, String branch) { - this.tableName = tableName; - this.branch = branch; - } - - TableKey(DynamicCommittable committable) { - this.tableName = committable.key().tableName(); - this.branch = committable.key().branch(); - } - - String tableName() { - return tableName; - } - - String branch() { - return branch; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other == null || getClass() != other.getClass()) { - return false; - } - - TableKey that = (TableKey) other; - return tableName.equals(that.tableName) && branch.equals(that.branch); - } - - @Override - public int hashCode() { - return Objects.hash(tableName, branch); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("tableName", tableName) - .add("branch", branch) - .toString(); - } - } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java index 85806f932ad5..855859e64def 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java @@ -19,22 +19,30 @@ package org.apache.iceberg.flink.sink.dynamic; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; class DynamicWriteResult { - - private final WriteTarget key; + private final TableKey key; private final WriteResult writeResult; - DynamicWriteResult(WriteTarget key, WriteResult writeResult) { + DynamicWriteResult(TableKey key, WriteResult writeResult) { this.key = key; this.writeResult = writeResult; } - WriteTarget key() { + TableKey key() { return key; } WriteResult writeResult() { return writeResult; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("key", key) + .add("writeResult", writeResult) + .toString(); + } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java index 77bd2a0f975d..6168d33df1f0 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java @@ -40,6 +40,7 @@ import org.apache.iceberg.flink.sink.FlinkManifestUtil; import org.apache.iceberg.flink.sink.ManifestOutputFileFactory; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; @@ -55,11 +56,11 @@ class 
DynamicWriteResultAggregator implements OneInputStreamOperator< CommittableMessage<DynamicWriteResult>, CommittableMessage<DynamicCommittable>> { private static final Logger LOG = LoggerFactory.getLogger(DynamicWriteResultAggregator.class); - private static final byte[] EMPTY_MANIFEST_DATA = new byte[0]; + private static final byte[][] EMPTY_MANIFEST_DATA = new byte[0][]; private final CatalogLoader catalogLoader; private final int cacheMaximumSize; - private transient Map> results; + private transient Map> results; private transient Map> specs; private transient Map outputFileFactories; private transient String flinkJobId; @@ -95,12 +96,12 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException { Collection<CommittableMessage<DynamicCommittable>> committables = Sets.newHashSetWithExpectedSize(results.size()); int count = 0; - for (Map.Entry> entries : results.entrySet()) { + for (Map.Entry> entries : results.entrySet()) { committables.add( new CommittableWithLineage<>( new DynamicCommittable( entries.getKey(), - writeToManifest(entries.getKey(), entries.getValue(), checkpointId), + writeToManifests(entries.getKey().tableName(), entries.getValue(), checkpointId), getContainingTask().getEnvironment().getJobID().toString(), getRuntimeContext().getOperatorUniqueID(), checkpointId), @@ -122,26 +123,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException { } /** - * Write all the completed data files to a newly created manifest file and return the manifest's + * Write all the completed data files to newly created manifest files and return the manifests' * avro serialized bytes. */ @VisibleForTesting - byte[] writeToManifest( - WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId) + byte[][] writeToManifests( + String tableName, Collection<WriteResult> writeResults, long checkpointId) throws IOException { if (writeResults.isEmpty()) { return EMPTY_MANIFEST_DATA; } + Map<Integer, List<WriteResult>> writeResultsByPartitionSpec = + collectBySpecId(writeResults); + byte[][] deltaManifests = new byte[writeResultsByPartitionSpec.size()][]; + int idx = 0; + for (Map.Entry<Integer, List<WriteResult>> entry : + writeResultsByPartitionSpec.entrySet()) { + deltaManifests[idx] = + writeToManifest(tableName, entry.getKey(), entry.getValue(), checkpointId); + idx++; + } + return deltaManifests; + } + + private byte[] writeToManifest( + String tableName, Integer specId, Collection<WriteResult> writeResults, long checkpointId) + throws IOException { WriteResult.Builder builder = WriteResult.builder(); - writeResults.forEach(w -> builder.add(w.writeResult())); + writeResults.forEach(builder::add); WriteResult result = builder.build(); DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles( result, - () -> outputFileFactory(key.tableName()).create(checkpointId), - spec(key.tableName(), key.specId()), + () -> outputFileFactory(tableName).create(checkpointId), + spec(tableName, specId), 2); return SimpleVersionedSerialization.writeVersionAndSerialize( @@ -155,8 +172,10 @@ public void processElement(StreamRecord<CommittableMessage<DynamicWriteResult>> if (element.isRecord() && element.getValue() instanceof CommittableWithLineage) { DynamicWriteResult result = ((CommittableWithLineage<DynamicWriteResult>) element.getValue()).getCommittable(); - WriteTarget key = result.key(); - results.computeIfAbsent(key, unused -> Sets.newHashSet()).add(result); + Collection<WriteResult> resultsPerTableKey = + results.computeIfAbsent(result.key(), unused -> Lists.newArrayList()); + resultsPerTableKey.add(result.writeResult()); + LOG.debug("Added {}, totalResults={}", result, resultsPerTableKey.size()); } } @@ -192,4 +211,25 @@ private PartitionSpec spec(String tableName, int specId) {
Table table = catalog.loadTable(TableIdentifier.parse(tableName)); return table.specs().get(specId); } + + private static Map> collectBySpecId( + Collection writeResults) { + Map> resultsBySpecId = Maps.newHashMap(); + writeResults.forEach( + writeResult -> { + int specId; + if (writeResult.dataFiles().length > 0) { + specId = writeResult.dataFiles()[0].specId(); + } else if (writeResult.deleteFiles().length > 0) { + specId = writeResult.deleteFiles()[0].specId(); + } else if (writeResult.rewrittenDeleteFiles().length > 0) { + specId = writeResult.rewrittenDeleteFiles()[0].specId(); + } else { + // Empty write result + return; + } + resultsBySpecId.computeIfAbsent(specId, ignored -> Lists.newArrayList()).add(writeResult); + }); + return resultsBySpecId; + } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java index cf5f423fd7ff..60765673004a 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java @@ -50,7 +50,7 @@ public byte[] serialize(DynamicWriteResult writeResult) throws IOException { public DynamicWriteResult deserialize(int version, byte[] serialized) throws IOException { if (version == 1) { DataInputDeserializer view = new DataInputDeserializer(serialized); - WriteTarget key = WriteTarget.deserializeFrom(view); + TableKey key = TableKey.deserializeFrom(view); byte[] resultBuf = new byte[view.available()]; view.read(resultBuf); WriteResult writeResult = WRITE_RESULT_SERIALIZER.deserialize(version, resultBuf); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java index 5ed9da8623e9..a31df810a2ee 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java @@ -193,7 +193,8 @@ public Collection prepareCommit() throws IOException { writeResult.dataFiles().length, writeResult.deleteFiles().length); - result.add(new DynamicWriteResult(writeTarget, writeResult)); + TableKey tableKey = new TableKey(writeTarget.tableName(), writeTarget.branch()); + result.add(new DynamicWriteResult(tableKey, writeResult)); } writers.clear(); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java new file mode 100644 index 000000000000..fbb61fba56fb --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Objects; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +class TableKey implements Serializable { + private final String tableName; + private final String branch; + + TableKey(String tableName, String branch) { + this.tableName = tableName; + this.branch = branch; + } + + TableKey(DynamicCommittable committable) { + this.tableName = committable.key().tableName(); + this.branch = committable.key().branch(); + } + + String tableName() { + return tableName; + } + + String branch() { + return branch; + } + + void serializeTo(DataOutputView view) throws IOException { + view.writeUTF(tableName); + view.writeUTF(branch); + } + + static TableKey deserializeFrom(DataInputView view) throws IOException { + return new TableKey(view.readUTF(), view.readUTF()); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + TableKey that = (TableKey) other; + return tableName.equals(that.tableName) && branch.equals(that.branch); + } + + @Override + public int hashCode() { + return Objects.hash(tableName, branch); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("tableName", tableName) + .add("branch", branch) + .toString(); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java index 13a06d362717..785118aa0674 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java @@ -24,38 +24,28 @@ import java.io.IOException; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.jupiter.api.Test; class TestDynamicCommittableSerializer { + private static final DynamicCommittable COMMITTABLE = + new DynamicCommittable( + new TableKey("table", "branch"), + new byte[][] {{3, 4}}, + JobID.generate().toHexString(), + new OperatorID().toHexString(), + 5); @Test void testRoundtrip() throws IOException { - DynamicCommittable committable = - new DynamicCommittable( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - new byte[] {3, 4}, - JobID.generate().toHexString(), - new OperatorID().toHexString(), - 5); - DynamicCommittableSerializer serializer = new DynamicCommittableSerializer(); - assertThat(serializer.deserialize(serializer.getVersion(), serializer.serialize(committable))) - .isEqualTo(committable); + assertThat(serializer.deserialize(serializer.getVersion(), 
serializer.serialize(COMMITTABLE))) + .isEqualTo(COMMITTABLE); } @Test - void testUnsupportedVersion() throws IOException { - DynamicCommittable committable = - new DynamicCommittable( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - new byte[] {3, 4}, - JobID.generate().toHexString(), - new OperatorID().toHexString(), - 5); - + void testUnsupportedVersion() { DynamicCommittableSerializer serializer = new DynamicCommittableSerializer(); - assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(committable))) + assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(COMMITTABLE))) .hasMessage("Unrecognized version or corrupt state: -1") .isInstanceOf(IOException.class); } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java index 7894428a781f..7ec18a7880bc 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java @@ -50,6 +50,7 @@ import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.assertj.core.api.ThrowableAssert.ThrowingCallable; @@ -85,6 +86,8 @@ class TestDynamicCommitter { ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds )) .build(); + private static final WriteResult WRITE_RESULT = + WriteResult.builder().addDataFiles(DATA_FILE).build(); private static final DataFile DATA_FILE_2 = DataFiles.builder(PartitionSpec.unpartitioned()) @@ -149,12 +152,9 @@ void testCommit() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget1 = - new WriteTarget(TABLE1, "branch", 42, 0, true, Sets.newHashSet(1, 2)); - WriteTarget writeTarget2 = - new WriteTarget(TABLE1, "branch2", 43, 0, true, Sets.newHashSet(1, 2)); - WriteTarget writeTarget3 = - new WriteTarget(TABLE2, "branch2", 43, 0, true, Sets.newHashSet(1, 2)); + TableKey tableKey1 = new TableKey(TABLE1, "branch"); + TableKey tableKey2 = new TableKey(TABLE1, "branch2"); + TableKey tableKey3 = new TableKey(TABLE2, "branch2"); DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); @@ -162,27 +162,12 @@ void testCommit() throws Exception { new OneInputStreamOperatorTestHarness(aggregator); aggregatorHarness.open(); - byte[] deltaManifest1 = - aggregator.writeToManifest( - writeTarget1, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget1, WriteResult.builder().addDataFiles(DATA_FILE).build())), - 0); - byte[] deltaManifest2 = - aggregator.writeToManifest( - writeTarget2, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget2, WriteResult.builder().addDataFiles(DATA_FILE).build())), - 0); - byte[] deltaManifest3 = - aggregator.writeToManifest( - writeTarget3, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget3, WriteResult.builder().addDataFiles(DATA_FILE).build())), - 0); + byte[][] deltaManifests1 = + aggregator.writeToManifests(tableKey1.tableName(), Lists.newArrayList(WRITE_RESULT), 0); + byte[][] deltaManifests2 = + 
aggregator.writeToManifests(tableKey2.tableName(), Lists.newArrayList(WRITE_RESULT), 0); + byte[][] deltaManifests3 = + aggregator.writeToManifests(tableKey3.tableName(), Lists.newArrayList(WRITE_RESULT), 0); final String jobId = JobID.generate().toHexString(); final String operatorId = new OperatorID().toHexString(); @@ -190,15 +175,15 @@ void testCommit() throws Exception { CommitRequest commitRequest1 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey1, deltaManifests1, jobId, operatorId, checkpointId)); CommitRequest commitRequest2 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey2, deltaManifests2, jobId, operatorId, checkpointId)); CommitRequest commitRequest3 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey3, deltaManifests3, jobId, operatorId, checkpointId)); dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2, commitRequest3)); @@ -261,7 +246,7 @@ void testCommit() throws Exception { } @Test - void testAlreadyCommitted() throws Exception { + void testSkipsCommitRequestsForPreviousCheckpoints() throws Exception { Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1)); assertThat(table1.snapshots()).isEmpty(); @@ -279,8 +264,7 @@ void testAlreadyCommitted() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget = - new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2)); + TableKey tableKey = new TableKey(TABLE1, "branch"); DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); @@ -292,24 +276,18 @@ void testAlreadyCommitted() throws Exception { final String operatorId = new OperatorID().toHexString(); final int checkpointId = 10; - byte[] deltaManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), - checkpointId); + byte[][] deltaManifests = + aggregator.writeToManifests(tableKey.tableName(), Lists.newArrayList(WRITE_RESULT), 0); CommitRequest commitRequest = new MockCommitRequest<>( - new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey, deltaManifests, jobId, operatorId, checkpointId)); dynamicCommitter.commit(Sets.newHashSet(commitRequest)); CommitRequest oldCommitRequest = new MockCommitRequest<>( - new DynamicCommittable( - writeTarget, deltaManifest, jobId, operatorId, checkpointId - 1)); + new DynamicCommittable(tableKey, deltaManifests, jobId, operatorId, checkpointId - 1)); // Old commits requests shouldn't affect the result dynamicCommitter.commit(Sets.newHashSet(oldCommitRequest)); @@ -346,12 +324,8 @@ void testTableBranchAtomicCommitForAppendOnlyData() throws Exception { new OneInputStreamOperatorTestHarness(aggregator); aggregatorHarness.open(); - WriteTarget writeTarget1 = - new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2)); - // writeTarget2 has a different schema - WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true, Sets.newHashSet()); - // Different branch for writeTarget3 - WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch2", 23, 0, true, Sets.newHashSet()); + TableKey tableKey1 = new TableKey(TABLE1, 
"branch1"); + TableKey tableKey2 = new TableKey(TABLE1, "branch2"); WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build(); WriteResult writeResult2 = WriteResult.builder().addDataFiles(DATA_FILE_2).build(); @@ -361,35 +335,29 @@ void testTableBranchAtomicCommitForAppendOnlyData() throws Exception { final int checkpointId1 = 1; final int checkpointId2 = 2; - byte[] deltaManifest1 = - aggregator.writeToManifest( - writeTarget1, - Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)), - checkpointId1); + byte[][] deltaManifests1 = + aggregator.writeToManifests( + tableKey1.tableName(), Sets.newHashSet(writeResult1), checkpointId1); CommitRequest commitRequest1 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId1)); + new DynamicCommittable(tableKey1, deltaManifests1, jobId, operatorId, checkpointId1)); - byte[] deltaManifest2 = - aggregator.writeToManifest( - writeTarget2, - Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)), - checkpointId1); + byte[][] deltaManifests2 = + aggregator.writeToManifests( + tableKey1.tableName(), Sets.newHashSet(writeResult2), checkpointId1); CommitRequest commitRequest2 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId1)); + new DynamicCommittable(tableKey1, deltaManifests2, jobId, operatorId, checkpointId1)); - byte[] deltaManifest3 = - aggregator.writeToManifest( - writeTarget3, - Sets.newHashSet(new DynamicWriteResult(writeTarget3, writeResult2)), - checkpointId2); + byte[][] deltaManifests3 = + aggregator.writeToManifests( + tableKey2.tableName(), Sets.newHashSet(writeResult2), checkpointId2); CommitRequest commitRequest3 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId2)); + new DynamicCommittable(tableKey2, deltaManifests3, jobId, operatorId, checkpointId2)); boolean overwriteMode = false; int workerPoolSize = 1; @@ -461,49 +429,35 @@ void testTableBranchAtomicCommitWithFailures() throws Exception { new OneInputStreamOperatorTestHarness(aggregator); aggregatorHarness.open(); - WriteTarget writeTarget1 = new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet()); - // writeTarget2 has a different schema - WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch", 23, 0, false, Sets.newHashSet()); - WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch", 23, 0, false, Sets.newHashSet()); + TableKey tableKey = new TableKey(TABLE1, "branch"); - WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build(); WriteResult writeResult2 = WriteResult.builder().addDeleteFiles(DELETE_FILE).build(); - WriteResult writeResult3 = WriteResult.builder().addDataFiles(DATA_FILE).build(); + WriteResult writeResult3 = WriteResult.builder().addDataFiles(DATA_FILE_2).build(); + + byte[][] deltaManifests1 = + aggregator.writeToManifests(tableKey.tableName(), Lists.newArrayList(WRITE_RESULT), 0); + byte[][] deltaManifests2 = + aggregator.writeToManifests(tableKey.tableName(), Lists.newArrayList(writeResult2), 0); + byte[][] deltaManifests3 = + aggregator.writeToManifests(tableKey.tableName(), Lists.newArrayList(writeResult3), 0); final String jobId = JobID.generate().toHexString(); final String operatorId = new OperatorID().toHexString(); final int checkpointId1 = 1; final int checkpointId2 = 2; - - byte[] deltaManifest1 = - aggregator.writeToManifest( - writeTarget1, - Sets.newHashSet(new 
DynamicWriteResult(writeTarget1, writeResult1)), - checkpointId1); + final int checkpointId3 = 3; CommitRequest commitRequest1 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId1)); - - byte[] deltaManifest2 = - aggregator.writeToManifest( - writeTarget2, - Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)), - checkpointId2); + new DynamicCommittable(tableKey, deltaManifests1, jobId, operatorId, checkpointId1)); CommitRequest commitRequest2 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId2)); - - byte[] deltaManifest3 = - aggregator.writeToManifest( - writeTarget3, - Sets.newHashSet(new DynamicWriteResult(writeTarget3, writeResult3)), - checkpointId2); + new DynamicCommittable(tableKey, deltaManifests2, jobId, operatorId, checkpointId2)); CommitRequest commitRequest3 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId2)); + new DynamicCommittable(tableKey, deltaManifests3, jobId, operatorId, checkpointId3)); boolean overwriteMode = false; int workerPoolSize = 1; @@ -549,10 +503,7 @@ void testTableBranchAtomicCommitWithFailures() throws Exception { } table.refresh(); - // Three committables, but only two snapshots! WriteResults from different checkpoints are not - // getting - // combined due to one writeResult2 containing a delete file. - assertThat(table.snapshots()).hasSize(2); + assertThat(table.snapshots()).hasSize(3); Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null); assertThat(snapshot1.summary()) @@ -576,18 +527,34 @@ void testTableBranchAtomicCommitWithFailures() throws Exception { assertThat(snapshot2.summary()) .containsAllEntriesOf( ImmutableMap.builder() - .put("added-data-files", "1") - .put("added-records", "42") .put("changed-partition-count", "1") .put("flink.job-id", jobId) .put("flink.max-committed-checkpoint-id", "" + checkpointId2) .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "1") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "24") + .put("total-records", "42") + .build()); + + Snapshot snapshot3 = Iterables.get(table.snapshots(), 2); + assertThat(snapshot3.summary()) + .containsAllEntriesOf( + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "24") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId3) + .put("flink.operator-id", operatorId) .put("total-data-files", "2") .put("total-delete-files", "1") .put("total-equality-deletes", "0") .put("total-files-size", "0") .put("total-position-deletes", "24") - .put("total-records", "84") + .put("total-records", "66") .build()); } @@ -611,8 +578,7 @@ void testReplacePartitions() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget = - new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2)); + TableKey tableKey = new TableKey(TABLE1, "branch"); DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); @@ -624,32 +590,21 @@ void testReplacePartitions() throws Exception { final String operatorId = new OperatorID().toHexString(); final int checkpointId = 10; - byte[] deltaManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, 
WriteResult.builder().addDataFiles(DATA_FILE).build())), - checkpointId); + byte[][] deltaManifests = + aggregator.writeToManifests(tableKey.tableName(), Lists.newArrayList(WRITE_RESULT), 0); + byte[][] overwriteManifests = + aggregator.writeToManifests(tableKey.tableName(), Lists.newArrayList(WRITE_RESULT), 0); CommitRequest commitRequest = new MockCommitRequest<>( - new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey, deltaManifests, jobId, operatorId, checkpointId)); dynamicCommitter.commit(Sets.newHashSet(commitRequest)); - byte[] overwriteManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), - checkpointId + 1); - CommitRequest overwriteRequest = new MockCommitRequest<>( new DynamicCommittable( - writeTarget, overwriteManifest, jobId, operatorId, checkpointId + 1)); + tableKey, overwriteManifests, jobId, operatorId, checkpointId + 1)); dynamicCommitter.commit(Sets.newHashSet(overwriteRequest)); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 20fae212b48e..7a56b5ec0939 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -55,6 +55,8 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; @@ -73,6 +75,7 @@ import org.apache.iceberg.inmemory.InMemoryInputFile; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -423,6 +426,26 @@ void testPartitionSpecEvolution() throws Exception { new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, "t1", "main", spec2)); runTest(rows); + + // Validate the table has expected partition specs + Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of(DATABASE, "t1")); + + Map tableSpecs = table.specs(); + List expectedSpecs = List.of(spec1, spec2, PartitionSpec.unpartitioned()); + + assertThat(tableSpecs).hasSize(expectedSpecs.size()); + expectedSpecs.forEach( + expectedSpec -> + assertThat( + tableSpecs.values().stream() + // TODO: Fix PartitionSpecEvolution#evolve to re-use PartitionField names of + // the target spec, + // which would allow us to compare specs with + // PartitionSpec#compatibleWith here. 
+ .anyMatch( + spec -> PartitionSpecEvolution.checkCompatibility(spec, expectedSpec))) + .withFailMessage("Table spec not found: %s.", expectedSpec) + .isTrue()); } @Test @@ -570,6 +593,71 @@ void testCommitConcurrency() throws Exception { executeDynamicSink(rows, env, true, 1, commitHook); } + @Test + void testProducesSingleSnapshotPerTableBranchAndCheckpoint() throws Exception { + String tableName = "t1"; + String branch = SnapshotRef.MAIN_BRANCH; + PartitionSpec spec1 = PartitionSpec.unpartitioned(); + PartitionSpec spec2 = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build(); + Set<String> equalityFields = Sets.newHashSet("id"); + + List<DynamicIcebergDataImpl> inputRecords = + Lists.newArrayList( + // Two schemas + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, tableName, branch, spec1), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA2, tableName, branch, spec1), + // Two specs + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, tableName, branch, spec1), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, tableName, branch, spec2), + // Some upserts + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, tableName, branch, spec1, true, equalityFields, false), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, tableName, branch, spec1, true, equalityFields, true)); + + executeDynamicSink(inputRecords, env, true, 1, null); + + List<Record> actualRecords; + try (CloseableIterable<Record> iterable = + IcebergGenerics.read( + CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1"))) + .build()) { + actualRecords = Lists.newArrayList(iterable); + } + + int expectedRecords = inputRecords.size() - 1; // 1 duplicate + assertThat(actualRecords).hasSize(expectedRecords); + + for (int i = 0; i < expectedRecords; i++) { + Record actual = actualRecords.get(i); + assertThat(inputRecords) + .anySatisfy( + inputRecord -> { + assertThat(actual.get(0)).isEqualTo(inputRecord.rowProvided.getField(0)); + assertThat(actual.get(1)).isEqualTo(inputRecord.rowProvided.getField(1)); + if (inputRecord.schemaProvided.equals(SimpleDataUtil.SCHEMA2)) { + assertThat(actual.get(2)).isEqualTo(inputRecord.rowProvided.getField(2)); + } + // There is an additional _pos field which gets added + }); + } + + TableIdentifier tableIdentifier = TableIdentifier.of("default", tableName); + Table table = CATALOG_EXTENSION.catalog().loadTable(tableIdentifier); + List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator()); + + assertThat(snapshots).hasSize(2); // Table creation + data changes + assertThat(snapshots.get(1).summary()) + .containsAllEntriesOf( + ImmutableMap.<String, String>builder() + .put("added-records", "6") + .put("changed-partition-count", "2") + .put("total-equality-deletes", "1") + .put("total-position-deletes", "1") + .put("total-records", "6") + .build()); + } + private static class AppendRightBeforeCommit implements CommitHook { final String tableIdentifier; diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java index d3f4385d972b..e88fd5d7f969 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import
org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.hadoop.util.Sets; @@ -58,7 +59,7 @@ class TestDynamicWriteResultAggregator { .build(); @Test - void testAggregator() throws Exception { + void testAggregatesWriteResultsForTwoTables() throws Exception { CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new Schema()); CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table2"), new Schema()); @@ -69,13 +70,12 @@ void testAggregator() throws Exception { testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) { testHarness.open(); - WriteTarget writeTarget1 = new WriteTarget("table", "branch", 42, 0, true, Sets.newHashSet()); + TableKey tableKey1 = new TableKey("table", "branch"); DynamicWriteResult dynamicWriteResult1 = - new DynamicWriteResult(writeTarget1, WriteResult.builder().build()); - WriteTarget writeTarget2 = - new WriteTarget("table2", "branch", 42, 0, true, Sets.newHashSet(1, 2)); + new DynamicWriteResult(tableKey1, WriteResult.builder().build()); + TableKey tableKey2 = new TableKey("table2", "branch"); DynamicWriteResult dynamicWriteResult2 = - new DynamicWriteResult(writeTarget2, WriteResult.builder().build()); + new DynamicWriteResult(tableKey2, WriteResult.builder().build()); CommittableWithLineage committable1 = new CommittableWithLineage<>(dynamicWriteResult1, 0, 0); @@ -113,18 +113,14 @@ void testPreventOutputFileFactoryCacheEvictionDuringFlush() throws Exception { testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) { testHarness.open(); - WriteTarget writeTarget1 = - new WriteTarget("table", "branch", 42, 0, false, Sets.newHashSet()); + TableKey tableKey1 = new TableKey("table", "branch"); DynamicWriteResult dynamicWriteResult1 = - new DynamicWriteResult( - writeTarget1, WriteResult.builder().addDataFiles(DATA_FILE).build()); + new DynamicWriteResult(tableKey1, WriteResult.builder().addDataFiles(DATA_FILE).build()); // Different WriteTarget - WriteTarget writeTarget2 = - new WriteTarget("table", "branch2", 23, 0, true, Sets.newHashSet()); + TableKey tableKey2 = new TableKey("table", "branch2"); DynamicWriteResult dynamicWriteResult2 = - new DynamicWriteResult( - writeTarget2, WriteResult.builder().addDataFiles(DATA_FILE).build()); + new DynamicWriteResult(tableKey2, WriteResult.builder().addDataFiles(DATA_FILE).build()); CommittableWithLineage committable1 = new CommittableWithLineage<>(dynamicWriteResult1, 0, 0); @@ -158,15 +154,101 @@ private static Set getManifestPaths( for (StreamRecord> record : messages) { CommittableMessage message = record.getValue(); if (message instanceof CommittableWithLineage) { - DeltaManifests deltaManifests = - SimpleVersionedSerialization.readVersionAndDeSerialize( - DeltaManifestsSerializer.INSTANCE, - (((CommittableWithLineage) message).getCommittable()) - .manifest()); - deltaManifests.manifests().forEach(manifest -> manifestPaths.add(manifest.path())); + for (byte[] manifest : + (((CommittableWithLineage) message).getCommittable()).manifests()) { + DeltaManifests deltaManifests = + SimpleVersionedSerialization.readVersionAndDeSerialize( + DeltaManifestsSerializer.INSTANCE, manifest); + deltaManifests + .manifests() + .forEach(manifestFile -> manifestPaths.add(manifestFile.path())); + } } } return manifestPaths; } + + @Test + void 
testAggregatesWriteResultsForOneTable() throws Exception { + CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new Schema()); + CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table2"), new Schema()); + + long checkpointId = 1L; + + try (OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new DynamicWriteResultAggregator( + CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize))) { + testHarness.open(); + + TableKey tableKey = new TableKey("table", "branch"); + DataFile dataFile1 = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/data-1.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + DataFile dataFile2 = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/data-2.parquet") + .withFileSizeInBytes(20) + .withRecordCount(2) + .build(); + + testHarness.processElement(createRecord(tableKey, checkpointId, dataFile1)); + testHarness.processElement(createRecord(tableKey, checkpointId, dataFile2)); + + assertThat(testHarness.getOutput()).isEmpty(); + + testHarness.prepareSnapshotPreBarrier(checkpointId); + + List> outputValues = testHarness.extractOutputValues(); + // Contains a CommittableSummary + DynamicCommittable + assertThat(outputValues).hasSize(2); + + SinkV2Assertions.assertThat(extractAndAssertCommittableSummary(outputValues.get(0))) + .hasOverallCommittables(1) + .hasFailedCommittables(0) + .hasCheckpointId(checkpointId); + + CommittableWithLineage committable = + extractAndAssertCommittableWithLineage(outputValues.get(1)); + + SinkV2Assertions.assertThat(committable).hasCheckpointId(checkpointId); + + DynamicCommittable dynamicCommittable = committable.getCommittable(); + + assertThat(dynamicCommittable.manifests()).hasNumberOfRows(1); + assertThat(dynamicCommittable.key()).isEqualTo(tableKey); + assertThat(dynamicCommittable.checkpointId()).isEqualTo(checkpointId); + assertThat(dynamicCommittable.jobId()) + .isEqualTo(testHarness.getEnvironment().getJobID().toString()); + assertThat(dynamicCommittable.operatorId()) + .isEqualTo(testHarness.getOperator().getOperatorID().toString()); + } + } + + private static StreamRecord> createRecord( + TableKey tableKey, long checkpointId, DataFile... 
dataFiles) { + return new StreamRecord<>( + new CommittableWithLineage<>( + new DynamicWriteResult(tableKey, WriteResult.builder().addDataFiles(dataFiles).build()), + checkpointId, + 0)); + } + + static CommittableSummary extractAndAssertCommittableSummary( + CommittableMessage message) { + assertThat(message).isInstanceOf(CommittableSummary.class); + return (CommittableSummary) message; + } + + static CommittableWithLineage extractAndAssertCommittableWithLineage( + CommittableMessage message) { + assertThat(message).isInstanceOf(CommittableWithLineage.class); + return (CommittableWithLineage) message; + } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java index a3a9691107eb..49647c7f30b3 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.hadoop.util.Sets; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.Metrics; @@ -48,13 +47,12 @@ class TestDynamicWriteResultSerializer { ImmutableMap.of(1, ByteBuffer.allocate(1)), ImmutableMap.of(1, ByteBuffer.allocate(1)))) .build(); + private static final TableKey TABLE_KEY = new TableKey("table", "branch"); @Test void testRoundtrip() throws IOException { DynamicWriteResult dynamicWriteResult = - new DynamicWriteResult( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - WriteResult.builder().addDataFiles(DATA_FILE).build()); + new DynamicWriteResult(TABLE_KEY, WriteResult.builder().addDataFiles(DATA_FILE).build()); DynamicWriteResultSerializer serializer = new DynamicWriteResultSerializer(); DynamicWriteResult copy = @@ -68,11 +66,9 @@ void testRoundtrip() throws IOException { } @Test - void testUnsupportedVersion() throws IOException { + void testUnsupportedVersion() { DynamicWriteResult dynamicWriteResult = - new DynamicWriteResult( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - WriteResult.builder().addDataFiles(DATA_FILE).build()); + new DynamicWriteResult(TABLE_KEY, WriteResult.builder().addDataFiles(DATA_FILE).build()); DynamicWriteResultSerializer serializer = new DynamicWriteResultSerializer(); assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(dynamicWriteResult)))