diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 6c486391b1e..c0c387a6beb 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5672,6 +5672,7 @@ type selectPeerError struct { misc bool noJsClust bool noMatchTags map[string]struct{} + excludeTags map[string]struct{} } func (e *selectPeerError) Error() string { @@ -5704,6 +5705,21 @@ func (e *selectPeerError) Error() string { } b.WriteString("]") } + if len(e.excludeTags) != 0 { + b.WriteString(", tags excluded [") + var firstTagWritten bool + for tag := range e.excludeTags { + if firstTagWritten { + b.WriteString(", ") + } + firstTagWritten = true + b.WriteRune('\'') + b.WriteString(tag) + b.WriteRune('\'') + } + b.WriteString("]") + } + return b.String() } @@ -5714,6 +5730,13 @@ func (e *selectPeerError) addMissingTag(t string) { e.noMatchTags[t] = struct{}{} } +func (e *selectPeerError) addExcludeTag(t string) { + if e.excludeTags == nil { + e.excludeTags = map[string]struct{}{} + } + e.excludeTags[t] = struct{}{} +} + func (e *selectPeerError) accumulate(eAdd *selectPeerError) { if eAdd == nil { return @@ -5732,6 +5755,9 @@ func (e *selectPeerError) accumulate(eAdd *selectPeerError) { for tag := range eAdd.noMatchTags { e.addMissingTag(tag) } + for tag := range eAdd.excludeTags { + e.addExcludeTag(tag) + } } // selectPeerGroup will select a group of peers to start a raft group. @@ -5748,9 +5774,19 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo } // Check for tags. - var tags []string - if cfg.Placement != nil && len(cfg.Placement.Tags) > 0 { - tags = cfg.Placement.Tags + type tagInfo struct { + tag string + exclude bool + } + var ti []tagInfo + if cfg.Placement != nil { + ti = make([]tagInfo, 0, len(cfg.Placement.Tags)) + for _, t := range cfg.Placement.Tags { + ti = append(ti, tagInfo{ + tag: strings.TrimPrefix(t, "!"), + exclude: strings.HasPrefix(t, "!"), + }) + } } // Used for weighted sorting based on availability. @@ -5767,8 +5803,8 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo uniqueTagPrefix := s.getOpts().JetStreamUniqueTag if uniqueTagPrefix != _EMPTY_ { - for _, tag := range tags { - if strings.HasPrefix(tag, uniqueTagPrefix) { + for _, t := range ti { + if strings.HasPrefix(t.tag, uniqueTagPrefix) { // disable uniqueness check if explicitly listed in tags uniqueTagPrefix = _EMPTY_ break @@ -5884,14 +5920,21 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo continue } - if len(tags) > 0 { + if len(ti) > 0 { matched := true - for _, t := range tags { - if !ni.tags.Contains(t) { + for _, t := range ti { + contains := ni.tags.Contains(t.tag) + if t.exclude && contains { + matched = false + s.Debugf("Peer selection: discard %s@%s tags: %v reason: excluded tag %s present", + ni.name, ni.cluster, ni.tags, t) + err.addExcludeTag(t.tag) + break + } else if !t.exclude && !contains { matched = false s.Debugf("Peer selection: discard %s@%s tags: %v reason: mandatory tag %s not present", ni.name, ni.cluster, ni.tags, t) - err.addMissingTag(t) + err.addMissingTag(t.tag) break } } diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index e2d73d1d1a0..e255b5f82f0 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -5378,6 +5378,92 @@ func TestJetStreamClusterServerPeerRemovePeersDrift(t *testing.T) { }) } +func TestJetStreamStreamTagPlacement(t *testing.T) { + c := createJetStreamClusterWithTemplateAndModHook(t, jsClusterTempl, "R3S", 3, + func(serverName, clusterName, storeDir, conf string) string { + return fmt.Sprintf("%s\nserver_tags: [%s]", conf, serverName) + }) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + for i, c := range []nats.StreamConfig{ + {Replicas: 1, Placement: &nats.Placement{Tags: []string{"!S-1", "!S-2", "!S-3"}}}, // exclude all servers. + {Replicas: 2, Placement: &nats.Placement{Tags: []string{"!S-1", "!S-2"}}}, // exclude two servers. + {Replicas: 3, Placement: &nats.Placement{Tags: []string{"!S-2"}}}, // not enough server. + } { + c.Name = fmt.Sprintf("TEST%d", i) + c.Subjects = []string{c.Name} + _, err := js.AddStream(&c) + require_Error(t, err) + require_Contains(t, err.Error(), "no suitable peers for placement", "tags excluded", "S-2") + } + + // Test adding excluded tags to an existing stream. + cfg := &nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}, Replicas: 3} + _, err := js.AddStream(cfg) + require_NoError(t, err) + + cfg.Placement = &nats.Placement{Tags: []string{"!S-1"}} + _, err = js.UpdateStream(cfg) + require_Error(t, err) + require_Contains(t, err.Error(), "no suitable peers for placement", "tags excluded", "S-1") + + // Test changing replicas to 2 and then add the excluded tag. + cfg.Replicas = 2 + cfg.Placement = nil + _, err = js.UpdateStream(cfg) + require_NoError(t, err) + + var leaderName string + // Wait until we have two replicas + checkFor(t, 6*time.Second, 1*time.Second, func() error { + s, err := js.StreamInfo("TEST") + if err != nil { + return err + } + + if s.Cluster.Leader == "" { + return fmt.Errorf("no leader yet") + } + + if len(s.Cluster.Replicas) != 1 { + return fmt.Errorf("expected 1 replica, got %d", len(s.Cluster.Replicas)) + } + leaderName = s.Cluster.Leader + return nil + }) + + // Now that we only have two servers, we can exclude one. + cfg.Replicas = 2 + cfg.Placement = &nats.Placement{Tags: []string{"!" + leaderName}} + _, err = js.UpdateStream(cfg) + require_NoError(t, err) + + // Check that the stream grab the correct servers. + // This can take a bit as we need to first add the new replica and later remove the old one. + checkFor(t, 6*time.Second, 1*time.Second, func() error { + s, err := js.StreamInfo("TEST") + if err != nil { + return err + } + + if len(s.Cluster.Replicas) != 1 { + return fmt.Errorf("expected 1 replica, got %d", len(s.Cluster.Replicas)) + } + + if s.Cluster.Leader == leaderName { + return fmt.Errorf("expected leader to be different than %q", leaderName) + } + + if s.Cluster.Replicas[0].Name == leaderName { + return fmt.Errorf("expected replica to be different than %q", leaderName) + } + return nil + }) +} + func TestJetStreamClusterObserverNotElectedMetaLeader(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown()