Skip to content

Commit 4af100b

Browse files
Merge pull request #3604 from VajiraPrabuddhaka/multi-tenant-asb-choreo
[choreo] multi-tenanting ASB: Add topic wise ASB event listeners
2 parents f2d432c + b3e3181 commit 4af100b

File tree

5 files changed

+169
-109
lines changed

5 files changed

+169
-109
lines changed

adapter/config/types.go

+10
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,7 @@ type controlPlane struct {
544544
SyncApisOnStartUp bool
545545
SendRevisionUpdate bool
546546
EnvironmentLabels []string
547+
ASBDataplaneTopics []asbDataplaneTopic `toml:"asbDataplaneTopics"`
547548
DynamicEnvironments dynamicEnvironments
548549
RetryInterval time.Duration
549550
SkipSSLVerification bool
@@ -553,6 +554,15 @@ type controlPlane struct {
553554
InitialFetch initialFetch
554555
}
555556

557+
type asbDataplaneTopic struct {
558+
Type string `toml:"type"`
559+
TopicName string `toml:"topicName"`
560+
ConnectionString string `toml:"connectionString"`
561+
AmqpOverWebsocketsEnabled bool
562+
ReconnectInterval time.Duration
563+
ReconnectRetryCount int
564+
}
565+
556566
type dynamicEnvironments struct {
557567
Enabled bool
558568
DataPlaneID string

adapter/internal/messaging/azure_listener.go

+82-18
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
2424
"net"
2525
"nhooyr.io/websocket"
26+
"os"
27+
"strconv"
2628
"time"
2729

2830
"github.com/wso2/product-microgateway/adapter/config"
@@ -34,16 +36,89 @@ import (
3436
const (
3537
componentName = "adapter"
3638
subscriptionIdleTimeDuration = "P0Y0M3DT0H0M0S"
39+
notification = "notification"
40+
tokenRevocation = "tokenRevocation"
41+
stepQuotaThreshold = "thresholdEvent"
42+
stepQuotaReset = "billingCycleResetEvent"
43+
organizationPurge = "organizationPurge"
3744
)
3845

46+
var topicNames = []string{tokenRevocation, notification, stepQuotaThreshold, stepQuotaReset}
47+
48+
const orgPurgeEnabled = "ORG_PURGE_ENABLED"
49+
50+
func init() {
51+
// Temporarily disable reacting organization Purge
52+
orgPurgeEnabled, envParseErr := strconv.ParseBool(os.Getenv(orgPurgeEnabled))
53+
54+
if envParseErr == nil {
55+
if orgPurgeEnabled {
56+
topicNames = append(topicNames, organizationPurge)
57+
}
58+
}
59+
}
60+
3961
// InitiateAndProcessEvents to pass event consumption
4062
func InitiateAndProcessEvents(config *config.Config) {
41-
var err error
42-
var reconnectRetryCount = config.ControlPlane.BrokerConnectionParameters.ReconnectRetryCount
43-
var reconnectInterval = config.ControlPlane.BrokerConnectionParameters.ReconnectInterval
63+
if len(config.ControlPlane.ASBDataplaneTopics) > 0 {
64+
for _, topic := range config.ControlPlane.ASBDataplaneTopics {
65+
subscription, err := msg.InitiateBrokerConnectionAndValidate(
66+
topic.ConnectionString,
67+
topic.TopicName,
68+
getAmqpClientOptions(config),
69+
componentName,
70+
topic.ReconnectRetryCount,
71+
topic.ReconnectInterval*time.Millisecond,
72+
subscriptionIdleTimeDuration)
73+
if err != nil {
74+
logger.LoggerMgw.Errorf("Error while initiating broker connection for topic %s: %v", topic.TopicName, err)
75+
health.SetControlPlaneBrokerStatus(false)
76+
return
77+
}
78+
msg.InitiateConsumer(subscription, topic.Type)
79+
startChannelConsumer(topic.Type)
80+
logger.LoggerMgw.Infof("Broker connection initiated and lsitening on topic %s...", topic.TopicName)
81+
}
82+
health.SetControlPlaneBrokerStatus(true)
83+
} else {
84+
for _, topic := range topicNames {
85+
connectionString := config.ControlPlane.BrokerConnectionParameters.EventListeningEndpoints[0]
86+
reconnectRetryCount := config.ControlPlane.BrokerConnectionParameters.ReconnectRetryCount
87+
reconnectInterval := config.ControlPlane.BrokerConnectionParameters.ReconnectInterval
88+
89+
subscription, err := msg.InitiateBrokerConnectionAndValidate(
90+
connectionString,
91+
topic,
92+
getAmqpClientOptions(config),
93+
componentName,
94+
reconnectRetryCount,
95+
reconnectInterval*time.Millisecond,
96+
subscriptionIdleTimeDuration)
97+
if err != nil {
98+
logger.LoggerMgw.Errorf("Error while initiating broker connection for topic %s: %v", topic, err)
99+
health.SetControlPlaneBrokerStatus(false)
100+
return
101+
}
102+
msg.InitiateConsumer(subscription, topic)
103+
startChannelConsumer(topic)
104+
logger.LoggerMgw.Infof("Broker connection initiated and lsitening on topic %s...", topic)
105+
}
106+
health.SetControlPlaneBrokerStatus(true)
107+
}
108+
}
109+
110+
func startChannelConsumer(consumerType string) {
111+
switch consumerType {
112+
case notification:
113+
go handleAzureNotification()
114+
case tokenRevocation:
115+
go handleAzureTokenRevocation()
116+
case organizationPurge:
117+
go handleAzureOrganizationPurge()
118+
}
119+
}
44120

45-
connectionString := config.ControlPlane.BrokerConnectionParameters.EventListeningEndpoints[0]
46-
var clientOpts *azservicebus.ClientOptions
121+
func getAmqpClientOptions(config *config.Config) *azservicebus.ClientOptions {
47122
if config.ControlPlane.BrokerConnectionParameters.AmqpOverWebsocketsEnabled {
48123
logger.LoggerMgw.Info("AMQP over Websockets is enabled. Initiating brokers with AMQP over Websockets.")
49124
newWebSocketConnFn := func(ctx context.Context, args azservicebus.NewWebSocketConnArgs) (net.Conn, error) {
@@ -54,20 +129,9 @@ func InitiateAndProcessEvents(config *config.Config) {
54129
}
55130
return websocket.NetConn(ctx, wssConn, websocket.MessageBinary), nil
56131
}
57-
clientOpts = &azservicebus.ClientOptions{
132+
return &azservicebus.ClientOptions{
58133
NewWebSocketConn: newWebSocketConnFn,
59134
}
60135
}
61-
62-
subscriptionMetaDataList, err := msg.InitiateBrokerConnectionAndValidate(connectionString, clientOpts, componentName,
63-
reconnectRetryCount, reconnectInterval*time.Millisecond, subscriptionIdleTimeDuration)
64-
health.SetControlPlaneBrokerStatus(err == nil)
65-
if err == nil {
66-
logger.LoggerMgw.Info("Service bus meta data successfully initialized.")
67-
msg.InitiateConsumers(connectionString, clientOpts, subscriptionMetaDataList, reconnectInterval*time.Millisecond)
68-
go handleAzureNotification()
69-
go handleAzureTokenRevocation()
70-
go handleAzureOrganizationPurge()
71-
}
72-
136+
return nil
73137
}

adapter/pkg/messaging/azure_connection.go

+52-81
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,29 @@ package messaging
2121
import (
2222
"context"
2323
"errors"
24-
"os"
24+
"fmt"
2525
"regexp"
2626
"strconv"
2727
"time"
2828

2929
asb "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
3030
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
3131
"github.com/google/uuid"
32-
"github.com/sirupsen/logrus"
3332
logger "github.com/wso2/product-microgateway/adapter/pkg/loggers"
3433
)
3534

36-
// TODO: (erandi) when refactoring, refactor organization purge flow as well
37-
var bindingKeys = []string{tokenRevocation, notification, stepQuotaThreshold, stepQuotaReset}
38-
39-
const orgPurgeEnabled = "ORG_PURGE_ENABLED"
40-
41-
// Subscription stores the meta data of a specific subscription
35+
// Subscription stores the metadata of a specific subscription
36+
// TopicName: the topic name of the subscription
37+
// SubscriptionName: the name of the subscription
38+
// ConnectionString: the connection string of the service bus
39+
// ClientOptions: the client options for initiating the client
40+
// ReconnectInterval: the interval to wait before reconnecting
4241
type Subscription struct {
43-
topicName string
44-
subscriptionName string
42+
TopicName string
43+
SubscriptionName string
44+
ConnectionString string
45+
ClientOptions *asb.ClientOptions
46+
ReconnectInterval time.Duration
4547
}
4648

4749
var (
@@ -63,107 +65,76 @@ func init() {
6365
AzureStepQuotaThresholdChannel = make(chan []byte)
6466
AzureStepQuotaResetChannel = make(chan []byte)
6567
AzureOrganizationPurgeChannel = make(chan []byte)
66-
67-
// Temporarily disable reacting organization Purge
68-
orgPurgeEnabled, envParseErr := strconv.ParseBool(os.Getenv(orgPurgeEnabled))
69-
70-
if envParseErr == nil {
71-
if orgPurgeEnabled {
72-
bindingKeys = append(bindingKeys, organizationPurge)
73-
}
74-
}
7568
}
7669

7770
// InitiateBrokerConnectionAndValidate to initiate connection and validate azure service bus constructs to
7871
// further process
79-
func InitiateBrokerConnectionAndValidate(connectionString string, clientOptions *asb.ClientOptions, componentName string, reconnectRetryCount int,
80-
reconnectInterval time.Duration, subscriptionIdleTimeDuration string) ([]Subscription, error) {
81-
subscriptionMetaDataList := make([]Subscription, 0)
72+
func InitiateBrokerConnectionAndValidate(connectionString string, topic string, clientOptions *asb.ClientOptions, componentName string, reconnectRetryCount int,
73+
reconnectInterval time.Duration, subscriptionIdleTimeDuration string) (*Subscription, error) {
8274
subProps := &admin.SubscriptionProperties{
8375
AutoDeleteOnIdle: &subscriptionIdleTimeDuration,
8476
}
8577
_, err := asb.NewClientFromConnectionString(connectionString, clientOptions)
8678

8779
if err == nil {
88-
if logger.LoggerMsg.IsLevelEnabled(logrus.DebugLevel) {
89-
logger.LoggerMsg.Debugf("ASB client initialized for connection url: %s", maskSharedAccessKey(connectionString))
90-
}
80+
logger.LoggerMsg.Debugf("ASB client initialized for connection url: %s", maskSharedAccessKey(connectionString))
9181

9282
for j := 0; j < reconnectRetryCount || reconnectRetryCount == -1; j++ {
93-
err = nil
94-
subscriptionMetaDataList, err = retrieveSubscriptionMetadata(subscriptionMetaDataList,
95-
connectionString, componentName, subProps)
83+
sub, err := retrieveSubscriptionMetadataForTopic(connectionString, topic,
84+
clientOptions, componentName, subProps, reconnectInterval)
9685
if err != nil {
9786
logError(reconnectRetryCount, reconnectInterval, err)
98-
subscriptionMetaDataList = nil
9987
time.Sleep(reconnectInterval)
10088
continue
10189
}
102-
return subscriptionMetaDataList, err
103-
}
104-
if err != nil {
105-
logger.LoggerMsg.Errorf("%v. Retry attempted %d times.", err, reconnectRetryCount)
106-
return subscriptionMetaDataList, err
90+
return sub, err
10791
}
108-
} else {
109-
// any error which comes to this point is because the connection url is not up to the expected format
110-
// hence not retrying
111-
logger.LoggerMsg.Errorf("Error occurred while trying to create ASB client using the connection url %s, err: %v",
112-
connectionString, err)
92+
return nil, fmt.Errorf("failed to create subscription for topic %s", topic)
11393
}
114-
return subscriptionMetaDataList, err
94+
logger.LoggerMsg.Errorf("Error occurred while trying to create ASB client using the connection url %s, err: %v",
95+
maskSharedAccessKey(connectionString), err)
96+
return nil, err
11597
}
11698

117-
// InitiateConsumers to pass event consumption
118-
func InitiateConsumers(connectionString string, clientOptions *asb.ClientOptions, subscriptionMetaDataList []Subscription, reconnectInterval time.Duration) {
119-
for _, subscriptionMetaData := range subscriptionMetaDataList {
120-
go func(subscriptionMetaData Subscription) {
121-
startBrokerConsumer(connectionString, clientOptions, subscriptionMetaData, reconnectInterval)
122-
}(subscriptionMetaData)
123-
}
99+
// InitiateConsumer to start the broker consumer in a separate go routine
100+
func InitiateConsumer(sub *Subscription, consumerType string) {
101+
go startBrokerConsumer(sub, consumerType)
124102
}
125103

126-
func retrieveSubscriptionMetadata(metaDataList []Subscription, connectionString string, componentName string,
127-
opts *admin.SubscriptionProperties) ([]Subscription, error) {
128-
parentContext := context.Background()
104+
func retrieveSubscriptionMetadataForTopic(connectionString string, topicName string, clientOptions *asb.ClientOptions,
105+
componentName string, opts *admin.SubscriptionProperties, reconnectInterval time.Duration) (*Subscription, error) {
106+
ctx, cancel := context.WithCancel(context.Background())
107+
defer cancel()
129108
adminClient, clientErr := admin.NewClientFromConnectionString(connectionString, nil)
130109
if clientErr != nil {
131110
logger.LoggerMsg.Errorf("Error occurred while trying to create ASB admin client using the connection url %s", connectionString)
132111
return nil, clientErr
133112
}
134113

135-
for _, key := range bindingKeys {
136-
var errorValue error
137-
subscriptionMetaData := Subscription{
138-
topicName: key,
139-
subscriptionName: "",
140-
}
141-
// we are creating a unique subscription for each adapter starts. Unused subscriptions will be deleted after
142-
// idle for three days
143-
uniqueID := uuid.New()
144-
145-
// in ASB, subscription names can contain letters, numbers, periods (.), hyphens (-), and
146-
// underscores (_), up to 50 characters. Subscription names are also case-insensitive.
147-
var subscriptionName = componentName + "_" + uniqueID.String() + "_sub"
148-
var subscriptionCreationError error
149-
func() {
150-
ctx, cancel := context.WithCancel(parentContext)
151-
defer cancel()
152-
_, subscriptionCreationError = adminClient.CreateSubscription(ctx, key, subscriptionName, &admin.CreateSubscriptionOptions{
153-
Properties: opts,
154-
})
155-
}()
156-
if subscriptionCreationError != nil {
157-
errorValue = errors.New("Error occurred while trying to create subscription " + subscriptionName + " in ASB for topic name " +
158-
key + "." + subscriptionCreationError.Error())
159-
return metaDataList, errorValue
160-
}
161-
logger.LoggerMsg.Debugf("Subscription %s created.", subscriptionName)
162-
subscriptionMetaData.subscriptionName = subscriptionName
163-
subscriptionMetaData.topicName = key
164-
metaDataList = append(metaDataList, subscriptionMetaData)
114+
// we are creating a unique subscription for each adapter starts. Unused subscriptions will be deleted after
115+
// idle for three days
116+
117+
// in ASB, subscription names can contain letters, numbers, periods (.), hyphens (-), and
118+
// underscores (_), up to 50 characters. Subscription names are also case-insensitive.
119+
subscriptionName := fmt.Sprintf("%s_%s_sub", componentName, uuid.New().String())
120+
_, err := adminClient.CreateSubscription(ctx, topicName, subscriptionName, &admin.CreateSubscriptionOptions{
121+
Properties: opts,
122+
})
123+
124+
if err != nil {
125+
return nil, errors.New("Error occurred while trying to create subscription " + subscriptionName + " in ASB for topic name " +
126+
topicName + "." + err.Error())
165127
}
166-
return metaDataList, nil
128+
129+
logger.LoggerMsg.Debugf("Subscription %s created.", subscriptionName)
130+
131+
return &Subscription{
132+
TopicName: topicName,
133+
SubscriptionName: subscriptionName,
134+
ConnectionString: connectionString,
135+
ClientOptions: clientOptions,
136+
ReconnectInterval: reconnectInterval,
137+
}, nil
167138
}
168139

169140
func logError(reconnectRetryCount int, reconnectInterval time.Duration, errVal error) {

adapter/pkg/messaging/azure_listener.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,27 @@ import (
2727
logger "github.com/wso2/product-microgateway/adapter/pkg/loggers"
2828
)
2929

30-
func startBrokerConsumer(connectionString string, clientOptions *asb.ClientOptions, sub Subscription, reconnectInterval time.Duration) {
31-
var topic = sub.topicName
32-
var subName = sub.subscriptionName
30+
func startBrokerConsumer(sub *Subscription, consumerType string) {
31+
var topic = sub.TopicName
32+
var subName = sub.SubscriptionName
3333

3434
dataChannel := make(chan []byte)
35-
if strings.EqualFold(topic, notification) {
35+
if strings.EqualFold(consumerType, notification) {
3636
dataChannel = AzureNotificationChannel
37-
} else if strings.EqualFold(topic, tokenRevocation) {
37+
} else if strings.EqualFold(consumerType, tokenRevocation) {
3838
dataChannel = AzureRevokedTokenChannel
39-
} else if strings.EqualFold(topic, stepQuotaThreshold) {
39+
} else if strings.EqualFold(consumerType, stepQuotaThreshold) {
4040
dataChannel = AzureStepQuotaThresholdChannel
41-
} else if strings.EqualFold(topic, stepQuotaReset) {
41+
} else if strings.EqualFold(consumerType, stepQuotaReset) {
4242
dataChannel = AzureStepQuotaResetChannel
43-
} else if strings.EqualFold(topic, organizationPurge) {
43+
} else if strings.EqualFold(consumerType, organizationPurge) {
4444
dataChannel = AzureOrganizationPurgeChannel
4545
}
4646
parentContext := context.Background()
4747

4848
for {
4949
// initializing the receiver client
50-
subClient, err := asb.NewClientFromConnectionString(connectionString, clientOptions)
50+
subClient, err := asb.NewClientFromConnectionString(sub.ConnectionString, sub.ClientOptions)
5151
if err != nil {
5252
logger.LoggerMsg.Errorf("Failed to create ASB client for %s , topic: %s. error: %v.",
5353
subName, topic, err)
@@ -70,7 +70,7 @@ func startBrokerConsumer(connectionString string, clientOptions *asb.ClientOptio
7070
messages, err := receiver.ReceiveMessages(ctx, 10, nil)
7171
if err != nil {
7272
logger.LoggerMsg.Errorf("Failed to receive messages from ASB. Subscription: %s, topic: %s error: %v", subName, topic, err)
73-
time.Sleep(reconnectInterval)
73+
time.Sleep(sub.ReconnectInterval)
7474
continue
7575
}
7676
for _, message := range messages {

0 commit comments

Comments
 (0)