Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22357,3 +22357,44 @@ func TestJetStreamReloadMetaCompact(t *testing.T) {

require_Equal(t, s.getOpts().JetStreamMetaCompact, 0)
}

// https://github.com/nats-io/nats-server/issues/7511
func TestJetStreamImplicitRePublishAfterSubjectTransform(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

cfg := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"a.>", "c.>"},
SubjectTransform: &nats.SubjectTransformConfig{Source: "a.>", Destination: "b.>"},
RePublish: &nats.RePublish{Destination: ">"}, // Implicitly RePublish 'b.>'.
}
// Forms a cycle since the RePublish captures both 'a.>' and 'c.>'
_, err := js.AddStream(cfg)
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish destination forms a cycle")))

// Doesn't form a cycle as 'a.>' is mapped to 'b.>'. A RePublish for '>' can be translated to 'b.>'.
cfg.Subjects = []string{"a.>"}
_, err = js.AddStream(cfg)
require_NoError(t, err)

sub, err := nc.SubscribeSync("b.>")
require_NoError(t, err)
defer sub.Drain()

// The published message should be transformed and RePublished.
_, err = js.Publish("a.hello", nil)
require_NoError(t, err)
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, msg.Subject, "b.hello")

// Forms a cycle since the implicit RePublish on 'b.>' is lost.
// The RePublish would now mean publishing to 'c.>' which is a cycle.
cfg.Subjects = []string{"c.>"}
_, err = js.UpdateStream(cfg)
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish destination forms a cycle")))
}
36 changes: 24 additions & 12 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2012,6 +2012,18 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo
}
}

// Check the subject transform if any
if cfg.SubjectTransform != nil {
if cfg.SubjectTransform.Source != _EMPTY_ && !IsValidSubject(cfg.SubjectTransform.Source) {
return StreamConfig{}, NewJSStreamTransformInvalidSourceError(fmt.Errorf("%w %s", ErrBadSubject, cfg.SubjectTransform.Source))
}

err := ValidateMapping(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
if err != nil {
return StreamConfig{}, NewJSStreamTransformInvalidDestinationError(err)
}
}

// If we have a republish directive check if we can create a transform here.
if cfg.RePublish != nil {
// Check to make sure source is a valid subset of the subjects we have.
Expand All @@ -2023,6 +2035,18 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo
}
cfg.RePublish.Source = fwcs
}
// A RePublish from '>' to '>' could be used, normally this would form a cycle with the stream subjects.
// But if this aligns to a different subject based on the transform, we allow it still.
// The RePublish will be implicit based on the transform, but only if the transform's source
// is the only stream subject.
if cfg.RePublish.Destination == fwcs && cfg.RePublish.Source == fwcs && cfg.SubjectTransform != nil &&
len(cfg.Subjects) == 1 && cfg.SubjectTransform.Source == cfg.Subjects[0] {
if pedantic {
return StreamConfig{}, NewJSPedanticError(fmt.Errorf("implicit republish based on subject transform"))
}
// RePublish all messages with the transformed subject.
cfg.RePublish.Source, cfg.RePublish.Destination = cfg.SubjectTransform.Destination, cfg.SubjectTransform.Destination
}
var formsCycle bool
for _, subj := range cfg.Subjects {
if SubjectsCollide(cfg.RePublish.Destination, subj) {
Expand All @@ -2038,18 +2062,6 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo
}
}

// Check the subject transform if any
if cfg.SubjectTransform != nil {
if cfg.SubjectTransform.Source != _EMPTY_ && !IsValidSubject(cfg.SubjectTransform.Source) {
return StreamConfig{}, NewJSStreamTransformInvalidSourceError(fmt.Errorf("%w %s", ErrBadSubject, cfg.SubjectTransform.Source))
}

err := ValidateMapping(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
if err != nil {
return StreamConfig{}, NewJSStreamTransformInvalidDestinationError(err)
}
}

// Remove placement if it's an empty object.
if cfg.Placement != nil && reflect.DeepEqual(cfg.Placement, &Placement{}) {
cfg.Placement = nil
Expand Down