Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,38 +100,33 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E
.build()).withAutoCommit(false).withProperties(properties).build();
// Create the first commit
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200);
try {
ExecutorService executors = Executors.newFixedThreadPool(2);
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
Future future1 = executors.submit(() -> {
String newCommitTime = "004";
int numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
try {
createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
} catch (Exception e1) {
assertTrue(e1 instanceof HoodieWriteConflictException);
throw new RuntimeException(e1);
}
});
Future future2 = executors.submit(() -> {
String newCommitTime = "005";
int numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
try {
createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
} catch (Exception e2) {
assertTrue(e2 instanceof HoodieWriteConflictException);
throw new RuntimeException(e2);
}
});
future1.get();
future2.get();
fail("Should not reach here, this means concurrent writes were handled incorrectly");
} catch (Exception e) {
// Expected to fail due to overlapping commits
}
ExecutorService executors = Executors.newFixedThreadPool(2);
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
Future future1 = executors.submit(() -> {
String newCommitTime = "004";
int numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
try {
createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
} catch (Exception e1) {
assertTrue(e1 instanceof HoodieWriteConflictException);
throw new RuntimeException(e1);
}
});
Future future2 = executors.submit(() -> {
String newCommitTime = "005";
int numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
try {
createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
} catch (Exception e2) {
assertTrue(e2 instanceof HoodieWriteConflictException);
throw new RuntimeException(e2);
}
});
future1.get();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the fix of the metadata sync. The writer can resolve the conflicts correctly now.

future2.get();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public void close() {
* Clears the partition Map and reset view states.
*/
@Override
public final void reset() {
public void reset() {
try {
writeLock.lock();
clear();
Expand Down Expand Up @@ -1135,8 +1135,7 @@ public void sync() {
*/
protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) {
refreshTimeline(newTimeline);
addedPartitions.clear();
resetViewState();
clear();
// Initialize with new Hoodie timeline.
init(metaClient, newTimeline);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ public HoodieTimeline getTimeline() {

@Override
public void sync() {
preferredView.reset();
secondaryView.reset();
preferredView.sync();
secondaryView.sync();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
public boolean refresh() {
Map<String, String> paramsMap = getParams();
try {
// refresh the local timeline first.
this.timeline = metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
return executeRequest(REFRESH_TABLE, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.POST);
} catch (IOException e) {
throw new HoodieRemoteException(e);
Expand Down Expand Up @@ -450,7 +452,6 @@ public void close() {

@Override
public void reset() {
timeline = metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
refresh();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,9 @@ public Option<String> getLatestCompactionTime() {
public void close() throws Exception {
// no-op
}

@Override
public void reset() {
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -441,4 +441,10 @@ public Option<String> getLatestCompactionTime() {
}
return Option.empty();
}

@Override
public void reset() {
initIfNeeded();
dataMetaClient.reloadActiveTimeline();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ protected FileStatus[] listPartition(Path partitionPath) throws IOException {
return tableMetadata.getAllFilesInPartition(partitionPath);
}

@Override
public void reset() {
super.reset();
tableMetadata.reset();
}

@Override
public void sync() {
super.sync();
tableMetadata.reset();
}

@Override
public void close() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,9 @@ static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetad
* Returns the timestamp of the latest compaction.
*/
Option<String> getLatestCompactionTime();

/**
* Clear the states of the table metadata.
*/
void reset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,8 @@ public void testGetTimeline() {
@Test
public void testSync() {
fsView.sync();
verify(primary, times(1)).reset();
verify(secondary, times(1)).reset();
verify(primary, times(1)).sync();
verify(secondary, times(1)).sync();
}

@Test
Expand Down