Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/pools/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Dur
rp := &ResourcePool{
resources: make(chan resourceWrapper, maxCap),
factory: factory,
capacity: sync2.AtomicInt64(capacity),
idleTimeout: sync2.AtomicDuration(idleTimeout),
capacity: sync2.NewAtomicInt64(int64(capacity)),
idleTimeout: sync2.NewAtomicDuration(idleTimeout),
}
for i := 0; i < capacity; i++ {
rp.resources <- resourceWrapper{}
Expand Down
81 changes: 51 additions & 30 deletions go/sync2/atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,76 +10,94 @@ import (
"time"
)

type AtomicInt32 int32
// AtomicInt32 is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int32 functions.
type AtomicInt32 struct {
int32
}

// NewAtomicInt32 initializes a new AtomicInt32 with a given value.
func NewAtomicInt32(n int32) AtomicInt32 {
return AtomicInt32{n}
}

// Add atomically adds n to the value.
func (i *AtomicInt32) Add(n int32) int32 {
return atomic.AddInt32((*int32)(i), n)
return atomic.AddInt32(&i.int32, n)
}

// Set atomically sets n as new value.
func (i *AtomicInt32) Set(n int32) {
atomic.StoreInt32((*int32)(i), n)
atomic.StoreInt32(&i.int32, n)
}

// Get atomically returns the current value.
func (i *AtomicInt32) Get() int32 {
return atomic.LoadInt32((*int32)(i))
return atomic.LoadInt32(&i.int32)
}

// CompareAndSwap atomatically swaps the old with the new value.
func (i *AtomicInt32) CompareAndSwap(oldval, newval int32) (swapped bool) {
return atomic.CompareAndSwapInt32((*int32)(i), oldval, newval)
}

type AtomicUint32 uint32

func (i *AtomicUint32) Add(n uint32) uint32 {
return atomic.AddUint32((*uint32)(i), n)
return atomic.CompareAndSwapInt32(&i.int32, oldval, newval)
}

func (i *AtomicUint32) Set(n uint32) {
atomic.StoreUint32((*uint32)(i), n)
// AtomicInt64 is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int64 functions.
type AtomicInt64 struct {
int64
}

func (i *AtomicUint32) Get() uint32 {
return atomic.LoadUint32((*uint32)(i))
// NewAtomicInt64 initializes a new AtomicInt64 with a given value.
func NewAtomicInt64(n int64) AtomicInt64 {
return AtomicInt64{n}
}

func (i *AtomicUint32) CompareAndSwap(oldval, newval uint32) (swapped bool) {
return atomic.CompareAndSwapUint32((*uint32)(i), oldval, newval)
}

type AtomicInt64 int64

// Add atomically adds n to the value.
func (i *AtomicInt64) Add(n int64) int64 {
return atomic.AddInt64((*int64)(i), n)
return atomic.AddInt64(&i.int64, n)
}

// Set atomically sets n as new value.
func (i *AtomicInt64) Set(n int64) {
atomic.StoreInt64((*int64)(i), n)
atomic.StoreInt64(&i.int64, n)
}

// Get atomically returns the current value.
func (i *AtomicInt64) Get() int64 {
return atomic.LoadInt64((*int64)(i))
return atomic.LoadInt64(&i.int64)
}

// CompareAndSwap atomatically swaps the old with the new value.
func (i *AtomicInt64) CompareAndSwap(oldval, newval int64) (swapped bool) {
return atomic.CompareAndSwapInt64((*int64)(i), oldval, newval)
return atomic.CompareAndSwapInt64(&i.int64, oldval, newval)
}

type AtomicDuration int64
// AtomicDuration is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int64 functions.
type AtomicDuration struct {
int64
}

// NewAtomicDuration initializes a new AtomicDuration with a given value.
func NewAtomicDuration(duration time.Duration) AtomicDuration {
return AtomicDuration{int64(duration)}
}

// Add atomically adds duration to the value.
func (d *AtomicDuration) Add(duration time.Duration) time.Duration {
return time.Duration(atomic.AddInt64((*int64)(d), int64(duration)))
return time.Duration(atomic.AddInt64(&d.int64, int64(duration)))
}

// Set atomically sets duration as new value.
func (d *AtomicDuration) Set(duration time.Duration) {
atomic.StoreInt64((*int64)(d), int64(duration))
atomic.StoreInt64(&d.int64, int64(duration))
}

// Get atomically returns the current value.
func (d *AtomicDuration) Get() time.Duration {
return time.Duration(atomic.LoadInt64((*int64)(d)))
return time.Duration(atomic.LoadInt64(&d.int64))
}

// CompareAndSwap atomatically swaps the old with the new value.
func (d *AtomicDuration) CompareAndSwap(oldval, newval time.Duration) (swapped bool) {
return atomic.CompareAndSwapInt64((*int64)(d), int64(oldval), int64(newval))
return atomic.CompareAndSwapInt64(&d.int64, int64(oldval), int64(newval))
}

// AtomicString gives you atomic-style APIs for string, but
Expand All @@ -90,19 +108,22 @@ type AtomicString struct {
str string
}

// Set atomically sets str as new value.
func (s *AtomicString) Set(str string) {
s.mu.Lock()
s.str = str
s.mu.Unlock()
}

// Get atomically returns the current value.
func (s *AtomicString) Get() string {
s.mu.Lock()
str := s.str
s.mu.Unlock()
return str
}

// CompareAndSwap atomatically swaps the old with the new value.
func (s *AtomicString) CompareAndSwap(oldval, newval string) (swqpped bool) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
8 changes: 4 additions & 4 deletions go/vt/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,16 +168,16 @@ func NewQueryEngine(config Config) *QueryEngine {

// Vars
qe.queryTimeout.Set(time.Duration(config.QueryTimeout * 1e9))
qe.spotCheckFreq = sync2.AtomicInt64(config.SpotCheckRatio * spotCheckMultiplier)
qe.spotCheckFreq = sync2.NewAtomicInt64(int64(config.SpotCheckRatio * spotCheckMultiplier))
if config.StrictMode {
qe.strictMode.Set(1)
}
qe.strictTableAcl = config.StrictTableAcl
qe.enableTableAclDryRun = config.EnableTableAclDryRun
qe.exemptACL = config.TableAclExemptACL
qe.maxResultSize = sync2.AtomicInt64(config.MaxResultSize)
qe.maxDMLRows = sync2.AtomicInt64(config.MaxDMLRows)
qe.streamBufferSize = sync2.AtomicInt64(config.StreamBufferSize)
qe.maxResultSize = sync2.NewAtomicInt64(int64(config.MaxResultSize))
qe.maxDMLRows = sync2.NewAtomicInt64(int64(config.MaxDMLRows))
qe.streamBufferSize = sync2.NewAtomicInt64(int64(config.StreamBufferSize))

// Loggers
qe.accessCheckerLogger = logutil.NewThrottledLogger("accessChecker", 1*time.Second)
Expand Down
6 changes: 3 additions & 3 deletions go/vt/tabletserver/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func NewTxPool(
axp := &TxPool{
pool: NewConnPool(name, capacity, idleTimeout, enablePublishStats, qStats),
activePool: pools.NewNumbered(),
lastID: sync2.AtomicInt64(time.Now().UnixNano()),
timeout: sync2.AtomicDuration(timeout),
poolTimeout: sync2.AtomicDuration(poolTimeout),
lastID: sync2.NewAtomicInt64(time.Now().UnixNano()),
timeout: sync2.NewAtomicDuration(timeout),
poolTimeout: sync2.NewAtomicDuration(poolTimeout),
ticks: timer.NewTimer(timeout / 10),
txStats: stats.NewTimings(txStatsName),
queryServiceStats: qStats,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/tabletserver/txlogz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func testHandler(req *http.Request, t *testing.T) {
StartTime: time.Now(),
Queries: []string{"select * from test"},
Conclusion: "unknown",
LogToFile: sync2.AtomicInt32(0),
LogToFile: sync2.AtomicInt32{},
}
txConn.EndTime = txConn.StartTime
response = httptest.NewRecorder()
Expand Down
68 changes: 34 additions & 34 deletions go/vt/vtgate/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query
if err != nil {
t.Errorf("want nil, got %v", err)
}
if sbc0.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
if sbc1.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}

// non-retryable failure
Expand All @@ -249,11 +249,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query
}
}
// Ensure that we tried only once
if sbc0.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
if sbc1.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// Ensure that we tried topo only once when mapping KeyspaceId/KeyRange to shards
if s.SrvKeyspaceCounter != 1 {
Expand Down Expand Up @@ -281,11 +281,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query
}
}
// Ensure that we tried only once.
if sbc0.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
if sbc1.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// Ensure that we tried topo only twice.
if s.SrvKeyspaceCounter != 2 {
Expand All @@ -308,15 +308,15 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query
t.Errorf("want nil, got %v", err)
}
// Ensure original keyspace is not used.
if sbc0.ExecCount != 0 {
t.Errorf("want 0, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 0 {
t.Errorf("want 0, got %v", execCount)
}
if sbc1.ExecCount != 0 {
t.Errorf("want 0, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 0 {
t.Errorf("want 0, got %v", execCount)
}
// Ensure redirected keyspace is accessed once.
if sbc2.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc2.ExecCount)
if execCount := sbc2.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// Ensure that we tried each keyspace only once.
if s.SrvKeyspaceCounter != 1 {
Expand Down Expand Up @@ -345,11 +345,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query
t.Errorf("want nil, got %v", err)
}
// Ensure that we tried only twice.
if sbc0.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
if sbc1.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
// Ensure that we tried topo only 3 times.
if s.SrvKeyspaceCounter != 3 {
Expand All @@ -376,11 +376,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query
t.Errorf("want nil, got %v", err)
}
// Ensure that we tried only twice.
if sbc0.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
if sbc1.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
// Ensure that we tried topo only twice.
if s.SrvKeyspaceCounter != 2 {
Expand All @@ -399,8 +399,8 @@ func testResolverStreamGeneric(t *testing.T, name string, action func() (*mproto
if err != nil {
t.Errorf("want nil, got %v", err)
}
if sbc0.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}

// failure
Expand All @@ -415,8 +415,8 @@ func testResolverStreamGeneric(t *testing.T, name string, action func() (*mproto
t.Errorf("want\n%s\ngot\n%v", want, err)
}
// Ensure that we tried only once.
if sbc0.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// Ensure that we tried topo only once
if s.SrvKeyspaceCounter != 1 {
Expand Down Expand Up @@ -539,8 +539,8 @@ func TestResolverExecBatchAsTransaction(t *testing.T) {
if callcount != 2 {
t.Errorf("want 2, got %v", callcount)
}
if sbc.AsTransactionCount != 0 {
t.Errorf("want 0, got %v", sbc.AsTransactionCount)
if count := sbc.AsTransactionCount.Get(); count != 0 {
t.Errorf("want 0, got %v", count)
}

callcount = 0
Expand All @@ -552,8 +552,8 @@ func TestResolverExecBatchAsTransaction(t *testing.T) {
if callcount != 1 {
t.Errorf("want 1, got %v", callcount)
}
if sbc.AsTransactionCount != 1 {
t.Errorf("want 1, got %v", sbc.AsTransactionCount)
if count := sbc.AsTransactionCount.Get(); count != 1 {
t.Errorf("want 1, got %v", count)
}
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/router_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ func TestSelectEqual(t *testing.T) {
if !reflect.DeepEqual(sbc2.Queries, wantQueries) {
t.Errorf("sbc2.Queries: %+v, want %+v\n", sbc2.Queries, wantQueries)
}
if sbc1.ExecCount != 1 {
t.Errorf("sbc1.ExecCount: %v, want 1\n", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 1 {
t.Errorf("sbc1.ExecCount: %v, want 1\n", execCount)
}
if sbc1.Queries != nil {
t.Errorf("sbc1.Queries: %+v, want nil\n", sbc1.Queries)
Expand Down
Loading