Skip to content

Commit d5f385b

Browse files
authored
[actor] Introduce module (#1107)
### Problem Dealing with effects that can't be directly forked like a local `Var` isn't trivial and, in some cases, `Isolate`s can't even be provided due to the specific semantics of effects. For example, I'm working on a `Browser` effect that uses playwright to interact with a browser. Given that playwright isn't thread-safe, providing an `Isolate` would require a more advanced mechanism like forking multiple playwright engines, which doesn't seem desirable. ### Solution Actors provide a straightforward way to isolate a computation with arbitrary effects and state. Inputs can be serially consumed via `Poll` while keeping the complete state of the computation, including its effect handlers. `Stream` provides a similar solution via `fold` but the execution model isn't very flexible. Unlike streaming, `Actor` references can be freely passed around and receive messages in multiple code paths. The implementation also provides supervision hierarchy, which is a more convenient way to handle resources. ### Notes This is based on the collaboration with @DamianReeves for Functional Scala last year. Thank you!
1 parent b8ac5d3 commit d5f385b

File tree

10 files changed

+1306
-35
lines changed

10 files changed

+1306
-35
lines changed

build.sbt

+17-3
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ lazy val kyoJVM = project
121121
`kyo-cats`.jvm,
122122
`kyo-combinators`.jvm,
123123
`kyo-examples`.jvm,
124-
`kyo-monix`.jvm
124+
`kyo-monix`.jvm,
125+
`kyo-actor`.jvm
125126
)
126127

127128
lazy val kyoJS = project
@@ -144,7 +145,8 @@ lazy val kyoJS = project
144145
`kyo-zio-test`.js,
145146
`kyo-zio`.js,
146147
`kyo-cats`.js,
147-
`kyo-combinators`.js
148+
`kyo-combinators`.js,
149+
`kyo-actor`.js
148150
)
149151

150152
lazy val kyoNative = project
@@ -164,7 +166,8 @@ lazy val kyoNative = project
164166
`kyo-offheap`.native,
165167
`kyo-direct`.native,
166168
`kyo-combinators`.native,
167-
`kyo-sttp`.native
169+
`kyo-sttp`.native,
170+
`kyo-actor`.native
168171
)
169172

170173
lazy val `kyo-scheduler` =
@@ -372,6 +375,17 @@ lazy val `kyo-stm` =
372375
.nativeSettings(`native-settings`)
373376
.jsSettings(`js-settings`)
374377

