Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions lib/backend/pgbk/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,20 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
}
}

// 'kv'::regclass will get the oid for the kv table as searched given the
// current search_path, which matches the behavior of any query that refers
// to the kv table with no qualifier (like the rest of the pgbk code does)
var schemaName string
if err := conn.QueryRow(ctx,
"SELECT nsp.nspname "+
"FROM pg_class AS cl JOIN pg_namespace AS nsp ON cl.relnamespace = nsp.oid "+
"WHERE cl.oid = 'kv'::regclass",
pgx.QueryExecModeExec,
).Scan(&schemaName); err != nil {
return trace.Wrap(err)
}
addTables := wal2jsonEscape(schemaName) + ".kv"

// reading from a replication slot adds to the postgres log at "log" level
// (right below "fatal") for every poll, and we poll every second here, so
// we try to silence the logs for this connection; this can fail because of
Expand Down Expand Up @@ -189,7 +203,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
defer b.buf.Reset()

for ctx.Err() == nil {
messages, err := b.pollChangeFeed(ctx, conn, slotName, b.cfg.ChangeFeedBatchSize)
messages, err := b.pollChangeFeed(ctx, conn, addTables, slotName, b.cfg.ChangeFeedBatchSize)
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -210,16 +224,16 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {

// pollChangeFeed will poll the change feed and emit any fetched events, if any.
// It returns the count of received messages.
func (b *Backend) pollChangeFeed(ctx context.Context, conn *pgx.Conn, slotName string, batchSize int) (int64, error) {
func (b *Backend) pollChangeFeed(ctx context.Context, conn *pgx.Conn, addTables, slotName string, batchSize int) (int64, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

t0 := time.Now()

rows, _ := conn.Query(ctx,
"SELECT data FROM pg_logical_slot_get_changes($1, NULL, $2, "+
"'format-version', '2', 'add-tables', 'public.kv', 'include-transaction', 'false')",
slotName, batchSize)
"'format-version', '2', 'add-tables', $3, 'include-transaction', 'false')",
slotName, batchSize, addTables)

var data []byte
tag, err := pgx.ForEachRow(rows, []any{(*pgtype.DriverBytes)(&data)}, func() error {
Expand Down
30 changes: 13 additions & 17 deletions lib/backend/pgbk/wal2json.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pgbk
import (
"bytes"
"encoding/hex"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -98,8 +99,6 @@ func (c *wal2jsonColumn) UUID() (uuid.UUID, error) {

type wal2jsonMessage struct {
Action string `json:"action"`
Schema string `json:"schema"`
Table string `json:"table"`

Columns []wal2jsonColumn `json:"columns"`
Identity []wal2jsonColumn `json:"identity"`
Expand All @@ -113,16 +112,9 @@ func (w *wal2jsonMessage) Events() ([]backend.Event, error) {
return nil, trace.BadParameter("unexpected action %q", w.Action)

case "T":
if w.Schema != "public" || w.Table != "kv" {
return nil, nil
}
return nil, trace.BadParameter("received truncate for table kv")

case "I":
if w.Schema != "public" || w.Table != "kv" {
return nil, nil
}

key, err := w.newCol("key").Bytea()
if err != nil {
return nil, trace.Wrap(err, "parsing key on insert")
Expand Down Expand Up @@ -152,10 +144,6 @@ func (w *wal2jsonMessage) Events() ([]backend.Event, error) {
}}, nil

case "D":
if w.Schema != "public" || w.Table != "kv" {
return nil, nil
}

key, err := w.oldCol("key").Bytea()
if err != nil {
return nil, trace.Wrap(err, "parsing key on delete")
Expand All @@ -168,10 +156,6 @@ func (w *wal2jsonMessage) Events() ([]backend.Event, error) {
}}, nil

case "U":
if w.Schema != "public" || w.Table != "kv" {
return nil, nil
}

// on an UPDATE, an unmodified TOASTed column might be missing from
// "columns", but it should be present in "identity" (and this also
// applies to "key"), so we use the toastCol accessor function
Expand Down Expand Up @@ -260,3 +244,15 @@ func (w *wal2jsonMessage) toastCol(name string) *wal2jsonColumn {
}
return w.oldCol(name)
}

// wal2jsonEscape turns a schema or table name into a form suitable for use in
// wal2json's filter-tables or add-tables option, by prepending a backslash to
// each character.
func wal2jsonEscape(s string) string {
var b strings.Builder
for _, r := range s {
b.WriteRune('\\')
b.WriteRune(r)
}
return b.String()
}
26 changes: 1 addition & 25 deletions lib/backend/pgbk/wal2json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ func TestMessage(t *testing.T) {

m := &wal2jsonMessage{
Action: "I",
Schema: "public",
Table: "kv",
Columns: []wal2jsonColumn{
{Name: "key", Type: "bytea", Value: s("")},
{Name: "expires", Type: "bytea", Value: s("")},
Expand All @@ -126,15 +124,8 @@ func TestMessage(t *testing.T) {
_, err := m.Events()
require.ErrorContains(t, err, "missing column")

m.Table = "notkv"
evs, err := m.Events()
require.NoError(t, err)
require.Empty(t, evs)

m = &wal2jsonMessage{
Action: "I",
Schema: "public",
Table: "kv",
Columns: []wal2jsonColumn{
{Name: "key", Type: "bytea", Value: s("")},
{Name: "value", Type: "bytea", Value: s("")},
Expand All @@ -148,8 +139,6 @@ func TestMessage(t *testing.T) {

m = &wal2jsonMessage{
Action: "I",
Schema: "public",
Table: "kv",
Columns: []wal2jsonColumn{
{Name: "key", Type: "bytea", Value: s("666f6f")},
{Name: "value", Type: "bytea", Value: s("")},
Expand All @@ -158,7 +147,7 @@ func TestMessage(t *testing.T) {
},
Identity: []wal2jsonColumn{},
}
evs, err = m.Events()
evs, err := m.Events()
require.NoError(t, err)
require.Len(t, evs, 1)
require.Empty(t, cmp.Diff(evs[0], backend.Event{
Expand All @@ -171,15 +160,8 @@ func TestMessage(t *testing.T) {
},
}))

m.Table = "notkv"
evs, err = m.Events()
require.NoError(t, err)
require.Empty(t, evs)

m = &wal2jsonMessage{
Action: "U",
Schema: "public",
Table: "kv",
Columns: []wal2jsonColumn{
{Name: "value", Type: "bytea", Value: s("666f6f32")},
{Name: "expires", Type: "timestamp with time zone", Value: nil},
Expand All @@ -205,8 +187,6 @@ func TestMessage(t *testing.T) {

m = &wal2jsonMessage{
Action: "U",
Schema: "public",
Table: "kv",
Columns: []wal2jsonColumn{
{Name: "key", Type: "bytea", Value: s("666f6f32")},
{Name: "value", Type: "bytea", Value: s("666f6f32")},
Expand Down Expand Up @@ -243,8 +223,6 @@ func TestMessage(t *testing.T) {

m = &wal2jsonMessage{
Action: "U",
Schema: "public",
Table: "kv",
Columns: []wal2jsonColumn{
{Name: "value", Type: "bytea", Value: s("666f6f32")},
},
Expand All @@ -260,8 +238,6 @@ func TestMessage(t *testing.T) {

m = &wal2jsonMessage{
Action: "D",
Schema: "public",
Table: "kv",
Identity: []wal2jsonColumn{
{Name: "key", Type: "bytea", Value: s("666f6f")},
{Name: "value", Type: "bytea", Value: s("")},
Expand Down