Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
<PackageVersion Include="FluentAssertions" Version="6.12.0" />
<PackageVersion Include="FSharp.Core" Version="9.0.100" />
<PackageVersion Include="FSharp.SystemTextJson" Version="1.3.13" />
<PackageVersion Include="JasperFx" Version="1.21.0" />
<PackageVersion Include="JasperFx.Events" Version="1.23.1" />
<PackageVersion Include="JasperFx" Version="1.21.1" />
<PackageVersion Include="JasperFx.Events" Version="1.24.0" />
<PackageVersion Include="JasperFx.Events.SourceGenerator" Version="1.2.0" />
<PackageVersion Include="JasperFx.RuntimeCompiler" Version="4.4.0" />
<PackageVersion Include="Jil" Version="3.0.0-alpha2" />
Expand Down
2 changes: 1 addition & 1 deletion docs/documents/querying/check-exists.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Sometimes you only need to know whether a document with a given id exists in the
## Supported Identity Types

| Id Type | Supported |
|---------|-----------|
| ------- | --------- |
| `Guid` | Yes |
| `int` | Yes |
| `long` | Yes |
Expand Down
2 changes: 1 addition & 1 deletion docs/documents/querying/linq/group-join.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,4 @@ The following patterns are **not yet supported** and will throw `NotSupportedExc

- **GroupJoin as a final operator** (without `SelectMany`) — materializing the grouped collection is not supported. Use `GroupJoin` + `SelectMany` instead.
- **Composite keys** — joining on multiple fields simultaneously is not supported.
- **Cross-apply / subquery joins** — only simple key-to-key equi-joins are supported.
- **Cross-apply / subquery joins** — only simple key-to-key joins are supported.
1 change: 1 addition & 0 deletions src/Marten.sln
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
..\.github\workflows\on-push-do-ci-build-pg15-jsonnet.yml = ..\.github\workflows\on-push-do-ci-build-pg15-jsonnet.yml
..\.github\workflows\on-push-do-ci-build-pgLatest-systemtextjson.yml = ..\.github\workflows\on-push-do-ci-build-pgLatest-systemtextjson.yml
..\.github\workflows\daemon.yml = ..\.github\workflows\daemon.yml
..\Directory.Packages.props = ..\Directory.Packages.props
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AspNetCoreWithMarten", "AspNetCoreWithMarten\AspNetCoreWithMarten.csproj", "{739B657C-4E0D-40E8-853E-ADF5C2D3D89D}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,18 @@ public ProjectionProgressStatement(EventGraph events)

protected override void configure(ICommandBuilder builder)
{
if (_events.UseOptimizedProjectionRebuilds)
if (_events.UseOptimizedProjectionRebuilds && _events.EnableExtendedProgressionTracking)
{
builder.Append($"select name, last_seq_id, mode, rebuild_threshold, assigned_node, heartbeat, agent_status, pause_reason, running_on_node from {_events.DatabaseSchemaName}.mt_event_progression");
}
else if (_events.UseOptimizedProjectionRebuilds)
{
builder.Append($"select name, last_seq_id, mode, rebuild_threshold, assigned_node from {_events.DatabaseSchemaName}.mt_event_progression");
}
else if (_events.EnableExtendedProgressionTracking)
{
builder.Append($"select name, last_seq_id, heartbeat, agent_status, pause_reason, running_on_node from {_events.DatabaseSchemaName}.mt_event_progression");
}
else
{
builder.Append($"select name, last_seq_id from {_events.DatabaseSchemaName}.mt_event_progression");
Expand Down
35 changes: 32 additions & 3 deletions src/Marten/Events/Daemon/Progress/ShardStateSelector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,45 @@ public async Task<ShardState> ResolveAsync(DbDataReader reader, CancellationToke
var sequence = await reader.GetFieldValueAsync<long>(1, token).ConfigureAwait(false);
var state = new ShardState(name, sequence);

var nextIndex = 2;

if (_events.UseOptimizedProjectionRebuilds)
{
var modeString = await reader.GetFieldValueAsync<string>(2, token).ConfigureAwait(false);
var modeString = await reader.GetFieldValueAsync<string>(nextIndex++, token).ConfigureAwait(false);
if (Enum.TryParse<ShardMode>(modeString, out var mode))
{
state.Mode = mode;
}

state.RebuildThreshold = await reader.GetFieldValueAsync<long>(3, token).ConfigureAwait(false);
state.AssignedNodeNumber = await reader.GetFieldValueAsync<int>(4, token).ConfigureAwait(false);
state.RebuildThreshold = await reader.GetFieldValueAsync<long>(nextIndex++, token).ConfigureAwait(false);
state.AssignedNodeNumber = await reader.GetFieldValueAsync<int>(nextIndex++, token).ConfigureAwait(false);
}

if (_events.EnableExtendedProgressionTracking)
{
if (!await reader.IsDBNullAsync(nextIndex, token).ConfigureAwait(false))
{
state.LastHeartbeat = await reader.GetFieldValueAsync<DateTimeOffset>(nextIndex, token).ConfigureAwait(false);
}
nextIndex++;

if (!await reader.IsDBNullAsync(nextIndex, token).ConfigureAwait(false))
{
state.AgentStatus = await reader.GetFieldValueAsync<string>(nextIndex, token).ConfigureAwait(false);
}
nextIndex++;

if (!await reader.IsDBNullAsync(nextIndex, token).ConfigureAwait(false))
{
state.PauseReason = await reader.GetFieldValueAsync<string>(nextIndex, token).ConfigureAwait(false);
}
nextIndex++;

if (!await reader.IsDBNullAsync(nextIndex, token).ConfigureAwait(false))
{
state.RunningOnNode = await reader.GetFieldValueAsync<int>(nextIndex, token).ConfigureAwait(false);
}
nextIndex++;
}

return state;
Expand Down
6 changes: 6 additions & 0 deletions src/Marten/Events/EventGraph.FeatureSchema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ private IEnumerable<ISchemaObject> createAllSchemaObjects()
yield return new EventProgressionTable(this);

yield return new SystemFunction(DatabaseSchemaName, "mt_mark_event_progression", "varchar, bigint");

if (EnableExtendedProgressionTracking)
{
yield return new SystemFunction(DatabaseSchemaName, "mt_mark_event_progression_extended",
"varchar, bigint, timestamp with time zone, varchar, text, integer");
}
yield return new ArchiveStreamFunction(this);

yield return new QuickAppendEventFunction(this);
Expand Down
6 changes: 6 additions & 0 deletions src/Marten/Events/EventGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ public override IEvent BuildEvent(object eventData)
public bool UseMonitoredAdvisoryLock { get; set; } = true;
public bool EnableAdvancedAsyncTracking { get; set; }
public bool EnableEventSkippingInProjectionsOrSubscriptions { get; set; }

/// <summary>
/// When enabled, adds heartbeat, agent_status, pause_reason, and running_on_node
/// columns to the event progression table for CritterWatch monitoring
/// </summary>
public bool EnableExtendedProgressionTracking { get; set; }
public bool UseArchivedStreamPartitioning { get; set; }
public IMessageOutbox MessageOutbox { get; set; } = new NulloMessageOutbox();

Expand Down
8 changes: 8 additions & 0 deletions src/Marten/Events/Schema/EventProgressionTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ public EventProgressionTable(EventGraph eventGraph): base(new PostgresqlObjectNa
AddColumn<int>("assigned_node").DefaultValueByExpression("0");
}

if (eventGraph.EnableExtendedProgressionTracking)
{
AddColumn("heartbeat", "timestamp with time zone").AllowNulls();
AddColumn("agent_status", "varchar(20)").AllowNulls();
AddColumn("pause_reason", "text").AllowNulls();
AddColumn("running_on_node", "integer").AllowNulls();
}

PrimaryKeyName = "pk_mt_event_progression";
}
}
13 changes: 13 additions & 0 deletions src/Marten/Schema/SQL/mt_mark_event_progression_extended.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
CREATE
OR REPLACE FUNCTION {databaseSchema}.mt_mark_event_progression_extended(name varchar, last_encountered bigint, p_heartbeat timestamp with time zone, p_agent_status varchar, p_pause_reason text, p_running_on_node integer) RETURNS VOID LANGUAGE plpgsql AS
$function$
BEGIN
INSERT INTO {databaseSchema}.mt_event_progression (name, last_seq_id, last_updated, heartbeat, agent_status, pause_reason, running_on_node)
VALUES (name, last_encountered, transaction_timestamp(), p_heartbeat, p_agent_status, p_pause_reason, p_running_on_node)
ON CONFLICT ON CONSTRAINT pk_mt_event_progression
DO
UPDATE SET last_seq_id = last_encountered, last_updated = transaction_timestamp(), heartbeat = p_heartbeat, agent_status = p_agent_status, pause_reason = p_pause_reason, running_on_node = p_running_on_node;

END;

$function$;
Loading