Skip to content

Commit 17ed685

Browse files
add qos overwrite interceptor
1 parent 41ac4a7 commit 17ed685

File tree

7 files changed

+640
-29
lines changed

7 files changed

+640
-29
lines changed

DEFAULT_CONFIG.json5

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,39 @@
398398
// ]
399399
//},
400400

401+
// /// Configure QoS overwrite rules.
402+
// qos_overwrite: [
403+
// {
404+
// /// Id has to be unique within the rule set.
405+
// id: "rule1",
406+
// // Optional list of interfaces, if not specified, will be applied to all interfaces.
407+
// interfaces: [
408+
// "lo0",
409+
// "en0",
410+
// ],
411+
// messages: ["put", "delete"],
412+
// flows: ["egress"],
413+
// key_exprs: ["test/demo"],
414+
// overwrite: {
415+
// /// Optional new priority value, if not specified priority of the messages will stay unchanged.
416+
// priority: "real_time",
417+
// /// Optional new congestion control value, if not specified congestion control of the messages will stay unchanged.
418+
// congestion_control: "block",
419+
// /// Optional new priority value, if not specified priority of the messages will stay unchanged.
420+
// express: true
421+
// },
422+
// },
423+
// {
424+
// id: "rule2",
425+
// messages: ["query", "reply"],
426+
// flows: ["ingress"],
427+
// key_exprs: ["**"],
428+
// overwrite: {
429+
// priority: "interactive_high",
430+
// },
431+
// },
432+
// ],
433+
401434
/// Configure internal transport parameters
402435
transport: {
403436
unicast: {

commons/zenoh-config/src/lib.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use std::{
3232
};
3333

3434
use include::recursive_include;
35-
use qos::PublisherQoSConfList;
35+
use qos::{PublisherQoSConfList, QosOverwrites};
3636
use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize};
3737
use serde::{Deserialize, Serialize};
3838
use serde_json::{Map, Value};
@@ -201,6 +201,32 @@ pub enum Permission {
201201
Deny,
202202
}
203203

204+
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
205+
#[serde(rename_all = "snake_case")]
206+
pub enum QosOverwriteMessage {
207+
Put,
208+
Delete,
209+
Query,
210+
Reply,
211+
}
212+
213+
#[derive(Default, Debug, Deserialize, Serialize, Clone)]
214+
pub struct QosOverwriteItemConf {
215+
/// Optional identifier for the qos modification configuration item.
216+
pub id: Option<String>,
217+
/// A list of interfaces to which the qos will be applied.
218+
/// QosOverwrite will be applied for all interfaces if the parameter is None.
219+
pub interfaces: Option<Vec<String>>,
220+
/// List of message types on which the qos ovewrite will be applied.
221+
pub messages: Vec<QosOverwriteMessage>,
222+
/// List of key expressions to apply qos ovewrite.
223+
pub key_exprs: Vec<OwnedKeyExpr>,
224+
// The qos value to ovewrite with.
225+
pub overwrite: QosOverwrites,
226+
/// QosOverwrite flow directions: egress and/or ingress.
227+
pub flows: Option<Vec<InterceptorFlow>>,
228+
}
229+
204230
/// Strategy for autoconnection, mainly to avoid nodes connecting to each other redundantly.
205231
#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
206232
#[serde(rename_all = "kebab-case")]
@@ -667,6 +693,9 @@ validated_struct::validator! {
667693
pub policies: Option<Vec<AclConfigPolicyEntry>>,
668694
},
669695

696+
/// Configuration of the qos overwrite rules.
697+
pub qos_overwrite: Vec<QosOverwriteItemConf>,
698+
670699
/// A list of directories where plugins may be searched for if no `__path__` was specified for them.
671700
/// The executable's current directory will be added to the search paths.
672701
pub plugins_loading: #[derive(Default)]

commons/zenoh-config/src/qos.rs

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
//
1414
use serde::{Deserialize, Serialize};
1515
use zenoh_keyexpr::keyexpr_tree::{IKeyExprTreeMut, KeBoxTree};
16-
use zenoh_protocol::core::{key_expr::OwnedKeyExpr, CongestionControl, Reliability};
16+
use zenoh_protocol::core::{key_expr::OwnedKeyExpr, CongestionControl, Priority, Reliability};
1717

1818
#[derive(Debug, Deserialize, Default, Serialize, Clone)]
1919
pub struct PublisherQoSConfList(pub(crate) Vec<PublisherQoSConf>);
@@ -39,32 +39,32 @@ pub(crate) struct PublisherQoSConf {
3939

4040
#[derive(Debug, Default, Deserialize, Serialize, Clone)]
4141
pub struct PublisherQoSConfig {
42-
pub congestion_control: Option<PublisherCongestionControlConf>,
43-
pub priority: Option<PublisherPriorityConf>,
42+
pub congestion_control: Option<CongestionControlConf>,
43+
pub priority: Option<PriorityConf>,
4444
pub express: Option<bool>,
4545
#[cfg(feature = "unstable")]
46-
pub reliability: Option<PublisherReliabilityConf>,
46+
pub reliability: Option<ReliabilityConf>,
4747
#[cfg(feature = "unstable")]
4848
pub allowed_destination: Option<PublisherLocalityConf>,
4949
}
5050

5151
#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
5252
#[serde(rename_all = "lowercase")]
53-
pub enum PublisherCongestionControlConf {
53+
pub enum CongestionControlConf {
5454
Drop,
5555
Block,
5656
}
5757

58-
impl From<PublisherCongestionControlConf> for CongestionControl {
59-
fn from(value: PublisherCongestionControlConf) -> Self {
58+
impl From<CongestionControlConf> for CongestionControl {
59+
fn from(value: CongestionControlConf) -> Self {
6060
match value {
61-
PublisherCongestionControlConf::Drop => Self::Drop,
62-
PublisherCongestionControlConf::Block => Self::Block,
61+
CongestionControlConf::Drop => Self::Drop,
62+
CongestionControlConf::Block => Self::Block,
6363
}
6464
}
6565
}
6666

67-
impl From<CongestionControl> for PublisherCongestionControlConf {
67+
impl From<CongestionControl> for CongestionControlConf {
6868
fn from(value: CongestionControl) -> Self {
6969
match value {
7070
CongestionControl::Drop => Self::Drop,
@@ -75,7 +75,7 @@ impl From<CongestionControl> for PublisherCongestionControlConf {
7575

7676
#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
7777
#[serde(rename_all = "snake_case")]
78-
pub enum PublisherPriorityConf {
78+
pub enum PriorityConf {
7979
RealTime = 1,
8080
InteractiveHigh = 2,
8181
InteractiveLow = 3,
@@ -85,23 +85,37 @@ pub enum PublisherPriorityConf {
8585
Background = 7,
8686
}
8787

88+
impl From<PriorityConf> for Priority {
89+
fn from(value: PriorityConf) -> Self {
90+
match value {
91+
PriorityConf::RealTime => Self::RealTime,
92+
PriorityConf::InteractiveHigh => Self::InteractiveHigh,
93+
PriorityConf::InteractiveLow => Self::InteractiveLow,
94+
PriorityConf::DataHigh => Self::DataHigh,
95+
PriorityConf::Data => Self::Data,
96+
PriorityConf::DataLow => Self::DataLow,
97+
PriorityConf::Background => Self::Background,
98+
}
99+
}
100+
}
101+
88102
#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
89103
#[serde(rename_all = "snake_case")]
90-
pub enum PublisherReliabilityConf {
104+
pub enum ReliabilityConf {
91105
BestEffort,
92106
Reliable,
93107
}
94108

95-
impl From<PublisherReliabilityConf> for Reliability {
96-
fn from(value: PublisherReliabilityConf) -> Self {
109+
impl From<ReliabilityConf> for Reliability {
110+
fn from(value: ReliabilityConf) -> Self {
97111
match value {
98-
PublisherReliabilityConf::BestEffort => Self::BestEffort,
99-
PublisherReliabilityConf::Reliable => Self::Reliable,
112+
ReliabilityConf::BestEffort => Self::BestEffort,
113+
ReliabilityConf::Reliable => Self::Reliable,
100114
}
101115
}
102116
}
103117

104-
impl From<Reliability> for PublisherReliabilityConf {
118+
impl From<Reliability> for ReliabilityConf {
105119
fn from(value: Reliability) -> Self {
106120
match value {
107121
Reliability::BestEffort => Self::BestEffort,
@@ -117,3 +131,13 @@ pub enum PublisherLocalityConf {
117131
Remote,
118132
Any,
119133
}
134+
135+
#[derive(Debug, Default, Deserialize, Serialize, Clone)]
136+
pub struct QosOverwrites {
137+
pub congestion_control: Option<CongestionControlConf>,
138+
pub priority: Option<PriorityConf>,
139+
pub express: Option<bool>,
140+
// TODO: Add support for reliability overwrite (it is not possible right now, since reliability is not a part of RoutingContext, nor NetworkMessage)
141+
// #[cfg(feature = "unstable")]
142+
// pub reliability: Option<ReliabilityConf>,
143+
}

zenoh/src/api/publisher.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::{
2323
use futures::Sink;
2424
use serde::Deserialize;
2525
use tracing::error;
26-
use zenoh_config::qos::PublisherPriorityConf;
26+
use zenoh_config::qos::PriorityConf;
2727
use zenoh_core::{Resolvable, Resolve, Wait};
2828
use zenoh_protocol::core::CongestionControl;
2929
use zenoh_result::{Error, ZResult};
@@ -484,21 +484,21 @@ impl TryFrom<u8> for Priority {
484484
}
485485
}
486486

487-
impl From<PublisherPriorityConf> for Priority {
488-
fn from(value: PublisherPriorityConf) -> Self {
487+
impl From<PriorityConf> for Priority {
488+
fn from(value: PriorityConf) -> Self {
489489
match value {
490-
PublisherPriorityConf::RealTime => Self::RealTime,
491-
PublisherPriorityConf::InteractiveHigh => Self::InteractiveHigh,
492-
PublisherPriorityConf::InteractiveLow => Self::InteractiveLow,
493-
PublisherPriorityConf::DataHigh => Self::DataHigh,
494-
PublisherPriorityConf::Data => Self::Data,
495-
PublisherPriorityConf::DataLow => Self::DataLow,
496-
PublisherPriorityConf::Background => Self::Background,
490+
PriorityConf::RealTime => Self::RealTime,
491+
PriorityConf::InteractiveHigh => Self::InteractiveHigh,
492+
PriorityConf::InteractiveLow => Self::InteractiveLow,
493+
PriorityConf::DataHigh => Self::DataHigh,
494+
PriorityConf::Data => Self::Data,
495+
PriorityConf::DataLow => Self::DataLow,
496+
PriorityConf::Background => Self::Background,
497497
}
498498
}
499499
}
500500

501-
impl From<Priority> for PublisherPriorityConf {
501+
impl From<Priority> for PriorityConf {
502502
fn from(value: Priority) -> Self {
503503
match value {
504504
Priority::RealTime => Self::RealTime,

zenoh/src/net/routing/interceptor/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ use crate::api::key_expr::KeyExpr;
3535
pub mod downsampling;
3636
use crate::net::routing::interceptor::downsampling::downsampling_interceptor_factories;
3737

38+
pub mod qos_overwrite;
39+
use crate::net::routing::interceptor::qos_overwrite::qos_overwrite_interceptor_factories;
40+
3841
#[derive(Default, Debug)]
3942
pub struct InterfaceEnabled {
4043
pub ingress: bool,
@@ -88,6 +91,7 @@ pub(crate) fn interceptor_factories(config: &Config) -> ZResult<Vec<InterceptorF
8891
// res.push(Box::new(LoggerInterceptor {}));
8992
res.extend(downsampling_interceptor_factories(config.downsampling())?);
9093
res.extend(acl_interceptor_factories(config.access_control())?);
94+
res.extend(qos_overwrite_interceptor_factories(config.qos_overwrite())?);
9195
Ok(res)
9296
}
9397

0 commit comments

Comments
 (0)