Skip to content

Commit 5e0db20

Browse files
authored
Merge pull request #278 from exo-explore/peer_prio
add a priority to broadcast messages
2 parents 2654f29 + b611d0a commit 5e0db20

File tree

1 file changed

+21
-14
lines changed

1 file changed

+21
-14
lines changed

exo/networking/udp/udp_discovery.py

+21-14
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def __init__(
5353
self.broadcast_interval = broadcast_interval
5454
self.discovery_timeout = discovery_timeout
5555
self.device_capabilities = device_capabilities
56-
self.known_peers: Dict[str, Tuple[PeerHandle, float, float]] = {}
56+
self.known_peers: Dict[str, Tuple[PeerHandle, float, float, int]] = {}
5757
self.broadcast_task = None
5858
self.listen_task = None
5959
self.cleanup_task = None
@@ -76,24 +76,25 @@ async def discover_peers(self, wait_for_peers: int = 0) -> List[PeerHandle]:
7676
while len(self.known_peers) < wait_for_peers:
7777
if DEBUG_DISCOVERY >= 2: print(f"Current peers: {len(self.known_peers)}/{wait_for_peers}. Waiting for more peers...")
7878
await asyncio.sleep(0.1)
79-
return [peer_handle for peer_handle, _, _ in self.known_peers.values()]
79+
return [peer_handle for peer_handle, _, _, _ in self.known_peers.values()]
8080

8181
async def task_broadcast_presence(self):
82-
message = json.dumps({
83-
"type": "discovery",
84-
"node_id": self.node_id,
85-
"grpc_port": self.node_port,
86-
"device_capabilities": self.device_capabilities.to_dict(),
87-
})
88-
8982
if DEBUG_DISCOVERY >= 2:
9083
print("Starting task_broadcast_presence...")
91-
print(f"\nBroadcast message: {message}")
9284

9385
while True:
9486
# Explicitly broadcasting on all assigned ips since broadcasting on `0.0.0.0` on MacOS does not broadcast over
9587
# the Thunderbolt bridge when other connection modalities exist such as WiFi or Ethernet
9688
for addr in get_all_ip_addresses():
89+
message = json.dumps({
90+
"type": "discovery",
91+
"node_id": self.node_id,
92+
"grpc_port": self.node_port,
93+
"device_capabilities": self.device_capabilities.to_dict(),
94+
"priority": 1, # For now, every interface has the same priority. We can make this better by prioritising interfaces based on bandwidth, latency, and jitter, e.g. prioritise Thunderbolt over WiFi.
95+
})
96+
if DEBUG_DISCOVERY >= 3: print(f"Broadcasting presence at ({addr}): {message}")
97+
9798
transport = None
9899
try:
99100
transport, _ = await asyncio.get_event_loop().create_datagram_endpoint(
@@ -138,21 +139,27 @@ async def on_listen_message(self, data, addr):
138139
peer_id = message["node_id"]
139140
peer_host = addr[0]
140141
peer_port = message["grpc_port"]
142+
peer_prio = message["priority"]
141143
device_capabilities = DeviceCapabilities(**message["device_capabilities"])
142144

143145
if peer_id not in self.known_peers or self.known_peers[peer_id][0].addr() != f"{peer_host}:{peer_port}":
146+
if peer_id in self.known_peers:
147+
existing_peer_prio = self.known_peers[peer_id][3]
148+
if existing_peer_prio >= peer_prio:
149+
if DEBUG >= 1: print(f"Ignoring peer {peer_id} at {peer_host}:{peer_port} with priority {peer_prio} because we already know about a peer with higher or equal priority: {existing_peer_prio}")
150+
return
144151
new_peer_handle = self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}", device_capabilities)
145152
if not await new_peer_handle.health_check():
146153
if DEBUG >= 1: print(f"Peer {peer_id} at {peer_host}:{peer_port} is not healthy. Skipping.")
147154
return
148155
if DEBUG >= 1: print(f"Adding {peer_id=} at {peer_host}:{peer_port}. Replace existing peer_id: {peer_id in self.known_peers}")
149-
self.known_peers[peer_id] = (new_peer_handle, time.time(), time.time())
156+
self.known_peers[peer_id] = (new_peer_handle, time.time(), time.time(), peer_prio)
150157
else:
151158
if not await self.known_peers[peer_id][0].health_check():
152159
if DEBUG >= 1: print(f"Peer {peer_id} at {peer_host}:{peer_port} is not healthy. Removing.")
153160
if peer_id in self.known_peers: del self.known_peers[peer_id]
154161
return
155-
self.known_peers[peer_id] = (self.known_peers[peer_id][0], self.known_peers[peer_id][1], time.time())
162+
self.known_peers[peer_id] = (self.known_peers[peer_id][0], self.known_peers[peer_id][1], time.time(), peer_prio)
156163

157164
async def task_listen_for_peers(self):
158165
await asyncio.get_event_loop().create_datagram_endpoint(lambda: ListenProtocol(self.on_listen_message),
@@ -164,13 +171,13 @@ async def task_cleanup_peers(self):
164171
try:
165172
current_time = time.time()
166173
peers_to_remove = []
167-
for peer_id, (peer_handle, connected_at, last_seen) in self.known_peers.items():
174+
for peer_id, (peer_handle, connected_at, last_seen, prio) in self.known_peers.items():
168175
if (not await peer_handle.is_connected() and current_time - connected_at > self.discovery_timeout) or \
169176
(current_time - last_seen > self.discovery_timeout) or \
170177
(not await peer_handle.health_check()):
171178
peers_to_remove.append(peer_id)
172179

173-
if DEBUG_DISCOVERY >= 2: print("Peer statuses:", {peer_handle.id(): f"is_connected={await peer_handle.is_connected()}, health_check={await peer_handle.health_check()}, {connected_at=}, {last_seen=}" for peer_handle, connected_at, last_seen in self.known_peers.values()})
180+
if DEBUG_DISCOVERY >= 2: print("Peer statuses:", {peer_handle.id(): f"is_connected={await peer_handle.is_connected()}, health_check={await peer_handle.health_check()}, {connected_at=}, {last_seen=}, {prio=}" for peer_handle, connected_at, last_seen, prio in self.known_peers.values()})
174181

175182
for peer_id in peers_to_remove:
176183
if peer_id in self.known_peers: del self.known_peers[peer_id]

0 commit comments

Comments
 (0)