Skip to content

Commit

Permalink
style-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jainruchir committed Apr 11, 2023
1 parent b9fea3c commit 0e16f98
Show file tree
Hide file tree
Showing 18 changed files with 928 additions and 823 deletions.
84 changes: 42 additions & 42 deletions examples/describe_cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,19 @@ static void stop(int sig) {

static void usage(const char *reason, ...) {

fprintf(stderr,
"Describe cluster usage examples\n"
"\n"
"Usage: %s <options> <include_cluster_authorized_operations> ...\n"
"\n"
"Options:\n"
" -b <brokers> Bootstrap server list to connect to.\n"
" -X <prop=val> Set librdkafka configuration property.\n"
" See CONFIGURATION.md for full list.\n"
" -d <dbg,..> Enable librdkafka debugging (%s).\n"
"\n",
argv0, rd_kafka_get_debug_contexts());
fprintf(
stderr,
"Describe cluster usage examples\n"
"\n"
"Usage: %s <options> <include_cluster_authorized_operations> ...\n"
"\n"
"Options:\n"
" -b <brokers> Bootstrap server list to connect to.\n"
" -X <prop=val> Set librdkafka configuration property.\n"
" See CONFIGURATION.md for full list.\n"
" -d <dbg,..> Enable librdkafka debugging (%s).\n"
"\n",
argv0, rd_kafka_get_debug_contexts());

if (reason) {
va_list ap;
Expand Down Expand Up @@ -142,48 +143,50 @@ print_cluster_info(const rd_kafka_DescribeCluster_result_t *clusterdesc) {
int j, acl_operation;
const rd_kafka_ClusterDescription_t *desc;
int controller_id, node_cnt, cluster_authorized_operations_cnt;
const char* cluster_id;
const char *cluster_id;

desc = rd_kafka_DescribeCluster_result_description(clusterdesc);

controller_id = rd_kafka_ClusterDescription_controller_id(desc);
node_cnt = rd_kafka_ClusterDescription_node_cnt(desc);
cluster_authorized_operations_cnt = rd_kafka_ClusterDescription_cluster_acl_operations_cnt(desc);
node_cnt = rd_kafka_ClusterDescription_node_cnt(desc);
cluster_authorized_operations_cnt =
rd_kafka_ClusterDescription_cluster_acl_operations_cnt(desc);
cluster_id = rd_kafka_ClusterDescription_cluster_id(desc);

printf("Cluster id: %s\t Controller id: %d\t ACL operations count allowed: %d\n",
cluster_id, controller_id, cluster_authorized_operations_cnt);
for(j=0;j<cluster_authorized_operations_cnt;j++){
acl_operation =
rd_kafka_ClusterDescription_authorized_operation_idx(desc,j);
printf(
"Cluster id: %s\t Controller id: %d\t ACL operations count "
"allowed: %d\n",
cluster_id, controller_id, cluster_authorized_operations_cnt);
for (j = 0; j < cluster_authorized_operations_cnt; j++) {
acl_operation =
rd_kafka_ClusterDescription_authorized_operation_idx(desc,
j);
printf("\t%s operation is allowed\n",
rd_kafka_AclOperation_name(acl_operation));
rd_kafka_AclOperation_name(acl_operation));
}
for(j=0;j<node_cnt;j++){

for (j = 0; j < node_cnt; j++) {
const rd_kafka_Node_t *node = NULL;
node = rd_kafka_ClusterDescription_node_idx(desc, j);
printf("Node [id: %" PRId32
", host: %s"
", port: %" PRIu16 "]\n",
rd_kafka_Node_id(node),
rd_kafka_Node_host(node),
rd_kafka_Node_port(node));
", host: %s"
", port: %" PRIu16 "]\n",
rd_kafka_Node_id(node), rd_kafka_Node_host(node),
rd_kafka_Node_port(node));
}
return 0;
}

/**
* @brief Call rd_kafka_DescribeCluster()
*/
static void
cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) {
static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) {
rd_kafka_t *rk;
char errstr[512];
rd_kafka_AdminOptions_t *options;
rd_kafka_event_t *event = NULL;
rd_kafka_error_t *error;
int retval = 0;
int retval = 0;

int include_cluster_authorized_operations =
parse_int("include_cluster_authorized_operations", argv[0]);
Expand All @@ -209,18 +212,20 @@ cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) {
/* Signal handler for clean shutdown */
signal(SIGINT, stop);

options = rd_kafka_AdminOptions_new(
rk, RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER);
options =
rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER);

if (rd_kafka_AdminOptions_set_request_timeout(
options, 10 * 1000 /* 10s */, errstr, sizeof(errstr))) {
fprintf(stderr, "%% Failed to set timeout: %s\n", errstr);
goto exit;
}
if ((error = rd_kafka_AdminOptions_set_include_cluster_authorized_operations(
options, include_cluster_authorized_operations))) {
if ((error =
rd_kafka_AdminOptions_set_include_cluster_authorized_operations(
options, include_cluster_authorized_operations))) {
fprintf(stderr,
"%% Failed to set require cluster authorized operations: %s\n",
"%% Failed to set require cluster authorized "
"operations: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
exit(1);
Expand All @@ -241,8 +246,7 @@ cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) {
} else if (rd_kafka_event_error(event)) {
rd_kafka_resp_err_t err = rd_kafka_event_error(event);
/* DescribeCluster request failed */
fprintf(stderr,
"%% DescribeCluster failed[%" PRId32 "]: %s\n",
fprintf(stderr, "%% DescribeCluster failed[%" PRId32 "]: %s\n",
err, rd_kafka_event_error_string(event));
goto exit;

Expand Down Expand Up @@ -276,10 +280,6 @@ int main(int argc, char **argv) {
* Create Kafka client configuration place-holder
*/
conf = rd_kafka_conf_new();
conf_set(conf, "sasl.username", "broker");
conf_set(conf, "sasl.password", "broker");
conf_set(conf, "sasl.mechanism", "SCRAM-SHA-256");
conf_set(conf, "security.protocol", "SASL_PLAINTEXT");

/*
* Parse common options
Expand Down
21 changes: 9 additions & 12 deletions examples/describe_consumer_groups.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,7 @@ print_groups_info(const rd_kafka_DescribeConsumerGroups_result_t *grpdesc,
}

for (i = 0; i < result_groups_cnt; i++) {
int j, member_cnt, authorized_operation_count,
acl_operation;
int j, member_cnt, authorized_operation_count, acl_operation;
const rd_kafka_error_t *error;
const rd_kafka_ConsumerGroupDescription_t *group =
result_groups[i];
Expand All @@ -181,8 +180,9 @@ print_groups_info(const rd_kafka_DescribeConsumerGroups_result_t *grpdesc,
rd_kafka_ConsumerGroupDescription_partition_assignor(group);
rd_kafka_consumer_group_state_t state =
rd_kafka_ConsumerGroupDescription_state(group);
authorized_operation_count =
rd_kafka_ConsumerGroupDescription_authorized_operations_count(group);
authorized_operation_count =
rd_kafka_ConsumerGroupDescription_authorized_operations_count(
group);
member_cnt =
rd_kafka_ConsumerGroupDescription_member_count(group);
error = rd_kafka_ConsumerGroupDescription_error(group);
Expand All @@ -205,11 +205,12 @@ print_groups_info(const rd_kafka_DescribeConsumerGroups_result_t *grpdesc,
group_id, partition_assignor,
rd_kafka_consumer_group_state_name(state), coordinator_desc,
member_cnt);
for(j=0;j<authorized_operation_count;j++){
acl_operation =
rd_kafka_ConsumerGroupDescription_authorized_operation(group,j);
for (j = 0; j < authorized_operation_count; j++) {
acl_operation =
rd_kafka_ConsumerGroupDescription_authorized_operation(
group, j);
printf("%s operation is allowed\n",
rd_kafka_AclOperation_name(acl_operation));
rd_kafka_AclOperation_name(acl_operation));
}
if (error)
printf(" error[%" PRId32 "]: %s",
Expand Down Expand Up @@ -374,10 +375,6 @@ int main(int argc, char **argv) {
* Create Kafka client configuration place-holder
*/
conf = rd_kafka_conf_new();
conf_set(conf, "sasl.username", "broker");
conf_set(conf, "sasl.password", "broker");
conf_set(conf, "sasl.mechanism", "SCRAM-SHA-256");
conf_set(conf, "security.protocol", "SASL_PLAINTEXT");

/*
* Parse common options
Expand Down
Loading

0 comments on commit 0e16f98

Please sign in to comment.