9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,14 @@
# Change Log

## Release 1.4.1 (12/13/23)

### Added

- Local test API server includes simulated endpoints that mimic the behavior of `run`, `runsync`, `stream`, and `status`.
- Internal job tracker can be used to track job inputs.

---

## Release 1.4.0 (12/4/23)

### Changed
17 changes: 17 additions & 0 deletions examples/serverless/simple_handler.py
@@ -0,0 +1,17 @@
""" Simple Handler

To setup a local API server, run the following command:
python simple_handler.py --rp_serve_api
"""

import runpod


def handler(job):
""" Simple handler """
job_input = job["input"]

return f"Hello {job_input['name']}!"


runpod.serverless.start({"handler": handler})
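Once this example is served locally (`python simple_handler.py --rp_serve_api`), the new simulated endpoints can be exercised over plain HTTP. A minimal client sketch, assuming the server is bound to the `localhost:8000` defaults and that the third-party `requests` package is available (neither assumption is part of this diff):

```python
# Illustrative client for the local test server added in this PR.
# Host, port, and the use of `requests` are assumptions, not part of the diff.
import requests

BASE = "http://localhost:8000"

# /runsync executes the handler and returns its output in a single call.
sync = requests.post(f"{BASE}/runsync", json={"input": {"name": "World"}}, timeout=10)
print(sync.json())  # e.g. {"id": "test-...", "status": "COMPLETED", "output": "Hello World!"}

# /run only registers the job; /status/{job_id} then runs it and returns the output.
job = requests.post(f"{BASE}/run", json={"input": {"name": "World"}}, timeout=10).json()
status = requests.post(f"{BASE}/status/{job['id']}", timeout=10)
print(status.json())
```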
6 changes: 2 additions & 4 deletions runpod/serverless/__init__.py
@@ -126,8 +126,7 @@ def start(config: Dict[str, Any]):

if config["rp_args"]["rp_serve_api"]:
print("Starting API server.")
api_server = rp_fastapi.WorkerAPI()
api_server.config = config
api_server = rp_fastapi.WorkerAPI(config)

api_server.start_uvicorn(
api_host=config['rp_args']['rp_api_host'],
@@ -137,8 +136,7 @@ def start(config: Dict[str, Any]):

elif realtime_port:
print("Starting API server for realtime.")
api_server = rp_fastapi.WorkerAPI()
api_server.config = config
api_server = rp_fastapi.WorkerAPI(config)

api_server.start_uvicorn(
api_host='0.0.0.0',
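The net effect of this hunk is that `WorkerAPI` now receives the whole worker config at construction instead of having `config` assigned afterwards. A hedged sketch of the wiring, with illustrative values for the `rp_args` keys read by `start()`:

```python
# Illustrative wiring only; mirrors the refactor in this hunk.
# The handler and the rp_args values below are examples, not project defaults.
from runpod.serverless.modules import rp_fastapi


def handler(job):
    return f"Hello {job['input']['name']}!"


config = {
    "handler": handler,
    "rp_args": {
        "rp_serve_api": True,
        "rp_api_host": "localhost",
        "rp_api_port": 8000,
        "rp_api_concurrency": 1,
    },
}

api_server = rp_fastapi.WorkerAPI(config)  # config is passed in, not patched on
# api_server.start_uvicorn(api_host="localhost", api_port=8000, api_concurrency=1)
```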
178 changes: 148 additions & 30 deletions runpod/serverless/modules/rp_fastapi.py
@@ -2,11 +2,13 @@
# pylint: disable=too-few-public-methods

import os
from typing import Union
import uuid
from typing import Union, Optional, Dict, Any

import uvicorn
from fastapi import FastAPI, APIRouter
from fastapi.encoders import jsonable_encoder
from fastapi.responses import RedirectResponse
from pydantic import BaseModel

from .rp_handler import is_generator
@@ -47,14 +49,39 @@ class TestJob(BaseModel):
''' Represents a test job.
input can be any type of data.
'''
id: str = "test_job"
input: Union[dict, list, str, int, float, bool]
id: Optional[str]
input: Optional[Union[dict, list, str, int, float, bool]]


class DefaultInput(BaseModel):
""" Represents a test input. """
input: Dict[str, Any]


# ------------------------------ Output Objects ------------------------------ #
class JobOutput(BaseModel):
''' Represents the output of a job. '''
id: str
status: str
output: Optional[Union[dict, list, str, int, float, bool]]
error: Optional[str]


class StreamOutput(BaseModel):
""" Stream representation of a job. """
id: str
status: str = "IN_PROGRESS"
stream: Optional[Union[dict, list, str, int, float, bool]]
error: Optional[str]


# ---------------------------------------------------------------------------- #
# API Worker #
# ---------------------------------------------------------------------------- #
class WorkerAPI:
''' Used to launch the FastAPI web server when the worker is running in API mode. '''

def __init__(self, handler=None):
def __init__(self, config: Dict[str, Any]):
'''
Initializes the WorkerAPI class.
1. Starts the heartbeat thread.
@@ -64,23 +91,50 @@ def __init__(self, handler=None):
# Start the heartbeat thread.
heartbeat.start_ping()

# Set the handler for processing jobs.
self.config = {"handler": handler}
self.config = config

# Initialize the FastAPI web server.
self.rp_app = FastAPI(
title="RunPod | Test Worker | API",
description=DESCRIPTION,
version=runpod_version,
docs_url="/"
)

# Create an APIRouter and add the route for processing jobs.
api_router = APIRouter()

if RUNPOD_ENDPOINT_ID:
api_router.add_api_route(f"/{RUNPOD_ENDPOINT_ID}/realtime", self._run, methods=["POST"])
# Docs Redirect /docs -> /
api_router.add_api_route(
"/docs", lambda: RedirectResponse(url="/"),
include_in_schema=False
)

api_router.add_api_route("/runsync", self._debug_run, methods=["POST"])
if RUNPOD_ENDPOINT_ID:
api_router.add_api_route(f"/{RUNPOD_ENDPOINT_ID}/realtime",
self._realtime, methods=["POST"])

# Simulation endpoints.
api_router.add_api_route(
"/run", self._sim_run, methods=["POST"], response_model_exclude_none=True,
summary="Simulate run behavior.",
description="Returns job ID to be used with `/stream` and `/status` endpoints."
)
api_router.add_api_route(
"/runsync", self._sim_runsync, methods=["POST"], response_model_exclude_none=True,
summary="Simulate runsync behavior.",
description="Returns job output directly when called."
)
api_router.add_api_route(
"/stream/{job_id}", self._sim_stream, methods=["POST"],
response_model_exclude_none=True, summary="Simulate stream behavior.",
description="Aggregates the output of the job and returns it when the job is complete."
)
api_router.add_api_route(
"/status/{job_id}", self._sim_status, methods=["POST"],
response_model_exclude_none=True, summary="Simulate status behavior.",
description="Returns the output of the job when the job is complete."
)

# Include the APIRouter in the FastAPI application.
self.rp_app.include_router(api_router)
@@ -96,47 +150,111 @@ def start_uvicorn(self, api_host='localhost', api_port=8000, api_concurrency=1):
access_log=False
)

async def _run(self, job: Job):
# ----------------------------- Realtime Endpoint ---------------------------- #
async def _realtime(self, job: Job):
'''
Performs model inference on the input data using the provided handler.
If handler is not provided, returns an error message.
'''
if self.config["handler"] is None:
return {"error": "Handler not provided"}

# Set the current job ID.
job_list.add_job(job.id)

# Process the job using the provided handler.
# Process the job using the provided handler, passing in the job input.
job_results = await run_job(self.config["handler"], job.__dict__)

# Reset the job ID.
job_list.remove_job(job.id)

# Return the results of the job processing.
return jsonable_encoder(job_results)

async def _debug_run(self, job: TestJob):
'''
Performs model inference on the input data using the provided handler.
'''
if self.config["handler"] is None:
return {"error": "Handler not provided"}
# ---------------------------------------------------------------------------- #
# Simulation Endpoints #
# ---------------------------------------------------------------------------- #

# Set the current job ID.
job_list.add_job(job.id)
# ------------------------------------ run ----------------------------------- #
async def _sim_run(self, job_input: DefaultInput) -> JobOutput:
""" Development endpoint to simulate run behavior. """
assigned_job_id = f"test-{uuid.uuid4()}"
job_list.add_job(assigned_job_id, job_input.input)
return jsonable_encoder({"id": assigned_job_id, "status": "IN_PROGRESS"})

# ---------------------------------- runsync --------------------------------- #
async def _sim_runsync(self, job_input: DefaultInput) -> JobOutput:
""" Development endpoint to simulate runsync behavior. """
assigned_job_id = f"test-{uuid.uuid4()}"
job = TestJob(id=assigned_job_id, input=job_input.input)

if is_generator(self.config["handler"]):
generator_output = run_job_generator(self.config["handler"], job.__dict__)
job_results = {"output": []}
job_output = {"output": []}
async for stream_output in generator_output:
job_results["output"].append(stream_output["output"])
job_output['output'].append(stream_output["output"])
else:
job_results = await run_job(self.config["handler"], job.__dict__)
job_output = await run_job(self.config["handler"], job.__dict__)

return jsonable_encoder({
"id": job.id,
"status": "COMPLETED",
"output": job_output['output']
})

# ---------------------------------- stream ---------------------------------- #
async def _sim_stream(self, job_id: str) -> StreamOutput:
""" Development endpoint to simulate stream behavior. """
job_input = job_list.get_job_input(job_id)
if job_input is None:
return jsonable_encoder({
"id": job_id,
"status": "FAILED",
"error": "Job ID not found"
})

job = TestJob(id=job_id, input=job_input)

job_results["id"] = job.id
if is_generator(self.config["handler"]):
generator_output = run_job_generator(self.config["handler"], job.__dict__)
stream_accumulator = []
async for stream_output in generator_output:
stream_accumulator.append({"output": stream_output["output"]})
else:
return jsonable_encoder({
"id": job_id,
"status": "FAILED",
"error": "Stream not supported, handler must be a generator."
})

# Reset the job ID.
job_list.remove_job(job.id)

return jsonable_encoder(job_results)
return jsonable_encoder({
"id": job_id,
"status": "COMPLETED",
"stream": stream_accumulator
})

# ---------------------------------- status ---------------------------------- #
async def _sim_status(self, job_id: str) -> JobOutput:
""" Development endpoint to simulate status behavior. """
job_input = job_list.get_job_input(job_id)
if job_input is None:
return jsonable_encoder({
"id": job_id,
"status": "FAILED",
"error": "Job ID not found"
})

job = TestJob(id=job_id, input=job_input)

if is_generator(self.config["handler"]):
generator_output = run_job_generator(self.config["handler"], job.__dict__)
job_output = {"output": []}
async for stream_output in generator_output:
job_output['output'].append(stream_output["output"])
else:
job_output = await run_job(self.config["handler"], job.__dict__)

job_list.remove_job(job.id)

return jsonable_encoder({
"id": job_id,
"status": "COMPLETED",
"output": job_output['output']
})
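Note that the `/stream/{job_id}` simulation only produces output when the handler is a generator; otherwise it returns the "Stream not supported" error above. A minimal generator handler sketch in the same style as `simple_handler.py` that would exercise `/run` followed by `/stream/{job_id}` (the file name and input shape are illustrative):

```python
""" Generator handler sketch for the local test server.

To serve it locally, run:
    python generator_handler.py --rp_serve_api
"""

import runpod


def handler(job):
    """ Yields output partials, which /stream aggregates once the job completes. """
    for word in str(job["input"].get("prompt", "")).split():
        yield word


runpod.serverless.start({"handler": handler})
```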
6 changes: 3 additions & 3 deletions runpod/serverless/modules/rp_job.py
@@ -4,7 +4,7 @@
# pylint: disable=too-many-branches

import inspect
from typing import Any, Callable, Dict, Generator, Optional, Union
from typing import Any, Callable, Dict, Optional, Union, AsyncGenerator

import os
import json
@@ -179,9 +179,9 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:

async def run_job_generator(
handler: Callable,
job: Dict[str, Any]) -> Generator[Dict[str, Union[str, Any]], None, None]:
job: Dict[str, Any]) -> AsyncGenerator[Dict[str, Union[str, Any]], None]:
'''
Run generator job.
Run generator job used to stream output.
Yields output partials from the generator.
'''
try:
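For reference, the corrected `AsyncGenerator` annotation matches how the partials are consumed in `rp_fastapi.py`: each yielded item is a dict carrying an `output` key. A hedged sketch of consuming this internal API directly, assuming a generator handler like the one sketched above:

```python
# Illustrative consumption of run_job_generator (internal API); the handler and
# job payload here are assumptions.
import asyncio

from runpod.serverless.modules.rp_job import run_job_generator


async def collect_partials(handler, job):
    """ Gather the streamed partials into a single list. """
    partials = []
    async for stream_output in run_job_generator(handler, job):
        partials.append(stream_output["output"])
    return partials

# Example:
# asyncio.run(collect_partials(handler, {"id": "test-1", "input": {"prompt": "a b c"}}))
```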
42 changes: 35 additions & 7 deletions runpod/serverless/modules/worker_state.py
@@ -5,6 +5,7 @@
import os
import uuid
import time
from typing import Optional, Dict, Any, Union

REF_COUNT_ZERO = time.perf_counter() # Used for benchmarking with the debugger.

@@ -22,6 +23,25 @@ def get_auth_header():
return {"Authorization": f"{os.environ.get('RUNPOD_AI_API_KEY')}"}


# ------------------------------- Job Tracking ------------------------------- #
class Job:
""" Represents a job. """

def __init__(self, job_id: str, job_input: Optional[Dict[str, Any]] = None) -> None:
self.job_id = job_id
self.job_input = job_input

def __eq__(self, other: object) -> bool:
if isinstance(other, Job):
return self.job_id == other.job_id
return False

def __hash__(self) -> int:
return hash(self.job_id)

def __str__(self) -> str:
return self.job_id


class Jobs:
''' Track the state of current jobs.'''
@@ -35,23 +55,31 @@ def __new__(cls):
Jobs._instance.jobs = set()
return Jobs._instance

def add_job(self, job_id):
def add_job(self, job_id, job_input=None):
'''
Adds a job to the list of jobs.
'''
self.jobs.add(job_id)
self.jobs.add(Job(job_id, job_input))

def remove_job(self, job_id):
'''
Removes a job from the list of jobs.
'''
self.jobs.remove(job_id)
self.jobs.remove(Job(job_id))

def get_job_input(self, job_id) -> Optional[Union[dict, list, str, int, float, bool]]:
'''
Returns the job with the given id.
Used within rp_fastapi.py for local testing.
'''
for job in self.jobs:
if job.job_id == job_id:
return job.job_input

return None

def get_job_list(self):
'''
Returns the list of jobs as a string.
'''
if len(self.jobs) == 0:
return None

return ','.join(list(self.jobs))
return ','.join(str(job) for job in self.jobs) if self.jobs else None
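The changelog's "internal job tracker" entry refers to this `Jobs` singleton: `add_job` now stores the job input alongside the id so the simulated `/stream` and `/status` endpoints can replay it. A small usage sketch, for illustration only:

```python
# Illustrative use of the job tracker added in this PR.
from runpod.serverless.modules.worker_state import Jobs

jobs = Jobs()                                # Jobs is a singleton
jobs.add_job("test-123", {"name": "World"})  # input is stored with the job id
print(jobs.get_job_input("test-123"))        # {'name': 'World'}
print(jobs.get_job_list())                   # 'test-123'
jobs.remove_job("test-123")                  # matched by id via Job.__eq__/__hash__
```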