Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions lib/backend/pgbk/pgbk.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ type Config struct {
DisableExpiry bool `json:"disable_expiry"`
ExpiryInterval types.Duration `json:"expiry_interval"`
ExpiryBatchSize int `json:"expiry_batch_size"`
// ClientCertReloadInterval is the interval for reloading the client cert
// for connections to the database
ClientCertReloadInterval types.Duration `json:"client_cert_reload_interval"`
// ChangeFeedCertReloadInterval is the interval for reloading the change feed client cert
// for connections to the database
ChangeFeedCertReloadInterval types.Duration `json:"changefeed_cert_reload_interval"`
}

func (c *Config) CheckAndSetDefaults() error {
Expand All @@ -88,6 +94,7 @@ func (c *Config) CheckAndSetDefaults() error {

if c.ChangeFeedConnString == "" {
c.ChangeFeedConnString = c.ConnString
c.ChangeFeedCertReloadInterval = c.ClientCertReloadInterval
}
if c.ChangeFeedPollInterval < 0 {
return trace.BadParameter("change feed poll interval must be non-negative")
Expand Down Expand Up @@ -144,10 +151,23 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {
if err != nil {
return nil, trace.Wrap(err)
}
if cfg.ClientCertReloadInterval > 0 {
err := pgcommon.CreateClientCertReloader(ctx, "backend", cfg.ConnString, poolConfig.ConnConfig, cfg.ClientCertReloadInterval.Value(), nil)
if err != nil {
return nil, trace.Wrap(err)
}
}

feedConfig, err := pgxpool.ParseConfig(cfg.ChangeFeedConnString)
if err != nil {
return nil, trace.Wrap(err)
}
if cfg.ChangeFeedCertReloadInterval > 0 {
err := pgcommon.CreateClientCertReloader(ctx, "changefeed", cfg.ChangeFeedConnString, feedConfig.ConnConfig, cfg.ChangeFeedCertReloadInterval.Value(), nil)
if err != nil {
return nil, trace.Wrap(err)
}
}

log := slog.With(teleport.ComponentKey, componentName)

Expand Down
30 changes: 24 additions & 6 deletions lib/events/pgevents/pgevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ const (
)

const (
defaultRetentionPeriod = 8766 * time.Hour // 365.25 days, i.e. one year
defaultCleanupInterval = time.Hour
defaultRetentionPeriod = 8766 * time.Hour // 365.25 days, i.e. one year
defaultCleanupInterval = time.Hour
defaultCertReloadInterval = 0
)

// URL parameters for configuration.
Expand All @@ -67,6 +68,7 @@ const (
disableCleanupParam = "disable_cleanup"
cleanupIntervalParam = "cleanup_interval"
retentionPeriodParam = "retention_period"
certReloadParam = "cert_reload_interval"
)

const (
Expand Down Expand Up @@ -114,9 +116,10 @@ type Config struct {
Log *slog.Logger
PoolConfig *pgxpool.Config

DisableCleanup bool
RetentionPeriod time.Duration
CleanupInterval time.Duration
DisableCleanup bool
RetentionPeriod time.Duration
CleanupInterval time.Duration
CertReloadInterval time.Duration
}

// SetFromURL sets config params from the URL, as per [pgxpool.ParseConfig]
Expand Down Expand Up @@ -169,6 +172,14 @@ func (c *Config) SetFromURL(u *url.URL) error {
c.RetentionPeriod = d
}

if s := params.Get(certReloadParam); s != "" {
d, err := time.ParseDuration(s)
if err != nil {
return trace.Wrap(err)
}
c.CertReloadInterval = d
}

return nil
}

Expand Down Expand Up @@ -204,7 +215,7 @@ func (c *Config) CheckAndSetDefaults() error {
return nil
}

// Returns a new Log given a Config. Starts a background cleanup task unless
// New returns a new Log given a Config. Starts a background cleanup task unless
// disabled in the Config.
func New(ctx context.Context, cfg Config) (*Log, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
Expand All @@ -219,6 +230,13 @@ func New(ctx context.Context, cfg Config) (*Log, error) {
return nil, trace.Wrap(err)
}

if cfg.CertReloadInterval > 0 {
err := pgcommon.CreateClientCertReloader(ctx, "pgevents", cfg.PoolConfig.ConnString(), cfg.PoolConfig.ConnConfig, cfg.CertReloadInterval, nil)
if err != nil {
return nil, trace.Wrap(err)
}
}

cfg.Log.InfoContext(ctx, "Setting up events backend.")

pgcommon.TryEnsureDatabase(ctx, cfg.PoolConfig, cfg.Log)
Expand Down
5 changes: 5 additions & 0 deletions lib/events/pgevents/pgevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ func TestConfig(t *testing.T) {
RetentionPeriod: defaultRetentionPeriod,
CleanupInterval: defaultCleanupInterval,
},
"postgres://foo#cert_reload_interval=1h": {
RetentionPeriod: defaultRetentionPeriod,
CleanupInterval: defaultCleanupInterval,
CertReloadInterval: time.Hour,
},

"postgres://foo#auth_mode=invalid-auth-mode": nil,
}
Expand Down
Loading