diff --git a/.github/workflows/apidiff.yml b/.github/workflows/apidiff.yml index 90de74ccc..1ffce8efc 100644 --- a/.github/workflows/apidiff.yml +++ b/.github/workflows/apidiff.yml @@ -29,7 +29,7 @@ jobs: if: github.base_ref steps: - name: Setup Go - uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0 + uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0 with: go-version: stable - name: Add GOBIN to PATH @@ -37,7 +37,7 @@ jobs: - name: Install apidiff cmd run: go install golang.org/x/exp/cmd/apidiff@v0.0.0-20250813145105-42675adae3e6 - name: Checkout base code - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: ref: ${{ github.base_ref }} path: "base" @@ -46,7 +46,7 @@ jobs: run: apidiff -m -w ../baseline.bin . working-directory: "base" - name: Checkout updated code - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: path: "updated" persist-credentials: false diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 039f055c9..061e44f47 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,7 @@ env: # Use the Go toolchain installed by setup-go GOTOOLCHAIN: local # renovate: datasource=github-releases depName=golangci/golangci-lint - GOLANGCI_LINT_VERSION: v2.5.0 + GOLANGCI_LINT_VERSION: v2.6.2 # renovate: datasource=github-releases depName=dominikh/go-tools STATICCHECK_VERSION: 2025.1.1 # renovate: datasource=github-releases depName=mfridman/tparse @@ -41,11 +41,11 @@ jobs: matrix: go-version: [stable] steps: - - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: persist-credentials: false - name: Setup Go - uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0 + uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0 with: go-version: ${{ matrix.go-version }} - name: Staticcheck @@ -59,7 +59,7 @@ jobs: - name: golangci-lint env: GOFLAGS: -tags=functional - uses: golangci/golangci-lint-action@e7fa5ac41e1cf5b7d48e45e42232ce7ada589601 # v9.1.0 + uses: golangci/golangci-lint-action@1e7e51e771db61008b38414a730f564565cf7c20 # v9.2.0 with: version: ${{ env.GOLANGCI_LINT_VERSION }} test: @@ -73,11 +73,11 @@ jobs: DEBUG: true GOFLAGS: -trimpath steps: - - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: persist-credentials: false - name: Setup Go - uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0 + uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0 with: go-version: ${{ matrix.go-version }} - name: Test (Unit) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 08d51ca40..974fe8098 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -35,18 +35,18 @@ jobs: language: ["actions", "go"] steps: - name: Checkout repository - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: persist-credentials: false - name: Initialize CodeQL - uses: github/codeql-action/init@014f16e7ab1402f30e7c3329d33797e7948572db # v4.31.3 + uses: github/codeql-action/init@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v4.31.4 with: languages: ${{ matrix.language }} - name: Setup Go - uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0 + uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0 with: go-version: stable - name: Autobuild - uses: github/codeql-action/autobuild@014f16e7ab1402f30e7c3329d33797e7948572db # v4.31.3 + uses: github/codeql-action/autobuild@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v4.31.4 - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@014f16e7ab1402f30e7c3329d33797e7948572db # v4.31.3 + uses: github/codeql-action/analyze@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v4.31.4 diff --git a/.github/workflows/dependency-review.yml b/.github/workflows/dependency-review.yml index c0e92ceac..8ae85e295 100644 --- a/.github/workflows/dependency-review.yml +++ b/.github/workflows/dependency-review.yml @@ -26,7 +26,7 @@ jobs: runs-on: ubuntu-latest steps: - name: 'Checkout Repository' - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: persist-credentials: false - name: 'Dependency Review' diff --git a/.github/workflows/fuzz.yml b/.github/workflows/fuzz.yml index 7d67e88e9..f952e0ae5 100644 --- a/.github/workflows/fuzz.yml +++ b/.github/workflows/fuzz.yml @@ -30,11 +30,11 @@ jobs: env: GOFLAGS: -trimpath steps: - - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: persist-credentials: false - name: Setup Go - uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0 + uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0 with: go-version: stable - name: Run any fuzzing tests diff --git a/.github/workflows/fvt.yml b/.github/workflows/fvt.yml index 126751b9c..55a1a0302 100644 --- a/.github/workflows/fvt.yml +++ b/.github/workflows/fvt.yml @@ -36,14 +36,14 @@ jobs: KAFKA_VERSION: ${{ inputs.kafka-version }} SCALA_VERSION: ${{ inputs.scala-version }} steps: - - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: persist-credentials: false - name: Setup Docker uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1 id: buildx - name: Build FVT Docker Image - uses: docker/bake-action@3acf805d94d93a86cce4ca44798a76464a75b88c # v6.9.0 + uses: docker/bake-action@5be5f02ff8819ecd3092ea6b2e6261c31774f2b4 # v6.10.0 with: builder: ${{ steps.buildx.outputs.name }} files: docker-compose.yml @@ -53,7 +53,7 @@ jobs: *.cache-from=type=gha,scope=fvt-kafka-${{ inputs.kafka-version }} *.cache-to=type=gha,scope=fvt-kafka-${{ inputs.kafka-version }},mode=max - name: Setup Go - uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0 + uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0 with: go-version: ${{ inputs.go-version }} - name: Setup Docker Compose @@ -92,7 +92,7 @@ jobs: if [ -f "fvt-kafka-${KAFKA_VERSION}.pcap" ]; then sudo chmod a+r "fvt-kafka-${KAFKA_VERSION}.pcap"; fi - name: Upload pcap file if: always() - uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 + uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0 with: name: fvt-kafka-${{ inputs.kafka-version }}.pcap path: fvt-kafka-${{ inputs.kafka-version }}.pcap diff --git a/.github/workflows/i386.yml b/.github/workflows/i386.yml index 667616e97..1093d45ee 100644 --- a/.github/workflows/i386.yml +++ b/.github/workflows/i386.yml @@ -30,11 +30,11 @@ jobs: pull-requests: read # for golangci/golangci-lint-action to fetch pull requests runs-on: ubuntu-latest steps: - - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: persist-credentials: false - name: Setup Go - uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0 + uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0 with: go-version: stable - name: staticcheck diff --git a/.github/workflows/renovate-config.yml b/.github/workflows/renovate-config.yml index ad44407ec..99e52d4b4 100644 --- a/.github/workflows/renovate-config.yml +++ b/.github/workflows/renovate-config.yml @@ -12,10 +12,10 @@ jobs: validate: runs-on: ubuntu-latest steps: - - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: sparse-checkout: | .github/renovate.json5 sparse-checkout-cone-mode: false - - uses: actions/setup-node@2028fbc5c25fe9cf00d9f06a71cc4710d4507903 # v6.0.0 + - uses: actions/setup-node@395ad3262231945c25e8478fd5baf05154b1d79f # v6.1.0 - run: npx --package=renovate@latest -- renovate-config-validator diff --git a/.github/workflows/scorecard.yml b/.github/workflows/scorecard.yml index 5dd214f3c..a21349edb 100644 --- a/.github/workflows/scorecard.yml +++ b/.github/workflows/scorecard.yml @@ -38,7 +38,7 @@ jobs: steps: - name: "Checkout code" - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: persist-credentials: false @@ -65,7 +65,7 @@ jobs: # Upload the results as artifacts (optional). Commenting out will disable uploads of run results in SARIF # format to the repository Actions tab. - name: "Upload artifact" - uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 + uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0 with: name: SARIF file path: results.sarif @@ -73,6 +73,6 @@ jobs: # Upload the results to GitHub's code scanning dashboard. - name: "Upload to code-scanning" - uses: github/codeql-action/upload-sarif@014f16e7ab1402f30e7c3329d33797e7948572db # v4.31.3 + uses: github/codeql-action/upload-sarif@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v4.31.4 with: sarif_file: results.sarif diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml index ec7ef9c42..22dc52b59 100644 --- a/.github/workflows/stale.yml +++ b/.github/workflows/stale.yml @@ -20,7 +20,7 @@ jobs: runs-on: ubuntu-latest steps: # pinned to main commit to make use of https://github.com/actions/stale/pull/1033 - - uses: actions/stale@5f858e3efba33a5ca4407a664cc011ad407f2008 # v10.1.0 + - uses: actions/stale@997185467fa4f803885201cee163a9f38240193d # v10.1.1 with: ascending: true days-before-stale: 90 diff --git a/admin.go b/admin.go index 42f6fa0c4..da220ed2a 100644 --- a/admin.go +++ b/admin.go @@ -83,6 +83,7 @@ type ClusterAdmin interface { // This operation is not transactional so it may succeed or fail. // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher. + // // Deprecated: Use CreateACLs instead. CreateACL(resource Resource, acl Acl) error diff --git a/async_producer_test.go b/async_producer_test.go index 2855b25f3..e87381ace 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -629,9 +629,14 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { config.Producer.Return.Successes = true config.Producer.Retry.Max = 4 - backoffCalled := make([]int32, config.Producer.Retry.Max+1) + // We use a pointer to atomic to prevent the possibility of a reallocation causing a copy. + backoffCalled := make([]*atomic.Int32, config.Producer.Retry.Max+1) + for i := range backoffCalled { + backoffCalled[i] = new(atomic.Int32) + } + config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration { - atomic.AddInt32(&backoffCalled[retries-1], 1) + backoffCalled[retries-1].Add(1) return 0 } producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) @@ -672,11 +677,11 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { closeProducer(t, producer) for i := 0; i < config.Producer.Retry.Max; i++ { - if atomic.LoadInt32(&backoffCalled[i]) != 1 { + if backoffCalled[i].Load() != 1 { t.Errorf("expected one retry attempt #%d", i) } } - if atomic.LoadInt32(&backoffCalled[config.Producer.Retry.Max]) != 0 { + if backoffCalled[config.Producer.Retry.Max].Load() != 0 { t.Errorf("expected no retry attempt #%d", config.Producer.Retry.Max) } } @@ -811,21 +816,21 @@ func TestAsyncProducerBrokerRestart(t *testing.T) { // The seed broker only handles Metadata request in bootstrap seedBroker.setHandler(metadataRequestHandlerFunc) - var emptyValues int32 = 0 + var emptyValues atomic.Int32 countRecordsWithEmptyValue := func(req *request) { preq := req.body.(*ProduceRequest) if batch := preq.records["my_topic"][0].RecordBatch; batch != nil { for _, record := range batch.Records { if len(record.Value) == 0 { - atomic.AddInt32(&emptyValues, 1) + emptyValues.Add(1) } } } if batch := preq.records["my_topic"][0].MsgSet; batch != nil { for _, record := range batch.Messages { if len(record.Msg.Value) == 0 { - atomic.AddInt32(&emptyValues, 1) + emptyValues.Add(1) } } } @@ -904,7 +909,7 @@ func TestAsyncProducerBrokerRestart(t *testing.T) { closeProducerWithTimeout(t, producer, 5*time.Second) - if emptyValues := atomic.LoadInt32(&emptyValues); emptyValues > 0 { + if emptyValues := emptyValues.Load(); emptyValues > 0 { t.Fatalf("%d empty values", emptyValues) } } diff --git a/client.go b/client.go index 0dc29e225..bc5e646b6 100644 --- a/client.go +++ b/client.go @@ -685,9 +685,16 @@ func (client *client) randomizeSeedBrokers(addrs []string) { } func (client *client) updateBroker(brokers []*Broker) { + if client.brokers == nil { + return + } + currentBroker := make(map[int32]*Broker, len(brokers)) for _, broker := range brokers { + if broker == nil { + continue + } currentBroker[broker.ID()] = broker if client.brokers[broker.ID()] == nil { // add new broker client.brokers[broker.ID()] = broker diff --git a/client_test.go b/client_test.go index 9d6242fa3..301d97e49 100644 --- a/client_test.go +++ b/client_test.go @@ -14,6 +14,7 @@ import ( "github.com/rcrowley/go-metrics" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestSimpleClient(t *testing.T) { @@ -361,12 +362,12 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) - retryCount := int32(0) + var retryCount atomic.Int32 config := NewTestConfig() config.Metadata.Retry.Max = 1 config.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration { - atomic.AddInt32(&retryCount, 1) + retryCount.Add(1) return 0 } client, err := NewClient([]string{seedBroker.Addr()}, config) @@ -387,7 +388,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { safeClose(t, client) seedBroker.Close() - actualRetryCount := atomic.LoadInt32(&retryCount) + actualRetryCount := retryCount.Load() if actualRetryCount != 1 { t.Fatalf("Expected BackoffFunc to be called exactly once, but saw %d", actualRetryCount) } @@ -594,7 +595,7 @@ func TestClientRefreshBrokers(t *testing.T) { newSeedBrokers := []string{"localhost:12345"} _ = client.RefreshBrokers(newSeedBrokers) - if client.seedBrokers[0].addr != newSeedBrokers[0] { + if len(client.seedBrokers) == 0 || client.seedBrokers[0].addr != newSeedBrokers[0] { t.Error("Seed broker not updated") } if len(client.Brokers()) != 0 { @@ -1330,3 +1331,70 @@ func TestMetricsCleanup(t *testing.T) { t.Errorf("excepted 1 metric, found: %v", all) } } + +func TestUpdateBroker(t *testing.T) { + t.Run("closed client doesn't panic", func(t *testing.T) { + c := &client{} + fn := func() { + c.updateBroker(nil) + c.updateBroker([]*Broker{ + { + id: 0, + addr: "127.0.0.1:9092", + }, + }) + } + require.NotPanics(t, fn) + }) + + t.Run("open client adds new broker entries", func(t *testing.T) { + c := &client{ + brokers: make(map[int32]*Broker), + } + fn := func() { + c.updateBroker([]*Broker{ + { + id: 0, + addr: "127.0.0.1:9092", + }, + }) + } + require.NotPanics(t, fn) + require.Len(t, c.brokers, 1) + assert.Equal(t, 0, int(c.brokers[0].ID())) + assert.Equal(t, "127.0.0.1:9092", c.brokers[0].Addr()) + }) + + t.Run("open client adds, updates and removes broker entries", func(t *testing.T) { + c := &client{ + brokers: map[int32]*Broker{ + 0: { + id: 0, + addr: "127.0.0.1:9092", + }, + 1: { + id: 1, + addr: "127.0.0.1:9093", + }, + }, + } + fn := func() { + c.updateBroker([]*Broker{ + { + id: 1, + addr: "127.0.0.1:19093", // new addr for existing broker + }, + { + id: 2, + addr: "127.0.0.1:19094", + }, + }) + } + require.NotPanics(t, fn) + require.Len(t, c.brokers, 2) + assert.Equal(t, 1, int(c.brokers[1].ID())) + assert.Equal(t, "127.0.0.1:19093", c.brokers[1].Addr()) + assert.Equal(t, 2, int(c.brokers[2].ID())) + assert.Equal(t, "127.0.0.1:19094", c.brokers[2].Addr()) + }) +} diff --git a/config.go b/config.go index 5bac2b50a..80758e9bb 100644 --- a/config.go +++ b/config.go @@ -327,6 +327,7 @@ type Config struct { } Rebalance struct { // Strategy for allocating topic partitions to members. + // // Deprecated: Strategy exists for historical compatibility // and should not be used. Please use GroupStrategies. Strategy BalanceStrategy diff --git a/consumer_test.go b/consumer_test.go index 5f796eeb0..4ffb4d9eb 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -491,12 +491,12 @@ func TestConsumerLeaderRefreshError(t *testing.T) { } func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) { - var calls int32 = 0 + var calls atomic.Int32 config := NewTestConfig() config.Net.ReadTimeout = 100 * time.Millisecond config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration { - atomic.AddInt32(&calls, 1) + calls.Add(1) return 200 * time.Millisecond } config.Consumer.Return.Errors = true @@ -505,7 +505,7 @@ func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) { runConsumerLeaderRefreshErrorTestWithConfig(t, config) // we expect at least one call to our backoff function - if calls == 0 { + if calls.Load() == 0 { t.Fail() } } diff --git a/examples/consumergroup/go.mod b/examples/consumergroup/go.mod index 7b0ca562f..7fa672a9a 100644 --- a/examples/consumergroup/go.mod +++ b/examples/consumergroup/go.mod @@ -16,7 +16,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect golang.org/x/crypto v0.45.0 // indirect diff --git a/examples/consumergroup/go.sum b/examples/consumergroup/go.sum index b8e525726..7486c1530 100644 --- a/examples/consumergroup/go.sum +++ b/examples/consumergroup/go.sum @@ -28,8 +28,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/examples/exactly_once/go.mod b/examples/exactly_once/go.mod index 0976d1c3d..132b26bb0 100644 --- a/examples/exactly_once/go.mod +++ b/examples/exactly_once/go.mod @@ -16,7 +16,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect golang.org/x/crypto v0.45.0 // indirect diff --git a/examples/exactly_once/go.sum b/examples/exactly_once/go.sum index ff7614215..5aa6fda55 100644 --- a/examples/exactly_once/go.sum +++ b/examples/exactly_once/go.sum @@ -28,8 +28,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/examples/http_server/go.mod b/examples/http_server/go.mod index c9101b0b5..bae92128b 100644 --- a/examples/http_server/go.mod +++ b/examples/http_server/go.mod @@ -16,7 +16,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect golang.org/x/crypto v0.45.0 // indirect diff --git a/examples/http_server/go.sum b/examples/http_server/go.sum index ff7614215..5aa6fda55 100644 --- a/examples/http_server/go.sum +++ b/examples/http_server/go.sum @@ -28,8 +28,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/examples/interceptors/go.mod b/examples/interceptors/go.mod index 27e17a156..f10003ae8 100644 --- a/examples/interceptors/go.mod +++ b/examples/interceptors/go.mod @@ -25,7 +25,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect go.opentelemetry.io/otel/metric v1.29.0 // indirect diff --git a/examples/interceptors/go.sum b/examples/interceptors/go.sum index aa2f9bdd8..b30ab660b 100644 --- a/examples/interceptors/go.sum +++ b/examples/interceptors/go.sum @@ -37,8 +37,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/examples/sasl_scram_client/go.mod b/examples/sasl_scram_client/go.mod index c5b81490e..1b81aee54 100644 --- a/examples/sasl_scram_client/go.mod +++ b/examples/sasl_scram_client/go.mod @@ -19,7 +19,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect diff --git a/examples/sasl_scram_client/go.sum b/examples/sasl_scram_client/go.sum index a42a7a7d4..0d8e35d5d 100644 --- a/examples/sasl_scram_client/go.sum +++ b/examples/sasl_scram_client/go.sum @@ -28,8 +28,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/examples/txn_producer/go.mod b/examples/txn_producer/go.mod index 4c2fad1b6..ad95d3186 100644 --- a/examples/txn_producer/go.mod +++ b/examples/txn_producer/go.mod @@ -19,7 +19,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect golang.org/x/crypto v0.45.0 // indirect golang.org/x/net v0.47.0 // indirect diff --git a/examples/txn_producer/go.sum b/examples/txn_producer/go.sum index ff7614215..5aa6fda55 100644 --- a/examples/txn_producer/go.sum +++ b/examples/txn_producer/go.sum @@ -28,8 +28,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go index 249e1ebf3..be4045e44 100644 --- a/functional_consumer_group_test.go +++ b/functional_consumer_group_test.go @@ -348,20 +348,20 @@ type testFuncConsumerGroupMessage struct { type testFuncConsumerGroupSink struct { msgs chan testFuncConsumerGroupMessage - count int32 + count atomic.Int32 } func (s *testFuncConsumerGroupSink) Len() int { if s == nil { return -1 } - return int(atomic.LoadInt32(&s.count)) + return int(s.count.Load()) } func (s *testFuncConsumerGroupSink) Push(clientID string, m *ConsumerMessage) { if s != nil { s.msgs <- testFuncConsumerGroupMessage{ClientID: clientID, ConsumerMessage: m} - atomic.AddInt32(&s.count, 1) + s.count.Add(1) } } @@ -378,18 +378,19 @@ func (s *testFuncConsumerGroupSink) Close() map[string][]string { type testFuncConsumerGroupMember struct { ConsumerGroup - clientID string - claims map[string]int - generationId int32 - state int32 - handlers int32 - errs []error - maxMessages int32 - isCapped bool - sink *testFuncConsumerGroupSink + t *testing.T + clientID string + isCapped bool + sink *testFuncConsumerGroupSink - t *testing.T - mu sync.RWMutex + generationId atomic.Int32 + state atomic.Int32 + handlers atomic.Int32 + maxMessages atomic.Int32 + + mu sync.RWMutex + claims map[string]int + errs []error } func defaultConfig(clientID string) *Config { @@ -443,11 +444,11 @@ func runTestFuncConsumerGroupMemberWithConfig( ConsumerGroup: group, clientID: config.ClientID, claims: make(map[string]int), - maxMessages: maxMessages, isCapped: maxMessages != 0, sink: sink, t: t, } + member.maxMessages.Store(maxMessages) go member.loop(topics) return member } @@ -480,7 +481,7 @@ func (m *testFuncConsumerGroupMember) WaitForState(expected int32) { m.t.Helper() m.waitFor("state", expected, func() (interface{}, error) { - return atomic.LoadInt32(&m.state), nil + return m.state.Load(), nil }) } @@ -488,7 +489,7 @@ func (m *testFuncConsumerGroupMember) WaitForHandlers(expected int) { m.t.Helper() m.waitFor("handlers", expected, func() (interface{}, error) { - return int(atomic.LoadInt32(&m.handlers)), nil + return int(m.handlers.Load()), nil }) } @@ -518,17 +519,17 @@ func (m *testFuncConsumerGroupMember) Setup(s ConsumerGroupSession) error { m.mu.Unlock() // store generationID - atomic.StoreInt32(&m.generationId, s.GenerationID()) + m.generationId.Store(s.GenerationID()) // enter post-setup state - atomic.StoreInt32(&m.state, 2) + m.state.Store(2) return nil } func (m *testFuncConsumerGroupMember) Cleanup(s ConsumerGroupSession) error { m.t.Logf("Consumer %s: session ended %s in generation %d", m.clientID, s.MemberID(), s.GenerationID()) // enter post-cleanup state - atomic.StoreInt32(&m.state, 3) + m.state.Store(3) return nil } @@ -538,8 +539,8 @@ func (m *testFuncConsumerGroupMember) ConsumeClaim(s ConsumerGroupSession, c Con m.t.Errorf("panic in ConsumeClaim: %v", r) } }() - atomic.AddInt32(&m.handlers, 1) - defer atomic.AddInt32(&m.handlers, -1) + m.handlers.Add(1) + defer m.handlers.Add(-1) consumed := 0 for { @@ -549,7 +550,7 @@ func (m *testFuncConsumerGroupMember) ConsumeClaim(s ConsumerGroupSession, c Con m.t.Logf("Consumer %s: message channel closed, consumed %d messages", m.clientID, consumed) return nil } - if n := atomic.AddInt32(&m.maxMessages, -1); m.isCapped && n < 0 { + if n := m.maxMessages.Add(-1); m.isCapped && n < 0 { m.t.Logf("Consumer %s: reached max messages, consumed %d messages", m.clientID, consumed) return nil } @@ -597,7 +598,7 @@ func (m *testFuncConsumerGroupMember) loop(topics []string) { if r := recover(); r != nil { m.t.Errorf("panic in loop for %s: %v", m.clientID, r) } - atomic.StoreInt32(&m.state, 4) + m.state.Store(4) }() go func() { @@ -614,7 +615,7 @@ func (m *testFuncConsumerGroupMember) loop(topics []string) { ctx := context.Background() for { // set state to pre-consume - atomic.StoreInt32(&m.state, 1) + m.state.Store(1) if err := m.Consume(ctx, topics, m); errors.Is(err, ErrClosedConsumerGroup) { m.t.Logf("Consumer %s: closed consumer group", m.clientID) @@ -634,7 +635,7 @@ func (m *testFuncConsumerGroupMember) loop(topics []string) { } // return if capped - if n := atomic.LoadInt32(&m.maxMessages); m.isCapped && n < 0 { + if n := m.maxMessages.Load(); m.isCapped && n < 0 { m.t.Logf("Consumer %s: reached max messages, returning from loop", m.clientID) return } @@ -651,7 +652,7 @@ func newTestStatefulStrategy(t *testing.T) *testStatefulStrategy { type testStatefulStrategy struct { BalanceStrategy t *testing.T - initial int32 + initial atomic.Int32 state sync.Map } @@ -664,7 +665,7 @@ func (h *testStatefulStrategy) Plan(members map[string]ConsumerGroupMemberMetada for memberID, metadata := range members { if !strings.HasSuffix(string(metadata.UserData), "-stateful") { metadata.UserData = []byte(string(metadata.UserData) + "-stateful") - atomic.AddInt32(&h.initial, 1) + h.initial.Add(1) } h.state.Store(memberID, metadata.UserData) } @@ -680,7 +681,7 @@ func (h *testStatefulStrategy) AssignmentData(memberID string, topics map[string func (h *testStatefulStrategy) AssertInitialValues(count int32) { h.t.Helper() - actual := atomic.LoadInt32(&h.initial) + actual := h.initial.Load() if actual != count { h.t.Fatalf("unexpected count of initial values: %d, expected: %d", actual, count) } diff --git a/functional_consumer_staticmembership_test.go b/functional_consumer_staticmembership_test.go index 2cab56cc3..28c1f287a 100644 --- a/functional_consumer_staticmembership_test.go +++ b/functional_consumer_staticmembership_test.go @@ -7,7 +7,6 @@ import ( "errors" "math" "reflect" - "sync/atomic" "testing" ) @@ -107,7 +106,7 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) { t.Errorf("should have 2 members in group , got %v\n", len(res1[0].Members)) } - generationId1 := m1.generationId + generationId1 := m1.generationId.Load() // shut down m2, membership should not change (we didn't leave group when close) m2.AssertCleanShutdown() @@ -123,7 +122,7 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) { t.Errorf("group description be the same before %s, after %s", res1Bytes, res2Bytes) } - generationId2 := atomic.LoadInt32(&m1.generationId) + generationId2 := m1.generationId.Load() if generationId2 != generationId1 { t.Errorf("m1 generation should not increase expect %v, actual %v", generationId1, generationId2) } @@ -147,7 +146,7 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) { t.Errorf("should have 2 members in group , got %v\n", len(res3[0].Members)) } - generationId3 := atomic.LoadInt32(&m1.generationId) + generationId3 := m1.generationId.Load() if generationId3 != generationId1 { t.Errorf("m1 generation should not increase expect %v, actual %v", generationId1, generationId2) } @@ -162,7 +161,7 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) { } m1.WaitForHandlers(4) - generationId4 := atomic.LoadInt32(&m1.generationId) + generationId4 := m1.generationId.Load() if generationId4 == generationId1 { t.Errorf("m1 generation should increase expect %v, actual %v", generationId1, generationId2) } diff --git a/go.mod b/go.mod index 1f733cd44..477ad6ad3 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/fortytw2/leaktest v1.3.0 github.com/jcmturner/gofork v1.7.6 github.com/jcmturner/gokrb5/v8 v8.4.4 - github.com/klauspost/compress v1.18.1 + github.com/klauspost/compress v1.18.2 github.com/pierrec/lz4/v4 v4.1.22 github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 github.com/stretchr/testify v1.11.1 diff --git a/go.sum b/go.sum index dcf07d123..0f9f1ac91 100644 --- a/go.sum +++ b/go.sum @@ -29,8 +29,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= diff --git a/mocks/consumer.go b/mocks/consumer.go index 0f6f1ab2a..91074e2e1 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -398,7 +398,8 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *Partitio msg.Partition = pc.partition if pc.paused { - msg.Offset = atomic.AddInt64(&pc.suppressedHighWaterMarkOffset, 1) - 1 + msg.Offset = pc.suppressedHighWaterMarkOffset + pc.suppressedHighWaterMarkOffset++ pc.suppressedMessages <- msg } else { msg.Offset = pc.highWaterMarkOffset.Add(1) - 1 diff --git a/offset_manager_test.go b/offset_manager_test.go index a1f287637..a981b2adf 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -112,7 +112,7 @@ func TestNewOffsetManager(t *testing.T) { // Test that the correct sequence of offset commit messages is sent to a broker when // multiple goroutines for a group are committing offsets at the same time func TestOffsetManagerCommitSequence(t *testing.T) { - lastOffset := map[int32]int64{} + lastOffset := make(map[int32]int64) var outOfOrder atomic.Pointer[string] seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -384,9 +384,9 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) { // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { - retryCount := int32(0) + var retryCount atomic.Int32 backoff := func(retries, maxRetries int) time.Duration { - atomic.AddInt32(&retryCount, 1) + retryCount.Add(1) return 0 } om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewTestConfig()) @@ -420,7 +420,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { safeClose(t, om) safeClose(t, testClient) - if atomic.LoadInt32(&retryCount) == 0 { + if retryCount.Load() == 0 { t.Fatal("Expected at least one retry") } }