Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Hangfire.PostgreSql/Hangfire.PostgreSql.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<LangVersion>default</LangVersion>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<NoWarn>$(NoWarn);1591</NoWarn>
<InterceptorsNamespaces>$(InterceptorsNamespaces);Dapper.AOT</InterceptorsNamespaces>
</PropertyGroup>

<ItemGroup>
Expand All @@ -33,6 +34,7 @@

<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.123" />
<PackageReference Include="Dapper.AOT" Version="1.0.48" />
<PackageReference Include="GitVersion.MsBuild" Version="5.11.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
32 changes: 9 additions & 23 deletions src/Hangfire.PostgreSql/JsonParameter.cs
Original file line number Diff line number Diff line change
@@ -1,42 +1,28 @@
using System;
using System.Data;
using System.Text.Json;
using Dapper;
using Hangfire.Annotations;
using Npgsql;
using NpgsqlTypes;

namespace Hangfire.PostgreSql;

internal class JsonParameter : SqlMapper.ICustomQueryParameter
internal static class JsonParameter
{
[CanBeNull] private readonly object _value;
private readonly ValueType _type;

public JsonParameter([CanBeNull] object value) : this(value, ValueType.Object)
{
}

public JsonParameter([CanBeNull] object value, ValueType type)
public static string GetParameterValue([CanBeNull] object value)
{
_value = value;
_type = type;
return GetParameterValue(value, ValueType.Object);
}

public void AddParameter(IDbCommand command, string name)
public static string GetParameterValue([CanBeNull] object value, ValueType type)
{
string value = _value switch {
return value switch {
string { Length: > 0 } stringValue => stringValue,
string { Length: 0 } or null => GetDefaultValue(),
var _ => JsonSerializer.Serialize(_value),
string { Length: 0 } or null => GetDefaultValue(type),
var _ => JsonSerializer.Serialize(value),
};
command.Parameters.Add(new NpgsqlParameter(name, NpgsqlDbType.Jsonb) { Value = value });
}

private string GetDefaultValue()
private static string GetDefaultValue(ValueType type)
{
return _type switch
{
return type switch {
ValueType.Object => "{}",
ValueType.Array => "[]",
var _ => throw new ArgumentOutOfRangeException(),
Expand Down
48 changes: 22 additions & 26 deletions src/Hangfire.PostgreSql/PostgreSqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

namespace Hangfire.PostgreSql
{
[DapperAot]
public class PostgreSqlConnection : JobStorageConnection
{
private readonly Dictionary<string, HashSet<Guid>> _lockedResources;
Expand Down Expand Up @@ -115,7 +116,7 @@ public override string CreateExpiredJob(

string createJobSql = $@"
INSERT INTO ""{_options.SchemaName}"".""job"" (""invocationdata"", ""arguments"", ""createdat"", ""expireat"")
VALUES (@InvocationData, @Arguments, @CreatedAt, @ExpireAt)
VALUES (@InvocationData::jsonb, @Arguments::jsonb, @CreatedAt, @ExpireAt)
RETURNING ""id"";
";

Expand All @@ -124,30 +125,28 @@ public override string CreateExpiredJob(
return _storage.UseTransaction(_dedicatedConnection, (connection, transaction) => {
string jobId = connection.QuerySingle<long>(createJobSql,
new {
InvocationData = new JsonParameter(SerializationHelper.Serialize(invocationData)),
Arguments = new JsonParameter(invocationData.Arguments, JsonParameter.ValueType.Array),
InvocationData = JsonParameter.GetParameterValue(SerializationHelper.Serialize(invocationData)),
Arguments = JsonParameter.GetParameterValue(invocationData.Arguments, JsonParameter.ValueType.Array),
CreatedAt = createdAt,
ExpireAt = createdAt.Add(expireIn),
}).ToString(CultureInfo.InvariantCulture);

if (parameters.Count > 0)
{
object[] parameterArray = new object[parameters.Count];
int parameterIndex = 0;
foreach (KeyValuePair<string, string> parameter in parameters)
{
parameterArray[parameterIndex++] = new {
JobId = Convert.ToInt64(jobId, CultureInfo.InvariantCulture),
Name = parameter.Key,
parameter.Value,
};
}
var parameterArray = parameters
.Select(parameter =>
new {
JobId = Convert.ToInt64(jobId, CultureInfo.InvariantCulture),
Name = parameter.Key,
parameter.Value,
}
)
.ToArray();

string insertParameterSql = $@"
INSERT INTO ""{_options.SchemaName}"".""jobparameter"" (""jobid"", ""name"", ""value"")
VALUES (@JobId, @Name, @Value);
";

connection.Execute(insertParameterSql, parameterArray, transaction);
}

Expand All @@ -170,8 +169,7 @@ public override JobData GetJobData(string id)

SqlJob jobData = _storage.UseConnection(_dedicatedConnection,
connection => connection
.Query<SqlJob>(sql, new { Id = Convert.ToInt64(id, CultureInfo.InvariantCulture) })
.SingleOrDefault());
.QuerySingleOrDefault<SqlJob>(sql, new { Id = Convert.ToInt64(id, CultureInfo.InvariantCulture) }));

if (jobData == null)
{
Expand Down Expand Up @@ -218,8 +216,7 @@ public override StateData GetStateData(string jobId)

SqlState sqlState = _storage.UseConnection(_dedicatedConnection,
connection => connection
.Query<SqlState>(sql, new { JobId = Convert.ToInt64(jobId, CultureInfo.InvariantCulture) })
.SingleOrDefault());
.QuerySingleOrDefault<SqlState>(sql, new { JobId = Convert.ToInt64(jobId, CultureInfo.InvariantCulture) }));
return sqlState == null
? null
: new StateData {
Expand Down Expand Up @@ -350,7 +347,7 @@ public override List<string> GetFirstByLowestScoreFromSet(string key, double fro
ORDER BY ""score"" LIMIT @Limit;
",
new { Key = key, FromScore = fromScore, ToScore = toScore, Limit = count }))
.ToList();
.AsList();
}

public override void SetRangeInHash(string key, IEnumerable<KeyValuePair<string, string>> keyValuePairs)
Expand Down Expand Up @@ -453,7 +450,7 @@ public override void AnnounceServer(string serverId, ServerContext context)

string sql = $@"
WITH ""inputvalues"" AS (
SELECT @Id ""id"", @Data ""data"", NOW() ""lastheartbeat""
SELECT @Id ""id"", @Data::jsonb ""data"", NOW() ""lastheartbeat""
), ""updatedrows"" AS (
UPDATE ""{_options.SchemaName}"".""server"" ""updatetarget""
SET ""data"" = ""inputvalues"".""data"", ""lastheartbeat"" = ""inputvalues"".""lastheartbeat""
Expand All @@ -472,7 +469,7 @@ SELECT 1
";

_storage.UseConnection(_dedicatedConnection, connection => connection
.Execute(sql, new { Id = serverId, Data = new JsonParameter(SerializationHelper.Serialize(data)) }));
.Execute(sql, new { Id = serverId, Data = JsonParameter.GetParameterValue(SerializationHelper.Serialize(data)) }));
}

public override void RemoveServer(string serverId)
Expand Down Expand Up @@ -546,7 +543,7 @@ public override List<string> GetAllItemsFromList(string key)

return _storage.UseConnection(_dedicatedConnection, connection => connection
.Query<string>(query, new { Key = key })
.ToList());
.AsList());
}

public override long GetCounter(string key)
Expand Down Expand Up @@ -612,7 +609,7 @@ LIMIT @Limit OFFSET @Offset
return _storage.UseConnection(_dedicatedConnection, connection =>
connection
.Query<string>(query, new { Key = key, Limit = endingAt - startingFrom + 1, Offset = startingFrom })
.ToList());
.AsList());
}

public override long GetHashCount(string key)
Expand Down Expand Up @@ -660,7 +657,7 @@ LIMIT @Limit OFFSET @Offset

return _storage.UseConnection(_dedicatedConnection, connection => connection
.Query<string>(query, new { Key = key, Limit = endingAt - startingFrom + 1, Offset = startingFrom })
.ToList());
.AsList());
}

public override TimeSpan GetSetTtl(string key)
Expand Down Expand Up @@ -693,8 +690,7 @@ public override string GetValueFromHash(string key, string name)
string query = $@"SELECT ""value"" FROM ""{_options.SchemaName}"".""hash"" WHERE ""key"" = @Key AND ""field"" = @Field";

return _storage.UseConnection(_dedicatedConnection, connection => connection
.Query<string>(query, new { Key = key, Field = name })
.SingleOrDefault());
.QuerySingleOrDefault<string>(query, new { Key = key, Field = name }));
}

private IDisposable AcquireLock(string resource, TimeSpan timeout)
Expand Down
18 changes: 9 additions & 9 deletions src/Hangfire.PostgreSql/PostgreSqlJobQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

namespace Hangfire.PostgreSql
{
[DapperAot]
public class PostgreSqlJobQueue : IPersistentJobQueue
{
private const string JobNotificationChannel = "new_job";
Expand Down Expand Up @@ -163,17 +164,16 @@ LIMIT 1
try
{
using NpgsqlTransaction trx = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedJob jobToFetch = connection.Query<FetchedJob>(fetchJobSql,
new { Queues = queues.ToList() }, trx)
.SingleOrDefault();
FetchedJob jobToFetch = connection.QuerySingleOrDefault<FetchedJob>(fetchJobSql,
new { Queues = queues.ToList() }, trx);

trx.Commit();

return jobToFetch;
}
catch (InvalidOperationException)
{
// thrown by .SingleOrDefault(): stop the exception propagation if the fetched job was concurrently fetched by another worker
// thrown by .QuerySingleOrDefault(): stop the exception propagation if the fetched job was concurrently fetched by another worker
}
finally
{
Expand Down Expand Up @@ -237,9 +237,9 @@ internal IFetchedJob Dequeue_UpdateCount(string[] queues, CancellationToken canc
{
cancellationToken.ThrowIfCancellationRequested();

FetchedJob jobToFetch = _storage.UseConnection(null, connection => connection.Query<FetchedJob>(jobToFetchSql,
FetchedJob jobToFetch = _storage.UseConnection(null, connection => connection.QuerySingleOrDefault<FetchedJob>(jobToFetchSql,
new { Queues = queues.ToList() })
.SingleOrDefault());
);

if (jobToFetch == null)
{
Expand All @@ -254,9 +254,9 @@ internal IFetchedJob Dequeue_UpdateCount(string[] queues, CancellationToken canc
}
else
{
markJobAsFetched = _storage.UseConnection(null, connection => connection.Query<FetchedJob>(markJobAsFetchedSql,
markJobAsFetched = _storage.UseConnection(null, connection => connection.QuerySingleOrDefault<FetchedJob>(markJobAsFetchedSql,
jobToFetch)
.SingleOrDefault());
);
}
}
while (markJobAsFetched == null);
Expand Down Expand Up @@ -318,7 +318,7 @@ private Task ListenForNotificationsAsync(CancellationToken cancellationToken)
}

[UsedImplicitly(ImplicitUseTargetFlags.WithMembers)]
private class FetchedJob
internal class FetchedJob
{
public long Id { get; set; }
public long JobId { get; set; }
Expand Down
9 changes: 6 additions & 3 deletions src/Hangfire.PostgreSql/PostgreSqlJobQueueMonitoringApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

namespace Hangfire.PostgreSql
{
[DapperAot]
internal class PostgreSqlJobQueueMonitoringApi : IPersistentJobQueueMonitoringApi
{
private readonly PostgreSqlStorage _storage;
Expand All @@ -38,7 +39,7 @@ public PostgreSqlJobQueueMonitoringApi(PostgreSqlStorage storage)
public IEnumerable<string> GetQueues()
{
string sqlQuery = $@"SELECT DISTINCT ""queue"" FROM ""{_storage.Options.SchemaName}"".""jobqueue""";
return _storage.UseConnection(null, connection => connection.Query<string>(sqlQuery).ToList());
return _storage.UseConnection(null, connection => connection.Query<string>(sqlQuery).AsList());
}

public IEnumerable<long> GetEnqueuedJobIds(string queue, int from, int perPage)
Expand Down Expand Up @@ -69,7 +70,7 @@ SELECT COUNT(*)
";

(long enqueuedCount, long fetchedCount) = _storage.UseConnection(null, connection =>
connection.QuerySingle<(long EnqueuedCount, long FetchedCount)>(sqlQuery, new { Queue = queue }));
connection.Query<EnqueuedAndFetchedCount>(sqlQuery, new { Queue = queue })).Single();

return new EnqueuedAndFetchedCountDto {
EnqueuedCount = enqueuedCount,
Expand All @@ -92,7 +93,9 @@ AND j.""id"" IS NOT NULL

return _storage.UseConnection(null, connection => connection.Query<long>(sqlQuery,
new { Queue = queue, Offset = from, Limit = perPage })
.ToList());
.AsList());
}

internal record struct EnqueuedAndFetchedCount(long EnqueuedCount, long FetchedCount);
}
}
13 changes: 8 additions & 5 deletions src/Hangfire.PostgreSql/PostgreSqlMonitoringApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

namespace Hangfire.PostgreSql
{
[DapperAot]
public class PostgreSqlMonitoringApi : IMonitoringApi
{
private readonly PersistentJobQueueProviderCollection _queueProviders;
Expand Down Expand Up @@ -239,7 +240,7 @@
WHERE ""jobid"" = @Id
ORDER BY ""id"" DESC;
";
using SqlMapper.GridReader multi = connection.QueryMultiple(sql, new { Id = Convert.ToInt64(jobId, CultureInfo.InvariantCulture) });

Check warning on line 243 in src/Hangfire.PostgreSql/PostgreSqlMonitoringApi.cs

View workflow job for this annotation

GitHub Actions / build-and-test

The Dapper method 'SqlMapper.QueryMultiple(IDbConnection, string, object, IDbTransaction, int?, CommandType?)' is not currently supported by Dapper.AOT (https://aot.dapperlib.dev/rules/DAP001)
SqlJob job = multi.Read<SqlJob>().SingleOrDefault();
if (job == null)
{
Expand Down Expand Up @@ -315,7 +316,7 @@
";

StatisticsDto stats = new();
using (SqlMapper.GridReader multi = connection.QueryMultiple(sql))

Check warning on line 319 in src/Hangfire.PostgreSql/PostgreSqlMonitoringApi.cs

View workflow job for this annotation

GitHub Actions / build-and-test

The Dapper method 'SqlMapper.QueryMultiple(IDbConnection, string, object, IDbTransaction, int?, CommandType?)' is not currently supported by Dapper.AOT (https://aot.dapperlib.dev/rules/DAP001)
{
Dictionary<string, long> countByStates = multi.Read<(string StateName, long Count)>()
.ToDictionary(x => x.StateName, x => x.Count);
Expand Down Expand Up @@ -399,9 +400,9 @@
GROUP BY "key"
""";

Dictionary<string, long> valuesMap = UseConnection(connection => connection.Query<(string Key, long Count)>(query,
Dictionary<string, long> valuesMap = UseConnection(connection => connection.Query<KeyCount>(query,
new { Keys = keyMaps.Keys.ToList() })
.ToList()
.AsList()
.ToDictionary(x => x.Key, x => x.Count));

foreach (string key in keyMaps.Keys)
Expand All @@ -422,6 +423,8 @@
return result;
}

internal record struct KeyCount(string Key, long Count);

private IPersistentJobQueueMonitoringApi GetQueueApi(string queueName)
{
IPersistentJobQueueProvider provider = _queueProviders.GetProvider(queueName);
Expand All @@ -444,7 +447,7 @@

List<SqlJob> jobs = UseConnection(connection => connection.Query<SqlJob>(enqueuedJobsSql,
new { JobIds = jobIds.ToList() })
.ToList());
.AsList());

return DeserializeJobs(jobs,
(sqlJob, job, stateData) => new EnqueuedJobDto {
Expand Down Expand Up @@ -493,7 +496,7 @@

List<SqlJob> jobs = UseConnection(connection => connection.Query<SqlJob>(jobsSql,
new { StateName = stateName, Limit = count, Offset = from })
.ToList());
.AsList());

return DeserializeJobs(jobs, selector);
}
Expand Down Expand Up @@ -540,7 +543,7 @@

List<SqlJob> jobs = UseConnection(connection => connection.Query<SqlJob>(fetchedJobsSql,
new { JobIds = jobIds.ToList() })
.ToList());
.AsList());

Dictionary<string, FetchedJobDto> result = jobs.ToDictionary(job => job.Id.ToString(), job => new FetchedJobDto {
Job = DeserializeJob(job.InvocationData, job.Arguments),
Expand Down
Loading
Loading