Skip to content

Commit 43a89f8

Browse files
pm47t-bast
andauthored
Add a random delay before processing blocks (#1825)
The goal is to reduce herd effects when there are lots of channels. Co-authored-by: Bastien Teinturier <[email protected]>
1 parent 6f6c458 commit 43a89f8

File tree

5 files changed

+25
-12
lines changed

5 files changed

+25
-12
lines changed

eclair-core/src/main/resources/reference.conf

+1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ eclair {
8484
// expiry-delta-blocks.
8585
fulfill-safety-before-timeout-blocks = 24
8686
min-final-expiry-delta-blocks = 30 // Bolt 11 invoice's min_final_cltv_expiry; must be strictly greater than fulfill-safety-before-timeout-blocks
87+
max-block-processing-delay = 30 seconds // we add a random delay before processing blocks, capped at this value, to prevent herd effect
8788

8889
fee-base-msat = 1000
8990
fee-proportional-millionths = 100 // fee charged per transferred satoshi in millionths of a satoshi (100 = 0.01%)

eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala

+2
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
6363
expiryDelta: CltvExpiryDelta,
6464
fulfillSafetyBeforeTimeout: CltvExpiryDelta,
6565
minFinalExpiryDelta: CltvExpiryDelta,
66+
maxBlockProcessingDelay: FiniteDuration,
6667
htlcMinimum: MilliSatoshi,
6768
toRemoteDelay: CltvExpiryDelta,
6869
maxToLocalDelay: CltvExpiryDelta,
@@ -337,6 +338,7 @@ object NodeParams extends Logging {
337338
expiryDelta = expiryDelta,
338339
fulfillSafetyBeforeTimeout = fulfillSafetyBeforeTimeout,
339340
minFinalExpiryDelta = minFinalExpiryDelta,
341+
maxBlockProcessingDelay = FiniteDuration(config.getDuration("max-block-processing-delay").getSeconds, TimeUnit.SECONDS),
340342
htlcMinimum = htlcMinimum,
341343
toRemoteDelay = CltvExpiryDelta(config.getInt("to-remote-delay-blocks")),
342344
maxToLocalDelay = CltvExpiryDelta(config.getInt("max-to-local-delay-blocks")),

eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala

+17-10
Original file line numberDiff line numberDiff line change
@@ -37,21 +37,19 @@ import fr.acinq.eclair.crypto.ShaChain
3737
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
3838
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent.EventType
3939
import fr.acinq.eclair.db.PendingCommandsDb
40-
import fr.acinq.eclair.db.pg.PgUtils.PgLock.logger
4140
import fr.acinq.eclair.io.Peer
4241
import fr.acinq.eclair.payment.PaymentSettlingOnChain
4342
import fr.acinq.eclair.router.Announcements
4443
import fr.acinq.eclair.transactions.Transactions.{ClosingTx, TxOwner}
4544
import fr.acinq.eclair.transactions._
4645
import fr.acinq.eclair.wire.protocol._
47-
import org.sqlite.SQLiteException
4846
import scodec.bits.ByteVector
4947

5048
import java.sql.SQLException
5149
import scala.collection.immutable.Queue
5250
import scala.concurrent.ExecutionContext
5351
import scala.concurrent.duration._
54-
import scala.util.{Failure, Success, Try}
52+
import scala.util.{Failure, Random, Success, Try}
5553

5654
/**
5755
* Created by PM on 20/08/2015.
@@ -125,6 +123,9 @@ object Channel {
125123
*/
126124
case class OutgoingMessage(msg: LightningMessage, peerConnection: ActorRef)
127125

126+
/** We don't immediately process [[CurrentBlockCount]] to avoid herd effects */
127+
case class ProcessCurrentBlockCount(c: CurrentBlockCount)
128+
128129
}
129130

130131
class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: typed.ActorRef[ZmqWatcher.Command], relayer: ActorRef, txPublisherFactory: Channel.TxPublisherFactory, origin_opt: Option[ActorRef] = None)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) extends FSM[State, Data] with FSMDiagnosticActorLogging[State, Data] {
@@ -150,6 +151,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
150151

151152
// this will be used to detect htlc timeouts
152153
context.system.eventStream.subscribe(self, classOf[CurrentBlockCount])
154+
// the constant delay by which we delay processing of blocks (it will be smoothened among all channels)
155+
private val blockProcessingDelay = Random.nextLong(nodeParams.maxBlockProcessingDelay.toMillis + 1).millis
153156
// this will be used to make sure the current commitment fee is up-to-date
154157
context.system.eventStream.subscribe(self, classOf[CurrentFeerates])
155158

@@ -631,7 +634,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
631634

632635
case Event(BITCOIN_FUNDING_PUBLISH_FAILED, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) => handleFundingPublishFailed(d)
633636

634-
case Event(c: CurrentBlockCount, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) => d.fundingTx match {
637+
case Event(ProcessCurrentBlockCount(c), d: DATA_WAIT_FOR_FUNDING_CONFIRMED) => d.fundingTx match {
635638
case Some(_) => stay // we are funder, we're still waiting for the funding tx to be confirmed
636639
case None if c.blockCount - d.waitingSinceBlock > FUNDING_TIMEOUT_FUNDEE =>
637640
log.warning(s"funding tx hasn't been published in ${c.blockCount - d.waitingSinceBlock} blocks")
@@ -943,7 +946,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
943946
}
944947
}
945948

946-
case Event(c: CurrentBlockCount, d: DATA_NORMAL) => handleNewBlock(c, d)
949+
case Event(ProcessCurrentBlockCount(c), d: DATA_NORMAL) => handleNewBlock(c, d)
947950

948951
case Event(c: CurrentFeerates, d: DATA_NORMAL) => handleCurrentFeerate(c, d)
949952

@@ -1221,7 +1224,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
12211224

12221225
case Event(r: RevocationTimeout, d: DATA_SHUTDOWN) => handleRevocationTimeout(r, d)
12231226

1224-
case Event(c: CurrentBlockCount, d: DATA_SHUTDOWN) => handleNewBlock(c, d)
1227+
case Event(ProcessCurrentBlockCount(c), d: DATA_SHUTDOWN) => handleNewBlock(c, d)
12251228

12261229
case Event(c: CurrentFeerates, d: DATA_SHUTDOWN) => handleCurrentFeerate(c, d)
12271230

@@ -1519,7 +1522,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
15191522
// note: this can only happen if state is NORMAL or SHUTDOWN
15201523
// -> in NEGOTIATING there are no more htlcs
15211524
// -> in CLOSING we either have mutual closed (so no more htlcs), or already have unilaterally closed (so no action required), and we can't be in OFFLINE state anyway
1522-
case Event(c: CurrentBlockCount, d: HasCommitments) => handleNewBlock(c, d)
1525+
case Event(ProcessCurrentBlockCount(c), d: HasCommitments) => handleNewBlock(c, d)
15231526

15241527
case Event(c: CurrentFeerates, d: HasCommitments) =>
15251528
handleOfflineFeerate(c, d)
@@ -1710,7 +1713,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
17101713
context.system.scheduler.scheduleOnce(5 seconds, self, remoteAnnSigs)
17111714
stay
17121715

1713-
case Event(c: CurrentBlockCount, d: HasCommitments) => handleNewBlock(c, d)
1716+
case Event(ProcessCurrentBlockCount(c), d: HasCommitments) => handleNewBlock(c, d)
17141717

17151718
case Event(c: CurrentFeerates, d: HasCommitments) =>
17161719
handleOfflineFeerate(c, d)
@@ -1792,8 +1795,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
17921795
// we only care about this event in NORMAL and SHUTDOWN state, and there may be cases where the task is not cancelled
17931796
case Event(_: RevocationTimeout, _) => stay
17941797

1798+
// we reschedule with a random delay to prevent herd effect when there are a lot of channels
1799+
case Event(c: CurrentBlockCount, _) =>
1800+
context.system.scheduler.scheduleOnce(blockProcessingDelay, self, ProcessCurrentBlockCount(c))
1801+
stay
1802+
17951803
// we only care about this event in NORMAL and SHUTDOWN state, and we never unregister to the event stream
1796-
case Event(CurrentBlockCount(_), _) => stay
1804+
case Event(ProcessCurrentBlockCount(_), _) => stay
17971805

17981806
// we only care about this event in NORMAL and SHUTDOWN state, and we never unregister to the event stream
17991807
case Event(CurrentFeerates(_), _) => stay
@@ -2568,4 +2576,3 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
25682576

25692577
}
25702578

