Skip to content

Commit

Permalink
Single Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
mahajanadhitya committed Oct 12, 2023
1 parent a88e0f3 commit edf5c53
Show file tree
Hide file tree
Showing 4 changed files with 378 additions and 1 deletion.
103 changes: 103 additions & 0 deletions examples/admin_list_offsets/admin_list_offsets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// List Offsets example
package main

import (
"context"
"fmt"
"os"
"strconv"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

if len(os.Args) < 2 {
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> <topicname> <partition> <EARLIEST/LATEST/MAXTIMESTAMP/TIMESTAMP t1> ..\n", os.Args[0])
os.Exit(1)
}

bootstrapServers := os.Args[1]

args := len(os.Args)
i := 2
topicPartitionOffsets := make(map[kafka.TopicPartition]kafka.OffsetSpec)
for i < args {
topicName := os.Args[i]
partition, err := strconv.Atoi(os.Args[i+1])
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid partition: %s\n", err)
os.Exit(1)
}
tp := kafka.TopicPartition{Topic: &topicName, Partition: int32(partition)}
if os.Args[i+2] == "EARLIEST" {
topicPartitionOffsets[tp] = kafka.EarliestOffsetSpec
} else if os.Args[i+2] == "LATEST" {
topicPartitionOffsets[tp] = kafka.LatestOffsetSpec
} else if os.Args[i+2] == "MAXTIMESTAMP" {
topicPartitionOffsets[tp] = kafka.MaxTimestampOffsetSpec
} else if os.Args[i+2] == "TIMESTAMP" {
timestamp, timestampErr := strconv.Atoi(os.Args[i+3])
if timestampErr != nil {
fmt.Fprintf(os.Stderr, "Invalid timestamp: %s\n", timestampErr)
os.Exit(1)
}
topicPartitionOffsets[tp] = kafka.NewOffsetSpecOfTimestamp(int64(timestamp))
i = i + 1
} else {
fmt.Fprintf(os.Stderr, "Invalid OffsetSpec.\n")
os.Exit(1)
}
i = i + 3
}

// Create a new AdminClient.
// AdminClient can also be instantiated using an existing
// Producer or Consumer instance, see NewAdminClientFromProducer and
// NewAdminClientFromConsumer.
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer a.Close()

// Contexts are used to abort or limit the amount of time
// the Admin call blocks waiting for a result.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

results, err := a.ListOffsets(ctx, topicPartitionOffsets, kafka.SetAdminIsolationLevel(kafka.ReadCommitted))
if err != nil {
fmt.Printf("Failed to List offsets: %v\n", err)
os.Exit(1)
}
// map[TopicPartition]ListOffsetsResultInfo
// Print results
for tp, info := range results.Results {
fmt.Printf("Topic: %s Partition_Index : %d\n", *tp.Topic, tp.Partition)
if info.Error.Code() != kafka.ErrNoError {
fmt.Printf(" ErrorCode : %d ErrorMessage : %s\n\n", info.Error.Code(), info.Error.String())
} else {
fmt.Printf(" Offset : %d Timestamp : %d\n\n", info.Offset, info.Timestamp)
}
}

}
128 changes: 128 additions & 0 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ AlterUserScramCredentials_result_response_by_idx(const rd_kafka_AlterUserScramCr
return responses[idx];
}
static const rd_kafka_ListOffsetsResultInfo_t *
ListOffsetsResultInfo_by_idx(const rd_kafka_ListOffsetsResultInfo_t **result_infos, size_t cnt, size_t idx) {
if (idx >= cnt)
return NULL;
return result_infos[idx];
}
static const rd_kafka_error_t *
error_by_idx(const rd_kafka_error_t **errors, size_t cnt, size_t idx) {
if (idx >= cnt)
Expand Down Expand Up @@ -897,6 +904,39 @@ type AlterUserScramCredentialsResult struct {
Errors map[string]Error
}

// OffsetSpec
// OffsetSpec specifies desired offsets while using ListOffsets.
type OffsetSpec int64

const (
// MaxTimestampOffsetSpec is used to describe the offset with the Max Timestamp which may be different then LatestOffsetSpec as Timestamp can be set client side.
MaxTimestampOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP)
// EarliestOffsetSpec is used to describe the earliest offset for the TopicPartition.
EarliestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_EARLIEST)
// LatestOffsetSpec is used to describe the latest offset for the TopicPartition.
LatestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_LATEST)
)

