diff --git a/CHANGELOG.md b/CHANGELOG.md index d13041337..e03748c92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ This is a feature release. `nil` slice in `AdminClient.ListConsumerGroupOffsets`, when earlier it was not processing that correctly (#985, @alexandredantas). * Deprecate m.LeaderEpoch in favor of m.TopicPartition.LeaderEpoch (#1012). + * [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API): + User SASL/SCRAM credentials alteration and description (#1004). confluent-kafka-go is based on librdkafka v2.2.0, see the [librdkafka v2.2.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.2.0-RC2) diff --git a/examples/.gitignore b/examples/.gitignore index 3519e7b27..df267fe13 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -9,6 +9,8 @@ admin_describe_config/admin_describe_config admin_describe_consumer_groups/admin_describe_consumer_groups admin_list_consumer_groups/admin_list_consumer_groups admin_list_consumer_group_offsets/admin_list_consumer_group_offsets +admin_describe_user_scram_credentials/admin_describe_user_scram_credentials +admin_alter_user_scram_credentials/admin_alter_user_scram_credentials avro_generic_consumer_example/avro_generic_consumer_example avro_generic_producer_example/avro_generic_producer_example avro_specific_consumer_example/avro_specific_consumer_example diff --git a/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go b/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go new file mode 100644 index 000000000..e693d5ef2 --- /dev/null +++ b/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go @@ -0,0 +1,151 @@ +/** + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Alter user SCRAM credentials example +package main + +import ( + "context" + "fmt" + "os" + "strconv" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func usage(reason string) { + fmt.Fprintf(os.Stderr, + "Error: %s\n", + reason) + fmt.Fprintf(os.Stderr, + "Usage: %s "+ + "UPSERT "+ + " "+ + "[UPSERT "+ + " DELETE ...]\n", + os.Args[0]) + os.Exit(1) +} + +func main() { + // 2 + variable arguments + nArgs := len(os.Args) + + if nArgs < 2 { + usage("bootstrap-servers required") + } + + if nArgs == 2 { + usage("at least one upsert/delete required") + } + + bootstrapServers := os.Args[1] + + // Create new AdminClient. + ac, err := kafka.NewAdminClient(&kafka.ConfigMap{ + "bootstrap.servers": bootstrapServers, + }) + if err != nil { + fmt.Printf("Failed to create Admin client: %s\n", err) + os.Exit(1) + } + defer ac.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + var upsertions []kafka.UserScramCredentialUpsertion + var deletions []kafka.UserScramCredentialDeletion + + i := 2 + nAlterations := 0 + for i < nArgs { + switch os.Args[i] { + case "UPSERT": + if i+5 >= nArgs { + usage(fmt.Sprintf( + "wrong argument count for alteration %d: expected 6, found %d", + nAlterations, nArgs-i)) + } + + user := os.Args[i+1] + mechanism, err := kafka.ScramMechanismFromString(os.Args[i+2]) + if err != nil { + usage(err.Error()) + } + iterations, err := strconv.Atoi(os.Args[i+3]) + if err != nil { + usage(err.Error()) + } + password := []byte(os.Args[i+4]) + salt := []byte(os.Args[i+5]) + // if salt is an empty string, + // set it to nil to generate it randomly. + if len(salt) == 0 { + salt = nil + } + upsertions = append(upsertions, + kafka.UserScramCredentialUpsertion{ + User: user, + ScramCredentialInfo: kafka.ScramCredentialInfo{ + Mechanism: mechanism, + Iterations: iterations, + }, + Password: password, + Salt: salt, + }) + i += 6 + case "DELETE": + if i+2 >= nArgs { + usage(fmt.Sprintf( + "wrong argument count for alteration %d: expected 3, found %d", + nAlterations, nArgs-i)) + } + + user := os.Args[i+1] + mechanism, err := kafka.ScramMechanismFromString(os.Args[i+2]) + if err != nil { + usage(err.Error()) + } + deletions = append(deletions, + kafka.UserScramCredentialDeletion{ + User: user, + Mechanism: mechanism, + }) + i += 3 + default: + usage(fmt.Sprintf("unknown alteration %s", os.Args[i])) + } + nAlterations++ + } + + alterRes, alterErr := ac.AlterUserScramCredentials(ctx, upsertions, deletions) + if alterErr != nil { + fmt.Printf("Failed to alter user scram credentials: %s\n", alterErr) + os.Exit(1) + } + + for username, err := range alterRes.Errors { + fmt.Printf("Username: %s\n", username) + if err.Code() == kafka.ErrNoError { + fmt.Printf(" Success\n") + } else { + fmt.Printf(" Error[%d]: %s\n", err.Code(), err.String()) + } + } + +} diff --git a/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go b/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go new file mode 100644 index 000000000..27896bb07 --- /dev/null +++ b/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go @@ -0,0 +1,83 @@ +/** + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Describe user SCRAM credentials example +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func usage(reason string) { + fmt.Fprintf(os.Stderr, + "Error: %s\n", + reason) + fmt.Fprintf(os.Stderr, + "Usage: %s ...\n", + os.Args[0]) + os.Exit(1) +} + +func main() { + + // 2 + n arguments + nArgs := len(os.Args) + + if nArgs < 2 { + usage("bootstrap-servers required") + } + + bootstrapServers := os.Args[1] + + // Create new AdminClient. + ac, err := kafka.NewAdminClient(&kafka.ConfigMap{ + "bootstrap.servers": bootstrapServers, + }) + if err != nil { + fmt.Printf("Failed to create Admin client: %s\n", err) + os.Exit(1) + } + defer ac.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + describeRes, describeErr := ac.DescribeUserScramCredentials(ctx, os.Args[2:]) + if describeErr != nil { + fmt.Printf("Failed to describe user scram credentials: %s\n", describeErr) + os.Exit(1) + } + + for username, description := range describeRes.Descriptions { + fmt.Printf("Username: %s \n", username) + if description.Error.Code() == kafka.ErrNoError { + for i := 0; i < len(description.ScramCredentialInfos); i++ { + fmt.Printf(" Mechanism: %s Iterations: %d\n", + description.ScramCredentialInfos[i].Mechanism, + description.ScramCredentialInfos[i].Iterations) + } + } else { + fmt.Printf(" Error[%d]: %s\n", + description.Error.Code(), description.Error.String()) + } + } + +} diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 34cc5ed16..9c6caf71c 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -92,6 +92,20 @@ ConsumerGroupDescription_by_idx(const rd_kafka_ConsumerGroupDescription_t **resu return result_groups[idx]; } +static const rd_kafka_UserScramCredentialsDescription_t * +DescribeUserScramCredentials_result_description_by_idx(const rd_kafka_UserScramCredentialsDescription_t **descriptions, size_t cnt, size_t idx) { + if (idx >= cnt) + return NULL; + return descriptions[idx]; +} + +static const rd_kafka_AlterUserScramCredentials_result_response_t* +AlterUserScramCredentials_result_response_by_idx(const rd_kafka_AlterUserScramCredentials_result_response_t **responses, size_t cnt, size_t idx) { + if (idx >= cnt) + return NULL; + return responses[idx]; +} + static const rd_kafka_error_t * error_by_idx(const rd_kafka_error_t **errors, size_t cnt, size_t idx) { if (idx >= cnt) @@ -734,6 +748,105 @@ type DescribeACLsResult struct { // DeleteACLsResult provides delete ACLs result or error information. type DeleteACLsResult = DescribeACLsResult +// ScramMechanism enumerates SASL/SCRAM mechanisms. +// Used by `AdminClient.AlterUserScramCredentials` +// and `AdminClient.DescribeUserScramCredentials`. +type ScramMechanism int + +const ( + // ScramMechanismUnknown - Unknown SASL/SCRAM mechanism + ScramMechanismUnknown = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_UNKNOWN) + // ScramMechanismSHA256 - SCRAM-SHA-256 mechanism + ScramMechanismSHA256 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_256) + // ScramMechanismSHA512 - SCRAM-SHA-512 mechanism + ScramMechanismSHA512 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_512) +) + +// String returns the human-readable representation of an ScramMechanism +func (o ScramMechanism) String() string { + switch o { + case ScramMechanismSHA256: + return "SCRAM-SHA-256" + case ScramMechanismSHA512: + return "SCRAM-SHA-512" + default: + return "UNKNOWN" + } +} + +// ScramMechanismFromString translates a Scram Mechanism name to +// a ScramMechanism value. +func ScramMechanismFromString(mechanism string) (ScramMechanism, error) { + switch strings.ToUpper(mechanism) { + case "SCRAM-SHA-256": + return ScramMechanismSHA256, nil + case "SCRAM-SHA-512": + return ScramMechanismSHA512, nil + default: + return ScramMechanismUnknown, + NewError(ErrInvalidArg, "Unknown SCRAM mechanism", false) + } +} + +// ScramCredentialInfo contains Mechanism and Iterations for a +// SASL/SCRAM credential associated with a user. +type ScramCredentialInfo struct { + // Iterations - positive number of iterations used when creating the credential + Iterations int + // Mechanism - SASL/SCRAM mechanism + Mechanism ScramMechanism +} + +// UserScramCredentialsDescription represent all SASL/SCRAM credentials +// associated with a user that can be retrieved, or an error indicating +// why credentials could not be retrieved. +type UserScramCredentialsDescription struct { + // User - the user name. + User string + // ScramCredentialInfos - SASL/SCRAM credential representations for the user. + ScramCredentialInfos []ScramCredentialInfo + // Error - error corresponding to this user description. + Error Error +} + +// UserScramCredentialDeletion is a request to delete +// a SASL/SCRAM credential for a user. +type UserScramCredentialDeletion struct { + // User - user name + User string + // Mechanism - SASL/SCRAM mechanism. + Mechanism ScramMechanism +} + +// UserScramCredentialUpsertion is a request to update/insert +// a SASL/SCRAM credential for a user. +type UserScramCredentialUpsertion struct { + // User - user name + User string + // ScramCredentialInfo - the mechanism and iterations. + ScramCredentialInfo ScramCredentialInfo + // Password - password to HMAC before storage. + Password []byte + // Salt - salt to use. Will be generated randomly if nil. (optional) + Salt []byte +} + +// DescribeUserScramCredentialsResult represents the result of a +// DescribeUserScramCredentials call. +type DescribeUserScramCredentialsResult struct { + // ConsumerGroupDescriptions - Map from user name + // to UserScramCredentialsDescription + Descriptions map[string]UserScramCredentialsDescription +} + +// AlterUserScramCredentialsResult represents the result of a +// AlterUserScramCredentials call. +type AlterUserScramCredentialsResult struct { + // Errors - Map from user name + // to an Error, with ErrNoError code on success. + Errors map[string]Error +} + // waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens // first. // The returned result event is checked for errors its error is returned if set. @@ -896,6 +1009,52 @@ func (a *AdminClient) cToConsumerGroupDescriptions( return result } +// cToDescribeUserScramCredentialsResult converts a C +// rd_kafka_DescribeUserScramCredentials_result_t to a Go map of users to +// UserScramCredentialsDescription. +func cToDescribeUserScramCredentialsResult( + cRes *C.rd_kafka_DescribeUserScramCredentials_result_t) map[string]UserScramCredentialsDescription { + result := make(map[string]UserScramCredentialsDescription) + var cDescriptionCount C.size_t + cDescriptions := + C.rd_kafka_DescribeUserScramCredentials_result_descriptions(cRes, + &cDescriptionCount) + + for i := 0; i < int(cDescriptionCount); i++ { + cDescription := + C.DescribeUserScramCredentials_result_description_by_idx( + cDescriptions, cDescriptionCount, C.size_t(i)) + user := C.GoString(C.rd_kafka_UserScramCredentialsDescription_user(cDescription)) + userDescription := UserScramCredentialsDescription{User: user} + + // Populate the error if required. + cError := C.rd_kafka_UserScramCredentialsDescription_error(cDescription) + if C.rd_kafka_error_code(cError) != C.RD_KAFKA_RESP_ERR_NO_ERROR { + userDescription.Error = newError(C.rd_kafka_error_code(cError)) + result[user] = userDescription + continue + } + + cCredentialCount := C.rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count(cDescription) + scramCredentialInfos := make([]ScramCredentialInfo, int(cCredentialCount)) + for j := 0; j < int(cCredentialCount); j++ { + cScramCredentialInfo := + C.rd_kafka_UserScramCredentialsDescription_scramcredentialinfo( + cDescription, C.size_t(j)) + cMechanism := C.rd_kafka_ScramCredentialInfo_mechanism(cScramCredentialInfo) + cIterations := C.rd_kafka_ScramCredentialInfo_iterations(cScramCredentialInfo) + scramCredentialInfos[j] = ScramCredentialInfo{ + Mechanism: ScramMechanism(cMechanism), + Iterations: int(cIterations), + } + } + userDescription.ScramCredentialInfos = scramCredentialInfos + result[user] = userDescription + } + + return result +} + // ConsumerGroupDescription converts a C rd_kafka_ConsumerGroupListing_t array // to a Go ConsumerGroupListing slice. func (a *AdminClient) cToConsumerGroupListings( @@ -2351,6 +2510,193 @@ func (a *AdminClient) AlterConsumerGroupOffsets( return acgor, nil } +// DescribeUserScramCredentials describe SASL/SCRAM credentials for the +// specified user names. +// +// Parameters: +// - `ctx` - context with the maximum amount of time to block, or nil for +// indefinite. +// - `users` - a slice of string, each one correspond to a user name, no +// duplicates are allowed +// - `options` - DescribeUserScramCredentialsAdminOption options. +// +// Returns a map from user name to user SCRAM credentials description. +// Each description can have an individual error. +func (a *AdminClient) DescribeUserScramCredentials( + ctx context.Context, users []string, + options ...DescribeUserScramCredentialsAdminOption) (result DescribeUserScramCredentialsResult, err error) { + result = DescribeUserScramCredentialsResult{ + Descriptions: make(map[string]UserScramCredentialsDescription), + } + err = a.verifyClient() + if err != nil { + return result, err + } + + // Convert user names into char** required by the implementation. + cUserList := make([]*C.char, len(users)) + cUserCount := C.size_t(len(users)) + + for idx, user := range users { + cUserList[idx] = C.CString(user) + defer C.free(unsafe.Pointer(cUserList[idx])) + } + + var cUserListPtr **C.char + if cUserCount > 0 { + cUserListPtr = ((**C.char)(&cUserList[0])) + } + + // Convert Go AdminOptions (if any) to C AdminOptions. + genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] + } + cOptions, err := adminOptionsSetup( + a.handle, + C.RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS, genericOptions) + if err != nil { + return result, err + } + defer C.rd_kafka_AdminOptions_destroy(cOptions) + + // Create temporary queue for async operation. + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Call rd_kafka_DescribeConsumerGroups (asynchronous). + C.rd_kafka_DescribeUserScramCredentials( + a.handle.rk, + cUserListPtr, + cUserCount, + cOptions, + cQueue) + + // Wait for result, error or context timeout. + rkev, err := a.waitResult( + ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT) + if err != nil { + return result, err + } + defer C.rd_kafka_event_destroy(rkev) + + cRes := C.rd_kafka_event_DescribeUserScramCredentials_result(rkev) + + // Convert result from C to Go. + result.Descriptions = cToDescribeUserScramCredentialsResult(cRes) + return result, nil +} + +// AlterUserScramCredentials alters SASL/SCRAM credentials. +// The pair (user, mechanism) must be unique among upsertions and deletions. +// +// Parameters: +// - `ctx` - context with the maximum amount of time to block, or nil for +// indefinite. +// - `upsertions` - a slice of user credential upsertions +// - `deletions` - a slice of user credential deletions +// - `options` - AlterUserScramCredentialsAdminOption options. +// +// Returns a map from user name to the corresponding Error, with error code +// ErrNoError when the request succeeded. +func (a *AdminClient) AlterUserScramCredentials( + ctx context.Context, upsertions []UserScramCredentialUpsertion, deletions []UserScramCredentialDeletion, + options ...AlterUserScramCredentialsAdminOption) (result AlterUserScramCredentialsResult, err error) { + result = AlterUserScramCredentialsResult{ + Errors: make(map[string]Error), + } + err = a.verifyClient() + if err != nil { + return result, err + } + + // Convert user names into char** required by the implementation. + cAlterationList := make([]*C.rd_kafka_UserScramCredentialAlteration_t, len(upsertions)+len(deletions)) + cAlterationCount := C.size_t(len(upsertions) + len(deletions)) + idx := 0 + + for _, upsertion := range upsertions { + user := C.CString(upsertion.User) + defer C.free(unsafe.Pointer(user)) + + var salt *C.uchar = nil + var saltSize C.size_t = 0 + if upsertion.Salt != nil { + salt = (*C.uchar)(&upsertion.Salt[0]) + saltSize = C.size_t(len(upsertion.Salt)) + } + + cAlterationList[idx] = C.rd_kafka_UserScramCredentialUpsertion_new(user, + C.rd_kafka_ScramMechanism_t(upsertion.ScramCredentialInfo.Mechanism), + C.int(upsertion.ScramCredentialInfo.Iterations), + (*C.uchar)(&upsertion.Password[0]), C.size_t(len(upsertion.Password)), + salt, saltSize) + defer C.rd_kafka_UserScramCredentialAlteration_destroy(cAlterationList[idx]) + idx = idx + 1 + } + + for _, deletion := range deletions { + user := C.CString(deletion.User) + defer C.free(unsafe.Pointer(user)) + cAlterationList[idx] = C.rd_kafka_UserScramCredentialDeletion_new( + user, C.rd_kafka_ScramMechanism_t(deletion.Mechanism)) + defer C.rd_kafka_UserScramCredentialAlteration_destroy(cAlterationList[idx]) + idx = idx + 1 + } + + var cAlterationListPtr **C.rd_kafka_UserScramCredentialAlteration_t + if cAlterationCount > 0 { + cAlterationListPtr = ((**C.rd_kafka_UserScramCredentialAlteration_t)(&cAlterationList[0])) + } + + // Convert Go AdminOptions (if any) to C AdminOptions. + genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] + } + cOptions, err := adminOptionsSetup( + a.handle, C.RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS, genericOptions) + if err != nil { + return result, err + } + defer C.rd_kafka_AdminOptions_destroy(cOptions) + + // Create temporary queue for async operation. + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Call rd_kafka_AlterUserScramCredentials (asynchronous). + C.rd_kafka_AlterUserScramCredentials( + a.handle.rk, + cAlterationListPtr, + cAlterationCount, + cOptions, + cQueue) + + // Wait for result, error or context timeout. + rkev, err := a.waitResult( + ctx, cQueue, C.RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT) + if err != nil { + return result, err + } + defer C.rd_kafka_event_destroy(rkev) + + cRes := C.rd_kafka_event_AlterUserScramCredentials_result(rkev) + + // Convert result from C to Go. + var cResponseSize C.size_t + cResponses := C.rd_kafka_AlterUserScramCredentials_result_responses(cRes, &cResponseSize) + for i := 0; i < int(cResponseSize); i++ { + cResponse := C.AlterUserScramCredentials_result_response_by_idx( + cResponses, cResponseSize, C.size_t(i)) + user := C.GoString(C.rd_kafka_AlterUserScramCredentials_result_response_user(cResponse)) + err := newErrorFromCError(C.rd_kafka_AlterUserScramCredentials_result_response_error(cResponse)) + result.Errors[user] = err + } + + return result, nil +} + // NewAdminClient creats a new AdminClient instance with a new underlying client instance func NewAdminClient(conf *ConfigMap) (*AdminClient, error) { diff --git a/kafka/adminapi_test.go b/kafka/adminapi_test.go index 200bd9a36..1ab48a6e8 100644 --- a/kafka/adminapi_test.go +++ b/kafka/adminapi_test.go @@ -595,6 +595,75 @@ func testAdminAPIsAlterConsumerGroupOffsets( t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err()) } } +func testAdminAPIsUserScramCredentials(what string, a *AdminClient, expDuration time.Duration, t *testing.T) { + var users []string + + // With nil users + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + _, describeErr := a.DescribeUserScramCredentials(ctx, users) + if describeErr == nil || ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected context deadline exceeded, got %s and %s\n", + describeErr, ctx.Err()) + } + + // With one user + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + users = append(users, "non-existent") + _, describeErr = a.DescribeUserScramCredentials(ctx, users) + if describeErr == nil || ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected context deadline exceeded, got %s and %s\n", + describeErr, ctx.Err()) + } + + // With duplicate users + users = append(users, "non-existent") + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + users = append(users, "non-existent") + _, describeErr = a.DescribeUserScramCredentials(ctx, users) + if describeErr == nil || describeErr.(Error).Code() != ErrInvalidArg { + t.Fatalf("Duplicate user should give an InvalidArgument error, got %s\n", describeErr) + } + + var upsertions []UserScramCredentialUpsertion + upsertions = append(upsertions, + UserScramCredentialUpsertion{ + User: "non-existent", + ScramCredentialInfo: ScramCredentialInfo{ + Mechanism: ScramMechanismSHA256, Iterations: 10000}, + Password: []byte("password"), + Salt: []byte("salt"), + }) + var deletions []UserScramCredentialDeletion + deletions = append(deletions, UserScramCredentialDeletion{ + User: "non-existent", Mechanism: ScramMechanismSHA256}) + + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + _, alterErr := a.AlterUserScramCredentials(ctx, upsertions, deletions) + if alterErr == nil || ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected context deadline exceeded, got %s and %s\n", + alterErr, ctx.Err()) + } + + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + _, alterErr = a.AlterUserScramCredentials(ctx, upsertions, nil) + if alterErr == nil || ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected context deadline exceeded, got %s and %s\n", + alterErr, ctx.Err()) + } + + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + _, alterErr = a.AlterUserScramCredentials(ctx, nil, deletions) + if alterErr == nil || ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected context deadline exceeded, got %s and %s\n", + alterErr, ctx.Err()) + } +} func testAdminAPIs(what string, a *AdminClient, t *testing.T) { t.Logf("AdminClient API testing on %s: %s", a, what) @@ -832,6 +901,8 @@ func testAdminAPIs(what string, a *AdminClient, t *testing.T) { testAdminAPIsListConsumerGroupOffsets(what, a, expDuration, t) testAdminAPIsAlterConsumerGroupOffsets(what, a, expDuration, t) + + testAdminAPIsUserScramCredentials(what, a, expDuration, t) } // TestAdminAPIs dry-tests most Admin APIs, no broker is needed. diff --git a/kafka/adminoptions.go b/kafka/adminoptions.go index c0b9eb690..0681f0072 100644 --- a/kafka/adminoptions.go +++ b/kafka/adminoptions.go @@ -133,7 +133,10 @@ func (ao AdminOptionRequestTimeout) supportsListConsumerGroupOffsets() { } func (ao AdminOptionRequestTimeout) supportsAlterConsumerGroupOffsets() { } - +func (ao AdminOptionRequestTimeout) supportsDescribeUserScramCredentials() { +} +func (ao AdminOptionRequestTimeout) supportsAlterUserScramCredentials() { +} func (ao AdminOptionRequestTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error { if !ao.isSet { return nil @@ -415,6 +418,22 @@ type AlterConsumerGroupOffsetsAdminOption interface { apply(cOptions *C.rd_kafka_AdminOptions_t) error } +// DescribeUserScramCredentialsAdminOption - see setter. +// +// See SetAdminRequestTimeout. +type DescribeUserScramCredentialsAdminOption interface { + supportsDescribeUserScramCredentials() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + +// AlterUserScramCredentialsAdminOption - see setter. +// +// See SetAdminRequestTimeout. +type AlterUserScramCredentialsAdminOption interface { + supportsAlterUserScramCredentials() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + // AdminOption is a generic type not to be used directly. // // See CreateTopicsAdminOption et.al. diff --git a/kafka/integration_test.go b/kafka/integration_test.go index 8eb500427..394632f79 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -2519,6 +2519,135 @@ func (its *IntegrationTestSuite) TestProducerConsumerHeaders() { } +// TestUserScramTestAdminClient_UserScramCredentialsCredentialsAPI describes +// the SCRAM credentials for a user, upserts some credentials, describes them +// again to check insertion, deletes them, and finally describes them once again +// to check deletion. +func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() { + t := its.T() + ac, err := NewAdminClient(&ConfigMap{ + "bootstrap.servers": testconf.Brokers, + }) + if err != nil { + t.Fatalf("Failed to create Admin Client: %s\n", err) + } + defer ac.Close() + + users := []string{"non-existent"} + + // Call DescribeUserScramCredentials + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + describeRes, describeErr := ac.DescribeUserScramCredentials(ctx, users) + if describeErr != nil { + t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", describeErr) + } + + // Check Describe result + if len(describeRes.Descriptions) != 1 { + t.Fatalf("Expected 1 user in Describe Result, got %d\n", len(describeRes.Descriptions)) + } + description, ok := describeRes.Descriptions[users[0]] + if !ok { + t.Fatalf("Did not find expected user %s in results\n", users[0]) + } + + if description.Error.Code() != ErrResourceNotFound { + t.Fatalf("Error should be ErrResourceNotFound instead it is %s", description.Error.Code()) + } + + // Call AlterUserScramCredentials for Upsert + upsertions := []UserScramCredentialUpsertion{ + { + User: "non-existent", + ScramCredentialInfo: ScramCredentialInfo{ + Mechanism: ScramMechanismSHA256, Iterations: 10000}, + Password: []byte("password"), + Salt: []byte("salt"), + }} + + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + alterRes, alterErr := ac.AlterUserScramCredentials(ctx, upsertions, nil) + + // Check Upsert result + if alterErr != nil { + t.Fatalf("Failed to Alter the User Scram Credentials: %s\n", alterErr) + } + if len(alterRes.Errors) != 1 { + t.Fatalf("Expected 1 user in Alter Result, got %d\n", len(alterRes.Errors)) + } + kErr, ok := alterRes.Errors[upsertions[0].User] + if !ok { + t.Fatalf("Did not find expected user %s in results\n", users[0]) + } + if kErr.Code() != ErrNoError { + t.Fatalf("Error code should be ErrNoError instead it is %d", kErr.Code()) + } + + // Call DescribeUserScramCredentials to verify upsert + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + describeRes, describeErr = ac.DescribeUserScramCredentials(ctx, users) + + // Check Describe result + if describeErr != nil { + t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", describeErr) + } + description, ok = describeRes.Descriptions[users[0]] + if !ok { + t.Fatalf("Did not find expected user %s in results\n", users[0]) + } + if description.Error.Code() != ErrNoError { + t.Fatalf("Error code should be ErrNoError instead it is %s", description.Error.Code()) + } + if description.ScramCredentialInfos[0].Iterations != 10000 { + t.Fatalf("Iterations field doesn't match the upserted value. Expected 10000, got %d", + description.ScramCredentialInfos[0].Iterations) + } + if description.ScramCredentialInfos[0].Mechanism != ScramMechanismSHA256 { + t.Fatalf("Mechanism field doesn't match the upserted value. Expected %s, got %s", + ScramMechanismSHA256, description.ScramCredentialInfos[0].Mechanism) + } + + // Call AlterUserScramCredentials for Delete + deletions := []UserScramCredentialDeletion{ + {User: "non-existent", Mechanism: ScramMechanismSHA256}} + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + alterRes, alterErr = ac.AlterUserScramCredentials(ctx, nil, deletions) + + // Check Delete result + if alterErr != nil { + t.Fatalf("Failed to alter user scram credentials: %s\n", alterErr) + } + kErr, ok = alterRes.Errors[upsertions[0].User] + if !ok { + t.Fatalf("Did not find expected user %s in results\n", users[0]) + } + if kErr.Code() != ErrNoError { + t.Fatalf("Error code should be ErrNoError instead it is %d", kErr.Code()) + } + + // Call DescribeUserScramCredentials to verify delete + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + describeRes, describeErr = ac.DescribeUserScramCredentials(ctx, users) + + // Check Describe result + if describeErr != nil { + t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", describeErr) + } + description, ok = describeRes.Descriptions[users[0]] + if !ok { + t.Fatalf("Did not find expected user %s in results\n", users[0]) + } + + if description.Error.Code() != ErrResourceNotFound { + t.Fatalf("Error should be ErrResourceNotFound instead it is %s", description.Error.Code()) + } +} + func TestIntegration(t *testing.T) { its := new(IntegrationTestSuite) testconfInit()