Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error using reactive streams toStreamBuffered with newer versions of the AWS SDK #3359

Closed
MancunianSam opened this issue Dec 12, 2023 · 9 comments · Fixed by #3360
Closed
Labels

Comments

@MancunianSam
Copy link

I have a method which uses the S3 transfer manager to return a publisher:

  def download(bucket: String, key: String): IO[Publisher[ByteBuffer]] = {
    val getObjectRequest = GetObjectRequest.builder.key(key).bucket(bucket).build
    val downloadRequest = DownloadRequest.builder
      .getObjectRequest(getObjectRequest)
      .responseTransformer(AsyncResponseTransformer.toPublisher[GetObjectResponse])
      .build()
    val completableFuture = transferManager
      .download(downloadRequest)
      .completionFuture()
    IO.fromCompletableFuture(IO(completableFuture)).map(_.result())
  }

I'm then calling this and calling toStreamBuffered

s3.download(downloadBucket, downloadBucketKey)
      .flatMap(
        _.toStreamBuffered[IO](10 * 1024)

With recent (I'm not 100% sure which one) versions of the AWS SDK, I'm now getting the following error:

Exception in thread "Thread-71" java.lang.NullPointerException: Cannot invoke "org.reactivestreams.Subscription.request(long)" because "this.subscription" is null
	at software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.maybeRequestMore(ByteBufferStoringSubscriber.java:205)
	at software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.addBufferedDataAmount(ByteBufferStoringSubscriber.java:200)
	at software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.transferTo(ByteBufferStoringSubscriber.java:102)
	at software.amazon.awssdk.services.s3.internal.crt.S3CrtRequestBodyStreamAdapter.sendRequestBody(S3CrtRequestBodyStreamAdapter.java:47)

SDK version 2.20.69 is working but 2.21.42 is not. The only recent change in the SDK I can see is aws/aws-sdk-java-v2#4308
If I revert this change locally and run my code again, it's working so it looks like that change to the AWS SDK is at least partly responsible.

Do you know what is causing this error or if there are any workarounds I can use?

Thanks

@armanbilge
Copy link
Member

Thanks for reporting!

if there are any workarounds I can use?

Sure, are you on JDK 9+? In that case, try using the converters in fs2.interop.flow with the FlowAdapters from the reactive-streams Java library.

@MancunianSam
Copy link
Author

Thanks for your reply. Sorry about this but I've got my wires crossed in the original report. The download method with the publisher created by toStreamBuffered is working ok. The issue is with a similar upload method and toUnicastPublisher

def upload(bucket: String, key: String, contentLength: Long, publisher: Publisher[ByteBuffer]): IO[CompletedUpload] = {
    val putObjectRequest = PutObjectRequest.builder
    // usual put object config
    .build

    val requestBody = AsyncRequestBody.fromPublisher(publisher)
    val response: Upload = transferManager.upload(
      UploadRequest.builder().requestBody(requestBody).putObjectRequest(putObjectRequest).build()
    )
    IO.fromCompletableFuture(IO(response.completionFuture()))
  }

I'm then calling it like:

Stream.eval(IO(ByteBuffer.wrap("test".getBytes)))
      .toUnicastPublisher
      .use { pub =>
        s3.upload("uploadBucket", "key", 4, pub)
      }

Then I'm getting the exception reported above.

I've tried to use the fs2.interop.flow converter with the FlowAdapters as you suggested to create the Resource

def streamToPublisher(stream: Stream[IO, ByteBuffer]): Resource[IO, Publisher[ByteBuffer]] =
    fs2.interop.flow.toPublisher(stream).map(pub => FlowAdapters.toPublisher[ByteBuffer](pub))

I'm still getting the same exception when I try to upload with a Publisher created this way.

@armanbilge
Copy link
Member

I'm still getting the same exception when I try to upload with a Publisher created this way.

Great, thanks for trying that! Can you please share the stack trace?

@armanbilge
Copy link
Member

Can you try removing FS2 completely, and using this custom Publisher?

import java.nio.ByteBuffer
import java.util.concurrent.Flow.Publisher
import java.util.concurrent.Flow.Subscriber
import java.util.concurrent.Flow.Subscription

val publisher = new Publisher[ByteBuffer] {
  def subscribe(subscriber: Subscriber[? >: ByteBuffer]): Unit = {
    subscriber.onSubscribe {
      new Subscription {
        def request(n: Long): Unit = {
          subscriber.onNext(ByteBuffer.wrap("test".getBytes))
          subscriber.onComplete()
        }
        def cancel(): Unit = ()
      }
    }
  }
}

@MancunianSam
Copy link
Author

If I create the publisher like that and then use the FlowAdapters to convert from the Flow.Publisher to reactivestreams.Publisher I'm getting a StackOverflowError

Exception in thread "Thread-1" java.lang.StackOverflowError
	at software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress.lambda$updateAndGet$0(DefaultTransferProgress.java:51)
	at java.base/java.util.concurrent.atomic.AtomicReference.updateAndGet(AtomicReference.java:210)
	at software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress.updateAndGet(DefaultTransferProgress.java:51)
	at software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater.incrementBytesTransferred(TransferProgressUpdater.java:183)
	at software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater.access$100(TransferProgressUpdater.java:42)
	at software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater$1.subscriberOnNext(TransferProgressUpdater.java:88)
	at software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater$1.subscriberOnNext(TransferProgressUpdater.java:78)
	at software.amazon.awssdk.core.async.listener.SubscriberListener$NotifyingSubscriber.lambda$onNext$0(SubscriberListener.java:84)
	at software.amazon.awssdk.core.async.listener.SubscriberListener$NotifyingSubscriber.invoke(SubscriberListener.java:102)
	at software.amazon.awssdk.core.async.listener.SubscriberListener$NotifyingSubscriber.onNext(SubscriberListener.java:84)
	at uk.gov.nationalarchives.LambdaRunner$$anon$1$$anon$2.request(LambdaRunner.scala:50)
	at software.amazon.awssdk.core.async.listener.SubscriberListener$NotifyingSubscriber$NotifyingSubscription.request(SubscriberListener.java:118)
	at software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.maybeRequestMore(ByteBufferStoringSubscriber.java:205)
	at software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.addBufferedDataAmount(ByteBufferStoringSubscriber.java:200)
	at software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.onNext(ByteBufferStoringSubscriber.java:182)
	at software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.onNext(ByteBufferStoringSubscriber.java:36)
	at software.amazon.awssdk.core.async.listener.SubscriberListener$NotifyingSubscriber.onNext(SubscriberListener.java:85)
	at uk.gov.nationalarchives.LambdaRunner$$anon$1$$anon$2.request(LambdaRunner.scala:50)
	at software.amazon.awssdk.core.async.listener.SubscriberListener$NotifyingSubscriber$NotifyingSubscription.request(SubscriberListener.java:118)
	at software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.maybeRequestMore(ByteBufferStoringSubscriber.java:205)
	at software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.addBufferedDataAmount(ByteBufferStoringSubscriber.java:200)
	at software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.onNext(ByteBufferStoringSubscriber.java:182)
	at software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.onNext(ByteBufferStoringSubscriber.java:36)
	at software.amazon.awssdk.core.async.listener.SubscriberListener$NotifyingSubscriber.onNext(SubscriberListener.java:85)
	at uk.gov.nationalarchives.LambdaRunner$$anon$1$$anon$2.request(LambdaRunner.scala:50)

	...repeats a lot

	Caused by: software.amazon.awssdk.core.exception.SdkClientException: Failed to send the request: A callback has reported failure.
	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:111)
	at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:47)
	at software.amazon.awssdk.services.s3.internal.crt.S3CrtResponseHandlerAdapter.handleError(S3CrtResponseHandlerAdapter.java:139)
	at software.amazon.awssdk.services.s3.internal.crt.S3CrtResponseHandlerAdapter.onFinished(S3CrtResponseHandlerAdapter.java:99)
	at software.amazon.awssdk.crt.s3.S3MetaRequestResponseHandlerNativeAdapter.onFinished(S3MetaRequestResponseHandlerNativeAdapter.java:24)