// Creates an OffsetSpec corresponding to the timestamp.
func NewOffsetSpecOfTimestamp(timestamp int64) OffsetSpec {
return OffsetSpec(timestamp)
}

// ListOffsetsResultInfo
// Describes the result of ListOffsets request for a Topic Partition.
type ListOffsetsResultInfo struct {
Offset int64
Timestamp int64
LeaderEpoch int
Error Error
}

// ListOffsetsResult
// Holds the map of TopicPartition to ListOffsetsResultInfo for a request.
type ListOffsetsResult struct {
Results map[TopicPartition]ListOffsetsResultInfo
}

// waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens
// first.
// The returned result event is checked for errors its error is returned if set.
Expand Down Expand Up @@ -1105,6 +1145,27 @@ func cToDescribeUserScramCredentialsResult(
return result
}

// cToListOffsetsResult converts a C
// rd_kafka_ListOffsets_result_t to a Go ListOffsetsResult
func cToListOffsetsResult(cRes *C.rd_kafka_ListOffsets_result_t) (result ListOffsetsResult) {
result = ListOffsetsResult{Results: make(map[TopicPartition]ListOffsetsResultInfo)}
var cPartitionCount C.size_t
cResultInfos := C.rd_kafka_ListOffsets_result_infos(cRes, &cPartitionCount)
for itr := 0; itr < int(cPartitionCount); itr++ {
cResultInfo := C.ListOffsetsResultInfo_by_idx(cResultInfos, cPartitionCount, C.size_t(itr))
Value := ListOffsetsResultInfo{}
cPartition := C.rd_kafka_ListOffsetsResultInfo_topic_partition(cResultInfo)
Topic := C.GoString(cPartition.topic)
Partition := TopicPartition{Topic: &Topic, Partition: int32(cPartition.partition)}
Value.Offset = int64(cPartition.offset)
Value.Timestamp = int64(C.rd_kafka_ListOffsetsResultInfo_timestamp(cResultInfo))
Value.LeaderEpoch = -1
Value.Error = newError(cPartition.err)
result.Results[Partition] = Value
}
return result
}

