Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
dc5ae8b
Add instance dialing to health check
tifftoff Aug 5, 2021
9c82b43
add flag to skip these tests
tifftoff Aug 6, 2021
80d3e26
add healthcheck end to end test environment variables
tifftoff Aug 6, 2021
563f633
change instance name format
tifftoff Aug 6, 2021
5bf0f83
remove new env variables, use old env variables
tifftoff Aug 6, 2021
759baf9
requested changes
tifftoff Aug 6, 2021
6988576
convert dialing tests into table driven tests
tifftoff Aug 6, 2021
952936a
remove old example yaml file
tifftoff Aug 6, 2021
dde5234
remove build tags
tifftoff Aug 6, 2021
75d078f
Formatting
monazhn Aug 9, 2021
e81703c
Fix rebase
monazhn Aug 9, 2021
989d6d2
revert to separate tests
tifftoff Aug 6, 2021
58bc735
add check for if context is cancelled
tifftoff Aug 9, 2021
7785530
drop diff
tifftoff Aug 9, 2021
ff6c4e1
remove extra line
tifftoff Aug 9, 2021
09f76ab
add sql server to multi instance dial test
tifftoff Aug 10, 2021
6ff62f1
fix error messages
tifftoff Aug 10, 2021
5ea8df9
add single instance dialing tests for postgres and sql
tifftoff Aug 10, 2021
7f82670
reduce logging
tifftoff Aug 10, 2021
ae6cb39
add readiness indicator to error log
tifftoff Aug 10, 2021
eb52499
dial instances asynchronously
tifftoff Aug 10, 2021
48adf5f
change Dial to DialContext
tifftoff Aug 10, 2021
48a2422
add WaitGroup to wait for asynchronous dials to finish
tifftoff Aug 10, 2021
21fae02
add mutex to protect canDial
tifftoff Aug 10, 2021
ff9679f
Merge branch 'main' into my-cool-branch
enocom Aug 10, 2021
83af920
fix WaitGroup usage for asynchronous dialing
tifftoff Aug 10, 2021
348c5b7
move single isntance dialing tests
tifftoff Aug 11, 2021
10d9444
move TestMultiInstanceDial, add function to skip tests
tifftoff Aug 11, 2021
829c80e
remove unused helper function
tifftoff Aug 11, 2021
2224f1a
requested changes
tifftoff Aug 11, 2021
7f4ae7a
Merge branch 'main' into my-cool-branch
tifftoff Aug 11, 2021
5918679
remove extra line
tifftoff Aug 11, 2021
749ad88
use early return and defer
tifftoff Aug 11, 2021
911e79b
Merge branch 'main' into my-cool-branch
enocom Aug 11, 2021
cfd5259
whitespace
tifftoff Aug 11, 2021
afc1a89
use once instead of mutex
tifftoff Aug 12, 2021
11d48d8
have incoming request to readiness create a context
tifftoff Aug 12, 2021
1926e75
replace exit with return
tifftoff Aug 12, 2021
59ad5ee
use StartProxy
tifftoff Aug 12, 2021
126a87d
make ctx first argument
tifftoff Aug 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cmd/cloud_sql_proxy/cloud_sql_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,9 +570,14 @@ func runProxy() int {

var hc *healthcheck.Server
if *useHTTPHealthCheck {
hc, err = healthcheck.NewServer(proxyClient, *healthCheckPort)
// Extract a list of all instances specified statically. List is empty when in fuse mode.
var insts []string
for _, cfg := range cfgs {
insts = append(insts, cfg.Instance)
}
hc, err = healthcheck.NewServer(proxyClient, *healthCheckPort, insts)
if err != nil {
logging.Errorf("Could not initialize health check server: %v", err)
logging.Errorf("[Health Check] Could not initialize health check server: %v", err)
return 1
}
defer hc.Close(ctx)
Expand Down
64 changes: 49 additions & 15 deletions cmd/cloud_sql_proxy/internal/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ type Server struct {
port string
// srv is a pointer to the HTTP server used to communicate proxy health.
srv *http.Server
// instances is a list of all instances specified statically (e.g. as flags to the binary)
instances []string
}

// NewServer initializes a Server and exposes HTTP endpoints used to
// communicate proxy health.
func NewServer(c *proxy.Client, port string) (*Server, error) {
func NewServer(c *proxy.Client, port string, staticInst []string) (*Server, error) {
mux := http.NewServeMux()

srv := &http.Server{
Expand All @@ -57,10 +59,11 @@ func NewServer(c *proxy.Client, port string) (*Server, error) {
}

hcServer := &Server{
started: make(chan struct{}),
once: &sync.Once{},
port: port,
srv: srv,
started: make(chan struct{}),
once: &sync.Once{},
port: port,
srv: srv,
instances: staticInst,
}

mux.HandleFunc(startupPath, func(w http.ResponseWriter, _ *http.Request) {
Expand All @@ -74,7 +77,9 @@ func NewServer(c *proxy.Client, port string) (*Server, error) {
})

mux.HandleFunc(readinessPath, func(w http.ResponseWriter, _ *http.Request) {
if !isReady(c, hcServer) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if !isReady(ctx, c, hcServer) {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("error"))
return
Expand All @@ -100,7 +105,7 @@ func NewServer(c *proxy.Client, port string) (*Server, error) {

go func() {
if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
logging.Errorf("Failed to start health check HTTP server: %v", err)
logging.Errorf("[Health Check] Failed to serve: %v", err)
}
}()

Expand Down Expand Up @@ -132,22 +137,51 @@ func isLive() bool {
return true
}

// isReady will check the following criteria before determining whether the
// proxy is ready for new connections.
// isReady will check the following criteria:
// 1. Finished starting up / been sent the 'Ready for Connections' log.
// 2. Not yet hit the MaxConnections limit, if applicable.
func isReady(c *proxy.Client, s *Server) bool {
// Not ready until we reach the 'Ready for Connections' log
// 2. Not yet hit the MaxConnections limit, if set.
// 3. Able to dial all specified instances without error.
func isReady(ctx context.Context, c *proxy.Client, s *Server) bool {
// Not ready until we reach the 'Ready for Connections' log.
if !s.proxyStarted() {
logging.Errorf("Readiness failed because proxy has not finished starting up.")
logging.Errorf("[Health Check] Readiness failed because proxy has not finished starting up.")
return false
}

// Not ready if the proxy is at the optional MaxConnections limit.
if !c.AvailableConn() {
logging.Errorf("Readiness failed because proxy has reached the maximum connections limit (%d).", c.MaxConnections)
logging.Errorf("[Health Check] Readiness failed because proxy has reached the maximum connections limit (%v).", c.MaxConnections)
return false
}

return true
// Not ready if one or more instances cannot be dialed.
instances := s.instances
if s.instances == nil { // Proxy is in fuse mode.
instances = c.GetInstances()
}

canDial := true
var once sync.Once
var wg sync.WaitGroup

for _, inst := range instances {
wg.Add(1)
go func(inst string) {
defer wg.Done()
conn, err := c.DialContext(ctx, inst)
if err != nil {
logging.Errorf("[Health Check] Readiness failed because proxy couldn't connect to %q: %v", inst, err)
once.Do(func() { canDial = false })
return
}

err = conn.Close()
if err != nil {
logging.Errorf("[Health Check] Readiness: error while closing connection: %v", err)
}
}(inst)
}
wg.Wait()

return canDial
}
118 changes: 87 additions & 31 deletions cmd/cloud_sql_proxy/internal/healthcheck/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ package healthcheck_test

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"net"
"net/http"
"testing"
"time"

"github.com/GoogleCloudPlatform/cloudsql-proxy/cmd/cloud_sql_proxy/internal/healthcheck"
"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy"
Expand All @@ -30,9 +35,23 @@ const (
testPort = "8090"
)

type fakeCertSource struct{}

func (cs *fakeCertSource) Local(instance string) (tls.Certificate, error) {
return tls.Certificate{
Leaf: &x509.Certificate{
NotAfter: time.Date(9999, 0, 0, 0, 0, 0, 0, time.UTC),
},
}, nil
}

func (cs *fakeCertSource) Remote(instance string) (cert *x509.Certificate, addr, name, version string, err error) {
return &x509.Certificate{}, "fake address", "fake name", "fake version", nil
}

// Test to verify that when the proxy client is up, the liveness endpoint writes http.StatusOK.
func TestLiveness(t *testing.T) {
s, err := healthcheck.NewServer(&proxy.Client{}, testPort)
s, err := healthcheck.NewServer(&proxy.Client{}, testPort, nil)
if err != nil {
t.Fatalf("Could not initialize health check: %v", err)
}
Expand All @@ -43,62 +62,62 @@ func TestLiveness(t *testing.T) {
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("Got status code %v instead of %v", resp.StatusCode, http.StatusOK)
t.Errorf("want %v, got %v", http.StatusOK, resp.StatusCode)
}
}

// Test to verify that when startup has NOT finished, the startup and readiness endpoints write
// http.StatusServiceUnavailable.
func TestStartupFail(t *testing.T) {
s, err := healthcheck.NewServer(&proxy.Client{}, testPort)
// Test to verify that when startup HAS finished (and MaxConnections limit not specified),
// the startup and readiness endpoints write http.StatusOK.
func TestStartupPass(t *testing.T) {
s, err := healthcheck.NewServer(&proxy.Client{}, testPort, nil)
if err != nil {
t.Fatalf("Could not initialize health check: %v\n", err)
t.Fatalf("Could not initialize health check: %v", err)
}
defer s.Close(context.Background())

// Simulate the proxy client completing startup.
s.NotifyStarted()

resp, err := http.Get("http://localhost:" + testPort + startupPath)
if err != nil {
t.Fatalf("HTTP GET failed: %v\n", err)
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusServiceUnavailable {
t.Errorf("%v returned status code %v instead of %v", startupPath, resp.StatusCode, http.StatusServiceUnavailable)
if resp.StatusCode != http.StatusOK {
t.Errorf("%v: want %v, got %v", startupPath, http.StatusOK, resp.StatusCode)
}

resp, err = http.Get("http://localhost:" + testPort + readinessPath)
if err != nil {
t.Fatalf("HTTP GET failed: %v\n", err)
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusServiceUnavailable {
t.Errorf("%v returned status code %v instead of %v", readinessPath, resp.StatusCode, http.StatusServiceUnavailable)
if resp.StatusCode != http.StatusOK {
t.Errorf("%v: want %v, got %v", readinessPath, http.StatusOK, resp.StatusCode)
}
}

// Test to verify that when startup HAS finished (and MaxConnections limit not specified),
// the startup and readiness endpoints write http.StatusOK.
func TestStartupPass(t *testing.T) {
s, err := healthcheck.NewServer(&proxy.Client{}, testPort)
// Test to verify that when startup has NOT finished, the startup and readiness endpoints write
// http.StatusServiceUnavailable.
func TestStartupFail(t *testing.T) {
s, err := healthcheck.NewServer(&proxy.Client{}, testPort, nil)
if err != nil {
t.Fatalf("Could not initialize health check: %v\n", err)
t.Fatalf("Could not initialize health check: %v", err)
}
defer s.Close(context.Background())

// Simulate the proxy client completing startup.
s.NotifyStarted()

resp, err := http.Get("http://localhost:" + testPort + startupPath)
if err != nil {
t.Fatalf("HTTP GET failed: %v\n", err)
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("%v returned status code %v instead of %v", startupPath, resp.StatusCode, http.StatusOK)
if resp.StatusCode != http.StatusServiceUnavailable {
t.Errorf("%v: want %v, got %v", startupPath, http.StatusOK, resp.StatusCode)
}

resp, err = http.Get("http://localhost:" + testPort + readinessPath)
if err != nil {
t.Fatalf("HTTP GET failed: %v\n", err)
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("%v returned status code %v instead of %v", readinessPath, resp.StatusCode, http.StatusOK)
if resp.StatusCode != http.StatusServiceUnavailable {
t.Errorf("%v: want %v, got %v", readinessPath, http.StatusOK, resp.StatusCode)
}
}

Expand All @@ -108,7 +127,7 @@ func TestMaxConnectionsReached(t *testing.T) {
c := &proxy.Client{
MaxConnections: 1,
}
s, err := healthcheck.NewServer(c, testPort)
s, err := healthcheck.NewServer(c, testPort, nil)
if err != nil {
t.Fatalf("Could not initialize health check: %v", err)
}
Expand All @@ -122,14 +141,51 @@ func TestMaxConnectionsReached(t *testing.T) {
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusServiceUnavailable {
t.Errorf("Got status code %v instead of %v", resp.StatusCode, http.StatusServiceUnavailable)
t.Errorf("want %v, got %v", http.StatusServiceUnavailable, resp.StatusCode)
}
}

// Test to verify that when dialing instance(s) returns an error, the readiness endpoint
// writes http.StatusServiceUnavailable.
func TestDialFail(t *testing.T) {
tests := map[string]struct {
insts []string
}{
"Single instance": {insts: []string{"project:region:instance"}},
"Multiple instances": {insts: []string{"project:region:instance-1", "project:region:instance-2", "project:region:instance-3"}},
}

c := &proxy.Client{
Certs: &fakeCertSource{},
Dialer: func(string, string) (net.Conn, error) {
return nil, errors.New("error")
},
}

for name, test := range tests {
func() {
s, err := healthcheck.NewServer(c, testPort, test.insts)
if err != nil {
t.Fatalf("%v: Could not initialize health check: %v", name, err)
}
defer s.Close(context.Background())
s.NotifyStarted()

resp, err := http.Get("http://localhost:" + testPort + readinessPath)
if err != nil {
t.Fatalf("%v: HTTP GET failed: %v", name, err)
}
if resp.StatusCode != http.StatusServiceUnavailable {
t.Errorf("want %v, got %v", http.StatusServiceUnavailable, resp.StatusCode)
}
}()
}
}

// Test to verify that after closing a healthcheck, its liveness endpoint serves
// an error.
func TestCloseHealthCheck(t *testing.T) {
s, err := healthcheck.NewServer(&proxy.Client{}, testPort)
s, err := healthcheck.NewServer(&proxy.Client{}, testPort, nil)
if err != nil {
t.Fatalf("Could not initialize health check: %v", err)
}
Expand All @@ -140,7 +196,7 @@ func TestCloseHealthCheck(t *testing.T) {
t.Fatalf("HTTP GET failed: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("Got status code %v instead of %v", resp.StatusCode, http.StatusOK)
t.Errorf("want %v, got %v", http.StatusOK, resp.StatusCode)
}

err = s.Close(context.Background())
Expand Down
13 changes: 13 additions & 0 deletions proxy/proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,19 @@ func ParseInstanceConnectionName(instance string) (string, string, string, []str
return proj, region, name, args, nil
}

// GetInstances iterates through the client cache, returning a list of previously dialed
// instances.
func (c *Client) GetInstances() []string {
var insts []string
c.cacheL.Lock()
cfgCache := c.cfgCache
c.cacheL.Unlock()
for i := range cfgCache {
insts = append(insts, i)
}
return insts
}

// AvailableConn returns false if MaxConnections has been reached, true otherwise.
// When MaxConnections is 0, there is no limit.
func (c *Client) AvailableConn() bool {
Expand Down
2 changes: 1 addition & 1 deletion proxy/proxy/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"unsafe"
)

const instance = "instance-name"
const instance = "project:region:instance"

var (
sentinelError = errors.New("sentinel error")
Expand Down
Loading