Skip to content

Commit

Permalink
introduce struct version of BasicProperties for basicPublish
Browse files Browse the repository at this point in the history
  • Loading branch information
bollhals committed Oct 8, 2021
1 parent 3b95fc3 commit 7fca46f
Show file tree
Hide file tree
Showing 104 changed files with 1,025 additions and 1,487 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent)
_autoResetEvent = autoResetEvent;
}

public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (Interlocked.Increment(ref _current) == Count)
{
Expand All @@ -29,7 +29,7 @@ public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redel
}

void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IBasicProperties properties, ReadOnlyMemory<byte> body)
in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (Interlocked.Increment(ref _current) == Count)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class ConsumerDispatcherBase
protected readonly ulong _deliveryTag = 500UL;
protected readonly string _exchange = "Exchange";
protected readonly string _routingKey = "RoutingKey";
protected readonly IBasicProperties _properties = new Client.Framing.BasicProperties();
protected readonly ReadOnlyBasicProperties _properties = new ReadOnlyBasicProperties();
protected readonly byte[] _body = new byte[512];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ namespace Benchmarks.Networking
[MemoryDiagnoser]
public class Networking_BasicDeliver_Commons
{
private const int messageCount = 10000;


public static async Task Publish_Hello_World(IConnection connection, uint n, byte[] body)
public static async Task Publish_Hello_World(IConnection connection, uint messageCount, byte[] body)
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
using (var model = connection.CreateModel())
Expand All @@ -31,7 +28,7 @@ public static async Task Publish_Hello_World(IConnection connection, uint n, byt

for (int i = 0; i < messageCount; i++)
{
model.BasicPublish("", queue.QueueName, null, body);
model.BasicPublish("", queue.QueueName, body);
}

await tcs.Task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public void GlobalCleanup()
}

[Benchmark(Baseline = true)]
public async Task Publish_Hello_World()
public Task Publish_Hello_World()
{
await Networking_BasicDeliver_Commons.Publish_Hello_World(_connection, messageCount, _body);
return Networking_BasicDeliver_Commons.Publish_Hello_World(_connection, messageCount, _body);
}
}
}
7 changes: 3 additions & 4 deletions projects/Benchmarks/WireFormatting/MethodFraming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
using System.Text;

using BenchmarkDotNet.Attributes;

using RabbitMQ.Client;
using RabbitMQ.Client.client.impl;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Impl;

using BasicProperties = RabbitMQ.Client.Framing.BasicProperties;

