Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<PackageVersion Include="AWSSDK.SQS" Version="3.7.2.119" />
<PackageVersion Include="Consul" Version="1.6.10.7" />
<PackageVersion Include="Google.Cloud.PubSub.V1" Version="1.0.0-beta13" />
<PackageVersion Include="Google.Protobuf" Version="3.21.7" />
<PackageVersion Include="Google.Protobuf" Version="3.22.0" />
<PackageVersion Include="protobuf-net" Version="3.1.22" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.2" />
<PackageVersion Include="ZooKeeperNetEx" Version="3.4.12.4" />
Expand Down Expand Up @@ -93,7 +93,7 @@
<PackageVersion Include="Utf8Json" Version="1.3.7" />
<PackageVersion Include="SpanJson" Version="3.3.1" />
<PackageVersion Include="Hyperion" Version="0.12.2" />
<PackageVersion Include="Grpc.Tools" Version="2.50.0" />
<PackageVersion Include="Grpc.Tools" Version="2.52.0" />
<!-- Tooling related packages -->
<PackageVersion Include="Microsoft.SourceLink.AzureRepos.Git" Version="$(SourceLinkVersion)" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="$(SourceLinkVersion)" />
Expand Down
27 changes: 17 additions & 10 deletions Orleans.sln
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.GrainDirectory.Redi
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tester.Redis", "test\Extensions\Tester.Redis\Tester.Redis.csproj", "{F13247A0-70C9-4200-9CB1-2002CB8105E0}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Serialization.Protobuf", "src\Serializers\Orleans.Serialization.Protobuf\Orleans.Serialization.Protobuf.csproj", "{A073C0EE-8732-42F9-A22E-D47034E25076}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -466,14 +468,6 @@ Global
{16B9B850-ED3B-4B45-B0F2-3F802D44F382}.Debug|Any CPU.Build.0 = Debug|Any CPU
{16B9B850-ED3B-4B45-B0F2-3F802D44F382}.Release|Any CPU.ActiveCfg = Release|Any CPU
{16B9B850-ED3B-4B45-B0F2-3F802D44F382}.Release|Any CPU.Build.0 = Release|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Release|Any CPU.Build.0 = Release|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Release|Any CPU.Build.0 = Release|Any CPU
{D1214CD3-EB99-4420-9E30-A50ACFD66A48}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D1214CD3-EB99-4420-9E30-A50ACFD66A48}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D1214CD3-EB99-4420-9E30-A50ACFD66A48}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -562,6 +556,18 @@ Global
{2268B639-02B8-4903-B719-65F7EBD05D52}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2268B639-02B8-4903-B719-65F7EBD05D52}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2268B639-02B8-4903-B719-65F7EBD05D52}.Release|Any CPU.Build.0 = Release|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Release|Any CPU.Build.0 = Release|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Release|Any CPU.Build.0 = Release|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -640,8 +646,6 @@ Global
{8E01A6EB-DE96-4DFD-AA7D-B07078F12372} = {FE2E08C6-9C3B-4AEE-AE07-CCA387580D7A}
{D53D80CC-3E14-4499-B03F-610A5D3F6359} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A}
{16B9B850-ED3B-4B45-B0F2-3F802D44F382} = {4C5D66BF-EE1C-4DD8-8551-D1B7F3768A34}
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936} = {A734945A-36DC-485E-B84D-3C2D395BC7BE}
{F13247A0-70C9-4200-9CB1-2002CB8105E0} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
{D1214CD3-EB99-4420-9E30-A50ACFD66A48} = {FE2E08C6-9C3B-4AEE-AE07-CCA387580D7A}
{65D8F6B3-DEE2-412B-95F2-77274461D58C} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
{A145AFFC-E0CF-4861-AB0C-427C7670FF37} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
Expand All @@ -666,6 +670,9 @@ Global
{671AE42C-974A-467B-BE89-0A3F706B5B21} = {A734945A-36DC-485E-B84D-3C2D395BC7BE}
{28B35216-0C6E-4CF1-8C14-7D9A4BE161A5} = {A734945A-36DC-485E-B84D-3C2D395BC7BE}
{2268B639-02B8-4903-B719-65F7EBD05D52} = {A734945A-36DC-485E-B84D-3C2D395BC7BE}
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936} = {A734945A-36DC-485E-B84D-3C2D395BC7BE}
{F13247A0-70C9-4200-9CB1-2002CB8105E0} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
{A073C0EE-8732-42F9-A22E-D47034E25076} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952}
Expand Down
29 changes: 0 additions & 29 deletions src/Serializers/Directory.Build.props

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Microsoft.Orleans.OrleansGoogleUtils</PackageId>
<Title>Microsoft Orleans Google Utilities</Title>
<Description>Library of utility types for Google of Microsoft Orleans.</Description>
<PackageTags>$(PackageTags) ProtoBuf</PackageTags>
<TargetFrameworks>$(DefaultTargetFrameworks)</TargetFrameworks>
</PropertyGroup>

