Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 86 additions & 10 deletions src/Worker/Grpc/GrpcEntityRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
using DurableTask.Core.Entities.OperationFormat;
using Google.Protobuf;
using Microsoft.DurableTask.Entities;
using Microsoft.DurableTask.Worker.Shims;
using Microsoft.DurableTask.Worker.Shims;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.DependencyInjection;
using P = Microsoft.DurableTask.Protobuf;

Expand All @@ -25,7 +26,7 @@
/// </para>
/// </remarks>
public static class GrpcEntityRunner
{
{
/// <summary>
/// Deserializes entity batch request from <paramref name="encodedEntityRequest"/> and uses it to invoke the
/// requested operations implemented by <paramref name="implementation"/>.
Expand All @@ -51,24 +52,99 @@
/// </exception>
public static async Task<string> LoadAndRunAsync(
string encodedEntityRequest, ITaskEntity implementation, IServiceProvider? services = null)
{
return await LoadAndRunAsync(encodedEntityRequest, implementation, extendedSessionsCache: null, services: services);
}

/// <summary>
/// Deserializes entity batch request from <paramref name="encodedEntityRequest"/> and uses it to invoke the
/// requested operations implemented by <paramref name="implementation"/>.
/// </summary>
/// <param name="encodedEntityRequest">
/// The encoded protobuf payload representing an entity batch request. This is a base64-encoded string.
/// </param>
/// <param name="implementation">
/// An <see cref="ITaskEntity"/> implementation that defines the entity logic.
/// </param>
/// <param name="extendedSessionsCache">
/// The cache of entity states which can be used to retrieve the entity state if this request is from within an extended session.
/// </param>
/// <param name="services">
/// Optional <see cref="IServiceProvider"/> from which injected dependencies can be retrieved.
/// </param>
/// <returns>
/// Returns a serialized result of the entity batch that should be used as the return value of the entity function
/// trigger.
/// </returns>
/// <exception cref="ArgumentNullException">
/// Thrown if <paramref name="encodedEntityRequest"/> or <paramref name="implementation"/> is <c>null</c>.
/// </exception>
/// <exception cref="ArgumentException">
/// Thrown if <paramref name="encodedEntityRequest"/> contains invalid data.
/// </exception>
public static async Task<string> LoadAndRunAsync(
string encodedEntityRequest, ITaskEntity implementation, ExtendedSessionsCache? extendedSessionsCache, IServiceProvider? services = null)
{
Check.NotNullOrEmpty(encodedEntityRequest);
Check.NotNull(implementation);

P.EntityBatchRequest request = P.EntityBatchRequest.Parser.Base64Decode<P.EntityBatchRequest>(
encodedEntityRequest);
encodedEntityRequest);
Dictionary<string, object?> properties = request.Properties.ToDictionary(

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / smoke-tests

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / smoke-tests

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 93 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchRequest' does not contain a definition for 'Properties' and no accessible extension method 'Properties' accepting a first argument of type 'EntityBatchRequest' could be found (are you missing a using directive or an assembly reference?)
pair => pair.Key,
pair => ProtoUtils.ConvertValueToObject(pair.Value));

EntityBatchRequest batch = request.ToEntityBatchRequest();
EntityId id = EntityId.FromString(batch.InstanceId!);
TaskName entityName = new(id.Name);

TaskName entityName = new(id.Name);

bool addToExtendedSessions = false;
bool stateCached = false;
GrpcInstanceRunnerUtils.ParseRequestPropertiesAndInitializeCache(
properties,
extendedSessionsCache,
out double extendedSessionIdleTimeoutInSeconds,
out bool isExtendedSession,
out bool entityStateIncluded,
out MemoryCache? extendedSessions);

if (isExtendedSession && extendedSessions != null)
{
addToExtendedSessions = true;

// If an entity state was provided, even if we already have one stored, we always want to use the provided state.
if (!entityStateIncluded && extendedSessions.TryGetValue(request.InstanceId, out string? entityState))
Comment thread
sophiatev marked this conversation as resolved.
{
batch.EntityState = entityState;
stateCached = true;
}
Comment thread
sophiatev marked this conversation as resolved.
}

if (!stateCached && !entityStateIncluded)
{
// No state was provided, and we do not have one cached, so we cannot execute the batch request.
return Convert.ToBase64String(new P.EntityBatchResult { RequiresState = true }.ToByteArray());

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / smoke-tests

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / smoke-tests

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / build

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchResult' does not contain a definition for 'RequiresState'

Check failure on line 126 in src/Worker/Grpc/GrpcEntityRunner.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'EntityBatchResult' does not contain a definition for 'RequiresState'
}

DurableTaskShimFactory factory = services is null
? DurableTaskShimFactory.Default
: ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);

TaskEntity entity = factory.CreateEntity(entityName, implementation, id);
EntityBatchResult result = await entity.ExecuteOperationBatchAsync(batch);

: ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);

TaskEntity entity = factory.CreateEntity(entityName, implementation, id);
EntityBatchResult result = await entity.ExecuteOperationBatchAsync(batch);

if (addToExtendedSessions)
{
extendedSessions.Set(
Comment thread
sophiatev marked this conversation as resolved.
Outdated
request.InstanceId,
result.EntityState,
new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) });
}
else
{
extendedSessions?.Remove(request.InstanceId);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what situations would we successfully remove something from this cache here?

@sophiatev sophiatev Dec 18, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to handle the situation where the caller of this method has specifically indicated that the extended session has ended (so the extended sessions cache is non-null, but isExtendedSession is false. We want to honor the termination of the extended session and remove the entity state from the cache here). The other path that could get us here is if extended sessions are simply not enabled, in which case the cache will be null, hence the need for the ?

}
Comment thread
sophiatev marked this conversation as resolved.

