diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index c9919c7d167..86202c678db 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -38,8 +38,8 @@ jobs: - name: Install syft # Use commit hash here to avoid a re-tagging attack, as this is a third-party action - # Commit 7b36ad622f042cab6f59a75c2ac24ccb256e9b45 = tag v0.20.4 - uses: anchore/sbom-action/download-syft@7b36ad622f042cab6f59a75c2ac24ccb256e9b45 + # Commit f8bdd1d8ac5e901a77a92f111440fdb1b593736b = tag v0.20.6 + uses: anchore/sbom-action/download-syft@f8bdd1d8ac5e901a77a92f111440fdb1b593736b with: syft-version: "v1.27.1" diff --git a/go.mod b/go.mod index a5468f09665..d0548ef336e 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,11 @@ go 1.24.0 require ( github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op - github.com/google/go-tpm v0.9.5 + github.com/google/go-tpm v0.9.6 github.com/klauspost/compress v1.18.0 github.com/minio/highwayhash v1.0.3 - github.com/nats-io/jwt/v2 v2.7.4 - github.com/nats-io/nats.go v1.45.0 + github.com/nats-io/jwt/v2 v2.8.0 + github.com/nats-io/nats.go v1.46.1 github.com/nats-io/nkeys v0.4.11 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.6.0 diff --git a/go.sum b/go.sum index 2100e79d779..3f4b9fba47f 100644 --- a/go.sum +++ b/go.sum @@ -2,16 +2,16 @@ github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfr github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU= -github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= +github.com/google/go-tpm v0.9.6 h1:Ku42PT4LmjDu1H5C5ISWLlpI1mj+Zq7sPGKoRw2XROA= +github.com/google/go-tpm v0.9.6/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= -github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI= -github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= -github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA= -github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= +github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= +github.com/nats-io/nats.go v1.46.1 h1:bqQ2ZcxVd2lpYI97xYASeRTY3I5boe/IVmuUDPitHfo= +github.com/nats-io/nats.go v1.46.1/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 7b16b962263..5787cdcd9ba 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -802,8 +802,7 @@ type JSApiStreamTemplateNamesResponse struct { const JSApiStreamTemplateNamesResponseType = "io.nats.jetstream.api.v1.stream_template_names_response" // Structure that holds state for a JetStream API request that is processed -// in a separate long-lived go routine. This is to avoid possibly blocking -// ROUTE and GATEWAY connections. +// in a separate long-lived go routine. This is to avoid blocking connections. type jsAPIRoutedReq struct { jsub *subscription sub *subscription @@ -872,17 +871,6 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub } jsub := rr.psubs[0] - // If this is directly from a client connection ok to do in place. - if c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF { - start := time.Now() - jsub.icb(sub, c, acc, subject, reply, rmsg) - if dur := time.Since(start); dur >= readLoopReportThreshold { - s.Warnf("Internal subscription on %q took too long: %v", subject, dur) - } - return - } - - // If we are here we have received this request over a non-client connection. // We need to make sure not to block. We will send the request to a long-lived // pool of go routines. @@ -1731,8 +1719,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, // Handle clustered version here. if s.JetStreamIsClustered() { - // Always do in separate Go routine. - go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic) + s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic) return } @@ -4481,14 +4468,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun } if isClustered && !req.Config.Direct { - // If we are inline with client, we still may need to do a callout for consumer info - // during this call, so place in Go routine to not block client. - // Router and Gateway API calls already in separate context. - if c.kind != ROUTER && c.kind != GATEWAY { - go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic) - } else { - s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic) - } + s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic) return } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 36a60c9b4a0..cba2215b200 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1409,13 +1409,13 @@ func (js *jetStream) monitorCluster() { go checkHealth() continue } - if didSnap, didStreamRemoval, _, err := js.applyMetaEntries(ce.Entries, ru); err == nil { + if didSnap, err := js.applyMetaEntries(ce.Entries, ru); err == nil { var nb uint64 // Some entries can fail without an error when shutting down, don't move applied forward. if !js.isShuttingDown() { _, nb = n.Applied(ce.Index) } - if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) { + if js.hasPeerEntries(ce.Entries) || (didSnap && !isLeader) { doSnapshot() } else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta { doSnapshot() @@ -1992,8 +1992,8 @@ func (ca *consumerAssignment) recoveryKey() string { return ca.Client.serviceAccount() + ksep + ca.Stream + ksep + ca.Name } -func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) { - var didSnap, didRemoveStream, didRemoveConsumer bool +func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, error) { + var didSnap bool isRecovering := js.isMetaRecovering() for _, e := range entries { @@ -2015,7 +2015,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo sa, err := decodeStreamAssignment(js.srv, buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setStreamAssignmentRecovering(sa) @@ -2029,7 +2029,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo sa, err := decodeStreamAssignment(js.srv, buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setStreamAssignmentRecovering(sa) @@ -2041,13 +2041,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo delete(ru.removeConsumers, key) } else { js.processStreamRemoval(sa) - didRemoveStream = true } case assignConsumerOp: ca, err := decodeConsumerAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setConsumerAssignmentRecovering(ca) @@ -2067,7 +2066,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo ca, err := decodeConsumerAssignmentCompressed(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setConsumerAssignmentRecovering(ca) @@ -2087,7 +2086,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo ca, err := decodeConsumerAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setConsumerAssignmentRecovering(ca) @@ -2102,13 +2101,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } } else { js.processConsumerRemoval(ca) - didRemoveConsumer = true } case updateStreamOp: sa, err := decodeStreamAssignment(js.srv, buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) - return didSnap, didRemoveStream, didRemoveConsumer, err + return didSnap, err } if isRecovering { js.setStreamAssignmentRecovering(sa) @@ -2118,16 +2116,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo delete(ru.removeStreams, key) } else { js.processUpdateStreamAssignment(sa) - // Since an update can be lowering replica count, we want upper layer to treat - // similar to a removal and snapshot to collapse old entries. - didRemoveStream = true } default: panic(fmt.Sprintf("JetStream Cluster Unknown meta entry op type: %v", entryOp(buf[0]))) } } } - return didSnap, didRemoveStream, didRemoveConsumer, nil + return didSnap, nil } func (rg *raftGroup) isMember(id string) bool { diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index b28cf6b8f26..1ef64219bf9 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6640,13 +6640,13 @@ func TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers(t *testing.T) { } // Push recovery entries that create the stream & consumer. - _, _, _, err := js.applyMetaEntries(create, ru) + _, err := js.applyMetaEntries(create, ru) require_NoError(t, err) require_Len(t, len(ru.updateConsumers), 1) // Now push another recovery entry that deletes the stream. The // entry that creates the consumer should now be gone. - _, _, _, err = js.applyMetaEntries(delete, ru) + _, err = js.applyMetaEntries(delete, ru) require_NoError(t, err) require_Len(t, len(ru.removeStreams), 1) require_Len(t, len(ru.updateConsumers), 0) @@ -6694,27 +6694,27 @@ func TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory(t *testing.T) { } // We created a file-based stream first, but deleted it shortly after. - _, _, _, err := js.applyMetaEntries(createFileStream, ru) + _, err := js.applyMetaEntries(createFileStream, ru) require_NoError(t, err) require_Len(t, len(ru.addStreams), 1) require_Len(t, len(ru.removeStreams), 0) // Now push another recovery entry that deletes the stream. // The file-based stream should not have been created. - _, _, _, err = js.applyMetaEntries(deleteFileStream, ru) + _, err = js.applyMetaEntries(deleteFileStream, ru) require_NoError(t, err) require_Len(t, len(ru.addStreams), 0) require_Len(t, len(ru.removeStreams), 1) // Now stage a memory-based stream to be created. - _, _, _, err = js.applyMetaEntries(createMemoryStream, ru) + _, err = js.applyMetaEntries(createMemoryStream, ru) require_NoError(t, err) require_Len(t, len(ru.addStreams), 1) require_Len(t, len(ru.removeStreams), 0) require_Len(t, len(ru.updateConsumers), 0) // Also create a consumer on that memory-based stream. - _, _, _, err = js.applyMetaEntries(createConsumer, ru) + _, err = js.applyMetaEntries(createConsumer, ru) require_NoError(t, err) require_Len(t, len(ru.addStreams), 1) require_Len(t, len(ru.removeStreams), 0) @@ -6751,19 +6751,19 @@ func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) { } // Creating the consumer should append to update consumers list. - _, _, _, err := js.applyMetaEntries(createConsumer, ru) + _, err := js.applyMetaEntries(createConsumer, ru) require_NoError(t, err) require_Len(t, len(ru.updateConsumers[":TEST"]), 1) require_Len(t, len(ru.removeConsumers), 0) // Deleting the consumer should append to remove consumers list and remove from update list. - _, _, _, err = js.applyMetaEntries(deleteConsumer, ru) + _, err = js.applyMetaEntries(deleteConsumer, ru) require_NoError(t, err) require_Len(t, len(ru.removeConsumers[":TEST"]), 1) require_Len(t, len(ru.updateConsumers[":TEST"]), 0) // When re-creating the consumer, add to update list and remove from remove list. - _, _, _, err = js.applyMetaEntries(createConsumer, ru) + _, err = js.applyMetaEntries(createConsumer, ru) require_NoError(t, err) require_Len(t, len(ru.updateConsumers[":TEST"]), 1) require_Len(t, len(ru.removeConsumers[":TEST"]), 0)