Skip to content

Commit b158d04

Browse files
SeanTAllenjemc
authored andcommitted
Allow TCPConnectionNotify to cause connection to yield while receiving (ponylang#1355)
This comes from direct experience using the existing `TCPConnection` functionality at Sendence. We are heavy users of `expect` on `TCPConnection` in order to support framed protocols. Our `received` methods on notifiers are generally of the following form: ```pony fun ref received(conn: TCPConnection ref, data: Array[U8] iso) => if _header then // convert the 4 byte header into a value for expect, aka payload length let expect = Bytes.to_u32(data(0), data(1), data(2), data(3)).usize() conn.expect(expect) _header = false else // do something with payload ... // reset expect for next 4 byte header conn.expect(4) _header = true end ``` This short of usage is why `expect` was initially added to `TCPConnection`. Upon usage, we found a serious drawback with this approach. `TCPConnection` will read up to 4k of data on a single behavior run and if there is still data available, it will then send itself a `_read_again` message to trigger more reading of additional data. It does this so that it doesn't hogged the scheduler while reading from the socket. This can work reasonably well in some scenarios but not others. In the framed protocol example above, if the message payloads are small then 4k of data can result in a lot of messages being sent from our `received` method to other actors in the application. In an application that is continously receiving data, this results in a very bursty scheduling experience. After consulting with Sylvan, we changed `received` and `TCPConnection` to allow `received` to return a Boolean to indicate whether `TCPConnection` should continue sending more data on this behavior run. We've found that for some workloads, we are able to get equal performance while greatly lowering latency by having `TCPConnection` call `_read_again` earlier than it otherwise would. Closes RFC ponylang#19 Closes ponylang#1343
1 parent 79f8e88 commit b158d04

10 files changed

+38
-17
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ All notable changes to the Pony compiler and standard library will be documented
1111
### Added
1212

1313
- TCP read and write backpressure hooks in `TCPConnection` (issue #1311)
14+
- Allow TCP notifiers to cause connections to yield while receiving (issue #1343)
1415

1516
### Changed
1617

examples/echo/echo.pony

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ class Server is TCPConnectionNotify
4141
fun ref accepted(conn: TCPConnection ref) =>
4242
_out.print("connection accepted")
4343

44-
fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
44+
fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
4545
_out.print("data received, looping it back")
4646
conn.write("server says: ")
4747
conn.write(consume data)
48+
true
4849

4950
fun ref closed(conn: TCPConnection ref) =>
5051
_out.print("server closed")

examples/net/client.pony

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ class ClientSide is TCPConnectionNotify
2121
fun ref connect_failed(conn: TCPConnection ref) =>
2222
_env.out.print("connect failed")
2323

24-
fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
24+
fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
2525
_env.out.print(consume data)
26+
true
2627

2728
fun ref closed(conn: TCPConnection ref) =>
2829
_env.out.print("client closed")

examples/net/server.pony

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ class ServerSide is TCPConnectionNotify
1313
conn.write("server says hi")
1414
end
1515

16-
fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
16+
fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
1717
_env.out.print(consume data)
1818
conn.dispose()
19+
true
1920

2021
fun ref closed(conn: TCPConnection ref) =>
2122
_env.out.print("server closed")

packages/net/_test.pony

+12-6
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ class _TestTCPExpectNotify is TCPConnectionNotify
225225
_h.complete_action("expect received")
226226
qty
227227

228-
fun ref received(conn: TCPConnection ref, data: Array[U8] val) =>
228+
fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
229229
if _frame then
230230
_frame = false
231231
_expect = 0
@@ -250,6 +250,7 @@ class _TestTCPExpectNotify is TCPConnectionNotify
250250
end
251251

252252
conn.expect(_expect)
253+
true
253254

254255
fun ref _send(conn: TCPConnection ref, data: String) =>
255256
let len = data.size()
@@ -301,7 +302,7 @@ class _TestTCPWritevNotifyServer is TCPConnectionNotify
301302
new iso create(h: TestHelper) =>
302303
_h = h
303304

304-
fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
305+
fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
305306
_buffer.append(consume data)
306307

307308
let expected = "hello, hello (from client)"
@@ -311,6 +312,7 @@ class _TestTCPWritevNotifyServer is TCPConnectionNotify
311312
_h.assert_eq[String](expected, consume buffer)
312313
_h.complete_action("server receive")
313314
end
315+
true
314316

315317
class iso _TestTCPMute is UnitTest
316318
"""
@@ -360,8 +362,9 @@ class _TestTCPMuteReceiveNotify is TCPConnectionNotify
360362
_h.complete_action("receiver asks for data")
361363
_h.dispose_when_done(conn)
362364

363-
fun ref received(conn: TCPConnection ref, data: Array[U8] val) =>
365+
fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
364366
_h.complete(false)
367+
true
365368

366369
class _TestTCPMuteSendNotify is TCPConnectionNotify
367370
"""
@@ -383,9 +386,10 @@ class _TestTCPMuteSendNotify is TCPConnectionNotify
383386
fun ref connect_failed(conn: TCPConnection ref) =>
384387
_h.fail_action("sender connected")
385388

386-
fun ref received(conn: TCPConnection ref, data: Array[U8] val) =>
389+
fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
387390
conn.write("it's sad that you won't ever read this")
388391
_h.complete_action("sender sent data")
392+
true
389393

390394
class iso _TestTCPUnmute is UnitTest
391395
"""
@@ -434,8 +438,9 @@ class _TestTCPUnmuteReceiveNotify is TCPConnectionNotify
434438
conn.unmute()
435439
_h.complete_action("receiver unmuted")
436440

437-
fun ref received(conn: TCPConnection ref, data: Array[U8] val) =>
441+
fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
438442
_h.complete(true)
443+
true
439444

440445
class iso _TestTCPThrottle is UnitTest
441446
"""
@@ -505,9 +510,10 @@ class _TestTCPThrottleSendNotify is TCPConnectionNotify
505510
fun ref connect_failed(conn: TCPConnection ref) =>
506511
_h.fail_action("sender connected")
507512

508-
fun ref received(conn: TCPConnection ref, data: Array[U8] val) =>
513+
fun ref received(conn: TCPConnection ref, data: Array[U8] val): Bool =>
509514
conn.write("it's sad that you won't ever read this")
510515
_h.complete_action("sender sent data")
516+
true
511517

512518
fun ref throttled(conn: TCPConnection ref) =>
513519
_throttled_yet = true

packages/net/http/_request_builder.pony

+2-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class _RequestBuilder is TCPConnectionNotify
3434

3535
_server = _ServerConnection(_handler, _logger, conn, host)
3636

37-
fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
37+
fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
3838
"""
3939
Assemble chunks of data into a request. When we have a whole request,
4040
dispatch it.
@@ -56,3 +56,4 @@ class _RequestBuilder is TCPConnectionNotify
5656
break
5757
end
5858
end
59+
true

packages/net/http/_response_builder.pony

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,15 @@ class _ResponseBuilder is TCPConnectionNotify
3333
"""
3434
_client._auth_failed(conn)
3535

36-
fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
36+
fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
3737
"""
3838
Assemble chunks of data into a response. When we have a whole response,
3939
give it to the client and start a new one.
4040
"""
4141
// TODO: inactivity timer
4242
_buffer.append(consume data)
4343
_dispatch(conn)
44+
true
4445

4546
fun ref closed(conn: TCPConnection ref) =>
4647
"""

packages/net/ssl/ssl_connection.pony

+2-1
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,14 @@ class SSLConnection is TCPConnectionNotify
6969

7070
recover val Array[ByteSeq] end
7171

72-
fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
72+
fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
7373
"""
7474
Pass the data to the SSL session and check for both new application data
7575
and new destination data.
7676
"""
7777
_ssl.receive(consume data)
7878
_poll(conn)
79+
true
7980

8081
fun ref expect(conn: TCPConnection ref, qty: USize): USize =>
8182
"""

packages/net/tcp_connection.pony

+7-2
Original file line numberDiff line numberDiff line change
@@ -616,8 +616,13 @@ actor TCPConnection
616616
data.truncate(_read_len)
617617
_read_len = 0
618618

619-
_notify.received(this, consume data)
620-
_read_buf_size()
619+
if not _notify.received(this, consume data) then
620+
_read_buf_size()
621+
_read_again()
622+
return
623+
else
624+
_read_buf_size()
625+
end
621626
end
622627

623628
sum = sum + len

packages/net/tcp_connection_notify.pony

+6-3
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,14 @@ interface TCPConnectionNotify
5858
"""
5959
data
6060

61-
fun ref received(conn: TCPConnection ref, data: Array[U8] iso) =>
61+
fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
6262
"""
63-
Called when new data is received on the connection.
63+
Called when new data is received on the connection. Return true if you
64+
want to continue receiving messages without yielding until you read
65+
max_size on the TCPConnection. Return false to cause the TCPConnection
66+
to yield now.
6467
"""
65-
None
68+
true
6669

6770
fun ref expect(conn: TCPConnection ref, qty: USize): USize =>
6871
"""

0 commit comments

Comments
 (0)