Skip to content

Commit 55b50ec

Browse files
authored
ZMQ actors should subscribe to a single topic (#1793)
We use one actor per topic, but each actor previously registered to multiple topics so we received duplicate events and consumed twice the necessary bandwidth.
1 parent a8d4e07 commit 55b50ec

File tree

3 files changed

+11
-7
lines changed

3 files changed

+11
-7
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -235,8 +235,8 @@ class Setup(datadir: File,
235235

236236
extendedBitcoinClient = new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoin))
237237
watcher = {
238-
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqblock"), Some(zmqBlockConnected))), "zmqblock", SupervisorStrategy.Restart))
239-
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart))
238+
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqblock"), ZMQActor.Topics.RawBlock, Some(zmqBlockConnected))), "zmqblock", SupervisorStrategy.Restart))
239+
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), ZMQActor.Topics.RawTx, Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart))
240240
system.spawn(Behaviors.supervise(ZmqWatcher(nodeParams.chainHash, blockCount, extendedBitcoinClient)).onFailure(typed.SupervisorStrategy.resume), "watcher")
241241
}
242242

eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/zmq/ZMQActor.scala

+7-3
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import scala.util.Try
3131
/**
3232
* Created by PM on 04/04/2017.
3333
*/
34-
class ZMQActor(address: String, connected: Option[Promise[Done]] = None) extends Actor with ActorLogging {
34+
class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]] = None) extends Actor with ActorLogging {
3535

3636
import ZMQActor._
3737

@@ -40,8 +40,7 @@ class ZMQActor(address: String, connected: Option[Promise[Done]] = None) extends
4040
val subscriber = ctx.createSocket(SocketType.SUB)
4141
subscriber.monitor("inproc://events", ZMQ.EVENT_CONNECTED | ZMQ.EVENT_DISCONNECTED)
4242
subscriber.connect(address)
43-
subscriber.subscribe("rawblock".getBytes(ZMQ.CHARSET))
44-
subscriber.subscribe("rawtx".getBytes(ZMQ.CHARSET))
43+
subscriber.subscribe(topic.getBytes(ZMQ.CHARSET))
4544

4645
val monitor = ctx.createSocket(SocketType.PAIR)
4746
monitor.connect("inproc://events")
@@ -114,4 +113,9 @@ object ZMQActor {
114113
case object ZMQDisconnected extends ZMQEvent
115114
// @formatter:on
116115

116+
object Topics {
117+
val RawBlock: String = "rawblock"
118+
val RawTx: String = "rawtx"
119+
}
120+
117121
}

eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
5151
waitForBitcoindReady()
5252
logger.info("starting zmq actors")
5353
val (zmqBlockConnected, zmqTxConnected) = (Promise[Done](), Promise[Done]())
54-
zmqBlock = system.actorOf(Props(new ZMQActor(s"tcp://127.0.0.1:$bitcoindZmqBlockPort", Some(zmqBlockConnected))))
55-
zmqTx = system.actorOf(Props(new ZMQActor(s"tcp://127.0.0.1:$bitcoindZmqTxPort", Some(zmqTxConnected))))
54+
zmqBlock = system.actorOf(Props(new ZMQActor(s"tcp://127.0.0.1:$bitcoindZmqBlockPort", ZMQActor.Topics.RawBlock, Some(zmqBlockConnected))))
55+
zmqTx = system.actorOf(Props(new ZMQActor(s"tcp://127.0.0.1:$bitcoindZmqTxPort", ZMQActor.Topics.RawTx, Some(zmqTxConnected))))
5656
awaitCond(zmqBlockConnected.isCompleted && zmqTxConnected.isCompleted)
5757
super.beforeAll()
5858
}

0 commit comments

Comments
 (0)