Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Broker struct {
conn net.Conn
connErr error
lock sync.Mutex
opened int32
opened atomic.Bool
responses chan *responsePromise
done chan bool

Expand Down Expand Up @@ -162,7 +162,7 @@ func NewBroker(addr string) *Broker {
// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
func (b *Broker) Open(conf *Config) error {
if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
if !b.opened.CompareAndSwap(false, true) {
return ErrAlreadyConnected
}

Expand All @@ -189,7 +189,7 @@ func (b *Broker) Open(conf *Config) error {
if b.connErr != nil {
Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
b.opened.Store(false)
return
}
if conf.Net.TLS.Enable {
Expand Down Expand Up @@ -254,7 +254,7 @@ func (b *Broker) Open(conf *Config) error {
Logger.Printf("Error while closing connection to broker %s (due to SASL v0 auth error: %s): %s\n", b.addr, b.connErr, err)
}
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
b.opened.Store(false)
return
}
}
Expand All @@ -275,7 +275,7 @@ func (b *Broker) Open(conf *Config) error {
Logger.Printf("Error while closing connection to broker %s (due to SASL v1 auth error: %s): %s\n", b.addr, b.connErr, err)
}
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
b.opened.Store(false)
return
}
}
Expand Down Expand Up @@ -349,8 +349,7 @@ func (b *Broker) Close() error {
} else {
Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
}

atomic.StoreInt32(&b.opened, 0)
b.opened.Store(false)

return err
}
Expand Down
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type client struct {
// updateMetadataMs stores the time (in Unix milliseconds) at which metadata was last updated.
// Note: atomic.Int64 is guaranteed 64-bit alignment (Go 1.19+), so this field
// no longer needs to be the first word in the struct (see golang/go#41970).
updateMetadataMs int64
updateMetadataMs atomic.Int64

conf *Config
closer, closed chan none // for shutting down background metadata updater
Expand Down Expand Up @@ -969,7 +969,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
time.Sleep(backoff)
}

t := atomic.LoadInt64(&client.updateMetadataMs)
t := client.updateMetadataMs.Load()
if time.Since(time.UnixMilli(t)) < backoff {
return err
}
Expand All @@ -994,7 +994,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,

req := NewMetadataRequest(client.conf.Version, topics)
req.AllowAutoTopicCreation = allowAutoTopicCreation
atomic.StoreInt64(&client.updateMetadataMs, time.Now().UnixMilli())
client.updateMetadataMs.Store(time.Now().UnixMilli())

response, err := broker.GetMetadata(req)
var kerror KError
Expand Down
22 changes: 11 additions & 11 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ type PartitionConsumer interface {
}

