Skip to content

Commit

Permalink
[fix][fn] Go functions must retrieve consumers by non-particioned top…
Browse files Browse the repository at this point in the history
…ic ID (apache#20413)

Co-authored-by: Andy Walker <[email protected]>
(cherry picked from commit fb1b46e)
  • Loading branch information
flowchartsman authored and michaeljmarshall committed Jun 6, 2023
1 parent 6506d6a commit b6c2743
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
18 changes: 16 additions & 2 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,25 @@ func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) {
// ackInputMessage doesn't produce any result, or the user doesn't want the result.
func (gi *goInstance) ackInputMessage(inputMessage pulsar.Message) {
log.Debugf("ack input message topic name is: %s", inputMessage.Topic())
gi.consumers[inputMessage.Topic()].Ack(inputMessage)
gi.respondMessage(inputMessage, true)
}

func (gi *goInstance) nackInputMessage(inputMessage pulsar.Message) {
gi.consumers[inputMessage.Topic()].Nack(inputMessage)
gi.respondMessage(inputMessage, false)
}

func (gi *goInstance) respondMessage(inputMessage pulsar.Message, ack bool) {
topicName, err := ParseTopicName(inputMessage.Topic())
if err != nil {
log.Errorf("unable respond to message ID %s - invalid topic: %v", messageIDStr(inputMessage), err)
return
}
// consumers are indexed by topic name only (no partition)
if ack {
gi.consumers[topicName.NameWithoutPartition()].Ack(inputMessage)
return
}
gi.consumers[topicName.NameWithoutPartition()].Nack(inputMessage)
}

func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration {
Expand Down
11 changes: 11 additions & 0 deletions pulsar-function-go/pf/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package pf

import (
"fmt"

"github.com/apache/pulsar-client-go/pulsar"
)

func getProperties(fullyQualifiedName string, instanceID int) map[string]string {
Expand All @@ -39,3 +41,12 @@ func getDefaultSubscriptionName(tenant, namespace, name string) string {
func getFullyQualifiedInstanceID(tenant, namespace, name string, instanceID int) string {
return fmt.Sprintf("%s/%s/%s:%d", tenant, namespace, name, instanceID)
}

func messageIDStr(msg pulsar.Message) string {
// <ledger ID>:<entry ID>:<partition index>:<batch index>
return fmt.Sprintf("%d:%d:%d:%d",
msg.ID().LedgerID(),
msg.ID().EntryID(),
msg.ID().PartitionIdx(),
msg.ID().BatchIdx())
}

0 comments on commit b6c2743

Please sign in to comment.