Skip to content

Commit

Permalink
Add monitoring with Kamon (disabled by default) (#1126)
Browse files Browse the repository at this point in the history
For now:
- we only track some tasks (especially in the router, but not even
`node_announcement` and `channel_update`)
- all db calls are monitored
- kamon is disabled by default
  • Loading branch information
pm47 authored Sep 6, 2019
1 parent 7a50610 commit ff0b4c8
Show file tree
Hide file tree
Showing 15 changed files with 452 additions and 190 deletions.
11 changes: 11 additions & 0 deletions eclair-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,17 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- MONITORING -->
<dependency>
<groupId>io.kamon</groupId>
<artifactId>kamon-core_${scala.version.short}</artifactId>
<version>${kamon.version}</version>
</dependency>
<dependency>
<groupId>io.kamon</groupId>
<artifactId>kamon-akka_${scala.version.short}</artifactId>
<version>${kamon.version}</version>
</dependency>
<!-- TESTS -->
<dependency>
<groupId>com.softwaremill.quicklens</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ object NodeParams {
ConfigFactory.parseProperties(System.getProperties)
.withFallback(ConfigFactory.parseFile(new File(datadir, "eclair.conf")))
.withFallback(overrideDefaults)
.withFallback(ConfigFactory.load()).getConfig("eclair")
.withFallback(ConfigFactory.load())

def getSeed(datadir: File): ByteVector = {
val seedPath = new File(datadir, "seed.dat")
Expand Down
41 changes: 41 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/PimpKamon.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2019 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair

import kamon.Kamon

import scala.concurrent.{ExecutionContext, Future}

/**
 * Helpers to measure how long a piece of code takes, using Kamon timers.
 *
 * Both helpers look up the timer called `name` without tags, start it before evaluating the
 * by-name argument, and stop it once the measured work is over.
 */
object KamonExt {

  /**
   * Evaluates `f` synchronously and records its duration in the timer `name`.
   *
   * @param name name of the Kamon timer
   * @param f    code to measure; evaluated exactly once
   * @return the value produced by `f`; any exception thrown by `f` is propagated after the timer is stopped
   */
  def time[T](name: String)(f: => T): T = {
    val timer = Kamon.timer(name).withoutTags().start()
    try {
      f
    } finally {
      timer.stop()
    }
  }

  /**
   * Evaluates `f` and records the time elapsed until the returned future completes
   * (successfully or not) in the timer `name`.
   *
   * @param name name of the Kamon timer
   * @param f    code producing the future to measure; evaluated exactly once
   * @param ec   execution context on which the completion callback runs
   * @return the very future returned by `f`
   */
  def timeFuture[T](name: String)(f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    val timer = Kamon.timer(name).withoutTags().start()
    val res = try {
      f
    } catch {
      case t: Throwable =>
        // `f` threw before handing back a future: no completion callback will ever run,
        // so stop the timer here (as `time` does in its `finally`) and rethrow unchanged
        timer.stop()
        throw t
    }
    res onComplete { case _ => timer.stop() }
    res
  }

}
3 changes: 2 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class Setup(datadir: File,
secureRandom.nextInt()

datadir.mkdirs()
val config = NodeParams.loadConfiguration(datadir, overrideDefaults)
val appConfig = NodeParams.loadConfiguration(datadir, overrideDefaults)
val config = appConfig.getConfig("eclair")
val seed = seed_opt.getOrElse(NodeParams.getSeed(datadir))
val chain = config.getString("chain")
val chaindir = new File(datadir, chain)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import fr.acinq.eclair.ShortChannelId.coordinates
import fr.acinq.eclair.TxCoordinates
import fr.acinq.eclair.blockchain.{GetTxWithMetaResponse, UtxoStatus, ValidateResult}
import fr.acinq.eclair.wire.ChannelAnnouncement
import kamon.Kamon
import org.json4s.JsonAST._

import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -149,26 +150,36 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {

/**
 * Looks up the transaction designated by the short channel id of a channel announcement and
 * checks whether the referenced output is still unspent.
 *
 * The short channel id is decoded into (block height, tx index, output index); the block hash
 * and the block are fetched over RPC, the txid at `txIndex` is read from the block, then the raw
 * transaction and the spendable status of the output are queried. Whenever the block hash or the
 * txid cannot be extracted, `ByteVector32.Zeroes.toHex` is used in its place.
 *
 * @param c  channel announcement to validate
 * @param ec execution context used to chain the RPC futures
 * @return a `ValidateResult` holding `Right((tx, status))` when every step succeeded, or
 *         `Left(t)` when any step failed with `t`
 */
def validate(c: ChannelAnnouncement)(implicit ec: ExecutionContext): Future[ValidateResult] = {
val TxCoordinates(blockHeight, txIndex, outputIndex) = coordinates(c.shortChannelId)

// NOTE(review): this listing appears to be a rendered diff; the for-comprehension right below
// looks like the pre-change version and the one starting after `val span` like its replacement
for {
blockHash: String <- rpcClient.invoke("getblockhash", blockHeight).map(_.extractOrElse[String](ByteVector32.Zeroes.toHex))
txid: String <- rpcClient.invoke("getblock", blockHash).map {
case json => Try {
val JArray(txs) = json \ "tx"
txs(txIndex).extract[String]
} getOrElse ByteVector32.Zeroes.toHex
}
tx <- getRawTransaction(txid)
unspent <- isTransactionOutputSpendable(txid, outputIndex, includeMempool = true)
fundingTxStatus <- if (unspent) {
Future.successful(UtxoStatus.Unspent)
} else {
// if this returns true, it means that the spending tx is *not* in the blockchain
isTransactionOutputSpendable(txid, outputIndex, includeMempool = false).map {
case res => UtxoStatus.Spent(spendingTxConfirmed = !res)
// span covering the whole validation
val span = Kamon.spanBuilder("validate-bitcoin-client").start()
for {
// no-op first generator: a for-comprehension has to start with a generator before the
// `span0 = ...` value definition can be written
_ <- Future.successful(0)
span0 = Kamon.spanBuilder("getblockhash").start()
blockHash: String <- rpcClient.invoke("getblockhash", blockHeight).map(_.extractOrElse[String](ByteVector32.Zeroes.toHex))
_ = span0.finish()
span1 = Kamon.spanBuilder("getblock").start()
txid: String <- rpcClient.invoke("getblock", blockHash).map {
case json => Try {
val JArray(txs) = json \ "tx"
txs(txIndex).extract[String]
} getOrElse ByteVector32.Zeroes.toHex
}
}
} yield ValidateResult(c, Right((Transaction.read(tx), fundingTxStatus)))
_ = span1.finish()
span2 = Kamon.spanBuilder("getrawtx").start()
tx <- getRawTransaction(txid)
_ = span2.finish()
span3 = Kamon.spanBuilder("utxospendable-mempool").start()
unspent <- isTransactionOutputSpendable(txid, outputIndex, includeMempool = true)
_ = span3.finish()
fundingTxStatus <- if (unspent) {
Future.successful(UtxoStatus.Unspent)
} else {
// if this returns true, it means that the spending tx is *not* in the blockchain
isTransactionOutputSpendable(txid, outputIndex, includeMempool = false).map {
case res => UtxoStatus.Spent(spendingTxConfirmed = !res)
}
}
// NOTE(review): only reached when every generator above succeeded; when one of the futures
// fails, the spans started so far do not appear to be finished anywhere - confirm
_ = span.finish()
} yield ValidateResult(c, Right((Transaction.read(tx), fundingTxStatus)))

// any failure in the chain is reported as a ValidateResult carrying the throwable
} recover { case t: Throwable => ValidateResult(c, Left(t)) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import fr.acinq.eclair.crypto.TransportHandler.HandshakeCompleted
import fr.acinq.eclair.io.Authenticator.{Authenticated, AuthenticationFailed, PendingAuth}
import fr.acinq.eclair.wire.LightningMessageCodecs
import fr.acinq.eclair.{Logs, NodeParams}
import kamon.Kamon

/**
* The purpose of this class is to serve as a buffer for new connections before they are authenticated
Expand All @@ -43,6 +44,7 @@ class Authenticator(nodeParams: NodeParams) extends Actor with DiagnosticActorLo
def ready(switchboard: ActorRef, authenticating: Map[ActorRef, PendingAuth]): Receive = {
case pending@PendingAuth(connection, remoteNodeId_opt, address, _) =>
log.debug(s"authenticating connection to ${address.getHostString}:${address.getPort} (pending=${authenticating.size} handlers=${context.children.size})")
Kamon.counter("peers.connecting.count").withTag("state", "authenticating").increment()
val transport = context.actorOf(TransportHandler.props(
KeyPair(nodeParams.nodeId.value, nodeParams.privateKey.value),
remoteNodeId_opt.map(_.value),
Expand All @@ -56,6 +58,7 @@ class Authenticator(nodeParams: NodeParams) extends Actor with DiagnosticActorLo
import pendingAuth.{address, remoteNodeId_opt}
val outgoing = remoteNodeId_opt.isDefined
log.info(s"connection authenticated with $remoteNodeId@${address.getHostString}:${address.getPort} direction=${if (outgoing) "outgoing" else "incoming"}")
Kamon.counter("peers.connecting.count").withTag("state", "authenticated").increment()
switchboard ! Authenticated(connection, transport, remoteNodeId, address, remoteNodeId_opt.isDefined, pendingAuth.origin_opt)
context become ready(switchboard, authenticating - transport)

Expand Down
23 changes: 23 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.router._
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{secureRandom, wire, _}
import kamon.Kamon
import scodec.Attempt
import scodec.bits.ByteVector

Expand Down Expand Up @@ -535,6 +536,22 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: A
case DISCONNECTED -> _ if nodeParams.autoReconnect => cancelTimer(RECONNECT_TIMER)
}

// Keeps the connected-peers metric and the system event stream in sync with the FSM state.
onTransition {
case _ -> CONNECTED =>
// entering CONNECTED from any state: count the peer and publish PeerConnected
Metrics.connectedPeers.increment()
context.system.eventStream.publish(PeerConnected(self, remoteNodeId))
case CONNECTED -> DISCONNECTED =>
// going from CONNECTED to DISCONNECTED: undo the increment and publish PeerDisconnected
Metrics.connectedPeers.decrement()
context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
}

// Same bookkeeping as the CONNECTED -> DISCONNECTED case, for an FSM that stops while CONNECTED.
onTermination {
case StopEvent(_, CONNECTED, d: ConnectedData) =>
// the transition handler won't be fired if we go directly from CONNECTED to closed
Metrics.connectedPeers.decrement()
context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
}

def createNewChannel(nodeParams: NodeParams, funder: Boolean, fundingAmount: Satoshi, origin_opt: Option[ActorRef]): (ActorRef, LocalParams) = {
val defaultFinalScriptPubKey = Helpers.getFinalScriptPubKey(wallet, nodeParams.chainHash)
val localParams = makeChannelParams(nodeParams, defaultFinalScriptPubKey, funder, fundingAmount)
Expand Down Expand Up @@ -640,6 +657,12 @@ object Peer {

// @formatter:on

/**
 * Kamon range samplers exposed by the peer layer; each one is registered without tags.
 */
object Metrics {
  // single place where the untagged range samplers are built
  private def untaggedSampler(metricName: String) = Kamon.rangeSampler(metricName).withoutTags()

  val peers = untaggedSampler("peers.count")
  val connectedPeers = untaggedSampler("peers.connected.count")
  val channels = untaggedSampler("channels.count")
}

def makeChannelParams(nodeParams: NodeParams, defaultFinalScriptPubKey: ByteVector, isFunder: Boolean, fundingAmount: Satoshi): LocalParams = {
val entropy = new Array[Byte](16)
secureRandom.nextBytes(entropy)
Expand Down
26 changes: 26 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/PeerEvents.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2019 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.io

import akka.actor.ActorRef
import fr.acinq.bitcoin.Crypto.PublicKey

/** Common parent of the peer connection events declared in this file. */
sealed trait PeerEvent

/**
 * Event describing a peer that is now connected.
 *
 * @param peer   reference of the peer actor
 * @param nodeId public key identifying the remote node
 */
case class PeerConnected(peer: ActorRef, nodeId: PublicKey) extends PeerEvent

/**
 * Event describing a peer that is no longer connected.
 *
 * @param peer   reference of the peer actor
 * @param nodeId public key identifying the remote node
 */
case class PeerDisconnected(peer: ActorRef, nodeId: PublicKey) extends PeerEvent
2 changes: 2 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.io.Tcp.SO.KeepAlive
import akka.io.{IO, Tcp}
import fr.acinq.eclair.NodeParams
import kamon.Kamon

import scala.concurrent.Promise

Expand Down Expand Up @@ -52,6 +53,7 @@ class Server(nodeParams: NodeParams, authenticator: ActorRef, address: InetSocke
def listening(listener: ActorRef): Receive = {
case Connected(remote, _) =>
log.info(s"connected to $remote")
Kamon.counter("peers.connecting.count").withTag("state", "connected").increment()
val connection = sender
authenticator ! Authenticator.PendingAuth(connection, remoteNodeId_opt = None, address = remote, origin_opt = None)
listener ! ResumeAccepting(batchSize = 1)
Expand Down
54 changes: 46 additions & 8 deletions eclair-core/src/main/scala/fr/acinq/eclair/payment/Auditor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package fr.acinq.eclair.payment

import akka.actor.{Actor, ActorLogging, Props}
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.{MilliSatoshi, NodeParams}
import fr.acinq.eclair.channel.Helpers.Closing.{LocalClose, MutualClose, RecoveryClose, RemoteClose, RevokedClose}
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.channel.Channel.{LocalError, RemoteError}
import fr.acinq.eclair.channel.Helpers.Closing._
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.{AuditDb, ChannelLifecycleEvent}
import kamon.Kamon

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

Expand All @@ -40,26 +43,61 @@ class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {

override def receive: Receive = {

case e: PaymentSent => db.add(e)

case e: PaymentReceived => db.add(e)

case e: PaymentRelayed => db.add(e)
case e: PaymentSent =>
Kamon
.histogram("payment.hist")
.withTag("direction", "sent")
.record(e.amount.truncateToSatoshi.toLong)
db.add(e)

case e: PaymentReceived =>
Kamon
.histogram("payment.hist")
.withTag("direction", "received")
.record(e.amount.truncateToSatoshi.toLong)
db.add(e)

case e: PaymentRelayed =>
Kamon
.histogram("payment.hist")
.withTag("direction", "relayed")
.withTag("type", "total")
.record(e.amountIn.truncateToSatoshi.toLong)
Kamon
.histogram("payment.hist")
.withTag("direction", "relayed")
.withTag("type", "fee")
.record((e.amountIn - e.amountOut).truncateToSatoshi.toLong)
db.add(e)

case e: NetworkFeePaid => db.add(e)

case e: AvailableBalanceChanged => balanceEventThrottler ! e

case e: ChannelErrorOccured => db.add(e)
case e: ChannelErrorOccured =>
val metric = Kamon.counter("channels.errors")
e.error match {
case LocalError(_) if e.isFatal => metric.withTag("origin", "local").withTag("fatal", "yes").increment()
case LocalError(_) if !e.isFatal => metric.withTag("origin", "local").withTag("fatal", "no").increment()
case RemoteError(_) => metric.withTag("origin", "remote").increment()
}
db.add(e)

case e: ChannelStateChanged =>
val metric = Kamon.counter("channels.lifecycle")
// NB: order matters!
e match {
case ChannelStateChanged(_, _, remoteNodeId, WAIT_FOR_FUNDING_LOCKED, NORMAL, d: DATA_NORMAL) =>
metric.withTag("event", "created").increment()
db.add(ChannelLifecycleEvent(d.channelId, remoteNodeId, d.commitments.commitInput.txOut.amount, d.commitments.localParams.isFunder, !d.commitments.announceChannel, "created"))
case ChannelStateChanged(_, _, _, WAIT_FOR_INIT_INTERNAL, _, _) =>
case ChannelStateChanged(_, _, _, _, CLOSING, _) =>
metric.withTag("event", "closing").increment()
case _ => ()
}

case e: ChannelClosed =>
Kamon.counter("channels.lifecycle").withTag("event", "closed").increment()
val event = e.closingType match {
case MutualClose => "mutual"
case LocalClose => "local"
Expand Down
Loading

0 comments on commit ff0b4c8

Please sign in to comment.