Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions server/config_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2398,6 +2398,98 @@ func TestConfigCheck(t *testing.T) {
errorLine: 9,
errorPos: 9,
},
{
name: "leafnode proxy with unsupported scheme",
config: `
leafnodes {
remotes = [
{
url: "ws://127.0.0.1:7422"
proxy {
url: "ftp://proxy.example.com:8080"
}
}
]
}
`,
err: errors.New("proxy URL scheme must be http or https, got: ftp"),
errorLine: 6,
errorPos: 8,
},
{
name: "leafnode proxy with missing host",
config: `
leafnodes {
remotes = [
{
url: "ws://127.0.0.1:7422"
proxy {
url: "http://"
}
}
]
}
`,
err: errors.New("proxy URL must specify a host"),
errorLine: 6,
errorPos: 8,
},
{
name: "leafnode proxy with username but no password",
config: `
leafnodes {
remotes = [
{
url: "ws://127.0.0.1:7422"
proxy {
url: "http://proxy.example.com:8080"
username: "testuser"
}
}
]
}
`,
err: errors.New("proxy username and password must both be specified or both be empty"),
errorLine: 6,
errorPos: 8,
},
{
name: "leafnode proxy with password but no username",
config: `
leafnodes {
remotes = [
{
url: "ws://127.0.0.1:7422"
proxy {
url: "http://proxy.example.com:8080"
password: "testpass"
}
}
]
}
`,
err: errors.New("proxy username and password must both be specified or both be empty"),
errorLine: 6,
errorPos: 8,
},
{
name: "leafnode proxy with WSS URL but no TLS config",
config: `
leafnodes {
remotes = [
{
url: "wss://127.0.0.1:7422"
proxy {
url: "http://proxy.example.com:8080"
}
}
]
}
`,
err: errors.New("proxy is configured but remote URL wss://127.0.0.1:7422 requires TLS and no TLS configuration is provided"),
errorLine: 6,
errorPos: 8,
},
}

