Skip to content

Commit

Permalink
fix(admin): make DeleteRecords err consistent
Browse files Browse the repository at this point in the history
Also prevent a few other creations of new errors where the existing
KError should have been preserved for errors.Is checking.

Fixes IBM#2225
  • Loading branch information
dnwe authored and k-wall committed May 23, 2022
1 parent 5ed395a commit de55c54
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 18 deletions.
38 changes: 21 additions & 17 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,14 +525,13 @@ func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][
errs = append(errs, err)
} else {
if rsp.ErrorCode > 0 {
errs = append(errs, errors.New(rsp.ErrorCode.Error()))
errs = append(errs, rsp.ErrorCode)
}

for topic, topicErrors := range rsp.Errors {
for partition, partitionError := range topicErrors {
if !errors.Is(partitionError.errorCode, ErrNoError) {
errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
errs = append(errs, errors.New(errStr))
errs = append(errs, fmt.Errorf("[%s-%d]: %w", topic, partition, partitionError.errorCode))
}
}
}
Expand Down Expand Up @@ -577,40 +576,45 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
if topic == "" {
return ErrInvalidTopic
}
errs := make([]error, 0)
partitionPerBroker := make(map[*Broker][]int32)
for partition := range partitionOffsets {
broker, err := ca.client.Leader(topic, partition)
if err != nil {
return err
errs = append(errs, err)
continue
}
partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
}
errs := make([]error, 0)
for broker, partitions := range partitionPerBroker {
topics := make(map[string]*DeleteRecordsRequestTopic)
recordsToDelete := make(map[int32]int64)
for _, p := range partitions {
recordsToDelete[p] = partitionOffsets[p]
}
topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
topics[topic] = &DeleteRecordsRequestTopic{
PartitionOffsets: recordsToDelete,
}
request := &DeleteRecordsRequest{
Topics: topics,
Timeout: ca.conf.Admin.Timeout,
}

rsp, err := broker.DeleteRecords(request)
if err != nil {
errs = append(errs, err)
} else {
deleteRecordsResponseTopic, ok := rsp.Topics[topic]
if !ok {
errs = append(errs, ErrIncompleteResponse)
} else {
for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
if !errors.Is(deleteRecordsResponsePartition.Err, ErrNoError) {
errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
}
}
continue
}

deleteRecordsResponseTopic, ok := rsp.Topics[topic]
if !ok {
errs = append(errs, ErrIncompleteResponse)
continue
}

for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
if !errors.Is(deleteRecordsResponsePartition.Err, ErrNoError) {
errs = append(errs, deleteRecordsResponsePartition.Err)
continue
}
}
}
Expand Down
47 changes: 46 additions & 1 deletion admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) {
}
}

func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
func TestClusterAdminDeleteRecordsWithUnsupportedVersion(t *testing.T) {
topicName := "my_topic"
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down Expand Up @@ -643,6 +643,51 @@ func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
}
}

func TestClusterAdminDeleteRecordsWithLeaderNotAvailable(t *testing.T) {
topicName := "my_topic"
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetLeader("my_topic", 1, -1).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
})

config := NewTestConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

partitionOffset := make(map[int32]int64)
partitionOffset[1] = 1000

err = admin.DeleteRecords(topicName, partitionOffset)
if err == nil {
t.Fatal("expected an ErrDeleteRecords")
}

if !strings.HasPrefix(err.Error(), "kafka server: failed to delete records") {
t.Fatal(err)
}

if !errors.Is(err, ErrDeleteRecords) {
t.Fatal(err)
}

if !errors.Is(err, ErrLeaderNotAvailable) {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminDescribeConfig(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down

0 comments on commit de55c54

Please sign in to comment.