*: fix races from clientv3/integration tests #4876

Merged: 8 commits, Mar 29, 2016.
15 changes: 4 additions & 11 deletions clientv3/client.go
@@ -28,15 +28,12 @@ import (
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
-	"google.golang.org/grpc/grpclog"
 )
 
 var (
 	ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
 )
 
-type Logger grpclog.Logger
-
 // Client provides and manages an etcd v3 client session.
 type Client struct {
 	Cluster
@@ -54,8 +51,6 @@ type Client struct {
 
 	ctx    context.Context
 	cancel context.CancelFunc
-
-	logger Logger
 }
 
 // EndpointDialer is a policy for choosing which endpoint to dial next
@@ -190,13 +185,11 @@ func newClient(cfg *Config) (*Client, error) {
 	client.Watcher = NewWatcher(client)
 	client.Auth = NewAuth(client)
 	client.Maintenance = &maintenance{c: client}
-	if cfg.Logger == nil {
-		client.logger = log.New(ioutil.Discard, "", 0)
-		// disable client side grpc by default
-		grpclog.SetLogger(log.New(ioutil.Discard, "", 0))
+	if cfg.Logger != nil {
+		logger.Set(cfg.Logger)
 	} else {
-		client.logger = cfg.Logger
-		grpclog.SetLogger(cfg.Logger)
+		// disable client side grpc by default
+		logger.Set(log.New(ioutil.Discard, "", 0))
 	}
 
 	return client, nil
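For reference, a minimal sketch of how a caller drives the new logger plumbing through Config.Logger; the endpoint value is a placeholder and the sketch is illustrative, not code from this PR:

```go
package main

import (
	"io/ioutil"
	"log"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// A non-nil Config.Logger now goes through logger.Set, which also
	// routes client-side gRPC logging; nil keeps the discard default.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"127.0.0.1:2379"}, // placeholder endpoint
		Logger:    log.New(ioutil.Discard, "", 0),
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
}
```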
13 changes: 8 additions & 5 deletions clientv3/integration/watch_test.go
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"reflect"
 	"sort"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -379,17 +380,19 @@ func TestWatchWithProgressNotifyNoEvent(t *testing.T) { testWatchWithProgressNotify(t, false) }
 func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
 	defer testutil.AfterTest(t)
 
+	// accelerate report interval so test terminates quickly
+	oldpi := v3rpc.ProgressReportIntervalMilliseconds
+	// using atomics to avoid race warnings
+	atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, 3*1000)
+	pi := 3 * time.Second
+	defer func() { v3rpc.ProgressReportIntervalMilliseconds = oldpi }()
+
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
 	wc := clientv3.NewWatcher(clus.RandClient())
 	defer wc.Close()
 
-	testInterval := 3 * time.Second
-	pi := v3rpc.ProgressReportInterval
-	v3rpc.ProgressReportInterval = testInterval
-	defer func() { v3rpc.ProgressReportInterval = pi }()
-
 	opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
 	if watchOnPut {
 		opts = append(opts, clientv3.WithPrefix())
10 changes: 7 additions & 3 deletions clientv3/kv.go
@@ -183,14 +183,18 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
 }
 
 func (kv *kv) switchRemote(prevErr error) error {
+	// Usually it's a bad idea to lock on network i/o but here it's OK
+	// since the link is down and new requests can't be processed anyway.
+	// Likewise, if connecting stalls, closing the Client can break the
+	// lock via context cancelation.
+	kv.mu.Lock()
+	defer kv.mu.Unlock()
+
 	newConn, err := kv.c.retryConnection(kv.conn, prevErr)
 	if err != nil {
 		return err
 	}
 
-	kv.mu.Lock()
-	defer kv.mu.Unlock()
-
 	kv.conn = newConn
 	kv.remote = pb.NewKVClient(kv.conn)
 	return nil

Review thread on kv.mu.Lock():

Contributor: it is not a good idea to hold the lock while doing network io i think.

Author: I think it's OK here since switchRemote only happens on connection failure; IO would have to wait for a new connection anyway and canceling the client context can break the lock if it stalls.

Contributor: ok. add a comment for this?

Author: sure
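The locking trade-off described in the new comment can be sketched in isolation. Everything below (kvSketch, dialContext) is illustrative stand-in code, not etcd's API; the point is that the mutex is held across i/o only because the i/o itself is context-cancelable:

```go
package main

import (
	"errors"
	"sync"

	"golang.org/x/net/context"
)

type conn struct{ addr string }

type kvSketch struct {
	mu   sync.Mutex
	ctx  context.Context // canceled when the client closes
	conn *conn
}

// dialContext stands in for a blocking, context-aware reconnect.
func dialContext(ctx context.Context, addr string) (*conn, error) {
	select {
	case <-ctx.Done():
		return nil, errors.New("client closed while reconnecting")
	default:
		return &conn{addr: addr}, nil
	}
}

// switchRemote mirrors the locking shape above: the lock is held across
// network i/o, but a stalled dial is broken by canceling kv.ctx, which
// in turn unblocks anyone waiting on kv.mu behind this call.
func (kv *kvSketch) switchRemote(addr string) error {
	kv.mu.Lock()
	defer kv.mu.Unlock()

	newConn, err := dialContext(kv.ctx, addr)
	if err != nil {
		return err
	}
	kv.conn = newConn
	return nil
}

func main() {
	kv := &kvSketch{ctx: context.Background()}
	if err := kv.switchRemote("127.0.0.1:2379"); err != nil {
		panic(err)
	}
}
```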
64 changes: 64 additions & 0 deletions clientv3/logger.go
@@ -0,0 +1,64 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+	"log"
+	"os"
+	"sync"
+
+	"google.golang.org/grpc/grpclog"
+)
+
+type Logger grpclog.Logger
+
+var (
+	logger settableLogger
+)
+
+type settableLogger struct {
+	l  grpclog.Logger
+	mu sync.RWMutex
+}
+
+func init() {
+	// use go's standard logger by default like grpc
+	logger.mu.Lock()
+	logger.l = log.New(os.Stderr, "", log.LstdFlags)
+	grpclog.SetLogger(&logger)
+	logger.mu.Unlock()
+}
+
+func (s *settableLogger) Set(l Logger) {
+	s.mu.Lock()
+	logger.l = l
+	s.mu.Unlock()
+}
+
+func (s *settableLogger) Get() Logger {
+	s.mu.RLock()
+	l := logger.l
+	s.mu.RUnlock()
+	return l
+}
+
+// implement the grpclog.Logger interface
+
+func (s *settableLogger) Fatal(args ...interface{})                 { s.Get().Fatal(args...) }
+func (s *settableLogger) Fatalf(format string, args ...interface{}) { s.Get().Fatalf(format, args...) }
+func (s *settableLogger) Fatalln(args ...interface{})               { s.Get().Fatalln(args...) }
+func (s *settableLogger) Print(args ...interface{})                 { s.Get().Print(args...) }
+func (s *settableLogger) Printf(format string, args ...interface{}) { s.Get().Printf(format, args...) }
+func (s *settableLogger) Println(args ...interface{})               { s.Get().Println(args...) }

Review comment on grpclog.SetLogger(&logger):

Contributor: lgtm. defer to @xiang90
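The underlying problem is that grpclog.SetLogger is an unsynchronized global write, so calling it once per client races under the detector. Registering a single settableLogger in init and swapping only its inner field under an RWMutex fixes that. A self-contained sketch of the pattern (not etcd code) that stays clean under go run -race:

```go
package main

import (
	"io/ioutil"
	"log"
	"os"
	"sync"
)

// settable has the same shape as clientv3's settableLogger: one instance is
// registered as the process-wide logger, and only its inner field changes.
type settable struct {
	mu sync.RWMutex
	l  *log.Logger
}

func (s *settable) Set(l *log.Logger) {
	s.mu.Lock()
	s.l = l
	s.mu.Unlock()
}

func (s *settable) Printf(format string, args ...interface{}) {
	s.mu.RLock()
	l := s.l
	s.mu.RUnlock()
	l.Printf(format, args...)
}

func main() {
	s := &settable{l: log.New(os.Stderr, "", log.LstdFlags)}

	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 1000; i++ {
			s.Printf("message %d", i)
		}
	}()

	// Swapping the logger mid-stream is safe; without the mutex, the race
	// detector flags this write against the reads in the goroutine.
	s.Set(log.New(ioutil.Discard, "", 0))
	<-done
}
```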
18 changes: 13 additions & 5 deletions clientv3/watch.go
@@ -240,11 +240,11 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
 	w.streams[ws.id] = ws
 	w.mu.Unlock()
 
-	// send messages to subscriber
-	go w.serveStream(ws)
-
 	// pass back the subscriber channel for the watcher
 	pendingReq.retc <- ret
+
+	// send messages to subscriber
+	go w.serveStream(ws)
 }
 
 // closeStream closes the watcher resources and removes it
@@ -436,11 +436,15 @@ func (w *watcher) serveStream(ws *watcherStream) {
 			// TODO don't keep buffering if subscriber stops reading
 			wrs = append(wrs, wr)
 		case resumeRev := <-ws.resumec:
+			wrs = nil
+			resuming = true
+			if resumeRev == -1 {
+				// pause serving stream while resume gets set up
+				break
+			}
 			if resumeRev != ws.lastRev {
 				panic("unexpected resume revision")
 			}
-			wrs = nil
-			resuming = true
 		case <-w.donec:
 			closing = true
 		case <-ws.initReq.ctx.Done():
@@ -502,6 +506,9 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error {
 	w.mu.RUnlock()
 
 	for _, ws := range streams {
+		// pause serveStream
+		ws.resumec <- -1
+
 		// reconstruct watcher from initial request
 		if ws.lastRev != 0 {
 			ws.initReq.rev = ws.lastRev
@@ -525,6 +532,7 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error {
 		w.streams[ws.id] = ws
 		w.mu.Unlock()
 
+		// unpause serveStream
 		ws.resumec <- ws.lastRev
 	}
 	return nil
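resumeWatchers now parks each serveStream with a -1 sentinel on resumec before tearing the watch down, then sends the real revision to unpause it, so serveStream never works against a half-reconstructed stream. A stripped-down sketch of that handshake, with illustrative names rather than the watcher's real types:

```go
package main

import "fmt"

// serve mimics serveStream's select loop: a -1 on resumec pauses the loop
// until the resumer sends the real revision.
func serve(events <-chan string, resumec <-chan int64, donec <-chan struct{}) {
	for {
		select {
		case ev := <-events:
			fmt.Println("deliver:", ev)
		case rev := <-resumec:
			if rev == -1 {
				// paused: block here until the resume revision arrives
				rev = <-resumec
			}
			fmt.Println("resumed at revision", rev)
		case <-donec:
			return
		}
	}
}

func main() {
	events := make(chan string)
	resumec := make(chan int64)
	donec := make(chan struct{})
	go serve(events, resumec, donec)

	events <- "put foo"
	resumec <- -1 // pause while the watcher is re-registered
	resumec <- 42 // unpause at the resumed revision
	close(donec)
}
```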
15 changes: 12 additions & 3 deletions etcdserver/api/v3rpc/watch.go
@@ -16,6 +16,7 @@ package v3rpc
 
 import (
 	"io"
+	"sync"
 	"time"
 
 	"github.com/coreos/etcd/etcdserver"
@@ -42,8 +43,9 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
 
 var (
 	// expose for testing purpose. External test can change this to a
-	// small value to finish fast.
-	ProgressReportInterval = 10 * time.Minute
+	// small value to finish fast. The type is int32 instead of time.Duration
+	// in order to placate the race detector by setting the value with atomic stores.
+	ProgressReportIntervalMilliseconds = int32(10 * 60 * 1000) // 10 minutes
 )
 
 const (

Review thread on ProgressReportIntervalMilliseconds:

Contributor: add a comment for why we use int32 instead of time duration?

Author: done

@@ -71,6 +73,8 @@ type serverWatchStream struct {
 	// progress tracks the watchID that stream might need to send
 	// progress to.
 	progress map[storage.WatchID]bool
+	// mu protects progress
+	mu sync.Mutex
 
 	// closec indicates the stream is closed.
 	closec chan struct{}
@@ -144,7 +148,9 @@ func (sws *serverWatchStream) recvLoop() error {
 					WatchId:  id,
 					Canceled: true,
 				}
+				sws.mu.Lock()
 				delete(sws.progress, storage.WatchID(id))
+				sws.mu.Unlock()
 			}
 		}
 		// TODO: do we need to return error back to client?
@@ -160,7 +166,8 @@ func (sws *serverWatchStream) sendLoop() {
 	// watch responses pending on a watch id creation message
 	pending := make(map[storage.WatchID][]*pb.WatchResponse)
 
-	progressTicker := time.NewTicker(ProgressReportInterval)
+	interval := time.Duration(ProgressReportIntervalMilliseconds) * time.Millisecond
+	progressTicker := time.NewTicker(interval)
 	defer progressTicker.Stop()
 
 	for {
@@ -198,9 +205,11 @@ func (sws *serverWatchStream) sendLoop() {
 				return
 			}
 
+			sws.mu.Lock()
 			if _, ok := sws.progress[wresp.WatchID]; ok {
 				sws.progress[wresp.WatchID] = false
 			}
+			sws.mu.Unlock()
 
 		case c, ok := <-sws.ctrlStream:
 			if !ok {
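One caveat: the tests publish the new interval with atomic.StoreInt32, while sendLoop above reads the variable with a plain load. That is safe here only because the tests store the value before the cluster starts serving and restore it after teardown, so the store and the read never overlap. If they could, the read side would need a matching atomic load; a hedged sketch of that pairing, which is an assumption and not part of this diff:

```go
package v3rpcsketch

import (
	"sync/atomic"
	"time"
)

// Mirrors the exported variable from the diff above.
var ProgressReportIntervalMilliseconds = int32(10 * 60 * 1000)

// progressInterval pairs an atomic load with the tests' atomic stores so
// both halves of the access are visible to the race detector.
func progressInterval() time.Duration {
	ms := atomic.LoadInt32(&ProgressReportIntervalMilliseconds)
	return time.Duration(ms) * time.Millisecond
}
```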
9 changes: 6 additions & 3 deletions integration/v3_watch_test.go
@@ -20,6 +20,7 @@ import (
 	"reflect"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -922,10 +923,12 @@ func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.WatchResponse) {
 }
 
 func TestWatchWithProgressNotify(t *testing.T) {
+	// accelerate report interval so test terminates quickly
+	oldpi := v3rpc.ProgressReportIntervalMilliseconds
+	// using atomics to avoid race warnings
+	atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, 3*1000)
 	testInterval := 3 * time.Second
-	pi := v3rpc.ProgressReportInterval
-	v3rpc.ProgressReportInterval = testInterval
-	defer func() { v3rpc.ProgressReportInterval = pi }()
+	defer func() { v3rpc.ProgressReportIntervalMilliseconds = oldpi }()
 
 	defer testutil.AfterTest(t)
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
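Both progress-notify tests repeat the same save, override, restore dance. A hypothetical helper, not in this PR, that bundles it (assuming the test file's v3rpc and sync/atomic imports):

```go
// withProgressInterval overrides the server report interval and returns a
// restore function meant for defer. Hypothetical helper, not PR code.
func withProgressInterval(ms int32) (restore func()) {
	old := atomic.LoadInt32(&v3rpc.ProgressReportIntervalMilliseconds)
	atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, ms)
	return func() { atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, old) }
}

// usage inside a test:
//	defer withProgressInterval(3 * 1000)()
```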
8 changes: 4 additions & 4 deletions rafthttp/transport.go
@@ -165,19 +165,19 @@ func (t *Transport) Send(msgs []raftpb.Message) {
 		to := types.ID(m.To)
 
 		t.mu.RLock()
-		p, ok := t.peers[to]
+		p, pok := t.peers[to]
+		g, rok := t.remotes[to]
 		t.mu.RUnlock()
 
-		if ok {
+		if pok {
 			if m.Type == raftpb.MsgApp {
 				t.ServerStats.SendAppendReq(m.Size())
 			}
 			p.send(m)
 			continue
 		}
 
-		g, ok := t.remotes[to]
-		if ok {
+		if rok {
 			g.send(m)
 			continue
 		}

Review thread on t.mu.RUnlock():

Contributor: good catch. Would like to test this code to see if we still get #4855.

Contributor: nvm, I still see failures even with this. Will investigate more.
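Beyond renaming ok to pok/rok, the substantive change is that the remotes lookup moves under the same RLock as the peers lookup: previously t.remotes[to] was read with no lock held, racing with writers of the map. A reduced sketch of the pattern, with illustrative types rather than rafthttp's:

```go
package main

import "sync"

type sender func(msg string)

type transport struct {
	mu      sync.RWMutex
	peers   map[uint64]sender
	remotes map[uint64]sender
}

// send looks up both maps in one read-locked critical section; a concurrent
// add/remove of a peer can no longer mutate the maps between the two
// lookups, and neither lookup is an unsynchronized map read.
func (t *transport) send(to uint64, msg string) {
	t.mu.RLock()
	p, pok := t.peers[to]
	g, rok := t.remotes[to]
	t.mu.RUnlock()

	switch {
	case pok:
		p(msg)
	case rok:
		g(msg)
	}
}

func main() {
	t := &transport{
		peers:   map[uint64]sender{1: func(m string) { println("peer got:", m) }},
		remotes: map[uint64]sender{},
	}
	t.send(1, "MsgApp")
}
```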
11 changes: 5 additions & 6 deletions test
@@ -45,14 +45,13 @@ split=(${TEST// / })
 TEST=${split[@]/#/${REPO_PATH}/}
 split=(${NO_RACE_TEST// / })
 NO_RACE_TEST=${split[@]/#/${REPO_PATH}/}
+MACHINE_TYPE=$(uname -m)
+if [ $MACHINE_TYPE != "armv7l" ]; then
+	RACE="--race"
+fi
 
 function unit_tests {
 	echo "Running tests..."
-
-	MACHINE_TYPE=$(uname -m)
-	if [ $MACHINE_TYPE != "armv7l" ]; then
-		RACE="--race"
-	fi
 	go test -timeout 3m ${COVER} ${RACE} -cpu 1,2,4 $@ ${TEST}
 	go test -timeout 3m ${COVER} -cpu 1,2,4 $@ ${NO_RACE_TEST}
 }
@@ -61,7 +60,7 @@ function integration_tests {
 	echo "Running integration tests..."
 	go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/e2e
 	go test -timeout 15m -v -cpu 1,2,4 $@ ${REPO_PATH}/integration
-	go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
+	go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
 	go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample
 }