P.EntityBatchResult response = result.ToEntityBatchResult();
byte[] responseBytes = response.ToByteArray();
return Convert.ToBase64String(responseBytes);
Expand Down
71 changes: 71 additions & 0 deletions src/Worker/Grpc/GrpcInstanceRunnerUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Text;
Comment thread
sophiatev marked this conversation as resolved.
Outdated
using Microsoft.Extensions.Caching.Memory;

namespace Microsoft.DurableTask.Worker.Grpc;

/// <summary>
/// Utility methods for the <see cref="GrpcOrchestrationRunner"/> and <see cref="GrpcEntityRunner"/> classes.
/// </summary>
static class GrpcInstanceRunnerUtils
{
/// <summary>
/// Parses request properties to determine extended session settings and initializes the extended sessions cache if
/// the settings are properly enabled.
/// </summary>
/// <remarks>
/// If any request property is missing or invalid (i.e. the key is misspelled or the value is of the wrong type),
/// extended sessions are not enabled and default values are assigned are assigned to the returns.
Comment thread
sophiatev marked this conversation as resolved.
Outdated
/// </remarks>
/// <param name="properties">
/// A dictionary containing request properties used to configure extended session behavior.
/// </param>
/// <param name="extendedSessionsCache">The extended sessions cache manager.</param>
/// <param name="extendedSessionIdleTimeoutInSeconds">
/// When the method returns, contains the idle timeout value for extended sessions, in seconds. Cache entries that
/// have not been accessed in this timeframe are evicted from <paramref name="extendedSessionsCache"/>.
/// Set to zero if extended sessions are not enabled.
/// </param>
/// <param name="isExtendedSession">When the method returns, indicates whether this request is from within an extended session.</param>
/// <param name="stateIncluded">When the method returns, indicates whether instance state is included in the request.</param>
/// <param name="extendedSessions">When the method returns, contains the extended sessions cache initialized from
/// <paramref name="extendedSessionsCache"/> if <paramref name="isExtendedSession"/> and <paramref name="extendedSessionIdleTimeoutInSeconds"/>
/// are correctly specified in the <paramref name="properties"/>; otherwise, null.
/// </param>
internal static void ParseRequestPropertiesAndInitializeCache(
Dictionary<string, object?> properties,
ExtendedSessionsCache? extendedSessionsCache,
out double extendedSessionIdleTimeoutInSeconds,
out bool isExtendedSession,
out bool stateIncluded,
out MemoryCache? extendedSessions)
{
// If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the instance state is attached
extendedSessions = null;
stateIncluded = true;
isExtendedSession = false;
extendedSessionIdleTimeoutInSeconds = 0;

// Only attempt to initialize the extended sessions cache if all the parameters are correctly specified
if (properties.TryGetValue("ExtendedSessionIdleTimeoutInSeconds", out object? extendedSessionIdleTimeoutObj)
&& extendedSessionIdleTimeoutObj is double extendedSessionIdleTimeout
&& extendedSessionIdleTimeout > 0
&& properties.TryGetValue("IsExtendedSession", out object? extendedSessionObj)
&& extendedSessionObj is bool extendedSession)
{
extendedSessionIdleTimeoutInSeconds = extendedSessionIdleTimeout;
isExtendedSession = extendedSession;
extendedSessions = extendedSessionsCache?.GetOrInitializeCache(extendedSessionIdleTimeoutInSeconds);
}

if (properties.TryGetValue("IncludeState", out object? includeStateObj)
&& includeStateObj is bool includeState)
{
stateIncluded = includeState;
}
}
}
30 changes: 7 additions & 23 deletions src/Worker/Grpc/GrpcOrchestrationRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,32 +131,16 @@ public static string LoadAndRun(
pair => ProtoUtils.ConvertValueToObject(pair.Value));

OrchestratorExecutionResult? result = null;
MemoryCache? extendedSessions = null;

// If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached
bool addToExtendedSessions = false;
bool requiresHistory = false;
bool pastEventsIncluded = true;
bool isExtendedSession = false;
double extendedSessionIdleTimeoutInSeconds = 0;

// Only attempt to initialize the extended sessions cache if all the parameters are correctly specified
if (properties.TryGetValue("ExtendedSessionIdleTimeoutInSeconds", out object? extendedSessionIdleTimeoutObj)
&& extendedSessionIdleTimeoutObj is double extendedSessionIdleTimeout
&& extendedSessionIdleTimeout > 0
&& properties.TryGetValue("IsExtendedSession", out object? extendedSessionObj)
&& extendedSessionObj is bool extendedSession)
{
extendedSessionIdleTimeoutInSeconds = extendedSessionIdleTimeout;
isExtendedSession = extendedSession;
extendedSessions = extendedSessionsCache?.GetOrInitializeCache(extendedSessionIdleTimeoutInSeconds);
}

if (properties.TryGetValue("IncludePastEvents", out object? includePastEventsObj)
&& includePastEventsObj is bool includePastEvents)
{
pastEventsIncluded = includePastEvents;
}
GrpcInstanceRunnerUtils.ParseRequestPropertiesAndInitializeCache(
properties,
extendedSessionsCache,
out double extendedSessionIdleTimeoutInSeconds,
out bool isExtendedSession,
out bool pastEventsIncluded,
out MemoryCache? extendedSessions);

if (isExtendedSession && extendedSessions != null)
{
Expand Down
Loading
Loading