Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,3 @@ resharper_inconsistent_naming_highlighting = none

# Solution-specific settings/overrides ------------------------------------------------------

## TBD


4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
<PackageVersion Include="Azure.Core" Version="1.39.0" />
<PackageVersion Include="Azure.Data.Tables" Version="12.8.3" />
<PackageVersion Include="Azure.Identity" Version="1.11.4" />
<PackageVersion Include="Azure.Messaging.EventHubs" Version="5.11.3" />
<PackageVersion Include="Azure.Messaging.EventHubs" Version="5.12.2" />
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.0.1" />
<PackageVersion Include="Azure.ResourceManager.Storage" Version="1.3.0" />
<PackageVersion Include="Azure.ResourceManager.KeyVault" Version="1.3.0" />
<PackageVersion Include="Azure.ResourceManager.KeyVault" Version="1.3.0" />
<PackageVersion Include="Azure.ResourceManager.EventHubs" Version="1.1.0" />
<PackageVersion Include="Azure.Storage.Blobs" Version="12.20.0" />
<PackageVersion Include="Azure.Security.KeyVault.Secrets" Version="4.6.0" />
Expand Down
1 change: 1 addition & 0 deletions Spotflow.InMemory.Azure.sln
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ VisualStudioVersion = 17.8.34525.116
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{2C6BE026-65BE-4D7A-8E4D-F8CF8E2C300E}"
ProjectSection(SolutionItems) = preProject
.editorconfig = .editorconfig
Directory.Build.props = Directory.Build.props
Directory.Packages.props = Directory.Packages.props
LICENSE.md = LICENSE.md
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,13 @@ private async Task<IEnumerable<EventData>> ReceiveBatchCoreAsync(int maximumEven
}
else
{
#pragma warning disable CS0618 // Type or member is obsolete
_lastEnqueuedEventProperties = new(
partitionProperties.LastEnqueuedSequenceNumber,
partitionProperties.LastEnqueuedOffset,
lastOffset: partitionProperties.LastEnqueuedOffset,
partitionProperties.LastEnqueuedTime,
_timeProvider.GetUtcNow());
#pragma warning restore CS0618 // Type or member is obsolete
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public PartitionProperties GetProperties()

var beginningSequenceNumber = currentSegment.Count > 0 ? currentSegment[0].SequenceNumber : lastSequenceNumber;

#pragma warning disable CS0618 // Type or member is obsolete

return EventHubsModelFactory.PartitionProperties(
eventHubName: name,
partitionId: PartitionId,
Expand All @@ -80,6 +82,8 @@ public PartitionProperties GetProperties()
lastSequenceNumber: _lastSequenceNumber,
lastOffset: _lastOffset,
lastEnqueuedTime: _lastEnqueuedTime);

#pragma warning restore CS0618 // Type or member is obsolete
}

public void SendEvent(EventData eventData, string? partitionKey)
Expand Down Expand Up @@ -114,6 +118,7 @@ public void SendEvent(EventData eventData, string? partitionKey)
var eventDataPropertiesCopy = new Dictionary<string, object>(eventData.Properties);
var eventDataSystemPropertiesCopy = new Dictionary<string, object>(eventData.SystemProperties);

#pragma warning disable CS0618 // Type or member is obsolete
var eventWithSystemProperties = EventHubsModelFactory.EventData(
eventBody: new(eventBodyCopy),
properties: eventDataPropertiesCopy,
Expand All @@ -123,6 +128,7 @@ public void SendEvent(EventData eventData, string? partitionKey)
offset: _lastOffset,
enqueuedTime: _lastEnqueuedTime
);
#pragma warning restore CS0618 // Type or member is obsolete

eventWithSystemProperties.MessageId = eventData.MessageId;
eventWithSystemProperties.CorrelationId = eventData.CorrelationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,15 @@ public static InMemoryEventPosition FromEventPosition(EventPosition position)

var isInclusive = ReflectionUtils.ReadInternalValueProperty<bool>(position, "IsInclusive");

var offset = ReflectionUtils.ReadInternalReferenceProperty<object>(position, "Offset");
if (ReflectionUtils.TryReadOptionalInternalReferenceProperty<object>(position, "OffsetString", out var offsetString) &&
offsetString is not null)
{
throw new NotSupportedException("EventPosition with offset is not supported.");
}

if (offset is not null)
// Offset was renamed to OffsetString after EventHub version 5.12.0, so we check for the old property name as well.
if (ReflectionUtils.TryReadOptionalInternalReferenceProperty<object>(position, "Offset", out var offset) &&
offset is not null)
{
throw new NotSupportedException("EventPosition with offset is not supported.");
}
Expand Down
41 changes: 30 additions & 11 deletions src/Spotflow.InMemory.Azure/Internals/ReflectionUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,45 @@ namespace Spotflow.InMemory.Azure.Internals;

