Skip to content

Commit 3695afb

Browse files
mergify[bot]JEnoch
andauthored
Bump Zenoh to v1.3.2 and improve e2e reliability with HeartbeatSporadic (#591) (#594)
* Bump zenoh-c to ffa4bdd, zenoh-cpp to 868fdad and zenoh to 3f62ebc * Apply same config changes than eclipse-zenoh/zenoh#1825 * Apply same config changes than eclipse-zenoh/zenoh#1850 * For RELIABLE+TRANSIENT_LOCAL topics, enable sample_miss_detection and recovery for end-to-end reliability (cherry picked from commit a9ab960) Co-authored-by: Julien Enoch <[email protected]>
1 parent a3bf7e9 commit 3695afb

File tree

5 files changed

+131
-25
lines changed

5 files changed

+131
-25
lines changed

rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,42 @@
257257
// },
258258
// },
259259
// ],
260+
// /// Overwrite QoS options for messages sent and received from/to the network
261+
// /// This allows more fine grained rules (per network card, etc...) but is
262+
// /// less performant than the publication option above.
263+
// network: [
264+
// {
265+
// /// Optional Id, has to be unique.
266+
// id: "lo0_en0_qos_overwrite",
267+
// // Optional list of interfaces, if not specified, will be applied to all interfaces.
268+
// interfaces: [
269+
// "lo0",
270+
// "en0",
271+
// ],
272+
// /// Optional list of link protocols. Transports with at least one of these links will have their qos overwritten.
273+
// /// If absent, the overwrite will be applied to all transports. An empty list is invalid.
274+
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
275+
// /// List of message types to apply to.
276+
// messages: [
277+
// "put", // put publications
278+
// "delete" // delete publications
279+
// "query", // get queries
280+
// "reply", // replies to queries
281+
// ],
282+
// /// Optional list of data flows messages will be processed on ("egress" and/or "ingress").
283+
// /// If absent, the rules will be applied to both flows.
284+
// flows: ["egress", "ingress"],
285+
// key_exprs: ["test/demo"],
286+
// overwrite: {
287+
// /// Optional new priority value, if not specified priority of the messages will stay unchanged.
288+
// priority: "real_time",
289+
// /// Optional new congestion control value, if not specified congestion control of the messages will stay unchanged.
290+
// congestion_control: "block",
291+
// /// Optional new express value, if not specified express flag of the messages will stay unchanged.
292+
// express: true
293+
// },
294+
// },
295+
// ],
260296
// },
261297

262298
// /// The declarations aggregation strategy.
@@ -287,8 +323,11 @@
287323
// /// Optional Id, has to be unique
288324
// "id": "wlan0egress",
289325
// /// Optional list of network interfaces messages will be processed on, the rest will be passed as is.
290-
// /// If absent, the rules will be applied to all interfaces, in case of an empty list it means that they will not be applied to any.
326+
// /// If absent, the rules will be applied to all interfaces. An empty list is invalid.
291327
// interfaces: [ "wlan0" ],
328+
// /// Optional list of link protocols. Transports with at least one of these links will have their messages filtered.
329+
// /// If absent, the rules will be applied to all transports. An empty list is invalid.
330+
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
292331
// /// Optional list of data flows messages will be processed on ("egress" and/or "ingress").
293332
// /// If absent, the rules will be applied to both flows.
294333
// flow: ["ingress", "egress"],
@@ -387,6 +426,12 @@
387426
// "id": "subject3",
388427
// /// An empty subject combination is a wildcard
389428
// },
429+
// {
430+
// "id": "subject4",
431+
// /// link protocols can also be used to identify transports to filter messages on.
432+
// /// If absent, the rules will be applied to all transports. An empty list is invalid.
433+
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
434+
// },
390435
// ],
391436
// /// The policies list associates rules to subjects
392437
// "policies":
@@ -401,7 +446,7 @@
401446
// },
402447
// {
403448
// "rules": ["rule2"],
404-
// "subjects": ["subject3"],
449+
// "subjects": ["subject3", "subject4"],
405450
// },
406451
// ]
407452
//},

rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,42 @@
265265
// },
266266
// },
267267
// ],
268+
// /// Overwrite QoS options for messages sent and received from/to the network
269+
// /// This allows more fine grained rules (per network card, etc...) but is
270+
// /// less performant than the publication option above.
271+
// network: [
272+
// {
273+
// /// Optional Id, has to be unique.
274+
// id: "lo0_en0_qos_overwrite",
275+
// // Optional list of interfaces, if not specified, will be applied to all interfaces.
276+
// interfaces: [
277+
// "lo0",
278+
// "en0",
279+
// ],
280+
// /// Optional list of link protocols. Transports with at least one of these links will have their qos overwritten.
281+
// /// If absent, the overwrite will be applied to all transports. An empty list is invalid.
282+
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
283+
// /// List of message types to apply to.
284+
// messages: [
285+
// "put", // put publications
286+
// "delete" // delete publications
287+
// "query", // get queries
288+
// "reply", // replies to queries
289+
// ],
290+
// /// Optional list of data flows messages will be processed on ("egress" and/or "ingress").
291+
// /// If absent, the rules will be applied to both flows.
292+
// flows: ["egress", "ingress"],
293+
// key_exprs: ["test/demo"],
294+
// overwrite: {
295+
// /// Optional new priority value, if not specified priority of the messages will stay unchanged.
296+
// priority: "real_time",
297+
// /// Optional new congestion control value, if not specified congestion control of the messages will stay unchanged.
298+
// congestion_control: "block",
299+
// /// Optional new express value, if not specified express flag of the messages will stay unchanged.
300+
// express: true
301+
// },
302+
// },
303+
// ],
268304
// },
269305

270306
// /// The declarations aggregation strategy.
@@ -295,8 +331,11 @@
295331
// /// Optional Id, has to be unique
296332
// "id": "wlan0egress",
297333
// /// Optional list of network interfaces messages will be processed on, the rest will be passed as is.
298-
// /// If absent, the rules will be applied to all interfaces, in case of an empty list it means that they will not be applied to any.
334+
// /// If absent, the rules will be applied to all interfaces. An empty list is invalid.
299335
// interfaces: [ "wlan0" ],
336+
// /// Optional list of link protocols. Transports with at least one of these links will have their messages filtered.
337+
// /// If absent, the rules will be applied to all transports. An empty list is invalid.
338+
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
300339
// /// Optional list of data flows messages will be processed on ("egress" and/or "ingress").
301340
// /// If absent, the rules will be applied to both flows.
302341
// flow: ["ingress", "egress"],
@@ -395,6 +434,12 @@
395434
// "id": "subject3",
396435
// /// An empty subject combination is a wildcard
397436
// },
437+
// {
438+
// "id": "subject4",
439+
// /// link protocols can also be used to identify transports to filter messages on.
440+
// /// If absent, the rules will be applied to all transports. An empty list is invalid.
441+
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
442+
// },
398443
// ],
399444
// /// The policies list associates rules to subjects
400445
// "policies":
@@ -409,7 +454,7 @@
409454
// },
410455
// {
411456
// "rules": ["rule2"],
412-
// "subjects": ["subject3"],
457+
// "subjects": ["subject3", "subject4"],
413458
// },
414459
// ]
415460
//},

rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ namespace rmw_zenoh_cpp
4444
// TODO(yuyuan): SHM, make this configurable
4545
#define SHM_BUF_OK_SIZE 2621440
4646

47+
// Period (ms) of heartbeats sent for detection of lost samples
48+
// by a RELIABLE + TRANSIENT_LOCAL Publisher
49+
#define SAMPLE_MISS_DETECTION_HEARTBEAT_PERIOD 500
50+
4751
///=============================================================================
4852
std::shared_ptr<PublisherData> PublisherData::make(
4953
std::shared_ptr<zenoh::Session> session,
@@ -100,6 +104,14 @@ std::shared_ptr<PublisherData> PublisherData::make(
100104
adv_pub_opts.publisher_detection = true;
101105
adv_pub_opts.cache = AdvancedPublisherOptions::CacheOptions::create_default();
102106
adv_pub_opts.cache->max_samples = adapted_qos_profile.depth;
107+
if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
108+
// If RELIABLE + TRANSIENT_LOCAL activate sample miss detection for subscriber
109+
// to detect missed samples and retrieve those from the Publisher cache.
110+
// HeartbeatSporadic is used to prevent excessive background traffic
111+
adv_pub_opts.sample_miss_detection.emplace().heartbeat =
112+
AdvancedPublisherOptions::SampleMissDetectionOptions::HeartbeatSporadic{
113+
SAMPLE_MISS_DETECTION_HEARTBEAT_PERIOD};
114+
}
103115
}
104116

105117
zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_);

rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,13 @@ bool SubscriptionData::init()
174174
// Enable detection of late joiner publishers and query for their historical data.
175175
adv_sub_opts.history->detect_late_publishers = true;
176176
adv_sub_opts.history->max_samples = entity_->topic_info()->qos_.depth;
177+
if (entity_->topic_info()->qos_.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
178+
// Activate recovery of lost samples.
179+
// This requires the Publisher to have sample_miss_detection configured,
180+
// which is the case for a RELIABLE + TRANSIENT_LOCAL Publisher.
181+
adv_sub_opts.recovery.emplace().last_sample_miss_detection =
182+
AdvancedSubscriberOptions::RecoveryOptions::Heartbeat{};
183+
}
177184
}
178185

179186
std::weak_ptr<SubscriptionData> data_wp = shared_from_this();

zenoh_cpp_vendor/CMakeLists.txt

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,25 @@ find_package(ament_cmake_vendor_package REQUIRED)
1818
set(ZENOHC_CARGO_FLAGS "--no-default-features$<SEMICOLON>--features=shared-memory zenoh/transport_compression zenoh/transport_tcp zenoh/transport_udp zenoh/transport_tls")
1919

2020
# Set VCS_VERSION to include latest changes from zenoh/zenoh-c/zenoh-cpp to benefit from:
21-
# - Reword SHM warning log about "setting scheduling priority":
22-
# - https://github.com/eclipse-zenoh/zenoh/pull/1778
23-
# - Performances improvements at launch time:
24-
# - https://github.com/eclipse-zenoh/zenoh/pull/1786
25-
# - https://github.com/eclipse-zenoh/zenoh/pull/1789
26-
# - https://github.com/eclipse-zenoh/zenoh/pull/1793
27-
# - Fixed open timeout
28-
# - https://github.com/eclipse-zenoh/zenoh/pull/1796
29-
# - Improve ACL behaviour, notably for S-ROS
30-
# - https://github.com/eclipse-zenoh/zenoh/pull/1781
31-
# - https://github.com/eclipse-zenoh/zenoh/pull/1785
32-
# - https://github.com/eclipse-zenoh/zenoh/pull/1795
33-
# - https://github.com/eclipse-zenoh/zenoh/pull/1806
34-
# - Reduce the number of threads in case of scouting
35-
# - https://github.com/eclipse-zenoh/zenoh-c/pull/937
36-
# - Namespace prefix support
37-
# - https://github.com/eclipse-zenoh/zenoh/pull/1792
38-
# - Fix debug mode crash
39-
# - https://github.com/eclipse-zenoh/zenoh-cpp/pull/432
21+
# - Fix a bug leading to invalid inapropriate "Unable to push non droppable network message" log and transport closure:
22+
# - https://github.com/eclipse-zenoh/zenoh/pull/1855
23+
# - Fix crash with highly chunked keys:
24+
# - https://github.com/eclipse-zenoh/zenoh/pull/1826
25+
# - Resolve issue with closing the Session in atexit:
26+
# - https://github.com/eclipse-zenoh/zenoh/pull/1632
27+
# - Change `Session::close()` implementation so it can be safely waited and awaited in `atexit``
28+
# - https://github.com/eclipse-zenoh/zenoh/pull/1632
29+
# - Add QoS overwrite interceptor allowing for instance a Router to be configured to change QoS on the fly
30+
# - https://github.com/eclipse-zenoh/zenoh/pull/1825
31+
# - Add link protocols as subject to interceptors (access_control, downsampling or qos overwrite):
32+
# - https://github.com/eclipse-zenoh/zenoh/pull/1850
33+
# - Add new non periodic last sample miss detection mechanism for Advanced Publisher:
34+
# - https://github.com/eclipse-zenoh/zenoh/pull/1861
35+
# - Improve tracing for better analysis on the system like rmw_zenoh
36+
# - https://github.com/eclipse-zenoh/zenoh/pull/1844
4037
ament_vendor(zenoh_c_vendor
4138
VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git
42-
VCS_VERSION e6a1971139f405f7887bf5bb54f0efe402123032
39+
VCS_VERSION ffa4bddc947f7ed6c0e3b4546205dd1b73e7df81
4340
CMAKE_ARGS
4441
"-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}"
4542
"-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE"
@@ -50,7 +47,7 @@ ament_export_dependencies(zenohc)
5047

5148
ament_vendor(zenoh_cpp_vendor
5249
VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp
53-
VCS_VERSION 8ad67f6c7a9031acd437c8739bbc8ddab0ca8173
50+
VCS_VERSION 868fdad0e7418e8f8cb96e94c89a3aed05905e63
5451
CMAKE_ARGS
5552
-DZENOHCXX_ZENOHC=OFF
5653
)

0 commit comments

Comments
 (0)