diff --git a/lib/src/powersync_database.dart b/lib/src/powersync_database.dart index 7e8f7691..5ec974a1 100644 --- a/lib/src/powersync_database.dart +++ b/lib/src/powersync_database.dart @@ -31,7 +31,7 @@ import 'sync_status.dart'; /// or not. Once connected, the changes are uploaded. class PowerSyncDatabase with SqliteQueries implements SqliteConnection { /// Schema used for the local database. - final Schema schema; + Schema schema; /// The underlying database. /// @@ -123,6 +123,19 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection { statusStream = _statusStreamController.stream; await database.initialize(); await migrations.migrate(database); + await updateSchema(schema); + } + + /// Replace the schema with a new version. + /// This is for advanced use cases - typically the schema should just be + /// specified once in the constructor. + /// + /// Cannot be used while connected - this should only be called before [connect]. + Future updateSchema(Schema schema) async { + if (_disconnecter != null) { + throw AssertionError('Cannot update schema while connected'); + } + this.schema = schema; await updateSchemaInIsolate(database, schema); } @@ -144,6 +157,8 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection { /// /// Status changes are reported on [statusStream]. connect({required PowerSyncBackendConnector connector}) async { + await initialize(); + // Disconnect if connected await disconnect(); final disconnector = AbortController(); @@ -259,19 +274,23 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection { /// /// The database can still be queried after this is called, but the tables /// would be empty. - Future disconnectAndClear() async { + /// + /// To preserve data in local-only tables, set [clearLocal] to false. + Future disconnectAndClear({bool clearLocal = true}) async { await disconnect(); await writeTransaction((tx) async { - await tx.execute('DELETE FROM ps_oplog WHERE 1'); - await tx.execute('DELETE FROM ps_crud WHERE 1'); - await tx.execute('DELETE FROM ps_buckets WHERE 1'); + await tx.execute('DELETE FROM ps_oplog'); + await tx.execute('DELETE FROM ps_crud'); + await tx.execute('DELETE FROM ps_buckets'); + final tableGlob = clearLocal ? 'ps_data_*' : 'ps_data__*'; final existingTableRows = await tx.getAll( - "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'"); + "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB ?", + [tableGlob]); for (var row in existingTableRows) { - await tx.execute('DELETE FROM "${row['name']}" WHERE 1'); + await tx.execute('DELETE FROM ${quoteIdentifier(row['name'])}'); } }); } diff --git a/lib/src/schema.dart b/lib/src/schema.dart index 1ef1e269..654f3eb9 100644 --- a/lib/src/schema.dart +++ b/lib/src/schema.dart @@ -13,7 +13,7 @@ class Schema { /// A single table in the schema. class Table { - /// The table name, as used in queries. + /// The synced table name, matching sync rules. final String name; /// List of columns. @@ -28,6 +28,9 @@ class Table { /// Whether this is an insert-only table. final bool insertOnly; + /// Override the name for the view + final String? _viewNameOverride; + /// Internal use only. /// /// Name of the table that stores the underlying data. @@ -42,33 +45,90 @@ class Table { /// Create a synced table. /// /// Local changes are recorded, and remote changes are synced to the local table. - const Table(this.name, this.columns, {this.indexes = const []}) - : localOnly = false, - insertOnly = false; + const Table(this.name, this.columns, + {this.indexes = const [], String? viewName, this.localOnly = false}) + : insertOnly = false, + _viewNameOverride = viewName; /// Create a table that only exists locally. /// /// This table does not record changes, and is not synchronized from the service. - const Table.localOnly(this.name, this.columns, {this.indexes = const []}) + const Table.localOnly(this.name, this.columns, + {this.indexes = const [], String? viewName}) : localOnly = true, - insertOnly = false; + insertOnly = false, + _viewNameOverride = viewName; /// Create a table that only supports inserts. /// /// This table records INSERT statements, but does not persist data locally. /// /// SELECT queries on the table will always return 0 rows. - const Table.insertOnly(this.name, this.columns) + const Table.insertOnly(this.name, this.columns, {String? viewName}) : localOnly = false, insertOnly = true, - indexes = const []; + indexes = const [], + _viewNameOverride = viewName; Column operator [](String columnName) { return columns.firstWhere((element) => element.name == columnName); } bool get validName { - return !invalidSqliteCharacters.hasMatch(name); + return !invalidSqliteCharacters.hasMatch(name) && + (_viewNameOverride == null || + !invalidSqliteCharacters.hasMatch(_viewNameOverride!)); + } + + /// Check that there are no issues in the table definition. + void validate() { + if (invalidSqliteCharacters.hasMatch(name)) { + throw AssertionError("Invalid characters in table name: $name"); + } else if (_viewNameOverride != null && + invalidSqliteCharacters.hasMatch(_viewNameOverride!)) { + throw AssertionError( + "Invalid characters in view name: $_viewNameOverride"); + } + + Set columnNames = {"id"}; + for (var column in columns) { + if (column.name == 'id') { + throw AssertionError( + "$name: id column is automatically added, custom id columns are not supported"); + } else if (columnNames.contains(column.name)) { + throw AssertionError("Duplicate column $name.${column.name}"); + } else if (invalidSqliteCharacters.hasMatch(column.name)) { + throw AssertionError( + "Invalid characters in column name: $name.${column.name}"); + } + + columnNames.add(column.name); + } + Set indexNames = {}; + + for (var index in indexes) { + if (indexNames.contains(index.name)) { + throw AssertionError("Duplicate index $name.${index.name}"); + } else if (invalidSqliteCharacters.hasMatch(index.name)) { + throw AssertionError( + "Invalid characters in index name: $name.${index.name}"); + } + + for (var column in index.columns) { + if (!columnNames.contains(column.column)) { + throw AssertionError( + "Column $name.${column.column} not found for index ${index.name}"); + } + } + + indexNames.add(index.name); + } + } + + /// Name for the view, used for queries. + /// Defaults to the synced table name. + String get viewName { + return _viewNameOverride ?? name; } } diff --git a/lib/src/schema_logic.dart b/lib/src/schema_logic.dart index 96d38f5e..d8b9b8b0 100644 --- a/lib/src/schema_logic.dart +++ b/lib/src/schema_logic.dart @@ -16,10 +16,10 @@ String createViewStatement(Table table) { if (table.insertOnly) { final nulls = table.columns.map((column) => 'NULL').join(', '); - return 'CREATE VIEW ${quoteIdentifier(table.name)}("id", $columnNames) AS SELECT NULL, $nulls WHERE 0 $_autoGenerated'; + return 'CREATE VIEW ${quoteIdentifier(table.viewName)}("id", $columnNames) AS SELECT NULL, $nulls WHERE 0 $_autoGenerated'; } final select = table.columns.map(mapColumn).join(', '); - return 'CREATE VIEW ${quoteIdentifier(table.name)}("id", $columnNames) AS SELECT "id", $select FROM ${quoteIdentifier(table.internalName)} $_autoGenerated'; + return 'CREATE VIEW ${quoteIdentifier(table.viewName)}("id", $columnNames) AS SELECT "id", $select FROM ${quoteIdentifier(table.internalName)} $_autoGenerated'; } String mapColumn(Column column) { @@ -32,6 +32,7 @@ List createViewTriggerStatements(Table table) { } else if (table.insertOnly) { return createViewTriggerStatementsInsert(table); } + final viewName = table.viewName; final type = table.name; final internalNameE = quoteIdentifier(table.internalName); @@ -46,16 +47,16 @@ List createViewTriggerStatements(Table table) { // Names in alphabetical order return [ """ -CREATE TRIGGER ${quoteIdentifier('ps_view_delete_$type')} -INSTEAD OF DELETE ON ${quoteIdentifier(type)} +CREATE TRIGGER ${quoteIdentifier('ps_view_delete_$viewName')} +INSTEAD OF DELETE ON ${quoteIdentifier(viewName)} FOR EACH ROW BEGIN DELETE FROM $internalNameE WHERE id = OLD.id; INSERT INTO ps_crud(tx_id, data) SELECT current_tx, json_object('op', 'DELETE', 'type', ${quoteString(type)}, 'id', OLD.id) FROM ps_tx WHERE id = 1; END""", """ -CREATE TRIGGER ${quoteIdentifier('ps_view_insert_$type')} -INSTEAD OF INSERT ON ${quoteIdentifier(type)} +CREATE TRIGGER ${quoteIdentifier('ps_view_insert_$viewName')} +INSTEAD OF INSERT ON ${quoteIdentifier(viewName)} FOR EACH ROW BEGIN SELECT CASE @@ -76,8 +77,8 @@ BEGIN INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES('\$local', 1, 0, $maxOpId); END""", """ -CREATE TRIGGER ${quoteIdentifier('ps_view_update_$type')} -INSTEAD OF UPDATE ON ${quoteIdentifier(type)} +CREATE TRIGGER ${quoteIdentifier('ps_view_update_$viewName')} +INSTEAD OF UPDATE ON ${quoteIdentifier(viewName)} FOR EACH ROW BEGIN SELECT CASE @@ -102,7 +103,7 @@ END""" } List createViewTriggerStatementsLocal(Table table) { - final type = table.name; + final viewName = table.viewName; final internalNameE = quoteIdentifier(table.internalName); final jsonFragment = table.columns @@ -112,23 +113,23 @@ List createViewTriggerStatementsLocal(Table table) { // Names in alphabetical order return [ """ -CREATE TRIGGER ${quoteIdentifier('ps_view_delete_$type')} -INSTEAD OF DELETE ON ${quoteIdentifier(type)} +CREATE TRIGGER ${quoteIdentifier('ps_view_delete_$viewName')} +INSTEAD OF DELETE ON ${quoteIdentifier(viewName)} FOR EACH ROW BEGIN DELETE FROM $internalNameE WHERE id = OLD.id; END""", """ -CREATE TRIGGER ${quoteIdentifier('ps_view_insert_$type')} -INSTEAD OF INSERT ON ${quoteIdentifier(type)} +CREATE TRIGGER ${quoteIdentifier('ps_view_insert_$viewName')} +INSTEAD OF INSERT ON ${quoteIdentifier(viewName)} FOR EACH ROW BEGIN INSERT INTO $internalNameE(id, data) SELECT NEW.id, json_object($jsonFragment); END""", """ -CREATE TRIGGER ${quoteIdentifier('ps_view_update_$type')} -INSTEAD OF UPDATE ON ${quoteIdentifier(type)} +CREATE TRIGGER ${quoteIdentifier('ps_view_update_$viewName')} +INSTEAD OF UPDATE ON ${quoteIdentifier(viewName)} FOR EACH ROW BEGIN SELECT CASE @@ -144,6 +145,7 @@ END""" List createViewTriggerStatementsInsert(Table table) { final type = table.name; + final viewName = table.viewName; final jsonFragment = table.columns .map((column) => @@ -151,8 +153,8 @@ List createViewTriggerStatementsInsert(Table table) { .join(', '); return [ """ -CREATE TRIGGER ${quoteIdentifier('ps_view_insert_$type')} -INSTEAD OF INSERT ON ${quoteIdentifier(type)} +CREATE TRIGGER ${quoteIdentifier('ps_view_insert_$viewName')} +INSTEAD OF INSERT ON ${quoteIdentifier(viewName)} FOR EACH ROW BEGIN INSERT INTO ps_crud(tx_id, data) SELECT current_tx, json_object('op', 'PUT', 'type', ${quoteString(type)}, 'id', NEW.id, 'data', json(powersync_diff('{}', json_object($jsonFragment)))) FROM ps_tx WHERE id = 1; @@ -164,6 +166,10 @@ END""" /// /// Must be wrapped in a transaction. void updateSchema(sqlite.Database db, Schema schema) { + for (var table in schema.tables) { + table.validate(); + } + _createTablesAndIndexes(db, schema); final existingViewRows = db.select( @@ -172,8 +178,6 @@ void updateSchema(sqlite.Database db, Schema schema) { Set toRemove = {for (var row in existingViewRows) row['name']}; for (var table in schema.tables) { - assert(table.validName, "Invalid characters in table name: ${table.name}"); - toRemove.remove(table.name); var createViewOp = createViewStatement(table); diff --git a/test/offline_online_test.dart b/test/offline_online_test.dart new file mode 100644 index 00000000..af497a5b --- /dev/null +++ b/test/offline_online_test.dart @@ -0,0 +1,130 @@ +import 'package:powersync/powersync.dart'; +import 'package:test/test.dart'; + +import 'util.dart'; + +const assetId = "2290de4f-0488-4e50-abed-f8e8eb1d0b42"; +const userId = "3390de4f-0488-4e50-abed-f8e8eb1d0b42"; +const customerId = "4490de4f-0488-4e50-abed-f8e8eb1d0b42"; + +/// The schema contains two copies of each table - a local-only one, and +/// a online/synced one. Depending on the 'online' flag, one of those gets +/// the main 'assets' / 'customer' view name. +/// +/// For online, we have these views: +/// assets +/// local_assets +/// customers +/// local_customers +/// +/// For offline, the views become: +/// online_assets +/// assets +/// online_customers +/// customers +Schema makeSchema(bool online) { + String onlineName(String table) { + if (online) { + return table; + } else { + return "online_$table"; + } + } + + String localName(String table) { + if (online) { + return "local_$table"; + } else { + return table; + } + } + + final tables = [ + Table('assets', [ + Column.text('created_at'), + Column.text('make'), + Column.text('model'), + Column.text('serial_number'), + Column.integer('quantity'), + Column.text('user_id'), + Column.text('customer_id'), + Column.text('description'), + ], indexes: [ + Index('makemodel', [IndexedColumn('make'), IndexedColumn('model')]) + ]), + Table('customers', [Column.text('name'), Column.text('email')]), + ]; + + return Schema([ + for (var table in tables) + Table(table.name, table.columns, + indexes: table.indexes, viewName: onlineName(table.name)), + for (var table in tables) + Table.localOnly('local_${table.name}', table.columns, + indexes: table.indexes, viewName: localName(table.name)) + ]); +} + +void main() { + setupLogger(); + + group('Offline-online Tests', () { + late String path; + + setUp(() async { + path = dbPath(); + await cleanDb(path: path); + }); + + test('Switch from offline-only to online', () async { + // Start with "offline-only" schema. + // This does not record any operations to the crud queue. + final db = await setupPowerSync(path: path, schema: makeSchema(false)); + + await db.execute('INSERT INTO customers(id, name, email) VALUES(?, ?, ?)', + [customerId, 'test customer', 'test@example.org']); + await db.execute( + 'INSERT INTO assets(id, description, customer_id) VALUES(?, ?, ?)', + [assetId, 'test', customerId]); + await db + .execute('UPDATE assets SET description = description || ?', ['.']); + + expect( + await db.getAll('SELECT data FROM ps_crud ORDER BY id'), equals([])); + + // Now switch to the "online" schema + await db.updateSchema(makeSchema(true)); + + // Note that updateSchema cannot be called inside a transaction, and there + // is a possibility of crash between updating the schema, and when the data + // has been moved. It may be best to attempt the data move on every application + // start where the online schema is used, if there is any local_ data still present. + + await db.writeTransaction((tx) async { + // Copy local data to the "online" views. + // This records each operation to the crud queue. + await tx.execute('INSERT INTO customers SELECT * FROM local_customers'); + await tx.execute( + 'INSERT INTO assets(id, description, customer_id, user_id) SELECT id, description, customer_id, ? FROM local_assets', + [userId]); + + // Delete the "offline-only" data. + await tx.execute('DELETE FROM local_customers'); + await tx.execute('DELETE FROM local_assets'); + }); + + expect( + await db.getAll('SELECT data FROM ps_crud ORDER BY id'), + equals([ + { + 'data': + '{"op":"PUT","type":"customers","id":"$customerId","data":{"name":"test customer","email":"test@example.org"}}' + }, + { + 'data': + '{"op":"PUT","type":"assets","id":"$assetId","data":{"user_id":"$userId","customer_id":"$customerId","description":"test."}}' + } + ])); + }); + }); +}