Skip to content

Commit 9c3ee59

Browse files
authored
Check blockchain watchdogs regularly (#1808)
We want to check secondary blockchain sources when we haven't received blocks in a while. Fixes #1803
1 parent ec276f8 commit 9c3ee59

File tree

2 files changed

+36
-2
lines changed

2 files changed

+36
-2
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/BlockchainWatchdog.scala

+11-2
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ import scala.util.Random
3434
/** Monitor secondary blockchain sources to detect when we're being eclipsed. */
3535
object BlockchainWatchdog {
3636

37+
// @formatter:off
3738
case class BlockHeaderAt(blockCount: Long, blockHeader: BlockHeader)
39+
case object NoBlockReceivedTimer
3840

39-
// @formatter:off
4041
sealed trait BlockchainWatchdogEvent
4142
/**
4243
* We are missing too many blocks compared to one of our blockchain watchdogs.
@@ -50,21 +51,29 @@ object BlockchainWatchdog {
5051
case class LatestHeaders(currentBlockCount: Long, blockHeaders: Set[BlockHeaderAt], source: String) extends Command
5152
private[watchdogs] case class WrappedCurrentBlockCount(currentBlockCount: Long) extends Command
5253
private case class CheckLatestHeaders(currentBlockCount: Long) extends Command
54+
private case class NoBlockReceivedSince(lastBlockCount: Long) extends Command
5355
// @formatter:on
5456

5557
/**
5658
* @param chainHash chain we're interested in.
5759
* @param maxRandomDelay to avoid the herd effect whenever a block is created, we add a random delay before we query
5860
* secondary blockchain sources. This parameter specifies the maximum delay we'll allow.
5961
*/
60-
def apply(chainHash: ByteVector32, maxRandomDelay: FiniteDuration): Behavior[Command] = {
62+
def apply(chainHash: ByteVector32, maxRandomDelay: FiniteDuration, blockTimeout: FiniteDuration = 15 minutes): Behavior[Command] = {
6163
Behaviors.setup { context =>
6264
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockCount](cbc => WrappedCurrentBlockCount(cbc.blockCount)))
6365
Behaviors.withTimers { timers =>
66+
// We start a timer to check blockchain watchdogs regularly even when we don't receive any block.
67+
timers.startSingleTimer(NoBlockReceivedTimer, NoBlockReceivedSince(0), blockTimeout)
6468
Behaviors.receiveMessage {
69+
case NoBlockReceivedSince(lastBlockCount) =>
70+
context.self ! CheckLatestHeaders(lastBlockCount)
71+
timers.startSingleTimer(NoBlockReceivedTimer, NoBlockReceivedSince(lastBlockCount), blockTimeout)
72+
Behaviors.same
6573
case WrappedCurrentBlockCount(blockCount) =>
6674
val delay = Random.nextInt(maxRandomDelay.toSeconds.toInt).seconds
6775
timers.startSingleTimer(CheckLatestHeaders(blockCount), delay)
76+
timers.startSingleTimer(NoBlockReceivedTimer, NoBlockReceivedSince(blockCount), blockTimeout)
6877
Behaviors.same
6978
case CheckLatestHeaders(blockCount) =>
7079
val id = UUID.randomUUID()

eclair-core/src/test/scala/fr/acinq/eclair/blockchain/watchdogs/BlockchainWatchdogSpec.scala

+25
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,29 @@ class BlockchainWatchdogSpec extends ScalaTestWithActorTestKit(ConfigFactory.loa
6161
testKit.stop(watchdog)
6262
}
6363

64+
test("fetch block headers when we don't receive blocks", TestTags.ExternalApi) {
65+
val eventListener = TestProbe[DangerousBlocksSkew]()
66+
system.eventStream ! EventStream.Subscribe(eventListener.ref)
67+
val blockTimeout = 5 seconds
68+
val watchdog = testKit.spawn(BlockchainWatchdog(Block.TestnetGenesisBlock.hash, 1 second, blockTimeout))
69+
70+
watchdog ! WrappedCurrentBlockCount(500000)
71+
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
72+
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
73+
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
74+
eventListener.expectNoMessage(100 millis)
75+
76+
// If we don't receive blocks, we check blockchain sources.
77+
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
78+
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
79+
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
80+
eventListener.expectNoMessage(100 millis)
81+
82+
// And we keep checking blockchain sources until we receive a block.
83+
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
84+
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
85+
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
86+
eventListener.expectNoMessage(100 millis)
87+
}
88+
6489
}

0 commit comments

Comments
 (0)