type partitionConsumer struct {
highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
highWaterMarkOffset atomic.Int64 // atomic.Int64 is 64-bit aligned by the runtime (Go 1.19+), so it no longer needs to be at the top of the struct

consumer *consumer
conf *Config
Expand All @@ -411,9 +411,9 @@ type partitionConsumer struct {
responseResult error
fetchSize int32
offset int64
retries int32
retries atomic.Int32

paused int32
paused atomic.Bool // true when the partition consumer is paused
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
Expand All @@ -434,7 +434,7 @@ func (child *partitionConsumer) sendError(err error) {

func (child *partitionConsumer) computeBackoff() time.Duration {
if child.conf.Consumer.Retry.BackoffFunc != nil {
retries := atomic.AddInt32(&child.retries, 1)
retries := child.retries.Add(1)
return child.conf.Consumer.Retry.BackoffFunc(int(retries))
}
return child.conf.Consumer.Retry.Backoff
Expand Down Expand Up @@ -508,7 +508,7 @@ func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
return err
}

child.highWaterMarkOffset = newestOffset
child.highWaterMarkOffset.Store(newestOffset)

oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
if err != nil {
Expand Down Expand Up @@ -562,7 +562,7 @@ func (child *partitionConsumer) Close() error {
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
return atomic.LoadInt64(&child.highWaterMarkOffset)
return child.highWaterMarkOffset.Load()
}

func (child *partitionConsumer) responseFeeder() {
Expand All @@ -575,7 +575,7 @@ feederLoop:
msgs, child.responseResult = child.parseResponse(response)

if child.responseResult == nil {
atomic.StoreInt32(&child.retries, 0)
child.retries.Store(0)
}

for i, msg := range msgs {
Expand Down Expand Up @@ -751,7 +751,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu

// we got messages, reset our fetch size in case it was increased for a previous request
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
child.highWaterMarkOffset.Store(block.HighWaterMarkOffset)

// abortedProducerIDs contains producerID which message should be ignored as uncommitted
// - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
Expand Down Expand Up @@ -837,17 +837,17 @@ func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {

// Pause implements PartitionConsumer.
func (child *partitionConsumer) Pause() {
atomic.StoreInt32(&child.paused, 1)
child.paused.Store(true)
}

// Resume implements PartitionConsumer.
func (child *partitionConsumer) Resume() {
atomic.StoreInt32(&child.paused, 0)
child.paused.Store(false)
}

// IsPaused implements PartitionConsumer.
func (child *partitionConsumer) IsPaused() bool {
return atomic.LoadInt32(&child.paused) == 1
return child.paused.Load()
}

type brokerConsumer struct {
Expand Down
29 changes: 15 additions & 14 deletions mocks/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,16 +222,17 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
highWatermarkOffset = 0
}

c.partitionConsumers[topic][partition] = &PartitionConsumer{
highWaterMarkOffset: highWatermarkOffset,
t: c.t,
topic: topic,
partition: partition,
offset: offset,
messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
suppressedMessages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
consumer := &PartitionConsumer{
t: c.t,
topic: topic,
partition: partition,
offset: offset,
messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
suppressedMessages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
}
consumer.highWaterMarkOffset.Store(highWatermarkOffset)
c.partitionConsumers[topic][partition] = consumer
}

return c.partitionConsumers[topic][partition]
Expand All @@ -247,7 +248,7 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
// Errors and Messages channel, you should specify what values will be provided on these
// channels using YieldMessage and YieldError.
type PartitionConsumer struct {
highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
highWaterMarkOffset atomic.Int64 // atomic.Int64 is 64-bit aligned by the runtime (Go 1.19+), so it no longer needs to be at the top of the struct
suppressedHighWaterMarkOffset int64
l sync.Mutex
t ErrorReporter
Expand Down Expand Up @@ -345,15 +346,15 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
}

func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
return atomic.LoadInt64(&pc.highWaterMarkOffset)
return pc.highWaterMarkOffset.Load()
}

// Pause implements the Pause method from the sarama.PartitionConsumer interface.
func (pc *PartitionConsumer) Pause() {
pc.l.Lock()
defer pc.l.Unlock()

pc.suppressedHighWaterMarkOffset = atomic.LoadInt64(&pc.highWaterMarkOffset)
pc.suppressedHighWaterMarkOffset = pc.highWaterMarkOffset.Load()

pc.paused = true
}
Expand All @@ -363,7 +364,7 @@ func (pc *PartitionConsumer) Resume() {
pc.l.Lock()
defer pc.l.Unlock()

pc.highWaterMarkOffset = atomic.LoadInt64(&pc.suppressedHighWaterMarkOffset)
pc.highWaterMarkOffset.Store(pc.suppressedHighWaterMarkOffset)
for len(pc.suppressedMessages) > 0 {
msg := <-pc.suppressedMessages
pc.messages <- msg
Expand Down Expand Up @@ -400,7 +401,7 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *Partitio
msg.Offset = atomic.AddInt64(&pc.suppressedHighWaterMarkOffset, 1) - 1
pc.suppressedMessages <- msg
} else {
msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) - 1
msg.Offset = pc.highWaterMarkOffset.Add(1) - 1
pc.messages <- msg
}

Expand Down
Loading