2571-

eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ object TestConstants {
111111
expiryDelta = CltvExpiryDelta(144),
112112
fulfillSafetyBeforeTimeout = CltvExpiryDelta(6),
113113
minFinalExpiryDelta = CltvExpiryDelta(18),
114+
maxBlockProcessingDelay = 10 millis,
114115
htlcMinimum = 0 msat,
115116
minDepthBlocks = 3,
116117
toRemoteDelay = CltvExpiryDelta(144),
@@ -215,6 +216,7 @@ object TestConstants {
215216
expiryDelta = CltvExpiryDelta(144),
216217
fulfillSafetyBeforeTimeout = CltvExpiryDelta(6),
217218
minFinalExpiryDelta = CltvExpiryDelta(18),
219+
maxBlockProcessingDelay = 10 millis,
218220
htlcMinimum = 1000 msat,
219221
minDepthBlocks = 3,
220222
toRemoteDelay = CltvExpiryDelta(144),
@@ -287,4 +289,4 @@ object TestTags {
287289
// Tests that call an external API (which may start failing independently of our code).
288290
object ExternalApi extends Tag("external-api")
289291

290-
}
292+
}

eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ abstract class IntegrationSpec extends TestKitBaseClass with BitcoindService wit
7777
"eclair.bitcoind.wallet" -> defaultWallet,
7878
"eclair.mindepth-blocks" -> 2,
7979
"eclair.max-htlc-value-in-flight-msat" -> 100000000000L,
80-
"eclair.router.broadcast-interval" -> "2 second",
80+
"eclair.max-block-processing-delay" -> "2 seconds",
81+
"eclair.router.broadcast-interval" -> "2 seconds",
8182
"eclair.auto-reconnect" -> false,
8283
"eclair.to-remote-delay-blocks" -> 24,
8384
"eclair.multi-part-payment-expiry" -> "20 seconds").asJava).withFallback(ConfigFactory.load())

0 commit comments

Comments
 (0)