Skip to content

Commit

Permalink
GH-263: Fix race condition in BufferedIoOutputStream
Browse files Browse the repository at this point in the history
Don't try to write a future that's already done in startWrite(). Just
skip it and try the next one, if any.

Fixes #263.

Bug: #263
  • Loading branch information
tomaswolf committed Nov 2, 2022
1 parent ae3851a commit ba82c13
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,38 +152,56 @@ protected void waitForAvailableWriteSpace(int requiredSize) throws IOException {
}
}

protected void startWriting() throws IOException {
IoWriteFutureImpl future = writes.peek();
// No more pending requests
if (future == null) {
return;
}
private IoWriteFutureImpl getWriteRequest() {
IoWriteFutureImpl future = null;
while (future == null) {
future = writes.peek();
// No more pending requests
if (future == null) {
return null;
}

// Don't try to write any further if pending exception signaled
Throwable pendingError = pendingException.get();
if (pendingError != null) {
log.error("startWriting({})[{}] propagate to {} write requests pending error={}[{}]", getId(), out,
writes.size(), getClass().getSimpleName(), pendingError.getMessage());

// Don't try to write any further if pending exception signaled
Throwable pendingError = pendingException.get();
if (pendingError != null) {
log.error("startWriting({})[{}] propagate to {} write requests pending error={}[{}]",
getId(), out, writes.size(), getClass().getSimpleName(), pendingError.getMessage());
IoWriteFutureImpl currentFuture = currentWrite.getAndSet(null);
for (IoWriteFutureImpl pendingWrite : writes) {
// Checking reference by design
if (UnaryEquator.isSameReference(pendingWrite, currentFuture)) {
continue; // will be taken care of when its listener is eventually called
}

IoWriteFutureImpl currentFuture = currentWrite.getAndSet(null);
for (IoWriteFutureImpl pendingWrite : writes) {
// Checking reference by design
if (UnaryEquator.isSameReference(pendingWrite, currentFuture)) {
continue; // will be taken care of when its listener is eventually called
future.setValue(pendingError);
}

future.setValue(pendingError);
writes.clear();
return null;
}

writes.clear();
return;
// Cannot honor this request yet since other pending one incomplete
if (!currentWrite.compareAndSet(null, future)) {
return null;
}

if (future.isDone()) {
// A write was on-going, and finishWrite hadn't removed the future yet when we got it
// above. See https://github.com/apache/mina-sshd/issues/263 .
// Re-try.
currentWrite.set(null);
future = null;
}
}
return future;
}

// Cannot honor this request yet since other pending one incomplete
if (!currentWrite.compareAndSet(null, future)) {
protected void startWriting() throws IOException {
IoWriteFutureImpl future = getWriteRequest();
if (future == null) {
return;
}

Buffer buffer = future.getBuffer();
int bufferSize = buffer.available();
out.writeBuffer(buffer).addListener(f -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ protected IoWriteFutureImpl writePacket(IoWriteFutureImpl future, boolean resume
if (chunkLength < stillToSend && !(f instanceof BufferedFuture)) {
// We can send only part of the data remaining: copy the buffer (if it hasn't been copied before) because
// the original may be re-used, then send the bit we can send, and queue up a future for sending the rest.
f = new BufferedFuture(future.getId(), new ByteArrayBuffer(buffer.getCompactData()));
Buffer copied = new ByteArrayBuffer(stillToSend);
copied.putBuffer(buffer, false);
f = new BufferedFuture(future.getId(), copied);
f.addListener(w -> future.setValue(w.getException() != null ? w.getException() : w.isWritten()));
}
if (chunkLength <= 0) {
Expand Down

0 comments on commit ba82c13

Please sign in to comment.