internal static class ReflectionUtils
{

public static TOut? ReadInternalReferenceProperty<TOut>(object obj, string propertyName) where TOut : class
{
var value = ReadInternalProperty(obj, propertyName);
if (!TryReadOptionalInternalReferenceProperty(obj, propertyName, out TOut? value))
{
throw new InvalidOperationException($"Property '{propertyName}' not found on type {obj.GetType().Name}");
}

return value;
}

public static bool TryReadOptionalInternalReferenceProperty<TOut>(object obj, string propertyName, out TOut? value) where TOut : class
{
value = null;

if (value is null)
if (!TryReadOptionalInternalProperty(obj, propertyName, out var rawValue))
{
return null;
return false;
}

if (!value.GetType().IsAssignableTo(typeof(TOut)))
if (rawValue is null)
{
throw new InvalidOperationException($"Property '{propertyName}' is not assignable to {typeof(TOut).Name}");
return true; // null is a valid value for reference types
}

return (TOut?) value;
if (!rawValue.GetType().IsAssignableTo(typeof(TOut)))
{
throw new InvalidOperationException($"Property '{propertyName}' is not assignable to {typeof(TOut).Name}");
}

value = (TOut?) rawValue;
return true;
}

public static TOut ReadInternalValueProperty<TOut>(object obj, string propertyName) where TOut : struct
{
var value = ReadInternalProperty(obj, propertyName);
if (!TryReadOptionalInternalProperty(obj, propertyName, out var value))
{
throw new InvalidOperationException($"Property '{propertyName}' not found on type {obj.GetType().Name}");
}

if (value is not TOut)
{
Expand All @@ -35,7 +52,7 @@ public static TOut ReadInternalValueProperty<TOut>(object obj, string propertyNa
return (TOut) value;
}

private static object? ReadInternalProperty(object obj, string propertyName)
private static bool TryReadOptionalInternalProperty(object obj, string propertyName, out object? value)
{
ArgumentNullException.ThrowIfNull(obj);
ArgumentException.ThrowIfNullOrWhiteSpace(propertyName);
Expand All @@ -46,10 +63,12 @@ public static TOut ReadInternalValueProperty<TOut>(object obj, string propertyNa

if (property is null)
{
throw new InvalidOperationException($"Property '{propertyName}' not found on type {obj.GetType().Name}");
value = null;
return false;
}

return property.GetValue(obj);
value = property.GetValue(obj);
return true;
}

}
6 changes: 6 additions & 0 deletions tests/Tests/EventHub/EventHubConsumerClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,23 @@ public async Task GetPartitionProperties_Should_Return_Correct_Offset()

var propertiesBeforeSend = await consumerClient.GetPartitionPropertiesAsync("0");

#pragma warning disable CS0618 // Type or member is obsolete
propertiesBeforeSend.LastEnqueuedOffset.Should().Be(-1);
propertiesBeforeSend.LastEnqueuedOffsetString.Should().Be("-1");

await producerClient.SendAsync([new EventData()], new SendEventOptions { PartitionId = "0" });

var propertiesAfterSend1 = await consumerClient.GetPartitionPropertiesAsync("0");
propertiesAfterSend1.LastEnqueuedOffset.Should().Be(0);
propertiesAfterSend1.LastEnqueuedOffsetString.Should().Be("0");

await producerClient.SendAsync([new EventData()], new SendEventOptions { PartitionId = "0" });

var propertiesAfterSend2 = await consumerClient.GetPartitionPropertiesAsync("0");
propertiesAfterSend2.LastEnqueuedOffset.Should().Be(26);
propertiesAfterSend2.LastEnqueuedOffsetString.Should().Be("26");

#pragma warning restore CS0618 // Type or member is obsolete

}
}
8 changes: 8 additions & 0 deletions tests/Tests/EventHub/EventHubProducerClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,23 @@ public async Task Offset_Should_Start_At_Zero_And_Increase_With_Each_Sent_Event(

await producer.SendAsync([emptyEvent]);
var emptyEventBatch = await consumer.ReceiveBatchAsync(1, TimeSpan.Zero);

#pragma warning disable CS0618 // Type or member is obsolete

emptyEventBatch.Single().Offset.Should().Be(0);
emptyEventBatch.Single().OffsetString.Should().Be("0");

await producer.SendAsync([emptyEvent], new SendEventOptions { PartitionKey = "test-pk" });
var emptyEventWithPartitionKey = await consumer.ReceiveBatchAsync(1, TimeSpan.Zero);
emptyEventWithPartitionKey.Single().Offset.Should().Be(26);
emptyEventWithPartitionKey.Single().OffsetString.Should().Be("26");

await producer.SendAsync([eventWithBody]);
var eventWithBodyBatch = await consumer.ReceiveBatchAsync(1, TimeSpan.Zero);
eventWithBodyBatch.Single().Offset.Should().Be(59);
eventWithBodyBatch.Single().OffsetString.Should().Be("59");

#pragma warning restore CS0618 // Type or member is obsolete
}

[TestMethod]
Expand Down
13 changes: 12 additions & 1 deletion tests/Tests/EventHub/EventHubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,25 @@ public async Task Custom_Partition_Initial_State_Should_Be_Used_For_All_Partitio
batch0.Should().HaveCount(100);
batch1.Should().HaveCount(100);

#pragma warning disable CS0618 // Type or member is obsolete

batch0.ElementAt(0).SequenceNumber.Should().Be(43);
batch0.ElementAt(0).Offset.Should().Be(43);
batch0.ElementAt(0).OffsetString.Should().Be("43");
batch0.ElementAt(0).EnqueuedTime.Should().Be(timeProvider.GetUtcNow());
batch0.ElementAt(1).SequenceNumber.Should().Be(44);
batch0.ElementAt(1).Offset.Should().Be(72);
batch0.ElementAt(1).OffsetString.Should().Be("72");

batch1.ElementAt(0).SequenceNumber.Should().Be(43);
batch1.ElementAt(0).Offset.Should().Be(43);
batch1.ElementAt(0).OffsetString.Should().Be("43");
batch1.ElementAt(0).EnqueuedTime.Should().Be(timeProvider.GetUtcNow());
batch1.ElementAt(1).SequenceNumber.Should().Be(44);
batch1.ElementAt(1).Offset.Should().Be(72);
batch1.ElementAt(1).OffsetString.Should().Be("72");

#pragma warning restore CS0618 // Type or member is obsolete
}

