Skip to content

Commit

Permalink
Align InputStreamSubscriber copies
Browse files Browse the repository at this point in the history
There are legitimate differences, but also some are fixes that
should be on both sides.

See gh-31677
  • Loading branch information
rstoyanchev committed Oct 28, 2024
1 parent 37622a7 commit a366ea0
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package org.springframework.core.io.buffer;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.lang.Nullable;
import reactor.core.Exceptions;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
Expand All @@ -18,6 +12,14 @@
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;

import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
* Bridges between {@link Publisher Publisher<DataBuffer>} and {@link InputStream}.
*
Expand Down Expand Up @@ -73,6 +75,8 @@ public void onSubscribe(Subscription subscription) {

@Override
public void onNext(DataBuffer t) {
Assert.notNull(t, "DataBuffer must not be null");

if (this.done) {
discard(t);
return;
Expand Down Expand Up @@ -351,5 +355,4 @@ private void resume() {
}
}


}
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
package org.springframework.http.client;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.Exceptions;

import java.io.IOException;
import java.io.InputStream;
import java.util.ConcurrentModificationException;
Expand All @@ -24,6 +14,17 @@
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
* Bridges between {@link Flow.Publisher Flow.Publisher<T>} and {@link InputStream}.
*
Expand All @@ -43,10 +44,10 @@ final class InputStreamSubscriber<T> extends InputStream implements Flow.Subscri

final int prefetch;
final int limit;
final Function<T, byte[]> mapper;
final Consumer<T> onDiscardHandler;
final ReentrantLock lock;
final Queue<T> queue;
final Function<T, byte[]> mapper;
final Consumer<T> onDiscardHandler;

final AtomicReference<Object> parkedThread = new AtomicReference<>();
final AtomicInteger workAmount = new AtomicInteger();
Expand Down Expand Up @@ -248,20 +249,24 @@ public int read(byte[] b, int off, int len) throws IOException {
byte[] bytes = getBytesOrAwait();

if (bytes == DONE) {
this.closed = true;
cleanAndFinalize();
if (this.error == null) {
this.closed = true;
return j == 0 ? -1 : j;
}
else {
throw Exceptions.propagate(error);
if (j == 0) {
this.closed = true;
throw Exceptions.propagate(error);
}

return j;
}
} else if (bytes == CLOSED) {
this.s.cancel();
cleanAndFinalize();
return -1;
}

int i = this.position;
for (; i < bytes.length && j < len; i++, j++) {
b[off + j] = bytes[i];
Expand Down Expand Up @@ -311,7 +316,7 @@ byte[] getBytesOrAwait() {

actualWorkAmount = workAmount.addAndGet(-actualWorkAmount);
if (actualWorkAmount == 0) {
await();
await();
}
}
}
Expand Down

0 comments on commit a366ea0

Please sign in to comment.