Skip to content

Commit 27aab78

Browse files
AMQP Session related fixes. (#5299)
* Implemented connection handle table to fix half closed session problem * Correctly handle session begin after connection is open * Split out amqp settle mode to separate header; added test coverage for AMQP Transfer and Detach performatives * Use absolute path for settle mode header
1 parent dffa3ed commit 27aab78

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+2919
-866
lines changed

.clang-format

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ IncludeCategories:
4747
Priority: 90
4848
- Regex: '<Azure/.*>'
4949
Priority: 50
50-
- Regex: '^<azure_uamqp_c.*/(amqp_definitions_sequence_no|amqp_definitions_milliseconds|amqp_definitions_terminus_expiry_policy|amqp_definitions_terminus_durability|amqp_definitions_fields).h>$'
50+
- Regex: '^<azure_uamqp_c.*/(amqp_definitions_fields).h>$'
51+
Priority: 59
52+
- Regex: '^<azure_uamqp_c.*/(amqp_definitions_sequence_no|amqp_definitions_milliseconds|amqp_definitions_terminus_expiry_policy|amqp_definitions_terminus_durability|amqp_definitions_number|amqp_definitions_error|amqp_definitions_handle).h>$'
5153
Priority: 60
5254
- Regex: '<azure_.*>'
5355
Priority: 70

.vscode/cspell.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,16 @@
255255
"filename": "CMakePresets.json",
256256
"words": [
257257
"ASAN",
258+
"asan",
258259
"fsanitize"
259260
]
260261
},
262+
{
263+
"filename": ".clang-format",
264+
"words": [
265+
"Dont"
266+
]
267+
},
261268
{
262269
"filename": "**/eng/pipelines/templates/**/*.yml",
263270
"words": [

CMakePresets.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,16 @@
344344
"RUN_LONG_UNIT_TESTS": true
345345
}
346346
},
347+
{
348+
"name": "x86-static-release",
349+
"displayName": "x86 Release, static",
350+
"description": "Windows x86 Release build",
351+
"inherits": [
352+
"x86-static",
353+
"release-build"
354+
]
355+
356+
},
347357
{
348358
"name": "x64-debug-tests",
349359
"displayName": "x64 Debug With Tests",

sdk/core/azure-core-amqp/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ The `Close` method on AMQP Message Sender and Message Receiver now blocks until
1212

1313
### Bugs Fixed
1414

15+
- Fixed uAMQP connection channel so that a channel is released when an END performative is received from the remote node instead of when the END performative is sent to the remote node.
16+
- Enabled more than one uAMQP session to be created on a single connection.
17+
1518
### Other Changes
1619

1720
## 1.0.0-beta.6 (2024-01-11)

sdk/core/azure-core-amqp/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ find_package(azure_c_shared_utility CONFIG REQUIRED)
6161
set (AZURE_CORE_AMQP_HEADER
6262
inc/azure/core/amqp.hpp
6363
inc/azure/core/amqp/dll_import_export.hpp
64+
inc/azure/core/amqp/internal/amqp_settle_mode.hpp
6465
inc/azure/core/amqp/internal/cancellable.hpp
6566
inc/azure/core/amqp/internal/claims_based_security.hpp
6667
inc/azure/core/amqp/internal/common/async_operation_queue.hpp
@@ -79,6 +80,8 @@ set (AZURE_CORE_AMQP_HEADER
7980
inc/azure/core/amqp/internal/models/message_source.hpp
8081
inc/azure/core/amqp/internal/models/message_target.hpp
8182
inc/azure/core/amqp/internal/models/messaging_values.hpp
83+
inc/azure/core/amqp/internal/models/performatives/amqp_detach.hpp
84+
inc/azure/core/amqp/internal/models/performatives/amqp_transfer.hpp
8285
inc/azure/core/amqp/internal/network/amqp_header_detect_transport.hpp
8386
inc/azure/core/amqp/internal/network/sasl_transport.hpp
8487
inc/azure/core/amqp/internal/network/socket_listener.hpp
@@ -112,23 +115,28 @@ set(AZURE_CORE_AMQP_SOURCE
112115
src/amqp/private/unique_handle.hpp
113116
src/amqp/session.cpp
114117
src/common/global_state.cpp
118+
src/models/amqp_detach.cpp
115119
src/models/amqp_error.cpp
116120
src/models/amqp_header.cpp
117121
src/models/amqp_message.cpp
118122
src/models/amqp_properties.cpp
123+
src/models/amqp_transfer.cpp
119124
src/models/amqp_value.cpp
120125
src/models/message_source.cpp
121126
src/models/message_target.cpp
122127
src/models/messaging_values.cpp
123128
src/models/private/error_impl.hpp
124129
src/models/private/header_impl.hpp
125130
src/models/private/message_impl.hpp
131+
src/models/private/performatives/detach_impl.hpp
132+
src/models/private/performatives/transfer_impl.hpp
126133
src/models/private/properties_impl.hpp
127134
src/models/private/source_impl.hpp
128135
src/models/private/target_impl.hpp
129136
src/models/private/value_impl.hpp
130137
src/network/amqp_header_transport.cpp
131138
src/network/private/transport_impl.hpp
139+
src/network/private/transport_impl.hpp
132140
src/network/sasl_transport.cpp
133141
src/network/socket_listener.cpp
134142
src/network/socket_transport.cpp
@@ -137,6 +145,7 @@ set(AZURE_CORE_AMQP_SOURCE
137145
src/private/package_version.hpp
138146
)
139147

148+
140149
add_library(azure-core-amqp ${AZURE_CORE_AMQP_SOURCE} ${AZURE_CORE_AMQP_HEADER} $<TARGET_OBJECTS:uamqp>)
141150

142151
if (VENDOR_UAMQP)

sdk/core/azure-core-amqp/cspell.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"words": [
1313
"blang",
1414
"dowork",
15+
"performatives",
1516
"SASLCLIENTIO",
1617
"socketio",
1718
"stringized",
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
#pragma once
5+
6+
#include <iostream>
7+
8+
namespace Azure { namespace Core { namespace Amqp { namespace _internal {
9+
10+
enum class SenderSettleMode
11+
{
12+
Unsettled,
13+
Settled,
14+
Mixed,
15+
};
16+
17+
std::ostream& operator<<(std::ostream& os, SenderSettleMode const& mode);
18+
19+
enum class ReceiverSettleMode
20+
{
21+
First,
22+
Second,
23+
};
24+
std::ostream& operator<<(std::ostream& os, ReceiverSettleMode const& mode);
25+
26+
}}}} // namespace Azure::Core::Amqp::_internal

sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/connection.hpp

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
3535
class TestSocketListenerEvents;
3636
class LinkSocketListenerEvents;
3737
class TestLinks_LinkAttachDetach_Test;
38+
class TestSessions_MultipleSessionBeginEnd_Test;
3839
class TestMessages_SenderOpenClose_Test;
3940
class TestMessages_TestLocalhostVsTls_Test;
4041
class TestMessages_SenderSendAsync_Test;
@@ -166,7 +167,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
166167
*/
167168
class ConnectionEvents {
168169
protected:
169-
~ConnectionEvents(){};
170+
virtual ~ConnectionEvents() = default;
170171

171172
public:
172173
/** @brief Called when the connection state changes.
@@ -181,27 +182,28 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
181182
ConnectionState oldState)
182183
= 0;
183184

184-
/** @brief Called when a new endpoint connects to the connection.
185+
/** @brief called when an I/O error has occurred on the connection.
185186
*
186187
* @param connection The connection object.
187-
* @param endpoint The endpoint that connected.
188-
* @return true if the endpoint was accepted, false otherwise.
189-
*
190-
* @remarks Note that this function should only be overriden if the application is listening
191-
* on the connection.
192188
*/
193-
virtual bool OnNewEndpoint(Connection const& connection, Endpoint& endpoint)
194-
{
195-
(void)connection;
196-
(void)endpoint;
197-
return false;
198-
}
189+
virtual void OnIOError(Connection const& connection) = 0;
190+
};
199191

200-
/** @brief called when an I/O error has occurred on the connection.
192+
class ConnectionEndpointEvents {
193+
protected:
194+
virtual ~ConnectionEndpointEvents() = default;
195+
196+
public:
197+
/** @brief Called when a new endpoint connects to the connection.
201198
*
202199
* @param connection The connection object.
200+
* @param endpoint The endpoint that connected.
201+
* @return true if the endpoint was accepted, false otherwise.
202+
*
203+
* @remarks Note that this function should only be overriden if
204+
* the application is listening on the connection.
203205
*/
204-
virtual void OnIOError(Connection const& connection) = 0;
206+
virtual bool OnNewEndpoint(Connection const& connection, Endpoint& endpoint) = 0;
205207
};
206208

207209
/** @brief Options used to create a connection. */
@@ -299,7 +301,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
299301
Connection(
300302
Network::_internal::Transport const& transport,
301303
ConnectionOptions const& options,
302-
ConnectionEvents* eventHandler = nullptr);
304+
ConnectionEvents* eventHandler,
305+
ConnectionEndpointEvents* endpointEvents);
303306

304307
/** @brief Destroy an AMQP connection */
305308
~Connection();
@@ -457,6 +460,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
457460
friend class Azure::Core::Amqp::Tests::TestConnections_ConnectionAttributes_Test;
458461
friend class Azure::Core::Amqp::Tests::TestConnections_ConnectionOpenClose_Test;
459462
friend class Azure::Core::Amqp::Tests::TestConnections_ConnectionListenClose_Test;
463+
friend class Azure::Core::Amqp::Tests::TestSessions_MultipleSessionBeginEnd_Test;
460464
friend class Azure::Core::Amqp::Tests::TestLinks_LinkAttachDetach_Test;
461465
friend class Azure::Core::Amqp::Tests::TestMessages_SenderOpenClose_Test;
462466
friend class Azure::Core::Amqp::Tests::TestMessages_TestLocalhostVsTls_Test;

sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/endpoint.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
2222
class LinkEndpointFactory;
2323
}}}} // namespace Azure::Core::Amqp::_detail
2424
namespace Azure { namespace Core { namespace Amqp { namespace _internal {
25+
class ConnectionEvents;
2526

2627
// An "Endpoint" is an intermediate type used to create sessions in an OnNewSession callback.
2728
class Endpoint final {
@@ -43,6 +44,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
4344
return rv;
4445
}
4546
friend class _detail::EndpointFactory;
47+
friend class _internal::ConnectionEvents;
4648
};
4749

4850
// A "Link Endpoint" is an intermediate type used to create new Links in an OnLinkAttached
@@ -58,6 +60,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
5860
}
5961
~LinkEndpoint(){};
6062

63+
LINK_ENDPOINT_INSTANCE_TAG* Get() const { return m_endpoint; }
64+
std::uint32_t GetHandle() const;
65+
6166
private:
6267
LINK_ENDPOINT_INSTANCE_TAG* m_endpoint;
6368

sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/link.hpp

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66
#include "azure/core/amqp/internal/models/message_source.hpp"
77
#include "azure/core/amqp/internal/models/message_target.hpp"
8+
#include "azure/core/amqp/internal/models/performatives/amqp_transfer.hpp"
89
#include "azure/core/amqp/models/amqp_value.hpp"
10+
#include "azure/core/context.hpp"
911

1012
#include <chrono>
1113
#include <memory>
@@ -40,11 +42,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
4042
Invalid,
4143
Detached,
4244
HalfAttachedAttachSent,
43-
HalfAttachAttachReceived,
45+
HalfAttachedAttachReceived,
4446
Attached,
4547
Error,
4648
};
4749

50+
std::ostream& operator<<(std::ostream& stream, LinkState const& linkState);
51+
4852
enum class LinkTransferResult
4953
{
5054
Error,
@@ -58,24 +62,62 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
5862
NotDelivered,
5963
Timeout,
6064
Cancelled,
65+
Invalid
66+
};
67+
68+
class Link;
69+
70+
class LinkEvents {
71+
public:
72+
virtual Models::AmqpValue OnTransferReceived(
73+
#if defined(TESTING_BUILD)
74+
Link const& link,
75+
#else
76+
std::shared_ptr<LinkImpl> link,
77+
#endif
78+
Models::_internal::Performatives::AmqpTransfer transfer,
79+
uint32_t payloadSize,
80+
const unsigned char* payloadBytes)
81+
= 0;
82+
virtual void OnLinkStateChanged(
83+
#if defined(TESTING_BUILD)
84+
Link const& link,
85+
#else
86+
std::shared_ptr<LinkImpl> link,
87+
#endif
88+
LinkState newLinkState,
89+
LinkState previousLinkState)
90+
= 0;
91+
virtual void OnLinkFlowOn(
92+
#if defined(TESTING_BUILD)
93+
Link const& link
94+
#else
95+
std::shared_ptr<LinkImpl> link
96+
#endif
97+
)
98+
= 0;
99+
virtual ~LinkEvents() = default;
61100
};
62101

63102
#if defined(TESTING_BUILD)
103+
64104
class Link final {
65105
public:
66106
Link(
67107
_internal::Session const& session,
68108
std::string const& name,
69109
_internal::SessionRole role,
70110
Models::_internal::MessageSource const& source,
71-
Models::_internal::MessageTarget const& target);
111+
Models::_internal::MessageTarget const& target,
112+
LinkEvents* events = nullptr);
72113
Link(
73114
_internal::Session const& session,
74115
_internal::LinkEndpoint& linkEndpoint,
75116
std::string const& name,
76117
_internal::SessionRole role,
77118
Models::_internal::MessageSource const& source,
78-
Models::_internal::MessageTarget const& target);
119+
Models::_internal::MessageTarget const& target,
120+
LinkEvents* events = nullptr);
79121
~Link() noexcept;
80122

