From b0fa1a00b1ef348e76bb1fba1eff0d4927574cf4 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 10 Mar 2023 14:10:42 -0700 Subject: [PATCH] kgo.Client: add ConfigValue and ConfigValues This allows a way to interrogate the client for the value of any option, whether the option was specified or not. More type safe and explicit would be to add a getter for every option, but that'd also bloat the client API unnecessarily. --- pkg/kgo/client.go | 241 ++++++++++++++++++++++++++++++++++++++++++++++ pkg/kgo/config.go | 5 +- 2 files changed, 244 insertions(+), 2 deletions(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 5669ccad..edc11796 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -16,6 +16,7 @@ import ( "math/rand" "net" "reflect" + "runtime" "sort" "strconv" "strings" @@ -153,6 +154,246 @@ func validateCfg(opts ...Opt) (cfg, []hostport, *compressor, error) { return cfg, seeds, compressor, nil } +func namefn(fn any) string { + v := reflect.ValueOf(fn) + if v.Type().Kind() != reflect.Func { + return "" + } + name := runtime.FuncForPC(v.Pointer()).Name() + dot := strings.LastIndexByte(name, '.') + if dot >= 0 { + return name[dot+1:] + } + return name +} + +// OptValue returns the value for the given configuration option. If the +// given option does not exist, this returns nil. This function takes either a +// raw Opt, or an Opt function name. +// +// If a configuration option has multiple inputs, this function returns only +// the first input. If the function is a boolean function (such as +// BlockRebalanceOnPoll), this function returns the value of the internal bool. +// Variadic option inputs are returned as a single slice. Options that are +// internally stored as a pointer (ClientID, TransactionalID, and InstanceID) +// are returned as their string input; you can see if the option is internally +// nil by looking at the second value returned from OptValues. +// +// var ( +// cl, _ := NewClient( +// InstanceID("foo"), +// ConsumeTopics("foo", "bar"), +// ) +// iid = cl.OptValue(InstanceID) // iid is "foo" +// gid = cl.OptValue(ConsumerGroup) // gid is "" since groups are not used +// topics = cl.OptValue("ConsumeTopics") // topics is []string{"foo", "bar"}; string lookup for the option works +// bpoll = cl.OptValue(BlockRebalanceOnPoll) // bpoll is false +// t = cl.OptValue(SessionTimeout) // t is 45s, the internal default +// td = t.(time.Duration) // safe conversion since SessionTimeout's input is a time.Duration +// unk = cl.OptValue("Unknown"), // unk is nil +// ) +func (cl *Client) OptValue(opt any) any { + vs := cl.OptValues(opt) + if len(vs) > 0 { + return vs[0] + } + return nil +} + +// OptValues returns all values for options. This method is useful for +// options that have multiple inputs (notably, SoftwareNameAndVersion). This is +// also useful for options that are internally stored as a pointer (ClientID, +// TransactionalID, and InstanceID) -- this function will return the string +// value of the option but also whether the option is non-nil. Boolean options +// are returned as a single-element slice with the bool value. Variadic inputs +// are returned as a signle slice. If the input option does not exist, this +// returns nil. +// +// var ( +// cl, _ = NewClient( +// InstanceID("foo"), +// ConsumeTopics("foo", "bar"), +// ) +// idValues = cl.OptValues(InstanceID) // idValues is []any{"foo", true} +// tValues = cl.OptValues(SessionTimeout) // tValues is []any{45 * time.Second} +// topics = cl.OptValues(ConsumeTopics) // topics is []any{[]string{"foo", "bar"} +// bpoll = cl.OptValues(BlockRebalanceOnPoll) // bpoll is []any{false} +// unknown = cl.OptValues("Unknown") // unknown is nil +// ) +func (cl *Client) OptValues(opt any) []any { + name := namefn(opt) + if s, ok := opt.(string); ok { + name = s + } + cfg := &cl.cfg + + switch name { + case namefn(ClientID): + if cfg.id != nil { + return []any{*cfg.id, true} + } + return []any{"", false} + case namefn(SoftwareNameAndVersion): + return []any{cfg.softwareName, cfg.softwareVersion} + case namefn(WithLogger): + if cfg.logger != nil { + return []any{cfg.logger.(*wrappedLogger).inner} + } + return []any{nil} + case namefn(RequestTimeoutOverhead): + return []any{cfg.requestTimeoutOverhead} + case namefn(ConnIdleTimeout): + return []any{cfg.connIdleTimeout} + case namefn(Dialer): + return []any{cfg.dialFn} + case namefn(DialTLSConfig): + return []any{cfg.dialTLS} + case namefn(SeedBrokers): + return []any{cfg.seedBrokers} + case namefn(MaxVersions): + return []any{cfg.maxVersions} + case namefn(MinVersions): + return []any{cfg.minVersions} + case namefn(RetryBackoffFn): + return []any{cfg.retryBackoff} + case namefn(RequestRetries): + return []any{cfg.retries} + case namefn(RetryTimeout): + return []any{cfg.retryTimeout(0)} + case namefn(RetryTimeoutFn): + return []any{cfg.retryTimeout} + case namefn(AllowAutoTopicCreation): + return []any{cfg.allowAutoTopicCreation} + case namefn(BrokerMaxWriteBytes): + return []any{cfg.maxBrokerWriteBytes} + case namefn(BrokerMaxReadBytes): + return []any{cfg.maxBrokerReadBytes} + case namefn(MetadataMaxAge): + return []any{cfg.metadataMaxAge} + case namefn(MetadataMinAge): + return []any{cfg.metadataMinAge} + case namefn(SASL): + return []any{cfg.sasls} + case namefn(WithHooks): + return []any{cfg.hooks} + case namefn(ConcurrentTransactionsBackoff): + return []any{cfg.txnBackoff} + + case namefn(DefaultProduceTopic): + return []any{cfg.defaultProduceTopic} + case namefn(RequiredAcks): + return []any{cfg.acks} + case namefn(DisableIdempotentWrite): + return []any{cfg.disableIdempotency} + case namefn(MaxProduceRequestsInflightPerBroker): + return []any{cfg.maxProduceInflight} + case namefn(ProducerBatchCompression): + return []any{cfg.compression} + case namefn(ProducerBatchMaxBytes): + return []any{cfg.maxRecordBatchBytes} + case namefn(MaxBufferedRecords): + return []any{cfg.maxBufferedRecords} + case namefn(RecordPartitioner): + return []any{cfg.partitioner} + case namefn(ProduceRequestTimeout): + return []any{cfg.produceTimeout} + case namefn(RecordRetries): + return []any{cfg.recordRetries} + case namefn(UnknownTopicRetries): + return []any{cfg.maxUnknownFailures} + case namefn(StopProducerOnDataLossDetected): + return []any{cfg.stopOnDataLoss} + case namefn(ProducerOnDataLossDetected): + return []any{cfg.onDataLoss} + case namefn(ProducerLinger): + return []any{cfg.linger} + case namefn(ManualFlushing): + return []any{cfg.manualFlushing} + case namefn(RecordDeliveryTimeout): + return []any{cfg.recordTimeout} + case namefn(TransactionalID): + if cfg.txnID != nil { + return []any{cfg.txnID, true} + } + return []any{"", false} + case namefn(TransactionTimeout): + return []any{cfg.txnTimeout} + + case namefn(ConsumePartitions): + return []any{cfg.partitions} + case namefn(ConsumePreferringLagFn): + return []any{cfg.preferLagFn} + case namefn(ConsumeRegex): + return []any{cfg.regex} + case namefn(ConsumeResetOffset): + return []any{cfg.resetOffset} + case namefn(ConsumeTopics): + return []any{cfg.topics} + case namefn(DisableFetchSessions): + return []any{cfg.disableFetchSessions} + case namefn(FetchIsolationLevel): + return []any{cfg.isolationLevel} + case namefn(FetchMaxBytes): + return []any{int32(cfg.maxBytes)} + case namefn(FetchMaxPartitionBytes): + return []any{int32(cfg.maxPartBytes)} + case namefn(FetchMaxWait): + return []any{time.Duration(cfg.maxWait) * time.Millisecond} + case namefn(FetchMinBytes): + return []any{cfg.minBytes} + case namefn(KeepControlRecords): + return []any{cfg.keepControl} + case namefn(MaxConcurrentFetches): + return []any{cfg.maxConcurrentFetches} + case namefn(Rack): + return []any{cfg.rack} + + case namefn(AdjustFetchOffsetsFn): + return []any{cfg.adjustOffsetsBeforeAssign} + case namefn(AutoCommitCallback): + return []any{cfg.commitCallback} + case namefn(AutoCommitInterval): + return []any{cfg.autocommitInterval} + case namefn(AutoCommitMarks): + return []any{cfg.autocommitMarks} + case namefn(Balancers): + return []any{cfg.balancers} + case namefn(BlockRebalanceOnPoll): + return []any{cfg.blockRebalanceOnPoll} + case namefn(ConsumerGroup): + return []any{cfg.group} + case namefn(DisableAutoCommit): + return []any{cfg.autocommitDisable} + case namefn(GreedyAutoCommit): + return []any{cfg.autocommitGreedy} + case namefn(GroupProtocol): + return []any{cfg.protocol} + case namefn(HeartbeatInterval): + return []any{cfg.heartbeatInterval} + case namefn(InstanceID): + if cfg.instanceID != nil { + return []any{*cfg.instanceID, true} + } + return []any{"", false} + case namefn(OnOffsetsFetched): + return []any{cfg.onFetched} + case namefn(OnPartitionsAssigned): + return []any{cfg.onAssigned} + case namefn(OnPartitionsLost): + return []any{cfg.onLost} + case namefn(OnPartitionsRevoked): + return []any{cfg.onRevoked} + case namefn(RebalanceTimeout): + return []any{cfg.rebalanceTimeout} + case namefn(RequireStableFetchOffsets): + return []any{cfg.requireStable} + case namefn(SessionTimeout): + return []any{cfg.sessionTimeout} + default: + return nil + } +} + // NewClient returns a new Kafka client with the given options or an error if // the options are invalid. Connections to brokers are lazily created only when // requests are written to them. diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 42216134..d2c3aa71 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -60,8 +60,9 @@ func (consumerOpt) consumerOpt() {} func (groupOpt) groupOpt() {} // A cfg can be written to while initializing a client, and after that it is -// only ever read from. Some areas of initializing may follow options, but all -// initializing is done before NewClient returns. +// (mostly) only ever read from. Some areas can continue to be modified -- +// particularly reconfiguring what to consume from -- but most areas are +// static. type cfg struct { ///////////////////// // GENERAL SECTION //