diff --git a/cmd/cloud_sql_proxy/cloud_sql_proxy.go b/cmd/cloud_sql_proxy/cloud_sql_proxy.go index 78e1859a1..0b7813929 100644 --- a/cmd/cloud_sql_proxy/cloud_sql_proxy.go +++ b/cmd/cloud_sql_proxy/cloud_sql_proxy.go @@ -570,9 +570,14 @@ func runProxy() int { var hc *healthcheck.Server if *useHTTPHealthCheck { - hc, err = healthcheck.NewServer(proxyClient, *healthCheckPort) + // Extract a list of all instances specified statically. List is empty when in fuse mode. + var insts []string + for _, cfg := range cfgs { + insts = append(insts, cfg.Instance) + } + hc, err = healthcheck.NewServer(proxyClient, *healthCheckPort, insts) if err != nil { - logging.Errorf("Could not initialize health check server: %v", err) + logging.Errorf("[Health Check] Could not initialize health check server: %v", err) return 1 } defer hc.Close(ctx) diff --git a/cmd/cloud_sql_proxy/internal/healthcheck/healthcheck.go b/cmd/cloud_sql_proxy/internal/healthcheck/healthcheck.go index 3bb956af8..2a8ccf2ba 100644 --- a/cmd/cloud_sql_proxy/internal/healthcheck/healthcheck.go +++ b/cmd/cloud_sql_proxy/internal/healthcheck/healthcheck.go @@ -44,11 +44,13 @@ type Server struct { port string // srv is a pointer to the HTTP server used to communicate proxy health. srv *http.Server + // instances is a list of all instances specified statically (e.g. as flags to the binary) + instances []string } // NewServer initializes a Server and exposes HTTP endpoints used to // communicate proxy health. -func NewServer(c *proxy.Client, port string) (*Server, error) { +func NewServer(c *proxy.Client, port string, staticInst []string) (*Server, error) { mux := http.NewServeMux() srv := &http.Server{ @@ -57,10 +59,11 @@ func NewServer(c *proxy.Client, port string) (*Server, error) { } hcServer := &Server{ - started: make(chan struct{}), - once: &sync.Once{}, - port: port, - srv: srv, + started: make(chan struct{}), + once: &sync.Once{}, + port: port, + srv: srv, + instances: staticInst, } mux.HandleFunc(startupPath, func(w http.ResponseWriter, _ *http.Request) { @@ -74,7 +77,9 @@ func NewServer(c *proxy.Client, port string) (*Server, error) { }) mux.HandleFunc(readinessPath, func(w http.ResponseWriter, _ *http.Request) { - if !isReady(c, hcServer) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if !isReady(ctx, c, hcServer) { w.WriteHeader(http.StatusServiceUnavailable) w.Write([]byte("error")) return @@ -100,7 +105,7 @@ func NewServer(c *proxy.Client, port string) (*Server, error) { go func() { if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) { - logging.Errorf("Failed to start health check HTTP server: %v", err) + logging.Errorf("[Health Check] Failed to serve: %v", err) } }() @@ -132,22 +137,51 @@ func isLive() bool { return true } -// isReady will check the following criteria before determining whether the -// proxy is ready for new connections. +// isReady will check the following criteria: // 1. Finished starting up / been sent the 'Ready for Connections' log. -// 2. Not yet hit the MaxConnections limit, if applicable. -func isReady(c *proxy.Client, s *Server) bool { - // Not ready until we reach the 'Ready for Connections' log +// 2. Not yet hit the MaxConnections limit, if set. +// 3. Able to dial all specified instances without error. +func isReady(ctx context.Context, c *proxy.Client, s *Server) bool { + // Not ready until we reach the 'Ready for Connections' log. if !s.proxyStarted() { - logging.Errorf("Readiness failed because proxy has not finished starting up.") + logging.Errorf("[Health Check] Readiness failed because proxy has not finished starting up.") return false } // Not ready if the proxy is at the optional MaxConnections limit. if !c.AvailableConn() { - logging.Errorf("Readiness failed because proxy has reached the maximum connections limit (%d).", c.MaxConnections) + logging.Errorf("[Health Check] Readiness failed because proxy has reached the maximum connections limit (%v).", c.MaxConnections) return false } - return true + // Not ready if one or more instances cannot be dialed. + instances := s.instances + if s.instances == nil { // Proxy is in fuse mode. + instances = c.GetInstances() + } + + canDial := true + var once sync.Once + var wg sync.WaitGroup + + for _, inst := range instances { + wg.Add(1) + go func(inst string) { + defer wg.Done() + conn, err := c.DialContext(ctx, inst) + if err != nil { + logging.Errorf("[Health Check] Readiness failed because proxy couldn't connect to %q: %v", inst, err) + once.Do(func() { canDial = false }) + return + } + + err = conn.Close() + if err != nil { + logging.Errorf("[Health Check] Readiness: error while closing connection: %v", err) + } + }(inst) + } + wg.Wait() + + return canDial } diff --git a/cmd/cloud_sql_proxy/internal/healthcheck/healthcheck_test.go b/cmd/cloud_sql_proxy/internal/healthcheck/healthcheck_test.go index f0b2a8d3f..9e08143a4 100644 --- a/cmd/cloud_sql_proxy/internal/healthcheck/healthcheck_test.go +++ b/cmd/cloud_sql_proxy/internal/healthcheck/healthcheck_test.go @@ -16,8 +16,13 @@ package healthcheck_test import ( "context" + "crypto/tls" + "crypto/x509" + "errors" + "net" "net/http" "testing" + "time" "github.com/GoogleCloudPlatform/cloudsql-proxy/cmd/cloud_sql_proxy/internal/healthcheck" "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy" @@ -30,9 +35,23 @@ const ( testPort = "8090" ) +type fakeCertSource struct{} + +func (cs *fakeCertSource) Local(instance string) (tls.Certificate, error) { + return tls.Certificate{ + Leaf: &x509.Certificate{ + NotAfter: time.Date(9999, 0, 0, 0, 0, 0, 0, time.UTC), + }, + }, nil +} + +func (cs *fakeCertSource) Remote(instance string) (cert *x509.Certificate, addr, name, version string, err error) { + return &x509.Certificate{}, "fake address", "fake name", "fake version", nil +} + // Test to verify that when the proxy client is up, the liveness endpoint writes http.StatusOK. func TestLiveness(t *testing.T) { - s, err := healthcheck.NewServer(&proxy.Client{}, testPort) + s, err := healthcheck.NewServer(&proxy.Client{}, testPort, nil) if err != nil { t.Fatalf("Could not initialize health check: %v", err) } @@ -43,62 +62,62 @@ func TestLiveness(t *testing.T) { t.Fatalf("HTTP GET failed: %v", err) } if resp.StatusCode != http.StatusOK { - t.Errorf("Got status code %v instead of %v", resp.StatusCode, http.StatusOK) + t.Errorf("want %v, got %v", http.StatusOK, resp.StatusCode) } } -// Test to verify that when startup has NOT finished, the startup and readiness endpoints write -// http.StatusServiceUnavailable. -func TestStartupFail(t *testing.T) { - s, err := healthcheck.NewServer(&proxy.Client{}, testPort) +// Test to verify that when startup HAS finished (and MaxConnections limit not specified), +// the startup and readiness endpoints write http.StatusOK. +func TestStartupPass(t *testing.T) { + s, err := healthcheck.NewServer(&proxy.Client{}, testPort, nil) if err != nil { - t.Fatalf("Could not initialize health check: %v\n", err) + t.Fatalf("Could not initialize health check: %v", err) } defer s.Close(context.Background()) + // Simulate the proxy client completing startup. + s.NotifyStarted() + resp, err := http.Get("http://localhost:" + testPort + startupPath) if err != nil { - t.Fatalf("HTTP GET failed: %v\n", err) + t.Fatalf("HTTP GET failed: %v", err) } - if resp.StatusCode != http.StatusServiceUnavailable { - t.Errorf("%v returned status code %v instead of %v", startupPath, resp.StatusCode, http.StatusServiceUnavailable) + if resp.StatusCode != http.StatusOK { + t.Errorf("%v: want %v, got %v", startupPath, http.StatusOK, resp.StatusCode) } resp, err = http.Get("http://localhost:" + testPort + readinessPath) if err != nil { - t.Fatalf("HTTP GET failed: %v\n", err) + t.Fatalf("HTTP GET failed: %v", err) } - if resp.StatusCode != http.StatusServiceUnavailable { - t.Errorf("%v returned status code %v instead of %v", readinessPath, resp.StatusCode, http.StatusServiceUnavailable) + if resp.StatusCode != http.StatusOK { + t.Errorf("%v: want %v, got %v", readinessPath, http.StatusOK, resp.StatusCode) } } -// Test to verify that when startup HAS finished (and MaxConnections limit not specified), -// the startup and readiness endpoints write http.StatusOK. -func TestStartupPass(t *testing.T) { - s, err := healthcheck.NewServer(&proxy.Client{}, testPort) +// Test to verify that when startup has NOT finished, the startup and readiness endpoints write +// http.StatusServiceUnavailable. +func TestStartupFail(t *testing.T) { + s, err := healthcheck.NewServer(&proxy.Client{}, testPort, nil) if err != nil { - t.Fatalf("Could not initialize health check: %v\n", err) + t.Fatalf("Could not initialize health check: %v", err) } defer s.Close(context.Background()) - // Simulate the proxy client completing startup. - s.NotifyStarted() - resp, err := http.Get("http://localhost:" + testPort + startupPath) if err != nil { - t.Fatalf("HTTP GET failed: %v\n", err) + t.Fatalf("HTTP GET failed: %v", err) } - if resp.StatusCode != http.StatusOK { - t.Errorf("%v returned status code %v instead of %v", startupPath, resp.StatusCode, http.StatusOK) + if resp.StatusCode != http.StatusServiceUnavailable { + t.Errorf("%v: want %v, got %v", startupPath, http.StatusOK, resp.StatusCode) } resp, err = http.Get("http://localhost:" + testPort + readinessPath) if err != nil { - t.Fatalf("HTTP GET failed: %v\n", err) + t.Fatalf("HTTP GET failed: %v", err) } - if resp.StatusCode != http.StatusOK { - t.Errorf("%v returned status code %v instead of %v", readinessPath, resp.StatusCode, http.StatusOK) + if resp.StatusCode != http.StatusServiceUnavailable { + t.Errorf("%v: want %v, got %v", readinessPath, http.StatusOK, resp.StatusCode) } } @@ -108,7 +127,7 @@ func TestMaxConnectionsReached(t *testing.T) { c := &proxy.Client{ MaxConnections: 1, } - s, err := healthcheck.NewServer(c, testPort) + s, err := healthcheck.NewServer(c, testPort, nil) if err != nil { t.Fatalf("Could not initialize health check: %v", err) } @@ -122,14 +141,51 @@ func TestMaxConnectionsReached(t *testing.T) { t.Fatalf("HTTP GET failed: %v", err) } if resp.StatusCode != http.StatusServiceUnavailable { - t.Errorf("Got status code %v instead of %v", resp.StatusCode, http.StatusServiceUnavailable) + t.Errorf("want %v, got %v", http.StatusServiceUnavailable, resp.StatusCode) + } +} + +// Test to verify that when dialing instance(s) returns an error, the readiness endpoint +// writes http.StatusServiceUnavailable. +func TestDialFail(t *testing.T) { + tests := map[string]struct { + insts []string + }{ + "Single instance": {insts: []string{"project:region:instance"}}, + "Multiple instances": {insts: []string{"project:region:instance-1", "project:region:instance-2", "project:region:instance-3"}}, + } + + c := &proxy.Client{ + Certs: &fakeCertSource{}, + Dialer: func(string, string) (net.Conn, error) { + return nil, errors.New("error") + }, + } + + for name, test := range tests { + func() { + s, err := healthcheck.NewServer(c, testPort, test.insts) + if err != nil { + t.Fatalf("%v: Could not initialize health check: %v", name, err) + } + defer s.Close(context.Background()) + s.NotifyStarted() + + resp, err := http.Get("http://localhost:" + testPort + readinessPath) + if err != nil { + t.Fatalf("%v: HTTP GET failed: %v", name, err) + } + if resp.StatusCode != http.StatusServiceUnavailable { + t.Errorf("want %v, got %v", http.StatusServiceUnavailable, resp.StatusCode) + } + }() } } // Test to verify that after closing a healthcheck, its liveness endpoint serves // an error. func TestCloseHealthCheck(t *testing.T) { - s, err := healthcheck.NewServer(&proxy.Client{}, testPort) + s, err := healthcheck.NewServer(&proxy.Client{}, testPort, nil) if err != nil { t.Fatalf("Could not initialize health check: %v", err) } @@ -140,7 +196,7 @@ func TestCloseHealthCheck(t *testing.T) { t.Fatalf("HTTP GET failed: %v", err) } if resp.StatusCode != http.StatusOK { - t.Errorf("Got status code %v instead of %v", resp.StatusCode, http.StatusOK) + t.Errorf("want %v, got %v", http.StatusOK, resp.StatusCode) } err = s.Close(context.Background()) diff --git a/proxy/proxy/client.go b/proxy/proxy/client.go index b2359e7f5..a8f204907 100644 --- a/proxy/proxy/client.go +++ b/proxy/proxy/client.go @@ -556,6 +556,19 @@ func ParseInstanceConnectionName(instance string) (string, string, string, []str return proj, region, name, args, nil } +// GetInstances iterates through the client cache, returning a list of previously dialed +// instances. +func (c *Client) GetInstances() []string { + var insts []string + c.cacheL.Lock() + cfgCache := c.cfgCache + c.cacheL.Unlock() + for i := range cfgCache { + insts = append(insts, i) + } + return insts +} + // AvailableConn returns false if MaxConnections has been reached, true otherwise. // When MaxConnections is 0, there is no limit. func (c *Client) AvailableConn() bool { diff --git a/proxy/proxy/client_test.go b/proxy/proxy/client_test.go index 11d270fa3..54fecf33e 100644 --- a/proxy/proxy/client_test.go +++ b/proxy/proxy/client_test.go @@ -29,7 +29,7 @@ import ( "unsafe" ) -const instance = "instance-name" +const instance = "project:region:instance" var ( sentinelError = errors.New("sentinel error") diff --git a/tests/alldb_test.go b/tests/alldb_test.go new file mode 100644 index 000000000..4fea9b568 --- /dev/null +++ b/tests/alldb_test.go @@ -0,0 +1,67 @@ +// Copyright 2021 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// alldb_test.go contains end to end tests that require all environment variables to be defined. +package tests + +import ( + "context" + "fmt" + "net/http" + "testing" +) + +// requireAllVars skips the given test if at least one environment variable is undefined. +func requireAllVars(t *testing.T) { + var allVars []string + allVars = append(allVars, *mysqlConnName, *mysqlUser, *mysqlPass, *mysqlDb) + allVars = append(allVars, *postgresConnName, *postgresUser, *postgresPass, *postgresDb) + allVars = append(allVars, *sqlserverConnName, *sqlserverUser, *sqlserverPass, *sqlserverDb) + + for _, envVar := range allVars { + if envVar == "" { + t.Skip("skipping test, all environment variable must be defined") + } + } +} + +// Test to verify that when a proxy client serves multiple instances that can all be successfully dialed, +// the health check readiness endpoint serves http.StatusOK. +func TestMultiInstanceDial(t *testing.T) { + requireAllVars(t) + ctx := context.Background() + + var args []string + args = append(args, fmt.Sprintf("-instances=%s=tcp:%d,%s=tcp:%d,%s=tcp:%d", *mysqlConnName, mysqlPort, *postgresConnName, postgresPort, *sqlserverConnName, sqlserverPort)) + args = append(args, "-use_http_health_check") + + // Start the proxy. + p, err := StartProxy(ctx, args...) + if err != nil { + t.Fatalf("unable to start proxy: %v", err) + } + defer p.Close() + output, err := p.WaitForServe(ctx) + if err != nil { + t.Fatalf("unable to verify proxy was serving: %s \n %s", err, output) + } + + resp, err := http.Get("http://localhost:" + testPort + readinessPath) + if err != nil { + t.Fatalf("HTTP GET failed: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("want %v, got %v", http.StatusOK, resp.StatusCode) + } +} diff --git a/tests/healthcheck_test.go b/tests/healthcheck_test.go new file mode 100644 index 000000000..2616b2c98 --- /dev/null +++ b/tests/healthcheck_test.go @@ -0,0 +1,56 @@ +// Copyright 2021 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// healthcheck_test.go provides some helpers for end to end health check server tests. +package tests + +import ( + "context" + "fmt" + "net/http" + "testing" +) + +const ( + readinessPath = "/readiness" + testPort = "8090" +) + +// singleInstanceDial verifies that when a proxy client serves the given instance, the readiness +// endpoint serves http.StatusOK. +func singleInstanceDial(t *testing.T, connName string, port int) { + ctx := context.Background() + + var args []string + args = append(args, fmt.Sprintf("-instances=%s=tcp:%d", connName, port), "-use_http_health_check") + + // Start the proxy. + p, err := StartProxy(ctx, args...) + if err != nil { + t.Fatalf("unable to start proxy: %v", err) + } + defer p.Close() + output, err := p.WaitForServe(ctx) + if err != nil { + t.Fatalf("unable to verify proxy was serving: %s \n %s", err, output) + } + + resp, err := http.Get("http://localhost:" + testPort + readinessPath) + if err != nil { + t.Fatalf("HTTP GET failed: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("want %v, got %v", http.StatusOK, resp.StatusCode) + } +} diff --git a/tests/mysql_test.go b/tests/mysql_test.go index 79484ddc9..f3957e487 100644 --- a/tests/mysql_test.go +++ b/tests/mysql_test.go @@ -95,3 +95,14 @@ func TestMysqlConnLimit(t *testing.T) { } proxyConnLimitTest(t, *mysqlConnName, "mysql", cfg.FormatDSN(), mysqlPort) } + +// Test to verify that when a proxy client serves one mysql instance that can be +// dialed successfully, the health check readiness endpoint serves http.StatusOK. +func TestMysqlDial(t *testing.T) { + switch "" { + case *mysqlConnName: + t.Fatal("'mysql_conn_name' not set") + } + + singleInstanceDial(t, *mysqlConnName, mysqlPort) +} diff --git a/tests/postgres_test.go b/tests/postgres_test.go index c766f92ee..881ce09ae 100644 --- a/tests/postgres_test.go +++ b/tests/postgres_test.go @@ -136,3 +136,14 @@ func TestPostgresHook(t *testing.T) { t.Fatalf("query failed: %s", err) } } + +// Test to verify that when a proxy client serves one postgres instance that can be +// dialed successfully, the health check readiness endpoint serves http.StatusOK. +func TestPostgresDial(t *testing.T) { + switch "" { + case *postgresConnName: + t.Fatal("'postgres_conn_name' not set") + } + + singleInstanceDial(t, *postgresConnName, postgresPort) +} diff --git a/tests/sqlserver_test.go b/tests/sqlserver_test.go index 5c2c58524..837f379c2 100644 --- a/tests/sqlserver_test.go +++ b/tests/sqlserver_test.go @@ -61,3 +61,14 @@ func TestSqlserverConnLimit(t *testing.T) { dsn := fmt.Sprintf("sqlserver://%s:%s@127.0.0.1:%d/%s", *sqlserverUser, *sqlserverPass, sqlserverPort, *sqlserverDb) proxyConnLimitTest(t, *sqlserverConnName, "sqlserver", dsn, sqlserverPort) } + +// Test to verify that when a proxy client serves one sqlserver instance that can be +// dialed successfully, the health check readiness endpoint serves http.StatusOK. +func TestSqlserverDial(t *testing.T) { + switch "" { + case *sqlserverConnName: + t.Fatal("'sqlserver_conn_name' not set") + } + + singleInstanceDial(t, *sqlserverConnName, sqlserverPort) +}