I don't know if this helps but it does work using this https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/utils/async/SimplePublisher.html

  val publisher = new SimplePublisher[ByteBuffer]
  publisher.send(ByteBuffer.wrap("test".getBytes))
  val requestBody = AsyncRequestBody.fromPublisher(publisher)
  transferManager.upload(UploadRequest.builder.requestBody(requestBody).build()).completionFuture().get
  publisher.complete()

@armanbilge
Copy link
Member

armanbilge commented Dec 13, 2023

I don't know if this helps

Ah, that is very helpful ... so it still suggests there's something wrong with how we are doing interop with FS2 🤔

Ok, I think we will need to dig into this more to understand what the problem is. Is there a possibility that you can make a self-contained example? i.e. something that can run against localstack, works with the SimplePublisher, but doesn't work with FS2. That would be really helpful.

@armanbilge
Copy link
Member

@MancunianSam I discussed this issue with @BalmungSan and we think we know what the problem is. You can hold on the reproducer for now, we'll put up our fix and see if it works for you. Thanks!

@armanbilge
Copy link
Member

@MancunianSam can you try 3.9.3-43-da2e1ce-SNAPSHOT with the snapshot resolver and let us know if it solves your issues? Thanks

resolvers += "s01-oss-sonatype-org-snapshots" at "https://s01.oss.sonatype.org/content/repositories/snapshots"

@armanbilge armanbilge reopened this Dec 14, 2023
@MancunianSam
Copy link
Author

@armanbilge I've tried converting the stream to a publisher using fs2.interop.flow.toPublisher(stream).map(pub => FlowAdapters.toPublisher[ByteBuffer](pub)) with the new version and it's working now. I'll swap out the calls to toUnicastPublisher with this. Thanks for your help with this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
2 participants