Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<PackageVersion Include="AWSSDK.SQS" Version="3.7.2.119" />
<PackageVersion Include="Consul" Version="1.6.10.7" />
<PackageVersion Include="Google.Cloud.PubSub.V1" Version="1.0.0-beta13" />
<PackageVersion Include="Google.Protobuf" Version="3.21.7" />
<PackageVersion Include="Google.Protobuf" Version="3.22.0" />
<PackageVersion Include="protobuf-net" Version="3.1.22" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.2" />
<PackageVersion Include="ZooKeeperNetEx" Version="3.4.12.4" />
Expand Down Expand Up @@ -93,7 +93,7 @@
<PackageVersion Include="Utf8Json" Version="1.3.7" />
<PackageVersion Include="SpanJson" Version="3.3.1" />
<PackageVersion Include="Hyperion" Version="0.12.2" />
<PackageVersion Include="Grpc.Tools" Version="2.50.0" />
<PackageVersion Include="Grpc.Tools" Version="2.52.0" />
<!-- Tooling related packages -->
<PackageVersion Include="Microsoft.SourceLink.AzureRepos.Git" Version="$(SourceLinkVersion)" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="$(SourceLinkVersion)" />
Expand Down
27 changes: 17 additions & 10 deletions Orleans.sln
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.GrainDirectory.Redi
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tester.Redis", "test\Extensions\Tester.Redis\Tester.Redis.csproj", "{F13247A0-70C9-4200-9CB1-2002CB8105E0}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Serialization.Protobuf", "src\Serializers\Orleans.Serialization.Protobuf\Orleans.Serialization.Protobuf.csproj", "{A073C0EE-8732-42F9-A22E-D47034E25076}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -466,14 +468,6 @@ Global
{16B9B850-ED3B-4B45-B0F2-3F802D44F382}.Debug|Any CPU.Build.0 = Debug|Any CPU
{16B9B850-ED3B-4B45-B0F2-3F802D44F382}.Release|Any CPU.ActiveCfg = Release|Any CPU
{16B9B850-ED3B-4B45-B0F2-3F802D44F382}.Release|Any CPU.Build.0 = Release|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Release|Any CPU.Build.0 = Release|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Release|Any CPU.Build.0 = Release|Any CPU
{D1214CD3-EB99-4420-9E30-A50ACFD66A48}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D1214CD3-EB99-4420-9E30-A50ACFD66A48}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D1214CD3-EB99-4420-9E30-A50ACFD66A48}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -562,6 +556,18 @@ Global
{2268B639-02B8-4903-B719-65F7EBD05D52}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2268B639-02B8-4903-B719-65F7EBD05D52}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2268B639-02B8-4903-B719-65F7EBD05D52}.Release|Any CPU.Build.0 = Release|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936}.Release|Any CPU.Build.0 = Release|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F13247A0-70C9-4200-9CB1-2002CB8105E0}.Release|Any CPU.Build.0 = Release|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -640,8 +646,6 @@ Global
{8E01A6EB-DE96-4DFD-AA7D-B07078F12372} = {FE2E08C6-9C3B-4AEE-AE07-CCA387580D7A}
{D53D80CC-3E14-4499-B03F-610A5D3F6359} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A}
{16B9B850-ED3B-4B45-B0F2-3F802D44F382} = {4C5D66BF-EE1C-4DD8-8551-D1B7F3768A34}
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936} = {A734945A-36DC-485E-B84D-3C2D395BC7BE}
{F13247A0-70C9-4200-9CB1-2002CB8105E0} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
{D1214CD3-EB99-4420-9E30-A50ACFD66A48} = {FE2E08C6-9C3B-4AEE-AE07-CCA387580D7A}
{65D8F6B3-DEE2-412B-95F2-77274461D58C} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
{A145AFFC-E0CF-4861-AB0C-427C7670FF37} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
Expand All @@ -666,6 +670,9 @@ Global
{671AE42C-974A-467B-BE89-0A3F706B5B21} = {A734945A-36DC-485E-B84D-3C2D395BC7BE}
{28B35216-0C6E-4CF1-8C14-7D9A4BE161A5} = {A734945A-36DC-485E-B84D-3C2D395BC7BE}
{2268B639-02B8-4903-B719-65F7EBD05D52} = {A734945A-36DC-485E-B84D-3C2D395BC7BE}
{CCEF897C-F4F8-48F0-8F95-CC1487EE2936} = {A734945A-36DC-485E-B84D-3C2D395BC7BE}
{F13247A0-70C9-4200-9CB1-2002CB8105E0} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
{A073C0EE-8732-42F9-A22E-D47034E25076} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952}
Expand Down
29 changes: 0 additions & 29 deletions src/Serializers/Directory.Build.props

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Microsoft.Orleans.OrleansGoogleUtils</PackageId>
<Title>Microsoft Orleans Google Utilities</Title>
<Description>Library of utility types for Google of Microsoft Orleans.</Description>
<PackageTags>$(PackageTags) ProtoBuf</PackageTags>
<TargetFrameworks>$(DefaultTargetFrameworks)</TargetFrameworks>
</PropertyGroup>