<PropertyGroup>
<AssemblyName>Orleans.Serialization.Protobuf</AssemblyName>
<RootNamespace>OrleansGoogleUtils</RootNamespace>
<PackageId>Microsoft.Orleans.Serialization.Protobuf</PackageId>
<TargetFrameworks>$(DefaultTargetFrameworks);netstandard2.1</TargetFrameworks>
<PackageDescription>Google.Protobuf integration for Orleans.Serialization</PackageDescription>
<OrleansBuildTimeCodeGen>true</OrleansBuildTimeCodeGen>
<IsOrleansFrameworkPart>false</IsOrleansFrameworkPart>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="$(GoogleProtobufVersion)" />
<PackageReference Include="Microsoft.CSharp" Version="$(MicrosoftCSharpVersion)" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Orleans.Serialization\Orleans.Serialization.csproj" />
</ItemGroup>

</Project>
219 changes: 219 additions & 0 deletions src/Serializers/Orleans.Serialization.Protobuf/ProtobufCodec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
using Google.Protobuf;
using Orleans.Serialization.Buffers;
using Orleans.Serialization.Buffers.Adaptors;
using Orleans.Serialization.Cloning;
using Orleans.Serialization.Codecs;
using Orleans.Serialization.Serializers;
using Orleans.Serialization.WireProtocol;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

namespace Orleans.Serialization;

