Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/apidiff.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ jobs:
if: github.base_ref
steps:
- name: Setup Go
uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0
with:
go-version: stable
- name: Add GOBIN to PATH
run: echo "$(go env GOPATH)/bin" >>$GITHUB_PATH
- name: Install apidiff cmd
run: go install golang.org/x/exp/cmd/apidiff@v0.0.0-20250813145105-42675adae3e6
- name: Checkout base code
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
ref: ${{ github.base_ref }}
path: "base"
Expand All @@ -46,7 +46,7 @@ jobs:
run: apidiff -m -w ../baseline.bin .
working-directory: "base"
- name: Checkout updated code
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
path: "updated"
persist-credentials: false
Expand Down
12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ env:
# Use the Go toolchain installed by setup-go
GOTOOLCHAIN: local
# renovate: datasource=github-releases depName=golangci/golangci-lint
GOLANGCI_LINT_VERSION: v2.5.0
GOLANGCI_LINT_VERSION: v2.6.2
# renovate: datasource=github-releases depName=dominikh/go-tools
STATICCHECK_VERSION: 2025.1.1
# renovate: datasource=github-releases depName=mfridman/tparse
Expand All @@ -41,11 +41,11 @@ jobs:
matrix:
go-version: [stable]
steps:
- uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Setup Go
uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0
with:
go-version: ${{ matrix.go-version }}
- name: Staticcheck
Expand All @@ -59,7 +59,7 @@ jobs:
- name: golangci-lint
env:
GOFLAGS: -tags=functional
uses: golangci/golangci-lint-action@e7fa5ac41e1cf5b7d48e45e42232ce7ada589601 # v9.1.0
uses: golangci/golangci-lint-action@1e7e51e771db61008b38414a730f564565cf7c20 # v9.2.0
with:
version: ${{ env.GOLANGCI_LINT_VERSION }}
test:
Expand All @@ -73,11 +73,11 @@ jobs:
DEBUG: true
GOFLAGS: -trimpath
steps:
- uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Setup Go
uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0
with:
go-version: ${{ matrix.go-version }}
- name: Test (Unit)
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ jobs:
language: ["actions", "go"]
steps:
- name: Checkout repository
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Initialize CodeQL
uses: github/codeql-action/init@014f16e7ab1402f30e7c3329d33797e7948572db # v4.31.3
uses: github/codeql-action/init@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v4.31.4
with:
languages: ${{ matrix.language }}
- name: Setup Go
uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0
with:
go-version: stable
- name: Autobuild
uses: github/codeql-action/autobuild@014f16e7ab1402f30e7c3329d33797e7948572db # v4.31.3
uses: github/codeql-action/autobuild@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v4.31.4
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@014f16e7ab1402f30e7c3329d33797e7948572db # v4.31.3
uses: github/codeql-action/analyze@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v4.31.4
2 changes: 1 addition & 1 deletion .github/workflows/dependency-review.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: 'Checkout Repository'
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: 'Dependency Review'
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/fuzz.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ jobs:
env:
GOFLAGS: -trimpath
steps:
- uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Setup Go
uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0
with:
go-version: stable
- name: Run any fuzzing tests
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/fvt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ jobs:
KAFKA_VERSION: ${{ inputs.kafka-version }}
SCALA_VERSION: ${{ inputs.scala-version }}
steps:
- uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Setup Docker
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
id: buildx
- name: Build FVT Docker Image
uses: docker/bake-action@3acf805d94d93a86cce4ca44798a76464a75b88c # v6.9.0
uses: docker/bake-action@5be5f02ff8819ecd3092ea6b2e6261c31774f2b4 # v6.10.0
with:
builder: ${{ steps.buildx.outputs.name }}
files: docker-compose.yml
Expand All @@ -53,7 +53,7 @@ jobs:
*.cache-from=type=gha,scope=fvt-kafka-${{ inputs.kafka-version }}
*.cache-to=type=gha,scope=fvt-kafka-${{ inputs.kafka-version }},mode=max
- name: Setup Go
uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0
with:
go-version: ${{ inputs.go-version }}
- name: Setup Docker Compose
Expand Down Expand Up @@ -92,7 +92,7 @@ jobs:
if [ -f "fvt-kafka-${KAFKA_VERSION}.pcap" ]; then sudo chmod a+r "fvt-kafka-${KAFKA_VERSION}.pcap"; fi
- name: Upload pcap file
if: always()
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: fvt-kafka-${{ inputs.kafka-version }}.pcap
path: fvt-kafka-${{ inputs.kafka-version }}.pcap
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/i386.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ jobs:
pull-requests: read # for golangci/golangci-lint-action to fetch pull requests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Setup Go
uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0
with:
go-version: stable
- name: staticcheck
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/renovate-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
sparse-checkout: |
.github/renovate.json5
sparse-checkout-cone-mode: false
- uses: actions/setup-node@2028fbc5c25fe9cf00d9f06a71cc4710d4507903 # v6.0.0
- uses: actions/setup-node@395ad3262231945c25e8478fd5baf05154b1d79f # v6.1.0
- run: npx --package=renovate@latest -- renovate-config-validator
6 changes: 3 additions & 3 deletions .github/workflows/scorecard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:

