diff --git a/build.sbt b/build.sbt index 2c7127f584..574fd46e96 100644 --- a/build.sbt +++ b/build.sbt @@ -22,6 +22,7 @@ import com.lightbend.paradox.apidoc.ApidocPlugin.autoImport.apidocRootPackage sourceDistName := "apache-pekko-http" sourceDistIncubating := false +ThisBuild / resolvers += Resolver.ApacheMavenSnapshotsRepo ThisBuild / reproducibleBuildsCheckResolver := Resolver.ApacheMavenStagingRepo addCommandAlias("verifyCodeStyle", "scalafmtCheckAll; scalafmtSbtCheck; +headerCheckAll; javafmtCheckAll") diff --git a/docs/src/test/scala/docs/http/scaladsl/HttpServerExampleSpec.scala b/docs/src/test/scala/docs/http/scaladsl/HttpServerExampleSpec.scala index 9420dc868d..467bb51e2c 100644 --- a/docs/src/test/scala/docs/http/scaladsl/HttpServerExampleSpec.scala +++ b/docs/src/test/scala/docs/http/scaladsl/HttpServerExampleSpec.scala @@ -111,7 +111,7 @@ class HttpServerExampleSpec extends AnyWordSpec with Matchers val failureMonitor: ActorRef = system.actorOf(MyExampleMonitoringActor.props) val reactToTopLevelFailures = Flow[IncomingConnection] - .watchTermination()((_, termination) => + .watchTermination((_, termination) => termination.failed.foreach { cause => failureMonitor ! cause }) diff --git a/http-core/src/main/mima-filters/2.0.x.backwards.excludes/uptake-pekko-core-2.0.0.excludes b/http-core/src/main/mima-filters/2.0.x.backwards.excludes/uptake-pekko-core-2.0.0.excludes new file mode 100644 index 0000000000..ca65208f1b --- /dev/null +++ b/http-core/src/main/mima-filters/2.0.x.backwards.excludes/uptake-pekko-core-2.0.0.excludes @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# changes needed to uptake Pekko Core 2.0.0 +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.http.impl.engine.client.PoolInterface#Logic.onDownstreamFinish") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#IncomingStreamBuffer.onDownstreamFinish") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.hpack.HandleOrPassOnStage#State.onDownstreamFinish") diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala index bd992c837d..2cdb6a42c4 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala @@ -106,7 +106,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem) (incoming: Tcp.IncomingConnection) => try { httpPlusSwitching(http1, http2).addAttributes(prepareServerAttributes(settings, incoming)) - .watchTermination() { + .watchTermination { case (connectionTerminatorF, future) => connectionTerminatorF.foreach { connectionTerminator => masterTerminator.registerConnection(connectionTerminator)(fm.executionContext) @@ -170,7 +170,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem) val serverLayer: Flow[ByteString, ByteString, Future[Done]] = Flow.fromGraph( Flow[HttpRequest] - .watchTermination()(Keep.right) + .watchTermination(Keep.right) .prepend(injectedRequest) .via(Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handler)( system.dispatcher)) diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala index c9603f25ac..c0f7f68918 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala @@ -721,12 +721,11 @@ private[http] object HttpServerBluePrint { }) private var activeTimers = 0 - private val timeout: FiniteDuration = { + private val timeout: FiniteDuration = inheritedAttributes.get[ActorAttributes.StreamSubscriptionTimeout] match { case Some(attr) => attr.timeout case None => 5.minutes // should not happen } - } private def addTimeout(s: SubscriptionTimeout): Unit = { if (activeTimers == 0) setKeepGoing(true) activeTimers += 1 diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/util/StageLoggingWithOverride.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/util/StageLoggingWithOverride.scala index c7ace934a2..dd6592eb00 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/util/StageLoggingWithOverride.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/util/StageLoggingWithOverride.scala @@ -19,8 +19,7 @@ package org.apache.pekko.http.impl.util import org.apache.pekko import pekko.annotation.InternalApi import pekko.stream.stage.GraphStageLogic -import pekko.event.{ LogSource, LoggingAdapter, NoLogging } -import pekko.stream.ActorMaterializer +import pekko.event.{ LogSource, LoggingAdapter } // TODO Try to reconcile with what Pekko provides in StageLogging. // We thought this could be removed when https://github.com/akka/akka/issues/18793 had been implemented @@ -43,10 +42,7 @@ private[pekko] trait StageLoggingWithOverride extends GraphStageLogic { _log = logOverride match { case DefaultNoLogging => - materializer match { - case a: ActorMaterializer => pekko.event.Logging(a.system, logSource)(LogSource.fromClass) - case _ => NoLogging - } + pekko.event.Logging(materializer.system, logSource)(LogSource.fromClass) case x => x } case _ => diff --git a/http-core/src/main/scala/org/apache/pekko/http/scaladsl/Http.scala b/http-core/src/main/scala/org/apache/pekko/http/scaladsl/Http.scala index 6a7f2692dd..4025265b22 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/scaladsl/Http.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/scaladsl/Http.scala @@ -121,9 +121,9 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme handler: Flow[HttpRequest, HttpResponse, Any]): ServerLayerFlow = Flow.fromGraph( Flow[HttpRequest] - .watchTermination()(Keep.right) + .watchTermination(Keep.right) .via(handler) - .watchTermination() { (termWatchBefore, termWatchAfter) => + .watchTermination { (termWatchBefore, termWatchAfter) => // flag termination when the user handler has gotten (or has emitted) termination // signals in both directions termWatchBefore.flatMap(_ => termWatchAfter)(ExecutionContext.parasitic) @@ -234,7 +234,7 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme .mapAsyncUnordered(settings.maxConnections) { incoming => try { fullLayer - .watchTermination() { + .watchTermination { case ((done, connectionTerminator), whenTerminates) => whenTerminates.onComplete { _ => masterTerminator.removeConnection(connectionTerminator) diff --git a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/HostConnectionPoolSpec.scala b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/HostConnectionPoolSpec.scala index 9266b28cd8..cc2697dc44 100644 --- a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/HostConnectionPoolSpec.scala +++ b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/client/HostConnectionPoolSpec.scala @@ -700,7 +700,7 @@ class HostConnectionPoolSpec extends PekkoSpecWithMaterializer( Sink.fromSubscriber(serverRequests), Source.fromPublisher(serverResponses)) .joinMat(clientServerImplementation.get(killSwitch))(Keep.right) - .watchTermination()(Keep.both) + .watchTermination(Keep.both) .join( Flow.fromSinkAndSource( Sink.fromSubscriber(responseSubscriber), diff --git a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/ws/WSClientAutobahnTest.scala b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/ws/WSClientAutobahnTest.scala index 24dc9f08e7..1a45add642 100644 --- a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/ws/WSClientAutobahnTest.scala +++ b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/ws/WSClientAutobahnTest.scala @@ -173,7 +173,7 @@ object WSClientAutobahnTest extends App { Http().singleWebSocketRequest(uri, clientFlow)._2 def completionSignal[T]: Flow[T, T, Future[Done]] = - Flow[T].watchTermination()((_, res) => res) + Flow[T].watchTermination((_, res) => res) /** * The autobahn tests define a weird API where every request must be a WebSocket request and diff --git a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/ws/WebSocketIntegrationSpec.scala b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/ws/WebSocketIntegrationSpec.scala index 211934cbf6..c0b3ee6fab 100644 --- a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/ws/WebSocketIntegrationSpec.scala +++ b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/ws/WebSocketIntegrationSpec.scala @@ -210,7 +210,7 @@ class WebSocketIntegrationSpec extends PekkoSpecWithMaterializer( val handlerTermination = Promise[Done]() val handler = Flow[Message] - .watchTermination()(Keep.right) + .watchTermination(Keep.right) .mapMaterializedValue(handlerTermination.completeWith(_)) .map(m => TextMessage.Strict(s"Echo [${m.asTextMessage.getStrictText}]")) diff --git a/http/src/main/java/org/apache/pekko/http/javadsl/server/ExceptionHandlerBuilder.java b/http/src/main/java/org/apache/pekko/http/javadsl/server/ExceptionHandlerBuilder.java index ab0e7a5aa7..7ec3d606bd 100644 --- a/http/src/main/java/org/apache/pekko/http/javadsl/server/ExceptionHandlerBuilder.java +++ b/http/src/main/java/org/apache/pekko/http/javadsl/server/ExceptionHandlerBuilder.java @@ -13,7 +13,8 @@ package org.apache.pekko.http.javadsl.server; -import org.apache.pekko.japi.pf.FI; +import org.apache.pekko.japi.function.Function; +import org.apache.pekko.japi.function.Predicate; import org.apache.pekko.japi.pf.PFBuilder; public class ExceptionHandlerBuilder { @@ -35,7 +36,7 @@ private ExceptionHandlerBuilder(PFBuilder delegate) { * @return a builder with the case statement added */ public

ExceptionHandlerBuilder match( - final Class

type, FI.Apply apply) { + final Class

type, final Function apply) { delegate.match(type, apply); return this; } @@ -50,7 +51,7 @@ public

ExceptionHandlerBuilder match( * @return a builder with the case statement added */ public

ExceptionHandlerBuilder match( - final Class

type, final FI.TypedPredicate

predicate, final FI.Apply apply) { + final Class

type, final Predicate

predicate, final Function apply) { delegate.match(type, predicate, apply); return this; } @@ -63,7 +64,7 @@ public

ExceptionHandlerBuilder match( * @return a builder with the case statement added */ public

ExceptionHandlerBuilder matchEquals( - final P object, final FI.Apply apply) { + final P object, final Function apply) { delegate.matchEquals(object, apply); return this; } @@ -74,7 +75,7 @@ public

ExceptionHandlerBuilder matchEquals( * @param apply an action to apply to the argument * @return a builder with the case statement added */ - public ExceptionHandlerBuilder matchAny(final FI.Apply apply) { + public ExceptionHandlerBuilder matchAny(final Function apply) { delegate.matchAny(apply); return this; } diff --git a/http/src/main/mima-filters/2.0.x.backwards.excludes/refactor-javadsl-exception-handler.excludes b/http/src/main/mima-filters/2.0.x.backwards.excludes/refactor-javadsl-exception-handler.excludes new file mode 100644 index 0000000000..e9c10df5bd --- /dev/null +++ b/http/src/main/mima-filters/2.0.x.backwards.excludes/refactor-javadsl-exception-handler.excludes @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# refactor Java DSL ExceptionHandler (due to function changes in Pekko Core 2.0.0) +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.http.javadsl.server.ExceptionHandlerBuilder.match") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.http.javadsl.server.ExceptionHandlerBuilder.matchEquals") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.http.javadsl.server.ExceptionHandlerBuilder.matchAny") diff --git a/http/src/main/resources/reference.conf b/http/src/main/resources/reference.conf index e3f50b7d9a..43ce790f69 100644 --- a/http/src/main/resources/reference.conf +++ b/http/src/main/resources/reference.conf @@ -7,6 +7,9 @@ # This is the reference config file that contains all the default settings. # Make your edits/overrides in your application.conf. +# Disable version checks (should not be merged to main branch) +pekko.fail-mixed-versions=off + pekko.http { routing { # Enables/disables the returning of more detailed error messages to the diff --git a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/TelemetrySpiSpec.scala b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/TelemetrySpiSpec.scala index 5a0d07cf11..c0c35b11f1 100644 --- a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/TelemetrySpiSpec.scala +++ b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/TelemetrySpiSpec.scala @@ -126,7 +126,7 @@ abstract class TelemetrySpiSpec(useTls: Boolean) extends PekkoSpecWithMaterializ attrs.get[TelemetryAttributes.ClientMeta].foreach(probe.ref ! _) probe.ref ! requestId request.addAttribute(requestIdAttr, requestId).addHeader(headers.RawHeader("request-id", requestId.id)) - }.watchTermination() { (_, done) => + }.watchTermination { (_, done) => done.onComplete { case Success(_) => probe.ref ! "close-seen" // this is the expected case case Failure(t) => probe.ref ! t.getMessage // useful to diagnose cases where there's a failure @@ -193,7 +193,7 @@ abstract class TelemetrySpiSpec(useTls: Boolean) extends PekkoSpecWithMaterializ telemetryProbe.ref ! "connection-seen" telemetryProbe.ref ! connId conn.copy(flow = conn.flow.addAttributes(Attributes(connId))) - }.watchTermination() { (notUsed, done) => + }.watchTermination { (notUsed, done) => done.onComplete(_ => telemetryProbe.ref ! "unbind-seen")(system.dispatcher) notUsed } @@ -203,7 +203,7 @@ abstract class TelemetrySpiSpec(useTls: Boolean) extends PekkoSpecWithMaterializ Flow[HttpResponse].map { response => telemetryProbe.ref ! "response-seen" response - }.watchTermination() { (_, done) => + }.watchTermination { (_, done) => done.foreach(_ => telemetryProbe.ref ! "close-seen")(system.dispatcher) }, StreamUtils.statefulAttrsMap[HttpRequest, HttpRequest](attributes => { request => diff --git a/project/PekkoCoreDependency.scala b/project/PekkoCoreDependency.scala index 992921fe7c..0b4c96f007 100644 --- a/project/PekkoCoreDependency.scala +++ b/project/PekkoCoreDependency.scala @@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency object PekkoCoreDependency extends PekkoDependency { override val checkProject: String = "pekko-cluster-sharding-typed" override val module: Option[String] = None - override val currentVersion: String = "1.1.5" + override val currentVersion: String = "2.0.0-M0+307-9560d2b1-SNAPSHOT" }