Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/AdoNet/Orleans.Persistence.AdoNet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Before using the ADO.NET provider, you need to set up the necessary database tab
- [MySQL Scripts](https://github.com/dotnet/orleans/tree/main/src/AdoNet/Orleans.Persistence.AdoNet/MySQL-Persistence.sql)
- [PostgreSQL Scripts](https://github.com/dotnet/orleans/tree/main/src/AdoNet/Orleans.Persistence.AdoNet/PostgreSQL-Persistence.sql)
- [Oracle Scripts](https://github.com/dotnet/orleans/tree/main/src/AdoNet/Orleans.Persistence.AdoNet/Oracle-Persistence.sql)
- [SQLite Scripts](https://github.com/dotnet/orleans/tree/main/src/AdoNet/Orleans.Persistence.AdoNet/Sqlite-Persistence.sql)

## Documentation
For more comprehensive documentation, please refer to:
Expand Down
176 changes: 176 additions & 0 deletions src/AdoNet/Orleans.Persistence.AdoNet/Sqlite-Persistence.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
-- The design criteria for this table are:
--
-- 1. It can contain arbitrary content serialized as binary, XML or JSON. These formats
-- are supported to allow one to take advantage of in-storage processing capabilities for
-- these types if required. This should not incur extra cost on storage.
--
-- 2. The table design should scale with the idea of tens or hundreds (or even more) types
-- of grains that may operate with even hundreds of thousands of grain IDs within each
-- type of a grain.
--
-- 3. The table and its associated operations should remain stable. There should not be
-- structural reason for unexpected delays in operations. It should be possible to also
-- insert data reasonably fast without resource contention.
--
-- 4. For reasons in 2. and 3., the index should be as narrow as possible so it fits well in
-- memory and should it require maintenance, isn't resource intensive. For this
-- reason the index is narrow by design (ideally non-clustered). Currently the entity
-- is recognized in the storage by the grain type and its ID, which are unique in Orleans silo.
-- The ID is the grain ID bytes (if string type UTF-8 bytes) and possible extension key as UTF-8
-- bytes concatenated with the ID and then hashed.
--
-- Reason for hashing: Database engines usually limit the length of the column sizes, which
-- would artificially limit the length of IDs or types. Even when within limitations, the
-- index would be thick and consume more memory.
--
-- In the current setup the ID and the type are hashed into two INT type instances, which
-- are made a compound index. When there are no collisions, the index can quickly locate
-- the unique row. Along with the hashed index values, the NVARCHAR(nnn) values are also
-- stored and they are used to prune hash collisions down to only one result row.
--
-- 5. The design leads to duplication in the storage. It is reasonable to assume there will
-- a low number of services with a given service ID operational at any given time. Or that
-- compared to the number of grain IDs, there are a fairly low number of different types of
-- grain. The catch is that were these data separated to another table, it would make INSERT
-- and UPDATE operations complicated and would require joins, temporary variables and additional
-- indexes or some combinations of them to make it work. It looks like fitting strategy
-- could be to use table compression.
--
-- 6. For the aforementioned reasons, grain state DELETE will set NULL to the data fields
-- and updates the Version number normally. This should alleviate the need for index or
-- statistics maintenance with the loss of some bytes of storage space. The table can be scrubbed
-- in a separate maintenance operation.
--
-- 7. In the storage operations queries the columns need to be in the exact same order
-- since the storage table operations support optionally streaming.
CREATE TABLE OrleansStorage
(
-- These are for the book keeping. Orleans calculates
-- these hashes (see RelationalStorageProvide implementation),
-- which are signed 32 bit integers mapped to the *Hash fields.
-- The mapping is done in the code. The
-- *String columns contain the corresponding clear name fields.
--
-- If there are duplicates, they are resolved by using GrainIdN0,
-- GrainIdN1, GrainIdExtensionString and GrainTypeString fields.
-- It is assumed these would be rarely needed.
GrainIdHash INT NOT NULL,
GrainIdN0 BIGINT NOT NULL,
GrainIdN1 BIGINT NOT NULL,
GrainTypeHash INT NOT NULL,
GrainTypeString NVARCHAR(512) NOT NULL,
GrainIdExtensionString NVARCHAR(512) NULL,
ServiceId NVARCHAR(150) NOT NULL,
-- Payload
PayloadBinary BLOB NULL,
-- Informational field, no other use.
ModifiedOn DATETIME NOT NULL,
-- The version of the stored payload.
Version INT NULL
-- The following would in principle be the primary key, but it would be too thick
-- to be indexed, so the values are hashed and only collisions will be solved
-- by using the fields. That is, after the indexed queries have pinpointed the right
-- rows down to [0, n] relevant ones, n being the number of collided value pairs.
);

CREATE INDEX IX_OrleansStorage ON OrleansStorage(GrainIdHash, GrainTypeHash);


-- Updates an existing grain state with optimistic concurrency control or inserts it if it does not exist.
INSERT INTO OrleansQuery (QueryKey, QueryText) VALUES
('WriteToStorageKey', '
BEGIN TRANSACTION;

CREATE TEMP TABLE IF NOT EXISTS OrleansStorageWriteState
(
TotalChangesBefore INT NOT NULL
);
DELETE FROM OrleansStorageWriteState;
INSERT INTO OrleansStorageWriteState (TotalChangesBefore) VALUES (total_changes() + 1);

UPDATE OrleansStorage
SET
PayloadBinary = @PayloadBinary,
ModifiedOn = datetime(''now''),
Version = Version + 1
WHERE
GrainIdHash = @GrainIdHash AND GrainTypeHash = @GrainTypeHash
AND GrainIdN0 = @GrainIdN0 AND GrainIdN1 = @GrainIdN1
AND GrainTypeString = @GrainTypeString
AND (GrainIdExtensionString = @GrainIdExtensionString OR (GrainIdExtensionString IS NULL AND @GrainIdExtensionString IS NULL))
AND ServiceId = @ServiceId
AND Version = @GrainStateVersion;

INSERT INTO OrleansStorage (GrainIdHash, GrainIdN0, GrainIdN1, GrainTypeHash, GrainTypeString, GrainIdExtensionString, ServiceId, PayloadBinary, ModifiedOn, Version)
SELECT @GrainIdHash, @GrainIdN0, @GrainIdN1, @GrainTypeHash, @GrainTypeString, @GrainIdExtensionString, @ServiceId, @PayloadBinary, datetime(''now''), 1
WHERE changes() = 0
AND @GrainStateVersion IS NULL
AND NOT EXISTS (
SELECT 1 FROM OrleansStorage
WHERE GrainIdHash = @GrainIdHash AND GrainTypeHash = @GrainTypeHash
AND GrainIdN0 = @GrainIdN0 AND GrainIdN1 = @GrainIdN1
AND GrainTypeString = @GrainTypeString
AND (GrainIdExtensionString = @GrainIdExtensionString OR (GrainIdExtensionString IS NULL AND @GrainIdExtensionString IS NULL))
AND ServiceId = @ServiceId
);

SELECT Version AS NewGrainStateVersion FROM OrleansStorage
WHERE total_changes() > (SELECT TotalChangesBefore FROM OrleansStorageWriteState LIMIT 1)
AND GrainIdHash = @GrainIdHash AND GrainTypeHash = @GrainTypeHash
AND GrainIdN0 = @GrainIdN0 AND GrainIdN1 = @GrainIdN1
AND GrainTypeString = @GrainTypeString
AND (GrainIdExtensionString = @GrainIdExtensionString OR (GrainIdExtensionString IS NULL AND @GrainIdExtensionString IS NULL))
AND ServiceId = @ServiceId;

SELECT @GrainStateVersion AS NewGrainStateVersion
WHERE total_changes() = (SELECT TotalChangesBefore FROM OrleansStorageWriteState LIMIT 1)
AND @GrainStateVersion IS NOT NULL;

COMMIT;
');

-- Retrieves the binary payload and the current version of a specific grain state.
INSERT INTO OrleansQuery (QueryKey, QueryText) VALUES
('ReadFromStorageKey', '
SELECT
PayloadBinary,
Version AS Version
FROM
OrleansStorage
WHERE
GrainIdHash = @GrainIdHash AND GrainTypeHash = @GrainTypeHash
AND GrainIdN0 = @GrainIdN0 AND GrainIdN1 = @GrainIdN1
AND GrainTypeString = @GrainTypeString
AND (GrainIdExtensionString = @GrainIdExtensionString OR (GrainIdExtensionString IS NULL AND @GrainIdExtensionString IS NULL))
AND ServiceId = @ServiceId
LIMIT 1;
');

-- Clears the grain state by setting the payload to null and incrementing the version for consistency.
INSERT INTO OrleansQuery (QueryKey, QueryText) VALUES
('ClearStorageKey', '
UPDATE OrleansStorage
SET
PayloadBinary = NULL,
ModifiedOn = datetime(''now''),
Version = Version + 1
WHERE
GrainIdHash = @GrainIdHash AND GrainTypeHash = @GrainTypeHash
AND GrainIdN0 = @GrainIdN0 AND GrainIdN1 = @GrainIdN1
AND GrainTypeString = @GrainTypeString
AND (GrainIdExtensionString = @GrainIdExtensionString OR (GrainIdExtensionString IS NULL AND @GrainIdExtensionString IS NULL))
AND ServiceId = @ServiceId
AND Version = @GrainStateVersion;

SELECT Version AS NewGrainStateVersion FROM OrleansStorage
WHERE changes() > 0
AND GrainIdHash = @GrainIdHash AND GrainTypeHash = @GrainTypeHash
AND GrainIdN0 = @GrainIdN0 AND GrainIdN1 = @GrainIdN1
AND GrainTypeString = @GrainTypeString
AND (GrainIdExtensionString = @GrainIdExtensionString OR (GrainIdExtensionString IS NULL AND @GrainIdExtensionString IS NULL))
AND ServiceId = @ServiceId;

SELECT @GrainStateVersion AS NewGrainStateVersion
WHERE changes() = 0
AND @GrainStateVersion IS NOT NULL;
');
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ private static InconsistentStateException CheckVersionInconsistency(string opera
//it means two grains were activated an the other one succeeded in writing its state.
//
//NOTE: the storage could return also the new and old ETag (Version), but currently it doesn't.
if (storageVersion == grainVersion || storageVersion == string.Empty)
if (storageVersion == grainVersion || storageVersion == string.Empty || (storageVersion is null && grainVersion is not null))
{
//TODO: Note that this error message should be canonical across back-ends.
return new InconsistentStateException($"Version conflict ({operation}): ServiceId={serviceId} ProviderName={providerName} GrainType={normalizedGrainType} GrainId={grainId} ETag={grainVersion}.");
Expand Down
34 changes: 34 additions & 0 deletions src/AdoNet/Shared/Sqlite-Main.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Implementation notes:

1) The general idea is that data is read and written through Orleans specific queries.
Orleans operates on column names and types when reading and on parameter names and types when writing.

2) The implementations *must* preserve input and output names and types. Orleans uses these parameters to reads query results by name and type.
Vendor and deployment specific tuning is allowed and contributions are encouraged as long as the interface contract
is maintained.

3) The implementation across vendor specific scripts *should* preserve the constraint names. This simplifies troubleshooting
by virtue of uniform naming across concrete implementations.

5) ETag for Orleans is an opaque column that represents a unique version. The type of its actual implementation
is not important as long as it represents a unique version. In this implementation we use integers for versioning

6) For the sake of being explicit and removing ambiguity, Orleans expects some queries to return either TRUE as >0 value

or FALSE as =0 value. That is, affected rows or such does not matter. If an error is raised or an exception is thrown
the query *must* ensure the entire transaction is rolled back and may either return FALSE or propagate the exception.
Orleans handles exception as a failure and will retry.

7) The implementation follows the Extended Orleans membership protocol. For more information, see at:
https://learn.microsoft.com/dotnet/orleans/implementation/cluster-management
https://github.com/dotnet/orleans/blob/main/src/Orleans.Core/SystemTargetInterfaces/IMembershipTable.cs
*/

-- These settings improves throughput of the database by reducing locking by better separating readers from writers.
-- SQL Server 2012 and newer can refer to itself as CURRENT. Older ones need a workaround.
CREATE TABLE OrleansQuery
(
QueryKey TEXT PRIMARY KEY,
QueryText TEXT NOT NULL
);
12 changes: 11 additions & 1 deletion src/AdoNet/Shared/Storage/DbConstantsStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,17 @@ internal static class DbConstantsStore
supportsStreamNatively: true,
supportsCommandCancellation: true,
commandInterceptor: NoOpCommandInterceptor.Instance)
}
},
{
AdoNetInvariants.InvariantNameSqlLite,
new DbConstants(startEscapeIndicator: '\"',
endEscapeIndicator: '\"',
unionAllSelectTemplate: " UNION ALL SELECT ",
isSynchronousAdoNetImplementation: false,
supportsStreamNatively: false,
supportsCommandCancellation: true,
commandInterceptor: NoOpCommandInterceptor.Instance)
}
};

