Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 3 additions & 23 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,8 +802,7 @@ type JSApiStreamTemplateNamesResponse struct {
const JSApiStreamTemplateNamesResponseType = "io.nats.jetstream.api.v1.stream_template_names_response"

// Structure that holds state for a JetStream API request that is processed
// in a separate long-lived go routine. This is to avoid possibly blocking
// ROUTE and GATEWAY connections.
// in a separate long-lived go routine. This is to avoid blocking connections.
type jsAPIRoutedReq struct {
jsub *subscription
sub *subscription
Expand Down Expand Up @@ -872,17 +871,6 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
}
jsub := rr.psubs[0]

// If this is directly from a client connection ok to do in place.
if c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF {
start := time.Now()
jsub.icb(sub, c, acc, subject, reply, rmsg)
if dur := time.Since(start); dur >= readLoopReportThreshold {
s.Warnf("Internal subscription on %q took too long: %v", subject, dur)
}
return
}

// If we are here we have received this request over a non-client connection.
// We need to make sure not to block. We will send the request to a long-lived
// pool of go routines.

Expand Down Expand Up @@ -1718,8 +1706,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,

// Handle clustered version here.
if s.JetStreamIsClustered() {
// Always do in separate Go routine.
go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
return
}

Expand Down Expand Up @@ -4417,14 +4404,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
}

if isClustered && !req.Config.Direct {
// If we are inline with client, we still may need to do a callout for consumer info
// during this call, so place in Go routine to not block client.
// Router and Gateway API calls already in separate context.
if c.kind != ROUTER && c.kind != GATEWAY {
go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
} else {
s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
}
s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
return
}

Expand Down
Loading