diff --git a/broker.go b/broker.go
index da31d4fe5..f00aa5396 100644
--- a/broker.go
+++ b/broker.go
@@ -29,7 +29,7 @@ type Broker struct {
 	conn      net.Conn
 	connErr   error
 	lock      sync.Mutex
-	opened    int32
+	opened    atomic.Bool
 	responses chan *responsePromise
 	done      chan bool
 
@@ -162,7 +162,7 @@ func NewBroker(addr string) *Broker {
 // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
 // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
 func (b *Broker) Open(conf *Config) error {
-	if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
+	if !b.opened.CompareAndSwap(false, true) {
 		return ErrAlreadyConnected
 	}
 
@@ -189,7 +189,7 @@
 		if b.connErr != nil {
 			Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
 			b.conn = nil
-			atomic.StoreInt32(&b.opened, 0)
+			b.opened.Store(false)
 			return
 		}
 		if conf.Net.TLS.Enable {
@@ -254,7 +254,7 @@
 				Logger.Printf("Error while closing connection to broker %s (due to SASL v0 auth error: %s): %s\n", b.addr, b.connErr, err)
 			}
 			b.conn = nil
-			atomic.StoreInt32(&b.opened, 0)
+			b.opened.Store(false)
 			return
 		}
 	}
@@ -275,7 +275,7 @@
 				Logger.Printf("Error while closing connection to broker %s (due to SASL v1 auth error: %s): %s\n", b.addr, b.connErr, err)
 			}
 			b.conn = nil
-			atomic.StoreInt32(&b.opened, 0)
+			b.opened.Store(false)
 			return
 		}
 	}
@@ -349,8 +349,7 @@ func (b *Broker) Close() error {
 	} else {
 		Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
 	}
-
-	atomic.StoreInt32(&b.opened, 0)
+	b.opened.Store(false)
 
 	return err
 }
diff --git a/client.go b/client.go
index 34b78f473..0d510ee92 100644
--- a/client.go
+++ b/client.go
@@ -143,7 +143,7 @@ type client struct {
 	// updateMetadataMs stores the time at which metadata was lasted updated.
 	// Note: this accessed atomically so must be the first word in the struct
 	// as per golang/go#41970
-	updateMetadataMs int64
+	updateMetadataMs atomic.Int64
 
 	conf           *Config
 	closer, closed chan none // for shutting down background metadata updater
@@ -969,7 +969,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
 		time.Sleep(backoff)
 	}
 
-	t := atomic.LoadInt64(&client.updateMetadataMs)
+	t := client.updateMetadataMs.Load()
 	if time.Since(time.UnixMilli(t)) < backoff {
 		return err
 	}
@@ -994,7 +994,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
 
 	req := NewMetadataRequest(client.conf.Version, topics)
 	req.AllowAutoTopicCreation = allowAutoTopicCreation
-	atomic.StoreInt64(&client.updateMetadataMs, time.Now().UnixMilli())
+	client.updateMetadataMs.Store(time.Now().UnixMilli())
 	response, err := broker.GetMetadata(req)
 
 	var kerror KError
