Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 2 additions & 134 deletions hathor/p2p/sync_v1/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@

import base64
import struct
from collections import OrderedDict
from math import inf
from typing import TYPE_CHECKING, Any, Callable, Generator, Iterator, Optional
from weakref import WeakSet

from structlog import get_logger
from twisted.internet.defer import Deferred, inlineCallbacks
from twisted.internet.interfaces import IConsumer, IDelayedCall, IPushProducer
from zope.interface import implementer
from twisted.internet.interfaces import IDelayedCall

from hathor.conf.get_settings import get_settings
from hathor.p2p.messages import GetNextPayload, GetTipsPayload, NextPayload, ProtocolMessages, TipsPayload
Expand All @@ -32,7 +30,6 @@
from hathor.transaction.base_transaction import tx_or_block_from_bytes
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
from hathor.util import Reactor, json_dumps, json_loads
from hathor.utils.zope import asserted_cast

logger = get_logger()

Expand All @@ -52,126 +49,6 @@ def _get_deps(tx: BaseTransaction) -> Iterator[bytes]:
yield txin.tx_id


@implementer(IPushProducer)
class SendDataPush:
""" Prioritize blocks over transactions when pushing data to peers.
"""
def __init__(self, node_sync: 'NodeSyncTimestamp'):
self.node_sync = node_sync
self.protocol: 'HathorProtocol' = node_sync.protocol
assert self.protocol.transport is not None
consumer = asserted_cast(IConsumer, self.protocol.transport)
self.consumer = consumer
self.is_running: bool = False
self.is_producing: bool = False

self.queue: OrderedDict[bytes, tuple[BaseTransaction, list[bytes]]] = OrderedDict()
self.priority_queue: OrderedDict[bytes, tuple[BaseTransaction, list[bytes]]] = OrderedDict()

self.delayed_call: Optional[IDelayedCall] = None

def start(self) -> None:
""" Start pushing data.
"""
if self.is_running:
raise Exception('SendDataPush is already started.')
self.is_running = True
self.consumer.registerProducer(self, True)
self.resumeProducing()

def stop(self) -> None:
""" Stop pushing data.
"""
if not self.is_running:
raise Exception('SendDataPush is already stopped.')
self.is_running = False
self.pauseProducing()
self.consumer.unregisterProducer()

def schedule_if_needed(self) -> None:
""" Schedule `send_next` if needed.
"""
if not self.is_running:
return

if not self.is_producing:
return

if self.delayed_call and self.delayed_call.active():
return

if len(self.queue) > 0 or len(self.priority_queue) > 0:
self.delayed_call = self.node_sync.reactor.callLater(0, self.send_next)

def add(self, tx: BaseTransaction) -> None:
""" Add a new block/transaction to be pushed.
"""
assert tx.hash is not None
if tx.is_block:
self.add_to_priority(tx)
else:
deps = list(_get_deps(tx))
self.queue[tx.hash] = (tx, deps)
self.schedule_if_needed()

def add_to_priority(self, tx: BaseTransaction) -> None:
""" Add a new block/transaction to be pushed with priority.
"""
assert tx.hash is not None
assert tx.hash not in self.queue
if tx.hash in self.priority_queue:
return
deps = list(_get_deps(tx))
for h in deps:
if h in self.queue:
tx2, _ = self.queue.pop(h)
self.add_to_priority(tx2)
self.priority_queue[tx.hash] = (tx, deps)
self.schedule_if_needed()

def send_next(self) -> None:
""" Push next block/transaction to peer.
"""
assert self.is_running
assert self.is_producing

if len(self.priority_queue) > 0:
# Send blocks first.
_, (tx, _) = self.priority_queue.popitem(last=False)

elif len(self.queue) > 0:
# Otherwise, send in order.
_, (tx, _) = self.queue.popitem(last=False)

else:
# Nothing to send.
self.delayed_call = None
return

self.node_sync.send_data(tx)
self.schedule_if_needed()

def resumeProducing(self) -> None:
""" This method is automatically called to resume pushing data.
"""
self.is_producing = True
self.schedule_if_needed()

def pauseProducing(self) -> None:
""" This method is automatically called to pause pushing data.
"""
self.is_producing = False
if self.delayed_call and self.delayed_call.active():
self.delayed_call.cancel()

def stopProducing(self) -> None:
""" This method is automatically called to stop pushing data.
"""
self.pauseProducing()
self.queue.clear()
self.priority_queue.clear()


class NodeSyncTimestamp(SyncAgent):
""" An algorithm to sync the DAG between two peers using the timestamp of the transactions.

Expand Down Expand Up @@ -218,8 +95,6 @@ def __init__(self, protocol: 'HathorProtocol', downloader: Downloader, reactor:
# This number may decrease if a new transaction/block arrives in a timestamp smaller than it.
self.synced_timestamp: int = 0

self.send_data_queue: SendDataPush = SendDataPush(self)

# Latest data timestamp of the peer.
self.previous_timestamp: int = 0

Expand Down Expand Up @@ -274,8 +149,6 @@ def start(self) -> None:
if self._started:
raise Exception('NodeSyncTimestamp is already running')
self._started = True
if self.send_data_queue:
self.send_data_queue.start()
self.next_step()

def stop(self) -> None:
Expand All @@ -284,8 +157,6 @@ def stop(self) -> None:
if not self._started:
raise Exception('NodeSyncTimestamp is already stopped')
self._started = False
if self.send_data_queue and self.send_data_queue.is_running:
self.send_data_queue.stop()
if self.call_later_id and self.call_later_id.active():
self.call_later_id.cancel()
for call_later in self._send_tips_call_later:
Expand Down Expand Up @@ -330,10 +201,7 @@ def send_tx_to_peer_if_possible(self, tx: BaseTransaction) -> None:
if parent.timestamp > self.synced_timestamp:
return

if self.send_data_queue:
self.send_data_queue.add(tx)
else:
self.send_data(tx)
self.send_data(tx)

def get_peer_next(self, timestamp: Optional[int] = None, offset: int = 0) -> Deferred[NextPayload]:
""" A helper that returns a deferred that is called when the peer replies.
Expand Down