diff --git a/server/client.go b/server/client.go index 3c3a6d5eed2..aa20fe7ad71 100644 --- a/server/client.go +++ b/server/client.go @@ -4131,6 +4131,34 @@ func removeHeaderIfPresent(hdr []byte, key string) []byte { return hdr } +func removeHeaderIfPrefixPresent(hdr []byte, prefix string) []byte { + var index int + for { + if index >= len(hdr) { + return hdr + } + + start := bytes.Index(hdr[index:], []byte(prefix)) + if start < 0 { + return hdr + } + index += start + if index < 1 || hdr[index-1] != '\n' { + return hdr + } + + end := bytes.Index(hdr[index+len(prefix):], []byte(_CRLF_)) + if end < 0 { + return hdr + } + + hdr = append(hdr[:index], hdr[index+end+len(prefix)+len(_CRLF_):]...) + if len(hdr) <= len(emptyHdrLine) { + return nil + } + } +} + // Generate a new header based on optional original header and key value. // More used in JetStream layers. func genHeader(hdr []byte, key, value string) []byte { diff --git a/server/client_test.go b/server/client_test.go index a0a17799e67..21f430c5440 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -2944,3 +2944,21 @@ func TestTLSClientHandshakeFirstAndInProcessConnection(t *testing.T) { t.Fatal("Should have not got an error retrieving TLS connection state") } } + +func TestRemoveHeaderIfPrefixPresent(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + hdr = genHeader(hdr, "a", "1") + hdr = genHeader(hdr, JSExpectedStream, "my-stream") + hdr = genHeader(hdr, JSExpectedLastSeq, "22") + hdr = genHeader(hdr, "b", "2") + hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24") + hdr = genHeader(hdr, JSExpectedLastMsgId, "1") + hdr = genHeader(hdr, "c", "3") + + hdr = removeHeaderIfPrefixPresent(hdr, "Nats-Expected-") + + if !bytes.Equal(hdr, []byte("NATS/1.0\r\na: 1\r\nb: 2\r\nc: 3\r\n\r\n")) { + t.Fatalf("Expected headers to be stripped, got %q", hdr) + } +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index bb985f1ed73..f49e449acdc 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -11968,6 +11968,60 @@ func TestJetStreamSourceWorkingQueueWithLimit(t *testing.T) { } } +func TestJetStreamStreamSourceFromKV(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + // Client for API reuqests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // Create a kv store + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "test"}) + require_NoError(t, err) + + // Create a stream with a source from the kv store + _, err = js.AddStream(&nats.StreamConfig{Name: "test", Retention: nats.InterestPolicy, Sources: []*nats.StreamSource{{Name: "KV_" + kv.Bucket()}}}) + require_NoError(t, err) + + // Create a interested consumer + _, err = js.AddConsumer("test", &nats.ConsumerConfig{Durable: "durable", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + + ss, err := js.PullSubscribe("", "", nats.Bind("test", "durable")) + require_NoError(t, err) + + rev1, err := kv.Create("key", []byte("value1")) + require_NoError(t, err) + + m, err := ss.Fetch(1, nats.MaxWait(500*time.Millisecond)) + require_NoError(t, err) + require_NoError(t, m[0].Ack()) + if string(m[0].Data) != "value1" { + t.Fatalf("Expected value1, got %s", m[0].Data) + } + + rev2, err := kv.Update("key", []byte("value2"), rev1) + require_NoError(t, err) + + _, err = kv.Update("key", []byte("value3"), rev2) + require_NoError(t, err) + + m, err = ss.Fetch(1, nats.MaxWait(500*time.Millisecond)) + require_NoError(t, err) + require_NoError(t, m[0].Ack()) + if string(m[0].Data) != "value2" { + t.Fatalf("Expected value2, got %s", m[0].Data) + } + + m, err = ss.Fetch(1, nats.MaxWait(500*time.Millisecond)) + require_NoError(t, err) + require_NoError(t, m[0].Ack()) + if string(m[0].Data) != "value3" { + t.Fatalf("Expected value3, got %s", m[0].Data) + } +} + func TestJetStreamInputTransform(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() diff --git a/server/stream.go b/server/stream.go index db766654f71..614b9aab3b7 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3236,6 +3236,9 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { // If we are daisy chained here make sure to remove the original one. if len(hdr) > 0 { hdr = removeHeaderIfPresent(hdr, JSStreamSource) + + // Remove any Nats-Expected- headers as we don't want to validate them. + hdr = removeHeaderIfPrefixPresent(hdr, "Nats-Expected-") } // Hold onto the origin reply which has all the metadata. hdr = genHeader(hdr, JSStreamSource, si.genSourceHeader(m.rply))