public static DbConstants GetDbConstants(string invariantName)
Expand Down
2 changes: 1 addition & 1 deletion test/Extensions/TesterAdoNet/PackageReferences.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ internal static class PackageReferences
MySql.Data.MySqlClient.MySqlClientFactory.Instance,
//Oracle.ManagedDataAccess.Client.OracleClientFactory.Instance, // no tests currently
Npgsql.NpgsqlFactory.Instance,
//Microsoft.Data.Sqlite.SqliteFactory.Instance, // no tests currently
Microsoft.Data.Sqlite.SqliteFactory.Instance,
//MySqlConnector.MySqlConnectorFactory.Instance, // no tests currently
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Providers;
using Orleans.Runtime;
using Orleans.Serialization;
using Orleans.Serialization.Serializers;
using Orleans.Storage;
using Orleans.Tests.SqlUtils;
using TestExtensions;

namespace Tester.AdoNet.Persistence
{
public sealed class SqlitePersistenceGrainStorageFixture : TestEnvironmentFixture
{
public const string AdoInvariant = AdoNetInvariants.InvariantNameSqlLite;

public SqlitePersistenceGrainStorageFixture()
{
this.DatabaseFilePath = Path.Combine(Path.GetTempPath(), $"orleans-sqlite-persistence-{Guid.NewGuid():N}.db");
this.ConnectionString = new SqliteConnectionStringBuilder
{
DataSource = this.DatabaseFilePath,
Mode = SqliteOpenMode.ReadWriteCreate
}.ToString();

this.DatabaseStorage = RelationalStorage.CreateInstance(AdoInvariant, this.ConnectionString);
this.InitializeSchemaAsync().GetAwaiter().GetResult();
this.Storage = this.CreateGrainStorageAsync().GetAwaiter().GetResult();
}

public string DatabaseFilePath { get; }

public string ConnectionString { get; }

public IRelationalStorage DatabaseStorage { get; }

public AdoNetGrainStorage Storage { get; }

public async Task InitializeSchemaAsync()
{
await this.DatabaseStorage.ExecuteAsync(await LoadScriptAsync("Sqlite-Main.sql"), command => { }).ConfigureAwait(false);
await this.DatabaseStorage.ExecuteAsync(await LoadScriptAsync("Sqlite-Persistence.sql"), command => { }).ConfigureAwait(false);
}

public async Task<AdoNetGrainStorage> CreateGrainStorageAsync(string storageName = "SqliteGrainStorageForTest")
{
var providerRuntime = new ClientProviderRuntime(
this.InternalGrainFactory,
this.Services,
this.Services.GetRequiredService<ClientGrainContext>());

var options = new AdoNetGrainStorageOptions
{
ConnectionString = this.ConnectionString,
Invariant = AdoInvariant,
GrainStorageSerializer = new JsonGrainStorageSerializer(providerRuntime.ServiceProvider.GetService<OrleansJsonSerializer>())
};

var storageProvider = new AdoNetGrainStorage(
providerRuntime.ServiceProvider.GetRequiredService<IActivatorProvider>(),
providerRuntime.ServiceProvider.GetRequiredService<ILogger<AdoNetGrainStorage>>(),
Options.Create(options),
Options.Create(new ClusterOptions { ServiceId = Guid.NewGuid().ToString() }),
storageName);

ISiloLifecycleSubject siloLifeCycle = new SiloLifecycleSubject(NullLoggerFactory.Instance.CreateLogger<SiloLifecycleSubject>());
storageProvider.Participate(siloLifeCycle);
await siloLifeCycle.OnStart(CancellationToken.None).ConfigureAwait(false);
return storageProvider;
}

private static async Task<string> LoadScriptAsync(string fileName)
{
var scriptPath = Path.Combine(AppContext.BaseDirectory, fileName);
if (!File.Exists(scriptPath))
{
scriptPath = Path.Combine(Environment.CurrentDirectory, fileName);
}

if (!File.Exists(scriptPath))
{
throw new FileNotFoundException($"Unable to locate SQL script '{fileName}'.", fileName);
}

return await File.ReadAllTextAsync(scriptPath).ConfigureAwait(false);
}
}
}
Loading
Loading