Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1338,14 +1338,14 @@ export class ShapeStream<T extends Row<unknown> = Row>
url: fetchUrl.toString(),
})

const { metadata, data } = await response.json()
const batch = this.#messageParser.parse<Array<ChangeMessage<T>>>(
JSON.stringify(data),
const responseData = await response.json()
const batch = this.#messageParser.parseSnapshotData<ChangeMessage<T>>(
responseData.data,
schema
)

return {
metadata,
metadata: responseData.metadata,
data: batch,
}
}
Expand Down
52 changes: 45 additions & 7 deletions packages/typescript-client/src/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,56 @@ export class MessageParser<T extends Row<unknown>> {
typeof value === `object` &&
value !== null
) {
// Parse the row values
const row = value as Record<string, Value<GetExtensions<T>>>
Object.keys(row).forEach((key) => {
row[key] = this.parseRow(key, row[key] as NullableToken, schema)
})

if (this.transformer) value = this.transformer(value)
return this.transformMessageValue(value, schema)
}
return value
}) as Result
}

/**
* Parse an array of ChangeMessages from a snapshot response.
* Applies type parsing and transformations to the value and old_value properties.
*/
parseSnapshotData<Result>(
messages: Array<unknown>,
schema: Schema
): Array<Result> {
return messages.map((message) => {
const msg = message as Record<string, unknown>

// Transform the value property if it exists
if (msg.value && typeof msg.value === `object` && msg.value !== null) {
msg.value = this.transformMessageValue(msg.value, schema)
}

// Transform the old_value property if it exists
if (
msg.old_value &&
typeof msg.old_value === `object` &&
msg.old_value !== null
) {
msg.old_value = this.transformMessageValue(msg.old_value, schema)
}

return msg as Result
})
}

/**
* Transform a message value or old_value object by parsing its columns.
*/
private transformMessageValue(
value: unknown,
schema: Schema
): Row<GetExtensions<T>> {
const row = value as Record<string, Value<GetExtensions<T>>>
Object.keys(row).forEach((key) => {
row[key] = this.parseRow(key, row[key] as NullableToken, schema)
})

return this.transformer ? this.transformer(row) : row
}

// Parses the message values using the provided parser based on the schema information
private parseRow(
key: string,
Expand Down