From 07f655a3c00ea3900cb37ee8730bf119bde08971 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Tue, 3 Dec 2024 16:47:32 +0800 Subject: [PATCH 1/7] perf: avoid unneeded steps when flushing delta --- binlogreplication/binlog_replica_applier.go | 6 +- binlogreplication/writer.go | 2 +- delta/controller.go | 432 ++++++++++++++------ pgserver/logrepl/replication.go | 10 +- replica/replication.go | 4 +- 5 files changed, 331 insertions(+), 123 deletions(-) diff --git a/binlogreplication/binlog_replica_applier.go b/binlogreplication/binlog_replica_applier.go index 2283617e..054dc885 100644 --- a/binlogreplication/binlog_replica_applier.go +++ b/binlogreplication/binlog_replica_applier.go @@ -1245,6 +1245,10 @@ func (a *binlogReplicaApplier) appendRowFormatChanges( } func (a *binlogReplicaApplier) flushDeltaBuffer(ctx *sql.Context, reason delta.FlushReason) error { + conn, err := adapter.GetConn(ctx) + if err != nil { + return err + } tx, err := adapter.GetCatalogTxn(ctx, nil) if err != nil { return err @@ -1252,7 +1256,7 @@ func (a *binlogReplicaApplier) flushDeltaBuffer(ctx *sql.Context, reason delta.F defer a.deltaBufSize.Store(0) - if err = a.tableWriterProvider.FlushDeltaBuffer(ctx, tx, reason); err != nil { + if err = a.tableWriterProvider.FlushDeltaBuffer(ctx, conn, tx, reason); err != nil { ctx.GetLogger().Errorf("Failed to flush changelog: %v", err.Error()) MyBinlogReplicaController.setSqlError(sqlerror.ERUnknownError, err.Error()) } diff --git a/binlogreplication/writer.go b/binlogreplication/writer.go index 1a107d41..3a6ec088 100644 --- a/binlogreplication/writer.go +++ b/binlogreplication/writer.go @@ -51,7 +51,7 @@ type TableWriterProvider interface { ) (DeltaAppender, error) // FlushDelta writes the accumulated changes to the database. - FlushDeltaBuffer(ctx *sql.Context, tx *stdsql.Tx, reason delta.FlushReason) error + FlushDeltaBuffer(ctx *sql.Context, conn *stdsql.Conn, tx *stdsql.Tx, reason delta.FlushReason) error // DiscardDeltaBuffer discards the accumulated changes. DiscardDeltaBuffer(ctx *sql.Context) diff --git a/delta/controller.go b/delta/controller.go index 687fb70f..228810eb 100644 --- a/delta/controller.go +++ b/delta/controller.go @@ -1,22 +1,23 @@ package delta import ( - "bytes" stdsql "database/sql" "fmt" + "hash/maphash" "math/bits" "strconv" "strings" "sync" - "unsafe" - "github.com/apache/arrow-go/v18/arrow/ipc" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" "github.com/apecloud/myduckserver/binlog" "github.com/apecloud/myduckserver/catalog" "github.com/apecloud/myduckserver/pgtypes" "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/types" "github.com/jackc/pgx/v5/pgtype" + "github.com/marcboeker/go-duckdb" "github.com/sirupsen/logrus" ) @@ -29,11 +30,13 @@ type FlushStats struct { type DeltaController struct { mutex sync.Mutex tables map[tableIdentifier]*DeltaAppender + seed maphash.Seed } func NewController() *DeltaController { return &DeltaController{ tables: make(map[tableIdentifier]*DeltaAppender), + seed: maphash.MakeSeed(), } } @@ -68,7 +71,7 @@ func (c *DeltaController) Close() { } // Flush writes the accumulated changes to the database. 
-func (c *DeltaController) Flush(ctx *sql.Context, tx *stdsql.Tx, reason FlushReason) (FlushStats, error) { +func (c *DeltaController) Flush(ctx *sql.Context, conn *stdsql.Conn, tx *stdsql.Tx, reason FlushReason) (FlushStats, error) { c.mutex.Lock() defer c.mutex.Unlock() @@ -93,16 +96,13 @@ func (c *DeltaController) Flush(ctx *sql.Context, tx *stdsql.Tx, reason FlushRea // See: // https://duckdb.org/docs/sql/indexes.html#limitations-of-art-indexes // https://github.com/duckdb/duckdb/issues/14133 - var ( - // Share the buffer among all tables. - buf bytes.Buffer - stats FlushStats - ) + + var stats FlushStats for table, appender := range c.tables { deltaRowCount := appender.RowCount() if deltaRowCount > 0 { - if err := c.updateTable(ctx, tx, table, appender, &buf, &stats); err != nil { + if err := c.updateTable(ctx, conn, tx, table, appender, &stats); err != nil { return stats, err } } @@ -135,121 +135,247 @@ func (c *DeltaController) updateTable( ctx *sql.Context, + conn *stdsql.Conn, tx *stdsql.Tx, table tableIdentifier, appender *DeltaAppender, - buf *bytes.Buffer, stats *FlushStats, ) error { if tx == nil { return fmt.Errorf("no active transaction") } + defer appender.ResetCounters() + + // We consider the following cases: + // 1. INSERT only - no DELETE or UPDATE. In this case, we can do a simple INSERT INTO in an optimized way, + // without the deduplication step (as the source has confirmed that there are no duplicates) and the DELETE step. + // The data can go directly from the delta view to the base table. + // 2. DELETE only - no INSERT or UPDATE. In this case, we can do a simple DELETE FROM in an optimized way, + // without the INSERT step and the deduplication step (as the source has confirmed that there are no duplicates). + // The delta view can be directly used to delete rows from the base table, without the need for a temporary table. + // 3. INSERT + non-primary-key UPDATE - no DELETE. In this case, we can skip the DELETE step. + // Therefore, the temporary table is not needed as the delta view will be read only once. + // 4. The general case - INSERT, DELETE, and UPDATE. In this case, we need to create a temporary table + // to store the deduplicated delta and then do the INSERT and DELETE steps.
+ + // Identify the types of changes in the delta + hasInserts := appender.counters.event.insert > 0 + hasDeletes := appender.counters.event.delete > 0 + hasUpdates := appender.counters.event.update > 0 + + switch { + case hasInserts && !hasDeletes && !hasUpdates: + // Case 1: INSERT only + return c.handleInsertOnly(ctx, conn, tx, table, appender, stats) + case hasDeletes && !hasInserts && !hasUpdates: + // Case 2: DELETE only + return c.handleDeleteOnly(ctx, conn, tx, table, appender, stats) + case appender.counters.action.delete == 0: + // Case 3: INSERT + non-primary-key UPDATE + return c.handleZeroDelete(ctx, conn, tx, table, appender, stats) + default: + // Case 4: General case + return c.handleGeneralCase(ctx, conn, tx, table, appender, stats) + } +} - buf.Reset() - - schema := appender.BaseSchema() // schema of the base table +// Helper function to build the Arrow record and register the view +func (c *DeltaController) prepareArrowView( + conn *stdsql.Conn, + table tableIdentifier, + appender *DeltaAppender, +) (viewName string, close func(), err error) { record := appender.Build() - defer func() { + + var ar *duckdb.Arrow + err = conn.Raw(func(driverConn any) error { + var err error + ar, err = duckdb.NewArrowFromConn(driverConn.(*duckdb.Conn)) + return err + }) + if err != nil { record.Release() - appender.ResetCounters() - }() + return "", nil, err + } - // fmt.Println("record:", record) + reader, err := array.NewRecordReader(record.Schema(), []arrow.Record{record}) + if err != nil { + record.Release() + return "", nil, err + } + + // Register the Arrow view + hash := maphash.String(c.seed, table.dbName+"\x00"+table.tableName) + viewName = "__sys_view_arrow_delta_" + strconv.FormatUint(hash, 16) + "__" + release, err := ar.RegisterView(reader, viewName) + if err != nil { + reader.Release() + record.Release() + return "", nil, err + } - // TODO(fan): Switch to zero-copy Arrow ingestion once this PR is merged: - // https://github.com/marcboeker/go-duckdb/pull/283 - w := ipc.NewWriter(buf, ipc.WithSchema(record.Schema())) - if err := w.Write(record); err != nil { + close = func() { + release() + reader.Release() + record.Release() + } + return viewName, close, nil +} + +func (c *DeltaController) handleInsertOnly( + ctx *sql.Context, + conn *stdsql.Conn, + tx *stdsql.Tx, + table tableIdentifier, + appender *DeltaAppender, + stats *FlushStats, +) error { + viewName, release, err := c.prepareArrowView(conn, table, appender) + if err != nil { + return err + } + defer release() + + // Perform direct INSERT without deduplication + var b strings.Builder + b.Grow(128) + + b.WriteString("INSERT INTO ") + b.WriteString(catalog.ConnectIdentifiersANSI(table.dbName, table.tableName)) + b.WriteString(" SELECT ") + buildColumnList(b, appender.BaseSchema()) + b.WriteString(" FROM ") + b.WriteString(viewName) + + result, err := tx.ExecContext(ctx, b.String()) + if err != nil { return err } - if err := w.Close(); err != nil { + + affected, err := result.RowsAffected() + if err != nil { + return err + } + stats.Insertions += affected + stats.DeltaSize += affected + + if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) { + log.WithFields(logrus.Fields{ + "db": table.dbName, + "table": table.tableName, + "rows": affected, + }).Debug("Inserted") + } + + return nil +} + +func (c *DeltaController) handleDeleteOnly( + ctx *sql.Context, + conn *stdsql.Conn, + tx *stdsql.Tx, + table tableIdentifier, + appender *DeltaAppender, + stats *FlushStats, +) error { + viewName, release, err := 
c.prepareArrowView(conn, table, appender) + if err != nil { return err } - bytes := buf.Bytes() - size := len(bytes) - ptr := unsafe.Pointer(&bytes[0]) - ipcSQL := fmt.Sprintf( - " FROM scan_arrow_ipc([{ptr: %d::ubigint, size: %d::ubigint}])", - uintptr(ptr), size, - ) + defer release() qualifiedTableName := catalog.ConnectIdentifiersANSI(table.dbName, table.tableName) + pk := getPrimaryKeyStruct(appender.BaseSchema()) - pkColumns := make([]int, 0, 1) // Most tables have a single-column primary key - for i, col := range schema { - if col.PrimaryKey { - pkColumns = append(pkColumns, i) - } + // Perform direct DELETE without deduplication + deleteSQL := "DELETE FROM " + qualifiedTableName + + " WHERE " + pk + " IN (SELECT " + pk + " FROM " + viewName + ")" + result, err := tx.ExecContext(ctx, deleteSQL) + if err != nil { + return err } - pkList := catalog.QuoteIdentifierANSI(schema[pkColumns[0]].Name) - for _, i := range pkColumns[1:] { - pkList += ", " + catalog.QuoteIdentifierANSI(schema[i].Name) + + affected, err := result.RowsAffected() + if err != nil { + return err } + stats.Deletions += affected + stats.DeltaSize += affected - // Use the following SQL to get the latest view of the rows being updated. - // - // SELECT r[0] as action, ... - // FROM ( - // SELECT - // pk1, pk2, ..., - // LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r - // FROM delta - // GROUP BY pk1, pk2, ... - // ) - // - // Note that an update generates two rows: one for DELETE and one for INSERT. - // So the numeric value of DELETE action MUST be smaller than that of INSERT. - augmentedSchema := appender.Schema() - var builder strings.Builder - builder.Grow(512) - if appender.counters.event.delete > 0 || appender.counters.event.update > 0 { // sometimes UPDATE does not DELETE pre-image - builder.WriteString("SELECT ") - builder.WriteString("r[1] AS ") - builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name)) - for i, col := range augmentedSchema[1:] { - builder.WriteString(", r[") - builder.WriteString(strconv.Itoa(i + 2)) - builder.WriteString("]") - if isTimestampType(col.Type) { - builder.WriteString("::TIMESTAMP") - } - builder.WriteString(" AS ") - builder.WriteString(catalog.QuoteIdentifierANSI(col.Name)) - } - builder.WriteString(" FROM (SELECT ") - builder.WriteString(pkList) - builder.WriteString(", LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r") - builder.WriteString(ipcSQL) - builder.WriteString(" GROUP BY ") - builder.WriteString(pkList) - builder.WriteString(")") - } else { - // For pure INSERTs, since the source has confirmed that there are no duplicates, - // we can skip the deduplication step. - builder.WriteString("SELECT ") - builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name)) - for _, col := range augmentedSchema[1:] { - builder.WriteString(", ") - builder.WriteString(catalog.QuoteIdentifierANSI(col.Name)) - if types.IsTimestampType(col.Type) { - builder.WriteString("::TIMESTAMP") - } - } - builder.WriteString(ipcSQL) + if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) { + log.WithFields(logrus.Fields{ + "db": table.dbName, + "table": table.tableName, + "rows": affected, + }).Debug("Deleted") } - condenseDeltaSQL := builder.String() - var ( - result stdsql.Result - affected int64 - err error - ) + return nil +} - // Create a temporary table to store the latest delta view. 
- result, err = tx.ExecContext(ctx, "CREATE OR REPLACE TEMP TABLE delta AS "+condenseDeltaSQL) - if err == nil { - affected, err = result.RowsAffected() +func (c *DeltaController) handleZeroDelete( + ctx *sql.Context, + conn *stdsql.Conn, + tx *stdsql.Tx, + table tableIdentifier, + appender *DeltaAppender, + stats *FlushStats, +) error { + viewName, release, err := c.prepareArrowView(conn, table, appender) + if err != nil { + return err + } + defer release() + + condenseDeltaSQL := buildCondenseDeltaSQL(viewName, appender.Schema(), getPrimaryKeyList(appender.BaseSchema())) + + insertSQL := "INSERT OR REPLACE INTO " + + catalog.ConnectIdentifiersANSI(table.dbName, table.tableName) + + " SELECT * EXCLUDE (" + AugmentedColumnList + ") FROM (" + condenseDeltaSQL + ")" + result, err := tx.ExecContext(ctx, insertSQL) + if err != nil { + return err + } + + affected, err := result.RowsAffected() + if err != nil { + return err + } + stats.Insertions += affected + stats.DeltaSize += affected + + if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) { + log.WithFields(logrus.Fields{ + "db": table.dbName, + "table": table.tableName, + "rows": affected, + }).Debug("Upserted") + } + + return nil +} + +func (c *DeltaController) handleGeneralCase( + ctx *sql.Context, + conn *stdsql.Conn, + tx *stdsql.Tx, + table tableIdentifier, + appender *DeltaAppender, + stats *FlushStats, +) error { + viewName, release, err := c.prepareArrowView(conn, table, appender) + if err != nil { + return err + } + + // Create a temporary table to store the latest delta view + condenseDeltaSQL := buildCondenseDeltaSQL(viewName, appender.Schema(), getPrimaryKeyList(appender.BaseSchema())) + result, err := tx.ExecContext(ctx, "CREATE OR REPLACE TEMP TABLE delta AS "+condenseDeltaSQL) + release() // release the Arrow view immediately + if err != nil { + return err } + affected, err := result.RowsAffected() if err != nil { return err } @@ -258,17 +384,16 @@ func (c *DeltaController) updateTable( if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) { log.WithFields(logrus.Fields{ - "table": qualifiedTableName, + "db": table.dbName, + "table": table.tableName, "rows": affected, }).Debug("Delta created") } + qualifiedTableName := catalog.ConnectIdentifiersANSI(table.dbName, table.tableName) + // Insert or replace new rows (action = INSERT) into the base table. - insertSQL := "INSERT " - if appender.counters.event.delete > 0 || appender.counters.event.update > 0 { // sometimes UPDATE does not DELETE pre-image - insertSQL += "OR REPLACE " - } - insertSQL += "INTO " + + insertSQL := "INSERT OR REPLACE INTO " + qualifiedTableName + " SELECT * EXCLUDE (" + AugmentedColumnList + ") FROM temp.main.delta WHERE action = " + strconv.Itoa(int(binlog.InsertRowEvent)) @@ -283,14 +408,10 @@ func (c *DeltaController) updateTable( if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) { log.WithFields(logrus.Fields{ - "table": qualifiedTableName, + "db": table.dbName, + "table": table.tableName, "rows": affected, - }).Debug("Inserted") - } - - // If there are no rows to delete, we can skip the DELETE step. - if appender.counters.action.delete == 0 { - return nil + }).Debug("Upserted") } // Delete rows that have been deleted. @@ -298,10 +419,7 @@ func (c *DeltaController) updateTable( // which is more efficient than ordinary INNER JOIN. // DuckDB does not support multiple columns in `IN` clauses, // so we need to handle this case separately using the `row()` function. 
- inTuple := pkList - if len(pkColumns) > 1 { - inTuple = "row(" + pkList + ")" - } + inTuple := getPrimaryKeyStruct(appender.BaseSchema()) deleteSQL := "DELETE FROM " + qualifiedTableName + " WHERE " + inTuple + " IN (SELECT " + inTuple + " FROM temp.main.delta WHERE action = " + strconv.Itoa(int(binlog.DeleteRowEvent)) + ")" @@ -335,7 +453,8 @@ if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) { log.WithFields(logrus.Fields{ - "table": qualifiedTableName, + "db": table.dbName, + "table": table.tableName, "rows": affected, }).Debug("Deleted") } @@ -343,6 +462,87 @@ return nil } +// Helper function to build column list with timestamp handling +func buildColumnList(b strings.Builder, schema sql.Schema) { + for i, col := range schema { + if i > 0 { + b.WriteString(", ") + } + b.WriteString(catalog.QuoteIdentifierANSI(col.Name)) + if isTimestampType(col.Type) { + b.WriteString("::TIMESTAMP") + } + } +} + +// Helper function to get the primary key. For composite primary keys, `row()` is used. +func getPrimaryKeyStruct(schema sql.Schema) string { + pks := make([]string, 0, 1) + for _, col := range schema { + if col.PrimaryKey { + pks = append(pks, catalog.QuoteIdentifierANSI(col.Name)) + } + } + if len(pks) == 0 { + return "" + } else if len(pks) == 1 { + return pks[0] + } + return "row(" + strings.Join(pks, ", ") + ")" +} + +// Helper function to get the primary key list. +func getPrimaryKeyList(schema sql.Schema) string { + pks := make([]string, 0, 1) + for _, col := range schema { + if col.PrimaryKey { + pks = append(pks, catalog.QuoteIdentifierANSI(col.Name)) + } + } + return strings.Join(pks, ", ") +} + +func buildCondenseDeltaSQL(viewName string, augmentedSchema sql.Schema, pkList string) string { + var builder strings.Builder + builder.Grow(512) + // Use the following SQL to get the latest view of the rows being updated. + // + // SELECT r[0] as action, ... + // FROM ( + // SELECT + // pk1, pk2, ..., + // LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r + // FROM delta + // GROUP BY pk1, pk2, ... + // ) + // + // Note that an update generates two rows: one for DELETE and one for INSERT. + // So the numeric value of DELETE action MUST be smaller than that of INSERT.
+ builder.Grow(512) + builder.WriteString("SELECT ") + builder.WriteString("r[1] AS ") + builder.WriteString(catalog.QuoteIdentifierANSI(augmentedSchema[0].Name)) + for i, col := range augmentedSchema[1:] { + builder.WriteString(", r[") + builder.WriteString(strconv.Itoa(i + 2)) + builder.WriteString("]") + if isTimestampType(col.Type) { + builder.WriteString("::TIMESTAMP") + } + builder.WriteString(" AS ") + builder.WriteString(catalog.QuoteIdentifierANSI(col.Name)) + } + builder.WriteString(" FROM (SELECT ") + builder.WriteString(pkList) + builder.WriteString(", LAST(ROW(*COLUMNS(*)) ORDER BY txn_group, txn_seq, txn_stmt, action) AS r") + builder.WriteString(" FROM ") + builder.WriteString(viewName) + builder.WriteString(" GROUP BY ") + builder.WriteString(pkList) + builder.WriteString(")") + return builder.String() +} + func isTimestampType(t sql.Type) bool { if types.IsTimestampType(t) { return true diff --git a/pgserver/logrepl/replication.go b/pgserver/logrepl/replication.go index ea4a9037..c76c9a46 100644 --- a/pgserver/logrepl/replication.go +++ b/pgserver/logrepl/replication.go @@ -981,6 +981,10 @@ func (r *LogicalReplicator) commitOngoingTxnIfClean(state *replicationState, rea // commitOngoingTxn commits the current transaction func (r *LogicalReplicator) commitOngoingTxn(state *replicationState, flushReason delta.FlushReason) error { + conn, err := adapter.GetConn(state.replicaCtx) + if err != nil { + return err + } tx := adapter.TryGetTxn(state.replicaCtx) if tx == nil { return nil @@ -990,7 +994,7 @@ func (r *LogicalReplicator) commitOngoingTxn(state *replicationState, flushReaso defer adapter.CloseTxn(state.replicaCtx) // Flush the delta buffer if too large - err := r.flushDeltaBuffer(state, tx, flushReason) + err = r.flushDeltaBuffer(state, conn, tx, flushReason) if err != nil { return err } @@ -1018,12 +1022,12 @@ func (r *LogicalReplicator) commitOngoingTxn(state *replicationState, flushReaso } // flushDeltaBuffer flushes the accumulated changes in the delta buffer -func (r *LogicalReplicator) flushDeltaBuffer(state *replicationState, tx *stdsql.Tx, reason delta.FlushReason) error { +func (r *LogicalReplicator) flushDeltaBuffer(state *replicationState, conn *stdsql.Conn, tx *stdsql.Tx, reason delta.FlushReason) error { defer func() { state.deltaBufSize = 0 }() - _, err := state.deltas.Flush(state.replicaCtx, tx, reason) + _, err := state.deltas.Flush(state.replicaCtx, conn, tx, reason) return err } diff --git a/replica/replication.go b/replica/replication.go index 5b2f07fc..64f07f57 100644 --- a/replica/replication.go +++ b/replica/replication.go @@ -83,8 +83,8 @@ func (twp *tableWriterProvider) GetDeltaAppender( return twp.controller.GetDeltaAppender(databaseName, tableName, schema) } -func (twp *tableWriterProvider) FlushDeltaBuffer(ctx *sql.Context, tx *stdsql.Tx, reason delta.FlushReason) error { - _, err := twp.controller.Flush(ctx, tx, reason) +func (twp *tableWriterProvider) FlushDeltaBuffer(ctx *sql.Context, conn *stdsql.Conn, tx *stdsql.Tx, reason delta.FlushReason) error { + _, err := twp.controller.Flush(ctx, conn, tx, reason) return err } From 182437b3372d60941ad697f6c483ef043c19b371 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Tue, 3 Dec 2024 16:56:45 +0800 Subject: [PATCH 2/7] Fix use-by-value --- delta/controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/delta/controller.go b/delta/controller.go index 228810eb..9b61e6fc 100644 --- a/delta/controller.go +++ b/delta/controller.go @@ -243,7 +243,7 @@ func (c *DeltaController) 
handleInsertOnly( b.WriteString("INSERT INTO ") b.WriteString(catalog.ConnectIdentifiersANSI(table.dbName, table.tableName)) b.WriteString(" SELECT ") - buildColumnList(b, appender.BaseSchema()) + buildColumnList(&b, appender.BaseSchema()) b.WriteString(" FROM ") b.WriteString(viewName) @@ -463,7 +463,7 @@ func (c *DeltaController) handleGeneralCase( } // Helper function to build column list with timestamp handling -func buildColumnList(b strings.Builder, schema sql.Schema) { +func buildColumnList(b *strings.Builder, schema sql.Schema) { for i, col := range schema { if i > 0 { b.WriteString(", ") From 653a948304b3f5f06de7c8e7a0b9fbf2279a716f Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Tue, 3 Dec 2024 22:09:03 +0800 Subject: [PATCH 3/7] Save work --- delta/controller.go | 57 +++++++++++++++++++++++----- pgserver/logrepl/replication_test.go | 17 +++++---- 2 files changed, 57 insertions(+), 17 deletions(-) diff --git a/delta/controller.go b/delta/controller.go index 9b61e6fc..97bd6ba8 100644 --- a/delta/controller.go +++ b/delta/controller.go @@ -163,6 +163,8 @@ func (c *DeltaController) updateTable( hasDeletes := appender.counters.event.delete > 0 hasUpdates := appender.counters.event.update > 0 + ctx.GetLogger().Debugf("Delta: %s.%s: stats: %+v", table.dbName, table.tableName, appender.counters) + switch { case hasInserts && !hasDeletes && !hasUpdates: // Case 1: INSERT only @@ -181,12 +183,15 @@ func (c *DeltaController) updateTable( // Helper function to build the Arrow record and register the view func (c *DeltaController) prepareArrowView( + ctx *sql.Context, conn *stdsql.Conn, table tableIdentifier, appender *DeltaAppender, ) (viewName string, close func(), err error) { record := appender.Build() + fmt.Println("record:", record) + var ar *duckdb.Arrow err = conn.Raw(func(driverConn any) error { var err error @@ -207,6 +212,7 @@ func (c *DeltaController) prepareArrowView( // Register the Arrow view hash := maphash.String(c.seed, table.dbName+"\x00"+table.tableName) viewName = "__sys_view_arrow_delta_" + strconv.FormatUint(hash, 16) + "__" + release, err := ar.RegisterView(reader, viewName) if err != nil { reader.Release() @@ -214,6 +220,32 @@ func (c *DeltaController) prepareArrowView( return "", nil, err } + rows, err := conn.QueryContext(ctx, "SELECT * FROM "+viewName) + if err != nil { + return "", nil, err + } + defer rows.Close() + row := make([]any, len(record.Schema().Fields())) + pointers := make([]any, len(row)) + for i := range row { + pointers[i] = &row[i] + } + for rows.Next() { + if err := rows.Scan(pointers...); err != nil { + return "", nil, err + } + fmt.Printf("row:%+v\n", row) + } + + if logger := ctx.GetLogger(); logger.Logger.IsLevelEnabled(logrus.DebugLevel) { + logger.WithFields(logrus.Fields{ + "db": table.dbName, + "table": table.tableName, + "view": viewName, + "rows": record.NumRows(), + }).Debug("Arrow view registered") + } + close = func() { release() reader.Release() @@ -230,7 +262,7 @@ func (c *DeltaController) handleInsertOnly( appender *DeltaAppender, stats *FlushStats, ) error { - viewName, release, err := c.prepareArrowView(conn, table, appender) + viewName, release, err := c.prepareArrowView(ctx, conn, table, appender) if err != nil { return err } @@ -247,7 +279,10 @@ func (c *DeltaController) handleInsertOnly( b.WriteString(" FROM ") b.WriteString(viewName) - result, err := tx.ExecContext(ctx, b.String()) + sql := b.String() + ctx.GetLogger().Debug("Insert SQL: ", b.String()) + + result, err := tx.ExecContext(ctx, sql) if err != nil { return err } 
@@ -278,7 +313,7 @@ func (c *DeltaController) handleDeleteOnly( appender *DeltaAppender, stats *FlushStats, ) error { - viewName, release, err := c.prepareArrowView(conn, table, appender) + viewName, release, err := c.prepareArrowView(ctx, conn, table, appender) if err != nil { return err } @@ -321,13 +356,13 @@ func (c *DeltaController) handleZeroDelete( appender *DeltaAppender, stats *FlushStats, ) error { - viewName, release, err := c.prepareArrowView(conn, table, appender) + viewName, release, err := c.prepareArrowView(ctx, conn, table, appender) if err != nil { return err } defer release() - condenseDeltaSQL := buildCondenseDeltaSQL(viewName, appender.Schema(), getPrimaryKeyList(appender.BaseSchema())) + condenseDeltaSQL := buildCondenseDeltaSQL(viewName, appender) insertSQL := "INSERT OR REPLACE INTO " + catalog.ConnectIdentifiersANSI(table.dbName, table.tableName) + @@ -363,13 +398,13 @@ func (c *DeltaController) handleGeneralCase( appender *DeltaAppender, stats *FlushStats, ) error { - viewName, release, err := c.prepareArrowView(conn, table, appender) + viewName, release, err := c.prepareArrowView(ctx, conn, table, appender) if err != nil { return err } // Create a temporary table to store the latest delta view - condenseDeltaSQL := buildCondenseDeltaSQL(viewName, appender.Schema(), getPrimaryKeyList(appender.BaseSchema())) + condenseDeltaSQL := buildCondenseDeltaSQL(viewName, appender) result, err := tx.ExecContext(ctx, "CREATE OR REPLACE TEMP TABLE delta AS "+condenseDeltaSQL) release() // release the Arrow view immediately if err != nil { @@ -502,8 +537,12 @@ func getPrimaryKeyList(schema sql.Schema) string { return strings.Join(pks, ", ") } -func buildCondenseDeltaSQL(viewName string, augmentedSchema sql.Schema, pkList string) string { - var builder strings.Builder +func buildCondenseDeltaSQL(viewName string, appender *DeltaAppender) string { + var ( + augmentedSchema = appender.Schema() + pkList = getPrimaryKeyList(appender.BaseSchema()) + builder strings.Builder + ) builder.Grow(512) // Use the following SQL to get the latest view of the rows being updated. 
// diff --git a/pgserver/logrepl/replication_test.go b/pgserver/logrepl/replication_test.go index d12f0619..1eba7c93 100644 --- a/pgserver/logrepl/replication_test.go +++ b/pgserver/logrepl/replication_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/errors" "github.com/dolthub/go-mysql-server/sql" "github.com/jackc/pgx/v5" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -609,7 +610,7 @@ var replicationTests = []ReplicationTest{ } func TestReplication(t *testing.T) { - // logrus.SetLevel(logrus.DebugLevel) + logrus.SetLevel(logrus.DebugLevel) RunReplicationScripts(t, replicationTests) } @@ -649,14 +650,14 @@ func RunReplicationScripts(t *testing.T, scripts []ReplicationTest) { require.NoError(t, logrepl.CreatePublicationIfNotExists(primaryDns, slotName)) time.Sleep(500 * time.Millisecond) - // for i, script := range scripts { - // if i == 10 { - // RunReplicationScript(t, dsn, script) - // } - // } - for _, script := range scripts { - RunReplicationScript(t, dsn, script) + for i, script := range scripts { + if i == 2 { + RunReplicationScript(t, dsn, script) + } } + // for _, script := range scripts { + // RunReplicationScript(t, dsn, script) + // } } const slotName = "myduck_slot" From 0e58919857bafa9775fd687916bb47b529e67d92 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Wed, 4 Dec 2024 14:43:11 +0800 Subject: [PATCH 4/7] fix: implement workaround for RegisterView bug --- delta/controller.go | 77 +++++++++++++++++----------- delta/delta.go | 4 ++ pgserver/logrepl/replication_test.go | 17 +++--- 3 files changed, 58 insertions(+), 40 deletions(-) diff --git a/delta/controller.go b/delta/controller.go index 97bd6ba8..5138f98c 100644 --- a/delta/controller.go +++ b/delta/controller.go @@ -187,10 +187,12 @@ func (c *DeltaController) prepareArrowView( conn *stdsql.Conn, table tableIdentifier, appender *DeltaAppender, + fieldOffset int, + fieldIndices []int, ) (viewName string, close func(), err error) { record := appender.Build() - fmt.Println("record:", record) + // fmt.Println("record:", record) var ar *duckdb.Arrow err = conn.Raw(func(driverConn any) error { @@ -203,6 +205,30 @@ func (c *DeltaController) prepareArrowView( return "", nil, err } + // Project the fields before registering the Arrow record into DuckDB. + // Currently, this is necessary because RegisterView uses `arrow_scan` instead of `arrow_scan_dumb` under the hood. + // The former allows projection & filter pushdown, but the implementation does not work as expected in some cases. 
+ schema := record.Schema() + if fieldOffset > 0 { + fields := schema.Fields()[fieldOffset:] + schema = arrow.NewSchema(fields, nil) + columns := record.Columns()[fieldOffset:] + projected := array.NewRecord(schema, columns, record.NumRows()) + record.Release() + record = projected + } else if len(fieldIndices) > 0 { + fields := make([]arrow.Field, len(fieldIndices)) + columns := make([]arrow.Array, len(fieldIndices)) + for i, idx := range fieldIndices { + fields[i] = schema.Field(idx) + columns[i] = record.Column(idx) + } + schema = arrow.NewSchema(fields, nil) + projected := array.NewRecord(schema, columns, record.NumRows()) + record.Release() + record = projected + } + reader, err := array.NewRecordReader(record.Schema(), []arrow.Record{record}) if err != nil { record.Release() @@ -220,33 +246,8 @@ func (c *DeltaController) prepareArrowView( return "", nil, err } - rows, err := conn.QueryContext(ctx, "SELECT * FROM "+viewName) - if err != nil { - return "", nil, err - } - defer rows.Close() - row := make([]any, len(record.Schema().Fields())) - pointers := make([]any, len(row)) - for i := range row { - pointers[i] = &row[i] - } - for rows.Next() { - if err := rows.Scan(pointers...); err != nil { - return "", nil, err - } - fmt.Printf("row:%+v\n", row) - } - - if logger := ctx.GetLogger(); logger.Logger.IsLevelEnabled(logrus.DebugLevel) { - logger.WithFields(logrus.Fields{ - "db": table.dbName, - "table": table.tableName, - "view": viewName, - "rows": record.NumRows(), - }).Debug("Arrow view registered") - } - close = func() { + conn.ExecContext(ctx, "DROP VIEW IF EXISTS "+viewName) release() reader.Release() record.Release() @@ -262,7 +263,8 @@ func (c *DeltaController) handleInsertOnly( appender *DeltaAppender, stats *FlushStats, ) error { - viewName, release, err := c.prepareArrowView(ctx, conn, table, appender) + // Ignore the augmented fields + viewName, release, err := c.prepareArrowView(ctx, conn, table, appender, appender.NumAugmentedFields(), nil) if err != nil { return err } @@ -313,7 +315,8 @@ func (c *DeltaController) handleDeleteOnly( appender *DeltaAppender, stats *FlushStats, ) error { - viewName, release, err := c.prepareArrowView(ctx, conn, table, appender) + // Ignore all but the primary key fields + viewName, release, err := c.prepareArrowView(ctx, conn, table, appender, 0, getPrimaryKeyIndices(appender)) if err != nil { return err } @@ -356,7 +359,7 @@ func (c *DeltaController) handleZeroDelete( appender *DeltaAppender, stats *FlushStats, ) error { - viewName, release, err := c.prepareArrowView(ctx, conn, table, appender) + viewName, release, err := c.prepareArrowView(ctx, conn, table, appender, 0, nil) if err != nil { return err } @@ -398,7 +401,7 @@ func (c *DeltaController) handleGeneralCase( appender *DeltaAppender, stats *FlushStats, ) error { - viewName, release, err := c.prepareArrowView(ctx, conn, table, appender) + viewName, release, err := c.prepareArrowView(ctx, conn, table, appender, 0, nil) if err != nil { return err } @@ -510,6 +513,18 @@ func buildColumnList(b *strings.Builder, schema sql.Schema) { } } +// Helper function to get the primary key indices. +func getPrimaryKeyIndices(appender *DeltaAppender) []int { + schema := appender.BaseSchema() + indices := make([]int, 0, 1) + for i, col := range schema { + if col.PrimaryKey { + indices = append(indices, i+appender.NumAugmentedFields()) + } + } + return indices +} + // Helper function to get the primary key. For composite primary keys, `row()` is used. 
func getPrimaryKeyStruct(schema sql.Schema) string { pks := make([]string, 0, 1) diff --git a/delta/delta.go b/delta/delta.go index f78a141c..4a682b9c 100644 --- a/delta/delta.go +++ b/delta/delta.go @@ -66,6 +66,10 @@ func newDeltaAppender(schema sql.Schema) (*DeltaAppender, error) { }, nil } +func (a *DeltaAppender) NumAugmentedFields() int { + return 6 +} + func (a *DeltaAppender) Field(i int) array.Builder { return a.appender.Field(i + 6) } diff --git a/pgserver/logrepl/replication_test.go b/pgserver/logrepl/replication_test.go index 1eba7c93..d12f0619 100644 --- a/pgserver/logrepl/replication_test.go +++ b/pgserver/logrepl/replication_test.go @@ -30,7 +30,6 @@ import ( "github.com/cockroachdb/errors" "github.com/dolthub/go-mysql-server/sql" "github.com/jackc/pgx/v5" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -610,7 +609,7 @@ var replicationTests = []ReplicationTest{ } func TestReplication(t *testing.T) { - logrus.SetLevel(logrus.DebugLevel) + // logrus.SetLevel(logrus.DebugLevel) RunReplicationScripts(t, replicationTests) } @@ -650,14 +649,14 @@ func RunReplicationScripts(t *testing.T, scripts []ReplicationTest) { require.NoError(t, logrepl.CreatePublicationIfNotExists(primaryDns, slotName)) time.Sleep(500 * time.Millisecond) - for i, script := range scripts { - if i == 2 { - RunReplicationScript(t, dsn, script) - } - } - // for _, script := range scripts { - // RunReplicationScript(t, dsn, script) + // for i, script := range scripts { + // if i == 10 { + // RunReplicationScript(t, dsn, script) + // } // } + for _, script := range scripts { + RunReplicationScript(t, dsn, script) + } } const slotName = "myduck_slot" From 5ba8497fdd566adbfc5e72eb793c23db451ec187 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Wed, 4 Dec 2024 15:10:05 +0800 Subject: [PATCH 5/7] GetCatalogConn instead of GetConn --- adapter/adapter.go | 4 ++++ binlogreplication/binlog_replica_applier.go | 2 +- delta/controller.go | 4 +++- pgserver/logrepl/replication.go | 2 +- test/test_utils.go | 2 +- 5 files changed, 10 insertions(+), 4 deletions(-) diff --git a/adapter/adapter.go b/adapter/adapter.go index a624980a..567fb729 100644 --- a/adapter/adapter.go +++ b/adapter/adapter.go @@ -21,6 +21,10 @@ func GetConn(ctx *sql.Context) (*stdsql.Conn, error) { return ctx.Session.(ConnectionHolder).GetConn(ctx) } +func GetCatalogConn(ctx *sql.Context) (*stdsql.Conn, error) { + return ctx.Session.(ConnectionHolder).GetCatalogConn(ctx) +} + func CloseBackendConn(ctx *sql.Context) { ctx.Session.(ConnectionHolder).CloseBackendConn() } diff --git a/binlogreplication/binlog_replica_applier.go b/binlogreplication/binlog_replica_applier.go index 054dc885..3feca741 100644 --- a/binlogreplication/binlog_replica_applier.go +++ b/binlogreplication/binlog_replica_applier.go @@ -1245,7 +1245,7 @@ func (a *binlogReplicaApplier) appendRowFormatChanges( } func (a *binlogReplicaApplier) flushDeltaBuffer(ctx *sql.Context, reason delta.FlushReason) error { - conn, err := adapter.GetConn(ctx) + conn, err := adapter.GetCatalogConn(ctx) if err != nil { return err } diff --git a/delta/controller.go b/delta/controller.go index 5138f98c..4ef19214 100644 --- a/delta/controller.go +++ b/delta/controller.go @@ -163,7 +163,9 @@ func (c *DeltaController) updateTable( hasDeletes := appender.counters.event.delete > 0 hasUpdates := appender.counters.event.update > 0 - ctx.GetLogger().Debugf("Delta: %s.%s: stats: %+v", table.dbName, table.tableName, appender.counters) + if log := ctx.GetLogger(); 
log.Logger.IsLevelEnabled(logrus.DebugLevel) { + log.Debugf("Delta: %s.%s: stats: %+v", table.dbName, table.tableName, appender.counters) + } switch { case hasInserts && !hasDeletes && !hasUpdates: diff --git a/pgserver/logrepl/replication.go b/pgserver/logrepl/replication.go index c76c9a46..afc1d1bd 100644 --- a/pgserver/logrepl/replication.go +++ b/pgserver/logrepl/replication.go @@ -981,7 +981,7 @@ func (r *LogicalReplicator) commitOngoingTxnIfClean(state *replicationState, rea // commitOngoingTxn commits the current transaction func (r *LogicalReplicator) commitOngoingTxn(state *replicationState, flushReason delta.FlushReason) error { - conn, err := adapter.GetConn(state.replicaCtx) + conn, err := adapter.GetCatalogConn(state.replicaCtx) if err != nil { return err } diff --git a/test/test_utils.go b/test/test_utils.go index 369926d8..bc10c422 100644 --- a/test/test_utils.go +++ b/test/test_utils.go @@ -86,7 +86,7 @@ func StartDuckSqlServer(t *testing.T, dir string, persistentSystemVars map[strin fmt.Sprintf("--datadir=%s", dir), fmt.Sprintf("--pg-port=%v", testEnv.DuckPgPort), "--default-time-zone=UTC", - "--loglevel=6", // TRACE + "--loglevel=4", // 4: INFO, 5: DEBUG, 6: TRACE } // If we're running in CI, use a precompiled dolt binary instead of go run From 9a875fac87a2d7b903ce15a5e41632398f0e40e0 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Wed, 4 Dec 2024 15:26:48 +0800 Subject: [PATCH 6/7] minor --- delta/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/delta/controller.go b/delta/controller.go index 4ef19214..d195af12 100644 --- a/delta/controller.go +++ b/delta/controller.go @@ -231,7 +231,7 @@ func (c *DeltaController) prepareArrowView( record = projected } - reader, err := array.NewRecordReader(record.Schema(), []arrow.Record{record}) + reader, err := array.NewRecordReader(schema, []arrow.Record{record}) if err != nil { record.Release() return "", nil, err From be3ceaa3ade2fce6e36df51d22ce517e15f09eac Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Wed, 4 Dec 2024 15:54:14 +0800 Subject: [PATCH 7/7] Correct comment --- delta/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/delta/controller.go b/delta/controller.go index d195af12..93fba7f2 100644 --- a/delta/controller.go +++ b/delta/controller.go @@ -563,7 +563,7 @@ func buildCondenseDeltaSQL(viewName string, appender *DeltaAppender) string { builder.Grow(512) // Use the following SQL to get the latest view of the rows being updated. // - // SELECT r[0] as action, ... + // SELECT r[1] as action, ... // FROM ( // SELECT // pk1, pk2, ...,
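---
Note (appendix, not part of the patches): the series replaces the scan_arrow_ipc pointer hack with go-duckdb's zero-copy Arrow interface. Below is a minimal, self-contained sketch of the registration pattern that prepareArrowView relies on, built from the same calls the patches introduce (NewArrowFromConn, RegisterView). The toy schema and the name demo_arrow_view are illustrative only, and depending on the go-duckdb version the Arrow interface may be gated behind a build tag.

package main

import (
	"context"
	stdsql "database/sql"
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/marcboeker/go-duckdb"
)

func main() {
	db, err := stdsql.Open("duckdb", "")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Pin a single physical connection: a registered Arrow view is only
	// visible on the connection it was registered on, which is why the
	// patches thread *stdsql.Conn through FlushDeltaBuffer alongside the
	// transaction.
	conn, err := db.Conn(context.Background())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Build a tiny Arrow record: (id BIGINT, name VARCHAR).
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
		{Name: "name", Type: arrow.BinaryTypes.String},
	}, nil)
	rb := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer rb.Release()
	rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2}, nil)
	rb.Field(1).(*array.StringBuilder).AppendValues([]string{"a", "b"}, nil)
	record := rb.NewRecord()
	defer record.Release()

	reader, err := array.NewRecordReader(schema, []arrow.Record{record})
	if err != nil {
		panic(err)
	}
	defer reader.Release()

	// Register the record batch as a named view, mirroring prepareArrowView.
	var ar *duckdb.Arrow
	if err := conn.Raw(func(driverConn any) error {
		var err error
		ar, err = duckdb.NewArrowFromConn(driverConn.(*duckdb.Conn))
		return err
	}); err != nil {
		panic(err)
	}
	release, err := ar.RegisterView(reader, "demo_arrow_view")
	if err != nil {
		panic(err)
	}
	defer release()

	// Query the view on the same connection; no IPC serialization and no
	// unsafe pointer passing are involved.
	var n int64
	if err := conn.QueryRowContext(context.Background(),
		"SELECT count(*) FROM demo_arrow_view").Scan(&n); err != nil {
		panic(err)
	}
	fmt.Println("rows:", n) // rows: 2
}

The per-connection scope of the registered view also explains patch 5: GetCatalogConn ensures the view is registered on the same underlying connection that holds the catalog transaction, so the flush statements can see it.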