Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ jobs:

- name: Install syft
# Use commit hash here to avoid a re-tagging attack, as this is a third-party action
# Commit 7b36ad622f042cab6f59a75c2ac24ccb256e9b45 = tag v0.20.4
uses: anchore/sbom-action/download-syft@7b36ad622f042cab6f59a75c2ac24ccb256e9b45
# Commit f8bdd1d8ac5e901a77a92f111440fdb1b593736b = tag v0.20.6
uses: anchore/sbom-action/download-syft@f8bdd1d8ac5e901a77a92f111440fdb1b593736b
with:
syft-version: "v1.27.1"

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ go 1.24.0

require (
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op
github.com/google/go-tpm v0.9.5
github.com/google/go-tpm v0.9.6
github.com/klauspost/compress v1.18.0
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.7.4
github.com/nats-io/nats.go v1.45.0
github.com/nats-io/jwt/v2 v2.8.0
github.com/nats-io/nats.go v1.46.1
github.com/nats-io/nkeys v0.4.11
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.6.0
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfr
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU=
github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/go-tpm v0.9.6 h1:Ku42PT4LmjDu1H5C5ISWLlpI1mj+Zq7sPGKoRw2XROA=
github.com/google/go-tpm v0.9.6/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA=
github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats.go v1.46.1 h1:bqQ2ZcxVd2lpYI97xYASeRTY3I5boe/IVmuUDPitHfo=
github.com/nats-io/nats.go v1.46.1/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
26 changes: 3 additions & 23 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,8 +802,7 @@ type JSApiStreamTemplateNamesResponse struct {
const JSApiStreamTemplateNamesResponseType = "io.nats.jetstream.api.v1.stream_template_names_response"

// Structure that holds state for a JetStream API request that is processed
// in a separate long-lived go routine. This is to avoid possibly blocking
// ROUTE and GATEWAY connections.
// in a separate long-lived go routine. This is to avoid blocking connections.
type jsAPIRoutedReq struct {
jsub *subscription
sub *subscription
Expand Down Expand Up @@ -872,17 +871,6 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
}
jsub := rr.psubs[0]

// If this is directly from a client connection ok to do in place.
if c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF {
start := time.Now()
jsub.icb(sub, c, acc, subject, reply, rmsg)
if dur := time.Since(start); dur >= readLoopReportThreshold {
s.Warnf("Internal subscription on %q took too long: %v", subject, dur)
}
return
}

// If we are here we have received this request over a non-client connection.
// We need to make sure not to block. We will send the request to a long-lived
// pool of go routines.

Expand Down Expand Up @@ -1731,8 +1719,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,

// Handle clustered version here.
if s.JetStreamIsClustered() {
// Always do in separate Go routine.
go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
return
}

Expand Down Expand Up @@ -4481,14 +4468,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
}

if isClustered && !req.Config.Direct {
// If we are inline with client, we still may need to do a callout for consumer info
// during this call, so place in Go routine to not block client.
// Router and Gateway API calls already in separate context.
if c.kind != ROUTER && c.kind != GATEWAY {
go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
} else {
s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
}
s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
return
}

Expand Down
27 changes: 11 additions & 16 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,13 +1409,13 @@ func (js *jetStream) monitorCluster() {
go checkHealth()
continue
}
if didSnap, didStreamRemoval, _, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
if didSnap, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
var nb uint64
// Some entries can fail without an error when shutting down, don't move applied forward.
if !js.isShuttingDown() {
_, nb = n.Applied(ce.Index)
}
if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) {
if js.hasPeerEntries(ce.Entries) || (didSnap && !isLeader) {
doSnapshot()
} else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta {
doSnapshot()
Expand Down Expand Up @@ -1992,8 +1992,8 @@ func (ca *consumerAssignment) recoveryKey() string {
return ca.Client.serviceAccount() + ksep + ca.Stream + ksep + ca.Name
}

func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) {
var didSnap, didRemoveStream, didRemoveConsumer bool
func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, error) {
var didSnap bool
isRecovering := js.isMetaRecovering()

for _, e := range entries {
Expand All @@ -2015,7 +2015,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
sa, err := decodeStreamAssignment(js.srv, buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
Expand All @@ -2029,7 +2029,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
sa, err := decodeStreamAssignment(js.srv, buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
Expand All @@ -2041,13 +2041,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
delete(ru.removeConsumers, key)
} else {
js.processStreamRemoval(sa)
didRemoveStream = true
}
case assignConsumerOp:
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
Expand All @@ -2067,7 +2066,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
ca, err := decodeConsumerAssignmentCompressed(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
Expand All @@ -2087,7 +2086,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
Expand All @@ -2102,13 +2101,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
}
} else {
js.processConsumerRemoval(ca)
didRemoveConsumer = true
}
case updateStreamOp:
sa, err := decodeStreamAssignment(js.srv, buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, didRemoveStream, didRemoveConsumer, err
return didSnap, err
}
if isRecovering {
js.setStreamAssignmentRecovering(sa)
Expand All @@ -2118,16 +2116,13 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo
delete(ru.removeStreams, key)
} else {
js.processUpdateStreamAssignment(sa)
// Since an update can be lowering replica count, we want upper layer to treat
// similar to a removal and snapshot to collapse old entries.
didRemoveStream = true
}
default:
panic(fmt.Sprintf("JetStream Cluster Unknown meta entry op type: %v", entryOp(buf[0])))
}
}
}
return didSnap, didRemoveStream, didRemoveConsumer, nil
return didSnap, nil
}

