Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements-tests.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ coverage[toml] >=6.2,<8.0
mypy ==1.14.1
ruff ==0.13.0
respx ==0.22.0
time-machine ==2.15.0
47 changes: 21 additions & 26 deletions src/fastapi_cloud_cli/commands/deploy.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import contextlib
import json
import logging
import subprocess
import tempfile
import time
from enum import Enum
from itertools import cycle
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Union
from typing import Any, Dict, List, Optional, Union

import fastar
import rignore
Expand All @@ -20,7 +19,7 @@
from typing_extensions import Annotated

from fastapi_cloud_cli.commands.login import login
from fastapi_cloud_cli.utils.api import APIClient
from fastapi_cloud_cli.utils.api import APIClient, BuildLogError, TooManyRetriesError
from fastapi_cloud_cli.utils.apps import AppConfig, get_app_config, write_app_config
from fastapi_cloud_cli.utils.auth import is_logged_in
from fastapi_cloud_cli.utils.cli import get_rich_toolkit, handle_http_errors
Expand Down Expand Up @@ -239,21 +238,11 @@ def _get_apps(team_id: str) -> List[AppResponse]:
return [model_validate(AppResponse, app) for app in data]


def _stream_build_logs(deployment_id: str) -> Generator[str, None, None]:
with APIClient() as client:
with client.stream(
"GET", f"/deployments/{deployment_id}/build-logs", timeout=60
) as response:
response.raise_for_status()

yield from response.iter_lines()


