From b6c2743eff1cfd51bcdae12b7dd2e6134ef72f7f Mon Sep 17 00:00:00 2001 From: Andy Walker Date: Tue, 30 May 2023 23:03:36 -0400 Subject: [PATCH] [fix][fn] Go functions must retrieve consumers by non-particioned topic ID (#20413) Co-authored-by: Andy Walker (cherry picked from commit fb1b46e5e993d77c583f715d2bea2eadbb052a81) --- pulsar-function-go/pf/instance.go | 18 ++++++++++++++++-- pulsar-function-go/pf/util.go | 11 +++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go index c5b7500803d6d..3a00182585e68 100644 --- a/pulsar-function-go/pf/instance.go +++ b/pulsar-function-go/pf/instance.go @@ -388,11 +388,25 @@ func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) { // ackInputMessage doesn't produce any result, or the user doesn't want the result. func (gi *goInstance) ackInputMessage(inputMessage pulsar.Message) { log.Debugf("ack input message topic name is: %s", inputMessage.Topic()) - gi.consumers[inputMessage.Topic()].Ack(inputMessage) + gi.respondMessage(inputMessage, true) } func (gi *goInstance) nackInputMessage(inputMessage pulsar.Message) { - gi.consumers[inputMessage.Topic()].Nack(inputMessage) + gi.respondMessage(inputMessage, false) +} + +func (gi *goInstance) respondMessage(inputMessage pulsar.Message, ack bool) { + topicName, err := ParseTopicName(inputMessage.Topic()) + if err != nil { + log.Errorf("unable respond to message ID %s - invalid topic: %v", messageIDStr(inputMessage), err) + return + } + // consumers are indexed by topic name only (no partition) + if ack { + gi.consumers[topicName.NameWithoutPartition()].Ack(inputMessage) + return + } + gi.consumers[topicName.NameWithoutPartition()].Nack(inputMessage) } func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration { diff --git a/pulsar-function-go/pf/util.go b/pulsar-function-go/pf/util.go index d5b32da841121..1d1aa1cab939f 100644 --- a/pulsar-function-go/pf/util.go +++ b/pulsar-function-go/pf/util.go @@ -21,6 +21,8 @@ package pf import ( "fmt" + + "github.com/apache/pulsar-client-go/pulsar" ) func getProperties(fullyQualifiedName string, instanceID int) map[string]string { @@ -39,3 +41,12 @@ func getDefaultSubscriptionName(tenant, namespace, name string) string { func getFullyQualifiedInstanceID(tenant, namespace, name string, instanceID int) string { return fmt.Sprintf("%s/%s/%s:%d", tenant, namespace, name, instanceID) } + +func messageIDStr(msg pulsar.Message) string { + // ::: + return fmt.Sprintf("%d:%d:%d:%d", + msg.ID().LedgerID(), + msg.ID().EntryID(), + msg.ID().PartitionIdx(), + msg.ID().BatchIdx()) +}