func (rg *raftGroup) isMember(id string) bool {
Expand Down
18 changes: 9 additions & 9 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6640,13 +6640,13 @@ func TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers(t *testing.T) {
}

// Push recovery entries that create the stream & consumer.
_, _, _, err := js.applyMetaEntries(create, ru)
_, err := js.applyMetaEntries(create, ru)
require_NoError(t, err)
require_Len(t, len(ru.updateConsumers), 1)

// Now push another recovery entry that deletes the stream. The
// entry that creates the consumer should now be gone.
_, _, _, err = js.applyMetaEntries(delete, ru)
_, err = js.applyMetaEntries(delete, ru)
require_NoError(t, err)
require_Len(t, len(ru.removeStreams), 1)
require_Len(t, len(ru.updateConsumers), 0)
Expand Down Expand Up @@ -6694,27 +6694,27 @@ func TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory(t *testing.T) {
}

// We created a file-based stream first, but deleted it shortly after.
_, _, _, err := js.applyMetaEntries(createFileStream, ru)
_, err := js.applyMetaEntries(createFileStream, ru)
require_NoError(t, err)
require_Len(t, len(ru.addStreams), 1)
require_Len(t, len(ru.removeStreams), 0)

// Now push another recovery entry that deletes the stream.
// The file-based stream should not have been created.
_, _, _, err = js.applyMetaEntries(deleteFileStream, ru)
_, err = js.applyMetaEntries(deleteFileStream, ru)
require_NoError(t, err)
require_Len(t, len(ru.addStreams), 0)
require_Len(t, len(ru.removeStreams), 1)

// Now stage a memory-based stream to be created.
_, _, _, err = js.applyMetaEntries(createMemoryStream, ru)
_, err = js.applyMetaEntries(createMemoryStream, ru)
require_NoError(t, err)
require_Len(t, len(ru.addStreams), 1)
require_Len(t, len(ru.removeStreams), 0)
require_Len(t, len(ru.updateConsumers), 0)

// Also create a consumer on that memory-based stream.
_, _, _, err = js.applyMetaEntries(createConsumer, ru)
_, err = js.applyMetaEntries(createConsumer, ru)
require_NoError(t, err)
require_Len(t, len(ru.addStreams), 1)
require_Len(t, len(ru.removeStreams), 0)
Expand Down Expand Up @@ -6751,19 +6751,19 @@ func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) {
}

// Creating the consumer should append to update consumers list.
_, _, _, err := js.applyMetaEntries(createConsumer, ru)
_, err := js.applyMetaEntries(createConsumer, ru)
require_NoError(t, err)
require_Len(t, len(ru.updateConsumers[":TEST"]), 1)
require_Len(t, len(ru.removeConsumers), 0)

// Deleting the consumer should append to remove consumers list and remove from update list.
_, _, _, err = js.applyMetaEntries(deleteConsumer, ru)
_, err = js.applyMetaEntries(deleteConsumer, ru)
require_NoError(t, err)
require_Len(t, len(ru.removeConsumers[":TEST"]), 1)
require_Len(t, len(ru.updateConsumers[":TEST"]), 0)

// When re-creating the consumer, add to update list and remove from remove list.
_, _, _, err = js.applyMetaEntries(createConsumer, ru)
_, err = js.applyMetaEntries(createConsumer, ru)
require_NoError(t, err)
require_Len(t, len(ru.updateConsumers[":TEST"]), 1)
require_Len(t, len(ru.removeConsumers[":TEST"]), 0)
Expand Down