From 26b7f61606b219572a19ccb568add912d581ad99 Mon Sep 17 00:00:00 2001 From: Christian Leinweber Date: Tue, 16 Feb 2021 15:51:08 +0100 Subject: [PATCH 1/7] language agnostic checkpointing for azure eventhub scaler Signed-off-by: Christian Leinweber --- pkg/scalers/azure/azure_eventhub.go | 119 ++------ .../azure/azure_eventhub_checkpoint.go | 220 +++++++++++++++ pkg/scalers/azure/azure_eventhub_test.go | 255 ++++++++++++++++-- pkg/scalers/azure_eventhub_scaler.go | 8 +- pkg/scalers/azure_eventhub_scaler_test.go | 2 +- 5 files changed, 477 insertions(+), 127 deletions(-) create mode 100644 pkg/scalers/azure/azure_eventhub_checkpoint.go diff --git a/pkg/scalers/azure/azure_eventhub.go b/pkg/scalers/azure/azure_eventhub.go index 80ac819d904..0f2afb2031c 100644 --- a/pkg/scalers/azure/azure_eventhub.go +++ b/pkg/scalers/azure/azure_eventhub.go @@ -1,48 +1,15 @@ package azure import ( - "bytes" - "context" - "encoding/json" "errors" "fmt" - "net/url" "strings" - "github.com/imdario/mergo" - "github.com/Azure/azure-amqp-common-go/v3/aad" eventhub "github.com/Azure/azure-event-hubs-go/v3" - "github.com/Azure/azure-storage-blob-go/azblob" "github.com/Azure/go-autorest/autorest/azure" - - kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" - "github.com/kedacore/keda/v2/pkg/util" ) -type baseCheckpoint struct { - Epoch int64 `json:"Epoch"` - Offset string `json:"Offset"` - Owner string `json:"Owner"` - Token string `json:"Token"` -} - -// Checkpoint is the object eventhub processor stores in storage -// for checkpointing event processors. This matches the object -// stored by the eventhub C# sdk and Java sdk -type Checkpoint struct { - baseCheckpoint - PartitionID string `json:"PartitionId"` - SequenceNumber int64 `json:"SequenceNumber"` -} - -// Eventhub python sdk stores the checkpoint differently -type pythonCheckpoint struct { - baseCheckpoint - PartitionID string `json:"partition_id"` - SequenceNumber int64 `json:"sequence_number"` -} - // EventHubInfo to keep event hub connection and resources type EventHubInfo struct { EventHubConnection string @@ -51,6 +18,7 @@ type EventHubInfo struct { BlobContainer string Namespace string EventHubName string + CheckpointType string } // GetEventHubClient returns eventhub client @@ -80,74 +48,6 @@ func GetEventHubClient(info EventHubInfo) (*eventhub.Hub, error) { return nil, aadErr } -// GetCheckpointFromBlobStorage accesses Blob storage and gets checkpoint information of a partition -func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) { - blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(httpClient, kedav1alpha1.PodIdentityProviderNone, info.StorageConnection, "") - if err != nil { - return Checkpoint{}, err - } - - var eventHubNamespace string - var eventHubName string - if info.EventHubConnection != "" { - eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection) - if err != nil { - return Checkpoint{}, err - } - } else { - eventHubNamespace = info.Namespace - eventHubName = info.EventHubName - } - - // TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats - var baseURL *url.URL - // Checking blob store for C# and Java applications - if info.BlobContainer != "" { - // URL format - /// - path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s", info.BlobContainer, info.EventHubConsumerGroup, partitionID)) - baseURL = storageEndpoint.ResolveReference(path) - } else { - // Checking blob store for Azure functions - // URL format - /azure-webjobs-eventhub//// - path, _ := url.Parse(fmt.Sprintf("/azure-webjobs-eventhub/%s/%s/%s/%s", eventHubNamespace, eventHubName, info.EventHubConsumerGroup, partitionID)) - baseURL = storageEndpoint.ResolveReference(path) - } - - // Create a BlockBlobURL object to a blob in the container. - blobURL := azblob.NewBlockBlobURL(*baseURL, azblob.NewPipeline(blobCreds, azblob.PipelineOptions{})) - - get, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) - if err != nil { - return Checkpoint{}, fmt.Errorf("unable to download file from blob storage: %w", err) - } - - blobData := &bytes.Buffer{} - reader := get.Body(azblob.RetryReaderOptions{}) - if _, err := blobData.ReadFrom(reader); err != nil { - return Checkpoint{}, fmt.Errorf("failed to read blob data: %s", err) - } - defer reader.Close() // The client must close the response body when finished with it - - return getCheckpoint(blobData.Bytes()) -} - -func getCheckpoint(bytes []byte) (Checkpoint, error) { - var checkpoint Checkpoint - var pyCheckpoint pythonCheckpoint - - if err := json.Unmarshal(bytes, &checkpoint); err != nil { - return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err) - } - - if err := json.Unmarshal(bytes, &pyCheckpoint); err != nil { - return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err) - } - - err := mergo.Merge(&checkpoint, Checkpoint(pyCheckpoint)) - - return checkpoint, err -} - // ParseAzureEventHubConnectionString parses Event Hub connection string into (namespace, name) // Connection string should be in following format: // Endpoint=sb://eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=secretKey123;EntityPath=eventhub-name @@ -177,3 +77,20 @@ func ParseAzureEventHubConnectionString(connectionString string) (string, string return eventHubNamespace, eventHubName, nil } + +func getHubAndNamespace(info EventHubInfo) (string, string, error) { + var eventHubNamespace string + var eventHubName string + var err error + if info.EventHubConnection != "" { + eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection) + if err != nil { + return "", "", err + } + } else { + eventHubNamespace = info.Namespace + eventHubName = info.EventHubName + } + + return eventHubNamespace, eventHubName, nil +} diff --git a/pkg/scalers/azure/azure_eventhub_checkpoint.go b/pkg/scalers/azure/azure_eventhub_checkpoint.go new file mode 100644 index 00000000000..b643a43f852 --- /dev/null +++ b/pkg/scalers/azure/azure_eventhub_checkpoint.go @@ -0,0 +1,220 @@ +package azure + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/url" + "strconv" + + "github.com/Azure/azure-storage-blob-go/azblob" + kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" + "github.com/kedacore/keda/v2/pkg/util" +) + +// goCheckpoint struct to adapt GoSdk Checkpoint +type goCheckpoint struct { + Checkpoint struct { + SequenceNumber int64 `json:"sequenceNumber"` + Offset string `json:"offset"` + } `json:"checkpoint"` + PartitionID string `json:"partitionId"` +} + +// Checkpoint is the object eventhub processor stores in storage +// for checkpointing event processors. This matches the object +// stored by the eventhub C# sdk and Java sdk +type Checkpoint struct { + Epoch int64 `json:"Epoch"` + Offset string `json:"Offset"` + Owner string `json:"Owner"` + Token string `json:"Token"` + PartitionID string `json:"PartitionId"` + SequenceNumber int64 `json:"SequenceNumber"` +} + +type checkpointer interface { + resolvePath(info EventHubInfo) (*url.URL, error) + extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) +} + +type azureWebjobCheckpointer struct { + partitionID string + containerName string +} + +type defaultCheckpointer struct { + partitionID string + containerName string +} + +type goSdkCheckpointer struct { + partitionID string + containerName string +} + +// GetCheckpointFromBlobStorage reads depending of the CheckpointType the checkpoint from a azure storage +func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) { + + checkpointer := newCheckpointer(info, partitionID) + return getCheckpoint(ctx, httpClient, info, checkpointer) +} + +func newCheckpointer(info EventHubInfo, partitionID string) checkpointer { + if info.CheckpointType == "GoSdk" { + return &goSdkCheckpointer{ + containerName: info.BlobContainer, + partitionID: partitionID, + } + } else if info.CheckpointType == "AzureWebJob" || info.BlobContainer == "" { + return &azureWebjobCheckpointer{ + containerName: "azure-webjobs-eventhub", + partitionID: partitionID, + } + } else { + return &defaultCheckpointer{ + containerName: info.BlobContainer, + partitionID: partitionID, + } + } +} + +func (checkpointer *azureWebjobCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { + eventHubNamespace, eventHubName, err := getHubAndNamespace(info) + if err != nil { + return nil, err + } + + // URL format - /azure-webjobs-eventhub//// + path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID)) + + return path, nil +} + +func (checkpointer *defaultCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { + eventHubNamespace, eventHubName, err := getHubAndNamespace(info) + if err != nil { + return nil, err + } + + // URL format - /azure-webjobs-eventhub//// + path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID)) + + return path, nil +} + +// Resolve Path for AzureWebJob Checkpoint +func (checkpointer *goSdkCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { + path, _ := url.Parse(fmt.Sprintf("/%s/%s", info.BlobContainer, checkpointer.partitionID)) + + return path, nil +} + +func (checkpointer *azureWebjobCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + var checkpoint Checkpoint + err := readToCheckpointFromBody(get, &checkpoint) + if err != nil { + return Checkpoint{}, err + } + + return checkpoint, nil +} + +func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + return getCheckpointFromStorageMetadata(get, checkpointer.partitionID) +} + +func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + var checkpoint goCheckpoint + err := readToCheckpointFromBody(get, &checkpoint) + if err != nil { + return Checkpoint{}, err + } + + return Checkpoint{ + SequenceNumber: checkpoint.Checkpoint.SequenceNumber, + Offset: checkpoint.Checkpoint.Offset, + PartitionID: checkpoint.PartitionID, + }, nil +} + +func getCheckpoint(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, checkpointer checkpointer) (Checkpoint, error) { + blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(httpClient, kedav1alpha1.PodIdentityProviderNone, info.StorageConnection, "") + if err != nil { + return Checkpoint{}, err + } + + path, err := checkpointer.resolvePath(info) + if err != nil { + return Checkpoint{}, err + } + + baseURL := storageEndpoint.ResolveReference(path) + + get, err := downloadBlob(ctx, baseURL, blobCreds) + if err != nil { + return Checkpoint{}, err + } + + return checkpointer.extractCheckpoint(get) +} + +func getCheckpointFromStorageMetadata(get *azblob.DownloadResponse, partitionID string) (Checkpoint, error) { + checkpoint := Checkpoint{ + PartitionID: partitionID, + } + + metadata := get.NewMetadata() + + if sequencenumber, ok := metadata["sequencenumber"]; ok { + if !ok { + if sequencenumber, ok = metadata["Sequencenumber"]; !ok { + return Checkpoint{}, fmt.Errorf("sequencenumber on blob not found") + } + } + + if sn, err := strconv.ParseInt(sequencenumber, 10, 64); err == nil { + checkpoint.SequenceNumber = sn + } else { + return Checkpoint{}, fmt.Errorf("sequencenumber is not a valid int64 value: %w", err) + } + } + + if offset, ok := metadata["offset"]; ok { + if !ok { + if offset, ok = metadata["Offset"]; !ok { + return Checkpoint{}, fmt.Errorf("offset on blob not found") + } + } + checkpoint.Offset = offset + } + + return checkpoint, nil +} + +func readToCheckpointFromBody(get *azblob.DownloadResponse, checkpoint interface{}) error { + blobData := &bytes.Buffer{} + + reader := get.Body(azblob.RetryReaderOptions{}) + if _, err := blobData.ReadFrom(reader); err != nil { + return fmt.Errorf("failed to read blob data: %s", err) + } + defer reader.Close() // The client must close the response body when finished with it + + if err := json.Unmarshal(blobData.Bytes(), &checkpoint); err != nil { + return fmt.Errorf("failed to decode blob data: %s", err) + } + + return nil +} + +func downloadBlob(ctx context.Context, baseURL *url.URL, blobCreds azblob.Credential) (*azblob.DownloadResponse, error) { + blobURL := azblob.NewBlockBlobURL(*baseURL, azblob.NewPipeline(blobCreds, azblob.PipelineOptions{})) + + get, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return nil, fmt.Errorf("unable to download file from blob storage: %w", err) + } + return get, nil +} diff --git a/pkg/scalers/azure/azure_eventhub_test.go b/pkg/scalers/azure/azure_eventhub_test.go index c7e89fe4a46..a21a6cd4b05 100644 --- a/pkg/scalers/azure/azure_eventhub_test.go +++ b/pkg/scalers/azure/azure_eventhub_test.go @@ -1,37 +1,244 @@ package azure import ( + "bytes" + "context" + "errors" + "fmt" + "net/http" + "net/url" + "strconv" "testing" - "github.com/stretchr/testify/assert" + "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/go-playground/assert/v2" ) -const csharpSdkCheckpoint = `{ - "Epoch": 123456, - "Offset": "test offset", - "Owner": "test owner", - "PartitionId": "test partitionId", - "SequenceNumber": 12345 - }` - -const pythonSdkCheckpoint = `{ - "epoch": 123456, - "offset": "test offset", - "owner": "test owner", - "partition_id": "test partitionId", - "sequence_number": 12345 - }` - -func TestGetCheckpoint(t *testing.T) { - cckp, err := getCheckpoint([]byte(csharpSdkCheckpoint)) - if err != nil { - t.Error(err) +// Add a valid Storage account connection string here +const StorageConnectionString = "" + +func TestCheckpointFromBlobStorageAzureWebjob(t *testing.T) { + if StorageConnectionString == "" { + return + } + + partitionID := "0" + offset := "1001" + consumerGroup := "$Default" + + sequencenumber := int64(1) + + containerName := "azure-webjobs-eventhub" + checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" + checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID) + urlPath := fmt.Sprintf("eventhubnamespace.servicebus.windows.net/hub/%s/", consumerGroup) + + ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) + assert.Equal(t, err, nil) + + expectedCheckpoint := Checkpoint{ + Offset: offset, + PartitionID: partitionID, + SequenceNumber: sequencenumber, + } + + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub", + StorageConnection: StorageConnectionString, + EventHubConsumerGroup: consumerGroup, + EventHubName: "hub", + } + + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + _ = check.Offset + _ = expectedCheckpoint.Offset + assert.Equal(t, check, expectedCheckpoint) +} + +func TestCheckpointFromBlobStorageWithDefault(t *testing.T) { + if StorageConnectionString == "" { + return + } + + partitionID := "0" + offset := "1002" + consumerGroup := "$default" + + sequencenumber := int64(1) + + metadata := map[string]string{ + "offset": offset, + "sequencenumber": strconv.FormatInt(sequencenumber, 10), + } + + containerName := "defaultcontainer" + urlPath := fmt.Sprintf("eventhubnamespace.servicebus.windows.net/hub/%s/checkpoint/", consumerGroup) + + ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, "", metadata) + assert.Equal(t, err, nil) + + expectedCheckpoint := Checkpoint{ + Offset: offset, + PartitionID: partitionID, + SequenceNumber: sequencenumber, + } + + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub", + StorageConnection: StorageConnectionString, + EventHubConsumerGroup: consumerGroup, + EventHubName: "hub", + BlobContainer: containerName, + } + + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + _ = check.Offset + _ = expectedCheckpoint.Offset + assert.Equal(t, check, expectedCheckpoint) +} + +func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { + + if StorageConnectionString == "" { + return + } + + partitionID := "0" + offset := "1003" + + sequencenumber := int64(1) + + containerName := "gosdkcontainer" + checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"offset\":\"%s\",\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}" + checkpoint := fmt.Sprintf(checkpointFormat, partitionID, offset, sequencenumber) + + urlPath := "" + + ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) + assert.Equal(t, err, nil) + + expectedCheckpoint := Checkpoint{ + Offset: offset, + PartitionID: partitionID, + SequenceNumber: sequencenumber, + } + + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub", + StorageConnection: StorageConnectionString, + EventHubName: "hub", + BlobContainer: containerName, + CheckpointType: "GoSdk", } - pckp, err := getCheckpoint([]byte(pythonSdkCheckpoint)) + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + _ = check.Offset + _ = expectedCheckpoint.Offset + assert.Equal(t, check, expectedCheckpoint) +} + +func TestShouldParseCheckpointForWebJob(t *testing.T) { + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", + EventHubConsumerGroup: "$Default", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0") +} + +func TestShouldParseCheckpointForWebJobWithCheckpointType(t *testing.T) { + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", + EventHubConsumerGroup: "$Default", + CheckpointType: "AzureWebJob", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0") +} + +func TestShouldParseCheckpointForDefaultWithCheckpointType(t *testing.T) { + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", + EventHubConsumerGroup: "$Default", + CheckpointType: "Default", + BlobContainer: "containername", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$Default/checkpoint/0") +} + +func TestShouldParseCheckpointForDefault(t *testing.T) { + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", + EventHubConsumerGroup: "$Default", + BlobContainer: "containername", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$Default/checkpoint/0") +} + +func TestShouldParseCheckpointForGoSdk(t *testing.T) { + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", + EventHubConsumerGroup: "$Default", + BlobContainer: "containername", + CheckpointType: "GoSdk", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/containername/0") +} + +func Test(t *testing.T) { + +} + +func createNewCheckpointInStorage(urlPath string, containerName string, partitionID string, checkpoint string, metadata map[string]string) (context.Context, error) { + + credential, endpoint, _ := ParseAzureStorageBlobConnection(http.DefaultClient, "none", StorageConnectionString, "") + + // Create container + ctx := context.Background() + path, _ := url.Parse(containerName) + url := endpoint.ResolveReference(path) + containerURL := azblob.NewContainerURL(*url, azblob.NewPipeline(credential, azblob.PipelineOptions{})) + _, err := containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone) + + err = errors.Unwrap(err) if err != nil { - t.Error(err) + if stErr, ok := err.(azblob.StorageError); ok { + if stErr.ServiceCode() == azblob.ServiceCodeContainerAlreadyExists { + return ctx, fmt.Errorf("failed to create container: %s", err) + } + } } - assert.Equal(t, cckp, pckp) + blobFolderURL := containerURL.NewBlockBlobURL(urlPath + partitionID) + + var b bytes.Buffer + b.WriteString(checkpoint) + + // Upload file + _, err = azblob.UploadBufferToBlockBlob(ctx, b.Bytes(), blobFolderURL, azblob.UploadToBlockBlobOptions{ + BlockSize: 4 * 1024 * 1024, + Metadata: metadata, + Parallelism: 16}) + if err != nil { + return ctx, fmt.Errorf("Err uploading file to blob: %s", err) + } + return ctx, nil } diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index c754c645bc3..decf50de81a 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -28,6 +28,7 @@ const ( thresholdMetricName = "unprocessedEventThreshold" defaultEventHubConsumerGroup = "$Default" defaultBlobContainer = "" + defaultCheckpointType = "" ) var eventhubLog = logf.Log.WithName("azure_eventhub_scaler") @@ -93,6 +94,11 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) meta.eventHubInfo.EventHubConsumerGroup = val } + meta.eventHubInfo.CheckpointType = defaultCheckpointType + if val, ok := config.TriggerMetadata["checkpointType"]; ok { + meta.eventHubInfo.CheckpointType = val + } + meta.eventHubInfo.BlobContainer = defaultBlobContainer if val, ok := config.TriggerMetadata["blobContainer"]; ok { meta.eventHubInfo.BlobContainer = val @@ -145,7 +151,7 @@ func (scaler *azureEventHubScaler) GetUnprocessedEventCountInPartition(ctx conte // if blob not found return the total partition event count err = errors.Unwrap(err) if stErr, ok := err.(azblob.StorageError); ok { - if stErr.ServiceCode() == azblob.ServiceCodeBlobNotFound { + if stErr.ServiceCode() == azblob.ServiceCodeBlobNotFound || stErr.ServiceCode() == azblob.ServiceCodeContainerNotFound { return GetUnprocessedEventCountWithoutCheckpoint(partitionInfo), azure.Checkpoint{}, nil } } diff --git a/pkg/scalers/azure_eventhub_scaler_test.go b/pkg/scalers/azure_eventhub_scaler_test.go index 0f33d520872..dc6573362c3 100644 --- a/pkg/scalers/azure_eventhub_scaler_test.go +++ b/pkg/scalers/azure_eventhub_scaler_test.go @@ -49,7 +49,7 @@ var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{ // missing unprocessed event threshold - should replace with default {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting}, false}, // added blob container details - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName}, false}, + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName, "checkpointType": "AzureWebJob"}, false}, } var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestData{ From fe608182f382a21007e176c4a88807cb67945024 Mon Sep 17 00:00:00 2001 From: Christian Leinweber Date: Mon, 12 Apr 2021 01:12:58 +0200 Subject: [PATCH 2/7] add default checkpointer for backward compatibility to azure eventhub scaler Signed-off-by: Christian Leinweber --- go.mod | 1 + .../azure/azure_eventhub_checkpoint.go | 112 ++++++++++++---- pkg/scalers/azure/azure_eventhub_test.go | 120 ++++++++++++++++-- 3 files changed, 196 insertions(+), 37 deletions(-) diff --git a/go.mod b/go.mod index e871b5e1ac0..ae6dbb821ae 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/go-logr/logr v0.4.0 github.com/go-logr/zapr v0.4.0 // indirect github.com/go-openapi/spec v0.20.3 + github.com/go-playground/assert/v2 v2.0.1 github.com/go-redis/redis v6.15.9+incompatible github.com/go-sql-driver/mysql v1.6.0 github.com/golang/mock v1.5.0 diff --git a/pkg/scalers/azure/azure_eventhub_checkpoint.go b/pkg/scalers/azure/azure_eventhub_checkpoint.go index b643a43f852..fd131758bdd 100644 --- a/pkg/scalers/azure/azure_eventhub_checkpoint.go +++ b/pkg/scalers/azure/azure_eventhub_checkpoint.go @@ -9,6 +9,7 @@ import ( "strconv" "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/imdario/mergo" kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" "github.com/kedacore/keda/v2/pkg/util" ) @@ -22,18 +23,27 @@ type goCheckpoint struct { PartitionID string `json:"partitionId"` } -// Checkpoint is the object eventhub processor stores in storage -// for checkpointing event processors. This matches the object -// stored by the eventhub C# sdk and Java sdk +type baseCheckpoint struct { + Epoch int64 `json:"Epoch"` + Offset string `json:"Offset"` + Owner string `json:"Owner"` + Token string `json:"Token"` +} + +// Checkpoint in a common format type Checkpoint struct { - Epoch int64 `json:"Epoch"` - Offset string `json:"Offset"` - Owner string `json:"Owner"` - Token string `json:"Token"` + baseCheckpoint PartitionID string `json:"PartitionId"` SequenceNumber int64 `json:"SequenceNumber"` } +// Older python sdk stores the checkpoint differently +type pythonCheckpoint struct { + baseCheckpoint + PartitionID string `json:"partition_id"` + SequenceNumber int64 `json:"sequence_number"` +} + type checkpointer interface { resolvePath(info EventHubInfo) (*url.URL, error) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) @@ -44,7 +54,7 @@ type azureWebjobCheckpointer struct { containerName string } -type defaultCheckpointer struct { +type blobMetadataCheckpointer struct { partitionID string containerName string } @@ -54,9 +64,13 @@ type goSdkCheckpointer struct { containerName string } +type defaultCheckpointer struct { + partitionID string + containerName string +} + // GetCheckpointFromBlobStorage reads depending of the CheckpointType the checkpoint from a azure storage func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) { - checkpointer := newCheckpointer(info, partitionID) return getCheckpoint(ctx, httpClient, info, checkpointer) } @@ -67,6 +81,11 @@ func newCheckpointer(info EventHubInfo, partitionID string) checkpointer { containerName: info.BlobContainer, partitionID: partitionID, } + } else if info.CheckpointType == "BlobMetadata" { + return &blobMetadataCheckpointer{ + containerName: info.BlobContainer, + partitionID: partitionID, + } } else if info.CheckpointType == "AzureWebJob" || info.BlobContainer == "" { return &azureWebjobCheckpointer{ containerName: "azure-webjobs-eventhub", @@ -80,63 +99,100 @@ func newCheckpointer(info EventHubInfo, partitionID string) checkpointer { } } +// resolve path for AzureWebJobCheckpointer func (checkpointer *azureWebjobCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { eventHubNamespace, eventHubName, err := getHubAndNamespace(info) if err != nil { return nil, err } - // URL format - /azure-webjobs-eventhub//// path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID)) return path, nil } -func (checkpointer *defaultCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { +// extract checkpoint for AzureWebJobCheckpointer +func (checkpointer *azureWebjobCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + var checkpoint Checkpoint + err := readToCheckpointFromBody(get, &checkpoint) + if err != nil { + return Checkpoint{}, err + } + + return checkpoint, nil +} + +// resolve path for BlobMetadataCheckpointer +func (checkpointer *blobMetadataCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { eventHubNamespace, eventHubName, err := getHubAndNamespace(info) if err != nil { return nil, err } - // URL format - /azure-webjobs-eventhub//// path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID)) return path, nil } -// Resolve Path for AzureWebJob Checkpoint +// extract checkpoint for BlobMetadataCheckpointer +func (checkpointer *blobMetadataCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + return getCheckpointFromStorageMetadata(get, checkpointer.partitionID) +} + +// resolve path for GoSdkCheckpointer func (checkpointer *goSdkCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { path, _ := url.Parse(fmt.Sprintf("/%s/%s", info.BlobContainer, checkpointer.partitionID)) return path, nil } -func (checkpointer *azureWebjobCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { - var checkpoint Checkpoint +// extract checkpoint for GoSdkCheckpointer +func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + var checkpoint goCheckpoint err := readToCheckpointFromBody(get, &checkpoint) if err != nil { return Checkpoint{}, err } - return checkpoint, nil + return Checkpoint{ + SequenceNumber: checkpoint.Checkpoint.SequenceNumber, + baseCheckpoint: baseCheckpoint{ + Offset: checkpoint.Checkpoint.Offset, + }, + PartitionID: checkpoint.PartitionID, + }, nil } -func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { - return getCheckpointFromStorageMetadata(get, checkpointer.partitionID) +// resolve path for DefaultCheckpointer +func (checkpointer *defaultCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { + path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s", info.BlobContainer, info.EventHubConsumerGroup, checkpointer.partitionID)) + + return path, nil } -func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { - var checkpoint goCheckpoint - err := readToCheckpointFromBody(get, &checkpoint) - if err != nil { - return Checkpoint{}, err +// extract checkpoint with deprecated Python sdk checkpoint for backward compatibility +func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + var checkpoint Checkpoint + var pyCheckpoint pythonCheckpoint + blobData := &bytes.Buffer{} + + reader := get.Body(azblob.RetryReaderOptions{}) + if _, err := blobData.ReadFrom(reader); err != nil { + return Checkpoint{}, fmt.Errorf("failed to read blob data: %s", err) } + defer reader.Close() // The client must close the response body when finished with it - return Checkpoint{ - SequenceNumber: checkpoint.Checkpoint.SequenceNumber, - Offset: checkpoint.Checkpoint.Offset, - PartitionID: checkpoint.PartitionID, - }, nil + if err := json.Unmarshal(blobData.Bytes(), &checkpoint); err != nil { + return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err) + } + + if err := json.Unmarshal(blobData.Bytes(), &pyCheckpoint); err != nil { + return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err) + } + + err := mergo.Merge(&checkpoint, Checkpoint(pyCheckpoint)) + + return checkpoint, err } func getCheckpoint(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, checkpointer checkpointer) (Checkpoint, error) { diff --git a/pkg/scalers/azure/azure_eventhub_test.go b/pkg/scalers/azure/azure_eventhub_test.go index a21a6cd4b05..be7a0d2d5b4 100644 --- a/pkg/scalers/azure/azure_eventhub_test.go +++ b/pkg/scalers/azure/azure_eventhub_test.go @@ -37,7 +37,9 @@ func TestCheckpointFromBlobStorageAzureWebjob(t *testing.T) { assert.Equal(t, err, nil) expectedCheckpoint := Checkpoint{ - Offset: offset, + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, PartitionID: partitionID, SequenceNumber: sequencenumber, } @@ -55,7 +57,89 @@ func TestCheckpointFromBlobStorageAzureWebjob(t *testing.T) { assert.Equal(t, check, expectedCheckpoint) } -func TestCheckpointFromBlobStorageWithDefault(t *testing.T) { +func TestCheckpointFromBlobStorageDefault(t *testing.T) { + if StorageConnectionString == "" { + return + } + + partitionID := "0" + offset := "1005" + consumerGroup := "$Default" + + sequencenumber := int64(1) + + containerName := "defaultcontainer" + checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" + checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID) + urlPath := fmt.Sprintf("%s/", consumerGroup) + + ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) + assert.Equal(t, err, nil) + + expectedCheckpoint := Checkpoint{ + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, + PartitionID: partitionID, + SequenceNumber: sequencenumber, + } + + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub", + StorageConnection: StorageConnectionString, + EventHubConsumerGroup: consumerGroup, + EventHubName: "hub", + BlobContainer: containerName, + } + + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + _ = check.Offset + _ = expectedCheckpoint.Offset + assert.Equal(t, check, expectedCheckpoint) +} + +func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T) { + if StorageConnectionString == "" { + return + } + + partitionID := "0" + offset := "1006" + consumerGroup := "$Default" + + sequencenumber := int64(1) + + containerName := "defaultcontainerpython" + checkpointFormat := "{\"Offset\":\"%s\",\"sequence_number\":%d,\"partition_id\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" + checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID) + urlPath := fmt.Sprintf("%s/", consumerGroup) + + ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) + assert.Equal(t, err, nil) + + expectedCheckpoint := Checkpoint{ + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, + PartitionID: partitionID, + SequenceNumber: sequencenumber, + } + + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub", + StorageConnection: StorageConnectionString, + EventHubConsumerGroup: consumerGroup, + EventHubName: "hub", + BlobContainer: containerName, + } + + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + _ = check.Offset + _ = expectedCheckpoint.Offset + assert.Equal(t, check, expectedCheckpoint) +} + +func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) { if StorageConnectionString == "" { return } @@ -71,14 +155,16 @@ func TestCheckpointFromBlobStorageWithDefault(t *testing.T) { "sequencenumber": strconv.FormatInt(sequencenumber, 10), } - containerName := "defaultcontainer" + containerName := "blobmetadatacontainer" urlPath := fmt.Sprintf("eventhubnamespace.servicebus.windows.net/hub/%s/checkpoint/", consumerGroup) ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, "", metadata) assert.Equal(t, err, nil) expectedCheckpoint := Checkpoint{ - Offset: offset, + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, PartitionID: partitionID, SequenceNumber: sequencenumber, } @@ -89,6 +175,7 @@ func TestCheckpointFromBlobStorageWithDefault(t *testing.T) { EventHubConsumerGroup: consumerGroup, EventHubName: "hub", BlobContainer: containerName, + CheckpointType: "BlobMetadata", } check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") @@ -98,7 +185,6 @@ func TestCheckpointFromBlobStorageWithDefault(t *testing.T) { } func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { - if StorageConnectionString == "" { return } @@ -118,7 +204,9 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { assert.Equal(t, err, nil) expectedCheckpoint := Checkpoint{ - Offset: offset, + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, PartitionID: partitionID, SequenceNumber: sequencenumber, } @@ -162,11 +250,24 @@ func TestShouldParseCheckpointForWebJobWithCheckpointType(t *testing.T) { assert.Equal(t, url.Path, "/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0") } -func TestShouldParseCheckpointForDefaultWithCheckpointType(t *testing.T) { +func TestShouldParseCheckpointForDefault(t *testing.T) { + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", + EventHubConsumerGroup: "$Default", + BlobContainer: "DefaultContainer", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/DefaultContainer/$Default/0") +} + +func TestShouldParseCheckpointForBlobMetadataWithCheckpointType(t *testing.T) { eventHubInfo := EventHubInfo{ EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", EventHubConsumerGroup: "$Default", - CheckpointType: "Default", + CheckpointType: "BlobMetadata", BlobContainer: "containername", } @@ -176,11 +277,12 @@ func TestShouldParseCheckpointForDefaultWithCheckpointType(t *testing.T) { assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$Default/checkpoint/0") } -func TestShouldParseCheckpointForDefault(t *testing.T) { +func TestShouldParseCheckpointForBlobMetadata(t *testing.T) { eventHubInfo := EventHubInfo{ EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", EventHubConsumerGroup: "$Default", BlobContainer: "containername", + CheckpointType: "BlobMetadata", } cp := newCheckpointer(eventHubInfo, "0") From bbde952932a7b5ed03f0e3f4d5674704793688a8 Mon Sep 17 00:00:00 2001 From: Christian Leinweber Date: Tue, 20 Apr 2021 21:40:16 +0200 Subject: [PATCH 3/7] rename checkpointType to checkpointStrategy in eventhubscaler Signed-off-by: Christian Leinweber --- pkg/scalers/azure/azure_eventhub.go | 2 +- pkg/scalers/azure/azure_eventhub_checkpoint.go | 8 ++++---- pkg/scalers/azure/azure_eventhub_test.go | 16 ++++++++-------- pkg/scalers/azure_eventhub_scaler.go | 8 ++++---- pkg/scalers/azure_eventhub_scaler_test.go | 2 +- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/scalers/azure/azure_eventhub.go b/pkg/scalers/azure/azure_eventhub.go index 0f2afb2031c..4b6a16480b8 100644 --- a/pkg/scalers/azure/azure_eventhub.go +++ b/pkg/scalers/azure/azure_eventhub.go @@ -18,7 +18,7 @@ type EventHubInfo struct { BlobContainer string Namespace string EventHubName string - CheckpointType string + CheckpointStrategy string } // GetEventHubClient returns eventhub client diff --git a/pkg/scalers/azure/azure_eventhub_checkpoint.go b/pkg/scalers/azure/azure_eventhub_checkpoint.go index fd131758bdd..432b3b80819 100644 --- a/pkg/scalers/azure/azure_eventhub_checkpoint.go +++ b/pkg/scalers/azure/azure_eventhub_checkpoint.go @@ -69,24 +69,24 @@ type defaultCheckpointer struct { containerName string } -// GetCheckpointFromBlobStorage reads depending of the CheckpointType the checkpoint from a azure storage +// GetCheckpointFromBlobStorage reads depending of the CheckpointStrategy the checkpoint from a azure storage func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) { checkpointer := newCheckpointer(info, partitionID) return getCheckpoint(ctx, httpClient, info, checkpointer) } func newCheckpointer(info EventHubInfo, partitionID string) checkpointer { - if info.CheckpointType == "GoSdk" { + if info.CheckpointStrategy == "GoSdk" { return &goSdkCheckpointer{ containerName: info.BlobContainer, partitionID: partitionID, } - } else if info.CheckpointType == "BlobMetadata" { + } else if info.CheckpointStrategy == "BlobMetadata" { return &blobMetadataCheckpointer{ containerName: info.BlobContainer, partitionID: partitionID, } - } else if info.CheckpointType == "AzureWebJob" || info.BlobContainer == "" { + } else if info.CheckpointStrategy == "AzureWebJob" || info.BlobContainer == "" { return &azureWebjobCheckpointer{ containerName: "azure-webjobs-eventhub", partitionID: partitionID, diff --git a/pkg/scalers/azure/azure_eventhub_test.go b/pkg/scalers/azure/azure_eventhub_test.go index be7a0d2d5b4..8950dd7c499 100644 --- a/pkg/scalers/azure/azure_eventhub_test.go +++ b/pkg/scalers/azure/azure_eventhub_test.go @@ -175,7 +175,7 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) { EventHubConsumerGroup: consumerGroup, EventHubName: "hub", BlobContainer: containerName, - CheckpointType: "BlobMetadata", + CheckpointStrategy: "BlobMetadata", } check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") @@ -216,7 +216,7 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { StorageConnection: StorageConnectionString, EventHubName: "hub", BlobContainer: containerName, - CheckpointType: "GoSdk", + CheckpointStrategy: "GoSdk", } check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") @@ -237,11 +237,11 @@ func TestShouldParseCheckpointForWebJob(t *testing.T) { assert.Equal(t, url.Path, "/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0") } -func TestShouldParseCheckpointForWebJobWithCheckpointType(t *testing.T) { +func TestShouldParseCheckpointForWebJobWithCheckpointStrategy(t *testing.T) { eventHubInfo := EventHubInfo{ EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", EventHubConsumerGroup: "$Default", - CheckpointType: "AzureWebJob", + CheckpointStrategy: "AzureWebJob", } cp := newCheckpointer(eventHubInfo, "0") @@ -263,11 +263,11 @@ func TestShouldParseCheckpointForDefault(t *testing.T) { assert.Equal(t, url.Path, "/DefaultContainer/$Default/0") } -func TestShouldParseCheckpointForBlobMetadataWithCheckpointType(t *testing.T) { +func TestShouldParseCheckpointForBlobMetadataWithCheckpointStrategy(t *testing.T) { eventHubInfo := EventHubInfo{ EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", EventHubConsumerGroup: "$Default", - CheckpointType: "BlobMetadata", + CheckpointStrategy: "BlobMetadata", BlobContainer: "containername", } @@ -282,7 +282,7 @@ func TestShouldParseCheckpointForBlobMetadata(t *testing.T) { EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", EventHubConsumerGroup: "$Default", BlobContainer: "containername", - CheckpointType: "BlobMetadata", + CheckpointStrategy: "BlobMetadata", } cp := newCheckpointer(eventHubInfo, "0") @@ -296,7 +296,7 @@ func TestShouldParseCheckpointForGoSdk(t *testing.T) { EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", EventHubConsumerGroup: "$Default", BlobContainer: "containername", - CheckpointType: "GoSdk", + CheckpointStrategy: "GoSdk", } cp := newCheckpointer(eventHubInfo, "0") diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index decf50de81a..3bfd324ef4c 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -28,7 +28,7 @@ const ( thresholdMetricName = "unprocessedEventThreshold" defaultEventHubConsumerGroup = "$Default" defaultBlobContainer = "" - defaultCheckpointType = "" + defaultCheckpointStrategy = "" ) var eventhubLog = logf.Log.WithName("azure_eventhub_scaler") @@ -94,9 +94,9 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) meta.eventHubInfo.EventHubConsumerGroup = val } - meta.eventHubInfo.CheckpointType = defaultCheckpointType - if val, ok := config.TriggerMetadata["checkpointType"]; ok { - meta.eventHubInfo.CheckpointType = val + meta.eventHubInfo.CheckpointStrategy = defaultCheckpointStrategy + if val, ok := config.TriggerMetadata["checkpointStrategy"]; ok { + meta.eventHubInfo.CheckpointStrategy = val } meta.eventHubInfo.BlobContainer = defaultBlobContainer diff --git a/pkg/scalers/azure_eventhub_scaler_test.go b/pkg/scalers/azure_eventhub_scaler_test.go index dc6573362c3..0dae50bda9c 100644 --- a/pkg/scalers/azure_eventhub_scaler_test.go +++ b/pkg/scalers/azure_eventhub_scaler_test.go @@ -49,7 +49,7 @@ var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{ // missing unprocessed event threshold - should replace with default {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting}, false}, // added blob container details - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName, "checkpointType": "AzureWebJob"}, false}, + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName, "checkpointStrategy": "AzureWebJob"}, false}, } var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestData{ From ed79ef0acce1fb83e1a8685e7baaa909dd62f9e9 Mon Sep 17 00:00:00 2001 From: Christian Leinweber Date: Thu, 29 Apr 2021 08:29:22 +0200 Subject: [PATCH 4/7] refactor eventhub checkpointStrategy selection Signed-off-by: Christian Leinweber --- .../azure/azure_eventhub_checkpoint.go | 13 +++--- pkg/scalers/azure/azure_eventhub_test.go | 41 +++++-------------- 2 files changed, 18 insertions(+), 36 deletions(-) diff --git a/pkg/scalers/azure/azure_eventhub_checkpoint.go b/pkg/scalers/azure/azure_eventhub_checkpoint.go index 432b3b80819..0b71e866fe3 100644 --- a/pkg/scalers/azure/azure_eventhub_checkpoint.go +++ b/pkg/scalers/azure/azure_eventhub_checkpoint.go @@ -7,6 +7,7 @@ import ( "fmt" "net/url" "strconv" + "strings" "github.com/Azure/azure-storage-blob-go/azblob" "github.com/imdario/mergo" @@ -76,22 +77,23 @@ func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, } func newCheckpointer(info EventHubInfo, partitionID string) checkpointer { - if info.CheckpointStrategy == "GoSdk" { + switch { + case (info.CheckpointStrategy == "GoSdk"): return &goSdkCheckpointer{ containerName: info.BlobContainer, partitionID: partitionID, } - } else if info.CheckpointStrategy == "BlobMetadata" { + case (info.CheckpointStrategy == "BlobMetadata"): return &blobMetadataCheckpointer{ containerName: info.BlobContainer, partitionID: partitionID, } - } else if info.CheckpointStrategy == "AzureWebJob" || info.BlobContainer == "" { + case (info.CheckpointStrategy == "AzureWebJob" || info.BlobContainer == ""): return &azureWebjobCheckpointer{ containerName: "azure-webjobs-eventhub", partitionID: partitionID, } - } else { + default: return &defaultCheckpointer{ containerName: info.BlobContainer, partitionID: partitionID, @@ -129,8 +131,7 @@ func (checkpointer *blobMetadataCheckpointer) resolvePath(info EventHubInfo) (*u return nil, err } - path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID)) - + path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, strings.ToLower(info.EventHubConsumerGroup), checkpointer.partitionID)) return path, nil } diff --git a/pkg/scalers/azure/azure_eventhub_test.go b/pkg/scalers/azure/azure_eventhub_test.go index 8950dd7c499..6d3c93ebd68 100644 --- a/pkg/scalers/azure/azure_eventhub_test.go +++ b/pkg/scalers/azure/azure_eventhub_test.go @@ -24,7 +24,7 @@ func TestCheckpointFromBlobStorageAzureWebjob(t *testing.T) { partitionID := "0" offset := "1001" - consumerGroup := "$Default" + consumerGroup := "$Default1" sequencenumber := int64(1) @@ -62,9 +62,9 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) { return } - partitionID := "0" + partitionID := "1" offset := "1005" - consumerGroup := "$Default" + consumerGroup := "$Default2" sequencenumber := int64(1) @@ -92,7 +92,7 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) { BlobContainer: containerName, } - check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) _ = check.Offset _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) @@ -103,9 +103,9 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T return } - partitionID := "0" + partitionID := "2" offset := "1006" - consumerGroup := "$Default" + consumerGroup := "$Default3" sequencenumber := int64(1) @@ -133,7 +133,7 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T BlobContainer: containerName, } - check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) _ = check.Offset _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) @@ -144,7 +144,7 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) { return } - partitionID := "0" + partitionID := "4" offset := "1002" consumerGroup := "$default" @@ -178,7 +178,7 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) { CheckpointStrategy: "BlobMetadata", } - check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) _ = check.Offset _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) @@ -219,7 +219,7 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { CheckpointStrategy: "GoSdk", } - check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) _ = check.Offset _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) @@ -263,20 +263,6 @@ func TestShouldParseCheckpointForDefault(t *testing.T) { assert.Equal(t, url.Path, "/DefaultContainer/$Default/0") } -func TestShouldParseCheckpointForBlobMetadataWithCheckpointStrategy(t *testing.T) { - eventHubInfo := EventHubInfo{ - EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", - EventHubConsumerGroup: "$Default", - CheckpointStrategy: "BlobMetadata", - BlobContainer: "containername", - } - - cp := newCheckpointer(eventHubInfo, "0") - url, _ := cp.resolvePath(eventHubInfo) - - assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$Default/checkpoint/0") -} - func TestShouldParseCheckpointForBlobMetadata(t *testing.T) { eventHubInfo := EventHubInfo{ EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", @@ -288,7 +274,7 @@ func TestShouldParseCheckpointForBlobMetadata(t *testing.T) { cp := newCheckpointer(eventHubInfo, "0") url, _ := cp.resolvePath(eventHubInfo) - assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$Default/checkpoint/0") + assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$default/checkpoint/0") } func TestShouldParseCheckpointForGoSdk(t *testing.T) { @@ -305,12 +291,7 @@ func TestShouldParseCheckpointForGoSdk(t *testing.T) { assert.Equal(t, url.Path, "/containername/0") } -func Test(t *testing.T) { - -} - func createNewCheckpointInStorage(urlPath string, containerName string, partitionID string, checkpoint string, metadata map[string]string) (context.Context, error) { - credential, endpoint, _ := ParseAzureStorageBlobConnection(http.DefaultClient, "none", StorageConnectionString, "") // Create container From a47c755b26021965289a1f8934efacee0e6e3d8b Mon Sep 17 00:00:00 2001 From: Christian Leinweber Date: Thu, 29 Apr 2021 14:16:41 +0200 Subject: [PATCH 5/7] add language agnostic checkpointing to changelog Signed-off-by: Christian Leinweber --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 82f9abd4b62..2e42bbb26d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ - Fixed goroutine leaks in usage of timers ([#1704](https://github.com/kedacore/keda/pull/1704) | [#1739](https://github.com/kedacore/keda/pull/1739)) - Setting timeouts in the HTTP client used by the IBM MQ scaler ([#1758](https://github.com/kedacore/keda/pull/1758)) - Fix cleanup of removed triggers ([#1768](https://github.com/kedacore/keda/pull/1768)) +- Eventhub Scaler: Add trigger parameter `checkpointStrategy` to support more language-specific checkpoints ([#1621](https://github.com/kedacore/keda/pull/1621)) ### Breaking Changes From bdd2d6aeaa067899803f2a887429c335d94d3f02 Mon Sep 17 00:00:00 2001 From: Christian Leinweber Date: Mon, 3 May 2021 21:30:48 +0200 Subject: [PATCH 6/7] refactor eventhub checkpointStrategy parameter values to lowercase Signed-off-by: Christian Leinweber --- pkg/scalers/azure/azure_eventhub_checkpoint.go | 16 ++++++++-------- pkg/scalers/azure/azure_eventhub_test.go | 10 +++++----- pkg/scalers/azure_eventhub_scaler_test.go | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/scalers/azure/azure_eventhub_checkpoint.go b/pkg/scalers/azure/azure_eventhub_checkpoint.go index 0b71e866fe3..64babfacd69 100644 --- a/pkg/scalers/azure/azure_eventhub_checkpoint.go +++ b/pkg/scalers/azure/azure_eventhub_checkpoint.go @@ -15,7 +15,7 @@ import ( "github.com/kedacore/keda/v2/pkg/util" ) -// goCheckpoint struct to adapt GoSdk Checkpoint +// goCheckpoint struct to adapt goSdk Checkpoint type goCheckpoint struct { Checkpoint struct { SequenceNumber int64 `json:"sequenceNumber"` @@ -78,17 +78,17 @@ func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, func newCheckpointer(info EventHubInfo, partitionID string) checkpointer { switch { - case (info.CheckpointStrategy == "GoSdk"): + case (info.CheckpointStrategy == "goSdk"): return &goSdkCheckpointer{ containerName: info.BlobContainer, partitionID: partitionID, } - case (info.CheckpointStrategy == "BlobMetadata"): + case (info.CheckpointStrategy == "blobMetadata"): return &blobMetadataCheckpointer{ containerName: info.BlobContainer, partitionID: partitionID, } - case (info.CheckpointStrategy == "AzureWebJob" || info.BlobContainer == ""): + case (info.CheckpointStrategy == "azureWebJob" || info.BlobContainer == ""): return &azureWebjobCheckpointer{ containerName: "azure-webjobs-eventhub", partitionID: partitionID, @@ -124,7 +124,7 @@ func (checkpointer *azureWebjobCheckpointer) extractCheckpoint(get *azblob.Downl return checkpoint, nil } -// resolve path for BlobMetadataCheckpointer +// resolve path for blobMetadataCheckpointer func (checkpointer *blobMetadataCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { eventHubNamespace, eventHubName, err := getHubAndNamespace(info) if err != nil { @@ -135,19 +135,19 @@ func (checkpointer *blobMetadataCheckpointer) resolvePath(info EventHubInfo) (*u return path, nil } -// extract checkpoint for BlobMetadataCheckpointer +// extract checkpoint for blobMetadataCheckpointer func (checkpointer *blobMetadataCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { return getCheckpointFromStorageMetadata(get, checkpointer.partitionID) } -// resolve path for GoSdkCheckpointer +// resolve path for goSdkCheckpointer func (checkpointer *goSdkCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { path, _ := url.Parse(fmt.Sprintf("/%s/%s", info.BlobContainer, checkpointer.partitionID)) return path, nil } -// extract checkpoint for GoSdkCheckpointer +// extract checkpoint for goSdkCheckpointer func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { var checkpoint goCheckpoint err := readToCheckpointFromBody(get, &checkpoint) diff --git a/pkg/scalers/azure/azure_eventhub_test.go b/pkg/scalers/azure/azure_eventhub_test.go index 6d3c93ebd68..fe7ec2a8bf6 100644 --- a/pkg/scalers/azure/azure_eventhub_test.go +++ b/pkg/scalers/azure/azure_eventhub_test.go @@ -175,7 +175,7 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) { EventHubConsumerGroup: consumerGroup, EventHubName: "hub", BlobContainer: containerName, - CheckpointStrategy: "BlobMetadata", + CheckpointStrategy: "blobMetadata", } check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) @@ -216,7 +216,7 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { StorageConnection: StorageConnectionString, EventHubName: "hub", BlobContainer: containerName, - CheckpointStrategy: "GoSdk", + CheckpointStrategy: "goSdk", } check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) @@ -241,7 +241,7 @@ func TestShouldParseCheckpointForWebJobWithCheckpointStrategy(t *testing.T) { eventHubInfo := EventHubInfo{ EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", EventHubConsumerGroup: "$Default", - CheckpointStrategy: "AzureWebJob", + CheckpointStrategy: "azureWebJob", } cp := newCheckpointer(eventHubInfo, "0") @@ -268,7 +268,7 @@ func TestShouldParseCheckpointForBlobMetadata(t *testing.T) { EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", EventHubConsumerGroup: "$Default", BlobContainer: "containername", - CheckpointStrategy: "BlobMetadata", + CheckpointStrategy: "blobMetadata", } cp := newCheckpointer(eventHubInfo, "0") @@ -282,7 +282,7 @@ func TestShouldParseCheckpointForGoSdk(t *testing.T) { EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", EventHubConsumerGroup: "$Default", BlobContainer: "containername", - CheckpointStrategy: "GoSdk", + CheckpointStrategy: "goSdk", } cp := newCheckpointer(eventHubInfo, "0") diff --git a/pkg/scalers/azure_eventhub_scaler_test.go b/pkg/scalers/azure_eventhub_scaler_test.go index 0dae50bda9c..07e892feebf 100644 --- a/pkg/scalers/azure_eventhub_scaler_test.go +++ b/pkg/scalers/azure_eventhub_scaler_test.go @@ -49,7 +49,7 @@ var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{ // missing unprocessed event threshold - should replace with default {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting}, false}, // added blob container details - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName, "checkpointStrategy": "AzureWebJob"}, false}, + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName, "checkpointStrategy": "azureWebJob"}, false}, } var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestData{ From 48d9b807353b46a6e32f41ae1933fb6b36b371aa Mon Sep 17 00:00:00 2001 From: Christian Leinweber Date: Tue, 4 May 2021 09:01:21 +0200 Subject: [PATCH 7/7] rename eventhub checkpointStrategy parameter azureWebJob to azureFunction Signed-off-by: Christian Leinweber --- pkg/scalers/azure/azure_eventhub_checkpoint.go | 14 +++++++------- pkg/scalers/azure/azure_eventhub_test.go | 8 ++++---- pkg/scalers/azure_eventhub_scaler_test.go | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/scalers/azure/azure_eventhub_checkpoint.go b/pkg/scalers/azure/azure_eventhub_checkpoint.go index 64babfacd69..ac43ae80ceb 100644 --- a/pkg/scalers/azure/azure_eventhub_checkpoint.go +++ b/pkg/scalers/azure/azure_eventhub_checkpoint.go @@ -50,7 +50,7 @@ type checkpointer interface { extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) } -type azureWebjobCheckpointer struct { +type azureFunctionCheckpointer struct { partitionID string containerName string } @@ -88,8 +88,8 @@ func newCheckpointer(info EventHubInfo, partitionID string) checkpointer { containerName: info.BlobContainer, partitionID: partitionID, } - case (info.CheckpointStrategy == "azureWebJob" || info.BlobContainer == ""): - return &azureWebjobCheckpointer{ + case (info.CheckpointStrategy == "azureFunction" || info.BlobContainer == ""): + return &azureFunctionCheckpointer{ containerName: "azure-webjobs-eventhub", partitionID: partitionID, } @@ -101,8 +101,8 @@ func newCheckpointer(info EventHubInfo, partitionID string) checkpointer { } } -// resolve path for AzureWebJobCheckpointer -func (checkpointer *azureWebjobCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { +// resolve path for AzureFunctionCheckpointer +func (checkpointer *azureFunctionCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { eventHubNamespace, eventHubName, err := getHubAndNamespace(info) if err != nil { return nil, err @@ -113,8 +113,8 @@ func (checkpointer *azureWebjobCheckpointer) resolvePath(info EventHubInfo) (*ur return path, nil } -// extract checkpoint for AzureWebJobCheckpointer -func (checkpointer *azureWebjobCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { +// extract checkpoint for AzureFunctionCheckpointer +func (checkpointer *azureFunctionCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { var checkpoint Checkpoint err := readToCheckpointFromBody(get, &checkpoint) if err != nil { diff --git a/pkg/scalers/azure/azure_eventhub_test.go b/pkg/scalers/azure/azure_eventhub_test.go index fe7ec2a8bf6..06f7a5a116b 100644 --- a/pkg/scalers/azure/azure_eventhub_test.go +++ b/pkg/scalers/azure/azure_eventhub_test.go @@ -17,7 +17,7 @@ import ( // Add a valid Storage account connection string here const StorageConnectionString = "" -func TestCheckpointFromBlobStorageAzureWebjob(t *testing.T) { +func TestCheckpointFromBlobStorageAzureFunction(t *testing.T) { if StorageConnectionString == "" { return } @@ -225,7 +225,7 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { assert.Equal(t, check, expectedCheckpoint) } -func TestShouldParseCheckpointForWebJob(t *testing.T) { +func TestShouldParseCheckpointForFunction(t *testing.T) { eventHubInfo := EventHubInfo{ EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", EventHubConsumerGroup: "$Default", @@ -237,11 +237,11 @@ func TestShouldParseCheckpointForWebJob(t *testing.T) { assert.Equal(t, url.Path, "/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0") } -func TestShouldParseCheckpointForWebJobWithCheckpointStrategy(t *testing.T) { +func TestShouldParseCheckpointForFunctionWithCheckpointStrategy(t *testing.T) { eventHubInfo := EventHubInfo{ EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", EventHubConsumerGroup: "$Default", - CheckpointStrategy: "azureWebJob", + CheckpointStrategy: "azureFunction", } cp := newCheckpointer(eventHubInfo, "0") diff --git a/pkg/scalers/azure_eventhub_scaler_test.go b/pkg/scalers/azure_eventhub_scaler_test.go index 07e892feebf..0565f7e2462 100644 --- a/pkg/scalers/azure_eventhub_scaler_test.go +++ b/pkg/scalers/azure_eventhub_scaler_test.go @@ -49,7 +49,7 @@ var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{ // missing unprocessed event threshold - should replace with default {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting}, false}, // added blob container details - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName, "checkpointStrategy": "azureWebJob"}, false}, + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName, "checkpointStrategy": "azureFunction"}, false}, } var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestData{