22import time
33from contextlib import contextmanager
44from datetime import timedelta
5- from typing import ContextManager , Generator , Literal , Optional , Union
5+ from functools import wraps
6+ from typing import (
7+ Callable ,
8+ Generator ,
9+ Literal ,
10+ Optional ,
11+ TypeVar ,
12+ Union ,
13+ )
614
715import httpx
816from pydantic import BaseModel , Field , TypeAdapter , ValidationError
9- from typing_extensions import Annotated
17+ from typing_extensions import Annotated , ParamSpec
1018
1119from fastapi_cloud_cli import __version__
1220from fastapi_cloud_cli .config import Settings
@@ -22,6 +30,10 @@ class BuildLogError(Exception):
2230 pass
2331
2432
33+ class TooManyRetriesError (Exception ):
34+ pass
35+
36+
2537class BuildLogLineGeneric (BaseModel ):
2638 type : Literal ["complete" , "failed" , "timeout" , "heartbeat" ]
2739 id : Optional [str ] = None
@@ -81,18 +93,39 @@ def _backoff() -> None:
8193 ) from error
8294
8395
96+ P = ParamSpec ("P" )
97+ T = TypeVar ("T" )
98+
99+
84100def attempts (
85101 total_attempts : int = 3 , timeout : timedelta = timedelta (minutes = 5 )
86- ) -> Generator [ContextManager [None ], None , None ]:
87- start = time .monotonic ()
102+ ) -> Callable [
103+ [Callable [P , Generator [T , None , None ]]], Callable [P , Generator [T , None , None ]]
104+ ]:
105+ def decorator (
106+ func : Callable [P , Generator [T , None , None ]],
107+ ) -> Callable [P , Generator [T , None , None ]]:
108+ @wraps (func )
109+ def wrapper (* args : P .args , ** kwargs : P .kwargs ) -> Generator [T , None , None ]:
110+ start = time .monotonic ()
88111
89- for attempt_number in range (total_attempts ):
90- if time .monotonic () - start > timeout .total_seconds ():
91- raise TimeoutError (
92- "Build log streaming timed out after %ds" , timeout .total_seconds ()
93- )
112+ for attempt_number in range (total_attempts ):
113+ if time .monotonic () - start > timeout .total_seconds ():
114+ raise TimeoutError (
115+ "Build log streaming timed out after %ds" ,
116+ timeout .total_seconds (),
117+ )
118+
119+ with attempt (attempt_number ):
120+ yield from func (* args , ** kwargs )
121+ # If we get here without exception, the generator completed successfully
122+ return
123+
124+ raise TooManyRetriesError (f"Failed after { total_attempts } attempts" )
94125
95- yield attempt (attempt_number )
126+ return wrapper
127+
128+ return decorator
96129
97130
98131class APIClient (httpx .Client ):
@@ -110,54 +143,47 @@ def __init__(self) -> None:
110143 },
111144 )
112145
146+ @attempts (BUILD_LOG_MAX_RETRIES , BUILD_LOG_TIMEOUT )
113147 def stream_build_logs (
114148 self , deployment_id : str
115149 ) -> Generator [BuildLogLine , None , None ]:
116150 last_id = None
117151
118- for attempt in attempts (BUILD_LOG_MAX_RETRIES , BUILD_LOG_TIMEOUT ):
119- with attempt :
120- while True :
121- params = {"last_id" : last_id } if last_id else None
122-
123- with self .stream (
124- "GET" ,
125- f"/deployments/{ deployment_id } /build-logs" ,
126- timeout = 60 ,
127- params = params ,
128- ) as response :
129- response .raise_for_status ()
130-
131- for line in response .iter_lines ():
132- if not line or not line .strip ():
133- continue
152+ while True :
153+ params = {"last_id" : last_id } if last_id else None
134154
135- if log_line := self ._parse_log_line (line ):
136- if log_line .id :
137- last_id = log_line .id
155+ with self .stream (
156+ "GET" ,
157+ f"/deployments/{ deployment_id } /build-logs" ,
158+ timeout = 60 ,
159+ params = params ,
160+ ) as response :
161+ response .raise_for_status ()
138162
139- if log_line .type == "message" :
140- yield log_line
163+ for line in response .iter_lines ():
164+ if not line or not line .strip ():
165+ continue
141166
142- if log_line .type in ("complete" , "failed" ):
143- yield log_line
167+ if log_line := self ._parse_log_line (line ):
168+ if log_line .id :
169+ last_id = log_line .id
144170
145- return
171+ if log_line .type == "message" :
172+ yield log_line
146173
147- if log_line .type == "timeout" :
148- logger . debug ( "Received timeout; reconnecting" )
149- break # Breaks for loop to reconnect
174+ if log_line .type in ( "complete" , "failed" ) :
175+ yield log_line
176+ return
150177
151- else : # Only triggered if the for loop is not broken
152- logger .debug (
153- "Connection closed by server unexpectedly; attempting to reconnect"
154- )
155- break
178+ if log_line . type == "timeout" :
179+ logger .debug ("Received timeout; reconnecting" )
180+ break # Breaks for loop to reconnect
181+ else :
182+ logger . debug ( "Connection closed by server unexpectedly; will retry" )
156183
157- time . sleep ( 0.5 )
184+ raise httpx . NetworkError ( "Connection closed without terminal state" )
158185
159- # Exhausted retries without getting any response
160- raise BuildLogError (f"Failed after { BUILD_LOG_MAX_RETRIES } attempts" )
186+ time .sleep (0.5 )
161187
162188 def _parse_log_line (self , line : str ) -> Optional [BuildLogLine ]:
163189 try :
0 commit comments