Skip to content

Commit

Permalink
Bugfix/supabase upsert ids (FlowiseAI#2561)
Browse files Browse the repository at this point in the history
* fix upserting same id with supabase

* remove dedicated addvectors logic for ids

* Update pnpm-lock.yaml

* add fix for null id column
  • Loading branch information
HenryHengZJ authored Jun 3, 2024
1 parent f2a0ffe commit 272fd91
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions packages/components/nodes/vectorstores/Supabase/Supabase.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { flatten } from 'lodash'
import { v4 as uuidv4 } from 'uuid'
import { createClient } from '@supabase/supabase-js'
import { Document } from '@langchain/core/documents'
import { Embeddings } from '@langchain/core/embeddings'
Expand Down Expand Up @@ -213,7 +214,7 @@ class Supabase_VectorStores implements INode {
}

class SupabaseUpsertVectorStore extends SupabaseVectorStore {
async addVectors(vectors: number[][], documents: Document[]): Promise<string[]> {
async addVectors(vectors: number[][], documents: Document[], options?: { ids?: string[] | number[] }): Promise<string[]> {
if (vectors.length === 0) {
return []
}
Expand All @@ -223,23 +224,36 @@ class SupabaseUpsertVectorStore extends SupabaseVectorStore {
metadata: documents[idx].metadata
}))

let idx = 0
const { count } = await this.client.from(this.tableName).select('*', { count: 'exact', head: true })
if (count) {
idx = count
}

let returnedIds: string[] = []
for (let i = 0; i < rows.length; i += this.upsertBatchSize) {
const chunk = rows.slice(i, i + this.upsertBatchSize).map((row) => {
idx = idx += 1
return { id: idx, ...row }
const chunk = rows.slice(i, i + this.upsertBatchSize).map((row, j) => {
if (options?.ids) {
return { id: options.ids[i + j], ...row }
}
return row
})

const res = await this.client.from(this.tableName).upsert(chunk).select()
let res = await this.client.from(this.tableName).upsert(chunk).select()

if (res.error) {
throw new Error(`Error inserting: ${res.error.message} ${res.status} ${res.statusText}`)
// If the error is due to null value in column "id", we will generate a new id for the row
if (res.error.message.includes(`null value in column "id"`)) {
const chunk = rows.slice(i, i + this.upsertBatchSize).map((row, y) => {
if (options?.ids) {
return { id: options.ids[i + y], ...row }
}
return { id: uuidv4(), ...row }
})
res = await this.client.from(this.tableName).upsert(chunk).select()

if (res.error) {
throw new Error(`Error inserting: ${res.error.message} ${res.status} ${res.statusText}`)
}
} else {
throw new Error(`Error inserting: ${res.error.message} ${res.status} ${res.statusText}`)
}
}

if (res.data) {
returnedIds = returnedIds.concat(res.data.map((row) => row.id))
}
Expand Down

0 comments on commit 272fd91

Please sign in to comment.