Skip to content

Commit

Permalink
chore: drop duckdb usage and migrations (langflow-ai#3730)
Browse files Browse the repository at this point in the history
* chore: drop duckdb usage and migrations

* [autofix.ci] apply automated fixes

* Add DefaultModel and MessageResponse classes with custom JSON serialization and validation

- Introduced `DefaultModel` class with custom JSON encoders and serialization methods.
- Added `MessageResponse` class inheriting from `DefaultModel` with fields for message details and custom validators/serializers.
- Enhanced file handling and timestamp formatting in `MessageResponse`.

* Refactor: Replace `MessageModelResponse` with `MessageResponse` in monitor API

- Updated import statements to use `MessageResponse` from `langflow.schema.message`.
- Modified `/messages` endpoint to return `list[MessageResponse]` instead of `list[MessageModelResponse]`.
- Adjusted response model validation to use `MessageResponse`.

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <[email protected]>
  • Loading branch information
3 people authored and diogocabral committed Nov 26, 2024
1 parent f751ae9 commit 0714240
Show file tree
Hide file tree
Showing 17 changed files with 77 additions and 751 deletions.
1 change: 0 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions src/backend/base/langflow/api/v1/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from sqlalchemy import delete
from sqlmodel import Session, col, select

from langflow.schema.message import MessageResponse
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models.message.model import MessageRead, MessageTable, MessageUpdate
from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id
Expand All @@ -15,7 +16,6 @@
)
from langflow.services.database.models.vertex_builds.model import VertexBuildMapModel
from langflow.services.deps import get_session
from langflow.services.monitor.schema import MessageModelResponse

router = APIRouter(prefix="/monitor", tags=["Monitor"])

Expand Down Expand Up @@ -43,7 +43,7 @@ async def delete_vertex_builds(
raise HTTPException(status_code=500, detail=str(e))


@router.get("/messages", response_model=list[MessageModelResponse])
@router.get("/messages", response_model=list[MessageResponse])
async def get_messages(
flow_id: str | None = Query(None),
session_id: str | None = Query(None),
Expand All @@ -66,7 +66,7 @@ async def get_messages(
col = getattr(MessageTable, order_by).asc()
stmt = stmt.order_by(col)
messages = session.exec(stmt)
return [MessageModelResponse.model_validate(d, from_attributes=True) for d in messages]
return [MessageResponse.model_validate(d, from_attributes=True) for d in messages]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

Expand Down
68 changes: 67 additions & 1 deletion src/backend/base/langflow/schema/message.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import json
from collections.abc import AsyncIterator, Iterator
from datetime import datetime, timezone
from typing import Annotated, Any
Expand All @@ -11,7 +12,7 @@
from langchain_core.prompts import BaseChatPromptTemplate, ChatPromptTemplate, PromptTemplate
from langchain_core.prompts.image import ImagePromptTemplate
from loguru import logger
from pydantic import BeforeValidator, ConfigDict, Field, field_serializer, field_validator
from pydantic import BaseModel, BeforeValidator, ConfigDict, Field, field_serializer, field_validator

from langflow.base.prompts.utils import dict_values_to_string
from langflow.schema.data import Data
Expand Down Expand Up @@ -248,3 +249,68 @@ def sync_from_template_and_variables(cls, template: str, **variables):
return asyncio.run(cls.from_template_and_variables(template, **variables))
else:
return loop.run_until_complete(cls.from_template_and_variables(template, **variables))


class DefaultModel(BaseModel):
class Config:
from_attributes = True
populate_by_name = True
json_encoders = {
datetime: lambda v: v.isoformat(),
}

def json(self, **kwargs):
# Usa a função de serialização personalizada
return super().model_dump_json(**kwargs, encoder=self.custom_encoder)

@staticmethod
def custom_encoder(obj):
if isinstance(obj, datetime):
return obj.isoformat()
raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")


class MessageResponse(DefaultModel):
id: str | UUID | None = Field(default=None)
flow_id: UUID | None = Field(default=None)
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
sender: str
sender_name: str
session_id: str
text: str
files: list[str] = []

@field_validator("files", mode="before")
@classmethod
def validate_files(cls, v):
if isinstance(v, str):
v = json.loads(v)
return v

@field_serializer("timestamp")
@classmethod
def serialize_timestamp(cls, v):
v = v.replace(microsecond=0)
return v.strftime("%Y-%m-%d %H:%M:%S")

@field_serializer("files")
@classmethod
def serialize_files(cls, v):
if isinstance(v, list):
return json.dumps(v)
return v

@classmethod
def from_message(cls, message: Message, flow_id: str | None = None):
# first check if the record has all the required fields
if message.text is None or not message.sender or not message.sender_name:
raise ValueError("The message does not have the required fields (text, sender, sender_name).")
return cls(
sender=message.sender,
sender_name=message.sender_name,
text=message.text,
session_id=message.session_id,
files=message.files or [],
timestamp=message.timestamp,
flow_id=flow_id,
)
10 changes: 0 additions & 10 deletions src/backend/base/langflow/services/database/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
from langflow.services.database.utils import (
Result,
TableResults,
migrate_messages_from_monitor_service_to_database,
migrate_transactions_from_monitor_service_to_database,
)
from langflow.services.deps import get_settings_service
from langflow.services.utils import teardown_superuser
Expand Down Expand Up @@ -215,14 +213,6 @@ def run_migrations(self, fix=False):
logger.error(f"AutogenerateDiffsDetected: {exc}")
if not fix:
raise RuntimeError(f"There's a mismatch between the models and the database.\n{exc}")
try:
migrate_messages_from_monitor_service_to_database(session)
except Exception as exc:
logger.error(f"Error migrating messages from monitor service to database: {exc}")
try:
migrate_transactions_from_monitor_service_to_database(session)
except Exception as exc:
logger.error(f"Error migrating transactions from monitor service to database: {exc}")

if fix:
self.try_downgrade_upgrade_until_success(alembic_cfg)
Expand Down
112 changes: 1 addition & 111 deletions src/backend/base/langflow/services/database/utils.py
Original file line number Diff line number Diff line change
@@ -1,91 +1,17 @@
from __future__ import annotations

import json
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING

from alembic.util.exc import CommandError
from loguru import logger
from sqlmodel import Session, select, text

from langflow.services.database.models import TransactionTable
from langflow.services.deps import get_monitor_service
from sqlmodel import Session, text

if TYPE_CHECKING:
from langflow.services.database.service import DatabaseService


def migrate_messages_from_monitor_service_to_database(session: Session) -> bool:
from langflow.schema.message import Message
from langflow.services.database.models.message import MessageTable

try:
monitor_service = get_monitor_service()
messages_df = monitor_service.get_messages()
except Exception as e:
if "Table with name messages does not exist" in str(e):
logger.debug(f"Error retrieving messages from monitor service: {e}")
else:
logger.warning(f"Error retrieving messages from monitor service: {e}")
return False

if messages_df.empty:
logger.info("No messages to migrate.")
return True

original_messages: list[dict] = messages_df.to_dict(orient="records")

db_messages = session.exec(select(MessageTable)).all()
db_messages = [msg[0] for msg in db_messages] # type: ignore
db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id), msg.session_id): msg for msg in db_messages}
# Filter out messages that already exist in the database
original_messages_filtered = []
for message in original_messages:
key = (message["text"], message["timestamp"].isoformat(), str(message["flow_id"]), message["session_id"])
if key not in db_msg_dict:
original_messages_filtered.append(message)
if not original_messages_filtered:
logger.info("No messages to migrate.")
return True
try:
# Bulk insert messages
session.bulk_insert_mappings(
MessageTable, # type: ignore
[MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered],
)
session.commit()
except Exception as e:
logger.error(f"Error during message insertion: {str(e)}")
session.rollback()
return False

