Skip to content

Commit ff80143

Browse files
committed
Update Event Hubs offset handling to use string representation
1 parent edd3014 commit ff80143

File tree

8 files changed

+22
-22
lines changed

8 files changed

+22
-22
lines changed

Directory.Packages.props

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
<PackageVersion Include="Azure.Core" Version="1.39.0" />
44
<PackageVersion Include="Azure.Data.Tables" Version="12.8.3" />
55
<PackageVersion Include="Azure.Identity" Version="1.11.4" />
6-
<PackageVersion Include="Azure.Messaging.EventHubs" Version="5.11.3" />
6+
<PackageVersion Include="Azure.Messaging.EventHubs" Version="5.12.2" />
77
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
88
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.0.1" />
99
<PackageVersion Include="Azure.ResourceManager.Storage" Version="1.3.0" />
10-
<PackageVersion Include="Azure.ResourceManager.KeyVault" Version="1.3.0" />
10+
<PackageVersion Include="Azure.ResourceManager.KeyVault" Version="1.3.0" />
1111
<PackageVersion Include="Azure.ResourceManager.EventHubs" Version="1.1.0" />
1212
<PackageVersion Include="Azure.Storage.Blobs" Version="12.20.0" />
1313
<PackageVersion Include="Azure.Security.KeyVault.Secrets" Version="4.6.0" />

src/Spotflow.InMemory.Azure.EventHubs/InMemoryPartitionReceiver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ private async Task<IEnumerable<EventData>> ReceiveBatchCoreAsync(int maximumEven
220220
{
221221
_lastEnqueuedEventProperties = new(
222222
partitionProperties.LastEnqueuedSequenceNumber,
223-
partitionProperties.LastEnqueuedOffset,
223+
partitionProperties.LastEnqueuedOffsetString,
224224
partitionProperties.LastEnqueuedTime,
225225
_timeProvider.GetUtcNow());
226226
}

src/Spotflow.InMemory.Azure.EventHubs/Internals/InMemoryPartition.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public PartitionProperties GetProperties()
7878
isEmpty: currentSegment.Count is 0,
7979
beginningSequenceNumber: beginningSequenceNumber,
8080
lastSequenceNumber: _lastSequenceNumber,
81-
lastOffset: _lastOffset,
81+
lastOffsetString: _lastOffset.ToString(),
8282
lastEnqueuedTime: _lastEnqueuedTime);
8383
}
8484

@@ -120,7 +120,7 @@ public void SendEvent(EventData eventData, string? partitionKey)
120120
systemProperties: eventDataSystemPropertiesCopy,
121121
partitionKey: partitionKey,
122122
sequenceNumber: _lastSequenceNumber,
123-
offset: _lastOffset,
123+
offsetString: _lastOffset.ToString(),
124124
enqueuedTime: _lastEnqueuedTime
125125
);
126126

src/Spotflow.InMemory.Azure.EventHubs/Internals/StartingPosition.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ public static InMemoryEventPosition FromEventPosition(EventPosition position)
4545

4646
var isInclusive = ReflectionUtils.ReadInternalValueProperty<bool>(position, "IsInclusive");
4747

48-
var offset = ReflectionUtils.ReadInternalReferenceProperty<object>(position, "Offset");
48+
var offsetString = ReflectionUtils.ReadInternalReferenceProperty<object>(position, "OffsetString");
4949

50-
if (offset is not null)
50+
if (offsetString is not null)
5151
{
5252
throw new NotSupportedException("EventPosition with offset is not supported.");
5353
}

tests/Tests/EventHub/EventHubConsumerClientTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,17 @@ public async Task GetPartitionProperties_Should_Return_Correct_Offset()
5555

5656
var propertiesBeforeSend = await consumerClient.GetPartitionPropertiesAsync("0");
5757

58-
propertiesBeforeSend.LastEnqueuedOffset.Should().Be(-1);
58+
propertiesBeforeSend.LastEnqueuedOffsetString.Should().Be("-1");
5959

6060
await producerClient.SendAsync([new EventData()], new SendEventOptions { PartitionId = "0" });
6161

6262
var propertiesAfterSend1 = await consumerClient.GetPartitionPropertiesAsync("0");
63-
propertiesAfterSend1.LastEnqueuedOffset.Should().Be(0);
63+
propertiesAfterSend1.LastEnqueuedOffsetString.Should().Be("0");
6464

6565
await producerClient.SendAsync([new EventData()], new SendEventOptions { PartitionId = "0" });
6666

6767
var propertiesAfterSend2 = await consumerClient.GetPartitionPropertiesAsync("0");
68-
propertiesAfterSend2.LastEnqueuedOffset.Should().Be(26);
68+
propertiesAfterSend2.LastEnqueuedOffsetString.Should().Be("26");
6969

7070
}
7171
}

