Skip to content

Commit cb6f27c

Browse files
authored
Update Event Hubs offset handling to use string representation because of changes in Azure.Messaging.EventHubs 5.12.0+ (#55)
1 parent edd3014 commit cb6f27c

File tree

12 files changed

+86
-22
lines changed

12 files changed

+86
-22
lines changed

.editorconfig

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,3 @@ resharper_inconsistent_naming_highlighting = none
249249

250250
# Solution-specific settings/overrides ------------------------------------------------------
251251

252-
## TBD
253-
254-

Directory.Packages.props

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
<PackageVersion Include="Azure.Core" Version="1.39.0" />
44
<PackageVersion Include="Azure.Data.Tables" Version="12.8.3" />
55
<PackageVersion Include="Azure.Identity" Version="1.11.4" />
6-
<PackageVersion Include="Azure.Messaging.EventHubs" Version="5.11.3" />
6+
<PackageVersion Include="Azure.Messaging.EventHubs" Version="5.12.2" />
77
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
88
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.0.1" />
99
<PackageVersion Include="Azure.ResourceManager.Storage" Version="1.3.0" />
10-
<PackageVersion Include="Azure.ResourceManager.KeyVault" Version="1.3.0" />
10+
<PackageVersion Include="Azure.ResourceManager.KeyVault" Version="1.3.0" />
1111
<PackageVersion Include="Azure.ResourceManager.EventHubs" Version="1.1.0" />
1212
<PackageVersion Include="Azure.Storage.Blobs" Version="12.20.0" />
1313
<PackageVersion Include="Azure.Security.KeyVault.Secrets" Version="4.6.0" />

Spotflow.InMemory.Azure.sln

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ VisualStudioVersion = 17.8.34525.116
55
MinimumVisualStudioVersion = 10.0.40219.1
66
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{2C6BE026-65BE-4D7A-8E4D-F8CF8E2C300E}"
77
ProjectSection(SolutionItems) = preProject
8+
.editorconfig = .editorconfig
89
Directory.Build.props = Directory.Build.props
910
Directory.Packages.props = Directory.Packages.props
1011
LICENSE.md = LICENSE.md

src/Spotflow.InMemory.Azure.EventHubs/InMemoryPartitionReceiver.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,11 +218,13 @@ private async Task<IEnumerable<EventData>> ReceiveBatchCoreAsync(int maximumEven
218218
}
219219
else
220220
{
221+
#pragma warning disable CS0618 // Type or member is obsolete
221222
_lastEnqueuedEventProperties = new(
222223
partitionProperties.LastEnqueuedSequenceNumber,
223-
partitionProperties.LastEnqueuedOffset,
224+
lastOffset: partitionProperties.LastEnqueuedOffset,
224225
partitionProperties.LastEnqueuedTime,
225226
_timeProvider.GetUtcNow());
227+
#pragma warning restore CS0618 // Type or member is obsolete
226228
}
227229
}
228230
}

src/Spotflow.InMemory.Azure.EventHubs/Internals/InMemoryPartition.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ public PartitionProperties GetProperties()
7272

7373
var beginningSequenceNumber = currentSegment.Count > 0 ? currentSegment[0].SequenceNumber : lastSequenceNumber;
7474

75+
#pragma warning disable CS0618 // Type or member is obsolete
76+
7577
return EventHubsModelFactory.PartitionProperties(
7678
eventHubName: name,
7779
partitionId: PartitionId,
@@ -80,6 +82,8 @@ public PartitionProperties GetProperties()
8082
lastSequenceNumber: _lastSequenceNumber,
8183
lastOffset: _lastOffset,
8284
lastEnqueuedTime: _lastEnqueuedTime);
85+
86+
#pragma warning restore CS0618 // Type or member is obsolete
8387
}
8488

8589
public void SendEvent(EventData eventData, string? partitionKey)
@@ -114,6 +118,7 @@ public void SendEvent(EventData eventData, string? partitionKey)
114118
var eventDataPropertiesCopy = new Dictionary<string, object>(eventData.Properties);
115119
var eventDataSystemPropertiesCopy = new Dictionary<string, object>(eventData.SystemProperties);
116120

121+
#pragma warning disable CS0618 // Type or member is obsolete
117122
var eventWithSystemProperties = EventHubsModelFactory.EventData(
118123
eventBody: new(eventBodyCopy),
119124
properties: eventDataPropertiesCopy,
@@ -123,6 +128,7 @@ public void SendEvent(EventData eventData, string? partitionKey)
123128
offset: _lastOffset,
124129
enqueuedTime: _lastEnqueuedTime
125130
);
131+
#pragma warning restore CS0618 // Type or member is obsolete
126132

127133
eventWithSystemProperties.MessageId = eventData.MessageId;
128134
eventWithSystemProperties.CorrelationId = eventData.CorrelationId;

