Skip to content

Commit

Permalink
Merge branch 'master' into 1.8_http2_push
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo authored Nov 21, 2016
2 parents 03f938e + 9d398ad commit aa3adac
Show file tree
Hide file tree
Showing 16 changed files with 521 additions and 79 deletions.
60 changes: 50 additions & 10 deletions caddyhttp/fastcgi/dialer.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,39 @@
package fastcgi

import "sync"
import (
"errors"
"sync"
"sync/atomic"
"time"
)

type dialer interface {
Dial() (*FCGIClient, error)
Close(*FCGIClient) error
Dial() (Client, error)
Close(Client) error
}

// basicDialer is a basic dialer that wraps default fcgi functions.
type basicDialer struct {
network, address string
network string
address string
timeout time.Duration
}

func (b basicDialer) Dial() (*FCGIClient, error) { return Dial(b.network, b.address) }
func (b basicDialer) Close(c *FCGIClient) error { return c.Close() }
func (b basicDialer) Dial() (Client, error) { return Dial(b.network, b.address, b.timeout) }
func (b basicDialer) Close(c Client) error { return c.Close() }

// persistentDialer keeps a pool of fcgi connections.
// connections are not closed after use, rather added back to the pool for reuse.
type persistentDialer struct {
size int
network string
address string
pool []*FCGIClient
timeout time.Duration
pool []Client
sync.Mutex
}

