Skip to content

Commit

Permalink
Add SpanReader wrapper for creating additional spans (emeraldpay#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Mar 7, 2023
1 parent 076bbbe commit 01ea756
Show file tree
Hide file tree
Showing 18 changed files with 283 additions and 91 deletions.
20 changes: 20 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/commons/constatnts.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.emeraldpay.dshackle.commons

const val SPAN_READER_NAME = "reader.name"
const val SPAN_REQUEST_INFO = "request.info"
const val SPAN_ERROR = "error"
const val SPAN_STATUS_MESSAGE = "status.message"
const val SPAN_READER_RESULT = "reader.result"
const val SPAN_REQUEST_API_TYPE = "request.api.type"
const val SPAN_REQUEST_UPSTREAM_ID = "request.upstreamId"
const val SPAN_REQUEST_ID = "request.id"

const val LOCAL_READER = "localReader"
const val REMOTE_QUORUM_RPC_READER = "remoteQuorumRpcReader"
const val API_READER = "apiReader"
const val CACHE_BLOCK_BY_HASH_READER = "cacheBlockByHashReader"
const val DIRECT_QUORUM_RPC_READER = "directQuorumRpcReader"
const val CACHE_HEIGHT_BY_HASH_READER = "cacheHeightByHashReader"
const val CACHE_BLOCK_BY_HEIGHT_READER = "cacheBlockByHeightReader"
const val CACHE_TX_BY_HASH_READER = "cacheTxByHashReader"
const val CACHE_RECEIPTS_READER = "cacheReceiptsReader"
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.emeraldpay.dshackle.upstream.ethereum.EthereumMultistream
import io.emeraldpay.dshackle.upstream.ethereum.EthereumPosMultiStream
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory
import org.springframework.cloud.sleuth.Tracer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import reactor.core.scheduler.Scheduler
Expand All @@ -21,14 +22,15 @@ open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory)
cachesFactory: CachesFactory,
callTargetsHolder: CallTargetsHolder,
@Qualifier("headMergedScheduler")
headScheduler: Scheduler
headScheduler: Scheduler,
tracer: Tracer
): List<Multistream> {
return Chain.values()
.filterNot { it == Chain.UNSPECIFIED }
.mapNotNull { chain ->
when (BlockchainType.from(chain)) {
BlockchainType.EVM_POS -> ethereumPosMultistream(chain, cachesFactory, headScheduler)
BlockchainType.EVM_POW -> ethereumMultistream(chain, cachesFactory, headScheduler)
BlockchainType.EVM_POS -> ethereumPosMultistream(chain, cachesFactory, headScheduler, tracer)
BlockchainType.EVM_POW -> ethereumMultistream(chain, cachesFactory, headScheduler, tracer)
BlockchainType.BITCOIN -> bitcoinMultistream(chain, cachesFactory)
else -> null
}
Expand All @@ -38,30 +40,34 @@ open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory)
private fun ethereumMultistream(
chain: Chain,
cachesFactory: CachesFactory,
headScheduler: Scheduler
headScheduler: Scheduler,
tracer: Tracer
): EthereumMultistream {
val name = "multi-ethereum-$chain"

return EthereumMultistream(
chain,
ArrayList(),
cachesFactory.getCaches(chain),
headScheduler
headScheduler,
tracer
).also { register(it, name) }
}

open fun ethereumPosMultistream(
chain: Chain,
cachesFactory: CachesFactory,
headScheduler: Scheduler
headScheduler: Scheduler,
tracer: Tracer
): EthereumPosMultiStream {
val name = "multi-ethereum-pos-$chain"

return EthereumPosMultiStream(
chain,
ArrayList(),
cachesFactory.getCaches(chain),
headScheduler
headScheduler,
tracer
).also { register(it, name) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.emeraldpay.dshackle.reader.Reader
import io.emeraldpay.dshackle.upstream.ApiSource
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.signature.ResponseSigner
import org.springframework.cloud.sleuth.Tracer

// creates instance of a Quorum based reader
interface QuorumReaderFactory {
Expand All @@ -29,11 +30,11 @@ interface QuorumReaderFactory {
}
}

fun create(apis: ApiSource, quorum: CallQuorum, signer: ResponseSigner?): Reader<JsonRpcRequest, QuorumRpcReader.Result>
fun create(apis: ApiSource, quorum: CallQuorum, signer: ResponseSigner?, tracer: Tracer): Reader<JsonRpcRequest, QuorumRpcReader.Result>

class Default : QuorumReaderFactory {
override fun create(apis: ApiSource, quorum: CallQuorum, signer: ResponseSigner?): Reader<JsonRpcRequest, QuorumRpcReader.Result> {
return QuorumRpcReader(apis, quorum, signer)
override fun create(apis: ApiSource, quorum: CallQuorum, signer: ResponseSigner?, tracer: Tracer): Reader<JsonRpcRequest, QuorumRpcReader.Result> {
return QuorumRpcReader(apis, quorum, signer, tracer)
}
}
}
15 changes: 13 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/quorum/QuorumRpcReader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
*/
package io.emeraldpay.dshackle.quorum

import io.emeraldpay.dshackle.commons.API_READER
import io.emeraldpay.dshackle.commons.SPAN_REQUEST_API_TYPE
import io.emeraldpay.dshackle.commons.SPAN_REQUEST_UPSTREAM_ID
import io.emeraldpay.dshackle.reader.Reader
import io.emeraldpay.dshackle.reader.SpannedReader
import io.emeraldpay.dshackle.upstream.ApiSource
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcError
Expand All @@ -25,6 +29,7 @@ import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import io.emeraldpay.dshackle.upstream.signature.ResponseSigner
import io.emeraldpay.etherjar.rpc.RpcException
import org.slf4j.LoggerFactory
import org.springframework.cloud.sleuth.Tracer
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.util.function.Tuple3
Expand All @@ -42,13 +47,14 @@ class QuorumRpcReader(
private val apiControl: ApiSource,
private val quorum: CallQuorum,
private val signer: ResponseSigner?,
private val tracer: Tracer
) : Reader<JsonRpcRequest, QuorumRpcReader.Result> {

companion object {
private val log = LoggerFactory.getLogger(QuorumRpcReader::class.java)
}

constructor(apiControl: ApiSource, quorum: CallQuorum) : this(apiControl, quorum, null)
constructor(apiControl: ApiSource, quorum: CallQuorum, tracer: Tracer) : this(apiControl, quorum, null, tracer)

override fun read(key: JsonRpcRequest): Mono<Result> {
// needs at least one response, so start a request
Expand Down Expand Up @@ -129,7 +135,12 @@ class QuorumRpcReader(
}

fun callApi(api: Upstream, key: JsonRpcRequest): Mono<Tuple4<ByteArray, Optional<ResponseSigner.Signature>, Upstream, Optional<String>>> {
return api.getIngressReader()
val apiReader = api.getIngressReader()
val spanParams = mapOf(
SPAN_REQUEST_API_TYPE to apiReader.javaClass.name,
SPAN_REQUEST_UPSTREAM_ID to api.getId()
)
return SpannedReader(apiReader, tracer, API_READER, spanParams)
.read(key)
.flatMap { response ->
log.debug("Received response from upstream ${api.getId()} for method ${key.method}")
Expand Down
57 changes: 57 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/reader/SpannedReader.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.emeraldpay.dshackle.reader

import io.emeraldpay.dshackle.commons.SPAN_ERROR
import io.emeraldpay.dshackle.commons.SPAN_READER_NAME
import io.emeraldpay.dshackle.commons.SPAN_READER_RESULT
import io.emeraldpay.dshackle.commons.SPAN_REQUEST_INFO
import io.emeraldpay.dshackle.commons.SPAN_STATUS_MESSAGE
import io.emeraldpay.dshackle.data.HashId
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import org.springframework.cloud.sleuth.Tracer
import org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.switchIfEmpty

class SpannedReader<K, D>(
private val reader: Reader<K, D>,
private val tracer: Tracer,
private val name: String,
private val additionalParams: Map<String, String> = emptyMap()
) : Reader<K, D> {

override fun read(key: K): Mono<D> {
val newSpan = tracer.nextSpan(tracer.currentSpan())
.name(reader.javaClass.name)
.tag(SPAN_READER_NAME, name)
.start()

extractInfoFromKey(key)?.let {
newSpan.tag(SPAN_REQUEST_INFO, it)
}
additionalParams.forEach { newSpan.tag(it.key, it.value) }

return reader.read(key)
.contextWrite { ReactorSleuth.putSpanInScope(tracer, it, newSpan) }
.doOnError {
newSpan.apply {
tag(SPAN_ERROR, "true")
tag(SPAN_STATUS_MESSAGE, it.message)
end()
}
}
.doOnNext { newSpan.end() }
.switchIfEmpty {
newSpan.tag(SPAN_READER_RESULT, "empty result")
newSpan.end()
Mono.empty()
}
}

private fun extractInfoFromKey(key: K): String? {
return when (key) {
is JsonRpcRequest -> "method: ${key.method}"
is HashId, Long -> "params: $key"
else -> null
}
}
}
21 changes: 14 additions & 7 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@ import io.emeraldpay.dshackle.BlockchainType
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.SilentException
import io.emeraldpay.dshackle.commons.LOCAL_READER
import io.emeraldpay.dshackle.commons.REMOTE_QUORUM_RPC_READER
import io.emeraldpay.dshackle.commons.SPAN_ERROR
import io.emeraldpay.dshackle.commons.SPAN_REQUEST_ID
import io.emeraldpay.dshackle.commons.SPAN_STATUS_MESSAGE
import io.emeraldpay.dshackle.config.MainConfig
import io.emeraldpay.dshackle.quorum.CallQuorum
import io.emeraldpay.dshackle.quorum.NotLaggingQuorum
import io.emeraldpay.dshackle.quorum.QuorumReaderFactory
import io.emeraldpay.dshackle.quorum.QuorumRpcReader
import io.emeraldpay.dshackle.reader.SpannedReader
import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
import io.emeraldpay.dshackle.upstream.ApiSource
import io.emeraldpay.dshackle.upstream.Multistream
Expand Down Expand Up @@ -137,8 +143,8 @@ open class NativeCall(

private fun errorSpan(span: Span?, message: String) {
span?.apply {
tag("error", "true")
tag("status.message", message)
tag(SPAN_ERROR, "true")
tag(SPAN_STATUS_MESSAGE, message)
}
}

Expand All @@ -151,7 +157,7 @@ open class NativeCall(
if (requestCount > 1) {
val span = tracer.nextSpan(requestSpan)
.name(requestId)
.tag("request.id", requestId)
.tag(SPAN_REQUEST_ID, requestId)
.start()
return ReactorSleuth.putSpanInScope(tracer, ctx, span)
}
Expand All @@ -170,7 +176,7 @@ open class NativeCall(
return@run Mono.error(e)
}
if (callContext.requestCount == 1 && callContext.requestId.isNotBlank()) {
requestSpan?.tag("request.id", callContext.requestId)
requestSpan?.tag(SPAN_REQUEST_ID, callContext.requestId)
}
this.fetch(parsed)
.doOnError { e -> log.warn("Error during native call: ${e.message}") }
Expand Down Expand Up @@ -358,7 +364,8 @@ open class NativeCall(
fun fetch(ctx: ValidCallContext<ParsedCallDetails>): Mono<CallResult> {
return ctx.upstream.getLocalReader(localRouterEnabled)
.flatMap { api ->
api.read(JsonRpcRequest(ctx.payload.method, ctx.payload.params, ctx.nonce, ctx.forwardedSelector))
SpannedReader(api, tracer, LOCAL_READER)
.read(JsonRpcRequest(ctx.payload.method, ctx.payload.params, ctx.nonce, ctx.forwardedSelector))
.flatMap(JsonRpcResponse::requireResult)
.map {
validateResult(it, "local", ctx)
Expand All @@ -381,14 +388,14 @@ open class NativeCall(
if (!ctx.upstream.getMethods().isCallable(ctx.payload.method)) {
return Mono.error(RpcException(RpcResponseError.CODE_METHOD_NOT_EXIST, "Unsupported method"))
}
val reader = quorumReaderFactory.create(ctx.getApis(), ctx.callQuorum, signer)
val reader = quorumReaderFactory.create(ctx.getApis(), ctx.callQuorum, signer, tracer)
val counter = if (reader is QuorumRpcReader) {
reader.getValidAttemptsCount()
} else {
AtomicInteger(-1)
}

return reader
return SpannedReader(reader, tracer, REMOTE_QUORUM_RPC_READER)
.read(JsonRpcRequest(ctx.payload.method, ctx.payload.params, ctx.nonce, ctx.forwardedSelector))
.map {
val bytes = ctx.resultDecorator.processResult(it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.cache.Caches
import io.emeraldpay.dshackle.cache.CurrentBlockCache
import io.emeraldpay.dshackle.cache.HeightByHashAdding
import io.emeraldpay.dshackle.commons.CACHE_BLOCK_BY_HASH_READER
import io.emeraldpay.dshackle.commons.CACHE_BLOCK_BY_HEIGHT_READER
import io.emeraldpay.dshackle.commons.CACHE_HEIGHT_BY_HASH_READER
import io.emeraldpay.dshackle.commons.CACHE_RECEIPTS_READER
import io.emeraldpay.dshackle.commons.CACHE_TX_BY_HASH_READER
import io.emeraldpay.dshackle.commons.DIRECT_QUORUM_RPC_READER
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.data.BlockId
import io.emeraldpay.dshackle.data.SourceContainer
Expand All @@ -28,6 +34,7 @@ import io.emeraldpay.dshackle.data.TxId
import io.emeraldpay.dshackle.reader.CompoundReader
import io.emeraldpay.dshackle.reader.Reader
import io.emeraldpay.dshackle.reader.RekeyingReader
import io.emeraldpay.dshackle.reader.SpannedReader
import io.emeraldpay.dshackle.reader.TransformingReader
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.Multistream
Expand All @@ -41,6 +48,7 @@ import io.emeraldpay.etherjar.rpc.json.TransactionJson
import io.emeraldpay.etherjar.rpc.json.TransactionRefJson
import org.apache.commons.collections4.Factory
import org.slf4j.LoggerFactory
import org.springframework.cloud.sleuth.Tracer
import java.util.function.Function

/**
Expand All @@ -49,7 +57,8 @@ import java.util.function.Function
open class EthereumCachingReader(
private val up: Multistream,
private val caches: Caches,
private val callMethodsFactory: Factory<CallMethods>
private val callMethodsFactory: Factory<CallMethods>,
private val tracer: Tracer
) : Lifecycle {

companion object {
Expand All @@ -58,7 +67,7 @@ open class EthereumCachingReader(

private val objectMapper: ObjectMapper = Global.objectMapper
private val balanceCache = CurrentBlockCache<Address, Wei>()
private val directReader = EthereumDirectReader(up, caches, balanceCache, callMethodsFactory)
private val directReader = EthereumDirectReader(up, caches, balanceCache, callMethodsFactory, tracer)

val extractBlock = Function<BlockContainer, BlockJson<TransactionRefJson>> { block ->
val existing = block.getParsed(BlockJson::class.java)
Expand Down Expand Up @@ -86,16 +95,17 @@ open class EthereumCachingReader(
private val idToTxHash = Function<TxId, TransactionId> { id -> TransactionId.from(id.value) }

private val blocksByIdAsCont = CompoundReader(
caches.getBlocksByHash(),
RekeyingReader(idToBlockHash, directReader.blockReader)
SpannedReader(caches.getBlocksByHash(), tracer, CACHE_BLOCK_BY_HASH_READER),
SpannedReader(RekeyingReader(idToBlockHash, directReader.blockReader), tracer, DIRECT_QUORUM_RPC_READER)
)

private val heightByHash = HeightByHashAdding(caches, blocksByIdAsCont)
private val heightByHash =
SpannedReader(HeightByHashAdding(caches, blocksByIdAsCont), tracer, CACHE_HEIGHT_BY_HASH_READER)

fun blocksByHashAsCont(): Reader<BlockHash, BlockContainer> {
return CompoundReader(
RekeyingReader(blockHashToId, caches.getBlocksByHash()),
directReader.blockReader
SpannedReader(RekeyingReader(blockHashToId, caches.getBlocksByHash()), tracer, CACHE_BLOCK_BY_HASH_READER),
SpannedReader(directReader.blockReader, tracer, DIRECT_QUORUM_RPC_READER)
)
}

Expand All @@ -119,8 +129,8 @@ open class EthereumCachingReader(

open fun blocksByHeightAsCont(): Reader<Long, BlockContainer> {
return CompoundReader(
caches.getBlocksByHeight(),
directReader.blockByHeightReader
SpannedReader(caches.getBlocksByHeight(), tracer, CACHE_BLOCK_BY_HEIGHT_READER),
SpannedReader(directReader.blockByHeightReader, tracer, DIRECT_QUORUM_RPC_READER)
)
}

Expand All @@ -143,8 +153,8 @@ open class EthereumCachingReader(

open fun txByHashAsCont(): Reader<TxId, TxContainer> {
return CompoundReader(
caches.getTxByHash(),
RekeyingReader(idToTxHash, directReader.txReader)
SpannedReader(caches.getTxByHash(), tracer, CACHE_TX_BY_HASH_READER),
SpannedReader(RekeyingReader(idToTxHash, directReader.txReader), tracer, DIRECT_QUORUM_RPC_READER)
)
}

Expand All @@ -161,8 +171,8 @@ open class EthereumCachingReader(
directReader.receiptReader
)
return CompoundReader(
caches.getReceipts(),
requested
SpannedReader(caches.getReceipts(), tracer, CACHE_RECEIPTS_READER),
SpannedReader(requested, tracer, DIRECT_QUORUM_RPC_READER)
)
}

Expand Down
Loading

0 comments on commit 01ea756

Please sign in to comment.