Skip to content

Commit d551110

Browse files
committed
handle when a peer is removed from config, so the known_peers dict gets updated accordingly
1 parent e57273f commit d551110

File tree

2 files changed

+53
-13
lines changed

2 files changed

+53
-13
lines changed

exo/networking/manual/manual_discovery.py

+46-13
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
import os
12
import asyncio
23
from exo.networking.discovery import Discovery
3-
from typing import Dict, List, Callable
4+
from typing import Dict, List, Callable, Optional
45

56
from exo.topology.device_capabilities import DeviceCapabilities
67
from exo.networking.manual.network_topology_config import NetworkTopology, PeerConfig
@@ -19,14 +20,19 @@ def __init__(
1920
self.node_id = node_id
2021
self.create_peer_handle = create_peer_handle
2122
self.listen_task = None
23+
self.cleanup_task = None
2224
self.known_peers: Dict[str, PeerHandle] = {}
2325

26+
self._cached_peers: Dict[str, PeerConfig] = {}
27+
self._last_modified_time: Optional[float] = None
28+
2429
async def start(self) -> None:
    """Launch the background tasks that maintain ``known_peers``.

    Two tasks are scheduled on the running event loop: the peer-finding
    task and the task that removes peers no longer present in the config.
    Their handles are kept on the instance so ``stop`` can cancel them.
    """
    find_peers_coro = self.task_find_peers_from_config()
    self.listen_task = asyncio.create_task(find_peers_coro)

    clean_up_coro = self.task_clean_up_peers_from_config()
    self.cleanup_task = asyncio.create_task(clean_up_coro)
2632

2733
async def stop(self) -> None:
    """Cancel the background tasks and wait until they have actually finished.

    ``Task.cancel()`` only requests cancellation; the task does not end until
    the event loop runs it again. Awaiting the tasks here ensures neither of
    them is still pending (or still mutating ``known_peers``) once ``stop``
    returns. Calling ``stop`` before ``start`` is a no-op.
    """
    pending = [task for task in (self.listen_task, self.cleanup_task) if task]
    for task in pending:
        task.cancel()

    if pending:
        # return_exceptions=True collects each task's CancelledError (or any
        # earlier failure) as a result instead of raising it out of stop().
        await asyncio.gather(*pending, return_exceptions=True)
3036

3137
async def discover_peers(self, wait_for_peers: int = 0) -> List[PeerHandle]:
3238
if wait_for_peers > 0:
@@ -36,6 +42,19 @@ async def discover_peers(self, wait_for_peers: int = 0) -> List[PeerHandle]:
3642
if DEBUG_DISCOVERY >= 2: print(f"Discovered peers: {[peer.id() for peer in self.known_peers.values()]}")
3743
return list(self.known_peers.values())
3844

45+
async def task_clean_up_peers_from_config(self):
    """Periodically drop known peers that are no longer listed in the config.

    Runs until cancelled, re-reading the configured peers every 5 seconds and
    removing from ``known_peers`` every peer ID that is absent from them.
    """
    if DEBUG_DISCOVERY >= 2: print("Starting task to clean up peers from config...")
    while True:
        # Keep the mapping itself (not its .items() view): membership has to be
        # tested against peer IDs, i.e. the dict keys. Testing a string ID
        # against (key, value) tuples never matches and would evict every peer.
        peers_from_config = self._get_peers()

        # _get_peers() returns an empty dict when the config cannot be loaded,
        # so an empty result is skipped rather than treated as "remove all".
        if peers_from_config:
            peers_to_remove = [peer for peer in self.known_peers if peer not in peers_from_config]

            for peer in peers_to_remove:
                if DEBUG_DISCOVERY >= 2: print(f"{peer} is no longer found in the config but is currently a known peer. Removing from known peers...")
                # pop() with a default tolerates the entry having been removed
                # by another task in the meantime.
                self.known_peers.pop(peer, None)

        await asyncio.sleep(5.0)
3958

4059
async def task_find_peers_from_config(self):
4160
if DEBUG_DISCOVERY >= 2: print("Starting task to find peers from config...")
@@ -56,23 +75,37 @@ async def task_find_peers_from_config(self):
5675
try: del self.known_peers[peer_id]
5776
except KeyError: pass
5877
except Exception as e:
59-
if DEBUG_DISCOVERY >= 2: print(f"Exception occured when attempting to add {peer_id=}: {e}")
78+
if DEBUG_DISCOVERY >= 2: print(f"Exception occured when attempting to add {peer_id=}: {e}")
6079
await asyncio.sleep(1.0)
6180

6281
if DEBUG_DISCOVERY >= 2: print(f"Current known peers: {[peer.id() for peer in self.known_peers.values()]}")
6382

6483
def _get_peers(self):
6584
try:
66-
topology = NetworkTopology.from_path(self.network_config_path)
85+
current_mtime = os.path.getmtime(self.network_config_path)
6786

68-
if self.node_id not in topology.peers:
69-
raise ValueError(f"Node ID {self.node_id} not found in network config file {self.network_config_path}. Please run with `node_id` set to one of the keys in the config file: {[k for k, _ in topology.peers]}")
87+
if self._cached_peers is not None and self._last_modified_time is not None and current_mtime <= self._last_modified_time:
88+
return self._cached_peers
7089

71-
peers_in_network: Dict[str, PeerConfig] = topology.peers
72-
peers_in_network.pop(self.node_id)
73-
except Exception as e:
74-
if DEBUG_DISCOVERY >= 2: print(f"Error when loading network config file from {self.network_config_path}. Please update the config file in order to successfully discover peers. Exception: {e}")
75-
peers_in_network = {}
90+
topology = NetworkTopology.from_path(self.network_config_path)
7691

77-
return peers_in_network
92+
if self.node_id not in topology.peers:
93+
raise ValueError(
94+
f"Node ID {self.node_id} not found in network config file "
95+
f"{self.network_config_path}. Please run with `node_id` set to "
96+
f"one of the keys in the config file: {[k for k, _ in topology.peers]}"
97+
)
7898

99+
peers_in_network: Dict[str, PeerConfig] = topology.peers
100+
peers_in_network.pop(self.node_id)
101+
102+
self._cached_peers = peers_in_network
103+
self._last_modified_time = current_mtime
104+
105+
return peers_in_network
106+
107+
except Exception as e:
108+
if DEBUG_DISCOVERY >= 2: print(f"Error when loading network config file from {self.network_config_path}. Please update the config file in order to successfully discover peers. Exception: {e}")
109+
self._cached_peers = {}
110+
self._last_modified_time = None
111+
return {}

exo/networking/manual/test_manual_discovery.py

+7
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,13 @@ async def test_dynamic_config_update(self):
145145
with open(root_path, "w") as f:
146146
json.dump(original_config, f, indent=2)
147147

148+
# Wait for the config to be reloaded again
149+
await asyncio.sleep(5.5)
150+
151+
updated_peers = await self.discovery1.discover_peers(wait_for_peers=1)
152+
self.assertEqual(len(updated_peers), 1)
153+
154+
148155

149156
if __name__ == "__main__":
150157
asyncio.run(unittest.main())

0 commit comments

Comments
 (0)