func (p *persistentDialer) Dial() (*FCGIClient, error) {
func (p *persistentDialer) Dial() (Client, error) {
p.Lock()
// connection is available, return first one.
if len(p.pool) > 0 {
Expand All @@ -39,10 +47,10 @@ func (p *persistentDialer) Dial() (*FCGIClient, error) {
p.Unlock()

// no connection available, create new one
return Dial(p.network, p.address)
return Dial(p.network, p.address, p.timeout)
}

func (p *persistentDialer) Close(client *FCGIClient) error {
func (p *persistentDialer) Close(client Client) error {
p.Lock()
if len(p.pool) < p.size {
// pool is not full yet, add connection for reuse
Expand All @@ -57,3 +65,35 @@ func (p *persistentDialer) Close(client *FCGIClient) error {
// otherwise, close the connection.
return client.Close()
}

type loadBalancingDialer struct {
dialers []dialer
current int64
}

func (m *loadBalancingDialer) Dial() (Client, error) {
nextDialerIndex := atomic.AddInt64(&m.current, 1) % int64(len(m.dialers))
currentDialer := m.dialers[nextDialerIndex]

client, err := currentDialer.Dial()

if err != nil {
return nil, err
}

return &dialerAwareClient{Client: client, dialer: currentDialer}, nil
}

func (m *loadBalancingDialer) Close(c Client) error {
// Close the client according to dialer behaviour
if da, ok := c.(*dialerAwareClient); ok {
return da.dialer.Close(c)
}

return errors.New("Cannot close client")
}

type dialerAwareClient struct {
Client
dialer dialer
}
126 changes: 126 additions & 0 deletions caddyhttp/fastcgi/dialer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package fastcgi

import (
"errors"
"testing"
)

func TestLoadbalancingDialer(t *testing.T) {
// given
runs := 100
mockDialer1 := new(mockDialer)
mockDialer2 := new(mockDialer)

dialer := &loadBalancingDialer{dialers: []dialer{mockDialer1, mockDialer2}}

// when
for i := 0; i < runs; i++ {
client, err := dialer.Dial()
dialer.Close(client)

if err != nil {
t.Errorf("Expected error to be nil")
}
}

// then
if mockDialer1.dialCalled != mockDialer2.dialCalled && mockDialer1.dialCalled != 50 {
t.Errorf("Expected dialer to call Dial() on multiple backend dialers %d times [actual: %d, %d]", 50, mockDialer1.dialCalled, mockDialer2.dialCalled)
}

if mockDialer1.closeCalled != mockDialer2.closeCalled && mockDialer1.closeCalled != 50 {
t.Errorf("Expected dialer to call Close() on multiple backend dialers %d times [actual: %d, %d]", 50, mockDialer1.closeCalled, mockDialer2.closeCalled)
}
}

func TestLoadBalancingDialerShouldReturnDialerAwareClient(t *testing.T) {
// given
mockDialer1 := new(mockDialer)
dialer := &loadBalancingDialer{dialers: []dialer{mockDialer1}}

// when
client, err := dialer.Dial()

// then
if err != nil {
t.Errorf("Expected error to be nil")
}

if awareClient, ok := client.(*dialerAwareClient); !ok {
t.Error("Expected dialer to wrap client")
} else {
if awareClient.dialer != mockDialer1 {
t.Error("Expected wrapped client to have reference to dialer")
}
}
}

func TestLoadBalancingDialerShouldUnderlyingReturnDialerError(t *testing.T) {
// given
mockDialer1 := new(errorReturningDialer)
dialer := &loadBalancingDialer{dialers: []dialer{mockDialer1}}

// when
_, err := dialer.Dial()

// then
if err.Error() != "Error during dial" {
t.Errorf("Expected 'Error during dial', got: '%s'", err.Error())
}
}

func TestLoadBalancingDialerShouldCloseClient(t *testing.T) {
// given
mockDialer1 := new(mockDialer)
mockDialer2 := new(mockDialer)

dialer := &loadBalancingDialer{dialers: []dialer{mockDialer1, mockDialer2}}
client, _ := dialer.Dial()

// when
err := dialer.Close(client)

// then
if err != nil {
t.Error("Expected error not to occur")
}

// load balancing starts from index 1
if mockDialer2.client != client {
t.Errorf("Expected Close() to be called on referenced dialer")
}
}

type mockDialer struct {
dialCalled int
closeCalled int
client Client
}

type mockClient struct {
Client
}

func (m *mockDialer) Dial() (Client, error) {
m.dialCalled++
return mockClient{Client: &FCGIClient{}}, nil
}

func (m *mockDialer) Close(c Client) error {
m.client = c
m.closeCalled++
return nil
}

type errorReturningDialer struct {
client Client
}

func (m *errorReturningDialer) Dial() (Client, error) {
return mockClient{Client: &FCGIClient{}}, errors.New("Error during dial")
}

func (m *errorReturningDialer) Close(c Client) error {
m.client = c
return errors.New("Error during close")
}
9 changes: 7 additions & 2 deletions caddyhttp/fastcgi/fastcgi.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/mholt/caddy/caddyhttp/httpserver"
)
Expand Down Expand Up @@ -81,6 +82,7 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error)
if err != nil {
return http.StatusBadGateway, err
}
fcgiBackend.SetReadTimeout(rule.ReadTimeout)

var resp *http.Response
contentLength, _ := strconv.Atoi(r.Header.Get("Content-Length"))
Expand Down Expand Up @@ -111,9 +113,9 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error)
defer rule.dialer.Close(fcgiBackend)

// Log any stderr output from upstream
if fcgiBackend.stderr.Len() != 0 {
if stderr := fcgiBackend.StdErr(); stderr.Len() != 0 {
// Remove trailing newline, error logger already does this.
err = LogError(strings.TrimSuffix(fcgiBackend.stderr.String(), "\n"))
err = LogError(strings.TrimSuffix(stderr.String(), "\n"))
}

// Normally we would return the status code if it is an error status (>= 400),
Expand Down Expand Up @@ -301,6 +303,9 @@ type Rule struct {
// Ignored paths
IgnoredSubPaths []string

// The duration used to set a deadline when reading from the FastCGI server.
ReadTimeout time.Duration

// FCGI dialer
dialer dialer
}
Expand Down
47 changes: 45 additions & 2 deletions caddyhttp/fastcgi/fastcgi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"sync"
"testing"
"time"
)

func TestServeHTTP(t *testing.T) {
Expand All @@ -28,8 +29,14 @@ func TestServeHTTP(t *testing.T) {

network, address := parseAddress(listener.Addr().String())
handler := Handler{
Next: nil,
Rules: []Rule{{Path: "/", Address: listener.Addr().String(), dialer: basicDialer{network, address}}},
Next: nil,
Rules: []Rule{
{
Path: "/",
Address: listener.Addr().String(),
dialer: basicDialer{network: network, address: address},
},
},
}
r, err := http.NewRequest("GET", "/", nil)
if err != nil {
Expand Down Expand Up @@ -318,3 +325,39 @@ func TestBuildEnv(t *testing.T) {
}

}

func TestReadTimeout(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Unable to create listener for test: %v", err)
}
defer listener.Close()
go fcgi.Serve(listener, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(time.Second * 1)
}))

network, address := parseAddress(listener.Addr().String())
handler := Handler{
Next: nil,
Rules: []Rule{
{
Path: "/",
Address: listener.Addr().String(),
dialer: basicDialer{network: network, address: address},
ReadTimeout: time.Millisecond * 100,
},
},
}
r, err := http.NewRequest("GET", "/", nil)
if err != nil {
t.Fatalf("Unable to create request: %v", err)
}
w := httptest.NewRecorder()

_, err = handler.ServeHTTP(w, r)
if err == nil {
t.Error("Expected i/o timeout error but had none")
} else if err, ok := err.(net.Error); !ok || !err.Timeout() {
t.Errorf("Expected i/o timeout error, got: '%s'", err.Error())
}
}
Loading

0 comments on commit aa3adac

Please sign in to comment.