Skip to content

Commit

Permalink
Better handling of zeromq connection close
Browse files Browse the repository at this point in the history
  • Loading branch information
alexarchambault committed Jun 19, 2024
1 parent 575ed93 commit 0a4622c
Showing 1 changed file with 22 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package almond.channels.zeromq

import java.nio.channels.{ClosedByInterruptException, Selector}
import java.nio.channels.{ClosedByInterruptException, ClosedSelectorException, Selector}
import java.nio.charset.StandardCharsets.UTF_8

import almond.channels._
Expand Down Expand Up @@ -190,18 +190,29 @@ final class ZeromqConnection(
(channel, new PollItem(socket.channel, Poller.POLLIN))
}

withSelector { selector =>
ZMQ.poll(selector, pollItems.map(_._2).toArray, pollingDelay.toMillis)
val doRead = withSelector { selector =>
try {
ZMQ.poll(selector, pollItems.map(_._2).toArray, pollingDelay.toMillis)
true
}
catch {
case _: ClosedSelectorException if selectorOpt.isEmpty =>
// channel was closed
false
}
}

pollItems
.collectFirst {
case (channel, pi) if pi.isReadable =>
channelSocket0(channel)
.read
.map(_.map((channel, _)))
}
.getOrElse(IO.pure(None))
if (doRead)
pollItems
.collectFirst {
case (channel, pi) if pi.isReadable =>
channelSocket0(channel)
.read
.map(_.map((channel, _)))
}
.getOrElse(IO.pure(None))
else
IO.pure(None)
}.evalOn(threads.pollingEc).flatMap(identity)

def close(partial: Boolean, lingerDuration: Duration): IO[Unit] = {
Expand Down

0 comments on commit 0a4622c

Please sign in to comment.