namespace RabbitMQ.Benchmarks
{
[Config(typeof(Config))]
Expand All @@ -30,7 +29,7 @@ public class MethodFramingBasicPublish
private const string StringValue = "Exchange_OR_RoutingKey";
private readonly BasicPublish _basicPublish = new BasicPublish(StringValue, StringValue, false, false);
private readonly BasicPublishMemory _basicPublishMemory = new BasicPublishMemory(Encoding.UTF8.GetBytes(StringValue), Encoding.UTF8.GetBytes(StringValue), false, false);
private readonly BasicProperties _propertiesEmpty = new BasicProperties();
private readonly EmptyBasicProperty _propertiesEmpty = new EmptyBasicProperty();
private readonly BasicProperties _properties = new BasicProperties { AppId = "Application id", MessageId = "Random message id" };
private readonly ReadOnlyMemory<byte> _bodyEmpty = ReadOnlyMemory<byte>.Empty;
private readonly ReadOnlyMemory<byte> _body = new byte[512];
Expand Down
24 changes: 12 additions & 12 deletions projects/Benchmarks/WireFormatting/MethodSerialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.Text;

using BenchmarkDotNet.Attributes;

using RabbitMQ.Client;
using RabbitMQ.Client.Framing;
using RabbitMQ.Client.Framing.Impl;

Expand All @@ -21,13 +21,13 @@ public virtual void SetUp() { }
public class MethodBasicAck : MethodSerializationBase
{
private readonly BasicAck _basicAck = new BasicAck(ulong.MaxValue, true);
public override void SetUp() => _basicAck.WriteArgumentsTo(_buffer.Span);
public override void SetUp() => _basicAck.WriteTo(_buffer.Span);

[Benchmark]
public ulong BasicAckRead() => new BasicAck(_buffer.Span)._deliveryTag; // return one property to not box when returning an object instead

[Benchmark]
public int BasicAckWrite() => _basicAck.WriteArgumentsTo(_buffer.Span);
public int BasicAckWrite() => _basicAck.WriteTo(_buffer.Span);
}

public class MethodBasicDeliver : MethodSerializationBase
Expand All @@ -50,10 +50,10 @@ public override void SetUp()
public object BasicDeliverRead() => new BasicDeliver(_buffer.Span)._consumerTag; // return one property to not box when returning an object instead

[Benchmark]
public int BasicPublishWrite() => _basicPublish.WriteArgumentsTo(_buffer.Span);
public int BasicPublishWrite() => _basicPublish.WriteTo(_buffer.Span);

[Benchmark]
public int BasicPublishMemoryWrite() => _basicPublishMemory.WriteArgumentsTo(_buffer.Span);
public int BasicPublishMemoryWrite() => _basicPublishMemory.WriteTo(_buffer.Span);

[Benchmark]
public int BasicPublishSize() => _basicPublish.GetRequiredBufferSize();
Expand All @@ -66,27 +66,27 @@ public class MethodChannelClose : MethodSerializationBase
{
private readonly ChannelClose _channelClose = new ChannelClose(333, string.Empty, 0099, 2999);

public override void SetUp() => _channelClose.WriteArgumentsTo(_buffer.Span);
public override void SetUp() => _channelClose.WriteTo(_buffer.Span);

[Benchmark]
public object ChannelCloseRead() => new ChannelClose(_buffer.Span)._replyText; // return one property to not box when returning an object instead

[Benchmark]
public int ChannelCloseWrite() => _channelClose.WriteArgumentsTo(_buffer.Span);
public int ChannelCloseWrite() => _channelClose.WriteTo(_buffer.Span);
}

public class MethodBasicProperties : MethodSerializationBase
{
private readonly BasicProperties _basicProperties = new BasicProperties { Persistent = true, AppId = "AppId", ContentEncoding = "content", };
public override void SetUp() => _basicProperties.WritePropertiesTo(_buffer.Span);
private readonly IAmqpWriteable _basicProperties = new BasicProperties { Persistent = true, AppId = "AppId", ContentEncoding = "content", };
public override void SetUp() => _basicProperties.WriteTo(_buffer.Span);

[Benchmark]
public object BasicPropertiesRead() => new BasicProperties(_buffer.Span);
public ReadOnlyBasicProperties BasicPropertiesRead() => new ReadOnlyBasicProperties(_buffer.Span);

[Benchmark]
public int BasicPropertiesWrite() => _basicProperties.WritePropertiesTo(_buffer.Span);
public int BasicPropertiesWrite() => _basicProperties.WriteTo(_buffer.Span);

[Benchmark]
public int BasicDeliverSize() => _basicProperties.GetRequiredPayloadBufferSize();
public int BasicDeliverSize() => _basicProperties.GetRequiredBufferSize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public virtual Task HandleBasicDeliver(string consumerTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
// Nothing to do here.
Expand Down Expand Up @@ -165,7 +165,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
throw new InvalidOperationException("Should never be called.");
}

void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
throw new InvalidOperationException("Should never be called.");
}
Expand Down
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/client/api/BasicGetResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public sealed class BasicGetResult : IDisposable
/// <param name="basicProperties">The Basic-class content header properties for the message.</param>
/// <param name="body">The body</param>
public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, string routingKey,
uint messageCount, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
uint messageCount, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
DeliveryTag = deliveryTag;
Redelivered = redelivered;
Expand All @@ -76,7 +76,7 @@ public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, stri
/// <param name="body">The body</param>
/// <param name="rentedArray">The rented array which body is part of.</param>
public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, string routingKey,
uint messageCount, IBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
uint messageCount, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
{
DeliveryTag = deliveryTag;
Redelivered = redelivered;
Expand All @@ -91,7 +91,7 @@ public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, stri
/// <summary>
/// Retrieves the Basic-class content header properties for this message.
/// </summary>
public IBasicProperties BasicProperties { get; }
public ReadOnlyBasicProperties BasicProperties { get; }

/// <summary>
/// Retrieves the body of this message.
Expand Down Expand Up @@ -130,7 +130,7 @@ public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, stri
/// <inheritdoc />
public void Dispose()
{
if (!(_rentedArray is null))
if (_rentedArray is not null)
{
ArrayPool<byte>.Shared.Return(_rentedArray);
}
Expand Down
169 changes: 169 additions & 0 deletions projects/RabbitMQ.Client/client/api/BasicProperties.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
{
#nullable enable
/// <summary>
/// AMQP specification content header properties for content class "basic".
/// </summary>
public struct BasicProperties : IBasicProperties, IAmqpHeader
{
public string? ContentType { get; set; }
public string? ContentEncoding { get; set; }
public IDictionary<string, object?>? Headers { get; set; }
public byte DeliveryMode { get; set; }
public byte Priority { get; set; }
public string? CorrelationId { get; set; }
public string? ReplyTo { get; set; }
public string? Expiration { get; set; }
public string? MessageId { get; set; }
public AmqpTimestamp Timestamp { get; set; }
public string? Type { get; set; }
public string? UserId { get; set; }
public string? AppId { get; set; }
public string? ClusterId { get; set; }

public bool Persistent
{
readonly get { return DeliveryMode == 2; }
set { DeliveryMode = value ? (byte)2 : (byte)1; }
}

public PublicationAddress? ReplyToAddress
{
readonly get
{
PublicationAddress.TryParse(ReplyTo, out PublicationAddress result);
return result;
}
set { ReplyTo = value?.ToString(); }
}

public BasicProperties(in ReadOnlyBasicProperties input)
{
ContentType = input.ContentType;
ContentEncoding = input.ContentEncoding;
Headers = input.Headers;
DeliveryMode = input.DeliveryMode;
Priority = input.Priority;
CorrelationId = input.CorrelationId;
ReplyTo = input.ReplyTo;
Expiration = input.Expiration;
MessageId = input.MessageId;
Timestamp = input.Timestamp;
Type = input.Type;
UserId = input.UserId;
AppId = input.AppId;
ClusterId = input.ClusterId;
}

public void ClearContentType() => ContentType = default;
public void ClearContentEncoding() => ContentEncoding = default;
public void ClearHeaders() => Headers = default;
public void ClearDeliveryMode() => DeliveryMode = default;
public void ClearPriority() => Priority = default;
public void ClearCorrelationId() => CorrelationId = default;
public void ClearReplyTo() => ReplyTo = default;
public void ClearExpiration() => Expiration = default;
public void ClearMessageId() => MessageId = default;
public void ClearTimestamp() => Timestamp = default;
public void ClearType() => Type = default;
public void ClearUserId() => UserId = default;
public void ClearAppId() => AppId = default;
public void ClearClusterId() => ClusterId = default;

public readonly bool IsContentTypePresent() => ContentType != default;
public readonly bool IsContentEncodingPresent() => ContentEncoding != default;
public readonly bool IsHeadersPresent() => Headers != default;
public readonly bool IsDeliveryModePresent() => DeliveryMode != default;
public readonly bool IsPriorityPresent() => Priority != default;
public readonly bool IsCorrelationIdPresent() => CorrelationId != default;
public readonly bool IsReplyToPresent() => ReplyTo != default;
public readonly bool IsExpirationPresent() => Expiration != default;
public readonly bool IsMessageIdPresent() => MessageId != default;
public readonly bool IsTimestampPresent() => Timestamp != default;
public readonly bool IsTypePresent() => Type != default;
public readonly bool IsUserIdPresent() => UserId != default;
public readonly bool IsAppIdPresent() => AppId != default;
public readonly bool IsClusterIdPresent() => ClusterId != default;

ushort IAmqpHeader.ProtocolClassId => ClassConstants.Basic;

readonly int IAmqpWriteable.WriteTo(Span<byte> span)
{
int offset = WireFormatting.WriteBits(ref span.GetStart(),
IsContentTypePresent(), IsContentEncodingPresent(), IsHeadersPresent(), IsDeliveryModePresent(), IsPriorityPresent(),
IsCorrelationIdPresent(), IsReplyToPresent(), IsExpirationPresent(), IsMessageIdPresent(), IsTimestampPresent(),
IsTypePresent(), IsUserIdPresent(), IsAppIdPresent(), IsClusterIdPresent());
if (IsContentTypePresent()) { offset += WireFormatting.WriteShortstr(ref span.GetOffset(offset), ContentType); }
if (IsContentEncodingPresent()) { offset += WireFormatting.WriteShortstr(ref span.GetOffset(offset), ContentEncoding); }
if (IsHeadersPresent()) { offset += WireFormatting.WriteTable(ref span.GetOffset(offset), Headers); }
if (IsDeliveryModePresent()) { span[offset++] = DeliveryMode; }
if (IsPriorityPresent()) { span[offset++] = Priority; }
if (IsCorrelationIdPresent()) { offset += WireFormatting.WriteShortstr(ref span.GetOffset(offset), CorrelationId); }
if (IsReplyToPresent()) { offset += WireFormatting.WriteShortstr(ref span.GetOffset(offset), ReplyTo); }
if (IsExpirationPresent()) { offset += WireFormatting.WriteShortstr(ref span.GetOffset(offset), Expiration); }
if (IsMessageIdPresent()) { offset += WireFormatting.WriteShortstr(ref span.GetOffset(offset), MessageId); }
if (IsTimestampPresent()) { offset += WireFormatting.WriteTimestamp(ref span.GetOffset(offset), Timestamp); }
if (IsTypePresent()) { offset += WireFormatting.WriteShortstr(ref span.GetOffset(offset), Type); }
if (IsUserIdPresent()) { offset += WireFormatting.WriteShortstr(ref span.GetOffset(offset), UserId); }
if (IsAppIdPresent()) { offset += WireFormatting.WriteShortstr(ref span.GetOffset(offset), AppId); }
if (IsClusterIdPresent()) { offset += WireFormatting.WriteShortstr(ref span.GetOffset(offset), ClusterId); }
return offset;
}

readonly int IAmqpWriteable.GetRequiredBufferSize()
{
int bufferSize = 2; // number of presence fields (14) in 2 bytes blocks
if (IsContentTypePresent()) { bufferSize += 1 + WireFormatting.GetByteCount(ContentType); } // _contentType in bytes
if (IsContentEncodingPresent()) { bufferSize += 1 + WireFormatting.GetByteCount(ContentEncoding); } // _contentEncoding in bytes
if (IsHeadersPresent()) { bufferSize += WireFormatting.GetTableByteCount(Headers); } // _headers in bytes
if (IsDeliveryModePresent()) { bufferSize++; } // _deliveryMode in bytes
if (IsPriorityPresent()) { bufferSize++; } // _priority in bytes
if (IsCorrelationIdPresent()) { bufferSize += 1 + WireFormatting.GetByteCount(CorrelationId); } // _correlationId in bytes
if (IsReplyToPresent()) { bufferSize += 1 + WireFormatting.GetByteCount(ReplyTo); } // _replyTo in bytes
if (IsExpirationPresent()) { bufferSize += 1 + WireFormatting.GetByteCount(Expiration); } // _expiration in bytes
if (IsMessageIdPresent()) { bufferSize += 1 + WireFormatting.GetByteCount(MessageId); } // _messageId in bytes
if (IsTimestampPresent()) { bufferSize += 8; } // _timestamp in bytes
if (IsTypePresent()) { bufferSize += 1 + WireFormatting.GetByteCount(Type); } // _type in bytes
if (IsUserIdPresent()) { bufferSize += 1 + WireFormatting.GetByteCount(UserId); } // _userId in bytes
if (IsAppIdPresent()) { bufferSize += 1 + WireFormatting.GetByteCount(AppId); } // _appId in bytes
if (IsClusterIdPresent()) { bufferSize += 1 + WireFormatting.GetByteCount(ClusterId); } // _clusterId in bytes
return bufferSize;
}
}
}
Loading

0 comments on commit 7fca46f

Please sign in to comment.