<PropertyGroup>
<AssemblyName>Orleans.Serialization.Protobuf</AssemblyName>
<RootNamespace>OrleansGoogleUtils</RootNamespace>
<PackageId>Microsoft.Orleans.Serialization.Protobuf</PackageId>
<TargetFrameworks>$(DefaultTargetFrameworks);netstandard2.1</TargetFrameworks>
<PackageDescription>Google.Protobuf integration for Orleans.Serialization</PackageDescription>
<OrleansBuildTimeCodeGen>true</OrleansBuildTimeCodeGen>
<IsOrleansFrameworkPart>false</IsOrleansFrameworkPart>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="$(GoogleProtobufVersion)" />
<PackageReference Include="Microsoft.CSharp" Version="$(MicrosoftCSharpVersion)" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Orleans.Serialization\Orleans.Serialization.csproj" />
</ItemGroup>

</Project>
219 changes: 219 additions & 0 deletions src/Serializers/Orleans.Serialization.Protobuf/ProtobufCodec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
using Google.Protobuf;
using Orleans.Serialization.Buffers;
using Orleans.Serialization.Buffers.Adaptors;
using Orleans.Serialization.Cloning;
using Orleans.Serialization.Codecs;
using Orleans.Serialization.Serializers;
using Orleans.Serialization.WireProtocol;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

namespace Orleans.Serialization;