steps:
- name: "Checkout code"
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false

Expand All @@ -65,14 +65,14 @@ jobs:
# Upload the results as artifacts (optional). Commenting out will disable uploads of run results in SARIF
# format to the repository Actions tab.
- name: "Upload artifact"
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: SARIF file
path: results.sarif
retention-days: 5

# Upload the results to GitHub's code scanning dashboard.
- name: "Upload to code-scanning"
uses: github/codeql-action/upload-sarif@014f16e7ab1402f30e7c3329d33797e7948572db # v4.31.3
uses: github/codeql-action/upload-sarif@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v4.31.4
with:
sarif_file: results.sarif
2 changes: 1 addition & 1 deletion .github/workflows/stale.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
steps:
# pinned to main commit to make use of https://github.com/actions/stale/pull/1033
- uses: actions/stale@5f858e3efba33a5ca4407a664cc011ad407f2008 # v10.1.0
- uses: actions/stale@997185467fa4f803885201cee163a9f38240193d # v10.1.1
with:
ascending: true
days-before-stale: 90
Expand Down
1 change: 1 addition & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type ClusterAdmin interface {
// This operation is not transactional so it may succeed or fail.
// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
//
// Deprecated: Use CreateACLs instead.
CreateACL(resource Resource, acl Acl) error

Expand Down
21 changes: 13 additions & 8 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,9 +629,14 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 4

backoffCalled := make([]int32, config.Producer.Retry.Max+1)
// We use a pointer to atomic to prevent the possibility of a reallocation causing a copy.
backoffCalled := make([]*atomic.Int32, config.Producer.Retry.Max+1)
for i := range backoffCalled {
backoffCalled[i] = new(atomic.Int32)
}

config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
atomic.AddInt32(&backoffCalled[retries-1], 1)
backoffCalled[retries-1].Add(1)
return 0
}
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
Expand Down Expand Up @@ -672,11 +677,11 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
closeProducer(t, producer)

for i := 0; i < config.Producer.Retry.Max; i++ {
if atomic.LoadInt32(&backoffCalled[i]) != 1 {
if backoffCalled[i].Load() != 1 {
t.Errorf("expected one retry attempt #%d", i)
}
}
if atomic.LoadInt32(&backoffCalled[config.Producer.Retry.Max]) != 0 {
if backoffCalled[config.Producer.Retry.Max].Load() != 0 {
t.Errorf("expected no retry attempt #%d", config.Producer.Retry.Max)
}
}
Expand Down Expand Up @@ -811,21 +816,21 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
// The seed broker only handles Metadata request in bootstrap
seedBroker.setHandler(metadataRequestHandlerFunc)

var emptyValues int32 = 0
var emptyValues atomic.Int32

countRecordsWithEmptyValue := func(req *request) {
preq := req.body.(*ProduceRequest)
if batch := preq.records["my_topic"][0].RecordBatch; batch != nil {
for _, record := range batch.Records {
if len(record.Value) == 0 {
atomic.AddInt32(&emptyValues, 1)
emptyValues.Add(1)
}
}
}
if batch := preq.records["my_topic"][0].MsgSet; batch != nil {
for _, record := range batch.Messages {
if len(record.Msg.Value) == 0 {
atomic.AddInt32(&emptyValues, 1)
emptyValues.Add(1)
}
}
}
Expand Down Expand Up @@ -904,7 +909,7 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {

closeProducerWithTimeout(t, producer, 5*time.Second)

if emptyValues := atomic.LoadInt32(&emptyValues); emptyValues > 0 {
if emptyValues := emptyValues.Load(); emptyValues > 0 {
t.Fatalf("%d empty values", emptyValues)
}
}
Expand Down
7 changes: 7 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,9 +685,16 @@ func (client *client) randomizeSeedBrokers(addrs []string) {
}

func (client *client) updateBroker(brokers []*Broker) {
if client.brokers == nil {
return
}

currentBroker := make(map[int32]*Broker, len(brokers))

for _, broker := range brokers {
if broker == nil {
continue
}
currentBroker[broker.ID()] = broker
if client.brokers[broker.ID()] == nil { // add new broker
client.brokers[broker.ID()] = broker
Expand Down
Loading
Loading