kgo: universally switch to 1.19's atomics if on Go 1.19+
The current lint on arm should be ensuring alignment is proper, but
apparently that is not always the case, as seen in #286.

Go has compiler intrinsics to ensure proper alignment for the actual
atomic number types introduced in 1.19.

This doesn't fix 1.18, but it should fix 1.19+.

Closes #286.
twmb committed Jan 3, 2023
1 parent 66e626f commit a2c4bad
Showing 14 changed files with 139 additions and 113 deletions.
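
Why the switch matters: on 32-bit platforms such as arm, sync/atomic's 64-bit functions require the caller to keep the operand 8-byte aligned, and only the first word of an allocation is guaranteed to be so — a 64-bit field that follows a 32-bit field can land on a 4-byte boundary and panic at runtime. Go 1.19's atomic.Int64 and friends are special-cased by the compiler to always be correctly aligned. A minimal sketch of the hazard and the fix; the struct names are illustrative, not from this commit:

package main

import "sync/atomic"

// risky mirrors the pre-1.19 hazard: a 64-bit field after a 32-bit one
// may be only 4-byte aligned on GOARCH=arm or 386.
type risky struct {
	flag int32
	n    int64
}

// safe uses Go 1.19's atomic.Int64, which the compiler always 8-byte
// aligns regardless of where the struct is allocated.
type safe struct {
	flag int32
	n    atomic.Int64
}

func main() {
	r, s := &risky{}, &safe{}
	atomic.AddInt64(&r.n, 1) // can panic on 32-bit: "unaligned 64-bit atomic operation"
	s.n.Add(1)               // fine on every platform
}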
18 changes: 8 additions & 10 deletions pkg/kgo/atomic_maybe_work.go
@@ -1,26 +1,24 @@
package kgo

import "sync/atomic"

const (
stateUnstarted = iota
stateWorking
stateContinueWorking
)

-type workLoop struct{ state uint32 }
+type workLoop struct{ state atomicU32 }

// maybeBegin returns whether a work loop should begin.
func (l *workLoop) maybeBegin() bool {
var state uint32
var done bool
for !done {
-		switch state = atomic.LoadUint32(&l.state); state {
+		switch state = l.state.Load(); state {
case stateUnstarted:
-			done = atomic.CompareAndSwapUint32(&l.state, state, stateWorking)
+			done = l.state.CompareAndSwap(state, stateWorking)
state = stateWorking
case stateWorking:
-			done = atomic.CompareAndSwapUint32(&l.state, state, stateContinueWorking)
+			done = l.state.CompareAndSwap(state, stateContinueWorking)
state = stateContinueWorking
case stateContinueWorking:
done = true
@@ -43,24 +41,24 @@ func (l *workLoop) maybeBegin() bool {
// since the loop itself calls MaybeFinish after it has been started, this
// should never be called if the loop is unstarted.
func (l *workLoop) maybeFinish(again bool) bool {
-	switch state := atomic.LoadUint32(&l.state); state {
+	switch state := l.state.Load(); state {
// Working:
// If again, we know we should continue; keep our state.
// If not again, we try to downgrade state and stop.
// If we cannot, then something slipped in to say keep going.
case stateWorking:
if !again {
-			again = !atomic.CompareAndSwapUint32(&l.state, state, stateUnstarted)
+			again = !l.state.CompareAndSwap(state, stateUnstarted)
}
// Continue: demote ourself and run again no matter what.
case stateContinueWorking:
-		atomic.StoreUint32(&l.state, stateWorking)
+		l.state.Store(stateWorking)
again = true
}

return again
}

func (l *workLoop) hardFinish() {
-	atomic.StoreUint32(&l.state, stateUnstarted)
+	l.state.Store(stateUnstarted)
}
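
The file above is the heart of the pattern this commit converts: workLoop is a three-state latch. maybeBegin returns true only for the caller that moves stateUnstarted to stateWorking — that caller starts the loop goroutine, while concurrent callers merely bump the state to stateContinueWorking; when the loop drains, maybeFinish reports whether more work slipped in. A hedged usage sketch — the worker type and its queue are hypothetical, not part of this commit:

type worker struct {
	loop  workLoop
	queue chan func() // buffered, e.g. make(chan func(), 128), so enqueue rarely blocks
}

func (w *worker) enqueue(fn func()) {
	w.queue <- fn
	if w.loop.maybeBegin() { // true only for the caller that won the unstarted->working CAS
		go w.run()
	}
}

func (w *worker) run() {
	for {
		for { // drain everything currently queued
			select {
			case fn := <-w.queue:
				fn()
				continue
			default:
			}
			break
		}
		// We think we are done; maybeFinish(false) returns true only if
		// another enqueue slipped in after our drain, in which case we
		// loop again rather than racing a freshly spawned goroutine.
		if !w.loop.maybeFinish(false) {
			return
		}
	}
}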
54 changes: 27 additions & 27 deletions pkg/kgo/broker.go
@@ -155,7 +155,7 @@ type broker struct {
// reqs manages incoming message requests.
reqs ringReq
// dead is an atomic so a backed up reqs cannot block broker stoppage.
-	dead int32
+	dead atomicBool
}

// brokerVersions is loaded once (and potentially a few times concurrently if
@@ -214,7 +214,7 @@ func (cl *Client) newBroker(nodeID int32, host string, port int32, rack *string)

// stopForever permanently disables this broker.
func (b *broker) stopForever() {
-	if atomic.SwapInt32(&b.dead, 1) == 1 {
+	if b.dead.Swap(true) {
return
}

@@ -502,7 +502,7 @@ func (b *broker) loadConnection(ctx context.Context, req kmsg.Request) (*brokerC
pcxn = &b.cxnSlow
}

-	if *pcxn != nil && atomic.LoadInt32(&(*pcxn).dead) == 0 {
+	if *pcxn != nil && !(*pcxn).dead.Load() {
return *pcxn, nil
}

@@ -581,7 +581,7 @@ func (b *broker) reapConnections(idleTimeout time.Duration) (total int) {
b.cxnGroup,
b.cxnSlow,
} {
-		if cxn == nil || atomic.LoadInt32(&cxn.dead) == 1 {
+		if cxn == nil || cxn.dead.Load() {
continue
}

@@ -592,11 +592,11 @@ func (b *broker) reapConnections(idleTimeout time.Duration) (total int) {
// - produce can write but never read
// - fetch can hang for a while reading (infrequent writes)

-		lastWrite := time.Unix(0, atomic.LoadInt64(&cxn.lastWrite))
-		lastRead := time.Unix(0, atomic.LoadInt64(&cxn.lastRead))
+		lastWrite := time.Unix(0, cxn.lastWrite.Load())
+		lastRead := time.Unix(0, cxn.lastRead.Load())

-		writeIdle := time.Since(lastWrite) > idleTimeout && atomic.LoadUint32(&cxn.writing) == 0
-		readIdle := time.Since(lastRead) > idleTimeout && atomic.LoadUint32(&cxn.reading) == 0
+		writeIdle := time.Since(lastWrite) > idleTimeout && !cxn.writing.Load()
+		readIdle := time.Since(lastRead) > idleTimeout && !cxn.reading.Load()

if writeIdle && readIdle {
cxn.die()
@@ -634,7 +634,7 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) {
// brokerCxn manages an actual connection to a Kafka broker. This is separate
// from the broker struct to allow lazy connection (re)creation.
type brokerCxn struct {
-	throttleUntil int64 // atomic nanosec
+	throttleUntil atomicI64 // atomic nanosec

conn net.Conn

@@ -651,17 +651,17 @@ type brokerCxn struct {
// The following four fields are used for connection reaping.
// Write is only updated in one location; read is updated in three
// due to readConn, readConnAsync, and discard.
-	lastWrite int64
-	lastRead  int64
-	writing   uint32
-	reading   uint32
+	lastWrite atomicI64
+	lastRead  atomicI64
+	writing   atomicBool
+	reading   atomicBool

successes uint64

// resps manages reading kafka responses.
resps ringResp
// dead is an atomic so that a backed up resps cannot block cxn death.
-	dead int32
+	dead atomicBool
// closed in cloneConn; allows throttle waiting to quit
deadCh chan struct{}
}
@@ -982,7 +982,7 @@ func maybeUpdateCtxErr(clientCtx, reqCtx context.Context, err *error) {
func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt time.Time, req kmsg.Request) (corrID int32, bytesWritten int, writeWait, timeToWrite time.Duration, readEnqueue time.Time, writeErr error) {
// A nil ctx means we cannot be throttled.
if ctx != nil {
-		throttleUntil := time.Unix(0, atomic.LoadInt64(&cxn.throttleUntil))
+		throttleUntil := time.Unix(0, cxn.throttleUntil.Load())
if sleep := time.Until(throttleUntil); sleep > 0 {
after := time.NewTimer(sleep)
select {
@@ -1037,10 +1037,10 @@ func (cxn *brokerCxn) writeConn(
timeout time.Duration,
enqueuedForWritingAt time.Time,
) (bytesWritten int, writeWait, timeToWrite time.Duration, readEnqueue time.Time, writeErr error) {
-	atomic.SwapUint32(&cxn.writing, 1)
+	cxn.writing.Store(true)
defer func() {
-		atomic.StoreInt64(&cxn.lastWrite, time.Now().UnixNano())
-		atomic.SwapUint32(&cxn.writing, 0)
+		cxn.lastWrite.Store(time.Now().UnixNano())
+		cxn.writing.Store(false)
}()

if ctx == nil {
@@ -1085,10 +1085,10 @@ func (cxn *brokerCxn) readConn(
timeout time.Duration,
enqueuedForReadingAt time.Time,
) (nread int, buf []byte, readWait, timeToRead time.Duration, err error) {
-	atomic.SwapUint32(&cxn.reading, 1)
+	cxn.reading.Store(true)
defer func() {
-		atomic.StoreInt64(&cxn.lastRead, time.Now().UnixNano())
-		atomic.SwapUint32(&cxn.reading, 0)
+		cxn.lastRead.Store(time.Now().UnixNano())
+		cxn.reading.Store(false)
}()

if ctx == nil {
@@ -1256,7 +1256,7 @@ func (cxn *brokerCxn) closeConn() {
// die kills a broker connection (which could be dead already) and replies to
// all requests awaiting responses appropriately.
func (cxn *brokerCxn) die() {
-	if cxn == nil || atomic.SwapInt32(&cxn.dead, 1) == 1 {
+	if cxn == nil || cxn.dead.Swap(true) {
return
}
cxn.closeConn()
@@ -1364,10 +1364,10 @@ func (cxn *brokerCxn) discard() {
}
deadlineMu.Unlock()

-	atomic.SwapUint32(&cxn.reading, 1)
+	cxn.reading.Store(true)
defer func() {
-		atomic.StoreInt64(&cxn.lastRead, time.Now().UnixNano())
-		atomic.SwapUint32(&cxn.reading, 0)
+		cxn.lastRead.Store(time.Now().UnixNano())
+		cxn.reading.Store(false)
}()

readStart := time.Now()
@@ -1470,8 +1470,8 @@ func (cxn *brokerCxn) handleResp(pr promisedResp) {
if millis > 0 {
if throttlesAfterResp {
throttleUntil := time.Now().Add(time.Millisecond * time.Duration(millis)).UnixNano()
-			if throttleUntil > cxn.throttleUntil {
-				atomic.StoreInt64(&cxn.throttleUntil, throttleUntil)
+			if throttleUntil > cxn.throttleUntil.Load() {
+				cxn.throttleUntil.Store(throttleUntil)
}
}
cxn.cl.cfg.hooks.each(func(h Hook) {
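One idiom above deserves a callout: stopForever and die both use dead.Swap(true) as a lock-free at-most-once gate — every racing goroutine sets the flag, but only the first observes false and runs the shutdown body. The shape in isolation (illustrative names, not from this commit):

type gate struct {
	dead atomicBool
}

func (g *gate) shutdown() {
	if g.dead.Swap(true) { // all callers set the flag; only the first sees false
		return
	}
	// Close resources and fail pending requests here: this runs exactly
	// once, no matter how many goroutines race into shutdown concurrently.
}

consumer.go below uses the inverted form, !fetchManagerStarted.Swap(true), to start its fetch manager goroutine at most once.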
8 changes: 4 additions & 4 deletions pkg/kgo/consumer.go
@@ -152,7 +152,7 @@ func (o Offset) At(at int64) Offset {
}

type consumer struct {
-	bufferedRecords int64
+	bufferedRecords atomicI64

cl *Client

@@ -272,7 +272,7 @@ func (c *consumer) unaddRebalance() {
// problematic for you if this function is consistently returning large
// values.
func (cl *Client) BufferedFetchRecords() int64 {
-	return atomic.LoadInt64(&cl.consumer.bufferedRecords)
+	return cl.consumer.bufferedRecords.Load()
}

type usedCursors map[*cursor]struct{}
@@ -1224,7 +1224,7 @@ type consumerSession struct {
desireFetchCh chan chan chan struct{}
cancelFetchCh chan chan chan struct{}
allowedFetches int
-	fetchManagerStarted uint32 // atomic, once 1, we start the fetch manager
+	fetchManagerStarted atomicBool // atomic, once true, we start the fetch manager

// Workers signify the number of fetch and list / epoch goroutines that
// are currently running within the context of this consumer session.
@@ -1278,7 +1278,7 @@ func (c *consumer) newConsumerSession(tps *topicsPartitions) *consumerSession {
}

func (s *consumerSession) desireFetch() chan chan chan struct{} {
-	if atomic.SwapUint32(&s.fetchManagerStarted, 1) == 0 {
+	if !s.fetchManagerStarted.Swap(true) {
go s.manageFetchConcurrency()
}
return s.desireFetchCh
31 changes: 31 additions & 0 deletions pkg/kgo/go118.go
@@ -24,3 +24,34 @@ func (b *atomicBool) Swap(v bool) bool {
}
return atomic.SwapUint32((*uint32)(b), swap) == 1
}

+type atomicI32 int32
+
+func (v *atomicI32) Add(s int32) int32  { return atomic.AddInt32((*int32)(v), s) }
+func (v *atomicI32) Store(s int32)      { atomic.StoreInt32((*int32)(v), s) }
+func (v *atomicI32) Load() int32        { return atomic.LoadInt32((*int32)(v)) }
+func (v *atomicI32) Swap(s int32) int32 { return atomic.SwapInt32((*int32)(v), s) }
+
+type atomicU32 uint32
+
+func (v *atomicU32) Add(s uint32) uint32  { return atomic.AddUint32((*uint32)(v), s) }
+func (v *atomicU32) Store(s uint32)       { atomic.StoreUint32((*uint32)(v), s) }
+func (v *atomicU32) Load() uint32         { return atomic.LoadUint32((*uint32)(v)) }
+func (v *atomicU32) Swap(s uint32) uint32 { return atomic.SwapUint32((*uint32)(v), s) }
+func (v *atomicU32) CompareAndSwap(old, new uint32) bool {
+	return atomic.CompareAndSwapUint32((*uint32)(v), old, new)
+}
+
+type atomicI64 int64
+
+func (v *atomicI64) Add(s int64) int64  { return atomic.AddInt64((*int64)(v), s) }
+func (v *atomicI64) Store(s int64)      { atomic.StoreInt64((*int64)(v), s) }
+func (v *atomicI64) Load() int64        { return atomic.LoadInt64((*int64)(v)) }
+func (v *atomicI64) Swap(s int64) int64 { return atomic.SwapInt64((*int64)(v), s) }
+
+type atomicU64 uint64
+
+func (v *atomicU64) Add(s uint64) uint64  { return atomic.AddUint64((*uint64)(v), s) }
+func (v *atomicU64) Store(s uint64)       { atomic.StoreUint64((*uint64)(v), s) }
+func (v *atomicU64) Load() uint64         { return atomic.LoadUint64((*uint64)(v)) }
+func (v *atomicU64) Swap(s uint64) uint64 { return atomic.SwapUint64((*uint64)(v), s) }
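
The hunk above begins mid-file: the collapsed lines 1–23 of go118.go hold the pre-1.19 atomicBool that these numeric shims join. Judging from the Swap tail shown, it is a uint32 used as a 0/1 flag; a reconstruction for context — a sketch, not the verbatim file:

//go:build !go1.19

package kgo

import "sync/atomic"

type atomicBool uint32

func (b *atomicBool) Store(v bool) {
	var store uint32
	if v {
		store = 1
	}
	atomic.StoreUint32((*uint32)(b), store)
}

func (b *atomicBool) Load() bool { return atomic.LoadUint32((*uint32)(b)) == 1 }

func (b *atomicBool) Swap(v bool) bool {
	var swap uint32
	if v {
		swap = 1
	}
	return atomic.SwapUint32((*uint32)(b), swap) == 1
}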
10 changes: 7 additions & 3 deletions pkg/kgo/go119.go
@@ -5,6 +5,10 @@ package kgo

import "sync/atomic"

-type atomicBool struct {
-	atomic.Bool
-}
+type (
+	atomicBool struct{ atomic.Bool }
+	atomicI32  struct{ atomic.Int32 }
+	atomicU32  struct{ atomic.Uint32 }
+	atomicI64  struct{ atomic.Int64 }
+	atomicU64  struct{ atomic.Uint64 }
+)
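
go118.go and go119.go define the same names, and a build constraint selects exactly one per toolchain — which is how this commit fixes 1.19+ without touching 1.18 behavior. A minimal sketch of the 1.19 side, assuming the //go:build directive implied by the filename (the real directive sits in the collapsed region above the hunk):

//go:build go1.19

package kgo

import "sync/atomic"

// Wrapping (rather than aliasing) keeps the zero value ready to use and
// gives both implementations an identical method set: Load, Store, Swap,
// Add, and CompareAndSwap.
type atomicI64 struct{ atomic.Int64 }

The go118.go counterpart carries the mirror-image //go:build !go1.19, so call sites like cxn.lastWrite.Store(...) compile unchanged on either toolchain.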
5 changes: 2 additions & 3 deletions pkg/kgo/group_test.go
@@ -6,7 +6,6 @@ import (
"fmt"
"os"
"strconv"
"sync/atomic"
"testing"
"time"
)
@@ -178,7 +177,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
fetches := cl.PollRecords(ctx, 100)
cancel()
if fetches.Err() == context.DeadlineExceeded || fetches.Err() == ErrClientClosed {
-			if consumed := int(atomic.LoadUint64(&c.consumed)); consumed == testRecordLimit {
+			if consumed := int(c.consumed.Load()); consumed == testRecordLimit {
return
} else if consumed > testRecordLimit {
panic(fmt.Sprintf("invalid: consumed too much from %s (group %s)", c.consumeFrom, c.group))
@@ -217,7 +216,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {

c.mu.Unlock()

-		atomic.AddUint64(&c.consumed, 1)
+		c.consumed.Add(1)

cl.Produce(
context.Background(),
7 changes: 3 additions & 4 deletions pkg/kgo/helpers_test.go
@@ -12,7 +12,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

@@ -66,7 +65,7 @@ func getSeedBrokers() Opt {
return SeedBrokers(strings.Split(seeds, ",")...)
}

-var loggerNum int64
+var loggerNum atomicI64

var testLogLevel = func() LogLevel {
level := strings.ToLower(os.Getenv("KGO_LOG_LEVEL"))
@@ -80,7 +79,7 @@ var testLogLevel = func() LogLevel {
}()

func testLogger() Logger {
-	num := atomic.AddInt64(&loggerNum, 1)
+	num := loggerNum.Add(1)
pfx := strconv.Itoa(int(num))
return BasicLogger(os.Stderr, testLogLevel, func() string {
return time.Now().Format("[15:04:05 ") + pfx + "]"
@@ -193,7 +192,7 @@ type testConsumer struct {

expBody []byte // what every record body should be

-	consumed uint64 // shared atomically
+	consumed atomicU64 // shared atomically

wg sync.WaitGroup
mu sync.Mutex
3 changes: 1 addition & 2 deletions pkg/kgo/partitioner.go
@@ -3,7 +3,6 @@ package kgo
import (
"math"
"math/rand"
"sync/atomic"
"time"

"github.com/twmb/franz-go/pkg/kbin"
@@ -200,7 +199,7 @@ type (

func (i *leastBackupInput) Next() (int, int64) {
last := len(i.mapping) - 1
-	buffered := atomic.LoadInt64(&i.mapping[last].records.buffered)
+	buffered := i.mapping[last].records.buffered.Load()
i.mapping = i.mapping[:last]
return last, buffered
}