diff --git a/TickerQ.sln b/TickerQ.sln index ef9bb45b..4049ad2c 100644 --- a/TickerQ.sln +++ b/TickerQ.sln @@ -44,6 +44,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.Sample.WebApi", "sa EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.Sample.Console", "samples\TickerQ.Sample.Console\TickerQ.Sample.Console.csproj", "{723F26D9-C675-4883-8B62-AEBA370566D4}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.SDK", "src\TickerQ.SDK\TickerQ.SDK.csproj", "{458E9E1E-A58D-43B8-9CFF-FFC84C715094}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.RemoteExecutor", "src\TickerQ.RemoteExecutor\TickerQ.RemoteExecutor.csproj", "{F8599439-948C-4856-ACD5-79CCBC13E9AE}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -174,6 +178,30 @@ Global {723F26D9-C675-4883-8B62-AEBA370566D4}.Release|x64.Build.0 = Release|Any CPU {723F26D9-C675-4883-8B62-AEBA370566D4}.Release|x86.ActiveCfg = Release|Any CPU {723F26D9-C675-4883-8B62-AEBA370566D4}.Release|x86.Build.0 = Release|Any CPU + {458E9E1E-A58D-43B8-9CFF-FFC84C715094}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {458E9E1E-A58D-43B8-9CFF-FFC84C715094}.Debug|Any CPU.Build.0 = Debug|Any CPU + {458E9E1E-A58D-43B8-9CFF-FFC84C715094}.Debug|x64.ActiveCfg = Debug|Any CPU + {458E9E1E-A58D-43B8-9CFF-FFC84C715094}.Debug|x64.Build.0 = Debug|Any CPU + {458E9E1E-A58D-43B8-9CFF-FFC84C715094}.Debug|x86.ActiveCfg = Debug|Any CPU + {458E9E1E-A58D-43B8-9CFF-FFC84C715094}.Debug|x86.Build.0 = Debug|Any CPU + {458E9E1E-A58D-43B8-9CFF-FFC84C715094}.Release|Any CPU.ActiveCfg = Release|Any CPU + {458E9E1E-A58D-43B8-9CFF-FFC84C715094}.Release|Any CPU.Build.0 = Release|Any CPU + {458E9E1E-A58D-43B8-9CFF-FFC84C715094}.Release|x64.ActiveCfg = Release|Any CPU + {458E9E1E-A58D-43B8-9CFF-FFC84C715094}.Release|x64.Build.0 = Release|Any CPU + {458E9E1E-A58D-43B8-9CFF-FFC84C715094}.Release|x86.ActiveCfg = Release|Any CPU + {458E9E1E-A58D-43B8-9CFF-FFC84C715094}.Release|x86.Build.0 = Release|Any CPU + {F8599439-948C-4856-ACD5-79CCBC13E9AE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F8599439-948C-4856-ACD5-79CCBC13E9AE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F8599439-948C-4856-ACD5-79CCBC13E9AE}.Debug|x64.ActiveCfg = Debug|Any CPU + {F8599439-948C-4856-ACD5-79CCBC13E9AE}.Debug|x64.Build.0 = Debug|Any CPU + {F8599439-948C-4856-ACD5-79CCBC13E9AE}.Debug|x86.ActiveCfg = Debug|Any CPU + {F8599439-948C-4856-ACD5-79CCBC13E9AE}.Debug|x86.Build.0 = Debug|Any CPU + {F8599439-948C-4856-ACD5-79CCBC13E9AE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F8599439-948C-4856-ACD5-79CCBC13E9AE}.Release|Any CPU.Build.0 = Release|Any CPU + {F8599439-948C-4856-ACD5-79CCBC13E9AE}.Release|x64.ActiveCfg = Release|Any CPU + {F8599439-948C-4856-ACD5-79CCBC13E9AE}.Release|x64.Build.0 = Release|Any CPU + {F8599439-948C-4856-ACD5-79CCBC13E9AE}.Release|x86.ActiveCfg = Release|Any CPU + {F8599439-948C-4856-ACD5-79CCBC13E9AE}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -190,5 +218,7 @@ Global {D87B9599-22C7-4E82-B300-1239E504E097} = {BA086F2D-2778-4F58-A9AA-45F560CE3504} {4DC9FE10-966F-4C8A-B7D2-1215DC319B9E} = {45D577FA-DB7A-4B96-BB3F-97DDA0A929D5} {723F26D9-C675-4883-8B62-AEBA370566D4} = {45D577FA-DB7A-4B96-BB3F-97DDA0A929D5} + {458E9E1E-A58D-43B8-9CFF-FFC84C715094} = {BA086F2D-2778-4F58-A9AA-45F560CE3504} + {F8599439-948C-4856-ACD5-79CCBC13E9AE} = {BA086F2D-2778-4F58-A9AA-45F560CE3504} EndGlobalSection EndGlobal diff --git a/src/TickerQ.Caching.StackExchangeRedis/DependencyInjection/ServiceExtension.cs b/src/TickerQ.Caching.StackExchangeRedis/DependencyInjection/ServiceExtension.cs index e418cfbb..0c62815a 100644 --- a/src/TickerQ.Caching.StackExchangeRedis/DependencyInjection/ServiceExtension.cs +++ b/src/TickerQ.Caching.StackExchangeRedis/DependencyInjection/ServiceExtension.cs @@ -1,7 +1,10 @@ +using System; using System.Runtime.CompilerServices; using Microsoft.Extensions.Caching.Distributed; using Microsoft.Extensions.Caching.StackExchangeRedis; using Microsoft.Extensions.DependencyInjection; +using StackExchange.Redis; +using TickerQ.Caching.StackExchangeRedis.Infrastructure; using TickerQ.Utilities; using TickerQ.Utilities.Entities; using TickerQ.Utilities.Interfaces; @@ -23,10 +26,19 @@ public static TickerOptionsBuilder AddStackExchangeRed }; setupAction?.Invoke(options); + if (string.IsNullOrWhiteSpace(options.Configuration) && options.ConfigurationOptions == null) + throw new InvalidOperationException("Redis configuration must be provided when enabling StackExchange.Redis persistence."); + + services.AddSingleton(_ => + options.ConfigurationOptions != null + ? ConnectionMultiplexer.Connect(options.ConfigurationOptions) + : ConnectionMultiplexer.Connect(options.Configuration)); + services.AddSingleton(sp => sp.GetRequiredService().GetDatabase()); services.AddHostedService(); services.AddSingleton(); services.AddKeyedSingleton("tickerq", (sp, key) => new RedisCache(options)); services.AddSingleton(_ => options); + services.AddSingleton, TickerRedisPersistenceProvider>(); }; return tickerConfiguration; @@ -36,4 +48,4 @@ public class TickerQRedisOptionBuilder : RedisCacheOptions { public TimeSpan NodeHeartbeatInterval { get; set; } = TimeSpan.FromMinutes(1); } -} \ No newline at end of file +} diff --git a/src/TickerQ.Caching.StackExchangeRedis/Infrastructure/TickerRedisPersistenceProvider.cs b/src/TickerQ.Caching.StackExchangeRedis/Infrastructure/TickerRedisPersistenceProvider.cs new file mode 100644 index 00000000..a44b6202 --- /dev/null +++ b/src/TickerQ.Caching.StackExchangeRedis/Infrastructure/TickerRedisPersistenceProvider.cs @@ -0,0 +1,963 @@ +#nullable disable +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Runtime.CompilerServices; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using StackExchange.Redis; +using TickerQ.Utilities; +using TickerQ.Utilities.Entities; +using TickerQ.Utilities.Enums; +using TickerQ.Utilities.Interfaces; +using TickerQ.Utilities.Models; + +namespace TickerQ.Caching.StackExchangeRedis.Infrastructure; + +/// +/// Redis-native persistence provider that stores all ticker data directly in Redis +/// (hashes/sets/zsets via string serialization) rather than keeping in-memory snapshots. +/// This mirrors the EF Core provider’s behavior: optimistic concurrency on UpdatedAt, +/// lock-holder semantics, and scheduling via execution time ordering. +/// +internal sealed class TickerRedisPersistenceProvider : ITickerPersistenceProvider + where TTimeTicker : TimeTickerEntity, new() + where TCronTicker : CronTickerEntity, new() +{ + private readonly IDatabase _db; + private readonly ITickerClock _clock; + private readonly string _lockHolder; + private readonly JsonSerializerOptions _jsonOptions; + + private const string Prefix = "tq"; + private const string TimeTickerIdsKey = $"{Prefix}:tt:ids"; + private const string TimeTickerPendingKey = $"{Prefix}:tt:pending"; + private const string CronIdsKey = $"{Prefix}:cron:ids"; + private const string CronOccurrenceIdsKey = $"{Prefix}:co:ids"; + private const string CronOccurrencePendingKey = $"{Prefix}:co:pending"; + + public TickerRedisPersistenceProvider(IDatabase db, ITickerClock clock, SchedulerOptionsBuilder optionsBuilder) + { + _db = db ?? throw new ArgumentNullException(nameof(db)); + _clock = clock ?? throw new ArgumentNullException(nameof(clock)); + _lockHolder = optionsBuilder?.NodeIdentifier ?? Environment.MachineName; + _jsonOptions = new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true, + WriteIndented = false + }; + } + + #region Key helpers + private static string TimeTickerKey(Guid id) => $"{Prefix}:tt:{id}"; + private static string CronKey(Guid id) => $"{Prefix}:cron:{id}"; + private static string CronOccurrenceKey(Guid id) => $"{Prefix}:co:{id}"; + private static double ToScore(DateTime utc) => utc.ToUniversalTime().Ticks; + #endregion + + #region Serialization helpers + private async Task GetAsync(string key) where T : class + { + var val = await _db.StringGetAsync(key).ConfigureAwait(false); + if (val.IsNullOrEmpty) return null; + try { return JsonSerializer.Deserialize((string)val, _jsonOptions); } + catch { return null; } + } + + private Task SetAsync(string key, T value) where T : class + { + var payload = JsonSerializer.Serialize(value, _jsonOptions); + return _db.StringSetAsync(key, payload); + } + #endregion + + #region Common helpers + private static bool CanAcquire(TickerStatus status, string currentHolder, string lockHolder) + { + return status is TickerStatus.Idle or TickerStatus.Queued && + (string.IsNullOrEmpty(currentHolder) || string.Equals(currentHolder, lockHolder, StringComparison.Ordinal)); + } + + private static TimeTickerEntity MapForQueue(TTimeTicker ticker) + { + return new TimeTickerEntity + { + Id = ticker.Id, + Function = ticker.Function, + Retries = ticker.Retries, + RetryIntervals = ticker.RetryIntervals, + UpdatedAt = ticker.UpdatedAt, + ParentId = ticker.ParentId, + ExecutionTime = ticker.ExecutionTime, + Status = ticker.Status, + LockHolder = ticker.LockHolder, + LockedAt = ticker.LockedAt, + Children = ticker.Children?.OfType().Select(ch => new TimeTickerEntity + { + Id = ch.Id, + Function = ch.Function, + Retries = ch.Retries, + RetryIntervals = ch.RetryIntervals, + RunCondition = ch.RunCondition, + ParentId = ch.ParentId, + Children = ch.Children?.OfType().Select(gch => new TimeTickerEntity + { + Id = gch.Id, + Function = gch.Function, + Retries = gch.Retries, + RetryIntervals = gch.RetryIntervals, + RunCondition = gch.RunCondition, + ParentId = gch.ParentId + }).ToArray() + }).ToArray() + }; + } + + private async Task AddTimeTickerIndexesAsync(TTimeTicker ticker) + { + await _db.SetAddAsync(TimeTickerIdsKey, ticker.Id.ToString()).ConfigureAwait(false); + if (ticker.ExecutionTime.HasValue && CanAcquire(ticker.Status, ticker.LockHolder, _lockHolder)) + { + await _db.SortedSetAddAsync(TimeTickerPendingKey, ticker.Id.ToString(), ToScore(ticker.ExecutionTime.Value)).ConfigureAwait(false); + } + else + { + await _db.SortedSetRemoveAsync(TimeTickerPendingKey, ticker.Id.ToString()).ConfigureAwait(false); + } + } + + private async Task RemoveTimeTickerIndexesAsync(Guid id) + { + await _db.SetRemoveAsync(TimeTickerIdsKey, id.ToString()).ConfigureAwait(false); + await _db.SortedSetRemoveAsync(TimeTickerPendingKey, id.ToString()).ConfigureAwait(false); + } + + private async Task AddCronIndexesAsync(TCronTicker ticker) + { + await _db.SetAddAsync(CronIdsKey, ticker.Id.ToString()).ConfigureAwait(false); + } + + private async Task RemoveCronIndexesAsync(Guid id) + { + await _db.SetRemoveAsync(CronIdsKey, id.ToString()).ConfigureAwait(false); + } + + private async Task AddCronOccurrenceIndexesAsync(CronTickerOccurrenceEntity occurrence) + { + await _db.SetAddAsync(CronOccurrenceIdsKey, occurrence.Id.ToString()).ConfigureAwait(false); + if (CanAcquire(occurrence.Status, occurrence.LockHolder, _lockHolder)) + { + await _db.SortedSetAddAsync(CronOccurrencePendingKey, occurrence.Id.ToString(), ToScore(occurrence.ExecutionTime)).ConfigureAwait(false); + } + else + { + await _db.SortedSetRemoveAsync(CronOccurrencePendingKey, occurrence.Id.ToString()).ConfigureAwait(false); + } + } + + private async Task RemoveCronOccurrenceIndexesAsync(Guid id) + { + await _db.SetRemoveAsync(CronOccurrenceIdsKey, id.ToString()).ConfigureAwait(false); + await _db.SortedSetRemoveAsync(CronOccurrencePendingKey, id.ToString()).ConfigureAwait(false); + } + #endregion + + #region Time_Ticker_Core_Methods + public async IAsyncEnumerable QueueTimeTickers(TimeTickerEntity[] timeTickers, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var now = _clock.UtcNow; + + foreach (var timeTicker in timeTickers) + { + cancellationToken.ThrowIfCancellationRequested(); + var stored = await GetAsync(TimeTickerKey(timeTicker.Id)).ConfigureAwait(false); + if (stored == null) continue; + if (stored.UpdatedAt != timeTicker.UpdatedAt) continue; + if (!CanAcquire(stored.Status, stored.LockHolder, _lockHolder)) continue; + + stored.LockHolder = _lockHolder; + stored.LockedAt = now; + stored.UpdatedAt = now; + stored.Status = TickerStatus.Queued; + + timeTicker.LockHolder = _lockHolder; + timeTicker.LockedAt = now; + timeTicker.UpdatedAt = now; + timeTicker.Status = TickerStatus.Queued; + + await SetAsync(TimeTickerKey(stored.Id), stored).ConfigureAwait(false); + await AddTimeTickerIndexesAsync(stored).ConfigureAwait(false); + + yield return timeTicker; + } + } + + public async IAsyncEnumerable QueueTimedOutTimeTickers([EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var now = _clock.UtcNow; + var threshold = now.AddMilliseconds(-100); + + // Fetch due tickers by score + var dueIds = await _db.SortedSetRangeByScoreAsync(TimeTickerPendingKey, double.NegativeInfinity, ToScore(threshold)).ConfigureAwait(false); + + foreach (var redisValue in dueIds) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!Guid.TryParse(redisValue.ToString(), out var id)) continue; + + var ticker = await GetAsync(TimeTickerKey(id)).ConfigureAwait(false); + if (ticker == null) continue; + if (!CanAcquire(ticker.Status, ticker.LockHolder, _lockHolder)) continue; + + ticker.LockHolder = _lockHolder; + ticker.LockedAt = now; + ticker.UpdatedAt = now; + ticker.Status = TickerStatus.InProgress; + + await SetAsync(TimeTickerKey(ticker.Id), ticker).ConfigureAwait(false); + await AddTimeTickerIndexesAsync(ticker).ConfigureAwait(false); + + yield return MapForQueue(ticker); + } + } + + public async Task ReleaseAcquiredTimeTickers(Guid[] timeTickerIds, CancellationToken cancellationToken = default) + { + var ids = timeTickerIds.Length == 0 + ? (await _db.SetMembersAsync(TimeTickerIdsKey).ConfigureAwait(false)).Select(x => Guid.Parse(x.ToString())).ToArray() + : timeTickerIds; + + var now = _clock.UtcNow; + foreach (var id in ids) + { + cancellationToken.ThrowIfCancellationRequested(); + var ticker = await GetAsync(TimeTickerKey(id)).ConfigureAwait(false); + if (ticker == null) continue; + if (!CanAcquire(ticker.Status, ticker.LockHolder, _lockHolder)) continue; + + ticker.LockHolder = null; + ticker.LockedAt = null; + ticker.Status = TickerStatus.Idle; + ticker.UpdatedAt = now; + + await SetAsync(TimeTickerKey(id), ticker).ConfigureAwait(false); + await AddTimeTickerIndexesAsync(ticker).ConfigureAwait(false); + } + } + + public async Task GetEarliestTimeTickers(CancellationToken cancellationToken = default) + { + var now = _clock.UtcNow; + var oneSecondAgoScore = ToScore(now.AddSeconds(-1)); + + // Get the earliest entry in window + var first = await _db.SortedSetRangeByScoreWithScoresAsync(TimeTickerPendingKey, oneSecondAgoScore, double.PositiveInfinity, Exclude.None, Order.Ascending, 0, 1).ConfigureAwait(false); + if (first.Length == 0) return []; + + var earliestScore = first[0].Score; + var minSecond = new DateTime((long)earliestScore, DateTimeKind.Utc); + var maxScore = ToScore(minSecond.AddSeconds(1)); + + // Fetch all tickers within that second + var ids = await _db.SortedSetRangeByScoreAsync(TimeTickerPendingKey, earliestScore, maxScore).ConfigureAwait(false); + var result = new List(); + foreach (var redisValue in ids) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!Guid.TryParse(redisValue.ToString(), out var id)) continue; + var ticker = await GetAsync(TimeTickerKey(id)).ConfigureAwait(false); + if (ticker == null) continue; + if (!CanAcquire(ticker.Status, ticker.LockHolder, _lockHolder)) continue; + result.Add(MapForQueue(ticker)); + } + + return result.ToArray(); + } + + public async Task UpdateTimeTicker(InternalFunctionContext functionContexts, CancellationToken cancellationToken = default) + { + var ticker = await GetAsync(TimeTickerKey(functionContexts.TickerId)).ConfigureAwait(false); + if (ticker == null) return 0; + + ApplyFunctionContextToTicker(ticker, functionContexts); + await SetAsync(TimeTickerKey(ticker.Id), ticker).ConfigureAwait(false); + await AddTimeTickerIndexesAsync(ticker).ConfigureAwait(false); + return 1; + } + + public async Task GetTimeTickerRequest(Guid id, CancellationToken cancellationToken) + { + var ticker = await GetAsync(TimeTickerKey(id)).ConfigureAwait(false); + return ticker?.Request; + } + + public async Task UpdateTimeTickersWithUnifiedContext(Guid[] timeTickerIds, InternalFunctionContext functionContext, CancellationToken cancellationToken = default) + { + foreach (var id in timeTickerIds) + { + cancellationToken.ThrowIfCancellationRequested(); + var ticker = await GetAsync(TimeTickerKey(id)).ConfigureAwait(false); + if (ticker == null) continue; + ApplyFunctionContextToTicker(ticker, functionContext); + await SetAsync(TimeTickerKey(id), ticker).ConfigureAwait(false); + await AddTimeTickerIndexesAsync(ticker).ConfigureAwait(false); + } + } + + public async Task AcquireImmediateTimeTickersAsync(Guid[] ids, CancellationToken cancellationToken = default) + { + if (ids == null || ids.Length == 0) return Array.Empty(); + + var now = _clock.UtcNow; + var acquired = new List(); + foreach (var id in ids) + { + cancellationToken.ThrowIfCancellationRequested(); + var ticker = await GetAsync(TimeTickerKey(id)).ConfigureAwait(false); + if (ticker == null) continue; + if (!CanAcquire(ticker.Status, ticker.LockHolder, _lockHolder)) continue; + + ticker.LockHolder = _lockHolder; + ticker.LockedAt = now; + ticker.Status = TickerStatus.InProgress; + ticker.UpdatedAt = now; + + await SetAsync(TimeTickerKey(id), ticker).ConfigureAwait(false); + await AddTimeTickerIndexesAsync(ticker).ConfigureAwait(false); + + acquired.Add(MapForQueue(ticker)); + } + + return acquired.ToArray(); + } + #endregion + + #region Cron_Ticker_Core_Methods + public async Task MigrateDefinedCronTickers((string Function, string Expression)[] cronTickers, CancellationToken cancellationToken = default) + { + var now = _clock.UtcNow; + const string seedPrefix = "MemoryTicker_Seeded_"; + var functions = cronTickers.Select(x => x.Function).ToHashSet(StringComparer.Ordinal); + + // Load existing seeded cron tickers + var existingIds = await _db.SetMembersAsync(CronIdsKey).ConfigureAwait(false); + var existingList = new List(); + foreach (var redisValue in existingIds) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!Guid.TryParse(redisValue.ToString(), out var id)) continue; + var cron = await GetAsync(CronKey(id)).ConfigureAwait(false); + if (cron != null) existingList.Add(cron); + } + + var seededCron = existingList.Where(c => c.InitIdentifier != null && c.InitIdentifier.StartsWith(seedPrefix)).ToList(); + var seededToDelete = seededCron.Where(c => !functions.Contains(c.Function)).Select(c => c.Id).ToArray(); + + foreach (var id in seededToDelete) + { + await RemoveCronIndexesAsync(id).ConfigureAwait(false); + await RemoveCronOccurrenceIndexesAsyncForCron(id).ConfigureAwait(false); + await _db.KeyDeleteAsync(CronKey(id)).ConfigureAwait(false); + } + + var existingByFunction = existingList.ToDictionary(c => c.Function, c => c, StringComparer.Ordinal); + foreach (var (function, expression) in cronTickers) + { + if (existingByFunction.TryGetValue(function, out var cron)) + { + if (!string.Equals(cron.Expression, expression, StringComparison.Ordinal)) + { + cron.Expression = expression; + cron.UpdatedAt = now; + await SetAsync(CronKey(cron.Id), cron).ConfigureAwait(false); + } + } + else + { + var entity = new TCronTicker + { + Id = Guid.NewGuid(), + Function = function, + Expression = expression, + InitIdentifier = $"{seedPrefix}{function}", + CreatedAt = now, + UpdatedAt = now, + Request = [] + }; + await SetAsync(CronKey(entity.Id), entity).ConfigureAwait(false); + await AddCronIndexesAsync(entity).ConfigureAwait(false); + } + } + } + + public async Task GetAllCronTickerExpressions(CancellationToken cancellationToken = default) + { + var ids = await _db.SetMembersAsync(CronIdsKey).ConfigureAwait(false); + var list = new List(); + foreach (var redisValue in ids) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!Guid.TryParse(redisValue.ToString(), out var id)) continue; + var cron = await GetAsync(CronKey(id)).ConfigureAwait(false); + if (cron == null) continue; + list.Add(new CronTickerEntity + { + Id = cron.Id, + Expression = cron.Expression, + Function = cron.Function, + RetryIntervals = cron.RetryIntervals, + Retries = cron.Retries + }); + } + return list.ToArray(); + } + + public async Task ReleaseDeadNodeTimeTickerResources(string instanceIdentifier, CancellationToken cancellationToken = default) + { + var ids = await _db.SetMembersAsync(TimeTickerIdsKey).ConfigureAwait(false); + var now = _clock.UtcNow; + + foreach (var redisValue in ids) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!Guid.TryParse(redisValue.ToString(), out var id)) continue; + var ticker = await GetAsync(TimeTickerKey(id)).ConfigureAwait(false); + if (ticker == null) continue; + + if (ticker.LockHolder == instanceIdentifier && ticker.Status == TickerStatus.InProgress) + { + ticker.Status = TickerStatus.Skipped; + ticker.SkippedReason = "Node is not alive!"; + ticker.ExecutedAt = now; + ticker.UpdatedAt = now; + } + else if (ticker.LockHolder == instanceIdentifier && (ticker.Status == TickerStatus.Idle || ticker.Status == TickerStatus.Queued)) + { + ticker.LockHolder = null; + ticker.LockedAt = null; + ticker.Status = TickerStatus.Idle; + ticker.UpdatedAt = now; + } + + await SetAsync(TimeTickerKey(id), ticker).ConfigureAwait(false); + await AddTimeTickerIndexesAsync(ticker).ConfigureAwait(false); + } + } + #endregion + + #region Cron_TickerOccurrence_Core_Methods + public async Task> GetEarliestAvailableCronOccurrence(Guid[] ids, CancellationToken cancellationToken = default) + { + if (ids == null || ids.Length == 0) return null; + + // Scan earliest pending occurrences and pick the first that belongs to provided ids + var candidates = await _db.SortedSetRangeByScoreAsync(CronOccurrencePendingKey, double.NegativeInfinity, double.PositiveInfinity, Exclude.None, Order.Ascending, 0, 200).ConfigureAwait(false); + foreach (var redisValue in candidates) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!Guid.TryParse(redisValue.ToString(), out var id)) continue; + var occurrence = await GetAsync>(CronOccurrenceKey(id)).ConfigureAwait(false); + if (occurrence == null) continue; + if (!ids.Contains(occurrence.CronTickerId)) continue; + if (!CanAcquire(occurrence.Status, occurrence.LockHolder, _lockHolder)) continue; + return occurrence; + } + + return null; + } + + public async IAsyncEnumerable> QueueCronTickerOccurrences((DateTime Key, InternalManagerContext[] Items) cronTickerOccurrences, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var now = _clock.UtcNow; + var executionTime = cronTickerOccurrences.Key; + + foreach (var item in cronTickerOccurrences.Items) + { + cancellationToken.ThrowIfCancellationRequested(); + if (item.NextCronOccurrence is null) + { + var occurrence = new CronTickerOccurrenceEntity + { + Id = Guid.NewGuid(), + Status = TickerStatus.Queued, + LockHolder = _lockHolder, + ExecutionTime = executionTime, + CronTickerId = item.Id, + LockedAt = now, + CreatedAt = now, + UpdatedAt = now, + CronTicker = new TCronTicker + { + Id = item.Id, + Function = item.FunctionName, + Expression = item.Expression, + Retries = item.Retries, + RetryIntervals = item.RetryIntervals + } + }; + + await SetAsync(CronOccurrenceKey(occurrence.Id), occurrence).ConfigureAwait(false); + await AddCronOccurrenceIndexesAsync(occurrence).ConfigureAwait(false); + yield return occurrence; + } + else + { + var existing = await GetAsync>(CronOccurrenceKey(item.NextCronOccurrence.Id)).ConfigureAwait(false); + if (existing == null) continue; + if (!CanAcquire(existing.Status, existing.LockHolder, _lockHolder)) continue; + + existing.LockHolder = _lockHolder; + existing.LockedAt = now; + existing.UpdatedAt = now; + existing.Status = TickerStatus.Queued; + existing.ExecutionTime = executionTime; + + if (existing.CronTicker == null) + { + existing.CronTicker = new TCronTicker + { + Id = item.Id, + Function = item.FunctionName, + Expression = item.Expression, + Retries = item.Retries, + RetryIntervals = item.RetryIntervals + }; + } + + await SetAsync(CronOccurrenceKey(existing.Id), existing).ConfigureAwait(false); + await AddCronOccurrenceIndexesAsync(existing).ConfigureAwait(false); + yield return existing; + } + } + } + + public async IAsyncEnumerable> QueueTimedOutCronTickerOccurrences([EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var now = _clock.UtcNow; + var threshold = now.AddMilliseconds(-100); + + var dueIds = await _db.SortedSetRangeByScoreAsync(CronOccurrencePendingKey, double.NegativeInfinity, ToScore(threshold)).ConfigureAwait(false); + foreach (var redisValue in dueIds) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!Guid.TryParse(redisValue.ToString(), out var id)) continue; + var occurrence = await GetAsync>(CronOccurrenceKey(id)).ConfigureAwait(false); + if (occurrence == null) continue; + if (!CanAcquire(occurrence.Status, occurrence.LockHolder, _lockHolder)) continue; + + occurrence.LockHolder = _lockHolder; + occurrence.LockedAt = now; + occurrence.UpdatedAt = now; + occurrence.Status = TickerStatus.InProgress; + + await SetAsync(CronOccurrenceKey(id), occurrence).ConfigureAwait(false); + await AddCronOccurrenceIndexesAsync(occurrence).ConfigureAwait(false); + + yield return occurrence; + } + } + + public async Task UpdateCronTickerOccurrence(InternalFunctionContext functionContext, CancellationToken cancellationToken = default) + { + var occurrence = await GetAsync>(CronOccurrenceKey(functionContext.TickerId)).ConfigureAwait(false); + if (occurrence == null) return; + + ApplyFunctionContextToCronOccurrence(occurrence, functionContext); + await SetAsync(CronOccurrenceKey(occurrence.Id), occurrence).ConfigureAwait(false); + await AddCronOccurrenceIndexesAsync(occurrence).ConfigureAwait(false); + } + + public async Task ReleaseAcquiredCronTickerOccurrences(Guid[] occurrenceIds, CancellationToken cancellationToken = default) + { + var ids = occurrenceIds.Length == 0 + ? (await _db.SetMembersAsync(CronOccurrenceIdsKey).ConfigureAwait(false)).Select(x => Guid.Parse(x.ToString())).ToArray() + : occurrenceIds; + + var now = _clock.UtcNow; + foreach (var id in ids) + { + cancellationToken.ThrowIfCancellationRequested(); + var occurrence = await GetAsync>(CronOccurrenceKey(id)).ConfigureAwait(false); + if (occurrence == null) continue; + if (!CanAcquire(occurrence.Status, occurrence.LockHolder, _lockHolder)) continue; + + occurrence.LockHolder = null; + occurrence.LockedAt = null; + occurrence.Status = TickerStatus.Idle; + occurrence.UpdatedAt = now; + + await SetAsync(CronOccurrenceKey(id), occurrence).ConfigureAwait(false); + await AddCronOccurrenceIndexesAsync(occurrence).ConfigureAwait(false); + } + } + + public async Task GetCronTickerOccurrenceRequest(Guid tickerId, CancellationToken cancellationToken = default) + { + var occurrence = await GetAsync>(CronOccurrenceKey(tickerId)).ConfigureAwait(false); + return occurrence?.CronTicker?.Request; + } + + public async Task UpdateCronTickerOccurrencesWithUnifiedContext(Guid[] timeTickerIds, InternalFunctionContext functionContext, CancellationToken cancellationToken = default) + { + foreach (var id in timeTickerIds) + { + cancellationToken.ThrowIfCancellationRequested(); + var occurrence = await GetAsync>(CronOccurrenceKey(id)).ConfigureAwait(false); + if (occurrence == null) continue; + ApplyFunctionContextToCronOccurrence(occurrence, functionContext); + await SetAsync(CronOccurrenceKey(id), occurrence).ConfigureAwait(false); + await AddCronOccurrenceIndexesAsync(occurrence).ConfigureAwait(false); + } + } + + public async Task ReleaseDeadNodeOccurrenceResources(string instanceIdentifier, CancellationToken cancellationToken = default) + { + var now = _clock.UtcNow; + var ids = await _db.SetMembersAsync(CronOccurrenceIdsKey).ConfigureAwait(false); + foreach (var redisValue in ids) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!Guid.TryParse(redisValue.ToString(), out var id)) continue; + var occurrence = await GetAsync>(CronOccurrenceKey(id)).ConfigureAwait(false); + if (occurrence == null) continue; + + if (occurrence.LockHolder == instanceIdentifier && occurrence.Status == TickerStatus.InProgress) + { + occurrence.Status = TickerStatus.Skipped; + occurrence.SkippedReason = "Node is not alive!"; + occurrence.ExecutedAt = now; + occurrence.UpdatedAt = now; + } + else if (occurrence.LockHolder == instanceIdentifier && (occurrence.Status == TickerStatus.Idle || occurrence.Status == TickerStatus.Queued)) + { + occurrence.LockHolder = null; + occurrence.LockedAt = null; + occurrence.Status = TickerStatus.Idle; + occurrence.UpdatedAt = now; + } + + await SetAsync(CronOccurrenceKey(id), occurrence).ConfigureAwait(false); + await AddCronOccurrenceIndexesAsync(occurrence).ConfigureAwait(false); + } + } + #endregion + + #region Time_Ticker_Shared_Methods + public async Task GetTimeTickerById(Guid id, CancellationToken cancellationToken = default) + { + return await GetAsync(TimeTickerKey(id)).ConfigureAwait(false); + } + + public async Task GetTimeTickers(Expression> predicate, CancellationToken cancellationToken) + { + var ids = await _db.SetMembersAsync(TimeTickerIdsKey).ConfigureAwait(false); + var list = new List(); + foreach (var redisValue in ids) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!Guid.TryParse(redisValue.ToString(), out var id)) continue; + var ticker = await GetAsync(TimeTickerKey(id)).ConfigureAwait(false); + if (ticker != null) list.Add(ticker); + } + + var query = list.Where(x => x.ParentId == null).AsQueryable(); + if (predicate != null) query = query.Where(predicate); + + return query.OrderByDescending(x => x.ExecutionTime).ToArray(); + } + + public async Task> GetTimeTickersPaginated(Expression> predicate, int pageNumber, int pageSize, CancellationToken cancellationToken = default) + { + var all = await GetTimeTickers(predicate, cancellationToken).ConfigureAwait(false); + var total = all.Length; + var items = all.Skip((pageNumber - 1) * pageSize).Take(pageSize).ToArray(); + return new PaginationResult(items, total, pageNumber, pageSize); + } + + public async Task AddTimeTickers(TTimeTicker[] tickers, CancellationToken cancellationToken = default) + { + var now = _clock.UtcNow; + foreach (var ticker in tickers) + { + cancellationToken.ThrowIfCancellationRequested(); + ticker.CreatedAt = ticker.CreatedAt == default ? now : ticker.CreatedAt; + ticker.UpdatedAt = ticker.UpdatedAt == default ? now : ticker.UpdatedAt; + await SetAsync(TimeTickerKey(ticker.Id), ticker).ConfigureAwait(false); + await AddTimeTickerIndexesAsync(ticker).ConfigureAwait(false); + } + return tickers.Length; + } + + public async Task UpdateTimeTickers(TTimeTicker[] tickers, CancellationToken cancellationToken = default) + { + var now = _clock.UtcNow; + foreach (var ticker in tickers) + { + cancellationToken.ThrowIfCancellationRequested(); + ticker.UpdatedAt = now; + await SetAsync(TimeTickerKey(ticker.Id), ticker).ConfigureAwait(false); + await AddTimeTickerIndexesAsync(ticker).ConfigureAwait(false); + } + return tickers.Length; + } + + public async Task RemoveTimeTickers(Guid[] tickerIds, CancellationToken cancellationToken = default) + { + var count = 0; + foreach (var id in tickerIds) + { + cancellationToken.ThrowIfCancellationRequested(); + await RemoveTimeTickerIndexesAsync(id).ConfigureAwait(false); + if (await _db.KeyDeleteAsync(TimeTickerKey(id)).ConfigureAwait(false)) + count++; + } + return count; + } + #endregion + + #region Cron_Ticker_Shared_Methods + public async Task GetCronTickerById(Guid id, CancellationToken cancellationToken) + { + return await GetAsync(CronKey(id)).ConfigureAwait(false); + } + + public async Task GetCronTickers(Expression> predicate, CancellationToken cancellationToken) + { + var ids = await _db.SetMembersAsync(CronIdsKey).ConfigureAwait(false); + var list = new List(); + foreach (var redisValue in ids) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!Guid.TryParse(redisValue.ToString(), out var id)) continue; + var cron = await GetAsync(CronKey(id)).ConfigureAwait(false); + if (cron != null) list.Add(cron); + } + + var query = list.AsQueryable(); + if (predicate != null) query = query.Where(predicate); + + return query.OrderByDescending(x => x.CreatedAt).ToArray(); + } + + public async Task> GetCronTickersPaginated(Expression> predicate, int pageNumber, int pageSize, CancellationToken cancellationToken = default) + { + var all = await GetCronTickers(predicate, cancellationToken).ConfigureAwait(false); + var total = all.Length; + var items = all.Skip((pageNumber - 1) * pageSize).Take(pageSize).ToArray(); + return new PaginationResult(items, total, pageNumber, pageSize); + } + + public async Task InsertCronTickers(TCronTicker[] tickers, CancellationToken cancellationToken) + { + var now = _clock.UtcNow; + foreach (var ticker in tickers) + { + cancellationToken.ThrowIfCancellationRequested(); + ticker.CreatedAt = ticker.CreatedAt == default ? now : ticker.CreatedAt; + ticker.UpdatedAt = ticker.UpdatedAt == default ? now : ticker.UpdatedAt; + await SetAsync(CronKey(ticker.Id), ticker).ConfigureAwait(false); + await AddCronIndexesAsync(ticker).ConfigureAwait(false); + } + return tickers.Length; + } + + public async Task UpdateCronTickers(TCronTicker[] cronTicker, CancellationToken cancellationToken) + { + var now = _clock.UtcNow; + foreach (var ticker in cronTicker) + { + cancellationToken.ThrowIfCancellationRequested(); + ticker.UpdatedAt = now; + await SetAsync(CronKey(ticker.Id), ticker).ConfigureAwait(false); + await AddCronIndexesAsync(ticker).ConfigureAwait(false); + } + return cronTicker.Length; + } + + public async Task RemoveCronTickers(Guid[] cronTickerIds, CancellationToken cancellationToken) + { + var removed = 0; + foreach (var id in cronTickerIds) + { + cancellationToken.ThrowIfCancellationRequested(); + await RemoveCronIndexesAsync(id).ConfigureAwait(false); + await RemoveCronOccurrenceIndexesAsyncForCron(id).ConfigureAwait(false); + if (await _db.KeyDeleteAsync(CronKey(id)).ConfigureAwait(false)) + removed++; + } + return removed; + } + #endregion + + #region Cron_TickerOccurrence_Shared_Methods + public async Task[]> GetAllCronTickerOccurrences(Expression, bool>> predicate, CancellationToken cancellationToken = default) + { + var ids = await _db.SetMembersAsync(CronOccurrenceIdsKey).ConfigureAwait(false); + var list = new List>(); + foreach (var redisValue in ids) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!Guid.TryParse(redisValue.ToString(), out var id)) continue; + var occurrence = await GetAsync>(CronOccurrenceKey(id)).ConfigureAwait(false); + if (occurrence != null) list.Add(occurrence); + } + + var query = list.AsQueryable(); + if (predicate != null) query = query.Where(predicate); + return query.OrderByDescending(x => x.ExecutionTime).ToArray(); + } + + public async Task>> GetAllCronTickerOccurrencesPaginated(Expression, bool>> predicate, int pageNumber, int pageSize, CancellationToken cancellationToken = default) + { + var all = await GetAllCronTickerOccurrences(predicate, cancellationToken).ConfigureAwait(false); + var total = all.Length; + var items = all.Skip((pageNumber - 1) * pageSize).Take(pageSize).ToArray(); + return new PaginationResult>(items, total, pageNumber, pageSize); + } + + public async Task InsertCronTickerOccurrences(CronTickerOccurrenceEntity[] cronTickerOccurrences, CancellationToken cancellationToken) + { + foreach (var occurrence in cronTickerOccurrences) + { + cancellationToken.ThrowIfCancellationRequested(); + await SetAsync(CronOccurrenceKey(occurrence.Id), occurrence).ConfigureAwait(false); + await AddCronOccurrenceIndexesAsync(occurrence).ConfigureAwait(false); + } + return cronTickerOccurrences.Length; + } + + public async Task RemoveCronTickerOccurrences(Guid[] cronTickerOccurrences, CancellationToken cancellationToken) + { + var removed = 0; + foreach (var id in cronTickerOccurrences) + { + cancellationToken.ThrowIfCancellationRequested(); + await RemoveCronOccurrenceIndexesAsync(id).ConfigureAwait(false); + if (await _db.KeyDeleteAsync(CronOccurrenceKey(id)).ConfigureAwait(false)) + removed++; + } + return removed; + } + + public async Task[]> AcquireImmediateCronOccurrencesAsync(Guid[] occurrenceIds, CancellationToken cancellationToken = default) + { + if (occurrenceIds == null || occurrenceIds.Length == 0) + return Array.Empty>(); + + var now = _clock.UtcNow; + var acquired = new List>(); + foreach (var id in occurrenceIds) + { + cancellationToken.ThrowIfCancellationRequested(); + var occurrence = await GetAsync>(CronOccurrenceKey(id)).ConfigureAwait(false); + if (occurrence == null) continue; + if (!CanAcquire(occurrence.Status, occurrence.LockHolder, _lockHolder)) continue; + + occurrence.LockHolder = _lockHolder; + occurrence.LockedAt = now; + occurrence.Status = TickerStatus.InProgress; + occurrence.UpdatedAt = now; + + await SetAsync(CronOccurrenceKey(id), occurrence).ConfigureAwait(false); + await AddCronOccurrenceIndexesAsync(occurrence).ConfigureAwait(false); + + acquired.Add(occurrence); + } + + return acquired.ToArray(); + } + #endregion + + #region Apply FunctionContext + private void ApplyFunctionContextToTicker(TTimeTicker ticker, InternalFunctionContext context) + { + var propsToUpdate = context.GetPropsToUpdate(); + + if (propsToUpdate.Contains(nameof(InternalFunctionContext.Status)) && + context.Status != TickerStatus.Skipped) + { + ticker.Status = context.Status; + } + else if (propsToUpdate.Contains(nameof(InternalFunctionContext.Status))) + { + ticker.Status = context.Status; + ticker.SkippedReason = context.ExceptionDetails; + } + + if (propsToUpdate.Contains(nameof(InternalFunctionContext.ExecutedAt))) + ticker.ExecutedAt = context.ExecutedAt; + + if (propsToUpdate.Contains(nameof(InternalFunctionContext.ExceptionDetails)) && + context.Status != TickerStatus.Skipped) + ticker.ExceptionMessage = context.ExceptionDetails; + + if (propsToUpdate.Contains(nameof(InternalFunctionContext.ElapsedTime))) + ticker.ElapsedTime = context.ElapsedTime; + + if (propsToUpdate.Contains(nameof(InternalFunctionContext.RetryCount))) + ticker.RetryCount = context.RetryCount; + + if (propsToUpdate.Contains(nameof(InternalFunctionContext.ReleaseLock))) + { + ticker.LockHolder = null; + ticker.LockedAt = null; + } + + ticker.UpdatedAt = _clock.UtcNow; + } + + private void ApplyFunctionContextToCronOccurrence(CronTickerOccurrenceEntity occurrence, InternalFunctionContext context) + { + var propsToUpdate = context.GetPropsToUpdate(); + + if (propsToUpdate.Contains(nameof(InternalFunctionContext.Status)) && + context.Status != TickerStatus.Skipped) + { + occurrence.Status = context.Status; + } + else if (propsToUpdate.Contains(nameof(InternalFunctionContext.Status))) + { + occurrence.Status = context.Status; + occurrence.SkippedReason = context.ExceptionDetails; + } + + if (propsToUpdate.Contains(nameof(InternalFunctionContext.ExecutedAt))) + occurrence.ExecutedAt = context.ExecutedAt; + + if (propsToUpdate.Contains(nameof(InternalFunctionContext.ExceptionDetails)) && + context.Status != TickerStatus.Skipped) + occurrence.ExceptionMessage = context.ExceptionDetails; + + if (propsToUpdate.Contains(nameof(InternalFunctionContext.ElapsedTime))) + occurrence.ElapsedTime = context.ElapsedTime; + + if (propsToUpdate.Contains(nameof(InternalFunctionContext.RetryCount))) + occurrence.RetryCount = context.RetryCount; + + if (propsToUpdate.Contains(nameof(InternalFunctionContext.ReleaseLock))) + { + occurrence.LockHolder = null; + occurrence.LockedAt = null; + } + + occurrence.UpdatedAt = _clock.UtcNow; + } + #endregion + + #region Helpers for cascade cleanup + private async Task RemoveCronOccurrenceIndexesAsyncForCron(Guid cronId) + { + var ids = await _db.SetMembersAsync(CronOccurrenceIdsKey).ConfigureAwait(false); + foreach (var redisValue in ids) + { + if (!Guid.TryParse(redisValue.ToString(), out var occId)) continue; + var occurrence = await GetAsync>(CronOccurrenceKey(occId)).ConfigureAwait(false); + if (occurrence == null || occurrence.CronTickerId != cronId) continue; + await RemoveCronOccurrenceIndexesAsync(occId).ConfigureAwait(false); + await _db.KeyDeleteAsync(CronOccurrenceKey(occId)).ConfigureAwait(false); + } + } + #endregion +} diff --git a/src/TickerQ.RemoteExecutor/Models/RegisteredFunctionsResponse.cs b/src/TickerQ.RemoteExecutor/Models/RegisteredFunctionsResponse.cs new file mode 100644 index 00000000..fbb447b1 --- /dev/null +++ b/src/TickerQ.RemoteExecutor/Models/RegisteredFunctionsResponse.cs @@ -0,0 +1,33 @@ +namespace TickerQ.RemoteExecutor.Models; + +public sealed class RegisteredFunctionsResponse +{ + public string ApplicationId { get; set; } = string.Empty; + public string EnvironmentId { get; set; } = string.Empty; + public string EnvironmentName { get; set; } = string.Empty; + public string WebhookSignature { get; set; } = string.Empty; + public List Nodes { get; set; } = new(); +} + +public sealed class Node +{ + public string Id { get; set; } = string.Empty; + public string NodeName { get; set; } = string.Empty; + public string CallbackUrl { get; set; } = string.Empty; + public bool IsSynced { get; set; } + public bool AutoSync { get; set; } + public bool AutoMigrateExpressions { get; set; } + public DateTime? LastSyncedAt { get; set; } + public List Functions { get; set; } = new(); +} + +public sealed class Function +{ + public string Id { get; set; } = string.Empty; + public string FunctionName { get; set; } = string.Empty; + public string RequestType { get; set; } = string.Empty; + public string? CronExpression { get; set; } + public int TaskPriority { get; set; } + public DateTime? AppliedAt { get; set; } + public bool IsActive { get; set; } +} diff --git a/src/TickerQ.RemoteExecutor/Models/RemoteTickerFunctionDescriptor.cs b/src/TickerQ.RemoteExecutor/Models/RemoteTickerFunctionDescriptor.cs new file mode 100644 index 00000000..a7b135eb --- /dev/null +++ b/src/TickerQ.RemoteExecutor/Models/RemoteTickerFunctionDescriptor.cs @@ -0,0 +1,12 @@ +using TickerQ.Utilities.Enums; + +namespace TickerQ.RemoteExecutor.Models; + +public sealed class RemoteTickerFunctionDescriptor +{ + public string Name { get; set; } = string.Empty; + public string CronExpression { get; set; } = string.Empty; + public string Callback { get; set; } = string.Empty; + public TickerTaskPriority Priority { get; set; } + public bool IsActive { get; set; } = true; +} diff --git a/src/TickerQ.RemoteExecutor/RemoteExecutionEndpoints.cs b/src/TickerQ.RemoteExecutor/RemoteExecutionEndpoints.cs new file mode 100644 index 00000000..3ddf9fe8 --- /dev/null +++ b/src/TickerQ.RemoteExecutor/RemoteExecutionEndpoints.cs @@ -0,0 +1,514 @@ +using System.Security.Cryptography; +using System.Text; +using System.Text.Json; +using System.Globalization; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Routing; +using Microsoft.Extensions.DependencyInjection; +using TickerQ.RemoteExecutor.Models; +using TickerQ.Utilities; +using TickerQ.Utilities.Entities; +using TickerQ.Utilities.Interfaces; +using TickerQ.Utilities.Interfaces.Managers; +using TickerQ.Utilities.Models; + +namespace TickerQ.RemoteExecutor; + +/// +/// Extension methods for mapping HTTP endpoints that the TickerQ SDK uses. +/// These endpoints translate incoming HTTP calls into calls on TickerQ's +/// persistence layer and internal managers. +/// +public static class RemoteExecutionEndpoints +{ + /// + /// Maps all TickerQ remote execution endpoints using the default entity types. + /// + public static IEndpointRouteBuilder MapTickerQRemoteExecutionEndpoints( + this IEndpointRouteBuilder endpoints, string prefix = "") + { + return endpoints.MapTickerQRemoteExecutionEndpoints(prefix); + } + + /// + /// Maps all TickerQ remote execution endpoints for the specified ticker types. + /// + public static IEndpointRouteBuilder MapTickerQRemoteExecutionEndpoints( + this IEndpointRouteBuilder endpoints, string prefix = "") + where TTimeTicker : TimeTickerEntity, new() + where TCronTicker : CronTickerEntity, new() + { + if (endpoints == null) throw new ArgumentNullException(nameof(endpoints)); + + // Base group; the host can apply any path prefix by mapping this group under a route. + var group = endpoints.MapGroup(prefix); + + group.MapPost("webhooks/hub", + async (HttpRequest request, + TickerQRemoteExecutionOptions options, + RemoteFunctionsSyncService syncService, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + _ = syncService.SyncOnceAsync(CancellationToken.None); + return Results.Ok(); + }); + + group.MapPost("webhooks/hub/remove-function", + async (HttpRequest request, + TickerQRemoteExecutionOptions options, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + var payload = await request.ReadFromJsonAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + if (payload == null || string.IsNullOrWhiteSpace(payload.FunctionName)) + return Results.BadRequest("FunctionName is required."); + + var removed = RemoveFunctionByName(payload.FunctionName); + return removed ? Results.Ok() : Results.NotFound(); + }); + + MapFunctionRegistration(group); + MapTimeTickerEndpoints(group); + MapCronTickerEndpoints(group); + MapCronOccurrenceEndpoints(group); + + return endpoints; + } + + private static void MapFunctionRegistration(IEndpointRouteBuilder group) + { + group.MapPost("functions/register", + async (HttpRequest request, + TickerQRemoteExecutionOptions options, + IInternalTickerManager internalTickerManager, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + var newFunctions = await request.ReadFromJsonAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + + if (newFunctions == null || newFunctions.Length == 0) + return Results.Ok(); + + var cronPairs = newFunctions + .Where(f => f.IsActive && !string.IsNullOrWhiteSpace(f.CronExpression)) + .Select(f => (f.Name, f.CronExpression)) + .ToArray(); + + var functionDict = TickerFunctionProvider.TickerFunctions.ToDictionary(); + + foreach (var newFunction in newFunctions) + { + // Handle inactive functions by removing them + if (!newFunction.IsActive) + { + functionDict.Remove(newFunction.Name); + continue; + } + + // Capture callback URL to avoid closure issues + var callbackUrl = newFunction.Callback.TrimEnd('/'); + + var newFunctionDelegate = new TickerFunctionDelegate(async (ct, serviceProvider, context) => + { + var httpClientFactory = serviceProvider.GetRequiredService(); + var remoteOptions = serviceProvider.GetRequiredService(); + var httpClient = httpClientFactory.CreateClient("tickerq-callback"); + + // Build a minimal payload describing the execution + var payload = new + { + context.Id, + context.FunctionName, + context.Type, + context.RetryCount, + context.ScheduledFor + }; + + // Serialize payload for signature computation + var json = JsonSerializer.Serialize(payload); + var bodyBytes = Encoding.UTF8.GetBytes(json); + + // Build request with HMAC signature + var uri = new Uri($"{callbackUrl}/execute"); + var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString(CultureInfo.InvariantCulture); + var signature = ComputeCallbackSignature( + remoteOptions.WebHookSignature, + HttpMethod.Post.Method, + uri.PathAndQuery, + timestamp, + bodyBytes); + + using var request = new HttpRequestMessage(HttpMethod.Post, uri); + request.Content = new StringContent(json, Encoding.UTF8, "application/json"); + request.Headers.Add("X-TickerQ-Signature", signature); + request.Headers.Add("X-Timestamp", timestamp); + + using var response = await httpClient.SendAsync(request, ct); + response.EnsureSuccessStatusCode(); + }); + + functionDict.TryAdd(newFunction.Name, (newFunction.CronExpression, newFunction.Priority, newFunctionDelegate)); + } + + TickerFunctionProvider.RegisterFunctions(functionDict); + TickerFunctionProvider.Build(); + + if (cronPairs.Length > 0) + await internalTickerManager.MigrateDefinedCronTickers(cronPairs, cancellationToken) + .ConfigureAwait(false); + + return Results.Ok(); + }); + } + + private static void MapTimeTickerEndpoints(IEndpointRouteBuilder group) + where TTimeTicker : TimeTickerEntity, new() + where TCronTicker : CronTickerEntity, new() + { + group.MapPost("time-tickers", + async (HttpRequest request, + TickerQRemoteExecutionOptions options, + ITickerPersistenceProvider provider, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + var tickers = await request.ReadFromJsonAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + if (tickers == null) + return Results.BadRequest("Invalid payload"); + + var affected = await provider.AddTimeTickers(tickers, cancellationToken).ConfigureAwait(false); + return Results.Ok(affected); + }); + + group.MapGet("time-tickers/request/{id:guid}", + async (Guid id, + HttpRequest request, + TickerQRemoteExecutionOptions options, + ITickerPersistenceProvider provider, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + var requestBytes = await provider.GetTimeTickerRequest(id, cancellationToken).ConfigureAwait(false); + if (requestBytes == null || requestBytes.Length == 0) + return Results.Bytes(Array.Empty(), "application/octet-stream"); + + return Results.Bytes(requestBytes, "application/octet-stream"); + }); + + group.MapPut("time-tickers", + async (HttpRequest request, + TickerQRemoteExecutionOptions options, + ITickerPersistenceProvider provider, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + var tickers = await request.ReadFromJsonAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + if (tickers == null) + return Results.BadRequest("Invalid payload"); + + var affected = await provider.UpdateTimeTickers(tickers, cancellationToken).ConfigureAwait(false); + return Results.Ok(affected); + }); + + group.MapPost("time-tickers/delete", + async (HttpRequest request, + TickerQRemoteExecutionOptions options, + ITickerPersistenceProvider provider, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + var ids = await request.ReadFromJsonAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + if (ids == null) + return Results.BadRequest("Invalid payload"); + + var affected = await provider.RemoveTimeTickers(ids, cancellationToken).ConfigureAwait(false); + return Results.Ok(affected); + }); + + group.MapPut("time-tickers/context", + async (HttpRequest request, + TickerQRemoteExecutionOptions options, + IInternalTickerManager internalTickerManager, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + var context = await request.ReadFromJsonAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + if (context == null) + return Results.BadRequest("Invalid payload"); + + // Let InternalTickerManager route to the correct persistence methods and handle notifications. + await internalTickerManager.UpdateTickerAsync(context, cancellationToken).ConfigureAwait(false); + return Results.Ok(1); + }); + + group.MapPost("time-tickers/unified-context", + async (HttpRequest request, + TickerQRemoteExecutionOptions options, + ITickerPersistenceProvider provider, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + var payload = await request.ReadFromJsonAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + if (payload?.Ids == null || payload.Context == null) + return Results.BadRequest("Ids and context are required."); + + await provider.UpdateTimeTickersWithUnifiedContext(payload.Ids, payload.Context, cancellationToken) + .ConfigureAwait(false); + return Results.Ok(); + }); + } + + private static void MapCronTickerEndpoints(IEndpointRouteBuilder group) + where TTimeTicker : TimeTickerEntity, new() + where TCronTicker : CronTickerEntity, new() + { + group.MapPost("cron-tickers", + async (HttpRequest request, + TickerQRemoteExecutionOptions options, + ITickerPersistenceProvider provider, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + var tickers = await request.ReadFromJsonAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + if (tickers == null) + return Results.BadRequest("Invalid payload"); + + var affected = await provider.InsertCronTickers(tickers, cancellationToken).ConfigureAwait(false); + return Results.Ok(affected); + }); + + group.MapPut("cron-tickers", + async (HttpRequest request, + TickerQRemoteExecutionOptions options, + ITickerPersistenceProvider provider, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + var tickers = await request.ReadFromJsonAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + if (tickers == null) + return Results.BadRequest("Invalid payload"); + + var affected = await provider.UpdateCronTickers(tickers, cancellationToken).ConfigureAwait(false); + return Results.Ok(affected); + }); + + group.MapPost("cron-tickers/delete", + async (HttpRequest request, + TickerQRemoteExecutionOptions options, + ITickerPersistenceProvider provider, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + var ids = await request.ReadFromJsonAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + if (ids == null) + return Results.BadRequest("Invalid payload"); + + var affected = await provider.RemoveCronTickers(ids, cancellationToken).ConfigureAwait(false); + return Results.Ok(affected); + }); + } + + private static void MapCronOccurrenceEndpoints(IEndpointRouteBuilder group) + where TTimeTicker : TimeTickerEntity, new() + where TCronTicker : CronTickerEntity, new() + { + group.MapPut("cron-ticker-occurrences/context", + async (HttpRequest request, + TickerQRemoteExecutionOptions options, + IInternalTickerManager internalTickerManager, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + var context = await request.ReadFromJsonAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + if (context == null) + return Results.BadRequest("Invalid payload"); + + // Same as time tickers: delegate to InternalTickerManager. + await internalTickerManager.UpdateTickerAsync(context, cancellationToken).ConfigureAwait(false); + return Results.Ok(); + }); + + group.MapGet("cron-ticker-occurrences/request/{id:guid}", + async (Guid id, + HttpRequest request, + TickerQRemoteExecutionOptions options, + ITickerPersistenceProvider provider, + CancellationToken cancellationToken) => + { + var authResult = await ValidateSignatureAsync(request, options, cancellationToken).ConfigureAwait(false); + if (authResult != null) + return authResult; + + var requestBytes = await provider.GetCronTickerOccurrenceRequest(id, cancellationToken).ConfigureAwait(false); + if (requestBytes == null || requestBytes.Length == 0) + return Results.Bytes(Array.Empty(), "application/octet-stream"); + + return Results.Bytes(requestBytes, "application/octet-stream"); + }); + } + + private static async Task ValidateSignatureAsync( + HttpRequest request, + TickerQRemoteExecutionOptions options, + CancellationToken cancellationToken) + { + const long maxSkewSeconds = 300; + + if (string.IsNullOrWhiteSpace(options.WebHookSignature)) + return null; + + var bodyBytes = await ReadBodyBytesAsync(request, cancellationToken).ConfigureAwait(false); + + if (!request.Headers.TryGetValue("X-TickerQ-Signature", out var sig)) + return Results.Unauthorized(); + + if (!request.Headers.TryGetValue("X-Timestamp", out var timestampHeader)) + return Results.Unauthorized(); + + var timestamp = timestampHeader.Count > 0 ? timestampHeader[0] : string.Empty; + if (!long.TryParse(timestamp, NumberStyles.Integer, CultureInfo.InvariantCulture, out var ts)) + return Results.Unauthorized(); + + var now = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + if (Math.Abs(now - ts) > maxSkewSeconds) + return Results.Unauthorized(); + + byte[] received; + try + { + received = Convert.FromBase64String(sig.ToString()); + } + catch (FormatException) + { + return Results.Unauthorized(); + } + + var pathAndQuery = $"{request.Path}{request.QueryString}"; + var header = $"{request.Method}\n{pathAndQuery}\n{timestamp}\n"; + var headerBytes = Encoding.UTF8.GetBytes(header); + var payload = new byte[headerBytes.Length + bodyBytes.Length]; + Buffer.BlockCopy(headerBytes, 0, payload, 0, headerBytes.Length); + Buffer.BlockCopy(bodyBytes, 0, payload, headerBytes.Length, bodyBytes.Length); + + var key = Encoding.UTF8.GetBytes(options.WebHookSignature); + var expected = HMACSHA256.HashData(key, payload); + + if (expected.Length != received.Length || + !CryptographicOperations.FixedTimeEquals(expected, received)) + { + return Results.Unauthorized(); + } + + return null; + } + + private static async Task ReadBodyBytesAsync(HttpRequest request, CancellationToken cancellationToken) + { + request.EnableBuffering(); + if (request.Body.CanSeek) + request.Body.Position = 0; + + await using var ms = new MemoryStream(); + await request.Body.CopyToAsync(ms, cancellationToken).ConfigureAwait(false); + + if (request.Body.CanSeek) + request.Body.Position = 0; + + return ms.ToArray(); + } + + private static bool RemoveFunctionByName(string functionName) + { + var functionDict = TickerFunctionProvider.TickerFunctions.ToDictionary(); + if (!functionDict.Remove(functionName)) + return false; + + TickerFunctionProvider.RegisterFunctions(functionDict); + TickerFunctionProvider.Build(); + return true; + } + + private sealed class TimeTickerUnifiedContextRequest + { + public Guid[] Ids { get; set; } = []; + public InternalFunctionContext Context { get; set; } + } + + private sealed class RemoveFunctionPayload + { + public string FunctionName { get; set; } = string.Empty; + } + + private static string ComputeCallbackSignature( + string secret, + string method, + string pathAndQuery, + string timestamp, + byte[] bodyBytes) + { + if (string.IsNullOrWhiteSpace(secret)) + return string.Empty; + + var header = $"{method}\n{pathAndQuery}\n{timestamp}\n"; + var headerBytes = Encoding.UTF8.GetBytes(header); + var payload = new byte[headerBytes.Length + bodyBytes.Length]; + Buffer.BlockCopy(headerBytes, 0, payload, 0, headerBytes.Length); + Buffer.BlockCopy(bodyBytes, 0, payload, headerBytes.Length, bodyBytes.Length); + + var secretKey = Encoding.UTF8.GetBytes(secret); + var signatureBytes = HMACSHA256.HashData(secretKey, payload); + return Convert.ToBase64String(signatureBytes); + } +} diff --git a/src/TickerQ.RemoteExecutor/RemoteExecutionServiceExtension.cs b/src/TickerQ.RemoteExecutor/RemoteExecutionServiceExtension.cs new file mode 100644 index 00000000..ede161a5 --- /dev/null +++ b/src/TickerQ.RemoteExecutor/RemoteExecutionServiceExtension.cs @@ -0,0 +1,42 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using TickerQ.Utilities; +using TickerQ.Utilities.Entities; +using TickerQ.Utilities.Interfaces; + +namespace TickerQ.RemoteExecutor; + +public static class RemoteExecutionServiceExtension +{ + public static TickerOptionsBuilder AddTickerRemoteExecutor( + this TickerOptionsBuilder tickerConfiguration, Action optionsAction) + where TTimeTicker : TimeTickerEntity, new() + where TCronTicker : CronTickerEntity, new() + { + var tickerqRemoteExecutionOptions = new TickerQRemoteExecutionOptions(); + + optionsAction(tickerqRemoteExecutionOptions); + tickerqRemoteExecutionOptions.Validate(); + + tickerConfiguration.ExternalProviderConfigServiceAction += services => + { + services.AddHttpClient("tickerq-hub", cfg => + { + cfg.BaseAddress = new Uri(tickerqRemoteExecutionOptions.HubEndpointUrl); + cfg.DefaultRequestHeaders.Add("X-Api-Key", tickerqRemoteExecutionOptions.ApiKey); + cfg.DefaultRequestHeaders.Add("X-Api-Secret", tickerqRemoteExecutionOptions.ApiSecret); + }); + services.AddHttpClient("tickerq-callback"); + services.AddSingleton(); + + // Register options as singleton so background service can access it + services.AddSingleton(tickerqRemoteExecutionOptions); + + // Register background service to sync remote functions (also injectable for webhooks) + services.AddSingleton(); + services.AddHostedService(sp => sp.GetRequiredService()); + }; + + return tickerConfiguration; + } +} diff --git a/src/TickerQ.RemoteExecutor/RemoteFunctionsSyncService.cs b/src/TickerQ.RemoteExecutor/RemoteFunctionsSyncService.cs new file mode 100644 index 00000000..b1fa22ae --- /dev/null +++ b/src/TickerQ.RemoteExecutor/RemoteFunctionsSyncService.cs @@ -0,0 +1,268 @@ +using System.Net.Http.Json; +using System.Text; +using System.Text.Json; +using System.Globalization; +using System.Security.Cryptography; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using TickerQ.RemoteExecutor.Models; +using TickerQ.Utilities; +using TickerQ.Utilities.Enums; +using TickerQ.Utilities.Interfaces.Managers; + +namespace TickerQ.RemoteExecutor; + +public class RemoteFunctionsSyncService : BackgroundService +{ + private readonly IHttpClientFactory _httpClientFactory; + private readonly TickerQRemoteExecutionOptions _options; + private readonly IInternalTickerManager? _internalTickerManager; + private readonly ILogger? _logger; + private readonly IServiceProvider _serviceProvider; + + public RemoteFunctionsSyncService( + IHttpClientFactory httpClientFactory, + TickerQRemoteExecutionOptions options, + IServiceProvider serviceProvider, + ILogger? logger = null) + { + _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); + _logger = logger; + _internalTickerManager = _serviceProvider.GetService(); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + await SyncOnceAsync(stoppingToken); + } + + public async Task SyncOnceAsync(CancellationToken stoppingToken) + { + // Run once on startup or on demand + try + { + if (string.IsNullOrWhiteSpace(_options.HubEndpointUrl)) + { + _logger?.LogWarning("FunctionsEndpointUrl is not configured. Skipping remote functions sync."); + return; + } + + _logger?.LogInformation("Starting remote functions sync from {EndpointUrl}", _options.HubEndpointUrl); + + var httpClient = _httpClientFactory.CreateClient("tickerq-hub"); + using var httpResponse = + await httpClient.GetAsync($"{_options.HubEndpointUrl}api/apps/sync/nodes-functions", + stoppingToken); + + if (!httpResponse.IsSuccessStatusCode) + { + var errorContent = await httpResponse.Content.ReadAsStringAsync(stoppingToken); + _logger?.LogError( + "Failed to fetch functions from {EndpointUrl}. Status: {StatusCode}, Response: {Response}", + _options.HubEndpointUrl, + httpResponse.StatusCode, + errorContent); + return; + } + + var responseContent = await httpResponse.Content.ReadAsStringAsync(stoppingToken); + if (string.IsNullOrWhiteSpace(responseContent)) + { + _logger?.LogWarning("Received empty response from functions endpoint"); + return; + } + + var contentType = httpResponse.Content.Headers.ContentType?.MediaType ?? string.Empty; + if (!contentType.Contains("json", StringComparison.OrdinalIgnoreCase)) + { + _logger?.LogWarning( + "Received non-JSON content type from {EndpointUrl}. Content-Type: {ContentType}. Attempting to parse anyway.", + _options.HubEndpointUrl, + contentType); + } + + RegisteredFunctionsResponse? response; + try + { + response = JsonSerializer.Deserialize( + responseContent, + new JsonSerializerOptions { PropertyNameCaseInsensitive = true }); + } + catch (JsonException ex) + { + _logger?.LogError(ex, "Failed to deserialize functions response"); + return; + } + + if (response == null) + { + _logger?.LogWarning("Received null response from functions endpoint"); + return; + } + _options.WebHookSignature = response.WebhookSignature; + await RegisterFunctionsFromResponse(response, stoppingToken); + + _logger?.LogInformation("Remote functions sync completed successfully"); + } + catch (HttpRequestException ex) + { + // Transient network failure - log and continue without functions + _logger?.LogError(ex, "Network error during remote functions sync. Status: {StatusCode}. The service will continue without remote functions.", + ex.StatusCode); + } + catch (TaskCanceledException ex) when (!stoppingToken.IsCancellationRequested) + { + // Request timeout - log and continue + _logger?.LogError(ex, "Timeout during remote functions sync. The service will continue without remote functions."); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + // Application shutting down - normal cancellation + _logger?.LogInformation("Remote functions sync cancelled due to application shutdown."); + } + // Note: Other exceptions (ArgumentException, NullReferenceException, etc.) are NOT caught + // and will propagate - this is intentional to fail fast on programming errors + } + + private async Task RegisterFunctionsFromResponse(RegisteredFunctionsResponse response, CancellationToken cancellationToken) + { + if (response.Nodes.Count == 0) + { + _logger?.LogInformation("No nodes found in response"); + return; + } + + var functionDict = TickerFunctionProvider.TickerFunctions.ToDictionary(); + var cronPairs = new List<(string Name, string CronExpression)>(); + + foreach (var node in response.Nodes) + { + if (string.IsNullOrWhiteSpace(node.CallbackUrl)) + { + _logger?.LogWarning("Node {NodeName} has no callback URL, skipping", node.NodeName); + continue; + } + + if (node.Functions.Count == 0) + { + _logger?.LogInformation("Node {NodeName} has no functions", node.NodeName); + continue; + } + + foreach (var function in node.Functions) + { + if (string.IsNullOrWhiteSpace(function.FunctionName)) + { + _logger?.LogWarning("Function has no name, skipping"); + continue; + } + + if (!function.IsActive) + { + if (functionDict.Remove(function.FunctionName)) + _logger?.LogDebug("Removed inactive function {FunctionName}", function.FunctionName); + else + _logger?.LogDebug("Skipping inactive function {FunctionName}", function.FunctionName); + continue; + } + + // Capture callbackUrl in local variable to avoid closure issues + var callbackUrl = node.CallbackUrl.TrimEnd('/'); + + // Create function delegate similar to MapFunctionRegistration + var functionDelegate = new TickerFunctionDelegate(async (ct, serviceProvider, context) => + { + var httpClientFactory = serviceProvider.GetRequiredService(); + var httpClient = httpClientFactory.CreateClient("tickerq-callback"); + + var payload = new + { + context.Id, + context.FunctionName, + context.Type, + context.RetryCount, + context.ScheduledFor + }; + + // 1. Serialize ONCE + var json = JsonSerializer.Serialize(payload); + var bodyBytes = Encoding.UTF8.GetBytes(json); + + // 2. Build request + signature + var uri = new Uri($"{callbackUrl}/execute"); + var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString(CultureInfo.InvariantCulture); + var signature = ComputeSignature( + response.WebhookSignature, + HttpMethod.Post.Method, + uri.PathAndQuery, + timestamp, + bodyBytes); + + // 3. Build request manually + using var request = new HttpRequestMessage(HttpMethod.Post, uri); + request.Content = new StringContent(json, Encoding.UTF8, "application/json"); + + request.Headers.Add("X-TickerQ-Signature", signature); + request.Headers.Add("X-Timestamp", timestamp); + + using var responseCallback = await httpClient.SendAsync(request, ct); + responseCallback.EnsureSuccessStatusCode(); + }); + + // Convert int priority to TickerTaskPriority enum + var priority = (TickerTaskPriority)function.TaskPriority; + + // Use cronExpression if available + var cronExpression = function.CronExpression ?? string.Empty; + + functionDict[function.FunctionName] = (cronExpression, priority, functionDelegate); + + if (!string.IsNullOrWhiteSpace(cronExpression)) + { + cronPairs.Add((function.FunctionName, cronExpression)); + } + + _logger?.LogDebug("Registered function {FunctionName} from node {NodeName}", + function.FunctionName, node.NodeName); + } + } + + if (functionDict.Count > 0) + TickerFunctionProvider.RegisterFunctions(functionDict); + + TickerFunctionProvider.Build(); + _logger?.LogInformation("Registered {Count} functions", functionDict.Count); + + // Migrate cron tickers if we have cron expressions and the manager is available + if (cronPairs.Count > 0 && _internalTickerManager != null) + { + await _internalTickerManager.MigrateDefinedCronTickers( + cronPairs.ToArray(), + cancellationToken) + .ConfigureAwait(false); + + _logger?.LogInformation("Migrated {Count} cron tickers", cronPairs.Count); + } + } + + private static string ComputeSignature( + string secret, + string method, + string pathAndQuery, + string timestamp, + byte[] bodyBytes) + { + var header = $"{method}\n{pathAndQuery}\n{timestamp}\n"; + var headerBytes = Encoding.UTF8.GetBytes(header); + var payload = new byte[headerBytes.Length + bodyBytes.Length]; + Buffer.BlockCopy(headerBytes, 0, payload, 0, headerBytes.Length); + Buffer.BlockCopy(bodyBytes, 0, payload, headerBytes.Length, bodyBytes.Length); + + var secretKey = Encoding.UTF8.GetBytes(secret); + var signatureBytes = HMACSHA256.HashData(secretKey, payload); + return Convert.ToBase64String(signatureBytes); + } +} diff --git a/src/TickerQ.RemoteExecutor/TickerQ.RemoteExecutor.csproj b/src/TickerQ.RemoteExecutor/TickerQ.RemoteExecutor.csproj new file mode 100644 index 00000000..39308726 --- /dev/null +++ b/src/TickerQ.RemoteExecutor/TickerQ.RemoteExecutor.csproj @@ -0,0 +1,16 @@ + + + + enable + enable + TickerQ.RemoteExecutor + TickerQ Remote Executor + Remote execution integration for TickerQ, enabling hub registration and remote task dispatch. + $(PackageTags);remote;executor;hub;dispatch;callback + + + + + + + diff --git a/src/TickerQ.RemoteExecutor/TickerQRemoteExecutionOptions.cs b/src/TickerQ.RemoteExecutor/TickerQRemoteExecutionOptions.cs new file mode 100644 index 00000000..3a1eaf26 --- /dev/null +++ b/src/TickerQ.RemoteExecutor/TickerQRemoteExecutionOptions.cs @@ -0,0 +1,43 @@ +namespace TickerQ.RemoteExecutor; + +public class TickerQRemoteExecutionOptions +{ + internal string? ApiKey { get; set; } + internal string? ApiSecret { get; set; } + + /// + /// The Hub endpoint URL. Fixed to the TickerQ Hub service and cannot be changed. + /// + internal string HubEndpointUrl { get; } = TickerQRemoteExecutorConstants.HubBaseUrl; + + internal string? WebHookSignature { get; set; } + + public void SetApiKey(string apiKey) + { + ApiKey = apiKey ?? throw new ArgumentNullException(nameof(apiKey)); + } + + public void SetApiSecret(string apiSecret) + { + ApiSecret = apiSecret ?? throw new ArgumentNullException(nameof(apiSecret)); + } + + /// + /// Validates that all required configuration options are set. + /// + /// Thrown when required options are missing. + internal void Validate() + { + var errors = new List(); + + if (string.IsNullOrWhiteSpace(ApiKey)) + errors.Add("ApiKey is required. Call SetApiKey() to configure."); + + if (string.IsNullOrWhiteSpace(ApiSecret)) + errors.Add("ApiSecret is required. Call SetApiSecret() to configure."); + + if (errors.Count > 0) + throw new InvalidOperationException( + $"TickerQ RemoteExecutor configuration is invalid:\n- {string.Join("\n- ", errors)}"); + } +} \ No newline at end of file diff --git a/src/TickerQ.RemoteExecutor/TickerQRemoteExecutorConstants.cs b/src/TickerQ.RemoteExecutor/TickerQRemoteExecutorConstants.cs new file mode 100644 index 00000000..9c75b3b9 --- /dev/null +++ b/src/TickerQ.RemoteExecutor/TickerQRemoteExecutorConstants.cs @@ -0,0 +1,18 @@ +namespace TickerQ.RemoteExecutor; + +/// +/// Constants used by the TickerQ RemoteExecutor. +/// +public static class TickerQRemoteExecutorConstants +{ + /// + /// The base URL of the TickerQ Hub service. + /// This is a fixed endpoint and cannot be configured by users. + /// + public const string HubBaseUrl = "https://hub.tickerq.net/"; + + /// + /// The Hub hostname used for request routing. + /// + public const string HubHostname = "hub.tickerq.net"; +} diff --git a/src/TickerQ.RemoteExecutor/TickerQRemoteSignatureFilter.cs b/src/TickerQ.RemoteExecutor/TickerQRemoteSignatureFilter.cs new file mode 100644 index 00000000..14e2e240 --- /dev/null +++ b/src/TickerQ.RemoteExecutor/TickerQRemoteSignatureFilter.cs @@ -0,0 +1,112 @@ +using System.Security.Cryptography; +using System.Text; +using System.Globalization; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Logging; + +namespace TickerQ.RemoteExecutor; + +public sealed class TickerQRemoteSignatureFilter : IEndpointFilter +{ + private const long MaxTimestampSkewSeconds = 300; + + private readonly TickerQRemoteExecutionOptions _options; + private readonly ILogger? _logger; + + public TickerQRemoteSignatureFilter(TickerQRemoteExecutionOptions options, ILogger? logger = null) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + _logger = logger; + } + + public async ValueTask InvokeAsync( + EndpointFilterInvocationContext ctx, + EndpointFilterDelegate next) + { + var request = ctx.HttpContext.Request; + + // If signature not configured, skip validation (but log warning) + if (string.IsNullOrWhiteSpace(_options.WebHookSignature)) + { + _logger?.LogWarning("TickerQ RemoteExecutor signature validation skipped: WebHookSignature not configured for {Method} {Path}", + request.Method, request.Path); + return await next(ctx); + } + + // Validate required headers first (fail fast) + if (!request.Headers.TryGetValue("X-TickerQ-Signature", out var sig) || string.IsNullOrWhiteSpace(sig)) + { + _logger?.LogWarning("TickerQ RemoteExecutor signature validation failed: Missing X-TickerQ-Signature header for {Method} {Path}", + request.Method, request.Path); + return Results.Unauthorized(); + } + + if (!request.Headers.TryGetValue("X-Timestamp", out var timestampHeader)) + { + _logger?.LogWarning("TickerQ RemoteExecutor signature validation failed: Missing X-Timestamp header for {Method} {Path}", + request.Method, request.Path); + return Results.Unauthorized(); + } + + var timestamp = timestampHeader.Count > 0 ? timestampHeader[0] : string.Empty; + if (!long.TryParse(timestamp, NumberStyles.Integer, CultureInfo.InvariantCulture, out var ts)) + { + _logger?.LogWarning("TickerQ RemoteExecutor signature validation failed: Invalid timestamp format for {Method} {Path}", + request.Method, request.Path); + return Results.Unauthorized(); + } + + var now = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + if (Math.Abs(now - ts) > MaxTimestampSkewSeconds) + { + _logger?.LogWarning("TickerQ RemoteExecutor signature validation failed: Timestamp skew too large ({SkewSeconds}s) for {Method} {Path}", + Math.Abs(now - ts), request.Method, request.Path); + return Results.Unauthorized(); + } + + // Parse signature with error handling + byte[] received; + try + { + received = Convert.FromBase64String(sig.ToString()); + } + catch (FormatException) + { + _logger?.LogWarning("TickerQ RemoteExecutor signature validation failed: Invalid Base64 signature for {Method} {Path}", + request.Method, request.Path); + return Results.Unauthorized(); + } + + // Enable buffering and read body + request.EnableBuffering(); + + byte[] bodyBytes; + await using (var ms = new MemoryStream()) + { + await request.Body.CopyToAsync(ms, ctx.HttpContext.RequestAborted); + bodyBytes = ms.ToArray(); + request.Body.Position = 0; + } + + // Compute expected signature + var pathAndQuery = $"{request.Path}{request.QueryString}"; + var header = $"{request.Method}\n{pathAndQuery}\n{timestamp}\n"; + var headerBytes = Encoding.UTF8.GetBytes(header); + var payload = new byte[headerBytes.Length + bodyBytes.Length]; + Buffer.BlockCopy(headerBytes, 0, payload, 0, headerBytes.Length); + Buffer.BlockCopy(bodyBytes, 0, payload, headerBytes.Length, bodyBytes.Length); + + var key = Encoding.UTF8.GetBytes(_options.WebHookSignature); + var expected = HMACSHA256.HashData(key, payload); + + if (expected.Length != received.Length || + !CryptographicOperations.FixedTimeEquals(expected, received)) + { + _logger?.LogWarning("TickerQ RemoteExecutor signature validation failed: Signature mismatch for {Method} {Path}", + request.Method, request.Path); + return Results.Unauthorized(); + } + + return await next(ctx); + } +} diff --git a/src/TickerQ.RemoteExecutor/TickerRemoteExecutionTaskHandler.cs b/src/TickerQ.RemoteExecutor/TickerRemoteExecutionTaskHandler.cs new file mode 100644 index 00000000..f3ffb51f --- /dev/null +++ b/src/TickerQ.RemoteExecutor/TickerRemoteExecutionTaskHandler.cs @@ -0,0 +1,40 @@ +using Microsoft.Extensions.DependencyInjection; +using TickerQ.Utilities; +using TickerQ.Utilities.Base; +using TickerQ.Utilities.Enums; +using TickerQ.Utilities.Interfaces; +using TickerQ.Utilities.Models; + +namespace TickerQ.RemoteExecutor; + +public class TickerRemoteExecutionTaskHandler : ITickerExecutionTaskHandler +{ + private readonly IServiceProvider _serviceProvider; + + public TickerRemoteExecutionTaskHandler(IServiceProvider serviceProvider) + { + _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); + } + + public async Task ExecuteTaskAsync(InternalFunctionContext context, bool isDue, CancellationToken cancellationToken = default) + { + var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + await using var scope = _serviceProvider.CreateAsyncScope(); + + if (TickerFunctionProvider.TickerFunctions.TryGetValue(context.FunctionName, out var function)) + { + var tickerFunctionContext = new TickerFunctionContext + { + RequestCancelOperationAction = null, + Id = context.TickerId, + Type = context.Type, + FunctionName = context.FunctionName, + RetryCount = context.RetryCount, + IsDue = isDue, + ScheduledFor = context.ExecutionTime + }; + await function.Delegate(cancellationTokenSource.Token, scope.ServiceProvider, tickerFunctionContext); + } + } +} \ No newline at end of file diff --git a/src/TickerQ.SDK/Client/TickerQSdkHttpClient.cs b/src/TickerQ.SDK/Client/TickerQSdkHttpClient.cs new file mode 100644 index 00000000..44266a73 --- /dev/null +++ b/src/TickerQ.SDK/Client/TickerQSdkHttpClient.cs @@ -0,0 +1,220 @@ +using System.Security.Cryptography; +using System.Text; +using System.Text.Json; +using System.Globalization; +using Microsoft.Extensions.Logging; + +namespace TickerQ.SDK.Client; + +public class TickerQSdkHttpClient +{ + private readonly IHttpClientFactory _httpClientFactory; + private readonly TickerSdkOptions _options; + private readonly JsonSerializerOptions _serializerOptions; + private readonly ILogger? _logger; + + /// + /// HTTP client name for Hub requests (function registration). + /// + internal const string HubClientName = "tickerq-sdk-hub"; + + /// + /// HTTP client name for Scheduler requests (job operations). + /// + internal const string SchedulerClientName = "tickerq-sdk-scheduler"; + + public TickerQSdkHttpClient( + IHttpClientFactory httpClientFactory, + TickerSdkOptions options, + ILogger? logger = null) + { + _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + _logger = logger; + + _serializerOptions = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + } + + public Task GetAsync(string path, CancellationToken cancellationToken = default) + { + return SendAsync(HttpMethod.Get, path, null, cancellationToken); + } + + public Task PostAsync(string path, TRequest request, CancellationToken cancellationToken = default) + { + return SendAsync(HttpMethod.Post, path, request, cancellationToken); + } + + public Task PutAsync(string path, TRequest request, CancellationToken cancellationToken = default) + { + return SendAsync(HttpMethod.Put, path, request, cancellationToken); + } + + public Task DeleteAsync(string path, CancellationToken cancellationToken = default) + { + return SendAsync(HttpMethod.Delete, path, null, cancellationToken); + } + + public async Task GetBytesAsync(string path, CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(path)) + throw new ArgumentException("Path must be provided.", nameof(path)); + + var uri = BuildUri(path); + var requestMessage = new HttpRequestMessage(HttpMethod.Get, uri); + ApplyAuthentication(requestMessage); + ApplySignature(requestMessage, string.Empty); + + var httpClient = GetHttpClient(uri); + + try + { + _logger?.LogDebug("TickerQ SDK sending {Method} {Uri}", requestMessage.Method, requestMessage.RequestUri); + + using var response = await httpClient.SendAsync(requestMessage, cancellationToken).ConfigureAwait(false); + _logger?.LogDebug("TickerQ SDK received {StatusCode} for {Method} {Uri}", + (int)response.StatusCode, requestMessage.Method, requestMessage.RequestUri); + + response.EnsureSuccessStatusCode(); + + if (response.Content == null) + return null; + + return await response.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(false); + } + catch (HttpRequestException ex) + { + _logger?.LogError(ex, "TickerQ SDK HTTP request failed {Method} {Uri} - Status: {StatusCode}", + requestMessage.Method, requestMessage.RequestUri, ex.StatusCode); + throw; + } + catch (TaskCanceledException ex) when (!cancellationToken.IsCancellationRequested) + { + _logger?.LogError(ex, "TickerQ SDK request timed out {Method} {Uri}", requestMessage.Method, requestMessage.RequestUri); + throw; + } + } + + private async Task SendAsync(HttpMethod method, string path, TRequest? request, CancellationToken cancellationToken) + { + if (string.IsNullOrWhiteSpace(path)) + throw new ArgumentException("Path must be provided.", nameof(path)); + + var uri = BuildUri(path); + var requestMessage = new HttpRequestMessage(method, uri); + ApplyAuthentication(requestMessage); + + var body = string.Empty; + if (request is not null && method != HttpMethod.Get && method != HttpMethod.Delete) + { + body = JsonSerializer.Serialize(request, _serializerOptions); + requestMessage.Content = new StringContent(body, Encoding.UTF8, "application/json"); + } + ApplySignature(requestMessage, body); + + var httpClient = GetHttpClient(uri); + + try + { + _logger?.LogDebug("TickerQ SDK sending {Method} {Uri}", requestMessage.Method, requestMessage.RequestUri); + + using var response = await httpClient.SendAsync(requestMessage, cancellationToken).ConfigureAwait(false); + _logger?.LogDebug("TickerQ SDK received {StatusCode} for {Method} {Uri}", + (int)response.StatusCode, requestMessage.Method, requestMessage.RequestUri); + + response.EnsureSuccessStatusCode(); + + if (typeof(TResponse) == typeof(object) || typeof(TResponse) == typeof(void)) + return default; + + await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); + if (stream.Length == 0) + return default; + + return await JsonSerializer.DeserializeAsync(stream, _serializerOptions, cancellationToken).ConfigureAwait(false); + } + catch (HttpRequestException ex) + { + _logger?.LogError(ex, "TickerQ SDK HTTP request failed {Method} {Uri} - Status: {StatusCode}", + requestMessage.Method, requestMessage.RequestUri, ex.StatusCode); + throw; + } + catch (TaskCanceledException ex) when (!cancellationToken.IsCancellationRequested) + { + _logger?.LogError(ex, "TickerQ SDK request timed out {Method} {Uri}", requestMessage.Method, requestMessage.RequestUri); + throw; + } + } + + private HttpClient GetHttpClient(Uri uri) + { + var clientName = IsHubRequest(uri) ? HubClientName : SchedulerClientName; + return _httpClientFactory.CreateClient(clientName); + } + + private Uri BuildUri(string path) + { + if (_options.ApiUri == null) + throw new InvalidOperationException("TickerQ SDK options must be configured with an API URI."); + + if (Uri.TryCreate(path, UriKind.Absolute, out var absolute)) + return absolute; + + return new Uri(_options.ApiUri, path); + } + + private void ApplyAuthentication(HttpRequestMessage request) + { + if (!IsHubRequest(request.RequestUri)) + return; + + if (!string.IsNullOrEmpty(_options.ApiKey)) + request.Headers.TryAddWithoutValidation("X-Api-Key", _options.ApiKey); + + if (!string.IsNullOrEmpty(_options.ApiSecret)) + request.Headers.TryAddWithoutValidation("X-Api-Secret", _options.ApiSecret); + } + + private void ApplySignature(HttpRequestMessage request, string body) + { + if (IsHubRequest(request.RequestUri)) + return; + + if (string.IsNullOrWhiteSpace(_options.WebhookSignature)) + return; + + var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString(CultureInfo.InvariantCulture); + request.Headers.TryAddWithoutValidation("X-Timestamp", timestamp); + + var key = Encoding.UTF8.GetBytes(_options.WebhookSignature); + var pathAndQuery = request.RequestUri?.PathAndQuery ?? "/"; + var header = $"{request.Method.Method}\n{pathAndQuery}\n{timestamp}\n"; + var headerBytes = Encoding.UTF8.GetBytes(header); + var bodyBytes = Encoding.UTF8.GetBytes(body ?? string.Empty); + var payload = new byte[headerBytes.Length + bodyBytes.Length]; + Buffer.BlockCopy(headerBytes, 0, payload, 0, headerBytes.Length); + Buffer.BlockCopy(bodyBytes, 0, payload, headerBytes.Length, bodyBytes.Length); + + var signatureBytes = HMACSHA256.HashData(key, payload); + var signature = Convert.ToBase64String(signatureBytes); + + request.Headers.TryAddWithoutValidation("X-TickerQ-Signature", signature); + } + + private static bool IsHubRequest(Uri? requestUri) + { + if (requestUri is null) + return false; + + if (!requestUri.Host.Equals( + TickerQSdkConstants.HubHostname, + StringComparison.OrdinalIgnoreCase)) + return false; + + return (requestUri.Scheme == Uri.UriSchemeHttp && requestUri.Port == 80) + || (requestUri.Scheme == Uri.UriSchemeHttps && requestUri.Port == 443); + } +} diff --git a/src/TickerQ.SDK/DependencyInjection/TickerQSdkDependencyInjection.cs b/src/TickerQ.SDK/DependencyInjection/TickerQSdkDependencyInjection.cs new file mode 100644 index 00000000..b380eeac --- /dev/null +++ b/src/TickerQ.SDK/DependencyInjection/TickerQSdkDependencyInjection.cs @@ -0,0 +1,59 @@ +using Microsoft.Extensions.DependencyInjection; +using TickerQ.SDK.Client; +using TickerQ.SDK.Infrastructure; +using TickerQ.SDK.HostedServices; +using TickerQ.SDK.Persistence; +using TickerQ.Utilities; +using TickerQ.Utilities.Entities; +using TickerQ.Utilities.Interfaces; + +namespace TickerQ.SDK.DependencyInjection; + +public static class TickerQSdkDependencyInjection +{ + /// + /// Default timeout for HTTP requests to Hub and Scheduler. + /// + private static readonly TimeSpan DefaultHttpTimeout = TimeSpan.FromSeconds(30); + + public static TickerOptionsBuilder AddTickerQSdk(this TickerOptionsBuilder builder, Action configure) + where TTimeTicker : TimeTickerEntity, new() + where TCronTicker : CronTickerEntity, new() + { + builder.DisableBackgroundServices(); + builder.IgnoreSeedDefinedCronTickers(); + builder.ExternalProviderConfigServiceAction += (services) => + { + var options = new TickerSdkOptions + { + // ApiUri initially points to Hub, will be updated to Scheduler URL after sync + ApiUri = new Uri(TickerQSdkConstants.HubBaseUrl) + }; + + configure(options); + options.Validate(); + services.AddSingleton(options); + + // Register HTTP clients with IHttpClientFactory + services.AddHttpClient(TickerQSdkHttpClient.HubClientName, client => + { + client.BaseAddress = new Uri(TickerQSdkConstants.HubBaseUrl); + client.Timeout = DefaultHttpTimeout; + client.DefaultRequestHeaders.Add("X-Api-Key", options.ApiKey); + client.DefaultRequestHeaders.Add("X-Api-Secret", options.ApiSecret); + }); + + services.AddHttpClient(TickerQSdkHttpClient.SchedulerClientName, client => + { + client.Timeout = DefaultHttpTimeout; + }); + + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton, TickerQRemotePersistenceProvider>(); + services.AddHostedService(); + }; + + return builder; + } +} diff --git a/src/TickerQ.SDK/HostedServices/TickerQFunctionRegistrationHostedService.cs b/src/TickerQ.SDK/HostedServices/TickerQFunctionRegistrationHostedService.cs new file mode 100644 index 00000000..950fb475 --- /dev/null +++ b/src/TickerQ.SDK/HostedServices/TickerQFunctionRegistrationHostedService.cs @@ -0,0 +1,25 @@ +using Microsoft.Extensions.Hosting; +using TickerQ.SDK.Infrastructure; + +namespace TickerQ.SDK.HostedServices; + +/// +/// Sends the list of registered ticker functions to the remote executor on application startup. +/// The delegate itself is not transmitted; instead, a callback name is provided. +/// +internal sealed class TickerQFunctionRegistrationHostedService : IHostedService +{ + private readonly TickerQFunctionSyncService _syncService; + + public TickerQFunctionRegistrationHostedService(TickerQFunctionSyncService syncService) + { + _syncService = syncService ?? throw new ArgumentNullException(nameof(syncService)); + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + await _syncService.SyncAsync(cancellationToken).ConfigureAwait(false); + } + + public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; +} diff --git a/src/TickerQ.SDK/Infrastructure/JsonExampleGenerator.cs b/src/TickerQ.SDK/Infrastructure/JsonExampleGenerator.cs new file mode 100644 index 00000000..808cb5aa --- /dev/null +++ b/src/TickerQ.SDK/Infrastructure/JsonExampleGenerator.cs @@ -0,0 +1,103 @@ +using System; +using System.Collections.Generic; +using System.Text.Json; + +namespace TickerQ.SDK.Infrastructure; + +internal static class JsonExampleGenerator +{ + private static object GenerateExample(Type type) => Generate(type); + + private static object Generate(Type type) + { + var underlyingType = Nullable.GetUnderlyingType(type); + if (underlyingType != null) + return Generate(underlyingType); + + if (type.IsPrimitive || type == typeof(string)) + return GetDefaultValue(type); + + if (type.IsArray) + { + var elementType = type.GetElementType(); + var array = Array.CreateInstance(elementType!, 1); + array.SetValue(Generate(elementType!), 0); + return array; + } + + if (type.IsGenericType && + (type.GetGenericTypeDefinition() == typeof(List<>) || + type.GetGenericTypeDefinition() == typeof(IList<>))) + { + var elementType = type.GetGenericArguments()[0]; + var listType = typeof(List<>).MakeGenericType(elementType); + var list = Activator.CreateInstance(listType); + list!.GetType().GetMethod("Add")!.Invoke(list, new[] { Generate(elementType) }); + return list; + } + + if (type.IsClass || type.IsValueType) + { + var instance = Activator.CreateInstance(type)!; + foreach (var property in type.GetProperties()) + { + if (!property.CanWrite) + continue; + + var value = Generate(property.PropertyType); + property.SetValue(instance, value); + } + + return instance; + } + + return GetDefaultValue(type); + } + + private static object GetDefaultValue(Type type) + { + return Type.GetTypeCode(type) switch + { + TypeCode.Boolean => true, + TypeCode.Byte => (byte)1, + TypeCode.Char => 'a', + TypeCode.DateTime => new DateTime(2023, 1, 1), + TypeCode.DBNull => DBNull.Value, + TypeCode.Decimal => 123.45m, + TypeCode.Double => 123.45, + TypeCode.Empty => null, + TypeCode.Int16 => (short)1, + TypeCode.Int32 => 123, + TypeCode.Int64 => 123L, + TypeCode.Object => Activator.CreateInstance(type)!, + TypeCode.SByte => (sbyte)1, + TypeCode.Single => 123.45f, + TypeCode.String => "string", + TypeCode.UInt16 => (ushort)1, + TypeCode.UInt32 => 123u, + TypeCode.UInt64 => 123ul, + _ => Activator.CreateInstance(type)!, + }; + } + + private static string GenerateExampleJson(Type type) + { + return JsonSerializer.Serialize( + GenerateExample(type), + new JsonSerializerOptions { WriteIndented = true }); + } + + public static bool TryGenerateExampleJson(Type type, out string json) + { + try + { + json = GenerateExampleJson(type); + return true; + } + catch (Exception) + { + json = string.Empty; + return false; + } + } +} diff --git a/src/TickerQ.SDK/Infrastructure/TickerQFunctionSyncService.cs b/src/TickerQ.SDK/Infrastructure/TickerQFunctionSyncService.cs new file mode 100644 index 00000000..70560540 --- /dev/null +++ b/src/TickerQ.SDK/Infrastructure/TickerQFunctionSyncService.cs @@ -0,0 +1,76 @@ +using TickerQ.SDK.Client; +using TickerQ.SDK.Models; +using TickerQ.Utilities; + +namespace TickerQ.SDK.Infrastructure; + +internal sealed class TickerQFunctionSyncService +{ + private readonly TickerQSdkHttpClient _client; + private readonly TickerSdkOptions _options; + + public TickerQFunctionSyncService(TickerQSdkHttpClient client, TickerSdkOptions options) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + + public async Task SyncAsync(CancellationToken cancellationToken) + { + if (TickerFunctionProvider.TickerFunctions == null || + TickerFunctionProvider.TickerFunctions.Count == 0) + { + return null; + } + + var node = new Node + { + NodeName = _options.NodeName ?? "node", + CallbackUrl = _options.CallbackUri?.ToString(), + Functions = [] + }; + + foreach (var (name, value) in TickerFunctionProvider.TickerFunctions) + { + TickerFunctionProvider.TickerFunctionRequestTypes.TryGetValue(name, out var requestType); + var exampleJson = string.Empty; + if (requestType.Item2 != null) + JsonExampleGenerator.TryGenerateExampleJson(requestType.Item2, out exampleJson); + + var (cronExpression, priority, _) = value; + node.Functions.Add(new NodeFunction + { + FunctionName = name, + RequestType = requestType.Item1 ?? string.Empty, + RequestExampleJson = exampleJson, + TaskPriority = priority, + CronExpression = cronExpression + }); + } + + var hubBase = new Uri(TickerQSdkConstants.HubBaseUrl); + var syncUri = new Uri(hubBase, "api/apps/sync/nodes-functions/batch"); + + var result = await _client + .PostAsync( + syncUri.ToString(), + node, + cancellationToken) + .ConfigureAwait(false); + + if (result != null) + { + if (!string.IsNullOrWhiteSpace(result.ApplicationUrl)) + { + _options.ApiUri = new Uri(result.ApplicationUrl.TrimEnd('/') + "/"); + } + + if (!string.IsNullOrWhiteSpace(result.WebhookSignature)) + { + _options.WebhookSignature = result.WebhookSignature; + } + } + + return result; + } +} diff --git a/src/TickerQ.SDK/Models/Node.cs b/src/TickerQ.SDK/Models/Node.cs new file mode 100644 index 00000000..53b82410 --- /dev/null +++ b/src/TickerQ.SDK/Models/Node.cs @@ -0,0 +1,10 @@ +namespace TickerQ.SDK.Models; + +public sealed class Node +{ + public string NodeName { get; set; } + public string CallbackUrl { get; set; } + public bool IsProduction { get; set; } + public List Functions { get; set; } = new(); +} + diff --git a/src/TickerQ.SDK/Models/NodeFunction.cs b/src/TickerQ.SDK/Models/NodeFunction.cs new file mode 100644 index 00000000..b1e3cb21 --- /dev/null +++ b/src/TickerQ.SDK/Models/NodeFunction.cs @@ -0,0 +1,12 @@ +using TickerQ.Utilities.Enums; + +namespace TickerQ.SDK.Models; + +public sealed class NodeFunction +{ + public string FunctionName { get; set; } + public string RequestType { get; set; } + public string RequestExampleJson { get; set; } = string.Empty; + public TickerTaskPriority TaskPriority { get; set; } + public string CronExpression { get; set; } +} diff --git a/src/TickerQ.SDK/Models/RemoteExecutionContext.cs b/src/TickerQ.SDK/Models/RemoteExecutionContext.cs new file mode 100644 index 00000000..04646804 --- /dev/null +++ b/src/TickerQ.SDK/Models/RemoteExecutionContext.cs @@ -0,0 +1,13 @@ +using TickerQ.Utilities.Enums; + +namespace TickerQ.SDK.Models; + +public class RemoteExecutionContext +{ + public Guid Id { get; set; } + public TickerType Type { get; set; } + public int RetryCount { get; set; } + public bool IsDue { get; set; } + public DateTime ScheduledFor { get; set; } + public string FunctionName { get; set; } +} \ No newline at end of file diff --git a/src/TickerQ.SDK/Models/SyncNodesAndFunctionsResult.cs b/src/TickerQ.SDK/Models/SyncNodesAndFunctionsResult.cs new file mode 100644 index 00000000..eccfe3d3 --- /dev/null +++ b/src/TickerQ.SDK/Models/SyncNodesAndFunctionsResult.cs @@ -0,0 +1,7 @@ +namespace TickerQ.SDK.Models; + +public class SyncNodesAndFunctionsResult +{ + public string ApplicationUrl { get; set; } + public string WebhookSignature { get; set; } +} \ No newline at end of file diff --git a/src/TickerQ.SDK/Persistence/TickerQRemotePersistenceProvider.cs b/src/TickerQ.SDK/Persistence/TickerQRemotePersistenceProvider.cs new file mode 100644 index 00000000..7d498ac1 --- /dev/null +++ b/src/TickerQ.SDK/Persistence/TickerQRemotePersistenceProvider.cs @@ -0,0 +1,307 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using TickerQ.SDK.Client; +using TickerQ.Utilities.Entities; +using TickerQ.Utilities.Interfaces; +using TickerQ.Utilities.Models; + +namespace TickerQ.SDK.Persistence; + +/// +/// HTTP-based implementation of ITickerPersistenceProvider used by the SDK. +/// Only the methods required for creating, updating, and deleting jobs are implemented. +/// All other members throw NotImplementedException and are intended to be handled +/// by the server-side TickerQ host. +/// +internal sealed class TickerQRemotePersistenceProvider : + ITickerPersistenceProvider + where TTimeTicker : TimeTickerEntity, new() + where TCronTicker : CronTickerEntity, new() +{ + private readonly TickerQSdkHttpClient _client; + + private const string TimeTickersPath = "time-tickers"; + private const string CronTickersPath = "cron-tickers"; + + public TickerQRemotePersistenceProvider(TickerQSdkHttpClient client) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + } + + #region Time_Ticker_Core_Methods + + public IAsyncEnumerable QueueTimeTickers(TimeTickerEntity[] timeTickers, CancellationToken cancellationToken = default) + => throw new NotImplementedException("QueueTimeTickers is handled by the server-side TickerQ host."); + + public IAsyncEnumerable QueueTimedOutTimeTickers(CancellationToken cancellationToken = default) + => throw new NotImplementedException("QueueTimedOutTimeTickers is handled by the server-side TickerQ host."); + + public Task ReleaseAcquiredTimeTickers(Guid[] timeTickerIds, CancellationToken cancellationToken = default) + => throw new NotImplementedException("ReleaseAcquiredTimeTickers is handled by the server-side TickerQ host."); + + public Task GetEarliestTimeTickers(CancellationToken cancellationToken = default) + => throw new NotImplementedException("GetEarliestTimeTickers is handled by the server-side TickerQ host."); + + public async Task UpdateTimeTicker(InternalFunctionContext functionContext, CancellationToken cancellationToken = default) + { + if (functionContext == null) + throw new ArgumentNullException(nameof(functionContext)); + + // PUT /time-tickers/context (body: InternalFunctionContext) + var affected = await _client + .PutAsync( + $"{TimeTickersPath}/context", + functionContext, + cancellationToken) + .ConfigureAwait(false); + + // Default to 1 row affected if server does not return a value + return affected ?? 1; + } + + public Task GetTimeTickerRequest(Guid id, CancellationToken cancellationToken) + => GetRequestBytesAsync($"{TimeTickersPath}/request/{id}", cancellationToken); + + public async Task UpdateTimeTickersWithUnifiedContext(Guid[] timeTickerIds, InternalFunctionContext functionContext, CancellationToken cancellationToken = default) + { + if (timeTickerIds == null) + throw new ArgumentNullException(nameof(timeTickerIds)); + if (functionContext == null) + throw new ArgumentNullException(nameof(functionContext)); + + // POST /time-tickers/unified-context + // Body carries both ids and the unified InternalFunctionContext + var payload = new + { + ids = timeTickerIds, + context = functionContext + }; + + await _client + .PostAsync( + $"{TimeTickersPath}/unified-context", + payload, + cancellationToken) + .ConfigureAwait(false); + } + + public Task AcquireImmediateTimeTickersAsync(Guid[] ids, CancellationToken cancellationToken = default) + => throw new NotImplementedException("Immediate acquisition is handled by the server-side TickerQ host."); + + #endregion + + #region Cron_Ticker_Core_Methods + + public Task MigrateDefinedCronTickers((string Function, string Expression)[] cronTickers, CancellationToken cancellationToken = default) + => throw new NotImplementedException("Cron seeding is handled by the server-side TickerQ host."); + + public Task GetAllCronTickerExpressions(CancellationToken cancellationToken) + => throw new NotImplementedException("GetAllCronTickerExpressions is handled by the server-side TickerQ host."); + + public Task ReleaseDeadNodeTimeTickerResources(string instanceIdentifier, CancellationToken cancellationToken = default) + => throw new NotImplementedException("ReleaseDeadNodeTimeTickerResources is handled by the server-side TickerQ host."); + + #endregion + + #region Cron_TickerOccurrence_Core_Methods + + public Task> GetEarliestAvailableCronOccurrence(Guid[] ids, CancellationToken cancellationToken = default) + => throw new NotImplementedException("Cron occurrence scheduling is handled by the server-side TickerQ host."); + + public IAsyncEnumerable> QueueCronTickerOccurrences((DateTime Key, InternalManagerContext[] Items) cronTickerOccurrences, CancellationToken cancellationToken = default) + => throw new NotImplementedException("Cron occurrence queueing is handled by the server-side TickerQ host."); + + public IAsyncEnumerable> QueueTimedOutCronTickerOccurrences(CancellationToken cancellationToken = default) + => throw new NotImplementedException("Cron occurrence timeout handling is handled by the server-side TickerQ host."); + + public async Task UpdateCronTickerOccurrence(InternalFunctionContext functionContext, CancellationToken cancellationToken = default) + { + if (functionContext == null) + throw new ArgumentNullException(nameof(functionContext)); + + const string cronOccurrencesPath = "cron-ticker-occurrences"; + + // PUT /cron-ticker-occurrences/context (body: InternalFunctionContext) + var affected = await _client + .PutAsync( + $"{cronOccurrencesPath}/context", + functionContext, + cancellationToken) + .ConfigureAwait(false); + + // We ignore the result; InternalTickerManager does not use the return value + _ = affected; + } + + public Task ReleaseAcquiredCronTickerOccurrences(Guid[] occurrenceIds, CancellationToken cancellationToken = default) + => throw new NotImplementedException("ReleaseAcquiredCronTickerOccurrences is handled by the server-side TickerQ host."); + + public Task GetCronTickerOccurrenceRequest(Guid tickerId, CancellationToken cancellationToken = default) + => GetRequestBytesAsync($"cron-ticker-occurrences/request/{tickerId}", cancellationToken); + + public Task UpdateCronTickerOccurrencesWithUnifiedContext(Guid[] timeTickerIds, InternalFunctionContext functionContext, CancellationToken cancellationToken = default) + => throw new NotImplementedException("Execution status updates are handled by the server-side TickerQ host."); + + public Task ReleaseDeadNodeOccurrenceResources(string instanceIdentifier, CancellationToken cancellationToken = default) + => throw new NotImplementedException("ReleaseDeadNodeOccurrenceResources is handled by the server-side TickerQ host."); + + #endregion + + #region Time_Ticker_Shared_Methods + + public Task GetTimeTickerById(Guid id, CancellationToken cancellationToken = default) + => throw new NotImplementedException("Read operations should be performed against the server-side API directly."); + + public Task GetTimeTickers(System.Linq.Expressions.Expression> predicate, CancellationToken cancellationToken = default) + => throw new NotImplementedException("Read operations should be performed against the server-side API directly."); + + public Task> GetTimeTickersPaginated(System.Linq.Expressions.Expression> predicate, int pageNumber, int pageSize, CancellationToken cancellationToken = default) + => throw new NotImplementedException("Read operations should be performed against the server-side API directly."); + + public async Task AddTimeTickers(TTimeTicker[] tickers, CancellationToken cancellationToken = default) + { + if (tickers == null || tickers.Length == 0) + return 0; + + // POST /time-tickers (body: TTimeTicker[]) + var affected = await _client + .PostAsync( + TimeTickersPath, + tickers, + cancellationToken) + .ConfigureAwait(false); + + return affected ?? tickers.Length; + } + + public async Task UpdateTimeTickers(TTimeTicker[] tickers, CancellationToken cancellationToken = default) + { + if (tickers == null || tickers.Length == 0) + return 0; + + // PUT /time-tickers (body: TTimeTicker[]) + var affected = await _client + .PutAsync( + TimeTickersPath, + tickers, + cancellationToken) + .ConfigureAwait(false); + + return affected ?? tickers.Length; + } + + public async Task RemoveTimeTickers(Guid[] tickerIds, CancellationToken cancellationToken = default) + { + tickerIds ??= Array.Empty(); + if (tickerIds.Length == 0) + return 0; + + // POST /time-tickers/delete (body: Guid[]) + var affected = await _client + .PostAsync( + $"{TimeTickersPath}/delete", + tickerIds, + cancellationToken) + .ConfigureAwait(false); + + return affected ?? tickerIds.Length; + } + + #endregion + + #region Cron_Ticker_Shared_Methods + + public Task GetCronTickerById(Guid id, CancellationToken cancellationToken) + => throw new NotImplementedException("Read operations should be performed against the server-side API directly."); + + public Task GetCronTickers(System.Linq.Expressions.Expression> predicate, CancellationToken cancellationToken) + => throw new NotImplementedException("Read operations should be performed against the server-side API directly."); + + public Task> GetCronTickersPaginated(System.Linq.Expressions.Expression> predicate, int pageNumber, int pageSize, CancellationToken cancellationToken = default) + => throw new NotImplementedException("Read operations should be performed against the server-side API directly."); + + public async Task InsertCronTickers(TCronTicker[] tickers, CancellationToken cancellationToken) + { + if (tickers == null || tickers.Length == 0) + return 0; + + // POST /cron-tickers (body: TCronTicker[]) + var affected = await _client + .PostAsync( + CronTickersPath, + tickers, + cancellationToken) + .ConfigureAwait(false); + + return affected ?? tickers.Length; + } + + public async Task UpdateCronTickers(TCronTicker[] cronTicker, CancellationToken cancellationToken) + { + if (cronTicker == null || cronTicker.Length == 0) + return 0; + + // PUT /cron-tickers (body: TCronTicker[]) + var affected = await _client + .PutAsync( + CronTickersPath, + cronTicker, + cancellationToken) + .ConfigureAwait(false); + + return affected ?? cronTicker.Length; + } + + public async Task RemoveCronTickers(Guid[] cronTickerIds, CancellationToken cancellationToken) + { + cronTickerIds ??= Array.Empty(); + if (cronTickerIds.Length == 0) + return 0; + + // POST /cron-tickers/delete (body: Guid[]) + var affected = await _client + .PostAsync( + $"{CronTickersPath}/delete", + cronTickerIds, + cancellationToken) + .ConfigureAwait(false); + + return affected ?? cronTickerIds.Length; + } + + #endregion + + #region Cron_TickerOccurrence_Shared_Methods + + public Task[]> GetAllCronTickerOccurrences(System.Linq.Expressions.Expression, bool>> predicate, CancellationToken cancellationToken = default) + => throw new NotImplementedException("Read operations should be performed against the server-side API directly."); + + public Task>> GetAllCronTickerOccurrencesPaginated(System.Linq.Expressions.Expression, bool>> predicate, int pageNumber, int pageSize, CancellationToken cancellationToken = default) + => throw new NotImplementedException("Read operations should be performed against the server-side API directly."); + + public Task InsertCronTickerOccurrences(CronTickerOccurrenceEntity[] cronTickerOccurrences, CancellationToken cancellationToken) + => throw new NotImplementedException("Cron occurrence insertion is handled by the server-side TickerQ host."); + + public Task RemoveCronTickerOccurrences(Guid[] cronTickerOccurrences, CancellationToken cancellationToken) + => throw new NotImplementedException("Cron occurrence deletion is handled by the server-side TickerQ host."); + + public Task[]> AcquireImmediateCronOccurrencesAsync(Guid[] occurrenceIds, CancellationToken cancellationToken = default) + => throw new NotImplementedException("Immediate cron occurrence acquisition is handled by the server-side TickerQ host."); + + #endregion + + private async Task GetRequestBytesAsync(string path, CancellationToken cancellationToken) + { + try + { + return await _client.GetBytesAsync(path, cancellationToken).ConfigureAwait(false); + } + catch (HttpRequestException ex) when (ex.StatusCode == HttpStatusCode.NotFound) + { + return null; + } + } +} diff --git a/src/TickerQ.SDK/SdkExecutionEndpoint.cs b/src/TickerQ.SDK/SdkExecutionEndpoint.cs new file mode 100644 index 00000000..9a8065ce --- /dev/null +++ b/src/TickerQ.SDK/SdkExecutionEndpoint.cs @@ -0,0 +1,103 @@ +using System.Text; +using System.Text.Json; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; +using Microsoft.AspNetCore.Routing; +using Microsoft.Extensions.DependencyInjection; +using TickerQ.SDK.Infrastructure; +using TickerQ.SDK.Models; +using TickerQ.Utilities; +using TickerQ.Utilities.Enums; +using TickerQ.Utilities.Interfaces; +using TickerQ.Utilities.Models; + +namespace TickerQ.SDK; + +public static class SdkExecutionEndpoint +{ + + public static IEndpointRouteBuilder ExposeSdkExecutionEndpoint( + this IEndpointRouteBuilder endpoints, string prefix = "") + { + var group = endpoints.MapGroup(prefix); + group.MapPost("/execute", async (HttpContext http, [FromServices] IServiceProvider serviceProvider) => + { + http.Request.EnableBuffering(); + + http.Request.Body.Position = 0; + + using var reader = new StreamReader( + http.Request.Body, + Encoding.UTF8, + leaveOpen: true); + + var body = await reader.ReadToEndAsync(); + + http.Request.Body.Position = 0; // important if anything else reads it + + if (string.IsNullOrWhiteSpace(body)) + return Results.BadRequest("Empty body"); + + RemoteExecutionContext? context; + try + { + context = JsonSerializer.Deserialize( + body, + new JsonSerializerOptions { PropertyNameCaseInsensitive = true }); + } + catch (JsonException) + { + return Results.BadRequest("Invalid JSON payload"); + } + + if (context is null || string.IsNullOrWhiteSpace(context.FunctionName)) + return Results.BadRequest("Invalid payload"); + + await using var scope = serviceProvider.CreateAsyncScope(); + var scheduler = scope.ServiceProvider.GetService(); + var taskHandler = scope.ServiceProvider.GetRequiredService(); + + var function = new InternalFunctionContext + { + FunctionName = context.FunctionName, + TickerId = context.Id, + ParentId = null, + Type = context.Type, + Retries = 0, + RetryCount = context.RetryCount, + Status = TickerStatus.Idle, + ExecutionTime = context.ScheduledFor, + RunCondition = RunCondition.OnSuccess + }; + + if (TickerFunctionProvider.TickerFunctions.TryGetValue(function.FunctionName, out var tickerItem)) + { + function.CachedDelegate = tickerItem.Delegate; + function.CachedPriority = tickerItem.Priority; + } + + if (scheduler is not null && !scheduler.IsDisposed && !scheduler.IsFrozen) + { + await scheduler.QueueAsync( + ct => taskHandler.ExecuteTaskAsync(function, context.IsDue, ct), + TickerTaskPriority.LongRunning, + cancellationToken: CancellationToken.None); + } + else + { + await taskHandler.ExecuteTaskAsync(function, context.IsDue, CancellationToken.None); + } + + return Results.Ok(); + }).AddEndpointFilter(); + group.MapPost("/resync", async ([FromServices] TickerQFunctionSyncService syncService) => + { + await syncService.SyncAsync(CancellationToken.None).ConfigureAwait(false); + return Results.Ok(); + }).AddEndpointFilter(); + + + return endpoints; + } +} diff --git a/src/TickerQ.SDK/TickerQ.SDK.csproj b/src/TickerQ.SDK/TickerQ.SDK.csproj new file mode 100644 index 00000000..74530192 --- /dev/null +++ b/src/TickerQ.SDK/TickerQ.SDK.csproj @@ -0,0 +1,16 @@ + + + + enable + TickerQ.SDK + TickerQ.SDK + TickerQ SDK + HTTP client and integration helpers for interacting with TickerQ scheduler and hub endpoints. + $(PackageTags);sdk;client;http;remote;api + + + + + + + diff --git a/src/TickerQ.SDK/TickerQSdkConstants.cs b/src/TickerQ.SDK/TickerQSdkConstants.cs new file mode 100644 index 00000000..dde8d029 --- /dev/null +++ b/src/TickerQ.SDK/TickerQSdkConstants.cs @@ -0,0 +1,18 @@ +namespace TickerQ.SDK; + +/// +/// Constants used by the TickerQ SDK. +/// +public static class TickerQSdkConstants +{ + /// + /// The base URL of the TickerQ Hub service. + /// This is a fixed endpoint and cannot be configured by users. + /// + public const string HubBaseUrl = "https://hub.tickerq.net/"; + + /// + /// The Hub hostname used for request routing. + /// + public const string HubHostname = "hub.tickerq.net"; +} diff --git a/src/TickerQ.SDK/TickerQSignatureFilter.cs b/src/TickerQ.SDK/TickerQSignatureFilter.cs new file mode 100644 index 00000000..419983ad --- /dev/null +++ b/src/TickerQ.SDK/TickerQSignatureFilter.cs @@ -0,0 +1,112 @@ +using System.Security.Cryptography; +using System.Text; +using System.Globalization; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Logging; + +namespace TickerQ.SDK; + +public sealed class TickerQSignatureFilter : IEndpointFilter +{ + private const long MaxTimestampSkewSeconds = 300; + + private readonly TickerSdkOptions _options; + private readonly ILogger? _logger; + + public TickerQSignatureFilter(TickerSdkOptions options, ILogger? logger = null) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + _logger = logger; + } + + public async ValueTask InvokeAsync( + EndpointFilterInvocationContext ctx, + EndpointFilterDelegate next) + { + var request = ctx.HttpContext.Request; + + // Validate WebhookSignature is configured + if (string.IsNullOrWhiteSpace(_options.WebhookSignature)) + { + _logger?.LogWarning("TickerQ signature validation skipped: WebhookSignature not configured for {Method} {Path}", + request.Method, request.Path); + return Results.Unauthorized(); + } + + // Validate required headers first (fail fast) + if (!request.Headers.TryGetValue("X-TickerQ-Signature", out var sig) || string.IsNullOrWhiteSpace(sig)) + { + _logger?.LogWarning("TickerQ signature validation failed: Missing X-TickerQ-Signature header for {Method} {Path}", + request.Method, request.Path); + return Results.Unauthorized(); + } + + if (!request.Headers.TryGetValue("X-Timestamp", out var timestampHeader)) + { + _logger?.LogWarning("TickerQ signature validation failed: Missing X-Timestamp header for {Method} {Path}", + request.Method, request.Path); + return Results.Unauthorized(); + } + + var timestamp = timestampHeader.Count > 0 ? timestampHeader[0] : string.Empty; + if (!long.TryParse(timestamp, NumberStyles.Integer, CultureInfo.InvariantCulture, out var ts)) + { + _logger?.LogWarning("TickerQ signature validation failed: Invalid timestamp format for {Method} {Path}", + request.Method, request.Path); + return Results.Unauthorized(); + } + + var now = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + if (Math.Abs(now - ts) > MaxTimestampSkewSeconds) + { + _logger?.LogWarning("TickerQ signature validation failed: Timestamp skew too large ({SkewSeconds}s) for {Method} {Path}", + Math.Abs(now - ts), request.Method, request.Path); + return Results.Unauthorized(); + } + + // Parse signature with error handling + byte[] received; + try + { + received = Convert.FromBase64String(sig!); + } + catch (FormatException) + { + _logger?.LogWarning("TickerQ signature validation failed: Invalid Base64 signature for {Method} {Path}", + request.Method, request.Path); + return Results.Unauthorized(); + } + + // Enable buffering before reading so downstream can read again + request.EnableBuffering(); + + byte[] bodyBytes; + await using (var ms = new MemoryStream()) + { + await request.Body.CopyToAsync(ms, ctx.HttpContext.RequestAborted); + bodyBytes = ms.ToArray(); + request.Body.Position = 0; + } + + // Compute expected signature (empty body is valid) + var pathAndQuery = $"{request.Path}{request.QueryString}"; + var header = $"{request.Method}\n{pathAndQuery}\n{timestamp}\n"; + var headerBytes = Encoding.UTF8.GetBytes(header); + var payload = new byte[headerBytes.Length + bodyBytes.Length]; + Buffer.BlockCopy(headerBytes, 0, payload, 0, headerBytes.Length); + Buffer.BlockCopy(bodyBytes, 0, payload, headerBytes.Length, bodyBytes.Length); + + var key = Encoding.UTF8.GetBytes(_options.WebhookSignature); + var expected = HMACSHA256.HashData(key, payload); + + if (expected.Length != received.Length || + !CryptographicOperations.FixedTimeEquals(expected, received)) + { + _logger?.LogWarning("TickerQ signature validation failed: Signature mismatch for {Method} {Path}", + request.Method, request.Path); + return Results.Unauthorized(); + } + + return await next(ctx); + } +} diff --git a/src/TickerQ.SDK/TickerSdkOptions.cs b/src/TickerQ.SDK/TickerSdkOptions.cs new file mode 100644 index 00000000..87ccc3d7 --- /dev/null +++ b/src/TickerQ.SDK/TickerSdkOptions.cs @@ -0,0 +1,69 @@ +namespace TickerQ.SDK; + +public class TickerSdkOptions +{ + /// + /// The API URL for job operations. Initially points to Hub, updated to Scheduler URL after sync. + /// + internal Uri? ApiUri { get; set; } + + /// + /// The Hub URL. Fixed to the TickerQ Hub service and cannot be changed. + /// + internal Uri HubUri { get; } = new Uri(TickerQSdkConstants.HubBaseUrl); + + internal string? WebhookSignature { get; set; } + internal Uri? CallbackUri { get; private set; } + internal string? ApiKey { get; private set; } + internal string? ApiSecret { get; private set; } + internal string? NodeName { get; private set; } + + public TickerSdkOptions SetApiKey(string apiKey) + { + ApiKey = apiKey ?? throw new ArgumentNullException(nameof(apiKey)); + return this; + } + + public TickerSdkOptions SetApiSecret(string apiSecret) + { + ApiSecret = apiSecret ?? throw new ArgumentNullException(nameof(apiSecret)); + return this; + } + + public TickerSdkOptions SetCallbackUri(Uri callbackUri) + { + CallbackUri = callbackUri ?? throw new ArgumentNullException(nameof(callbackUri)); + return this; + } + + public TickerSdkOptions SetNodeName(string nodeName) + { + NodeName = string.IsNullOrWhiteSpace(nodeName) ? throw new ArgumentNullException(nameof(nodeName)) : nodeName; + return this; + } + + /// + /// Validates that all required configuration options are set. + /// + /// Thrown when required options are missing. + internal void Validate() + { + var errors = new List(); + + if (string.IsNullOrWhiteSpace(ApiKey)) + errors.Add("ApiKey is required. Call SetApiKey() to configure."); + + if (string.IsNullOrWhiteSpace(ApiSecret)) + errors.Add("ApiSecret is required. Call SetApiSecret() to configure."); + + if (CallbackUri == null) + errors.Add("CallbackUri is required. Call SetCallbackUri() to configure."); + + if (string.IsNullOrWhiteSpace(NodeName)) + errors.Add("NodeName is required. Call SetNodeName() to configure."); + + if (errors.Count > 0) + throw new InvalidOperationException( + $"TickerQ SDK configuration is invalid:\n- {string.Join("\n- ", errors)}"); + } +} diff --git a/src/TickerQ.Utilities/Interfaces/ITickerExecutionTaskHandler.cs b/src/TickerQ.Utilities/Interfaces/ITickerExecutionTaskHandler.cs new file mode 100644 index 00000000..3028f462 --- /dev/null +++ b/src/TickerQ.Utilities/Interfaces/ITickerExecutionTaskHandler.cs @@ -0,0 +1,12 @@ +using System.Threading; +using System.Threading.Tasks; +using TickerQ.Utilities.Models; + +namespace TickerQ.Utilities.Interfaces +{ + public interface ITickerExecutionTaskHandler + { + Task ExecuteTaskAsync(InternalFunctionContext context, bool isDue, CancellationToken cancellationToken = default); + } +} + diff --git a/src/TickerQ.Utilities/Interfaces/ITickerQTaskScheduler.cs b/src/TickerQ.Utilities/Interfaces/ITickerQTaskScheduler.cs new file mode 100644 index 00000000..fedde32d --- /dev/null +++ b/src/TickerQ.Utilities/Interfaces/ITickerQTaskScheduler.cs @@ -0,0 +1,35 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using TickerQ.Utilities.Enums; + +namespace TickerQ.Utilities.Interfaces +{ + /// + /// Abstraction for the internal TickerQ task scheduler. + /// + public interface ITickerQTaskScheduler + { + ValueTask QueueAsync( + Func work, + TickerTaskPriority priority, + CancellationToken cancellationToken = default); + + void Freeze(); + + void Resume(); + + bool IsFrozen { get; } + + int ActiveWorkers { get; } + + int TotalQueuedTasks { get; } + + bool IsDisposed { get; } + + string GetDiagnostics(); + + Task WaitForRunningTasksAsync(TimeSpan? timeout = null); + } +} + diff --git a/src/TickerQ.Utilities/Managers/InternalTickerManager.cs b/src/TickerQ.Utilities/Managers/InternalTickerManager.cs index 73d64678..b0c43aa6 100644 --- a/src/TickerQ.Utilities/Managers/InternalTickerManager.cs +++ b/src/TickerQ.Utilities/Managers/InternalTickerManager.cs @@ -391,7 +391,9 @@ public async Task GetRequestAsync(Guid tickerId, TickerType type, Cancella ? await _persistenceProvider.GetCronTickerOccurrenceRequest(tickerId, cancellationToken: cancellationToken).ConfigureAwait(false) : await _persistenceProvider.GetTimeTickerRequest(tickerId, cancellationToken: cancellationToken).ConfigureAwait(false); - return request == null ? default : TickerHelper.ReadTickerRequest(request); + return request == null || request.Length == 0 + ? default + : TickerHelper.ReadTickerRequest(request); } public async Task RunTimedOutTickers(CancellationToken cancellationToken = default) diff --git a/src/TickerQ.Utilities/Models/InternalFunctionContext.cs b/src/TickerQ.Utilities/Models/InternalFunctionContext.cs index 3166ce15..49265af1 100644 --- a/src/TickerQ.Utilities/Models/InternalFunctionContext.cs +++ b/src/TickerQ.Utilities/Models/InternalFunctionContext.cs @@ -3,19 +3,19 @@ using System.Diagnostics.CodeAnalysis; using System.Linq.Expressions; using System.Reflection; +using System.Text.Json.Serialization; using TickerQ.Utilities.Enums; namespace TickerQ.Utilities.Models { public class InternalFunctionContext { - private HashSet ParametersToUpdate { get; set; } = []; - + public HashSet ParametersToUpdate { get; set; } = []; // Cached function delegate and priority for performance optimization // Eliminates dictionary lookups during execution + [JsonIgnore] public TickerFunctionDelegate CachedDelegate { get; set; } public TickerTaskPriority CachedPriority { get; set; } - public string FunctionName { get; set; } public Guid TickerId { get; set; } public Guid? ParentId { get; set; } diff --git a/src/TickerQ.Utilities/Properties/AssemblyInfo.cs b/src/TickerQ.Utilities/Properties/AssemblyInfo.cs index d213ba07..fd2c503f 100644 --- a/src/TickerQ.Utilities/Properties/AssemblyInfo.cs +++ b/src/TickerQ.Utilities/Properties/AssemblyInfo.cs @@ -4,6 +4,8 @@ [assembly: InternalsVisibleTo("TickerQ.EntityFrameworkCore")] [assembly: InternalsVisibleTo("TickerQ.Dashboard")] [assembly: InternalsVisibleTo("TickerQ.Tests")] +[assembly: InternalsVisibleTo("TickerQ.SDK")] +[assembly: InternalsVisibleTo("TickerQ.RemoteExecutor")] [assembly: InternalsVisibleTo("TickerQ.Instrumentation.OpenTelemetry")] [assembly: InternalsVisibleTo("TickerQ.Caching.StackExchangeRedis")] // To be testable using NSubsitute diff --git a/src/TickerQ.Utilities/Temps/NoOpTickerQDispatcher.cs b/src/TickerQ.Utilities/Temps/NoOpTickerQDispatcher.cs index 68273f4f..119d3b4a 100644 --- a/src/TickerQ.Utilities/Temps/NoOpTickerQDispatcher.cs +++ b/src/TickerQ.Utilities/Temps/NoOpTickerQDispatcher.cs @@ -18,4 +18,9 @@ public Task DispatchAsync(InternalFunctionContext[] contexts, CancellationToken // No-op: dispatcher not available in queue-only mode return Task.CompletedTask; } + + public Task DispatchAsync(InternalFunctionContext context, CancellationToken cancellationToken = default) + { + return Task.CompletedTask; + } } diff --git a/src/TickerQ/DependencyInjection/TickerQServiceExtensions.cs b/src/TickerQ/DependencyInjection/TickerQServiceExtensions.cs index ca5b63f0..275dc7af 100644 --- a/src/TickerQ/DependencyInjection/TickerQServiceExtensions.cs +++ b/src/TickerQ/DependencyInjection/TickerQServiceExtensions.cs @@ -60,9 +60,8 @@ public static IServiceCollection AddTickerQ(this IServ provider.GetRequiredService()); services.AddHostedService(provider => provider.GetRequiredService()); services.AddSingleton(); - services.AddSingleton(); services.AddSingleton(); - services.AddSingleton(sp => + services.AddSingleton(sp => { var notification = sp.GetRequiredService(); var notifyDebounce = new SoftSchedulerNotifyDebounce((value) => notification.UpdateActiveThreads(value)); @@ -71,11 +70,12 @@ public static IServiceCollection AddTickerQ(this IServ } else { + services.AddSingleton(_ => new TickerQTaskScheduler(schedulerOptionsBuilder.MaxConcurrency, schedulerOptionsBuilder.IdleWorkerTimeOut)); // Register NoOp implementations when background services are disabled services.AddSingleton(); services.AddSingleton(); } - + services.AddSingleton(); services.AddSingleton(); optionInstance.ExternalProviderConfigServiceAction?.Invoke(services); diff --git a/src/TickerQ/Src/BackgroundServices/TickerQFallbackBackgroundService.cs b/src/TickerQ/Src/BackgroundServices/TickerQFallbackBackgroundService.cs index e43090dc..49b0b655 100644 --- a/src/TickerQ/Src/BackgroundServices/TickerQFallbackBackgroundService.cs +++ b/src/TickerQ/Src/BackgroundServices/TickerQFallbackBackgroundService.cs @@ -2,8 +2,8 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Hosting; -using TickerQ.TickerQThreadPool; using TickerQ.Utilities; +using TickerQ.Utilities.Interfaces; using TickerQ.Utilities.Interfaces.Managers; namespace TickerQ.BackgroundServices; @@ -12,11 +12,11 @@ internal class TickerQFallbackBackgroundService : BackgroundService { private int _started; private readonly IInternalTickerManager _internalTickerManager; - private readonly TickerExecutionTaskHandler _tickerExecutionTaskHandler; - private readonly TickerQTaskScheduler _tickerQTaskScheduler; + private readonly ITickerExecutionTaskHandler _tickerExecutionTaskHandler; + private readonly ITickerQTaskScheduler _tickerQTaskScheduler; private readonly TimeSpan _fallbackJobPeriod; - public TickerQFallbackBackgroundService(IInternalTickerManager internalTickerManager, SchedulerOptionsBuilder schedulerOptions, TickerExecutionTaskHandler tickerExecutionTaskHandler, TickerQTaskScheduler tickerQTaskScheduler) + public TickerQFallbackBackgroundService(IInternalTickerManager internalTickerManager, SchedulerOptionsBuilder schedulerOptions, ITickerExecutionTaskHandler tickerExecutionTaskHandler, ITickerQTaskScheduler tickerQTaskScheduler) { _internalTickerManager = internalTickerManager; _fallbackJobPeriod = schedulerOptions.FallbackIntervalChecker; diff --git a/src/TickerQ/Src/BackgroundServices/TickerQSchedulerBackgroundService.cs b/src/TickerQ/Src/BackgroundServices/TickerQSchedulerBackgroundService.cs index 81fd3aec..ffaf02f6 100644 --- a/src/TickerQ/Src/BackgroundServices/TickerQSchedulerBackgroundService.cs +++ b/src/TickerQ/Src/BackgroundServices/TickerQSchedulerBackgroundService.cs @@ -17,8 +17,8 @@ internal class TickerQSchedulerBackgroundService : BackgroundService, ITickerQHo private readonly IInternalTickerManager _internalTickerManager; private readonly TickerExecutionContext _executionContext; private SafeCancellationTokenSource _schedulerLoopCancellationTokenSource; - private readonly TickerQTaskScheduler _taskScheduler; - private readonly TickerExecutionTaskHandler _taskHandler; + private readonly ITickerQTaskScheduler _taskScheduler; + private readonly ITickerExecutionTaskHandler _taskHandler; private int _started; public bool SkipFirstRun; public bool IsRunning => _started == 1; @@ -26,8 +26,8 @@ internal class TickerQSchedulerBackgroundService : BackgroundService, ITickerQHo public TickerQSchedulerBackgroundService( TickerExecutionContext executionContext, - TickerExecutionTaskHandler taskHandler, - TickerQTaskScheduler taskScheduler, + ITickerExecutionTaskHandler taskHandler, + ITickerQTaskScheduler taskScheduler, IInternalTickerManager internalTickerManager) { _executionContext = executionContext; diff --git a/src/TickerQ/Src/Dispatcher/TickerQDispatcher.cs b/src/TickerQ/Src/Dispatcher/TickerQDispatcher.cs index 95b36f2f..a50a5f43 100644 --- a/src/TickerQ/Src/Dispatcher/TickerQDispatcher.cs +++ b/src/TickerQ/Src/Dispatcher/TickerQDispatcher.cs @@ -1,7 +1,6 @@ using System; using System.Threading; using System.Threading.Tasks; -using TickerQ.TickerQThreadPool; using TickerQ.Utilities.Interfaces; using TickerQ.Utilities.Models; @@ -9,12 +8,12 @@ namespace TickerQ.Dispatcher { internal class TickerQDispatcher : ITickerQDispatcher { - private readonly TickerQTaskScheduler _taskScheduler; - private readonly TickerExecutionTaskHandler _taskHandler; + private readonly ITickerQTaskScheduler _taskScheduler; + private readonly ITickerExecutionTaskHandler _taskHandler; public bool IsEnabled => true; - public TickerQDispatcher(TickerQTaskScheduler taskScheduler, TickerExecutionTaskHandler taskHandler) + public TickerQDispatcher(ITickerQTaskScheduler taskScheduler, ITickerExecutionTaskHandler taskHandler) { _taskScheduler = taskScheduler ?? throw new ArgumentNullException(nameof(taskScheduler)); _taskHandler = taskHandler ?? throw new ArgumentNullException(nameof(taskHandler)); @@ -35,4 +34,3 @@ await _taskScheduler.QueueAsync( } } } - diff --git a/src/TickerQ/Src/TickerExecutionTaskHandler.cs b/src/TickerQ/Src/TickerExecutionTaskHandler.cs index ec873ec9..534e8713 100644 --- a/src/TickerQ/Src/TickerExecutionTaskHandler.cs +++ b/src/TickerQ/Src/TickerExecutionTaskHandler.cs @@ -16,7 +16,7 @@ namespace TickerQ; -internal class TickerExecutionTaskHandler +internal class TickerExecutionTaskHandler : ITickerExecutionTaskHandler { private readonly IServiceProvider _serviceProvider; private readonly ITickerClock _clock; diff --git a/src/TickerQ/Src/TickerQThreadPool/TickerQTaskScheduler.cs b/src/TickerQ/Src/TickerQThreadPool/TickerQTaskScheduler.cs index 9f5a6098..f4f7b452 100644 --- a/src/TickerQ/Src/TickerQThreadPool/TickerQTaskScheduler.cs +++ b/src/TickerQ/Src/TickerQThreadPool/TickerQTaskScheduler.cs @@ -3,13 +3,14 @@ using System.Threading; using System.Threading.Tasks; using TickerQ.Utilities.Enums; +using TickerQ.Utilities.Interfaces; namespace TickerQ.TickerQThreadPool; /// /// Elastic work-stealing task scheduler. /// -public sealed class TickerQTaskScheduler : IAsyncDisposable +public sealed class TickerQTaskScheduler : IAsyncDisposable, ITickerQTaskScheduler { private readonly int _maxConcurrency; private readonly TimeSpan _idleWorkerTimeout; diff --git a/src/src.sln b/src/src.sln new file mode 100644 index 00000000..faaf8a69 --- /dev/null +++ b/src/src.sln @@ -0,0 +1,72 @@ +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.5.2.0 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.Dashboard", "TickerQ.Dashboard\TickerQ.Dashboard.csproj", "{01E20682-E5A0-56B0-B670-92C59A0693DE}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.Utilities", "TickerQ.Utilities\TickerQ.Utilities.csproj", "{791B1939-42EB-FEC6-4415-7117EF0EDA37}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ", "TickerQ\TickerQ.csproj", "{18AB32B3-6AB6-C69B-E39C-CA2C43364188}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.SourceGenerator", "TickerQ.SourceGenerator\TickerQ.SourceGenerator.csproj", "{A3A3A9E3-C853-8C16-87A7-2829FAC084DF}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.RemoteExecutor", "TickerQ.RemoteExecutor\TickerQ.RemoteExecutor.csproj", "{3467E6BF-D4A0-E969-6FC2-3113EA08E567}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.EntityFrameworkCore", "TickerQ.EntityFrameworkCore\TickerQ.EntityFrameworkCore.csproj", "{63A97B66-4163-9B2B-9DB4-1CD235095817}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.SDK", "TickerQ.SDK\TickerQ.SDK.csproj", "{0ECE4EF0-96D0-4E9B-53FC-BBE86F65437F}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.Caching.StackExchangeRedis", "TickerQ.Caching.StackExchangeRedis\TickerQ.Caching.StackExchangeRedis.csproj", "{E1B5AE18-2847-D83A-2F51-BC0E86571A9F}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.Instrumentation.OpenTelemetry", "TickerQ.Instrumentation.OpenTelemetry\TickerQ.Instrumentation.OpenTelemetry.csproj", "{C98790EB-D0E8-211E-E4FC-DC18E27ABA40}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {01E20682-E5A0-56B0-B670-92C59A0693DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {01E20682-E5A0-56B0-B670-92C59A0693DE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {01E20682-E5A0-56B0-B670-92C59A0693DE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {01E20682-E5A0-56B0-B670-92C59A0693DE}.Release|Any CPU.Build.0 = Release|Any CPU + {791B1939-42EB-FEC6-4415-7117EF0EDA37}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {791B1939-42EB-FEC6-4415-7117EF0EDA37}.Debug|Any CPU.Build.0 = Debug|Any CPU + {791B1939-42EB-FEC6-4415-7117EF0EDA37}.Release|Any CPU.ActiveCfg = Release|Any CPU + {791B1939-42EB-FEC6-4415-7117EF0EDA37}.Release|Any CPU.Build.0 = Release|Any CPU + {18AB32B3-6AB6-C69B-E39C-CA2C43364188}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {18AB32B3-6AB6-C69B-E39C-CA2C43364188}.Debug|Any CPU.Build.0 = Debug|Any CPU + {18AB32B3-6AB6-C69B-E39C-CA2C43364188}.Release|Any CPU.ActiveCfg = Release|Any CPU + {18AB32B3-6AB6-C69B-E39C-CA2C43364188}.Release|Any CPU.Build.0 = Release|Any CPU + {A3A3A9E3-C853-8C16-87A7-2829FAC084DF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A3A3A9E3-C853-8C16-87A7-2829FAC084DF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A3A3A9E3-C853-8C16-87A7-2829FAC084DF}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A3A3A9E3-C853-8C16-87A7-2829FAC084DF}.Release|Any CPU.Build.0 = Release|Any CPU + {3467E6BF-D4A0-E969-6FC2-3113EA08E567}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3467E6BF-D4A0-E969-6FC2-3113EA08E567}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3467E6BF-D4A0-E969-6FC2-3113EA08E567}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3467E6BF-D4A0-E969-6FC2-3113EA08E567}.Release|Any CPU.Build.0 = Release|Any CPU + {63A97B66-4163-9B2B-9DB4-1CD235095817}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {63A97B66-4163-9B2B-9DB4-1CD235095817}.Debug|Any CPU.Build.0 = Debug|Any CPU + {63A97B66-4163-9B2B-9DB4-1CD235095817}.Release|Any CPU.ActiveCfg = Release|Any CPU + {63A97B66-4163-9B2B-9DB4-1CD235095817}.Release|Any CPU.Build.0 = Release|Any CPU + {0ECE4EF0-96D0-4E9B-53FC-BBE86F65437F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0ECE4EF0-96D0-4E9B-53FC-BBE86F65437F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0ECE4EF0-96D0-4E9B-53FC-BBE86F65437F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0ECE4EF0-96D0-4E9B-53FC-BBE86F65437F}.Release|Any CPU.Build.0 = Release|Any CPU + {E1B5AE18-2847-D83A-2F51-BC0E86571A9F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E1B5AE18-2847-D83A-2F51-BC0E86571A9F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E1B5AE18-2847-D83A-2F51-BC0E86571A9F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E1B5AE18-2847-D83A-2F51-BC0E86571A9F}.Release|Any CPU.Build.0 = Release|Any CPU + {C98790EB-D0E8-211E-E4FC-DC18E27ABA40}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C98790EB-D0E8-211E-E4FC-DC18E27ABA40}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C98790EB-D0E8-211E-E4FC-DC18E27ABA40}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C98790EB-D0E8-211E-E4FC-DC18E27ABA40}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {B3F7C24B-4E1D-4996-9C7D-9C3CB5FAE7EE} + EndGlobalSection +EndGlobal