Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions io/js/src/main/scala/fs2/io/internal/facade/events.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ private[io] trait EventEmitter extends js.Object {
}

private[io] object EventEmitter {
final class Scope private[EventEmitter] {
private[EventEmitter] val cleanup = new js.Array[js.Function0[Unit]]
}

def openScope[F[_]](implicit F: Sync[F]): Resource[F, Scope] =
Resource.make(F.delay(new Scope)) { scope =>
F.delay {
scope.cleanup.foreach { task =>
try
task()
catch { case _: Throwable => () }
}
}
}

implicit class ops(val eventTarget: EventEmitter) extends AnyVal {

def registerListener[F[_], E](eventName: String, dispatcher: Dispatcher[F])(
Expand Down Expand Up @@ -87,5 +102,40 @@ private[io] object EventEmitter {
eventTarget.once(eventName, fn)
Some(F.delay(eventTarget.removeListener(eventName, fn)))
}

def unsafeRegisterListener[F[_], E](eventName: String, dispatcher: Dispatcher[F], scope: Scope)(
listener: E => F[Unit]
): Unit = {
val fn: js.Function1[E, Unit] = e => dispatcher.unsafeRunAndForget(listener(e))
eventTarget.on(eventName, fn)
scope.cleanup.push(() => eventTarget.removeListener(eventName, fn))
()
}

def unsafeRegisterOneTimeListener0[F[_], E](
eventName: String,
dispatcher: Dispatcher[F],
scope: Scope
)(
listener: () => F[Unit]
): Unit = {
val fn: js.Function0[Unit] = () => dispatcher.unsafeRunAndForget(listener())
eventTarget.once(eventName, fn)
scope.cleanup.push(() => eventTarget.removeListener(eventName, fn))
()
}

def unsafeRegisterOneTimeListener[F[_], E](
eventName: String,
dispatcher: Dispatcher[F],
scope: Scope
)(
listener: E => F[Unit]
): Unit = {
val fn: js.Function1[E, Unit] = e => dispatcher.unsafeRunAndForget(listener(e))
eventTarget.once(eventName, fn)
scope.cleanup.push(() => eventTarget.removeListener(eventName, fn))
()
}
}
}
66 changes: 35 additions & 31 deletions io/js/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import fs2.concurrent.Channel
import fs2.io.internal.MicrotaskExecutor
import fs2.io.internal.facade

import java.nio.charset.Charset
Expand Down Expand Up @@ -63,37 +62,42 @@ private[fs2] trait ioplatform {
dispatcher <- Dispatcher.sequential[F]
channel <- Channel.unbounded[F, Unit].toResource
error <- F.deferred[Throwable].toResource
readableResource = for {
readable <- Resource.makeCase(F.delay(thunk)) {
case (readable, Resource.ExitCase.Succeeded) =>
F.delay {
if (!readable.readableEnded & destroyIfNotEnded)
readable.destroy()
}
case (readable, Resource.ExitCase.Errored(_)) =>
// tempting, but don't propagate the error!
// that would trigger a unhandled Node.js error that circumvents FS2/CE error channels
F.delay(readable.destroy())
case (readable, Resource.ExitCase.Canceled) =>
if (destroyIfCanceled)
F.delay(readable.destroy())
else
F.unit
}
_ <- readable.registerListener[F, Any]("readable", dispatcher)(_ => channel.send(()).void)
_ <- readable.registerListener[F, Any]("end", dispatcher)(_ => channel.close.void)
_ <- readable.registerListener[F, Any]("close", dispatcher)(_ => channel.close.void)
_ <- readable.registerListener[F, js.Error]("error", dispatcher) { e =>
error.complete(js.JavaScriptException(e)).void
scope <- facade.events.EventEmitter.openScope
Copy link

@mikeshepherd mikeshepherd Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is going to change the ordering of unregistering the listeners wrt the finalizer for the thunk. The unregistering will now happen after rather than prior to that.

Notably this means that the registered listeners will get a close event that they wouldn't have previously.

This feels like a good thing? but I just wanted to call it out.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call out. Yes, you're right. I wonder if it is worth doing the unregister in reverse order in EventEmitter.Scope 🤔

But yes, in general I think it's okay to continue receiving events, because the resource is already in the process of shutting down anyway, so I don't think it should be user-visible.

readable <- Resource.makeCase {
F.delay {
val readable = thunk

readable.unsafeRegisterListener[F, Any]("readable", dispatcher, scope) { _ =>
channel.send(()).void
}
readable.unsafeRegisterListener[F, Any]("end", dispatcher, scope) { _ =>
channel.close.void
}
readable.unsafeRegisterListener[F, Any]("close", dispatcher, scope) { _ =>
channel.close.void
}
readable.unsafeRegisterListener[F, js.Error]("error", dispatcher, scope) { e =>
error.complete(js.JavaScriptException(e)).void
}

readable
}
} yield readable
// Implementation note: why run on the MicrotaskExecutor?
// In many cases creating a `Readable` starts async side-effects (e.g. negotiating TLS handshake or opening a file handle).
// Furthermore, these side-effects will invoke the listeners we register to the `Readable`.
// Therefore, it is critical that the listeners are registered to the `Readable` _before_ these async side-effects occur:
// in other words, before we next yield (cede) to the event loop. Because an arbitrary effect `F` (particularly `IO`) may cede at any time,
// our only recourse is to run the entire creation/listener registration process on the microtask executor.
readable <- readableResource.evalOn(MicrotaskExecutor)
} {
case (readable, Resource.ExitCase.Succeeded) =>
F.delay {
if (!readable.readableEnded & destroyIfNotEnded)
readable.destroy()
}
case (readable, Resource.ExitCase.Errored(_)) =>
// tempting, but don't propagate the error!
// that would trigger a unhandled Node.js error that circumvents FS2/CE error channels
F.delay(readable.destroy())
case (readable, Resource.ExitCase.Canceled) =>
if (destroyIfCanceled)
F.delay(readable.destroy())
else
F.unit
}
stream =
(channel.stream
.concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) >>
Expand Down
134 changes: 65 additions & 69 deletions io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,79 +62,75 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type =>
clientMode: Boolean,
params: TLSParameters,
logger: TLSLogger[F]
): Resource[F, TLSSocket[F]] = (Dispatcher.sequential[F], Dispatcher.parallel[F])
.flatMapN { (seqDispatcher, parDispatcher) =>
if (clientMode) {
Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { handshake =>
TLSSocket
.forAsync(
socket,
sock => {
val options = params.toTLSConnectOptions(parDispatcher)
options.secureContext = context
if (insecure)
options.rejectUnauthorized = false
options.enableTrace = logger != TLSLogger.Disabled
options.socket = sock
val tlsSock = facade.tls.connect(options)
tlsSock.once(
"secureConnect",
() => seqDispatcher.unsafeRunAndForget(handshake.complete(Either.unit))
)
tlsSock.once[js.Error](
"error",
e =>
seqDispatcher.unsafeRunAndForget(
handshake.complete(Left(new js.JavaScriptException(e)))
): Resource[F, TLSSocket[F]] =
(Dispatcher.sequential[F], Dispatcher.parallel[F], facade.events.EventEmitter.openScope)
.flatMapN { (seqDispatcher, parDispatcher, scope) =>
if (clientMode) {
Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { handshake =>
TLSSocket
.forAsync(
socket,
sock => {
val options = params.toTLSConnectOptions(parDispatcher)
options.secureContext = context
if (insecure)
options.rejectUnauthorized = false
options.enableTrace = logger != TLSLogger.Disabled
options.socket = sock
val tlsSock = facade.tls.connect(options)
tlsSock
.unsafeRegisterOneTimeListener0("secureConnect", seqDispatcher, scope)(
() => handshake.complete(Either.unit).void
)
)
tlsSock
}
)
.evalTap(_ => handshake.get.rethrow)
}
} else {
Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { verifyError =>
TLSSocket
.forAsync(
socket,
sock => {
val options = params.toTLSSocketOptions(parDispatcher)
options.secureContext = context
if (insecure)
options.rejectUnauthorized = false
options.enableTrace = logger != TLSLogger.Disabled
options.isServer = true
val tlsSock = new facade.tls.TLSSocket(sock, options)
tlsSock.once(
"secure",
{ () =>
val requestCert = options.requestCert.getOrElse(false)
val rejectUnauthorized = options.rejectUnauthorized.getOrElse(true)
val result =
if (requestCert && rejectUnauthorized)
Option(tlsSock.ssl.verifyError())
.map(e => new JavaScriptSSLException(js.JavaScriptException(e)))
.toLeft(())
else Either.unit
seqDispatcher.unsafeRunAndForget(verifyError.complete(result))
tlsSock.unsafeRegisterOneTimeListener[F, js.Error](
"error",
seqDispatcher,
scope
)(e => handshake.complete(Left(new js.JavaScriptException(e))).void)
tlsSock
}
)
.evalTap(_ => handshake.get.rethrow)
}
} else {
Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { verifyError =>
TLSSocket
.forAsync(
socket,
sock => {
val options = params.toTLSSocketOptions(parDispatcher)
options.secureContext = context
if (insecure)
options.rejectUnauthorized = false
options.enableTrace = logger != TLSLogger.Disabled
options.isServer = true
val tlsSock = new facade.tls.TLSSocket(sock, options)
tlsSock.unsafeRegisterOneTimeListener0("secure", seqDispatcher, scope) {
() =>
val requestCert = options.requestCert.getOrElse(false)
val rejectUnauthorized = options.rejectUnauthorized.getOrElse(true)
val result =
if (requestCert && rejectUnauthorized)
// side-effect must run in the callback
Option(tlsSock.ssl.verifyError())
.map(e => new JavaScriptSSLException(js.JavaScriptException(e)))
.toLeft(())
else Either.unit
verifyError.complete(result).void
}
)
tlsSock.once[js.Error](
"error",
e =>
seqDispatcher.unsafeRunAndForget(
verifyError.complete(Left(new js.JavaScriptException(e)))
)
)
tlsSock
}
)
.evalTap(_ => verifyError.get.rethrow)
tlsSock.unsafeRegisterOneTimeListener[F, js.Error](
"error",
seqDispatcher,
scope
)(e => verifyError.complete(Left(new js.JavaScriptException(e))).void)
tlsSock
}
)
.evalTap(_ => verifyError.get.rethrow)
}
}
}
}
.adaptError { case IOException(ex) => ex }
.adaptError { case IOException(ex) => ex }
}

def fromSecureContext(context: SecureContext): TLSContext[F] =
Expand Down