@@ -31,10 +31,33 @@ async def worker(runtime: DistributedRuntime):
3131 client = await endpoint .client ()
3232 await client .wait_for_instances ()
3333
34- # Issue request and process the stream
35- stream = await client .generate ("world,sun,moon,star" )
36- async for response in stream :
37- print (response .data ())
34+ idx = 0
35+ base_delay = 0.1 # Start with 100ms
36+ max_delay = 5.0 # Max 5 seconds
37+ current_delay = base_delay
38+
39+ while True :
40+ try :
41+ # Issue request and process the stream
42+ idx += 1
43+ stream = await client .generate (f"Query[{ idx } ] Hello world" )
44+ async for response in stream :
45+ print (response .data ())
46+ # Reset backoff on successful iteration
47+ current_delay = base_delay
48+ # Sleep for 1 second
49+ await asyncio .sleep (1 )
50+ except asyncio .CancelledError :
51+ # Re-raise for graceful shutdown
52+ raise
53+ except Exception as e :
54+ # Log the exception with context
55+ print (f"Error in worker iteration { idx } : { type (e ).__name__ } : { e } " )
56+ # Perform exponential backoff
57+ print (f"Retrying after { current_delay :.2f} seconds..." )
58+ await asyncio .sleep (current_delay )
59+ # Double the delay for next time, up to max_delay
60+ current_delay = min (current_delay * 2 , max_delay )
3861
3962
4063if __name__ == "__main__" :
0 commit comments