diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index b00018b3b770..d20859377ffc 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -61,10 +61,11 @@ static List readDataFiles(ManifestFile manifestFile, FileIO io) throws } } - static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId, - long attemptNumber) { + static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, String operatorUniqueId, + int subTaskId, long attemptNumber) { TableOperations ops = ((HasTableOperations) table).operations(); - return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber); + return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, operatorUniqueId, + subTaskId, attemptNumber); } static DeltaManifests writeCompletedFiles(WriteResult result, diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index 8f8bdad6c852..beffff68fde8 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -129,7 +129,9 @@ public void initializeState(StateInitializationContext context) throws Exception int subTaskId = getRuntimeContext().getIndexOfThisSubtask(); int attemptId = getRuntimeContext().getAttemptNumber(); - this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId); + String operatorUniqueId = getRuntimeContext().getOperatorUniqueID(); + this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorUniqueId, + subTaskId, attemptId); this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID; this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR); diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java index fca86080b11a..b7d575bb446b 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java @@ -35,23 +35,25 @@ class ManifestOutputFileFactory { private final FileIO io; private final Map props; private final String flinkJobId; + private final String operatorUniqueId; private final int subTaskId; private final long attemptNumber; private final AtomicInteger fileCount = new AtomicInteger(0); ManifestOutputFileFactory(TableOperations ops, FileIO io, Map props, - String flinkJobId, int subTaskId, long attemptNumber) { + String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) { this.ops = ops; this.io = io; this.props = props; this.flinkJobId = flinkJobId; + this.operatorUniqueId = operatorUniqueId; this.subTaskId = subTaskId; this.attemptNumber = attemptNumber; } private String generatePath(long checkpointId) { - return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId, - attemptNumber, checkpointId, fileCount.incrementAndGet())); + return FileFormat.AVRO.addExtension(String.format("%s-%s-%05d-%d-%d-%05d", flinkJobId, operatorUniqueId, + subTaskId, attemptNumber, checkpointId, fileCount.incrementAndGet())); } OutputFile create(long checkpointId) { diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java index c1538bcaff9d..4a47656e847d 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java @@ -87,9 +87,10 @@ public void before() throws IOException { @Test public void testIO() throws IOException { String flinkJobId = newFlinkJobId(); + String operatorId = newOperatorUniqueId(); for (long checkpointId = 1; checkpointId <= 3; checkpointId++) { ManifestOutputFileFactory factory = - FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1); + FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1); final long curCkpId = checkpointId; List dataFiles = generateDataFiles(10); @@ -122,11 +123,12 @@ public void testIO() throws IOException { public void testUserProvidedManifestLocation() throws IOException { long checkpointId = 1; String flinkJobId = newFlinkJobId(); + String operatorId = newOperatorUniqueId(); File userProvidedFolder = tempFolder.newFolder(); Map props = ImmutableMap.of(FLINK_MANIFEST_LOCATION, userProvidedFolder.getAbsolutePath() + "///"); ManifestOutputFileFactory factory = new ManifestOutputFileFactory( ((HasTableOperations) table).operations(), table.io(), props, - flinkJobId, 1, 1); + flinkJobId, operatorId, 1, 1); List dataFiles = generateDataFiles(5); DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles( @@ -156,7 +158,9 @@ public void testUserProvidedManifestLocation() throws IOException { public void testVersionedSerializer() throws IOException { long checkpointId = 1; String flinkJobId = newFlinkJobId(); - ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1); + String operatorId = newOperatorUniqueId(); + ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, + 1, 1); List dataFiles = generateDataFiles(10); List eqDeleteFiles = generateEqDeleteFiles(10); @@ -186,7 +190,9 @@ public void testCompatibility() throws IOException { // The v2 deserializer should be able to deserialize the v1 binary. long checkpointId = 1; String flinkJobId = newFlinkJobId(); - ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1); + String operatorId = newOperatorUniqueId(); + ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, + 1, 1); List dataFiles = generateDataFiles(10); ManifestFile manifest = FlinkManifestUtil.writeDataFiles(factory.create(checkpointId), table.spec(), dataFiles); @@ -271,4 +277,8 @@ private List generatePosDeleteFiles(int fileNum) throws IOException private static String newFlinkJobId() { return UUID.randomUUID().toString(); } + + private static String newOperatorUniqueId() { + return UUID.randomUUID().toString(); + } } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 135fa84ee94a..9c23d8a0889f 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -599,8 +599,10 @@ public void testFlinkManifests() throws Exception { harness.snapshot(checkpoint, ++timestamp); List manifestPaths = assertFlinkManifests(1); Path manifestPath = manifestPaths.get(0); + String operatorId = harness.getOneInputOperator().getOperatorID().toString(); Assert.assertEquals("File name should have the expected pattern.", - String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString()); + String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1), + manifestPath.getFileName().toString()); // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io()); @@ -640,8 +642,10 @@ public void testDeleteFiles() throws Exception { harness.snapshot(checkpoint, ++timestamp); List manifestPaths = assertFlinkManifests(1); Path manifestPath = manifestPaths.get(0); + String operatorId = harness.getOneInputOperator().getOperatorID().toString(); Assert.assertEquals("File name should have the expected pattern.", - String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString()); + String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1), + manifestPath.getFileName().toString()); // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());