diff --git a/go/pools/resource_pool.go b/go/pools/resource_pool.go index 5eac3eeb85e..1d549bdcf51 100644 --- a/go/pools/resource_pool.go +++ b/go/pools/resource_pool.go @@ -65,8 +65,8 @@ func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Dur rp := &ResourcePool{ resources: make(chan resourceWrapper, maxCap), factory: factory, - capacity: sync2.AtomicInt64(capacity), - idleTimeout: sync2.AtomicDuration(idleTimeout), + capacity: sync2.NewAtomicInt64(int64(capacity)), + idleTimeout: sync2.NewAtomicDuration(idleTimeout), } for i := 0; i < capacity; i++ { rp.resources <- resourceWrapper{} diff --git a/go/sync2/atomic.go b/go/sync2/atomic.go index 909f3b10f28..2637112a3e2 100644 --- a/go/sync2/atomic.go +++ b/go/sync2/atomic.go @@ -10,76 +10,94 @@ import ( "time" ) -type AtomicInt32 int32 +// AtomicInt32 is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int32 functions. +type AtomicInt32 struct { + int32 +} + +// NewAtomicInt32 initializes a new AtomicInt32 with a given value. +func NewAtomicInt32(n int32) AtomicInt32 { + return AtomicInt32{n} +} +// Add atomically adds n to the value. func (i *AtomicInt32) Add(n int32) int32 { - return atomic.AddInt32((*int32)(i), n) + return atomic.AddInt32(&i.int32, n) } +// Set atomically sets n as new value. func (i *AtomicInt32) Set(n int32) { - atomic.StoreInt32((*int32)(i), n) + atomic.StoreInt32(&i.int32, n) } +// Get atomically returns the current value. func (i *AtomicInt32) Get() int32 { - return atomic.LoadInt32((*int32)(i)) + return atomic.LoadInt32(&i.int32) } +// CompareAndSwap atomatically swaps the old with the new value. func (i *AtomicInt32) CompareAndSwap(oldval, newval int32) (swapped bool) { - return atomic.CompareAndSwapInt32((*int32)(i), oldval, newval) -} - -type AtomicUint32 uint32 - -func (i *AtomicUint32) Add(n uint32) uint32 { - return atomic.AddUint32((*uint32)(i), n) + return atomic.CompareAndSwapInt32(&i.int32, oldval, newval) } -func (i *AtomicUint32) Set(n uint32) { - atomic.StoreUint32((*uint32)(i), n) +// AtomicInt64 is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int64 functions. +type AtomicInt64 struct { + int64 } -func (i *AtomicUint32) Get() uint32 { - return atomic.LoadUint32((*uint32)(i)) +// NewAtomicInt64 initializes a new AtomicInt64 with a given value. +func NewAtomicInt64(n int64) AtomicInt64 { + return AtomicInt64{n} } -func (i *AtomicUint32) CompareAndSwap(oldval, newval uint32) (swapped bool) { - return atomic.CompareAndSwapUint32((*uint32)(i), oldval, newval) -} - -type AtomicInt64 int64 - +// Add atomically adds n to the value. func (i *AtomicInt64) Add(n int64) int64 { - return atomic.AddInt64((*int64)(i), n) + return atomic.AddInt64(&i.int64, n) } +// Set atomically sets n as new value. func (i *AtomicInt64) Set(n int64) { - atomic.StoreInt64((*int64)(i), n) + atomic.StoreInt64(&i.int64, n) } +// Get atomically returns the current value. func (i *AtomicInt64) Get() int64 { - return atomic.LoadInt64((*int64)(i)) + return atomic.LoadInt64(&i.int64) } +// CompareAndSwap atomatically swaps the old with the new value. func (i *AtomicInt64) CompareAndSwap(oldval, newval int64) (swapped bool) { - return atomic.CompareAndSwapInt64((*int64)(i), oldval, newval) + return atomic.CompareAndSwapInt64(&i.int64, oldval, newval) } -type AtomicDuration int64 +// AtomicDuration is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int64 functions. +type AtomicDuration struct { + int64 +} + +// NewAtomicDuration initializes a new AtomicDuration with a given value. +func NewAtomicDuration(duration time.Duration) AtomicDuration { + return AtomicDuration{int64(duration)} +} +// Add atomically adds duration to the value. func (d *AtomicDuration) Add(duration time.Duration) time.Duration { - return time.Duration(atomic.AddInt64((*int64)(d), int64(duration))) + return time.Duration(atomic.AddInt64(&d.int64, int64(duration))) } +// Set atomically sets duration as new value. func (d *AtomicDuration) Set(duration time.Duration) { - atomic.StoreInt64((*int64)(d), int64(duration)) + atomic.StoreInt64(&d.int64, int64(duration)) } +// Get atomically returns the current value. func (d *AtomicDuration) Get() time.Duration { - return time.Duration(atomic.LoadInt64((*int64)(d))) + return time.Duration(atomic.LoadInt64(&d.int64)) } +// CompareAndSwap atomatically swaps the old with the new value. func (d *AtomicDuration) CompareAndSwap(oldval, newval time.Duration) (swapped bool) { - return atomic.CompareAndSwapInt64((*int64)(d), int64(oldval), int64(newval)) + return atomic.CompareAndSwapInt64(&d.int64, int64(oldval), int64(newval)) } // AtomicString gives you atomic-style APIs for string, but @@ -90,12 +108,14 @@ type AtomicString struct { str string } +// Set atomically sets str as new value. func (s *AtomicString) Set(str string) { s.mu.Lock() s.str = str s.mu.Unlock() } +// Get atomically returns the current value. func (s *AtomicString) Get() string { s.mu.Lock() str := s.str @@ -103,6 +123,7 @@ func (s *AtomicString) Get() string { return str } +// CompareAndSwap atomatically swaps the old with the new value. func (s *AtomicString) CompareAndSwap(oldval, newval string) (swqpped bool) { s.mu.Lock() defer s.mu.Unlock() diff --git a/go/vt/tabletserver/query_engine.go b/go/vt/tabletserver/query_engine.go index 2c66fbc170d..cd5a1e34245 100644 --- a/go/vt/tabletserver/query_engine.go +++ b/go/vt/tabletserver/query_engine.go @@ -168,16 +168,16 @@ func NewQueryEngine(config Config) *QueryEngine { // Vars qe.queryTimeout.Set(time.Duration(config.QueryTimeout * 1e9)) - qe.spotCheckFreq = sync2.AtomicInt64(config.SpotCheckRatio * spotCheckMultiplier) + qe.spotCheckFreq = sync2.NewAtomicInt64(int64(config.SpotCheckRatio * spotCheckMultiplier)) if config.StrictMode { qe.strictMode.Set(1) } qe.strictTableAcl = config.StrictTableAcl qe.enableTableAclDryRun = config.EnableTableAclDryRun qe.exemptACL = config.TableAclExemptACL - qe.maxResultSize = sync2.AtomicInt64(config.MaxResultSize) - qe.maxDMLRows = sync2.AtomicInt64(config.MaxDMLRows) - qe.streamBufferSize = sync2.AtomicInt64(config.StreamBufferSize) + qe.maxResultSize = sync2.NewAtomicInt64(int64(config.MaxResultSize)) + qe.maxDMLRows = sync2.NewAtomicInt64(int64(config.MaxDMLRows)) + qe.streamBufferSize = sync2.NewAtomicInt64(int64(config.StreamBufferSize)) // Loggers qe.accessCheckerLogger = logutil.NewThrottledLogger("accessChecker", 1*time.Second) diff --git a/go/vt/tabletserver/tx_pool.go b/go/vt/tabletserver/tx_pool.go index 51244769cb1..fdf43ff231e 100644 --- a/go/vt/tabletserver/tx_pool.go +++ b/go/vt/tabletserver/tx_pool.go @@ -77,9 +77,9 @@ func NewTxPool( axp := &TxPool{ pool: NewConnPool(name, capacity, idleTimeout, enablePublishStats, qStats), activePool: pools.NewNumbered(), - lastID: sync2.AtomicInt64(time.Now().UnixNano()), - timeout: sync2.AtomicDuration(timeout), - poolTimeout: sync2.AtomicDuration(poolTimeout), + lastID: sync2.NewAtomicInt64(time.Now().UnixNano()), + timeout: sync2.NewAtomicDuration(timeout), + poolTimeout: sync2.NewAtomicDuration(poolTimeout), ticks: timer.NewTimer(timeout / 10), txStats: stats.NewTimings(txStatsName), queryServiceStats: qStats, diff --git a/go/vt/tabletserver/txlogz_test.go b/go/vt/tabletserver/txlogz_test.go index 17136d095f7..0a3aae6c976 100644 --- a/go/vt/tabletserver/txlogz_test.go +++ b/go/vt/tabletserver/txlogz_test.go @@ -26,7 +26,7 @@ func testHandler(req *http.Request, t *testing.T) { StartTime: time.Now(), Queries: []string{"select * from test"}, Conclusion: "unknown", - LogToFile: sync2.AtomicInt32(0), + LogToFile: sync2.AtomicInt32{}, } txConn.EndTime = txConn.StartTime response = httptest.NewRecorder() diff --git a/go/vt/vtgate/resolver_test.go b/go/vt/vtgate/resolver_test.go index 3b1cad9f1b4..7b94a2dbad6 100644 --- a/go/vt/vtgate/resolver_test.go +++ b/go/vt/vtgate/resolver_test.go @@ -221,11 +221,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query if err != nil { t.Errorf("want nil, got %v", err) } - if sbc0.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc0.ExecCount) + if execCount := sbc0.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } - if sbc1.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc1.ExecCount) + if execCount := sbc1.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } // non-retryable failure @@ -249,11 +249,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query } } // Ensure that we tried only once - if sbc0.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc0.ExecCount) + if execCount := sbc0.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } - if sbc1.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc1.ExecCount) + if execCount := sbc1.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } // Ensure that we tried topo only once when mapping KeyspaceId/KeyRange to shards if s.SrvKeyspaceCounter != 1 { @@ -281,11 +281,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query } } // Ensure that we tried only once. - if sbc0.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc0.ExecCount) + if execCount := sbc0.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } - if sbc1.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc1.ExecCount) + if execCount := sbc1.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } // Ensure that we tried topo only twice. if s.SrvKeyspaceCounter != 2 { @@ -308,15 +308,15 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query t.Errorf("want nil, got %v", err) } // Ensure original keyspace is not used. - if sbc0.ExecCount != 0 { - t.Errorf("want 0, got %v", sbc0.ExecCount) + if execCount := sbc0.ExecCount.Get(); execCount != 0 { + t.Errorf("want 0, got %v", execCount) } - if sbc1.ExecCount != 0 { - t.Errorf("want 0, got %v", sbc1.ExecCount) + if execCount := sbc1.ExecCount.Get(); execCount != 0 { + t.Errorf("want 0, got %v", execCount) } // Ensure redirected keyspace is accessed once. - if sbc2.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc2.ExecCount) + if execCount := sbc2.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } // Ensure that we tried each keyspace only once. if s.SrvKeyspaceCounter != 1 { @@ -345,11 +345,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query t.Errorf("want nil, got %v", err) } // Ensure that we tried only twice. - if sbc0.ExecCount != 2 { - t.Errorf("want 2, got %v", sbc0.ExecCount) + if execCount := sbc0.ExecCount.Get(); execCount != 2 { + t.Errorf("want 2, got %v", execCount) } - if sbc1.ExecCount != 2 { - t.Errorf("want 2, got %v", sbc1.ExecCount) + if execCount := sbc1.ExecCount.Get(); execCount != 2 { + t.Errorf("want 2, got %v", execCount) } // Ensure that we tried topo only 3 times. if s.SrvKeyspaceCounter != 3 { @@ -376,11 +376,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query t.Errorf("want nil, got %v", err) } // Ensure that we tried only twice. - if sbc0.ExecCount != 2 { - t.Errorf("want 2, got %v", sbc0.ExecCount) + if execCount := sbc0.ExecCount.Get(); execCount != 2 { + t.Errorf("want 2, got %v", execCount) } - if sbc1.ExecCount != 2 { - t.Errorf("want 2, got %v", sbc1.ExecCount) + if execCount := sbc1.ExecCount.Get(); execCount != 2 { + t.Errorf("want 2, got %v", execCount) } // Ensure that we tried topo only twice. if s.SrvKeyspaceCounter != 2 { @@ -399,8 +399,8 @@ func testResolverStreamGeneric(t *testing.T, name string, action func() (*mproto if err != nil { t.Errorf("want nil, got %v", err) } - if sbc0.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc0.ExecCount) + if execCount := sbc0.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } // failure @@ -415,8 +415,8 @@ func testResolverStreamGeneric(t *testing.T, name string, action func() (*mproto t.Errorf("want\n%s\ngot\n%v", want, err) } // Ensure that we tried only once. - if sbc0.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc0.ExecCount) + if execCount := sbc0.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } // Ensure that we tried topo only once if s.SrvKeyspaceCounter != 1 { @@ -539,8 +539,8 @@ func TestResolverExecBatchAsTransaction(t *testing.T) { if callcount != 2 { t.Errorf("want 2, got %v", callcount) } - if sbc.AsTransactionCount != 0 { - t.Errorf("want 0, got %v", sbc.AsTransactionCount) + if count := sbc.AsTransactionCount.Get(); count != 0 { + t.Errorf("want 0, got %v", count) } callcount = 0 @@ -552,8 +552,8 @@ func TestResolverExecBatchAsTransaction(t *testing.T) { if callcount != 1 { t.Errorf("want 1, got %v", callcount) } - if sbc.AsTransactionCount != 1 { - t.Errorf("want 1, got %v", sbc.AsTransactionCount) + if count := sbc.AsTransactionCount.Get(); count != 1 { + t.Errorf("want 1, got %v", count) } } diff --git a/go/vt/vtgate/router_select_test.go b/go/vt/vtgate/router_select_test.go index f84721a4c8f..7d53848b15e 100644 --- a/go/vt/vtgate/router_select_test.go +++ b/go/vt/vtgate/router_select_test.go @@ -164,8 +164,8 @@ func TestSelectEqual(t *testing.T) { if !reflect.DeepEqual(sbc2.Queries, wantQueries) { t.Errorf("sbc2.Queries: %+v, want %+v\n", sbc2.Queries, wantQueries) } - if sbc1.ExecCount != 1 { - t.Errorf("sbc1.ExecCount: %v, want 1\n", sbc1.ExecCount) + if execCount := sbc1.ExecCount.Get(); execCount != 1 { + t.Errorf("sbc1.ExecCount: %v, want 1\n", execCount) } if sbc1.Queries != nil { t.Errorf("sbc1.Queries: %+v, want nil\n", sbc1.Queries) diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index a87c87e3e07..ef53c4ffe4d 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -105,8 +105,8 @@ func testScatterConnGeneric(t *testing.T, name string, f func(shards []string) ( t.Errorf("want %s, got %v", want, err) } // Ensure that we tried only once. - if sbc.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } // two shards @@ -123,11 +123,11 @@ func testScatterConnGeneric(t *testing.T, name string, f func(shards []string) ( t.Errorf("\nwant\n%s\ngot\n%v", want1, err) } // Ensure that we tried only once. - if sbc0.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc0.ExecCount) + if execCount := sbc0.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } - if sbc1.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc1.ExecCount) + if execCount := sbc1.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } // duplicate shards @@ -136,8 +136,8 @@ func testScatterConnGeneric(t *testing.T, name string, f func(shards []string) ( s.MapTestConn("0", sbc) qr, err = f([]string{"0", "0"}) // Ensure that we executed only once. - if sbc.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } // no errors @@ -150,11 +150,11 @@ func testScatterConnGeneric(t *testing.T, name string, f func(shards []string) ( if err != nil { t.Errorf("want nil, got %v", err) } - if sbc0.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc0.ExecCount) + if execCount := sbc0.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } - if sbc1.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc1.ExecCount) + if execCount := sbc1.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } if qr.RowsAffected != 2 { t.Errorf("want 2, got %v", qr.RowsAffected) @@ -287,11 +287,11 @@ func TestScatterConnCommitSuccess(t *testing.T) { if !reflect.DeepEqual(wantSession, *session.Session) { t.Errorf("want\n%+v, got\n%+v", wantSession, *session.Session) } - if sbc0.CommitCount != 1 { - t.Errorf("want 1, got %d", sbc0.CommitCount) + if commitCount := sbc0.CommitCount.Get(); commitCount != 1 { + t.Errorf("want 1, got %d", commitCount) } - if sbc1.RollbackCount != 1 { - t.Errorf("want 1, got %d", sbc1.RollbackCount) + if rollbackCount := sbc1.RollbackCount.Get(); rollbackCount != 1 { + t.Errorf("want 1, got %d", rollbackCount) } } @@ -315,11 +315,11 @@ func TestScatterConnRollback(t *testing.T) { if !reflect.DeepEqual(wantSession, *session.Session) { t.Errorf("want\n%#v, got\n%#v", wantSession, *session.Session) } - if sbc0.RollbackCount != 1 { - t.Errorf("want 1, got %d", sbc0.RollbackCount) + if rollbackCount := sbc0.RollbackCount.Get(); rollbackCount != 1 { + t.Errorf("want 1, got %d", rollbackCount) } - if sbc1.RollbackCount != 1 { - t.Errorf("want 1, got %d", sbc1.RollbackCount) + if rollbackCount := sbc1.RollbackCount.Get(); rollbackCount != 1 { + t.Errorf("want 1, got %d", rollbackCount) } } @@ -331,8 +331,8 @@ func TestScatterConnClose(t *testing.T) { stc.Execute(context.Background(), "query1", nil, "TestScatterConnClose", []string{"0"}, "", nil, false) stc.Close() time.Sleep(1) - if sbc.CloseCount.Get() != 1 { - t.Errorf("want 1, got %d", sbc.CloseCount) + if closeCount := sbc.CloseCount.Get(); closeCount != 1 { + t.Errorf("want 1, got %d (test may be flaky because connections are closed asynchronously)", closeCount) } } @@ -380,14 +380,18 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) { t.Errorf("want\n%+v\ngot\n%+v", wantSession, *session.Session) } stc.Commit(context.Background(), session) - if sbc0.ExecCount != 1 || sbc1.ExecCount != 3 { - t.Errorf("want 1/3, got %d/%d", sbc0.ExecCount, sbc1.ExecCount) + { + execCount0 := sbc0.ExecCount.Get() + execCount1 := sbc1.ExecCount.Get() + if execCount0 != 1 || execCount1 != 3 { + t.Errorf("want 1/3, got %d/%d", execCount0, execCount1) + } } - if sbc0.CommitCount != 0 { - t.Errorf("want 0, got %d", sbc0.CommitCount) + if commitCount := sbc0.CommitCount.Get(); commitCount != 0 { + t.Errorf("want 0, got %d", commitCount) } - if sbc1.CommitCount != 1 { - t.Errorf("want 1, got %d", sbc1.CommitCount) + if commitCount := sbc1.CommitCount.Get(); commitCount != 1 { + t.Errorf("want 1, got %d", commitCount) } // case 2: write query followed by read query (not in transaction), not in the same shard. @@ -414,14 +418,18 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) { t.Errorf("want\n%+v\ngot\n%+v", wantSession, *session.Session) } stc.Commit(context.Background(), session) - if sbc0.ExecCount != 3 || sbc1.ExecCount != 1 { - t.Errorf("want 3/1, got %d/%d", sbc0.ExecCount, sbc1.ExecCount) + { + execCount0 := sbc0.ExecCount.Get() + execCount1 := sbc1.ExecCount.Get() + if execCount0 != 3 || execCount1 != 1 { + t.Errorf("want 3/1, got %d/%d", execCount0, execCount1) + } } - if sbc0.CommitCount != 1 { - t.Errorf("want 1, got %d", sbc0.CommitCount) + if commitCount := sbc0.CommitCount.Get(); commitCount != 1 { + t.Errorf("want 1, got %d", commitCount) } - if sbc1.CommitCount != 0 { - t.Errorf("want 0, got %d", sbc1.CommitCount) + if commitCount := sbc1.CommitCount.Get(); commitCount != 0 { + t.Errorf("want 0, got %d", commitCount) } // case 3: write query followed by read query, in the same shard. @@ -448,14 +456,18 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) { t.Errorf("want\n%+v\ngot\n%+v", wantSession, *session.Session) } stc.Commit(context.Background(), session) - if sbc0.ExecCount != 4 || sbc1.ExecCount != 1 { - t.Errorf("want 4/1, got %d/%d", sbc0.ExecCount, sbc1.ExecCount) + { + execCount0 := sbc0.ExecCount.Get() + execCount1 := sbc1.ExecCount.Get() + if execCount0 != 4 || execCount1 != 1 { + t.Errorf("want 4/1, got %d/%d", execCount0, execCount1) + } } - if sbc0.CommitCount != 1 { - t.Errorf("want 1, got %d", sbc0.CommitCount) + if commitCount := sbc0.CommitCount.Get(); commitCount != 1 { + t.Errorf("want 1, got %d", commitCount) } - if sbc1.CommitCount != 0 { - t.Errorf("want 0, got %d", sbc1.CommitCount) + if commitCount := sbc1.CommitCount.Get(); commitCount != 0 { + t.Errorf("want 0, got %d", commitCount) } } diff --git a/go/vt/vtgate/shard_conn_test.go b/go/vt/vtgate/shard_conn_test.go index 7d0c0f8b7cf..d203dabda7f 100644 --- a/go/vt/vtgate/shard_conn_test.go +++ b/go/vt/vtgate/shard_conn_test.go @@ -133,8 +133,8 @@ func testShardConnGeneric(t *testing.T, name string, f func() error) { t.Errorf("want 2, got %v", s.DialCounter) } // Ensure we executed 2 times before failing. - if sbc.ExecCount != 2 { - t.Errorf("want 2, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 2 { + t.Errorf("want 2, got %v", execCount) } // retry error (one failure) @@ -150,8 +150,8 @@ func testShardConnGeneric(t *testing.T, name string, f func() error) { t.Errorf("want 2, got %v", s.DialCounter) } // Ensure we executed twice (second one succeeded) - if sbc.ExecCount != 2 { - t.Errorf("want 2, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 2 { + t.Errorf("want 2, got %v", execCount) } // fatal error (one failure) @@ -167,8 +167,8 @@ func testShardConnGeneric(t *testing.T, name string, f func() error) { t.Errorf("want 2, got %v", s.DialCounter) } // Ensure we executed twice (second one succeeded) - if sbc.ExecCount != 2 { - t.Errorf("want 2, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 2 { + t.Errorf("want 2, got %v", execCount) } // server error @@ -185,8 +185,8 @@ func testShardConnGeneric(t *testing.T, name string, f func() error) { t.Errorf("want 1, got %v", s.DialCounter) } // Ensure we did not re-execute. - if sbc.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } // conn error (one failure) @@ -204,8 +204,8 @@ func testShardConnGeneric(t *testing.T, name string, f func() error) { t.Errorf("want 1, got %v", s.DialCounter) } // Ensure we did not re-execute. - if sbc.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } // no failures @@ -219,8 +219,8 @@ func testShardConnGeneric(t *testing.T, name string, f func() error) { if s.DialCounter != 1 { t.Errorf("want 1, got %v", s.DialCounter) } - if sbc.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } } @@ -235,8 +235,8 @@ func testShardConnTransact(t *testing.T, name string, f func() error) { t.Errorf("want %s, got %v", want, err) } // Should not retry if we're in transaction - if sbc.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } // conn error @@ -249,8 +249,8 @@ func testShardConnTransact(t *testing.T, name string, f func() error) { t.Errorf("want %s, got %v", want, err) } // Should not retry if we're in transaction - if sbc.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } } @@ -270,8 +270,8 @@ func TestShardConnBeginOther(t *testing.T) { t.Errorf("want 1, got %v", s.DialCounter) } // Account for 1 call to Begin. - if sbc.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } } @@ -289,8 +289,8 @@ func TestShardConnStreamingRetry(t *testing.T) { if s.DialCounter != 2 { t.Errorf("want 2, got %v", s.DialCounter) } - if sbc.ExecCount != 2 { - t.Errorf("want 2, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 2 { + t.Errorf("want 2, got %v", execCount) } // ERR_FATAL @@ -307,8 +307,8 @@ func TestShardConnStreamingRetry(t *testing.T) { if s.DialCounter != 1 { t.Errorf("want 1, got %v", s.DialCounter) } - if sbc.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } } @@ -486,8 +486,8 @@ func TestShardConnReconnect(t *testing.T) { if timeDuration >= retryDelay { t.Errorf("want no delay, got %v", timeDuration) } - if sbc0.ExecCount+sbc1.ExecCount+sbc2.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc0.ExecCount+sbc1.ExecCount+sbc2.ExecCount) + if execCount := sbc0.ExecCount.Get() + sbc1.ExecCount.Get() + sbc2.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } if s.EndPointCounter != 1 { t.Errorf("want 1, got %v", s.EndPointCounter) @@ -515,11 +515,16 @@ func TestShardConnReconnect(t *testing.T) { if timeDuration >= retryDelay { t.Errorf("want no delay, got %v", timeDuration) } - if sbc0.ExecCount+sbc1.ExecCount+sbc2.ExecCount != 2 { - t.Errorf("want 2, got %v", sbc0.ExecCount+sbc1.ExecCount+sbc2.ExecCount) + if execCount := sbc0.ExecCount.Get() + sbc1.ExecCount.Get() + sbc2.ExecCount.Get(); execCount != 2 { + t.Errorf("want 2, got %v", execCount) } - if sbc0.ExecCount > 1 || sbc1.ExecCount > 1 || sbc2.ExecCount > 1 { - t.Errorf("want no more than 1, got %v,%v,%v", sbc0.ExecCount, sbc1.ExecCount, sbc2.ExecCount) + { + execCount0 := sbc0.ExecCount.Get() + execCount1 := sbc1.ExecCount.Get() + execCount2 := sbc2.ExecCount.Get() + if execCount0 > 1 || execCount1 > 1 || execCount2 > 1 { + t.Errorf("want no more than 1, got %v,%v,%v", execCount0, execCount1, execCount2) + } } if s.EndPointCounter != 2 { t.Errorf("want 2, got %v", s.EndPointCounter) @@ -548,11 +553,16 @@ func TestShardConnReconnect(t *testing.T) { if timeDuration >= retryDelay { t.Errorf("want no delay, got %v", timeDuration) } - if sbc0.ExecCount+sbc1.ExecCount+sbc2.ExecCount != 2 { - t.Errorf("want 2, got %v", sbc0.ExecCount+sbc1.ExecCount+sbc2.ExecCount) - } - if sbc0.ExecCount > 1 || sbc1.ExecCount > 1 || sbc2.ExecCount > 1 { - t.Errorf("want no more than 1, got %v,%v,%v", sbc0.ExecCount, sbc1.ExecCount, sbc2.ExecCount) + { + execCount0 := sbc0.ExecCount.Get() + execCount1 := sbc1.ExecCount.Get() + execCount2 := sbc2.ExecCount.Get() + if sum := execCount0 + execCount1 + execCount2; sum != 2 { + t.Errorf("want 2, got %v", sum) + } + if execCount0 > 1 || execCount1 > 1 || execCount2 > 1 { + t.Errorf("want no more than 1, got %v,%v,%v", execCount0, execCount1, execCount2) + } } if s.EndPointCounter != 2 { t.Errorf("want 2, got %v", s.EndPointCounter) @@ -585,12 +595,12 @@ func TestShardConnReconnect(t *testing.T) { t.Errorf("want instant resolve %v, got %v", retryDelay, timeDuration) } for _, conn := range []*sandboxConn{sbc0, sbc1, sbc2} { - wantExecCount := 1 + var wantExecCount int64 = 1 if conn == firstConn { wantExecCount = 2 } - if int(conn.ExecCount) != wantExecCount { - t.Errorf("want %v, got %v", wantExecCount, conn.ExecCount) + if execCount := conn.ExecCount.Get(); execCount != wantExecCount { + t.Errorf("want %v, got %v", wantExecCount, execCount) } } if s.EndPointCounter != 5 { @@ -628,12 +638,12 @@ func TestShardConnReconnect(t *testing.T) { if timeDuration >= retryDelay { t.Errorf("want no delay, got %v", timeDuration) } - if firstConn.ExecCount != 1 { - t.Errorf("want 1, got %v", firstConn.ExecCount) + if execCount := firstConn.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } - totalExecCount := 0 + var totalExecCount int64 for _, conn := range s.TestConns["0"] { - totalExecCount += int(conn.(*sandboxConn).ExecCount) + totalExecCount += conn.(*sandboxConn).ExecCount.Get() } if totalExecCount != 1 { t.Errorf("want 1, got %v", totalExecCount) @@ -681,15 +691,15 @@ func TestShardConnReconnect(t *testing.T) { if timeDuration > retryDelay*2 { t.Errorf("want instant resolve %v, got %v", retryDelay, timeDuration) } - if secondConn.ExecCount != 1 { - t.Errorf("want 1, got %v", secondConn.ExecCount) + if execCount := secondConn.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } - if firstConn.ExecCount != 2 { - t.Errorf("want 2, got %v", firstConn.ExecCount) + if execCount := firstConn.ExecCount.Get(); execCount != 2 { + t.Errorf("want 2, got %v", execCount) } for _, conn := range s.TestConns["0"] { - if conn != firstConn && conn.(*sandboxConn).ExecCount != 1 { - t.Errorf("want 1, got %v", conn.(*sandboxConn).ExecCount) + if execCount := conn.(*sandboxConn).ExecCount.Get(); conn != firstConn && execCount != 1 { + t.Errorf("want 1, got %v", execCount) } } if s.EndPointCounter != 6 { @@ -734,16 +744,16 @@ func TestShardConnReconnect(t *testing.T) { if timeDuration >= retryDelay { t.Errorf("want no delay, got %v", timeDuration) } - if firstConn.ExecCount != 1 { - t.Errorf("want 1, got %v", firstConn.ExecCount) + if execCount := firstConn.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } for _, conn := range []*sandboxConn{sbc0, sbc1, sbc2} { - if conn != firstConn && conn.ExecCount != 0 { - t.Errorf("want 0, got %v", conn.ExecCount) + if execCount := conn.ExecCount.Get(); conn != firstConn && execCount != 0 { + t.Errorf("want 0, got %v", execCount) } } - if sbc3.ExecCount+sbc4.ExecCount+sbc5.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc3.ExecCount+sbc4.ExecCount+sbc5.ExecCount) + if sum := sbc3.ExecCount.Get() + sbc4.ExecCount.Get() + sbc5.ExecCount.Get(); sum != 1 { + t.Errorf("want 1, got %v", sum) } if s.EndPointCounter != 2 { t.Errorf("want 2, got %v", s.EndPointCounter) diff --git a/go/vt/vtgate/vertical_split_test.go b/go/vt/vtgate/vertical_split_test.go index c827af8f14a..6e871cee0b5 100644 --- a/go/vt/vtgate/vertical_split_test.go +++ b/go/vt/vtgate/vertical_split_test.go @@ -75,8 +75,8 @@ func TestInTransactionKeyspaceAlias(t *testing.T) { } // Ensure that we tried once, no retry here // since we are in a transaction. - if sbc.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } } @@ -90,8 +90,8 @@ func testVerticalSplitGeneric(t *testing.T, isStreaming bool, f func(shards []st t.Errorf("want nil, got %v", err) } // Ensure that we tried 2 times, 1 for retry and 1 for redirect. - if sbc.ExecCount != 2 { - t.Errorf("want 2, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 2 { + t.Errorf("want 2, got %v", execCount) } // Fatal Error, for keyspace that is redirected should succeed. @@ -105,16 +105,16 @@ func testVerticalSplitGeneric(t *testing.T, isStreaming bool, f func(shards []st t.Errorf("want '%v', got '%v'", want, err) } // Ensure that we tried only once. - if sbc.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } } else { if err != nil { t.Errorf("want nil, got %v", err) } // Ensure that we tried 2 times, 1 for retry and 1 for redirect. - if sbc.ExecCount != 2 { - t.Errorf("want 2, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 2 { + t.Errorf("want 2, got %v", execCount) } } @@ -128,7 +128,7 @@ func testVerticalSplitGeneric(t *testing.T, isStreaming bool, f func(shards []st t.Errorf("want '%v', got '%v'", want, err) } // Ensure that we tried once, no retry here. - if sbc.ExecCount != 1 { - t.Errorf("want 1, got %v", sbc.ExecCount) + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) } } diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 78755de5e8b..9f04e1a8a54 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -102,7 +102,7 @@ func Init(serv SrvTopoServer, schema *planbuilder.Schema, cell string, retryDela rowsReturned: stats.NewMultiCounters("VtgateApiRowsReturned", []string{"Operation", "Keyspace", "DbType"}), maxInFlight: int64(maxInFlight), - inFlight: 0, + inFlight: sync2.NewAtomicInt64(0), logExecute: logutil.NewThrottledLogger("Execute", 5*time.Second), logExecuteShard: logutil.NewThrottledLogger("ExecuteShard", 5*time.Second), diff --git a/go/vt/vtgate/vtgate_test.go b/go/vt/vtgate/vtgate_test.go index 4323a2badee..b7f93f2178f 100644 --- a/go/vt/vtgate/vtgate_test.go +++ b/go/vt/vtgate/vtgate_test.go @@ -79,8 +79,8 @@ func TestVTGateExecute(t *testing.T) { } rpcVTGate.Commit(context.Background(), q.Session) - if sbc.CommitCount != 1 { - t.Errorf("want 1, got %d", sbc.CommitCount) + if commitCount := sbc.CommitCount.Get(); commitCount != 1 { + t.Errorf("want 1, got %d", commitCount) } q.Session = new(proto.Session) @@ -131,8 +131,8 @@ func TestVTGateExecuteShard(t *testing.T) { } rpcVTGate.Commit(context.Background(), q.Session) - if sbc.CommitCount != 1 { - t.Errorf("want 1, got %d", sbc.CommitCount) + if commitCount := sbc.CommitCount.Get(); commitCount != 1 { + t.Errorf("want 1, got %d", commitCount) } q.Session = new(proto.Session) @@ -178,8 +178,8 @@ func TestVTGateExecuteKeyspaceIds(t *testing.T) { if qr.Session != nil { t.Errorf("want nil, got %+v\n", qr.Session) } - if sbc1.ExecCount != 1 { - t.Errorf("want 1, got %v\n", sbc1.ExecCount) + if execCount := sbc1.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v\n", execCount) } // Test for successful execution in transaction q.Session = new(proto.Session) @@ -201,8 +201,8 @@ func TestVTGateExecuteKeyspaceIds(t *testing.T) { t.Errorf("want \n%+v, got \n%+v", wantSession, q.Session) } rpcVTGate.Commit(context.Background(), q.Session) - if sbc1.CommitCount.Get() != 1 { - t.Errorf("want 1, got %d", sbc1.CommitCount.Get()) + if commitCount := sbc1.CommitCount.Get(); commitCount != 1 { + t.Errorf("want 1, got %d", commitCount) } // Test for multiple shards kid30, err := key.HexKeyspaceId("30").Unhex() @@ -243,8 +243,8 @@ func TestVTGateExecuteKeyRanges(t *testing.T) { if qr.Session != nil { t.Errorf("want nil, got %+v\n", qr.Session) } - if sbc1.ExecCount != 1 { - t.Errorf("want 1, got %v\n", sbc1.ExecCount) + if execCount := sbc1.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v\n", execCount) } // Test for successful execution in transaction q.Session = new(proto.Session) @@ -269,8 +269,8 @@ func TestVTGateExecuteKeyRanges(t *testing.T) { t.Errorf("want \n%+v, got \n%+v", wantSession, q.Session) } rpcVTGate.Commit(context.Background(), q.Session) - if sbc1.CommitCount.Get() != 1 { - t.Errorf("want 1, got %v", sbc1.CommitCount.Get()) + if commitCount := sbc1.CommitCount.Get(); commitCount != 1 { + t.Errorf("want 1, got %v", commitCount) } // Test for multiple shards kr, err = key.ParseKeyRangeParts("10", "30") @@ -317,8 +317,8 @@ func TestVTGateExecuteEntityIds(t *testing.T) { if qr.Session != nil { t.Errorf("want nil, got %+v\n", qr.Session) } - if sbc1.ExecCount != 1 { - t.Errorf("want 1, got %v\n", sbc1.ExecCount) + if execCount := sbc1.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v\n", execCount) } // Test for successful execution in transaction q.Session = new(proto.Session) @@ -340,8 +340,8 @@ func TestVTGateExecuteEntityIds(t *testing.T) { t.Errorf("want \n%+v, got \n%+v", wantSession, q.Session) } rpcVTGate.Commit(context.Background(), q.Session) - if sbc1.CommitCount.Get() != 1 { - t.Errorf("want 1, got %d", sbc1.CommitCount.Get()) + if commitCount := sbc1.CommitCount.Get(); commitCount != 1 { + t.Errorf("want 1, got %d", commitCount) } // Test for multiple shards kid30, err := key.HexKeyspaceId("30").Unhex() @@ -526,8 +526,8 @@ func TestVTGateStreamExecuteKeyspaceIds(t *testing.T) { t.Errorf("want\n%#v\ngot\n%#v", want, qrs) } rpcVTGate.Commit(context.Background(), sq.Session) - if sbc.CommitCount.Get() != 1 { - t.Errorf("want 1, got %d", sbc.CommitCount.Get()) + if commitCount := sbc.CommitCount.Get(); commitCount != 1 { + t.Errorf("want 1, got %d", commitCount) } // Test for successful execution - multiple keyspaceids in single shard sq.Session = nil