diff --git a/batch_disposition.go b/batch_disposition.go index 4494db0..e0c6ac5 100644 --- a/batch_disposition.go +++ b/batch_disposition.go @@ -16,6 +16,15 @@ type ( Status MessageStatus cursor int } + // BatchDispositionError is an error which returns a collection of DispositionError. + BatchDispositionError struct { + Errors []DispositionError + } + // DispositionError is an error associated with a LockTokenID. + DispositionError struct { + LockTokenID *uuid.UUID + err error + } ) const ( @@ -25,6 +34,23 @@ const ( Abort MessageStatus = MessageStatus(abandonedDisposition) ) +func (bde BatchDispositionError) Error() string { + msg := "" + if len(bde.Errors) != 0 { + msg = fmt.Sprintf("Operation failed, %d error(s) reported.", len(bde.Errors)) + } + return msg +} + +func (de DispositionError) Error() string { + return de.err.Error() +} + +// UnWrap will return the private error. +func (de DispositionError) UnWrap() error { + return de.err +} + // Done communicates whether there are more messages remaining to be iterated over. func (bdi *BatchDispositionIterator) Done() bool { return len(bdi.LockTokenIDs) == bdi.cursor @@ -39,7 +65,8 @@ func (bdi *BatchDispositionIterator) Next() (uuid *uuid.UUID) { return uuid } -func (bdi *BatchDispositionIterator) doUpdate(ctx context.Context, ec entityConnector) error { +func (bdi *BatchDispositionIterator) doUpdate(ctx context.Context, ec entityConnector) BatchDispositionError { + batchError := BatchDispositionError{} for !bdi.Done() { if uuid := bdi.Next(); uuid != nil { m := &Message{ @@ -48,21 +75,24 @@ func (bdi *BatchDispositionIterator) doUpdate(ctx context.Context, ec entityConn m.ec = ec err := m.sendDisposition(ctx, bdi.Status) if err != nil { - return err + batchError.Errors = append(batchError.Errors, DispositionError{ + LockTokenID: uuid, + err: err, + }) } } } - return nil + return batchError } -// SendBatchDisposition updates the LockToken id to the desired status. +// SendBatchDisposition updates the LockTokenIDs to the disposition status. func (q *Queue) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error { span, ctx := q.startSpanFromContext(ctx, "sb.Queue.SendBatchDisposition") defer span.Finish() return iterator.doUpdate(ctx, q) } -// SendBatchDisposition updates the LockToken id to the desired status. +// SendBatchDisposition updates the LockTokenIDs to the desired disposition status. func (s *Subscription) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error { span, ctx := s.startSpanFromContext(ctx, "sb.Subscription.SendBatchDisposition") defer span.Finish() diff --git a/batch_disposition_test.go b/batch_disposition_test.go index 30ec62b..3d7fc01 100644 --- a/batch_disposition_test.go +++ b/batch_disposition_test.go @@ -2,6 +2,7 @@ package servicebus import ( "context" + "fmt" "testing" "github.com/Azure/azure-amqp-common-go/uuid" @@ -36,12 +37,19 @@ func TestBatchDispositionUnsupportedStatus(t *testing.T) { id := uuid.UUID{} bdi := BatchDispositionIterator{ LockTokenIDs: []*uuid.UUID{ - &id, + &id, &id, &id, }, Status: status, } subscription := Subscription{} err := subscription.SendBatchDisposition(context.Background(), bdi) - assert.EqualErrorf(t, err, "unsupported bulk disposition status \"suspended\"", err.Error()) + be := err.(BatchDispositionError) + assert.NotNil(t, be, fmt.Sprintf("Wrong error type %T", err)) + assert.EqualErrorf(t, err, fmt.Sprintf("Operation failed, %d error(s) reported.", len(be.Errors)), err.Error()) + + for _, innerErr := range be.Errors { + assert.NotNil(t, innerErr.UnWrap(), "Unwrapped error is nil") + assert.EqualErrorf(t, innerErr, "unsupported bulk disposition status \"suspended\"", innerErr.Error()) + } }