diff --git a/sdk/messaging/azeventhubs/CHANGELOG.md b/sdk/messaging/azeventhubs/CHANGELOG.md index 1563ff9d6774..8af3752a2309 100644 --- a/sdk/messaging/azeventhubs/CHANGELOG.md +++ b/sdk/messaging/azeventhubs/CHANGELOG.md @@ -1,6 +1,6 @@ # Release History -## 0.6.0 (Unreleased) +## 0.6.0 (2023-03-07) ### Features Added @@ -14,7 +14,8 @@ ### Bugs Fixed -### Other Changes +- Recover the connection when the $cbs Receiver/Sender is not closed properly. This would cause + clients to return an error saying "$cbs node has already been opened." (PR#20334) ## 0.5.0 (2023-02-07) diff --git a/sdk/messaging/azeventhubs/internal/cbs.go b/sdk/messaging/azeventhubs/internal/cbs.go index ae3aa6c359b6..5148e127a783 100644 --- a/sdk/messaging/azeventhubs/internal/cbs.go +++ b/sdk/messaging/azeventhubs/internal/cbs.go @@ -7,6 +7,7 @@ import ( "context" "errors" + "github.com/Azure/azure-sdk-for-go/sdk/internal/log" azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/auth" @@ -35,6 +36,14 @@ func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClie }) if err != nil { + // In some circumstances we can end up in a situation where the link closing was cancelled + // or interrupted, leaving $cbs still open by some dangling receiver or sender. The only way + // to fix this is to restart the connection. + if IsNotAllowedError(err) { + log.Writef(exported.EventAuth, "Not allowed to open, connection will be reset: %s", err) + return errConnResetNeeded + } + return err } diff --git a/sdk/messaging/azeventhubs/internal/errors.go b/sdk/messaging/azeventhubs/internal/errors.go index 465b16165189..ae36ae41b8dd 100644 --- a/sdk/messaging/azeventhubs/internal/errors.go +++ b/sdk/messaging/azeventhubs/internal/errors.go @@ -323,6 +323,13 @@ func IsErrNotFound(err error) bool { return ok } +func IsNotAllowedError(err error) bool { + var e *amqp.Error + + return errors.As(err, &e) && + e.Condition == amqp.ErrorNotAllowed +} + func (e ErrConnectionClosed) Error() string { return fmt.Sprintf("the connection has been closed: %s", string(e)) } diff --git a/sdk/messaging/azeventhubs/internal/links_test.go b/sdk/messaging/azeventhubs/internal/links_test.go new file mode 100644 index 000000000000..5170eca157b8 --- /dev/null +++ b/sdk/messaging/azeventhubs/internal/links_test.go @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package internal + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/exported" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/test" + "github.com/stretchr/testify/require" +) + +func TestLinksCBSLinkStillOpen(t *testing.T) { + // we're not going to use this client for these tests. + testParams := test.GetConnectionParamsForTest(t) + ns, err := NewNamespace(NamespaceWithConnectionString(testParams.ConnectionString)) + require.NoError(t, err) + + defer func() { _ = ns.Close(context.Background(), true) }() + + session, oldConnID, err := ns.NewAMQPSession(context.Background()) + require.NoError(t, err) + + // opening a Sender to the $cbs endpoint. This endpoint can only be opened by a single + // sender/receiver pair in a connection. + _, err = session.NewSender(context.Background(), "$cbs", nil) + require.NoError(t, err) + + newLinkFn := func(ctx context.Context, session amqpwrap.AMQPSession, entityPath string) (AMQPSenderCloser, error) { + return session.NewSender(ctx, entityPath, &amqp.SenderOptions{ + SettlementMode: to.Ptr(amqp.ModeMixed), + RequestedReceiverSettleMode: to.Ptr(amqp.ModeFirst), + IgnoreDispositionErrors: true, + }) + } + + formatEntityPath := func(partitionID string) string { + return fmt.Sprintf("%s/Partitions/%s", testParams.EventHubName, partitionID) + } + + links := NewLinks(ns, fmt.Sprintf("%s/$management", testParams.EventHubName), formatEntityPath, newLinkFn) + + var lwid LinkWithID[AMQPSenderCloser] + + err = links.Retry(context.Background(), exported.EventConn, "test", "0", exported.RetryOptions{ + RetryDelay: -1, + MaxRetryDelay: time.Millisecond, + }, func(ctx context.Context, innerLWID LinkWithID[AMQPSenderCloser]) error { + lwid = innerLWID + return nil + }) + require.NoError(t, err) + + defer func() { + err := links.Close(context.Background()) + require.NoError(t, err) + }() + + require.NoError(t, err) + require.Equal(t, oldConnID+1, lwid.ConnID, "Connection gets incremented since it had to be reset") +} diff --git a/sdk/messaging/azeventhubs/internal/test/test_helpers.go b/sdk/messaging/azeventhubs/internal/test/test_helpers.go index 0ca9c0ce9a09..b3061b0bd7ba 100644 --- a/sdk/messaging/azeventhubs/internal/test/test_helpers.go +++ b/sdk/messaging/azeventhubs/internal/test/test_helpers.go @@ -101,7 +101,11 @@ type ConnectionParamsForTest struct { } func GetConnectionParamsForTest(t *testing.T) ConnectionParamsForTest { - _ = godotenv.Load() + if _, err := os.Stat("../.env"); err == nil { + _ = godotenv.Load("../.env") + } else { + _ = godotenv.Load() + } envVars := mustGetEnvironmentVars(t, []string{ "AZURE_SUBSCRIPTION_ID", diff --git a/sdk/messaging/azservicebus/CHANGELOG.md b/sdk/messaging/azservicebus/CHANGELOG.md index 81f58dc68b0e..1e1654af3df7 100644 --- a/sdk/messaging/azservicebus/CHANGELOG.md +++ b/sdk/messaging/azservicebus/CHANGELOG.md @@ -1,17 +1,13 @@ # Release History -## 1.2.1 (Unreleased) - -### Features Added - -### Breaking Changes +## 1.2.1 (2023-03-07) ### Bugs Fixed -- Fixing issues where we could over-request credit (#19965) or allow for negative/zero credits (#19743), both of +- Prevent over-requesting credit (#19965) or requesting negative/zero credits (#19743), both of which could cause issues with go-amqp. (PR#19992) - -### Other Changes +- Recover the connection when the $cbs Receiver/Sender is not closed properly. This would cause + clients to return an error saying "$cbs node has already been opened." (PR#20334) ## 1.2.0 (2023-02-07) diff --git a/sdk/messaging/azservicebus/internal/amqpLinks_test.go b/sdk/messaging/azservicebus/internal/amqpLinks_test.go index 6be5525b2058..3ca376c29988 100644 --- a/sdk/messaging/azservicebus/internal/amqpLinks_test.go +++ b/sdk/messaging/azservicebus/internal/amqpLinks_test.go @@ -102,7 +102,7 @@ func TestAMQPLinksBasic(t *testing.T) { } func TestAMQPLinksLive(t *testing.T) { - // we're not going to use this client for tehse tests. + // we're not going to use this client for these tests. entityPath, cleanup := test.CreateExpiringQueue(t, nil) defer cleanup() @@ -174,6 +174,56 @@ func TestAMQPLinksLive(t *testing.T) { assertLinks(t, lwr) } +// TestAMQPLinksCBSLinkStillOpen makes sure we can recover from an incompletely +// closed $cbs link, which can happen if a user cancels and we can't properly close +// the link as a result. +func TestAMQPLinksCBSLinkStillOpen(t *testing.T) { + // we're not going to use this client for these tests. + entityPath, cleanup := test.CreateExpiringQueue(t, nil) + defer cleanup() + + cs := test.GetConnectionString(t) + ns, err := NewNamespace(NamespaceWithConnectionString(cs)) + require.NoError(t, err) + + defer func() { _ = ns.Close(false) }() + + session, oldConnID, err := ns.NewAMQPSession(context.Background()) + require.NoError(t, err) + + // opening a Sender to the $cbs endpoint. This endpoint can only be opened by a single + // sender/receiver pair in a connection. + _, err = session.NewSender(context.Background(), "$cbs", nil) + require.NoError(t, err) + + links := NewAMQPLinks(NewAMQPLinksArgs{ + NS: ns, + EntityPath: entityPath, + CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (amqpwrap.AMQPSenderCloser, amqpwrap.AMQPReceiverCloser, error) { + return newLinksForAMQPLinksTest(entityPath, session) + }, + GetRecoveryKindFunc: GetRecoveryKind, + }) + + var lwid *LinksWithID + + err = links.Retry(context.Background(), exported.EventConn, "test", func(ctx context.Context, innerLwid *LinksWithID, args *utils.RetryFnArgs) error { + lwid = innerLwid + return nil + }, exported.RetryOptions{ + RetryDelay: -1, + MaxRetryDelay: time.Millisecond, + }) + + defer func() { + err := links.Close(context.Background(), true) + require.NoError(t, err) + }() + + require.NoError(t, err) + require.Equal(t, oldConnID+1, lwid.ID.Conn, "Connection gets incremented since it had to be reset") +} + func TestAMQPLinksLiveRecoverLink(t *testing.T) { // we're not going to use this client for these tests. entityPath, cleanup := test.CreateExpiringQueue(t, nil) diff --git a/sdk/messaging/azservicebus/internal/cbs.go b/sdk/messaging/azservicebus/internal/cbs.go index ffebbd6f8fb5..53ac8cbaf023 100644 --- a/sdk/messaging/azservicebus/internal/cbs.go +++ b/sdk/messaging/azservicebus/internal/cbs.go @@ -6,6 +6,7 @@ package internal import ( "context" + "github.com/Azure/azure-sdk-for-go/sdk/internal/log" azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/auth" @@ -31,6 +32,14 @@ func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClie }) if err != nil { + // In some circumstances we can end up in a situation where the link closing was cancelled + // or interrupted, leaving $cbs still open by some dangling receiver or sender. The only way + // to fix this is to restart the connection. + if IsNotAllowedError(err) { + log.Writef(exported.EventAuth, "Not allowed to open, connection will be reset: %s", err) + return errConnResetNeeded + } + return err } diff --git a/sdk/messaging/azservicebus/internal/errors.go b/sdk/messaging/azservicebus/internal/errors.go index 9d4dfee4f92b..7edb849dd0a3 100644 --- a/sdk/messaging/azservicebus/internal/errors.go +++ b/sdk/messaging/azservicebus/internal/errors.go @@ -102,6 +102,13 @@ func IsDetachError(err error) bool { return errors.As(err, &de) } +func IsNotAllowedError(err error) bool { + var e *amqp.Error + + return errors.As(err, &e) && + e.Condition == amqp.ErrorNotAllowed +} + func IsCancelError(err error) bool { if err == nil { return false