diff --git a/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go b/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go
index be8978f9c292..ea3aa5677ee6 100644
--- a/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go
+++ b/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go
@@ -206,13 +206,18 @@ func (cps *testCheckpointStore) ClaimOwnership(ctx context.Context, partitionOwn
 
 		current, exists := cps.ownerships[key]
 
-		if exists && po.ETag != nil && *current.ETag != *po.ETag {
-			// can't own it, didn't have the expected etag
-			return nil, nil
+		if exists {
+			if po.ETag == nil {
+				panic("Ownership blob exists, we should have claimed it using an etag")
+			}
+
+			if *po.ETag != *current.ETag {
+				// can't own it, didn't have the expected etag
+				return nil, nil
+			}
 		}
 
 		newOwnership := po
-
 		uuid, err := uuid.New()
 
 		if err != nil {
diff --git a/sdk/messaging/azeventhubs/internal/errors.go b/sdk/messaging/azeventhubs/internal/errors.go
index 2f9fb1a90b56..c9e011725dfc 100644
--- a/sdk/messaging/azeventhubs/internal/errors.go
+++ b/sdk/messaging/azeventhubs/internal/errors.go
@@ -148,6 +148,7 @@ var amqpConditionsToRecoveryKind = map[amqp.ErrCond]RecoveryKind{
 	amqp.ErrCondNotAllowed:                                  RecoveryKindFatal, // "amqp:not-allowed"
 	amqp.ErrCond("com.microsoft:entity-disabled"):           RecoveryKindFatal, // entity is disabled in the portal
 	amqp.ErrCond("com.microsoft:session-cannot-be-locked"):  RecoveryKindFatal,
+	amqp.ErrCond("com.microsoft:argument-out-of-range"):     RecoveryKindFatal, // asked for a partition ID that doesn't exist
 	errorConditionLockLost:                                  RecoveryKindFatal,
 }
 
diff --git a/sdk/messaging/azeventhubs/processor_load_balancer.go b/sdk/messaging/azeventhubs/processor_load_balancer.go
index 419e227ee163..99f396cfd652 100644
--- a/sdk/messaging/azeventhubs/processor_load_balancer.go
+++ b/sdk/messaging/azeventhubs/processor_load_balancer.go
@@ -9,6 +9,8 @@ import (
 	"math"
 	"math/rand"
 	"time"
+
+	"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
 )
 
 type processorLoadBalancer struct {
@@ -73,6 +75,7 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
 		//   - I have too many. We expect to have some stolen from us, but we'll maintain
 		//     ownership for now.
 		claimMorePartitions = false
+		log.Writef(EventConsumer, "Owns %d/%d, no more needed", len(lbinfo.current), lbinfo.maxAllowed)
 	} else if lbinfo.extraPartitionPossible && len(lbinfo.current) == lbinfo.maxAllowed-1 {
 		// In the 'extraPartitionPossible' scenario, some consumers will have an extra partition
 		// since things don't divide up evenly. We're one under the max, which means we _might_
@@ -81,6 +84,10 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
 		// We will attempt to grab _one_ more but only if there are free partitions available
 		// or if one of the consumers has more than the max allowed.
 		claimMorePartitions = len(lbinfo.unownedOrExpired) > 0 || len(lbinfo.aboveMax) > 0
+		log.Writef(EventConsumer, "Unowned/expired: %d, above max: %d, need to claim: %t",
+			len(lbinfo.unownedOrExpired),
+			len(lbinfo.aboveMax),
+			claimMorePartitions)
 	}
 
 	ownerships := lbinfo.current
@@ -88,8 +95,10 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
 	if claimMorePartitions {
 		switch lb.strategy {
 		case ProcessorStrategyGreedy:
+			log.Writef(EventConsumer, "Using greedy strategy to claim partitions")
 			ownerships = lb.greedyLoadBalancer(ctx, lbinfo)
 		case ProcessorStrategyBalanced:
+			log.Writef(EventConsumer, "Using balanced strategy to claim partitions")
 			o := lb.balancedLoadBalancer(ctx, lbinfo)
 
 			if o != nil {
@@ -106,6 +115,8 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
 // getAvailablePartitions finds all partitions that are either completely unowned _or_
 // their ownership is stale.
 func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, partitionIDs []string) (loadBalancerInfo, error) {
+	log.Writef(EventConsumer, "[%s] Listing ownership for %s/%s/%s", lb.details.ClientID, lb.details.FullyQualifiedNamespace, lb.details.EventHubName, lb.details.ConsumerGroup)
+
 	ownerships, err := lb.checkpointStore.ListOwnership(ctx, lb.details.FullyQualifiedNamespace, lb.details.EventHubName, lb.details.ConsumerGroup, nil)
 
 	if err != nil {
@@ -132,6 +143,9 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par
 		groupedByOwner[o.OwnerID] = append(groupedByOwner[o.OwnerID], o)
 	}
 
+	numExpired := len(unownedOrExpired)
+	log.Writef(EventConsumer, "Expired: %d", numExpired)
+
 	// add in all the unowned partitions
 	for _, partID := range partitionIDs {
 		if alreadyAdded[partID] {
@@ -149,6 +163,8 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par
 		})
 	}
 
+	log.Writef(EventConsumer, "Unowned: %d", len(unownedOrExpired)-numExpired)
+
 	maxAllowed := len(partitionIDs) / len(groupedByOwner)
 	hasRemainder := len(partitionIDs)%len(groupedByOwner) > 0
 
@@ -188,6 +204,8 @@ func (lb *processorLoadBalancer) greedyLoadBalancer(ctx context.Context, lbinfo
 	ours = append(ours, randomOwnerships...)
 
 	if len(ours) < lbinfo.maxAllowed {
+		log.Writef(EventConsumer, "Not enough expired or unowned partitions, will need to steal from other processors")
+
 		// if that's not enough then we'll randomly steal from any owners that had partitions
 		// above the maximum.
 		randomOwnerships := getRandomOwnerships(lb.rnd, lbinfo.aboveMax, lbinfo.maxAllowed-len(ours))
@@ -197,6 +215,7 @@ func (lb *processorLoadBalancer) greedyLoadBalancer(ctx context.Context, lbinfo
 	for i := 0; i < len(ours); i++ {
 		ours[i] = lb.resetOwnership(ours[i])
 	}
+
 	return ours
 }
 
@@ -225,7 +244,6 @@ func (lb *processorLoadBalancer) balancedLoadBalancer(ctx context.Context, lbinf
 }
 
 func (lb *processorLoadBalancer) resetOwnership(o Ownership) Ownership {
-	o.ETag = nil
 	o.OwnerID = lb.details.ClientID
 	return o
}
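
Note on the first hunk: the test store now treats a missing ETag on an existing ownership blob as a programming error (panic) instead of silently skipping the claim, while a mismatched ETag still just means another owner won the race. A minimal sketch of how a caller observes that contract through the exported azeventhubs.CheckpointStore interface; the helper name claimedPartition is illustrative and not part of this change, and it assumes ClaimOwnership returns only the ownerships that were actually claimed:

package example

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

// claimedPartition reports whether we won the ownership race for a single
// partition. Stores drop entries they could not claim (for example, when the
// ownership blob already exists and our ETag is stale), so an empty result
// slice means another processor currently owns the partition.
func claimedPartition(ctx context.Context, store azeventhubs.CheckpointStore, o azeventhubs.Ownership) (bool, error) {
	claimed, err := store.ClaimOwnership(ctx, []azeventhubs.Ownership{o}, nil)
	if err != nil {
		return false, err
	}
	return len(claimed) == 1, nil
}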
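Note on the processor_load_balancer.go hunks: the new log.Writef calls only surface if the application wires up a log listener. A minimal sketch, assuming the exported azeventhubs.EventConsumer constant (the same event used by the calls above) and the azcore log listener API; the fmt-based listener is illustrative only:

package example

import (
	"fmt"

	azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func enableLoadBalancerLogging() {
	// Print every SDK log line; a real application would route this into its own logger.
	azlog.SetListener(func(event azlog.Event, msg string) {
		fmt.Printf("[%s] %s\n", event, msg)
	})

	// Restrict output to consumer/load-balancer events; omit SetEvents to receive everything.
	azlog.SetEvents(azlog.Event(azeventhubs.EventConsumer))
}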