Skip to content
98 changes: 81 additions & 17 deletions src/datachain/data_storage/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,40 @@ def execute_str(self, sql: str, parameters=None) -> sqlite3.Cursor:
return self.db.execute(sql)
return self.db.execute(sql, parameters)

def add_column(self, table_name: str, column: Column) -> None:
    """
    Add a column to an existing table.

    The statement is built by hand as ``ALTER TABLE ... ADD COLUMN ...``.
    The column type is rendered with SQLAlchemy's type compiler for this
    engine's dialect, and identifiers go through the dialect's identifier
    preparer so reserved words or unusual names are quoted when required.

    Args:
        table_name: Name of the table to alter.
        column: SQLAlchemy column definition supplying the name, type,
            nullability and (scalar) default of the new column.
    """
    quote = self.dialect.identifier_preparer.quote
    compiled_type = column.type.compile(dialect=self.dialect)

    parts = [quote(column.name), str(compiled_type)]

    # NOTE: SQLite only accepts ADD COLUMN ... NOT NULL when a non-NULL
    # DEFAULT is also given.
    if not column.nullable:
        parts.append("NOT NULL")

    # Only scalar defaults can be expressed as a SQL literal. Callable
    # defaults are evaluated in Python at insert time, so rendering them here
    # would produce invalid SQL such as "DEFAULT <function ...>".
    default_val = getattr(column.default, "arg", None)
    if default_val is not None and not callable(default_val):
        if isinstance(default_val, str):
            # Double embedded single quotes so the literal stays well-formed.
            escaped = default_val.replace("'", "''")
            parts.append(f"DEFAULT '{escaped}'")
        elif isinstance(default_val, bool):
            # bool is checked explicitly so it renders as 0/1, not True/False.
            parts.append(f"DEFAULT {int(default_val)}")
        else:
            parts.append(f"DEFAULT {default_val}")

    column_def = " ".join(parts)
    alter_query = f"ALTER TABLE {quote(table_name)} ADD COLUMN {column_def}"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use SQLAlchemy here instead of building raw SQL, which is fragile and unsafe.

Brief example:

from sqlalchemy.schema import AddColumn

table.append_column(col)
self.execute(AddColumn(table, col))

Haven't checked it but it looks more robust to me, what do you think?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no AddColumn anymore; SQLAlchemy stopped supporting it a while ago, I guess because it wasn't stable or portable. The only thing left is to build a raw query.


try:
    self.execute_str(alter_query)
except sqlite3.OperationalError as exc:
    # A "duplicate column name" error means the column is already present,
    # so it is ignored silently. Any other operational error is logged at
    # debug level and not re-raised.
    already_exists = "duplicate column name" in str(exc).lower()
    if not already_exists:
        logger.debug(
            "Could not add column %s to %s: %s", column.name, table_name, exc
        )
else:
    logger.debug("Added column %s to table %s", column.name, table_name)

def insert_dataframe(self, table_name: str, df) -> int:
# Dynamically calculates chunksize by dividing max variable limit in a
# single SQL insert with number of columns in dataframe.
Expand Down Expand Up @@ -459,24 +493,54 @@ def _init_meta_schema_value(self) -> None:
)
self.db.execute(stmt)

@property
def _metastore_tables(self) -> list[Table]:
    """
    Return the metastore tables that require initialization and migration.

    A new list is built on every access, in the order given below.
    """
    tables = (
        self._namespaces,
        self._projects,
        self._datasets,
        self._datasets_versions,
        self._datasets_dependencies,
        self._jobs,
        self._checkpoints,
        self._dataset_version_jobs,
    )
    return list(tables)

def _init_tables(self) -> None:
    """
    Initialize tables with automatic schema migration.

    Every table in ``_metastore_tables`` is created if it does not exist and
    its name is recorded once in ``default_table_names``. After all tables
    exist, each one is passed to ``_migrate_table_schema``.
    """
    tables = self._metastore_tables

    for table in tables:
        self.db.create_table(table, if_not_exists=True)
        self.default_table_names.append(table.name)

    # Auto-migrate: add missing columns based on schema definitions.
    # This runs as a second pass so every table exists before any is migrated.
    for table in tables:
        self._migrate_table_schema(table)

def _migrate_table_schema(self, table: Table) -> None:
    """
    Bring the stored table in line with its SQLAlchemy definition.

    Columns declared on ``table`` but absent from the database are added one
    by one, then the table's indexes are created. This gives lazy schema
    evolution without hand-written migrations.
    """
    # Ask the database which columns it currently has; the column name is
    # read from position 1 of each returned row.
    cursor = self.db.execute_str(f"PRAGMA table_info({table.name})")
    present = {row[1] for row in cursor.fetchall()}

    for column in table.columns:
        if column.name in present:
            continue
        self.db.add_column(table.name, column)

    self._create_table_indexes(table)

def _create_table_indexes(self, table: Table) -> None:
    """
    Create each index declared on ``table``.

    An operational error for one index is logged at debug level and does not
    stop the remaining indexes from being attempted.
    """
    for idx in table.indexes:
        try:
            ddl = CreateIndex(idx, if_not_exists=True)
            self.db.execute(ddl)
        except (sqlite3.OperationalError, sqlalchemy.exc.OperationalError) as err:
            logger.debug("Could not create index %s: %s", idx.name, err)

def _init_namespaces_projects(self) -> None:
"""
Expand Down
Loading