@@ -61,10 +61,11 @@ static List<DataFile> readDataFiles(ManifestFile manifestFile, FileIO io) throws
    }
  }

-  static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
-      long attemptNumber) {
+  static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, String operatorUniqueId,
+      int subTaskId, long attemptNumber) {
    TableOperations ops = ((HasTableOperations) table).operations();
-    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
+    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, operatorUniqueId,
+        subTaskId, attemptNumber);
  }

  static DeltaManifests writeCompletedFiles(WriteResult result,
@@ -129,7 +129,9 @@ public void initializeState(StateInitializationContext context) throws Exception

    int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
    int attemptId = getRuntimeContext().getAttemptNumber();
-    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+    String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
+    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorUniqueId,
+        subTaskId, attemptId);
    this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;

    this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
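As a standalone aside (not part of the change), here is a minimal sketch of how an operator built on Flink's AbstractStreamOperator can read the three runtime identifiers used above. The class name IdLookupSketch is made up for illustration; only calls that appear in the diff plus the operator's built-in SLF4J logger are used.

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;

// Illustrative only: reads the identifiers that feed the manifest file name.
public class IdLookupSketch extends AbstractStreamOperator<Void> {
  @Override
  public void open() throws Exception {
    super.open();
    int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
    int attemptId = getRuntimeContext().getAttemptNumber();
    // getOperatorUniqueID() is unique per operator within the same job, so names
    // built from it differ across operators of that job.
    String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
    LOG.info("subTask={}, attempt={}, operatorUniqueId={}", subTaskId, attemptId, operatorUniqueId);
  }
}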
@@ -35,23 +35,25 @@ class ManifestOutputFileFactory {
  private final FileIO io;
  private final Map<String, String> props;
  private final String flinkJobId;
+  private final String operatorUniqueId;
  private final int subTaskId;
  private final long attemptNumber;
  private final AtomicInteger fileCount = new AtomicInteger(0);

  ManifestOutputFileFactory(TableOperations ops, FileIO io, Map<String, String> props,
-      String flinkJobId, int subTaskId, long attemptNumber) {
+      String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) {
    this.ops = ops;
    this.io = io;
    this.props = props;
    this.flinkJobId = flinkJobId;
+    this.operatorUniqueId = operatorUniqueId;
    this.subTaskId = subTaskId;
    this.attemptNumber = attemptNumber;
  }

  private String generatePath(long checkpointId) {
-    return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId,
-        attemptNumber, checkpointId, fileCount.incrementAndGet()));
+    return FileFormat.AVRO.addExtension(String.format("%s-%s-%05d-%d-%d-%05d", flinkJobId, operatorUniqueId,
+        subTaskId, attemptNumber, checkpointId, fileCount.incrementAndGet()));
  }

  OutputFile create(long checkpointId) {
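For reference, a minimal standalone sketch of the file name that the new generatePath produces. The two UUID-style values are hypothetical placeholders, and a hard-coded ".avro" suffix stands in for FileFormat.AVRO.addExtension.

// Illustrative only: reproduces the name layout built by generatePath above.
public class ManifestNameSketch {
  public static void main(String[] args) {
    String flinkJobId = "5f91b6d8e3a24c0f9a7d1c2b3e4f5a6b";         // hypothetical Flink job ID
    String operatorUniqueId = "c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8";   // hypothetical operator unique ID
    int subTaskId = 0;
    long attemptNumber = 0;
    long checkpointId = 1;
    int fileCount = 1;
    String fileName = String.format("%s-%s-%05d-%d-%d-%05d.avro",
        flinkJobId, operatorUniqueId, subTaskId, attemptNumber, checkpointId, fileCount);
    // Prints: 5f91b6d8e3a24c0f9a7d1c2b3e4f5a6b-c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8-00000-0-1-00001.avro
    System.out.println(fileName);
  }
}

The operator unique ID becomes the second dash-separated component, so manifests written by different sink operators in the same job get distinct names even when their subtask, attempt, checkpoint, and file counters coincide.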
@@ -87,9 +87,10 @@ public void before() throws IOException {
  @Test
  public void testIO() throws IOException {
    String flinkJobId = newFlinkJobId();
+    String operatorId = newOperatorUniqueId();
    for (long checkpointId = 1; checkpointId <= 3; checkpointId++) {
      ManifestOutputFileFactory factory =
-          FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+          FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1);
      final long curCkpId = checkpointId;

      List<DataFile> dataFiles = generateDataFiles(10);
@@ -122,11 +123,12 @@ public void testIO() throws IOException {
  public void testUserProvidedManifestLocation() throws IOException {
    long checkpointId = 1;
    String flinkJobId = newFlinkJobId();
+    String operatorId = newOperatorUniqueId();
    File userProvidedFolder = tempFolder.newFolder();
    Map<String, String> props = ImmutableMap.of(FLINK_MANIFEST_LOCATION, userProvidedFolder.getAbsolutePath() + "///");
    ManifestOutputFileFactory factory = new ManifestOutputFileFactory(
        ((HasTableOperations) table).operations(), table.io(), props,
-        flinkJobId, 1, 1);
+        flinkJobId, operatorId, 1, 1);

    List<DataFile> dataFiles = generateDataFiles(5);
    DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(
@@ -156,7 +158,9 @@ public void testUserProvidedManifestLocation() throws IOException {
  public void testVersionedSerializer() throws IOException {
    long checkpointId = 1;
    String flinkJobId = newFlinkJobId();
-    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+    String operatorId = newOperatorUniqueId();
+    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
+        1, 1);

    List<DataFile> dataFiles = generateDataFiles(10);
    List<DeleteFile> eqDeleteFiles = generateEqDeleteFiles(10);
@@ -186,7 +190,9 @@ public void testCompatibility() throws IOException {
    // The v2 deserializer should be able to deserialize the v1 binary.
    long checkpointId = 1;
    String flinkJobId = newFlinkJobId();
-    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+    String operatorId = newOperatorUniqueId();
+    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
+        1, 1);

    List<DataFile> dataFiles = generateDataFiles(10);
    ManifestFile manifest = FlinkManifestUtil.writeDataFiles(factory.create(checkpointId), table.spec(), dataFiles);
@@ -271,4 +277,8 @@ private List<DeleteFile> generatePosDeleteFiles(int fileNum) throws IOException
  private static String newFlinkJobId() {
    return UUID.randomUUID().toString();
  }
+
+  private static String newOperatorUniqueId() {
+    return UUID.randomUUID().toString();
+  }
 }
@@ -599,8 +599,10 @@ public void testFlinkManifests() throws Exception {
    harness.snapshot(checkpoint, ++timestamp);
    List<Path> manifestPaths = assertFlinkManifests(1);
    Path manifestPath = manifestPaths.get(0);
+    String operatorId = harness.getOneInputOperator().getOperatorID().toString();
    Assert.assertEquals("File name should have the expected pattern.",
-        String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
+        String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
+        manifestPath.getFileName().toString());

    // 2. Read the data files from manifests and assert.
    List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
@@ -640,8 +642,10 @@ public void testDeleteFiles() throws Exception {
    harness.snapshot(checkpoint, ++timestamp);
    List<Path> manifestPaths = assertFlinkManifests(1);
    Path manifestPath = manifestPaths.get(0);
+    String operatorId = harness.getOneInputOperator().getOperatorID().toString();
    Assert.assertEquals("File name should have the expected pattern.",
-        String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
+        String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
+        manifestPath.getFileName().toString());

    // 2. Read the data files from manifests and assert.
    List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());