Skip to content

Commit

Permalink
chore: Use Source.queue (#537)
Browse files Browse the repository at this point in the history
* Use Source.queue
  • Loading branch information
laglangyue authored Apr 20, 2024
1 parent 355eb90 commit 0c18a23
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 52 deletions.
18 changes: 10 additions & 8 deletions docs/src/test/scala/docs/http/scaladsl/Http2ClientApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import pekko.http.scaladsl.model.headers.HttpEncodings
import pekko.http.scaladsl.model.HttpRequest
import pekko.http.scaladsl.model.HttpResponse
import pekko.http.scaladsl.model.headers
import pekko.stream.OverflowStrategy
import pekko.stream.scaladsl.Flow
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source
import pekko.stream.QueueOfferResult

import com.typesafe.config.ConfigFactory

import scala.annotation.nowarn
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand Down Expand Up @@ -75,14 +75,11 @@ object Http2ClientApp extends App {
.flatMap(_.toStrict(1.second))
.onComplete(res => println(s"[4] Got favicon: $res"))

// OverflowStrategy.dropNew has been deprecated in latest Pekko versions
// FIXME: replace with 2.6 queue when 2.5 support is dropped, see #3069
@nowarn("msg=Use Source.queue") //
// #response-future-association
def singleRequest(
connection: Flow[HttpRequest, HttpResponse, Any], bufferSize: Int = 100): HttpRequest => Future[HttpResponse] = {
val queue =
Source.queue(bufferSize, OverflowStrategy.dropNew)
Source.queue(bufferSize)
.via(connection)
.to(Sink.foreach { response =>
// complete the response promise with the response when it arrives
Expand All @@ -94,9 +91,14 @@ object Http2ClientApp extends App {
req => {
// create a promise of the response for each request and set it as an attribute on the request
val p = Promise[HttpResponse]()
queue.offer(req.addAttribute(ResponsePromise.Key, ResponsePromise(p)))
queue.offer(req.addAttribute(ResponsePromise.Key, ResponsePromise(p))) match {
// return the future response
.flatMap(_ => p.future)
case QueueOfferResult.Enqueued => p.future
case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed."))
case QueueOfferResult.Failure(ex) => Future.failed(ex)
case QueueOfferResult.QueueClosed => Future.failed(
new RuntimeException("Queue was closed (pool shut down)."))
}
}
}
// #response-future-association
Expand Down
79 changes: 35 additions & 44 deletions docs/src/test/scala/docs/http/scaladsl/HttpClientExampleSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,28 @@

package docs.http.scaladsl

import scala.concurrent.ExecutionContext
import org.apache.pekko
import pekko.http.scaladsl.model.HttpRequest
import pekko.http.scaladsl.settings.ClientConnectionSettings
import pekko.http.scaladsl.settings.ConnectionPoolSettings
import scala.annotation.nowarn

import docs.CompileOnlySpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import scala.concurrent.ExecutionContext

// OverflowStrategy.dropNew has been deprecated in latest Pekko versions
// FIXME: replace with 2.6 queue when 2.5 support is dropped, see #3069
@nowarn("msg=will not be a runnable program|Use Source.queue")
class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySpec {

"manual-entity-consume-example-1" in compileOnlySpec {
// #manual-entity-consume-example-1
import java.io.File

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model._
import pekko.stream.scaladsl.{ FileIO, Framing }
import pekko.util.ByteString

import java.io.File

implicit val system: ActorSystem = ActorSystem()

val response: HttpResponse = ???
Expand All @@ -54,15 +51,14 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

"manual-entity-consume-example-2" in compileOnlySpec {
// #manual-entity-consume-example-2
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model._
import pekko.util.ByteString

import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher

Expand Down Expand Up @@ -91,16 +87,15 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

"manual-entity-consume-example-3" in compileOnlySpec {
// #manual-entity-consume-example-3
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model._
import pekko.util.ByteString
import pekko.stream.scaladsl.{ Flow, Sink, Source }
import pekko.util.ByteString

import scala.concurrent.{ ExecutionContext, Future }

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher
Expand Down Expand Up @@ -140,12 +135,12 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

"manual-entity-discard-example-1" in compileOnlySpec {
// #manual-entity-discard-example-1
import scala.concurrent.ExecutionContext

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model.HttpMessage.DiscardedEntity
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.model.HttpMessage.DiscardedEntity

import scala.concurrent.ExecutionContext

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher
Expand All @@ -158,14 +153,13 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp
// #manual-entity-discard-example-1
}
"manual-entity-discard-example-2" in compileOnlySpec {
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import pekko.Done
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model._
import pekko.stream.scaladsl.Sink

import scala.concurrent.{ ExecutionContext, Future }

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher

Expand All @@ -179,27 +173,25 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

"host-level-queue-example" in compileOnlySpec {
// #host-level-queue-example
import scala.util.{ Failure, Success }
import scala.concurrent.{ Future, Promise }

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model._
import pekko.stream.scaladsl._
import pekko.stream.QueueOfferResult

import pekko.stream.{ OverflowStrategy, QueueOfferResult }
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success }

implicit val system: ActorSystem = ActorSystem()
import system.dispatcher // to get an implicit ExecutionContext into scope
implicit val system: ActorSystem = ActorSystem() // to get an implicit ExecutionContext into scope

val QueueSize = 10

// This idea came initially from this blog post (link broken):
// http://kazuhiro.github.io/scala/akka/akka-http/akka-streams/2016/01/31/connection-pooling-with-akka-http-and-source-queue.html
val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("pekko.apache.org")
val queue =
Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize)
.via(poolClientFlow)
.to(Sink.foreach {
case ((Success(resp), p)) => p.success(resp)
Expand All @@ -209,12 +201,12 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

def queueRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
queue.offer(request -> responsePromise).flatMap {
queue.offer(request -> responsePromise) match {
case QueueOfferResult.Enqueued => responsePromise.future
case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
case QueueOfferResult.Failure(ex) => Future.failed(ex)
case QueueOfferResult.QueueClosed => Future.failed(
new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
new RuntimeException("Queue was closed (pool shut down) while running the request."))
}
}

Expand All @@ -224,20 +216,18 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

"host-level-streamed-example" in compileOnlySpec {
// #host-level-streamed-example
import java.nio.file.{ Path, Paths }

import scala.util.{ Failure, Success }
import scala.concurrent.Future

import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.marshalling.Marshal
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.model.Multipart.FormData
import pekko.stream.scaladsl._

import pekko.http.scaladsl.model.Multipart.FormData
import pekko.http.scaladsl.marshalling.Marshal
import java.nio.file.{ Path, Paths }
import scala.concurrent.Future
import scala.util.{ Failure, Success }

implicit val system: ActorSystem = ActorSystem()
import system.dispatcher // to get an implicit ExecutionContext into scope
Expand Down Expand Up @@ -289,9 +279,10 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp
}

"single-request-example" in compileOnlySpec {
import scala.concurrent.Future
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model._

import scala.concurrent.Future
// #create-simple-request
HttpRequest(uri = "https://pekko.apache.org")

Expand Down Expand Up @@ -320,8 +311,8 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp
val response: HttpResponse = null
// #unmarshal-response-body
import org.apache.pekko
import pekko.http.scaladsl.unmarshalling.Unmarshal
import pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import pekko.http.scaladsl.unmarshalling.Unmarshal
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

Expand All @@ -343,8 +334,8 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp
class Myself extends Actor
with ActorLogging {

import pekko.pattern.pipe
import context.dispatcher
import pekko.pattern.pipe

implicit val system: ActorSystem = context.system
val http = Http(system)
Expand All @@ -370,12 +361,12 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp

"https-proxy-example-single-request" in compileOnlySpec {
// #https-proxy-example-single-request
import java.net.InetSocketAddress

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.{ ClientTransport, Http }

import java.net.InetSocketAddress

implicit val system = ActorSystem()

val proxyHost = "localhost"
Expand All @@ -391,12 +382,12 @@ class HttpClientExampleSpec extends AnyWordSpec with Matchers with CompileOnlySp
}

"https-proxy-example-single-request with auth" in compileOnlySpec {
import java.net.InetSocketAddress

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.{ ClientTransport, Http }

import java.net.InetSocketAddress

implicit val system = ActorSystem()

val proxyHost = "localhost"
Expand Down

0 comments on commit 0c18a23

Please sign in to comment.