[Alias(WellKnownAlias)]
public sealed class ProtobufCodec : IGeneralizedCodec, IGeneralizedCopier, ITypeFilter
{
public const string WellKnownAlias = "protobuf";

private static readonly Type SelfType = typeof(ProtobufCodec);
private static readonly ConcurrentDictionary<RuntimeTypeHandle, MessageParser> MessageParsers = new();

private readonly ICodecSelector[] _serializableTypeSelectors;
private readonly ICopierSelector[] _copyableTypeSelectors;

/// <summary>
/// Initializes a new instance of the <see cref="ProtobufCodec"/> class.
/// </summary>
/// <param name="serializableTypeSelectors">Filters used to indicate which types should be serialized by this codec.</param>
/// <param name="copyableTypeSelectors">Filters used to indicate which types should be copied by this codec.</param>
public ProtobufCodec(
IEnumerable<ICodecSelector> serializableTypeSelectors,
IEnumerable<ICopierSelector> copyableTypeSelectors)
{
_serializableTypeSelectors = serializableTypeSelectors.Where(t => string.Equals(t.CodecName, WellKnownAlias, StringComparison.Ordinal)).ToArray();
_copyableTypeSelectors = copyableTypeSelectors.Where(t => string.Equals(t.CopierName, WellKnownAlias, StringComparison.Ordinal)).ToArray();
}

/// <inheritdoc/>
public object DeepCopy(object input, CopyContext context)
{
if (!context.TryGetCopy(input, out object result))
{
if (input is not IMessage protobufMessage)
{
throw new InvalidOperationException("Input is not a protobuf message");
}

var messageSize = protobufMessage.CalculateSize();
using var buffer = new PooledArrayBufferWriter();
var spanBuffer = buffer.GetSpan(messageSize)[..messageSize];
protobufMessage.WriteTo(spanBuffer);

result = protobufMessage.Descriptor.Parser.ParseFrom(spanBuffer);

context.RecordCopy(input, result);
}

return result;
}

/// <inheritdoc/>
bool IGeneralizedCodec.IsSupportedType(Type type)
{
if (type == SelfType)
{
return true;
}

foreach (var selector in _serializableTypeSelectors)
{
if (selector.IsSupportedType(type))
{
return IsProtobufMessage(type);
}
}

return false;
}

/// <inheritdoc/>
bool IGeneralizedCopier.IsSupportedType(Type type)
{
foreach (var selector in _copyableTypeSelectors)
{
if (selector.IsSupportedType(type))
{
return IsProtobufMessage(type);
}
}

return false;
}

/// <inheritdoc/>
bool? ITypeFilter.IsTypeAllowed(Type type)
{
if (!typeof(IMessage).IsAssignableFrom(type))
{
return null;
}

return ((IGeneralizedCodec)this).IsSupportedType(type) || ((IGeneralizedCopier)this).IsSupportedType(type);
}

private static bool IsProtobufMessage(Type type)
{
if (!MessageParsers.ContainsKey(type.TypeHandle))
{
if (Activator.CreateInstance(type) is not IMessage protobufMessageInstance)
{
return false;
}

MessageParsers.TryAdd(type.TypeHandle, protobufMessageInstance.Descriptor.Parser);
}

return true;
}

/// <inheritdoc/>
object IFieldCodec.ReadValue<TInput>(ref Reader<TInput> reader, Field field)
{
if (field.IsReference)
{
return ReferenceCodec.ReadReference(ref reader, field.FieldType);
}

field.EnsureWireTypeTagDelimited();

var placeholderReferenceId = ReferenceCodec.CreateRecordPlaceholder(reader.Session);
object result = null;
Type type = null;
uint fieldId = 0;

while (true)
{
var header = reader.ReadFieldHeader();
if (header.IsEndBaseOrEndObject)
{
break;
}

fieldId += header.FieldIdDelta;
switch (fieldId)
{
case 0:
ReferenceCodec.MarkValueField(reader.Session);
type = reader.Session.TypeCodec.ReadLengthPrefixed(ref reader);
break;
case 1:
if (type is null)
{
ThrowTypeFieldMissing();
}

if (!MessageParsers.TryGetValue(type.TypeHandle, out var messageParser))
{
throw new ArgumentException($"No parser found for the expected type {type.Name}", nameof(TInput));
}

ReferenceCodec.MarkValueField(reader.Session);
var length = (int)reader.ReadVarUInt32();

using (var buffer = new PooledArrayBufferWriter())
{
var spanBuffer = buffer.GetSpan(length)[..length];
reader.ReadBytes(spanBuffer);
result = messageParser.ParseFrom(spanBuffer);
}
break;
default:
reader.ConsumeUnknownField(header);
break;
}
}

ReferenceCodec.RecordObject(reader.Session, result, placeholderReferenceId);
return result;
}

private static void ThrowTypeFieldMissing() => throw new RequiredFieldMissingException("Serialized value is missing its type field.");

/// <inheritdoc/>
void IFieldCodec.WriteField<TBufferWriter>(ref Writer<TBufferWriter> writer, uint fieldIdDelta, Type expectedType, object value)
{
if (ReferenceCodec.TryWriteReferenceField(ref writer, fieldIdDelta, expectedType, value))
{
return;
}

if (value is not IMessage protobufMessage)
{
throw new ArgumentException("The provided value for serialization in not an instance of IMessage");
}

writer.WriteFieldHeader(fieldIdDelta, expectedType, SelfType, WireType.TagDelimited);

// Write the type name
ReferenceCodec.MarkValueField(writer.Session);
writer.WriteFieldHeaderExpected(0, WireType.LengthPrefixed);
writer.Session.TypeCodec.WriteLengthPrefixed(ref writer, value.GetType());

var messageSize = protobufMessage.CalculateSize();

using var buffer = new PooledArrayBufferWriter();
var spanBuffer = buffer.GetSpan(messageSize)[..messageSize];

// Write the serialized payload
protobufMessage.WriteTo(spanBuffer);

ReferenceCodec.MarkValueField(writer.Session);
writer.WriteFieldHeaderExpected(1, WireType.LengthPrefixed);
writer.WriteVarUInt32((uint)spanBuffer.Length);
writer.Write(spanBuffer);

writer.WriteEndObject();
}
}
Loading