From 17a28b9330ac9f7fd9336be831173310d121be16 Mon Sep 17 00:00:00 2001 From: ilongin Date: Mon, 5 Jan 2026 10:53:17 +0100 Subject: [PATCH 1/8] added simple local db automatic migration --- src/datachain/data_storage/sqlite.py | 99 ++++++++-- tests/func/test_local_db_migration.py | 262 ++++++++++++++++++++++++++ 2 files changed, 344 insertions(+), 17 deletions(-) create mode 100644 tests/func/test_local_db_migration.py diff --git a/src/datachain/data_storage/sqlite.py b/src/datachain/data_storage/sqlite.py index b985ef746..8dd1575cc 100644 --- a/src/datachain/data_storage/sqlite.py +++ b/src/datachain/data_storage/sqlite.py @@ -261,6 +261,40 @@ def execute_str(self, sql: str, parameters=None) -> sqlite3.Cursor: return self.db.execute(sql) return self.db.execute(sql, parameters) + def add_column(self, table_name: str, column: Column) -> None: + """ + Add a column to an existing table. + Uses SQLAlchemy's type compilation to ensure proper type conversion. + """ + compiled_type = column.type.compile(dialect=self.dialect) + + parts = [column.name, str(compiled_type)] + + if not column.nullable: + parts.append("NOT NULL") + + if column.default is not None and hasattr(column.default, "arg"): + default_val = column.default.arg + if isinstance(default_val, str): + parts.append(f"DEFAULT '{default_val}'") + elif isinstance(default_val, bool): + parts.append(f"DEFAULT {int(default_val)}") + else: + parts.append(f"DEFAULT {default_val}") + + column_def = " ".join(parts) + alter_query = f"ALTER TABLE {table_name} ADD COLUMN {column_def}" + + try: + self.execute_str(alter_query) + logger.debug("Added column %s to table %s", column.name, table_name) + except sqlite3.OperationalError as e: + # Column likely already exists + if "duplicate column name" not in str(e).lower(): + logger.debug( + "Could not add column %s to %s: %s", column.name, table_name, e + ) + def insert_dataframe(self, table_name: str, df) -> int: # Dynamically calculates chunksize by dividing max variable limit in a # single SQL insert with number of columns in dataframe. @@ -458,24 +492,55 @@ def _init_meta_schema_value(self) -> None: ) self.db.execute(stmt) + @property + def _metastore_tables(self) -> list[Table]: + """List of all metastore tables that require initialization and migration.""" + return [ + self._namespaces, + self._projects, + self._datasets, + self._datasets_versions, + self._datasets_dependencies, + self._jobs, + self._checkpoints, + self._dataset_version_jobs, + ] + def _init_tables(self) -> None: - """Initialize tables.""" - self.db.create_table(self._namespaces, if_not_exists=True) - self.default_table_names.append(self._namespaces.name) - self.db.create_table(self._projects, if_not_exists=True) - self.default_table_names.append(self._projects.name) - self.db.create_table(self._datasets, if_not_exists=True) - self.default_table_names.append(self._datasets.name) - self.db.create_table(self._datasets_versions, if_not_exists=True) - self.default_table_names.append(self._datasets_versions.name) - self.db.create_table(self._datasets_dependencies, if_not_exists=True) - self.default_table_names.append(self._datasets_dependencies.name) - self.db.create_table(self._jobs, if_not_exists=True) - self.default_table_names.append(self._jobs.name) - self.db.create_table(self._checkpoints, if_not_exists=True) - self.default_table_names.append(self._checkpoints.name) - self.db.create_table(self._dataset_version_jobs, if_not_exists=True) - self.default_table_names.append(self._dataset_version_jobs.name) + """Initialize tables with automatic schema migration.""" + for table in self._metastore_tables: + self.db.create_table(table, if_not_exists=True) + self.default_table_names.append(table.name) + + # Auto-migrate: add missing columns based on schema definitions + for table in self._metastore_tables: + self._migrate_table_schema(table) + + def _migrate_table_schema(self, table: Table) -> None: + """ + Automatically add missing columns to match the SQLAlchemy schema definition. + This enables lazy schema evolution without manual migrations. + """ + # Get actual columns in database + columns_query = f"PRAGMA table_info({table.name})" + existing_columns = self.db.execute_str(columns_query).fetchall() + existing_column_names = {col[1] for col in existing_columns} + + # Get expected columns from SQLAlchemy Table definition and add missing ones + for column in table.columns: + if column.name not in existing_column_names: + self.db.add_column(table.name, column) + + self._create_table_indexes(table) + + def _create_table_indexes(self, table: Table) -> None: + """Create all indexes for a table, skipping if they fail.""" + for index in table.indexes: + try: + self.db.execute(CreateIndex(index, if_not_exists=True)) + except (sqlite3.OperationalError, sqlalchemy.exc.OperationalError) as e: + # Index creation might fail if column doesn't exist or other issues + logger.debug("Could not create index %s: %s", index.name, e) def _init_namespaces_projects(self) -> None: """ diff --git a/tests/func/test_local_db_migration.py b/tests/func/test_local_db_migration.py new file mode 100644 index 000000000..8175d2561 --- /dev/null +++ b/tests/func/test_local_db_migration.py @@ -0,0 +1,262 @@ +"""Tests for automatic local database schema migration. + +These tests verify that the MongoDB-style lazy schema evolution works correctly +for SQLite databases. ClickHouse databases used in SaaS have proper migrations, +so these tests are skipped when running against ClickHouse. +""" + +import time + +from sqlalchemy import Column, Index, Integer, Table, Text + +from tests.utils import skip_if_not_sqlite + + +@skip_if_not_sqlite +def test_automatic_schema_migration(catalog): + """Test that missing columns are automatically added during initialization. + + This test simulates upgrading from an old database schema by: + 1. Creating a table with a subset of columns (old schema) + 2. Calling migration logic + 3. Verifying missing columns were added + 4. Verifying default values were applied + 5. Verifying indexes were created + 6. Verifying the table is functional + """ + metastore = catalog.metastore + db = metastore.db + + old_table_name = "test_migration_table" + + # Clean up if exists from previous test run + db.execute_str(f"DROP TABLE IF EXISTS {old_table_name}") + + # Create table with old schema (only 2 columns) + db.execute_str( + f""" + CREATE TABLE {old_table_name} ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL + ) + """ + ) + + db.execute_str( + f"INSERT INTO {old_table_name} (id, name) VALUES (?, ?)", # noqa: S608 + ("test-id", "test-name"), + ) + + # Define the "new" schema with additional columns + new_table = Table( + old_table_name, + db.metadata, + Column("id", Text, primary_key=True), + Column("name", Text, nullable=False), + Column("description", Text, nullable=True), + Column("count", Integer, nullable=True, default=0), + Column("status", Text, nullable=False, default="active"), + Index("idx_test_name", "name"), + ) + + metastore._migrate_table_schema(new_table) + + # Verify: Check that new columns were added + columns_query = f"PRAGMA table_info({old_table_name})" + columns = db.execute_str(columns_query).fetchall() + column_names = {col[1] for col in columns} + + assert "id" in column_names + assert "name" in column_names + assert "description" in column_names, ( + "Missing column 'description' should have been added" + ) + assert "count" in column_names, "Missing column 'count' should have been added" + assert "status" in column_names, "Missing column 'status' should have been added" + + # Verify: Check that index was created + indexes_query = f"PRAGMA index_list({old_table_name})" + indexes = db.execute_str(indexes_query).fetchall() + index_names = {idx[1] for idx in indexes} + + assert "idx_test_name" in index_names, "Index should have been created" + + # Verify: Old data still exists + result = db.execute_str( + f"SELECT id, name FROM {old_table_name} WHERE id = ?", # noqa: S608 + ("test-id",), + ).fetchone() + assert result[0] == "test-id" + assert result[1] == "test-name" + + # Verify: Can insert data with new columns + db.execute_str( + f"INSERT INTO {old_table_name} (id, name, description, count, status) " # noqa: S608 + "VALUES (?, ?, ?, ?, ?)", + ("test-id-2", "test-name-2", "test description", 42, "pending"), + ) + + # Verify: Can query new columns + result = db.execute_str( + f"SELECT description, count, status FROM {old_table_name} WHERE id = ?", # noqa: S608 + ("test-id-2",), + ).fetchone() + assert result[0] == "test description" + assert result[1] == 42 + assert result[2] == "pending" + + # Verify: Old rows have NULL for nullable columns without defaults, + # but get default values for columns with defaults + result = db.execute_str( + f"SELECT description, count, status FROM {old_table_name} WHERE id = ?", # noqa: S608 + ("test-id",), + ).fetchone() + assert result[0] is None, "Nullable column without default should be NULL" + assert result[1] == 0, ( + "Column with default=0 should have default applied to existing rows" + ) + assert result[2] == "active", ( + "Column with default='active' should have default applied to existing rows" + ) + + # Verify: New rows get default values when not specified + db.execute_str( + f"INSERT INTO {old_table_name} (id, name) VALUES (?, ?)", # noqa: S608 + ("test-id-3", "test-name-3"), + ) + result = db.execute_str( + f"SELECT count, status FROM {old_table_name} WHERE id = ?", # noqa: S608 + ("test-id-3",), + ).fetchone() + # SQLite applies defaults on INSERT + assert result[0] == 0, "Default value for count should be 0" + assert result[1] == "active", "Default value for status should be 'active'" + + db.execute_str(f"DROP TABLE {old_table_name}") + + +@skip_if_not_sqlite +def test_migration_is_idempotent(catalog): + """Test that running migration multiple times doesn't cause errors.""" + metastore = catalog.metastore + db = metastore.db + + old_table_name = "test_idempotent_migration" + + db.execute_str(f"DROP TABLE IF EXISTS {old_table_name}") + + db.execute_str( + f""" + CREATE TABLE {old_table_name} ( + id TEXT PRIMARY KEY, + name TEXT + ) + """ + ) + + new_table = Table( + old_table_name, + db.metadata, + Column("id", Text, primary_key=True), + Column("name", Text), + Column("extra", Text, nullable=True), + Index("idx_test_idempotent", "name"), + ) + + # Run migration multiple times - should not fail + metastore._migrate_table_schema(new_table) + metastore._migrate_table_schema(new_table) + metastore._migrate_table_schema(new_table) + + # Verify column exists + columns_query = f"PRAGMA table_info({old_table_name})" + columns = db.execute_str(columns_query).fetchall() + column_names = {col[1] for col in columns} + assert "extra" in column_names + + db.execute_str(f"DROP TABLE {old_table_name}") + + +@skip_if_not_sqlite +def test_migration_with_data_preservation(catalog): + """Test that migration preserves existing data correctly.""" + metastore = catalog.metastore + db = metastore.db + + table_name = "test_data_preservation" + + db.execute_str(f"DROP TABLE IF EXISTS {table_name}") + + db.execute_str( + f""" + CREATE TABLE {table_name} ( + id INTEGER PRIMARY KEY, + value TEXT NOT NULL + ) + """ + ) + + for i in range(100): + db.execute_str( + f"INSERT INTO {table_name} (id, value) VALUES (?, ?)", # noqa: S608 + (i, f"value-{i}"), + ) + + new_table = Table( + table_name, + db.metadata, + Column("id", Integer, primary_key=True), + Column("value", Text, nullable=False), + Column("new_field", Text, nullable=True), + ) + + metastore._migrate_table_schema(new_table) + + # Verify all data is preserved + result = db.execute_str(f"SELECT COUNT(*) FROM {table_name}").fetchone() # noqa: S608 + assert result[0] == 100, "All rows should be preserved" + + # Verify data integrity + for i in range(100): + result = db.execute_str( + f"SELECT value, new_field FROM {table_name} WHERE id = ?", # noqa: S608 + (i,), + ).fetchone() + assert result[0] == f"value-{i}", f"Value for row {i} should be preserved" + assert result[1] is None, f"New field for row {i} should be NULL" + + db.execute_str(f"DROP TABLE {table_name}") + + +@skip_if_not_sqlite +def test_migration_performance_overhead(catalog): + """Measure the overhead of migration checks when schema is already up-to-date. + + This simulates the common case where users run commands and the schema + hasn't changed - we want to ensure the migration check is fast. + """ + metastore = catalog.metastore + + for table in metastore._metastore_tables: + metastore._migrate_table_schema(table) + + # Measure overhead by running multiple times + num_runs = 100 + start = time.perf_counter() + for _ in range(num_runs): + for table in metastore._metastore_tables: + metastore._migrate_table_schema(table) + elapsed = time.perf_counter() - start + + avg_time_ms = (elapsed / num_runs) * 1000 + num_tables = len(metastore._metastore_tables) + + print(f"\nMigration check overhead (average of {num_runs} runs):") + print(f" Total: {avg_time_ms:.3f}ms for {num_tables} tables") + print(f" Per table: {avg_time_ms / num_tables:.3f}ms") + + # Assert reasonable performance: should be under 5ms for all tables + assert avg_time_ms < 5.0, ( + f"Migration check overhead is {avg_time_ms:.2f}ms, should be under 5ms. " + "This might indicate a performance regression." + ) From f675e1c20f2141781969f2500a2731607ac69baf Mon Sep 17 00:00:00 2001 From: ilongin Date: Mon, 5 Jan 2026 10:59:18 +0100 Subject: [PATCH 2/8] simplifying linter --- src/datachain/data_storage/sqlite.py | 1 - tests/func/test_local_db_migration.py | 22 ++++++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/datachain/data_storage/sqlite.py b/src/datachain/data_storage/sqlite.py index 8dd1575cc..884016d7c 100644 --- a/src/datachain/data_storage/sqlite.py +++ b/src/datachain/data_storage/sqlite.py @@ -539,7 +539,6 @@ def _create_table_indexes(self, table: Table) -> None: try: self.db.execute(CreateIndex(index, if_not_exists=True)) except (sqlite3.OperationalError, sqlalchemy.exc.OperationalError) as e: - # Index creation might fail if column doesn't exist or other issues logger.debug("Could not create index %s: %s", index.name, e) def _init_namespaces_projects(self) -> None: diff --git a/tests/func/test_local_db_migration.py b/tests/func/test_local_db_migration.py index 8175d2561..2d40b02be 100644 --- a/tests/func/test_local_db_migration.py +++ b/tests/func/test_local_db_migration.py @@ -5,6 +5,8 @@ so these tests are skipped when running against ClickHouse. """ +# ruff: noqa: S608 + import time from sqlalchemy import Column, Index, Integer, Table, Text @@ -43,7 +45,7 @@ def test_automatic_schema_migration(catalog): ) db.execute_str( - f"INSERT INTO {old_table_name} (id, name) VALUES (?, ?)", # noqa: S608 + f"INSERT INTO {old_table_name} (id, name) VALUES (?, ?)", ("test-id", "test-name"), ) @@ -83,7 +85,7 @@ def test_automatic_schema_migration(catalog): # Verify: Old data still exists result = db.execute_str( - f"SELECT id, name FROM {old_table_name} WHERE id = ?", # noqa: S608 + f"SELECT id, name FROM {old_table_name} WHERE id = ?", ("test-id",), ).fetchone() assert result[0] == "test-id" @@ -91,14 +93,14 @@ def test_automatic_schema_migration(catalog): # Verify: Can insert data with new columns db.execute_str( - f"INSERT INTO {old_table_name} (id, name, description, count, status) " # noqa: S608 + f"INSERT INTO {old_table_name} (id, name, description, count, status) " "VALUES (?, ?, ?, ?, ?)", ("test-id-2", "test-name-2", "test description", 42, "pending"), ) # Verify: Can query new columns result = db.execute_str( - f"SELECT description, count, status FROM {old_table_name} WHERE id = ?", # noqa: S608 + f"SELECT description, count, status FROM {old_table_name} WHERE id = ?", ("test-id-2",), ).fetchone() assert result[0] == "test description" @@ -108,7 +110,7 @@ def test_automatic_schema_migration(catalog): # Verify: Old rows have NULL for nullable columns without defaults, # but get default values for columns with defaults result = db.execute_str( - f"SELECT description, count, status FROM {old_table_name} WHERE id = ?", # noqa: S608 + f"SELECT description, count, status FROM {old_table_name} WHERE id = ?", ("test-id",), ).fetchone() assert result[0] is None, "Nullable column without default should be NULL" @@ -121,11 +123,11 @@ def test_automatic_schema_migration(catalog): # Verify: New rows get default values when not specified db.execute_str( - f"INSERT INTO {old_table_name} (id, name) VALUES (?, ?)", # noqa: S608 + f"INSERT INTO {old_table_name} (id, name) VALUES (?, ?)", ("test-id-3", "test-name-3"), ) result = db.execute_str( - f"SELECT count, status FROM {old_table_name} WHERE id = ?", # noqa: S608 + f"SELECT count, status FROM {old_table_name} WHERE id = ?", ("test-id-3",), ).fetchone() # SQLite applies defaults on INSERT @@ -198,7 +200,7 @@ def test_migration_with_data_preservation(catalog): for i in range(100): db.execute_str( - f"INSERT INTO {table_name} (id, value) VALUES (?, ?)", # noqa: S608 + f"INSERT INTO {table_name} (id, value) VALUES (?, ?)", (i, f"value-{i}"), ) @@ -213,13 +215,13 @@ def test_migration_with_data_preservation(catalog): metastore._migrate_table_schema(new_table) # Verify all data is preserved - result = db.execute_str(f"SELECT COUNT(*) FROM {table_name}").fetchone() # noqa: S608 + result = db.execute_str(f"SELECT COUNT(*) FROM {table_name}").fetchone() assert result[0] == 100, "All rows should be preserved" # Verify data integrity for i in range(100): result = db.execute_str( - f"SELECT value, new_field FROM {table_name} WHERE id = ?", # noqa: S608 + f"SELECT value, new_field FROM {table_name} WHERE id = ?", (i,), ).fetchone() assert result[0] == f"value-{i}", f"Value for row {i} should be preserved" From a41bae9e602c76c1e02fbe94c888db793d5e6d1c Mon Sep 17 00:00:00 2001 From: ilongin Date: Mon, 5 Jan 2026 11:45:51 +0100 Subject: [PATCH 3/8] added warning skip --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index c050a8dbd..c6ca32f29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -173,6 +173,7 @@ filterwarnings = [ "ignore::DeprecationWarning:datasets.utils._dill", "ignore::DeprecationWarning:librosa.core.intervals", "ignore::FutureWarning:google.api_core._python_version_support", # Python 3.10 EOL warning + "ignore::FutureWarning:datamodel_code_generator.format", # Formatter migration warning # Expected warnings from our code "ignore:Field name .* shadows an attribute in parent:UserWarning", # datachain.lib.feature # Coverage limitation on Python 3.13 From 1d74a73e6abf80bda9cdc1f46f83ad224bc4e439 Mon Sep 17 00:00:00 2001 From: ilongin Date: Mon, 5 Jan 2026 12:13:55 +0100 Subject: [PATCH 4/8] added warning skip --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c6ca32f29..6926facbf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -173,7 +173,7 @@ filterwarnings = [ "ignore::DeprecationWarning:datasets.utils._dill", "ignore::DeprecationWarning:librosa.core.intervals", "ignore::FutureWarning:google.api_core._python_version_support", # Python 3.10 EOL warning - "ignore::FutureWarning:datamodel_code_generator.format", # Formatter migration warning + "ignore:The default formatters.*will be replaced by ruff:FutureWarning:datamodel_code_generator.*", # Formatter migration warning # Expected warnings from our code "ignore:Field name .* shadows an attribute in parent:UserWarning", # datachain.lib.feature # Coverage limitation on Python 3.13 From 1d9099312a6136e22cd8a985d06bac61ff912d13 Mon Sep 17 00:00:00 2001 From: ilongin Date: Mon, 5 Jan 2026 13:25:34 +0100 Subject: [PATCH 5/8] added warning skip --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 6926facbf..01bcdff90 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -174,6 +174,7 @@ filterwarnings = [ "ignore::DeprecationWarning:librosa.core.intervals", "ignore::FutureWarning:google.api_core._python_version_support", # Python 3.10 EOL warning "ignore:The default formatters.*will be replaced by ruff:FutureWarning:datamodel_code_generator.*", # Formatter migration warning + "ignore:.*getdata.*deprecated.*:DeprecationWarning:PIL.*", # Pillow 14 deprecation (we use <13) # Expected warnings from our code "ignore:Field name .* shadows an attribute in parent:UserWarning", # datachain.lib.feature # Coverage limitation on Python 3.13 From 31c1b2b584b9d3d9879b34f24807f11c7126f357 Mon Sep 17 00:00:00 2001 From: ilongin Date: Mon, 5 Jan 2026 13:39:10 +0100 Subject: [PATCH 6/8] added warning skip --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 01bcdff90..3df99c16f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -174,7 +174,7 @@ filterwarnings = [ "ignore::DeprecationWarning:librosa.core.intervals", "ignore::FutureWarning:google.api_core._python_version_support", # Python 3.10 EOL warning "ignore:The default formatters.*will be replaced by ruff:FutureWarning:datamodel_code_generator.*", # Formatter migration warning - "ignore:.*getdata.*deprecated.*:DeprecationWarning:PIL.*", # Pillow 14 deprecation (we use <13) + "ignore:Image.Image.getdata is deprecated:DeprecationWarning", # Pillow 14 deprecation (we use <13) # Expected warnings from our code "ignore:Field name .* shadows an attribute in parent:UserWarning", # datachain.lib.feature # Coverage limitation on Python 3.13 From 631bc477fd6399d09b148a07aa1539bdc3b32bc8 Mon Sep 17 00:00:00 2001 From: ilongin Date: Tue, 6 Jan 2026 22:38:45 +0100 Subject: [PATCH 7/8] bumping data-model dependency --- pyproject.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3df99c16f..cab8485b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ dependencies = [ "cloudpickle", "pydantic", "jmespath>=1.0", - "datamodel-code-generator>=0.25", + "datamodel-code-generator>=0.27", "Pillow>=10.0.0,<13", "msgpack>=1.0.4,<2", "psutil", @@ -173,7 +173,6 @@ filterwarnings = [ "ignore::DeprecationWarning:datasets.utils._dill", "ignore::DeprecationWarning:librosa.core.intervals", "ignore::FutureWarning:google.api_core._python_version_support", # Python 3.10 EOL warning - "ignore:The default formatters.*will be replaced by ruff:FutureWarning:datamodel_code_generator.*", # Formatter migration warning "ignore:Image.Image.getdata is deprecated:DeprecationWarning", # Pillow 14 deprecation (we use <13) # Expected warnings from our code "ignore:Field name .* shadows an attribute in parent:UserWarning", # datachain.lib.feature From ff36a54405bbf1ac1cbc554f0c4ca1fdae3ef527 Mon Sep 17 00:00:00 2001 From: ilongin Date: Tue, 6 Jan 2026 22:45:14 +0100 Subject: [PATCH 8/8] added backward compatibility test --- tests/func/test_local_db_migration.py | 73 +++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/tests/func/test_local_db_migration.py b/tests/func/test_local_db_migration.py index 2d40b02be..1132a5987 100644 --- a/tests/func/test_local_db_migration.py +++ b/tests/func/test_local_db_migration.py @@ -230,6 +230,79 @@ def test_migration_with_data_preservation(catalog): db.execute_str(f"DROP TABLE {table_name}") +@skip_if_not_sqlite +def test_backward_compatibility_with_extra_columns(catalog): + """Test that old code works with DB that has extra columns (downgrade scenario). + + Simulates the case where: + 1. User runs new version (DB has new columns) + 2. User downgrades to old version (schema doesn't define new columns) + + Verifies that SQLAlchemy-based queries handle extra columns gracefully. + """ + metastore = catalog.metastore + db = metastore.db + + table_name = "test_backward_compat" + + db.execute_str(f"DROP TABLE IF EXISTS {table_name}") + + # Simulate NEW version DB: create table with extra columns + db.execute_str( + f""" + CREATE TABLE {table_name} ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + extra_column_1 TEXT, + extra_column_2 INTEGER + ) + """ + ) + + db.execute_str( + f"INSERT INTO {table_name} VALUES (1, 'Alice', 'extra_data', 42)", + ) + + # Simulate OLD version code: schema WITHOUT extra columns + old_schema_table = Table( + table_name, + db.metadata, + Column("id", Integer, primary_key=True), + Column("name", Text, nullable=False), + # Note: extra_column_1 and extra_column_2 are NOT defined + extend_existing=True, + ) + + result = db.execute_str(f"SELECT id, name FROM {table_name} WHERE id=1").fetchone() + assert result[0] == 1 + assert result[1] == "Alice" + + from sqlalchemy import insert + + db.execute(insert(old_schema_table).values(id=2, name="Bob")) + result = db.execute_str( + f"SELECT id, name, extra_column_1, extra_column_2 FROM {table_name} WHERE id=2" + ).fetchone() + assert result[0] == 2 + assert result[1] == "Bob" + assert result[2] is None # extra_column_1 is NULL + assert result[3] is None # extra_column_2 is NULL + + from sqlalchemy import update + + db.execute( + update(old_schema_table).where(old_schema_table.c.id == 1).values(name="Alice2") + ) + result = db.execute_str( + f"SELECT name, extra_column_1, extra_column_2 FROM {table_name} WHERE id=1" + ).fetchone() + assert result[0] == "Alice2" # name updated + assert result[1] == "extra_data" # extra_column_1 preserved + assert result[2] == 42 # extra_column_2 preserved + + db.execute_str(f"DROP TABLE {table_name}") + + @skip_if_not_sqlite def test_migration_performance_overhead(catalog): """Measure the overhead of migration checks when schema is already up-to-date.