Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
55dff8b
Implemented connection handle table to fix half closed session problem
LarryOsterman Jan 23, 2024
7954879
Correctly handle session begin after connection is open
LarryOsterman Jan 26, 2024
faa2b1c
Start adding back multithreaded processor tests; clang-format
LarryOsterman Jan 29, 2024
b2983b2
build fixes to clang-format
LarryOsterman Jan 29, 2024
a9947da
Merge branch 'Azure:main' into larryo/session_end_handle
LarryOsterman Jan 29, 2024
9966a26
cspell
LarryOsterman Jan 29, 2024
6b7f4e0
Merge branch 'larryo/session_end_handle' of https://github.com/LarryO…
LarryOsterman Jan 29, 2024
02c61ac
cspell2
LarryOsterman Jan 29, 2024
1c0d89b
Fixed non-test build error
LarryOsterman Jan 29, 2024
a252abf
cspell
LarryOsterman Jan 29, 2024
50d5710
added more tests to improve coverage
LarryOsterman Jan 29, 2024
d083ee4
Improved test coverage
LarryOsterman Jan 29, 2024
09aee5c
Improved test coverage2
LarryOsterman Jan 30, 2024
fccafd5
Test fix
LarryOsterman Jan 30, 2024
062bc17
Split out amqp settle mode to separate header; added test coverage fo…
LarryOsterman Jan 30, 2024
0244466
PR feedback
LarryOsterman Jan 30, 2024
eb76aac
clang-format
LarryOsterman Jan 30, 2024
2ae7809
clang-format; more code coverage
LarryOsterman Jan 30, 2024
1d29501
Still more code coverage
LarryOsterman Jan 30, 2024
624d849
clang-format
LarryOsterman Jan 30, 2024
bb9d692
bumped code coverage target to 88%
LarryOsterman Jan 30, 2024
5d2416c
Pull request feedback
LarryOsterman Jan 30, 2024
e5e01e7
clang-format
LarryOsterman Jan 30, 2024
25de134
Test fixes
LarryOsterman Jan 30, 2024
d263b63
Use absolute path for settle mode header
LarryOsterman Jan 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ IncludeCategories:
Priority: 90
- Regex: '<Azure/.*>'
Priority: 50
- Regex: '^<azure_uamqp_c.*/(amqp_definitions_sequence_no|amqp_definitions_milliseconds|amqp_definitions_terminus_expiry_policy|amqp_definitions_terminus_durability|amqp_definitions_fields).h>$'
- Regex: '^<azure_uamqp_c.*/(amqp_definitions_fields).h>$'
Priority: 59
- Regex: '^<azure_uamqp_c.*/(amqp_definitions_sequence_no|amqp_definitions_milliseconds|amqp_definitions_terminus_expiry_policy|amqp_definitions_terminus_durability|amqp_definitions_number|amqp_definitions_error|amqp_definitions_handle).h>$'
Priority: 60
- Regex: '<azure_.*>'
Priority: 70
Expand Down
7 changes: 7 additions & 0 deletions .vscode/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,16 @@
"filename": "CMakePresets.json",
"words": [
"ASAN",
"asan",
"fsanitize"
]
},
{
"filename": ".clang-format",
"words": [
"Dont"
]
},
{
"filename": "**/eng/pipelines/templates/**/*.yml",
"words": [
Expand Down
10 changes: 10 additions & 0 deletions CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,16 @@
"RUN_LONG_UNIT_TESTS": true
}
},
{
"name": "x86-static-release",
"displayName": "x86 Release, static",
"description": "Windows x86 Release build",
"inherits": [
"x86-static",
"release-build"
]

},
{
"name": "x64-debug-tests",
"displayName": "x64 Debug With Tests",
Expand Down
8 changes: 8 additions & 0 deletions sdk/core/azure-core-amqp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ set (AZURE_CORE_AMQP_HEADER
inc/azure/core/amqp/internal/models/message_source.hpp
inc/azure/core/amqp/internal/models/message_target.hpp
inc/azure/core/amqp/internal/models/messaging_values.hpp
inc/azure/core/amqp/internal/models/performatives/amqp_detach.hpp
inc/azure/core/amqp/internal/models/performatives/amqp_transfer.hpp
inc/azure/core/amqp/internal/network/amqp_header_detect_transport.hpp
inc/azure/core/amqp/internal/network/sasl_transport.hpp
inc/azure/core/amqp/internal/network/socket_listener.hpp
Expand Down Expand Up @@ -112,23 +114,28 @@ set(AZURE_CORE_AMQP_SOURCE
src/amqp/private/unique_handle.hpp
src/amqp/session.cpp
src/common/global_state.cpp
src/models/amqp_detach.cpp
src/models/amqp_error.cpp
src/models/amqp_header.cpp
src/models/amqp_message.cpp
src/models/amqp_properties.cpp
src/models/amqp_transfer.cpp
src/models/amqp_value.cpp
src/models/message_source.cpp
src/models/message_target.cpp
src/models/messaging_values.cpp
src/models/private/error_impl.hpp
src/models/private/header_impl.hpp
src/models/private/message_impl.hpp
src/models/private/performatives/detach_impl.hpp
src/models/private/performatives/transfer_impl.hpp
src/models/private/properties_impl.hpp
src/models/private/source_impl.hpp
src/models/private/target_impl.hpp
src/models/private/value_impl.hpp
src/network/amqp_header_transport.cpp
src/network/private/transport_impl.hpp
src/network/private/transport_impl.hpp
src/network/sasl_transport.cpp
src/network/socket_listener.cpp
src/network/socket_transport.cpp
Expand All @@ -137,6 +144,7 @@ set(AZURE_CORE_AMQP_SOURCE
src/private/package_version.hpp
)


add_library(azure-core-amqp ${AZURE_CORE_AMQP_SOURCE} ${AZURE_CORE_AMQP_HEADER} $<TARGET_OBJECTS:uamqp>)

if (VENDOR_UAMQP)
Expand Down
1 change: 1 addition & 0 deletions sdk/core/azure-core-amqp/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"words": [
"blang",
"dowork",
"performatives",
"SASLCLIENTIO",
"socketio",
"stringized",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
class TestSocketListenerEvents;
class LinkSocketListenerEvents;
class TestLinks_LinkAttachDetach_Test;
class TestSessions_MultipleSessionBeginEnd_Test;
class TestMessages_SenderOpenClose_Test;
class TestMessages_TestLocalhostVsTls_Test;
class TestMessages_SenderSendAsync_Test;
Expand Down Expand Up @@ -166,7 +167,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
*/
class ConnectionEvents {
protected:
~ConnectionEvents(){};
virtual ~ConnectionEvents(){};

public:
/** @brief Called when the connection state changes.
Expand All @@ -181,27 +182,28 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
ConnectionState oldState)
= 0;

/** @brief Called when a new endpoint connects to the connection.
/** @brief called when an I/O error has occurred on the connection.
*
* @param connection The connection object.
* @param endpoint The endpoint that connected.
* @return true if the endpoint was accepted, false otherwise.
*
* @remarks Note that this function should only be overriden if the application is listening
* on the connection.
*/
virtual bool OnNewEndpoint(Connection const& connection, Endpoint& endpoint)
{
(void)connection;
(void)endpoint;
return false;
}
virtual void OnIOError(Connection const& connection) = 0;
};

/** @brief called when an I/O error has occurred on the connection.
class ConnectionEndpointEvents {
protected:
virtual ~ConnectionEndpointEvents(){};

public:
/** @brief Called when a new endpoint connects to the connection.
*
* @param connection The connection object.
* @param endpoint The endpoint that connected.
* @return true if the endpoint was accepted, false otherwise.
*
* @remarks Note that this function should only be overriden if
* the application is listening on the connection.
*/
virtual void OnIOError(Connection const& connection) = 0;
virtual bool OnNewEndpoint(Connection const& connection, Endpoint& endpoint) = 0;
};

/** @brief Options used to create a connection. */
Expand Down Expand Up @@ -299,7 +301,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Connection(
Network::_internal::Transport const& transport,
ConnectionOptions const& options,
ConnectionEvents* eventHandler = nullptr);
ConnectionEvents* eventHandler,
ConnectionEndpointEvents* endpointEvents);

/** @brief Destroy an AMQP connection */
~Connection();
Expand Down Expand Up @@ -457,6 +460,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
friend class Azure::Core::Amqp::Tests::TestConnections_ConnectionAttributes_Test;
friend class Azure::Core::Amqp::Tests::TestConnections_ConnectionOpenClose_Test;
friend class Azure::Core::Amqp::Tests::TestConnections_ConnectionListenClose_Test;
friend class Azure::Core::Amqp::Tests::TestSessions_MultipleSessionBeginEnd_Test;
friend class Azure::Core::Amqp::Tests::TestLinks_LinkAttachDetach_Test;
friend class Azure::Core::Amqp::Tests::TestMessages_SenderOpenClose_Test;
friend class Azure::Core::Amqp::Tests::TestMessages_TestLocalhostVsTls_Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
class LinkEndpointFactory;
}}}} // namespace Azure::Core::Amqp::_detail
namespace Azure { namespace Core { namespace Amqp { namespace _internal {
class ConnectionEvents;

// An "Endpoint" is an intermediate type used to create sessions in an OnNewSession callback.
class Endpoint final {
Expand All @@ -43,6 +44,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
return rv;
}
friend class _detail::EndpointFactory;
friend class _internal::ConnectionEvents;
};

// A "Link Endpoint" is an intermediate type used to create new Links in an OnLinkAttached
Expand All @@ -58,6 +60,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
}
~LinkEndpoint(){};

LINK_ENDPOINT_INSTANCE_TAG* Get() const { return m_endpoint; }
std::uint32_t GetHandle() const;

private:
LINK_ENDPOINT_INSTANCE_TAG* m_endpoint;

Expand Down
55 changes: 51 additions & 4 deletions sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/link.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

#include "azure/core/amqp/internal/models/message_source.hpp"
#include "azure/core/amqp/internal/models/message_target.hpp"
#include "azure/core/amqp/internal/models/performatives/amqp_transfer.hpp"
#include "azure/core/amqp/models/amqp_value.hpp"
#include "azure/core/context.hpp"

#include <chrono>
#include <memory>
Expand Down Expand Up @@ -40,11 +42,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Invalid,
Detached,
HalfAttachedAttachSent,
HalfAttachAttachReceived,
HalfAttachedAttachReceived,
Attached,
Error,
};

std::ostream& operator<<(std::ostream& stream, LinkState const& linkState);

enum class LinkTransferResult
{
Error,
Expand All @@ -58,24 +62,62 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
NotDelivered,
Timeout,
Cancelled,
Invalid
};

class Link;

class LinkEvents {
public:
virtual Models::AmqpValue OnTransferReceived(
#if defined(TESTING_BUILD)
Link const& link,
#else
std::shared_ptr<LinkImpl> link,
#endif
Models::_internal::Performatives::AmqpTransfer transfer,
uint32_t payloadSize,
const unsigned char* payloadBytes)
= 0;
virtual void OnLinkStateChanged(
#if defined(TESTING_BUILD)
Link const& link,
#else
std::shared_ptr<LinkImpl> link,
#endif
LinkState newLinkState,
LinkState previousLinkState)
= 0;
virtual void OnLinkFlowOn(
#if defined(TESTING_BUILD)
Link const& link
#else
std::shared_ptr<LinkImpl> link
#endif
)
= 0;
virtual ~LinkEvents() = default;
};

#if defined(TESTING_BUILD)

class Link final {
public:
Link(
_internal::Session const& session,
std::string const& name,
_internal::SessionRole role,
Models::_internal::MessageSource const& source,
Models::_internal::MessageTarget const& target);
Models::_internal::MessageTarget const& target,
LinkEvents* events = nullptr);
Link(
_internal::Session const& session,
_internal::LinkEndpoint& linkEndpoint,
std::string const& name,
_internal::SessionRole role,
Models::_internal::MessageSource const& source,
Models::_internal::MessageTarget const& target);
Models::_internal::MessageTarget const& target,
LinkEvents* events = nullptr);
~Link() noexcept;

