diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java index 6afdc1b1862..4dca1b0ea10 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java @@ -4,9 +4,10 @@ package akka.actor.typed.eventstream; -// #imports +import akka.actor.Actor; import akka.actor.AllDeadLetters; import akka.actor.SuppressedDeadLetter; +import akka.actor.Terminated; import akka.actor.testkit.typed.javadsl.ActorTestKit; import akka.actor.testkit.typed.javadsl.TestProbe; import akka.actor.typed.Behavior; @@ -17,12 +18,13 @@ import akka.actor.typed.eventstream.EventStream.Subscribe; import akka.actor.typed.javadsl.AbstractBehavior; import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Adapter; import akka.actor.typed.javadsl.AskPattern; import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; -import akka.testkit.javadsl.TestKit; import java.time.Duration; import java.util.concurrent.CompletionStage; +import org.junit.Assert; import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; // #imports-deadletter @@ -35,10 +37,15 @@ public class LoggingDocTest extends JUnitSuite { @Test public void subscribeToDeadLetters() { - // #deadletters - ActorSystem system = ActorSystem.create(Behaviors.empty(), "DeadLetters"); - system.eventStream().tell(new Subscribe<>(DeadLetter.class, system)); - // #deadletters + ActorSystem system = ActorSystem.create( + Behaviors.setup(ctx -> { + Behavior deadLetterListener = Behaviors.empty(); + // #subscribe-deadletter + ActorRef listener = ctx.spawn(deadLetterListener, "listener"); + ctx.getSystem().eventStream().tell(new Subscribe<>(DeadLetter.class, listener)); + // #subscribe-deadletter + return SpawnProtocol.create(); + }), "DeadLettersSystem"); ActorTestKit.shutdown(system); } @@ -56,6 +63,7 @@ public DeadLetterActor(ActorContext context) { DeadLetter.class, d -> d.message().toString() ); + // subscribe DeadLetter at startup. context.getSystem().eventStream() .tell(new Subscribe<>(DeadLetter.class, messageAdapter)); } @@ -63,7 +71,7 @@ public DeadLetterActor(ActorContext context) { @Override public Receive createReceive() { return newReceiveBuilder().onMessage(String.class, msg -> { - System.out.println(msg); + getContext().getLog().info("receive dead letter: {}", msg); return Behaviors.same(); }).build(); } @@ -108,15 +116,13 @@ public Listener(ActorContext context) { public Receive createReceive() { return newReceiveBuilder() .onMessage(Jazz.class, msg -> { - System.out.printf("%s is listening to: %s%n", - getContext().getSelf().path().name(), + getContext().getLog().info("{} is listening to Jazz: {}", getContext().getSelf().path().name(), msg); return Behaviors.same(); }) .onMessage(Electronic.class, msg -> { - System.out.printf("%s is listening to: %s%n", - getContext().getSelf().path().name(), - msg); + getContext().getLog().info("{} is listening to Electronic: {}", + getContext().getSelf().path().name(), msg); return Behaviors.same(); }).build(); } @@ -164,10 +170,19 @@ public void subscribeBySubclassification() { public void subscribeToSuppressedDeadLetters() { ActorSystem system = ActorSystem.create(Behaviors.empty(), "SuppressedDeadLetter"); TestProbe probe = TestProbe.create(system); - ActorRef actor = probe.ref(); + ActorRef listener = probe.ref(); + akka.actor.ActorRef mockRef = Adapter.toClassic(listener); // #suppressed-deadletters - system.eventStream().tell(new Subscribe<>(SuppressedDeadLetter.class, actor)); + system.eventStream().tell(new Subscribe<>(SuppressedDeadLetter.class, listener)); // #suppressed-deadletters + Terminated suppression = Terminated.apply(mockRef, false, false); + SuppressedDeadLetter deadLetter = SuppressedDeadLetter.apply(suppression, mockRef, mockRef); + system.eventStream().tell(new Publish<>(deadLetter)); + + SuppressedDeadLetter suppressedDeadLetter = probe.expectMessageClass( + SuppressedDeadLetter.class); + Assert.assertNotNull(suppressedDeadLetter); + Assert.assertEquals(deadLetter, suppressedDeadLetter); ActorTestKit.shutdown(system); } @@ -176,11 +191,29 @@ public void subscribeToSuppressedDeadLetters() { public void subscribeToAllDeadLetters() { ActorSystem system = ActorSystem.create(Behaviors.empty(), "AllDeadLetters"); TestProbe probe = TestProbe.create(system); - ActorRef actor = probe.ref(); + ActorRef listener = probe.ref(); + akka.actor.ActorRef mockRef = Adapter.toClassic(listener); // #all-deadletters - system.eventStream().tell(new Subscribe<>(AllDeadLetters.class, actor)); + system.eventStream().tell(new Subscribe<>(AllDeadLetters.class, listener)); // #all-deadletters + Terminated suppression = Terminated.apply(Actor.noSender(), false, false); + SuppressedDeadLetter suppressedDeadLetter = SuppressedDeadLetter.apply(suppression, + mockRef, + mockRef); + system.eventStream().tell(new Publish<>(suppressedDeadLetter)); + DeadLetter deadLetter = DeadLetter.apply("deadLetter", mockRef, mockRef); + system.eventStream().tell(new Publish<>(deadLetter)); + + // both of the following messages will be received by the subscription actor + SuppressedDeadLetter receiveSuppressed = probe.expectMessageClass( + SuppressedDeadLetter.class); + Assert.assertNotNull(receiveSuppressed); + Assert.assertEquals(suppressedDeadLetter, receiveSuppressed); + DeadLetter receiveDeadLetter = probe.expectMessageClass(DeadLetter.class); + Assert.assertNotNull(receiveDeadLetter); + Assert.assertEquals(deadLetter, receiveDeadLetter); + ActorTestKit.shutdown(system); } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala index db093f6dc8e..3401285a2df 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala @@ -5,6 +5,7 @@ package akka.actor.typed.eventstream import akka.actor.DeadLetter +import akka.actor.Terminated import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.TestProbe @@ -15,6 +16,7 @@ import akka.actor.typed.SpawnProtocol import akka.actor.typed.SpawnProtocol.Spawn import akka.actor.typed.eventstream.EventStream.Publish import akka.actor.typed.eventstream.EventStream.Subscribe +import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.Behaviors import org.scalatest.wordspec.AnyWordSpecLike @@ -24,6 +26,7 @@ import scala.concurrent.Future object LoggingDocSpec { //#deadletters + import akka.actor.DeadLetter import akka.actor.typed.Behavior import akka.actor.typed.eventstream.EventStream.Subscribe import akka.actor.typed.scaladsl.Behaviors @@ -37,7 +40,7 @@ object LoggingDocSpec { Behaviors.receiveMessage { case msg: String => - println(msg) + context.log.info("receive dead letter: {}", msg) Behaviors.same } } @@ -80,6 +83,18 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with // #deadletters } + "allow registration to dead letters" in { + ActorSystem(Behaviors.setup[Void] { context => + val deadLetterListener = Behaviors.empty[DeadLetter] + // #subscribe-deadletter + val listenerRef: ActorRef[DeadLetter] = context.spawn(deadLetterListener, "DeadLetterListener") + context.system.eventStream ! Subscribe[DeadLetter](listenerRef) + // #subscribe-deadletter + + Behaviors.empty + }, "System") + } + "demonstrate superclass subscriptions on typed eventStream" in { import LoggingDocSpec.ListenerActor._ //#superclass-subscription-eventstream @@ -106,17 +121,29 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with } "allow registration to suppressed dead letters" in { - val listener: ActorRef[Any] = TestProbe().ref + val probe: TestProbe[Any] = TestProbe() + val listener: ActorRef[Any] = probe.ref + val mockRef = listener.toClassic //#suppressed-deadletters import akka.actor.SuppressedDeadLetter system.eventStream ! Subscribe[SuppressedDeadLetter](listener) //#suppressed-deadletters + val suppression = Terminated(mockRef)(existenceConfirmed = false, addressTerminated = false) + val suppressionDeadLetter = SuppressedDeadLetter(suppression, mockRef, mockRef) + system.eventStream ! Publish(suppressionDeadLetter) + + val receivedSuppression = probe.expectMessageType[SuppressedDeadLetter] + receivedSuppression shouldBe suppressionDeadLetter //#all-deadletters import akka.actor.AllDeadLetters system.eventStream ! Subscribe[AllDeadLetters](listener) //#all-deadletters + val deadLetter = DeadLetter("deadLetter", mockRef, mockRef) + system.eventStream ! Publish(deadLetter) + val receivedDeadLetter = probe.expectMessageType[DeadLetter] + receivedDeadLetter shouldBe deadLetter } } diff --git a/akka-docs/src/main/paradox/typed/event-stream.md b/akka-docs/src/main/paradox/typed/event-stream.md index 52635b1589d..8fba277752d 100644 --- a/akka-docs/src/main/paradox/typed/event-stream.md +++ b/akka-docs/src/main/paradox/typed/event-stream.md @@ -36,7 +36,7 @@ It uses @ref:[Subchannel Classification](#subchannel-classification) which enabl ## How to use -The following example demonstrates how a subscription works. Given an actor: +The following example demonstrates how a subscription works. Given an actor will subscribe DeadLetter from startup: Scala : @@snip [LoggingDocSpec.scala](/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala) { #deadletters } @@ -45,15 +45,24 @@ Java : @@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #imports-deadletter } +@@@ div { .group-scala } + +Or you can also subscribe after Actor starts: + +@@snip [LoggingDocSpec.scala](/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala) { #subscribe-deadletter } + +@@@ + + @@@ div { .group-java } the actor definition like this: @@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #deadletter-actor } -it can be subscribed like this: +Or you can also subscribe after Actor starts: -@@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #deadletters } +@@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #subscribe-deadletter } @@@