WAITING_MESSAGES = [
"🚀 Preparing for liftoff! Almost there...",
"👹 Sneaking past the dependency gremlins... Don't wake them up!",
"🤏 Squishing code into a tiny digital sandwich. Nom nom nom.",
"📉 Server space running low. Time to delete those cat videos?",
"🐱 Removing cat videos from our servers to free up space.",
"🐢 Uploading at blazing speeds of 1 byte per hour. Patience, young padawan.",
"🔌 Connecting to server... Please stand by while we argue with the firewall.",
"💥 Oops! We've angered the Python God. Sacrificing a rubber duck to appease it.",
Expand Down Expand Up @@ -363,17 +352,15 @@ def _wait_for_deployment(

with toolkit.progress(
next(messages), inline_logs=True, lines_to_show=20
) as progress:
with handle_http_errors(progress=progress):
for line in _stream_build_logs(deployment.id):
) as progress, APIClient() as client:
try:
for log in client.stream_build_logs(deployment.id):
time_elapsed = time.monotonic() - started_at

data = json.loads(line)
if log.type == "message":
progress.log(Text.from_ansi(log.message.rstrip()))

if "message" in data:
progress.log(Text.from_ansi(data["message"].rstrip()))

if data.get("type") == "complete":
if log.type == "complete":
progress.log("")
progress.log(
f"🐔 Ready the chicken! Your app is ready at [link={deployment.url}]{deployment.url}[/link]"
Expand All @@ -387,20 +374,28 @@ def _wait_for_deployment(

break

if data.get("type") == "failed":
if log.type == "failed":
progress.log("")
progress.log(
f"😔 Oh no! Something went wrong. Check out the logs at [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]"
)
raise typer.Exit(1)

if time_elapsed > 30:
messages = cycle(LONG_WAIT_MESSAGES) # pragma: no cover
messages = cycle(LONG_WAIT_MESSAGES)

if (time.monotonic() - last_message_changed_at) > 2:
progress.title = next(messages) # pragma: no cover
progress.title = next(messages)

last_message_changed_at = time.monotonic()

last_message_changed_at = time.monotonic() # pragma: no cover
except (BuildLogError, TooManyRetriesError) as e:
logger.error("Build log streaming failed: %s", e)
toolkit.print_line()
toolkit.print(
f"⚠️ Unable to stream build logs. Check the dashboard for status: [link={deployment.dashboard_url}]{deployment.dashboard_url}[/link]"
)
raise typer.Exit(1) from e


class SignupToWaitingList(BaseModel):
Expand Down
174 changes: 174 additions & 0 deletions src/fastapi_cloud_cli/utils/api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,133 @@
import json
import logging
import time
from contextlib import contextmanager
from datetime import timedelta
from functools import wraps
from typing import (
Callable,
Generator,
Literal,
Optional,
TypeVar,
Union,
)

import httpx
from pydantic import BaseModel, Field, ValidationError
from typing_extensions import Annotated, ParamSpec

from fastapi_cloud_cli import __version__
from fastapi_cloud_cli.config import Settings
from fastapi_cloud_cli.utils.auth import get_auth_token
from fastapi_cloud_cli.utils.pydantic_compat import TypeAdapter

logger = logging.getLogger(__name__)

BUILD_LOG_MAX_RETRIES = 3
BUILD_LOG_TIMEOUT = timedelta(minutes=5)


class BuildLogError(Exception):
pass


class TooManyRetriesError(Exception):
pass


class BuildLogLineGeneric(BaseModel):
type: Literal["complete", "failed", "timeout", "heartbeat"]
id: Optional[str] = None


class BuildLogLineMessage(BaseModel):
type: Literal["message"] = "message"
message: str
id: Optional[str] = None


BuildLogLine = Union[BuildLogLineMessage, BuildLogLineGeneric]
BuildLogAdapter = TypeAdapter[BuildLogLine](
Annotated[BuildLogLine, Field(discriminator="type")] # type: ignore
)


@contextmanager
def attempt(attempt_number: int) -> Generator[None, None, None]:
def _backoff() -> None:
backoff_seconds = min(2**attempt_number, 30)
logger.debug(
"Retrying in %ds (attempt %d)",
backoff_seconds,
attempt_number,
)
time.sleep(backoff_seconds)

try:
yield

except (
httpx.TimeoutException,
httpx.NetworkError,
httpx.RemoteProtocolError,
) as error:
logger.debug("Network error (will retry): %s", error)

_backoff()

except httpx.HTTPStatusError as error:
if error.response.status_code >= 500:
logger.debug(
"Server error %d (will retry): %s",
error.response.status_code,
error,
)
_backoff()
else:
# Try to get response text, but handle streaming responses gracefully
try:
error_detail = error.response.text
except Exception:
error_detail = "(response body unavailable)"
raise BuildLogError(
f"HTTP {error.response.status_code}: {error_detail}"
) from error


P = ParamSpec("P")
T = TypeVar("T")


def attempts(
total_attempts: int = 3, timeout: timedelta = timedelta(minutes=5)
) -> Callable[
[Callable[P, Generator[T, None, None]]], Callable[P, Generator[T, None, None]]
]:
def decorator(
func: Callable[P, Generator[T, None, None]],
) -> Callable[P, Generator[T, None, None]]:
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> Generator[T, None, None]:
start = time.monotonic()

for attempt_number in range(total_attempts):
if time.monotonic() - start > timeout.total_seconds():
raise TimeoutError(
"Build log streaming timed out after %ds",
timeout.total_seconds(),
)

with attempt(attempt_number):
yield from func(*args, **kwargs)
# If we get here without exception, the generator completed successfully
return

raise TooManyRetriesError(f"Failed after {total_attempts} attempts")

return wrapper

return decorator


class APIClient(httpx.Client):
Expand All @@ -19,3 +144,52 @@ def __init__(self) -> None:
"User-Agent": f"fastapi-cloud-cli/{__version__}",
},
)

@attempts(BUILD_LOG_MAX_RETRIES, BUILD_LOG_TIMEOUT)
def stream_build_logs(
self, deployment_id: str
) -> Generator[BuildLogLine, None, None]:
last_id = None

while True:
params = {"last_id": last_id} if last_id else None

with self.stream(
"GET",
f"/deployments/{deployment_id}/build-logs",
timeout=60,
params=params,
) as response:
response.raise_for_status()

for line in response.iter_lines():
if not line or not line.strip():
continue

if log_line := self._parse_log_line(line):
if log_line.id:
last_id = log_line.id

if log_line.type == "message":
yield log_line

if log_line.type in ("complete", "failed"):
yield log_line
return

if log_line.type == "timeout":
logger.debug("Received timeout; reconnecting")
break # Breaks for loop to reconnect
else:
logger.debug("Connection closed by server unexpectedly; will retry")

raise httpx.NetworkError("Connection closed without terminal state")

time.sleep(0.5)

def _parse_log_line(self, line: str) -> Optional[BuildLogLine]:
try:
return BuildLogAdapter.validate_json(line)
except (ValidationError, json.JSONDecodeError) as e:
logger.debug("Skipping malformed log: %s (error: %s)", line[:100], e)
return None
9 changes: 9 additions & 0 deletions src/fastapi_cloud_cli/utils/pydantic_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,12 @@ def validate_python(self, value: Any) -> T:
from pydantic import parse_obj_as

return parse_obj_as(self.type_, value) # type: ignore[no-any-return, unused-ignore]

def validate_json(self, value: str) -> T:
"""Validate a JSON string against the type."""
if PYDANTIC_V2:
return self._adapter.validate_json(value) # type: ignore[no-any-return, union-attr, unused-ignore]
else:
from pydantic import parse_raw_as

return parse_raw_as(self.type_, value) # type: ignore[no-any-return, unused-ignore, operator]
Loading