[Alias(WellKnownAlias)]
public sealed class ProtobufCodec : IGeneralizedCodec, IGeneralizedCopier, ITypeFilter
{
public const string WellKnownAlias = "protobuf";

private static readonly Type SelfType = typeof(ProtobufCodec);
private static readonly ConcurrentDictionary<RuntimeTypeHandle, MessageParser> MessageParsers = new();

private readonly ICodecSelector[] _serializableTypeSelectors;
private readonly ICopierSelector[] _copyableTypeSelectors;

/// <summary>
/// Initializes a new instance of the <see cref="ProtobufCodec"/> class.
/// </summary>
/// <param name="serializableTypeSelectors">Filters used to indicate which types should be serialized by this codec.</param>
/// <param name="copyableTypeSelectors">Filters used to indicate which types should be copied by this codec.</param>
public ProtobufCodec(
IEnumerable<ICodecSelector> serializableTypeSelectors,
IEnumerable<ICopierSelector> copyableTypeSelectors)
{
_serializableTypeSelectors = serializableTypeSelectors.Where(t => string.Equals(t.CodecName, WellKnownAlias, StringComparison.Ordinal)).ToArray();
_copyableTypeSelectors = copyableTypeSelectors.Where(t => string.Equals(t.CopierName, WellKnownAlias, StringComparison.Ordinal)).ToArray();
}

/// <inheritdoc/>
public object DeepCopy(object input, CopyContext context)
{
if (!context.TryGetCopy(input, out object result))
{
if (input is not IMessage protobufMessage)
{
throw new InvalidOperationException("Input is not a protobuf message");
}

var messageSize = protobufMessage.CalculateSize();
using var buffer = new PooledArrayBufferWriter();
var spanBuffer = buffer.GetSpan(messageSize)[..messageSize];
protobufMessage.WriteTo(spanBuffer);

result = protobufMessage.Descriptor.Parser.ParseFrom(spanBuffer);

context.RecordCopy(input, result);
}

return result;
}

/// <inheritdoc/>
bool IGeneralizedCodec.IsSupportedType(Type type)
{
if (type == SelfType)
{
return true;
}

foreach (var selector in _serializableTypeSelectors)
{
if (selector.IsSupportedType(type))
{
return IsProtobufMessage(type);
}
}

return false;
}

/// <inheritdoc/>
bool IGeneralizedCopier.IsSupportedType(Type type)
{
foreach (var selector in _copyableTypeSelectors)
{
if (selector.IsSupportedType(type))
{
return IsProtobufMessage(type);
}
}

return false;
}

/// <inheritdoc/>
bool? ITypeFilter.IsTypeAllowed(Type type)
{
if (!typeof(IMessage).IsAssignableFrom(type))
{
return null;
}

return ((IGeneralizedCodec)this).IsSupportedType(type) || ((IGeneralizedCopier)this).IsSupportedType(type);
}

private static bool IsProtobufMessage(Type type)
{
if (!MessageParsers.ContainsKey(type.TypeHandle))
{
if (Activator.CreateInstance(type) is not IMessage protobufMessageInstance)
{
return false;
}

MessageParsers.TryAdd(type.TypeHandle, protobufMessageInstance.Descriptor.Parser);
}

return true;
}

/// <inheritdoc/>
object IFieldCodec.ReadValue<TInput>(ref Reader<TInput> reader, Field field)
{
if (field.IsReference)
{
return ReferenceCodec.ReadReference(ref reader, field.FieldType);
}

field.EnsureWireTypeTagDelimited();

var placeholderReferenceId = ReferenceCodec.CreateRecordPlaceholder(reader.Session);
object result = null;
Type type = null;
uint fieldId = 0;

while (true)
{
var header = reader.ReadFieldHeader();
if (header.IsEndBaseOrEndObject)
{
break;
}

fieldId += header.FieldIdDelta;
switch (fieldId)
{
case 0:
ReferenceCodec.MarkValueField(reader.Session);
type = reader.Session.TypeCodec.ReadLengthPrefixed(ref reader);
break;
case 1:
if (type is null)
{
ThrowTypeFieldMissing();
}

if (!MessageParsers.TryGetValue(type.TypeHandle, out var messageParser))
{
throw new ArgumentException($"No parser found for the expected type {type.Name}", nameof(TInput));
}

ReferenceCodec.MarkValueField(reader.Session);
var length = (int)reader.ReadVarUInt32();

using (var buffer = new PooledArrayBufferWriter())
{
var spanBuffer = buffer.GetSpan(length)[..length];
reader.ReadBytes(spanBuffer);
result = messageParser.ParseFrom(spanBuffer);
}
break;
default:
reader.ConsumeUnknownField(header);
break;
}
}

ReferenceCodec.RecordObject(reader.Session, result, placeholderReferenceId);
return result;
}

private static void ThrowTypeFieldMissing() => throw new RequiredFieldMissingException("Serialized value is missing its type field.");

/// <inheritdoc/>
void IFieldCodec.WriteField<TBufferWriter>(ref Writer<TBufferWriter> writer, uint fieldIdDelta, Type expectedType, object value)
{
if (ReferenceCodec.TryWriteReferenceField(ref writer, fieldIdDelta, expectedType, value))
{
return;
}

if (value is not IMessage protobufMessage)
{
throw new ArgumentException("The provided value for serialization in not an instance of IMessage");
}

writer.WriteFieldHeader(fieldIdDelta, expectedType, SelfType, WireType.TagDelimited);

// Write the type name
ReferenceCodec.MarkValueField(writer.Session);
writer.WriteFieldHeaderExpected(0, WireType.LengthPrefixed);
writer.Session.TypeCodec.WriteLengthPrefixed(ref writer, value.GetType());

var messageSize = protobufMessage.CalculateSize();

using var buffer = new PooledArrayBufferWriter();
var spanBuffer = buffer.GetSpan(messageSize)[..messageSize];

// Write the serialized payload
protobufMessage.WriteTo(spanBuffer);

ReferenceCodec.MarkValueField(writer.Session);
writer.WriteFieldHeaderExpected(1, WireType.LengthPrefixed);
writer.WriteVarUInt32((uint)spanBuffer.Length);
writer.Write(spanBuffer);

writer.WriteEndObject();
}
}
Loading