Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions server/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ var (
// ErrClusterNameRemoteConflict signals that a remote server has a different cluster name.
ErrClusterNameRemoteConflict = errors.New("cluster name from remote server conflicts")

// ErrClusterNameHasSpaces signals that the cluster name contains spaces, which is not allowed.
ErrClusterNameHasSpaces = errors.New("cluster name cannot contain spaces or new lines")

// ErrMalformedSubject is returned when a subscription is made with a subject that does not conform to subject rules.
ErrMalformedSubject = errors.New("malformed subject")

Expand Down
357 changes: 357 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2003,3 +2003,360 @@ func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) {
require_Equal(t, si.State.NumDeleted, 20)
require_Equal(t, si.State.NumSubjects, 4)
}

type captureLeafClusterSpacesLogger struct {
DummyLogger
ch chan string
warnCh chan string
}

func (l *captureLeafClusterSpacesLogger) Warnf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
if strings.Contains(msg, `Server name has spaces and used as the cluster name, leaf remotes may not connect properly`) {
select {
case l.warnCh <- msg:
default:
}
}
}

func (l *captureLeafClusterSpacesLogger) Errorf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
if strings.Contains(msg, `Leafnode Error 'cluster name cannot contain spaces or new lines'`) {
select {
case l.ch <- msg:
default:
}
}
}

func TestJetStreamClusterAndNamesWithSpaces(t *testing.T) {
gwConf := `
listen: 127.0.0.1:-1
http: 127.0.0.1:-1
server_name: 'SRV %s'
jetstream: {
store_dir: '%s',
}
cluster {
name: '%s'
listen: 127.0.0.1:%d
routes = [%s]
}
server_tags: ["test"]
system_account: sys
no_auth_user: js

leafnodes {
host: "127.0.0.1"
port: -1
}

accounts {
sys {
users = [ { user: sys, pass: sys } ] }
js {
jetstream: enabled
users = [ { user: js, pass: js } ]
}
}
`
c := createJetStreamClusterAndModHook(t, gwConf, "S P A C E 1", "GW_1_", 3, 15022, false,
func(serverName, clusterName, storeDir, conf string) string {
conf += `
gateway {
name: "S P A C E 1"
listen: 127.0.0.1:-1
}
`
return conf
})
defer c.shutdown()

c2 := createJetStreamClusterAndModHook(t, gwConf, "S P A C E 2", "GW_2_", 3, 16022, false,
func(serverName, clusterName, storeDir, conf string) string {
conf += fmt.Sprintf(`
gateway {
name: "S P A C E 2"
listen: 127.0.0.1:-1
gateways: [{
name: "S P A C E 1"
url: "nats://127.0.0.1:%d"
}]
}
`, c.servers[0].opts.Gateway.Port)
return conf
})
defer c2.shutdown()

c3 := createJetStreamClusterAndModHook(t, gwConf, "S P A C E 3", "GW_3_", 3, 17022, false,
func(serverName, clusterName, storeDir, conf string) string {
conf += fmt.Sprintf(`
gateway {
name: "S P A C E 3"
listen: 127.0.0.1:-1
gateways: [{
name: "S P A C E 1"
url: "nats://127.0.0.1:%d"
}]
}
`, c.servers[0].opts.Gateway.Port)
return conf
})
defer c3.shutdown()

for _, s := range c2.servers {
waitForOutboundGateways(t, s, 2, 2*time.Second)
}
for _, s := range c3.servers {
waitForOutboundGateways(t, s, 2, 2*time.Second)
}

// Leaf with spaces in name which becomes its cluster name as well.
leafConfA := `
host: "127.0.0.1"
port: -1

server_name: "L E A F S P A C E"

leafnodes {
host: "127.0.0.1"
port: -1
advertise: "127.0.0.1"
remotes: [ {
url: "nats://127.0.0.1:%d"
} ]
}
`
leafConfA = fmt.Sprintf(leafConfA, c.servers[0].opts.LeafNode.Port)
sconfA := createConfFile(t, []byte(leafConfA))
oA := LoadConfig(sconfA)
leafA, err := NewServer(oA)
require_NoError(t, err)
lA := &captureLeafClusterSpacesLogger{ch: make(chan string, 10), warnCh: make(chan string, 10)}
leafA.SetLogger(lA, false, false)
leafA.Start()
defer leafA.Shutdown()

// Leaf with spaces in name but with a valid cluster name is able to connect.
leafConfB := `
host: "127.0.0.1"
port: -1
http: 127.0.0.1:-1

server_name: "L E A F 2"
cluster { name: "LEAF" }

leafnodes {
host: "127.0.0.1"
port: -1
advertise: "127.0.0.1"
remotes: [ {
url: "nats://127.0.0.1:%d"
} ]
}
`
leafConfB = fmt.Sprintf(leafConfB, c.servers[0].opts.LeafNode.Port)
sconfB := createConfFile(t, []byte(leafConfB))
leafB, _ := RunServerWithConfig(sconfB)
lB := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)}
leafB.SetLogger(lB, false, false)
defer leafB.Shutdown()

// Leaf with valid server name but cluster name with spaces.
leafConfC := `
host: "127.0.0.1"
port: -1
http: 127.0.0.1:-1

server_name: "LEAF3"
cluster { name: "L E A F 3" }

leafnodes {
host: "127.0.0.1"
port: -1
advertise: "127.0.0.1"
remotes: [ {
url: "nats://127.0.0.1:%d"
} ]
}
`
leafConfC = fmt.Sprintf(leafConfC, c.servers[0].opts.LeafNode.Port)
sconfC := createConfFile(t, []byte(leafConfC))
leafC, _ := RunServerWithConfig(sconfC)
lC := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)}
leafC.SetLogger(lC, false, false)
defer leafC.Shutdown()

