diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 8bb9b227ddd..f00f47d3a6d 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -9511,6 +9511,57 @@ func TestJetStreamMirrorBasics(t *testing.T) { } +func TestJetStreamMirrorStripExpectedHeaders(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // Create source and mirror streams. + _, err := js.AddStream(&nats.StreamConfig{ + Name: "S", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{ + Name: "M", + Mirror: &nats.StreamSource{Name: "S"}, + }) + require_NoError(t, err) + + m := nats.NewMsg("foo") + pubAck, err := js.PublishMsg(m) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) + + // Mirror should get message. + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + if si, err := js.StreamInfo("M"); err != nil { + return err + } else if si.State.Msgs != 1 { + return fmt.Errorf("expected 1 mirrored msg, got %d", si.State.Msgs) + } + return nil + }) + + m.Header.Set("Nats-Expected-Stream", "S") + pubAck, err = js.PublishMsg(m) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 2) + + // Mirror should strip expected headers and store the message. + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + if si, err := js.StreamInfo("M"); err != nil { + return err + } else if si.State.Msgs != 2 { + return fmt.Errorf("expected 2 mirrored msgs, got %d", si.State.Msgs) + } + return nil + }) +} + func TestJetStreamMirrorUpdatePreventsSubjects(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() diff --git a/server/stream.go b/server/stream.go index b085c55e020..ea0f07b895d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2960,6 +2960,10 @@ func (mset *stream) setupMirrorConsumer() error { msgs := mirror.msgs sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { hdr, msg := c.msgParts(copyBytes(rmsg)) // Need to copy. + if len(hdr) > 0 { + // Remove any Nats-Expected- headers as we don't want to validate them. + hdr = removeHeaderIfPrefixPresent(hdr, "Nats-Expected-") + } mset.queueInbound(msgs, subject, reply, hdr, msg, nil, nil) mirror.last.Store(time.Now().UnixNano()) })