# Create a dictionary for faster lookup

all_ok = True
for orig_msg in original_messages_filtered:
key = (orig_msg["text"], orig_msg["timestamp"].isoformat(), str(orig_msg["flow_id"]), orig_msg["session_id"])
matching_db_msg = db_msg_dict.get(key)

if matching_db_msg is None:
logger.warning(f"Message not found in database: {orig_msg}")
all_ok = False
else:
# Validate other fields
if any(getattr(matching_db_msg, k) != v for k, v in orig_msg.items() if k != "index"):
logger.warning(f"Message mismatch in database: {orig_msg}")
all_ok = False

if all_ok:
messages_ids = [message["index"] for message in original_messages]
monitor_service.delete_messages(messages_ids)
logger.info("Migration completed successfully. Original messages deleted.")
else:
logger.warning("Migration completed with errors. Original messages not deleted.")

return all_ok


def initialize_database(fix_migration: bool = False):
logger.debug("Initializing database")
from langflow.services.deps import get_db_service
Expand Down Expand Up @@ -153,39 +79,3 @@ class Result:
class TableResults:
table_name: str
results: list[Result]


def migrate_transactions_from_monitor_service_to_database(session: Session) -> None:
try:
monitor_service = get_monitor_service()
batch = monitor_service.get_transactions()
except Exception as e:
if "Table with name transactions does not exist" in str(e):
logger.debug(f"Error retrieving transactions from monitor service: {e}")
else:
logger.warning(f"Error retrieving transactions from monitor service: {e}")
return

if not batch:
logger.debug("No transactions to migrate.")
return
to_delete = []
while batch:
logger.debug(f"Migrating {len(batch)} transactions")
for row in batch:
tt = TransactionTable(
flow_id=row["flow_id"],
status=row["status"],
error=row["error"],
timestamp=row["timestamp"],
vertex_id=row["vertex_id"],
inputs=json.loads(row["inputs"]) if row["inputs"] else None,
outputs=json.loads(row["outputs"]) if row["outputs"] else None,
target_id=row["target_id"],
)
to_delete.append(row["index"])
session.add(tt)
session.commit()
monitor_service.delete_transactions(to_delete)
batch = monitor_service.get_transactions()
logger.debug("Transactions migrations completed.")
13 changes: 0 additions & 13 deletions src/backend/base/langflow/services/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from langflow.services.cache.service import CacheService
from langflow.services.chat.service import ChatService
from langflow.services.database.service import DatabaseService
from langflow.services.monitor.service import MonitorService
from langflow.services.plugins.service import PluginService
from langflow.services.session.service import SessionService
from langflow.services.settings.service import SettingsService
Expand Down Expand Up @@ -220,18 +219,6 @@ def get_session_service() -> "SessionService":
return get_service(ServiceType.SESSION_SERVICE, SessionServiceFactory()) # type: ignore


def get_monitor_service() -> "MonitorService":
"""
Retrieves the MonitorService instance from the service manager.
Returns:
MonitorService: The MonitorService instance.
"""
from langflow.services.monitor.factory import MonitorServiceFactory

return get_service(ServiceType.MONITOR_SERVICE, MonitorServiceFactory()) # type: ignore


def get_task_service() -> "TaskService":
"""
Retrieves the TaskService instance from the service manager.
Expand Down
Empty file.
13 changes: 0 additions & 13 deletions src/backend/base/langflow/services/monitor/factory.py

This file was deleted.

Loading

0 comments on commit 0714240

Please sign in to comment.