378+
lazy val `kyo-actor` =
379+
crossProject(JSPlatform, JVMPlatform, NativePlatform)
380+
.withoutSuffixFor(JVMPlatform)
381+
.crossType(CrossType.Full)
382+
.in(file("kyo-actor"))
383+
.dependsOn(`kyo-core`)
384+
.settings(`kyo-settings`)
385+
.jvmSettings(mimaCheck(false))
386+
.nativeSettings(`native-settings`)
387+
.jsSettings(`js-settings`)
388+
375389
lazy val `kyo-stats-registry` =
376390
crossProject(JSPlatform, JVMPlatform, NativePlatform)
377391
.withoutSuffixFor(JVMPlatform)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
package kyo
2+
3+
import java.io.IOException
4+
import kyo.*
5+
import kyo.kernel.ArrowEffect
6+
import kyo.kernel.ContextEffect
7+
import scala.annotation.*
8+
9+
/** An actor that processes messages asynchronously through a mailbox until completion or failure.
10+
*
11+
* WARNING: Actor is a low-level primitive with complex semantics. For most concurrent programming needs, consider using simpler primitives
12+
* like kyo.Async or kyo.Stream instead.
13+
*
14+
* Actors provide a message-based concurrency model where each instance:
15+
* - Maintains a private mailbox for receiving messages
16+
* - Processes messages sequentially in FIFO order
17+
* - Can communicate with other actors by sending messages
18+
* - Can maintain and modify internal state between messages
19+
* - Can spawn new child actors
20+
* - Completes with either a success value of type B or failure of type E
21+
*
22+
* Messages can be sent to an actor using its `subject` interface, which provides both fire-and-forget `send` operations and
23+
* request-response style `ask` operations. The actor processes these messages one at a time until it either successfully completes,
24+
* encounters an error, or is explicitly closed.
25+
*
26+
* Actors can form parent-child hierarchies where:
27+
* - Parent actors can spawn and supervise child actors
28+
* - Child actors inherit the resource scope of their parent
29+
* - When a parent actor completes or fails, its children are automatically shut down
30+
*
31+
* Graceful shutdown can be initiated by:
32+
* - Calling `close()` on the actor, which prevents new messages from being accepted
33+
* - Allowing the actor to process any remaining messages in its mailbox
34+
* - Awaiting the actor's completion via its `fiber` or `result`
35+
*
36+
* If an error of type E occurs during message processing and is not handled within the actor's implementation (via Abort), the actor will
37+
* fail and complete with that error. The actor's lifecycle can be monitored through its underlying `fiber`, or by awaiting its final
38+
* `result`.
39+
*
40+
* @tparam E
41+
* The type of errors that can terminate the actor if not handled internally
42+
* @tparam A
43+
* The type of messages this actor can receive
44+
* @tparam B
45+
* The type of result this actor produces upon completion
46+
*/
47+
sealed abstract class Actor[+E, A, B](_subject: Subject[A], _fiber: Fiber[Closed | E, B]):
48+
49+
/** Returns the message subject interface for sending messages to this actor.
50+
*
51+
* Messages sent through this subject will be queued in the actor's mailbox and processed sequentially in FIFO order.
52+
*
53+
* @return
54+
* A Subject[A] that can be used to send messages to this actor
55+
*/
56+
def subject: Subject[A] = _subject
57+
58+
export _subject.*
59+
60+
/** Returns the fiber executing this actor's message processing.
61+
*
62+
* The fiber completes when the actor finishes processing all messages and produces its final result. It will fail with Closed if the
63+
* actor's mailbox is closed, or with error E if an unhandled error occurs during message processing.
64+
*
65+
* @return
66+
* A Fiber containing the actor's execution
67+
*/
68+
def fiber: Fiber[Closed | E, B] = _fiber
69+
70+
/** Retrieves the final result of this actor.
71+
*
72+
* Waits for the actor to complete processing all messages and return its final value. Will fail with error E if an unhandled error
73+
* occurs during message processing, or with Closed if the actor is closed prematurely.
74+
*
75+
* @return
76+
* The actor's final result of type B
77+
*/
78+
def await(using Frame): B < (Async & Abort[Closed | E]) = fiber.get
79+
80+
/** Closes the actor's mailbox, preventing it from receiving any new messages.
81+
*
82+
* When called, this method:
83+
* - Prevents new messages from being sent to the actor
84+
* - Returns any messages that were queued but not yet processed
85+
* - Does not interrupt the processing of the current message if one is being handled
86+
*
87+
* @return
88+
* A Maybe containing a sequence of any messages that were in the mailbox when it was closed
89+
*/
90+
def close(using Frame): Maybe[Seq[A]] < IO
91+
92+
end Actor
93+
94+
object Actor:
95+
96+
/** Default mailbox capacity for actors.
97+
*
98+
* This value can be configured through the system property "kyo.actor.capacity.default". If not specified, it defaults to 100
99+
* messages.
100+
*/
101+
val defaultCapacity =
102+
import AllowUnsafe.embrace.danger
103+
given Frame = Frame.internal
104+
IO.Unsafe.evalOrThrow(System.property[Int]("kyo.actor.capacity.default", 128))
105+
end defaultCapacity
106+
107+
/** The execution context for actor behaviors, providing the essential capabilities for actor-based concurrency.
108+
*
109+
* Actor.Context is a combination of five foundational effect types that together create the environment in which actor behaviors
110+
* operate:
111+
*
112+
* - [[Poll]]: Allows receiving and processing messages from the actor's mailbox. Used by `receiveAll`, `receiveMax`, and
113+
* `receiveLoop` methods.
114+
* - [[Env[Subject[A]]]]: Provides access to the actor's own subject, enabling self-reference and communication with itself. Used by
115+
* `self` and `selfWith` methods.
116+
* - [[Abort[Closed]]]: Supports handling of mailbox closure situations with the specialized Closed error type. Triggered when
117+
* `close` is called on the actor.
118+
* - [[Resource]]: Enables proper management and cleanup of acquired resources. Used within the actor implementation for mailbox
119+
* cleanup and for maintaining actor hierarchies where child actors are automatically cleaned up when their parent completes or
120+
* fails.
121+
* - [[Async]]: Provides asynchronous execution capabilities. Used to run the actor's processing loop concurrently.
122+
*
123+
* @tparam A
124+
* The type of messages this actor context can process
125+
*/
126+
opaque type Context[A] <: Poll[A] & Env[Subject[A]] & Abort[Closed] & Resource & Async =
127+
Poll[A] & Env[Subject[A]] & Abort[Closed] & Resource & Async
128+
129+
/** Retrieves the current actor's Subject from the environment.
130+
*
131+
* This method is designed be called within an Actor.run body, where the type parameter A matches the Poll message type of that Actor.
132+
*
133+
* @tparam A
134+
* The type of messages the Subject can receive - should match the Actor's Poll type
135+
* @return
136+
* A Subject[A] representing the current actor's message interface
137+
*/
138+
def self[A: Tag](using Frame): Subject[A] < Context[A] =
139+
Env.get
140+
141+
/** Retrieves the current actor's Subject from the environment and applies a function to it.
142+
*
143+
* This method is designed to be called within an Actor.run body, providing a convenient way to access the actor's Subject and perform
144+
* operations on it in a single call.
145+
*
146+
* @param f
147+
* A function that takes the actor's Subject and returns a value of type B with effects S
148+
* @tparam A
149+
* The type of messages the Subject can receive - should match the Actor's Poll type
150+
* @return
151+
* The result of applying function f to the actor's Subject
152+
*/
153+
def selfWith[A: Tag](using Frame)[B, S](f: Subject[A] => B < S): B < (Context[A] & S) =
154+
Env.use(f)
155+
156+
/** Sends a message to the actor designated as the current subject in the environment.
157+
*
158+
* This method is designed be called within an Actor.run body and to re-enqueue messages for later processing by the actor itself.
159+
*
160+
* @param msg
161+
* The message to re-enqueue
162+
* @tparam A
163+
* The type of the message
164+
* @return
165+
* An effect representing the message enqueuing
166+
*/
167+
def reenqueue[A: Tag](msg: A)(using Frame): Unit < Context[A] =
168+
Env.use[Subject[A]](_.send(msg))
169+
170+
/** Receives and processes a single message from the actor's mailbox.
171+
*
172+
* This method polls for the next available message and applies the provided processing function. Message processing is done
173+
* sequentially, ensuring only one message is handled at a time.
174+
*
175+
* @param f
176+
* The function to process each received message
177+
* @tparam A
178+
* The type of messages being received
179+
*/
180+
def receiveAll[A](using Tag[A])[B, S](f: A => B < S)(using Frame): Unit < (Context[A] & S) =
181+
Poll.values[A](f)
182+
183+
/** Receives and processes up to n messages from the actor's mailbox.
184+
*
185+
* This method polls for messages and applies the provided processing function to each one, up to the specified limit. Message
186+
* processing is done sequentially.
187+
*
188+
* @param max
189+
* The maximum number of messages to process
190+
* @param f
191+
* The function to process each received message
192+
* @tparam A
193+
* The type of messages being received
194+
*/
195+
def receiveMax[A: Tag](max: Int)[S](f: A => Any < S)(using Frame): Unit < (Context[A] & S) =
196+
Poll.values[A](max)(f)
197+
198+
/** Receives and processes messages from the actor's mailbox in a loop until a termination condition is met.
199+
*
200+
* This method continuously polls for messages and applies the provided processing function to each one. The function returns a
201+
* Loop.Outcome that determines whether to continue processing more messages or stop.
202+
*
203+
* To control the loop:
204+
* - Return `Loop.continue` to process the next message
205+
* - Return `Loop.done` to stop processing and complete the receive loop
206+
*
207+
* Use this when you need fine-grained control over message processing termination conditions beyond what receiveAll or receiveMax
208+
* provide.
209+
*
210+
* @param f
211+
* A function that processes each received message and returns a Loop.Outcome indicating whether to continue or stop
212+
* @tparam A
213+
* The type of messages being received
214+
* @return
215+
* An effect representing the message processing loop
216+
*/
217+
def receiveLoop[A](using Tag[A])[S](f: A => Loop.Outcome[Unit, Unit] < S)(using Frame): Unit < (Context[A] & S) =
218+
Loop(()) { _ =>
219+
Poll.one[A].map {
220+
case Absent => Loop.done
221+
case Present(v) => f(v)
222+
}
223+
}
224+
225+
/** Creates and starts a new actor with default capacity from a message processing behavior.
226+
*
227+
* This is a convenience method that calls `run(defaultCapacity)(behavior)`. It creates an actor with the default mailbox capacity as
228+
* specified by `defaultCapacity`.
229+
*
230+
* @param behavior
231+
* The behavior defining how messages are processed
232+
* @tparam E
233+
* The type of errors that can occur
234+
* @tparam A
235+
* The type of messages accepted
236+
* @tparam B
237+
* The type of result produced
238+
* @tparam S
239+
* Additional context effects required by the behavior
240+
* @return
241+
* A new Actor instance in an async effect
242+
*/
243+
def run[E, A: Tag, B: Flat, S](
244+
using Isolate.Contextual[S, IO]
245+
)(behavior: B < (Context[A] & Abort[E] & S))(
246+
using
247+
Tag[Poll[A]],
248+
Tag[Emit[A]],
249+
Frame
250+
): Actor[E, A, B] < (Resource & Async & S) =
251+
run(defaultCapacity)(behavior)
252+
253+
/** Creates and starts new actor from a message processing behavior.
254+
*
255+
* The behavior defines how messages are processed and can utilize several effects:
256+
* - Poll[A]: For receiving messages from the actor's mailbox
257+
* - Env[Subject[A]]: For accessing self-reference to send messages to self via Actor.reenqueue
258+
* - Abort[E]: For handling potential errors during message processing
259+
* - S: For any additional context effects needed by the behavior
260+
*
261+
* Message processing continues until either:
262+
* - The behavior completes normally, producing a final result
263+
* - The behavior explicitly stops polling for messages
264+
* - An unhandled error of type E occurs during message processing
265+
* - The actor's mailbox is closed
266+
*
267+
* Messages are processed sequentially in FIFO order, with the behavior having full control over when to receive the next message
268+
* through polling.
269+
*
270+
* @param b
271+
* The behavior defining how messages are processed
272+
* @tparam E
273+
* The type of errors that can occur
274+
* @tparam A
275+
* The type of messages accepted
276+
* @tparam B
277+
* The type of result produced
278+
* @tparam Ctx
279+
* Additional context effects required by the behavior
280+
* @return
281+
* A new Actor instance in an async effect
282+
*/
283+
def run[E, A: Tag, B: Flat, S](
284+
using Isolate.Contextual[S, IO]
285+
)(capacity: Int)(behavior: B < (Context[A] & Abort[E] & S))(
286+
using
287+
Tag[Poll[A]],
288+
Tag[Emit[A]],
289+
Frame
290+
): Actor[E, A, B] < (Resource & Async & S) =
291+
for
292+
mailbox <-
293+
// Create a bounded channel to serve as the actor's mailbox
294+
Channel.init[A](capacity, Access.MultiProducerSingleConsumer)
295+
_subject =
296+
// Create the actor's message interface (Subject)
297+
// Messages sent through this subject are queued in the mailbox
298+
Subject.init(mailbox)
299+
_consumer <-
300+
Loop(behavior) { b =>
301+
Poll.runFirst(b).map {
302+
case Left(r) =>
303+
Loop.done(r)
304+
case Right(cont) =>
305+
mailbox.take.map(v => Loop.continue(cont(Maybe(v))))
306+
}
307+
}.pipe(
308+
IO.ensure(mailbox.close), // Ensure mailbox cleanup by closing it when the actor completes or fails
309+
Env.run(_subject), // Provide the actor's Subject to the environment so it can be accessed via Actor.self
310+
Resource.run, // Close used resources
311+
Async.run // Start the actor's processing loop in an async context
312+
)
313+
_ <- Resource.ensure(mailbox.close) // Registers a finalizer in the outer scope to provide the actor hierarchy behavior
314+
yield new Actor[E, A, B](_subject, _consumer):
315+
def close(using Frame) = mailbox.close
316+
end Actor

0 commit comments

Comments
 (0)