Skip to content

Commit

Permalink
solution: newPendingTransactions subscription
Browse files Browse the repository at this point in the history
rel: #179
  • Loading branch information
splix committed Oct 11, 2022
1 parent 1498876 commit fa2fc3e
Show file tree
Hide file tree
Showing 53 changed files with 1,983 additions and 278 deletions.
3 changes: 3 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/Global.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import io.emeraldpay.dshackle.upstream.bitcoin.data.EsploraUnspent
import io.emeraldpay.dshackle.upstream.bitcoin.data.EsploraUnspentDeserializer
import io.emeraldpay.dshackle.upstream.bitcoin.data.RpcUnspent
import io.emeraldpay.dshackle.upstream.bitcoin.data.RpcUnspentDeserializer
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.TransactionIdSerializer
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import io.emeraldpay.etherjar.domain.TransactionId
import io.emeraldpay.grpc.Chain
import java.text.SimpleDateFormat
import java.util.Locale
Expand Down Expand Up @@ -79,6 +81,7 @@ class Global {
private fun createObjectMapper(): ObjectMapper {
val module = SimpleModule("EmeraldDshackle", Version(1, 0, 0, null, null, null))
module.addSerializer(JsonRpcResponse::class.java, JsonRpcResponse.ResponseJsonSerializer())
module.addSerializer(TransactionId::class.java, TransactionIdSerializer())

module.addDeserializer(EsploraUnspent::class.java, EsploraUnspentDeserializer())
module.addDeserializer(RpcUnspent::class.java, RpcUnspentDeserializer())
Expand Down
102 changes: 102 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/commons/DurableFlux.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package io.emeraldpay.dshackle.commons

import java.time.Duration
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.util.backoff.BackOff
import org.springframework.util.backoff.BackOffExecution
import org.springframework.util.backoff.ExponentialBackOff
import org.springframework.util.backoff.FixedBackOff
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

/**
* A flux holder that reconnects to it on failure taking into account a back off strategy
*/
class DurableFlux<T>(
private val provider: () -> Flux<T>,
private val errorBackOff: BackOff,
private val log: Logger,
) {

companion object {
private val defaultLog = LoggerFactory.getLogger(DurableFlux::class.java)

@JvmStatic
fun newBuilder(): Builder<*> {
return Builder<Any>()
}
}

private var messagesSinceStart = 0
private var errorBackOffExecution = errorBackOff.start()

fun connect(): Flux<T> {
return provider.invoke()
.doOnNext {
if (messagesSinceStart == 0) {
errorBackOffExecution = errorBackOff.start()
}
messagesSinceStart++
}
.doOnSubscribe {
messagesSinceStart = 0
}
.onErrorResume { t ->
val backoff = errorBackOffExecution.nextBackOff()
if (backoff != BackOffExecution.STOP) {
log.warn("Connection closed with ${t.message}. Reconnecting in ${backoff}ms")
connect().delaySubscription(Duration.ofMillis(backoff))
} else {
log.warn("Connection closed with ${t.message}. Not reconnecting")
Mono.error(t)
}
}
}

class Builder<T> {

private var provider: (() -> Flux<T>)? = null

protected var errorBackOff: BackOff = FixedBackOff(1_000, Long.MAX_VALUE)
protected var log: Logger = DurableFlux.defaultLog

@Suppress("UNCHECKED_CAST")
fun <X> using(provider: () -> Flux<X>): Builder<X> {
this.provider = provider as () -> Flux<T>
return this as Builder<X>
}

fun backoffOnError(time: Duration): Builder<T> {
errorBackOff = FixedBackOff(time.toMillis(), Long.MAX_VALUE)
return this
}

fun backoffOnError(time: Duration, multiplier: Double, max: Duration? = null): Builder<T> {
errorBackOff = ExponentialBackOff(time.toMillis(), multiplier).also {
if (max != null) {
it.maxInterval = max.toMillis()
}
}
return this
}

fun backoffOnError(backOff: BackOff): Builder<T> {
errorBackOff = backOff
return this
}

fun logTo(log: Logger): Builder<T> {
this.log = log
return this
}

fun build(): DurableFlux<T> {
if (provider == null) {
throw IllegalStateException("No provider for original Flux")
}
return DurableFlux(provider!!,errorBackOff, log)
}
}

}
153 changes: 153 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/commons/ExpiringSet.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package io.emeraldpay.dshackle.commons

import java.time.Duration
import java.util.LinkedList
import java.util.TreeSet
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import org.apache.commons.collections4.iterators.UnmodifiableIterator
import org.slf4j.LoggerFactory

/**
* A naive implementation of a Set with a limit for elements and an expiration time. Supposed to be used a filter for uniqueness.
* Internally it uses a TreeSet and a journal of added elements, which is used ot remove elements when they expire or the list grows too large.
* It's tread safe, but may be suboptimal to use in multithreaded scenario because of internal locks.
*/
class ExpiringSet<T>(
ttl: Duration,
comparator: Comparator<T>,
val limit: Int,
): MutableSet<T> {

companion object {
private val log = LoggerFactory.getLogger(ExpiringSet::class.java)
}

private val tree = TreeSet<T>(comparator)
private val lock = ReentrantLock()
private val journal = LinkedList<JournalItem<T>>()
private var count = 0

private val ttl = ttl.toMillis()

data class JournalItem<T>(
val since: Long = System.currentTimeMillis(),
val value: T
) {
fun isExpired(ttl: Long): Boolean {
return System.currentTimeMillis() > since + ttl
}
}

override val size: Int
get() = count

override fun clear() {
lock.withLock {
tree.clear()
journal.clear()
count = 0
}
}

override fun addAll(elements: Collection<T>): Boolean {
var changed = false
elements.forEach {
changed = changed || add(it)
}
return changed
}

override fun add(element: T): Boolean {
lock.withLock {
val added = tree.add(element)
if (added) {
journal.offer(JournalItem(value = element))
count++
shrink()
}
return added
}
}

override fun isEmpty(): Boolean {
return count == 0
}

override fun iterator(): MutableIterator<T> {
// not mutable
return UnmodifiableIterator.unmodifiableIterator(tree.iterator())
}

override fun retainAll(elements: Collection<T>): Boolean {
lock.withLock {
var changed = false
val iter = tree.iterator()
while (iter.hasNext()) {
val next = iter.next()
if (!elements.contains(next)) {
changed = true
iter.remove()
count--
}
}
return changed
}
}

override fun removeAll(elements: Collection<T>): Boolean {
var changed = false
elements.forEach {
changed = changed || remove(it)
}
return changed
}

override fun remove(element: T): Boolean {
lock.withLock {
val changed = tree.remove(element)
if (changed) {
count--
}
return changed
}
}

override fun containsAll(elements: Collection<T>): Boolean {
return elements.all { contains(it) }
}

override fun contains(element: T): Boolean {
lock.withLock {
return tree.contains(element)
}
}

fun shrink() {
lock.withLock {
val iter = journal.iterator()
val removeAtLeast = (count - limit).coerceAtLeast(0)
var removed = 0
var stop = false
while (!stop && iter.hasNext()) {
val next = iter.next()
val overflow = removeAtLeast > removed
val expired = next.isExpired(ttl)
if (overflow || expired) {
iter.remove()
if (tree.remove(next.value)) {
removed++
}
}
// we always delete expired elements so don't stop on that
if (!expired) {
// but if we already deleted all non-expired element (i.e., started because it grew too large)
// then we stop as soon as we don't have any overflow
stop = !overflow
}
}
count -= removed
}
}

}
67 changes: 67 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/commons/SharedFluxHolder.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.emeraldpay.dshackle.commons

import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.write
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux

/**
* A flux holder that that creates it only if requested. Keeps it for the following calls, so all the following calls will
* reuse it. Forgets as soon as it completes/cancelled, so it will be recreated again if needed.
*/
class SharedFluxHolder<T>(
/**
* Provider for the flux. Note that it can be called multiple times but only one is used at the same time.
* I.e., if there is a few calls because of a thread-race only one is kept.
* But once it's completed a new one may be created if requested.
*/
private val provider: () -> Flux<T>
) {

companion object {
private val log = LoggerFactory.getLogger(SharedFluxHolder::class.java)
}

private val ids = AtomicLong()
private val lock = ReentrantReadWriteLock()
private var current: Holder<T>? = null

fun get(): Flux<T> {
lock.read {
if (current != null) {
return current!!.flux
}
}
// The following doesn't consume resources because it's just create a Flux without actual subscription
// So even for the case of a thread race it's okay to create many. B/c only one is going to be kept as `current` and subscribed
val id = ids.incrementAndGet()
val created = Holder(
provider.invoke()
.share()
.doFinally { onClose(id) },
id
)
lock.write {
if (current != null) {
return current!!.flux
}
current = created
}
return created.flux
}

private fun onClose(id: Long) {
lock.write {
if (current?.id == id) {
current = null
}
}
}

data class Holder<T>(
val flux: Flux<T>,
val id: Long,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ class WebsocketHandler(
): Flux<String> {
return requests.flatMap { call ->
val method = call.method

if (method == "eth_subscribe") {
val methodParams = splitMethodParams(call.params)
if (methodParams != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ open class NativeSubscribe(
open fun subscribe(chain: Chain, method: String, params: Any?): Flux<out Any> {
val up = multistreamHolder.getUpstream(chain) ?: return Flux.error(SilentException.UnsupportedBlockchain(chain))
return (up as EthereumMultistream)
.getSubscribe()
.getSubscribtionApi()
.subscribe(method, params)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,12 @@ class TrackERC20Address(
val asset = request.asset.code.lowercase(Locale.getDefault())
val tokenDefinition = tokens[TokenId(chain, asset)] ?: return Flux.empty()
val logs = getUpstream(chain)
.getSubscribe().logs
.start(
.getSubscribtionApi().logs
.create(
listOf(tokenDefinition.token.contract),
listOf(EventId.fromSignature("Transfer", "address", "address", "uint256"))
)
.connect()

return ethereumAddresses.extract(request.address)
.map { TrackedAddress(chain, it, tokenDefinition.token, tokenDefinition.name) }
Expand Down
Loading

0 comments on commit fa2fc3e

Please sign in to comment.