Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to specify maximum message size #1218

Merged
merged 1 commit into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class AmqpTcpEndpoint// : ICloneable
public const int UseDefaultPort = -1;

private int _port;
private uint _maxMessageSize = Constants.DefaultMaxMessageSizeInBytes;

/// <summary>
/// Creates a new instance of the <see cref="AmqpTcpEndpoint"/>.
Expand Down Expand Up @@ -177,6 +178,25 @@ public IProtocol Protocol
/// </summary>
public SslOption Ssl { get; set; }

/// <summary>
/// Set the maximum size for a message in bytes. Setting it to 0 reverts to the default of 128MiB
/// </summary>
public uint MaxMessageSize
{
get { return _maxMessageSize; }
set
{
if (value == default(uint))
{
_maxMessageSize = Constants.DefaultMaxMessageSizeInBytes;
}
else
{
_maxMessageSize = value;
}
}
}

/// <summary>
/// Construct an instance from a protocol and an address in "hostname:port" format.
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions projects/RabbitMQ.Client/client/framing/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,8 @@ public static class Constants
public const int NotImplemented = 540;
///<summary>(= 541)</summary>
public const int InternalError = 541;

///<summary>(= 134217728)</summary>
public const uint DefaultMaxMessageSizeInBytes = 134217728;
}
}
8 changes: 6 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private static void ProcessProtocolHeader(Stream reader, ReadOnlySpan<byte> fram
}
}

internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer, uint maxMessageSize)
{
try
{
Expand Down Expand Up @@ -277,7 +277,11 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
FrameType type = (FrameType)firstByte;
var frameHeaderSpan = new ReadOnlySpan<byte>(frameHeaderBuffer, 1, 6);
int channel = NetworkOrderDeserializer.ReadUInt16(frameHeaderSpan);
int payloadSize = NetworkOrderDeserializer.ReadInt32(frameHeaderSpan.Slice(2, 4)); // FIXME - throw exn on unreasonable value
int payloadSize = NetworkOrderDeserializer.ReadInt32(frameHeaderSpan.Slice(2, 4));
if (payloadSize > maxMessageSize)
{
throw new MalformedFrameException($"Frame payload size '{payloadSize}' exceeds maximum of '{maxMessageSize}' bytes");
}

const int EndMarkerLength = 1;
// Is returned by InboundFrame.ReturnPayload in Connection.MainLoopIteration
Expand Down
11 changes: 8 additions & 3 deletions projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public static async Task TimeoutAfter(this Task task, TimeSpan timeout)

internal sealed class SocketFrameHandler : IFrameHandler
{
private readonly AmqpTcpEndpoint _amqpTcpEndpoint;
private readonly ITcpClient _socket;
private readonly Stream _reader;
private readonly Stream _writer;
Expand All @@ -79,7 +80,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
Func<AddressFamily, ITcpClient> socketFactory,
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
{
Endpoint = endpoint;
_amqpTcpEndpoint = endpoint;
_frameHeaderBuffer = new byte[7];
var channel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>(
new UnboundedChannelOptions
Expand Down Expand Up @@ -149,7 +150,11 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
WriteTimeout = writeTimeout;
_writerTask = Task.Run(WriteLoop);
}
public AmqpTcpEndpoint Endpoint { get; set; }

public AmqpTcpEndpoint Endpoint
{
get { return _amqpTcpEndpoint; }
}

public EndPoint LocalEndPoint
{
Expand Down Expand Up @@ -235,7 +240,7 @@ public void Close()

public InboundFrame ReadFrame()
{
return InboundFrame.ReadFrom(_reader, _frameHeaderBuffer);
return InboundFrame.ReadFrom(_reader, _frameHeaderBuffer, _amqpTcpEndpoint.MaxMessageSize);
}

public void SendHeader()
Expand Down
4 changes: 3 additions & 1 deletion projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"Benchmarks, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"Benchmarks, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"Unit, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5")]
namespace RabbitMQ.Client
{
Expand All @@ -13,6 +13,7 @@ namespace RabbitMQ.Client
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl) { }
public System.Net.Sockets.AddressFamily AddressFamily { get; set; }
public string HostName { get; set; }
public uint MaxMessageSize { get; set; }
public int Port { get; set; }
public RabbitMQ.Client.IProtocol Protocol { get; }
public RabbitMQ.Client.SslOption Ssl { get; set; }
Expand Down Expand Up @@ -207,6 +208,7 @@ namespace RabbitMQ.Client
public const int CommandInvalid = 503;
public const int ConnectionForced = 320;
public const int ContentTooLarge = 311;
public const uint DefaultMaxMessageSizeInBytes = 134217728u;
public const int FrameBody = 3;
public const int FrameEnd = 206;
public const int FrameError = 501;
Expand Down