Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 52 additions & 9 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5672,6 +5672,7 @@ type selectPeerError struct {
misc bool
noJsClust bool
noMatchTags map[string]struct{}
excludeTags map[string]struct{}
}

func (e *selectPeerError) Error() string {
Expand Down Expand Up @@ -5704,6 +5705,21 @@ func (e *selectPeerError) Error() string {
}
b.WriteString("]")
}
if len(e.excludeTags) != 0 {
b.WriteString(", tags excluded [")
var firstTagWritten bool
for tag := range e.excludeTags {
if firstTagWritten {
b.WriteString(", ")
}
firstTagWritten = true
b.WriteRune('\'')
b.WriteString(tag)
b.WriteRune('\'')
}
b.WriteString("]")
}

return b.String()
}

Expand All @@ -5714,6 +5730,13 @@ func (e *selectPeerError) addMissingTag(t string) {
e.noMatchTags[t] = struct{}{}
}

func (e *selectPeerError) addExcludeTag(t string) {
if e.excludeTags == nil {
e.excludeTags = map[string]struct{}{}
}
e.excludeTags[t] = struct{}{}
}

func (e *selectPeerError) accumulate(eAdd *selectPeerError) {
if eAdd == nil {
return
Expand All @@ -5732,6 +5755,9 @@ func (e *selectPeerError) accumulate(eAdd *selectPeerError) {
for tag := range eAdd.noMatchTags {
e.addMissingTag(tag)
}
for tag := range eAdd.excludeTags {
e.addExcludeTag(tag)
}
}

// selectPeerGroup will select a group of peers to start a raft group.
Expand All @@ -5748,9 +5774,19 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
}

// Check for tags.
var tags []string
if cfg.Placement != nil && len(cfg.Placement.Tags) > 0 {
tags = cfg.Placement.Tags
type tagInfo struct {
tag string
exclude bool
}
var ti []tagInfo
if cfg.Placement != nil {
ti = make([]tagInfo, 0, len(cfg.Placement.Tags))
for _, t := range cfg.Placement.Tags {
ti = append(ti, tagInfo{
tag: strings.TrimPrefix(t, "!"),
exclude: strings.HasPrefix(t, "!"),
})
}
}

// Used for weighted sorting based on availability.
Expand All @@ -5767,8 +5803,8 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo

uniqueTagPrefix := s.getOpts().JetStreamUniqueTag
if uniqueTagPrefix != _EMPTY_ {
for _, tag := range tags {
if strings.HasPrefix(tag, uniqueTagPrefix) {
for _, t := range ti {
if strings.HasPrefix(t.tag, uniqueTagPrefix) {
// disable uniqueness check if explicitly listed in tags
uniqueTagPrefix = _EMPTY_
break
Expand Down Expand Up @@ -5884,14 +5920,21 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
continue
}

if len(tags) > 0 {
if len(ti) > 0 {
matched := true
for _, t := range tags {
if !ni.tags.Contains(t) {
for _, t := range ti {
contains := ni.tags.Contains(t.tag)
if t.exclude && contains {
matched = false
s.Debugf("Peer selection: discard %s@%s tags: %v reason: excluded tag %s present",
ni.name, ni.cluster, ni.tags, t)
err.addExcludeTag(t.tag)
break
} else if !t.exclude && !contains {
matched = false
s.Debugf("Peer selection: discard %s@%s tags: %v reason: mandatory tag %s not present",
ni.name, ni.cluster, ni.tags, t)
err.addMissingTag(t)
err.addMissingTag(t.tag)
break
}
}
Expand Down
86 changes: 86 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5378,6 +5378,92 @@ func TestJetStreamClusterServerPeerRemovePeersDrift(t *testing.T) {
})
}

func TestJetStreamStreamTagPlacement(t *testing.T) {
c := createJetStreamClusterWithTemplateAndModHook(t, jsClusterTempl, "R3S", 3,
func(serverName, clusterName, storeDir, conf string) string {
return fmt.Sprintf("%s\nserver_tags: [%s]", conf, serverName)
})
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

for i, c := range []nats.StreamConfig{
{Replicas: 1, Placement: &nats.Placement{Tags: []string{"!S-1", "!S-2", "!S-3"}}}, // exclude all servers.
{Replicas: 2, Placement: &nats.Placement{Tags: []string{"!S-1", "!S-2"}}}, // exclude two servers.
{Replicas: 3, Placement: &nats.Placement{Tags: []string{"!S-2"}}}, // not enough server.
} {
c.Name = fmt.Sprintf("TEST%d", i)
c.Subjects = []string{c.Name}
_, err := js.AddStream(&c)
require_Error(t, err)
require_Contains(t, err.Error(), "no suitable peers for placement", "tags excluded", "S-2")
}

// Test adding excluded tags to an existing stream.
cfg := &nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}, Replicas: 3}
_, err := js.AddStream(cfg)
require_NoError(t, err)

cfg.Placement = &nats.Placement{Tags: []string{"!S-1"}}
_, err = js.UpdateStream(cfg)
require_Error(t, err)
require_Contains(t, err.Error(), "no suitable peers for placement", "tags excluded", "S-1")

// Test changing replicas to 2 and then add the excluded tag.
cfg.Replicas = 2
cfg.Placement = nil
_, err = js.UpdateStream(cfg)
require_NoError(t, err)

var leaderName string
// Wait until we have two replicas
checkFor(t, 6*time.Second, 1*time.Second, func() error {
s, err := js.StreamInfo("TEST")
if err != nil {
return err
}

if s.Cluster.Leader == "" {
return fmt.Errorf("no leader yet")
}

if len(s.Cluster.Replicas) != 1 {
return fmt.Errorf("expected 1 replica, got %d", len(s.Cluster.Replicas))
}
leaderName = s.Cluster.Leader
return nil
})

// Now that we only have two servers, we can exclude one.
cfg.Replicas = 2
cfg.Placement = &nats.Placement{Tags: []string{"!" + leaderName}}
_, err = js.UpdateStream(cfg)
require_NoError(t, err)

// Check that the stream grab the correct servers.
// This can take a bit as we need to first add the new replica and later remove the old one.
checkFor(t, 6*time.Second, 1*time.Second, func() error {
s, err := js.StreamInfo("TEST")
if err != nil {
return err
}

if len(s.Cluster.Replicas) != 1 {
return fmt.Errorf("expected 1 replica, got %d", len(s.Cluster.Replicas))
}

if s.Cluster.Leader == leaderName {
return fmt.Errorf("expected leader to be different than %q", leaderName)
}

if s.Cluster.Replicas[0].Name == leaderName {
return fmt.Errorf("expected replica to be different than %q", leaderName)
}
return nil
})
}

func TestJetStreamClusterObserverNotElectedMetaLeader(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down
Loading