diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/Dockerfile.api b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/Dockerfile.api
new file mode 100644
index 00000000..875cf2af
--- /dev/null
+++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/Dockerfile.api
@@ -0,0 +1,12 @@
+FROM python:3.9-slim
+
+WORKDIR /app
+
+COPY requirements_api.txt .
+RUN pip install --no-cache-dir -r requirements_api.txt
+
+COPY ./src ./src
+ENV PYTHONPATH="/app:${PYTHONPATH}"
+
+EXPOSE 8080
+CMD ["uvicorn", "src.api.app:app", "--host", "0.0.0.0", "--port", "8080"]
diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/Dockerfile.streamlit b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/Dockerfile.streamlit
new file mode 100644
index 00000000..28b58530
--- /dev/null
+++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/Dockerfile.streamlit
@@ -0,0 +1,22 @@
+FROM python:3.9-slim
+
+# Install required packages
+WORKDIR /app
+
+COPY requirements_streamlit.txt .
+RUN pip install --no-cache-dir -r requirements_streamlit.txt
+
+# Copy the source files
+COPY ./src ./src
+
+# Add /app to PYTHONPATH so that 'src' is discoverable
+ENV PYTHONPATH="/app:${PYTHONPATH}"
+
+# Set a default port; Cloud Run will override the PORT environment variable as needed
+ENV PORT 8080
+
+# Not strictly required by Cloud Run, but does no harm
+EXPOSE 8080
+
+# Use the $PORT environment variable provided by Cloud Run
+CMD ["sh", "-c", "streamlit run src/frontend/streamlit_app.py --server.port=$PORT --server.address=0.0.0.0"]
diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/README.md b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/README.md
new file mode 100644
index 00000000..75defacf
--- /dev/null
+++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/README.md
@@ -0,0 +1,120 @@
+# Agentic Accessibility Checker Project
+
+This project showcases an end-to-end system that leverages Google's cutting-edge AI tools to create an "agentic" solution for real estate accessibility analysis.
+
+It includes synthetic data generation, a cloud-based API, a sophisticated LangGraph agent powered by Gemini and deployed using the Vertex AI Reasoning Engine, and a user-friendly Streamlit interface. At its core, the project demonstrates the Vertex AI Reasoning Engine and its seamless integration with LangGraph, showing how these technologies can be combined to build powerful, autonomous agents.
+
+## Key Features
+
+* **Synthetic Data Generation:**
+    * Utilizes the generative capabilities of Gemini and Imagen 3 to produce realistic, multi-modal real estate data, including property titles, descriptions, and room images.
+    * Generates data for both accessible (wheelchair-friendly) and standard properties, allowing the agent to handle a varied environment.
+    * Focuses on capturing realistic nuances in the descriptions that the agent can then use to understand accessibility.
+    * Stores images in Google Cloud Storage (GCS) and metadata in Firestore, creating a structured dataset for agent interactions.
+* **Cloud Run API - A Simulated Environment for the Agent:**
+    * Deploys a FastAPI-based API on Cloud Run, acting as a "real world" system for the agent to interact with.
+    * Provides endpoints for the agent to retrieve property details, enhancing its capability to gather information and make decisions.
+* **LangGraph Agent Powered by Gemini and the Reasoning Engine:**
+    * Employs Gemini 1.5 Pro as the agent's "brain", facilitating reasoning and complex decision-making capabilities.
+    * Implements a ReAct-style agent, enabling the agent to take actions and leverage tools based on the current context.
+    * Uses tools for fetching data from the API, analyzing text and images, and even drafting emails, creating a fully agentic workflow.
+    * Demonstrates the power of the Vertex AI Reasoning Engine for managed and scalable agent deployments.
+* **Streamlit Application - Agent Interaction Layer:**
+    * Offers a user-friendly interface built with Streamlit for seamless interaction with the agent.
+    * Provides both a property viewing and a chat interface, where users can engage with the agent for accessibility inquiries.
+    * Lets users experience the agent's reasoning capabilities firsthand, showing how it makes decisions and interacts with the system.
+* **Vertex AI Reasoning Engine - Seamless LangGraph Deployment:**
+    * Demonstrates the capability of the Vertex AI Reasoning Engine for deploying complex LangGraph agents in a scalable and managed way.
+    * The agent is available as a cloud resource that can be used by external client applications and other Vertex AI services.
+* **Gemini Multi-Modal Capabilities - Enhanced Agent Perception:**
+    * Leverages Gemini's ability to process both text and images, providing a robust perception layer for the agent.
+    * The agent can analyze both property descriptions and images for accessibility features and issues, showcasing its ability to leverage multi-modal data.
+
+## Getting Started
+
+Follow these steps to set up and run the project:
+
+### 1. Install Dependencies
+```bash
+pip install -r requirements.txt
+```
+This command installs all the necessary libraries specified in `requirements.txt`.
+
+### 2. Generate Synthetic Data
+```bash
+python -m src.main
+```
+This command runs the `src/main.py` script, which uses `src/data_generation/house_dataset_generator.py` to create your multi-modal real estate dataset.
+
+### 3. Deploy the API
+```bash
+./deploy_api.sh
+```
+This script builds and deploys the FastAPI application (`src/api/app.py`) to Google Cloud Run. The API provides the agent with a means to retrieve property details.
+
+### 4. Build and Deploy the Agent using the Reasoning Engine
+```bash
+python -m src.agent.agent
+```
+This command runs `src/agent/agent.py`, which creates and deploys the agent as a Vertex AI Reasoning Engine, demonstrating how easily complex agents can be managed using the Reasoning Engine.
+
+**Important:** After deployment, edit `src/config.py` and set `AGENT_NAME` to the resource name of the deployed Reasoning Engine. The name is printed to the console upon successful execution of the previous step and has the format `projects/<PROJECT_ID>/locations/<LOCATION>/reasoningEngines/<ENGINE_ID>`.
+
+### 5. Deploy the Streamlit Application
+```bash
+./deploy_streamlit.sh
+```
+This script builds and deploys the Streamlit application (`src/frontend/streamlit_app.py`) to Google Cloud Run, allowing users to interact with the deployed agent.
+
+## Configuration
+
+Key configuration settings can be found in `src/config.py`:
+
+* `PROJECT_ID`: Your Google Cloud project ID.
+* `LOCATION`: Your Google Cloud project's region (e.g., `us-central1`).
+* `GCS_BUCKET`: The name of your Google Cloud Storage bucket.
+* `FIRESTORE_COLLECTION`: The Firestore collection name for storing the property data.
+* `API_URL`: The URL of your deployed Cloud Run API.
+* `STREAMLIT_URL`: The URL of your deployed Streamlit application.
+* `STAGING_BUCKET`: The Google Cloud Storage bucket used for staging agent resources.
+* `AGENT_NAME`: The resource name of the deployed Vertex AI Reasoning Engine. This is crucial for connecting the agent to other services.
+
+Adjust these values to match your specific Google Cloud environment.
+
+## Project Structure
+
+```
+project/
+    requirements.txt           # main requirements file
+    requirements_api.txt       # requirements for the API service
+    Dockerfile.streamlit       # dockerfile for the Streamlit service
+    deploy_api.sh              # script to deploy the API
+    Dockerfile.api             # dockerfile for the API service
+    README.md                  # this file
+    .dockerignore              # files to ignore during docker builds
+    requirements_streamlit.txt # requirements for the Streamlit service
+    deploy_streamlit.sh        # script to deploy the Streamlit service
+    src/
+        config.py              # Configuration settings
+        retry.py               # Retry logic for API calls
+        main.py                # Main script to generate the dataset
+        frontend/
+            streamlit_app.py   # Streamlit application
+        agent/
+            agent.py           # LangGraph agent using Vertex AI and Gemini
+        api/
+            app.py             # FastAPI application
+        data_generation/
+            house_dataset_generator.py # Generator for synthetic data
+```
+
+## Notes
+
+* Ensure the Google Cloud SDK (gcloud) is installed, configured, and authenticated to your Google Cloud project.
+* Make sure the required Google Cloud APIs are enabled, including Cloud Run, Cloud Storage, Firestore, and Vertex AI.
+* The Reasoning Engine resource name has the format `projects/<PROJECT_ID>/locations/<LOCATION>/reasoningEngines/<ENGINE_ID>`. It is required to configure the agent properly.
+
+## Contributing
+
+This project is intended for demonstration and learning, showing an "agentic" approach to accessibility analysis and how modern AI tools can be used to build practical applications. Feel free to fork, contribute, and submit pull requests to make it even better!
\ No newline at end of file
diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/deploy_api.sh b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/deploy_api.sh
new file mode 100755
index 00000000..a317b60c
--- /dev/null
+++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/deploy_api.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# Set your GCP project and location
+PROJECT_ID=""
+LOCATION=""
+
+# Build and deploy the API image/service
+# Temporarily rename Dockerfile.api to Dockerfile
+mv Dockerfile.api Dockerfile
+
+# Build and push the image
+gcloud builds submit --tag gcr.io/${PROJECT_ID}/api-image:latest .
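+# Note: `gcloud builds submit --tag` builds from a file named `Dockerfile` in the
+# build context, hence the temporary rename above; a cloudbuild.yaml config that
+# passes `-f Dockerfile.api` to the docker build step would avoid the rename.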
+ +# Restore the original Dockerfile name +mv Dockerfile Dockerfile.api + +# Deploy the API service to Cloud Run +gcloud run deploy real-estate-api \ + --image gcr.io/${PROJECT_ID}/api-image:latest \ + --platform managed \ + --region ${LOCATION} \ + --allow-unauthenticated diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/deploy_streamlit.sh b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/deploy_streamlit.sh new file mode 100755 index 00000000..c4b1a83f --- /dev/null +++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/deploy_streamlit.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Set your GCP project and location +PROJECT_ID="" +LOCATION="" + +# 1. Build and deploy the Streamlit image/service +# Temporarily rename Dockerfile.streamlit to Dockerfile +mv Dockerfile.streamlit Dockerfile + +# Build and push the image +gcloud builds submit --tag gcr.io/${PROJECT_ID}/streamlit-image:latest . + +# Restore the original Dockerfile name +mv Dockerfile Dockerfile.streamlit + +# Deploy the Streamlit service to Cloud Run +gcloud run deploy real-estate-streamlit \ + --image gcr.io/${PROJECT_ID}/streamlit-image:latest \ + --platform managed \ + --region ${LOCATION} \ + --allow-unauthenticated \ No newline at end of file diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/requirements.txt b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/requirements.txt new file mode 100644 index 00000000..023ad661 --- /dev/null +++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/requirements.txt @@ -0,0 +1,13 @@ +fastapi==0.115.6 +uvicorn==0.32.1 +google-cloud-storage==2.19.0 +google-cloud-firestore==2.19.0 +python-multipart==0.0.19 +streamlit==1.41.0 +google-cloud-aiplatform[langchain,reasoningengine]==1.74.0 +cloudpickle==3.0.0 +pydantic==2.7.4 +langgraph==0.2.58 +langchain-google-vertexai==2.0.7 +requests==2.32.3 +pillow==11.0.0 diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/requirements_api.txt b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/requirements_api.txt new file mode 100644 index 00000000..becf2be0 --- /dev/null +++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/requirements_api.txt @@ -0,0 +1,4 @@ +fastapi==0.115.6 +uvicorn==0.32.1 +google-cloud-firestore==2.19.0 +pydantic==2.7.4 diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/requirements_streamlit.txt b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/requirements_streamlit.txt new file mode 100644 index 00000000..89e7f834 --- /dev/null +++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/requirements_streamlit.txt @@ -0,0 +1,4 @@ +streamlit==1.41.0 +google-cloud-aiplatform[reasoningengine]==1.74.0 +pillow==11.0.0 +google-cloud-storage==2.19.0 diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/agent/agent.py b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/agent/agent.py new file mode 100644 index 00000000..e2bdd099 --- /dev/null +++ 
b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/agent/agent.py @@ -0,0 +1,455 @@ +import json +from typing import Dict, List, Optional + +import requests +import vertexai +from pydantic import BaseModel, Field + +from langchain_google_vertexai import ChatVertexAI +from langchain_core.messages import HumanMessage +from langchain_core.tools import tool +from langgraph.prebuilt import create_react_agent +from vertexai.preview import reasoning_engines + +from src.config import PROJECT_ID, LOCATION, API_URL, STAGING_BUCKET + +vertexai.init(project=PROJECT_ID, location=LOCATION, staging_bucket="gs://"+STAGING_BUCKET) + +model = ChatVertexAI(model="gemini-1.5-pro-002") + +class Feature(BaseModel): + feature: str = Field(description="Name of the accessibility feature") + feature_score: float = Field(description="Score from 0-100") + confidence: float = Field(description="Confidence level from 0-100") + +class Issue(BaseModel): + issue: str = Field(description="Description of accessibility issue") + severity_score: float = Field(description="Severity score from 0-100") + confidence: float = Field(description="Confidence level from 0-100") + +class ImageAnalysis(BaseModel): + features: List[Feature] + issues: List[Issue] + +class DescriptionAnalysis(BaseModel): + features: List[Feature] + issues: List[Issue] + +class AccessibilityReport(BaseModel): + global_accessibility_score: float + description_analysis_summary: str + image_analysis_summary: str + +class EmailDraft(BaseModel): + subject: str + body: str + + +def get_property_data(property_id: str) -> Dict: + """ + Retrieve comprehensive property data from a remote API, given a unique property identifier. + + This tool makes a GET request to a backend API endpoint using the provided `property_id`. + Upon successful retrieval, it returns a dictionary containing detailed property information. + This includes the property's title, a description of it, a list of image urls, and + whether it is accessible or not. If the remote API request fails (for example, due to the property ID + not existing or network issues), an HTTP error is raised. + + Args: + property_id (str): The unique ID of the property to retrieve from the API. + + Returns: + Dict: A dictionary containing the full set of property data + """ + url = f"{API_URL}/houses/{property_id}" + resp = requests.get(url) + resp.raise_for_status() + return resp.json() + + +def analyze_description(description: str) -> DescriptionAnalysis: + """ + Analyze the accessibility aspects of a property description text, focusing on wheelchair-related features. + + This tool uses a language model to process a given textual description and identify key features + that support wheelchair accessibility, as well as potential issues or barriers. The analysis + returns structured data following the `DescriptionAnalysis` schema, which includes: + - A list of recognized wheelchair-accessibility features, each with a name, a confidence score, and + a feature score. + - A list of identified issues or accessibility barriers, each with a descriptive name, a severity score, + and a confidence level. + + Scores range from 0-100, and confidence indicates how certain the model is about the identified feature or issue. + + Args: + description (str): The property’s textual description, including details about its layout, + amenities, and environment. + + Returns: + DescriptionAnalysis: A pydantic model instance containing the analysis results with + `features` and `issues` lists. 
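+
+    Example (illustrative):
+        analysis = analyze_description("Single-level home with a ramped entrance and wide doorways")
+        print(analysis.features, analysis.issues)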
+ """ + text_message = { + "type": "text", + "text": f"""Analyze this property description for wheelchair accessibility: + {description} + + Return the results in JSON strictly following the `DescriptionAnalysis` schema: + {{ + "features": [ + {{ + "feature": "string", + "feature_score": float, + "confidence": float + }} + ], + "issues": [ + {{ + "issue": "string", + "severity_score": float, + "confidence": float + }} + ] + }} + """ + } + response = model.with_structured_output(DescriptionAnalysis).invoke([HumanMessage(content=[text_message])]) + return response + + +def analyze_images(images: List[str]) -> List[ImageAnalysis]: + """ + Assess a collection of images to determine the presence of wheelchair-accessible features and identify potential issues. + + For each image URL provided, this tool uses a vision-capable language model to analyze visual cues + related to wheelchair accessibility. It examines elements such as ramps, wide doorways, and other + accessible design characteristics. It also looks for potential barriers, like steps, narrow passages, + or obstructions. + + The results adhere to the `ImageAnalysis` schema and include: + - A list of detected accessibility features in the image, with feature names, scores, and confidence. + - A list of identified issues in the image, each described with a severity score and confidence level. + + Args: + images (List[str]): A list of image URLs representing the property’s visuals. + + Returns: + List[ImageAnalysis]: A list of `ImageAnalysis` model instances, one per input image, each containing + `features` and `issues` fields. + """ + analyses = [] + for image_url in images: + image_message = { + "type": "image_url", + "image_url": {"url": f"{image_url}"} + } + text_message = { + "type": "text", + "text": """Analyze this image for wheelchair accessibility features and issues. + + Return results in JSON strictly following the `ImageAnalysis` schema: + { + "features": [ + { + "feature": "string", + "feature_score": float, + "confidence": float + } + ], + "issues": [ + { + "issue": "string", + "severity_score": float, + "confidence": float + } + ] + } + """ + } + response = model.with_structured_output(ImageAnalysis).invoke([HumanMessage(content=[text_message, image_message])]) + analyses.append(response) + return analyses + + +def generate_accessibility_report(description_analysis: DescriptionAnalysis, image_analyses: List[ImageAnalysis]) -> AccessibilityReport: + """ + Compile a comprehensive accessibility report for a property by integrating both textual and visual analyses. + + This tool takes the results from the description analysis and image analyses, merging them into a single + summary that provides a global accessibility score, a summary of the textual description findings, and + a summary of the visual (image-based) findings. + + The `global_accessibility_score` offers an aggregated metric representing the property’s wheelchair + accessibility level. The `description_analysis_summary` and `image_analysis_summary` explain the rationale + behind the score by detailing key features and issues discovered in the text and visuals. + + Args: + description_analysis (DescriptionAnalysis): The result of analyzing the property's textual description. + image_analyses (List[ImageAnalysis]): The results of analyzing multiple images of the property. + + Returns: + AccessibilityReport: A pydantic model instance containing the integrated report, including a global score + and summaries of the description and image analyses. 
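+
+    Example (illustrative):
+        report = generate_accessibility_report(desc_analysis, image_analyses)
+        print(report.global_accessibility_score)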
+ """ + text_message = { + "type": "text", + "text": f"""Generate a comprehensive accessibility report based on these analyses: + Description Analysis: {json.dumps(description_analysis.model_dump(), indent=2)} + Image Analyses: {json.dumps([a.model_dump() for a in image_analyses], indent=2)} + + Return the results in JSON strictly following the `AccessibilityReport` schema: + {{ + "global_accessibility_score": float, + "description_analysis_summary": "string", + "image_analysis_summary": "string" + }} + """ + } + response = model.with_structured_output(AccessibilityReport).invoke([HumanMessage(content=[text_message])]) + return response + + +def draft_host_email(report: AccessibilityReport) -> Optional[EmailDraft]: + """ + Draft a professional and courteous email to the property host requesting additional accessibility details, + if the determined accessibility score meets or exceeds a certain threshold. + + This tool examines the `global_accessibility_score` from the `AccessibilityReport`. If the score is + high enough (>= 70 in this case), it constructs a polite email that asks the host for more information + to further clarify and possibly improve the wheelchair accessibility of the property. The email is + returned adhering to the `EmailDraft` schema. If the score is below the threshold, indicating that + additional details are not currently necessary, this function returns None. + + Args: + report (AccessibilityReport): The comprehensive accessibility report of the property. + + Returns: + Optional[EmailDraft]: If the global accessibility score is >= 70, returns an `EmailDraft` model + containing the email subject and body. Otherwise, returns None. + """ + if report.global_accessibility_score >= 70: + text_message = { + "type": "text", + "text": f"""Draft an email to the host asking for more accessibility details. + Given this report: {json.dumps(report.model_dump(), indent=2)} + + Return the results in JSON strictly following the `EmailDraft` schema: + {{ + "subject": "string", + "body": "string" + }} + """ + } + response = model.with_structured_output(EmailDraft).invoke([HumanMessage(content=[text_message])]) + return response + else: + return None + +@tool +def get_property(property_id: str) -> Dict: + """ + Retrieve comprehensive property data from a remote API, given a unique property identifier. + + This tool makes a GET request to a backend API endpoint using the provided `property_id`. + Upon successful retrieval, it returns a dictionary containing detailed property information. + This includes the property's title, a description of it, a list of image urls, and + whether it is accessible or not. If the remote API request fails (for example, due to the property ID + not existing or network issues), an HTTP error is raised. + + Args: + property_id (str): The unique ID of the property to retrieve from the API. + + Returns: + Dict: A dictionary containing the full set of property data + """ + return get_property_data(property_id) + +@tool +def analyze_description_tool(description: str): + """ + Analyze the accessibility aspects of a property description text, focusing on wheelchair-related features. + + This tool uses a language model to process a given textual description and identify key features + that support wheelchair accessibility, as well as potential issues or barriers. 
The analysis + returns structured data following the `DescriptionAnalysis` schema, which includes: + - A list of recognized wheelchair-accessibility features, each with a name, a confidence score, and + a feature score. + - A list of identified issues or accessibility barriers, each with a descriptive name, a severity score, + and a confidence level. + + Scores range from 0-100, and confidence indicates how certain the model is about the identified feature or issue. + + Args: + description (str): The property’s textual description, including details about its layout, + amenities, and environment. + + Returns: + DescriptionAnalysis: A pydantic model instance containing the analysis results with + `features` and `issues` lists. + """ + analysis = analyze_description(description) + return analysis.model_dump() + +@tool +def analyze_images_tool(images: List[str]): + """ + Assess a collection of images to determine the presence of wheelchair-accessible features and identify potential issues. + + For each image URL provided, this tool uses a vision-capable language model to analyze visual cues + related to wheelchair accessibility. It examines elements such as ramps, wide doorways, and other + accessible design characteristics. It also looks for potential barriers, like steps, narrow passages, + or obstructions. + + The results adhere to the `ImageAnalysis` schema and include: + - A list of detected accessibility features in the image, with feature names, scores, and confidence. + - A list of identified issues in the image, each described with a severity score and confidence level. + + Args: + images (List[str]): A list of image URLs representing the property’s visuals. + + Returns: + List[ImageAnalysis]: A list of `ImageAnalysis` model instances, one per input image, each containing + `features` and `issues` fields. + """ + analyses = analyze_images(images) + return [a.model_dump() for a in analyses] + +@tool +def generate_report_tool(description_analysis: dict, image_analyses: List[dict]): + """ + Compile a comprehensive accessibility report for a property by integrating both textual and visual analyses. + + This tool takes the results from the description analysis and image analyses, merging them into a single + summary that provides a global accessibility score, a summary of the textual description findings, and + a summary of the visual (image-based) findings. + + The `global_accessibility_score` offers an aggregated metric representing the property’s wheelchair + accessibility level. The `description_analysis_summary` and `image_analysis_summary` explain the rationale + behind the score by detailing key features and issues discovered in the text and visuals. + + Args: + description_analysis (DescriptionAnalysis): The result of analyzing the property's textual description. + image_analyses (List[ImageAnalysis]): The results of analyzing multiple images of the property. + + Returns: + AccessibilityReport: A pydantic model instance containing the integrated report, including a global score + and summaries of the description and image analyses. + """ + desc_obj = DescriptionAnalysis(**description_analysis) + img_objs = [ImageAnalysis(**ia) for ia in image_analyses] + report = generate_accessibility_report(desc_obj, img_objs) + return report.model_dump() + +@tool +def draft_email_tool(report: dict): + """ + Draft a professional and courteous email to the property host requesting additional accessibility details, + if the determined accessibility score meets or exceeds a certain threshold. 
+
+    This tool examines the `global_accessibility_score` from the `AccessibilityReport`. If the score is
+    high enough (>= 70 in this case), it constructs a polite email that asks the host for more information
+    to further clarify and possibly improve the wheelchair accessibility of the property. The email is
+    returned adhering to the `EmailDraft` schema. If the score is below the threshold, indicating that
+    additional details are not currently necessary, this function returns None.
+
+    Args:
+        report (AccessibilityReport): The comprehensive accessibility report of the property.
+
+    Returns:
+        Optional[EmailDraft]: If the global accessibility score is >= 70, returns an `EmailDraft` model
+        containing the email subject and body. Otherwise, returns None.
+    """
+    report_obj = AccessibilityReport(**report)
+    email = draft_host_email(report_obj)
+    if email:
+        return email.model_dump()
+    else:
+        return {"message": "Global accessibility score below 70, not drafting email."}
+
+tools = [
+    get_property,
+    analyze_description_tool,
+    analyze_images_tool,
+    generate_report_tool,
+    draft_email_tool,
+]
+
+class PropertyAccessibilityApp:
+    def __init__(self, project: str, location: str, staging_bucket: str):
+        self.project_id = project
+        self.location = location
+        self.staging_bucket = staging_bucket
+        vertexai.init(project=self.project_id, location=self.location, staging_bucket="gs://"+self.staging_bucket)
+
+    def set_up(self) -> None:
+        prompt = """
+        You are an AI agent specializing in assessing wheelchair accessibility for properties. Your goal is to provide
+        accurate and concise answers based on a user's query by invoking the appropriate tool. You have access to the
+        following tools:
+
+        1. **get_property(property_id: str)**: Fetches detailed property information, including title, description,
+        images, and accessibility status, using the provided property ID.
+
+        2. **analyze_description_tool(description: str)**: Analyzes the property's textual description to extract accessibility
+        features and issues specifically related to wheelchair users.
+
+        3. **analyze_images_tool(images: List[str])**: Evaluates images to detect accessibility features (e.g., ramps, wide
+        doorways) or barriers (e.g., steps, obstructions).
+
+        4. **generate_report_tool(description_analysis: dict, image_analyses: List[dict])**:
+        Generates a comprehensive report summarizing the accessibility aspects of the property, combining text and image
+        analyses. (Note: This tool requires the outputs from `analyze_description_tool` and `analyze_images_tool` as inputs.)
+
+        5. **draft_email_tool(report: dict)**: Drafts a professional email to the property host requesting
+        additional accessibility details if the global accessibility score is high enough. (Note: This tool requires the
+        output from `generate_report_tool`.)
+
+        Key Points:
+        - For every query, determine if a tool needs to be invoked. If so, use **only one** tool per query.
+        - The selected tool should directly address the user's request.
+        - Some tools depend on the outputs of other tools. For example:
+            - `generate_report_tool` needs results from `analyze_description_tool` and `analyze_images_tool`.
+            - `draft_email_tool` needs the `AccessibilityReport` from `generate_report_tool`.
+        - Plan your actions accordingly and guide the user through the process if multiple steps are needed.
+        - If the query does not require a tool, respond directly without invoking any tools.
+        - Ensure clear and structured communication of results to the user.
+        - Make sure to output the result of the tool to the user in a pretty JSON format.
+
+        Examples:
+        - Query: "Can you analyze the description for property ID 12345?"
+          Action: First, use `get_property` to fetch the description. Then, with that description in hand, use
+          `analyze_description_tool` on the description. Since only one tool call is allowed per query, you might need to
+          guide the user step by step:
+            1) On the user's first request, call `get_property`. Make sure to show the data to the user in a pretty JSON format.
+            2) On the user's next request, take the returned description and call `analyze_description_tool`.
+
+        - Query: "What does the global accessibility report say for this property?"
+          Action: You would need the outputs from both `analyze_description_tool` and `analyze_images_tool`. If you do not have
+          them yet, try to find them in the conversation history. Even if they are not in the right format, format them
+          accordingly. Otherwise instruct the user to get them first. Once both are available, call `generate_report_tool`.
+
+        Stay focused and efficient in tool usage while ensuring clarity and helpfulness in responses.
+        """
+        self.agent = create_react_agent(model, tools=tools, state_modifier=prompt)
+
+    def query(self, conversation: List[Dict[str, str]]) -> str:
+        result = self.agent.invoke({"messages": conversation})
+        last_msg = result["messages"][-1]
+        return last_msg.content
+
+remote_agent = reasoning_engines.ReasoningEngine.create(
+    PropertyAccessibilityApp(project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET),
+    requirements=[
+        "langchain-google-vertexai",
+        "google-cloud-aiplatform[langchain,reasoningengine]",
+        "cloudpickle==3.0.0",
+        "pydantic==2.7.4",
+        "langgraph",
+        "requests"
+    ],
+    display_name="Property Accessibility Reasoning Engine",
+    description="A reasoning engine that analyzes properties for accessibility features and drafts emails to hosts.",
+    extra_packages=[],
+)
+
+print(f"Created reasoning engine: {remote_agent.resource_name}")
\ No newline at end of file
diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/api/app.py b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/api/app.py
new file mode 100644
index 00000000..e4aca6f0
--- /dev/null
+++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/api/app.py
@@ -0,0 +1,72 @@
+from fastapi import FastAPI, Query, HTTPException
+from google.cloud import firestore
+from pydantic import BaseModel
+from typing import Optional, List
+from src.config import PROJECT_ID, FIRESTORE_COLLECTION
+
+db = firestore.Client(project=PROJECT_ID)
+app = FastAPI()
+
+class House(BaseModel):
+    id: str
+    title: str
+    description: str
+    image_urls: List[str]
+    is_accessible: bool
+
+@app.get("/houses")
+async def list_houses(
+    accessible: Optional[bool] = Query(None),
+    page: int = Query(1),
+    page_size: int = Query(10)
+):
+    query = db.collection(FIRESTORE_COLLECTION)
+    if accessible is not None:
+        query = query.where('is_accessible', '==', accessible)
+
+    start = (page - 1) * page_size
+    docs = query.limit(page_size).offset(start).stream()
+
+    houses = []
+    for doc in docs:
+        house_data = doc.to_dict()
+        house_data['id'] = doc.id
+        houses.append(House(**house_data))
+
+    return {
+        "page": page,
+        "page_size": page_size,
+        "houses": houses
+    }
+
+# Declared before /houses/{house_id} so that "search" is not captured as a house_id.
+@app.get("/houses/search")
+async def search_houses(
+    query: str = Query(...),
+    page: int = Query(1),
+    page_size: int = Query(10)
+):
+    results = []
+    docs = db.collection(FIRESTORE_COLLECTION).stream()
+    for doc in docs:
+        data = doc.to_dict()
+        if (query.lower() in data['title'].lower() or
+                query.lower() in data['description'].lower()):
+            data['id'] = doc.id
+            results.append(House(**data))
+    start = (page - 1) * page_size
+    end = start + page_size
+    return {
+        "page": page,
+        "page_size": page_size,
+        "houses": results[start:end]
+    }
+
+@app.get("/houses/{house_id}")
+async def get_house(house_id: str):
+    doc_ref = db.collection(FIRESTORE_COLLECTION).document(house_id)
+    doc = doc_ref.get()
+    if not doc.exists:
+        raise HTTPException(status_code=404, detail="House not found")
+    house_data = doc.to_dict()
+    house_data['id'] = doc.id
+    return House(**house_data)
diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/config.py b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/config.py
new file mode 100644
index 00000000..1c4ac84b
--- /dev/null
+++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/config.py
@@ -0,0 +1,8 @@
+PROJECT_ID = "your-project-id"
+LOCATION = "your-location"
+GCS_BUCKET = "your-gcs-bucket"
+FIRESTORE_COLLECTION = "houses"
+API_URL = "https://real-estate-api-<PROJECT_NUMBER>.<LOCATION>.run.app"
+STREAMLIT_URL = "https://real-estate-streamlit-<PROJECT_NUMBER>.<LOCATION>.run.app/"
+STAGING_BUCKET = "your-staging-bucket"
+AGENT_NAME = "projects/<PROJECT_ID>/locations/<LOCATION>/reasoningEngines/<ENGINE_ID>"
\ No newline at end of file
diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/data_generation/house_dataset_generator.py b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/data_generation/house_dataset_generator.py
new file mode 100644
index 00000000..79d1f75b
--- /dev/null
+++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/data_generation/house_dataset_generator.py
@@ -0,0 +1,352 @@
+import os
+import json
+import time
+import logging
+from datetime import datetime
+from typing import Optional, Dict, List
+import shutil
+
+import vertexai
+from vertexai.generative_models import GenerativeModel, SafetySetting
+from vertexai.preview.vision_models import ImageGenerationModel
+from google.cloud import storage, firestore
+
+from src.retry import RetryError, retry_with_exponential_backoff
+from src.config import PROJECT_ID, LOCATION, GCS_BUCKET, FIRESTORE_COLLECTION
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+class HouseDatasetGenerator:
+    HOUSE_SCHEMA = {
+        "type": "OBJECT",
+        "properties": {
+            "title": {"type": "STRING"},
+            "description": {"type": "STRING"},
+        },
+        "required": ["title", "description"]
+    }
+
+    ROOM_PROMPTS_SCHEMA = {
+        "type": "OBJECT",
+        "properties": {
+            "prompts": {
+                "type": "ARRAY",
+                "items": {"type": "STRING"}
+            }
+        },
+        "required": ["prompts"]
+    }
+
+    def __init__(self, base_output_dir: str = "house_dataset", max_retries: int = 3, retry_delay: int = 2):
+        """
+        Initialize the HouseDatasetGenerator.
+
+        Args:
+            base_output_dir (str): Directory where generated data will be stored.
+            max_retries (int): Max number of retries for generation API calls.
+            retry_delay (int): Delay between retries in seconds.
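+
+        Example (illustrative):
+            generator = HouseDatasetGenerator(base_output_dir="house_dataset")
+            generator.generate_dataset(num_accessible=2, num_standard=3)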
+ """ + self.project_id = PROJECT_ID + self.location = LOCATION + self.base_output_dir = base_output_dir + self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + self.max_retries = max_retries + self.retry_delay = retry_delay + + vertexai.init(project=self.project_id, location=self.location) + self.generation_config = { + "max_output_tokens": 8192, + "temperature": 0.1, + "top_p": 0.95, + "response_mime_type": "application/json", + } + + self.safety_settings = [ + SafetySetting(category=SafetySetting.HarmCategory.HARM_CATEGORY_HATE_SPEECH, threshold=SafetySetting.HarmBlockThreshold.OFF), + SafetySetting(category=SafetySetting.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, threshold=SafetySetting.HarmBlockThreshold.OFF), + SafetySetting(category=SafetySetting.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, threshold=SafetySetting.HarmBlockThreshold.OFF), + SafetySetting(category=SafetySetting.HarmCategory.HARM_CATEGORY_HARASSMENT, threshold=SafetySetting.HarmBlockThreshold.OFF), + ] + + self.text_model = GenerativeModel("gemini-1.5-flash-002") + self.image_model = ImageGenerationModel.from_pretrained("imagen-3.0-fast-generate-001") + + os.makedirs(base_output_dir, exist_ok=True) + self.storage_client = storage.Client(project=self.project_id) + self.db = firestore.Client(project=self.project_id) + self.bucket = self._ensure_bucket_exists(GCS_BUCKET) + + def _ensure_bucket_exists(self, bucket_name: str) -> storage.Bucket: + """ + Ensure the specified GCS bucket exists, create it if not. + """ + bucket = self.storage_client.bucket(bucket_name) + if not bucket.exists(): + logger.info(f"Creating bucket: {bucket_name}") + bucket.create(location=self.location) + return bucket + + def _get_house_description_prompt(self, is_accessible: bool) -> str: + """ + Create a prompt to generate a house title and description. + + If is_accessible=True, the house is wheelchair accessible and should + mention accessibility features. Otherwise, mention that the house may + have obstacles like stairs or clutter that impede accessibility. + """ + if is_accessible: + return """ + Generate a title and a description for a wheelchair accessible vacation rental house. + The description should highlight accessibility features such as ramps, wide doorways, + and grab bars, ensuring the property is suitable for wheelchair users. + Return a JSON object with 'title' and 'description' fields. + """ + else: + return """ + Generate a title and a description for a standard (not wheelchair accessible) vacation rental house. + The description should mention obstacles such as stairs, narrow doorways, or cluttered areas + that might make it challenging for wheelchair users. Never mention too obviously that the house is + not wheelchair accessible. It should be understood from context. + Return a JSON object with 'title' and 'description' fields. + """ + + def _get_room_prompts_prompt(self, house_desc: Dict, is_accessible: bool) -> str: + """ + Create a prompt to generate image prompts for the rooms: + - Living room + - Kitchen + - Main bathroom + - Master bedroom + + If is_accessible=True, each room prompt should highlight accessible features. + If not, each room prompt should highlight barriers to accessibility. + Return a JSON with 'prompts' as a list of image prompt strings. 
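+
+        Expected model output shape (illustrative):
+            {"prompts": ["Bright living room with a zero-step entry and wide doorways...", "...", "...", "..."]}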
+ """ + if is_accessible: + room_condition = "Highlight accessibility features (ramps, wide doorways, grab bars)" + else: + room_condition = "Mention obstacles such as stairs, narrow doorways, or clutter" + + return f""" + The house description: + {json.dumps(house_desc, indent=2)} + + Generate detailed image prompts for the following rooms: + - Living room + - Kitchen + - Main bathroom + - Master bedroom + + {room_condition} in each prompt. + + Focus on describing the room's style, lighting, materials, and layout. + Return the results as a JSON object with a 'prompts' field containing a list of 4 prompt strings. + """ + + @retry_with_exponential_backoff + def _generate_house_description_impl(self, is_accessible: bool) -> Dict: + """ + Implementation of generating house description. + """ + config = self.generation_config.copy() + config["response_schema"] = self.HOUSE_SCHEMA + prompt = self._get_house_description_prompt(is_accessible) + + logger.info("Generating house description...") + response = self.text_model.generate_content( + prompt, + generation_config=config, + safety_settings=self.safety_settings + ) + return json.loads(response.text) + + def generate_house_description(self, is_accessible: bool) -> Optional[Dict]: + """ + Generate house description data. + + Args: + is_accessible (bool): Whether the house is wheelchair accessible. + + Returns: + Optional[Dict]: {'title': str, 'description': str} or None on failure. + """ + try: + result = self._generate_house_description_impl(is_accessible) + # Add is_accessible field programmatically + result["is_accessible"] = is_accessible + return result + except RetryError as e: + logger.error(f"Failed to generate house description: {e}") + return None + + @retry_with_exponential_backoff + def _generate_room_prompts_impl(self, house_desc: Dict, is_accessible: bool) -> List[str]: + """ + Implementation of generating room prompts. + """ + config = self.generation_config.copy() + config["response_schema"] = self.ROOM_PROMPTS_SCHEMA + prompt = self._get_room_prompts_prompt(house_desc, is_accessible) + + logger.info("Generating room prompts...") + response = self.text_model.generate_content( + prompt, + generation_config=config, + safety_settings=self.safety_settings + ) + result = json.loads(response.text) + return result["prompts"] + + def generate_room_prompts(self, house_desc: Dict, is_accessible: bool) -> Optional[List[str]]: + """ + Generate image prompts for the specified rooms. + + Args: + house_desc (Dict): The house description dictionary. + is_accessible (bool): Whether the house is accessible. + + Returns: + Optional[List[str]]: A list of prompts or None on failure. + """ + try: + return self._generate_room_prompts_impl(house_desc, is_accessible) + except RetryError as e: + logger.error(f"Failed to generate room prompts: {e}") + return None + + def generate_and_save_images(self, prompts: List[str], house_dir: str) -> List[str]: + """ + Generate images for each prompt and save them to GCS. + + Args: + prompts (List[str]): The image generation prompts. + house_dir (str): The local directory for saving images. + + Returns: + List[str]: A list of GCS URLs for the generated images. 
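+
+        Example (illustrative):
+            urls = self.generate_and_save_images(prompts, "house_dataset/accessible_house_1")
+            # -> ["gs://<GCS_BUCKET>/house_dataset/accessible_house_1/room_1.png", ...]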
+ """ + if not prompts: + logger.warning("No prompts for image generation.") + return [] + + images_dir = os.path.join(house_dir, "images") + os.makedirs(images_dir, exist_ok=True) + image_urls = [] + + for idx, prompt in enumerate(prompts): + img_name = f"room_{idx + 1}.png" + for attempt in range(self.max_retries): + try: + logger.info(f"Generating image {idx+1}, attempt {attempt+1}...") + images = self.image_model.generate_images( + prompt=prompt, + number_of_images=1, + aspect_ratio="4:3", + safety_filter_level="block_some", + ) + image_path = os.path.join(images_dir, img_name) + images[0].save(image_path) + + # Upload image + blob = self.bucket.blob(f"{house_dir}/{img_name}") + blob.upload_from_filename(image_path) + gcs_url = f"gs://{GCS_BUCKET}/{house_dir}/{img_name}" + image_urls.append(gcs_url) + + # Save prompt + prompt_path = os.path.join(images_dir, f"room_{idx + 1}_prompt.txt") + with open(prompt_path, "w") as f: + f.write(prompt) + + prompt_blob = self.bucket.blob(f"{house_dir}/room_{idx + 1}_prompt.txt") + prompt_blob.upload_from_filename(prompt_path) + break + except Exception as e: + wait_time = self.retry_delay * (2 ** attempt) + logger.error(f"Error generating image {idx+1}, attempt {attempt+1}: {e}") + if attempt < self.max_retries - 1: + logger.info(f"Retrying in {wait_time}s...") + time.sleep(wait_time) + else: + logger.error(f"Failed to generate image {idx + 1} after retries.") + return image_urls + + def save_to_firestore(self, description: Dict, image_urls: List[str]): + """ + Save the generated house data to Firestore. + + Args: + description (Dict): The house description dictionary containing title, description, and is_accessible. + image_urls (List[str]): The GCS URLs of the generated room images. + """ + logger.info("Saving house data to Firestore...") + doc_ref = self.db.collection(FIRESTORE_COLLECTION).document() + house_data = { + "title": description["title"], + "description": description["description"], + "is_accessible": description["is_accessible"], + "image_urls": image_urls + } + doc_ref.set(house_data) + + def generate_dataset(self, num_accessible: int = 2, num_standard: int = 3): + """ + Generate a dataset of houses. Accessible houses highlight accessibility features, + while standard houses highlight obstacles. + + Args: + num_accessible (int): Number of accessible houses to generate. + num_standard (int): Number of standard (not accessible) houses to generate. 
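+
+        Example (illustrative, mirrors src/main.py):
+            HouseDatasetGenerator().generate_dataset(num_accessible=5, num_standard=5)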
+        """
+        # Accessible houses
+        for i in range(num_accessible):
+            house_dir_name = f"accessible_house_{i + 1}"
+            full_house_dir = os.path.join(self.base_output_dir, house_dir_name)
+            os.makedirs(full_house_dir, exist_ok=True)
+
+            logger.info(f"Generating accessible house {i+1}...")
+            description = self.generate_house_description(True)
+            if description is None:
+                logger.error("Skipping due to description failure.")
+                continue
+
+            desc_path = os.path.join(full_house_dir, "description.json")
+            with open(desc_path, "w") as f:
+                json.dump(description, f, indent=2)
+
+            logger.info("Generating images...")
+            prompts = self.generate_room_prompts(description, True)
+            if prompts:
+                image_urls = self.generate_and_save_images(prompts, full_house_dir)
+                self.save_to_firestore(description, image_urls)
+            else:
+                logger.error("Skipping image generation due to prompt failure.")
+
+        # Standard (not accessible) houses
+        for i in range(num_standard):
+            house_dir_name = f"standard_house_{i + 1}"
+            full_house_dir = os.path.join(self.base_output_dir, house_dir_name)
+            os.makedirs(full_house_dir, exist_ok=True)
+
+            logger.info(f"Generating standard (not accessible) house {i+1}...")
+            description = self.generate_house_description(False)
+            if description is None:
+                logger.error("Skipping due to description failure.")
+                continue
+
+            desc_path = os.path.join(full_house_dir, "description.json")
+            with open(desc_path, "w") as f:
+                json.dump(description, f, indent=2)
+
+            logger.info("Generating images...")
+            prompts = self.generate_room_prompts(description, False)
+            if prompts:
+                image_urls = self.generate_and_save_images(prompts, full_house_dir)
+                self.save_to_firestore(description, image_urls)
+            else:
+                logger.error("Skipping image generation due to prompt failure.")
+
+        # Delete the local output directory and all its files (the data now lives in GCS and Firestore)
+        logger.info(f"Deleting output directory: {self.base_output_dir}")
+        shutil.rmtree(self.base_output_dir)
\ No newline at end of file
diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/frontend/streamlit_app.py b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/frontend/streamlit_app.py
new file mode 100644
index 00000000..a39d0d89
--- /dev/null
+++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/frontend/streamlit_app.py
@@ -0,0 +1,112 @@
+import streamlit as st
+from vertexai.preview import reasoning_engines
+from pydantic import BaseModel
+from typing import List, Tuple
+import requests
+from src.config import AGENT_NAME, API_URL, GCS_BUCKET
+from google.cloud import storage
+from io import BytesIO
+from PIL import Image
+
+storage_client = storage.Client()
+bucket = storage_client.bucket(GCS_BUCKET)
+
+def get_blob(gcs_url: str) -> Tuple[str, str]:
+    """
+    Extracts the bucket name and blob path from a Google Cloud Storage URI.
+    """
+    if not gcs_url.startswith("gs://"):
+        raise ValueError("URL must start with gs://")
+
+    no_gs = gcs_url[len("gs://"):]
+    parts = no_gs.split("/", 1)
+    bucket_name = parts[0]
+    blob_path = parts[1] if len(parts) > 1 else ""
+    return bucket_name, blob_path
+
+class House(BaseModel):
+    id: str
+    title: str
+    description: str
+    image_urls: List[str]
+    is_accessible: bool
+
+# Load the deployed reasoning engine instance.
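+# AGENT_NAME is the full resource name printed when `python -m src.agent.agent`
+# deployed the engine, in the format
+# projects/<PROJECT_ID>/locations/<LOCATION>/reasoningEngines/<ENGINE_ID>.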
+remote_agent = reasoning_engines.ReasoningEngine(AGENT_NAME) + +st.set_page_config(page_title="Accessibility Analysis Assistant", page_icon="🤖") +st.title("Accessibility Analysis Assistant") + +tab1, tab2 = st.tabs(["Property Search", "Chat"]) + +with tab1: + st.write("## Property Accessibility Check") + property_id = st.text_input("Enter Property ID:", "") + + if st.button("Fetch Property Details"): + if property_id: + # Fetch property info from the API + url = f"{API_URL}/houses/{property_id}" + resp = requests.get(url) + if resp.status_code == 200: + house_data = resp.json() + # Parse into House model + try: + house = House(**house_data) + st.write(f"**Title:** {house.title}") + st.write(f"**Accessibility:** {'Yes' if house.is_accessible else 'No'}") + st.write(f"**Description:** {house.description}") + + # Display images if available + if house.image_urls: + st.write("**Images:**") + # Create one column per image to place them side-by-side + columns = st.columns(len(house.image_urls)) + for col, img_url in zip(columns, house.image_urls): + _, blob_name = get_blob(img_url) + blob = bucket.blob(blob_name) + image_data = blob.download_as_bytes() + image = Image.open(BytesIO(image_data)) + + # Resize image to 1/4 original size + new_width = max(1, image.width // 4) + new_height = max(1, image.height // 4) + resized_image = image.resize((new_width, new_height)) + + col.image(resized_image) + else: + st.write("No images available.") + + except Exception as e: + st.error(f"Error parsing house data: {e}") + else: + st.error("Failed to fetch property details. Check the property ID or try again later.") + +with tab2: + # Displaying the conversation UI in the Chat tab + if "messages" not in st.session_state: + st.session_state["messages"] = [] + + + for msg in st.session_state["messages"]: + with st.chat_message(msg["role"]): + st.markdown(msg["content"]) + + user_input = st.chat_input("Ask about accessibility, e.g., 'Check accessibility of property '") + if user_input: + # Add user's message to session state and display it + st.session_state["messages"].append({"role": "user", "content": user_input}) + with st.chat_message("user"): + st.markdown(user_input) + + # Send the entire conversation to the reasoning engine + conversation = st.session_state["messages"] + assistant_response = remote_agent.query(conversation=conversation) + + # Add assistant's response to session state and display it + st.session_state["messages"].append({"role": "assistant", "content": assistant_response}) + with st.chat_message("assistant"): + st.markdown(assistant_response) + + diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/main.py b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/main.py new file mode 100644 index 00000000..3eb8a036 --- /dev/null +++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/main.py @@ -0,0 +1,9 @@ +import logging +from src.data_generation.house_dataset_generator import HouseDatasetGenerator + +logging.basicConfig(level=logging.INFO) + +if __name__ == "__main__": + generator = HouseDatasetGenerator() + generator.generate_dataset(num_accessible=5, num_standard=5) + logging.info("Dataset generation completed.") \ No newline at end of file diff --git a/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/retry.py b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/retry.py 
new file mode 100644
index 00000000..7087c599
--- /dev/null
+++ b/genai-on-vertex-ai/agents/reasoning_engine/langgraph/real-estate-accessibility-analyser/src/retry.py
@@ -0,0 +1,28 @@
+import time
+import logging
+from functools import wraps
+from typing import Callable, Any
+
+logger = logging.getLogger(__name__)
+
+class RetryError(Exception):
+    """Custom exception raised when retries are exhausted."""
+
+def retry_with_exponential_backoff(func: Callable) -> Callable:
+    """
+    Decorator that applies exponential backoff retry logic.
+
+    Expects to wrap a method whose instance (`self`) defines `max_retries`
+    and `retry_delay` attributes.
+    """
+    @wraps(func)
+    def wrapper(*args, **kwargs) -> Any:
+        self = args[0]  # the decorated function is a method, so args[0] is the instance
+        last_exception = None
+        for attempt in range(self.max_retries):
+            try:
+                return func(*args, **kwargs)
+            except Exception as e:
+                last_exception = e
+                wait_time = self.retry_delay * (2 ** attempt)
+                logger.error(f"Attempt {attempt + 1} failed: {e}")
+                if attempt < self.max_retries - 1:
+                    logger.info(f"Retrying in {wait_time} seconds...")
+                    time.sleep(wait_time)
+        raise RetryError(f"Failed after {self.max_retries} attempts. Last error: {last_exception}")
+    return wrapper
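+
+# Usage sketch (illustrative): the decorator expects to wrap a method on an
+# object that defines `max_retries` and `retry_delay` attributes, as
+# HouseDatasetGenerator does:
+#
+#     @retry_with_exponential_backoff
+#     def _generate_house_description_impl(self, is_accessible: bool) -> Dict:
+#         ...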