diff --git a/server/errors.go b/server/errors.go index 8efa7ac02ea..c096bbef92b 100644 --- a/server/errors.go +++ b/server/errors.go @@ -180,6 +180,9 @@ var ( // ErrClusterNameRemoteConflict signals that a remote server has a different cluster name. ErrClusterNameRemoteConflict = errors.New("cluster name from remote server conflicts") + // ErrClusterNameHasSpaces signals that the cluster name contains spaces, which is not allowed. + ErrClusterNameHasSpaces = errors.New("cluster name cannot contain spaces or new lines") + // ErrMalformedSubject is returned when a subscription is made with a subject that does not conform to subject rules. ErrMalformedSubject = errors.New("malformed subject") diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 92c88bf12c9..5f5e726c676 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2003,3 +2003,360 @@ func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) { require_Equal(t, si.State.NumDeleted, 20) require_Equal(t, si.State.NumSubjects, 4) } + +type captureLeafClusterSpacesLogger struct { + DummyLogger + ch chan string + warnCh chan string +} + +func (l *captureLeafClusterSpacesLogger) Warnf(format string, args ...any) { + msg := fmt.Sprintf(format, args...) + if strings.Contains(msg, `Server name has spaces and used as the cluster name, leaf remotes may not connect properly`) { + select { + case l.warnCh <- msg: + default: + } + } +} + +func (l *captureLeafClusterSpacesLogger) Errorf(format string, args ...any) { + msg := fmt.Sprintf(format, args...) + if strings.Contains(msg, `Leafnode Error 'cluster name cannot contain spaces or new lines'`) { + select { + case l.ch <- msg: + default: + } + } +} + +func TestJetStreamClusterAndNamesWithSpaces(t *testing.T) { + gwConf := ` + listen: 127.0.0.1:-1 + http: 127.0.0.1:-1 + server_name: 'SRV %s' + jetstream: { + store_dir: '%s', + } + cluster { + name: '%s' + listen: 127.0.0.1:%d + routes = [%s] + } + server_tags: ["test"] + system_account: sys + no_auth_user: js + + leafnodes { + host: "127.0.0.1" + port: -1 + } + + accounts { + sys { + users = [ { user: sys, pass: sys } ] } + js { + jetstream: enabled + users = [ { user: js, pass: js } ] + } + } + ` + c := createJetStreamClusterAndModHook(t, gwConf, "S P A C E 1", "GW_1_", 3, 15022, false, + func(serverName, clusterName, storeDir, conf string) string { + conf += ` + gateway { + name: "S P A C E 1" + listen: 127.0.0.1:-1 + } + ` + return conf + }) + defer c.shutdown() + + c2 := createJetStreamClusterAndModHook(t, gwConf, "S P A C E 2", "GW_2_", 3, 16022, false, + func(serverName, clusterName, storeDir, conf string) string { + conf += fmt.Sprintf(` + gateway { + name: "S P A C E 2" + listen: 127.0.0.1:-1 + gateways: [{ + name: "S P A C E 1" + url: "nats://127.0.0.1:%d" + }] + } + `, c.servers[0].opts.Gateway.Port) + return conf + }) + defer c2.shutdown() + + c3 := createJetStreamClusterAndModHook(t, gwConf, "S P A C E 3", "GW_3_", 3, 17022, false, + func(serverName, clusterName, storeDir, conf string) string { + conf += fmt.Sprintf(` + gateway { + name: "S P A C E 3" + listen: 127.0.0.1:-1 + gateways: [{ + name: "S P A C E 1" + url: "nats://127.0.0.1:%d" + }] + } + `, c.servers[0].opts.Gateway.Port) + return conf + }) + defer c3.shutdown() + + for _, s := range c2.servers { + waitForOutboundGateways(t, s, 2, 2*time.Second) + } + for _, s := range c3.servers { + waitForOutboundGateways(t, s, 2, 2*time.Second) + } + + // Leaf with spaces in name which becomes its cluster name as well. + leafConfA := ` + host: "127.0.0.1" + port: -1 + + server_name: "L E A F S P A C E" + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfA = fmt.Sprintf(leafConfA, c.servers[0].opts.LeafNode.Port) + sconfA := createConfFile(t, []byte(leafConfA)) + oA := LoadConfig(sconfA) + leafA, err := NewServer(oA) + require_NoError(t, err) + lA := &captureLeafClusterSpacesLogger{ch: make(chan string, 10), warnCh: make(chan string, 10)} + leafA.SetLogger(lA, false, false) + leafA.Start() + defer leafA.Shutdown() + + // Leaf with spaces in name but with a valid cluster name is able to connect. + leafConfB := ` + host: "127.0.0.1" + port: -1 + http: 127.0.0.1:-1 + + server_name: "L E A F 2" + cluster { name: "LEAF" } + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfB = fmt.Sprintf(leafConfB, c.servers[0].opts.LeafNode.Port) + sconfB := createConfFile(t, []byte(leafConfB)) + leafB, _ := RunServerWithConfig(sconfB) + lB := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)} + leafB.SetLogger(lB, false, false) + defer leafB.Shutdown() + + // Leaf with valid server name but cluster name with spaces. + leafConfC := ` + host: "127.0.0.1" + port: -1 + http: 127.0.0.1:-1 + + server_name: "LEAF3" + cluster { name: "L E A F 3" } + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfC = fmt.Sprintf(leafConfC, c.servers[0].opts.LeafNode.Port) + sconfC := createConfFile(t, []byte(leafConfC)) + leafC, _ := RunServerWithConfig(sconfC) + lC := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)} + leafC.SetLogger(lC, false, false) + defer leafC.Shutdown() + + // Leafs with valid server name but using protocol special characters in cluster name. + leafConfD := ` + host: "127.0.0.1" + port: -1 + http: 127.0.0.1:-1 + + server_name: "LEAF4" + cluster { name: "LEAF +4" } + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfD = fmt.Sprintf(leafConfD, c.servers[0].opts.LeafNode.Port) + sconfD := createConfFile(t, []byte(leafConfD)) + leafD, _ := RunServerWithConfig(sconfD) + lD := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)} + leafD.SetLogger(lD, false, false) + defer leafD.Shutdown() + + leafConfD2 := ` + host: "127.0.0.1" + port: -1 + http: 127.0.0.1:-1 + + server_name: "LEAF42" + cluster { name: "LEAF4\r2" } + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfD2 = fmt.Sprintf(leafConfD2, c.servers[0].opts.LeafNode.Port) + sconfD2 := createConfFile(t, []byte(leafConfD2)) + leafD2, _ := RunServerWithConfig(sconfD2) + lD2 := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)} + leafD2.SetLogger(lD2, false, false) + defer leafD2.Shutdown() + + leafConfD3 := ` + host: "127.0.0.1" + port: -1 + http: 127.0.0.1:-1 + + server_name: "LEAF43" + cluster { name: "LEAF4\t3" } + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfD3 = fmt.Sprintf(leafConfD3, c.servers[0].opts.LeafNode.Port) + sconfD3 := createConfFile(t, []byte(leafConfD3)) + leafD3, _ := RunServerWithConfig(sconfD3) + lD3 := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)} + leafD3.SetLogger(lD3, false, false) + defer leafD3.Shutdown() + + // Leaf with valid configuration should be able to connect to GW cluster with spaces in names. + leafConfE := ` + host: "127.0.0.1" + port: -1 + http: 127.0.0.1:-1 + + server_name: "LEAF5" + cluster { name: "LEAF5" } + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfE = fmt.Sprintf(leafConfE, c.servers[0].opts.LeafNode.Port) + sconfE := createConfFile(t, []byte(leafConfE)) + leafE, _ := RunServerWithConfig(sconfE) + lE := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)} + leafE.SetLogger(lE, false, false) + defer leafE.Shutdown() + + // Finally do a smoke test of connectivity among gateways and that JS is working + // when using clusters with spaces still. + nc1, js1 := jsClientConnect(t, c.servers[1]) + defer nc1.Close() + + _, err = js1.AddStream(&nats.StreamConfig{ + Name: "foo", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + c.waitOnStreamLeader("js", "foo") + + sub1, err := nc1.SubscribeSync("foo") + require_NoError(t, err) + nc1.Flush() + + // Check that invalid configs got the errors. + select { + case <-lA.ch: + case <-time.After(5 * time.Second): + t.Errorf("Timed out waiting for error") + } + select { + case <-lC.ch: + case <-time.After(5 * time.Second): + t.Errorf("Timed out waiting for error") + } + select { + case <-lD.ch: + case <-time.After(5 * time.Second): + t.Errorf("Timed out waiting for error") + } + select { + case <-lD2.ch: + case <-time.After(5 * time.Second): + t.Errorf("Timed out waiting for error") + } + select { + case <-lD3.ch: + case <-time.After(5 * time.Second): + t.Errorf("Timed out waiting for error") + } + + // Check that we got a warning about the server name being reused + // for the cluster name. + select { + case <-lA.warnCh: + case <-time.After(5 * time.Second): + t.Errorf("Timed out waiting for warning") + } + + // Check that valid configs were ok still. + select { + case <-lB.ch: + t.Errorf("Unexpected error from valid leafnode config") + case <-lE.ch: + t.Errorf("Unexpected error from valid leafnode config") + case <-time.After(2 * time.Second): + } + + nc2, js2 := jsClientConnect(t, c2.servers[1]) + defer nc2.Close() + nc2.Publish("foo", []byte("test")) + nc2.Flush() + time.Sleep(250 * time.Millisecond) + + msg, err := sub1.NextMsg(1 * time.Second) + require_NoError(t, err) + require_Equal(t, "test", string(msg.Data)) + sinfo, err := js2.StreamInfo("foo") + require_NoError(t, err) + require_Equal(t, sinfo.State.Msgs, 1) +} diff --git a/server/leafnode.go b/server/leafnode.go index 3c20cbdf437..3e66909c9c5 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -34,6 +34,7 @@ import ( "sync" "sync/atomic" "time" + "unicode" "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" @@ -1764,6 +1765,13 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return err } + // Reject a cluster that contains spaces or line breaks. + if proto.Cluster != _EMPTY_ && strings.ContainsFunc(proto.Cluster, unicode.IsSpace) { + c.sendErrAndErr(ErrClusterNameHasSpaces.Error()) + c.closeConnection(ProtocolViolation) + return ErrClusterNameHasSpaces + } + // Check for cluster name collisions. if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn { c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error()) diff --git a/server/server.go b/server/server.go index 52d95810e53..58be465a5c9 100644 --- a/server/server.go +++ b/server/server.go @@ -30,6 +30,7 @@ import ( "net/url" "regexp" "runtime/pprof" + "unicode" // Allow dynamic profiling. _ "net/http/pprof" @@ -2351,6 +2352,9 @@ func (s *Server) Start() { // Solicit remote servers for leaf node connections. if len(opts.LeafNode.Remotes) > 0 { s.solicitLeafNodeRemotes(opts.LeafNode.Remotes) + if opts.Cluster.Name == opts.ServerName && strings.ContainsFunc(opts.Cluster.Name, unicode.IsSpace) { + s.Warnf("Server name has spaces and used as the cluster name, leaf remotes may not connect properly") + } } // TODO (ik): I wanted to refactor this by starting the client