Link(Link const&) = default;
Expand Down Expand Up @@ -109,13 +151,18 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {

void Attach();

std::tuple<uint32_t, LinkDeliverySettleReason, Models::AmqpValue> Transfer(
std::vector<uint8_t> const& payload,
Azure::Core::Context const& context);

void Detach(
bool close,
std::string const& errorCondition,
std::string const& errorDescription,
Models::AmqpValue& info);
const Models::AmqpValue& info);

private:
friend class LinkImpl;
Link(std::shared_ptr<LinkImpl> impl) : m_impl{impl} {}

std::shared_ptr<LinkImpl> m_impl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
MessageReceiver const& receiver,
std::shared_ptr<Models::AmqpMessage> const& message)
= 0;
virtual void OnMessageReceiverDisconnected(Models::_internal::AmqpError const& error) = 0;
virtual void OnMessageReceiverDisconnected(
MessageReceiver const& receiver,
Models::_internal::AmqpError const& error)
= 0;
};

/** @brief MessageReceiver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
MessageSenderState newState,
MessageSenderState oldState)
= 0;
virtual void OnMessageSenderDisconnected(Models::_internal::AmqpError const& error) = 0;
virtual void OnMessageSenderDisconnected(
MessageSender const& sender,
Models::_internal::AmqpError const& error)
= 0;
};

struct MessageSenderOptions final
Expand Down Expand Up @@ -142,6 +145,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
*/
void Close(Context const& context = {});

/** @brief Gets the name of the underlying link.
*
* @return The name of the underlying link object.
*/
std::string GetLinkName() const;

/** @brief Returns the link negotiated maximum message size
*
* @return The negotiated maximum message size.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//
// This file contains protocol level definitions for the AMQP protocol.

#pragma once

#include "azure/core/amqp/internal/models/amqp_error.hpp"

#include <cstdint>

namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace _internal {
namespace Performatives {

/** Detach Performative (0x00000000:0x00000016)
*
* See
* https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-detach
* for more information.
*/
class AmqpDetach {
public:
AmqpDetach() = default;

uint32_t Handle{};
bool Closed{};
AmqpError Error;
};
}}}}}} // namespace Azure::Core::Amqp::Models::_internal::Performatives
Loading