
Conversation

@armanbilge
Member

Registering event listeners currently presents two conflicting challenges:

  1. Registering a listener is resourceful, i.e. it should eventually be unregistered.

  2. Listeners must be registered without any risk of suspending to the event loop.

    To be specific, something like openConnection(...).flatTap(registerListener(_)) is not safe in general, because the runtime may yield to the event loop between opening the connection and registering the listener, in which case an event may fire before the listener is installed (in the worst cases this leads to crashes due to unhandled error events).

    Two techniques to avoid suspending to the event loop are:

    • running in a single atomic delay { val c = openConnection(...); c.registerListener(...); c } (see the sketch below)
    • evaluating the effects on the MicrotaskExecutor which does not yield to the global event loop

If these constraints are not both respected when registering listeners, we can encounter bugs such as #2663 and #3336.
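
A minimal sketch of the first technique, assuming a hypothetical Connection trait and open function (not fs2's API): the connection is opened and the listener attached inside a single delay block, so the runtime cannot yield to the event loop in between.

```scala
import cats.effect.{Resource, Sync}

object SafeRegistration {

  // Hypothetical minimal event source, standing in for a Node.js EventEmitter facade.
  trait Connection {
    def on(event: String, listener: Any => Unit): Unit
    def removeListener(event: String, listener: Any => Unit): Unit
  }

  // Open the connection and attach the listener inside a single delay block, so the
  // runtime cannot yield to the event loop between the two steps; the Resource
  // finalizer removes the listener again.
  def openWithListener[F[_]](open: () => Connection, onError: Any => Unit)(implicit
      F: Sync[F]
  ): Resource[F, Connection] =
    Resource.make(
      F.delay {
        val c = open()         // acquire the underlying resource
        c.on("error", onError) // attached before any possible yield
        c
      }
    )(c => F.delay(c.removeListener("error", onError)))
}
```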

This PR introduces a new concept EventEmitter.Scope. This is a resourceful construct à la Supervisor that may "supervise" a number of event listeners i.e. take responsibility for their cleanup, satisfying constraint (1). Furthermore, listeners may be registered with the Scope unsafely e.g. within a delay block, satisfying constraint (2).

NB "Scope" is a bit overloaded in FS2, but this is just internal private implementation so probably fine ... open to other names.
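
For intuition, here is a rough sketch of the idea (names like ListenerScope and unsafeRegisterCleanup are illustrative guesses, not fs2's actual internals): a Resource owning a mutable list of cleanup actions, appended to unsafely from inside delay blocks on the single-threaded JS event loop, and run in full when the Resource is released.

```scala
import cats.effect.{Resource, Sync}
import scala.collection.mutable.ListBuffer

// Illustrative sketch only: a resourceful "scope" that takes responsibility for
// unregistering listeners. Registration is plain mutation, which is fine on
// Scala.js because the event loop is single-threaded and registration happens
// synchronously inside delay blocks.
final class ListenerScope private () {
  private val cleanups = ListBuffer.empty[() => Unit]

  // record how to remove a listener that was just attached
  def unsafeRegisterCleanup(cleanup: () => Unit): Unit =
    cleanups += cleanup

  // remove listeners in reverse order of registration (LIFO, like finalizers)
  def unsafeCleanupAll(): Unit =
    cleanups.reverseIterator.foreach(_.apply())
}

object ListenerScope {
  def resource[F[_]](implicit F: Sync[F]): Resource[F, ListenerScope] =
    Resource.make(F.delay(new ListenerScope()))(s => F.delay(s.unsafeCleanupAll()))
}
```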

Closes #3336.

```scala
_ <- readable.registerListener[F, Any]("close", dispatcher)(_ => channel.close.void)
_ <- readable.registerListener[F, js.Error]("error", dispatcher) { e =>
       error.complete(js.JavaScriptException(e)).void
     }
scope <- facade.events.EventEmitter.openScope
```
@mikeshepherd Nov 16, 2023

I think this is going to change the ordering of unregistering the listeners with respect to the finalizer for the thunk: the unregistering will now happen after the finalizer rather than before it.

Notably this means that the registered listeners will get a close event that they wouldn't have previously.

This feels like a good thing? But I just wanted to call it out.

@armanbilge Member Author


Good call out. Yes, you're right. I wonder if it is worth doing the unregister in reverse order in EventEmitter.Scope 🤔

But yes, in general I think it's okay to continue receiving events, because the resource is already in the process of shutting down anyway, so I don't think it should be user-visible.
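
As an aside, a tiny standalone example (not fs2 code) of the ordering in question: Resource finalizers run in reverse order of acquisition, so a scope acquired before the underlying thunk only releases its listeners after the thunk's finalizer has run.

```scala
import cats.effect.{IO, IOApp, Resource}

object FinalizerOrder extends IOApp.Simple {
  // Resource finalizers run LIFO: "unregister listeners" prints after "finalize thunk".
  val demo: Resource[IO, Unit] =
    for {
      _ <- Resource.make(IO.println("open scope"))(_ => IO.println("unregister listeners"))
      _ <- Resource.make(IO.println("acquire thunk"))(_ => IO.println("finalize thunk"))
    } yield ()

  // prints: open scope, acquire thunk, finalize thunk, unregister listeners
  def run: IO[Unit] = demo.use_
}
```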

@mikeshepherd

Hi @armanbilge, we've been running this change for a short while now, and it's had some positive effect, but it hasn't completely removed this class of error.

We're now seeing it here instead: https://github.com/typelevel/fs2/blob/main/io/js/src/main/scala/fs2/io/ioplatform.scala#L237 but at a much lower rate.
In this case it's not on a listener, so it's not about making sure it's unregistered correctly, but the circumstances are very similar: we finish with the socket for whatever reason and start doing the cleanup, and by the time we come to destroying the Node Duplex stream, resource finalization has already gone past the point of shutting down the dispatcher, so we get another "dispatcher already shutdown" error.

I'm happy to raise this up as a separate issue if you'd prefer. I'm also happy to have a go at solving it, but the solution is not so obvious to me here so I'd need some guidance.

@armanbilge
Member Author

@mikeshepherd thanks for trying that and reporting the issue ... can you share any more details, like the stack trace possibly? Trying to understand when/where that destroy is being called, since the error suggests that the Duplex has leaked outside of its Resource-lifecycle.
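
For context, a minimal standalone sketch (not the actual fs2 code path) of the kind of leak that produces this error: a callback captured inside a Dispatcher's Resource scope but invoked only after that scope has closed.

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Dispatcher

object DispatcherLeak extends IOApp.Simple {
  // The callback escapes the Dispatcher's Resource scope; by the time it fires,
  // the dispatcher has been shut down and unsafeRunAndForget throws
  // "java.lang.IllegalStateException: dispatcher already shutdown".
  def run: IO[Unit] =
    Dispatcher.sequential[IO]
      .use(dispatcher => IO.pure(() => dispatcher.unsafeRunAndForget(IO.unit)))
      .flatMap(leakedCallback => IO(leakedCallback()))
}
```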

@mikeshepherd

Yeah absolutely.

```
java.lang.IllegalStateException: dispatcher already shutdown
  at $ct_jl_IllegalStateException__T__ (https://raw.githubusercontent.com/typelevel/cats-effect/v3.5.2/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala:366:21)
  at $s_Lcats_effect_std_Dispatcher$__cats$effect$std$Dispatcher$$anon$2$$_$enqueue$1__ju_concurrent_atomic_AtomicReference__Lcats_effect_std_Dispatcher$Registration$1__V (https://raw.githubusercontent.com/typelevel/cats-effect/v3.5.2/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala:366:21)
  at $c_Lcats_effect_std_Dispatcher$$anon$2.prototype.unsafeToFutureCancelable__O__T2 (https://raw.githubusercontent.com/typelevel/cats-effect/v3.5.2/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala:384:18)
  at $f_Lcats_effect_std_Dispatcher__unsafeRunAsync__O__F1__V (https://raw.githubusercontent.com/typelevel/cats-effect/v3.5.2/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala:60:29)
  at $c_Lcats_effect_std_Dispatcher$$anon$2.prototype.unsafeRunAndForget__O__V (https://raw.githubusercontent.com/typelevel/cats-effect/v3.5.2/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala:317:7)
  at Duplex._destroy (https://raw.githubusercontent.com/typelevel/fs2/e4d38ae4a80005dc27cd3f03227bc22efe32537a/io/js/src/main/scala/fs2/io/ioplatform.scala:242:52)
  at _destroy (node:internal/streams/destroy:109:10)
  at Duplex.destroy (node:internal/streams/destroy:71:5)
  at JSStreamSocket.doClose (node:internal/js_stream_socket:228:17)
  at JSStreamSocket.handle.close (node:internal/js_stream_socket:52:12)
```

Apologies for the formatting; that's just how it came out of our logs, and it seemed like more effort than it was worth to clean up 😀

I'm afraid I can't offer much more direction as to how and when this actually happens. I don't currently have enough detail about what's going on around that point to tell you what the lambda is doing.

I can tell you that it's an occasional thing: we're consuming a Kinesis stream, so AWS retries on errors, and we get through the events eventually, just with a lot of retries caused by this.

@armanbilge
Member Author

Not at all, thanks, that's great! I will think about how the leak may be happening.
