Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ library.

```Python
class Node(BaseService):
async def _run(self):
async def do_run(self):
self.discovery = DiscoveryService(token=self.cancel_token)
self.run_daemon(self.discovery)
self.run_task(self.discovery.bootstrap())
Expand Down
4 changes: 2 additions & 2 deletions p2p/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ def __init__(self, proto: DiscoveryProtocol, peer_pool: BasePeerPool,
self.port = port
self._lookup_running = asyncio.Lock()

async def _run(self) -> None:
async def do_run(self) -> None:
await self._start_udp_listener()
connect_loop_sleep = 2
self.run_task(self.proto.bootstrap())
Expand Down Expand Up @@ -1004,7 +1004,7 @@ async def maybe_lookup_random_node(self) -> None:
finally:
self._last_lookup = time.time()

async def _cleanup(self) -> None:
async def do_cleanup(self) -> None:
await self.proto.stop()


Expand Down
4 changes: 2 additions & 2 deletions p2p/nat.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def __init__(self, port: int, token: CancelToken = None) -> None:
self.port = port
self._mapping: PortMapping = None # when called externally, this never returns None

async def _run(self) -> None:
async def do_run(self) -> None:
"""Run an infinite loop refreshing our NAT port mapping.

On every iteration we configure the port mapping with a lifetime of 30 minutes and then
Expand All @@ -85,7 +85,7 @@ async def _run(self) -> None:
except Exception:
self.logger.exception("Failed to setup NAT portmap")

async def _cleanup(self) -> None:
async def do_cleanup(self) -> None:
pass

async def add_nat_portmap(self) -> str:
Expand Down
10 changes: 5 additions & 5 deletions p2p/peer.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def __init__(self, peer: 'BasePeer') -> None:
super().__init__(peer.cancel_token)
self.peer = peer

async def _run(self) -> None:
async def do_run(self) -> None:
pass


Expand Down Expand Up @@ -351,10 +351,10 @@ def close(self) -> None:
def is_closing(self) -> bool:
return self.writer.transport.is_closing()

async def _cleanup(self) -> None:
async def do_cleanup(self) -> None:
self.close()

async def _run(self) -> None:
async def do_run(self) -> None:
# The `boot` process is run in the background to allow the `run` loop
# to continue so that all of the Peer APIs can be used within the
# `boot` task.
Expand Down Expand Up @@ -860,7 +860,7 @@ def _add_peer(self,
for msg in msgs:
subscriber.add_msg(msg)

async def _run(self) -> None:
async def do_run(self) -> None:
# FIXME: PeerPool should probably no longer be a BaseService, but for now we're keeping it
# so in order to ensure we cancel all peers when we terminate.
if self.event_bus is not None:
Expand All @@ -875,7 +875,7 @@ async def stop_all_peers(self) -> None:
peer.disconnect(DisconnectReason.client_quitting) for peer in peers if peer.is_running
])

async def _cleanup(self) -> None:
async def do_cleanup(self) -> None:
await self.stop_all_peers()

async def connect(self, remote: Node) -> BasePeer:
Expand Down
22 changes: 11 additions & 11 deletions p2p/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ def get_event_loop(self) -> asyncio.AbstractEventLoop:
async def run(
self,
finished_callback: Optional[Callable[['BaseService'], None]] = None) -> None:
"""Await for the service's _run() coroutine.
"""Await for the service's do_run() coroutine.

Once _run() returns, triggers the cancel token, call cleanup() and
Once do_run() returns, triggers the cancel token, call cleanup() and
finished_callback (if one was passed).
"""
if self.is_running:
Expand All @@ -100,7 +100,7 @@ async def run(
try:
async with self._run_lock:
self.events.started.set()
await self._run()
await self.do_run()
except OperationCancelled as e:
self.logger.debug("%s finished: %s", self, e)
except Exception:
Expand Down Expand Up @@ -236,10 +236,10 @@ async def _run_in_executor(self, callback: Callable[..., Any], *args: Any) -> An

async def cleanup(self) -> None:
"""
Run the ``_cleanup()`` coroutine and set the ``cleaned_up`` event after the service as
Run the ``do_cleanup()`` coroutine and set the ``cleaned_up`` event after the service as
well as all child services finished their cleanup.

The ``_cleanup()`` coroutine is invoked before the child services may have finished
The ``do_cleanup()`` coroutine is invoked before the child services may have finished
their cleanup.
"""
if self._child_services:
Expand All @@ -253,7 +253,7 @@ async def cleanup(self) -> None:
await asyncio.gather(*self._tasks)
self.logger.debug("All tasks finished")

await self._cleanup()
await self.do_cleanup()
self.events.cleaned_up.set()

def cancel_nowait(self) -> None:
Expand Down Expand Up @@ -322,17 +322,17 @@ async def sleep(self, delay: float) -> None:
await self.wait(asyncio.sleep(delay))

@abstractmethod
async def _run(self) -> None:
async def do_run(self) -> None:
"""Run the service's loop.

Should return or raise OperationCancelled when the CancelToken is triggered.
"""
raise NotImplementedError()

async def _cleanup(self) -> None:
async def do_cleanup(self) -> None:
"""Clean up any resources held by this service.

Called after the service's _run() method returns.
Called after the service's do_run() method returns.
"""
pass

Expand All @@ -357,8 +357,8 @@ async def wrapped(service: BaseService, *args: Any, **kwargs: Any) -> Any:


class EmptyService(BaseService):
async def _run(self) -> None:
async def do_run(self) -> None:
pass

async def _cleanup(self) -> None:
async def do_cleanup(self) -> None:
pass
4 changes: 2 additions & 2 deletions tests/p2p/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ class ParentService(BaseService):
be triggered.
"""

async def _run(self):
async def do_run(self):
self.daemon = WaitService(token=self.cancel_token)
self.run_daemon(self.daemon)
await self.cancel_token.wait()


class WaitService(BaseService):

async def _run(self):
async def do_run(self):
await self.cancel_token.wait()


Expand Down
4 changes: 2 additions & 2 deletions tests/trinity/core/peer_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,5 @@ def __init__(self, peers) -> None:
for peer in peers:
self.connected_nodes[peer.remote] = peer

async def _run(self) -> None:
raise NotImplementedError("This is a mock PeerPool implementation, you must not _run() it")
async def do_run(self) -> None:
raise NotImplementedError("This is a mock PeerPool implementation, you must not run it")
28 changes: 14 additions & 14 deletions trinity/extensibility/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,18 +163,18 @@ def configure_parser(self, arg_parser: ArgumentParser, subparser: _SubParsersAct

def start(self) -> None:
"""
Delegate to :meth:`~trinity.extensibility.plugin.BasePlugin._start` and set ``running``
Delegate to :meth:`~trinity.extensibility.plugin.BasePlugin.do_start` and set ``running``
to ``True``. Broadcast a :class:`~trinity.extensibility.events.PluginStartedEvent` on the
:class:`~lahja.eventbus.EventBus` and hence allow other plugins to act accordingly.
"""
self.running = True
self._start()
self.do_start()
self.event_bus.broadcast(
PluginStartedEvent(type(self))
)
self.logger.info("Plugin started: %s", self.name)

def _start(self) -> None:
def do_start(self) -> None:
"""
Perform the actual plugin start routine. In the case of a `BaseIsolatedPlugin` this method
will be called in a separate process.
Expand All @@ -190,18 +190,18 @@ class BaseSyncStopPlugin(BasePlugin):
A :class:`~trinity.extensibility.plugin.BaseSyncStopPlugin` unwinds synchronoulsy, hence blocks
until the shutdown is done.
"""
def _stop(self) -> None:
def do_stop(self) -> None:
"""
Stop the plugin. Should be overwritten by subclasses.
"""
pass

def stop(self) -> None:
"""
Delegate to :meth:`~trinity.extensibility.plugin.BaseSyncStopPlugin._stop` causing the
Delegate to :meth:`~trinity.extensibility.plugin.BaseSyncStopPlugin.do_stop` causing the
plugin to stop and setting ``running`` to ``False``.
"""
self._stop()
self.do_stop()
self.running = False


Expand All @@ -211,18 +211,18 @@ class BaseAsyncStopPlugin(BasePlugin):
needs to be awaited.
"""

async def _stop(self) -> None:
async def do_stop(self) -> None:
"""
Asynchronously stop the plugin. Should be overwritten by subclasses.
"""
pass

async def stop(self) -> None:
"""
Delegate to :meth:`~trinity.extensibility.plugin.BaseAsyncStopPlugin._stop` causing the
Delegate to :meth:`~trinity.extensibility.plugin.BaseAsyncStopPlugin.do_stop` causing the
plugin to stop asynchronously and setting ``running`` to ``False``.
"""
await self._stop()
await self.do_stop()
self.running = False


Expand Down Expand Up @@ -250,7 +250,7 @@ class BaseIsolatedPlugin(BaseSyncStopPlugin):

def start(self) -> None:
"""
Prepare the plugin to get started and eventually call ``_start`` in a separate process.
Prepare the plugin to get started and eventually call ``do_start`` in a separate process.
"""
self.running = True
self._process = ctx.Process(
Expand All @@ -268,9 +268,9 @@ def _prepare_start(self) -> None:
self.event_bus.broadcast(
PluginStartedEvent(type(self))
)
self._start()
self.do_start()

def _stop(self) -> None:
def do_stop(self) -> None:
self.context.event_bus.stop()
kill_process_gracefully(self._process, self.logger)

Expand All @@ -290,7 +290,7 @@ def configure_parser(self, arg_parser: ArgumentParser, subparser: _SubParsersAct
def handle_event(self, activation_event: BaseEvent) -> None:
self.logger.info("Debug plugin: handle_event called: %s", activation_event)

def _start(self) -> None:
def do_start(self) -> None:
self.logger.info("Debug plugin: start called")
asyncio.ensure_future(self.count_forever())

Expand All @@ -301,5 +301,5 @@ async def count_forever(self) -> None:
i += 1
await asyncio.sleep(1)

async def _stop(self) -> None:
async def do_stop(self) -> None:
self.logger.info("Debug plugin: stop called")
2 changes: 1 addition & 1 deletion trinity/nodes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def notify_resource_available(self) -> None:
BroadcastConfig(internal=True),
)

async def _run(self) -> None:
async def do_run(self) -> None:
await self.event_bus.wait_for_connection()
self.notify_resource_available()
self.run_daemon_task(self.handle_network_id_requests())
Expand Down
4 changes: 2 additions & 2 deletions trinity/nodes/light.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ def __init__(self, event_bus: Endpoint, trinity_config: TrinityConfig) -> None:
token=self.cancel_token,
)

async def _run(self) -> None:
async def do_run(self) -> None:
self.run_daemon(self._peer_chain)
await super()._run()
await super().do_run()

def get_chain(self) -> LightDispatchChain:
if self._chain is None:
Expand Down
2 changes: 1 addition & 1 deletion trinity/plugins/builtin/ethstats/ethstats_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(
self.send_queue: asyncio.Queue[EthstatsMessage] = asyncio.Queue()
self.recv_queue: asyncio.Queue[EthstatsMessage] = asyncio.Queue()

async def _run(self) -> None:
async def do_run(self) -> None:
await self.wait_first(
self.send_handler(),
self.recv_handler(),
Expand Down
2 changes: 1 addition & 1 deletion trinity/plugins/builtin/ethstats/ethstats_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__(

self.chain = self.get_chain()

async def _run(self) -> None:
async def do_run(self) -> None:
while self.is_operational:
try:
self.logger.info('Connecting to %s...' % self.server_url)
Expand Down
2 changes: 1 addition & 1 deletion trinity/plugins/builtin/ethstats/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def ready(self) -> None:

self.start()

def _start(self) -> None:
def do_start(self) -> None:
service = EthstatsService(
self.context,
self.server_url,
Expand Down
2 changes: 1 addition & 1 deletion trinity/plugins/builtin/json_rpc/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def configure_parser(self, arg_parser: ArgumentParser, subparser: _SubParsersAct
help="Disables the JSON-RPC Server",
)

def _start(self) -> None:
def do_start(self) -> None:
db_manager = create_db_manager(self.context.trinity_config.database_ipc_path)
db_manager.connect()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def __init__(self,
self.chain = chain
self.event_bus = event_bus

async def _run(self) -> None:
async def do_run(self) -> None:
self.logger.info("Running LightPeerChainEventBusHandler")

self.run_daemon_task(self.handle_get_blockheader_by_hash_requests())
Expand Down
4 changes: 2 additions & 2 deletions trinity/plugins/builtin/light_peer_chain_bridge/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ def handle_event(self, event: ResourceAvailableEvent) -> None:
self.chain = event.resource
self.start()

def _start(self) -> None:
def do_start(self) -> None:
chain = cast(LightDispatchChain, self.chain)
self.handler = LightPeerChainEventBusHandler(chain._peer_chain, self.context.event_bus)
asyncio.ensure_future(self.handler.run())

async def _stop(self) -> None:
async def do_stop(self) -> None:
# This isn't really needed for the standard shutdown case as the LightPeerChain will
# automatically shutdown whenever the `CancelToken` it was chained with is triggered.
# It may still be useful to stop the LightPeerChain Bridge plugin individually though.
Expand Down
4 changes: 2 additions & 2 deletions trinity/plugins/builtin/tx_pool/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def handle_event(self, event: ResourceAvailableEvent) -> None:
if all((self.peer_pool is not None, self.chain is not None, self.is_enabled)):
self.start()

def _start(self) -> None:
def do_start(self) -> None:
if isinstance(self.chain, BaseMainnetChain):
validator = DefaultTransactionValidator(self.chain, BYZANTIUM_MAINNET_BLOCK)
elif isinstance(self.chain, BaseRopstenChain):
Expand All @@ -91,7 +91,7 @@ def _start(self) -> None:
self.tx_pool = TxPool(self.peer_pool, validator, self.cancel_token)
asyncio.ensure_future(self.tx_pool.run())

async def _stop(self) -> None:
async def do_stop(self) -> None:
# This isn't really needed for the standard shutdown case as the TxPool will automatically
# shutdown whenever the `CancelToken` it was chained with is triggered. It may still be
# useful to stop the TxPool plugin individually though.
Expand Down
Loading