Merged
28 commits
58c1281
Initial code for /evaluate endpoint
AnuradhaKaruppiah Apr 9, 2025
9ce6648
Use the existing session manager for running the workflow.
AnuradhaKaruppiah Apr 9, 2025
18c0ed4
Add an evaluate status endpoint to query the status of the job
AnuradhaKaruppiah Apr 9, 2025
47aaf1d
Update the job status to a pydantic model
AnuradhaKaruppiah Apr 9, 2025
6c1d508
Add an endpoint to query the last created evaluation job
AnuradhaKaruppiah Apr 9, 2025
943a7ee
Miscellaneous fixes
AnuradhaKaruppiah Apr 10, 2025
1da4dca
Rename functions to provide a better short description
AnuradhaKaruppiah Apr 10, 2025
4fcc2c8
Rename routes and descriptions for clarity
AnuradhaKaruppiah Apr 10, 2025
6f9fadf
Add a lock to prevent more than one evaluation running at a time
AnuradhaKaruppiah Apr 10, 2025
17e5946
Default EvaluationRunConfig to match CLI defaults
AnuradhaKaruppiah Apr 10, 2025
2675850
Fix issue with user specified job-id handling
AnuradhaKaruppiah Apr 10, 2025
9253e07
Allow configurable repetitions for the evaluate endpoint
AnuradhaKaruppiah Apr 10, 2025
4770837
Add a mechanism to expire jobs
AnuradhaKaruppiah Apr 10, 2025
fd1b2fa
Display the time at which a job is expected to expire
AnuradhaKaruppiah Apr 10, 2025
e29b396
If a job already exists return its status
AnuradhaKaruppiah Apr 10, 2025
8e55812
Fix filling expires_at
AnuradhaKaruppiah Apr 10, 2025
c022da3
Unit tests for the evaluate endpoint
AnuradhaKaruppiah Apr 10, 2025
5a87b80
Fix cleanup handling
AnuradhaKaruppiah Apr 10, 2025
1af79bd
Fix output dir handling for output customization
AnuradhaKaruppiah Apr 11, 2025
1c3b29f
Create cleanup task on first evaluation job
AnuradhaKaruppiah Apr 11, 2025
9aa8009
Add documentation for the evaluation endpoint
AnuradhaKaruppiah Apr 11, 2025
7a7cb8d
Fix mermaid block
AnuradhaKaruppiah Apr 11, 2025
30907ef
Reduce label size
AnuradhaKaruppiah Apr 11, 2025
1a0ad4a
Fix typos in the documentation
AnuradhaKaruppiah Apr 11, 2025
d8479c6
Rename evaluate_api.md to evaluate-api.md for consistency
AnuradhaKaruppiah Apr 11, 2025
218dc07
Fix unit test breakage because of adding an arg to the custom scripts
AnuradhaKaruppiah Apr 11, 2025
2c31979
Fix mermaid usage in docs to work with sphinx
AnuradhaKaruppiah Apr 11, 2025
da3be73
Add sphinx-mermaid to allow mermaid flow diagrams
AnuradhaKaruppiah Apr 11, 2025
7 changes: 4 additions & 3 deletions src/aiq/eval/evaluate.py
@@ -223,7 +223,7 @@ async def run_evaluators(self, evaluators: dict[str, Any]):
logger.exception("An error occurred while running evaluators: %s", e, exc_info=True)
raise

async def run_and_evaluate(self) -> EvaluationRunOutput:
async def run_and_evaluate(self, session_manager: AIQSessionManager | None = None) -> EvaluationRunOutput:
"""
Run the workflow with the specified config file and evaluate the dataset
"""
@@ -267,8 +267,9 @@ async def run_and_evaluate(self) -> EvaluationRunOutput:
await self.run_workflow_remote()
else:
if not self.config.skip_workflow:
session_manager = AIQSessionManager(eval_workflow.build(),
max_concurrency=self.eval_config.general.max_concurrency)
if session_manager is None:
session_manager = AIQSessionManager(eval_workflow.build(),
max_concurrency=self.eval_config.general.max_concurrency)
await self.run_workflow_local(session_manager)

# Evaluate
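The new optional `session_manager` argument lets a caller hand in an existing `AIQSessionManager` (for example, one built by the FastAPI front end) instead of having the runner construct its own. A minimal sketch of the call, assuming `workflow` is an already-built workflow and `eval_config` is an `EvaluationRunConfig`; the `AIQSessionManager` import path is an assumption, not part of this diff:

```python
# Minimal sketch, not part of this diff: reuse an existing AIQSessionManager.
from aiq.eval.evaluate import EvaluationRun
from aiq.runtime.session import AIQSessionManager  # import path assumed


async def evaluate_with_shared_session(workflow, eval_config, max_concurrency: int = 8):
    session_manager = AIQSessionManager(workflow, max_concurrency=max_concurrency)
    runner = EvaluationRun(eval_config)
    # Passing session_manager=None (or omitting it) keeps the previous behavior:
    # the runner builds its own session manager from the evaluation workflow.
    return await runner.run_and_evaluate(session_manager=session_manager)
```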
30 changes: 30 additions & 0 deletions src/aiq/front_ends/fastapi/fastapi_front_end_config.py
@@ -15,6 +15,7 @@

import logging
import typing
from datetime import datetime

from pydantic import BaseModel
from pydantic import Field
@@ -25,6 +26,29 @@
logger = logging.getLogger(__name__)


class AIQEvaluateRequest(BaseModel):
"""Request model for the evaluate endpoint."""
config_file: str = Field(description="Path to the configuration file for evaluation")


class AIQEvaluateResponse(BaseModel):
"""Response model for the evaluate endpoint."""
job_id: str = Field(description="Unique identifier for the evaluation job")
status: str = Field(description="Current status of the evaluation job")


class AIQEvaluateStatusResponse(BaseModel):
"""Response model for the evaluate status endpoint."""
job_id: str = Field(description="Unique identifier for the evaluation job")
status: str = Field(description="Current status of the evaluation job")
config_file: str = Field(description="Path to the configuration file used for evaluation")
error: str | None = Field(default=None, description="Error message if the job failed")
output_path: str | None = Field(default=None,
description="Path to the output file if the job completed successfully")
created_at: datetime = Field(description="Timestamp when the job was created")
updated_at: datetime = Field(description="Timestamp when the job was last updated")


class FastApiFrontEndConfig(FrontEndBaseConfig, name="fastapi"):
"""
A FastAPI based front end that allows an AgentIQ workflow to be served as a microservice.
@@ -92,6 +116,12 @@ class CrossOriginResourceSharing(BaseModel):
description="Executes the default AgentIQ workflow from the loaded configuration ",
)

evaluate: typing.Annotated[EndpointBase, Field(description="Endpoint for evaluating workflows.")] = EndpointBase(
method="POST",
path="/evaluate",
description="Evaluates the performance and accuracy of the workflow on a dataset",
)

endpoints: list[Endpoint] = Field(
default_factory=list,
description=(
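With the `evaluate` endpoint registered, a client submits an `AIQEvaluateRequest` and receives an `AIQEvaluateResponse`. A sketch of a client call, assuming the front end is served at `http://localhost:8000` and that the config path exists on the server (both assumptions, not part of this diff):

```python
# Hypothetical client call to POST /evaluate; host, port, and config path are illustrative.
import requests

resp = requests.post(
    "http://localhost:8000/evaluate",
    json={"config_file": "configs/eval_config.yml"},  # AIQEvaluateRequest body
    timeout=30,
)
resp.raise_for_status()
body = resp.json()  # AIQEvaluateResponse
print(body["job_id"], body["status"])  # status is "submitted" when the job is accepted
```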
150 changes: 150 additions & 0 deletions src/aiq/front_ends/fastapi/fastapi_front_end_plugin_worker.py
@@ -20,10 +20,13 @@
from abc import abstractmethod
from contextlib import asynccontextmanager
from functools import partial
from pathlib import Path

from fastapi import BackgroundTasks
from fastapi import Body
from fastapi import FastAPI
from fastapi import Response
from fastapi.exceptions import HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
@@ -34,7 +37,15 @@
from aiq.data_models.api_server import AIQChatResponseChunk
from aiq.data_models.api_server import AIQResponseIntermediateStep
from aiq.data_models.config import AIQConfig
from aiq.eval.config import EvaluationRunOutput
from aiq.eval.evaluate import EvaluationRun
from aiq.eval.evaluate import EvaluationRunConfig
from aiq.front_ends.fastapi.fastapi_front_end_config import AIQEvaluateRequest
from aiq.front_ends.fastapi.fastapi_front_end_config import AIQEvaluateResponse
from aiq.front_ends.fastapi.fastapi_front_end_config import AIQEvaluateStatusResponse
from aiq.front_ends.fastapi.fastapi_front_end_config import FastApiFrontEndConfig
from aiq.front_ends.fastapi.job_store import JobInfo
from aiq.front_ends.fastapi.job_store import JobStore
from aiq.front_ends.fastapi.response_helpers import generate_single_response
from aiq.front_ends.fastapi.response_helpers import generate_streaming_response_as_str
from aiq.front_ends.fastapi.response_helpers import generate_streaming_response_raw_as_str
@@ -148,6 +159,7 @@ async def configure(self, app: FastAPI, builder: WorkflowBuilder):
async def add_routes(self, app: FastAPI, builder: WorkflowBuilder):

await self.add_default_route(app, AIQSessionManager(builder.build()))
await self.add_evaluate_route(app, AIQSessionManager(builder.build()))

for ep in self.front_end_config.endpoints:

@@ -159,6 +171,144 @@ async def add_default_route(self, app: FastAPI, session_manager: AIQSessionManag

await self.add_route(app, self.front_end_config.workflow, session_manager)

async def add_evaluate_route(self, app: FastAPI, session_manager: AIQSessionManager):
"""Add the evaluate endpoint to the FastAPI app."""

response_500 = {
"description": "Internal Server Error",
"content": {
"application/json": {
"example": {
"detail": "Internal server error occurred"
}
}
},
}

# Create job store for tracking evaluation jobs
job_store = JobStore()

async def run_evaluation(job_id: str, config_file: str, session_manager: AIQSessionManager):
"""Background task to run the evaluation."""
try:
# Create EvaluationRunConfig using the CLI defaults
eval_config = EvaluationRunConfig(config_file=Path(config_file),
dataset=None,
result_json_path="$",
skip_workflow=False,
skip_completed_entries=False,
endpoint=None,
endpoint_timeout=300,
reps=1)

# Create a new EvaluationRun with the evaluation-specific config
job_store.update_status(job_id, "running")
eval_runner = EvaluationRun(eval_config)
output: EvaluationRunOutput = await eval_runner.run_and_evaluate(session_manager=session_manager)
if output.workflow_interrupted:
job_store.update_status(job_id, "interrupted")
else:
job_store.update_status(job_id, "success", output_path=output.workflow_output_file)
except Exception as e:
logger.error(f"Error in evaluation job {job_id}: {str(e)}")
job_store.update_status(job_id, "failure", error=str(e))

async def start_evaluation(request: AIQEvaluateRequest, background_tasks: BackgroundTasks):
"""Handle evaluation requests."""
job_id = job_store.create_job(request.config_file)
background_tasks.add_task(run_evaluation, job_id, request.config_file, session_manager)
return AIQEvaluateResponse(job_id=job_id, status="submitted")

def translate_job_to_response(job: JobInfo) -> AIQEvaluateStatusResponse:
"""Translate a JobInfo object to an AIQEvaluateStatusResponse."""
return AIQEvaluateStatusResponse(job_id=job.job_id,
status=job.status,
config_file=str(job.config_file),
error=job.error,
output_path=str(job.output_path) if job.output_path else None,
created_at=job.created_at,
updated_at=job.updated_at)

def get_job_status(job_id: str) -> AIQEvaluateStatusResponse:
"""Get the status of an evaluation job."""
logger.info(f"Getting status for job {job_id}")
job = job_store.get_job(job_id)
if not job:
logger.warning(f"Job {job_id} not found")
raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
logger.info(f"Found job {job_id} with status {job.status}")
return translate_job_to_response(job)

def get_last_job_status() -> AIQEvaluateStatusResponse:
"""Get the status of the last created evaluation job."""
logger.info("Getting last job status")
job = job_store.get_last_job()
if not job:
logger.warning("No jobs found when requesting last job status")
raise HTTPException(status_code=404, detail="No jobs found")
logger.info(f"Found last job {job.job_id} with status {job.status}")
return translate_job_to_response(job)

def get_jobs(status: str | None = None) -> list[AIQEvaluateStatusResponse]:
"""Get all jobs, optionally filtered by status."""
if status is None:
logger.info("Getting all jobs")
jobs = job_store.get_all_jobs()
else:
logger.info(f"Getting jobs with status {status}")
jobs = job_store.get_jobs_by_status(status)
logger.info(f"Found {len(jobs)} jobs")
return [translate_job_to_response(job) for job in jobs]

if self.front_end_config.evaluate.path:
# Add last job endpoint first (most specific)
app.add_api_route(
path=f"{self.front_end_config.evaluate.path}/job/last",
endpoint=get_last_job_status,
methods=["GET"],
response_model=AIQEvaluateStatusResponse,
description="Get the status of the last created evaluation job",
responses={
404: {
"description": "No jobs found"
}, 500: response_500
},
)

# Add jobs endpoint with optional status query parameter
app.add_api_route(
path=f"{self.front_end_config.evaluate.path}/jobs",
endpoint=get_jobs,
methods=["GET"],
response_model=list[AIQEvaluateStatusResponse],
description="Get all jobs, optionally filtered by status",
responses={500: response_500},
)

# Add specific job endpoint (least specific)
app.add_api_route(
path=f"{self.front_end_config.evaluate.path}/job/{{job_id}}",
endpoint=get_job_status,
methods=["GET"],
response_model=AIQEvaluateStatusResponse,
description="Get the status of an evaluation job",
responses={
404: {
"description": "Job not found"
}, 500: response_500
},
)

# Add HTTP endpoint for evaluation
app.add_api_route(
path=self.front_end_config.evaluate.path,
endpoint=start_evaluation,
methods=[self.front_end_config.evaluate.method],
response_model=AIQEvaluateResponse,
description=self.front_end_config.evaluate.description,
responses={500: response_500},
)

async def add_route(self,
app: FastAPI,
endpoint: FastApiFrontEndConfig.EndpointBase,
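The status routes registered above can be polled until a job reaches a terminal state. A sketch under the same assumptions as before (the base URL is illustrative; the route paths come from `front_end_config.evaluate.path`):

```python
# Hypothetical polling client for the status routes; base URL is an assumption.
import time

import requests

BASE = "http://localhost:8000/evaluate"


def wait_for_job(job_id: str, poll_seconds: float = 5.0) -> dict:
    """Poll GET {BASE}/job/{job_id} until the job reaches a terminal status."""
    while True:
        status = requests.get(f"{BASE}/job/{job_id}", timeout=10).json()  # AIQEvaluateStatusResponse
        if status["status"] in ("success", "failure", "interrupted"):
            return status
        time.sleep(poll_seconds)


# Other routes registered by add_evaluate_route:
#   GET {BASE}/job/last        -> status of the most recently created job
#   GET {BASE}/jobs?status=... -> all jobs, optionally filtered by status
```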
99 changes: 99 additions & 0 deletions src/aiq/front_ends/fastapi/job_store.py
@@ -0,0 +1,99 @@
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from datetime import datetime
from enum import Enum
from uuid import uuid4

from pydantic import BaseModel

logger = logging.getLogger(__name__)


class JobStatus(str, Enum):
SUBMITTED = "submitted"
RUNNING = "running"
SUCCESS = "success"
FAILURE = "failure"
INTERRUPTED = "interrupted"
NOT_FOUND = "not_found"


# pydantic model for the job status
class JobInfo(BaseModel):
job_id: str
status: JobStatus
config_file: str
error: str | None
output_path: str | None
created_at: datetime
updated_at: datetime


class JobStore:

def __init__(self):
self._jobs = {}

def create_job(self, config_file: str) -> str:
job_id = str(uuid4())
job = JobInfo(job_id=job_id,
status=JobStatus.SUBMITTED,
config_file=config_file,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
error=None,
output_path=None)
self._jobs[job_id] = job
logger.info(f"Created new job {job_id} with config {config_file}")
return job_id

def update_status(self, job_id: str, status: str, error: str | None = None, output_path: str | None = None):
if job_id not in self._jobs:
raise ValueError(f"Job {job_id} not found")

job = self._jobs[job_id]
job.status = status
job.error = error
job.output_path = output_path
job.updated_at = datetime.utcnow()

def get_status(self, job_id: str) -> JobInfo | None:
return self._jobs.get(job_id)

def list_jobs(self):
return self._jobs

def get_job(self, job_id: str) -> JobInfo | None:
"""Get a job by its ID."""
return self._jobs.get(job_id)

def get_last_job(self) -> JobInfo | None:
"""Get the last created job."""
if not self._jobs:
logger.info("No jobs found in job store")
return None
last_job = max(self._jobs.values(), key=lambda job: job.created_at)
logger.info(f"Retrieved last job {last_job.job_id} created at {last_job.created_at}")
return last_job

def get_jobs_by_status(self, status: str) -> list[JobInfo]:
"""Get all jobs with the specified status."""
return [job for job in self._jobs.values() if job.status == status]

def get_all_jobs(self) -> list[JobInfo]:
"""Get all jobs in the store."""
return list(self._jobs.values())
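For reference, a short usage sketch of the in-memory `JobStore` (the config and output paths below are illustrative):

```python
# Usage sketch of JobStore; paths are illustrative, everything else comes from job_store.py.
from aiq.front_ends.fastapi.job_store import JobStore

store = JobStore()
job_id = store.create_job("configs/eval_config.yml")  # new jobs start as "submitted"
store.update_status(job_id, "running")
store.update_status(job_id, "success", output_path="/tmp/eval/workflow_output.json")

job = store.get_job(job_id)
assert job is not None and job.status == "success"
assert store.get_last_job().job_id == job_id  # most recently created job
print([j.job_id for j in store.get_jobs_by_status("success")])
```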