checkConfig := func(config string) error {
Expand Down
160 changes: 159 additions & 1 deletion server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ func validateLeafNode(o *Options) error {

// If a remote has a websocket scheme, all need to have it.
for _, rcfg := range o.LeafNode.Remotes {
// Validate proxy configuration
if _, err := validateLeafNodeProxyOptions(rcfg); err != nil {
return err
}

if len(rcfg.URLs) >= 2 {
firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
for i := 1; i < len(rcfg.URLs); i++ {
Expand Down Expand Up @@ -369,6 +374,60 @@ func validateLeafNodeAuthOptions(o *Options) error {
return nil
}

func validateLeafNodeProxyOptions(remote *RemoteLeafOpts) ([]string, error) {
var warnings []string

if remote.Proxy.URL == _EMPTY_ {
return warnings, nil
}

proxyURL, err := url.Parse(remote.Proxy.URL)
if err != nil {
return warnings, fmt.Errorf("invalid proxy URL: %v", err)
}

if proxyURL.Scheme != "http" && proxyURL.Scheme != "https" {
return warnings, fmt.Errorf("proxy URL scheme must be http or https, got: %s", proxyURL.Scheme)
}

if proxyURL.Host == _EMPTY_ {
return warnings, fmt.Errorf("proxy URL must specify a host")
}

if remote.Proxy.Timeout < 0 {
return warnings, fmt.Errorf("proxy timeout must be >= 0")
}

if (remote.Proxy.Username == _EMPTY_) != (remote.Proxy.Password == _EMPTY_) {
return warnings, fmt.Errorf("proxy username and password must both be specified or both be empty")
}

if len(remote.URLs) > 0 {
hasWebSocketURL := false
hasNonWebSocketURL := false

for _, remoteURL := range remote.URLs {
if remoteURL.Scheme == wsSchemePrefix || remoteURL.Scheme == wsSchemePrefixTLS {
hasWebSocketURL = true
if (remoteURL.Scheme == wsSchemePrefixTLS) &&
remote.TLSConfig == nil && !remote.TLS {
return warnings, fmt.Errorf("proxy is configured but remote URL %s requires TLS and no TLS configuration is provided. When using proxy with TLS endpoints, ensure TLS is properly configured for the leafnode remote", remoteURL.String())
}
} else {
hasNonWebSocketURL = true
}
}

if !hasWebSocketURL {
warnings = append(warnings, "proxy configuration will be ignored: proxy settings only apply to WebSocket connections (ws:// or wss://), but all configured URLs use TCP connections (nats://)")
} else if hasNonWebSocketURL {
warnings = append(warnings, "proxy configuration will only be used for WebSocket URLs: proxy settings do not apply to TCP connections (nats://)")
}
}

return warnings, nil
}

// Update remote LeafNode TLS configurations after a config reload.
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
max := len(opts.LeafNode.Remotes)
Expand Down Expand Up @@ -502,6 +561,67 @@ func (s *Server) setLeafNodeNonExportedOptions() {

const sharedSysAccDelay = 250 * time.Millisecond

// establishHTTPProxyTunnel establishes an HTTP CONNECT tunnel through a proxy server
func establishHTTPProxyTunnel(proxyURL, targetHost string, timeout time.Duration, username, password string) (net.Conn, error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would really rather us not build and parse HTTP requests/responses by hand here when the Go standard library has the ability to do this for us, i.e. something like this:

	conn, err := natsDialTimeout("tcp", proxyAddr.Host, timeout)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to proxy: %w", err)
	}

	req := &http.Request{
		Method: http.MethodConnect,
		URL:    &url.URL{Opaque: targetHost}, // Opaque is required here
		Host:   targetHost,
		Header: make(http.Header),
	}
 	// do we need auth here?
	req.Header.Set("Proxy-Authorization", "Basic " + ...)

	if err := req.Write(conn); err != nil {
		conn.Close()
		return nil, fmt.Errorf("req.Write: %w", err)
	}

	resp, err := http.ReadResponse(bufio.NewReader(conn), req)
	if err != nil {
		conn.Close()
		return nil, fmt.Errorf("http.ReadResponse: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		conn.Close()
		return nil, fmt.Errorf("status code: %s", resp.Status)
	}

	// now use "conn"

We already have net/http imported so this is fine to use.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@neilalexander I have changed this to now use the standard http library. Thx

proxyAddr, err := url.Parse(proxyURL)
Comment thread
kozlovic marked this conversation as resolved.
if err != nil {
// This should not happen since proxy URL is validated during configuration parsing
return nil, fmt.Errorf("unexpected proxy URL parse error (URL was pre-validated): %v", err)
}

// Connect to the proxy server
conn, err := natsDialTimeout("tcp", proxyAddr.Host, timeout)
if err != nil {
return nil, fmt.Errorf("failed to connect to proxy: %v", err)
}

// Set deadline for the entire proxy handshake
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
conn.Close()
return nil, fmt.Errorf("failed to set deadline: %v", err)
}

req := &http.Request{
Method: http.MethodConnect,
URL: &url.URL{Opaque: targetHost}, // Opaque is required for CONNECT
Host: targetHost,
Header: make(http.Header),
}

// Add proxy authentication if provided
if username != "" && password != "" {
req.Header.Set("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password)))
}

if err := req.Write(conn); err != nil {
conn.Close()
return nil, fmt.Errorf("failed to write CONNECT request: %v", err)
}

resp, err := http.ReadResponse(bufio.NewReader(conn), req)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to read proxy response: %v", err)
}

if resp.StatusCode != http.StatusOK {
resp.Body.Close()
conn.Close()
return nil, fmt.Errorf("proxy CONNECT failed: %s", resp.Status)
}

// Close the response body
resp.Body.Close()

// Clear the deadline now that we've finished the proxy handshake
if err := conn.SetDeadline(time.Time{}); err != nil {
conn.Close()
return nil, fmt.Errorf("failed to clear deadline: %v", err)
}

return conn, nil
}

func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
defer s.grWG.Done()

Expand Down Expand Up @@ -541,6 +661,19 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)

const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"

// Capture proxy configuration once before the loop with proper locking
remote.RLock()
proxyURL := remote.Proxy.URL
proxyUsername := remote.Proxy.Username
proxyPassword := remote.Proxy.Password
proxyTimeout := remote.Proxy.Timeout
remote.RUnlock()

// Set default proxy timeout if not specified
if proxyTimeout == 0 {
proxyTimeout = dialTimeout
}

attempts := 0

for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
Expand All @@ -557,7 +690,25 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
err = ErrLeafNodeDisabled
} else {
s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
conn, err = natsDialTimeout("tcp", url, dialTimeout)

// Check if proxy is configured first, then check if URL supports it
if proxyURL != _EMPTY_ && isWSURL(rURL) {
// Use proxy for WebSocket connections - use original hostname, resolved IP for connection
targetHost := rURL.Host
// If URL doesn't include port, add the default port for the scheme
if rURL.Port() == _EMPTY_ {
defaultPort := "80"
if rURL.Scheme == wsSchemePrefixTLS {
defaultPort = "443"
}
targetHost = net.JoinHostPort(rURL.Hostname(), defaultPort)
}

conn, err = establishHTTPProxyTunnel(proxyURL, targetHost, proxyTimeout, proxyUsername, proxyPassword)
} else {
// Direct connection
conn, err = natsDialTimeout("tcp", url, dialTimeout)
}
}
}
if err != nil {
Expand Down Expand Up @@ -1287,6 +1438,13 @@ func (c *client) processLeafnodeInfo(info *Info) {
// otherwise if there is no TLS configuration block for the remote,
// the solicit side will not attempt to perform the TLS handshake.
if firstINFO && info.TLSRequired {
// Check for TLS/proxy configuration mismatch
if remote.Proxy.URL != _EMPTY_ && !remote.TLS && remote.TLSConfig == nil {
c.mu.Unlock()
c.Errorf("TLS configuration mismatch: Hub requires TLS but leafnode remote is not configured for TLS. When using a proxy, ensure TLS leafnode configuration matches the Hub requirements.")
c.closeConnection(TLSHandshakeError)
return
}
remote.TLS = true
}
if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
Expand Down
Loading