diff --git a/server/jetstream_test.go b/server/jetstream_test.go index d5d47fb55b2..cb216308d80 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22357,3 +22357,44 @@ func TestJetStreamReloadMetaCompact(t *testing.T) { require_Equal(t, s.getOpts().JetStreamMetaCompact, 0) } + +// https://github.com/nats-io/nats-server/issues/7511 +func TestJetStreamImplicitRePublishAfterSubjectTransform(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"a.>", "c.>"}, + SubjectTransform: &nats.SubjectTransformConfig{Source: "a.>", Destination: "b.>"}, + RePublish: &nats.RePublish{Destination: ">"}, // Implicitly RePublish 'b.>'. + } + // Forms a cycle since the RePublish captures both 'a.>' and 'c.>' + _, err := js.AddStream(cfg) + require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish destination forms a cycle"))) + + // Doesn't form a cycle as 'a.>' is mapped to 'b.>'. A RePublish for '>' can be translated to 'b.>'. + cfg.Subjects = []string{"a.>"} + _, err = js.AddStream(cfg) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("b.>") + require_NoError(t, err) + defer sub.Drain() + + // The published message should be transformed and RePublished. + _, err = js.Publish("a.hello", nil) + require_NoError(t, err) + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, msg.Subject, "b.hello") + + // Forms a cycle since the implicit RePublish on 'b.>' is lost. + // The RePublish would now mean publishing to 'c.>' which is a cycle. + cfg.Subjects = []string{"c.>"} + _, err = js.UpdateStream(cfg) + require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish destination forms a cycle"))) +} diff --git a/server/stream.go b/server/stream.go index e6c864a4fa3..a3aad6ce296 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2012,6 +2012,18 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo } } + // Check the subject transform if any + if cfg.SubjectTransform != nil { + if cfg.SubjectTransform.Source != _EMPTY_ && !IsValidSubject(cfg.SubjectTransform.Source) { + return StreamConfig{}, NewJSStreamTransformInvalidSourceError(fmt.Errorf("%w %s", ErrBadSubject, cfg.SubjectTransform.Source)) + } + + err := ValidateMapping(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination) + if err != nil { + return StreamConfig{}, NewJSStreamTransformInvalidDestinationError(err) + } + } + // If we have a republish directive check if we can create a transform here. if cfg.RePublish != nil { // Check to make sure source is a valid subset of the subjects we have. @@ -2023,6 +2035,18 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo } cfg.RePublish.Source = fwcs } + // A RePublish from '>' to '>' could be used, normally this would form a cycle with the stream subjects. + // But if this aligns to a different subject based on the transform, we allow it still. + // The RePublish will be implicit based on the transform, but only if the transform's source + // is the only stream subject. + if cfg.RePublish.Destination == fwcs && cfg.RePublish.Source == fwcs && cfg.SubjectTransform != nil && + len(cfg.Subjects) == 1 && cfg.SubjectTransform.Source == cfg.Subjects[0] { + if pedantic { + return StreamConfig{}, NewJSPedanticError(fmt.Errorf("implicit republish based on subject transform")) + } + // RePublish all messages with the transformed subject. + cfg.RePublish.Source, cfg.RePublish.Destination = cfg.SubjectTransform.Destination, cfg.SubjectTransform.Destination + } var formsCycle bool for _, subj := range cfg.Subjects { if SubjectsCollide(cfg.RePublish.Destination, subj) { @@ -2038,18 +2062,6 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo } } - // Check the subject transform if any - if cfg.SubjectTransform != nil { - if cfg.SubjectTransform.Source != _EMPTY_ && !IsValidSubject(cfg.SubjectTransform.Source) { - return StreamConfig{}, NewJSStreamTransformInvalidSourceError(fmt.Errorf("%w %s", ErrBadSubject, cfg.SubjectTransform.Source)) - } - - err := ValidateMapping(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination) - if err != nil { - return StreamConfig{}, NewJSStreamTransformInvalidDestinationError(err) - } - } - // Remove placement if it's an empty object. if cfg.Placement != nil && reflect.DeepEqual(cfg.Placement, &Placement{}) { cfg.Placement = nil