-
Notifications
You must be signed in to change notification settings - Fork 665
/
admin_describe_config.go
89 lines (78 loc) · 2.61 KB
/
admin_describe_config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// List current configuration for a cluster resource
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
func main() {
if len(os.Args) != 4 {
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> <resource-type> <resource-name>\n"+
"\n"+
" <bootstrap-servers> - CSV list of bootstrap brokers\n"+
" <resource-type> - any, broker, topic, group\n"+
" <resource-name> - broker id or topic name\n",
os.Args[0])
os.Exit(1)
}
bootstrapServers := os.Args[1]
resourceType, err := kafka.ResourceTypeFromString(os.Args[2])
if err != nil {
fmt.Printf("Invalid resource type: %s\n", os.Args[2])
os.Exit(1)
}
resourceName := os.Args[3]
// Create a new AdminClient.
// AdminClient can also be instantiated using an existing
// Producer or Consumer instance, see NewAdminClientFromProducer and
// NewAdminClientFromConsumer.
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
// Contexts are used to abort or limit the amount of time
// the Admin call blocks waiting for a result.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dur, _ := time.ParseDuration("20s")
// Ask cluster for the resource's current configuration
results, err := a.DescribeConfigs(ctx,
[]kafka.ConfigResource{{Type: resourceType, Name: resourceName}},
kafka.SetAdminRequestTimeout(dur))
if err != nil {
fmt.Printf("Failed to DescribeConfigs(%s, %s): %s\n",
resourceType, resourceName, err)
os.Exit(1)
}
// Print results
for _, result := range results {
fmt.Printf("%s %s: %s:\n", result.Type, result.Name, result.Error)
for _, entry := range result.Config {
// Truncate the value to 60 chars, if needed, for nicer formatting.
fmt.Printf("%60s = %-60.60s %-20s Read-only:%v Sensitive:%v Default:%v\n",
entry.Name, entry.Value, entry.Source,
entry.IsReadOnly, entry.IsSensitive,
entry.IsDefault)
}
}
a.Close()
}