Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,66 +4,78 @@

package akka.actor.typed.eventstream;

// #imports
import akka.actor.Actor;
import akka.actor.AllDeadLetters;
import akka.actor.SuppressedDeadLetter;
import akka.actor.Terminated;
import akka.actor.testkit.typed.javadsl.ActorTestKit;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.Behavior;
import akka.actor.typed.Props;
import akka.actor.typed.SpawnProtocol;
import akka.actor.typed.SpawnProtocol.Spawn;
import akka.actor.typed.eventstream.EventStream.Publish;
import akka.actor.typed.eventstream.EventStream.Subscribe;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.AskPattern;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.testkit.javadsl.TestKit;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import org.junit.Assert;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
// #imports-deadletter
import akka.actor.DeadLetter;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.eventstream.EventStream.Subscribe;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import org.slf4j.Logger;
// #imports-deadletter

public class LoggingDocTest extends JUnitSuite {

@Test
public void subscribeToDeadLetters() {
// #deadletters
ActorSystem<DeadLetter> system = ActorSystem.create(Behaviors.empty(), "DeadLetters");
system.eventStream().tell(new Subscribe<>(DeadLetter.class, system));
// #deadletters
ActorSystem<SpawnProtocol.Command> system = ActorSystem.create(SpawnProtocol.create(),
"DeadLettersSystem");
// #subscribe-deadletter
ActorSystem.create(Behaviors.setup(ctx -> {
ActorRef<DeadLetter> listener = ctx.spawn(DeadLetterActor.create(), "listener");
ctx.getSystem().eventStream().tell(new Subscribe<>(DeadLetter.class, listener));
return Behaviors.empty();
}), "DeadLettersSystem");
// #subscribe-deadletter
ActorTestKit.shutdown(system);
}

public
// #deadletter-actor
static class DeadLetterActor extends AbstractBehavior<String> {
static class DeadLetterActor extends AbstractBehavior<DeadLetter> {

final Logger log = getContext().getLog();

public static Behavior<String> create() {
public static Behavior<DeadLetter> create() {
return Behaviors.setup(DeadLetterActor::new);
}

public DeadLetterActor(ActorContext<String> context) {
public DeadLetterActor(ActorContext<DeadLetter> context) {
super(context);
ActorRef<DeadLetter> messageAdapter = context.messageAdapter(
DeadLetter.class,
d -> d.message().toString()
d -> d
);
// subscribe DeadLetter at start up.
context.getSystem().eventStream()
.tell(new Subscribe<>(DeadLetter.class, messageAdapter));
}

@Override
public Receive<String> createReceive() {
return newReceiveBuilder().onMessage(String.class, msg -> {
System.out.println(msg);
public Receive<DeadLetter> createReceive() {
return newReceiveBuilder().onMessage(DeadLetter.class, msg -> {
log.info("receive dead letter: {} from <{}> to <{}>", msg, msg.sender(),
msg.recipient());
return Behaviors.same();
}).build();
}
Expand Down Expand Up @@ -95,6 +107,8 @@ public Electronic(String artist) {

static class Listener extends AbstractBehavior<AllKindsOfMusic> {

final Logger log = getContext().getLog();

public static Behavior<AllKindsOfMusic> create() {
return Behaviors.setup(Listener::new);
}
Expand All @@ -108,15 +122,13 @@ public Listener(ActorContext<AllKindsOfMusic> context) {
public Receive<AllKindsOfMusic> createReceive() {
return newReceiveBuilder()
.onMessage(Jazz.class, msg -> {
System.out.printf("%s is listening to: %s%n",
getContext().getSelf().path().name(),
log.info("{} is listening to Jazz: {}", getContext().getSelf().path().name(),
msg);
return Behaviors.same();
})
.onMessage(Electronic.class, msg -> {
System.out.printf("%s is listening to: %s%n",
getContext().getSelf().path().name(),
msg);
log.info("{} is listening to Electronic: {}",
getContext().getSelf().path().name(), msg);
return Behaviors.same();
}).build();
}
Expand Down Expand Up @@ -164,10 +176,19 @@ public void subscribeBySubclassification() {
public void subscribeToSuppressedDeadLetters() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "SuppressedDeadLetter");
TestProbe<SuppressedDeadLetter> probe = TestProbe.create(system);
ActorRef<SuppressedDeadLetter> actor = probe.ref();
ActorRef<SuppressedDeadLetter> listener = probe.ref();
akka.actor.ActorRef mockRef = Adapter.toClassic(listener);
// #suppressed-deadletters
system.eventStream().tell(new Subscribe<>(SuppressedDeadLetter.class, actor));
system.eventStream().tell(new Subscribe<>(SuppressedDeadLetter.class, listener));
// #suppressed-deadletters
Terminated suppression = Terminated.apply(mockRef, false, false);
SuppressedDeadLetter deadLetter = SuppressedDeadLetter.apply(suppression, mockRef, mockRef);
system.eventStream().tell(new Publish<>(deadLetter));

SuppressedDeadLetter suppressedDeadLetter = probe.expectMessageClass(
SuppressedDeadLetter.class);
Assert.assertNotNull(suppressedDeadLetter);
Assert.assertEquals(deadLetter, suppressedDeadLetter);

ActorTestKit.shutdown(system);
}
Expand All @@ -176,11 +197,29 @@ public void subscribeToSuppressedDeadLetters() {
public void subscribeToAllDeadLetters() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "AllDeadLetters");
TestProbe<AllDeadLetters> probe = TestProbe.create(system);
ActorRef<AllDeadLetters> actor = probe.ref();
ActorRef<AllDeadLetters> listener = probe.ref();
akka.actor.ActorRef mockRef = Adapter.toClassic(listener);
// #all-deadletters
system.eventStream().tell(new Subscribe<>(AllDeadLetters.class, actor));
system.eventStream().tell(new Subscribe<>(AllDeadLetters.class, listener));
// #all-deadletters

Terminated suppression = Terminated.apply(Actor.noSender(), false, false);
SuppressedDeadLetter suppressedDeadLetter = SuppressedDeadLetter.apply(suppression,
mockRef,
mockRef);
system.eventStream().tell(new Publish<>(suppressedDeadLetter));
DeadLetter deadLetter = DeadLetter.apply("deadLetter", mockRef, mockRef);
system.eventStream().tell(new Publish<>(deadLetter));

// both of the following messages will be received by the subscription actor
SuppressedDeadLetter receiveSuppressed = probe.expectMessageClass(
SuppressedDeadLetter.class);
Assert.assertNotNull(receiveSuppressed);
Assert.assertEquals(suppressedDeadLetter, receiveSuppressed);
DeadLetter receiveDeadLetter = probe.expectMessageClass(DeadLetter.class);
Assert.assertNotNull(receiveDeadLetter);
Assert.assertEquals(deadLetter, receiveDeadLetter);

ActorTestKit.shutdown(system);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.actor.typed.eventstream

import akka.actor.DeadLetter
import akka.actor.Terminated
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
Expand All @@ -15,6 +16,7 @@ import akka.actor.typed.SpawnProtocol
import akka.actor.typed.SpawnProtocol.Spawn
import akka.actor.typed.eventstream.EventStream.Publish
import akka.actor.typed.eventstream.EventStream.Subscribe
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.Behaviors
import org.scalatest.wordspec.AnyWordSpecLike

Expand All @@ -24,20 +26,25 @@ import scala.concurrent.Future
object LoggingDocSpec {

//#deadletters
import akka.actor.DeadLetter
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream.Subscribe
import akka.actor.typed.scaladsl.Behaviors

object DeadLetterListener {

def apply(): Behavior[String] = Behaviors.setup { context =>
// subscribe DeadLetter at startup.
val adapter = context.messageAdapter[DeadLetter](d => d.message.toString)
def apply(): Behavior[DeadLetter] = Behaviors.setup { context =>
// subscribe DeadLetter at start up.
val adapter = context.messageAdapter[DeadLetter](d => d)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for an adapter then, if the actor already accepts DeadLetter

context.system.eventStream ! Subscribe(adapter)

Behaviors.receiveMessage {
case msg: String =>
println(msg)
case deadLetter: DeadLetter =>
context.log.info(
"receive dead letter: {} from <{}> to <{}>",
deadLetter.message,
deadLetter.sender,
deadLetter.recipient)
Behaviors.same
}
}
Expand Down Expand Up @@ -71,7 +78,7 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
import LoggingDocSpec._
import akka.actor.typed.scaladsl.AskPattern._

"allow registration to dead letters" in {
"allow registration to dead letters from start up" in {
// #deadletters
ActorSystem(Behaviors.setup[Void] { context =>
context.spawn(DeadLetterListener(), "DeadLetterListener", Props.empty)
Expand All @@ -80,6 +87,16 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
// #deadletters
}

"allow registration to dead letters" in {
// #subscribe-deadletter
ActorSystem(Behaviors.setup[Void] { context =>
val deadLetterListener = context.spawn(DeadLetterListener(), "DeadLetterListener", Props.empty)
context.system.eventStream ! Subscribe[DeadLetter](deadLetterListener)
Behaviors.empty
}, "System")
// #subscribe-deadletter
}

"demonstrate superclass subscriptions on typed eventStream" in {
import LoggingDocSpec.ListenerActor._
//#superclass-subscription-eventstream
Expand All @@ -106,17 +123,29 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
}

"allow registration to suppressed dead letters" in {
val listener: ActorRef[Any] = TestProbe().ref
val probe: TestProbe[Any] = TestProbe()
val listener: ActorRef[Any] = probe.ref
val mockRef = listener.toClassic

//#suppressed-deadletters
import akka.actor.SuppressedDeadLetter
system.eventStream ! Subscribe[SuppressedDeadLetter](listener)
//#suppressed-deadletters
val suppression = Terminated(mockRef)(existenceConfirmed = false, addressTerminated = false)
val suppressionDeadLetter = SuppressedDeadLetter(suppression, mockRef, mockRef)
system.eventStream ! Publish(suppressionDeadLetter)

val receivedSuppression = probe.expectMessageType[SuppressedDeadLetter]
receivedSuppression shouldBe suppressionDeadLetter

//#all-deadletters
import akka.actor.AllDeadLetters
system.eventStream ! Subscribe[AllDeadLetters](listener)
//#all-deadletters
val deadLetter = DeadLetter("deadLetter", mockRef, mockRef)
system.eventStream ! Publish(deadLetter)
val receivedDeadLetter = probe.expectMessageType[DeadLetter]
receivedDeadLetter shouldBe deadLetter
}

}
15 changes: 12 additions & 3 deletions akka-docs/src/main/paradox/typed/event-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ It uses @ref:[Subchannel Classification](#subchannel-classification) which enabl

## How to use

The following example demonstrates how a subscription works. Given an actor:
The following example demonstrates how a subscription works. Given an actor will subscribe DeadLetter from start up:

Scala
: @@snip [LoggingDocSpec.scala](/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala) { #deadletters }
Expand All @@ -45,15 +45,24 @@ Java
: @@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #imports-deadletter }


@@@ div { .group-scala }

Or you can also subscribe after Actor starts:

@@snip [LoggingDocSpec.scala](/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala) { #subscribe-deadletter }

@@@


@@@ div { .group-java }

the actor definition like this:

@@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #deadletter-actor }

it can be subscribed like this:
Or you can also subscribe after Actor starts:

@@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #deadletters }
@@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #subscribe-deadletter }

@@@

Expand Down