GODRIVER-3659 Filter CMAP/SDAM events for awaitMinPoolSizeMS #2235
base: master
@@ -38,6 +38,52 @@ var securitySensitiveCommands = []string{
 	"createUser", "updateUser", "copydbgetnonce", "copydbsaslstart", "copydb",
 }
 
+// eventSequencer allows for sequence-based event filtering for
+// awaitMinPoolSizeMS support.
+//
+// Per the unified test format spec, when awaitMinPoolSizeMS is specified, any
+// CMAP and SDAM events that occur during connection pool initialization
+// (before minPoolSize is reached) must be ignored. We track this by
+// assigning a monotonically increasing sequence number to each event as it's
+// recorded. After pool initialization completes, we set eventCutoffSeq to the
+// current sequence number. Event accessors for CMAP and SDAM types then
+// filter out any events with sequence <= eventCutoffSeq.
+//
+// Sequencing is thread-safe to support concurrent operations that may generate
+// events (e.g., connection checkouts generating CMAP events).
+type eventSequencer struct {
+	counter atomic.Int64
+	cutoff  atomic.Int64
+
+	mu sync.RWMutex
+
+	// pool events are heterogeneous, so we track their sequence separately
+	poolSeq        []int64
+	seqByEventType map[monitoringEventType][]int64
+}
+
+// setCutoff marks the current sequence as the filtering cutoff point.
+func (es *eventSequencer) setCutoff() {
+	es.cutoff.Store(es.counter.Load())
+}
+
+// recordEvent stores the sequence number for a given event type.
+func (es *eventSequencer) recordEvent(eventType monitoringEventType) {
+	next := es.counter.Add(1)
+
+	es.mu.Lock()
+	es.seqByEventType[eventType] = append(es.seqByEventType[eventType], next)
+	es.mu.Unlock()
+}
+
+func (es *eventSequencer) recordPooledEvent() {
+	next := es.counter.Add(1)
+
+	es.mu.Lock()
+	es.poolSeq = append(es.poolSeq, next)
+	es.mu.Unlock()
+}
+
 // clientEntity is a wrapper for a mongo.Client object that also holds additional information required during test
 // execution.
 type clientEntity struct {
@@ -72,30 +118,8 @@ type clientEntity struct {
 
 	entityMap *EntityMap
 
-	logQueue chan orderedLogMessage
-}
-
-// awaitMinimumPoolSize waits for the client's connection pool to reach the
-// specified minimum size. This is a best effort operation that times out after
-// some predefined amount of time to avoid blocking tests indefinitely.
-func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
-	// Don't spend longer than 500ms awaiting minPoolSize.
-	awaitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
-	defer cancel()
-
-	ticker := time.NewTicker(100 * time.Millisecond)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-awaitCtx.Done():
-			return fmt.Errorf("timed out waiting for client to reach minPoolSize")
-		case <-ticker.C:
-			if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize {
-				return nil
-			}
-		}
-	}
-}
+	logQueue       chan orderedLogMessage
+	eventSequencer eventSequencer
+}
 
 func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOptions) (*clientEntity, error) {
@@ -118,6 +142,9 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
 		serverDescriptionChangedEventsCount: make(map[serverDescriptionChangedEventInfo]int32),
 		entityMap:                           em,
 		observeSensitiveCommands:            entityOptions.ObserveSensitiveCommands,
+		eventSequencer: eventSequencer{
+			seqByEventType: make(map[monitoringEventType][]int64),
+		},
 	}
 	entity.setRecordEvents(true)
 
@@ -226,8 +253,9 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
 		return nil, fmt.Errorf("error creating mongo.Client: %w", err)
 	}
 
-	if entityOptions.AwaitMinPoolSize && clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
-		if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize); err != nil {
+	if entityOptions.AwaitMinPoolSizeMS != nil && *entityOptions.AwaitMinPoolSizeMS > 0 &&
+		clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
+		if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize, *entityOptions.AwaitMinPoolSizeMS); err != nil {
 			return nil, err
 		}
 	}
@@ -326,8 +354,47 @@ func (c *clientEntity) failedEvents() []*event.CommandFailedEvent {
 	return events
 }
 
-func (c *clientEntity) poolEvents() []*event.PoolEvent {
-	return c.pooled
+// filterEventsBySeq filters events by sequence number for the given eventType.
+// See comments on eventSequencer for more details.
+func filterEventsBySeq[T any](c *clientEntity, events []T, eventType monitoringEventType) []T {
+	cutoff := c.eventSequencer.cutoff.Load()
+	if cutoff == 0 {
+		return events
+	}
+
+	// Lock order: eventProcessMu -> eventSequencer.mu (matches writers)
+	c.eventProcessMu.RLock()
+	c.eventSequencer.mu.RLock()
+
+	// Snapshot to minimize time under locks and avoid races
+	localEvents := append([]T(nil), events...)
+
+	var seqSlice []int64
+	if eventType == poolAnyEvent {
+		seqSlice = c.eventSequencer.poolSeq
+	} else {
+		seqSlice = c.eventSequencer.seqByEventType[eventType]
+	}
+
+	localSeqs := append([]int64(nil), seqSlice...)
Suggested change:
-	localSeqs := append([]int64(nil), seqSlice...)
+	localSeqs := slices.Clone(seqSlice)
The spec says that awaitMinPoolSizeMS must wait for each connection pool to reach minPoolSize, but this seems to wait for the total number of connections across all pools. Is there a way to wait for each connection pool to reach minPoolSize?
Good catch!
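For illustration, a per-pool wait could look roughly like the sketch below. This is a driver-agnostic sketch rather than code from this PR: `awaitMinPoolSizePerPool` and `readyByAddr` are hypothetical, with `readyByAddr` standing in for a snapshot of ready-connection counts keyed by server address that the test runner would maintain from ConnectionReady events.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// awaitMinPoolSizePerPool blocks until every observed connection pool has at
// least minPoolSize ready connections, or until the timeout elapses.
// readyByAddr is assumed to return a snapshot of ready-connection counts
// keyed by server address (e.g. maintained from ConnectionReady CMAP events).
func awaitMinPoolSizePerPool(
	ctx context.Context,
	readyByAddr func() map[string]uint64,
	minPoolSize uint64,
	timeout time.Duration,
) error {
	awaitCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-awaitCtx.Done():
			return fmt.Errorf("timed out waiting for every pool to reach minPoolSize")
		case <-ticker.C:
			counts := readyByAddr()
			if len(counts) == 0 {
				continue // no pools observed yet
			}
			allReady := true
			for _, ready := range counts {
				if ready < minPoolSize {
					allReady = false
					break
				}
			}
			if allReady {
				return nil
			}
		}
	}
}

func main() {
	// Toy usage with a static snapshot; in the test runner the snapshot would
	// come from the recorded CMAP events.
	snapshot := map[string]uint64{"host1:27017": 2, "host2:27017": 2}
	err := awaitMinPoolSizePerPool(context.Background(),
		func() map[string]uint64 { return snapshot }, 2, 500*time.Millisecond)
	fmt.Println(err) // <nil>
}
```

The key difference from the removed awaitMinimumPoolSize is that the check requires every observed address to reach minPoolSize instead of comparing a single global counter.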
An alternative to this is to re-initialize the event containers here. While that is a much simpler solution, it could be buggy if we ever need to re-initialize the event logic for some other reason elsewhere. The eventSequencer future-proofs against this.
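As a rough illustration of that alternative, re-initialization amounts to something like the following. `miniEntity` is a simplified stand-in, not the real clientEntity, which tracks many more containers.

```go
package main

import (
	"fmt"
	"sync"
)

// miniEntity is a stand-in for clientEntity with just two illustrative
// event containers.
type miniEntity struct {
	mu     sync.RWMutex
	pooled []string         // stands in for []*event.PoolEvent
	counts map[string]int32 // stands in for the per-event-type counters
}

// resetEvents drops everything recorded so far by re-initializing the
// containers in place. Simple, but anything holding a reference to the old
// slices/maps, or relying on earlier events, silently breaks.
func (e *miniEntity) resetEvents() {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.pooled = nil
	e.counts = make(map[string]int32)
}

func main() {
	e := &miniEntity{counts: map[string]int32{"connectionReadyEvent": 3}}
	e.pooled = append(e.pooled, "ConnectionCreated", "ConnectionReady")
	e.resetEvents()
	fmt.Println(len(e.pooled), len(e.counts)) // 0 0
}
```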
Instead of storing every sequence number per event type, can we only store the index when the cutoff happened?
That would also simplify filterEventsBySeq, because you would just take a subslice starting at the stored index.
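For example, a rough sketch of that idea (names like `cutoffIndexes` and `filterEventsByCutoff` are hypothetical, not a drop-in change to this PR):

```go
package main

import (
	"fmt"
	"sync"
)

// monitoringEventType stands in for the test runner's existing type.
type monitoringEventType string

// cutoffIndexes records, per event type, how many events had already been
// stored when the cutoff happened, instead of tracking every sequence number.
type cutoffIndexes struct {
	mu         sync.Mutex
	poolCutoff int
	byType     map[monitoringEventType]int
}

// setCutoff snapshots the current lengths of the event containers.
func (ci *cutoffIndexes) setCutoff(poolLen int, lenByType map[monitoringEventType]int) {
	ci.mu.Lock()
	defer ci.mu.Unlock()
	ci.poolCutoff = poolLen
	if ci.byType == nil {
		ci.byType = make(map[monitoringEventType]int)
	}
	for t, n := range lenByType {
		ci.byType[t] = n
	}
}

// filterEventsByCutoff is the simplified accessor: everything recorded before
// the cutoff index is dropped with a single subslice.
func filterEventsByCutoff[T any](events []T, cutoffIdx int) []T {
	if cutoffIdx <= 0 {
		return events
	}
	if cutoffIdx >= len(events) {
		return nil
	}
	return events[cutoffIdx:]
}

func main() {
	events := []string{"poolReady", "connCreated", "connReady", "commandStarted"}
	// Suppose the first three events were recorded before the cutoff.
	fmt.Println(filterEventsByCutoff(events, 3)) // [commandStarted]
}
```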
While this approach is simpler, it only supports a single cutoff per test. If the unified spec adds scenarios with multiple cutoffs (e.g., awaitMinPoolSizeMS (implicit cutoff) → run X → cutoff at N (either implicit or explicit) → run Y), we’d have to refactor back to the original design.