diff --git a/src/Http/Wolverine.Http.Tests/broker_role_tests.cs b/src/Http/Wolverine.Http.Tests/broker_role_tests.cs new file mode 100644 index 000000000..4456eff8b --- /dev/null +++ b/src/Http/Wolverine.Http.Tests/broker_role_tests.cs @@ -0,0 +1,16 @@ +using Shouldly; +using Wolverine.Configuration; +using Wolverine.Http.Transport; +using Xunit; + +namespace Wolverine.Http.Tests; + +public class broker_role_tests +{ + [Fact] + public void http_endpoint_broker_role_is_route() + { + new HttpEndpoint(new Uri("http://localhost:5000/orders"), EndpointRole.Application) + .BrokerRole.ShouldBe("route"); + } +} diff --git a/src/Http/Wolverine.Http/Transport/HttpEndpoint.cs b/src/Http/Wolverine.Http/Transport/HttpEndpoint.cs index 74f420a1b..191d3dfcb 100644 --- a/src/Http/Wolverine.Http/Transport/HttpEndpoint.cs +++ b/src/Http/Wolverine.Http/Transport/HttpEndpoint.cs @@ -12,6 +12,7 @@ public class HttpEndpoint : Endpoint { public HttpEndpoint(Uri uri, EndpointRole role) : base(uri, role) { + BrokerRole = "route"; } internal bool SupportsNativeScheduledSend { get; set; } diff --git a/src/Persistence/MySql/MySqlTests/Transport/broker_role_tests.cs b/src/Persistence/MySql/MySqlTests/Transport/broker_role_tests.cs new file mode 100644 index 000000000..fb7628e92 --- /dev/null +++ b/src/Persistence/MySql/MySqlTests/Transport/broker_role_tests.cs @@ -0,0 +1,14 @@ +using Shouldly; +using Wolverine.MySql.Transport; +using Xunit; + +namespace MySqlTests.Transport; + +public class broker_role_tests +{ + [Fact] + public void mysql_queue_broker_role_is_queue() + { + new MySqlQueue("q", new MySqlTransport()).BrokerRole.ShouldBe("queue"); + } +} diff --git a/src/Persistence/MySql/Wolverine.MySql/Transport/MySqlQueue.cs b/src/Persistence/MySql/Wolverine.MySql/Transport/MySqlQueue.cs index d2c16bfe2..2640d1cc6 100644 --- a/src/Persistence/MySql/Wolverine.MySql/Transport/MySqlQueue.cs +++ b/src/Persistence/MySql/Wolverine.MySql/Transport/MySqlQueue.cs @@ -37,6 +37,7 @@ public MySqlQueue(string name, MySqlTransport parent, EndpointRole role = Endpoi Mode = EndpointMode.Durable; Name = name; EndpointName = name; + BrokerRole = "queue"; _queueTable = new Lazy(() => new QueueTable(Parent, queueTableName)); _scheduledMessageTable = diff --git a/src/Persistence/Oracle/OracleTests/Transport/broker_role_tests.cs b/src/Persistence/Oracle/OracleTests/Transport/broker_role_tests.cs new file mode 100644 index 000000000..67c27d43c --- /dev/null +++ b/src/Persistence/Oracle/OracleTests/Transport/broker_role_tests.cs @@ -0,0 +1,14 @@ +using Shouldly; +using Wolverine.Oracle.Transport; +using Xunit; + +namespace OracleTests.Transport; + +public class broker_role_tests +{ + [Fact] + public void oracle_queue_broker_role_is_queue() + { + new OracleQueue("q", new OracleTransport()).BrokerRole.ShouldBe("queue"); + } +} diff --git a/src/Persistence/Oracle/Wolverine.Oracle/Transport/OracleQueue.cs b/src/Persistence/Oracle/Wolverine.Oracle/Transport/OracleQueue.cs index 610573a77..ec726d954 100644 --- a/src/Persistence/Oracle/Wolverine.Oracle/Transport/OracleQueue.cs +++ b/src/Persistence/Oracle/Wolverine.Oracle/Transport/OracleQueue.cs @@ -37,6 +37,7 @@ public OracleQueue(string name, OracleTransport parent, EndpointRole role = Endp Mode = EndpointMode.Durable; Name = name; EndpointName = name; + BrokerRole = "queue"; _queueTable = new Lazy(() => new QueueTable(Parent, queueTableName)); _scheduledMessageTable = diff --git a/src/Persistence/PostgresqlTests/Transport/broker_role_tests.cs b/src/Persistence/PostgresqlTests/Transport/broker_role_tests.cs new file mode 100644 index 000000000..e46385e29 --- /dev/null +++ b/src/Persistence/PostgresqlTests/Transport/broker_role_tests.cs @@ -0,0 +1,14 @@ +using Shouldly; +using Wolverine.Postgresql.Transport; +using Xunit; + +namespace PostgresqlTests.Transport; + +public class broker_role_tests +{ + [Fact] + public void postgresql_queue_broker_role_is_queue() + { + new PostgresqlQueue("q", new PostgresqlTransport()).BrokerRole.ShouldBe("queue"); + } +} diff --git a/src/Persistence/SqlServerTests/Transport/broker_role_tests.cs b/src/Persistence/SqlServerTests/Transport/broker_role_tests.cs new file mode 100644 index 000000000..9a1f899d2 --- /dev/null +++ b/src/Persistence/SqlServerTests/Transport/broker_role_tests.cs @@ -0,0 +1,21 @@ +using IntegrationTests; +using Shouldly; +using Wolverine.RDBMS; +using Wolverine.SqlServer.Transport; +using Xunit; + +namespace SqlServerTests.Transport; + +public class broker_role_tests +{ + [Fact] + public void sql_server_queue_broker_role_is_queue() + { + var transport = new SqlServerTransport(new DatabaseSettings + { + ConnectionString = Servers.SqlServerConnectionString, + SchemaName = "transport" + }); + new SqlServerQueue("q", transport).BrokerRole.ShouldBe("queue"); + } +} diff --git a/src/Persistence/SqliteTests/Transport/broker_role_tests.cs b/src/Persistence/SqliteTests/Transport/broker_role_tests.cs new file mode 100644 index 000000000..2a2d3a6ad --- /dev/null +++ b/src/Persistence/SqliteTests/Transport/broker_role_tests.cs @@ -0,0 +1,14 @@ +using Shouldly; +using Wolverine.Sqlite.Transport; +using Xunit; + +namespace SqliteTests.Transport; + +public class broker_role_tests +{ + [Fact] + public void sqlite_queue_broker_role_is_queue() + { + new SqliteQueue("q", new SqliteTransport()).BrokerRole.ShouldBe("queue"); + } +} diff --git a/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueue.cs b/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueue.cs index 5fdcfe9cb..06d314ef3 100644 --- a/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueue.cs +++ b/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueue.cs @@ -37,8 +37,9 @@ public PostgresqlQueue(string name, PostgresqlTransport parent, EndpointRole rol Mode = EndpointMode.Durable; Name = name; EndpointName = name; + BrokerRole = "queue"; - // Gotta be lazy so the schema names get set + // Gotta be lazy so the schema names get set _queueTable = new Lazy(() => new QueueTable(Parent, queueTableName)); _scheduledMessageTable = new Lazy(() => new ScheduledMessageTable(Parent, scheduledTableName)); diff --git a/src/Persistence/Wolverine.Postgresql/Transport/TenantedPostgresqlQueue.cs b/src/Persistence/Wolverine.Postgresql/Transport/TenantedPostgresqlQueue.cs index fd18b2369..741612a71 100644 --- a/src/Persistence/Wolverine.Postgresql/Transport/TenantedPostgresqlQueue.cs +++ b/src/Persistence/Wolverine.Postgresql/Transport/TenantedPostgresqlQueue.cs @@ -18,6 +18,7 @@ public TenantedPostgresqlQueue(PostgresqlQueue parent, NpgsqlDataSource dataSour _parent = parent; _dataSource = dataSource; _databaseName = databaseName; + BrokerRole = "queue"; } public override async ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) diff --git a/src/Persistence/Wolverine.RDBMS/Transport/DatabaseControlEndpoint.cs b/src/Persistence/Wolverine.RDBMS/Transport/DatabaseControlEndpoint.cs index 49a5b4c2e..beb7d6840 100644 --- a/src/Persistence/Wolverine.RDBMS/Transport/DatabaseControlEndpoint.cs +++ b/src/Persistence/Wolverine.RDBMS/Transport/DatabaseControlEndpoint.cs @@ -17,6 +17,7 @@ public DatabaseControlEndpoint(DatabaseControlTransport parent, Guid nodeId) : b _parent = parent; Mode = EndpointMode.BufferedInMemory; MaxDegreeOfParallelism = 1; + BrokerRole = "queue"; // No otel for this one! TelemetryEnabled = false; diff --git a/src/Persistence/Wolverine.RDBMS/Transport/ExternalMessageTable.cs b/src/Persistence/Wolverine.RDBMS/Transport/ExternalMessageTable.cs index 287231ca4..68e2745ff 100644 --- a/src/Persistence/Wolverine.RDBMS/Transport/ExternalMessageTable.cs +++ b/src/Persistence/Wolverine.RDBMS/Transport/ExternalMessageTable.cs @@ -17,6 +17,7 @@ public ExternalMessageTable(DbObjectName tableName) : base(new Uri($"{ExternalDb IsListener = true; TableName = tableName; Mode = EndpointMode.Durable; + BrokerRole = "table"; } protected override bool supportsMode(EndpointMode mode) diff --git a/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueue.cs b/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueue.cs index 0cd8c9555..8775bbb86 100644 --- a/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueue.cs +++ b/src/Persistence/Wolverine.SqlServer/Transport/SqlServerQueue.cs @@ -42,6 +42,7 @@ public SqlServerQueue(string name, SqlServerTransport parent, EndpointRole role Mode = EndpointMode.Durable; Name = name; EndpointName = name; + BrokerRole = "queue"; // Gotta be lazy so the schema names get set _queueTable = new Lazy(() => new QueueTable(Parent, _queueTableName)); diff --git a/src/Persistence/Wolverine.Sqlite/Transport/SqliteQueue.cs b/src/Persistence/Wolverine.Sqlite/Transport/SqliteQueue.cs index 12231ebf0..c52fed14b 100644 --- a/src/Persistence/Wolverine.Sqlite/Transport/SqliteQueue.cs +++ b/src/Persistence/Wolverine.Sqlite/Transport/SqliteQueue.cs @@ -41,6 +41,7 @@ public SqliteQueue(string name, SqliteTransport parent, EndpointRole role = Endp Mode = EndpointMode.Durable; Name = name; EndpointName = name; + BrokerRole = "queue"; _queueTable = new Lazy(() => new QueueTable(Parent, queueTableName)); _scheduledMessageTable = diff --git a/src/Testing/CoreTests/Configuration/broker_role_tests.cs b/src/Testing/CoreTests/Configuration/broker_role_tests.cs new file mode 100644 index 000000000..1bac9618a --- /dev/null +++ b/src/Testing/CoreTests/Configuration/broker_role_tests.cs @@ -0,0 +1,75 @@ +using Shouldly; +using Wolverine.Configuration; +using Wolverine.Configuration.Capabilities; +using Wolverine.Transports.Local; +using Wolverine.Transports.SharedMemory; +using Wolverine.Transports.Stub; +using Wolverine.Transports.Tcp; +using Xunit; + +namespace CoreTests.Configuration; + +/// +/// Locks down GH-2601: every endpoint kind exposes a short, non-empty +/// string identifying the underlying +/// broker object kind ("queue", "exchange", "topic", "subscription", +/// "stream", etc.). CritterWatch and other diagnostic UIs read this. +/// +public class broker_role_tests +{ + [Fact] + public void base_default_when_subclass_does_not_set_broker_role() + { + // The base Endpoint default — should be a sentinel value, not empty, + // so a custom subclass that forgets to set BrokerRole still produces + // something a UI can render. + new TestEndpoint(EndpointRole.Application).BrokerRole.ShouldBe("endpoint"); + } + + [Theory] + [MemberData(nameof(CoreEndpoints))] + public void core_endpoint_has_expected_broker_role(Endpoint endpoint, string expectedRole) + { + endpoint.BrokerRole.ShouldBe(expectedRole); + } + + [Fact] + public void endpoint_descriptor_lifts_broker_role_and_endpoint_role() + { + // EndpointDescriptor is what CritterWatch reads. Both the existing + // EndpointRole (System / Application) and the new BrokerRole string + // should be first-class properties on the descriptor, populated from + // the underlying Endpoint. See GH-2601. + var queue = new LocalQueue("queue-x"); + var descriptor = new EndpointDescriptor(queue); + + descriptor.BrokerRole.ShouldBe("queue"); + descriptor.EndpointRole.ShouldBe(EndpointRole.Application); + } + + [Fact] + public void endpoint_descriptor_endpoint_role_reflects_system_endpoints() + { + var systemSubscription = new SharedMemorySubscription( + new SharedMemoryTopic("topic-y"), "sub-y", EndpointRole.System); + var descriptor = new EndpointDescriptor(systemSubscription); + + descriptor.EndpointRole.ShouldBe(EndpointRole.System); + descriptor.BrokerRole.ShouldBe("subscription"); + } + + public static TheoryData CoreEndpoints() + { + var stubTransport = new StubTransport(); + var sharedTopic = new SharedMemoryTopic("topic-x"); + + return new TheoryData + { + { new LocalQueue("queue-x"), "queue" }, + { new StubEndpoint("stub-x", stubTransport), "stub" }, + { new TcpEndpoint("localhost", 2222), "socket" }, + { sharedTopic, "topic" }, + { new SharedMemorySubscription(sharedTopic, "sub-x", EndpointRole.Application), "subscription" }, + }; + } +} diff --git a/src/Transports/AWS/Wolverine.AmazonSns.Tests/broker_role_tests.cs b/src/Transports/AWS/Wolverine.AmazonSns.Tests/broker_role_tests.cs new file mode 100644 index 000000000..5fc390237 --- /dev/null +++ b/src/Transports/AWS/Wolverine.AmazonSns.Tests/broker_role_tests.cs @@ -0,0 +1,14 @@ +using Shouldly; +using Wolverine.AmazonSns.Internal; +using Xunit; + +namespace Wolverine.AmazonSns.Tests; + +public class broker_role_tests +{ + [Fact] + public void sns_topic_broker_role_is_topic() + { + new AmazonSnsTopic("t", new AmazonSnsTransport()).BrokerRole.ShouldBe("topic"); + } +} diff --git a/src/Transports/AWS/Wolverine.AmazonSns/Internal/AmazonSnsTopic.cs b/src/Transports/AWS/Wolverine.AmazonSns/Internal/AmazonSnsTopic.cs index a8e7946cb..905b35349 100644 --- a/src/Transports/AWS/Wolverine.AmazonSns/Internal/AmazonSnsTopic.cs +++ b/src/Transports/AWS/Wolverine.AmazonSns/Internal/AmazonSnsTopic.cs @@ -21,13 +21,14 @@ internal AmazonSnsTopic(string topicName, AmazonSnsTransport parent) : base(new Uri($"{AmazonSnsTransport.SnsProtocol}://{topicName}"), EndpointRole.Application) { Parent = parent; - + TopicName = topicName; EndpointName = topicName; TopicArn = string.Empty; + BrokerRole = "topic"; Configuration = new CreateTopicRequest(TopicName); - + MessageBatchSize = 10; } diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/broker_role_tests.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/broker_role_tests.cs new file mode 100644 index 000000000..345d7eb0c --- /dev/null +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/broker_role_tests.cs @@ -0,0 +1,14 @@ +using Shouldly; +using Wolverine.AmazonSqs.Internal; +using Xunit; + +namespace Wolverine.AmazonSqs.Tests; + +public class broker_role_tests +{ + [Fact] + public void sqs_queue_broker_role_is_queue() + { + new AmazonSqsQueue("q", new AmazonSqsTransport()).BrokerRole.ShouldBe("queue"); + } +} diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs index 2ba62099e..1959d5ec6 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/Internal/AmazonSqsQueue.cs @@ -30,6 +30,7 @@ internal AmazonSqsQueue(string queueName, AmazonSqsTransport parent) : base( _parent = parent; QueueName = queueName; EndpointName = queueName; + BrokerRole = "queue"; Configuration = new CreateQueueRequest(QueueName); diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/broker_role_tests.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/broker_role_tests.cs new file mode 100644 index 000000000..a1962ca4c --- /dev/null +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/broker_role_tests.cs @@ -0,0 +1,31 @@ +using Shouldly; +using Wolverine.AzureServiceBus.Internal; +using Xunit; + +namespace Wolverine.AzureServiceBus.Tests; + +// Locks down GH-2601 for the Azure Service Bus endpoints. +public class broker_role_tests +{ + [Fact] + public void queue_broker_role_is_queue() + { + var transport = new AzureServiceBusTransport(); + new AzureServiceBusQueue(transport, "q").BrokerRole.ShouldBe("queue"); + } + + [Fact] + public void topic_broker_role_is_topic() + { + var transport = new AzureServiceBusTransport(); + new AzureServiceBusTopic(transport, "t").BrokerRole.ShouldBe("topic"); + } + + [Fact] + public void subscription_broker_role_is_subscription() + { + var transport = new AzureServiceBusTransport(); + var topic = new AzureServiceBusTopic(transport, "t"); + new AzureServiceBusSubscription(transport, topic, "s").BrokerRole.ShouldBe("subscription"); + } +} diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusQueue.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusQueue.cs index f80e6cbf8..6a04944a1 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusQueue.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusQueue.cs @@ -35,6 +35,7 @@ public AzureServiceBusQueue(AzureServiceBusTransport parent, string queueName, { DeadLetteringOnMessageExpiration = false }; + BrokerRole = "queue"; } [ChildDescription] diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSubscription.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSubscription.cs index e95bbad49..dceb12739 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSubscription.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSubscription.cs @@ -40,6 +40,7 @@ public AzureServiceBusSubscription(AzureServiceBusTransport parent, AzureService // This is the same rule as the one used if you // use CreateSubscriptionAsync() without specifying a rule RuleOptions = new CreateRuleOptions(); + BrokerRole = "subscription"; } [ChildDescription] diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusTopic.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusTopic.cs index 85059448c..82966cc01 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusTopic.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusTopic.cs @@ -30,6 +30,7 @@ public AzureServiceBusTopic(AzureServiceBusTransport parent, string topicName) : TopicName = EndpointName = topicName ?? throw new ArgumentNullException(nameof(topicName)); Options = new CreateTopicOptions(TopicName); + BrokerRole = "topic"; } public string TopicName { get; } diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/broker_role_tests.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/broker_role_tests.cs new file mode 100644 index 000000000..b359f32a1 --- /dev/null +++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/broker_role_tests.cs @@ -0,0 +1,23 @@ +using Google.Api.Gax; +using Google.Cloud.PubSub.V1; +using NSubstitute; +using Shouldly; +using Xunit; + +namespace Wolverine.Pubsub.Tests; + +public class broker_role_tests +{ + [Fact] + public void pubsub_endpoint_broker_role_is_pubsub() + { + var transport = new PubsubTransport("wolverine") + { + PublisherApiClient = Substitute.For(), + SubscriberApiClient = Substitute.For(), + EmulatorDetection = EmulatorDetection.EmulatorOnly + }; + + new PubsubEndpoint("foo", transport).BrokerRole.ShouldBe("pubsub"); + } +} diff --git a/src/Transports/GCP/Wolverine.Pubsub/PubsubEndpoint.cs b/src/Transports/GCP/Wolverine.Pubsub/PubsubEndpoint.cs index e5113f310..f6694669a 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/PubsubEndpoint.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/PubsubEndpoint.cs @@ -55,6 +55,7 @@ public PubsubEndpoint( : topicName ); EndpointName = topicName; + BrokerRole = "pubsub"; if (transport.DeadLetter.Enabled) { diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/broker_role_tests.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/broker_role_tests.cs new file mode 100644 index 000000000..111eda326 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/broker_role_tests.cs @@ -0,0 +1,15 @@ +using Shouldly; +using Wolverine.Configuration; +using Wolverine.Kafka.Internals; +using Xunit; + +namespace Wolverine.Kafka.Tests; + +public class broker_role_tests +{ + [Fact] + public void kafka_topic_broker_role_is_topic() + { + new KafkaTopic(new KafkaTransport(), "t", EndpointRole.Application).BrokerRole.ShouldBe("topic"); + } +} diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs index dd7359781..9ff639937 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs @@ -24,6 +24,7 @@ public KafkaTopic(KafkaTransport parent, string topicName, EndpointRole role) : Parent = parent; EndpointName = topicName; TopicName = topicName; + BrokerRole = "topic"; Specification.Name = topicName; } diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/broker_role_tests.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/broker_role_tests.cs new file mode 100644 index 000000000..383a057c7 --- /dev/null +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/broker_role_tests.cs @@ -0,0 +1,16 @@ +using Shouldly; +using Wolverine.Configuration; +using Wolverine.MQTT.Internals; +using Xunit; + +namespace Wolverine.MQTT.Tests; + +public class broker_role_tests +{ + [Fact] + public void mqtt_topic_broker_role_is_topic() + { + new MqttTopic("orders/created", new MqttTransport(), EndpointRole.Application) + .BrokerRole.ShouldBe("topic"); + } +} diff --git a/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs b/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs index 1702038b8..aad68582b 100644 --- a/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs +++ b/src/Transports/MQTT/Wolverine.MQTT/MqttTopic.cs @@ -38,6 +38,7 @@ public MqttTopic(string topicName, MqttTransport parent, EndpointRole role) : ba } EndpointName = topicName; + BrokerRole = "topic"; EnvelopeMapper = new MqttEnvelopeMapper(this); Mode = EndpointMode.BufferedInMemory; diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/broker_role_tests.cs b/src/Transports/NATS/Wolverine.Nats.Tests/broker_role_tests.cs new file mode 100644 index 000000000..3261ce2da --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/broker_role_tests.cs @@ -0,0 +1,31 @@ +using Shouldly; +using Wolverine.Configuration; +using Wolverine.Nats.Internal; +using Xunit; + +namespace Wolverine.Nats.Tests; + +// Locks down GH-2601 for NATS. Unlike other transports, NatsEndpoint +// computes BrokerRole at runtime: Core NATS surfaces a "subject" while +// JetStream surfaces a "stream". Toggling the UseJetStream flag must +// flip the reported role without reconstruction. +public class broker_role_tests +{ + [Fact] + public void core_nats_endpoint_broker_role_is_subject() + { + new NatsEndpoint("orders.created", new NatsTransport(), EndpointRole.Application) + .BrokerRole.ShouldBe("subject"); + } + + [Fact] + public void jetstream_endpoint_broker_role_is_stream() + { + var endpoint = new NatsEndpoint("orders.created", new NatsTransport(), EndpointRole.Application) + { + UseJetStream = true + }; + + endpoint.BrokerRole.ShouldBe("stream"); + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs index abbb9b177..8261697e1 100644 --- a/src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs @@ -27,6 +27,14 @@ public NatsEndpoint(string subject, NatsTransport transport, EndpointRole role) Mode = EndpointMode.BufferedInMemory; } + /// + /// NATS plays both roles: Core NATS surfaces a "subject", while JetStream surfaces + /// a "stream". The choice is configuration-driven () so + /// it can change after construction — compute it on access rather than fixing it + /// in the constructor. See GH-2601. + /// + public override string BrokerRole => UseJetStream ? "stream" : "subject"; + public string Subject { get; } [IgnoreDescription] public object? NatsSerializer { get; set; } diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/broker_role_tests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/broker_role_tests.cs new file mode 100644 index 000000000..8e0bfb4a6 --- /dev/null +++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/broker_role_tests.cs @@ -0,0 +1,15 @@ +using Shouldly; +using Xunit; + +namespace Wolverine.Pulsar.Tests; + +public class broker_role_tests +{ + [Fact] + public void pulsar_endpoint_broker_role_is_topic() + { + var transport = new PulsarTransport(); + new PulsarEndpoint(new Uri("pulsar://persistent/public/default/sample"), transport) + .BrokerRole.ShouldBe("topic"); + } +} diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs index ba9bfec81..27b6eb628 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarEndpoint.cs @@ -20,6 +20,7 @@ public PulsarEndpoint(Uri uri, PulsarTransport parent) : base(uri, EndpointRole. { _parent = parent; Parse(uri); + BrokerRole = "topic"; } protected override PulsarEnvelopeMapper buildMapper(IWolverineRuntime runtime) diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/broker_role_tests.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/broker_role_tests.cs new file mode 100644 index 000000000..86ed0512b --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/broker_role_tests.cs @@ -0,0 +1,39 @@ +using Shouldly; +using Wolverine.Configuration; +using Wolverine.RabbitMQ.Internal; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests; + +// Locks down GH-2601: every concrete RabbitMQ endpoint type reports the +// expected BrokerRole the CritterWatch UI expects. +public class broker_role_tests +{ + [Fact] + public void rabbitmq_queue_broker_role_is_queue() + { + new RabbitMqQueue("q", new RabbitMqTransport()).BrokerRole.ShouldBe("queue"); + } + + [Fact] + public void rabbitmq_exchange_broker_role_is_exchange() + { + new RabbitMqExchange("ex", new RabbitMqTransport()).BrokerRole.ShouldBe("exchange"); + } + + [Fact] + public void rabbitmq_topic_endpoint_broker_role_is_topic() + { + var transport = new RabbitMqTransport(); + var exchange = new RabbitMqExchange("ex", transport); + new RabbitMqTopicEndpoint("t", exchange, transport).BrokerRole.ShouldBe("topic"); + } + + [Fact] + public void rabbitmq_routing_broker_role_is_exchange() + { + var transport = new RabbitMqTransport(); + var exchange = new RabbitMqExchange("ex", transport); + new RabbitMqRouting(exchange, "rk", transport).BrokerRole.ShouldBe("exchange"); + } +} diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqExchange.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqExchange.cs index bc699b053..b6e727578 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqExchange.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqExchange.cs @@ -25,6 +25,7 @@ internal RabbitMqExchange(string name, RabbitMqTransport parent) ExchangeName = name; EndpointName = name; + BrokerRole = "exchange"; Topics = new(topic => new RabbitMqTopicEndpoint(topic, this, _parent)); Routings = new LightweightCache(key => new RabbitMqRouting(this, key, _parent)); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqQueue.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqQueue.cs index d4bcd1056..7bf9ee3da 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqQueue.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqQueue.cs @@ -41,7 +41,8 @@ internal RabbitMqQueue(string queueName, RabbitMqTransport parent, EndpointRole _parent = parent; QueueName = EndpointName = queueName; Mode = EndpointMode.Inline; - + BrokerRole = "queue"; + if (Role == EndpointRole.Application && QueueName != _parent.DeadLetterQueue.QueueName) { DeadLetterQueue = _parent.DeadLetterQueue.Clone(); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqRouting.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqRouting.cs index 829f99814..66fe8f6b0 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqRouting.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqRouting.cs @@ -21,6 +21,7 @@ public RabbitMqRouting(RabbitMqExchange exchange, string routingKey, RabbitMqTra _routingKey = routingKey; ExchangeName = _exchange.ExchangeName; + BrokerRole = "exchange"; } public override ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqTopicEndpoint.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqTopicEndpoint.cs index afbf602fe..8ce8e9cca 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqTopicEndpoint.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqTopicEndpoint.cs @@ -15,6 +15,7 @@ public RabbitMqTopicEndpoint(string topicName, RabbitMqExchange exchange, Rabbit Exchange = exchange; ExchangeName = Exchange.Name; + BrokerRole = "topic"; } public RabbitMqExchange Exchange { get; } diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/broker_role_tests.cs b/src/Transports/Redis/Wolverine.Redis.Tests/broker_role_tests.cs new file mode 100644 index 000000000..1ed70336c --- /dev/null +++ b/src/Transports/Redis/Wolverine.Redis.Tests/broker_role_tests.cs @@ -0,0 +1,17 @@ +using Shouldly; +using Wolverine.Configuration; +using Wolverine.Redis.Internal; +using Xunit; + +namespace Wolverine.Redis.Tests; + +public class broker_role_tests +{ + [Fact] + public void redis_stream_endpoint_broker_role_is_stream() + { + var transport = new RedisTransport(); + new RedisStreamEndpoint(new Uri("redis://stream/0/sample"), transport, EndpointRole.Application) + .BrokerRole.ShouldBe("stream"); + } +} diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamEndpoint.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamEndpoint.cs index 59be95c26..220dfef80 100644 --- a/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamEndpoint.cs +++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamEndpoint.cs @@ -24,7 +24,8 @@ internal RedisStreamEndpoint(Uri uri, RedisTransport transport, EndpointRole rol DatabaseId = databaseId; ConsumerGroup = ParseConsumerGroup(uri); EndpointName = StreamKey; - + BrokerRole = "stream"; + // Redis Streams work well in buffered mode by default Mode = EndpointMode.BufferedInMemory; } diff --git a/src/Transports/SignalR/Wolverine.SignalR.Tests/broker_role_tests.cs b/src/Transports/SignalR/Wolverine.SignalR.Tests/broker_role_tests.cs new file mode 100644 index 000000000..aa7364c2c --- /dev/null +++ b/src/Transports/SignalR/Wolverine.SignalR.Tests/broker_role_tests.cs @@ -0,0 +1,23 @@ +using Shouldly; +using Wolverine.SignalR.Client; +using Wolverine.SignalR.Internals; +using Xunit; + +namespace Wolverine.SignalR.Tests; + +public class broker_role_tests +{ + [Fact] + public void signalr_transport_broker_role_is_hub() + { + new SignalRTransport().BrokerRole.ShouldBe("hub"); + } + + [Fact] + public void signalr_client_endpoint_broker_role_is_hub() + { + var clientTransport = new SignalRClientTransport(); + new SignalRClientEndpoint(new Uri("https://localhost:5000/hub"), clientTransport) + .BrokerRole.ShouldBe("hub"); + } +} diff --git a/src/Transports/SignalR/Wolverine.SignalR/Client/SignalRClientEndpoint.cs b/src/Transports/SignalR/Wolverine.SignalR/Client/SignalRClientEndpoint.cs index e9c690853..bb5f5b203 100644 --- a/src/Transports/SignalR/Wolverine.SignalR/Client/SignalRClientEndpoint.cs +++ b/src/Transports/SignalR/Wolverine.SignalR/Client/SignalRClientEndpoint.cs @@ -31,6 +31,7 @@ public SignalRClientEndpoint(Uri uri, SignalRClientTransport parent) : base(Tran SignalRUri = uri; IsListener = true; + BrokerRole = "hub"; Mode = EndpointMode.Inline; diff --git a/src/Transports/SignalR/Wolverine.SignalR/Internals/SignalRTransport.cs b/src/Transports/SignalR/Wolverine.SignalR/Internals/SignalRTransport.cs index 721413c23..1fdad34e4 100644 --- a/src/Transports/SignalR/Wolverine.SignalR/Internals/SignalRTransport.cs +++ b/src/Transports/SignalR/Wolverine.SignalR/Internals/SignalRTransport.cs @@ -24,6 +24,7 @@ public class SignalRTransport : Endpoint, ITransport, IListener, ISender public SignalRTransport() : base($"{ProtocolName}://wolverine".ToUri(), EndpointRole.Application) { IsListener = true; + BrokerRole = "hub"; #region sample_signalr_default_json_configuration JsonOptions = new(JsonSerializerOptions.Web) { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; diff --git a/src/Wolverine.Grpc.Tests/broker_role_tests.cs b/src/Wolverine.Grpc.Tests/broker_role_tests.cs new file mode 100644 index 000000000..50977748d --- /dev/null +++ b/src/Wolverine.Grpc.Tests/broker_role_tests.cs @@ -0,0 +1,13 @@ +using Shouldly; +using Xunit; + +namespace Wolverine.Grpc.Tests; + +public class broker_role_tests +{ + [Fact] + public void grpc_endpoint_broker_role_is_grpc() + { + new GrpcEndpoint(new Uri("grpc://localhost:5001")).BrokerRole.ShouldBe("grpc"); + } +} diff --git a/src/Wolverine.Grpc/GrpcEndpoint.cs b/src/Wolverine.Grpc/GrpcEndpoint.cs index 0aa8faa91..497032fbe 100644 --- a/src/Wolverine.Grpc/GrpcEndpoint.cs +++ b/src/Wolverine.Grpc/GrpcEndpoint.cs @@ -13,6 +13,7 @@ public GrpcEndpoint(Uri uri) : base(uri, EndpointRole.Application) { Host = uri.Host; Port = uri.IsDefaultPort ? 5000 : uri.Port; + BrokerRole = "grpc"; } public string Host { get; } diff --git a/src/Wolverine/Configuration/Capabilities/EndpointDescriptor.cs b/src/Wolverine/Configuration/Capabilities/EndpointDescriptor.cs index e5a9f6a62..a42e07456 100644 --- a/src/Wolverine/Configuration/Capabilities/EndpointDescriptor.cs +++ b/src/Wolverine/Configuration/Capabilities/EndpointDescriptor.cs @@ -34,6 +34,8 @@ public EndpointDescriptor(Endpoint endpoint) : base(endpoint) InteropMode = ResolveInteropMode(endpoint); IsSystemEndpoint = endpoint.Uri?.ToString().Contains("wolverine.response", StringComparison.OrdinalIgnoreCase) == true || endpoint.Uri?.Scheme.Equals("local", StringComparison.OrdinalIgnoreCase) == true; + EndpointRole = endpoint.Role; + BrokerRole = endpoint.BrokerRole; } public Uri Uri { get; set; } = null!; @@ -59,6 +61,23 @@ public EndpointDescriptor(Endpoint endpoint) : base(endpoint) /// public bool IsSystemEndpoint { get; init; } + /// + /// Whether the endpoint is owned by Wolverine itself (System) or by the + /// application (Application). Lifted from so + /// CritterWatch and other UIs can filter system-owned endpoints (e.g., reply + /// queues, control queues) without having to crack the underlying URI. See GH-2601. + /// + public EndpointRole EndpointRole { get; init; } + + /// + /// Short, human-readable name of the underlying broker object kind this endpoint + /// represents — "queue", "exchange", "topic", + /// "subscription", "stream", etc. Lifted from + /// ; see that property for the full per-transport + /// mapping. See GH-2601. + /// + public string? BrokerRole { get; init; } + internal static string? ResolveInteropMode(Endpoint endpoint) { return ResolveInteropMode(endpoint.DefaultSerializer?.GetType().Name); diff --git a/src/Wolverine/Configuration/Endpoint.cs b/src/Wolverine/Configuration/Endpoint.cs index 9ad873694..f0fdc108d 100644 --- a/src/Wolverine/Configuration/Endpoint.cs +++ b/src/Wolverine/Configuration/Endpoint.cs @@ -184,6 +184,17 @@ protected Endpoint(Uri uri, EndpointRole role) EndpointName = uri.ToString(); } + /// + /// Short, human-readable name of the underlying broker object kind this endpoint + /// represents — e.g. "queue", "exchange", "topic", + /// "subscription", "stream". Each transport-specific subclass sets + /// this value in its constructor; transports whose role is only knowable at + /// runtime (e.g. NatsEndpoint choosing between Core subject and + /// JetStream stream) override the property. Surfaced to CritterWatch and + /// other diagnostic UIs to drive endpoint display. See GH-2601. + /// + public virtual string BrokerRole { get; protected set; } = "endpoint"; + /// /// Controls the maximum number of messages that could be processed at one time. /// Default is the greater of Environment.ProcessorCount or 5. Setting this to 1 makes this listening endpoint diff --git a/src/Wolverine/Transports/Local/LocalQueue.cs b/src/Wolverine/Transports/Local/LocalQueue.cs index 59cc770c3..ee8f2791e 100644 --- a/src/Wolverine/Transports/Local/LocalQueue.cs +++ b/src/Wolverine/Transports/Local/LocalQueue.cs @@ -12,6 +12,7 @@ public class LocalQueue : Endpoint public LocalQueue(string name) : base($"local://{name}".ToUri(), EndpointRole.Application) { EndpointName = name.ToLowerInvariant(); + BrokerRole = "queue"; } internal List HandledMessageTypes { get; } = new(); diff --git a/src/Wolverine/Transports/SharedMemory/SharedMemorySubscription.cs b/src/Wolverine/Transports/SharedMemory/SharedMemorySubscription.cs index d4d68b881..e2abf899b 100644 --- a/src/Wolverine/Transports/SharedMemory/SharedMemorySubscription.cs +++ b/src/Wolverine/Transports/SharedMemory/SharedMemorySubscription.cs @@ -17,6 +17,7 @@ public SharedMemorySubscription(SharedMemoryTopic parent, string name, EndpointR Parent = parent; Name = name; IsListener = true; + BrokerRole = "subscription"; } public override ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver) diff --git a/src/Wolverine/Transports/SharedMemory/SharedMemoryTopic.cs b/src/Wolverine/Transports/SharedMemory/SharedMemoryTopic.cs index b8ba0e9f5..dad7e2e36 100644 --- a/src/Wolverine/Transports/SharedMemory/SharedMemoryTopic.cs +++ b/src/Wolverine/Transports/SharedMemory/SharedMemoryTopic.cs @@ -26,6 +26,8 @@ public SharedMemoryTopic(string topicName, EndpointRole role) : base(new Uri($"{ TopicSubscriptions[topicName] = new SharedMemorySubscription(this, topicName, EndpointRole.System); } + BrokerRole = "topic"; + // Placeholder ReplyUri = Uri; } diff --git a/src/Wolverine/Transports/Stub/StubEndpoint.cs b/src/Wolverine/Transports/Stub/StubEndpoint.cs index 9f60d4b4b..fbc058f75 100644 --- a/src/Wolverine/Transports/Stub/StubEndpoint.cs +++ b/src/Wolverine/Transports/Stub/StubEndpoint.cs @@ -25,6 +25,7 @@ public StubEndpoint(string queueName, StubTransport stubTransport) : base($"stub _stubTransport = stubTransport; Agent = this; EndpointName = queueName; + BrokerRole = "stub"; } public ValueTask StopAsync() diff --git a/src/Wolverine/Transports/Tcp/TcpEndpoint.cs b/src/Wolverine/Transports/Tcp/TcpEndpoint.cs index 273580075..26d2393cf 100644 --- a/src/Wolverine/Transports/Tcp/TcpEndpoint.cs +++ b/src/Wolverine/Transports/Tcp/TcpEndpoint.cs @@ -24,6 +24,7 @@ public TcpEndpoint(string hostName, int port) : base(ToUri(port, hostName), Endp // ReSharper disable once VirtualMemberCallInConstructor EndpointName = Uri.ToString(); + BrokerRole = "socket"; } public string HostName { get; }