diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java index 1b786e46452f..1da0f8564938 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.sink; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.NavigableMap; import java.util.concurrent.atomic.AtomicLong; @@ -44,7 +45,11 @@ public CommitSummary(NavigableMap<Long, WriteResult> pendingResults) { } public void addAll(NavigableMap<Long, List<WriteResult>> pendingResults) { - pendingResults.values().forEach(writeResults -> writeResults.forEach(this::addWriteResult)); + pendingResults.values().forEach(this::addAll); + } + + public void addAll(Collection<WriteResult> pendingResults) { + pendingResults.forEach(this::addWriteResult); } private void addWriteResult(WriteResult writeResult) { diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java index 33edefe71eb0..4f0b68573ff5 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java @@ -26,7 +26,7 @@ /** * The aggregated results of a single checkpoint which should be committed. Containing the - * serialized {@link DeltaManifests} file - which contains the commit data, and the jobId, + * serialized {@link DeltaManifests} files, which contain the commit data, and the jobId, * operatorId, checkpointId triplet to identify the specific commit. * *

{@link DynamicCommittableSerializer} is used to serialize {@link DynamicCommittable} between @@ -34,27 +34,27 @@ */ class DynamicCommittable implements Serializable { - private final WriteTarget key; - private final byte[] manifest; + private final TableKey key; + private final byte[][] manifests; private final String jobId; private final String operatorId; private final long checkpointId; DynamicCommittable( - WriteTarget key, byte[] manifest, String jobId, String operatorId, long checkpointId) { + TableKey key, byte[][] manifests, String jobId, String operatorId, long checkpointId) { this.key = key; - this.manifest = manifest; + this.manifests = manifests; this.jobId = jobId; this.operatorId = operatorId; this.checkpointId = checkpointId; } - WriteTarget key() { + TableKey key() { return key; } - byte[] manifest() { - return manifest; + byte[][] manifests() { + return manifests; } String jobId() { @@ -78,14 +78,14 @@ public boolean equals(Object o) { DynamicCommittable that = (DynamicCommittable) o; return checkpointId == that.checkpointId && Objects.equals(key, that.key) - && Objects.deepEquals(manifest, that.manifest) + && Arrays.deepEquals(manifests, that.manifests) && Objects.equals(jobId, that.jobId) && Objects.equals(operatorId, that.operatorId); } @Override public int hashCode() { - return Objects.hash(key, Arrays.hashCode(manifest), jobId, operatorId, checkpointId); + return Objects.hash(key, Arrays.deepHashCode(manifests), jobId, operatorId, checkpointId); } @Override @@ -97,8 +97,4 @@ public String toString() { .add("operatorId", operatorId) .toString(); } - - public WriteTarget writeTarget() { - return key; - } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java index 4aadcf1f3620..d599d29dba01 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java @@ -31,11 +31,12 @@ */ class DynamicCommittableSerializer implements SimpleVersionedSerializer { - private static final int VERSION = 1; + private static final int VERSION_1 = 1; + private static final int VERSION_2 = 2; @Override public int getVersion() { - return VERSION; + return VERSION_2; } @Override @@ -46,26 +47,60 @@ public byte[] serialize(DynamicCommittable committable) throws IOException { view.writeUTF(committable.jobId()); view.writeUTF(committable.operatorId()); view.writeLong(committable.checkpointId()); - view.writeInt(committable.manifest().length); - view.write(committable.manifest()); + + int numManifests = committable.manifests().length; + view.writeInt(numManifests); + for (int i = 0; i < numManifests; i++) { + byte[] manifest = committable.manifests()[i]; + view.writeInt(manifest.length); + view.write(manifest); + } + return out.toByteArray(); } @Override public DynamicCommittable deserialize(int version, byte[] serialized) throws IOException { - if (version == 1) { - DataInputDeserializer view = new DataInputDeserializer(serialized); - WriteTarget key = WriteTarget.deserializeFrom(view); - String jobId = view.readUTF(); - String operatorId = view.readUTF(); - long checkpointId = view.readLong(); - int manifestLen = view.readInt(); - byte[] manifestBuf; - manifestBuf = new byte[manifestLen]; - view.read(manifestBuf); - return new DynamicCommittable(key, manifestBuf, jobId, 
operatorId, checkpointId); + if (version == VERSION_1) { + return deserializeV1(serialized); + } else if (version == VERSION_2) { + return deserializeV2(serialized); } throw new IOException("Unrecognized version or corrupt state: " + version); } + + private DynamicCommittable deserializeV1(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + WriteTarget key = WriteTarget.deserializeFrom(view); + String jobId = view.readUTF(); + String operatorId = view.readUTF(); + long checkpointId = view.readLong(); + int manifestLen = view.readInt(); + byte[] manifestBuf = new byte[manifestLen]; + view.read(manifestBuf); + return new DynamicCommittable( + new TableKey(key.tableName(), key.branch()), + new byte[][] {manifestBuf}, + jobId, + operatorId, + checkpointId); + } + + private DynamicCommittable deserializeV2(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + TableKey key = TableKey.deserializeFrom(view); + String jobId = view.readUTF(); + String operatorId = view.readUTF(); + long checkpointId = view.readLong(); + + byte[][] manifestsBuf = new byte[view.readInt()][]; + for (int i = 0; i < manifestsBuf.length; i++) { + byte[] manifest = new byte[view.readInt()]; + view.read(manifest); + manifestsBuf[i] = manifest; + } + + return new DynamicCommittable(key, manifestsBuf, jobId, operatorId, checkpointId); + } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index 1cddc64d6016..5e824773f4bf 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -19,15 +19,15 @@ package org.apache.iceberg.flink.sink.dynamic; import java.io.IOException; -import java.io.Serializable; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.core.io.SimpleVersionedSerialization; @@ -51,12 +51,10 @@ import org.apache.iceberg.flink.sink.FlinkManifestUtil; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.ContentFileUtil; -import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; @@ -79,26 +77,15 @@ class DynamicCommitter implements Committer { private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id"; private static final Logger LOG = LoggerFactory.getLogger(DynamicCommitter.class); - private static final byte[] EMPTY_MANIFEST_DATA = new byte[0]; - private static final WriteResult EMPTY_WRITE_RESULT = - 
WriteResult.builder() - .addDataFiles(Lists.newArrayList()) - .addDeleteFiles(Lists.newArrayList()) - .build(); private static final long INITIAL_CHECKPOINT_ID = -1L; - @VisibleForTesting - static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits"; - private static final String FLINK_JOB_ID = "flink.job-id"; private static final String OPERATOR_ID = "flink.operator-id"; private final Map snapshotProperties; private final boolean replacePartitions; private final DynamicCommitterMetrics committerMetrics; private final Catalog catalog; - private final Map maxContinuousEmptyCommitsMap; - private final Map continuousEmptyCheckpointsMap; private final ExecutorService workerPool; DynamicCommitter( @@ -112,9 +99,6 @@ class DynamicCommitter implements Committer { this.replacePartitions = replacePartitions; this.committerMetrics = committerMetrics; this.catalog = catalog; - this.maxContinuousEmptyCommitsMap = Maps.newHashMap(); - this.continuousEmptyCheckpointsMap = Maps.newHashMap(); - this.workerPool = ThreadPools.newFixedThreadPool("iceberg-committer-pool-" + sinkId, workerPoolSize); } @@ -126,9 +110,15 @@ public void commit(Collection> commitRequests) return; } - // For every table and every checkpoint, we store the list of to-be-committed - // DynamicCommittable. - // There may be DynamicCommittable from previous checkpoints which have not been committed yet. + /* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests created by the upstream + DynamicWriteResultAggregator. Iceberg 1.12 will remove this, and users should upgrade to the 1.11 release first + to migrate their state to a single commit request per checkpoint. 
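+
+ A sketch of the grouping built below (generic parameters are collapsed elsewhere in this diff, so the
+ exact declaration is an assumption):
+
+   // one entry per (table, branch), ordered by checkpoint id
+   Map<TableKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>> commitRequestMap;
+
+ Requests at or below the max committed checkpoint id are marked as already committed; the remaining
+ checkpoints are committed per table and branch.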
+ */ Map>>> commitRequestMap = Maps.newHashMap(); for (CommitRequest request : commitRequests) { @@ -151,12 +141,16 @@ public void commit(Collection> commitRequests) : List.of(); long maxCommittedCheckpointId = getMaxCommittedCheckpointId(ancestors, last.jobId(), last.operatorId()); + + NavigableMap>> skippedCommitRequests = + entry.getValue().headMap(maxCommittedCheckpointId, true); + LOG.debug( + "Skipping {} commit requests: {}", skippedCommitRequests.size(), skippedCommitRequests); // Mark the already committed FilesCommittable(s) as finished - entry - .getValue() - .headMap(maxCommittedCheckpointId, true) + skippedCommitRequests .values() .forEach(list -> list.forEach(CommitRequest::signalAlreadyCommitted)); + NavigableMap>> uncommitted = entry.getValue().tailMap(maxCommittedCheckpointId, false); if (!uncommitted.isEmpty()) { @@ -210,89 +204,45 @@ private void commitPendingRequests( NavigableMap> pendingResults = Maps.newTreeMap(); for (Map.Entry>> e : commitRequestMap.entrySet()) { for (CommitRequest committable : e.getValue()) { - if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifest())) { - pendingResults - .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList()) - .add(EMPTY_WRITE_RESULT); - } else { + for (byte[] manifest : committable.getCommittable().manifests()) { DeltaManifests deltaManifests = SimpleVersionedSerialization.readVersionAndDeSerialize( - DeltaManifestsSerializer.INSTANCE, committable.getCommittable().manifest()); - - WriteResult writeResult = - FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()); - if (TableUtil.formatVersion(table) > 2) { - for (DeleteFile deleteFile : writeResult.deleteFiles()) { - if (deleteFile.content() == FileContent.POSITION_DELETES) { - Preconditions.checkArgument( - ContentFileUtil.isDV(deleteFile), - "Can't add position delete file to the %s table. Concurrent table upgrade to V3 is not supported.", - table.name()); - } - } - } - + DeltaManifestsSerializer.INSTANCE, manifest); pendingResults .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList()) - .add(writeResult); + .add(FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs())); manifests.addAll(deltaManifests.manifests()); } } } - CommitSummary summary = new CommitSummary(); - summary.addAll(pendingResults); - commitPendingResult(table, branch, pendingResults, summary, newFlinkJobId, operatorId); - if (committerMetrics != null) { - committerMetrics.updateCommitSummary(table.name(), summary); + if (TableUtil.formatVersion(table) > 2) { + Optional positionalDelete = + pendingResults.values().stream() + .flatMap(List::stream) + .flatMap(writeResult -> Arrays.stream(writeResult.deleteFiles())) + .filter(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES) + .filter(Predicate.not(ContentFileUtil::isDV)) + .findAny(); + Preconditions.checkArgument( + positionalDelete.isEmpty(), + "Can't add position delete file to the %s table. 
Concurrent table upgrade to V3 is not supported.", + table.name()); } - FlinkManifestUtil.deleteCommittedManifests(table, manifests, newFlinkJobId, checkpointId); - } - - private void commitPendingResult( - Table table, - String branch, - NavigableMap> pendingResults, - CommitSummary summary, - String newFlinkJobId, - String operatorId) { - long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount(); - TableKey key = new TableKey(table.name(), branch); - int continuousEmptyCheckpoints = - continuousEmptyCheckpointsMap.computeIfAbsent(key, unused -> 0); - int maxContinuousEmptyCommits = - maxContinuousEmptyCommitsMap.computeIfAbsent( - key, - unused -> { - int result = - PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10); - Preconditions.checkArgument( - result > 0, MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive"); - return result; - }); - continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0; - if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) { - if (replacePartitions) { - replacePartitions(table, branch, pendingResults, summary, newFlinkJobId, operatorId); - } else { - commitDeltaTxn(table, branch, pendingResults, summary, newFlinkJobId, operatorId); - } - - continuousEmptyCheckpoints = 0; + if (replacePartitions) { + replacePartitions(table, branch, pendingResults, newFlinkJobId, operatorId); } else { - long checkpointId = pendingResults.lastKey(); - LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId); + commitDeltaTxn(table, branch, pendingResults, newFlinkJobId, operatorId); } - continuousEmptyCheckpointsMap.put(key, continuousEmptyCheckpoints); + FlinkManifestUtil.deleteCommittedManifests(table, manifests, newFlinkJobId, checkpointId); } private void replacePartitions( Table table, String branch, NavigableMap> pendingResults, - CommitSummary summary, String newFlinkJobId, String operatorId) { // Iceberg tables are unsorted. So the order of the append data does not matter. @@ -305,6 +255,9 @@ private void replacePartitions( } } + CommitSummary summary = new CommitSummary(); + summary.addAll(pendingResults); + commitOperation( table, branch, @@ -320,7 +273,6 @@ private void commitDeltaTxn( Table table, String branch, NavigableMap> pendingResults, - CommitSummary summary, String newFlinkJobId, String operatorId) { for (Map.Entry> e : pendingResults.entrySet()) { @@ -340,6 +292,9 @@ private void commitDeltaTxn( Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); } + CommitSummary summary = new CommitSummary(); + summary.addAll(writeResults); + // Every Flink checkpoint contains a set of independent changes which can be committed // together. While it is technically feasible to combine append-only data across checkpoints, // for the sake of simplicity, we do not implement this (premature) optimization. 
Multiple @@ -434,6 +389,7 @@ void commitOperation( durationMs); if (committerMetrics != null) { committerMetrics.commitDuration(table.name(), durationMs); + committerMetrics.updateCommitSummary(table.name(), summary); } } @@ -441,54 +397,4 @@ void commitOperation( public void close() throws IOException { workerPool.shutdown(); } - - private static class TableKey implements Serializable { - private String tableName; - private String branch; - - TableKey(String tableName, String branch) { - this.tableName = tableName; - this.branch = branch; - } - - TableKey(DynamicCommittable committable) { - this.tableName = committable.key().tableName(); - this.branch = committable.key().branch(); - } - - String tableName() { - return tableName; - } - - String branch() { - return branch; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other == null || getClass() != other.getClass()) { - return false; - } - - TableKey that = (TableKey) other; - return tableName.equals(that.tableName) && branch.equals(that.branch); - } - - @Override - public int hashCode() { - return Objects.hash(tableName, branch); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("tableName", tableName) - .add("branch", branch) - .toString(); - } - } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java index 85806f932ad5..d8d0ed6b573e 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java @@ -19,22 +19,37 @@ package org.apache.iceberg.flink.sink.dynamic; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; class DynamicWriteResult { - - private final WriteTarget key; + private final TableKey key; + private final int specId; private final WriteResult writeResult; - DynamicWriteResult(WriteTarget key, WriteResult writeResult) { + DynamicWriteResult(TableKey key, int specId, WriteResult writeResult) { this.key = key; + this.specId = specId; this.writeResult = writeResult; } - WriteTarget key() { + TableKey key() { return key; } + public int specId() { + return specId; + } + WriteResult writeResult() { return writeResult; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("key", key) + .add("specId", specId) + .add("writeResult", writeResult) + .toString(); + } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java index 927491fa89ea..47d239200b06 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java @@ -42,6 +42,7 @@ import org.apache.iceberg.flink.sink.FlinkManifestUtil; import org.apache.iceberg.flink.sink.ManifestOutputFileFactory; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import 
org.slf4j.Logger; @@ -57,11 +58,10 @@ class DynamicWriteResultAggregator implements OneInputStreamOperator< CommittableMessage, CommittableMessage> { private static final Logger LOG = LoggerFactory.getLogger(DynamicWriteResultAggregator.class); - private static final byte[] EMPTY_MANIFEST_DATA = new byte[0]; private final CatalogLoader catalogLoader; private final int cacheMaximumSize; - private transient Map> results; + private transient Map>> resultsByTableKeyAndSpec; private transient Map> specs; private transient Map> outputFileFactoriesAndFormatVersions; @@ -82,7 +82,7 @@ public void open() throws Exception { this.operatorId = getOperatorID().toString(); this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber(); - this.results = Maps.newHashMap(); + this.resultsByTableKeyAndSpec = Maps.newHashMap(); this.specs = new LRUCache<>(cacheMaximumSize); this.outputFileFactoriesAndFormatVersions = new LRUCache<>(cacheMaximumSize); this.catalog = catalogLoader.loadCatalog(); @@ -96,14 +96,15 @@ public void finish() throws IOException { @Override public void prepareSnapshotPreBarrier(long checkpointId) throws IOException { Collection> committables = - Sets.newHashSetWithExpectedSize(results.size()); + Sets.newHashSetWithExpectedSize(resultsByTableKeyAndSpec.size()); int count = 0; - for (Map.Entry> entries : results.entrySet()) { + for (Map.Entry>> entries : + resultsByTableKeyAndSpec.entrySet()) { committables.add( new CommittableWithLineage<>( new DynamicCommittable( entries.getKey(), - writeToManifest(entries.getKey(), entries.getValue(), checkpointId), + writeToManifests(entries.getKey().tableName(), entries.getValue(), checkpointId), getContainingTask().getEnvironment().getJobID().toString(), getRuntimeContext().getOperatorUniqueID(), checkpointId), @@ -121,32 +122,42 @@ new StreamRecord<>( new CommittableWithLineage<>(c.getCommittable(), checkpointId, subTaskId)))); LOG.info("Emitted {} commit message to downstream committer operator", count); - results.clear(); + resultsByTableKeyAndSpec.clear(); } /** - * Write all the completed data files to a newly created manifest file and return the manifest's + * Write all the completed data files to newly created manifest files and return the manifests' * avro serialized bytes. 
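+ * The returned array holds one serialized {@link DeltaManifests} entry per partition spec present in
+ * {@code writeResultsBySpec}.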
*/ @VisibleForTesting - byte[] writeToManifest( - WriteTarget key, Collection writeResults, long checkpointId) + byte[][] writeToManifests( + String tableName, Map> writeResultsBySpec, long checkpointId) throws IOException { - if (writeResults.isEmpty()) { - return EMPTY_MANIFEST_DATA; + byte[][] deltaManifestsBySpec = new byte[writeResultsBySpec.size()][]; + int idx = 0; + for (Map.Entry> entry : writeResultsBySpec.entrySet()) { + deltaManifestsBySpec[idx] = + writeToManifest(tableName, entry.getKey(), entry.getValue(), checkpointId); + idx++; } + return deltaManifestsBySpec; + } + + private byte[] writeToManifest( + String tableName, int specId, Collection writeResults, long checkpointId) + throws IOException { WriteResult.Builder builder = WriteResult.builder(); - writeResults.forEach(w -> builder.add(w.writeResult())); + writeResults.forEach(builder::add); WriteResult result = builder.build(); Tuple2 outputFileFactoryAndVersion = - outputFileFactoryAndFormatVersion(key.tableName()); + outputFileFactoryAndFormatVersion(tableName); DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles( result, () -> outputFileFactoryAndVersion.f0.create(checkpointId), - spec(key.tableName(), key.specId()), + spec(tableName, specId), outputFileFactoryAndVersion.f1); return SimpleVersionedSerialization.writeVersionAndSerialize( @@ -160,8 +171,16 @@ public void processElement(StreamRecord> if (element.isRecord() && element.getValue() instanceof CommittableWithLineage) { DynamicWriteResult result = ((CommittableWithLineage) element.getValue()).getCommittable(); - WriteTarget key = result.key(); - results.computeIfAbsent(key, unused -> Sets.newHashSet()).add(result); + Collection resultsPerTableKeyAndSpec = + resultsByTableKeyAndSpec + .computeIfAbsent(result.key(), unused -> Maps.newHashMap()) + .computeIfAbsent(result.specId(), unused -> Lists.newArrayList()); + resultsPerTableKeyAndSpec.add(result.writeResult()); + LOG.debug( + "Added {}, specId={}, totalResults={}", + result, + result.specId(), + resultsPerTableKeyAndSpec.size()); } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java index cf5f423fd7ff..5153ec6a49ee 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java @@ -41,6 +41,7 @@ public byte[] serialize(DynamicWriteResult writeResult) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); writeResult.key().serializeTo(view); + view.writeInt(writeResult.specId()); byte[] result = WRITE_RESULT_SERIALIZER.serialize(writeResult.writeResult()); view.write(result); return out.toByteArray(); @@ -50,11 +51,12 @@ public byte[] serialize(DynamicWriteResult writeResult) throws IOException { public DynamicWriteResult deserialize(int version, byte[] serialized) throws IOException { if (version == 1) { DataInputDeserializer view = new DataInputDeserializer(serialized); - WriteTarget key = WriteTarget.deserializeFrom(view); + TableKey key = TableKey.deserializeFrom(view); + int specId = view.readInt(); byte[] resultBuf = new byte[view.available()]; view.read(resultBuf); WriteResult writeResult = WRITE_RESULT_SERIALIZER.deserialize(version, resultBuf); - return new 
DynamicWriteResult(key, writeResult); + return new DynamicWriteResult(key, specId, writeResult); } throw new IOException("Unrecognized version or corrupt state: " + version); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java index 907385797495..8425ea747fb7 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java @@ -189,7 +189,11 @@ public Collection prepareCommit() throws IOException { writeResult.dataFiles().length, writeResult.deleteFiles().length); - result.add(new DynamicWriteResult(writeTarget, writeResult)); + result.add( + new DynamicWriteResult( + new TableKey(writeTarget.tableName(), writeTarget.branch()), + writeTarget.specId(), + writeResult)); } writers.clear(); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java new file mode 100644 index 000000000000..08b755fe14a8 --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.IOException; +import java.util.Objects; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +class TableKey { + private final String tableName; + private final String branch; + + TableKey(String tableName, String branch) { + this.tableName = tableName; + this.branch = branch; + } + + TableKey(DynamicCommittable committable) { + this.tableName = committable.key().tableName(); + this.branch = committable.key().branch(); + } + + String tableName() { + return tableName; + } + + String branch() { + return branch; + } + + void serializeTo(DataOutputView view) throws IOException { + view.writeUTF(tableName); + view.writeUTF(branch); + } + + static TableKey deserializeFrom(DataInputView view) throws IOException { + return new TableKey(view.readUTF(), view.readUTF()); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + TableKey that = (TableKey) other; + return tableName.equals(that.tableName) && branch.equals(that.branch); + } + + @Override + public int hashCode() { + return Objects.hash(tableName, branch); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("tableName", tableName) + .add("branch", branch) + .toString(); + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java index beb244b72eab..211f28c6518a 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java @@ -23,7 +23,6 @@ import java.util.Objects; import java.util.Set; import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -76,18 +75,6 @@ Set equalityFields() { return equalityFields; } - void serializeTo(DataOutputView view) throws IOException { - view.writeUTF(tableName); - view.writeUTF(branch); - view.writeInt(schemaId); - view.writeInt(specId); - view.writeBoolean(upsertMode); - view.writeInt(equalityFields.size()); - for (Integer equalityField : equalityFields) { - view.writeInt(equalityField); - } - } - static WriteTarget deserializeFrom(DataInputView view) throws IOException { return new WriteTarget( view.readUTF(), diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java index 13a06d362717..16890d1f63d0 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java @@ -21,42 +21,94 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Set; import 
org.apache.flink.api.common.JobID; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.junit.jupiter.api.Test; class TestDynamicCommittableSerializer { + private static final DynamicCommittable COMMITTABLE = + new DynamicCommittable( + new TableKey("table", "branch"), + new byte[][] {{3, 4}, {5, 6}}, + JobID.generate().toHexString(), + new OperatorID().toHexString(), + 5); @Test - void testRoundtrip() throws IOException { - DynamicCommittable committable = + void testV1() throws IOException { + var committable = new DynamicCommittable( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - new byte[] {3, 4}, + new TableKey("table", "branch"), + new byte[][] {{3, 4}}, JobID.generate().toHexString(), new OperatorID().toHexString(), 5); - DynamicCommittableSerializer serializer = new DynamicCommittableSerializer(); - assertThat(serializer.deserialize(serializer.getVersion(), serializer.serialize(committable))) - .isEqualTo(committable); + assertThat(serializer.deserialize(1, serializeV1(committable))).isEqualTo(committable); } @Test - void testUnsupportedVersion() throws IOException { - DynamicCommittable committable = - new DynamicCommittable( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - new byte[] {3, 4}, - JobID.generate().toHexString(), - new OperatorID().toHexString(), - 5); + void testLatestVersion() throws IOException { + DynamicCommittableSerializer serializer = new DynamicCommittableSerializer(); + assertThat(serializer.deserialize(serializer.getVersion(), serializer.serialize(COMMITTABLE))) + .isEqualTo(COMMITTABLE); + } + @Test + void testUnsupportedVersion() { DynamicCommittableSerializer serializer = new DynamicCommittableSerializer(); - assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(committable))) + assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(COMMITTABLE))) .hasMessage("Unrecognized version or corrupt state: -1") .isInstanceOf(IOException.class); } + + byte[] serializeV1(DynamicCommittable committable) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + + // Wrap TableKey into a testing WriteTarget to match the V1 format + WriteTarget writeTarget = + new WriteTarget( + committable.key().tableName(), + committable.key().branch(), + -1, + -1, + false, + Set.of(1, 2, 3)); + view.write(serializeV1(writeTarget)); + + view.writeUTF(committable.jobId()); + view.writeUTF(committable.operatorId()); + view.writeLong(committable.checkpointId()); + + Preconditions.checkArgument( + committable.manifests().length == 1, + "V1 serialization format must have only one manifest per committable."); + view.writeInt(committable.manifests()[0].length); + view.write(committable.manifests()[0]); + + return out.toByteArray(); + } + + byte[] serializeV1(WriteTarget writeTarget) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + + view.writeUTF(writeTarget.tableName()); + view.writeUTF(writeTarget.branch()); + view.writeInt(writeTarget.schemaId()); + view.writeInt(writeTarget.specId()); + view.writeBoolean(writeTarget.upsertMode()); + 
view.writeInt(writeTarget.equalityFields().size()); + for (Integer equalityField : writeTarget.equalityFields()) { + view.writeInt(equalityField); + } + + return out.toByteArray(); + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java index 1497458e6083..24832c48be1a 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java @@ -54,6 +54,7 @@ import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.assertj.core.api.ThrowableAssert.ThrowingCallable; @@ -125,6 +126,15 @@ class TestDynamicCommitter { .ofPositionDeletes() .build(); + private static final Map> WRITE_RESULT_BY_SPEC = + Map.of( + DATA_FILE.specId(), + Lists.newArrayList(WriteResult.builder().addDataFiles(DATA_FILE).build())); + private static final Map> WRITE_RESULT_BY_SPEC_2 = + Map.of( + DATA_FILE_2.specId(), + Lists.newArrayList(WriteResult.builder().addDataFiles(DATA_FILE_2).build())); + @BeforeEach void before() { catalog = CATALOG_EXTENSION.catalog(); @@ -155,12 +165,9 @@ void testCommit() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget1 = - new WriteTarget(TABLE1, "branch", 42, 0, true, Sets.newHashSet(1, 2)); - WriteTarget writeTarget2 = - new WriteTarget(TABLE1, "branch2", 43, 0, true, Sets.newHashSet(1, 2)); - WriteTarget writeTarget3 = - new WriteTarget(TABLE2, "branch2", 43, 0, true, Sets.newHashSet(1, 2)); + TableKey tableKey1 = new TableKey(TABLE1, "branch"); + TableKey tableKey2 = new TableKey(TABLE1, "branch2"); + TableKey tableKey3 = new TableKey(TABLE2, "branch2"); DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); @@ -168,27 +175,12 @@ void testCommit() throws Exception { new OneInputStreamOperatorTestHarness(aggregator); aggregatorHarness.open(); - byte[] deltaManifest1 = - aggregator.writeToManifest( - writeTarget1, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget1, WriteResult.builder().addDataFiles(DATA_FILE).build())), - 0); - byte[] deltaManifest2 = - aggregator.writeToManifest( - writeTarget2, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget2, WriteResult.builder().addDataFiles(DATA_FILE).build())), - 0); - byte[] deltaManifest3 = - aggregator.writeToManifest( - writeTarget3, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget3, WriteResult.builder().addDataFiles(DATA_FILE).build())), - 0); + byte[][] deltaManifests1 = + aggregator.writeToManifests(tableKey1.tableName(), WRITE_RESULT_BY_SPEC, 0); + byte[][] deltaManifests2 = + aggregator.writeToManifests(tableKey2.tableName(), WRITE_RESULT_BY_SPEC, 0); + byte[][] deltaManifests3 = + aggregator.writeToManifests(tableKey3.tableName(), WRITE_RESULT_BY_SPEC, 0); final String jobId = JobID.generate().toHexString(); final String operatorId = new OperatorID().toHexString(); @@ -196,15 +188,15 @@ void testCommit() throws Exception { CommitRequest commitRequest1 = new 
MockCommitRequest<>( - new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey1, deltaManifests1, jobId, operatorId, checkpointId)); CommitRequest commitRequest2 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey2, deltaManifests2, jobId, operatorId, checkpointId)); CommitRequest commitRequest3 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey3, deltaManifests3, jobId, operatorId, checkpointId)); dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2, commitRequest3)); @@ -267,7 +259,7 @@ void testCommit() throws Exception { } @Test - void testAlreadyCommitted() throws Exception { + void testSkipsCommitRequestsForPreviousCheckpoints() throws Exception { Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1)); assertThat(table1.snapshots()).isEmpty(); @@ -285,8 +277,7 @@ void testAlreadyCommitted() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget = - new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2)); + TableKey tableKey = new TableKey(TABLE1, "branch"); DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); @@ -298,24 +289,18 @@ void testAlreadyCommitted() throws Exception { final String operatorId = new OperatorID().toHexString(); final int checkpointId = 10; - byte[] deltaManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), - checkpointId); + byte[][] deltaManifests = + aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, 0); CommitRequest commitRequest = new MockCommitRequest<>( - new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey, deltaManifests, jobId, operatorId, checkpointId)); dynamicCommitter.commit(Sets.newHashSet(commitRequest)); CommitRequest oldCommitRequest = new MockCommitRequest<>( - new DynamicCommittable( - writeTarget, deltaManifest, jobId, operatorId, checkpointId - 1)); + new DynamicCommittable(tableKey, deltaManifests, jobId, operatorId, checkpointId - 1)); // Old commits requests shouldn't affect the result dynamicCommitter.commit(Sets.newHashSet(oldCommitRequest)); @@ -360,25 +345,23 @@ void testCommitDeleteInDifferentFormatVersion() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget = - new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2)); - DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); OneInputStreamOperatorTestHarness aggregatorHarness = new OneInputStreamOperatorTestHarness(aggregator); aggregatorHarness.open(); + TableKey tableKey = new TableKey(TABLE1, "branch"); final String jobId = JobID.generate().toHexString(); final String operatorId = new OperatorID().toHexString(); final int checkpointId = 10; - byte[] deltaManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, + byte[][] deltaManifests = + aggregator.writeToManifests( + tableKey.tableName(), + Map.of( + DATA_FILE.specId(), + Sets.newHashSet( WriteResult.builder() .addDataFiles(DATA_FILE) .addDeleteFiles(DELETE_FILE) @@ 
-387,7 +370,7 @@ void testCommitDeleteInDifferentFormatVersion() throws Exception { CommitRequest commitRequest = new MockCommitRequest<>( - new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey, deltaManifests, jobId, operatorId, checkpointId)); // Upgrade the table version UpdateProperties updateApi = table1.updateProperties(); @@ -421,8 +404,7 @@ void testCommitOnlyDataInDifferentFormatVersion() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget = - new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2)); + TableKey tableKey = new TableKey(TABLE1, "branch"); DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); @@ -434,17 +416,12 @@ void testCommitOnlyDataInDifferentFormatVersion() throws Exception { final String operatorId = new OperatorID().toHexString(); final int checkpointId = 10; - byte[] deltaManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), - checkpointId); + byte[][] deltaManifests = + aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, checkpointId); CommitRequest commitRequest = new MockCommitRequest<>( - new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey, deltaManifests, jobId, operatorId, checkpointId)); dynamicCommitter.commit(Sets.newHashSet(commitRequest)); @@ -486,50 +463,34 @@ void testTableBranchAtomicCommitForAppendOnlyData() throws Exception { new OneInputStreamOperatorTestHarness(aggregator); aggregatorHarness.open(); - WriteTarget writeTarget1 = - new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2)); - // writeTarget2 has a different schema - WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true, Sets.newHashSet()); - // Different branch for writeTarget3 - WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch2", 23, 0, true, Sets.newHashSet()); - - WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build(); - WriteResult writeResult2 = WriteResult.builder().addDataFiles(DATA_FILE_2).build(); + TableKey tableKey1 = new TableKey(TABLE1, "branch1"); + TableKey tableKey2 = new TableKey(TABLE1, "branch2"); final String jobId = JobID.generate().toHexString(); final String operatorId = new OperatorID().toHexString(); final int checkpointId1 = 1; final int checkpointId2 = 2; - byte[] deltaManifest1 = - aggregator.writeToManifest( - writeTarget1, - Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)), - checkpointId1); + byte[][] deltaManifests1 = + aggregator.writeToManifests(tableKey1.tableName(), WRITE_RESULT_BY_SPEC, checkpointId1); CommitRequest commitRequest1 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId1)); + new DynamicCommittable(tableKey1, deltaManifests1, jobId, operatorId, checkpointId1)); - byte[] deltaManifest2 = - aggregator.writeToManifest( - writeTarget2, - Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)), - checkpointId1); + byte[][] deltaManifests2 = + aggregator.writeToManifests(tableKey1.tableName(), WRITE_RESULT_BY_SPEC_2, checkpointId1); CommitRequest commitRequest2 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId1)); + new 
DynamicCommittable(tableKey1, deltaManifests2, jobId, operatorId, checkpointId1)); - byte[] deltaManifest3 = - aggregator.writeToManifest( - writeTarget3, - Sets.newHashSet(new DynamicWriteResult(writeTarget3, writeResult2)), - checkpointId2); + byte[][] deltaManifests3 = + aggregator.writeToManifests(tableKey2.tableName(), WRITE_RESULT_BY_SPEC_2, checkpointId2); CommitRequest commitRequest3 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId2)); + new DynamicCommittable(tableKey2, deltaManifests3, jobId, operatorId, checkpointId2)); boolean overwriteMode = false; int workerPoolSize = 1; @@ -601,49 +562,36 @@ void testTableBranchAtomicCommitWithFailures() throws Exception { new OneInputStreamOperatorTestHarness(aggregator); aggregatorHarness.open(); - WriteTarget writeTarget1 = new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet()); - // writeTarget2 has a different schema - WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch", 23, 0, false, Sets.newHashSet()); - WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch", 23, 0, false, Sets.newHashSet()); + TableKey tableKey = new TableKey(TABLE1, "branch"); - WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build(); - WriteResult writeResult2 = WriteResult.builder().addDeleteFiles(DELETE_FILE).build(); - WriteResult writeResult3 = WriteResult.builder().addDataFiles(DATA_FILE).build(); + Map> writeResults = + Map.of( + DELETE_FILE.specId(), + Lists.newArrayList(WriteResult.builder().addDeleteFiles(DELETE_FILE).build())); + + byte[][] deltaManifests1 = + aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, 0); + byte[][] deltaManifests2 = aggregator.writeToManifests(tableKey.tableName(), writeResults, 0); + byte[][] deltaManifests3 = + aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC_2, 0); final String jobId = JobID.generate().toHexString(); final String operatorId = new OperatorID().toHexString(); final int checkpointId1 = 1; final int checkpointId2 = 2; - - byte[] deltaManifest1 = - aggregator.writeToManifest( - writeTarget1, - Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)), - checkpointId1); + final int checkpointId3 = 3; CommitRequest commitRequest1 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId1)); - - byte[] deltaManifest2 = - aggregator.writeToManifest( - writeTarget2, - Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)), - checkpointId2); + new DynamicCommittable(tableKey, deltaManifests1, jobId, operatorId, checkpointId1)); CommitRequest commitRequest2 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId2)); - - byte[] deltaManifest3 = - aggregator.writeToManifest( - writeTarget3, - Sets.newHashSet(new DynamicWriteResult(writeTarget3, writeResult3)), - checkpointId2); + new DynamicCommittable(tableKey, deltaManifests2, jobId, operatorId, checkpointId2)); CommitRequest commitRequest3 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId2)); + new DynamicCommittable(tableKey, deltaManifests3, jobId, operatorId, checkpointId3)); boolean overwriteMode = false; int workerPoolSize = 1; @@ -693,10 +641,7 @@ void testTableBranchAtomicCommitWithFailures() throws Exception { } table.refresh(); - // Three committables, but only two snapshots! 
WriteResults from different checkpoints are not - // getting - // combined due to one writeResult2 containing a delete file. - assertThat(table.snapshots()).hasSize(2); + assertThat(table.snapshots()).hasSize(3); Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null); assertThat(snapshot1.summary()) @@ -720,18 +665,34 @@ void testTableBranchAtomicCommitWithFailures() throws Exception { assertThat(snapshot2.summary()) .containsAllEntriesOf( ImmutableMap.builder() - .put("added-data-files", "1") - .put("added-records", "42") .put("changed-partition-count", "1") .put("flink.job-id", jobId) .put("flink.max-committed-checkpoint-id", "" + checkpointId2) .put("flink.operator-id", operatorId) + .put("total-data-files", "1") + .put("total-delete-files", "1") + .put("total-equality-deletes", "0") + .put("total-files-size", "0") + .put("total-position-deletes", "24") + .put("total-records", "42") + .build()); + + Snapshot snapshot3 = Iterables.get(table.snapshots(), 2); + assertThat(snapshot3.summary()) + .containsAllEntriesOf( + ImmutableMap.builder() + .put("added-data-files", "1") + .put("added-records", "24") + .put("changed-partition-count", "1") + .put("flink.job-id", jobId) + .put("flink.max-committed-checkpoint-id", "" + checkpointId3) + .put("flink.operator-id", operatorId) .put("total-data-files", "2") .put("total-delete-files", "1") .put("total-equality-deletes", "0") .put("total-files-size", "0") .put("total-position-deletes", "24") - .put("total-records", "84") + .put("total-records", "66") .build()); } @@ -755,8 +716,7 @@ void testReplacePartitions() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget = - new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2)); + TableKey tableKey = new TableKey(TABLE1, "branch"); DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); @@ -768,32 +728,22 @@ void testReplacePartitions() throws Exception { final String operatorId = new OperatorID().toHexString(); final int checkpointId = 10; - byte[] deltaManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), - checkpointId); + byte[][] deltaManifests = + aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, 0); CommitRequest commitRequest = new MockCommitRequest<>( - new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey, deltaManifests, jobId, operatorId, checkpointId)); dynamicCommitter.commit(Sets.newHashSet(commitRequest)); - byte[] overwriteManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), - checkpointId + 1); + byte[][] overwriteManifests = + aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, 0); CommitRequest overwriteRequest = new MockCommitRequest<>( new DynamicCommittable( - writeTarget, overwriteManifest, jobId, operatorId, checkpointId + 1)); + tableKey, overwriteManifests, jobId, operatorId, checkpointId + 1)); dynamicCommitter.commit(Sets.newHashSet(overwriteRequest)); @@ -836,18 +786,13 @@ void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throw final int checkpointId = 1; final String branch = SnapshotRef.MAIN_BRANCH; - WriteTarget writeTarget = new WriteTarget(TABLE1, branch, 42, 0, false, Sets.newHashSet(1, 
2)); - byte[] manifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), - checkpointId); + TableKey tableKey = new TableKey(TABLE1, branch); + byte[][] manifests = + aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, checkpointId); CommitRequest commitRequest1 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget, manifest, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey, manifests, jobId, operatorId, checkpointId)); Collection> commitRequests = Sets.newHashSet(commitRequest1); int workerPoolSize = 1; diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 0c07bc946189..a428571e9e28 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -29,8 +29,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import javax.annotation.Nullable; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -58,6 +61,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -79,6 +83,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -486,6 +491,21 @@ void testPartitionSpecEvolution() throws Exception { SimpleDataUtil.SCHEMA, "t1", SnapshotRef.MAIN_BRANCH, spec2)); runTest(rows); + + // Validate the table has expected partition specs + Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of(DATABASE, "t1")); + Map tableSpecs = table.specs(); + List expectedSpecs = List.of(spec1, spec2, PartitionSpec.unpartitioned()); + + assertThat(tableSpecs).hasSize(expectedSpecs.size()); + expectedSpecs.forEach( + expectedSpec -> + assertThat( + tableSpecs.values().stream() + .anyMatch( + spec -> PartitionSpecEvolution.checkCompatibility(spec, expectedSpec))) + .withFailMessage("Table spec not found: %s.", expectedSpec) + .isTrue()); } @Test @@ -926,6 +946,79 @@ void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws assertThat(totalAddedRecords).isEqualTo(records.size()); } + @Test + void testCommitsOncePerTableBranchAndCheckpoint() throws Exception { + String tableName = "t1"; + String branch = SnapshotRef.MAIN_BRANCH; + PartitionSpec spec1 = PartitionSpec.unpartitioned(); + PartitionSpec spec2 = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build(); + Set equalityFields = Sets.newHashSet("id"); + 
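+ // All rows below target the same table and branch; the assertions at the end of this test verify
+ // that every checkpoint produced at most one commit for the (table, branch) pair.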
+ List inputRecords = + Lists.newArrayList( + // Two schemas + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, tableName, branch, spec1), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA2, tableName, branch, spec1), + // Two specs + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, tableName, branch, spec1), + new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, tableName, branch, spec2), + // Some upserts + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, tableName, branch, spec1, true, equalityFields, false), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, tableName, branch, spec1, true, equalityFields, true)); + + executeDynamicSink(inputRecords, env, true, 1, null); + + List actualRecords; + try (CloseableIterable iterable = + IcebergGenerics.read( + CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1"))) + .build()) { + actualRecords = Lists.newArrayList(iterable); + } + + // Validate records + int expectedRecords = inputRecords.size() - 1; // 1 duplicate + assertThat(actualRecords).hasSize(expectedRecords); + + for (int i = 0; i < expectedRecords; i++) { + Record actual = actualRecords.get(0); + assertThat(inputRecords) + .anySatisfy( + inputRecord -> { + assertThat(actual.get(0)).isEqualTo(inputRecord.rowProvided.getField(0)); + assertThat(actual.get(1)).isEqualTo(inputRecord.rowProvided.getField(1)); + if (inputRecord.schemaProvided.equals(SimpleDataUtil.SCHEMA2)) { + assertThat(actual.get(2)).isEqualTo(inputRecord.rowProvided.getField(2)); + } + // There is an additional _pos field which gets added + }); + } + + TableIdentifier tableIdentifier = TableIdentifier.of("default", tableName); + Table table = CATALOG_EXTENSION.catalog().loadTable(tableIdentifier); + + Snapshot lastSnapshot = Iterables.getLast(table.snapshots()); + assertThat(lastSnapshot).isNotNull(); + assertThat(lastSnapshot.summary()) + .containsAllEntriesOf( + ImmutableMap.builder() + .put("total-equality-deletes", "1") + .put("total-position-deletes", "1") + .put("total-records", "6") + .build()); + + // Count commits per checkpoint + Map commitsPerCheckpoint = + StreamSupport.stream(table.snapshots().spliterator(), false) + .map(snapshot -> snapshot.summary().get("flink.max-committed-checkpoint-id")) + .filter(Objects::nonNull) + .map(Long::parseLong) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + assertThat(commitsPerCheckpoint.values()).allMatch(count -> count == 1); + } + @Test void testOptInDropUnusedColumns() throws Exception { Schema schema1 = diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java index b9aa56ab2e61..41c1f67d2926 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java @@ -20,7 +20,6 @@ import static org.apache.iceberg.flink.TestFixtures.DATABASE; import static org.apache.iceberg.flink.TestFixtures.TABLE; -import static org.apache.iceberg.flink.sink.dynamic.DynamicCommitter.MAX_CONTINUOUS_EMPTY_COMMITS; import java.util.List; import java.util.function.Consumer; @@ -123,10 +122,7 @@ void before() { CATALOG_EXTENSION .catalog() .createTable( - IDENTIFIERS[i], - SCHEMA, - PartitionSpec.unpartitioned(), - ImmutableMap.of(MAX_CONTINUOUS_EMPTY_COMMITS, "100000")); + IDENTIFIERS[i], SCHEMA, 
PartitionSpec.unpartitioned(), ImmutableMap.of()); table.manageSnapshots().createBranch(SnapshotRef.MAIN_BRANCH).commit(); } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java index 17a4d98b2938..f68e8dae174e 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultAggregator.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.iceberg.DataFile; @@ -58,7 +59,7 @@ class TestDynamicWriteResultAggregator { .build(); @Test - void testAggregator() throws Exception { + void testAggregatesWriteResultsForTwoTables() throws Exception { CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new Schema()); CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table2"), new Schema()); @@ -69,13 +70,12 @@ void testAggregator() throws Exception { testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) { testHarness.open(); - WriteTarget writeTarget1 = new WriteTarget("table", "branch", 42, 0, true, Sets.newHashSet()); + TableKey tableKey1 = new TableKey("table", "branch"); DynamicWriteResult dynamicWriteResult1 = - new DynamicWriteResult(writeTarget1, WriteResult.builder().build()); - WriteTarget writeTarget2 = - new WriteTarget("table2", "branch", 42, 0, true, Sets.newHashSet(1, 2)); + new DynamicWriteResult(tableKey1, -1, WriteResult.builder().build()); + TableKey tableKey2 = new TableKey("table2", "branch"); DynamicWriteResult dynamicWriteResult2 = - new DynamicWriteResult(writeTarget2, WriteResult.builder().build()); + new DynamicWriteResult(tableKey2, -1, WriteResult.builder().build()); CommittableWithLineage committable1 = new CommittableWithLineage<>(dynamicWriteResult1, 0, 0); @@ -113,18 +113,16 @@ void testPreventOutputFileFactoryCacheEvictionDuringFlush() throws Exception { testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) { testHarness.open(); - WriteTarget writeTarget1 = - new WriteTarget("table", "branch", 42, 0, false, Sets.newHashSet()); + TableKey tableKey1 = new TableKey("table", "branch"); DynamicWriteResult dynamicWriteResult1 = new DynamicWriteResult( - writeTarget1, WriteResult.builder().addDataFiles(DATA_FILE).build()); + tableKey1, DATA_FILE.specId(), WriteResult.builder().addDataFiles(DATA_FILE).build()); - // Different WriteTarget - WriteTarget writeTarget2 = - new WriteTarget("table", "branch2", 23, 0, true, Sets.newHashSet()); + // Different TableKey + TableKey tableKey2 = new TableKey("table", "branch2"); DynamicWriteResult dynamicWriteResult2 = new DynamicWriteResult( - writeTarget2, WriteResult.builder().addDataFiles(DATA_FILE).build()); + tableKey2, DATA_FILE.specId(), WriteResult.builder().addDataFiles(DATA_FILE).build()); CommittableWithLineage committable1 = new CommittableWithLineage<>(dynamicWriteResult1, 0, 0); @@ -151,6 +149,70 @@ void 
testPreventOutputFileFactoryCacheEvictionDuringFlush() throws Exception { } } + @Test + void testAggregatesWriteResultsForOneTable() throws Exception { + CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new Schema()); + CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table2"), new Schema()); + + long checkpointId = 1L; + + try (OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new DynamicWriteResultAggregator( + CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize))) { + testHarness.open(); + + TableKey tableKey = new TableKey("table", "branch"); + DataFile dataFile1 = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/data-1.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + DataFile dataFile2 = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/data-2.parquet") + .withFileSizeInBytes(20) + .withRecordCount(2) + .build(); + + testHarness.processElement( + createRecord(tableKey, checkpointId, dataFile1.specId(), dataFile1)); + testHarness.processElement( + createRecord(tableKey, checkpointId, dataFile2.specId(), dataFile2)); + + assertThat(testHarness.getOutput()).isEmpty(); + + testHarness.prepareSnapshotPreBarrier(checkpointId); + + List> outputValues = testHarness.extractOutputValues(); + // Contains a CommittableSummary + DynamicCommittable + assertThat(outputValues).hasSize(2); + + SinkV2Assertions.assertThat(extractAndAssertCommittableSummary(outputValues.get(0))) + .hasOverallCommittables(1) + .hasFailedCommittables(0) + .hasCheckpointId(checkpointId); + + CommittableWithLineage committable = + extractAndAssertCommittableWithLineage(outputValues.get(1)); + + SinkV2Assertions.assertThat(committable).hasCheckpointId(checkpointId); + + DynamicCommittable dynamicCommittable = committable.getCommittable(); + + assertThat(dynamicCommittable.manifests()).hasNumberOfRows(1); + assertThat(dynamicCommittable.key()).isEqualTo(tableKey); + assertThat(dynamicCommittable.checkpointId()).isEqualTo(checkpointId); + assertThat(dynamicCommittable.jobId()) + .isEqualTo(testHarness.getEnvironment().getJobID().toString()); + assertThat(dynamicCommittable.operatorId()) + .isEqualTo(testHarness.getOperator().getOperatorID().toString()); + } + } + private static Set getManifestPaths( List>> messages) throws IOException { Set manifestPaths = Sets.newHashSet(); @@ -158,15 +220,40 @@ private static Set getManifestPaths( for (StreamRecord> record : messages) { CommittableMessage message = record.getValue(); if (message instanceof CommittableWithLineage) { - DeltaManifests deltaManifests = - SimpleVersionedSerialization.readVersionAndDeSerialize( - DeltaManifestsSerializer.INSTANCE, - (((CommittableWithLineage) message).getCommittable()) - .manifest()); - deltaManifests.manifests().forEach(manifest -> manifestPaths.add(manifest.path())); + for (byte[] manifest : + (((CommittableWithLineage) message).getCommittable()).manifests()) { + DeltaManifests deltaManifests = + SimpleVersionedSerialization.readVersionAndDeSerialize( + DeltaManifestsSerializer.INSTANCE, manifest); + deltaManifests + .manifests() + .forEach(manifestFile -> manifestPaths.add(manifestFile.path())); + } } } return manifestPaths; } + + private static StreamRecord> createRecord( + TableKey tableKey, long checkpointId, int specId, DataFile... 
dataFiles) { + return new StreamRecord<>( + new CommittableWithLineage<>( + new DynamicWriteResult( + tableKey, specId, WriteResult.builder().addDataFiles(dataFiles).build()), + checkpointId, + 0)); + } + + static CommittableSummary extractAndAssertCommittableSummary( + CommittableMessage message) { + assertThat(message).isInstanceOf(CommittableSummary.class); + return (CommittableSummary) message; + } + + static CommittableWithLineage extractAndAssertCommittableWithLineage( + CommittableMessage message) { + assertThat(message).isInstanceOf(CommittableWithLineage.class); + return (CommittableWithLineage) message; + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java index b1bca6099d7a..35dc45d8fd41 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java @@ -29,7 +29,6 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.jupiter.api.Test; class TestDynamicWriteResultSerializer { @@ -48,13 +47,12 @@ class TestDynamicWriteResultSerializer { ImmutableMap.of(1, ByteBuffer.allocate(1)), ImmutableMap.of(1, ByteBuffer.allocate(1)))) .build(); + private static final TableKey TABLE_KEY = new TableKey("table", "branch"); @Test void testRoundtrip() throws IOException { DynamicWriteResult dynamicWriteResult = - new DynamicWriteResult( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - WriteResult.builder().addDataFiles(DATA_FILE).build()); + new DynamicWriteResult(TABLE_KEY, 1, WriteResult.builder().addDataFiles(DATA_FILE).build()); DynamicWriteResultSerializer serializer = new DynamicWriteResultSerializer(); DynamicWriteResult copy = @@ -68,11 +66,9 @@ void testRoundtrip() throws IOException { } @Test - void testUnsupportedVersion() throws IOException { + void testUnsupportedVersion() { DynamicWriteResult dynamicWriteResult = - new DynamicWriteResult( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - WriteResult.builder().addDataFiles(DATA_FILE).build()); + new DynamicWriteResult(TABLE_KEY, 1, WriteResult.builder().addDataFiles(DATA_FILE).build()); DynamicWriteResultSerializer serializer = new DynamicWriteResultSerializer(); assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(dynamicWriteResult))) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java index 1b786e46452f..1da0f8564938 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.sink; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.NavigableMap; import java.util.concurrent.atomic.AtomicLong; @@ -44,7 +45,11 @@ public CommitSummary(NavigableMap pendingResults) { } public void addAll(NavigableMap> pendingResults) { - pendingResults.values().forEach(writeResults -> 
writeResults.forEach(this::addWriteResult)); + pendingResults.values().forEach(this::addAll); + } + + public void addAll(Collection pendingResults) { + pendingResults.forEach(this::addWriteResult); + } private void addWriteResult(WriteResult writeResult) { diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java index 33edefe71eb0..4f0b68573ff5 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittable.java @@ -26,7 +26,7 @@ /** * The aggregated results of a single checkpoint which should be committed. Containing the - serialized {@link DeltaManifests} file - which contains the commit data, and the jobId, + serialized {@link DeltaManifests} files, which contain the commit data, and the jobId, * operatorId, checkpointId triplet to identify the specific commit. * *

{@link DynamicCommittableSerializer} is used to serialize {@link DynamicCommittable} between @@ -34,27 +34,27 @@ */ class DynamicCommittable implements Serializable { - private final WriteTarget key; - private final byte[] manifest; + private final TableKey key; + private final byte[][] manifests; private final String jobId; private final String operatorId; private final long checkpointId; DynamicCommittable( - WriteTarget key, byte[] manifest, String jobId, String operatorId, long checkpointId) { + TableKey key, byte[][] manifests, String jobId, String operatorId, long checkpointId) { this.key = key; - this.manifest = manifest; + this.manifests = manifests; this.jobId = jobId; this.operatorId = operatorId; this.checkpointId = checkpointId; } - WriteTarget key() { + TableKey key() { return key; } - byte[] manifest() { - return manifest; + byte[][] manifests() { + return manifests; } String jobId() { @@ -78,14 +78,14 @@ public boolean equals(Object o) { DynamicCommittable that = (DynamicCommittable) o; return checkpointId == that.checkpointId && Objects.equals(key, that.key) - && Objects.deepEquals(manifest, that.manifest) + && Arrays.deepEquals(manifests, that.manifests) && Objects.equals(jobId, that.jobId) && Objects.equals(operatorId, that.operatorId); } @Override public int hashCode() { - return Objects.hash(key, Arrays.hashCode(manifest), jobId, operatorId, checkpointId); + return Objects.hash(key, Arrays.deepHashCode(manifests), jobId, operatorId, checkpointId); } @Override @@ -97,8 +97,4 @@ public String toString() { .add("operatorId", operatorId) .toString(); } - - public WriteTarget writeTarget() { - return key; - } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java index 4aadcf1f3620..d599d29dba01 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java @@ -31,11 +31,12 @@ */ class DynamicCommittableSerializer implements SimpleVersionedSerializer { - private static final int VERSION = 1; + private static final int VERSION_1 = 1; + private static final int VERSION_2 = 2; @Override public int getVersion() { - return VERSION; + return VERSION_2; } @Override @@ -46,26 +47,60 @@ public byte[] serialize(DynamicCommittable committable) throws IOException { view.writeUTF(committable.jobId()); view.writeUTF(committable.operatorId()); view.writeLong(committable.checkpointId()); - view.writeInt(committable.manifest().length); - view.write(committable.manifest()); + + int numManifests = committable.manifests().length; + view.writeInt(numManifests); + for (int i = 0; i < numManifests; i++) { + byte[] manifest = committable.manifests()[i]; + view.writeInt(manifest.length); + view.write(manifest); + } + return out.toByteArray(); } @Override public DynamicCommittable deserialize(int version, byte[] serialized) throws IOException { - if (version == 1) { - DataInputDeserializer view = new DataInputDeserializer(serialized); - WriteTarget key = WriteTarget.deserializeFrom(view); - String jobId = view.readUTF(); - String operatorId = view.readUTF(); - long checkpointId = view.readLong(); - int manifestLen = view.readInt(); - byte[] manifestBuf; - manifestBuf = new byte[manifestLen]; - view.read(manifestBuf); - return new DynamicCommittable(key, manifestBuf, jobId, 
operatorId, checkpointId); + if (version == VERSION_1) { + return deserializeV1(serialized); + } else if (version == VERSION_2) { + return deserializeV2(serialized); } throw new IOException("Unrecognized version or corrupt state: " + version); } + + private DynamicCommittable deserializeV1(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + WriteTarget key = WriteTarget.deserializeFrom(view); + String jobId = view.readUTF(); + String operatorId = view.readUTF(); + long checkpointId = view.readLong(); + int manifestLen = view.readInt(); + byte[] manifestBuf = new byte[manifestLen]; + view.read(manifestBuf); + return new DynamicCommittable( + new TableKey(key.tableName(), key.branch()), + new byte[][] {manifestBuf}, + jobId, + operatorId, + checkpointId); + } + + private DynamicCommittable deserializeV2(byte[] serialized) throws IOException { + DataInputDeserializer view = new DataInputDeserializer(serialized); + TableKey key = TableKey.deserializeFrom(view); + String jobId = view.readUTF(); + String operatorId = view.readUTF(); + long checkpointId = view.readLong(); + + byte[][] manifestsBuf = new byte[view.readInt()][]; + for (int i = 0; i < manifestsBuf.length; i++) { + byte[] manifest = new byte[view.readInt()]; + view.read(manifest); + manifestsBuf[i] = manifest; + } + + return new DynamicCommittable(key, manifestsBuf, jobId, operatorId, checkpointId); + } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index 1cddc64d6016..5e824773f4bf 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -19,15 +19,15 @@ package org.apache.iceberg.flink.sink.dynamic; import java.io.IOException; -import java.io.Serializable; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.core.io.SimpleVersionedSerialization; @@ -51,12 +51,10 @@ import org.apache.iceberg.flink.sink.FlinkManifestUtil; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.ContentFileUtil; -import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; @@ -79,26 +77,15 @@ class DynamicCommitter implements Committer { private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id"; private static final Logger LOG = LoggerFactory.getLogger(DynamicCommitter.class); - private static final byte[] EMPTY_MANIFEST_DATA = new byte[0]; - private static final WriteResult EMPTY_WRITE_RESULT = - WriteResult.builder() - 
.addDataFiles(Lists.newArrayList()) - .addDeleteFiles(Lists.newArrayList()) - .build(); private static final long INITIAL_CHECKPOINT_ID = -1L; - @VisibleForTesting - static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits"; - private static final String FLINK_JOB_ID = "flink.job-id"; private static final String OPERATOR_ID = "flink.operator-id"; private final Map snapshotProperties; private final boolean replacePartitions; private final DynamicCommitterMetrics committerMetrics; private final Catalog catalog; - private final Map maxContinuousEmptyCommitsMap; - private final Map continuousEmptyCheckpointsMap; private final ExecutorService workerPool; DynamicCommitter( @@ -112,9 +99,6 @@ class DynamicCommitter implements Committer { this.replacePartitions = replacePartitions; this.committerMetrics = committerMetrics; this.catalog = catalog; - this.maxContinuousEmptyCommitsMap = Maps.newHashMap(); - this.continuousEmptyCheckpointsMap = Maps.newHashMap(); - this.workerPool = ThreadPools.newFixedThreadPool("iceberg-committer-pool-" + sinkId, workerPoolSize); } @@ -126,9 +110,15 @@ public void commit(Collection> commitRequests) return; } - // For every table and every checkpoint, we store the list of to-be-committed - // DynamicCommittable. - // There may be DynamicCommittable from previous checkpoints which have not been committed yet. + /* + Each (table, branch, checkpoint) triplet must have only one commit request. + There may be commit requests from previous checkpoints which have not been committed yet. + + We currently keep a List of commit requests per checkpoint instead of a single CommitRequest + to process the Flink state from previous releases, which had multiple commit requests created by the upstream + DynamicWriteResultAggregator. Iceberg 1.12 will remove this, and users should upgrade to the 1.11 release first + to migrate their state to a single commit request per checkpoint. 
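+
+      For example, for a table written to branch b at checkpoints 1 and 2, the map built below contains
+      {(table, b) -> {1 -> [request], 2 -> [request]}}; each inner list holds a single element unless the
+      state was restored from an older release.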
+ */ Map>>> commitRequestMap = Maps.newHashMap(); for (CommitRequest request : commitRequests) { @@ -151,12 +141,16 @@ public void commit(Collection> commitRequests) : List.of(); long maxCommittedCheckpointId = getMaxCommittedCheckpointId(ancestors, last.jobId(), last.operatorId()); + + NavigableMap>> skippedCommitRequests = + entry.getValue().headMap(maxCommittedCheckpointId, true); + LOG.debug( + "Skipping {} commit requests: {}", skippedCommitRequests.size(), skippedCommitRequests); // Mark the already committed FilesCommittable(s) as finished - entry - .getValue() - .headMap(maxCommittedCheckpointId, true) + skippedCommitRequests .values() .forEach(list -> list.forEach(CommitRequest::signalAlreadyCommitted)); + NavigableMap>> uncommitted = entry.getValue().tailMap(maxCommittedCheckpointId, false); if (!uncommitted.isEmpty()) { @@ -210,89 +204,45 @@ private void commitPendingRequests( NavigableMap> pendingResults = Maps.newTreeMap(); for (Map.Entry>> e : commitRequestMap.entrySet()) { for (CommitRequest committable : e.getValue()) { - if (Arrays.equals(EMPTY_MANIFEST_DATA, committable.getCommittable().manifest())) { - pendingResults - .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList()) - .add(EMPTY_WRITE_RESULT); - } else { + for (byte[] manifest : committable.getCommittable().manifests()) { DeltaManifests deltaManifests = SimpleVersionedSerialization.readVersionAndDeSerialize( - DeltaManifestsSerializer.INSTANCE, committable.getCommittable().manifest()); - - WriteResult writeResult = - FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()); - if (TableUtil.formatVersion(table) > 2) { - for (DeleteFile deleteFile : writeResult.deleteFiles()) { - if (deleteFile.content() == FileContent.POSITION_DELETES) { - Preconditions.checkArgument( - ContentFileUtil.isDV(deleteFile), - "Can't add position delete file to the %s table. Concurrent table upgrade to V3 is not supported.", - table.name()); - } - } - } - + DeltaManifestsSerializer.INSTANCE, manifest); pendingResults .computeIfAbsent(e.getKey(), unused -> Lists.newArrayList()) - .add(writeResult); + .add(FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs())); manifests.addAll(deltaManifests.manifests()); } } } - CommitSummary summary = new CommitSummary(); - summary.addAll(pendingResults); - commitPendingResult(table, branch, pendingResults, summary, newFlinkJobId, operatorId); - if (committerMetrics != null) { - committerMetrics.updateCommitSummary(table.name(), summary); + if (TableUtil.formatVersion(table) > 2) { + Optional positionalDelete = + pendingResults.values().stream() + .flatMap(List::stream) + .flatMap(writeResult -> Arrays.stream(writeResult.deleteFiles())) + .filter(deleteFile -> deleteFile.content() == FileContent.POSITION_DELETES) + .filter(Predicate.not(ContentFileUtil::isDV)) + .findAny(); + Preconditions.checkArgument( + positionalDelete.isEmpty(), + "Can't add position delete file to the %s table. 
Concurrent table upgrade to V3 is not supported.", + table.name()); } - FlinkManifestUtil.deleteCommittedManifests(table, manifests, newFlinkJobId, checkpointId); - } - - private void commitPendingResult( - Table table, - String branch, - NavigableMap> pendingResults, - CommitSummary summary, - String newFlinkJobId, - String operatorId) { - long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount(); - TableKey key = new TableKey(table.name(), branch); - int continuousEmptyCheckpoints = - continuousEmptyCheckpointsMap.computeIfAbsent(key, unused -> 0); - int maxContinuousEmptyCommits = - maxContinuousEmptyCommitsMap.computeIfAbsent( - key, - unused -> { - int result = - PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10); - Preconditions.checkArgument( - result > 0, MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive"); - return result; - }); - continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0; - if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) { - if (replacePartitions) { - replacePartitions(table, branch, pendingResults, summary, newFlinkJobId, operatorId); - } else { - commitDeltaTxn(table, branch, pendingResults, summary, newFlinkJobId, operatorId); - } - - continuousEmptyCheckpoints = 0; + if (replacePartitions) { + replacePartitions(table, branch, pendingResults, newFlinkJobId, operatorId); } else { - long checkpointId = pendingResults.lastKey(); - LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId); + commitDeltaTxn(table, branch, pendingResults, newFlinkJobId, operatorId); } - continuousEmptyCheckpointsMap.put(key, continuousEmptyCheckpoints); + FlinkManifestUtil.deleteCommittedManifests(table, manifests, newFlinkJobId, checkpointId); } private void replacePartitions( Table table, String branch, NavigableMap> pendingResults, - CommitSummary summary, String newFlinkJobId, String operatorId) { // Iceberg tables are unsorted. So the order of the append data does not matter. @@ -305,6 +255,9 @@ private void replacePartitions( } } + CommitSummary summary = new CommitSummary(); + summary.addAll(pendingResults); + commitOperation( table, branch, @@ -320,7 +273,6 @@ private void commitDeltaTxn( Table table, String branch, NavigableMap> pendingResults, - CommitSummary summary, String newFlinkJobId, String operatorId) { for (Map.Entry> e : pendingResults.entrySet()) { @@ -340,6 +292,9 @@ private void commitDeltaTxn( Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); } + CommitSummary summary = new CommitSummary(); + summary.addAll(writeResults); + // Every Flink checkpoint contains a set of independent changes which can be committed // together. While it is technically feasible to combine append-only data across checkpoints, // for the sake of simplicity, we do not implement this (premature) optimization. 
Multiple @@ -434,6 +389,7 @@ void commitOperation( durationMs); if (committerMetrics != null) { committerMetrics.commitDuration(table.name(), durationMs); + committerMetrics.updateCommitSummary(table.name(), summary); } } @@ -441,54 +397,4 @@ void commitOperation( public void close() throws IOException { workerPool.shutdown(); } - - private static class TableKey implements Serializable { - private String tableName; - private String branch; - - TableKey(String tableName, String branch) { - this.tableName = tableName; - this.branch = branch; - } - - TableKey(DynamicCommittable committable) { - this.tableName = committable.key().tableName(); - this.branch = committable.key().branch(); - } - - String tableName() { - return tableName; - } - - String branch() { - return branch; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other == null || getClass() != other.getClass()) { - return false; - } - - TableKey that = (TableKey) other; - return tableName.equals(that.tableName) && branch.equals(that.branch); - } - - @Override - public int hashCode() { - return Objects.hash(tableName, branch); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("tableName", tableName) - .add("branch", branch) - .toString(); - } - } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java index 85806f932ad5..d8d0ed6b573e 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResult.java @@ -19,22 +19,37 @@ package org.apache.iceberg.flink.sink.dynamic; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; class DynamicWriteResult { - - private final WriteTarget key; + private final TableKey key; + private final int specId; private final WriteResult writeResult; - DynamicWriteResult(WriteTarget key, WriteResult writeResult) { + DynamicWriteResult(TableKey key, int specId, WriteResult writeResult) { this.key = key; + this.specId = specId; this.writeResult = writeResult; } - WriteTarget key() { + TableKey key() { return key; } + public int specId() { + return specId; + } + WriteResult writeResult() { return writeResult; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("key", key) + .add("specId", specId) + .add("writeResult", writeResult) + .toString(); + } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java index 991f72bc4564..41f01c64757f 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java @@ -44,6 +44,7 @@ import org.apache.iceberg.flink.sink.FlinkManifestUtil; import org.apache.iceberg.flink.sink.ManifestOutputFileFactory; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; @@ 
-60,14 +61,13 @@ class DynamicWriteResultAggregator CommittableMessage, CommittableMessage> { private static final Logger LOG = LoggerFactory.getLogger(DynamicWriteResultAggregator.class); - private static final byte[] EMPTY_MANIFEST_DATA = new byte[0]; private final CatalogLoader catalogLoader; private final int cacheMaximumSize; private long lastCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1; - private transient Map> results; + private transient Map>> resultsByTableKeyAndSpec; private transient Map> specs; private transient Map> outputFileFactoriesAndFormatVersions; @@ -95,7 +95,7 @@ public void open() throws Exception { this.operatorId = getOperatorID().toString(); this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber(); - this.results = Maps.newHashMap(); + this.resultsByTableKeyAndSpec = Maps.newHashMap(); this.specs = new LRUCache<>(cacheMaximumSize); this.outputFileFactoriesAndFormatVersions = new LRUCache<>(cacheMaximumSize); this.catalog = catalogLoader.loadCatalog(); @@ -119,14 +119,15 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException { this.lastCheckpointId = checkpointId; Collection> committables = - Sets.newHashSetWithExpectedSize(results.size()); + Sets.newHashSetWithExpectedSize(resultsByTableKeyAndSpec.size()); int count = 0; - for (Map.Entry> entries : results.entrySet()) { + for (Map.Entry>> entries : + resultsByTableKeyAndSpec.entrySet()) { committables.add( new CommittableWithLineage<>( new DynamicCommittable( entries.getKey(), - writeToManifest(entries.getKey(), entries.getValue(), checkpointId), + writeToManifests(entries.getKey().tableName(), entries.getValue(), checkpointId), getContainingTask().getEnvironment().getJobID().toString(), getRuntimeContext().getOperatorUniqueID(), checkpointId), @@ -144,32 +145,42 @@ new StreamRecord<>( new CommittableWithLineage<>(c.getCommittable(), checkpointId, subTaskId)))); LOG.info("Emitted {} commit message to downstream committer operator", count); - results.clear(); + resultsByTableKeyAndSpec.clear(); } /** - * Write all the completed data files to a newly created manifest file and return the manifest's + * Write all the completed data files to newly created manifest files and return the manifests' * avro serialized bytes. 
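+   * Each element of the returned array is the serialized {@link DeltaManifests} for one partition
+   * spec of the given table.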
*/ @VisibleForTesting - byte[] writeToManifest( - WriteTarget key, Collection writeResults, long checkpointId) + byte[][] writeToManifests( + String tableName, Map> writeResultsBySpec, long checkpointId) throws IOException { - if (writeResults.isEmpty()) { - return EMPTY_MANIFEST_DATA; + byte[][] deltaManifestsBySpec = new byte[writeResultsBySpec.size()][]; + int idx = 0; + for (Map.Entry> entry : writeResultsBySpec.entrySet()) { + deltaManifestsBySpec[idx] = + writeToManifest(tableName, entry.getKey(), entry.getValue(), checkpointId); + idx++; } + return deltaManifestsBySpec; + } + + private byte[] writeToManifest( + String tableName, int specId, Collection writeResults, long checkpointId) + throws IOException { WriteResult.Builder builder = WriteResult.builder(); - writeResults.forEach(w -> builder.add(w.writeResult())); + writeResults.forEach(builder::add); WriteResult result = builder.build(); Tuple2 outputFileFactoryAndVersion = - outputFileFactoryAndFormatVersion(key.tableName()); + outputFileFactoryAndFormatVersion(tableName); DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles( result, () -> outputFileFactoryAndVersion.f0.create(checkpointId), - spec(key.tableName(), key.specId()), + spec(tableName, specId), outputFileFactoryAndVersion.f1); return SimpleVersionedSerialization.writeVersionAndSerialize( @@ -183,8 +194,16 @@ public void processElement(StreamRecord> if (element.isRecord() && element.getValue() instanceof CommittableWithLineage) { DynamicWriteResult result = ((CommittableWithLineage) element.getValue()).getCommittable(); - WriteTarget key = result.key(); - results.computeIfAbsent(key, unused -> Sets.newHashSet()).add(result); + Collection resultsPerTableKeyAndSpec = + resultsByTableKeyAndSpec + .computeIfAbsent(result.key(), unused -> Maps.newHashMap()) + .computeIfAbsent(result.specId(), unused -> Lists.newArrayList()); + resultsPerTableKeyAndSpec.add(result.writeResult()); + LOG.debug( + "Added {}, specId={}, totalResults={}", + result, + result.specId(), + resultsPerTableKeyAndSpec.size()); } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java index cf5f423fd7ff..5153ec6a49ee 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java @@ -41,6 +41,7 @@ public byte[] serialize(DynamicWriteResult writeResult) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); writeResult.key().serializeTo(view); + view.writeInt(writeResult.specId()); byte[] result = WRITE_RESULT_SERIALIZER.serialize(writeResult.writeResult()); view.write(result); return out.toByteArray(); @@ -50,11 +51,12 @@ public byte[] serialize(DynamicWriteResult writeResult) throws IOException { public DynamicWriteResult deserialize(int version, byte[] serialized) throws IOException { if (version == 1) { DataInputDeserializer view = new DataInputDeserializer(serialized); - WriteTarget key = WriteTarget.deserializeFrom(view); + TableKey key = TableKey.deserializeFrom(view); + int specId = view.readInt(); byte[] resultBuf = new byte[view.available()]; view.read(resultBuf); WriteResult writeResult = WRITE_RESULT_SERIALIZER.deserialize(version, resultBuf); - return new 
DynamicWriteResult(key, writeResult); + return new DynamicWriteResult(key, specId, writeResult); } throw new IOException("Unrecognized version or corrupt state: " + version); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java index 907385797495..8425ea747fb7 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java @@ -189,7 +189,11 @@ public Collection prepareCommit() throws IOException { writeResult.dataFiles().length, writeResult.deleteFiles().length); - result.add(new DynamicWriteResult(writeTarget, writeResult)); + result.add( + new DynamicWriteResult( + new TableKey(writeTarget.tableName(), writeTarget.branch()), + writeTarget.specId(), + writeResult)); } writers.clear(); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java new file mode 100644 index 000000000000..08b755fe14a8 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.IOException; +import java.util.Objects; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +class TableKey { + private final String tableName; + private final String branch; + + TableKey(String tableName, String branch) { + this.tableName = tableName; + this.branch = branch; + } + + TableKey(DynamicCommittable committable) { + this.tableName = committable.key().tableName(); + this.branch = committable.key().branch(); + } + + String tableName() { + return tableName; + } + + String branch() { + return branch; + } + + void serializeTo(DataOutputView view) throws IOException { + view.writeUTF(tableName); + view.writeUTF(branch); + } + + static TableKey deserializeFrom(DataInputView view) throws IOException { + return new TableKey(view.readUTF(), view.readUTF()); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + TableKey that = (TableKey) other; + return tableName.equals(that.tableName) && branch.equals(that.branch); + } + + @Override + public int hashCode() { + return Objects.hash(tableName, branch); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("tableName", tableName) + .add("branch", branch) + .toString(); + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java index beb244b72eab..211f28c6518a 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java @@ -23,7 +23,6 @@ import java.util.Objects; import java.util.Set; import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -76,18 +75,6 @@ Set equalityFields() { return equalityFields; } - void serializeTo(DataOutputView view) throws IOException { - view.writeUTF(tableName); - view.writeUTF(branch); - view.writeInt(schemaId); - view.writeInt(specId); - view.writeBoolean(upsertMode); - view.writeInt(equalityFields.size()); - for (Integer equalityField : equalityFields) { - view.writeInt(equalityField); - } - } - static WriteTarget deserializeFrom(DataInputView view) throws IOException { return new WriteTarget( view.readUTF(), diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java index 13a06d362717..16890d1f63d0 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommittableSerializer.java @@ -21,42 +21,94 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Set; import 
org.apache.flink.api.common.JobID; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.junit.jupiter.api.Test; class TestDynamicCommittableSerializer { + private static final DynamicCommittable COMMITTABLE = + new DynamicCommittable( + new TableKey("table", "branch"), + new byte[][] {{3, 4}, {5, 6}}, + JobID.generate().toHexString(), + new OperatorID().toHexString(), + 5); @Test - void testRoundtrip() throws IOException { - DynamicCommittable committable = + void testV1() throws IOException { + var committable = new DynamicCommittable( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - new byte[] {3, 4}, + new TableKey("table", "branch"), + new byte[][] {{3, 4}}, JobID.generate().toHexString(), new OperatorID().toHexString(), 5); - DynamicCommittableSerializer serializer = new DynamicCommittableSerializer(); - assertThat(serializer.deserialize(serializer.getVersion(), serializer.serialize(committable))) - .isEqualTo(committable); + assertThat(serializer.deserialize(1, serializeV1(committable))).isEqualTo(committable); } @Test - void testUnsupportedVersion() throws IOException { - DynamicCommittable committable = - new DynamicCommittable( - new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)), - new byte[] {3, 4}, - JobID.generate().toHexString(), - new OperatorID().toHexString(), - 5); + void testLatestVersion() throws IOException { + DynamicCommittableSerializer serializer = new DynamicCommittableSerializer(); + assertThat(serializer.deserialize(serializer.getVersion(), serializer.serialize(COMMITTABLE))) + .isEqualTo(COMMITTABLE); + } + @Test + void testUnsupportedVersion() { DynamicCommittableSerializer serializer = new DynamicCommittableSerializer(); - assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(committable))) + assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(COMMITTABLE))) .hasMessage("Unrecognized version or corrupt state: -1") .isInstanceOf(IOException.class); } + + byte[] serializeV1(DynamicCommittable committable) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + + // Wrap TableKey into a testing WriteTarget to match the V1 format + WriteTarget writeTarget = + new WriteTarget( + committable.key().tableName(), + committable.key().branch(), + -1, + -1, + false, + Set.of(1, 2, 3)); + view.write(serializeV1(writeTarget)); + + view.writeUTF(committable.jobId()); + view.writeUTF(committable.operatorId()); + view.writeLong(committable.checkpointId()); + + Preconditions.checkArgument( + committable.manifests().length == 1, + "V1 serialization format must have only one manifest per committable."); + view.writeInt(committable.manifests()[0].length); + view.write(committable.manifests()[0]); + + return out.toByteArray(); + } + + byte[] serializeV1(WriteTarget writeTarget) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + + view.writeUTF(writeTarget.tableName()); + view.writeUTF(writeTarget.branch()); + view.writeInt(writeTarget.schemaId()); + view.writeInt(writeTarget.specId()); + view.writeBoolean(writeTarget.upsertMode()); + 
view.writeInt(writeTarget.equalityFields().size()); + for (Integer equalityField : writeTarget.equalityFields()) { + view.writeInt(equalityField); + } + + return out.toByteArray(); + } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java index 5f938d4e8827..4cc27151b094 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java @@ -54,6 +54,7 @@ import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.assertj.core.api.ThrowableAssert.ThrowingCallable; @@ -125,6 +126,15 @@ class TestDynamicCommitter { .ofPositionDeletes() .build(); + private static final Map> WRITE_RESULT_BY_SPEC = + Map.of( + DATA_FILE.specId(), + Lists.newArrayList(WriteResult.builder().addDataFiles(DATA_FILE).build())); + private static final Map> WRITE_RESULT_BY_SPEC_2 = + Map.of( + DATA_FILE_2.specId(), + Lists.newArrayList(WriteResult.builder().addDataFiles(DATA_FILE_2).build())); + @BeforeEach void before() { catalog = CATALOG_EXTENSION.catalog(); @@ -155,12 +165,9 @@ void testCommit() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget1 = - new WriteTarget(TABLE1, "branch", 42, 0, true, Sets.newHashSet(1, 2)); - WriteTarget writeTarget2 = - new WriteTarget(TABLE1, "branch2", 43, 0, true, Sets.newHashSet(1, 2)); - WriteTarget writeTarget3 = - new WriteTarget(TABLE2, "branch2", 43, 0, true, Sets.newHashSet(1, 2)); + TableKey tableKey1 = new TableKey(TABLE1, "branch"); + TableKey tableKey2 = new TableKey(TABLE1, "branch2"); + TableKey tableKey3 = new TableKey(TABLE2, "branch2"); DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); @@ -168,27 +175,12 @@ void testCommit() throws Exception { new OneInputStreamOperatorTestHarness(aggregator); aggregatorHarness.open(); - byte[] deltaManifest1 = - aggregator.writeToManifest( - writeTarget1, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget1, WriteResult.builder().addDataFiles(DATA_FILE).build())), - 0); - byte[] deltaManifest2 = - aggregator.writeToManifest( - writeTarget2, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget2, WriteResult.builder().addDataFiles(DATA_FILE).build())), - 0); - byte[] deltaManifest3 = - aggregator.writeToManifest( - writeTarget3, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget3, WriteResult.builder().addDataFiles(DATA_FILE).build())), - 0); + byte[][] deltaManifests1 = + aggregator.writeToManifests(tableKey1.tableName(), WRITE_RESULT_BY_SPEC, 0); + byte[][] deltaManifests2 = + aggregator.writeToManifests(tableKey2.tableName(), WRITE_RESULT_BY_SPEC, 0); + byte[][] deltaManifests3 = + aggregator.writeToManifests(tableKey3.tableName(), WRITE_RESULT_BY_SPEC, 0); final String jobId = JobID.generate().toHexString(); final String operatorId = new OperatorID().toHexString(); @@ -196,15 +188,15 @@ void testCommit() throws Exception { CommitRequest commitRequest1 = new MockCommitRequest<>( - 
new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey1, deltaManifests1, jobId, operatorId, checkpointId)); CommitRequest commitRequest2 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey2, deltaManifests2, jobId, operatorId, checkpointId)); CommitRequest commitRequest3 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey3, deltaManifests3, jobId, operatorId, checkpointId)); dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2, commitRequest3)); @@ -267,7 +259,7 @@ void testCommit() throws Exception { } @Test - void testAlreadyCommitted() throws Exception { + void testSkipsCommitRequestsForPreviousCheckpoints() throws Exception { Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1)); assertThat(table1.snapshots()).isEmpty(); @@ -285,8 +277,7 @@ void testAlreadyCommitted() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget = - new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2)); + TableKey tableKey = new TableKey(TABLE1, "branch"); DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); @@ -298,24 +289,18 @@ void testAlreadyCommitted() throws Exception { final String operatorId = new OperatorID().toHexString(); final int checkpointId = 10; - byte[] deltaManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), - checkpointId); + byte[][] deltaManifests = + aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, 0); CommitRequest commitRequest = new MockCommitRequest<>( - new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey, deltaManifests, jobId, operatorId, checkpointId)); dynamicCommitter.commit(Sets.newHashSet(commitRequest)); CommitRequest oldCommitRequest = new MockCommitRequest<>( - new DynamicCommittable( - writeTarget, deltaManifest, jobId, operatorId, checkpointId - 1)); + new DynamicCommittable(tableKey, deltaManifests, jobId, operatorId, checkpointId - 1)); // Old commits requests shouldn't affect the result dynamicCommitter.commit(Sets.newHashSet(oldCommitRequest)); @@ -360,25 +345,23 @@ void testCommitDeleteInDifferentFormatVersion() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget = - new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2)); - DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); OneInputStreamOperatorTestHarness aggregatorHarness = new OneInputStreamOperatorTestHarness(aggregator); aggregatorHarness.open(); + TableKey tableKey = new TableKey(TABLE1, "branch"); final String jobId = JobID.generate().toHexString(); final String operatorId = new OperatorID().toHexString(); final int checkpointId = 10; - byte[] deltaManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, + byte[][] deltaManifests = + aggregator.writeToManifests( + tableKey.tableName(), + Map.of( + DATA_FILE.specId(), + Sets.newHashSet( WriteResult.builder() .addDataFiles(DATA_FILE) .addDeleteFiles(DELETE_FILE) @@ -387,7 +370,7 @@ void 
testCommitDeleteInDifferentFormatVersion() throws Exception { CommitRequest commitRequest = new MockCommitRequest<>( - new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey, deltaManifests, jobId, operatorId, checkpointId)); // Upgrade the table version UpdateProperties updateApi = table1.updateProperties(); @@ -421,8 +404,7 @@ void testCommitOnlyDataInDifferentFormatVersion() throws Exception { sinkId, committerMetrics); - WriteTarget writeTarget = - new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2)); + TableKey tableKey = new TableKey(TABLE1, "branch"); DynamicWriteResultAggregator aggregator = new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); @@ -434,17 +416,12 @@ void testCommitOnlyDataInDifferentFormatVersion() throws Exception { final String operatorId = new OperatorID().toHexString(); final int checkpointId = 10; - byte[] deltaManifest = - aggregator.writeToManifest( - writeTarget, - Sets.newHashSet( - new DynamicWriteResult( - writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())), - checkpointId); + byte[][] deltaManifests = + aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, checkpointId); CommitRequest commitRequest = new MockCommitRequest<>( - new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId)); + new DynamicCommittable(tableKey, deltaManifests, jobId, operatorId, checkpointId)); dynamicCommitter.commit(Sets.newHashSet(commitRequest)); @@ -486,50 +463,34 @@ void testTableBranchAtomicCommitForAppendOnlyData() throws Exception { new OneInputStreamOperatorTestHarness(aggregator); aggregatorHarness.open(); - WriteTarget writeTarget1 = - new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2)); - // writeTarget2 has a different schema - WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true, Sets.newHashSet()); - // Different branch for writeTarget3 - WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch2", 23, 0, true, Sets.newHashSet()); - - WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build(); - WriteResult writeResult2 = WriteResult.builder().addDataFiles(DATA_FILE_2).build(); + TableKey tableKey1 = new TableKey(TABLE1, "branch1"); + TableKey tableKey2 = new TableKey(TABLE1, "branch2"); final String jobId = JobID.generate().toHexString(); final String operatorId = new OperatorID().toHexString(); final int checkpointId1 = 1; final int checkpointId2 = 2; - byte[] deltaManifest1 = - aggregator.writeToManifest( - writeTarget1, - Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)), - checkpointId1); + byte[][] deltaManifests1 = + aggregator.writeToManifests(tableKey1.tableName(), WRITE_RESULT_BY_SPEC, checkpointId1); CommitRequest commitRequest1 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId1)); + new DynamicCommittable(tableKey1, deltaManifests1, jobId, operatorId, checkpointId1)); - byte[] deltaManifest2 = - aggregator.writeToManifest( - writeTarget2, - Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)), - checkpointId1); + byte[][] deltaManifests2 = + aggregator.writeToManifests(tableKey1.tableName(), WRITE_RESULT_BY_SPEC_2, checkpointId1); CommitRequest commitRequest2 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId1)); + new DynamicCommittable(tableKey1, 
deltaManifests2, jobId, operatorId, checkpointId1)); - byte[] deltaManifest3 = - aggregator.writeToManifest( - writeTarget3, - Sets.newHashSet(new DynamicWriteResult(writeTarget3, writeResult2)), - checkpointId2); + byte[][] deltaManifests3 = + aggregator.writeToManifests(tableKey2.tableName(), WRITE_RESULT_BY_SPEC_2, checkpointId2); CommitRequest commitRequest3 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId2)); + new DynamicCommittable(tableKey2, deltaManifests3, jobId, operatorId, checkpointId2)); boolean overwriteMode = false; int workerPoolSize = 1; @@ -601,49 +562,36 @@ void testTableBranchAtomicCommitWithFailures() throws Exception { new OneInputStreamOperatorTestHarness(aggregator); aggregatorHarness.open(); - WriteTarget writeTarget1 = new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet()); - // writeTarget2 has a different schema - WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch", 23, 0, false, Sets.newHashSet()); - WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch", 23, 0, false, Sets.newHashSet()); + TableKey tableKey = new TableKey(TABLE1, "branch"); + + Map> writeResults = + Map.of( + DELETE_FILE.specId(), + Lists.newArrayList(WriteResult.builder().addDeleteFiles(DELETE_FILE).build())); - WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build(); - WriteResult writeResult2 = WriteResult.builder().addDeleteFiles(DELETE_FILE).build(); - WriteResult writeResult3 = WriteResult.builder().addDataFiles(DATA_FILE).build(); + byte[][] deltaManifests1 = + aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, 0); + byte[][] deltaManifests2 = aggregator.writeToManifests(tableKey.tableName(), writeResults, 0); + byte[][] deltaManifests3 = + aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC_2, 0); final String jobId = JobID.generate().toHexString(); final String operatorId = new OperatorID().toHexString(); final int checkpointId1 = 1; final int checkpointId2 = 2; - - byte[] deltaManifest1 = - aggregator.writeToManifest( - writeTarget1, - Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)), - checkpointId1); + final int checkpointId3 = 3; CommitRequest commitRequest1 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId1)); - - byte[] deltaManifest2 = - aggregator.writeToManifest( - writeTarget2, - Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)), - checkpointId2); + new DynamicCommittable(tableKey, deltaManifests1, jobId, operatorId, checkpointId1)); CommitRequest commitRequest2 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId2)); - - byte[] deltaManifest3 = - aggregator.writeToManifest( - writeTarget3, - Sets.newHashSet(new DynamicWriteResult(writeTarget3, writeResult3)), - checkpointId2); + new DynamicCommittable(tableKey, deltaManifests2, jobId, operatorId, checkpointId2)); CommitRequest commitRequest3 = new MockCommitRequest<>( - new DynamicCommittable(writeTarget3, deltaManifest3, jobId, operatorId, checkpointId2)); + new DynamicCommittable(tableKey, deltaManifests3, jobId, operatorId, checkpointId3)); boolean overwriteMode = false; int workerPoolSize = 1; @@ -693,10 +641,7 @@ void testTableBranchAtomicCommitWithFailures() throws Exception { } table.refresh(); - // Three committables, but only two snapshots! 
-    // Three committables, but only two snapshots! WriteResults from different checkpoints are not
-    // getting
-    // combined due to one writeResult2 containing a delete file.
-    assertThat(table.snapshots()).hasSize(2);
+    assertThat(table.snapshots()).hasSize(3);
     Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
     assertThat(snapshot1.summary())
@@ -720,18 +665,34 @@ void testTableBranchAtomicCommitWithFailures() throws Exception {
     assertThat(snapshot2.summary())
         .containsAllEntriesOf(
             ImmutableMap.builder()
-                .put("added-data-files", "1")
-                .put("added-records", "42")
                 .put("changed-partition-count", "1")
                 .put("flink.job-id", jobId)
                 .put("flink.max-committed-checkpoint-id", "" + checkpointId2)
                 .put("flink.operator-id", operatorId)
+                .put("total-data-files", "1")
+                .put("total-delete-files", "1")
+                .put("total-equality-deletes", "0")
+                .put("total-files-size", "0")
+                .put("total-position-deletes", "24")
+                .put("total-records", "42")
+                .build());
+
+    Snapshot snapshot3 = Iterables.get(table.snapshots(), 2);
+    assertThat(snapshot3.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.builder()
+                .put("added-data-files", "1")
+                .put("added-records", "24")
+                .put("changed-partition-count", "1")
+                .put("flink.job-id", jobId)
+                .put("flink.max-committed-checkpoint-id", "" + checkpointId3)
+                .put("flink.operator-id", operatorId)
                 .put("total-data-files", "2")
                 .put("total-delete-files", "1")
                 .put("total-equality-deletes", "0")
                 .put("total-files-size", "0")
                 .put("total-position-deletes", "24")
-                .put("total-records", "84")
+                .put("total-records", "66")
                 .build());
   }
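The rewritten tests no longer hand the aggregator a flat set of DynamicWriteResults; they pass a map keyed by partition spec ID, and writeToManifests emits one serialized DeltaManifests per spec. For readers following the test fixtures, a minimal sketch of how such a map could be assembled (WRITE_RESULT_BY_SPEC itself is defined elsewhere in this test class and may be shaped differently; this assumes DATA_FILE and DELETE_FILE target distinct spec IDs):

    // Illustrative only: a per-spec map in the shape these tests pass to writeToManifests().
    // DATA_FILE and DELETE_FILE stand in for the fixtures declared earlier in the class.
    Map<Integer, List<WriteResult>> writeResultsBySpec =
        Map.of(
            DATA_FILE.specId(),
            Lists.newArrayList(WriteResult.builder().addDataFiles(DATA_FILE).build()),
            DELETE_FILE.specId(),
            Lists.newArrayList(WriteResult.builder().addDeleteFiles(DELETE_FILE).build()));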
@@ -746,36 +707,24 @@ void testCommitDeltaTxnWithAppendFiles() throws Exception {
             new OneInputStreamOperatorTestHarness(aggregator);
     aggregatorHarness.open();
-    WriteTarget writeTarget1 =
-        new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2));
-    WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true, Sets.newHashSet());
-
-    WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build();
-    WriteResult writeResult2 = WriteResult.builder().addDataFiles(DATA_FILE_2).build();
-
+    TableKey tableKey = new TableKey(TABLE1, "branch1");
     final String jobId = JobID.generate().toHexString();
     final String operatorId = new OperatorID().toHexString();
     final int checkpointId = 1;
-    byte[] deltaManifest1 =
-        aggregator.writeToManifest(
-            writeTarget1,
-            Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)),
-            checkpointId);
+    byte[][] deltaManifest1 =
+        aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, checkpointId);
     CommitRequest commitRequest1 =
         new MockCommitRequest<>(
-            new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId));
+            new DynamicCommittable(tableKey, deltaManifest1, jobId, operatorId, checkpointId));
-    byte[] deltaManifest2 =
-        aggregator.writeToManifest(
-            writeTarget2,
-            Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)),
-            checkpointId);
+    byte[][] deltaManifest2 =
+        aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC_2, checkpointId);
     CommitRequest commitRequest2 =
         new MockCommitRequest<>(
-            new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId));
+            new DynamicCommittable(tableKey, deltaManifest2, jobId, operatorId, checkpointId));
     boolean overwriteMode = false;
     int workerPoolSize = 1;
@@ -820,8 +769,7 @@ void testReplacePartitions() throws Exception {
             sinkId,
             committerMetrics);
-    WriteTarget writeTarget =
-        new WriteTarget(TABLE1, "branch", 42, 0, false, Sets.newHashSet(1, 2));
+    TableKey tableKey = new TableKey(TABLE1, "branch");
     DynamicWriteResultAggregator aggregator =
         new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
@@ -833,32 +781,22 @@ void testReplacePartitions() throws Exception {
     final String operatorId = new OperatorID().toHexString();
     final int checkpointId = 10;
-    byte[] deltaManifest =
-        aggregator.writeToManifest(
-            writeTarget,
-            Sets.newHashSet(
-                new DynamicWriteResult(
-                    writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
-            checkpointId);
+    byte[][] deltaManifests =
+        aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, 0);
     CommitRequest commitRequest =
         new MockCommitRequest<>(
-            new DynamicCommittable(writeTarget, deltaManifest, jobId, operatorId, checkpointId));
+            new DynamicCommittable(tableKey, deltaManifests, jobId, operatorId, checkpointId));
     dynamicCommitter.commit(Sets.newHashSet(commitRequest));
-    byte[] overwriteManifest =
-        aggregator.writeToManifest(
-            writeTarget,
-            Sets.newHashSet(
-                new DynamicWriteResult(
-                    writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
-            checkpointId + 1);
+    byte[][] overwriteManifests =
+        aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, 0);
     CommitRequest overwriteRequest =
         new MockCommitRequest<>(
             new DynamicCommittable(
-                writeTarget, overwriteManifest, jobId, operatorId, checkpointId + 1));
+                tableKey, overwriteManifests, jobId, operatorId, checkpointId + 1));
     dynamicCommitter.commit(Sets.newHashSet(overwriteRequest));
@@ -901,18 +839,13 @@ void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throws
     final int checkpointId = 1;
     final String branch = SnapshotRef.MAIN_BRANCH;
-    WriteTarget writeTarget = new WriteTarget(TABLE1, branch, 42, 0, false, Sets.newHashSet(1, 2));
-    byte[] manifest =
-        aggregator.writeToManifest(
-            writeTarget,
-            Sets.newHashSet(
-                new DynamicWriteResult(
-                    writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
-            checkpointId);
+    TableKey tableKey = new TableKey(TABLE1, branch);
+    byte[][] manifests =
+        aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, checkpointId);
     CommitRequest commitRequest1 =
         new MockCommitRequest<>(
-            new DynamicCommittable(writeTarget, manifest, jobId, operatorId, checkpointId));
+            new DynamicCommittable(tableKey, manifests, jobId, operatorId, checkpointId));
     Collection<CommitRequest<DynamicCommittable>> commitRequests = Sets.newHashSet(commitRequest1);
     int workerPoolSize = 1;
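Across these committer tests the routing key shrinks from a WriteTarget (table, branch, schema ID, spec ID, upsert flag, equality field IDs) down to a plain table/branch pair. The TableKey class itself is not part of this excerpt; the sketch below only illustrates the shape the tests assume (a serializable value object with tableName() and branch() accessors) and may differ from the actual implementation elsewhere in the PR.

    // Hypothetical stand-in for the TableKey value class used above; illustrative only.
    class TableKey implements java.io.Serializable {
      private final String tableName;
      private final String branch;

      TableKey(String tableName, String branch) {
        this.tableName = tableName;
        this.branch = branch;
      }

      String tableName() {
        return tableName;
      }

      String branch() {
        return branch;
      }

      @Override
      public boolean equals(Object o) {
        return o instanceof TableKey
            && java.util.Objects.equals(tableName, ((TableKey) o).tableName)
            && java.util.Objects.equals(branch, ((TableKey) o).branch);
      }

      @Override
      public int hashCode() {
        return java.util.Objects.hash(tableName, branch);
      }
    }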
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 0c07bc946189..a428571e9e28 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -29,8 +29,11 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -58,6 +61,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -79,6 +83,7 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -486,6 +491,21 @@ void testPartitionSpecEvolution() throws Exception {
             SimpleDataUtil.SCHEMA, "t1", SnapshotRef.MAIN_BRANCH, spec2));

     runTest(rows);
+
+    // Validate the table has expected partition specs
+    Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of(DATABASE, "t1"));
+    Map tableSpecs = table.specs();
+    List expectedSpecs = List.of(spec1, spec2, PartitionSpec.unpartitioned());
+
+    assertThat(tableSpecs).hasSize(expectedSpecs.size());
+    expectedSpecs.forEach(
+        expectedSpec ->
+            assertThat(
+                    tableSpecs.values().stream()
+                        .anyMatch(
+                            spec -> PartitionSpecEvolution.checkCompatibility(spec, expectedSpec)))
+                .withFailMessage("Table spec not found: %s.", expectedSpec)
+                .isTrue());
   }

   @Test
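The new assertion deliberately uses a compatibility check rather than equality: a spec declared against the raw schema generally carries different partition field IDs than the spec the table registered during evolution. A condensed sketch of the same check, reusing the PartitionSpecEvolution.checkCompatibility helper the test relies on (its exact semantics live outside this excerpt):

    // Sketch: verify a declared spec is structurally present on the table, ignoring field IDs.
    PartitionSpec declared =
        PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build();
    boolean present =
        table.specs().values().stream()
            .anyMatch(tableSpec -> PartitionSpecEvolution.checkCompatibility(tableSpec, declared));
    assertThat(present).isTrue();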
@@ -926,6 +946,79 @@ void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws
     assertThat(totalAddedRecords).isEqualTo(records.size());
   }

+  @Test
+  void testCommitsOncePerTableBranchAndCheckpoint() throws Exception {
+    String tableName = "t1";
+    String branch = SnapshotRef.MAIN_BRANCH;
+    PartitionSpec spec1 = PartitionSpec.unpartitioned();
+    PartitionSpec spec2 = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build();
+    Set equalityFields = Sets.newHashSet("id");
+
+    List inputRecords =
+        Lists.newArrayList(
+            // Two schemas
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, tableName, branch, spec1),
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA2, tableName, branch, spec1),
+            // Two specs
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, tableName, branch, spec1),
+            new DynamicIcebergDataImpl(SimpleDataUtil.SCHEMA, tableName, branch, spec2),
+            // Some upserts
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, tableName, branch, spec1, true, equalityFields, false),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA, tableName, branch, spec1, true, equalityFields, true));
+
+    executeDynamicSink(inputRecords, env, true, 1, null);
+
+    List actualRecords;
+    try (CloseableIterable iterable =
+        IcebergGenerics.read(
+                CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1")))
+            .build()) {
+      actualRecords = Lists.newArrayList(iterable);
+    }
+
+    // Validate records
+    int expectedRecords = inputRecords.size() - 1; // 1 duplicate
+    assertThat(actualRecords).hasSize(expectedRecords);
+
+    for (int i = 0; i < expectedRecords; i++) {
+      Record actual = actualRecords.get(i);
+      assertThat(inputRecords)
+          .anySatisfy(
+              inputRecord -> {
+                assertThat(actual.get(0)).isEqualTo(inputRecord.rowProvided.getField(0));
+                assertThat(actual.get(1)).isEqualTo(inputRecord.rowProvided.getField(1));
+                if (inputRecord.schemaProvided.equals(SimpleDataUtil.SCHEMA2)) {
+                  assertThat(actual.get(2)).isEqualTo(inputRecord.rowProvided.getField(2));
+                }
+                // There is an additional _pos field which gets added
+              });
+    }
+
+    TableIdentifier tableIdentifier = TableIdentifier.of("default", tableName);
+    Table table = CATALOG_EXTENSION.catalog().loadTable(tableIdentifier);
+
+    Snapshot lastSnapshot = Iterables.getLast(table.snapshots());
+    assertThat(lastSnapshot).isNotNull();
+    assertThat(lastSnapshot.summary())
+        .containsAllEntriesOf(
+            ImmutableMap.builder()
+                .put("total-equality-deletes", "1")
+                .put("total-position-deletes", "1")
+                .put("total-records", "6")
+                .build());
+
+    // Count commits per checkpoint
+    Map commitsPerCheckpoint =
+        StreamSupport.stream(table.snapshots().spliterator(), false)
+            .map(snapshot -> snapshot.summary().get("flink.max-committed-checkpoint-id"))
+            .filter(Objects::nonNull)
+            .map(Long::parseLong)
+            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+    assertThat(commitsPerCheckpoint.values()).allMatch(count -> count == 1);
+  }
+
   @Test
   void testOptInDropUnusedColumns() throws Exception {
     Schema schema1 =
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java
index b9aa56ab2e61..41c1f67d2926 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSinkPerf.java
@@ -20,7 +20,6 @@
 import static org.apache.iceberg.flink.TestFixtures.DATABASE;
 import static org.apache.iceberg.flink.TestFixtures.TABLE;
-import static org.apache.iceberg.flink.sink.dynamic.DynamicCommitter.MAX_CONTINUOUS_EMPTY_COMMITS;

 import java.util.List;
 import java.util.function.Consumer;
@@ -123,10 +122,7 @@ void before() {
           CATALOG_EXTENSION
               .catalog()
               .createTable(
-                  IDENTIFIERS[i],
-                  SCHEMA,
-                  PartitionSpec.unpartitioned(),
-                  ImmutableMap.of(MAX_CONTINUOUS_EMPTY_COMMITS, "100000"));
+                  IDENTIFIERS[i], SCHEMA, PartitionSpec.unpartitioned(), ImmutableMap.of());

       table.manageSnapshots().createBranch(SnapshotRef.MAIN_BRANCH).commit();
     }
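The per-checkpoint grouping above leans on the flink.max-committed-checkpoint-id property that the committer stamps into every snapshot summary; the same property is what the Flink sink typically consults on restore to skip checkpoints that were already committed. A small, illustrative sketch of reading the highest committed checkpoint ID back out of a table (not part of the test itself):

    // Sketch: derive the highest committed Flink checkpoint id from the snapshot summaries.
    long maxCommittedCheckpointId =
        StreamSupport.stream(table.snapshots().spliterator(), false)
            .map(snapshot -> snapshot.summary().get("flink.max-committed-checkpoint-id"))
            .filter(Objects::nonNull)
            .mapToLong(Long::parseLong)
            .max()
            .orElse(-1L);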
"branch"); DynamicWriteResult dynamicWriteResult1 = - new DynamicWriteResult(writeTarget1, WriteResult.builder().build()); - WriteTarget writeTarget2 = - new WriteTarget("table2", "branch", 42, 0, true, Sets.newHashSet(1, 2)); + new DynamicWriteResult(tableKey1, -1, WriteResult.builder().build()); + TableKey tableKey2 = new TableKey("table2", "branch"); DynamicWriteResult dynamicWriteResult2 = - new DynamicWriteResult(writeTarget2, WriteResult.builder().build()); + new DynamicWriteResult(tableKey2, -1, WriteResult.builder().build()); CommittableWithLineage committable1 = new CommittableWithLineage<>(dynamicWriteResult1, 0, 0); @@ -113,18 +113,16 @@ void testPreventOutputFileFactoryCacheEvictionDuringFlush() throws Exception { testHarness = new OneInputStreamOperatorTestHarness<>(aggregator)) { testHarness.open(); - WriteTarget writeTarget1 = - new WriteTarget("table", "branch", 42, 0, false, Sets.newHashSet()); + TableKey tableKey1 = new TableKey("table", "branch"); DynamicWriteResult dynamicWriteResult1 = new DynamicWriteResult( - writeTarget1, WriteResult.builder().addDataFiles(DATA_FILE).build()); + tableKey1, DATA_FILE.specId(), WriteResult.builder().addDataFiles(DATA_FILE).build()); - // Different WriteTarget - WriteTarget writeTarget2 = - new WriteTarget("table", "branch2", 23, 0, true, Sets.newHashSet()); + // Different TableKey + TableKey tableKey2 = new TableKey("table", "branch2"); DynamicWriteResult dynamicWriteResult2 = new DynamicWriteResult( - writeTarget2, WriteResult.builder().addDataFiles(DATA_FILE).build()); + tableKey2, DATA_FILE.specId(), WriteResult.builder().addDataFiles(DATA_FILE).build()); CommittableWithLineage committable1 = new CommittableWithLineage<>(dynamicWriteResult1, 0, 0); @@ -151,6 +149,70 @@ void testPreventOutputFileFactoryCacheEvictionDuringFlush() throws Exception { } } + @Test + void testAggregatesWriteResultsForOneTable() throws Exception { + CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table"), new Schema()); + CATALOG_EXTENSION.catalog().createTable(TableIdentifier.of("table2"), new Schema()); + + long checkpointId = 1L; + + try (OneInputStreamOperatorTestHarness< + CommittableMessage, CommittableMessage> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new DynamicWriteResultAggregator( + CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize))) { + testHarness.open(); + + TableKey tableKey = new TableKey("table", "branch"); + DataFile dataFile1 = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/data-1.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + DataFile dataFile2 = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/data-2.parquet") + .withFileSizeInBytes(20) + .withRecordCount(2) + .build(); + + testHarness.processElement( + createRecord(tableKey, checkpointId, dataFile1.specId(), dataFile1)); + testHarness.processElement( + createRecord(tableKey, checkpointId, dataFile2.specId(), dataFile2)); + + assertThat(testHarness.getOutput()).isEmpty(); + + testHarness.prepareSnapshotPreBarrier(checkpointId); + + List> outputValues = testHarness.extractOutputValues(); + // Contains a CommittableSummary + DynamicCommittable + assertThat(outputValues).hasSize(2); + + SinkV2Assertions.assertThat(extractAndAssertCommittableSummary(outputValues.get(0))) + .hasOverallCommittables(1) + .hasFailedCommittables(0) + .hasCheckpointId(checkpointId); + + CommittableWithLineage committable = + extractAndAssertCommittableWithLineage(outputValues.get(1)); + + 
+      SinkV2Assertions.assertThat(committable).hasCheckpointId(checkpointId);
+
+      DynamicCommittable dynamicCommittable = committable.getCommittable();
+
+      assertThat(dynamicCommittable.manifests()).hasNumberOfRows(1);
+      assertThat(dynamicCommittable.key()).isEqualTo(tableKey);
+      assertThat(dynamicCommittable.checkpointId()).isEqualTo(checkpointId);
+      assertThat(dynamicCommittable.jobId())
+          .isEqualTo(testHarness.getEnvironment().getJobID().toString());
+      assertThat(dynamicCommittable.operatorId())
+          .isEqualTo(testHarness.getOperator().getOperatorID().toString());
+    }
+  }
+
   private static Set<String> getManifestPaths(
       List<StreamRecord<CommittableMessage<DynamicCommittable>>> messages) throws IOException {
     Set<String> manifestPaths = Sets.newHashSet();
@@ -158,15 +220,40 @@ private static Set<String> getManifestPaths(
     for (StreamRecord<CommittableMessage<DynamicCommittable>> record : messages) {
       CommittableMessage message = record.getValue();
       if (message instanceof CommittableWithLineage) {
-        DeltaManifests deltaManifests =
-            SimpleVersionedSerialization.readVersionAndDeSerialize(
-                DeltaManifestsSerializer.INSTANCE,
-                (((CommittableWithLineage) message).getCommittable())
-                    .manifest());
-        deltaManifests.manifests().forEach(manifest -> manifestPaths.add(manifest.path()));
+        for (byte[] manifest :
+            (((CommittableWithLineage) message).getCommittable()).manifests()) {
+          DeltaManifests deltaManifests =
+              SimpleVersionedSerialization.readVersionAndDeSerialize(
+                  DeltaManifestsSerializer.INSTANCE, manifest);
+          deltaManifests
+              .manifests()
+              .forEach(manifestFile -> manifestPaths.add(manifestFile.path()));
+        }
       }
     }
     return manifestPaths;
   }
+
+  private static StreamRecord<CommittableMessage<DynamicWriteResult>> createRecord(
+      TableKey tableKey, long checkpointId, int specId, DataFile... dataFiles) {
+    return new StreamRecord<>(
+        new CommittableWithLineage<>(
+            new DynamicWriteResult(
+                tableKey, specId, WriteResult.builder().addDataFiles(dataFiles).build()),
+            checkpointId,
+            0));
+  }
+
+  static CommittableSummary extractAndAssertCommittableSummary(
+      CommittableMessage message) {
+    assertThat(message).isInstanceOf(CommittableSummary.class);
+    return (CommittableSummary) message;
+  }
+
+  static CommittableWithLineage extractAndAssertCommittableWithLineage(
+      CommittableMessage message) {
+    assertThat(message).isInstanceOf(CommittableWithLineage.class);
+    return (CommittableWithLineage) message;
+  }
 }
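With manifests() now returning byte[][], each element is an independently serialized DeltaManifests blob, which is exactly what the reworked getManifestPaths helper walks. A condensed sketch of decoding a committable's manifests outside the harness, mirroring the helper above (it throws IOException just like the helper; names match the ones used in these tests):

    // Sketch: decode each serialized DeltaManifests entry of a DynamicCommittable and
    // collect the manifest file paths, as the updated getManifestPaths helper does.
    Set<String> manifestPaths = Sets.newHashSet();
    for (byte[] serialized : dynamicCommittable.manifests()) {
      DeltaManifests deltaManifests =
          SimpleVersionedSerialization.readVersionAndDeSerialize(
              DeltaManifestsSerializer.INSTANCE, serialized);
      deltaManifests.manifests().forEach(manifestFile -> manifestPaths.add(manifestFile.path()));
    }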
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java
index b1bca6099d7a..35dc45d8fd41 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriteResultSerializer.java
@@ -29,7 +29,6 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.junit.jupiter.api.Test;

 class TestDynamicWriteResultSerializer {
@@ -48,13 +47,12 @@ class TestDynamicWriteResultSerializer {
                   ImmutableMap.of(1, ByteBuffer.allocate(1)),
                   ImmutableMap.of(1, ByteBuffer.allocate(1))))
           .build();
+  private static final TableKey TABLE_KEY = new TableKey("table", "branch");

   @Test
   void testRoundtrip() throws IOException {
     DynamicWriteResult dynamicWriteResult =
-        new DynamicWriteResult(
-            new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)),
-            WriteResult.builder().addDataFiles(DATA_FILE).build());
+        new DynamicWriteResult(TABLE_KEY, 1, WriteResult.builder().addDataFiles(DATA_FILE).build());

     DynamicWriteResultSerializer serializer = new DynamicWriteResultSerializer();
     DynamicWriteResult copy =
@@ -68,11 +66,9 @@ void testRoundtrip() throws IOException {
   }

   @Test
-  void testUnsupportedVersion() throws IOException {
+  void testUnsupportedVersion() {
     DynamicWriteResult dynamicWriteResult =
-        new DynamicWriteResult(
-            new WriteTarget("table", "branch", 42, 23, false, Sets.newHashSet(1, 2)),
-            WriteResult.builder().addDataFiles(DATA_FILE).build());
+        new DynamicWriteResult(TABLE_KEY, 1, WriteResult.builder().addDataFiles(DATA_FILE).build());

     DynamicWriteResultSerializer serializer = new DynamicWriteResultSerializer();
     assertThatThrownBy(() -> serializer.deserialize(-1, serializer.serialize(dynamicWriteResult)))