Skip to content

Commit

Permalink
[KIP-554] Add User SCRAM credentials API (#1004)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Emanuele Sabellico <[email protected]>
Co-authored-by: Milind L <[email protected]>
Co-authored-by: Milind L <[email protected]>
  • Loading branch information
4 people committed Jul 11, 2023
1 parent f6b08cb commit 2652013
Show file tree
Hide file tree
Showing 8 changed files with 804 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ This is a feature release.
`nil` slice in `AdminClient.ListConsumerGroupOffsets`, when earlier it
was not processing that correctly (#985, @alexandredantas).
* Deprecate m.LeaderEpoch in favor of m.TopicPartition.LeaderEpoch (#1012).
* [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API):
User SASL/SCRAM credentials alteration and description (#1004).

confluent-kafka-go is based on librdkafka v2.2.0, see the
[librdkafka v2.2.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.2.0-RC2)
Expand Down
2 changes: 2 additions & 0 deletions examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ admin_describe_config/admin_describe_config
admin_describe_consumer_groups/admin_describe_consumer_groups
admin_list_consumer_groups/admin_list_consumer_groups
admin_list_consumer_group_offsets/admin_list_consumer_group_offsets
admin_describe_user_scram_credentials/admin_describe_user_scram_credentials
admin_alter_user_scram_credentials/admin_alter_user_scram_credentials
avro_generic_consumer_example/avro_generic_consumer_example
avro_generic_producer_example/avro_generic_producer_example
avro_specific_consumer_example/avro_specific_consumer_example
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/**
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Alter user SCRAM credentials example
package main

import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func usage(reason string) {
fmt.Fprintf(os.Stderr,
"Error: %s\n",
reason)
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> "+
"UPSERT <user1> <mechanism1> "+
"<iterations1> <password1> <salt1> "+
"[UPSERT <user2> <mechanism2> <iterations2> "+
"<password2> <salt2> DELETE <user3> <mechanism3> ...]\n",
os.Args[0])
os.Exit(1)
}

func main() {
// 2 + variable arguments
nArgs := len(os.Args)

if nArgs < 2 {
usage("bootstrap-servers required")
}

if nArgs == 2 {
usage("at least one upsert/delete required")
}

bootstrapServers := os.Args[1]

// Create new AdminClient.
ac, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": bootstrapServers,
})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer ac.Close()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

var upsertions []kafka.UserScramCredentialUpsertion
var deletions []kafka.UserScramCredentialDeletion

i := 2
nAlterations := 0
for i < nArgs {
switch os.Args[i] {
case "UPSERT":
if i+5 >= nArgs {
usage(fmt.Sprintf(
"wrong argument count for alteration %d: expected 6, found %d",
nAlterations, nArgs-i))
}

user := os.Args[i+1]
mechanism, err := kafka.ScramMechanismFromString(os.Args[i+2])
if err != nil {
usage(err.Error())
}
iterations, err := strconv.Atoi(os.Args[i+3])
if err != nil {
usage(err.Error())
}
password := []byte(os.Args[i+4])
salt := []byte(os.Args[i+5])
// if salt is an empty string,
// set it to nil to generate it randomly.
if len(salt) == 0 {
salt = nil
}
upsertions = append(upsertions,
kafka.UserScramCredentialUpsertion{
User: user,
ScramCredentialInfo: kafka.ScramCredentialInfo{
Mechanism: mechanism,
Iterations: iterations,
},
Password: password,
Salt: salt,
})
i += 6
case "DELETE":
if i+2 >= nArgs {
usage(fmt.Sprintf(
"wrong argument count for alteration %d: expected 3, found %d",
nAlterations, nArgs-i))
}

user := os.Args[i+1]
mechanism, err := kafka.ScramMechanismFromString(os.Args[i+2])
if err != nil {
usage(err.Error())
}
deletions = append(deletions,
kafka.UserScramCredentialDeletion{
User: user,
Mechanism: mechanism,
})
i += 3
default:
usage(fmt.Sprintf("unknown alteration %s", os.Args[i]))
}
nAlterations++
}

alterRes, alterErr := ac.AlterUserScramCredentials(ctx, upsertions, deletions)
if alterErr != nil {
fmt.Printf("Failed to alter user scram credentials: %s\n", alterErr)
os.Exit(1)
}

for username, err := range alterRes.Errors {
fmt.Printf("Username: %s\n", username)
if err.Code() == kafka.ErrNoError {
fmt.Printf(" Success\n")
} else {
fmt.Printf(" Error[%d]: %s\n", err.Code(), err.String())
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Describe user SCRAM credentials example
package main

import (
"context"
"fmt"
"os"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func usage(reason string) {
fmt.Fprintf(os.Stderr,
"Error: %s\n",
reason)
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> <user1> <user2> ...\n",
os.Args[0])
os.Exit(1)
}

func main() {

// 2 + n arguments
nArgs := len(os.Args)

if nArgs < 2 {
usage("bootstrap-servers required")
}

bootstrapServers := os.Args[1]

// Create new AdminClient.
ac, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": bootstrapServers,
})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer ac.Close()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

describeRes, describeErr := ac.DescribeUserScramCredentials(ctx, os.Args[2:])
if describeErr != nil {
fmt.Printf("Failed to describe user scram credentials: %s\n", describeErr)
os.Exit(1)
}

for username, description := range describeRes.Descriptions {
fmt.Printf("Username: %s \n", username)
if description.Error.Code() == kafka.ErrNoError {
for i := 0; i < len(description.ScramCredentialInfos); i++ {
fmt.Printf(" Mechanism: %s Iterations: %d\n",
description.ScramCredentialInfos[i].Mechanism,
description.ScramCredentialInfos[i].Iterations)
}
} else {
fmt.Printf(" Error[%d]: %s\n",
description.Error.Code(), description.Error.String())
}
}

}
Loading

0 comments on commit 2652013

Please sign in to comment.