diff --git a/docs/source/conf.py b/docs/source/conf.py index 282e582e8..df758a6f8 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -83,7 +83,8 @@ 'sphinx_copybutton', 'sphinx.ext.doctest', 'sphinx.ext.graphviz', - 'sphinx.ext.intersphinx' + 'sphinx.ext.intersphinx', + "sphinxmermaid" ] autoapi_dirs = [API_TREE] diff --git a/docs/source/guides/evaluate-api.md b/docs/source/guides/evaluate-api.md new file mode 100644 index 000000000..b62f48e3c --- /dev/null +++ b/docs/source/guides/evaluate-api.md @@ -0,0 +1,153 @@ + + +# Evaluate API Endpoints +The evaluation endpoint can be used to start evaluation jobs on a remote AgentIQ server. + +## Evaluation Endpoint Overview +```{mermaid} +graph TD + A["POST /evaluate"] --> B["Background Job Created"] + B --> C["GET /evaluate/job/{job_id}"] + B --> D["GET /evaluate/job/last"] + B --> E["GET /evaluate/jobs"] +``` + +## Evaluate Request and Response +The /evaluate endpoint allows you to start an evaluation job. The request is stored for background processing, and the server returns a job ID for tracking the job status. + +### Evaluate Request +- **Route**: `/evaluate` +- **Method**: `POST` +- **Description**: Start evaluation. Evaluates the performance and accuracy of the workflow on a dataset. +- HTTP Request Example: +```bash +curl --request POST \ + --url http://localhost:8000/evaluate \ + --header 'Content-Type: application/json' \ + --data '{ + "config_file": "examples/simple/configs/eval_config.yml", + "expiry_seconds": 600 +}' | jq +``` +You can optionally pipe the output to `jq` for response formatting. + +### Evaluate Request Format +`AIQEvaluateRequest`: +- `config_file`: Path to the evaluation configuration file on the remote server. +- `job_id`: Unique identifier for the evaluation job. If not provided, a new job ID is generated. +- `reps`: Number of repetitions for the evaluation. Defaults to 1. +- `expiry_seconds`: Optional time (in seconds) before the job expires. This is clamped between 600 (10 min) and 86400 (24h). Defaults to 3600 seconds (1 hour). + +### Evaluate Response +The evaluation request is stored as a background job in the server and the endpoint returns a job ID and status. Sample response: +```json +{ + "job_id": "882317f0-6149-4b29-872b-9c8018d64784", + "status": "submitted" +} +``` + +### Evaluate Response Format +`AIQEvaluateResponse`: +- `job_id`: Unique identifier for the evaluation job. +- `status`: Status of the evaluation job. Possible values are: +**Possible `status` values**: +- `submitted` – The job has been submitted and is waiting to be processed. +- `running` – The job is currently being processed. +- `success` – The job has completed successfully. +- `failure` – The job has failed. +- `interrupted` – The job was interrupted before completion. +- `not_found` – The job ID was not found. + + +## Evaluate Job Status +### Job Status by ID +A submitted job's status can be checked using the job ID. The status endpoint is defined as follows: +- **Route**: `/evaluate/job/{job_id}` +- **Method**: `GET` +- **Description**: Get the status of a submitted evaluation job using the job ID. +- HTTP Request Example: +```bash +curl --request GET \ + --url http://localhost:8000/evaluate/job/882317f0-6149-4b29-872b-9c8018d64784 | jq +``` + +### Evaluate Job Status Response +The response contains the status of the job, including the job ID, status, and any error messages if applicable. Sample response: +```json +{ + "job_id": "882317f0-6149-4b29-872b-9c8018d64784", + "status": "success", + "config_file": "examples/simple/configs/eval_config.yml", + "error": null, + "output_path": ".tmp/aiq/examples/simple/jobs/882317f0-6149-4b29-872b-9c8018d64784", + "created_at": "2025-04-11T17:33:38.018904Z", + "updated_at": "2025-04-11T17:34:40.359080Z", + "expires_at": "2025-04-11T17:44:40.359080Z" +} +``` + +### Job Status: Last Submitted Job +The last job status can be checked using the following endpoint: +- **Route**: `/evaluate/job/last` +- **Method**: `GET` +- **Description**: Get the status of the last submitted evaluation job. +- HTTP Request Example: +```bash +curl --request GET \ + --url http://localhost:8000/evaluate/job/last | jq +``` + +### Status of all jobs +The status of all jobs can be checked using the following endpoint: +- **Route**: `/evaluate/jobs` +- **Method**: `GET` +- **Description**: Get the status of all submitted evaluation jobs. +- HTTP Request Example: +```bash +curl --request GET \ + --url http://localhost:8000/evaluate/jobs | jq +``` + +#### Sample Response +```bash +[ + { + "job_id": "df6fddd7-2adf-45dd-a105-8559a7569ec9", + "status": "success", + "config_file": "examples/simple/configs/eval_config.yml", + "error": null, + "output_path": ".tmp/aiq/examples/simple/jobs/df6fddd7-2adf-45dd-a105-8559a7569ec9", + "created_at": "2025-04-11T17:33:16.711636Z", + "updated_at": "2025-04-11T17:34:24.753742Z", + "expires_at": "2025-04-11T17:44:24.753742Z" + }, + ... +] +``` + +## Output Storage +A separate output directory is created for each job. The output directory contains the evaluation results, including the evaluation metrics and any generated files. The `jobs/{job-id}` is appended to the `eval.general.output.dir` configuration parameter in the evaluation configuration file to maintain the results of each job. If upload to remote storage is enabled, `jobs/{job-id}` is similarly appended to the `eval.general.output.remote_dir` configuration parameter in the evaluation configuration file. + +### Output Directory Cleanup +As the results are maintained per-job, output directory cleanup is recommended. This can be done by enabling `eval.general.output.cleanup` in the evaluation configuration file. If this configuration is enabled, the server removes the entire contents of the output directory at the start of each job. This way only the last job's results are kept in the output directory. + +### Job Expiry +You can also configure the expiry timer per-job using the `expiry_seconds` parameter in the `AIQEvaluateRequest`. The server will automatically clean up expired jobs based on this timer. The default expiry value is 3600 seconds (1 hour). The expiration time is clamped between 600 (10 min) and 86400 (24h). + +This cleanup includes both the job metadata and the contents of the output directory. The most recently finished job is always preserved, even if expired. Similarly, active jobs, `["submitted", "running"]`, are exempt from cleanup. diff --git a/docs/source/guides/evaluate.md b/docs/source/guides/evaluate.md index fe93a6666..5c61fc271 100644 --- a/docs/source/guides/evaluate.md +++ b/docs/source/guides/evaluate.md @@ -225,6 +225,10 @@ Run the evaluation with the `--endpoint` flag and the configuration file with th aiq eval --config_file=examples/simple/configs/eval_config.yml --endpoint http://localhost:8000 ``` +## Evaluation Endpoint +You can also evaluate workflows via the AgentIQ evaluation endpoint. The evaluation endpoint is a REST API that allows you to evaluate workflows using the same configuration file as the `aiq eval` command. The evaluation endpoint is available at `/evaluate` on the AgentIQ server. For more information, refer to the [AgentIQ Evaluation Endpoint](./evaluate-api.md) documentation. + + ## Adding Custom Evaluators You can add custom evaluators to evaluate the workflow output. To add a custom evaluator, you need to implement the evaluator and register it with the AgentIQ evaluator system. See the [Custom Evaluator](custom-evaluator.md) documentation for more information. diff --git a/docs/source/guides/index.md b/docs/source/guides/index.md index 8aad74c1a..2abf8c06a 100644 --- a/docs/source/guides/index.md +++ b/docs/source/guides/index.md @@ -24,6 +24,7 @@ Create and Customize Workflows <./create-customize-workflows.md> Share Components <./sharing-workflows-and-tools.md> Evaluate <./evaluate.md> Add Custom Evaluators <./custom-evaluator.md> +Evaluation Endpoints <./evaluate-api.md> ./observe-workflow-with-phoenix.md Use User Interface and API Server <./using-agentiq-ui-and-server.md> Profile a Workflow <./profiler.md> diff --git a/docs/source/guides/using-agentiq-ui-and-server.md b/docs/source/guides/using-agentiq-ui-and-server.md index 05c4e952a..ffe80f7d2 100644 --- a/docs/source/guides/using-agentiq-ui-and-server.md +++ b/docs/source/guides/using-agentiq-ui-and-server.md @@ -256,6 +256,9 @@ result back to the client. The transaction schema is defined by the workflow. } ``` +## Evaluation Endpoint +You can also evaluate workflows via the AgentIQ `evaluate` endpoint. For more information, refer to the [AgentIQ Evaluation Endpoint](./evaluate-api.md) documentation. + ### Choosing between Streaming and Non-Streaming Use streaming if you need real-time updates or live communication where users expect immediate feedback. Use non-streaming if your workflow responds with simple updates and less feedback is needed. diff --git a/examples/simple/src/aiq_simple/configs/eval_config.yml b/examples/simple/src/aiq_simple/configs/eval_config.yml index 53433d061..93f3f59f8 100644 --- a/examples/simple/src/aiq_simple/configs/eval_config.yml +++ b/examples/simple/src/aiq_simple/configs/eval_config.yml @@ -58,7 +58,9 @@ workflow: eval: general: - output_dir: ./.tmp/aiq/examples/simple/ + output: + dir: ./.tmp/aiq/examples/simple/ + cleanup: true dataset: _type: json file_path: examples/simple/data/langsmith.json diff --git a/examples/simple/src/aiq_simple/configs/eval_upload_config.yml b/examples/simple/src/aiq_simple/configs/eval_upload_config.yml index e0cf40622..3979eeecc 100644 --- a/examples/simple/src/aiq_simple/configs/eval_upload_config.yml +++ b/examples/simple/src/aiq_simple/configs/eval_upload_config.yml @@ -65,16 +65,16 @@ eval: general: output: dir: ./.tmp/aiq/examples/simple_output/ + remote_dir: output # Whether to cleanup the output directory before running the workflow cleanup: true custom_scripts: convert_workflow_to_csv: script: examples/simple/src/aiq_simple/scripts/workflow_to_csv.py kwargs: - input: ./.tmp/aiq/examples/simple_output/workflow_output.json - output: ./.tmp/aiq/examples/simple_output/workflow.csv - # Upload contents of output directory to remote storage using S3 credentials - remote_dir: output + # input and output files here are relative to the output dir + input: workflow_output.json + output: workflow.csv s3: endpoint_url: http://10.185.X.X:9000 bucket: aiq-simple-bucket @@ -82,14 +82,13 @@ eval: secret_key: fake-secret-key dataset: _type: json - # Download dataset from remote storage using S3 credentials remote_file_path: input/langsmith.json file_path: ./.tmp/aiq/examples/simple_input/langsmith.json s3: endpoint_url: http://10.185.X.X:9000 bucket: aiq-simple-bucket access_key: fake-access-key - secret_key: fake-access-key + secret_key: fake-secret-key profiler: # Compute inter query token uniqueness token_uniqueness_forecast: true diff --git a/examples/simple/src/aiq_simple/scripts/workflow_to_csv.py b/examples/simple/src/aiq_simple/scripts/workflow_to_csv.py index 9091cc242..df1ab7560 100644 --- a/examples/simple/src/aiq_simple/scripts/workflow_to_csv.py +++ b/examples/simple/src/aiq_simple/scripts/workflow_to_csv.py @@ -22,7 +22,12 @@ from pathlib import Path -def customize_workflow_json(input_path: Path, output_path: Path): +def customize_workflow_json(output_dir: Path, input_path: Path, output_path: Path): + + # input and output paths are relative to the output_dir + input_path = output_dir / input_path + output_path = output_dir / output_path + if not input_path.exists(): raise FileNotFoundError(f"{input_path} does not exist") @@ -45,11 +50,13 @@ def customize_workflow_json(input_path: Path, output_path: Path): writer.writeheader() writer.writerows(cleaned) - print(f"✅ Converted {input_path.name} to {output_path.name}") + print(f"Converted {input_path.name} to {output_path.name}") def parse_args(): parser = argparse.ArgumentParser(description="Convert workflow_output.json to workflow.csv") + # output_dir is a mandatory first argument + parser.add_argument("--output_dir", type=Path, required=True, help="Path to output directory") parser.add_argument("--input", type=Path, required=True, help="Path to workflow_output.json") parser.add_argument("--output", type=Path, required=True, help="Path to output CSV") return parser.parse_args() @@ -57,4 +64,4 @@ def parse_args(): if __name__ == "__main__": args = parse_args() - customize_workflow_json(args.input, args.output) + customize_workflow_json(args.output_dir, args.input, args.output) diff --git a/pyproject.toml b/pyproject.toml index 974823931..40ecab1f4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -139,6 +139,7 @@ docs = [ "sphinx~=8.2", "sphinx-copybutton>=0.5", "sphinx-autoapi>=3.6", + "sphinx-mermaid", "vale==3.9.5", ] diff --git a/src/aiq/eval/config.py b/src/aiq/eval/config.py index 12a3c3c47..da10cde0d 100644 --- a/src/aiq/eval/config.py +++ b/src/aiq/eval/config.py @@ -24,12 +24,12 @@ class EvaluationRunConfig(BaseModel): """ config_file: Path dataset: str | None # dataset file path can be specified in the config file - result_json_path: str - skip_workflow: bool - skip_completed_entries: bool - endpoint: str | None # only used when running the workflow remotely - endpoint_timeout: int - reps: int + result_json_path: str = "$" + skip_workflow: bool = False + skip_completed_entries: bool = False + endpoint: str | None = None # only used when running the workflow remotely + endpoint_timeout: int = 300 + reps: int = 1 class EvaluationRunOutput(BaseModel): diff --git a/src/aiq/eval/evaluate.py b/src/aiq/eval/evaluate.py index 1e9ac4ac3..4b59f8353 100644 --- a/src/aiq/eval/evaluate.py +++ b/src/aiq/eval/evaluate.py @@ -169,6 +169,7 @@ def cleanup_output_directory(self): '''Remove contents of the output directory if it exists''' if self.eval_config.general.output and self.eval_config.general.output.dir and \ self.eval_config.general.output.dir.exists(): + logger.info("Cleaning up output directory %s", self.eval_config.general.output.dir) shutil.rmtree(self.eval_config.general.output.dir) def write_output(self, dataset_handler: DatasetHandler): @@ -223,7 +224,9 @@ async def run_evaluators(self, evaluators: dict[str, Any]): logger.exception("An error occurred while running evaluators: %s", e, exc_info=True) raise - async def run_and_evaluate(self) -> EvaluationRunOutput: + async def run_and_evaluate(self, + session_manager: AIQSessionManager | None = None, + job_id: str | None = None) -> EvaluationRunOutput: """ Run the workflow with the specified config file and evaluate the dataset """ @@ -240,6 +243,13 @@ async def run_and_evaluate(self) -> EvaluationRunOutput: # Cleanup the output directory if self.eval_config.general.output and self.eval_config.general.output.cleanup: self.cleanup_output_directory() + + # If a job id is provided keep the data per-job + if job_id: + self.eval_config.general.output_dir = self.eval_config.general.output_dir / f"jobs/{job_id}" + if self.eval_config.general.output: + self.eval_config.general.output.dir = self.eval_config.general.output_dir + # Load the input dataset # For multiple datasets, one handler per dataset can be created dataset_config = self.eval_config.general.dataset # Currently only one dataset is supported @@ -267,8 +277,9 @@ async def run_and_evaluate(self) -> EvaluationRunOutput: await self.run_workflow_remote() else: if not self.config.skip_workflow: - session_manager = AIQSessionManager(eval_workflow.build(), - max_concurrency=self.eval_config.general.max_concurrency) + if session_manager is None: + session_manager = AIQSessionManager(eval_workflow.build(), + max_concurrency=self.eval_config.general.max_concurrency) await self.run_workflow_local(session_manager) # Evaluate @@ -283,7 +294,7 @@ async def run_and_evaluate(self) -> EvaluationRunOutput: # Run custom scripts and upload evaluation outputs to S3 if self.eval_config.general.output: - output_uploader = OutputUploader(self.eval_config.general.output) + output_uploader = OutputUploader(self.eval_config.general.output, job_id=job_id) output_uploader.run_custom_scripts() await output_uploader.upload_directory() diff --git a/src/aiq/eval/utils/output_uploader.py b/src/aiq/eval/utils/output_uploader.py index b7a941c33..132cac8ff 100644 --- a/src/aiq/eval/utils/output_uploader.py +++ b/src/aiq/eval/utils/output_uploader.py @@ -35,9 +35,10 @@ class OutputUploader: credentials. """ - def __init__(self, output_config: EvalOutputConfig): + def __init__(self, output_config: EvalOutputConfig, job_id: str | None = None): self.output_config = output_config self._s3_client = None + self.job_id = job_id @property def s3_config(self): @@ -63,6 +64,8 @@ async def upload_directory(self): local_dir = self.output_config.dir bucket = self.s3_config.bucket remote_prefix = self.output_config.remote_dir or "" + if self.job_id: + remote_prefix = str(Path(remote_prefix) / f"jobs/{self.job_id}") file_entries = [] for root, _, files in os.walk(local_dir): @@ -99,6 +102,7 @@ def run_custom_scripts(self): """ Run custom Python scripts defined in the EvalOutputConfig. Each script is run with its kwargs passed as command-line arguments. + The output directory is passed as the first argument. """ for script_name, script_config in self.output_config.custom_scripts.items(): script_path = script_config.script @@ -108,6 +112,9 @@ def run_custom_scripts(self): # use python interpreter args = [sys.executable, str(script_path)] + # add output directory as first keyword argument + args.append("--output_dir") + args.append(str(self.output_config.dir)) if script_config.kwargs: for key, value in script_config.kwargs.items(): args.append(f"--{key}") diff --git a/src/aiq/front_ends/fastapi/fastapi_front_end_config.py b/src/aiq/front_ends/fastapi/fastapi_front_end_config.py index b193a5bd9..33e3db582 100644 --- a/src/aiq/front_ends/fastapi/fastapi_front_end_config.py +++ b/src/aiq/front_ends/fastapi/fastapi_front_end_config.py @@ -15,6 +15,7 @@ import logging import typing +from datetime import datetime from pydantic import BaseModel from pydantic import Field @@ -25,6 +26,35 @@ logger = logging.getLogger(__name__) +class AIQEvaluateRequest(BaseModel): + """Request model for the evaluate endpoint.""" + config_file: str = Field(description="Path to the configuration file for evaluation") + job_id: str | None = Field(default=None, description="Unique identifier for the evaluation job") + reps: int = Field(default=1, description="Number of repetitions for the evaluation, defaults to 1") + expiry_seconds: int = Field( + default=3600, + description="Optional time (in seconds) before the job expires. Clamped between 600 (10 min) and 86400 (24h).") + + +class AIQEvaluateResponse(BaseModel): + """Response model for the evaluate endpoint.""" + job_id: str = Field(description="Unique identifier for the evaluation job") + status: str = Field(description="Current status of the evaluation job") + + +class AIQEvaluateStatusResponse(BaseModel): + """Response model for the evaluate status endpoint.""" + job_id: str = Field(description="Unique identifier for the evaluation job") + status: str = Field(description="Current status of the evaluation job") + config_file: str = Field(description="Path to the configuration file used for evaluation") + error: str | None = Field(default=None, description="Error message if the job failed") + output_path: str | None = Field(default=None, + description="Path to the output file if the job completed successfully") + created_at: datetime = Field(description="Timestamp when the job was created") + updated_at: datetime = Field(description="Timestamp when the job was last updated") + expires_at: datetime | None = Field(default=None, description="Timestamp when the job will expire") + + class FastApiFrontEndConfig(FrontEndBaseConfig, name="fastapi"): """ A FastAPI based front end that allows an AgentIQ workflow to be served as a microservice. @@ -92,6 +122,12 @@ class CrossOriginResourceSharing(BaseModel): description="Executes the default AgentIQ workflow from the loaded configuration ", ) + evaluate: typing.Annotated[EndpointBase, Field(description="Endpoint for evaluating workflows.")] = EndpointBase( + method="POST", + path="/evaluate", + description="Evaluates the performance and accuracy of the workflow on a dataset", + ) + endpoints: list[Endpoint] = Field( default_factory=list, description=( diff --git a/src/aiq/front_ends/fastapi/fastapi_front_end_plugin_worker.py b/src/aiq/front_ends/fastapi/fastapi_front_end_plugin_worker.py index fd3f916e9..2276f9f88 100644 --- a/src/aiq/front_ends/fastapi/fastapi_front_end_plugin_worker.py +++ b/src/aiq/front_ends/fastapi/fastapi_front_end_plugin_worker.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import logging import os import typing @@ -20,10 +21,13 @@ from abc import abstractmethod from contextlib import asynccontextmanager from functools import partial +from pathlib import Path +from fastapi import BackgroundTasks from fastapi import Body from fastapi import FastAPI from fastapi import Response +from fastapi.exceptions import HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from pydantic import BaseModel @@ -34,7 +38,15 @@ from aiq.data_models.api_server import AIQChatResponseChunk from aiq.data_models.api_server import AIQResponseIntermediateStep from aiq.data_models.config import AIQConfig +from aiq.eval.config import EvaluationRunOutput +from aiq.eval.evaluate import EvaluationRun +from aiq.eval.evaluate import EvaluationRunConfig +from aiq.front_ends.fastapi.fastapi_front_end_config import AIQEvaluateRequest +from aiq.front_ends.fastapi.fastapi_front_end_config import AIQEvaluateResponse +from aiq.front_ends.fastapi.fastapi_front_end_config import AIQEvaluateStatusResponse from aiq.front_ends.fastapi.fastapi_front_end_config import FastApiFrontEndConfig +from aiq.front_ends.fastapi.job_store import JobInfo +from aiq.front_ends.fastapi.job_store import JobStore from aiq.front_ends.fastapi.response_helpers import generate_single_response from aiq.front_ends.fastapi.response_helpers import generate_streaming_response_as_str from aiq.front_ends.fastapi.response_helpers import generate_streaming_response_raw_as_str @@ -78,6 +90,12 @@ async def lifespan(starting_app: FastAPI): yield + # If a cleanup task is running, cancel it + cleanup_task = getattr(starting_app.state, "cleanup_task", None) + if cleanup_task: + logger.info("Cancelling cleanup task") + cleanup_task.cancel() + logger.debug("Closing AgentIQ server from process %s", os.getpid()) aiq_app = FastAPI(lifespan=lifespan) @@ -148,6 +166,7 @@ async def configure(self, app: FastAPI, builder: WorkflowBuilder): async def add_routes(self, app: FastAPI, builder: WorkflowBuilder): await self.add_default_route(app, AIQSessionManager(builder.build())) + await self.add_evaluate_route(app, AIQSessionManager(builder.build())) for ep in self.front_end_config.endpoints: @@ -159,6 +178,167 @@ async def add_default_route(self, app: FastAPI, session_manager: AIQSessionManag await self.add_route(app, self.front_end_config.workflow, session_manager) + async def add_evaluate_route(self, app: FastAPI, session_manager: AIQSessionManager): + """Add the evaluate endpoint to the FastAPI app.""" + + response_500 = { + "description": "Internal Server Error", + "content": { + "application/json": { + "example": { + "detail": "Internal server error occurred" + } + } + }, + } + + # Create job store for tracking evaluation jobs + job_store = JobStore() + # Don't run multiple evaluations at the same time + evaluation_lock = asyncio.Lock() + + async def periodic_cleanup(job_store: JobStore): + while True: + try: + job_store.cleanup_expired_jobs() + logger.debug("Expired jobs cleaned up") + except Exception as e: + logger.error("Error during job cleanup: %s", str(e)) + await asyncio.sleep(300) # every 5 minutes + + def create_cleanup_task(): + # Schedule periodic cleanup of expired jobs on first job creation + if not hasattr(app.state, "cleanup_task"): + logger.info("Starting periodic cleanup task") + app.state.cleanup_task = asyncio.create_task(periodic_cleanup(job_store)) + + async def run_evaluation(job_id: str, config_file: str, reps: int, session_manager: AIQSessionManager): + """Background task to run the evaluation.""" + async with evaluation_lock: + try: + # Create EvaluationRunConfig using the CLI defaults + eval_config = EvaluationRunConfig(config_file=Path(config_file), dataset=None, reps=reps) + + # Create a new EvaluationRun with the evaluation-specific config + job_store.update_status(job_id, "running") + eval_runner = EvaluationRun(eval_config) + output: EvaluationRunOutput = await eval_runner.run_and_evaluate(session_manager=session_manager, + job_id=job_id) + if output.workflow_interrupted: + job_store.update_status(job_id, "interrupted") + else: + parent_dir = os.path.dirname( + output.workflow_output_file) if output.workflow_output_file else None + + job_store.update_status(job_id, "success", output_path=str(parent_dir)) + except Exception as e: + logger.error("Error in evaluation job %s: %s", job_id, str(e)) + job_store.update_status(job_id, "failure", error=str(e)) + + async def start_evaluation(request: AIQEvaluateRequest, background_tasks: BackgroundTasks): + """Handle evaluation requests.""" + # if job_id is present and already exists return the job info + if request.job_id: + job = job_store.get_job(request.job_id) + if job: + return AIQEvaluateResponse(job_id=job.job_id, status=job.status) + + job_id = job_store.create_job(request.config_file, request.job_id, request.expiry_seconds) + create_cleanup_task() + background_tasks.add_task(run_evaluation, job_id, request.config_file, request.reps, session_manager) + return AIQEvaluateResponse(job_id=job_id, status="submitted") + + def translate_job_to_response(job: JobInfo) -> AIQEvaluateStatusResponse: + """Translate a JobInfo object to an AIQEvaluateStatusResponse.""" + return AIQEvaluateStatusResponse(job_id=job.job_id, + status=job.status, + config_file=str(job.config_file), + error=job.error, + output_path=str(job.output_path), + created_at=job.created_at, + updated_at=job.updated_at, + expires_at=job_store.get_expires_at(job)) + + def get_job_status(job_id: str) -> AIQEvaluateStatusResponse: + """Get the status of an evaluation job.""" + logger.info("Getting status for job %s", job_id) + job = job_store.get_job(job_id) + if not job: + logger.warning("Job %s not found", job_id) + raise HTTPException(status_code=404, detail=f"Job {job_id} not found") + logger.info(f"Found job {job_id} with status {job.status}") + return translate_job_to_response(job) + + def get_last_job_status() -> AIQEvaluateStatusResponse: + """Get the status of the last created evaluation job.""" + logger.info("Getting last job status") + job = job_store.get_last_job() + if not job: + logger.warning("No jobs found when requesting last job status") + raise HTTPException(status_code=404, detail="No jobs found") + logger.info("Found last job %s with status %s", job.job_id, job.status) + return translate_job_to_response(job) + + def get_jobs(status: str | None = None) -> list[AIQEvaluateStatusResponse]: + """Get all jobs, optionally filtered by status.""" + if status is None: + logger.info("Getting all jobs") + jobs = job_store.get_all_jobs() + else: + logger.info("Getting jobs with status %s", status) + jobs = job_store.get_jobs_by_status(status) + logger.info("Found %d jobs", len(jobs)) + return [translate_job_to_response(job) for job in jobs] + + if self.front_end_config.evaluate.path: + # Add last job endpoint first (most specific) + app.add_api_route( + path=f"{self.front_end_config.evaluate.path}/job/last", + endpoint=get_last_job_status, + methods=["GET"], + response_model=AIQEvaluateStatusResponse, + description="Get the status of the last created evaluation job", + responses={ + 404: { + "description": "No jobs found" + }, 500: response_500 + }, + ) + + # Add specific job endpoint (least specific) + app.add_api_route( + path=f"{self.front_end_config.evaluate.path}/job/{{job_id}}", + endpoint=get_job_status, + methods=["GET"], + response_model=AIQEvaluateStatusResponse, + description="Get the status of an evaluation job", + responses={ + 404: { + "description": "Job not found" + }, 500: response_500 + }, + ) + + # Add jobs endpoint with optional status query parameter + app.add_api_route( + path=f"{self.front_end_config.evaluate.path}/jobs", + endpoint=get_jobs, + methods=["GET"], + response_model=list[AIQEvaluateStatusResponse], + description="Get all jobs, optionally filtered by status", + responses={500: response_500}, + ) + + # Add HTTP endpoint for evaluation + app.add_api_route( + path=self.front_end_config.evaluate.path, + endpoint=start_evaluation, + methods=[self.front_end_config.evaluate.method], + response_model=AIQEvaluateResponse, + description=self.front_end_config.evaluate.description, + responses={500: response_500}, + ) + async def add_route(self, app: FastAPI, endpoint: FastApiFrontEndConfig.EndpointBase, diff --git a/src/aiq/front_ends/fastapi/job_store.py b/src/aiq/front_ends/fastapi/job_store.py new file mode 100644 index 000000000..0a2198e55 --- /dev/null +++ b/src/aiq/front_ends/fastapi/job_store.py @@ -0,0 +1,161 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import shutil +from datetime import UTC +from datetime import datetime +from datetime import timedelta +from enum import Enum +from uuid import uuid4 + +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + + +class JobStatus(str, Enum): + SUBMITTED = "submitted" + RUNNING = "running" + SUCCESS = "success" + FAILURE = "failure" + INTERRUPTED = "interrupted" + NOT_FOUND = "not_found" + + +# pydantic model for the job status +class JobInfo(BaseModel): + job_id: str + status: JobStatus + config_file: str + error: str | None + output_path: str | None + created_at: datetime + updated_at: datetime + expiry_seconds: int + + +class JobStore: + + MIN_EXPIRY = 600 # 10 minutes + MAX_EXPIRY = 86400 # 24 hours + DEFAULT_EXPIRY = 3600 # 1 hour + + # active jobs are exempt from expiry + ACTIVE_STATUS = {"running", "submitted"} + + def __init__(self): + self._jobs = {} + + def create_job(self, config_file: str, job_id: str | None = None, expiry_seconds: int = DEFAULT_EXPIRY) -> str: + if job_id is None: + job_id = str(uuid4()) + + clamped_expiry = max(self.MIN_EXPIRY, min(expiry_seconds, self.MAX_EXPIRY)) + if expiry_seconds != clamped_expiry: + logger.info("Clamped expiry_seconds from %d to %d for job %s", expiry_seconds, clamped_expiry, job_id) + + job = JobInfo(job_id=job_id, + status=JobStatus.SUBMITTED, + config_file=config_file, + created_at=datetime.now(UTC), + updated_at=datetime.now(UTC), + error=None, + output_path=None, + expiry_seconds=clamped_expiry) + self._jobs[job_id] = job + logger.info("Created new job %s with config %s", job_id, config_file) + return job_id + + def update_status(self, job_id: str, status: str, error: str | None = None, output_path: str | None = None): + if job_id not in self._jobs: + raise ValueError(f"Job {job_id} not found") + + job = self._jobs[job_id] + job.status = status + job.error = error + job.output_path = output_path + job.updated_at = datetime.now(UTC) + + def get_status(self, job_id: str) -> JobInfo | None: + return self._jobs.get(job_id) + + def list_jobs(self): + return self._jobs + + def get_job(self, job_id: str) -> JobInfo | None: + """Get a job by its ID.""" + return self._jobs.get(job_id) + + def get_last_job(self) -> JobInfo | None: + """Get the last created job.""" + if not self._jobs: + logger.info("No jobs found in job store") + return None + last_job = max(self._jobs.values(), key=lambda job: job.created_at) + logger.info("Retrieved last job %s created at %s", last_job.job_id, last_job.created_at) + return last_job + + def get_jobs_by_status(self, status: str) -> list[JobInfo]: + """Get all jobs with the specified status.""" + return [job for job in self._jobs.values() if job.status == status] + + def get_all_jobs(self) -> list[JobInfo]: + """Get all jobs in the store.""" + return list(self._jobs.values()) + + def get_expires_at(self, job: JobInfo) -> datetime | None: + """Get the time for a job to expire.""" + if job.status in self.ACTIVE_STATUS: + return None + return job.updated_at + timedelta(seconds=job.expiry_seconds) + + def cleanup_expired_jobs(self): + """ + Cleanup expired jobs, keeping the most recent one. + Updated_at is used instead of created_at to determine the most recent job. + This is because jobs may not be processed in the order they are created. + """ + now = datetime.now(UTC) + + # Filter out active jobs + finished_jobs = {job_id: job for job_id, job in self._jobs.items() if job.status not in self.ACTIVE_STATUS} + + # Sort finished jobs by updated_at descending + sorted_finished = sorted(finished_jobs.items(), key=lambda item: item[1].updated_at, reverse=True) + + # Always keep the most recent finished job + jobs_to_check = sorted_finished[1:] + + expired_ids = [] + for job_id, job in jobs_to_check: + expires_at = self.get_expires_at(job) + if expires_at and now > expires_at: + expired_ids.append(job_id) + # cleanup output dir if present + if job.output_path: + logger.info("Cleaning up output directory for job %s at %s", job_id, job.output_path) + # If it is a file remove it + if os.path.isfile(job.output_path): + os.remove(job.output_path) + # If it is a directory remove it + elif os.path.isdir(job.output_path): + shutil.rmtree(job.output_path) + + for job_id in expired_ids: + # cleanup output dir if present + + del self._jobs[job_id] diff --git a/tests/aiq/eval/utils/test_output_uploader.py b/tests/aiq/eval/utils/test_output_uploader.py index 8a7eab302..25847629a 100644 --- a/tests/aiq/eval/utils/test_output_uploader.py +++ b/tests/aiq/eval/utils/test_output_uploader.py @@ -102,6 +102,8 @@ def test_run_custom_scripts_success(tmp_path): expected_args = [ mock.ANY, # interpreter path str(script), + "--output_dir", + str(tmp_path), "--iam", "ai" ] diff --git a/tests/aiq/front_ends/fastapi/test_evaluate_endpoints.py b/tests/aiq/front_ends/fastapi/test_evaluate_endpoints.py new file mode 100644 index 000000000..b744fcec0 --- /dev/null +++ b/tests/aiq/front_ends/fastapi/test_evaluate_endpoints.py @@ -0,0 +1,178 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +from unittest.mock import AsyncMock +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from aiq.data_models.config import AIQConfig +from aiq.front_ends.fastapi.fastapi_front_end_config import FastApiFrontEndConfig +from aiq.front_ends.fastapi.fastapi_front_end_plugin_worker import FastApiFrontEndPluginWorker + + +@pytest.fixture +def test_config(): + config = AIQConfig() + config.general.front_end = FastApiFrontEndConfig(evaluate=FastApiFrontEndConfig.EndpointBase( + path="/evaluate", method="POST", description="Test evaluate endpoint")) + return config + + +@pytest.fixture(autouse=True) +def patch_evaluation_run(): + with patch("aiq.front_ends.fastapi.fastapi_front_end_plugin_worker.EvaluationRun") as MockEvaluationRun: + mock_eval_instance = MagicMock() + mock_eval_instance.run_and_evaluate = AsyncMock( + return_value=MagicMock(workflow_interrupted=False, workflow_output_file="/fake/output/path.json")) + MockEvaluationRun.return_value = mock_eval_instance + yield MockEvaluationRun + + +@pytest.fixture +def test_client(test_config): + worker = FastApiFrontEndPluginWorker(test_config) + app = FastAPI() + worker.set_cors_config(app) + + with patch("aiq.front_ends.fastapi.fastapi_front_end_plugin_worker.AIQSessionManager") as MockSessionManager: + + # Mock session manager + mock_session = MagicMock() + MockSessionManager.return_value = mock_session + + async def setup(): + await worker.add_evaluate_route(app, session_manager=mock_session) + + asyncio.run(setup()) + + return TestClient(app) + + +def create_job(test_client, config_file="/path/to/config.yml"): + """Helper to create an evaluation job.""" + return test_client.post("/evaluate", json={"config_file": config_file}) + + +def test_create_job(test_client): + """Test creating a new evaluation job.""" + response = create_job(test_client) + assert response.status_code == 200 + data = response.json() + assert "job_id" in data + assert data["status"] == "submitted" + + +def test_get_job_status(test_client): + """Test getting the status of a specific job.""" + create_response = create_job(test_client) + job_id = create_response.json()["job_id"] + + status_response = test_client.get(f"/evaluate/job/{job_id}") + assert status_response.status_code == 200 + data = status_response.json() + assert data["job_id"] == job_id + assert data["status"] == "success" + assert data["config_file"] == "/path/to/config.yml" + + +def test_get_job_status_not_found(test_client): + """Test getting status of a non-existent job.""" + response = test_client.get("/evaluate/job/non-existent-id") + assert response.status_code == 404 + assert response.json()["detail"] == "Job non-existent-id not found" + + +def test_get_last_job(test_client): + """Test getting the last created job.""" + for i in range(3): + create_job(test_client, config_file=f"/path/to/config_{i}.yml") + + response = test_client.get("/evaluate/job/last") + assert response.status_code == 200 + data = response.json() + assert data["config_file"] == "/path/to/config_2.yml" + + +def test_get_last_job_not_found(test_client): + """Test getting last job when no jobs exist.""" + response = test_client.get("/evaluate/job/last") + assert response.status_code == 404 + assert response.json()["detail"] == "No jobs found" + + +def test_get_all_jobs(test_client): + """Test retrieving all jobs.""" + for i in range(3): + create_job(test_client, config_file=f"/path/to/config_{i}.yml") + + response = test_client.get("/evaluate/jobs") + assert response.status_code == 200 + data = response.json() + assert len(data) == 3 + + +@pytest.mark.parametrize("status,expected_count", [ + ("success", 3), + ("interrupted", 0), +]) +def test_get_jobs_by_status(test_client, status, expected_count): + """Test getting jobs filtered by status.""" + for i in range(3): + create_job(test_client, config_file=f"/path/to/config_{i}.yml") + + response = test_client.get(f"/evaluate/jobs?status={status}") + assert response.status_code == 200 + data = response.json() + assert len(data) == expected_count + if status == "submitted": + assert all(job["status"] == "submitted" for job in data) + + +def test_create_job_with_reps(test_client): + """Test creating a new evaluation job with custom repetitions.""" + response = test_client.post("/evaluate", json={"config_file": "/path/to/config.yml", "reps": 3}) + assert response.status_code == 200 + data = response.json() + assert "job_id" in data + assert data["status"] == "submitted" + + +def test_create_job_with_expiry(test_client): + """Test creating a new evaluation job with custom expiry time.""" + response = test_client.post( + "/evaluate", + json={ + "config_file": "/path/to/config.yml", + "expiry_seconds": 1800 # 30 minutes + }) + assert response.status_code == 200 + data = response.json() + assert "job_id" in data + assert data["status"] == "submitted" + + +def test_create_job_with_job_id(test_client): + """Test creating a new evaluation job with a specific job ID.""" + job_id = "test-job-123" + response = test_client.post("/evaluate", json={"config_file": "/path/to/config.yml", "job_id": job_id}) + assert response.status_code == 200 + data = response.json() + assert data["job_id"] == job_id + assert data["status"] == "submitted" diff --git a/uv.lock b/uv.lock index 1f6ff52fd..ea854ff7a 100644 --- a/uv.lock +++ b/uv.lock @@ -123,6 +123,7 @@ docs = [ { name = "sphinx" }, { name = "sphinx-autoapi" }, { name = "sphinx-copybutton" }, + { name = "sphinx-mermaid" }, { name = "vale" }, ] @@ -200,6 +201,7 @@ docs = [ { name = "sphinx", specifier = "~=8.2" }, { name = "sphinx-autoapi", specifier = ">=3.6" }, { name = "sphinx-copybutton", specifier = ">=0.5" }, + { name = "sphinx-mermaid" }, { name = "vale", specifier = "==3.9.5" }, ] @@ -5227,6 +5229,17 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9e/48/1ea60e74949eecb12cdd6ac43987f9fd331156388dcc2319b45e2ebb81bf/sphinx_copybutton-0.5.2-py3-none-any.whl", hash = "sha256:fb543fd386d917746c9a2c50360c7905b605726b9355cd26e9974857afeae06e", size = 13343 }, ] +[[package]] +name = "sphinx-mermaid" +version = "0.0.8" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "sphinx" }, +] +wheels = [ + { url = "https://files.pythonhosted.org/packages/bd/65/96cb3a4117ea2a4ead808377259659885ea0fe5e539a9f29fc1c8a723ed1/sphinx_mermaid-0.0.8-py2.py3-none-any.whl", hash = "sha256:03cbad30c04130e5644c5112b4b2da7850d142f897876ac5aea83c8b5965bf76", size = 3336 }, +] + [[package]] name = "sphinxcontrib-applehelp" version = "2.0.0"