diff --git a/CHANGELOG.md b/CHANGELOG.md index 82f9abd4b62..2e42bbb26d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ - Fixed goroutine leaks in usage of timers ([#1704](https://github.com/kedacore/keda/pull/1704) | [#1739](https://github.com/kedacore/keda/pull/1739)) - Setting timeouts in the HTTP client used by the IBM MQ scaler ([#1758](https://github.com/kedacore/keda/pull/1758)) - Fix cleanup of removed triggers ([#1768](https://github.com/kedacore/keda/pull/1768)) +- Eventhub Scaler: Add trigger parameter `checkpointStrategy` to support more language-specific checkpoints ([#1621](https://github.com/kedacore/keda/pull/1621)) ### Breaking Changes diff --git a/go.mod b/go.mod index e871b5e1ac0..ae6dbb821ae 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/go-logr/logr v0.4.0 github.com/go-logr/zapr v0.4.0 // indirect github.com/go-openapi/spec v0.20.3 + github.com/go-playground/assert/v2 v2.0.1 github.com/go-redis/redis v6.15.9+incompatible github.com/go-sql-driver/mysql v1.6.0 github.com/golang/mock v1.5.0 diff --git a/pkg/scalers/azure/azure_eventhub.go b/pkg/scalers/azure/azure_eventhub.go index 80ac819d904..4b6a16480b8 100644 --- a/pkg/scalers/azure/azure_eventhub.go +++ b/pkg/scalers/azure/azure_eventhub.go @@ -1,48 +1,15 @@ package azure import ( - "bytes" - "context" - "encoding/json" "errors" "fmt" - "net/url" "strings" - "github.com/imdario/mergo" - "github.com/Azure/azure-amqp-common-go/v3/aad" eventhub "github.com/Azure/azure-event-hubs-go/v3" - "github.com/Azure/azure-storage-blob-go/azblob" "github.com/Azure/go-autorest/autorest/azure" - - kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" - "github.com/kedacore/keda/v2/pkg/util" ) -type baseCheckpoint struct { - Epoch int64 `json:"Epoch"` - Offset string `json:"Offset"` - Owner string `json:"Owner"` - Token string `json:"Token"` -} - -// Checkpoint is the object eventhub processor stores in storage -// for checkpointing event processors. 
This matches the object -// stored by the eventhub C# sdk and Java sdk -type Checkpoint struct { - baseCheckpoint - PartitionID string `json:"PartitionId"` - SequenceNumber int64 `json:"SequenceNumber"` -} - -// Eventhub python sdk stores the checkpoint differently -type pythonCheckpoint struct { - baseCheckpoint - PartitionID string `json:"partition_id"` - SequenceNumber int64 `json:"sequence_number"` -} - // EventHubInfo to keep event hub connection and resources type EventHubInfo struct { EventHubConnection string @@ -51,6 +18,7 @@ type EventHubInfo struct { BlobContainer string Namespace string EventHubName string + CheckpointStrategy string } // GetEventHubClient returns eventhub client @@ -80,74 +48,6 @@ func GetEventHubClient(info EventHubInfo) (*eventhub.Hub, error) { return nil, aadErr } -// GetCheckpointFromBlobStorage accesses Blob storage and gets checkpoint information of a partition -func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) { - blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(httpClient, kedav1alpha1.PodIdentityProviderNone, info.StorageConnection, "") - if err != nil { - return Checkpoint{}, err - } - - var eventHubNamespace string - var eventHubName string - if info.EventHubConnection != "" { - eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection) - if err != nil { - return Checkpoint{}, err - } - } else { - eventHubNamespace = info.Namespace - eventHubName = info.EventHubName - } - - // TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats - var baseURL *url.URL - // Checking blob store for C# and Java applications - if info.BlobContainer != "" { - // URL format - /// - path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s", info.BlobContainer, info.EventHubConsumerGroup, partitionID)) - baseURL = 
storageEndpoint.ResolveReference(path) - } else { - // Checking blob store for Azure functions - // URL format - /azure-webjobs-eventhub//// - path, _ := url.Parse(fmt.Sprintf("/azure-webjobs-eventhub/%s/%s/%s/%s", eventHubNamespace, eventHubName, info.EventHubConsumerGroup, partitionID)) - baseURL = storageEndpoint.ResolveReference(path) - } - - // Create a BlockBlobURL object to a blob in the container. - blobURL := azblob.NewBlockBlobURL(*baseURL, azblob.NewPipeline(blobCreds, azblob.PipelineOptions{})) - - get, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) - if err != nil { - return Checkpoint{}, fmt.Errorf("unable to download file from blob storage: %w", err) - } - - blobData := &bytes.Buffer{} - reader := get.Body(azblob.RetryReaderOptions{}) - if _, err := blobData.ReadFrom(reader); err != nil { - return Checkpoint{}, fmt.Errorf("failed to read blob data: %s", err) - } - defer reader.Close() // The client must close the response body when finished with it - - return getCheckpoint(blobData.Bytes()) -} - -func getCheckpoint(bytes []byte) (Checkpoint, error) { - var checkpoint Checkpoint - var pyCheckpoint pythonCheckpoint - - if err := json.Unmarshal(bytes, &checkpoint); err != nil { - return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err) - } - - if err := json.Unmarshal(bytes, &pyCheckpoint); err != nil { - return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err) - } - - err := mergo.Merge(&checkpoint, Checkpoint(pyCheckpoint)) - - return checkpoint, err -} - // ParseAzureEventHubConnectionString parses Event Hub connection string into (namespace, name) // Connection string should be in following format: // Endpoint=sb://eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=secretKey123;EntityPath=eventhub-name @@ -177,3 +77,20 @@ func ParseAzureEventHubConnectionString(connectionString string) (string, string return 
eventHubNamespace, eventHubName, nil } + +func getHubAndNamespace(info EventHubInfo) (string, string, error) { + var eventHubNamespace string + var eventHubName string + var err error + if info.EventHubConnection != "" { + eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection) + if err != nil { + return "", "", err + } + } else { + eventHubNamespace = info.Namespace + eventHubName = info.EventHubName + } + + return eventHubNamespace, eventHubName, nil +} diff --git a/pkg/scalers/azure/azure_eventhub_checkpoint.go b/pkg/scalers/azure/azure_eventhub_checkpoint.go new file mode 100644 index 00000000000..ac43ae80ceb --- /dev/null +++ b/pkg/scalers/azure/azure_eventhub_checkpoint.go @@ -0,0 +1,277 @@ +package azure + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/url" + "strconv" + "strings" + + "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/imdario/mergo" + kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" + "github.com/kedacore/keda/v2/pkg/util" +) + +// goCheckpoint struct to adapt goSdk Checkpoint +type goCheckpoint struct { + Checkpoint struct { + SequenceNumber int64 `json:"sequenceNumber"` + Offset string `json:"offset"` + } `json:"checkpoint"` + PartitionID string `json:"partitionId"` +} + +type baseCheckpoint struct { + Epoch int64 `json:"Epoch"` + Offset string `json:"Offset"` + Owner string `json:"Owner"` + Token string `json:"Token"` +} + +// Checkpoint in a common format +type Checkpoint struct { + baseCheckpoint + PartitionID string `json:"PartitionId"` + SequenceNumber int64 `json:"SequenceNumber"` +} + +// Older python sdk stores the checkpoint differently +type pythonCheckpoint struct { + baseCheckpoint + PartitionID string `json:"partition_id"` + SequenceNumber int64 `json:"sequence_number"` +} + +type checkpointer interface { + resolvePath(info EventHubInfo) (*url.URL, error) + extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) +} + +type 
azureFunctionCheckpointer struct { + partitionID string + containerName string +} + +type blobMetadataCheckpointer struct { + partitionID string + containerName string +} + +type goSdkCheckpointer struct { + partitionID string + containerName string +} + +type defaultCheckpointer struct { + partitionID string + containerName string +} + +// GetCheckpointFromBlobStorage reads depending of the CheckpointStrategy the checkpoint from a azure storage +func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) { + checkpointer := newCheckpointer(info, partitionID) + return getCheckpoint(ctx, httpClient, info, checkpointer) +} + +func newCheckpointer(info EventHubInfo, partitionID string) checkpointer { + switch { + case (info.CheckpointStrategy == "goSdk"): + return &goSdkCheckpointer{ + containerName: info.BlobContainer, + partitionID: partitionID, + } + case (info.CheckpointStrategy == "blobMetadata"): + return &blobMetadataCheckpointer{ + containerName: info.BlobContainer, + partitionID: partitionID, + } + case (info.CheckpointStrategy == "azureFunction" || info.BlobContainer == ""): + return &azureFunctionCheckpointer{ + containerName: "azure-webjobs-eventhub", + partitionID: partitionID, + } + default: + return &defaultCheckpointer{ + containerName: info.BlobContainer, + partitionID: partitionID, + } + } +} + +// resolve path for AzureFunctionCheckpointer +func (checkpointer *azureFunctionCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { + eventHubNamespace, eventHubName, err := getHubAndNamespace(info) + if err != nil { + return nil, err + } + + path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID)) + + return path, nil +} + +// extract checkpoint for AzureFunctionCheckpointer +func (checkpointer *azureFunctionCheckpointer) extractCheckpoint(get 
*azblob.DownloadResponse) (Checkpoint, error) { + var checkpoint Checkpoint + err := readToCheckpointFromBody(get, &checkpoint) + if err != nil { + return Checkpoint{}, err + } + + return checkpoint, nil +} + +// resolve path for blobMetadataCheckpointer +func (checkpointer *blobMetadataCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { + eventHubNamespace, eventHubName, err := getHubAndNamespace(info) + if err != nil { + return nil, err + } + + path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, strings.ToLower(info.EventHubConsumerGroup), checkpointer.partitionID)) + return path, nil +} + +// extract checkpoint for blobMetadataCheckpointer +func (checkpointer *blobMetadataCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + return getCheckpointFromStorageMetadata(get, checkpointer.partitionID) +} + +// resolve path for goSdkCheckpointer +func (checkpointer *goSdkCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { + path, _ := url.Parse(fmt.Sprintf("/%s/%s", info.BlobContainer, checkpointer.partitionID)) + + return path, nil +} + +// extract checkpoint for goSdkCheckpointer +func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + var checkpoint goCheckpoint + err := readToCheckpointFromBody(get, &checkpoint) + if err != nil { + return Checkpoint{}, err + } + + return Checkpoint{ + SequenceNumber: checkpoint.Checkpoint.SequenceNumber, + baseCheckpoint: baseCheckpoint{ + Offset: checkpoint.Checkpoint.Offset, + }, + PartitionID: checkpoint.PartitionID, + }, nil +} + +// resolve path for DefaultCheckpointer +func (checkpointer *defaultCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { + path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s", info.BlobContainer, info.EventHubConsumerGroup, checkpointer.partitionID)) + + return path, nil +} + +// extract checkpoint with deprecated 
Python sdk checkpoint for backward compatibility
+func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
+	var checkpoint Checkpoint
+	var pyCheckpoint pythonCheckpoint
+	blobData := &bytes.Buffer{}
+
+	reader := get.Body(azblob.RetryReaderOptions{})
+	if _, err := blobData.ReadFrom(reader); err != nil {
+		return Checkpoint{}, fmt.Errorf("failed to read blob data: %s", err)
+	}
+	defer reader.Close() // The client must close the response body when finished with it
+
+	if err := json.Unmarshal(blobData.Bytes(), &checkpoint); err != nil {
+		return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err)
+	}
+
+	if err := json.Unmarshal(blobData.Bytes(), &pyCheckpoint); err != nil {
+		return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err)
+	}
+
+	err := mergo.Merge(&checkpoint, Checkpoint(pyCheckpoint))
+
+	return checkpoint, err
+}
+
+func getCheckpoint(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, checkpointer checkpointer) (Checkpoint, error) {
+	blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(httpClient, kedav1alpha1.PodIdentityProviderNone, info.StorageConnection, "")
+	if err != nil {
+		return Checkpoint{}, err
+	}
+
+	path, err := checkpointer.resolvePath(info)
+	if err != nil {
+		return Checkpoint{}, err
+	}
+
+	baseURL := storageEndpoint.ResolveReference(path)
+
+	get, err := downloadBlob(ctx, baseURL, blobCreds)
+	if err != nil {
+		return Checkpoint{}, err
+	}
+
+	return checkpointer.extractCheckpoint(get)
+}
+
+func getCheckpointFromStorageMetadata(get *azblob.DownloadResponse, partitionID string) (Checkpoint, error) {
+	checkpoint := Checkpoint{
+		PartitionID: partitionID,
+	}
+
+	metadata := get.NewMetadata()
+
+	// Look the sequence number up under either casing; error out if neither key exists.
+	sequencenumber, ok := metadata["sequencenumber"]
+	if !ok {
+		if sequencenumber, ok = metadata["Sequencenumber"]; !ok {
+			return Checkpoint{}, fmt.Errorf("sequencenumber on blob not found")
+		}
+	}
+
+	if sn, err := strconv.ParseInt(sequencenumber, 10, 64); err == nil {
+		checkpoint.SequenceNumber = sn
+	} else {
+		return Checkpoint{}, fmt.Errorf("sequencenumber is not a valid int64 value: %w", err)
+	}
+
+	// Same dual-casing lookup for the offset.
+	offset, ok := metadata["offset"]
+	if !ok {
+		if offset, ok = metadata["Offset"]; !ok {
+			return Checkpoint{}, fmt.Errorf("offset on blob not found")
+		}
+	}
+	checkpoint.Offset = offset
+
+	return checkpoint, nil
+}
+
+func readToCheckpointFromBody(get *azblob.DownloadResponse, checkpoint interface{}) error {
+	blobData := &bytes.Buffer{}
+
+	reader := get.Body(azblob.RetryReaderOptions{})
+	if _, err := blobData.ReadFrom(reader); err != nil {
+		return fmt.Errorf("failed to read blob data: %s", err)
+	}
+	defer reader.Close() // The client must close the response body when finished with it
+
+	if err := json.Unmarshal(blobData.Bytes(), &checkpoint); err != nil {
+		return fmt.Errorf("failed to decode blob data: %s", err)
+	}
+
+	return nil
+}
+
+func downloadBlob(ctx context.Context, baseURL *url.URL, blobCreds azblob.Credential) (*azblob.DownloadResponse, error) {
+	blobURL := azblob.NewBlockBlobURL(*baseURL, azblob.NewPipeline(blobCreds, azblob.PipelineOptions{}))
+
+	get, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("unable to download file from blob storage: %w", err)
+	}
+	return get, nil
+}
diff --git a/pkg/scalers/azure/azure_eventhub_test.go b/pkg/scalers/azure/azure_eventhub_test.go
index c7e89fe4a46..06f7a5a116b 100644
--- a/pkg/scalers/azure/azure_eventhub_test.go
+++ b/pkg/scalers/azure/azure_eventhub_test.go
@@ -1,37 +1,327 @@
 package azure
 
 import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+	"net/url"
