Skip to content

Commit

Permalink
chore: remove ack logic from the client as unused (#392)
Browse files Browse the repository at this point in the history
This PR simplifies the client by removing `_lastAck` and `_lastSent`
from the process, and `ack_lsn` from the outbound replication state in
the client.

There are three parts to validity of these changes:
1. `this.outbound.ack_lsn` in the client was just unused. It was
assigned in a bunch of places, but read only in one function
(`getOutboundLogPositions`), where the only calling place
(`SatelliteProcess#_performSnapshot`) didn't use that part of the return
value.
This PR removes this field, and simplifies function signatures
appropriately.
2. `_lastSentRowId` in the client was set in a lot of places as well,
but only usage place was to fill `client.outbound.enqueued_lsn`. This
allowed the client to continue streaming to the server from the LSN it
didn't sent before, however that logic is incorrect:
`SatInStartReplicationReq` message sent by the server has an LSN in it
which is the actual LSN the client is expected to start from - current
code just didn't respect this (because of a bug in the server that
didn't actually set that field correctly, which is also fixed in this
PR). Correct behaviour is to keep track of enqueued LSN only for the
lifetime of the client connection, because on reconnect server will tell
the client where to start from - which makes keeping `_lastSentRowId`
useless.
This PR removes this field, as well as the preset row in the meta table
with `lastSentRowId` key.
3. `_lastAckdRowId` is the most complicated to explain.
  1. It was set based on the pings from the server, which are currently
incorrect. There are changes in #386 which fix the acknowledges on the
server to be only done on actual persistence to Postgres. If Electric
had reached the transaction saving the persisted position to Postgres,
that means it reached the original sent transaction itself and is about
to send it out to clients. Hence, there is no difference in time between
receiving the acknowledge and the transaction itself after the
round-trip.
  2. The queries in `_performSnapshot` used `rowid > _lastAckdRowId` check
in all queries, but if have received back the original transaction, we
have already deleted the irrelevant oplog entries. Furthermore, queries
have equality checks on `timestamp` column that made `rowid` checks
useless regardless of other context.
  3. The other place where `_lastAckdRowId` was used, was in `_apply`
call, which called `this._getEntries()`. `this._getEntries()` without
arguments used to do a query with `rowid > _lastAckdRowId` to avoid
including acknowledged rows in conflict resolution, however, as
described above, any acknowledged rows can be safely considered garbage
collected.
  _This is actually more correct now, because before we receive back the
transaction and GC the oplog entry, all other transactions came before
ours from Postgres point of view, and to replicate conflict resolution
exactly we should use all oplog entries regardless if the server has
seen them, persisted them, or whatever._
    
So this PR makes it so we rely only on GC of oplog for all queries,
removes `_lastAckdRowId` property, as well as the preset row in the meta
table with `lastAckdRowId` key.
    
There is a follow-up to be made that we don't actually use ping data
neither on the client nor on the server, but that's a separate PR.

---------

Co-authored-by: Valter Balegas <[email protected]>
  • Loading branch information
icehaunter and balegas authored Aug 31, 2023
1 parent 1b24995 commit d22630c
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 331 deletions.
10 changes: 8 additions & 2 deletions clients/typescript/src/electric/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@ export interface DatabaseAdapter {
// Query the database.
query(statement: Statement): Promise<Row[]>

// Runs the provided function inside a transaction
// The function may not use async/await otherwise the transaction may commit before the queries are actually executed
/**
* Runs the provided __non-async__ function inside a transaction.
*
* The function may not use async/await otherwise the transaction may commit before
* the queries are actually executed. This is a limitation of some adapters, that the
* function passed to the transaction runs "synchronously" through callbacks without
* releasing the event loop.
*/
transaction<T>(
f: (tx: Transaction, setResult: (res: T) => void) => void
): Promise<T>
Expand Down
2 changes: 1 addition & 1 deletion clients/typescript/src/migrators/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export const data = {
//`-- Somewhere to track migrations\n`,
`CREATE TABLE IF NOT EXISTS ${migrationsTable} (\n id INTEGER PRIMARY KEY AUTOINCREMENT,\n version TEXT NOT NULL UNIQUE,\n applied_at TEXT NOT NULL\n);`,
//`-- Initialisation of the metadata table\n`,
`INSERT INTO ${metaTable} (key, value) VALUES ('compensations', 1), ('lastAckdRowId','0'), ('lastSentRowId', '0'), ('lsn', ''), ('clientId', ''), ('subscriptions', '');`,
`INSERT INTO ${metaTable} (key, value) VALUES ('compensations', 1), ('lsn', ''), ('clientId', ''), ('subscriptions', '');`,
//`-- These are toggles for turning the triggers on and off\n`,
`DROP TABLE IF EXISTS ${triggersTable};`,
`CREATE TABLE ${triggersTable} (tablename TEXT PRIMARY KEY, flag INTEGER);`,
Expand Down
97 changes: 33 additions & 64 deletions clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ import { Socket, SocketFactory } from '../sockets/index'
import _m0 from 'protobufjs/minimal.js'
import { EventEmitter } from 'events'
import {
AckCallback,
AckType,
AuthResponse,
DataChangeType,
LSN,
Expand All @@ -58,7 +56,6 @@ import {
Record,
Relation,
SchemaChange,
OutgoingReplication,
Transaction,
StartReplicationResponse,
StopReplicationResponse,
Expand All @@ -69,6 +66,7 @@ import {
DEFAULT_LOG_POS,
typeEncoder,
typeDecoder,
bytesToNumber,
} from '../util/common'
import { Client } from '.'
import { SatelliteClientOpts, satelliteClientDefaults } from './config'
Expand Down Expand Up @@ -116,8 +114,8 @@ export class SatelliteClient extends EventEmitter implements Client {
private socketFactory: SocketFactory
private socket?: Socket

private inbound: Replication
private outbound: OutgoingReplication
private inbound: Replication<Transaction>
private outbound: Replication<DataTransaction>

// can only handle a single subscription at a time
private subscriptionsDataCache: SubscriptionsDataCache
Expand Down Expand Up @@ -215,17 +213,15 @@ export class SatelliteClient extends EventEmitter implements Client {
this.subscriptionsDataCache = new SubscriptionsDataCache()
}

private resetReplication(
enqueued?: LSN,
ack?: LSN,
private resetReplication<T = any>(
last_lsn?: LSN,
isReplicating?: ReplicationStatus
): OutgoingReplication {
): Replication<T> {
return {
authenticated: false,
isReplicating: isReplicating ? isReplicating : ReplicationStatus.STOPPED,
relations: new Map(),
ack_lsn: ack,
enqueued_lsn: enqueued,
last_lsn: last_lsn,
transactions: [],
}
}
Expand Down Expand Up @@ -287,14 +283,8 @@ export class SatelliteClient extends EventEmitter implements Client {
}

close() {
this.outbound = this.resetReplication(
this.outbound.enqueued_lsn,
this.outbound.ack_lsn
)
this.inbound = this.resetReplication(
this.inbound.enqueued_lsn,
this.inbound.ack_lsn
)
this.outbound = this.resetReplication(this.outbound.last_lsn)
this.inbound = this.resetReplication(this.inbound.last_lsn)

this.socketHandler = undefined

Expand Down Expand Up @@ -345,7 +335,7 @@ export class SatelliteClient extends EventEmitter implements Client {
}

// Then set the replication state
this.inbound = this.resetReplication(lsn, lsn, ReplicationStatus.STARTING)
this.inbound = this.resetReplication(lsn, ReplicationStatus.STARTING)
return this.rpc<StartReplicationResponse, SatInStartReplicationReq>(request)
}

Expand Down Expand Up @@ -402,11 +392,9 @@ export class SatelliteClient extends EventEmitter implements Client {
}

this.outbound.transactions.push(transaction)
this.outbound.enqueued_lsn = transaction.lsn
this.outbound.last_lsn = transaction.lsn

if (this.throttledPushTransaction) {
this.throttledPushTransaction()
}
this.throttledPushTransaction?.()
}

private pushTransactions() {
Expand All @@ -424,7 +412,6 @@ export class SatelliteClient extends EventEmitter implements Client {
const satOpLog: SatOpLog = this.transactionToSatOpLog(next)

this.sendMessage(satOpLog)
this.emit('ack_lsn', next.lsn, AckType.LOCAL_SEND)
}
}

Expand All @@ -436,14 +423,6 @@ export class SatelliteClient extends EventEmitter implements Client {
this.removeListener('error', callback)
}

subscribeToAck(callback: AckCallback): void {
this.on('ack_lsn', callback)
}

unsubscribeToAck(callback: AckCallback) {
this.removeListener('ack_lsn', callback)
}

subscribeToOutboundEvent(_event: 'started', callback: () => void): void {
this.on('outbound_started', callback)
}
Expand Down Expand Up @@ -515,7 +494,7 @@ export class SatelliteClient extends EventEmitter implements Client {

private sendMissingRelations(
transaction: DataTransaction,
replication: Replication
replication: Replication<DataTransaction>
): void {
transaction.changes.forEach((change) => {
const relation = change.relation
Expand Down Expand Up @@ -639,30 +618,28 @@ export class SatelliteClient extends EventEmitter implements Client {
}

private handleStartReq(message: SatInStartReplicationReq) {
Log.info(`received replication request ${JSON.stringify(message)}`)
if (this.outbound.isReplicating == ReplicationStatus.STOPPED) {
const replication = {
...this.outbound,
ack_lsn: DEFAULT_LOG_POS,
enqueued_lsn: DEFAULT_LOG_POS,
}
Log.info(
`Server sent a replication request to start from ${bytesToNumber(
message.lsn
)}, and options ${JSON.stringify(message.options)}`
)

if (this.outbound.isReplicating == ReplicationStatus.STOPPED) {
// Use server-sent LSN as the starting point for replication
this.outbound = this.resetReplication(
replication.enqueued_lsn,
replication.ack_lsn,
message.lsn,
ReplicationStatus.ACTIVE
)

const throttleOpts = { leading: true, trailing: true }
this.throttledPushTransaction = throttle(
() => this.pushTransactions(),
this.opts.pushPeriod,
throttleOpts
{ leading: true, trailing: true }
)

const response = SatInStartReplicationResp.fromPartial({})
this.sendMessage(response)
this.emit('outbound_started', replication.enqueued_lsn)
this.emit('outbound_started', message.lsn)
} else {
const response = SatErrorResp.fromPartial({
errorType: SatErrorResp_ErrorCode.REPLICATION_FAILED,
Expand Down Expand Up @@ -765,19 +742,18 @@ export class SatelliteClient extends EventEmitter implements Client {
private handlePingReq() {
Log.info(
`respond to ping with last ack ${
this.inbound.ack_lsn ? base64.fromBytes(this.inbound.ack_lsn) : 'NULL'
this.inbound.last_lsn ? base64.fromBytes(this.inbound.last_lsn) : 'NULL'
}`
)
const pong = SatPingResp.fromPartial({ lsn: this.inbound.ack_lsn })
const pong = SatPingResp.fromPartial({ lsn: this.inbound.last_lsn })
this.sendMessage(pong)
}

// TODO: emit ping request to clear oplog.
private handlePingResp(message: SatPingResp) {
if (message.lsn) {
this.outbound.ack_lsn = message.lsn
this.emit('ack_lsn', message.lsn, AckType.REMOTE_COMMIT)
}
private handlePingResp(_message: SatPingResp) {
// TODO: This message is not used in any way right now.
// We might be dropping client-initiated pings completely.
// However, the server sends these messages without any prompting,
// so this handler cannot just throw an error
}

private handleError(error: SatErrorResp) {
Expand Down Expand Up @@ -879,7 +855,7 @@ export class SatelliteClient extends EventEmitter implements Client {
this.emit(
'transaction',
transaction,
() => (this.inbound.ack_lsn = transaction.lsn)
() => (this.inbound.last_lsn = transaction.lsn)
)
replication.transactions.splice(lastTxnIdx)
}
Expand Down Expand Up @@ -1037,15 +1013,8 @@ export class SatelliteClient extends EventEmitter implements Client {
}).finally(() => clearTimeout(waitingFor))
}

resetOutboundLogPositions(sent: LSN, ack: LSN): void {
this.outbound = this.resetReplication(sent, ack)
}

getOutboundLogPositions(): { enqueued: LSN; ack: LSN } {
return {
ack: this.outbound.ack_lsn ?? DEFAULT_LOG_POS,
enqueued: this.outbound.enqueued_lsn ?? DEFAULT_LOG_POS,
}
getLastSentLsn(): LSN {
return this.outbound.last_lsn ?? DEFAULT_LOG_POS
}
}

Expand Down
6 changes: 1 addition & 5 deletions clients/typescript/src/satellite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { Migrator } from '../migrators/index'
import { Notifier } from '../notifiers/index'
import { SocketFactory } from '../sockets'
import {
AckCallback,
AuthResponse,
ConnectivityState,
DbName,
Expand Down Expand Up @@ -85,10 +84,7 @@ export interface Client {
callback: (transaction: Transaction) => Promise<void>
): void
enqueueTransaction(transaction: DataTransaction): void
subscribeToAck(callback: AckCallback): void
unsubscribeToAck(callback: AckCallback): void
resetOutboundLogPositions(sent?: LSN, ack?: LSN): void
getOutboundLogPositions(): { enqueued: LSN; ack: LSN }
getLastSentLsn(): LSN
subscribeToOutboundEvent(event: 'started', callback: () => void): void
unsubscribeToOutboundEvent(event: 'started', callback: () => void): void
subscribeToError(callback: ErrorCallback): void
Expand Down
34 changes: 3 additions & 31 deletions clients/typescript/src/satellite/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import { Migrator } from '../migrators/index'
import { Notifier } from '../notifiers/index'
import { sleepAsync } from '../util/timer'
import {
AckCallback,
AckType,
AuthResponse,
DbName,
LSN,
Expand Down Expand Up @@ -131,7 +129,6 @@ export class MockSatelliteClient extends EventEmitter implements Client {
inboundAck: Uint8Array = DEFAULT_LOG_POS

outboundSent: Uint8Array = DEFAULT_LOG_POS
outboundAck: Uint8Array = DEFAULT_LOG_POS

// to clear any pending timeouts
timeouts: NodeJS.Timeout[] = []
Expand Down Expand Up @@ -238,12 +235,9 @@ export class MockSatelliteClient extends EventEmitter implements Client {
isClosed(): boolean {
return this.closed
}
resetOutboundLogPositions(sent: Uint8Array, ack: Uint8Array): void {
this.outboundSent = sent
this.outboundAck = ack
}
getOutboundLogPositions(): { enqueued: Uint8Array; ack: Uint8Array } {
return { enqueued: this.outboundSent, ack: this.outboundAck }

getLastSentLsn(): Uint8Array {
return this.outboundSent
}
connect(): Promise<void> {
this.closed = false
Expand Down Expand Up @@ -300,28 +294,6 @@ export class MockSatelliteClient extends EventEmitter implements Client {

enqueueTransaction(transaction: DataTransaction): void {
this.outboundSent = transaction.lsn

this.emit('ack_lsn', transaction.lsn, AckType.LOCAL_SEND)

// simulate ping message effect
const t = setTimeout(() => {
this.outboundAck = transaction.lsn
this.emit('ack_lsn', transaction.lsn, AckType.REMOTE_COMMIT)
}, 500)
this.timeouts.push(t)
}

subscribeToAck(callback: AckCallback): void {
this.on('ack_lsn', callback)
}

unsubscribeToAck(callback: AckCallback): void {
this.removeListener('ack_lsn', callback)
}

setOutboundLogPositions(sent: LSN, ack: LSN): void {
this.outboundSent = sent
this.outboundAck = ack
}

subscribeToOutboundEvent(_event: 'started', callback: () => void): void {
Expand Down
Loading

0 comments on commit d22630c

Please sign in to comment.