diff --git a/server/client.go b/server/client.go index 191f3be7582..cd1ba3e839c 100644 --- a/server/client.go +++ b/server/client.go @@ -4337,7 +4337,7 @@ func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tra // Will remove a header if present. func removeHeaderIfPresent(hdr []byte, key string) []byte { - start := bytes.Index(hdr, []byte(key)) + start := bytes.Index(hdr, []byte(key+":")) // key can't be first and we want to check that it is preceded by a '\n' if start < 1 || hdr[start-1] != '\n' { return hdr diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index b6b17b1e59b..ea366cf3ad2 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -9441,6 +9441,53 @@ func TestJetStreamClusterScheduledDelayedMessage(t *testing.T) { } } +func TestJetStreamClusterScheduledDelayedMessageReversedHeaderOrder(t *testing.T) { + for _, replicas := range []int{1, 3} { + for _, storage := range []StorageType{FileStorage, MemoryStorage} { + t.Run(fmt.Sprintf("R%d/%s", replicas, storage), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + cfg := &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.*"}, + Storage: storage, + Replicas: replicas, + AllowMsgSchedules: true, + AllowMsgTTL: true, + } + _, err := jsStreamCreate(t, nc, cfg) + require_NoError(t, err) + + sl := c.streamLeader(globalAccountName, "TEST") + gacc := sl.globalAccount() + // The Nats-Schedule headers share a common prefix, so make sure if these are ordered differently + // we can still properly schedule a message. + hdr := genHeader(nil, "Nats-Schedule-Target", "foo.publish") + hdr = genHeader(hdr, "Nats-Schedule", "@at 1970-01-01T00:00:00Z") + require_NoError(t, sl.sendInternalAccountMsgWithReply(gacc, "foo.schedule", _EMPTY_, hdr, nil, false)) + + mset, err := gacc.lookupStream("TEST") + require_NoError(t, err) + + // Waiting for the delayed message to be published. + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + state := mset.state() + if state.LastSeq != 2 { + return fmt.Errorf("expected last seq 2, got %d", state.LastSeq) + } else if state.Msgs != 1 { + return fmt.Errorf("expected 1 msg, got %d", state.Msgs) + } + return nil + }) + }) + } + } +} + func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *testing.T) { clusterName := "R3S" c := createJetStreamClusterExplicit(t, clusterName, 3)