Skip to content

Commit

Permalink
Merge pull request #2041 from josephschorr/crdb-watch-timeout
Browse files Browse the repository at this point in the history
Add a default connect timeout for watch in CRDB driver
  • Loading branch information
josephschorr authored Aug 27, 2024
2 parents d77601b + bc7d7cf commit a137834
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 1 deletion.
2 changes: 2 additions & 0 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
dburl: url,
watchBufferLength: config.watchBufferLength,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
watchConnectTimeout: config.watchConnectTimeout,
writeOverlapKeyer: keyer,
overlapKeyInit: keySetInit,
beginChangefeedQuery: changefeedQuery,
Expand Down Expand Up @@ -267,6 +268,7 @@ type crdbDatastore struct {
readPool, writePool *pool.RetryPool
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
watchConnectTimeout time.Duration
writeOverlapKeyer overlapKeyer
overlapKeyInit func(ctx context.Context) keySet
analyzeBeforeStatistics bool
Expand Down
11 changes: 11 additions & 0 deletions internal/datastore/crdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type crdbOptions struct {

watchBufferLength uint16
watchBufferWriteTimeout time.Duration
watchConnectTimeout time.Duration
revisionQuantization time.Duration
followerReadDelay time.Duration
maxRevisionStalenessPercent float64
Expand All @@ -40,6 +41,7 @@ const (
defaultMaxRevisionStalenessPercent = 0.1
defaultWatchBufferLength = 128
defaultWatchBufferWriteTimeout = 1 * time.Second
defaultWatchConnectTimeout = 1 * time.Second
defaultSplitSize = 1024

defaultMaxRetries = 5
Expand All @@ -61,6 +63,7 @@ func generateConfig(options []Option) (crdbOptions, error) {
gcWindow: 24 * time.Hour,
watchBufferLength: defaultWatchBufferLength,
watchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
watchConnectTimeout: defaultWatchConnectTimeout,
revisionQuantization: defaultRevisionQuantization,
followerReadDelay: defaultFollowerReadDelay,
maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent,
Expand Down Expand Up @@ -232,6 +235,14 @@ func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option {
return func(po *crdbOptions) { po.watchBufferWriteTimeout = watchBufferWriteTimeout }
}

// WatchConnectTimeout is the maximum timeout for connecting the watch stream
// to the datastore.
//
// This value defaults to 1 second.
func WatchConnectTimeout(watchConnectTimeout time.Duration) Option {
return func(po *crdbOptions) { po.watchConnectTimeout = watchConnectTimeout }
}

// RevisionQuantization is the time bucket size to which advertised revisions
// will be rounded.
//
Expand Down
7 changes: 6 additions & 1 deletion internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,17 @@ func (cds *crdbDatastore) watch(
defer close(updates)
defer close(errs)

watchConnectTimeout := opts.WatchConnectTimeout
if watchConnectTimeout <= 0 {
watchConnectTimeout = cds.watchConnectTimeout
}

// get non-pooled connection for watch
// "applications should explicitly create dedicated connections to consume
// changefeed data, instead of using a connection pool as most client
// drivers do by default."
// see: https://www.cockroachlabs.com/docs/v22.2/changefeed-for#considerations
conn, err := pgxcommon.ConnectWithInstrumentation(ctx, cds.dburl)
conn, err := pgxcommon.ConnectWithInstrumentationAndTimeout(ctx, cds.dburl, watchConnectTimeout)
if err != nil {
errs <- err
return
Expand Down
11 changes: 11 additions & 0 deletions internal/datastore/postgres/common/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ func ConnectWithInstrumentation(ctx context.Context, url string) (*pgx.Conn, err
return pgx.ConnectConfig(ctx, connConfig)
}

// ConnectWithInstrumentationAndTimeout returns a pgx.Conn that has been instrumented for observability
func ConnectWithInstrumentationAndTimeout(ctx context.Context, url string, connectTimeout time.Duration) (*pgx.Conn, error) {
connConfig, err := ParseConfigWithInstrumentation(url)
if err != nil {
return nil, err
}

connConfig.ConnectTimeout = connectTimeout
return pgx.ConnectConfig(ctx, connConfig)
}

// ConfigurePGXLogger sets zerolog global logger into the connection pool configuration, and maps
// info level events to debug, as they are rather verbose for SpiceDB's info level
func ConfigurePGXLogger(connConfig *pgx.ConnConfig) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/cmd/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type Config struct {
// Internal
WatchBufferLength uint16 `debugmap:"visible"`
WatchBufferWriteTimeout time.Duration `debugmap:"visible"`
WatchConnectTimeout time.Duration `debugmap:"visible"`

// Migrations
MigrationPhase string `debugmap:"visible"`
Expand Down Expand Up @@ -230,6 +231,7 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt
flagSet.StringVar(&opts.MigrationPhase, flagName("datastore-migration-phase"), "", "datastore-specific flag that should be used to signal to a datastore which phase of a multi-step migration it is in")
flagSet.Uint16Var(&opts.WatchBufferLength, flagName("datastore-watch-buffer-length"), 1024, "how large the watch buffer should be before blocking")
flagSet.DurationVar(&opts.WatchBufferWriteTimeout, flagName("datastore-watch-buffer-write-timeout"), 1*time.Second, "how long the watch buffer should queue before forcefully disconnecting the reader")
flagSet.DurationVar(&opts.WatchConnectTimeout, flagName("datastore-watch-connect-timeout"), 1*time.Second, "how long the watch connection should wait before timing out (cockroachdb driver only)")

// disabling stats is only for tests
flagSet.BoolVar(&opts.DisableStats, flagName("datastore-disable-stats"), false, "disable recording relationship counts to the stats table")
Expand Down Expand Up @@ -271,6 +273,7 @@ func DefaultDatastoreConfig() *Config {
GCMaxOperationTime: 1 * time.Minute,
WatchBufferLength: 1024,
WatchBufferWriteTimeout: 1 * time.Second,
WatchConnectTimeout: 1 * time.Second,
EnableDatastoreMetrics: true,
DisableStats: false,
BootstrapFiles: []string{},
Expand Down Expand Up @@ -411,6 +414,7 @@ func newCRDBDatastore(ctx context.Context, opts Config) (datastore.Datastore, er
crdb.OverlapStrategy(opts.OverlapStrategy),
crdb.WatchBufferLength(opts.WatchBufferLength),
crdb.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout),
crdb.WatchConnectTimeout(opts.WatchConnectTimeout),
crdb.WithEnablePrometheusStats(opts.EnableDatastoreMetrics),
crdb.WithEnableConnectionBalancing(opts.EnableConnectionBalancing),
crdb.ConnectRate(opts.ConnectRate),
Expand Down
9 changes: 9 additions & 0 deletions pkg/cmd/datastore/zz_generated.options.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,11 @@ type WatchOptions struct {
// WatchBufferWriteTimeout is the timeout for writing to the watch channel.
// If given the zero value, the datastore's default will be used.
WatchBufferWriteTimeout time.Duration

// WatchConnectTimeout is the timeout for connecting to the watch channel.
// If given the zero value, the datastore's default will be used.
// May not be supported by the datastore.
WatchConnectTimeout time.Duration
}

// WatchJustRelationships returns watch options for just relationships.
Expand Down

0 comments on commit a137834

Please sign in to comment.