// ConsumerGroupDescription converts a C rd_kafka_ConsumerGroupListing_t array
// to a Go ConsumerGroupListing slice.
func (a *AdminClient) cToConsumerGroupListings(
Expand Down Expand Up @@ -2734,6 +2795,73 @@ func (a *AdminClient) DescribeUserScramCredentials(
return result, nil
}

// ListOffsets describe offsets for the
// specified TopicPartiton based on an OffsetSpec.
//
// Parameters:
// - `ctx` - context with the maximum amount of time to block, or nil for
// indefinite.
// - `topicPartitionOffsets` - a map from TopicPartition to OffsetSpec, it holds either the OffsetSpec enum value or timestamp.
// - `options` - ListOffsetsAdminOption options.
//
// Returns a ListOffsetsResult.
// Each TopicPartition's ListOffset can have an individual error.
func (a *AdminClient) ListOffsets(
ctx context.Context, topicPartitionOffsets map[TopicPartition]OffsetSpec,
options ...ListOffsetsAdminOption) (result ListOffsetsResult, err error) {
if len(topicPartitionOffsets) < 1 || topicPartitionOffsets == nil {
return result, newErrorFromString(ErrInvalidArg, "expected topicPartitionOffsets of size greater or equal 1.")
}

topicPartitions := C.rd_kafka_topic_partition_list_new(C.int(len(topicPartitionOffsets)))
defer C.rd_kafka_topic_partition_list_destroy(topicPartitions)

for tp, offsetValue := range topicPartitionOffsets {
cStr := C.CString(*tp.Topic)
defer C.free(unsafe.Pointer(cStr))
topicPartition := C.rd_kafka_topic_partition_list_add(topicPartitions, cStr, C.int32_t(tp.Partition))
topicPartition.offset = C.int64_t(offsetValue)
}

// Convert Go AdminOptions (if any) to C AdminOptions.
genericOptions := make([]AdminOption, len(options))
for i := range options {
genericOptions[i] = options[i]
}
cOptions, err := adminOptionsSetup(
a.handle, C.RD_KAFKA_ADMIN_OP_LISTOFFSETS, genericOptions)
if err != nil {
return result, err
}
defer C.rd_kafka_AdminOptions_destroy(cOptions)

// Create temporary queue for async operation.
cQueue := C.rd_kafka_queue_new(a.handle.rk)
defer C.rd_kafka_queue_destroy(cQueue)

// Call rd_kafka_ListOffsets (asynchronous).
C.rd_kafka_ListOffsets(
a.handle.rk,
topicPartitions,
cOptions,
cQueue)

// Wait for result, error or context timeout.
rkev, err := a.waitResult(
ctx, cQueue, C.RD_KAFKA_EVENT_LISTOFFSETS_RESULT)
if err != nil {
return result, err
}
defer C.rd_kafka_event_destroy(rkev)

cRes := C.rd_kafka_event_ListOffsets_result(rkev)

// Convert result from C to Go.
result = cToListOffsetsResult(cRes)

return result, nil
}

// AlterUserScramCredentials alters SASL/SCRAM credentials.
// The pair (user, mechanism) must be unique among upsertions and deletions.
//
Expand Down
59 changes: 59 additions & 0 deletions kafka/adminoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,57 @@ func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout) {
return ao
}

type IsolationLevel int

const (
ReadUncommitted = IsolationLevel(C.RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED)
ReadCommitted = IsolationLevel(C.RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED)
)

// AdminOptionIsolationLevel sets the overall request IsolationLevel.
//
// Default: `ReadUncommitted`.
//
// Valid for all Admin API methods.
type AdminOptionIsolationLevel struct {
isSet bool
val IsolationLevel
}

func (ao AdminOptionIsolationLevel) supportsListOffsets() {
}
func (ao AdminOptionIsolationLevel) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
if !ao.isSet {
return nil
}

cErrstrSize := C.size_t(512)
cErrstr := (*C.char)(C.malloc(cErrstrSize))
defer C.free(unsafe.Pointer(cErrstr))

cError := C.rd_kafka_AdminOptions_set_isolation_level(
cOptions, C.rd_kafka_IsolationLevel_t(ao.val))
if cError != nil {
C.rd_kafka_AdminOptions_destroy(cOptions)
return newErrorFromCErrorDestroy(cError)

}

return nil

}

// SetAdminIsolationLevel sets the overall IsolationLevel for a request.
//
// Default: `ReadUncommitted`.
//
// Valid for all Admin API methods.
func SetAdminIsolationLevel(isolationLevel IsolationLevel) (ao AdminOptionIsolationLevel) {
ao.isSet = true
ao.val = isolationLevel
return ao
}

// AdminOptionValidateOnly tells the broker to only validate the request,
// without performing the requested operation (create topics, etc).
//
Expand Down Expand Up @@ -434,6 +485,14 @@ type AlterUserScramCredentialsAdminOption interface {
apply(cOptions *C.rd_kafka_AdminOptions_t) error
}

// ListOffsetsAdminOption - see setter.
//
// See SetAdminRequestTimeout, SetAdminIsolationLevel.
type ListOffsetsAdminOption interface {
supportsListOffsets()
apply(cOptions *C.rd_kafka_AdminOptions_t) error
}

// AdminOption is a generic type not to be used directly.
//
// See CreateTopicsAdminOption et.al.
Expand Down
Loading

0 comments on commit edf5c53

Please sign in to comment.