81123
Link(Link const&) = default;
@@ -97,9 +139,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
97139

98140
uint64_t GetPeerMaxMessageSize() const;
99141

100-
void SetAttachProperties(Models::AmqpValue attachProperties);
142+
void SetAttachProperties(Models::AmqpValue const& attachProperties);
101143
void SetMaxLinkCredit(uint32_t maxLinkCredit);
102144

145+
void SetDesiredCapabilities(Models::AmqpValue const& desiredCapabilities);
146+
Models::AmqpValue GetDesiredCapabilities() const;
147+
148+
void ResetLinkCredit(std::uint32_t linkCredit, bool drain);
149+
103150
std::string GetName() const;
104151

105152
Models::_internal::MessageTarget const& GetTarget() const;
@@ -109,13 +156,18 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
109156

110157
void Attach();
111158

159+
std::tuple<uint32_t, LinkDeliverySettleReason, Models::AmqpValue> Transfer(
160+
std::vector<uint8_t> const& payload,
161+
Azure::Core::Context const& context);
162+
112163
void Detach(
113164
bool close,
114165
std::string const& errorCondition,
115166
std::string const& errorDescription,
116-
Models::AmqpValue& info);
167+
const Models::AmqpValue& info);
117168

118169
private:
170+
friend class LinkImpl;
119171
Link(std::shared_ptr<LinkImpl> impl) : m_impl{impl} {}
120172

121173
std::shared_ptr<LinkImpl> m_impl;

0 commit comments

Comments
 (0)