Skip to content

Commit

Permalink
Event Hub scalar expansion to work with Java and C# applications (#517)
Browse files Browse the repository at this point in the history
* Event Hub scalar expansion to work with Java and C# applications
- Made changes to event hub scalar codes

* Fixed variable declaration and value assignment bug for Event Hub Scalar
  • Loading branch information
rasavant-ms authored and ahmelsayed committed Dec 19, 2019
1 parent fa13062 commit 7789f5f
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 2 deletions.
13 changes: 11 additions & 2 deletions pkg/scalers/azure_eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type baseCheckpoint struct {

// Checkpoint is the object eventhub processor stores in storage
// for checkpointing event processors. This matches the object
// stored by the eventhub C# sdk
// stored by the eventhub C# sdk and Java sdk
type Checkpoint struct {
baseCheckpoint
PartitionID string `json:"PartitionId"`
Expand Down Expand Up @@ -86,7 +86,16 @@ func GetCheckpointFromBlobStorage(ctx context.Context, partitionID string, event
}

// TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats
u, _ := url.Parse(fmt.Sprintf("%s://%s.blob.%s/azure-webjobs-eventhub/%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubNamespace, eventHubName, eventHubMetadata.eventHubConsumerGroup, partitionID))
var u *url.URL
// Checking blob store for C# and Java applications
if eventHubMetadata.blobContainer != "" {
// URL format - <endpointProtocol>://<storageAccountName>.blob.<endpointSuffix>/<blobContainer>/<eventHubConsumerGroup>/<partitionID>
u, _ = url.Parse(fmt.Sprintf("%s://%s.blob.%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubMetadata.blobContainer, eventHubMetadata.eventHubConsumerGroup, partitionID))
} else {
// Checking blob store for Azure functions
// URL format - <endpointProtocol>://<storageAccountName>.blob.<endpointSuffix>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
u, _ = url.Parse(fmt.Sprintf("%s://%s.blob.%s/azure-webjobs-eventhub/%s/%s/%s/%s", endpointProtocol, storageAccountName, endpointSuffix, eventHubNamespace, eventHubName, eventHubMetadata.eventHubConsumerGroup, partitionID))
}

_, cred, err := GetStorageCredentials(eventHubMetadata.storageConnection)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/scalers/azure_eventhub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
defaultEventHubConsumerGroup = "$Default"
defaultEventHubConnectionSetting = "EventHub"
defaultStorageConnectionSetting = "AzureWebJobsStorage"
defaultBlobContainer = ""
)

var eventhubLog = logf.Log.WithName("azure_eventhub_scaler")
Expand All @@ -38,6 +39,7 @@ type EventHubMetadata struct {
eventHubConsumerGroup string
threshold int64
storageConnection string
blobContainer string
}

// NewAzureEventHubScaler creates a new scaler for eventHub
Expand Down Expand Up @@ -105,6 +107,11 @@ func parseAzureEventHubMetadata(metadata, resolvedEnv map[string]string) (*Event
meta.eventHubConsumerGroup = val
}

meta.blobContainer = defaultBlobContainer
if val, ok := metadata["blobContainer"]; ok {
meta.blobContainer = val
}

return &meta, nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/scalers/azure_eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{
{map[string]string{"storageConnection": storageConnectionSetting, "connection": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, false},
// missing unprocessed event threshold - should replace with default
{map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting}, false},
// added blob container details
{map[string]string{"storageConnection": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connection": eventHubConnectionSetting, "blobContainer": testContainerName}, false},
}

var testEventHubScaler = AzureEventHubScaler{
Expand Down

0 comments on commit 7789f5f

Please sign in to comment.