diff --git a/consumer.go b/consumer.go
index 11ca02307..4c96af4fe 100644
--- a/consumer.go
+++ b/consumer.go
@@ -392,7 +392,7 @@ type PartitionConsumer interface {
 }
 
 type partitionConsumer struct {
-	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
+	highWaterMarkOffset atomic.Int64
 
 	consumer *consumer
 	conf     *Config
@@ -411,9 +411,9 @@ type partitionConsumer struct {
 	responseResult error
 	fetchSize      int32
 	offset         int64
-	retries        int32
+	retries        atomic.Int32
 
-	paused int32
+	paused atomic.Bool // true = paused
 }
 
 var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
@@ -434,7 +434,7 @@ func (child *partitionConsumer) sendError(err error) {
 func (child *partitionConsumer) computeBackoff() time.Duration {
 	if child.conf.Consumer.Retry.BackoffFunc != nil {
-		retries := atomic.AddInt32(&child.retries, 1)
+		retries := child.retries.Add(1)
 		return child.conf.Consumer.Retry.BackoffFunc(int(retries))
 	}
 	return child.conf.Consumer.Retry.Backoff
 }
@@ -508,7 +508,7 @@ func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
 		return err
 	}
 
-	child.highWaterMarkOffset = newestOffset
+	child.highWaterMarkOffset.Store(newestOffset)
 
 	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
 	if err != nil {
@@ -562,7 +562,7 @@ func (child *partitionConsumer) Close() error {
 }
 
 func (child *partitionConsumer) HighWaterMarkOffset() int64 {
-	return atomic.LoadInt64(&child.highWaterMarkOffset)
+	return child.highWaterMarkOffset.Load()
 }
 
 func (child *partitionConsumer) responseFeeder() {
@@ -575,7 +575,7 @@ feederLoop:
 		msgs, child.responseResult = child.parseResponse(response)
 
 		if child.responseResult == nil {
-			atomic.StoreInt32(&child.retries, 0)
+			child.retries.Store(0)
 		}
 
 		for i, msg := range msgs {
@@ -751,7 +751,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 
 	// we got messages, reset our fetch size in case it was increased for a previous request
 	child.fetchSize = child.conf.Consumer.Fetch.Default
-	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
+	child.highWaterMarkOffset.Store(block.HighWaterMarkOffset)
 
 	// abortedProducerIDs contains producerID which message should be ignored as uncommitted
 	// - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
@@ -837,17 +837,17 @@ func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {
 
 // Pause implements PartitionConsumer.
 func (child *partitionConsumer) Pause() {
-	atomic.StoreInt32(&child.paused, 1)
+	child.paused.Store(true)
 }
 
 // Resume implements PartitionConsumer.
 func (child *partitionConsumer) Resume() {
-	atomic.StoreInt32(&child.paused, 0)
+	child.paused.Store(false)
 }
 
 // IsPaused implements PartitionConsumer.
 func (child *partitionConsumer) IsPaused() bool {
-	return atomic.LoadInt32(&child.paused) == 1
+	return child.paused.Load()
 }
 
 type brokerConsumer struct {
diff --git a/mocks/consumer.go b/mocks/consumer.go
index 77bb9195c..0f6f1ab2a 100644
--- a/mocks/consumer.go
+++ b/mocks/consumer.go
@@ -222,16 +222,17 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
 			highWatermarkOffset = 0
 		}
 
-		c.partitionConsumers[topic][partition] = &PartitionConsumer{
-			highWaterMarkOffset: highWatermarkOffset,
-			t:                   c.t,
-			topic:               topic,
-			partition:           partition,
-			offset:              offset,
-			messages:            make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
-			suppressedMessages:  make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
-			errors:              make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
+		consumer := &PartitionConsumer{
+			t:                  c.t,
+			topic:              topic,
+			partition:          partition,
+			offset:             offset,
+			messages:           make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
+			suppressedMessages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
+			errors:             make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
 		}
+		consumer.highWaterMarkOffset.Store(highWatermarkOffset)
+		c.partitionConsumers[topic][partition] = consumer
 	}
 
 	return c.partitionConsumers[topic][partition]
@@ -247,7 +248,7 @@
 // Errors and Messages channel, you should specify what values will be provided on these
 // channels using YieldMessage and YieldError.
 type PartitionConsumer struct {
-	highWaterMarkOffset           int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
+	highWaterMarkOffset           atomic.Int64
 	suppressedHighWaterMarkOffset int64
 	l                             sync.Mutex
 	t                             ErrorReporter
@@ -345,7 +346,7 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
 }
 
 func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
-	return atomic.LoadInt64(&pc.highWaterMarkOffset)
+	return pc.highWaterMarkOffset.Load()
 }
 
 // Pause implements the Pause method from the sarama.PartitionConsumer interface.
@@ -353,7 +354,7 @@ func (pc *PartitionConsumer) Pause() {
 	pc.l.Lock()
 	defer pc.l.Unlock()
 
-	pc.suppressedHighWaterMarkOffset = atomic.LoadInt64(&pc.highWaterMarkOffset)
+	pc.suppressedHighWaterMarkOffset = pc.highWaterMarkOffset.Load()
 
 	pc.paused = true
 }
@@ -363,7 +364,7 @@ func (pc *PartitionConsumer) Resume() {
 	pc.l.Lock()
 	defer pc.l.Unlock()
 
-	pc.highWaterMarkOffset = atomic.LoadInt64(&pc.suppressedHighWaterMarkOffset)
+	pc.highWaterMarkOffset.Store(atomic.LoadInt64(&pc.suppressedHighWaterMarkOffset))
 	for len(pc.suppressedMessages) > 0 {
 		msg := <-pc.suppressedMessages
 		pc.messages <- msg
@@ -400,7 +401,7 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *Partitio
 		msg.Offset = atomic.AddInt64(&pc.suppressedHighWaterMarkOffset, 1) - 1
 		pc.suppressedMessages <- msg
 	} else {
-		msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) - 1
+		msg.Offset = pc.highWaterMarkOffset.Add(1) - 1
 		pc.messages <- msg
 	}