Skip to content

Commit

Permalink
chore(client): Performance improvements on initial sync downstream pa…
Browse files Browse the repository at this point in the history
…th (#1363)

I profiled the downstream path for initial syncs, excluding SQLite
performance as that will be driver and storage dependent and I plan to
focus on that separately, and implemented some changes on the most
significant bottlenecks for that pathway.

The tests were done using 10k issues and 50k comments on Linearlite
(although now benchmarking suite @icehaunter made is available so I will
be using that), and performance improvements were checked in isolation
with JS perf tests as well (on Chrome/V8 but I avoided any super niche
optimizations)

In memory performance is improved for this path, excluding the execution
of the SQL statements the 10k issues and 50k comments go through
`_applySubscriptionData` in under 200ms on my machine.

One very significant overhead is the decoding, first of the Satellite
messages in the generated library which I don't think we can do much
about, but then also to JS variables with our decoders that spend a lot
of time and also cause lots of GC pauses. There are ways we could
optimise this path but I was reluctant to spend time on that, ultimately
a lot of the deserialization is pointless in the sense that it is
re-serialized for inserting to permanent storage, but if we want to keep
things modular and pluggable with e.g. JS SQLite drivers this might be
necessary.

Some individual commits contain messages with a brief description of the
performance bits, mostly I tried to reuse objects to reduce GC pauses
and that seemed to give the most benefit.

Measured performance with 10k issues and 50k comments of
`_handleSubscriptionData`:
on main: `265ms ± 36ms`
after improvements: `135ms ± 9ms`
  • Loading branch information
msfstef authored Jun 17, 2024
1 parent fbf8a4d commit b3c1bdc
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 141 deletions.
5 changes: 5 additions & 0 deletions .changeset/beige-lemons-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Minor performance improvements in downstream initial sync path
106 changes: 67 additions & 39 deletions clients/typescript/src/migrators/query-builder/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,35 +312,51 @@ export abstract class QueryBuilder {
const stmts: Statement[] = []
const columnCount = columns.length
const recordCount = records.length
let processed = 0
let positionalParam = 1
const pos = (i: number) => `${this.makePositionalParam(i)}`
const makeInsertPattern = () => {
const insertRow = Array.from({ length: columnCount }, () =>
pos(positionalParam++)
)

return ` (${insertRow.join(', ')})`
}

// Amount of rows we can insert at once
const batchMaxSize = Math.floor(maxParameters / columnCount)

// keep a temporary join array for joining strings, to avoid
// the overhead of generating a new array every time
const tempColJoinArray = Array.from({ length: columnCount })

let processed = 0
let prevInsertCount = -1
let insertPattern = ''
while (processed < recordCount) {
positionalParam = 1 // start counting parameters from 1 again
const currentInsertCount = Math.min(recordCount - processed, batchMaxSize)
let sql =
baseSql +
Array.from({ length: currentInsertCount }, makeInsertPattern).join(',')

// cache insert pattern as it is going to be the same for every batch
// of `batchMaxSize` - ideally we can externalize this cache since for a
// given adapter this is _always_ going to be the same
if (currentInsertCount !== prevInsertCount) {
insertPattern = Array.from(
{ length: currentInsertCount },
(_, recordIdx) => {
for (let i = 0; i < columnCount; i++) {
tempColJoinArray[i] = this.makePositionalParam(
recordIdx * columnCount + i + 1
)
}
return ` (${tempColJoinArray.join(', ')})`
}
).join(',')
}

let sql = baseSql + insertPattern

if (suffixSql !== '') {
sql += ' ' + suffixSql
}

const args = records
.slice(processed, processed + currentInsertCount)
.flatMap((record) => columns.map((col) => record[col] as SqlValue))
const args = []
for (let i = 0; i < currentInsertCount; i++) {
for (let j = 0; j < columnCount; j++) {
args.push(records[processed + i][columns[j]] as SqlValue)
}
}

processed += currentInsertCount
prevInsertCount = currentInsertCount
stmts.push({ sql, args })
}
return stmts
Expand Down Expand Up @@ -370,38 +386,50 @@ export abstract class QueryBuilder {
const stmts: Statement[] = []
const columnCount = columns.length
const recordCount = records.length
let processed = 0
let positionalParam = 1
const pos = (i: number) => this.makePositionalParam(i)
const makeWherePattern = () => {
const columnComparisons = Array.from(
{ length: columnCount },
(_, i) => `"${columns[i] as string}" = ${pos(positionalParam++)}`
)

return ` (${columnComparisons.join(' AND ')})`
}

// Amount of rows we can delete at once
const batchMaxSize = Math.floor(maxParameters / columnCount)

// keep a temporary join array for joining strings, to avoid
// the overhead of generating a new array every time
const tempColumnComparisonJoinArr = Array.from({ length: columnCount })

let processed = 0
let prevDeleteCount = -1
let deletePattern = ''
while (processed < recordCount) {
positionalParam = 1 // start counting parameters from 1 again
const currentDeleteCount = Math.min(recordCount - processed, batchMaxSize)
let sql =
baseSql +
Array.from({ length: currentDeleteCount }, makeWherePattern).join(
' OR '
)

// cache delete pattern as it is going to be the same for every batch
// of `batchMaxSize` - ideally we can externalize this cache since for a
// given adapter this is _always_ going to be the same
if (currentDeleteCount !== prevDeleteCount) {
deletePattern = Array.from(
{ length: currentDeleteCount },
(_, recordIdx) => {
for (let i = 0; i < columnCount; i++) {
tempColumnComparisonJoinArr[i] = `"${
columns[i] as string
}" = ${this.makePositionalParam(recordIdx * columnCount + i + 1)}`
}
return ` (${tempColumnComparisonJoinArr.join(' AND ')})`
}
).join(' OR')
}
let sql = baseSql + deletePattern

if (suffixSql !== '') {
sql += ' ' + suffixSql
}

const args = records
.slice(processed, processed + currentDeleteCount)
.flatMap((record) => columns.map((col) => record[col] as SqlValue))
const args = []
for (let i = 0; i < currentDeleteCount; i++) {
for (let j = 0; j < columnCount; j++) {
args.push(records[processed + i][columns[j]] as SqlValue)
}
}

processed += currentDeleteCount
prevDeleteCount = currentDeleteCount
stmts.push({ sql, args })
}
return stmts
Expand Down
25 changes: 13 additions & 12 deletions clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1491,18 +1491,19 @@ export function deserializeRow(
if (row === undefined) {
return undefined
}
return Object.fromEntries(
relation.columns.map((c, i) => {
let value
if (getMaskBit(row.nullsBitmask, i) === 1) {
value = null
} else {
const pgColumnType = getColumnType(dbDescription, relation.table, c)
value = deserializeColumnData(row.values[i], pgColumnType, decoder)
}
return [c.name, value]
})
)
return relation.columns.reduce((deserializedRow, c, i) => {
if (getMaskBit(row.nullsBitmask, i) === 1) {
deserializedRow[c.name] = null
} else {
const pgColumnType = getColumnType(dbDescription, relation.table, c)
deserializedRow[c.name] = deserializeColumnData(
row.values[i],
pgColumnType,
decoder
)
}
return deserializedRow
}, {} as DbRecord)
}

function calculateNumBytes(column_num: number): number {
Expand Down
11 changes: 6 additions & 5 deletions clients/typescript/src/satellite/oplog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,12 @@ export function extractPK(c: DataChange) {
const columnValues = c.record ? c.record : c.oldRecord!

return primaryKeyToStr(
Object.fromEntries(
c.relation.columns
.filter((c) => c.primaryKey)
.map((col) => [col.name, columnValues[col.name]!])
)
c.relation.columns
.filter((c) => c.primaryKey)
.reduce((primaryKeyRec, col) => {
primaryKeyRec[col.name] = columnValues[col.name]!
return primaryKeyRec
}, {} as Record<string, boolean | string | number | Uint8Array>)
)
}

Expand Down
54 changes: 32 additions & 22 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ export class SatelliteProcess implements Satellite {
string,
{
relation: Relation
primaryKeyColNames: string[]
records: InitialDataChange['record'][]
table: QualifiedTablename
}
Expand All @@ -561,26 +562,37 @@ export class SatelliteProcess implements Satellite {
>[] = []

// Group all changes by table name to be able to insert them all together
const fullTableNameLookup = new Map<string, string>()
for (const op of changes) {
const qt = new QualifiedTablename(namespace, op.relation.table)
const tableName = qt.toString()

if (groupedChanges.has(tableName)) {
groupedChanges.get(tableName)?.records.push(op.record)
} else {
groupedChanges.set(tableName, {
let groupedChange
if (!fullTableNameLookup.has(op.relation.table)) {
const qt = new QualifiedTablename(namespace, op.relation.table)
const tableName = qt.toString()
fullTableNameLookup.set(op.relation.table, tableName)
groupedChange = {
relation: op.relation,
primaryKeyColNames: op.relation.columns
.filter((col) => col.primaryKey)
.map((col) => col.name),
records: [op.record],
table: qt,
})
}
groupedChanges.set(tableName, groupedChange)
} else {
groupedChange = groupedChanges.get(
fullTableNameLookup.get(op.relation.table)!
)!
groupedChange.records.push(op.record)
}

// Since we're already iterating changes, we can also prepare data for shadow table
const primaryKeyCols = op.relation.columns.reduce((agg, col) => {
if (col.primaryKey)
agg[col.name] = op.record[col.name] as string | number
return agg
}, {} as Record<string, string | number>)
const primaryKeyCols = groupedChange.primaryKeyColNames.reduce(
(agg, colName) => {
agg[colName] = op.record[colName] as string | number
return agg
},
{} as Record<string, string | number>
)

allArgsForShadowInsert.push({
namespace,
Expand Down Expand Up @@ -650,20 +662,18 @@ export class SatelliteProcess implements Satellite {
// because nobody uses them and we don't have the machinery to to a
// `RETURNING` clause in the middle of `runInTransaction`.
const notificationChanges: Change[] = []

groupedChanges.forEach(({ records, table, relation }) => {
const primaryKeyColNames = relation.columns
.filter((col) => col.primaryKey)
.map((col) => col.name)
groupedChanges.forEach(({ records, table, primaryKeyColNames }) => {
notificationChanges.push({
qualifiedTablename: table,
rowids: [],
recordChanges: records.map((change) => {
return {
primaryKey: Object.fromEntries(
primaryKeyColNames.map((col_name) => {
return [col_name, change[col_name]]
})
primaryKey: primaryKeyColNames.reduce(
(primaryKeyRec, col_name) => {
primaryKeyRec[col_name] = change[col_name]
return primaryKeyRec
},
{} as typeof change
),
type: 'INITIAL',
}
Expand Down
4 changes: 2 additions & 2 deletions clients/typescript/src/satellite/shapes/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ export class SubscriptionsDataCache extends EventEmitter {
subscriptionId: delivered.subscriptionId,
lsn: delivered.lsn,
data: delivered.transaction.map((t) =>
this.proccessShapeDataOperations(t, relations)
this.processShapeDataOperations(t, relations)
),
shapeReqToUuid: delivered.shapeReqToUuid,
}
Expand Down Expand Up @@ -259,7 +259,7 @@ export class SubscriptionsDataCache extends EventEmitter {
}
}

private proccessShapeDataOperations(
private processShapeDataOperations(
op: SatTransOp,
relations: Map<number, Relation>
): InitialDataChange {
Expand Down
31 changes: 20 additions & 11 deletions clients/typescript/src/util/encoders/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,30 @@ export const base64 = {
Uint8Array.from(BASE64.decode(string), (c) => c.charCodeAt(0)),
encode: (string: string) => base64.fromBytes(textEncoder.encode(string)),
decode: (string: string) => textDecoder.decode(base64.toBytes(string)),
}
} as const

// Initialize TextEncoder and TextDecoder late to prevent polyfill
// race conditions, and reuse instance for performance
let textEncoderInstance: (TextEncoder | TextEncoderLite) | null = null
let textDecoderInstance: (TextDecoder | TextDecoderLite) | null = null

export const textEncoder = {
encode: (string: string): Uint8Array =>
globalThis.TextEncoder
? new TextEncoder().encode(string)
: new TextEncoderLite().encode(string),
}
encode: (string: string): Uint8Array => {
textEncoderInstance ??= globalThis.TextEncoder
? new TextEncoder()
: new TextEncoderLite()
return textEncoderInstance.encode(string)
},
} as const

export const textDecoder = {
decode: (bytes: Uint8Array): string =>
globalThis.TextDecoder
? new TextDecoder().decode(bytes)
: new TextDecoderLite().decode(bytes),
}
decode: (bytes: Uint8Array): string => {
textDecoderInstance ??= globalThis.TextDecoder
? new TextDecoder()
: new TextDecoderLite()
return textDecoderInstance.decode(bytes)
},
} as const

export const trueByte = 't'.charCodeAt(0)
export const falseByte = 'f'.charCodeAt(0)
Expand Down
Loading

0 comments on commit b3c1bdc

Please sign in to comment.