Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -47,6 +48,7 @@ class FSEditLogAsync extends FSEditLog implements Runnable {

// requires concurrent access from caller threads and syncing thread.
private final BlockingQueue<Edit> editPendingQ;
private final BlockingQueue<EditSyncEx> logSyncNotifyQ;

// only accessed by syncing thread so no synchronization required.
// queue is unbounded because it's effectively limited by the size
Expand All @@ -63,6 +65,7 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
DFS_NAMENODE_EDITS_ASYNC_LOGGING_PENDING_QUEUE_SIZE_DEFAULT);

editPendingQ = new ArrayBlockingQueue<>(editPendingQSize);
logSyncNotifyQ = new LinkedBlockingQueue<>();
}

private boolean isSyncThreadAlive() {
Expand Down Expand Up @@ -227,8 +230,33 @@ private Edit dequeueEdit() throws InterruptedException {
return syncWaitQ.isEmpty() ? editPendingQ.take() : editPendingQ.poll();
}

private static class EditSyncEx {
final Edit edit;
final RuntimeException ex;
EditSyncEx(Edit edit, RuntimeException ex) {
this.edit = edit;
this.ex = ex;
}
}

private class LogSyncNotifyThread extends Thread {
volatile boolean stopped = false;

@Override
public void run() {
try {
while (!stopped) {
EditSyncEx editSyncEx = logSyncNotifyQ.take();
editSyncEx.edit.logSyncNotify(editSyncEx.ex);
}
} catch(InterruptedException ie) {} // just swallow it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

catch InterruptedException and no handle here? does it cause client never get result? IMO, we should set stopped = true here.

}
}

@Override
public void run() {
final LogSyncNotifyThread logSyncNotifyThread = new LogSyncNotifyThread();
logSyncNotifyThread.start();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

single thread to push the result back to client here, I think it is safe to trigger multi-thread to do that. right?

try {
while (true) {
boolean doSync;
Expand All @@ -251,12 +279,14 @@ public void run() {
syncEx = ex;
}
while ((edit = syncWaitQ.poll()) != null) {
edit.logSyncNotify(syncEx);
logSyncNotifyQ.put(new EditSyncEx(edit, syncEx));
}
}
}
} catch (InterruptedException ie) {
LOG.info(Thread.currentThread().getName() + " was interrupted, exiting");
logSyncNotifyThread.stopped = true;
logSyncNotifyThread.interrupt();
} catch (Throwable t) {
terminate(t);
}
Expand Down