diff --git a/Libraries/Opc.Ua.Server/NodeManager/MasterNodeManager.cs b/Libraries/Opc.Ua.Server/NodeManager/MasterNodeManager.cs
index e66398d4c..493ec5a39 100644
--- a/Libraries/Opc.Ua.Server/NodeManager/MasterNodeManager.cs
+++ b/Libraries/Opc.Ua.Server/NodeManager/MasterNodeManager.cs
@@ -414,9 +414,9 @@ await nodeManager.SessionClosingAsync(context, sessionId, deleteSubscriptions, c
///
/// Shuts down the node managers.
///
- public virtual async ValueTask ShutdownAsync()
+ public virtual async ValueTask ShutdownAsync(CancellationToken cancellationToken = default)
{
- await m_startupShutdownSemaphoreSlim.WaitAsync().ConfigureAwait(false);
+ await m_startupShutdownSemaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
@@ -427,7 +427,7 @@ public virtual async ValueTask ShutdownAsync()
foreach (IAsyncNodeManager nodeManager in m_nodeManagers)
{
- await nodeManager.DeleteAddressSpaceAsync()
+ await nodeManager.DeleteAddressSpaceAsync(cancellationToken)
.ConfigureAwait(false);
}
}
@@ -2406,20 +2406,6 @@ await nodeManager.HistoryUpdateAsync(
return (results, diagnosticInfos);
}
- ///
- /// Calls a method defined on an object.
- ///
- public virtual void Call(
- OperationContext context,
- CallMethodRequestCollection methodsToCall,
- out CallMethodResultCollection results,
- out DiagnosticInfoCollection diagnosticInfos)
- {
- (results, diagnosticInfos) = CallAsync(
- context,
- methodsToCall).AsTask().GetAwaiter().GetResult();
- }
-
///
/// Calls a method defined on an object.
///
@@ -2537,18 +2523,6 @@ await nodeManager.CallAsync(
return (results, diagnosticInfos);
}
- ///
- /// Calls a method defined on an object.
- ///
- public virtual void ConditionRefresh(
- OperationContext context,
- IList monitoredItems)
- {
- ConditionRefreshAsync(
- context,
- monitoredItems).AsTask().GetAwaiter().GetResult();
- }
-
///
/// Handles condition refresh request.
///
@@ -2880,22 +2854,6 @@ await manager.SubscribeToAllEventsAsync(
}
}
- ///
- /// Restore a set of monitored items after a Server Restart.
- ///
- /// is null.
- ///
- public virtual void RestoreMonitoredItems(
- IList itemsToRestore,
- IList monitoredItems,
- IUserIdentity savedOwnerIdentity)
- {
- RestoreMonitoredItemsAsync(
- itemsToRestore,
- monitoredItems,
- savedOwnerIdentity).AsTask().GetAwaiter().GetResult();
- }
-
///
/// Restore a set of monitored items after a Server Restart.
///
@@ -3448,23 +3406,6 @@ await nodeManager.SubscribeToAllEventsAsync(
}
}
- ///
- /// Changes the monitoring mode for a set of items.
- ///
- /// is null.
- public virtual void SetMonitoringMode(
- OperationContext context,
- MonitoringMode monitoringMode,
- IList itemsToModify,
- IList errors)
- {
- SetMonitoringModeAsync(
- context,
- monitoringMode,
- itemsToModify,
- errors).AsTask().GetAwaiter().GetResult();
- }
-
///
/// Changes the monitoring mode for a set of items.
///
diff --git a/Libraries/Opc.Ua.Server/Server/StandardServer.cs b/Libraries/Opc.Ua.Server/Server/StandardServer.cs
index 27508e092..f0c3b4cf0 100644
--- a/Libraries/Opc.Ua.Server/Server/StandardServer.cs
+++ b/Libraries/Opc.Ua.Server/Server/StandardServer.cs
@@ -1001,25 +1001,27 @@ await ServerInternal.CloseSessionAsync(context, context.Session.Id, deleteSubscr
/// The secure channel context.
/// The request header.
/// The request handle assigned to the request.
- /// The number of cancelled requests.
+ /// The cancellation token.
///
- /// Returns a object
+ /// Returns a object
///
- public override ResponseHeader Cancel(
+ public override Task CancelAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
uint requestHandle,
- out uint cancelCount)
+ CancellationToken ct)
{
- cancelCount = 0;
-
OperationContext context = ValidateRequest(secureChannelContext, requestHeader, RequestType.Cancel);
try
{
- m_serverInternal.RequestManager.CancelRequests(requestHandle, out cancelCount);
+ m_serverInternal.RequestManager.CancelRequests(requestHandle, out uint cancelCount);
- return CreateResponse(requestHeader, context.StringTable);
+ return Task.FromResult(new CancelResponse
+ {
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable),
+ CancelCount = cancelCount
+ });
}
catch (ServiceResultException e)
{
@@ -1151,18 +1153,16 @@ await m_serverInternal.NodeManager.BrowseNextAsync(
/// The secure channel context.
/// The request header.
/// The list of NodeIds to register.
- /// The list of NodeIds identifying the registered nodes.
+ /// The cancellation token.
///
- /// Returns a object
+ /// Returns a object
///
- public override ResponseHeader RegisterNodes(
+ public override Task RegisterNodesAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
NodeIdCollection nodesToRegister,
- out NodeIdCollection registeredNodeIds)
+ CancellationToken ct)
{
- registeredNodeIds = null;
-
OperationContext context = ValidateRequest(secureChannelContext, requestHeader, RequestType.RegisterNodes);
try
@@ -1170,9 +1170,13 @@ public override ResponseHeader RegisterNodes(
ValidateOperationLimits(nodesToRegister, OperationLimits.MaxNodesPerRegisterNodes);
m_serverInternal.NodeManager
- .RegisterNodes(context, nodesToRegister, out registeredNodeIds);
+ .RegisterNodes(context, nodesToRegister, out NodeIdCollection registeredNodeIds);
- return CreateResponse(requestHeader, context.StringTable);
+ return Task.FromResult(new RegisterNodesResponse
+ {
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable),
+ RegisteredNodeIds = registeredNodeIds
+ });
}
catch (ServiceResultException e)
{
@@ -1200,13 +1204,15 @@ public override ResponseHeader RegisterNodes(
/// The secure channel context.
/// The request header.
/// The list of NodeIds to unregister
+ /// The cancellation token.
///
- /// Returns a object
+ /// Returns a object
///
- public override ResponseHeader UnregisterNodes(
+ public override Task UnregisterNodesAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
- NodeIdCollection nodesToUnregister)
+ NodeIdCollection nodesToUnregister,
+ CancellationToken ct)
{
OperationContext context = ValidateRequest(secureChannelContext, requestHeader, RequestType.UnregisterNodes);
@@ -1218,7 +1224,10 @@ public override ResponseHeader UnregisterNodes(
m_serverInternal.NodeManager.UnregisterNodes(context, nodesToUnregister);
- return CreateResponse(requestHeader, context.StringTable);
+ return Task.FromResult(new UnregisterNodesResponse
+ {
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable)
+ });
}
catch (ServiceResultException e)
{
@@ -1529,14 +1538,11 @@ public override async Task HistoryUpdateAsync(
/// The maximum number of notifications that the Client wishes to receive in a single Publish response.
/// If set to true publishing is enabled for the Subscription.
/// The relative priority of the Subscription.
- /// The Server-assigned identifier for the Subscription.
- /// The actual publishing interval that the Server will use.
- /// The revised lifetime count.
- /// The revised max keep alive count.
+ /// The cancellation token.
///
- /// Returns a object
+ /// Returns a object
///
- public override ResponseHeader CreateSubscription(
+ public override Task CreateSubscriptionAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
double requestedPublishingInterval,
@@ -1545,10 +1551,7 @@ public override ResponseHeader CreateSubscription(
uint maxNotificationsPerPublish,
bool publishingEnabled,
byte priority,
- out uint subscriptionId,
- out double revisedPublishingInterval,
- out uint revisedLifetimeCount,
- out uint revisedMaxKeepAliveCount)
+ CancellationToken ct)
{
OperationContext context = ValidateRequest(
secureChannelContext,
@@ -1565,12 +1568,19 @@ public override ResponseHeader CreateSubscription(
maxNotificationsPerPublish,
publishingEnabled,
priority,
- out subscriptionId,
- out revisedPublishingInterval,
- out revisedLifetimeCount,
- out revisedMaxKeepAliveCount);
+ out uint subscriptionId,
+ out double revisedPublishingInterval,
+ out uint revisedLifetimeCount,
+ out uint revisedMaxKeepAliveCount);
- return CreateResponse(requestHeader, context.StringTable);
+ return Task.FromResult(new CreateSubscriptionResponse
+ {
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable),
+ SubscriptionId = subscriptionId,
+ RevisedPublishingInterval = revisedPublishingInterval,
+ RevisedLifetimeCount = revisedLifetimeCount,
+ RevisedMaxKeepAliveCount = revisedMaxKeepAliveCount
+ });
}
catch (ServiceResultException e)
{
@@ -1599,19 +1609,14 @@ public override ResponseHeader CreateSubscription(
/// The request header.
/// The list of Subscriptions to transfer.
/// If the initial values should be sent.
- /// The list of result StatusCodes for the Subscriptions to transfer.
- /// The diagnostic information for the results.
- public override ResponseHeader TransferSubscriptions(
+ /// The cancellation token.
+ public override Task TransferSubscriptionsAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
UInt32Collection subscriptionIds,
bool sendInitialValues,
- out TransferResultCollection results,
- out DiagnosticInfoCollection diagnosticInfos)
+ CancellationToken ct)
{
- results = null;
- diagnosticInfos = null;
-
OperationContext context = ValidateRequest(
secureChannelContext,
requestHeader,
@@ -1625,10 +1630,15 @@ public override ResponseHeader TransferSubscriptions(
context,
subscriptionIds,
sendInitialValues,
- out results,
- out diagnosticInfos);
+ out TransferResultCollection results,
+ out DiagnosticInfoCollection diagnosticInfos);
- return CreateResponse(requestHeader, context.StringTable);
+ return Task.FromResult(new TransferSubscriptionsResponse
+ {
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable),
+ Results = results,
+ DiagnosticInfos = diagnosticInfos
+ });
}
catch (ServiceResultException e)
{
@@ -1656,17 +1666,15 @@ public override ResponseHeader TransferSubscriptions(
/// The secure channel context.
/// The request header.
/// The list of Subscriptions to delete.
- /// The list of result StatusCodes for the Subscriptions to delete.
- /// The diagnostic information for the results.
+ /// The cancellation token.
///
- /// Returns a object
+ /// Returns a object
///
- public override ResponseHeader DeleteSubscriptions(
+ public override Task DeleteSubscriptionsAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
UInt32Collection subscriptionIds,
- out StatusCodeCollection results,
- out DiagnosticInfoCollection diagnosticInfos)
+ CancellationToken ct)
{
OperationContext context = ValidateRequest(
secureChannelContext,
@@ -1680,10 +1688,15 @@ public override ResponseHeader DeleteSubscriptions(
ServerInternal.SubscriptionManager.DeleteSubscriptions(
context,
subscriptionIds,
- out results,
- out diagnosticInfos);
+ out StatusCodeCollection results,
+ out DiagnosticInfoCollection diagnosticInfos);
- return CreateResponse(requestHeader, context.StringTable);
+ return Task.FromResult(new DeleteSubscriptionsResponse
+ {
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable),
+ Results = results,
+ DiagnosticInfos = diagnosticInfos
+ });
}
catch (ServiceResultException e)
{
@@ -1789,27 +1802,31 @@ public override async Task PublishAsync(
/// The request header.
/// The subscription id.
/// The sequence number of a specific NotificationMessage to be republished.
- /// The requested NotificationMessage.
+ /// The cancellation token.
///
- /// Returns a object
+ /// Returns a object
///
- public override ResponseHeader Republish(
+ public override Task RepublishAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
uint subscriptionId,
uint retransmitSequenceNumber,
- out NotificationMessage notificationMessage)
+ CancellationToken ct)
{
OperationContext context = ValidateRequest(secureChannelContext, requestHeader, RequestType.Republish);
try
{
- notificationMessage = ServerInternal.SubscriptionManager.Republish(
+ NotificationMessage notificationMessage = ServerInternal.SubscriptionManager.Republish(
context,
subscriptionId,
retransmitSequenceNumber);
- return CreateResponse(requestHeader, context.StringTable);
+ return Task.FromResult(new RepublishResponse
+ {
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable),
+ NotificationMessage = notificationMessage
+ });
}
catch (ServiceResultException e)
{
@@ -1842,13 +1859,11 @@ public override ResponseHeader Republish(
/// The requested max keep alive count.
/// The maximum number of notifications that the Client wishes to receive in a single Publish response.
/// The relative priority of the Subscription.
- /// The revised publishing interval.
- /// The revised lifetime count.
- /// The revised max keep alive count.
+ /// The cancellation token.
///
- /// Returns a object
+ /// Returns a object
///
- public override ResponseHeader ModifySubscription(
+ public override Task ModifySubscriptionAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
uint subscriptionId,
@@ -1857,9 +1872,7 @@ public override ResponseHeader ModifySubscription(
uint requestedMaxKeepAliveCount,
uint maxNotificationsPerPublish,
byte priority,
- out double revisedPublishingInterval,
- out uint revisedLifetimeCount,
- out uint revisedMaxKeepAliveCount)
+ CancellationToken ct)
{
OperationContext context = ValidateRequest(
secureChannelContext,
@@ -1876,11 +1889,17 @@ public override ResponseHeader ModifySubscription(
requestedMaxKeepAliveCount,
maxNotificationsPerPublish,
priority,
- out revisedPublishingInterval,
- out revisedLifetimeCount,
- out revisedMaxKeepAliveCount);
+ out double revisedPublishingInterval,
+ out uint revisedLifetimeCount,
+ out uint revisedMaxKeepAliveCount);
- return CreateResponse(requestHeader, context.StringTable);
+ return Task.FromResult(new ModifySubscriptionResponse
+ {
+ RevisedPublishingInterval = revisedPublishingInterval,
+ RevisedLifetimeCount = revisedLifetimeCount,
+ RevisedMaxKeepAliveCount = revisedMaxKeepAliveCount,
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable)
+ });
}
catch (ServiceResultException e)
{
@@ -1909,18 +1928,16 @@ public override ResponseHeader ModifySubscription(
/// The request header.
/// If set to true publishing of NotificationMessages is enabled for the Subscription.
/// The list of subscription ids.
- /// The list of StatusCodes for the Subscriptions to enable/disable.
- /// The diagnostic information for the results.
+ /// The cancellation token.
///
- /// Returns a object
+ /// Returns a object
///
- public override ResponseHeader SetPublishingMode(
+ public override Task SetPublishingModeAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
bool publishingEnabled,
UInt32Collection subscriptionIds,
- out StatusCodeCollection results,
- out DiagnosticInfoCollection diagnosticInfos)
+ CancellationToken ct)
{
OperationContext context = ValidateRequest(
secureChannelContext,
@@ -1935,10 +1952,15 @@ public override ResponseHeader SetPublishingMode(
context,
publishingEnabled,
subscriptionIds,
- out results,
- out diagnosticInfos);
+ out StatusCodeCollection results,
+ out DiagnosticInfoCollection diagnosticInfos);
- return CreateResponse(requestHeader, context.StringTable);
+ return Task.FromResult(new SetPublishingModeResponse
+ {
+ Results = results,
+ DiagnosticInfos = diagnosticInfos,
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable)
+ });
}
catch (ServiceResultException e)
{
@@ -1969,30 +1991,19 @@ public override ResponseHeader SetPublishingMode(
/// The id for the MonitoredItem used as the triggering item.
/// The list of ids of the items to report that are to be added as triggering links.
/// The list of ids of the items to report for the triggering links to be deleted.
- /// The list of StatusCodes for the items to add.
- /// The list of diagnostic information for the links to add.
- /// The list of StatusCodes for the items to delete.
- /// The list of diagnostic information for the links to delete.
+ /// The cancellation token.
///
- /// Returns a object
+ /// Returns a object
///
- public override ResponseHeader SetTriggering(
+ public override Task SetTriggeringAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
uint subscriptionId,
uint triggeringItemId,
UInt32Collection linksToAdd,
UInt32Collection linksToRemove,
- out StatusCodeCollection addResults,
- out DiagnosticInfoCollection addDiagnosticInfos,
- out StatusCodeCollection removeResults,
- out DiagnosticInfoCollection removeDiagnosticInfos)
+ CancellationToken ct)
{
- addResults = null;
- addDiagnosticInfos = null;
- removeResults = null;
- removeDiagnosticInfos = null;
-
OperationContext context = ValidateRequest(secureChannelContext, requestHeader, RequestType.SetTriggering);
try
@@ -2016,12 +2027,19 @@ public override ResponseHeader SetTriggering(
triggeringItemId,
linksToAdd,
linksToRemove,
- out addResults,
- out addDiagnosticInfos,
- out removeResults,
- out removeDiagnosticInfos);
+ out StatusCodeCollection addResults,
+ out DiagnosticInfoCollection addDiagnosticInfos,
+ out StatusCodeCollection removeResults,
+ out DiagnosticInfoCollection removeDiagnosticInfos);
- return CreateResponse(requestHeader, context.StringTable);
+ return Task.FromResult(new SetTriggeringResponse
+ {
+ AddResults = addResults,
+ AddDiagnosticInfos = addDiagnosticInfos,
+ RemoveResults = removeResults,
+ RemoveDiagnosticInfos = removeDiagnosticInfos,
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable)
+ });
}
catch (ServiceResultException e)
{
@@ -2051,19 +2069,17 @@ public override ResponseHeader SetTriggering(
/// The subscription id that will report notifications.
/// The type of timestamps to be returned for the MonitoredItems.
/// The list of MonitoredItems to be created and assigned to the specified subscription
- /// The list of results for the MonitoredItems to create.
- /// The diagnostic information for the results.
+ /// The cancellation token.
///
- /// Returns a object
+ /// Returns a object
///
- public override ResponseHeader CreateMonitoredItems(
+ public override Task CreateMonitoredItemsAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
uint subscriptionId,
TimestampsToReturn timestampsToReturn,
MonitoredItemCreateRequestCollection itemsToCreate,
- out MonitoredItemCreateResultCollection results,
- out DiagnosticInfoCollection diagnosticInfos)
+ CancellationToken ct)
{
OperationContext context = ValidateRequest(
secureChannelContext,
@@ -2079,10 +2095,15 @@ public override ResponseHeader CreateMonitoredItems(
subscriptionId,
timestampsToReturn,
itemsToCreate,
- out results,
- out diagnosticInfos);
+ out MonitoredItemCreateResultCollection results,
+ out DiagnosticInfoCollection diagnosticInfos);
- return CreateResponse(requestHeader, context.StringTable);
+ return Task.FromResult(new CreateMonitoredItemsResponse
+ {
+ Results = results,
+ DiagnosticInfos = diagnosticInfos,
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable)
+ });
}
catch (ServiceResultException e)
{
@@ -2112,19 +2133,17 @@ public override ResponseHeader CreateMonitoredItems(
/// The subscription id.
/// The type of timestamps to be returned for the MonitoredItems.
/// The list of MonitoredItems to modify.
- /// The list of results for the MonitoredItems to modify.
- /// The diagnostic information for the results.
+ /// The cancellation token.
///
- /// Returns a object
+ /// Returns a object
///
- public override ResponseHeader ModifyMonitoredItems(
+ public override Task ModifyMonitoredItemsAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
uint subscriptionId,
TimestampsToReturn timestampsToReturn,
MonitoredItemModifyRequestCollection itemsToModify,
- out MonitoredItemModifyResultCollection results,
- out DiagnosticInfoCollection diagnosticInfos)
+ CancellationToken ct)
{
OperationContext context = ValidateRequest(
secureChannelContext,
@@ -2140,10 +2159,15 @@ public override ResponseHeader ModifyMonitoredItems(
subscriptionId,
timestampsToReturn,
itemsToModify,
- out results,
- out diagnosticInfos);
+ out MonitoredItemModifyResultCollection results,
+ out DiagnosticInfoCollection diagnosticInfos);
- return CreateResponse(requestHeader, context.StringTable);
+ return Task.FromResult(new ModifyMonitoredItemsResponse
+ {
+ Results = results,
+ DiagnosticInfos = diagnosticInfos,
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable)
+ });
}
catch (ServiceResultException e)
{
@@ -2172,18 +2196,16 @@ public override ResponseHeader ModifyMonitoredItems(
/// The request header.
/// The subscription id.
/// The list of MonitoredItems to delete.
- /// The list of results for the MonitoredItems to delete.
- /// The diagnostic information for the results.
+ /// The cancellation token.
///
- /// Returns a object
+ /// Returns a object
///
- public override ResponseHeader DeleteMonitoredItems(
+ public override Task DeleteMonitoredItemsAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
uint subscriptionId,
UInt32Collection monitoredItemIds,
- out StatusCodeCollection results,
- out DiagnosticInfoCollection diagnosticInfos)
+ CancellationToken ct)
{
OperationContext context = ValidateRequest(
secureChannelContext,
@@ -2198,10 +2220,15 @@ public override ResponseHeader DeleteMonitoredItems(
context,
subscriptionId,
monitoredItemIds,
- out results,
- out diagnosticInfos);
+ out StatusCodeCollection results,
+ out DiagnosticInfoCollection diagnosticInfos);
- return CreateResponse(requestHeader, context.StringTable);
+ return Task.FromResult(new DeleteMonitoredItemsResponse
+ {
+ Results = results,
+ DiagnosticInfos = diagnosticInfos,
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable)
+ });
}
catch (ServiceResultException e)
{
@@ -2231,19 +2258,17 @@ public override ResponseHeader DeleteMonitoredItems(
/// The subscription id.
/// The monitoring mode to be set for the MonitoredItems.
/// The list of MonitoredItems to modify.
- /// The list of results for the MonitoredItems to modify.
- /// The diagnostic information for the results.
+ /// The cancellation token.
///
- /// Returns a object
+ /// Returns a
///
- public override ResponseHeader SetMonitoringMode(
+ public override async Task SetMonitoringModeAsync(
SecureChannelContext secureChannelContext,
RequestHeader requestHeader,
uint subscriptionId,
MonitoringMode monitoringMode,
UInt32Collection monitoredItemIds,
- out StatusCodeCollection results,
- out DiagnosticInfoCollection diagnosticInfos)
+ CancellationToken ct)
{
OperationContext context = ValidateRequest(
secureChannelContext,
@@ -2254,15 +2279,21 @@ public override ResponseHeader SetMonitoringMode(
{
ValidateOperationLimits(monitoredItemIds, OperationLimits.MaxMonitoredItemsPerCall);
- ServerInternal.SubscriptionManager.SetMonitoringMode(
+ (StatusCodeCollection results, DiagnosticInfoCollection diagnosticInfos) =
+ await ServerInternal.SubscriptionManager.SetMonitoringModeAsync(
context,
subscriptionId,
monitoringMode,
monitoredItemIds,
- out results,
- out diagnosticInfos);
+ ct)
+ .ConfigureAwait(false);
- return CreateResponse(requestHeader, context.StringTable);
+ return new SetMonitoringModeResponse
+ {
+ Results = results,
+ DiagnosticInfos = diagnosticInfos,
+ ResponseHeader = CreateResponse(requestHeader, context.StringTable)
+ };
}
catch (ServiceResultException e)
{
@@ -2300,7 +2331,7 @@ public override async Task CallAsync(
CallMethodRequestCollection methodsToCall,
CancellationToken ct)
{
- OperationContext context = ValidateRequest(secureChannelContext,requestHeader, RequestType.Call);
+ OperationContext context = ValidateRequest(secureChannelContext, requestHeader, RequestType.Call);
try
{
@@ -3226,7 +3257,8 @@ IMonitoredItemQueueFactory monitoredItemQueueFactory
ISubscriptionManager subscriptionManager = CreateSubscriptionManager(
m_serverInternal,
configuration);
- subscriptionManager.Startup();
+ await subscriptionManager.StartupAsync(cancellationToken)
+ .ConfigureAwait(false);
// add the session manager to the datastore.
m_serverInternal.SetSessionManager(sessionManager, subscriptionManager);
@@ -3378,9 +3410,9 @@ protected override async ValueTask OnServerStoppingAsync(CancellationToken cance
{
m_serverInternal.SessionManager.SessionChannelKeepAlive
-= SessionChannelKeepAliveEvent;
- m_serverInternal.SubscriptionManager.Shutdown();
+ await m_serverInternal.SubscriptionManager.ShutdownAsync(cancellationToken).ConfigureAwait(false);
m_serverInternal.SessionManager.Shutdown();
- await m_serverInternal.NodeManager.ShutdownAsync().ConfigureAwait(false);
+ await m_serverInternal.NodeManager.ShutdownAsync(cancellationToken).ConfigureAwait(false);
}
}
finally
diff --git a/Libraries/Opc.Ua.Server/Subscription/ISubscription.cs b/Libraries/Opc.Ua.Server/Subscription/ISubscription.cs
index 8383aa340..072bdfe85 100644
--- a/Libraries/Opc.Ua.Server/Subscription/ISubscription.cs
+++ b/Libraries/Opc.Ua.Server/Subscription/ISubscription.cs
@@ -28,6 +28,8 @@
* ======================================================================*/
using System;
+using System.Threading;
+using System.Threading.Tasks;
namespace Opc.Ua.Server
{
@@ -120,12 +122,12 @@ public interface ISubscription : IDisposable
///
/// Refreshes the conditions.
///
- void ConditionRefresh2(uint monitoredItemId);
+ ValueTask ConditionRefresh2Async(uint monitoredItemId, CancellationToken cancellationToken = default);
///
/// Refreshes the conditions.
///
- void ConditionRefresh();
+ ValueTask ConditionRefreshAsync(CancellationToken cancellationToken = default);
///
/// Updates the publishing parameters for the subscription.
@@ -141,12 +143,12 @@ void Modify(
///
/// Changes the monitoring mode for a set of items.
///
- void SetMonitoringMode(
+
+ ValueTask<(StatusCodeCollection results, DiagnosticInfoCollection diagnosticInfos)> SetMonitoringModeAsync(
OperationContext context,
MonitoringMode monitoringMode,
UInt32Collection monitoredItemIds,
- out StatusCodeCollection results,
- out DiagnosticInfoCollection diagnosticInfos);
+ CancellationToken cancellationToken = default);
///
/// Enables/disables publishing for the subscription.
diff --git a/Libraries/Opc.Ua.Server/Subscription/ISubscriptionManager.cs b/Libraries/Opc.Ua.Server/Subscription/ISubscriptionManager.cs
index 4071992e9..a42d402b2 100644
--- a/Libraries/Opc.Ua.Server/Subscription/ISubscriptionManager.cs
+++ b/Libraries/Opc.Ua.Server/Subscription/ISubscriptionManager.cs
@@ -86,22 +86,22 @@ void CreateSubscription(
///
/// Starts up the manager makes it ready to create subscriptions.
///
- void Startup();
+ ValueTask StartupAsync(CancellationToken cancellationToken = default);
///
/// Closes all subscriptions and rejects any new requests.
///
- void Shutdown();
+ ValueTask ShutdownAsync(CancellationToken cancellationToken = default);
///
/// Stores durable subscriptions to be able to restore them after a restart
///
- void StoreSubscriptions();
+ ValueTask StoreSubscriptionsAsync(CancellationToken cancellationToken = default);
///
/// Restore durable subscriptions after a server restart
///
- void RestoreSubscriptions();
+ ValueTask RestoreSubscriptionsAsync(CancellationToken cancellationToken = default);
///
/// Deletes group of subscriptions.
@@ -212,13 +212,12 @@ void DeleteMonitoredItems(
///
/// Changes the monitoring mode for a set of items.
///
- void SetMonitoringMode(
+ ValueTask<(StatusCodeCollection results, DiagnosticInfoCollection diagnosticInfos)> SetMonitoringModeAsync(
OperationContext context,
uint subscriptionId,
MonitoringMode monitoringMode,
UInt32Collection monitoredItemIds,
- out StatusCodeCollection results,
- out DiagnosticInfoCollection diagnosticInfos);
+ CancellationToken cancellationToken = default);
///
/// Signals that a session is closing.
diff --git a/Libraries/Opc.Ua.Server/Subscription/SessionPublishQueue.cs b/Libraries/Opc.Ua.Server/Subscription/SessionPublishQueue.cs
index b5e78712f..c88aa2750 100644
--- a/Libraries/Opc.Ua.Server/Subscription/SessionPublishQueue.cs
+++ b/Libraries/Opc.Ua.Server/Subscription/SessionPublishQueue.cs
@@ -244,6 +244,10 @@ public void RemoveQueuedRequests()
///
/// Try to publish a custom status message
/// using a queued publish request.
+ /// Returns true if a queued request was found and processed.
+ /// Returns the found publish request immediately to the caller.
+ /// If status code is good, the caller is expected to publish any queued status messages.
+ /// If status code is bad a ServiceResultException is thrown to the caller.
///
public bool TryPublishCustomStatus(StatusCode statusCode)
{
@@ -259,7 +263,17 @@ public bool TryPublishCustomStatus(StatusCode statusCode)
continue;
}
- request.Tcs.TrySetException(new ServiceResultException(statusCode));
+ // for good status codes return to caller (SubscriptionManager) with null subscription
+ // to publish queued StatusMessages from there
+ if (ServiceResult.IsGood(statusCode))
+ {
+ request.Tcs.TrySetResult(null);
+ }
+ // throw a ServiceResultException for bad status codes
+ else
+ {
+ request.Tcs.TrySetException(new ServiceResultException(statusCode));
+ }
request.Dispose();
return true;
}
diff --git a/Libraries/Opc.Ua.Server/Subscription/Subscription.cs b/Libraries/Opc.Ua.Server/Subscription/Subscription.cs
index 12018f98c..bd09eb685 100644
--- a/Libraries/Opc.Ua.Server/Subscription/Subscription.cs
+++ b/Libraries/Opc.Ua.Server/Subscription/Subscription.cs
@@ -32,6 +32,7 @@
using System.Diagnostics;
using System.Linq;
using System.Threading;
+using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace Opc.Ua.Server
@@ -131,10 +132,26 @@ public Subscription(
TraceState(LogLevel.Information, TraceStateId.Config, "CREATED");
}
+ ///
+ /// Restore a subscription and its monitored items after a restart from a template
+ ///
+ public static async ValueTask RestoreAsync(
+ IServerInternal server,
+ IStoredSubscription storedSubscription,
+ CancellationToken cancellationToken = default)
+ {
+ var subscription = new Subscription(server, storedSubscription);
+
+ await subscription.RestoreMonitoredItemsAsync(storedSubscription.MonitoredItems, cancellationToken)
+ .ConfigureAwait(false);
+
+ return subscription;
+ }
+
///
/// Initialize subscription after a restart from a template
///
- public Subscription(IServerInternal server, IStoredSubscription storedSubscription)
+ protected Subscription(IServerInternal server, IStoredSubscription storedSubscription)
{
if (server.IsRunning)
{
@@ -212,8 +229,6 @@ public Subscription(IServerInternal server, IStoredSubscription storedSubscripti
OnUpdateDiagnostics);
TraceState(LogLevel.Information, TraceStateId.Config, "RESTORED");
-
- RestoreMonitoredItems(storedSubscription.MonitoredItems);
}
///
@@ -2088,12 +2103,11 @@ private void DeleteMonitoredItems(
/// Changes the monitoring mode for a set of items.
///
/// is null.
- public void SetMonitoringMode(
+ public async ValueTask<(StatusCodeCollection results, DiagnosticInfoCollection diagnosticInfos)> SetMonitoringModeAsync(
OperationContext context,
MonitoringMode monitoringMode,
UInt32Collection monitoredItemIds,
- out StatusCodeCollection results,
- out DiagnosticInfoCollection diagnosticInfos)
+ CancellationToken cancellationToken = default)
{
if (context == null)
{
@@ -2108,8 +2122,8 @@ public void SetMonitoringMode(
int count = monitoredItemIds.Count;
bool diagnosticsExist = false;
- results = new StatusCodeCollection(count);
- diagnosticInfos = null;
+ var results = new StatusCodeCollection(count);
+ DiagnosticInfoCollection diagnosticInfos = null;
if ((context.DiagnosticsMask & DiagnosticsMasks.OperationAll) != 0)
{
@@ -2173,8 +2187,9 @@ public void SetMonitoringMode(
// update items.
if (validItems)
{
- m_server.NodeManager
- .SetMonitoringMode(context, monitoringMode, monitoredItems, errors);
+ await m_server.NodeManager
+ .SetMonitoringModeAsync(context, monitoringMode, monitoredItems, errors, cancellationToken)
+ .ConfigureAwait(false);
}
lock (m_lock)
@@ -2234,6 +2249,8 @@ public void SetMonitoringMode(
TraceState(LogLevel.Information, TraceStateId.Monitor, "SAMPLING");
}
}
+
+ return (results, diagnosticInfos);
}
///
@@ -2275,7 +2292,7 @@ public void ValidateConditionRefresh2(OperationContext context, uint monitoredIt
///
/// Refreshes the conditions.
///
- public void ConditionRefresh()
+ public async ValueTask ConditionRefreshAsync(CancellationToken cancellationToken = default)
{
var monitoredItems = new List();
@@ -2299,14 +2316,15 @@ public void ConditionRefresh()
}
}
- ConditionRefresh(monitoredItems, 0);
+ await ConditionRefreshAsync(monitoredItems, 0, cancellationToken)
+ .ConfigureAwait(false);
}
///
/// Refreshes the conditions.
///
///
- public void ConditionRefresh2(uint monitoredItemId)
+ public async ValueTask ConditionRefresh2Async(uint monitoredItemId, CancellationToken cancellationToken = default)
{
var monitoredItems = new List();
@@ -2338,15 +2356,17 @@ public void ConditionRefresh2(uint monitoredItemId)
}
}
- ConditionRefresh(monitoredItems, monitoredItemId);
+ await ConditionRefreshAsync(monitoredItems, monitoredItemId, cancellationToken)
+ .ConfigureAwait(false);
}
///
/// Refreshes the conditions. Works for both ConditionRefresh and ConditionRefresh2
///
- private void ConditionRefresh(
+ private async ValueTask ConditionRefreshAsync(
List monitoredItems,
- uint monitoredItemId)
+ uint monitoredItemId,
+ CancellationToken cancellationToken = default)
{
ServerSystemContext systemContext = m_server.DefaultSystemContext.Copy(Session);
@@ -2406,7 +2426,8 @@ private void ConditionRefresh(
m_refreshInProgress = true;
var operationContext = new OperationContext(Session, DiagnosticsMasks.None);
- m_server.NodeManager.ConditionRefresh(operationContext, monitoredItems);
+ await m_server.NodeManager.ConditionRefreshAsync(operationContext, monitoredItems, cancellationToken)
+ .ConfigureAwait(false);
}
finally
{
@@ -2542,8 +2563,9 @@ public IStoredSubscription ToStorableSubscription()
///
/// Restore MonitoredItems after a Server restart
///
- protected virtual void RestoreMonitoredItems(
- IEnumerable storedMonitoredItems)
+ protected virtual async ValueTask RestoreMonitoredItemsAsync(
+ IEnumerable storedMonitoredItems,
+ CancellationToken cancellationToken = default)
{
int count = storedMonitoredItems.Count();
@@ -2555,10 +2577,11 @@ protected virtual void RestoreMonitoredItems(
monitoredItems.Add(null);
}
- m_server.NodeManager.RestoreMonitoredItems(
+ await m_server.NodeManager.RestoreMonitoredItemsAsync(
[.. storedMonitoredItems],
monitoredItems,
- m_savedOwnerIdentity);
+ m_savedOwnerIdentity,
+ cancellationToken).ConfigureAwait(false);
lock (m_lock)
{
diff --git a/Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs b/Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs
index fa6654e1e..5090ed144 100644
--- a/Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs
+++ b/Libraries/Opc.Ua.Server/Subscription/SubscriptionManager.cs
@@ -109,7 +109,8 @@ protected virtual void Dispose(bool disposing)
List subscriptions = null;
List publishQueues = null;
- lock (m_lock)
+ m_semaphoreSlim.Wait();
+ try
{
publishQueues = [.. m_publishQueues.Values];
m_publishQueues.Clear();
@@ -117,6 +118,10 @@ protected virtual void Dispose(bool disposing)
subscriptions = [.. m_subscriptions.Values];
m_subscriptions.Clear();
}
+ finally
+ {
+ m_semaphoreSlim.Release();
+ }
foreach (SessionPublishQueue publishQueue in publishQueues)
{
@@ -130,6 +135,7 @@ protected virtual void Dispose(bool disposing)
Utils.SilentDispose(m_shutdownEvent);
Utils.SilentDispose(m_conditionRefreshEvent);
+ Utils.SilentDispose(m_semaphoreSlim);
}
}
@@ -217,17 +223,19 @@ protected virtual void RaiseSubscriptionEvent(ISubscription subscription, bool d
///
/// Starts up the manager makes it ready to create subscriptions.
///
- public virtual void Startup()
+ public virtual async ValueTask StartupAsync(CancellationToken cancellationToken = default)
{
- lock (m_lock)
+ await m_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
+ try
{
// restore subscriptions on startup
- RestoreSubscriptions();
+ await RestoreSubscriptionsAsync(cancellationToken)
+ .ConfigureAwait(false);
m_shutdownEvent.Reset();
// TODO: Ensure shutdown awaits completion and a cancellation token is passed
- Task.Factory.StartNew(
+ _ = Task.Factory.StartNew(
() => PublishSubscriptions(m_publishingResolution),
default,
TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach,
@@ -236,20 +244,25 @@ public virtual void Startup()
m_conditionRefreshEvent.Reset();
// TODO: Ensure shutdown awaits completion and a cancellation token is passed
- Task.Factory.StartNew(
- ConditionRefreshWorker,
+ _ = Task.Factory.StartNew(
+ ConditionRefreshWorkerAsync,
default,
TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach,
TaskScheduler.Default);
}
+ finally
+ {
+ m_semaphoreSlim.Release();
+ }
}
///
/// Closes all subscriptions and rejects any new requests.
///
- public virtual void Shutdown()
+ public virtual async ValueTask ShutdownAsync(CancellationToken cancellationToken = default)
{
- lock (m_lock)
+ await m_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
+ try
{
// stop the publishing thread.
m_shutdownEvent.Set();
@@ -266,7 +279,8 @@ public virtual void Shutdown()
m_publishQueues.Clear();
// store subscriptions to be able to restore them after a restart
- StoreSubscriptions();
+ await StoreSubscriptionsAsync(cancellationToken)
+ .ConfigureAwait(false);
// dispose of subscriptions objects.
foreach (ISubscription subscription in m_subscriptions.Values)
@@ -276,14 +290,18 @@ public virtual void Shutdown()
m_subscriptions.Clear();
}
+ finally
+ {
+ m_semaphoreSlim.Release();
+ }
}
///
/// Stores durable subscriptions to be able to restore them after a restart
///
- public virtual void StoreSubscriptions()
+ public virtual async ValueTask StoreSubscriptionsAsync(CancellationToken cancellationToken = default)
{
- // only store subscriptions if durable subscriptions are enabeld
+ // only store subscriptions if durable subscriptions are enabled
if (!m_durableSubscriptionsEnabled || m_subscriptionStore == null)
{
return;
@@ -322,7 +340,7 @@ public virtual void StoreSubscriptions()
/// Restore durable subscriptions after a server restart
///
///
- public virtual void RestoreSubscriptions()
+ public virtual async ValueTask RestoreSubscriptionsAsync(CancellationToken cancellationToken = default)
{
if (m_server.IsRunning)
{
@@ -363,7 +381,8 @@ public virtual void RestoreSubscriptions()
try
{
- subscription = RestoreSubscription(storedSubscription);
+ subscription = await RestoreSubscriptionAsync(storedSubscription, cancellationToken)
+ .ConfigureAwait(false);
}
catch (Exception ex)
{
@@ -387,7 +406,9 @@ public virtual void RestoreSubscriptions()
/// Restore a subscription after a restart
///
///
- protected virtual ISubscription RestoreSubscription(IStoredSubscription storedSubscription)
+ protected virtual async ValueTask RestoreSubscriptionAsync(
+ IStoredSubscription storedSubscription,
+ CancellationToken cancellationToken = default)
{
if (m_subscriptions.Count >= m_maxSubscriptionCount)
{
@@ -416,7 +437,8 @@ protected virtual ISubscription RestoreSubscription(IStoredSubscription storedSu
storedSubscription.MaxNotificationsPerPublish);
// create the subscription.
- var subscription = new Subscription(m_server, storedSubscription);
+ var subscription = await Subscription.RestoreAsync(m_server, storedSubscription, cancellationToken)
+ .ConfigureAwait(false);
uint publishingIntervalCount;
@@ -506,13 +528,18 @@ public virtual void SessionClosing(
// mark the subscriptions as abandoned.
else
{
- lock (m_lock)
+ m_semaphoreSlim.Wait();
+ try
{
(m_abandonedSubscriptions ??= []).Add(subscription);
m_logger.LogWarning(
"Subscription ABANDONED, Id={SubscriptionId}.",
subscription.Id);
}
+ finally
+ {
+ m_semaphoreSlim.Release();
+ }
}
}
}
@@ -599,12 +626,13 @@ public void ConditionRefresh2(
///
/// Completes a refresh conditions request.
///
- private void DoConditionRefresh(ISubscription subscription)
+ private async ValueTask DoConditionRefreshAsync(ISubscription subscription, CancellationToken cancellationToken = default)
{
try
{
m_logger.LogTrace("Subscription ConditionRefresh started, Id={SubscriptionId}.", subscription.Id);
- subscription.ConditionRefresh();
+ await subscription.ConditionRefreshAsync(cancellationToken)
+ .ConfigureAwait(false);
}
catch (Exception e)
{
@@ -615,7 +643,7 @@ private void DoConditionRefresh(ISubscription subscription)
///
/// Completes a refresh conditions request.
///
- private void DoConditionRefresh2(ISubscription subscription, uint monitoredItemId)
+ private async ValueTask DoConditionRefresh2Async(ISubscription subscription, uint monitoredItemId, CancellationToken cancellationToken = default)
{
try
{
@@ -623,7 +651,8 @@ private void DoConditionRefresh2(ISubscription subscription, uint monitoredItemI
"Subscription ConditionRefresh2 started, Id={SubscriptionId}, MonitoredItemId={MonitoredItemId}.",
subscription.Id,
monitoredItemId);
- subscription.ConditionRefresh2(monitoredItemId);
+ await subscription.ConditionRefresh2Async(monitoredItemId, cancellationToken)
+ .ConfigureAwait(false);
}
catch (Exception e)
{
@@ -639,7 +668,8 @@ public StatusCode DeleteSubscription(OperationContext context, uint subscription
{
ISubscription subscription = null;
- lock (m_lock)
+ m_semaphoreSlim.Wait();
+ try
{
// remove from publish queue.
if (m_subscriptions.TryGetValue(subscriptionId, out subscription))
@@ -681,6 +711,10 @@ public StatusCode DeleteSubscription(OperationContext context, uint subscription
// remove subscription.
m_subscriptions.TryRemove(subscriptionId, out _);
}
+ finally
+ {
+ m_semaphoreSlim.Release();
+ }
if (subscription != null)
{
@@ -745,19 +779,16 @@ private uint GetPublishingIntervalCount()
{
var publishingDiagnostics = new Dictionary();
- lock (m_lock)
+ foreach (ISubscription subscription in m_subscriptions.Values)
{
- foreach (ISubscription subscription in m_subscriptions.Values)
- {
- double publishingInterval = subscription.PublishingInterval;
+ double publishingInterval = subscription.PublishingInterval;
- if (!publishingDiagnostics.TryGetValue(publishingInterval, out uint total))
- {
- total = 0;
- }
-
- publishingDiagnostics[publishingInterval] = total + 1;
+ if (!publishingDiagnostics.TryGetValue(publishingInterval, out uint total))
+ {
+ total = 0;
}
+
+ publishingDiagnostics[publishingInterval] = total + 1;
}
return (uint)publishingDiagnostics.Count;
@@ -822,7 +853,8 @@ public virtual void CreateSubscription(
priority,
publishingEnabled);
- lock (m_lock)
+ m_semaphoreSlim.Wait();
+ try
{
// save subscription.
if (!m_subscriptions.TryAdd(subscriptionId, subscription))
@@ -849,10 +881,14 @@ public virtual void CreateSubscription(
return queue;
}
);
-
- // get the count for the diagnostics.
- publishingIntervalCount = GetPublishingIntervalCount();
}
+ finally
+ {
+ m_semaphoreSlim.Release();
+ }
+
+ // get the count for the diagnostics.
+ publishingIntervalCount = GetPublishingIntervalCount();
lock (m_statusMessagesLock)
{
@@ -1031,8 +1067,11 @@ public async Task PublishAsync(
// check for pending status message that may have arrived while waiting.
if (ReturnPendingStatusMessage(context, out statusMessage, out statusSubscriptionId))
{
- // requeue the subscription that was ready to publish.
- queue.Requeue(subscription);
+ if (subscription != null)
+ {
+ // requeue the subscription that was ready to publish.
+ queue.Requeue(subscription);
+ }
return new PublishResponse
{
@@ -1044,6 +1083,13 @@ public async Task PublishAsync(
};
}
+ // false alarm or race condition, requeue the request.
+ if (subscription == null)
+ {
+ requeue = true;
+ continue;
+ }
+
bool moreNotifications = false;
// publish notifications.
@@ -1374,7 +1420,8 @@ public void TransferSubscriptions(
}
// transfer session, add subscription to publish queue
- lock (m_lock)
+ m_semaphoreSlim.Wait();
+ try
{
subscription.TransferSession(context, sendInitialValues);
@@ -1403,6 +1450,10 @@ public void TransferSubscriptions(
}
publishQueue.Add(subscription);
}
+ finally
+ {
+ m_semaphoreSlim.Release();
+ }
lock (m_statusMessagesLock)
{
@@ -1469,7 +1520,8 @@ public void TransferSubscriptions(
}
}
- lock (m_lock)
+ m_semaphoreSlim.Wait();
+ try
{
// trigger publish response to return status immediately
if (m_publishQueues.TryGetValue(
@@ -1495,6 +1547,10 @@ public void TransferSubscriptions(
ownerPublishQueue.RemoveQueuedRequests();
}
}
+ finally
+ {
+ m_semaphoreSlim.Release();
+ }
}
// Return the sequence numbers that are available for retransmission.
@@ -1706,13 +1762,12 @@ public void DeleteMonitoredItems(
/// Changes the monitoring mode for a set of items.
///
///
- public void SetMonitoringMode(
+ public ValueTask<(StatusCodeCollection results, DiagnosticInfoCollection diagnosticInfos)> SetMonitoringModeAsync(
OperationContext context,
uint subscriptionId,
MonitoringMode monitoringMode,
UInt32Collection monitoredItemIds,
- out StatusCodeCollection results,
- out DiagnosticInfoCollection diagnosticInfos)
+ CancellationToken cancellationToken = default)
{
// find subscription.
if (!m_subscriptions.TryGetValue(subscriptionId, out ISubscription subscription))
@@ -1721,12 +1776,11 @@ public void SetMonitoringMode(
}
// create the items.
- subscription.SetMonitoringMode(
+ return subscription.SetMonitoringModeAsync(
context,
monitoringMode,
monitoredItemIds,
- out results,
- out diagnosticInfos);
+ cancellationToken);
}
///
@@ -1989,7 +2043,8 @@ private void PublishSubscriptions(object data)
SessionPublishQueue[] queues = null;
ISubscription[] abandonedSubscriptions = null;
- lock (m_lock)
+ m_semaphoreSlim.Wait();
+ try
{
// collect active session queues.
queues = new SessionPublishQueue[m_publishQueues.Count];
@@ -2007,6 +2062,10 @@ private void PublishSubscriptions(object data)
}
}
}
+ finally
+ {
+ m_semaphoreSlim.Release();
+ }
// check the publish timer for each subscription.
for (int ii = 0; ii < queues.Length; ii++)
@@ -2038,13 +2097,18 @@ private void PublishSubscriptions(object data)
// schedule cleanup on a background thread.
if (subscriptionsToDelete.Count > 0)
{
- lock (m_lock)
+ m_semaphoreSlim.Wait();
+ try
{
for (int ii = 0; ii < subscriptionsToDelete.Count; ii++)
{
m_abandonedSubscriptions.Remove(subscriptionsToDelete[ii]);
}
}
+ finally
+ {
+ m_semaphoreSlim.Release();
+ }
CleanupSubscriptions(m_server, subscriptionsToDelete, m_logger);
}
@@ -2074,7 +2138,7 @@ private void PublishSubscriptions(object data)
///
/// A single thread to execute the condition refresh.
///
- private void ConditionRefreshWorker()
+ private async Task ConditionRefreshWorkerAsync()
{
try
{
@@ -2104,13 +2168,15 @@ private void ConditionRefreshWorker()
}
else if (conditionRefreshTask.MonitoredItemId == 0)
{
- DoConditionRefresh(conditionRefreshTask.Subscription);
+ await DoConditionRefreshAsync(conditionRefreshTask.Subscription)
+ .ConfigureAwait(false);
}
else
{
- DoConditionRefresh2(
+ await DoConditionRefresh2Async(
conditionRefreshTask.Subscription,
- conditionRefreshTask.MonitoredItemId);
+ conditionRefreshTask.MonitoredItemId)
+ .ConfigureAwait(false);
}
// use shutdown event to end loop
@@ -2210,7 +2276,7 @@ public override int GetHashCode()
}
}
- private readonly Lock m_lock = new();
+ private readonly SemaphoreSlim m_semaphoreSlim = new(1, 1);
private uint m_lastSubscriptionId;
private readonly ILogger m_logger;
private readonly IServerInternal m_server;
diff --git a/Stack/Opc.Ua.Core/Opc.Ua.Core.csproj b/Stack/Opc.Ua.Core/Opc.Ua.Core.csproj
index 31925f7a0..f413814ef 100644
--- a/Stack/Opc.Ua.Core/Opc.Ua.Core.csproj
+++ b/Stack/Opc.Ua.Core/Opc.Ua.Core.csproj
@@ -1,8 +1,8 @@
$(DefineConstants);NET_STANDARD;NET_STANDARD_ASYNC;NET_STANDARD_OBSOLETE_SYNC;NET_STANDARD_OBSOLETE_APM
- $(DefineConstants);OPCUA_INCLUDE_ASYNC; OPCUA_EXCLUDE_FindServersOnNetwork_ASYNC; OPCUA_EXCLUDE_Cancel_ASYNC; OPCUA_EXCLUDE_AddNodes_ASYNC; OPCUA_EXCLUDE_AddReferences_ASYNC; OPCUA_EXCLUDE_DeleteNodes_ASYNC; OPCUA_EXCLUDE_DeleteReferences_ASYNC; OPCUA_EXCLUDE_RegisterNodes_ASYNC; OPCUA_EXCLUDE_UnregisterNodes_ASYNC; OPCUA_EXCLUDE_QueryFirst_ASYNC; OPCUA_EXCLUDE_QueryNext_ASYNC; OPCUA_EXCLUDE_CreateMonitoredItems_ASYNC; OPCUA_EXCLUDE_ModifyMonitoredItems_ASYNC; OPCUA_EXCLUDE_SetMonitoringMode_ASYNC; OPCUA_EXCLUDE_SetTriggering_ASYNC; OPCUA_EXCLUDE_DeleteMonitoredItems_ASYNC; OPCUA_EXCLUDE_CreateSubscription_ASYNC; OPCUA_EXCLUDE_ModifySubscription_ASYNC; OPCUA_EXCLUDE_SetPublishingMode_ASYNC; OPCUA_EXCLUDE_Republish_ASYNC; OPCUA_EXCLUDE_TransferSubscriptions_ASYNC; OPCUA_EXCLUDE_DeleteSubscriptions_ASYNC; OPCUA_EXCLUDE_RegisterServer_ASYNC; OPCUA_EXCLUDE_RegisterServer2_ASYNC
- $(DefineConstants);OPCUA_EXCLUDE_AccessLevelExType;OPCUA_EXCLUDE_AccessRestrictionType ;OPCUA_EXCLUDE_ReferenceDescription; OPCUA_EXCLUDE_ReferenceDescriptionCollection; OPCUA_EXCLUDE_AttributeWriteMask; OPCUA_EXCLUDE_Argument; OPCUA_EXCLUDE_ArgumentCollection; OPCUA_EXCLUDE_IdType; OPCUA_EXCLUDE_IdTypeCollection; OPCUA_EXCLUDE_RolePermissionType; OPCUA_EXCLUDE_RolePermissionTypeCollection; OPCUA_EXCLUDE_PermissionType; OPCUA_EXCLUDE_ViewDescription; OPCUA_EXCLUDE_BrowseDescription; OPCUA_EXCLUDE_BrowseDescriptionCollection; OPCUA_EXCLUDE_StructureDefinition; OPCUA_EXCLUDE_StructureType; OPCUA_EXCLUDE_StructureField; OPCUA_EXCLUDE_StructureFieldCollection; OPCUA_EXCLUDE_InstanceNode; OPCUA_EXCLUDE_ReferenceTypeNode; OPCUA_EXCLUDE_ReferenceNode; OPCUA_EXCLUDE_ReferenceNodeCollection; OPCUA_EXCLUDE_DataTypeDefinition; OPCUA_EXCLUDE_EnumDefinition; OPCUA_EXCLUDE_EnumField; OPCUA_EXCLUDE_EnumFieldCollection; OPCUA_EXCLUDE_EnumValueType; OPCUA_EXCLUDE_EnumValueTypeCollection; OPCUA_EXCLUDE_RelativePath; OPCUA_EXCLUDE_BrowseDirection; OPCUA_EXCLUDE_RelativePathElement; OPCUA_EXCLUDE_RelativePathElementCollection; OPCUA_EXCLUDE_NodeClass; OPCUA_EXCLUDE_Node; OPCUA_EXCLUDE_ViewNode; OPCUA_EXCLUDE_ObjectNode; OPCUA_EXCLUDE_MethodNode; OPCUA_EXCLUDE_TypeNode; OPCUA_EXCLUDE_ObjectTypeNode; OPCUA_EXCLUDE_DataTypeNode; OPCUA_EXCLUDE_VariableTypeNode; OPCUA_EXCLUDE_VariableNode
+ $(DefineConstants);OPCUA_INCLUDE_ASYNC;
+ $(DefineConstants);OPCUA_EXCLUDE_AccessRestrictionType ;OPCUA_EXCLUDE_ReferenceDescription; OPCUA_EXCLUDE_ReferenceDescriptionCollection; OPCUA_EXCLUDE_AttributeWriteMask; OPCUA_EXCLUDE_Argument; OPCUA_EXCLUDE_ArgumentCollection; OPCUA_EXCLUDE_IdType; OPCUA_EXCLUDE_IdTypeCollection; OPCUA_EXCLUDE_RolePermissionType; OPCUA_EXCLUDE_RolePermissionTypeCollection; OPCUA_EXCLUDE_PermissionType; OPCUA_EXCLUDE_ViewDescription; OPCUA_EXCLUDE_BrowseDescription; OPCUA_EXCLUDE_BrowseDescriptionCollection; OPCUA_EXCLUDE_StructureDefinition; OPCUA_EXCLUDE_StructureType; OPCUA_EXCLUDE_StructureField; OPCUA_EXCLUDE_StructureFieldCollection; OPCUA_EXCLUDE_InstanceNode; OPCUA_EXCLUDE_ReferenceTypeNode; OPCUA_EXCLUDE_ReferenceNode; OPCUA_EXCLUDE_ReferenceNodeCollection; OPCUA_EXCLUDE_DataTypeDefinition; OPCUA_EXCLUDE_EnumDefinition; OPCUA_EXCLUDE_EnumField; OPCUA_EXCLUDE_EnumFieldCollection; OPCUA_EXCLUDE_EnumValueType; OPCUA_EXCLUDE_EnumValueTypeCollection; OPCUA_EXCLUDE_RelativePath; OPCUA_EXCLUDE_BrowseDirection; OPCUA_EXCLUDE_RelativePathElement; OPCUA_EXCLUDE_RelativePathElementCollection; OPCUA_EXCLUDE_NodeClass; OPCUA_EXCLUDE_Node; OPCUA_EXCLUDE_ViewNode; OPCUA_EXCLUDE_ObjectNode; OPCUA_EXCLUDE_MethodNode; OPCUA_EXCLUDE_TypeNode; OPCUA_EXCLUDE_ObjectTypeNode; OPCUA_EXCLUDE_DataTypeNode; OPCUA_EXCLUDE_VariableTypeNode; OPCUA_EXCLUDE_VariableNode
$(LibCoreTargetFrameworks)
$(AssemblyPrefix).Core
$(PackagePrefix).Opc.Ua.Core
diff --git a/Tests/Opc.Ua.Server.Tests/ServerTestServices.cs b/Tests/Opc.Ua.Server.Tests/ServerTestServices.cs
index 57317c7a0..3c011426a 100644
--- a/Tests/Opc.Ua.Server.Tests/ServerTestServices.cs
+++ b/Tests/Opc.Ua.Server.Tests/ServerTestServices.cs
@@ -191,7 +191,7 @@ public ValueTask CreateSubscriptionAsync(
byte priority,
CancellationToken ct = default)
{
- ResponseHeader responseHeader = m_server.CreateSubscription(
+ return new ValueTask(m_server.CreateSubscriptionAsync(
SecureChannelContext,
requestHeader,
requestedPublishingInterval,
@@ -200,19 +200,7 @@ public ValueTask CreateSubscriptionAsync(
maxNotificationsPerPublish,
publishingEnabled,
priority,
- out uint subscriptionId,
- out double revisedPublishingInterval,
- out uint revisedLifetimeCount,
- out uint revisedMaxKeepAliveCount);
- var response = new CreateSubscriptionResponse
- {
- ResponseHeader = responseHeader,
- SubscriptionId = subscriptionId,
- RevisedPublishingInterval = revisedPublishingInterval,
- RevisedLifetimeCount = revisedLifetimeCount,
- RevisedMaxKeepAliveCount = revisedMaxKeepAliveCount
- };
- return new ValueTask(response);
+ ct));
}
public ValueTask CreateMonitoredItemsAsync(
@@ -222,21 +210,13 @@ public ValueTask CreateMonitoredItemsAsync(
MonitoredItemCreateRequestCollection itemsToCreate,
CancellationToken ct = default)
{
- ResponseHeader responseHeader = m_server.CreateMonitoredItems(
+ return new ValueTask(m_server.CreateMonitoredItemsAsync(
SecureChannelContext,
requestHeader,
subscriptionId,
timestampsToReturn,
itemsToCreate,
- out MonitoredItemCreateResultCollection results,
- out DiagnosticInfoCollection diagnosticInfos);
- var response = new CreateMonitoredItemsResponse
- {
- ResponseHeader = responseHeader,
- Results = results,
- DiagnosticInfos = diagnosticInfos
- };
- return new ValueTask(response);
+ ct));
}
public ValueTask ModifySubscriptionAsync(
@@ -249,7 +229,7 @@ public ValueTask ModifySubscriptionAsync(
byte priority,
CancellationToken ct = default)
{
- ResponseHeader responseHeader = m_server.ModifySubscription(
+ return new ValueTask(m_server.ModifySubscriptionAsync(
SecureChannelContext,
requestHeader,
subscriptionId,
@@ -258,17 +238,7 @@ public ValueTask ModifySubscriptionAsync(
requestedMaxKeepAliveCount,
maxNotificationsPerPublish,
priority,
- out double revisedPublishingInterval,
- out uint revisedLifetimeCount,
- out uint revisedMaxKeepAliveCount);
- var response = new ModifySubscriptionResponse
- {
- ResponseHeader = responseHeader,
- RevisedPublishingInterval = revisedPublishingInterval,
- RevisedLifetimeCount = revisedLifetimeCount,
- RevisedMaxKeepAliveCount = revisedMaxKeepAliveCount
- };
- return new ValueTask(response);
+ ct));
}
public ValueTask ModifyMonitoredItemsAsync(
@@ -278,21 +248,13 @@ public ValueTask ModifyMonitoredItemsAsync(
MonitoredItemModifyRequestCollection itemsToModify,
CancellationToken ct = default)
{
- ResponseHeader responseHeader = m_server.ModifyMonitoredItems(
+ return new ValueTask(m_server.ModifyMonitoredItemsAsync(
SecureChannelContext,
requestHeader,
subscriptionId,
timestampsToReturn,
itemsToModify,
- out MonitoredItemModifyResultCollection results,
- out DiagnosticInfoCollection diagnosticInfos);
- var response = new ModifyMonitoredItemsResponse
- {
- ResponseHeader = responseHeader,
- Results = results,
- DiagnosticInfos = diagnosticInfos
- };
- return new ValueTask(response);
+ ct));
}
public ValueTask PublishAsync(
@@ -313,20 +275,12 @@ public ValueTask SetPublishingModeAsync(
UInt32Collection subscriptionIds,
CancellationToken ct = default)
{
- ResponseHeader responseHeader = m_server.SetPublishingMode(
+ return new ValueTask(m_server.SetPublishingModeAsync(
SecureChannelContext,
requestHeader,
publishingEnabled,
subscriptionIds,
- out StatusCodeCollection results,
- out DiagnosticInfoCollection diagnosticInfos);
- var response = new SetPublishingModeResponse
- {
- ResponseHeader = responseHeader,
- Results = results,
- DiagnosticInfos = diagnosticInfos
- };
- return new ValueTask(response);
+ ct));
}
public ValueTask SetMonitoringModeAsync(
@@ -336,21 +290,14 @@ public ValueTask SetMonitoringModeAsync(
UInt32Collection monitoredItemIds,
CancellationToken ct = default)
{
- ResponseHeader responseHeader = m_server.SetMonitoringMode(
- SecureChannelContext,
- requestHeader,
- subscriptionId,
- monitoringMode,
- monitoredItemIds,
- out StatusCodeCollection results,
- out DiagnosticInfoCollection diagnosticInfos);
- var response = new SetMonitoringModeResponse
- {
- ResponseHeader = responseHeader,
- Results = results,
- DiagnosticInfos = diagnosticInfos
- };
- return new ValueTask(response);
+ return new ValueTask(
+ m_server.SetMonitoringModeAsync(
+ SecureChannelContext,
+ requestHeader,
+ subscriptionId,
+ monitoringMode,
+ monitoredItemIds,
+ ct));
}
public ValueTask RepublishAsync(
@@ -359,18 +306,12 @@ public ValueTask RepublishAsync(
uint retransmitSequenceNumber,
CancellationToken ct = default)
{
- ResponseHeader responseHeader = m_server.Republish(
+ return new ValueTask(m_server.RepublishAsync(
SecureChannelContext,
requestHeader,
subscriptionId,
retransmitSequenceNumber,
- out NotificationMessage notificationMessage);
- var response = new RepublishResponse
- {
- ResponseHeader = responseHeader,
- NotificationMessage = notificationMessage
- };
- return new ValueTask(response);
+ ct));
}
public ValueTask DeleteSubscriptionsAsync(
@@ -378,19 +319,11 @@ public ValueTask DeleteSubscriptionsAsync(
UInt32Collection subscriptionIds,
CancellationToken ct = default)
{
- ResponseHeader responseHeader = m_server.DeleteSubscriptions(
+ return new ValueTask(m_server.DeleteSubscriptionsAsync(
SecureChannelContext,
requestHeader,
subscriptionIds,
- out StatusCodeCollection results,
- out DiagnosticInfoCollection diagnosticInfos);
- var response = new DeleteSubscriptionsResponse
- {
- ResponseHeader = responseHeader,
- Results = results,
- DiagnosticInfos = diagnosticInfos
- };
- return new ValueTask(response);
+ ct));
}
public ValueTask TransferSubscriptionsAsync(
@@ -399,20 +332,12 @@ public ValueTask TransferSubscriptionsAsync(
bool sendInitialValues,
CancellationToken ct = default)
{
- ResponseHeader responseHeader = m_server.TransferSubscriptions(
+ return new ValueTask(m_server.TransferSubscriptionsAsync(
SecureChannelContext,
requestHeader,
subscriptionIds,
sendInitialValues,
- out TransferResultCollection results,
- out DiagnosticInfoCollection diagnosticInfos);
- var response = new TransferSubscriptionsResponse
- {
- ResponseHeader = responseHeader,
- Results = results,
- DiagnosticInfos = diagnosticInfos
- };
- return new ValueTask(response);
+ ct));
}
public ValueTask TranslateBrowsePathsToNodeIdsAsync(