// Leafs with valid server name but using protocol special characters in cluster name.
leafConfD := `
host: "127.0.0.1"
port: -1
http: 127.0.0.1:-1

server_name: "LEAF4"
cluster { name: "LEAF
4" }

leafnodes {
host: "127.0.0.1"
port: -1
advertise: "127.0.0.1"
remotes: [ {
url: "nats://127.0.0.1:%d"
} ]
}
`
leafConfD = fmt.Sprintf(leafConfD, c.servers[0].opts.LeafNode.Port)
sconfD := createConfFile(t, []byte(leafConfD))
leafD, _ := RunServerWithConfig(sconfD)
lD := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)}
leafD.SetLogger(lD, false, false)
defer leafD.Shutdown()

leafConfD2 := `
host: "127.0.0.1"
port: -1
http: 127.0.0.1:-1

server_name: "LEAF42"
cluster { name: "LEAF4\r2" }

leafnodes {
host: "127.0.0.1"
port: -1
advertise: "127.0.0.1"
remotes: [ {
url: "nats://127.0.0.1:%d"
} ]
}
`
leafConfD2 = fmt.Sprintf(leafConfD2, c.servers[0].opts.LeafNode.Port)
sconfD2 := createConfFile(t, []byte(leafConfD2))
leafD2, _ := RunServerWithConfig(sconfD2)
lD2 := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)}
leafD2.SetLogger(lD2, false, false)
defer leafD2.Shutdown()

leafConfD3 := `
host: "127.0.0.1"
port: -1
http: 127.0.0.1:-1

server_name: "LEAF43"
cluster { name: "LEAF4\t3" }

leafnodes {
host: "127.0.0.1"
port: -1
advertise: "127.0.0.1"
remotes: [ {
url: "nats://127.0.0.1:%d"
} ]
}
`
leafConfD3 = fmt.Sprintf(leafConfD3, c.servers[0].opts.LeafNode.Port)
sconfD3 := createConfFile(t, []byte(leafConfD3))
leafD3, _ := RunServerWithConfig(sconfD3)
lD3 := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)}
leafD3.SetLogger(lD3, false, false)
defer leafD3.Shutdown()

// Leaf with valid configuration should be able to connect to GW cluster with spaces in names.
leafConfE := `
host: "127.0.0.1"
port: -1
http: 127.0.0.1:-1

server_name: "LEAF5"
cluster { name: "LEAF5" }

leafnodes {
host: "127.0.0.1"
port: -1
advertise: "127.0.0.1"
remotes: [ {
url: "nats://127.0.0.1:%d"
} ]
}
`
leafConfE = fmt.Sprintf(leafConfE, c.servers[0].opts.LeafNode.Port)
sconfE := createConfFile(t, []byte(leafConfE))
leafE, _ := RunServerWithConfig(sconfE)
lE := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)}
leafE.SetLogger(lE, false, false)
defer leafE.Shutdown()

// Finally do a smoke test of connectivity among gateways and that JS is working
// when using clusters with spaces still.
nc1, js1 := jsClientConnect(t, c.servers[1])
defer nc1.Close()

_, err = js1.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo"},
})
require_NoError(t, err)
c.waitOnStreamLeader("js", "foo")

sub1, err := nc1.SubscribeSync("foo")
require_NoError(t, err)
nc1.Flush()

// Check that invalid configs got the errors.
select {
case <-lA.ch:
case <-time.After(5 * time.Second):
t.Errorf("Timed out waiting for error")
}
select {
case <-lC.ch:
case <-time.After(5 * time.Second):
t.Errorf("Timed out waiting for error")
}
select {
case <-lD.ch:
case <-time.After(5 * time.Second):
t.Errorf("Timed out waiting for error")
}
select {
case <-lD2.ch:
case <-time.After(5 * time.Second):
t.Errorf("Timed out waiting for error")
}
select {
case <-lD3.ch:
case <-time.After(5 * time.Second):
t.Errorf("Timed out waiting for error")
}

// Check that we got a warning about the server name being reused
// for the cluster name.
select {
case <-lA.warnCh:
case <-time.After(5 * time.Second):
t.Errorf("Timed out waiting for warning")
}

// Check that valid configs were ok still.
select {
case <-lB.ch:
t.Errorf("Unexpected error from valid leafnode config")
case <-lE.ch:
t.Errorf("Unexpected error from valid leafnode config")
case <-time.After(2 * time.Second):
}

nc2, js2 := jsClientConnect(t, c2.servers[1])
defer nc2.Close()
nc2.Publish("foo", []byte("test"))
nc2.Flush()
time.Sleep(250 * time.Millisecond)

msg, err := sub1.NextMsg(1 * time.Second)
require_NoError(t, err)
require_Equal(t, "test", string(msg.Data))
sinfo, err := js2.StreamInfo("foo")
require_NoError(t, err)
require_Equal(t, sinfo.State.Msgs, 1)
}
8 changes: 8 additions & 0 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"sync"
"sync/atomic"
"time"
"unicode"

"github.com/klauspost/compress/s2"
"github.com/nats-io/jwt/v2"
Expand Down Expand Up @@ -1764,6 +1765,13 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
return err
}

// Reject a cluster that contains spaces or line breaks.
if proto.Cluster != _EMPTY_ && strings.ContainsFunc(proto.Cluster, unicode.IsSpace) {
Comment thread
wallyqs marked this conversation as resolved.
c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
c.closeConnection(ProtocolViolation)
return ErrClusterNameHasSpaces
}

// Check for cluster name collisions.
if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
Expand Down
Loading