+	"strconv"
 	"testing"
 
-	"github.com/stretchr/testify/assert"
+	"github.com/Azure/azure-storage-blob-go/azblob"
+	"github.com/go-playground/assert/v2"
 )
 
-const csharpSdkCheckpoint = `{
-	"Epoch": 123456,
-	"Offset": "test offset",
- "Owner": "test owner", - "PartitionId": "test partitionId", - "SequenceNumber": 12345 - }` - -const pythonSdkCheckpoint = `{ - "epoch": 123456, - "offset": "test offset", - "owner": "test owner", - "partition_id": "test partitionId", - "sequence_number": 12345 - }` - -func TestGetCheckpoint(t *testing.T) { - cckp, err := getCheckpoint([]byte(csharpSdkCheckpoint)) - if err != nil { - t.Error(err) +// Add a valid Storage account connection string here +const StorageConnectionString = "" + +func TestCheckpointFromBlobStorageAzureFunction(t *testing.T) { + if StorageConnectionString == "" { + return + } + + partitionID := "0" + offset := "1001" + consumerGroup := "$Default1" + + sequencenumber := int64(1) + + containerName := "azure-webjobs-eventhub" + checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" + checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID) + urlPath := fmt.Sprintf("eventhubnamespace.servicebus.windows.net/hub/%s/", consumerGroup) + + ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) + assert.Equal(t, err, nil) + + expectedCheckpoint := Checkpoint{ + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, + PartitionID: partitionID, + SequenceNumber: sequencenumber, + } + + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub", + StorageConnection: StorageConnectionString, + EventHubConsumerGroup: consumerGroup, + EventHubName: "hub", + } + + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + _ = check.Offset + _ = expectedCheckpoint.Offset + assert.Equal(t, check, expectedCheckpoint) +} + +func TestCheckpointFromBlobStorageDefault(t *testing.T) { + if StorageConnectionString == "" { + return + } + + partitionID := "1" + offset := "1005" + consumerGroup := "$Default2" + + sequencenumber := int64(1) 
+ + containerName := "defaultcontainer" + checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" + checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID) + urlPath := fmt.Sprintf("%s/", consumerGroup) + + ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) + assert.Equal(t, err, nil) + + expectedCheckpoint := Checkpoint{ + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, + PartitionID: partitionID, + SequenceNumber: sequencenumber, + } + + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub", + StorageConnection: StorageConnectionString, + EventHubConsumerGroup: consumerGroup, + EventHubName: "hub", + BlobContainer: containerName, + } + + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) + _ = check.Offset + _ = expectedCheckpoint.Offset + assert.Equal(t, check, expectedCheckpoint) +} + +func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T) { + if StorageConnectionString == "" { + return + } + + partitionID := "2" + offset := "1006" + consumerGroup := "$Default3" + + sequencenumber := int64(1) + + containerName := "defaultcontainerpython" + checkpointFormat := "{\"Offset\":\"%s\",\"sequence_number\":%d,\"partition_id\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" + checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID) + urlPath := fmt.Sprintf("%s/", consumerGroup) + + ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) + assert.Equal(t, err, nil) + + expectedCheckpoint := Checkpoint{ + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, + PartitionID: partitionID, + SequenceNumber: sequencenumber, + } + + eventHubInfo := EventHubInfo{ + EventHubConnection: 
"Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub", + StorageConnection: StorageConnectionString, + EventHubConsumerGroup: consumerGroup, + EventHubName: "hub", + BlobContainer: containerName, + } + + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) + _ = check.Offset + _ = expectedCheckpoint.Offset + assert.Equal(t, check, expectedCheckpoint) +} + +func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) { + if StorageConnectionString == "" { + return + } + + partitionID := "4" + offset := "1002" + consumerGroup := "$default" + + sequencenumber := int64(1) + + metadata := map[string]string{ + "offset": offset, + "sequencenumber": strconv.FormatInt(sequencenumber, 10), + } + + containerName := "blobmetadatacontainer" + urlPath := fmt.Sprintf("eventhubnamespace.servicebus.windows.net/hub/%s/checkpoint/", consumerGroup) + + ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, "", metadata) + assert.Equal(t, err, nil) + + expectedCheckpoint := Checkpoint{ + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, + PartitionID: partitionID, + SequenceNumber: sequencenumber, + } + + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub", + StorageConnection: StorageConnectionString, + EventHubConsumerGroup: consumerGroup, + EventHubName: "hub", + BlobContainer: containerName, + CheckpointStrategy: "blobMetadata", + } + + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) + _ = check.Offset + _ = expectedCheckpoint.Offset + assert.Equal(t, check, expectedCheckpoint) +} + +func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { + if StorageConnectionString == "" { + return + } + + partitionID := "0" + offset := "1003" + + sequencenumber := int64(1) + + containerName := "gosdkcontainer" + checkpointFormat := 
"{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"offset\":\"%s\",\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}" + checkpoint := fmt.Sprintf(checkpointFormat, partitionID, offset, sequencenumber) + + urlPath := "" + + ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) + assert.Equal(t, err, nil) + + expectedCheckpoint := Checkpoint{ + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, + PartitionID: partitionID, + SequenceNumber: sequencenumber, } - pckp, err := getCheckpoint([]byte(pythonSdkCheckpoint)) + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub", + StorageConnection: StorageConnectionString, + EventHubName: "hub", + BlobContainer: containerName, + CheckpointStrategy: "goSdk", + } + + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) + _ = check.Offset + _ = expectedCheckpoint.Offset + assert.Equal(t, check, expectedCheckpoint) +} + +func TestShouldParseCheckpointForFunction(t *testing.T) { + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", + EventHubConsumerGroup: "$Default", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0") +} + +func TestShouldParseCheckpointForFunctionWithCheckpointStrategy(t *testing.T) { + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", + EventHubConsumerGroup: "$Default", + CheckpointStrategy: "azureFunction", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, 
"/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0") +} + +func TestShouldParseCheckpointForDefault(t *testing.T) { + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", + EventHubConsumerGroup: "$Default", + BlobContainer: "DefaultContainer", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/DefaultContainer/$Default/0") +} + +func TestShouldParseCheckpointForBlobMetadata(t *testing.T) { + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", + EventHubConsumerGroup: "$Default", + BlobContainer: "containername", + CheckpointStrategy: "blobMetadata", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$default/checkpoint/0") +} + +func TestShouldParseCheckpointForGoSdk(t *testing.T) { + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", + EventHubConsumerGroup: "$Default", + BlobContainer: "containername", + CheckpointStrategy: "goSdk", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/containername/0") +} + +func createNewCheckpointInStorage(urlPath string, containerName string, partitionID string, checkpoint string, metadata map[string]string) (context.Context, error) { + credential, endpoint, _ := ParseAzureStorageBlobConnection(http.DefaultClient, "none", StorageConnectionString, "") + + // Create container + ctx := context.Background() + path, _ := url.Parse(containerName) + url := endpoint.ResolveReference(path) + containerURL := azblob.NewContainerURL(*url, azblob.NewPipeline(credential, azblob.PipelineOptions{})) + _, 
err := containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
+
+	err = errors.Unwrap(err)
 	if err != nil {
-		t.Error(err)
+		if stErr, ok := err.(azblob.StorageError); ok {
+			// Reusing an existing container is fine for these tests; only
+			// other creation failures are fatal.
+			if stErr.ServiceCode() != azblob.ServiceCodeContainerAlreadyExists {
+				return ctx, fmt.Errorf("failed to create container: %s", err)
+			}
+		}
 	}
-	assert.Equal(t, cckp, pckp)
+	blobFolderURL := containerURL.NewBlockBlobURL(urlPath + partitionID)
+
+	var b bytes.Buffer
+	b.WriteString(checkpoint)
+
+	// Upload file
+	_, err = azblob.UploadBufferToBlockBlob(ctx, b.Bytes(), blobFolderURL, azblob.UploadToBlockBlobOptions{
+		BlockSize:   4 * 1024 * 1024,
+		Metadata:    metadata,
+		Parallelism: 16})
+	if err != nil {
+		return ctx, fmt.Errorf("Err uploading file to blob: %s", err)
+	}
+	return ctx, nil
 }
diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go
index c754c645bc3..3bfd324ef4c 100644
--- a/pkg/scalers/azure_eventhub_scaler.go
+++ b/pkg/scalers/azure_eventhub_scaler.go
@@ -28,6 +28,7 @@ const (
 	thresholdMetricName          = "unprocessedEventThreshold"
 	defaultEventHubConsumerGroup = "$Default"
 	defaultBlobContainer         = ""
+	defaultCheckpointStrategy    = ""
 )
 
 var eventhubLog = logf.Log.WithName("azure_eventhub_scaler")
@@ -93,6 +94,11 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error)
 		meta.eventHubInfo.EventHubConsumerGroup = val
 	}
 
+	meta.eventHubInfo.CheckpointStrategy = defaultCheckpointStrategy
+	if val, ok := config.TriggerMetadata["checkpointStrategy"]; ok {
+		meta.eventHubInfo.CheckpointStrategy = val
+	}
+
 	meta.eventHubInfo.BlobContainer = defaultBlobContainer
 	if val, ok := config.TriggerMetadata["blobContainer"]; ok {
 		meta.eventHubInfo.BlobContainer = val
@@ -145,7 +151,7 @@ func (scaler *azureEventHubScaler) GetUnprocessedEventCountInPartition(ctx conte
 	// if blob not found return the total partition event count
 	err = errors.Unwrap(err)
 	if stErr, ok := err.(azblob.StorageError); ok {
-		if stErr.ServiceCode() == 
azblob.ServiceCodeBlobNotFound { + if stErr.ServiceCode() == azblob.ServiceCodeBlobNotFound || stErr.ServiceCode() == azblob.ServiceCodeContainerNotFound { return GetUnprocessedEventCountWithoutCheckpoint(partitionInfo), azure.Checkpoint{}, nil } } diff --git a/pkg/scalers/azure_eventhub_scaler_test.go b/pkg/scalers/azure_eventhub_scaler_test.go index 0f33d520872..0565f7e2462 100644 --- a/pkg/scalers/azure_eventhub_scaler_test.go +++ b/pkg/scalers/azure_eventhub_scaler_test.go @@ -49,7 +49,7 @@ var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{ // missing unprocessed event threshold - should replace with default {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting}, false}, // added blob container details - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName}, false}, + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName, "checkpointStrategy": "azureFunction"}, false}, } var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestData{