diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlConfigurationExtensions.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlConfigurationExtensions.cs index 8fd8906f0..c27a1c96d 100644 --- a/src/Persistence/Wolverine.Postgresql/PostgresqlConfigurationExtensions.cs +++ b/src/Persistence/Wolverine.Postgresql/PostgresqlConfigurationExtensions.cs @@ -15,18 +15,25 @@ internal static void AssertValidSchemaName(this string schemaName) { if (schemaName.IsEmpty()) throw new ArgumentNullException(nameof(schemaName), "Schema Name cannot be empty or null"); - + if (schemaName.IsNotEmpty() && schemaName != schemaName.ToLowerInvariant()) { throw new ArgumentOutOfRangeException(nameof(schemaName), "The schema name must be in all lower case characters"); } + } - if (schemaName.Contains("-")) - { - throw new ArgumentOutOfRangeException(nameof(schemaName), - "PostgreSQL schema names cannot include dashes. Use underscores instead"); - } + /// + /// Quotes a PostgreSQL identifier (schema, table, column name) with double quotes. + /// Internal double quotes are escaped by doubling them. + /// + internal static string QuoteIdentifier(this string? identifier) + { + if (identifier.IsEmpty()) return identifier ?? string.Empty; + + // Escape any internal double quotes by doubling them + var escaped = identifier.Replace("\"", "\"\""); + return $"\"{escaped}\""; } /// diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs index 03f11f53f..1c609726c 100644 --- a/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs +++ b/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs @@ -35,9 +35,14 @@ internal class PostgresqlMessageStore : MessageDatabase private readonly string _reassignIncomingSql; private readonly List _externalTables = new(); - + private ImHashMap _sagaStorage = ImHashMap.Empty; + /// + /// Returns the schema name properly quoted for use as a PostgreSQL identifier in SQL statements. + /// + protected override string QuotedSchemaName => SchemaName.QuoteIdentifier(); + public PostgresqlMessageStore(DatabaseSettings databaseSettings, DurabilitySettings settings, NpgsqlDataSource dataSource, ILogger logger) : this(databaseSettings, settings, GetPrimaryNpgsqlNodeIfPossible(dataSource), logger, Array.Empty()) @@ -59,15 +64,21 @@ public PostgresqlMessageStore(DatabaseSettings databaseSettings, DurabilitySetti settings, logger, new PostgresqlMigrator(), PostgresqlProvider.Instance) { _reassignIncomingSql = - $"update {SchemaName}.{DatabaseConstants.IncomingTable} set owner_id = @owner, status = '{EnvelopeStatus.Incoming}' where id = ANY(@ids)"; + $"update {QuotedSchemaName}.{DatabaseConstants.IncomingTable} set owner_id = @owner, status = '{EnvelopeStatus.Incoming}' where id = ANY(@ids)"; _deleteOutgoingEnvelopesSql = - $"delete from {SchemaName}.{DatabaseConstants.OutgoingTable} WHERE id = ANY(@ids);"; + $"delete from {QuotedSchemaName}.{DatabaseConstants.OutgoingTable} WHERE id = ANY(@ids);"; _findAtLargeEnvelopesSql = - $"select {DatabaseConstants.IncomingFields} from {SchemaName}.{DatabaseConstants.IncomingTable} where owner_id = {TransportConstants.AnyNode} and status = '{EnvelopeStatus.Incoming}' and {DatabaseConstants.ReceivedAt} = :address limit :limit"; + $"select {DatabaseConstants.IncomingFields} from {QuotedSchemaName}.{DatabaseConstants.IncomingTable} where owner_id = {TransportConstants.AnyNode} and status = '{EnvelopeStatus.Incoming}' and {DatabaseConstants.ReceivedAt} = :address limit :limit"; _discardAndReassignOutgoingSql = _deleteOutgoingEnvelopesSql + - $";update {SchemaName}.{DatabaseConstants.OutgoingTable} set owner_id = @node where id = ANY(@rids)"; + $";update {QuotedSchemaName}.{DatabaseConstants.OutgoingTable} set owner_id = @node where id = ANY(@rids)"; + + // Rebuild base class SQL strings with properly quoted schema name for PostgreSQL + _markEnvelopeAsHandledById = + $"update {QuotedSchemaName}.{DatabaseConstants.IncomingTable} set {DatabaseConstants.Status} = '{EnvelopeStatus.Handled}', {DatabaseConstants.KeepUntil} = @keepUntil where id = @id and {DatabaseConstants.ReceivedAt} = @uri"; + _incrementIncomingEnvelopeAttempts = + $"update {QuotedSchemaName}.{DatabaseConstants.IncomingTable} set attempts = @attempts where id = @id and {DatabaseConstants.ReceivedAt} = @uri"; NpgsqlDataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource)); @@ -232,7 +243,7 @@ from pg_catalog.pg_class c // by VACUUM/ANALYZE, fall back to exact count if (reltuples <= 0 && relationSize > 0) { - var exactCount = await CreateCommand($"select count(*) from {SchemaName}.{tableName}") + var exactCount = await CreateCommand($"select count(*) from {QuotedSchemaName}.{tableName}") .ExecuteScalarAsync(); return Convert.ToInt32(exactCount); } @@ -247,7 +258,7 @@ from pg_catalog.pg_class c private async Task fetchCountsWithGroupBy(PersistedCounts counts) { await using var reader = await CreateCommand( - $"select status, count(*) from {SchemaName}.{DatabaseConstants.IncomingTable} group by status") + $"select status, count(*) from {QuotedSchemaName}.{DatabaseConstants.IncomingTable} group by status") .ExecuteReaderAsync(); while (await reader.ReadAsync()) @@ -345,14 +356,14 @@ protected override async Task afterTruncateEnvelopeDataAsync(DbConnection conn) // After deleting data, PostgreSQL's pg_class.reltuples statistics become stale. // FetchCountsAsync() uses these stats for fast estimation, so we must run ANALYZE // to update them after bulk deletes. - await conn.CreateCommand($"ANALYZE {SchemaName}.{DatabaseConstants.DeadLetterTable}") + await conn.CreateCommand($"ANALYZE {QuotedSchemaName}.{DatabaseConstants.DeadLetterTable}") .ExecuteNonQueryAsync(_cancellation); - await conn.CreateCommand($"ANALYZE {SchemaName}.{DatabaseConstants.OutgoingTable}") + await conn.CreateCommand($"ANALYZE {QuotedSchemaName}.{DatabaseConstants.OutgoingTable}") .ExecuteNonQueryAsync(_cancellation); if (Durability.EnableInboxPartitioning) { - await conn.CreateCommand($"ANALYZE {SchemaName}.{DatabaseConstants.IncomingTable}") + await conn.CreateCommand($"ANALYZE {QuotedSchemaName}.{DatabaseConstants.IncomingTable}") .ExecuteNonQueryAsync(_cancellation); } } @@ -379,7 +390,7 @@ await CreateCommand(_deleteOutgoingEnvelopesSql) protected override string determineOutgoingEnvelopeSql(DurabilitySettings settings) { return - $"select {DatabaseConstants.OutgoingFields} from {SchemaName}.{DatabaseConstants.OutgoingTable} where owner_id = {TransportConstants.AnyNode} and destination = @destination LIMIT {settings.RecoveryBatchSize}"; + $"select {DatabaseConstants.OutgoingFields} from {QuotedSchemaName}.{DatabaseConstants.OutgoingTable} where owner_id = {TransportConstants.AnyNode} and destination = @destination LIMIT {settings.RecoveryBatchSize}"; } public override async Task> LoadPageOfGloballyOwnedIncomingAsync(Uri listenerAddress, @@ -404,7 +415,7 @@ public override async Task ExistsAsync(Envelope envelope, CancellationToke { await using var conn = await NpgsqlDataSource.OpenConnectionAsync(cancellation); var count = await conn - .CreateCommand($"select count(id) from {SchemaName}.{DatabaseConstants.IncomingTable} where id = :id") + .CreateCommand($"select count(id) from {QuotedSchemaName}.{DatabaseConstants.IncomingTable} where id = :id") .With("id", envelope.Id) .ExecuteScalarAsync(cancellation); @@ -414,7 +425,7 @@ public override async Task ExistsAsync(Envelope envelope, CancellationToke { await using var conn = await NpgsqlDataSource.OpenConnectionAsync(cancellation); var count = await conn - .CreateCommand($"select count(id) from {SchemaName}.{DatabaseConstants.IncomingTable} where id = :id and {DatabaseConstants.ReceivedAt} = :destination") + .CreateCommand($"select count(id) from {QuotedSchemaName}.{DatabaseConstants.IncomingTable} where id = :id and {DatabaseConstants.ReceivedAt} = :destination") .With("id", envelope.Id) .With("destination", envelope.Destination!.ToString()) .ExecuteScalarAsync(cancellation); @@ -426,7 +437,7 @@ public override async Task ExistsAsync(Envelope envelope, CancellationToke public override void WriteLoadScheduledEnvelopeSql(DbCommandBuilder builder, DateTimeOffset utcNow) { builder.Append( - $"select {DatabaseConstants.IncomingFields} from {SchemaName}.{DatabaseConstants.IncomingTable} where status = '{EnvelopeStatus.Scheduled}' and execution_time <= "); + $"select {DatabaseConstants.IncomingFields} from {QuotedSchemaName}.{DatabaseConstants.IncomingTable} where status = '{EnvelopeStatus.Scheduled}' and execution_time <= "); builder.AppendParameter(utcNow); builder.Append($" order by execution_time LIMIT {Durability.RecoveryBatchSize};"); @@ -698,17 +709,17 @@ public override async Task DeleteAllHandledAsync() await conn.OpenAsync(CancellationToken.None); var deleted = 1; - + var sql = $@" WITH todo AS ( SELECT id - FROM {_settings.SchemaName}.{DatabaseConstants.IncomingTable} + FROM {QuotedSchemaName}.{DatabaseConstants.IncomingTable} WHERE status = '{EnvelopeStatus.Handled}' ORDER BY id LIMIT 10000 FOR UPDATE SKIP LOCKED ) - DELETE FROM {_settings.SchemaName}.{DatabaseConstants.IncomingTable} w + DELETE FROM {QuotedSchemaName}.{DatabaseConstants.IncomingTable} w USING todo WHERE w.id = todo.id; "; diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs index e27eaa306..bc1c324f1 100644 --- a/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs +++ b/src/Persistence/Wolverine.Postgresql/PostgresqlNodePersistence.cs @@ -71,8 +71,9 @@ public Task DeleteAsync(Guid nodeId, int assignedNodeNumber) return Task.CompletedTask; } + var quotedSchema = _settings.SchemaName.QuoteIdentifier(); return _dataSource.CreateCommand( - $"delete from {_nodeTable} where id = :id;update {_settings.SchemaName}.{IncomingTable} set {OwnerId} = 0 where {OwnerId} = :number;update {_settings.SchemaName}.{OutgoingTable} set {OwnerId} = 0 where {OwnerId} = :number;") + $"delete from {_nodeTable} where id = :id;update {quotedSchema}.{IncomingTable} set {OwnerId} = 0 where {OwnerId} = :number;update {quotedSchema}.{OutgoingTable} set {OwnerId} = 0 where {OwnerId} = :number;") .With("id", nodeId) .With("number", assignedNodeNumber) .ExecuteNonQueryAsync(); @@ -301,9 +302,10 @@ public async Task> FetchRecentRecordsAsync(int count) }; }; + var quotedSchema = _settings.SchemaName.QuoteIdentifier(); return await _dataSource .CreateCommand( - $"select node_number, event_name, timestamp, description from {_settings.SchemaName}.{NodeRecordTableName} order by id desc LIMIT :limit") + $"select node_number, event_name, timestamp, description from {quotedSchema}.{NodeRecordTableName} order by id desc LIMIT :limit") .With("limit", count) .FetchListAsync(readRecord); } @@ -312,8 +314,9 @@ public Task DeleteOldNodeRecordsAsync(int retainCount) { if (retainCount <= 0) return Task.CompletedTask; + var quotedSchema = _settings.SchemaName.QuoteIdentifier(); return _dataSource.CreateCommand( - $"delete from {_settings.SchemaName}.{NodeRecordTableName} where id not in (select id from {_settings.SchemaName}.{NodeRecordTableName} order by id desc limit :retain)") + $"delete from {quotedSchema}.{NodeRecordTableName} where id not in (select id from {quotedSchema}.{NodeRecordTableName} order by id desc limit :retain)") .With("retain", retainCount) .ExecuteNonQueryAsync(); } diff --git a/src/Persistence/Wolverine.Postgresql/Sagas/DatabaseSagaSchema.cs b/src/Persistence/Wolverine.Postgresql/Sagas/DatabaseSagaSchema.cs index a04d3899d..f038fae3c 100644 --- a/src/Persistence/Wolverine.Postgresql/Sagas/DatabaseSagaSchema.cs +++ b/src/Persistence/Wolverine.Postgresql/Sagas/DatabaseSagaSchema.cs @@ -8,6 +8,7 @@ using Weasel.Postgresql; using Weasel.Postgresql.Tables; using Wolverine; +using Wolverine.Postgresql; using Wolverine.RDBMS; using Wolverine.RDBMS.Sagas; @@ -24,12 +25,13 @@ public DatabaseSagaSchema(SagaTableDefinition definition, DatabaseSettings setti _settings = settings; IdSource = LambdaBuilder.Getter(definition.IdMember); - _insertSql = $"insert into {settings.SchemaName}.{definition.TableName} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.Version}) values (@id, @body, 1)"; + var quotedSchema = settings.SchemaName.QuoteIdentifier(); + _insertSql = $"insert into {quotedSchema}.{definition.TableName} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.Version}) values (@id, @body, 1)"; _updateSql = - $"update {settings.SchemaName}.{definition.TableName} set {DatabaseConstants.Body} = @body, {DatabaseConstants.Version} = @version + 1, last_modified = now() where {DatabaseConstants.Id} = @id and {DatabaseConstants.Version} = @version"; - _loadSql = $"select body, version from {settings.SchemaName}.{definition.TableName} where {DatabaseConstants.Id} = @id"; + $"update {quotedSchema}.{definition.TableName} set {DatabaseConstants.Body} = @body, {DatabaseConstants.Version} = @version + 1, last_modified = now() where {DatabaseConstants.Id} = @id and {DatabaseConstants.Version} = @version"; + _loadSql = $"select body, version from {quotedSchema}.{definition.TableName} where {DatabaseConstants.Id} = @id"; - _deleteSql = $"delete from {settings.SchemaName}.{definition.TableName} where id = @id"; + _deleteSql = $"delete from {quotedSchema}.{definition.TableName} where id = @id"; var table = new Table(new DbObjectName(settings.SchemaName!, definition.TableName)); table.AddColumn("id").AsPrimaryKey(); diff --git a/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueueListener.cs b/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueueListener.cs index 0b925bcc9..69558e687 100644 --- a/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueueListener.cs +++ b/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueueListener.cs @@ -27,7 +27,7 @@ internal class PostgresqlQueueListener : IListener private readonly string _tryPopMessagesDirectlySql; private readonly string _queueTableName; private readonly string _queueName; - private readonly string _schemaName; + private readonly string _quotedSchemaName; private readonly string _scheduledTableName; private readonly TimeSpan _pollingInterval; @@ -47,7 +47,7 @@ public PostgresqlQueueListener(PostgresqlQueue queue, IWolverineRuntime runtime, _queueTableName = _queue.QueueTable.Identifier.QualifiedName; _scheduledTableName = _queue.ScheduledTable.Identifier.QualifiedName; - _schemaName = _queue.Parent.MessageStorageSchemaName; + _quotedSchemaName = _queue.Parent.MessageStorageSchemaName.QuoteIdentifier(); _tryPopMessagesDirectlySql = $@" WITH message AS ( @@ -223,7 +223,7 @@ public async Task> TryPopDurablyAsync(int count, Durabil { var builder = new BatchBuilder(); - builder.Append($"delete FROM {_queueTableName} where id in (select id from {_schemaName}.{DatabaseConstants.IncomingTable})"); + builder.Append($"delete FROM {_queueTableName} where id in (select id from {_quotedSchemaName}.{DatabaseConstants.IncomingTable})"); builder.StartNewCommand(); builder.Append($"create temporary table temp_pop_{_queueName} ON COMMIT DROP as select id, body, message_type, keep_until from {_queueTableName} ORDER BY {_queueTableName}.timestamp limit "); builder.AppendParameter(count); @@ -232,7 +232,7 @@ public async Task> TryPopDurablyAsync(int count, Durabil builder.StartNewCommand(); builder.Append($"delete from {_queueTableName} where id in (select id from temp_pop_{_queueName})"); builder.StartNewCommand(); - var parameters = builder.AppendWithParameters($"INSERT INTO {_schemaName}.{DatabaseConstants.IncomingTable} (id, status, owner_id, body, message_type, received_at, keep_until) SELECT id, 'Incoming', ?, body, message_type, '{Address}', keep_until FROM temp_pop_{_queueName}"); + var parameters = builder.AppendWithParameters($"INSERT INTO {_quotedSchemaName}.{DatabaseConstants.IncomingTable} (id, status, owner_id, body, message_type, received_at, keep_until) SELECT id, 'Incoming', ?, body, message_type, '{Address}', keep_until FROM temp_pop_{_queueName}"); parameters[0].Value = settings.AssignedNodeNumber; parameters[0].NpgsqlDbType = NpgsqlDbType.Integer; diff --git a/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueueSender.cs b/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueueSender.cs index fb2640b6f..462ae456c 100644 --- a/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueueSender.cs +++ b/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueueSender.cs @@ -24,6 +24,7 @@ internal class PostgresqlQueueSender : IPostgresqlQueueSender private readonly string _writeDirectlyToQueueTableSql; private readonly string _writeDirectlyToTheScheduledTable; private readonly string _schemaName; + private readonly string _quotedStorageSchemaName; // Strictly for testing public PostgresqlQueueSender(PostgresqlQueue queue) : this(queue, queue.DataSource, null) @@ -39,22 +40,24 @@ public PostgresqlQueueSender(PostgresqlQueue queue, NpgsqlDataSource dataSource, Destination = PostgresqlQueue.ToUri(queue.Name, databaseName); _schemaName = queue.Parent.TransportSchemaName; + _quotedStorageSchemaName = queue.Parent.MessageStorageSchemaName.QuoteIdentifier(); + var quotedStorageSchema = _quotedStorageSchemaName; _moveFromOutgoingToQueueSql = $@" -INSERT into {queue.QueueTable.Identifier} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}) -SELECT {DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.DeliverBy} +INSERT into {queue.QueueTable.Identifier} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.KeepUntil}) +SELECT {DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.DeliverBy} FROM - {queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} + {quotedStorageSchema}.{DatabaseConstants.OutgoingTable} WHERE {DatabaseConstants.Id} = :id; -DELETE FROM {queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} WHERE {DatabaseConstants.Id} = :id; +DELETE FROM {quotedStorageSchema}.{DatabaseConstants.OutgoingTable} WHERE {DatabaseConstants.Id} = :id; "; _moveFromOutgoingToScheduledSql = $@" -INSERT into {queue.ScheduledTable.Identifier} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.ExecutionTime}, {DatabaseConstants.KeepUntil}) -SELECT {DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, :time, {DatabaseConstants.DeliverBy} +INSERT into {queue.ScheduledTable.Identifier} ({DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, {DatabaseConstants.ExecutionTime}, {DatabaseConstants.KeepUntil}) +SELECT {DatabaseConstants.Id}, {DatabaseConstants.Body}, {DatabaseConstants.MessageType}, :time, {DatabaseConstants.DeliverBy} FROM - {queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} + {quotedStorageSchema}.{DatabaseConstants.OutgoingTable} WHERE {DatabaseConstants.Id} = :id; -DELETE FROM {queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} WHERE {DatabaseConstants.Id} = :id; +DELETE FROM {quotedStorageSchema}.{DatabaseConstants.OutgoingTable} WHERE {DatabaseConstants.Id} = :id; "; _writeDirectlyToQueueTableSql = @@ -102,7 +105,7 @@ public async Task ScheduleMessageAsync(Envelope envelope, CancellationToken canc public async Task ScheduleRetryAsync(Envelope envelope, CancellationToken cancellationToken) { await using var conn = await _dataSource.OpenConnectionAsync(cancellationToken); - await conn.CreateCommand($"delete from {_queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.IncomingTable} where id = :id;" + _writeDirectlyToTheScheduledTable) + await conn.CreateCommand($"delete from {_quotedStorageSchemaName}.{DatabaseConstants.IncomingTable} where id = :id;" + _writeDirectlyToTheScheduledTable) .With("id", envelope.Id) .With("body", EnvelopeSerializer.Serialize(envelope)) .With("type", envelope.MessageType!) @@ -186,7 +189,7 @@ public async Task MoveFromOutgoingToScheduledAsync(Envelope envelope, Cancellati if (e.Message.ContainsIgnoreCase("duplicate key value")) { await conn.CreateCommand( - $"delete * from {_queue.Parent.MessageStorageSchemaName}.{DatabaseConstants.OutgoingTable} where id = @id") + $"delete from {_quotedStorageSchemaName}.{DatabaseConstants.OutgoingTable} where id = @id") .With("id", envelope.Id) .ExecuteNonQueryAsync(cancellationToken); diff --git a/src/Persistence/Wolverine.RDBMS/DatabaseSettings.cs b/src/Persistence/Wolverine.RDBMS/DatabaseSettings.cs index 7be643153..7da8e1b2f 100644 --- a/src/Persistence/Wolverine.RDBMS/DatabaseSettings.cs +++ b/src/Persistence/Wolverine.RDBMS/DatabaseSettings.cs @@ -12,6 +12,21 @@ public class DatabaseSettings public string? ConnectionString { get; set; } public string? SchemaName { get; set; } + + /// + /// Returns the schema name properly quoted for use in SQL statements. + /// Uses ANSI SQL double quotes which work for PostgreSQL and SQL Server (with QUOTED_IDENTIFIER ON). + /// + public string QuotedSchemaName + { + get + { + if (string.IsNullOrEmpty(SchemaName)) return SchemaName ?? string.Empty; + // Escape any internal double quotes by doubling them + var escaped = SchemaName.Replace("\"", "\"\""); + return $"\"{escaped}\""; + } + } public AutoCreate AutoCreate { get; set; } = JasperFx.AutoCreate.CreateOrUpdate; /// diff --git a/src/Persistence/Wolverine.RDBMS/Durability/PersistNodeRecord.cs b/src/Persistence/Wolverine.RDBMS/Durability/PersistNodeRecord.cs index 84d717a0d..56ab1bffc 100644 --- a/src/Persistence/Wolverine.RDBMS/Durability/PersistNodeRecord.cs +++ b/src/Persistence/Wolverine.RDBMS/Durability/PersistNodeRecord.cs @@ -24,7 +24,7 @@ public void ConfigureCommand(DbCommandBuilder builder) foreach (var @event in _events) { builder.Append("insert into "); - builder.Append(_settings.SchemaName!); + builder.Append(_settings.QuotedSchemaName); builder.Append('.'); builder.Append(DatabaseConstants.NodeRecordTableName); builder.Append(" (node_number, event_name, description) values ("); diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Admin.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Admin.cs index faf56901c..aed123576 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Admin.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Admin.cs @@ -133,28 +133,28 @@ private async Task acquireMigrationLockAsync(int lockId, T conn, Cancellat public async Task> AllIncomingAsync() { return await CreateCommand( - $"select {DatabaseConstants.IncomingFields} from {SchemaName}.{DatabaseConstants.IncomingTable}") + $"select {DatabaseConstants.IncomingFields} from {QuotedSchemaName}.{DatabaseConstants.IncomingTable}") .FetchListAsync(r => DatabasePersistence.ReadIncomingAsync(r, _cancellation), _cancellation); } public Task> AllOutgoingAsync() { return CreateCommand( - $"select {DatabaseConstants.OutgoingFields} from {SchemaName}.{DatabaseConstants.OutgoingTable}") + $"select {DatabaseConstants.OutgoingFields} from {QuotedSchemaName}.{DatabaseConstants.OutgoingTable}") .FetchListAsync(r => DatabasePersistence.ReadOutgoingAsync(r, _cancellation), _cancellation); } public Task ReleaseAllOwnershipAsync() { return CreateCommand( - $"update {SchemaName}.{DatabaseConstants.IncomingTable} set owner_id = 0;update {SchemaName}.{DatabaseConstants.OutgoingTable} set owner_id = 0") + $"update {QuotedSchemaName}.{DatabaseConstants.IncomingTable} set owner_id = 0;update {QuotedSchemaName}.{DatabaseConstants.OutgoingTable} set owner_id = 0") .ExecuteNonQueryAsync(_cancellation); } public Task ReleaseAllOwnershipAsync(int ownerId) { return CreateCommand( - $"update {SchemaName}.{DatabaseConstants.IncomingTable} set owner_id = 0 where owner_id = @id;update {SchemaName}.{DatabaseConstants.OutgoingTable} set owner_id = 0 where owner_id = @id") + $"update {QuotedSchemaName}.{DatabaseConstants.IncomingTable} set owner_id = 0 where owner_id = @id;update {QuotedSchemaName}.{DatabaseConstants.OutgoingTable} set owner_id = 0 where owner_id = @id") .With("id", ownerId) .ExecuteNonQueryAsync(_cancellation); } @@ -180,19 +180,19 @@ private async Task truncateEnvelopeDataAsync(DbConnection conn) try { var tx = await conn.BeginTransactionAsync(_cancellation); - await tx.CreateCommand($"delete from {SchemaName}.{DatabaseConstants.OutgoingTable}") + await tx.CreateCommand($"delete from {QuotedSchemaName}.{DatabaseConstants.OutgoingTable}") .ExecuteNonQueryAsync(_cancellation); - await tx.CreateCommand($"delete from {SchemaName}.{DatabaseConstants.IncomingTable}") + await tx.CreateCommand($"delete from {QuotedSchemaName}.{DatabaseConstants.IncomingTable}") .ExecuteNonQueryAsync(_cancellation); - await tx.CreateCommand($"delete from {SchemaName}.{DatabaseConstants.DeadLetterTable}") + await tx.CreateCommand($"delete from {QuotedSchemaName}.{DatabaseConstants.DeadLetterTable}") .ExecuteNonQueryAsync(_cancellation); if (_settings.Role == MessageStoreRole.Main) { - await tx.CreateCommand($"delete from {SchemaName}.{DatabaseConstants.AgentRestrictionsTableName}") + await tx.CreateCommand($"delete from {QuotedSchemaName}.{DatabaseConstants.AgentRestrictionsTableName}") .ExecuteNonQueryAsync(_cancellation); - await tx.CreateCommand($"delete from {SchemaName}.{DatabaseConstants.NodeRecordTableName}") + await tx.CreateCommand($"delete from {QuotedSchemaName}.{DatabaseConstants.NodeRecordTableName}") .ExecuteNonQueryAsync(_cancellation); } diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs index a2c106abe..4af7c4c05 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs @@ -15,7 +15,7 @@ public async Task> SummarizeAllAsync(string { var builder = ToCommandBuilder(); builder.Append($"select {DatabaseConstants.ReceivedAt}, {DatabaseConstants.MessageType}, {DatabaseConstants.ExceptionType}, count(*) as total"); - builder.Append($" from {SchemaName}.{DatabaseConstants.DeadLetterTable}"); + builder.Append($" from {QuotedSchemaName}.{DatabaseConstants.DeadLetterTable}"); builder.Append(" where 1 = 1"); if (range.From.HasValue) @@ -71,7 +71,7 @@ public async Task QueryAsync(DeadLetterEnvelopeQuery var topSelect = toTopClause(query); - builder.Append($"select{topSelect} {DatabaseConstants.DeadLetterFields}, count(*) OVER() as total_rows from {SchemaName}.{DatabaseConstants.DeadLetterTable} where 1 = 1"); + builder.Append($"select{topSelect} {DatabaseConstants.DeadLetterFields}, count(*) OVER() as total_rows from {QuotedSchemaName}.{DatabaseConstants.DeadLetterTable} where 1 = 1"); writeDeadLetterWhereClause(query, builder); @@ -166,7 +166,7 @@ public Task DiscardAsync(DeadLetterEnvelopeQuery query, CancellationToken token) { var builder = ToCommandBuilder(); - builder.Append($"delete from {SchemaName}.{DatabaseConstants.DeadLetterTable} where 1 = 1"); + builder.Append($"delete from {QuotedSchemaName}.{DatabaseConstants.DeadLetterTable} where 1 = 1"); writeDeadLetterWhereClause(query, builder); @@ -178,7 +178,7 @@ public Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token) var builder = ToCommandBuilder(); builder.Append( - $"update {SchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Replayable} = "); + $"update {QuotedSchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Replayable} = "); builder.AppendParameter(true); builder.Append(" where 1 = 1"); writeDeadLetterWhereClause(query, builder); @@ -198,7 +198,7 @@ public async Task EditAndReplayAsync(Guid envelopeId, byte[] newBody, Cancellati var builder = ToCommandBuilder(); builder.Append( - $"update {SchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Body} = "); + $"update {QuotedSchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Body} = "); builder.AppendParameter(serialized); builder.Append($", {DatabaseConstants.Replayable} = "); builder.AppendParameter(true); diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetters.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetters.cs index dc9e87a41..66bc95a10 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetters.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetters.cs @@ -9,7 +9,7 @@ public abstract partial class MessageDatabase public async Task DeadLetterEnvelopeByIdAsync(Guid id, string? tenantId = null) { await using var reader = await CreateCommand( - $"select {DatabaseConstants.DeadLetterFields} from {SchemaName}.{DatabaseConstants.DeadLetterTable} where id = @id") + $"select {DatabaseConstants.DeadLetterFields} from {QuotedSchemaName}.{DatabaseConstants.DeadLetterTable} where id = @id") .With("id", id) .ExecuteReaderAsync(_cancellation); diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs index ff2f6ac02..89a5d279e 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs @@ -9,8 +9,8 @@ namespace Wolverine.RDBMS; public abstract partial class MessageDatabase { - private readonly string _markEnvelopeAsHandledById; - private readonly string _incrementIncomingEnvelopeAttempts; + protected string _markEnvelopeAsHandledById; + protected string _incrementIncomingEnvelopeAttempts; public abstract Task> LoadPageOfGloballyOwnedIncomingAsync(Uri listenerAddress, int limit); @@ -22,7 +22,7 @@ public Task ReassignIncomingAsync(int ownerId, IReadOnlyList incoming) var builder = ToCommandBuilder(); foreach (var envelope in incoming) { - builder.Append($"update {SchemaName}.{DatabaseConstants.IncomingTable} set owner_id = "); + builder.Append($"update {QuotedSchemaName}.{DatabaseConstants.IncomingTable} set owner_id = "); builder.AppendParameter(ownerId); builder.Append($" where {DatabaseConstants.Id} = "); builder.AppendParameter(envelope.Id); @@ -56,7 +56,7 @@ public async Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exc try { var builder = ToCommandBuilder(); - builder.Append($"delete from {SchemaName}.{DatabaseConstants.IncomingTable} WHERE id = "); + builder.Append($"delete from {QuotedSchemaName}.{DatabaseConstants.IncomingTable} WHERE id = "); builder.AppendParameter(envelope.Id); builder.Append($" and {DatabaseConstants.ReceivedAt} = "); builder.AppendParameter(envelope.Destination!.ToString()); @@ -88,13 +88,13 @@ public async Task MarkIncomingEnvelopeAsHandledAsync(IReadOnlyList env { if (HasDisposed) return; var keepUntil = DateTimeOffset.UtcNow.Add(Durability.KeepAfterMessageHandling); - + var builder = ToCommandBuilder(); builder.AddNamedParameter("keepUntil", keepUntil); foreach (var envelope in envelopes) { - builder.Append($"update {SchemaName}.{DatabaseConstants.IncomingTable} set {DatabaseConstants.Status} = '{EnvelopeStatus.Handled}', {DatabaseConstants.KeepUntil} = @keepUntil where id = "); + builder.Append($"update {QuotedSchemaName}.{DatabaseConstants.IncomingTable} set {DatabaseConstants.Status} = '{EnvelopeStatus.Handled}', {DatabaseConstants.KeepUntil} = @keepUntil where id = "); builder.AppendParameter(envelope.Id); builder.Append(" and "); builder.Append(DatabaseConstants.ReceivedAt); diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Outgoing.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Outgoing.cs index 1750381f6..f32f82818 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Outgoing.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Outgoing.cs @@ -34,7 +34,7 @@ public Task DeleteOutgoingAsync(Envelope envelope) if (HasDisposed) return Task.CompletedTask; return CreateCommand( - $"delete from {SchemaName}.{DatabaseConstants.OutgoingTable} where id = @id") + $"delete from {QuotedSchemaName}.{DatabaseConstants.OutgoingTable} where id = @id") .With("id", envelope.Id) .ExecuteNonQueryAsync(_cancellation); } diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Scheduled.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Scheduled.cs index 66c79c8ee..bbef748da 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Scheduled.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Scheduled.cs @@ -12,7 +12,7 @@ public Task ScheduleExecutionAsync(Envelope envelope) { Logger.LogDebug("Persisting envelope {EnvelopeId} ({MessageType}) as Scheduled in database inbox at {Destination}", envelope.Id, envelope.MessageType, envelope.Destination); return CreateCommand( - $"update {SchemaName}.{DatabaseConstants.IncomingTable} set execution_time = @time, status = \'{EnvelopeStatus.Scheduled}\', attempts = @attempts, owner_id = {TransportConstants.AnyNode} where id = @id and {DatabaseConstants.ReceivedAt} = @uri;") + $"update {QuotedSchemaName}.{DatabaseConstants.IncomingTable} set execution_time = @time, status = \'{EnvelopeStatus.Scheduled}\', attempts = @attempts, owner_id = {TransportConstants.AnyNode} where id = @id and {DatabaseConstants.ReceivedAt} = @uri;") .With("time", envelope.ScheduledTime!.Value) .With("attempts", envelope.Attempts) .With("id", envelope.Id) diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.ScheduledMessages.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.ScheduledMessages.cs index 32ef91ebe..2dc4ec2af 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.ScheduledMessages.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.ScheduledMessages.cs @@ -15,7 +15,7 @@ public async Task QueryAsync(ScheduledMessageQuery quer // Columns: 0=id, 1=message_type, 2=execution_time, 3=received_at, 4=attempts, 5=total_rows builder.Append( - $"select{topSelect} {DatabaseConstants.Id}, {DatabaseConstants.MessageType}, {DatabaseConstants.ExecutionTime}, {DatabaseConstants.ReceivedAt}, {DatabaseConstants.Attempts}, count(*) OVER() as total_rows from {SchemaName}.{DatabaseConstants.IncomingTable} where {DatabaseConstants.Status} = '{EnvelopeStatus.Scheduled}'"); + $"select{topSelect} {DatabaseConstants.Id}, {DatabaseConstants.MessageType}, {DatabaseConstants.ExecutionTime}, {DatabaseConstants.ReceivedAt}, {DatabaseConstants.Attempts}, count(*) OVER() as total_rows from {QuotedSchemaName}.{DatabaseConstants.IncomingTable} where {DatabaseConstants.Status} = '{EnvelopeStatus.Scheduled}'"); writeScheduledMessageWhereClause(query, builder); @@ -77,7 +77,7 @@ public async Task CancelAsync(ScheduledMessageQuery query, CancellationToken tok var builder = ToCommandBuilder(); builder.Append( - $"delete from {SchemaName}.{DatabaseConstants.IncomingTable} where {DatabaseConstants.Status} = '{EnvelopeStatus.Scheduled}'"); + $"delete from {QuotedSchemaName}.{DatabaseConstants.IncomingTable} where {DatabaseConstants.Status} = '{EnvelopeStatus.Scheduled}'"); writeScheduledMessageWhereClause(query, builder); @@ -100,7 +100,7 @@ public async Task RescheduleAsync(Guid envelopeId, DateTimeOffset newExecutionTi var builder = ToCommandBuilder(); builder.Append( - $"update {SchemaName}.{DatabaseConstants.IncomingTable} set {DatabaseConstants.ExecutionTime} = "); + $"update {QuotedSchemaName}.{DatabaseConstants.IncomingTable} set {DatabaseConstants.ExecutionTime} = "); builder.AppendParameter(newExecutionTime.ToUniversalTime()); builder.Append($" where {DatabaseConstants.Id} = "); builder.AppendParameter(envelopeId); @@ -124,7 +124,7 @@ public async Task> SummarizeAsync(string se { var builder = ToCommandBuilder(); builder.Append( - $"select {DatabaseConstants.MessageType}, count(*) as total from {SchemaName}.{DatabaseConstants.IncomingTable} where {DatabaseConstants.Status} = '{EnvelopeStatus.Scheduled}' group by {DatabaseConstants.MessageType}"); + $"select {DatabaseConstants.MessageType}, count(*) as total from {QuotedSchemaName}.{DatabaseConstants.IncomingTable} where {DatabaseConstants.Status} = '{EnvelopeStatus.Scheduled}' group by {DatabaseConstants.MessageType}"); var cmd = builder.Compile(); diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs index 1ea7aa8ff..eb91a37d3 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs @@ -139,11 +139,17 @@ public string SchemaName { _schemaName = value; - IncomingFullName = $"{value}.{DatabaseConstants.IncomingTable}"; - OutgoingFullName = $"{value}.{DatabaseConstants.OutgoingTable}"; + IncomingFullName = $"{QuotedSchemaName}.{DatabaseConstants.IncomingTable}"; + OutgoingFullName = $"{QuotedSchemaName}.{DatabaseConstants.OutgoingTable}"; } } + /// + /// Returns the schema name properly quoted for use in SQL statements. + /// Override in derived classes for database-specific quoting (e.g., double quotes for PostgreSQL, square brackets for SQL Server). + /// + protected virtual string QuotedSchemaName => SchemaName; + public Task EnqueueAsync(IDatabaseOperation operation) { // Really probably only an issue w/ testing, but this lets us ignore @@ -231,7 +237,7 @@ public async Task ReleaseIncomingAsync(int ownerId, Uri receivedAt) var impacted = await _dataSource .CreateCommand( - $"update {SchemaName}.{DatabaseConstants.IncomingTable} set owner_id = 0 where owner_id = @owner and {DatabaseConstants.ReceivedAt} = @uri") + $"update {QuotedSchemaName}.{DatabaseConstants.IncomingTable} set owner_id = 0 where owner_id = @owner and {DatabaseConstants.ReceivedAt} = @uri") .With("owner", ownerId) .With("uri", receivedAt.ToString()) .ExecuteNonQueryAsync(_cancellation);