[TestMethod]
Expand All @@ -98,15 +105,19 @@ public async Task Default_Partition_Initial_State_Should_Be_Used_For_All_Partiti
batch0.Should().HaveCount(1);
batch1.Should().HaveCount(1);


#pragma warning disable CS0618 // Type or member is obsolete
batch0.Single().SequenceNumber.Should().Be(0);
batch0.Single().Offset.Should().Be(0);
batch0.Single().OffsetString.Should().Be("0");
batch0.Single().EnqueuedTime.Should().BeCloseTo(DateTimeOffset.UtcNow, TimeSpan.FromMinutes(2));

batch1.Single().SequenceNumber.Should().Be(0);
batch1.Single().Offset.Should().Be(0);
batch1.Single().OffsetString.Should().Be("0");
batch1.Single().EnqueuedTime.Should().BeCloseTo(DateTimeOffset.UtcNow, TimeSpan.FromMinutes(2));

#pragma warning restore CS0618 // Type or member is obsolete

}

}
8 changes: 8 additions & 0 deletions tests/Tests/EventHub/PartitionReceiverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@ public async Task Last_Enqueued_Event_Properties_Should_Be_Refreshed()

var properties1 = receiver.ReadLastEnqueuedEventProperties();
properties1.SequenceNumber.Should().Be(0);

#pragma warning disable CS0618 // Type or member is obsolete

properties1.Offset.Should().Be(0);
properties1.OffsetString.Should().Be("0");


// Send second event
Expand All @@ -246,6 +250,7 @@ public async Task Last_Enqueued_Event_Properties_Should_Be_Refreshed()
var properties2 = receiver.ReadLastEnqueuedEventProperties();
properties2.SequenceNumber.Should().Be(0);
properties2.Offset.Should().Be(0);
properties2.OffsetString.Should().Be("0");

_ = await receiver.ReceiveBatchAsync(100, TimeSpan.Zero);

Expand All @@ -255,6 +260,9 @@ public async Task Last_Enqueued_Event_Properties_Should_Be_Refreshed()

properties3.SequenceNumber.Should().Be(1);
properties3.Offset.Should().Be(37);
properties3.OffsetString.Should().Be("37");

#pragma warning restore CS0618 // Type or member is obsolete

}

Expand Down
4 changes: 2 additions & 2 deletions tests/Tests/Storage/Blobs/BlobClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void OpenWrite_And_Dispose_Immediately_Should_Create_Empty_Blob(BlobClien
// Intentionally empty
}

blobClient.DownloadContent().Value.Content.ToString().Should().BeEmpty();
blobClient.DownloadContent().Value.Content.ToArray().Should().BeEmpty();

ShouldHaveBlocks(containerClient.GetBlockBlobClient(blobName), commited: 0, uncommited: 0);

Expand All @@ -185,7 +185,7 @@ public void OpenWrite_And_Without_Dispose_Should_Create_Empty_Blob(BlobClientTyp

using var stream = OpenWrite(blobClient, true);

blobClient.DownloadContent().Value.Content.ToString().Should().BeEmpty();
blobClient.DownloadContent().Value.Content.ToArray().Should().BeEmpty();

ShouldHaveBlocks(containerClient.GetBlockBlobClient(blobName), commited: 0, uncommited: 0);
}
Expand Down
Loading