Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: drop duckdb usage and migrations #3730

Merged
merged 7 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 0 additions & 56 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 0 additions & 10 deletions src/backend/base/langflow/services/database/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
from langflow.services.database.utils import (
Result,
TableResults,
migrate_messages_from_monitor_service_to_database,
migrate_transactions_from_monitor_service_to_database,
)
from langflow.services.deps import get_settings_service
from langflow.services.utils import teardown_superuser
Expand Down Expand Up @@ -215,14 +213,6 @@ def run_migrations(self, fix=False):
logger.error(f"AutogenerateDiffsDetected: {exc}")
if not fix:
raise RuntimeError(f"There's a mismatch between the models and the database.\n{exc}")
try:
migrate_messages_from_monitor_service_to_database(session)
except Exception as exc:
logger.error(f"Error migrating messages from monitor service to database: {exc}")
try:
migrate_transactions_from_monitor_service_to_database(session)
except Exception as exc:
logger.error(f"Error migrating transactions from monitor service to database: {exc}")

if fix:
self.try_downgrade_upgrade_until_success(alembic_cfg)
Expand Down
113 changes: 1 addition & 112 deletions src/backend/base/langflow/services/database/utils.py
Original file line number Diff line number Diff line change
@@ -1,90 +1,15 @@
import json
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING

from alembic.util.exc import CommandError
from loguru import logger
from sqlmodel import Session, select, text
from sqlmodel import Session, text

from langflow.services.database.models import TransactionTable
from langflow.services.deps import get_monitor_service

if TYPE_CHECKING:
from langflow.services.database.service import DatabaseService

from typing import Dict, List


def migrate_messages_from_monitor_service_to_database(session: Session) -> bool:
from langflow.schema.message import Message
from langflow.services.database.models.message import MessageTable

try:
monitor_service = get_monitor_service()
messages_df = monitor_service.get_messages()
except Exception as e:
if "Table with name messages does not exist" in str(e):
logger.debug(f"Error retrieving messages from monitor service: {e}")
else:
logger.warning(f"Error retrieving messages from monitor service: {e}")
return False

if messages_df.empty:
logger.info("No messages to migrate.")
return True

original_messages: List[Dict] = messages_df.to_dict(orient="records")

db_messages = session.exec(select(MessageTable)).all()
db_messages = [msg[0] for msg in db_messages] # type: ignore
db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id), msg.session_id): msg for msg in db_messages}
# Filter out messages that already exist in the database
original_messages_filtered = []
for message in original_messages:
key = (message["text"], message["timestamp"].isoformat(), str(message["flow_id"]), message["session_id"])
if key not in db_msg_dict:
original_messages_filtered.append(message)
if not original_messages_filtered:
logger.info("No messages to migrate.")
return True
try:
# Bulk insert messages
session.bulk_insert_mappings(
MessageTable, # type: ignore
[MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered],
)
session.commit()
except Exception as e:
logger.error(f"Error during message insertion: {str(e)}")
session.rollback()
return False

# Create a dictionary for faster lookup

all_ok = True
for orig_msg in original_messages_filtered:
key = (orig_msg["text"], orig_msg["timestamp"].isoformat(), str(orig_msg["flow_id"]), orig_msg["session_id"])
matching_db_msg = db_msg_dict.get(key)

if matching_db_msg is None:
logger.warning(f"Message not found in database: {orig_msg}")
all_ok = False
else:
# Validate other fields
if any(getattr(matching_db_msg, k) != v for k, v in orig_msg.items() if k != "index"):
logger.warning(f"Message mismatch in database: {orig_msg}")
all_ok = False

if all_ok:
messages_ids = [message["index"] for message in original_messages]
monitor_service.delete_messages(messages_ids)
logger.info("Migration completed successfully. Original messages deleted.")
else:
logger.warning("Migration completed with errors. Original messages not deleted.")

return all_ok


def initialize_database(fix_migration: bool = False):
logger.debug("Initializing database")
Expand Down Expand Up @@ -153,39 +78,3 @@ class Result:
class TableResults:
table_name: str
results: list[Result]


def migrate_transactions_from_monitor_service_to_database(session: Session) -> None:
try:
monitor_service = get_monitor_service()
batch = monitor_service.get_transactions()
except Exception as e:
if "Table with name transactions does not exist" in str(e):
logger.debug(f"Error retrieving transactions from monitor service: {e}")
else:
logger.warning(f"Error retrieving transactions from monitor service: {e}")
return

if not batch:
logger.debug("No transactions to migrate.")
return
to_delete = []
while batch:
logger.debug(f"Migrating {len(batch)} transactions")
for row in batch:
tt = TransactionTable(
flow_id=row["flow_id"],
status=row["status"],
error=row["error"],
timestamp=row["timestamp"],
vertex_id=row["vertex_id"],
inputs=json.loads(row["inputs"]) if row["inputs"] else None,
outputs=json.loads(row["outputs"]) if row["outputs"] else None,
target_id=row["target_id"],
)
to_delete.append(row["index"])
session.add(tt)
session.commit()
monitor_service.delete_transactions(to_delete)
batch = monitor_service.get_transactions()
logger.debug("Transactions migrations completed.")
13 changes: 0 additions & 13 deletions src/backend/base/langflow/services/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from langflow.services.cache.service import CacheService
from langflow.services.chat.service import ChatService
from langflow.services.database.service import DatabaseService
from langflow.services.monitor.service import MonitorService
from langflow.services.plugins.service import PluginService
from langflow.services.session.service import SessionService
from langflow.services.settings.service import SettingsService
Expand Down Expand Up @@ -217,18 +216,6 @@ def get_session_service() -> "SessionService":
return get_service(ServiceType.SESSION_SERVICE, SessionServiceFactory()) # type: ignore


def get_monitor_service() -> "MonitorService":
"""
Retrieves the MonitorService instance from the service manager.

Returns:
MonitorService: The MonitorService instance.
"""
from langflow.services.monitor.factory import MonitorServiceFactory

return get_service(ServiceType.MONITOR_SERVICE, MonitorServiceFactory()) # type: ignore


def get_task_service() -> "TaskService":
"""
Retrieves the TaskService instance from the service manager.
Expand Down
Empty file.
13 changes: 0 additions & 13 deletions src/backend/base/langflow/services/monitor/factory.py

This file was deleted.

Loading
Loading