diff --git a/lib/backend/pgbk/background.go b/lib/backend/pgbk/background.go index 95a83323f1a1c..749ded8854589 100644 --- a/lib/backend/pgbk/background.go +++ b/lib/backend/pgbk/background.go @@ -16,7 +16,6 @@ package pgbk import ( "context" - "encoding/hex" "encoding/json" "fmt" "time" @@ -116,26 +115,30 @@ func (b *Backend) backgroundChangeFeed(ctx context.Context) { // events. Assumes that b.buf is not initialized but not closed, and will reset // it before returning. func (b *Backend) runChangeFeed(ctx context.Context) error { - // we manually copy the pool configuration and connect because we don't want - // to hit a connection limit or mess with the connection pool stats; we need - // a separate, long-running connection here anyway. - poolConfig := b.pool.Config() - if poolConfig.BeforeConnect != nil { - if err := poolConfig.BeforeConnect(ctx, poolConfig.ConnConfig); err != nil { + connConfig := b.feedConfig.ConnConfig.Copy() + if bc := b.feedConfig.BeforeConnect; bc != nil { + if err := bc(ctx, connConfig); err != nil { return trace.Wrap(err) } } - conn, err := pgx.ConnectConfig(ctx, poolConfig.ConnConfig) + // TODO(espadolini): use a replication connection if + // connConfig.RuntimeParams["replication"] == "database" + conn, err := pgx.ConnectConfig(ctx, connConfig) if err != nil { return trace.Wrap(err) } defer func() { - ctx, cancel := context.WithTimeout(ctx, 3*time.Second) + closeCtx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() - if err := conn.Close(ctx); err != nil && ctx.Err() != nil { + if err := conn.Close(closeCtx); err != nil && closeCtx.Err() != nil { b.log.WithError(err).Warn("Error closing change feed connection.") } }() + if ac := b.feedConfig.AfterConnect; ac != nil { + if err := ac(ctx, conn); err != nil { + return trace.Wrap(err) + } + } // reading from a replication slot adds to the postgres log at "log" level // (right below "fatal") for every poll, and we poll every second here, so @@ -146,26 +149,40 @@ func (b *Backend) runChangeFeed(ctx context.Context) error { b.log.WithError(err).Debug("Failed to silence log messages for change feed session.") } - // this can be useful if we're some sort of admin but we haven't gotten the - // REPLICATION attribute yet - // HACK(espadolini): ALTER ROLE CURRENT_USER REPLICATION just crashes postgres on Azure - if _, err := conn.Exec(ctx, - fmt.Sprintf("ALTER ROLE %v REPLICATION", pgx.Identifier{poolConfig.ConnConfig.User}.Sanitize()), - pgx.QueryExecModeExec, - ); err != nil { - b.log.WithError(err).Debug("Failed to enable replication for the current user.") + // this can be useful on Azure if we have azure_pg_admin permissions but not + // the REPLICATION attribute; in vanilla Postgres you have to be SUPERUSER + // to grant REPLICATION, and if you are SUPERUSER you can do replication + // things even without the attribute anyway + // + // HACK(espadolini): ALTER ROLE CURRENT_USER crashes Postgres on Azure, so + // we have to use an explicit username + if b.cfg.AuthMode == AzureADAuth && connConfig.User != "" { + if _, err := conn.Exec(ctx, + fmt.Sprintf("ALTER ROLE %v REPLICATION", pgx.Identifier{connConfig.User}.Sanitize()), + pgx.QueryExecModeExec, + ); err != nil { + b.log.WithError(err).Debug("Failed to enable replication for the current user.") + } } - u := uuid.New() - slotName := hex.EncodeToString(u[:]) + // a replication slot must be 1-63 lowercase letters, numbers and + // underscores, as per + // https://github.com/postgres/postgres/blob/b0ec61c9c27fb932ae6524f92a18e0d1fadbc144/src/backend/replication/slot.c#L193-L194 + slotName := fmt.Sprintf("teleport_%x", [16]byte(uuid.New())) b.log.WithField("slot_name", slotName).Info("Setting up change feed.") - if _, err := conn.Exec(ctx, + + // be noisy about pg_create_logical_replication_slot taking too long, since + // hanging here leaves the backend non-functional + createCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + if _, err := conn.Exec(createCtx, "SELECT * FROM pg_create_logical_replication_slot($1, 'wal2json', true)", pgx.QueryExecModeExec, slotName, ); err != nil { + cancel() return trace.Wrap(err) } + cancel() b.log.WithField("slot_name", slotName).Info("Change feed started.") b.buf.SetInit() diff --git a/lib/backend/pgbk/pgbk.go b/lib/backend/pgbk/pgbk.go index 087b05c4bffed..ea25b2a220a9a 100644 --- a/lib/backend/pgbk/pgbk.go +++ b/lib/backend/pgbk/pgbk.go @@ -82,6 +82,7 @@ type Config struct { AuthMode AuthMode `json:"auth_mode"` + ChangeFeedConnString string `json:"change_feed_conn_string"` ChangeFeedPollInterval types.Duration `json:"change_feed_poll_interval"` ChangeFeedBatchSize int `json:"change_feed_batch_size"` @@ -95,6 +96,9 @@ func (c *Config) CheckAndSetDefaults() error { return trace.Wrap(err) } + if c.ChangeFeedConnString == "" { + c.ChangeFeedConnString = c.ConnString + } if c.ChangeFeedPollInterval < 0 { return trace.BadParameter("change feed poll interval must be non-negative") } @@ -150,6 +154,10 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) { if err != nil { return nil, trace.Wrap(err) } + feedConfig, err := pgxpool.ParseConfig(cfg.ChangeFeedConnString) + if err != nil { + return nil, trace.Wrap(err) + } log := logrus.WithField(trace.Component, componentName) @@ -159,6 +167,7 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) { return nil, trace.Wrap(err) } poolConfig.BeforeConnect = bc + feedConfig.BeforeConnect = bc } const defaultTxIsoParamName = "default_transaction_isolation" @@ -185,7 +194,9 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) { ctx, cancel := context.WithCancel(ctx) b := &Backend{ - cfg: cfg, + cfg: cfg, + feedConfig: feedConfig, + log: log, pool: pool, buf: backend.NewCircularBuffer(), @@ -211,7 +222,9 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) { // Backend is a PostgreSQL-backed [backend.Backend]. type Backend struct { - cfg Config + cfg Config + feedConfig *pgxpool.Config + log logrus.FieldLogger pool *pgxpool.Pool buf *backend.CircularBuffer