From 61327ba949e8069b44a2b3be4f99b9a3038684ca Mon Sep 17 00:00:00 2001 From: Jan Segre Date: Fri, 8 Dec 2023 23:20:15 +0100 Subject: [PATCH] feat(sync-v1): remove SendDataPush queue --- hathor/p2p/sync_v1/agent.py | 136 +----------------------------------- 1 file changed, 2 insertions(+), 134 deletions(-) diff --git a/hathor/p2p/sync_v1/agent.py b/hathor/p2p/sync_v1/agent.py index 8a53fd962..f9757f7e7 100644 --- a/hathor/p2p/sync_v1/agent.py +++ b/hathor/p2p/sync_v1/agent.py @@ -14,15 +14,13 @@ import base64 import struct -from collections import OrderedDict from math import inf from typing import TYPE_CHECKING, Any, Callable, Generator, Iterator, Optional from weakref import WeakSet from structlog import get_logger from twisted.internet.defer import Deferred, inlineCallbacks -from twisted.internet.interfaces import IConsumer, IDelayedCall, IPushProducer -from zope.interface import implementer +from twisted.internet.interfaces import IDelayedCall from hathor.conf.get_settings import get_settings from hathor.p2p.messages import GetNextPayload, GetTipsPayload, NextPayload, ProtocolMessages, TipsPayload @@ -32,7 +30,6 @@ from hathor.transaction.base_transaction import tx_or_block_from_bytes from hathor.transaction.storage.exceptions import TransactionDoesNotExist from hathor.util import Reactor, json_dumps, json_loads -from hathor.utils.zope import asserted_cast logger = get_logger() @@ -52,126 +49,6 @@ def _get_deps(tx: BaseTransaction) -> Iterator[bytes]: yield txin.tx_id -@implementer(IPushProducer) -class SendDataPush: - """ Prioritize blocks over transactions when pushing data to peers. - """ - def __init__(self, node_sync: 'NodeSyncTimestamp'): - self.node_sync = node_sync - self.protocol: 'HathorProtocol' = node_sync.protocol - assert self.protocol.transport is not None - consumer = asserted_cast(IConsumer, self.protocol.transport) - self.consumer = consumer - self.is_running: bool = False - self.is_producing: bool = False - - self.queue: OrderedDict[bytes, tuple[BaseTransaction, list[bytes]]] = OrderedDict() - self.priority_queue: OrderedDict[bytes, tuple[BaseTransaction, list[bytes]]] = OrderedDict() - - self.delayed_call: Optional[IDelayedCall] = None - - def start(self) -> None: - """ Start pushing data. - """ - if self.is_running: - raise Exception('SendDataPush is already started.') - self.is_running = True - self.consumer.registerProducer(self, True) - self.resumeProducing() - - def stop(self) -> None: - """ Stop pushing data. - """ - if not self.is_running: - raise Exception('SendDataPush is already stopped.') - self.is_running = False - self.pauseProducing() - self.consumer.unregisterProducer() - - def schedule_if_needed(self) -> None: - """ Schedule `send_next` if needed. - """ - if not self.is_running: - return - - if not self.is_producing: - return - - if self.delayed_call and self.delayed_call.active(): - return - - if len(self.queue) > 0 or len(self.priority_queue) > 0: - self.delayed_call = self.node_sync.reactor.callLater(0, self.send_next) - - def add(self, tx: BaseTransaction) -> None: - """ Add a new block/transaction to be pushed. - """ - assert tx.hash is not None - if tx.is_block: - self.add_to_priority(tx) - else: - deps = list(_get_deps(tx)) - self.queue[tx.hash] = (tx, deps) - self.schedule_if_needed() - - def add_to_priority(self, tx: BaseTransaction) -> None: - """ Add a new block/transaction to be pushed with priority. - """ - assert tx.hash is not None - assert tx.hash not in self.queue - if tx.hash in self.priority_queue: - return - deps = list(_get_deps(tx)) - for h in deps: - if h in self.queue: - tx2, _ = self.queue.pop(h) - self.add_to_priority(tx2) - self.priority_queue[tx.hash] = (tx, deps) - self.schedule_if_needed() - - def send_next(self) -> None: - """ Push next block/transaction to peer. - """ - assert self.is_running - assert self.is_producing - - if len(self.priority_queue) > 0: - # Send blocks first. - _, (tx, _) = self.priority_queue.popitem(last=False) - - elif len(self.queue) > 0: - # Otherwise, send in order. - _, (tx, _) = self.queue.popitem(last=False) - - else: - # Nothing to send. - self.delayed_call = None - return - - self.node_sync.send_data(tx) - self.schedule_if_needed() - - def resumeProducing(self) -> None: - """ This method is automatically called to resume pushing data. - """ - self.is_producing = True - self.schedule_if_needed() - - def pauseProducing(self) -> None: - """ This method is automatically called to pause pushing data. - """ - self.is_producing = False - if self.delayed_call and self.delayed_call.active(): - self.delayed_call.cancel() - - def stopProducing(self) -> None: - """ This method is automatically called to stop pushing data. - """ - self.pauseProducing() - self.queue.clear() - self.priority_queue.clear() - - class NodeSyncTimestamp(SyncAgent): """ An algorithm to sync the DAG between two peers using the timestamp of the transactions. @@ -218,8 +95,6 @@ def __init__(self, protocol: 'HathorProtocol', downloader: Downloader, reactor: # This number may decrease if a new transaction/block arrives in a timestamp smaller than it. self.synced_timestamp: int = 0 - self.send_data_queue: SendDataPush = SendDataPush(self) - # Latest data timestamp of the peer. self.previous_timestamp: int = 0 @@ -274,8 +149,6 @@ def start(self) -> None: if self._started: raise Exception('NodeSyncTimestamp is already running') self._started = True - if self.send_data_queue: - self.send_data_queue.start() self.next_step() def stop(self) -> None: @@ -284,8 +157,6 @@ def stop(self) -> None: if not self._started: raise Exception('NodeSyncTimestamp is already stopped') self._started = False - if self.send_data_queue and self.send_data_queue.is_running: - self.send_data_queue.stop() if self.call_later_id and self.call_later_id.active(): self.call_later_id.cancel() for call_later in self._send_tips_call_later: @@ -330,10 +201,7 @@ def send_tx_to_peer_if_possible(self, tx: BaseTransaction) -> None: if parent.timestamp > self.synced_timestamp: return - if self.send_data_queue: - self.send_data_queue.add(tx) - else: - self.send_data(tx) + self.send_data(tx) def get_peer_next(self, timestamp: Optional[int] = None, offset: int = 0) -> Deferred[NextPayload]: """ A helper that returns a deferred that is called when the peer replies.