src/Spotflow.InMemory.Azure.EventHubs/Internals/StartingPosition.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,15 @@ public static InMemoryEventPosition FromEventPosition(EventPosition position)
4545

4646
var isInclusive = ReflectionUtils.ReadInternalValueProperty<bool>(position, "IsInclusive");
4747

48-
var offset = ReflectionUtils.ReadInternalReferenceProperty<object>(position, "Offset");
48+
if (ReflectionUtils.TryReadOptionalInternalReferenceProperty<object>(position, "OffsetString", out var offsetString) &&
49+
offsetString is not null)
50+
{
51+
throw new NotSupportedException("EventPosition with offset is not supported.");
52+
}
4953

50-
if (offset is not null)
54+
// Offset was renamed to OffsetString after EventHub version 5.12.0, so we check for the old property name as well.
55+
if (ReflectionUtils.TryReadOptionalInternalReferenceProperty<object>(position, "Offset", out var offset) &&
56+
offset is not null)
5157
{
5258
throw new NotSupportedException("EventPosition with offset is not supported.");
5359
}

src/Spotflow.InMemory.Azure/Internals/ReflectionUtils.cs

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,45 @@ namespace Spotflow.InMemory.Azure.Internals;
44

55
internal static class ReflectionUtils
66
{
7-
87
public static TOut? ReadInternalReferenceProperty<TOut>(object obj, string propertyName) where TOut : class
98
{
10-
var value = ReadInternalProperty(obj, propertyName);
9+
if (!TryReadOptionalInternalReferenceProperty(obj, propertyName, out TOut? value))
10+
{
11+
throw new InvalidOperationException($"Property '{propertyName}' not found on type {obj.GetType().Name}");
12+
}
13+
14+
return value;
15+
}
16+
17+
public static bool TryReadOptionalInternalReferenceProperty<TOut>(object obj, string propertyName, out TOut? value) where TOut : class
18+
{
19+
value = null;
1120

12-
if (value is null)
21+
if (!TryReadOptionalInternalProperty(obj, propertyName, out var rawValue))
1322
{
14-
return null;
23+
return false;
1524
}
1625

17-
if (!value.GetType().IsAssignableTo(typeof(TOut)))
26+
if (rawValue is null)
1827
{
19-
throw new InvalidOperationException($"Property '{propertyName}' is not assignable to {typeof(TOut).Name}");
28+
return true; // null is a valid value for reference types
2029
}
2130

22-
return (TOut?) value;
31+
if (!rawValue.GetType().IsAssignableTo(typeof(TOut)))
32+
{
33+
throw new InvalidOperationException($"Property '{propertyName}' is not assignable to {typeof(TOut).Name}");
34+
}
2335

36+
value = (TOut?) rawValue;
37+
return true;
2438
}
2539

2640
public static TOut ReadInternalValueProperty<TOut>(object obj, string propertyName) where TOut : struct
2741
{
28-
var value = ReadInternalProperty(obj, propertyName);
42+
if (!TryReadOptionalInternalProperty(obj, propertyName, out var value))
43+
{
44+
throw new InvalidOperationException($"Property '{propertyName}' not found on type {obj.GetType().Name}");
45+
}
2946

3047
if (value is not TOut)
3148
{
@@ -35,7 +52,7 @@ public static TOut ReadInternalValueProperty<TOut>(object obj, string propertyNa
3552
return (TOut) value;
3653
}
3754

38-
private static object? ReadInternalProperty(object obj, string propertyName)
55+
private static bool TryReadOptionalInternalProperty(object obj, string propertyName, out object? value)
3956
{
4057
ArgumentNullException.ThrowIfNull(obj);
4158
ArgumentException.ThrowIfNullOrWhiteSpace(propertyName);
@@ -46,10 +63,12 @@ public static TOut ReadInternalValueProperty<TOut>(object obj, string propertyNa
4663

4764
if (property is null)
4865
{
49-
throw new InvalidOperationException($"Property '{propertyName}' not found on type {obj.GetType().Name}");
66+
value = null;
67+
return false;
5068
}
5169

52-
return property.GetValue(obj);
70+
value = property.GetValue(obj);
71+
return true;
5372
}
5473

5574
}

tests/Tests/EventHub/EventHubConsumerClientTests.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,23 @@ public async Task GetPartitionProperties_Should_Return_Correct_Offset()
5555

5656
var propertiesBeforeSend = await consumerClient.GetPartitionPropertiesAsync("0");
5757

58+
#pragma warning disable CS0618 // Type or member is obsolete
5859
propertiesBeforeSend.LastEnqueuedOffset.Should().Be(-1);
60+
propertiesBeforeSend.LastEnqueuedOffsetString.Should().Be("-1");
5961

6062
await producerClient.SendAsync([new EventData()], new SendEventOptions { PartitionId = "0" });
6163

6264
var propertiesAfterSend1 = await consumerClient.GetPartitionPropertiesAsync("0");
6365
propertiesAfterSend1.LastEnqueuedOffset.Should().Be(0);
66+
propertiesAfterSend1.LastEnqueuedOffsetString.Should().Be("0");
6467

6568
await producerClient.SendAsync([new EventData()], new SendEventOptions { PartitionId = "0" });
6669

6770
var propertiesAfterSend2 = await consumerClient.GetPartitionPropertiesAsync("0");
6871
propertiesAfterSend2.LastEnqueuedOffset.Should().Be(26);
72+
propertiesAfterSend2.LastEnqueuedOffsetString.Should().Be("26");
73+
74+
#pragma warning restore CS0618 // Type or member is obsolete
6975

7076
}
7177
}

tests/Tests/EventHub/EventHubProducerClientTests.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,23 @@ public async Task Offset_Should_Start_At_Zero_And_Increase_With_Each_Sent_Event(
6565

6666
await producer.SendAsync([emptyEvent]);
6767
var emptyEventBatch = await consumer.ReceiveBatchAsync(1, TimeSpan.Zero);
68+
69+
#pragma warning disable CS0618 // Type or member is obsolete
70+
6871
emptyEventBatch.Single().Offset.Should().Be(0);
72+
emptyEventBatch.Single().OffsetString.Should().Be("0");
6973

7074
await producer.SendAsync([emptyEvent], new SendEventOptions { PartitionKey = "test-pk" });
7175
var emptyEventWithPartitionKey = await consumer.ReceiveBatchAsync(1, TimeSpan.Zero);
7276
emptyEventWithPartitionKey.Single().Offset.Should().Be(26);
77+
emptyEventWithPartitionKey.Single().OffsetString.Should().Be("26");
7378

7479
await producer.SendAsync([eventWithBody]);
7580
var eventWithBodyBatch = await consumer.ReceiveBatchAsync(1, TimeSpan.Zero);
7681
eventWithBodyBatch.Single().Offset.Should().Be(59);
82+
eventWithBodyBatch.Single().OffsetString.Should().Be("59");
83+
84+
#pragma warning restore CS0618 // Type or member is obsolete
7785
}
7886

7987
[TestMethod]

tests/Tests/EventHub/EventHubTests.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,25 @@ public async Task Custom_Partition_Initial_State_Should_Be_Used_For_All_Partitio
6161
batch0.Should().HaveCount(100);
6262
batch1.Should().HaveCount(100);
6363

64+
#pragma warning disable CS0618 // Type or member is obsolete
65+
6466
batch0.ElementAt(0).SequenceNumber.Should().Be(43);
6567
batch0.ElementAt(0).Offset.Should().Be(43);
68+
batch0.ElementAt(0).OffsetString.Should().Be("43");
6669
batch0.ElementAt(0).EnqueuedTime.Should().Be(timeProvider.GetUtcNow());
6770
batch0.ElementAt(1).SequenceNumber.Should().Be(44);
6871
batch0.ElementAt(1).Offset.Should().Be(72);
72+
batch0.ElementAt(1).OffsetString.Should().Be("72");
6973

7074
batch1.ElementAt(0).SequenceNumber.Should().Be(43);
7175
batch1.ElementAt(0).Offset.Should().Be(43);
76+
batch1.ElementAt(0).OffsetString.Should().Be("43");
7277
batch1.ElementAt(0).EnqueuedTime.Should().Be(timeProvider.GetUtcNow());
7378
batch1.ElementAt(1).SequenceNumber.Should().Be(44);
7479
batch1.ElementAt(1).Offset.Should().Be(72);
80+
batch1.ElementAt(1).OffsetString.Should().Be("72");
7581

82+
#pragma warning restore CS0618 // Type or member is obsolete
7683
}
7784

7885
[TestMethod]
@@ -98,15 +105,19 @@ public async Task Default_Partition_Initial_State_Should_Be_Used_For_All_Partiti
98105
batch0.Should().HaveCount(1);
99106
batch1.Should().HaveCount(1);
100107

101-
108+
#pragma warning disable CS0618 // Type or member is obsolete
102109
batch0.Single().SequenceNumber.Should().Be(0);
103110
batch0.Single().Offset.Should().Be(0);
111+
batch0.Single().OffsetString.Should().Be("0");
104112
batch0.Single().EnqueuedTime.Should().BeCloseTo(DateTimeOffset.UtcNow, TimeSpan.FromMinutes(2));
105113

106114
batch1.Single().SequenceNumber.Should().Be(0);
107115
batch1.Single().Offset.Should().Be(0);
116+
batch1.Single().OffsetString.Should().Be("0");
108117
batch1.Single().EnqueuedTime.Should().BeCloseTo(DateTimeOffset.UtcNow, TimeSpan.FromMinutes(2));
109118

119+
#pragma warning restore CS0618 // Type or member is obsolete
120+
110121
}
111122

112123
}

0 commit comments

Comments
 (0)