tests/Tests/EventHub/EventHubProducerClientTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,15 @@ public async Task Offset_Should_Start_At_Zero_And_Increase_With_Each_Sent_Event(
6565

6666
await producer.SendAsync([emptyEvent]);
6767
var emptyEventBatch = await consumer.ReceiveBatchAsync(1, TimeSpan.Zero);
68-
emptyEventBatch.Single().Offset.Should().Be(0);
68+
emptyEventBatch.Single().OffsetString.Should().Be("0");
6969

7070
await producer.SendAsync([emptyEvent], new SendEventOptions { PartitionKey = "test-pk" });
7171
var emptyEventWithPartitionKey = await consumer.ReceiveBatchAsync(1, TimeSpan.Zero);
72-
emptyEventWithPartitionKey.Single().Offset.Should().Be(26);
72+
emptyEventWithPartitionKey.Single().OffsetString.Should().Be("26");
7373

7474
await producer.SendAsync([eventWithBody]);
7575
var eventWithBodyBatch = await consumer.ReceiveBatchAsync(1, TimeSpan.Zero);
76-
eventWithBodyBatch.Single().Offset.Should().Be(59);
76+
eventWithBodyBatch.Single().OffsetString.Should().Be("59");
7777
}
7878

7979
[TestMethod]

tests/Tests/EventHub/EventHubTests.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,16 @@ public async Task Custom_Partition_Initial_State_Should_Be_Used_For_All_Partitio
6262
batch1.Should().HaveCount(100);
6363

6464
batch0.ElementAt(0).SequenceNumber.Should().Be(43);
65-
batch0.ElementAt(0).Offset.Should().Be(43);
65+
batch0.ElementAt(0).OffsetString.Should().Be("43");
6666
batch0.ElementAt(0).EnqueuedTime.Should().Be(timeProvider.GetUtcNow());
6767
batch0.ElementAt(1).SequenceNumber.Should().Be(44);
68-
batch0.ElementAt(1).Offset.Should().Be(72);
68+
batch0.ElementAt(1).OffsetString.Should().Be("72");
6969

7070
batch1.ElementAt(0).SequenceNumber.Should().Be(43);
71-
batch1.ElementAt(0).Offset.Should().Be(43);
71+
batch1.ElementAt(0).OffsetString.Should().Be("43");
7272
batch1.ElementAt(0).EnqueuedTime.Should().Be(timeProvider.GetUtcNow());
7373
batch1.ElementAt(1).SequenceNumber.Should().Be(44);
74-
batch1.ElementAt(1).Offset.Should().Be(72);
74+
batch1.ElementAt(1).OffsetString.Should().Be("72");
7575

7676
}
7777

@@ -100,11 +100,11 @@ public async Task Default_Partition_Initial_State_Should_Be_Used_For_All_Partiti
100100

101101

102102
batch0.Single().SequenceNumber.Should().Be(0);
103-
batch0.Single().Offset.Should().Be(0);
103+
batch0.Single().OffsetString.Should().Be("0");
104104
batch0.Single().EnqueuedTime.Should().BeCloseTo(DateTimeOffset.UtcNow, TimeSpan.FromMinutes(2));
105105

106106
batch1.Single().SequenceNumber.Should().Be(0);
107-
batch1.Single().Offset.Should().Be(0);
107+
batch1.Single().OffsetString.Should().Be("0");
108108
batch1.Single().EnqueuedTime.Should().BeCloseTo(DateTimeOffset.UtcNow, TimeSpan.FromMinutes(2));
109109

110110
}

tests/Tests/EventHub/PartitionReceiverTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public async Task Last_Enqueued_Event_Properties_Should_Be_Refreshed()
235235

236236
var properties1 = receiver.ReadLastEnqueuedEventProperties();
237237
properties1.SequenceNumber.Should().Be(0);
238-
properties1.Offset.Should().Be(0);
238+
properties1.OffsetString.Should().Be("0");
239239

240240

241241
// Send second event
@@ -245,7 +245,7 @@ public async Task Last_Enqueued_Event_Properties_Should_Be_Refreshed()
245245

246246
var properties2 = receiver.ReadLastEnqueuedEventProperties();
247247
properties2.SequenceNumber.Should().Be(0);
248-
properties2.Offset.Should().Be(0);
248+
properties2.OffsetString.Should().Be("0");
249249

250250
_ = await receiver.ReceiveBatchAsync(100, TimeSpan.Zero);
251251

@@ -254,7 +254,7 @@ public async Task Last_Enqueued_Event_Properties_Should_Be_Refreshed()
254254
var properties3 = receiver.ReadLastEnqueuedEventProperties();
255255

256256
properties3.SequenceNumber.Should().Be(1);
257-
properties3.Offset.Should().Be(37);
257+
properties3.OffsetString.Should().Be("37");
258258

259259
}
260260

0 commit comments

Comments
 (0)