From d22630c0615480bd04a45270fb14b914a14c61e3 Mon Sep 17 00:00:00 2001 From: Ilia Borovitinov Date: Thu, 31 Aug 2023 19:18:49 +0300 Subject: [PATCH] chore: remove ack logic from the client as unused (#392) This PR simplifies the client by removing `_lastAckdRowId` and `_lastSentRowId` from the process, and `ack_lsn` from the outbound replication state in the client. There are three parts to the validity of these changes: 1. `this.outbound.ack_lsn` in the client was just unused. It was assigned in a bunch of places, but read only in one function (`getOutboundLogPositions`), where the only calling place (`SatelliteProcess#_performSnapshot`) didn't use that part of the return value. This PR removes this field, and simplifies function signatures appropriately. 2. `_lastSentRowId` in the client was set in a lot of places as well, but its only use was to fill `client.outbound.enqueued_lsn`. This allowed the client to continue streaming to the server from the LSN it hadn't sent before, however that logic is incorrect: the `SatInStartReplicationReq` message sent by the server has an LSN in it which is the actual LSN the client is expected to start from - current code just didn't respect this (because of a bug in the server that didn't actually set that field correctly, which is also fixed in this PR). Correct behaviour is to keep track of the enqueued LSN only for the lifetime of the client connection, because on reconnect the server will tell the client where to start from - which makes keeping `_lastSentRowId` useless. This PR removes this field, as well as the preset row in the meta table with the `lastSentRowId` key. 3. `_lastAckdRowId` is the most complicated to explain. 1. It was set based on the pings from the server, which are currently incorrect. There are changes in #386 which fix the acknowledgements on the server to be only done on actual persistence to Postgres.
If Electric had reached the transaction saving the persisted position to Postgres, that means it reached the original sent transaction itself and is about to send it out to clients. Hence, there is no difference in time between receiving the acknowledgement and the transaction itself after the round-trip. 2. The queries in `_performSnapshot` used a `rowid > _lastAckdRowId` check in all queries, but if we have received back the original transaction, we have already deleted the irrelevant oplog entries. Furthermore, the queries have equality checks on the `timestamp` column that made the `rowid` checks useless regardless of other context. 3. The other place where `_lastAckdRowId` was used was in the `_apply` call, which called `this._getEntries()`. `this._getEntries()` without arguments used to do a query with `rowid > _lastAckdRowId` to avoid including acknowledged rows in conflict resolution, however, as described above, any acknowledged rows can be safely considered garbage collected. _This is actually more correct now, because before we receive back the transaction and GC the oplog entry, all other transactions came before ours from Postgres' point of view, and to replicate conflict resolution exactly we should use all oplog entries regardless of whether the server has seen them, persisted them, or whatever._ So this PR makes it so we rely only on GC of the oplog for all queries, removes the `_lastAckdRowId` property, as well as the preset row in the meta table with the `lastAckdRowId` key. There is a follow-up to be made, since we don't actually use ping data on either the client or the server, but that's a separate PR.
--------- Co-authored-by: Valter Balegas --- clients/typescript/src/electric/adapter.ts | 10 +- clients/typescript/src/migrators/schema.ts | 2 +- clients/typescript/src/satellite/client.ts | 97 ++++++----------- clients/typescript/src/satellite/index.ts | 6 +- clients/typescript/src/satellite/mock.ts | 34 +----- clients/typescript/src/satellite/process.ts | 101 ++++-------------- clients/typescript/src/util/types.ts | 17 +-- .../typescript/test/satellite/client.test.ts | 57 +--------- clients/typescript/test/satellite/common.ts | 1 - .../typescript/test/satellite/process.test.ts | 101 ++++-------------- .../test/satellite/process.timing.test.ts | 3 +- .../satellite_collector_producer.ex | 2 +- ...er_correctly_continues_the_replication.lux | 46 ++++++++ 13 files changed, 146 insertions(+), 331 deletions(-) create mode 100644 e2e/tests/03.12_server_correctly_continues_the_replication.lux diff --git a/clients/typescript/src/electric/adapter.ts b/clients/typescript/src/electric/adapter.ts index b31ea75b..fe2cea38 100644 --- a/clients/typescript/src/electric/adapter.ts +++ b/clients/typescript/src/electric/adapter.ts @@ -17,8 +17,14 @@ export interface DatabaseAdapter { // Query the database. query(statement: Statement): Promise - // Runs the provided function inside a transaction - // The function may not use async/await otherwise the transaction may commit before the queries are actually executed + /** + * Runs the provided __non-async__ function inside a transaction. + * + * The function may not use async/await otherwise the transaction may commit before + * the queries are actually executed. This is a limitation of some adapters, that the + * function passed to the transaction runs "synchronously" through callbacks without + * releasing the event loop. 
+ */ transaction( f: (tx: Transaction, setResult: (res: T) => void) => void ): Promise diff --git a/clients/typescript/src/migrators/schema.ts b/clients/typescript/src/migrators/schema.ts index c3903c62..57f876a4 100644 --- a/clients/typescript/src/migrators/schema.ts +++ b/clients/typescript/src/migrators/schema.ts @@ -17,7 +17,7 @@ export const data = { //`-- Somewhere to track migrations\n`, `CREATE TABLE IF NOT EXISTS ${migrationsTable} (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n version TEXT NOT NULL UNIQUE,\n applied_at TEXT NOT NULL\n);`, //`-- Initialisation of the metadata table\n`, - `INSERT INTO ${metaTable} (key, value) VALUES ('compensations', 1), ('lastAckdRowId','0'), ('lastSentRowId', '0'), ('lsn', ''), ('clientId', ''), ('subscriptions', '');`, + `INSERT INTO ${metaTable} (key, value) VALUES ('compensations', 1), ('lsn', ''), ('clientId', ''), ('subscriptions', '');`, //`-- These are toggles for turning the triggers on and off\n`, `DROP TABLE IF EXISTS ${triggersTable};`, `CREATE TABLE ${triggersTable} (tablename TEXT PRIMARY KEY, flag INTEGER);`, diff --git a/clients/typescript/src/satellite/client.ts b/clients/typescript/src/satellite/client.ts index 177e037d..20c7fbdf 100644 --- a/clients/typescript/src/satellite/client.ts +++ b/clients/typescript/src/satellite/client.ts @@ -44,8 +44,6 @@ import { Socket, SocketFactory } from '../sockets/index' import _m0 from 'protobufjs/minimal.js' import { EventEmitter } from 'events' import { - AckCallback, - AckType, AuthResponse, DataChangeType, LSN, @@ -58,7 +56,6 @@ import { Record, Relation, SchemaChange, - OutgoingReplication, Transaction, StartReplicationResponse, StopReplicationResponse, @@ -69,6 +66,7 @@ import { DEFAULT_LOG_POS, typeEncoder, typeDecoder, + bytesToNumber, } from '../util/common' import { Client } from '.' 
import { SatelliteClientOpts, satelliteClientDefaults } from './config' @@ -116,8 +114,8 @@ export class SatelliteClient extends EventEmitter implements Client { private socketFactory: SocketFactory private socket?: Socket - private inbound: Replication - private outbound: OutgoingReplication + private inbound: Replication + private outbound: Replication // can only handle a single subscription at a time private subscriptionsDataCache: SubscriptionsDataCache @@ -215,17 +213,15 @@ export class SatelliteClient extends EventEmitter implements Client { this.subscriptionsDataCache = new SubscriptionsDataCache() } - private resetReplication( - enqueued?: LSN, - ack?: LSN, + private resetReplication( + last_lsn?: LSN, isReplicating?: ReplicationStatus - ): OutgoingReplication { + ): Replication { return { authenticated: false, isReplicating: isReplicating ? isReplicating : ReplicationStatus.STOPPED, relations: new Map(), - ack_lsn: ack, - enqueued_lsn: enqueued, + last_lsn: last_lsn, transactions: [], } } @@ -287,14 +283,8 @@ export class SatelliteClient extends EventEmitter implements Client { } close() { - this.outbound = this.resetReplication( - this.outbound.enqueued_lsn, - this.outbound.ack_lsn - ) - this.inbound = this.resetReplication( - this.inbound.enqueued_lsn, - this.inbound.ack_lsn - ) + this.outbound = this.resetReplication(this.outbound.last_lsn) + this.inbound = this.resetReplication(this.inbound.last_lsn) this.socketHandler = undefined @@ -345,7 +335,7 @@ export class SatelliteClient extends EventEmitter implements Client { } // Then set the replication state - this.inbound = this.resetReplication(lsn, lsn, ReplicationStatus.STARTING) + this.inbound = this.resetReplication(lsn, ReplicationStatus.STARTING) return this.rpc(request) } @@ -402,11 +392,9 @@ export class SatelliteClient extends EventEmitter implements Client { } this.outbound.transactions.push(transaction) - this.outbound.enqueued_lsn = transaction.lsn + this.outbound.last_lsn = transaction.lsn 
- if (this.throttledPushTransaction) { - this.throttledPushTransaction() - } + this.throttledPushTransaction?.() } private pushTransactions() { @@ -424,7 +412,6 @@ export class SatelliteClient extends EventEmitter implements Client { const satOpLog: SatOpLog = this.transactionToSatOpLog(next) this.sendMessage(satOpLog) - this.emit('ack_lsn', next.lsn, AckType.LOCAL_SEND) } } @@ -436,14 +423,6 @@ export class SatelliteClient extends EventEmitter implements Client { this.removeListener('error', callback) } - subscribeToAck(callback: AckCallback): void { - this.on('ack_lsn', callback) - } - - unsubscribeToAck(callback: AckCallback) { - this.removeListener('ack_lsn', callback) - } - subscribeToOutboundEvent(_event: 'started', callback: () => void): void { this.on('outbound_started', callback) } @@ -515,7 +494,7 @@ export class SatelliteClient extends EventEmitter implements Client { private sendMissingRelations( transaction: DataTransaction, - replication: Replication + replication: Replication ): void { transaction.changes.forEach((change) => { const relation = change.relation @@ -639,30 +618,28 @@ export class SatelliteClient extends EventEmitter implements Client { } private handleStartReq(message: SatInStartReplicationReq) { - Log.info(`received replication request ${JSON.stringify(message)}`) - if (this.outbound.isReplicating == ReplicationStatus.STOPPED) { - const replication = { - ...this.outbound, - ack_lsn: DEFAULT_LOG_POS, - enqueued_lsn: DEFAULT_LOG_POS, - } + Log.info( + `Server sent a replication request to start from ${bytesToNumber( + message.lsn + )}, and options ${JSON.stringify(message.options)}` + ) + if (this.outbound.isReplicating == ReplicationStatus.STOPPED) { + // Use server-sent LSN as the starting point for replication this.outbound = this.resetReplication( - replication.enqueued_lsn, - replication.ack_lsn, + message.lsn, ReplicationStatus.ACTIVE ) - const throttleOpts = { leading: true, trailing: true } this.throttledPushTransaction = 
throttle( () => this.pushTransactions(), this.opts.pushPeriod, - throttleOpts + { leading: true, trailing: true } ) const response = SatInStartReplicationResp.fromPartial({}) this.sendMessage(response) - this.emit('outbound_started', replication.enqueued_lsn) + this.emit('outbound_started', message.lsn) } else { const response = SatErrorResp.fromPartial({ errorType: SatErrorResp_ErrorCode.REPLICATION_FAILED, @@ -765,19 +742,18 @@ export class SatelliteClient extends EventEmitter implements Client { private handlePingReq() { Log.info( `respond to ping with last ack ${ - this.inbound.ack_lsn ? base64.fromBytes(this.inbound.ack_lsn) : 'NULL' + this.inbound.last_lsn ? base64.fromBytes(this.inbound.last_lsn) : 'NULL' }` ) - const pong = SatPingResp.fromPartial({ lsn: this.inbound.ack_lsn }) + const pong = SatPingResp.fromPartial({ lsn: this.inbound.last_lsn }) this.sendMessage(pong) } - // TODO: emit ping request to clear oplog. - private handlePingResp(message: SatPingResp) { - if (message.lsn) { - this.outbound.ack_lsn = message.lsn - this.emit('ack_lsn', message.lsn, AckType.REMOTE_COMMIT) - } + private handlePingResp(_message: SatPingResp) { + // TODO: This message is not used in any way right now. + // We might be dropping client-initiated pings completely. 
+ // However, the server sends these messages without any prompting, + // so this handler cannot just throw an error } private handleError(error: SatErrorResp) { @@ -879,7 +855,7 @@ export class SatelliteClient extends EventEmitter implements Client { this.emit( 'transaction', transaction, - () => (this.inbound.ack_lsn = transaction.lsn) + () => (this.inbound.last_lsn = transaction.lsn) ) replication.transactions.splice(lastTxnIdx) } @@ -1037,15 +1013,8 @@ export class SatelliteClient extends EventEmitter implements Client { }).finally(() => clearTimeout(waitingFor)) } - resetOutboundLogPositions(sent: LSN, ack: LSN): void { - this.outbound = this.resetReplication(sent, ack) - } - - getOutboundLogPositions(): { enqueued: LSN; ack: LSN } { - return { - ack: this.outbound.ack_lsn ?? DEFAULT_LOG_POS, - enqueued: this.outbound.enqueued_lsn ?? DEFAULT_LOG_POS, - } + getLastSentLsn(): LSN { + return this.outbound.last_lsn ?? DEFAULT_LOG_POS } } diff --git a/clients/typescript/src/satellite/index.ts b/clients/typescript/src/satellite/index.ts index 1a02d660..ecb6657b 100644 --- a/clients/typescript/src/satellite/index.ts +++ b/clients/typescript/src/satellite/index.ts @@ -5,7 +5,6 @@ import { Migrator } from '../migrators/index' import { Notifier } from '../notifiers/index' import { SocketFactory } from '../sockets' import { - AckCallback, AuthResponse, ConnectivityState, DbName, @@ -85,10 +84,7 @@ export interface Client { callback: (transaction: Transaction) => Promise ): void enqueueTransaction(transaction: DataTransaction): void - subscribeToAck(callback: AckCallback): void - unsubscribeToAck(callback: AckCallback): void - resetOutboundLogPositions(sent?: LSN, ack?: LSN): void - getOutboundLogPositions(): { enqueued: LSN; ack: LSN } + getLastSentLsn(): LSN subscribeToOutboundEvent(event: 'started', callback: () => void): void unsubscribeToOutboundEvent(event: 'started', callback: () => void): void subscribeToError(callback: ErrorCallback): void diff --git 
a/clients/typescript/src/satellite/mock.ts b/clients/typescript/src/satellite/mock.ts index f099ee62..e10dc1c1 100644 --- a/clients/typescript/src/satellite/mock.ts +++ b/clients/typescript/src/satellite/mock.ts @@ -4,8 +4,6 @@ import { Migrator } from '../migrators/index' import { Notifier } from '../notifiers/index' import { sleepAsync } from '../util/timer' import { - AckCallback, - AckType, AuthResponse, DbName, LSN, @@ -131,7 +129,6 @@ export class MockSatelliteClient extends EventEmitter implements Client { inboundAck: Uint8Array = DEFAULT_LOG_POS outboundSent: Uint8Array = DEFAULT_LOG_POS - outboundAck: Uint8Array = DEFAULT_LOG_POS // to clear any pending timeouts timeouts: NodeJS.Timeout[] = [] @@ -238,12 +235,9 @@ export class MockSatelliteClient extends EventEmitter implements Client { isClosed(): boolean { return this.closed } - resetOutboundLogPositions(sent: Uint8Array, ack: Uint8Array): void { - this.outboundSent = sent - this.outboundAck = ack - } - getOutboundLogPositions(): { enqueued: Uint8Array; ack: Uint8Array } { - return { enqueued: this.outboundSent, ack: this.outboundAck } + + getLastSentLsn(): Uint8Array { + return this.outboundSent } connect(): Promise { this.closed = false @@ -300,28 +294,6 @@ export class MockSatelliteClient extends EventEmitter implements Client { enqueueTransaction(transaction: DataTransaction): void { this.outboundSent = transaction.lsn - - this.emit('ack_lsn', transaction.lsn, AckType.LOCAL_SEND) - - // simulate ping message effect - const t = setTimeout(() => { - this.outboundAck = transaction.lsn - this.emit('ack_lsn', transaction.lsn, AckType.REMOTE_COMMIT) - }, 500) - this.timeouts.push(t) - } - - subscribeToAck(callback: AckCallback): void { - this.on('ack_lsn', callback) - } - - unsubscribeToAck(callback: AckCallback): void { - this.removeListener('ack_lsn', callback) - } - - setOutboundLogPositions(sent: LSN, ack: LSN): void { - this.outboundSent = sent - this.outboundAck = ack } 
subscribeToOutboundEvent(_event: 'started', callback: () => void): void { diff --git a/clients/typescript/src/satellite/process.ts b/clients/typescript/src/satellite/process.ts index 304cfae9..34fda1be 100644 --- a/clients/typescript/src/satellite/process.ts +++ b/clients/typescript/src/satellite/process.ts @@ -19,12 +19,10 @@ import { bytesToNumber, emptyPromise, getWaiter, - numberToBytes, uuid, } from '../util/common' import { QualifiedTablename } from '../util/tablename' import { - AckType, Change as Chg, ConnectivityState, DataChange, @@ -33,7 +31,6 @@ import { MigrationTable, Relation, RelationsCache, - Row, SatelliteError, SatelliteErrorCode, SchemaChange, @@ -94,8 +91,6 @@ type Uuid = `${string}-${string}-${string}-${string}-${string}` type MetaEntries = { clientId: Uuid | '' compensations: number - lastAckdRowId: `${number}` - lastSentRowId: `${number}` lsn: string | null subscriptions: string } @@ -137,8 +132,6 @@ export class SatelliteProcess implements Satellite { _potentialDataChangeSubscription?: string _throttledSnapshot: ThrottleFunction - _lastAckdRowId: number - _lastSentRowId: number _lsn?: LSN relations: RelationsCache @@ -176,10 +169,6 @@ export class SatelliteProcess implements Satellite { this.client = client this.opts = opts - - this._lastAckdRowId = 0 - this._lastSentRowId = 0 - this.relations = {} this.subscriptions = new InMemorySubscriptionsManager( @@ -277,14 +266,6 @@ export class SatelliteProcess implements Satellite { this.relations = await this._getLocalRelations() this.checkMaxSqlParameters() - this._lastAckdRowId = Number(await this._getMeta('lastAckdRowId')) - this._lastSentRowId = Number(await this._getMeta('lastSentRowId')) - - this.client.resetOutboundLogPositions( - numberToBytes(this._lastAckdRowId), - numberToBytes(this._lastSentRowId) - ) - const lsnBase64 = await this._getMeta('lsn') if (lsnBase64 && lsnBase64.length > 0) { this._lsn = base64.toBytes(lsnBase64) @@ -341,10 +322,8 @@ export class SatelliteProcess implements 
Satellite { setClientListeners(): void { this.client.subscribeToError(this._handleClientError.bind(this)) this.client.subscribeToRelations(this._updateRelations.bind(this)) + // FIXME: calling an async function in an event emitter this.client.subscribeToTransactions(this._applyTransaction.bind(this)) - // When a local transaction is sent, or an acknowledgement for - // a remote transaction commit is received, we update lsn records. - this.client.subscribeToAck(this._handleAck.bind(this)) this.client.subscribeToOutboundEvent( 'started', this._throttledSnapshot.bind(this) @@ -666,11 +645,6 @@ export class SatelliteProcess implements Satellite { ) } - async _handleAck(lsn: LSN, type: AckType) { - const decoded = bytesToNumber(lsn) - await this._ack(decoded, type == AckType.REMOTE_COMMIT) - } - async _connectivityStateChanged(status: ConnectivityState): Promise { this.connectivityState = status Log.debug(`connectivity state changed ${status}`) @@ -874,12 +848,11 @@ export class SatelliteProcess implements Satellite { WHERE rowid in ( SELECT rowid FROM ${oplog} WHERE timestamp is NULL - AND rowid > ? ORDER BY rowid ASC ) RETURNING * - `, - args: [timestamp.toISOString(), this._lastAckdRowId], + `, + args: [timestamp.toISOString()], } // For each first oplog entry per element, set `clearTags` array to previous tags from the shadow table @@ -895,12 +868,11 @@ export class SatelliteProcess implements Satellite { AND op.tablename = shadow.tablename AND op.primaryKey = shadow.primaryKey WHERE op.timestamp = ? - AND op.rowid > ? GROUP BY op.namespace, op.tablename, op.primaryKey ) AS updates WHERE updates.op_rowid = ${oplog}.rowid - `, - args: [timestamp.toISOString(), this._lastAckdRowId], + `, + args: [timestamp.toISOString()], } // For each affected shadow row, set new tag array, unless the last oplog operation was a DELETE @@ -910,15 +882,10 @@ export class SatelliteProcess implements Satellite { SELECT namespace, tablename, primaryKey, ? 
FROM ${oplog} AS op WHERE timestamp = ? - AND rowid > ? GROUP BY namespace, tablename, primaryKey HAVING rowid = max(rowid) AND optype != 'DELETE' - `, - args: [ - encodeTags([newTag]), - timestamp.toISOString(), - this._lastAckdRowId, - ], + `, + args: [encodeTags([newTag]), timestamp.toISOString()], } // And finally delete any shadow rows where the last oplog operation was a `DELETE` @@ -931,15 +898,14 @@ export class SatelliteProcess implements Satellite { INNER JOIN ${shadow} AS shadow ON shadow.namespace = op.namespace AND shadow.tablename = op.tablename AND shadow.primaryKey = op.primaryKey WHERE op.timestamp = ? - AND op.rowid > ? GROUP BY op.namespace, op.tablename, op.primaryKey HAVING op.rowid = max(op.rowid) AND op.optype = 'DELETE' ) DELETE FROM ${shadow} WHERE rowid IN _to_be_deleted - `, - args: [timestamp.toISOString(), this._lastAckdRowId], + `, + args: [timestamp.toISOString()], } // Execute the four queries above in a transaction, returning the results from the first query @@ -963,7 +929,7 @@ export class SatelliteProcess implements Satellite { if (oplogEntries.length > 0) this._notifyChanges(oplogEntries) if (!this.client.isClosed()) { - const { enqueued } = this.client.getOutboundLogPositions() + const enqueued = this.client.getLastSentLsn() const enqueuedLogPos = bytesToNumber(enqueued) // TODO: handle case where pending oplog is large @@ -1066,9 +1032,8 @@ export class SatelliteProcess implements Satellite { } async _getEntries(since?: number): Promise { - if (since === undefined) { - since = this._lastAckdRowId - } + // `rowid` is never below 0, so -1 means "everything" + since ??= -1 const oplog = this.opts.oplogTable.toString() const selectEntries = ` @@ -1353,28 +1318,6 @@ export class SatelliteProcess implements Satellite { else return [] } - async _ack(lsn: number, isAck: boolean): Promise { - if (lsn < this._lastAckdRowId || (lsn > this._lastSentRowId && isAck)) { - throw new Error('Invalid position') - } - - const meta = 
this.opts.metaTable.toString() - - const sql = ` UPDATE ${meta} SET value = ? WHERE key = ?` - const args = [ - `${lsn.toString()}`, - isAck ? 'lastAckdRowId' : 'lastSentRowId', - ] - - if (isAck) { - this._lastAckdRowId = lsn - await this.adapter.runInTransaction({ sql, args }) - } else { - this._lastSentRowId = lsn - await this.adapter.run({ sql, args }) - } - } - _setMetaStatement( key: K, value: MetaEntries[K] @@ -1429,7 +1372,7 @@ export class SatelliteProcess implements Satellite { return clientId } - private async _getLocalTableNames(): Promise { + private async _getLocalTableNames(): Promise<{ name: string }[]> { const notIn = [ this.opts.metaTable.tablename.toString(), this.opts.migrationsTable.tablename.toString(), @@ -1446,7 +1389,9 @@ export class SatelliteProcess implements Satellite { WHERE type = 'table' AND name NOT IN (${notIn.map(() => '?').join(',')}) ` - return await this.adapter.query({ sql: tables, args: notIn }) + return (await this.adapter.query({ sql: tables, args: notIn })) as { + name: string + }[] } // Fetch primary keys from local store and use them to identify incoming ops. 
@@ -1458,7 +1403,7 @@ export class SatelliteProcess implements Satellite { let id = 0 const schema = 'public' // TODO for (const table of tableNames) { - const tableName = table.name as any + const tableName = table.name const sql = 'SELECT * FROM pragma_table_info(?)' const args = [tableName] const columnsForTable = await this.adapter.query({ sql, args }) @@ -1491,14 +1436,14 @@ export class SatelliteProcess implements Satellite { return generateTag(instanceId, timestamp) } - async _garbageCollectOplog(commitTimestamp: Date) { + async _garbageCollectOplog(commitTimestamp: Date): Promise { const isoString = commitTimestamp.toISOString() const oplog = this.opts.oplogTable.tablename.toString() - const stmt = ` - DELETE FROM ${oplog} - WHERE timestamp = ?; - ` - await this.adapter.run({ sql: stmt, args: [isoString] }) + + await this.adapter.run({ + sql: `DELETE FROM ${oplog} WHERE timestamp = ?`, + args: [isoString], + }) } /** diff --git a/clients/typescript/src/util/types.ts b/clients/typescript/src/util/types.ts index d7959ac5..602b45ef 100644 --- a/clients/typescript/src/util/types.ts +++ b/clients/typescript/src/util/types.ts @@ -147,17 +147,12 @@ export function isDataChange(change: Change): change is DataChange { export type Record = { [key: string]: string | number | undefined | null } -export type Replication = { +export type Replication = { authenticated: boolean isReplicating: ReplicationStatus relations: Map - ack_lsn?: LSN - enqueued_lsn?: LSN - transactions: Transaction[] -} - -export type OutgoingReplication = Omit & { - transactions: DataTransaction[] // outgoing transactions cannot contain migrations + last_lsn: LSN | undefined + transactions: TransactionType[] } export type Relation = { @@ -184,12 +179,6 @@ export enum ReplicationStatus { ACTIVE, } -export enum AckType { - LOCAL_SEND, - REMOTE_COMMIT, -} - -export type AckCallback = (lsn: LSN, type: AckType) => void export type ErrorCallback = (error: SatelliteError) => void export type 
ConnectivityState = diff --git a/clients/typescript/test/satellite/client.test.ts b/clients/typescript/test/satellite/client.test.ts index 7ea105e8..5e0d5c14 100644 --- a/clients/typescript/test/satellite/client.test.ts +++ b/clients/typescript/test/satellite/client.test.ts @@ -9,7 +9,7 @@ import { } from '../../src/satellite/client' import { OplogEntry, toTransactions } from '../../src/satellite/oplog' import { WebSocketNodeFactory } from '../../src/sockets/node' -import { base64, bytesToNumber, numberToBytes } from '../../src/util/common' +import { base64, bytesToNumber } from '../../src/util/common' import { getObjFromString, getTypeFromCode, @@ -17,8 +17,6 @@ import { SatPbMsg, } from '../../src/util/proto' import { - AckType, - DataChangeType, Relation, SatelliteErrorCode, DataTransaction, @@ -367,10 +365,10 @@ test.serial('acknowledge lsn', async (t) => { await new Promise(async (res) => { client.on('transaction', (_t: DataTransaction, ack: any) => { - const lsn0 = client['inbound'].ack_lsn + const lsn0 = client['inbound'].last_lsn t.is(lsn0, undefined) ack() - const lsn1 = base64.fromBytes(client['inbound'].ack_lsn!) + const lsn1 = base64.fromBytes(client['inbound'].last_lsn!) 
t.is(lsn1, 'FAKE') res() }) @@ -520,55 +518,6 @@ test.serial('send transaction', async (t) => { }) }) -test('ack on send and pong', async (t) => { - await connectAndAuth(t.context) - const { client, server } = t.context - - const lsn_1 = numberToBytes(1) - - const startResp = Proto.SatInStartReplicationResp.fromPartial({}) - const pingResponse = Proto.SatPingResp.fromPartial({ lsn: lsn_1 }) - - server.nextResponses([startResp]) - server.nextResponses([]) - server.nextResponses([pingResponse]) - - await client.startReplication() - - const transaction: DataTransaction = { - lsn: lsn_1, - commit_timestamp: Long.UZERO, - changes: [ - { - relation: relations.parent, - type: DataChangeType.INSERT, - record: { id: 0 }, - tags: [], // actual value is not relevent here - }, - ], - } - - const res = new Promise((res) => { - let sent = false - client.subscribeToAck((lsn, type) => { - if (type == AckType.LOCAL_SEND) { - t.is(bytesToNumber(lsn), 1) - sent = true - } else if (sent && type == AckType.REMOTE_COMMIT) { - t.is(bytesToNumber(lsn), 1) - t.is(sent, true) - res() - } - }) - }) - - setTimeout(() => { - client.enqueueTransaction(transaction) - }, 100) - - await res -}) - test.serial('default and null test', async (t) => { await connectAndAuth(t.context) const { client, server } = t.context diff --git a/clients/typescript/test/satellite/common.ts b/clients/typescript/test/satellite/common.ts index e4ac117f..600a0bf1 100644 --- a/clients/typescript/test/satellite/common.ts +++ b/clients/typescript/test/satellite/common.ts @@ -100,7 +100,6 @@ export interface TestNotifier extends EventNotifier { } export interface TestSatellite extends Satellite { - _lastSentRowId: number _authState?: AuthState relations: RelationsCache initializing?: { diff --git a/clients/typescript/test/satellite/process.test.ts b/clients/typescript/test/satellite/process.test.ts index 81d373ef..231e5cb9 100644 --- a/clients/typescript/test/satellite/process.test.ts +++ 
b/clients/typescript/test/satellite/process.test.ts @@ -86,8 +86,6 @@ test('load metadata', async (t) => { const meta = await loadSatelliteMetaTable(adapter) t.deepEqual(meta, { compensations: 1, - lastAckdRowId: '0', - lastSentRowId: '0', lsn: '', clientId: '', subscriptions: '', @@ -841,44 +839,6 @@ test('merge incoming with empty local', async (t) => { }) }) -test('advance oplog cursor', async (t) => { - const { adapter, runMigrations, satellite } = t.context - await runMigrations() - - // fake current propagated rowId - satellite._lastSentRowId = 2 - - // Get tablenames. - const oplogTablename = opts.oplogTable.tablename - const metaTablename = opts.metaTable.tablename - - // Insert a couple of rows. - await adapter.run({ sql: `INSERT INTO main.parent(id) VALUES ('1'),('2')` }) - - // We have two rows in the oplog. - let rows = await adapter.query({ - sql: `SELECT count(rowid) as num_rows FROM ${oplogTablename}`, - }) - t.is(rows[0].num_rows, 2) - - // Ack. - await satellite._ack(2, true) - - // NOTE: The oplog is not clean! This is a current design decision to clear - // oplog only when receiving transaction that originated from Satellite in the - // first place. - rows = await adapter.query({ - sql: `SELECT count(rowid) as num_rows FROM ${oplogTablename}`, - }) - t.is(rows[0].num_rows, 2) - - // Verify the meta. 
- rows = await adapter.query({ - sql: `SELECT value FROM ${metaTablename} WHERE key = 'lastAckdRowId'`, - }) - t.is(rows[0].value, '2') -}) - test('compensations: referential integrity is enforced', async (t) => { const { adapter, runMigrations, satellite } = t.context await runMigrations() @@ -1007,14 +967,13 @@ test('compensations: using triggers with flag 0', async (t) => { await adapter.run({ sql: `PRAGMA foreign_keys = ON` }) await satellite._setMeta('compensations', 0) - satellite._lastSentRowId = 1 await adapter.run({ sql: `INSERT INTO main.parent(id, value) VALUES (1, '1')`, }) await satellite._setAuthState(authState) - await satellite._performSnapshot() - await satellite._ack(1, true) + const ts = await satellite._performSnapshot() + await satellite._garbageCollectOplog(ts) await adapter.run({ sql: `INSERT INTO main.child(id, parent) VALUES (1, 1)` }) await satellite._performSnapshot() @@ -1053,14 +1012,13 @@ test('compensations: using triggers with flag 1', async (t) => { await adapter.run({ sql: `PRAGMA foreign_keys = ON` }) await satellite._setMeta('compensations', 1) - satellite._lastSentRowId = 1 await adapter.run({ sql: `INSERT INTO main.parent(id, value) VALUES (1, '1')`, }) await satellite._setAuthState(authState) - await satellite._performSnapshot() - await satellite._ack(1, true) + const ts = await satellite._performSnapshot() + await satellite._garbageCollectOplog(ts) await adapter.run({ sql: `INSERT INTO main.child(id, parent) VALUES (1, 1)` }) await satellite._performSnapshot() @@ -1198,20 +1156,8 @@ test('get transactions from opLogEntries', async (t) => { t.deepEqual(opLog, expected) }) -test('rowid acks updates meta', async (t) => { - const { runMigrations, satellite, client, authState } = t.context - await runMigrations() - await satellite.start(authState) - - const lsn1 = numberToBytes(1) - client['emit']('ack_lsn', lsn1, false) - - const lsn = await satellite['_getMeta']('lastSentRowId') - t.is(lsn, '1') -}) - test('handling connectivity 
state change stops queueing operations', async (t) => { - const { runMigrations, satellite, adapter, authState, client } = t.context + const { runMigrations, satellite, adapter, authState } = t.context await runMigrations() await satellite.start(authState) @@ -1221,12 +1167,9 @@ test('handling connectivity state change stops queueing operations', async (t) = await satellite._performSnapshot() - const sentLsn = await satellite._getMeta('lastSentRowId') - t.is(sentLsn, '1') - await new Promise((r) => client.once('ack_lsn', () => r())) - - const acknowledgedLsn = await satellite._getMeta('lastAckdRowId') - t.is(acknowledgedLsn, '1') + // We should have sent (or at least enqueued to send) one row + const sentLsn = await satellite.client.getLastSentLsn() + t.deepEqual(sentLsn, numberToBytes(1)) await satellite._connectivityStateChanged('disconnected') @@ -1236,14 +1179,15 @@ test('handling connectivity state change stops queueing operations', async (t) = await satellite._performSnapshot() - const lsn1 = await satellite._getMeta('lastSentRowId') - t.is(lsn1, '1') + // Since connectivity is down, that row isn't yet sent + const lsn1 = await satellite.client.getLastSentLsn() + t.deepEqual(lsn1, sentLsn) + // Once connectivity is restored, we will immediately run a snapshot to send pending rows await satellite._connectivityStateChanged('available') - - await sleepAsync(200) - const lsn2 = await satellite._getMeta('lastSentRowId') - t.is(lsn2, '2') + await sleepAsync(200) // Wait for snapshot to run + const lsn2 = await satellite.client.getLastSentLsn() + t.deepEqual(lsn2, numberToBytes(2)) }) test('garbage collection is triggered when transaction from the same origin is replicated', async (t) => { @@ -1259,19 +1203,20 @@ test('garbage collection is triggered when transaction from the same origin is r sql: `UPDATE parent SET value = 'local', other = 2 WHERE id = 1;`, }) - let lsn = await satellite._getMeta('lastSentRowId') - t.is(lsn, '0') + // Before snapshot, we didn't 
send anything + const lsn1 = await satellite.client.getLastSentLsn() + t.deepEqual(lsn1, numberToBytes(0)) + // Snapshot sends these oplog entries await satellite._performSnapshot() - - lsn = await satellite._getMeta('lastSentRowId') - t.is(lsn, '2') - lsn = await satellite._getMeta('lastAckdRowId') + const lsn2 = await satellite.client.getLastSentLsn() + t.deepEqual(lsn2, numberToBytes(2)) const old_oplog = await satellite._getEntries() - let transactions = toTransactions(old_oplog, relations) + const transactions = toTransactions(old_oplog, relations) transactions[0].origin = satellite._authState!.clientId + // Transaction containing these oplogs is applied, which means we delete them await satellite._applyTransaction(transactions[0]) const new_oplog = await satellite._getEntries() t.deepEqual(new_oplog, []) diff --git a/clients/typescript/test/satellite/process.timing.test.ts b/clients/typescript/test/satellite/process.timing.test.ts index 73869ec3..bbb44a86 100644 --- a/clients/typescript/test/satellite/process.timing.test.ts +++ b/clients/typescript/test/satellite/process.timing.test.ts @@ -15,8 +15,7 @@ test.beforeEach(async (t) => makeContext(t, opts)) test.afterEach.always(clean) test('throttled snapshot respects window', async (t) => { - const { adapter, notifier, runMigrations, satellite, authState } = - t.context as any + const { adapter, notifier, runMigrations, satellite, authState } = t.context await runMigrations() await satellite._setAuthState(authState) diff --git a/components/electric/lib/electric/replication/satellite_collector_producer.ex b/components/electric/lib/electric/replication/satellite_collector_producer.ex index 4b62914e..fb11d6ce 100644 --- a/components/electric/lib/electric/replication/satellite_collector_producer.ex +++ b/components/electric/lib/electric/replication/satellite_collector_producer.ex @@ -36,7 +36,7 @@ defmodule Electric.Replication.SatelliteCollectorProducer do @impl GenStage def
handle_call({:store_incoming_transactions, transactions}, _, state) do transactions - |> Stream.each(& &1.ack_fn()) + |> Stream.each(& &1.ack_fn.()) |> Stream.reject(&Enum.empty?(&1.changes)) |> Stream.with_index(state.next_key) |> Enum.to_list() diff --git a/e2e/tests/03.12_server_correctly_continues_the_replication.lux b/e2e/tests/03.12_server_correctly_continues_the_replication.lux new file mode 100644 index 00000000..6f6b90c7 --- /dev/null +++ b/e2e/tests/03.12_server_correctly_continues_the_replication.lux @@ -0,0 +1,46 @@ +[doc When client resumes, server correctly requests stream continuation] +[include _shared.luxinc] +[include _satellite_macros.luxinc] + +[invoke setup] + +[invoke setup_client 1 "electric_1" 5133] + +[shell pg_1] + [invoke migrate_items_table 20230504114018] + +[shell satellite_1] + ??[proto] recv: #SatInStartReplicationResp + [invoke node_await_table "items"] + + # First write gets propagated + [invoke node_await_insert "['hello from satellite - first']"] + +[shell pg_1] + [invoke wait-for "SELECT * FROM public.items;" "hello from satellite - first" 10 $psql] + +[shell satellite_1] + [progress stopping client] + !await client.stop(db) + ?$node + + # Verify that the client retrieves previously stored LSN when it reestablishes the replication connection. + [progress resuming client] + [invoke electrify_db "originalDb" "electric_1" 5133 "[]"] + ?$node + + -no previous LSN + + # Server requests replication. The `AAAAAQ==` here is [0, 0, 0, 1] UInt8Array, representing an integer 1 - the one rowid + ?+\[proto\] recv: #SatInStartReplicationReq\{lsn: AAAAAQ==, + ?\[proto\] send: #SatInStartReplicationReq\{lsn: [a-zA-Z0-9=]+, + ??[proto] recv: #SatInStartReplicationResp + + # Second write gets propagated + [invoke node_await_insert "['hello from satellite - second']"] + +[shell pg_1] + [invoke wait-for "SELECT * FROM public.items;" "hello from satellite - second" 10 $psql] + +[cleanup] + [invoke teardown]