Skip to content

Commit 546eaa2

Browse files
authored
Add unprocessedBytes property on NIOSingleStepByteToMessageProcessor (#2419)
1 parent 014812a commit 546eaa2

File tree

2 files changed

+45
-13
lines changed

2 files changed

+45
-13
lines changed

Sources/NIOCore/SingleStepByteToMessageDecoder.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,12 @@ extension NIOSingleStepByteToMessageProcessor: Sendable {}
281281

282282
// MARK: NIOSingleStepByteToMessageProcessor Public API
283283
extension NIOSingleStepByteToMessageProcessor {
284+
/// The number of bytes that are currently not processed by the ``process(buffer:_:)`` method. Having unprocessed
285+
/// bytes may result from receiving only partial messages or from receiving multiple messages at once.
286+
public var unprocessedBytes: Int {
287+
self._buffer?.readableBytes ?? 0
288+
}
289+
284290
/// Feed data into the `NIOSingleStepByteToMessageProcessor`
285291
///
286292
/// - parameters:

Tests/NIOCoreTests/SingleStepByteToMessageDecoderTest.swift

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,6 @@ public final class NIOSingleStepByteToMessageDecoderTest: XCTestCase {
3030
}
3131
}
3232

33-
private final class ForeverDecoder: NIOSingleStepByteToMessageDecoder {
34-
typealias InboundOut = Never
35-
36-
func decode(buffer: inout ByteBuffer) throws -> InboundOut? {
37-
return nil
38-
}
39-
40-
func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
41-
XCTAssertTrue(seenEOF)
42-
return try self.decode(buffer: &buffer)
43-
}
44-
}
45-
4633
private final class LargeChunkDecoder: NIOSingleStepByteToMessageDecoder {
4734
typealias InboundOut = ByteBuffer
4835

@@ -227,14 +214,19 @@ public final class NIOSingleStepByteToMessageDecoderTest: XCTestCase {
227214
var buffer = allocator.buffer(capacity: 16)
228215
buffer.clear()
229216
buffer.writeStaticString("1")
217+
XCTAssertEqual(processor.unprocessedBytes, 0)
230218
XCTAssertNoThrow(try processor.process(buffer: buffer, messageReceiver.receiveMessage))
219+
XCTAssertEqual(processor.unprocessedBytes, 1)
231220
buffer.clear()
232221
buffer.writeStaticString("23")
233222
XCTAssertNoThrow(try processor.process(buffer: buffer, messageReceiver.receiveMessage))
223+
XCTAssertEqual(processor.unprocessedBytes, 1)
234224
buffer.clear()
235225
buffer.writeStaticString("4567890x")
236226
XCTAssertNoThrow(try processor.process(buffer: buffer, messageReceiver.receiveMessage))
227+
XCTAssertEqual(processor.unprocessedBytes, 1)
237228
XCTAssertNoThrow(try processor.finishProcessing(seenEOF: false, messageReceiver.receiveMessage))
229+
XCTAssertEqual(processor.unprocessedBytes, 1)
238230

239231
XCTAssertEqual("12", messageReceiver.retrieveMessage().map {
240232
String(decoding: $0.readableBytesView, as: Unicode.UTF8.self)
@@ -495,4 +487,38 @@ public final class NIOSingleStepByteToMessageDecoderTest: XCTestCase {
495487
XCTAssertEqual(processor.decoder.reclaimHits, 1)
496488
XCTAssertEqual(processor._buffer!.readableBytes, 1)
497489
}
490+
491+
func testUnprocessedBytes() {
492+
let allocator = ByteBufferAllocator()
493+
let processor = NIOSingleStepByteToMessageProcessor(LargeChunkDecoder()) // reads slices of 512 bytes
494+
let messageReceiver: MessageReceiver<ByteBuffer> = MessageReceiver()
495+
496+
// We're going to send in 128 bytes. This will be held.
497+
var buffer = allocator.buffer(capacity: 128)
498+
buffer.writeBytes(Array(repeating: 0x04, count: 128))
499+
XCTAssertNoThrow(try processor.process(buffer: buffer, messageReceiver.receiveMessage))
500+
XCTAssertEqual(0, messageReceiver.count)
501+
XCTAssertEqual(processor.unprocessedBytes, 128)
502+
503+
// Adding 513 bytes, will cause a message to be returned and an extra byte to be saved.
504+
buffer.clear()
505+
buffer.writeBytes(Array(repeating: 0x04, count: 513))
506+
XCTAssertNoThrow(try processor.process(buffer: buffer, messageReceiver.receiveMessage))
507+
XCTAssertEqual(1, messageReceiver.count)
508+
XCTAssertEqual(processor.unprocessedBytes, 129)
509+
510+
// Adding 255 bytes, will cause 255 more bytes to be held.
511+
buffer.clear()
512+
buffer.writeBytes(Array(repeating: 0x04, count: 255))
513+
XCTAssertNoThrow(try processor.process(buffer: buffer, messageReceiver.receiveMessage))
514+
XCTAssertEqual(1, messageReceiver.count)
515+
XCTAssertEqual(processor.unprocessedBytes, 384)
516+
517+
// Adding 128 bytes, will cause another message to be returned and the buffer to be empty.
518+
buffer.clear()
519+
buffer.writeBytes(Array(repeating: 0x04, count: 128))
520+
XCTAssertNoThrow(try processor.process(buffer: buffer, messageReceiver.receiveMessage))
521+
XCTAssertEqual(2, messageReceiver.count)
522+
XCTAssertEqual(processor.unprocessedBytes, 0)
523+
}
498524
}

0 commit comments

Comments
 (0)