Skip to content

Commit

Permalink
Merge pull request #464 from iamqizhao/master
Browse files Browse the repository at this point in the history
i) revise picker API and channel state API; ii) add unicastNamingPick to support custom name discovery.
  • Loading branch information
iamqizhao committed Dec 14, 2015
2 parents 3463c93 + f0ee562 commit 2d76f2f
Show file tree
Hide file tree
Showing 5 changed files with 448 additions and 76 deletions.
5 changes: 3 additions & 2 deletions call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
break
}
if err != nil {
t.Fatalf("Failed to receive the message from the client.")
return
}
if pf != compressionNone {
t.Fatalf("Received the mistaken message format %d, want %d", pf, compressionNone)
t.Errorf("Received the mistaken message format %d, want %d", pf, compressionNone)
return
}
var v string
codec := testCodec{}
Expand Down
108 changes: 70 additions & 38 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ func WithCodec(c Codec) DialOption {
}
}

func WithPicker(p Picker) DialOption {
return func(o *dialOptions) {
o.picker = p
}
}

// WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
// connection is up. Without this, Dial returns immediately and connecting the server
// happens in background.
Expand Down Expand Up @@ -154,7 +160,9 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
cc.dopts.codec = protoCodec{}
}
if cc.dopts.picker == nil {
cc.dopts.picker = &unicastPicker{}
cc.dopts.picker = &unicastPicker{
target: target,
}
}
if err := cc.dopts.picker.Init(cc); err != nil {
return nil, err
Expand Down Expand Up @@ -209,15 +217,15 @@ type ClientConn struct {

// State returns the connectivity state of cc.
// This is EXPERIMENTAL API.
func (cc *ClientConn) State() ConnectivityState {
func (cc *ClientConn) State() (ConnectivityState, error) {
return cc.dopts.picker.State()
}

// WaitForStateChange blocks until the state changes to something other than the sourceState
// or timeout fires on cc. It returns false if timeout fires, and true otherwise.
// WaitForStateChange blocks until the state changes to something other than the sourceState.
// It returns the new state or error.
// This is EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
return cc.dopts.picker.WaitForStateChange(timeout, sourceState)
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
return cc.dopts.picker.WaitForStateChange(ctx, sourceState)
}

// Close starts to tear down the ClientConn.
Expand All @@ -229,6 +237,7 @@ func (cc *ClientConn) Close() error {
type Conn struct {
target string
dopts dialOptions
resetChan chan int
shutdownChan chan struct{}
events trace.EventLog

Expand All @@ -249,6 +258,7 @@ func NewConn(cc *ClientConn) (*Conn, error) {
c := &Conn{
target: cc.target,
dopts: cc.dopts,
resetChan: make(chan int, 1),
shutdownChan: make(chan struct{}),
}
if EnableTracing {
Expand Down Expand Up @@ -317,26 +327,20 @@ func (cc *Conn) State() ConnectivityState {
return cc.state
}

// WaitForStateChange blocks until the state changes to something other than the sourceState
// or timeout fires. It returns false if timeout fires and true otherwise.
// TODO(zhaoq): Rewrite for complex Picker.
func (cc *Conn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
start := time.Now()
// WaitForStateChange blocks until the state changes to something other than the sourceState.
func (cc *Conn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
cc.mu.Lock()
defer cc.mu.Unlock()
if sourceState != cc.state {
return true
}
expired := timeout <= time.Since(start)
if expired {
return false
return cc.state, nil
}
done := make(chan struct{})
var err error
go func() {
select {
case <-time.After(timeout - time.Since(start)):
case <-ctx.Done():
cc.mu.Lock()
expired = true
err = ctx.Err()
cc.stateCV.Broadcast()
cc.mu.Unlock()
case <-done:
Expand All @@ -345,11 +349,20 @@ func (cc *Conn) WaitForStateChange(timeout time.Duration, sourceState Connectivi
defer close(done)
for sourceState == cc.state {
cc.stateCV.Wait()
if expired {
return false
if err != nil {
return cc.state, err
}
}
return true
return cc.state, nil
}

// NotifyReset tries to signal the underlying transport needs to be reset due to
// for example a name resolution change in flight.
func (cc *Conn) NotifyReset() {
select {
case cc.resetChan <- 0:
default:
}
}

func (cc *Conn) resetTransport(closeTransport bool) error {
Expand Down Expand Up @@ -391,7 +404,11 @@ func (cc *Conn) resetTransport(closeTransport bool) error {
copts.Timeout = timeout
}
connectTime := time.Now()
newTransport, err := transport.NewClientTransport(cc.target, &copts)
addr, err := cc.dopts.picker.PickAddr()
var newTransport transport.ClientTransport
if err == nil {
newTransport, err = transport.NewClientTransport(addr, &copts)
}
if err != nil {
cc.mu.Lock()
if cc.state == Shutdown {
Expand Down Expand Up @@ -422,7 +439,7 @@ func (cc *Conn) resetTransport(closeTransport bool) error {
closeTransport = false
time.Sleep(sleepTime)
retries++
grpclog.Printf("grpc: ClientConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target)
grpclog.Printf("grpc: Conn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target)
continue
}
cc.mu.Lock()
Expand All @@ -445,6 +462,27 @@ func (cc *Conn) resetTransport(closeTransport bool) error {
}
}

func (cc *Conn) reconnect() bool {
cc.mu.Lock()
if cc.state == Shutdown {
// cc.Close() has been invoked.
cc.mu.Unlock()
return false
}
cc.state = TransientFailure
cc.stateCV.Broadcast()
cc.mu.Unlock()
if err := cc.resetTransport(true); err != nil {
// The ClientConn is closing.
cc.mu.Lock()
cc.printf("transport exiting: %v", err)
cc.mu.Unlock()
grpclog.Printf("grpc: Conn.transportMonitor exits due to: %v", err)
return false
}
return true
}

// Run in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
func (cc *Conn) transportMonitor() {
Expand All @@ -454,25 +492,19 @@ func (cc *Conn) transportMonitor() {
// the ClientConn is idle (i.e., no RPC in flight).
case <-cc.shutdownChan:
return
case <-cc.transport.Error():
cc.mu.Lock()
if cc.state == Shutdown {
// cc.Close() has been invoked.
cc.mu.Unlock()
case <-cc.resetChan:
if !cc.reconnect() {
return
}
cc.state = TransientFailure
cc.stateCV.Broadcast()
cc.mu.Unlock()
if err := cc.resetTransport(true); err != nil {
// The ClientConn is closing.
cc.mu.Lock()
cc.printf("transport exiting: %v", err)
cc.mu.Unlock()
grpclog.Printf("grpc: ClientConn.transportMonitor exits due to: %v", err)
case <-cc.transport.Error():
if !cc.reconnect() {
return
}
continue
// Tries to drain reset signal if there is any since it is out-dated.
select {
case <-cc.resetChan:
default:
}
}
}
}
Expand Down
Loading

0 comments on commit 2d76f2f

Please sign in to comment.