Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 131 additions & 103 deletions zenoh/src/net/routing/dispatcher/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::{
any::Any,
borrow::{Borrow, Cow},
collections::HashMap,
collections::{HashMap, VecDeque},
convert::TryInto,
hash::{Hash, Hasher},
ops::{Deref, DerefMut},
Expand Down Expand Up @@ -395,40 +395,46 @@ impl Resource {
pub fn make_resource(
tables: &mut Tables,
from: &mut Arc<Resource>,
suffix: &str,
mut suffix: &str,
) -> Arc<Resource> {
let Some((chunk, rest)) = Self::split_first_chunk(suffix) else {
Resource::upgrade_resource(from, tables.hat_code.new_resource());
return from.clone();
};
if !chunk.starts_with('/') {
if !suffix.is_empty() && !suffix.starts_with('/') {
if let Some(parent) = &mut from.parent.clone() {
return Resource::make_resource(tables, parent, &[from.suffix(), suffix].concat());
}
}
if let Some(child) = get_mut_unchecked(from).children.get(chunk) {
return Resource::make_resource(tables, &mut child.0.clone(), rest);
}
let mut new = Arc::new(Resource::new(from, chunk, None));
if rest.is_empty() {
tracing::debug!("Register resource {}", new.expr());
let mut from = from.clone();
// do not use recursion as the tree may have arbitrary depth
while let Some((chunk, rest)) = Self::split_first_chunk(suffix) {
if let Some(child) = get_mut_unchecked(&mut from).children.get(chunk) {
from = child.0.clone();
} else {
let new = Arc::new(Resource::new(&from, chunk, None));
if rest.is_empty() {
tracing::debug!("Register resource {}", new.expr());
}
get_mut_unchecked(&mut from)
.children
.insert(Child(new.clone()));
from = new;
};
suffix = rest;
}
let res = Resource::make_resource(tables, &mut new, rest);
get_mut_unchecked(from).children.insert(Child(new));
res
Resource::upgrade_resource(&mut from, tables.hat_code.new_resource());
from
}

#[inline]
pub fn get_resource(from: &Arc<Resource>, suffix: &str) -> Option<Arc<Resource>> {
let Some((chunk, rest)) = Self::split_first_chunk(suffix) else {
return Some(from.clone());
};
if !chunk.starts_with('/') {
pub fn get_resource(mut from: &Arc<Resource>, mut suffix: &str) -> Option<Arc<Resource>> {
if !suffix.is_empty() && !suffix.starts_with('/') {
if let Some(parent) = &from.parent {
return Resource::get_resource(parent, &[from.suffix(), suffix].concat());
}
}
Resource::get_resource(from.children.get(chunk)?, rest)
// do not use recursion as the tree may have arbitrary depth
while let Some((chunk, rest)) = Self::split_first_chunk(suffix) {
(from, suffix) = (from.children.get(chunk)?, rest);
}
Some(from.clone())
}

/// Split the suffix at the next '/' (after leading one), returning None if the suffix is empty.
Expand Down Expand Up @@ -553,25 +559,48 @@ impl Resource {
}
/// Walk through the children tree, looking for a declared keyexpr.
fn get_best_child_key<'a>(
prefix: &Resource,
mut prefix: &Resource,
suffix: &'a str,
sid: usize,
) -> Option<WireExpr<'a>> {
let (chunk, rest) = Resource::split_first_chunk(suffix)?;
let child = prefix.children.get(chunk)?;
get_best_child_key(child, rest, sid)
.or_else(|| get_wire_expr(child, || rest.into(), sid))
let mut suffix_rest = suffix;
// do not use recursion as the tree may have arbitrary depth
// first we get the closest matching child
while let Some((chunk, rest)) = Resource::split_first_chunk(suffix_rest) {
match prefix.children.get(chunk) {
Some(child) => prefix = child,
None => break,
}
suffix_rest = rest;
}
// then we go backward checking the child and its parents
while suffix_rest != suffix {
if let Some(wire_expr) = get_wire_expr(prefix, || suffix_rest.into(), sid) {
return Some(wire_expr);
}
suffix_rest = &suffix[suffix.len() - suffix_rest.len() - prefix.suffix().len()..];
prefix = prefix.parent.as_ref().unwrap();
}
None
}
/// Walk through the parent tree, looking for a declared keyexpr.
fn get_best_parent_key<'a>(
prefix: &Resource,
suffix: &'a str,
sid: usize,
parent: &Resource,
mut parent: &Resource,
) -> Option<WireExpr<'a>> {
let parent_suffix = || [&prefix.expr[parent.expr.len()..], suffix].concat().into();
get_wire_expr(parent, parent_suffix, sid)
.or_else(|| get_best_parent_key(prefix, suffix, sid, parent.parent.as_ref()?))
// do not use recursion as the tree may have arbitrary depth
loop {
let parent_suffix = || [&prefix.expr[parent.expr.len()..], suffix].concat().into();
if let Some(wire_expr) = get_wire_expr(parent, parent_suffix, sid) {
return Some(wire_expr);
}
match parent.parent.as_ref() {
Some(p) => parent = p,
None => return None,
}
}
}
get_best_child_key(self, suffix, sid)
.or_else(|| get_wire_expr(self, || suffix.into(), sid))
Expand All @@ -580,96 +609,95 @@ impl Resource {
}

pub fn get_matches(tables: &Tables, key_expr: &keyexpr) -> Vec<Weak<Resource>> {
fn recursive_push(from: &Arc<Resource>, matches: &mut Vec<Weak<Resource>>) {
if from.context.is_some() {
matches.push(Arc::downgrade(from));
}
for child in from.children.iter() {
recursive_push(child, matches)
pub fn visit_nodes<T>(node: T, mut visit: impl FnMut(T, &mut VecDeque<T>)) {
let mut nodes = VecDeque::from([node]);
while let Some(node) = nodes.pop_front() {
visit(node, &mut nodes);
}
}
fn push_all(from: &Arc<Resource>, matches: &mut Vec<Weak<Resource>>) {
visit_nodes(from, |from, nodes| {
if from.context.is_some() {
matches.push(Arc::downgrade(from));
}
for child in from.children.iter() {
nodes.push_back(child);
}
});
}
fn get_matches_from(
key_expr: &keyexpr,
from: &Arc<Resource>,
matches: &mut Vec<Weak<Resource>>,
) {
if from.parent.is_none() || from.suffix() == "/" {
for child in from.children.iter() {
get_matches_from(key_expr, child, matches);
visit_nodes((key_expr, from), |(key_expr, from), nodes| {
if from.parent.is_none() || from.suffix() == "/" {
for child in from.children.iter() {
nodes.push_back((key_expr, child));
}
return;
}
return;
}
let suffix: &keyexpr = from
.suffix()
.strip_prefix('/')
.unwrap_or(from.suffix())
.try_into()
.unwrap();
let (ke_chunk, ke_rest) = match key_expr.split_once('/') {
// SAFETY: chunks of keyexpr are valid keyexprs
Some((chunk, rest)) => unsafe {
(
keyexpr::from_str_unchecked(chunk),
Some(keyexpr::from_str_unchecked(rest)),
)
},
None => (key_expr, None),
};
if ke_chunk.intersects(suffix) {
match ke_rest {
None => {
if ke_chunk.as_bytes() == b"**" {
recursive_push(from, matches)
} else {
if from.context.is_some() {
matches.push(Arc::downgrade(from));
}
if suffix.as_bytes() == b"**" {
for child in from.children.iter() {
get_matches_from(key_expr, child, matches)
let suffix: &keyexpr = from
.suffix()
.strip_prefix('/')
.unwrap_or(from.suffix())
.try_into()
.unwrap();
let (ke_chunk, ke_rest) = match key_expr.split_once('/') {
// SAFETY: chunks of keyexpr are valid keyexprs
Some((chunk, rest)) => unsafe {
(
keyexpr::from_str_unchecked(chunk),
Some(keyexpr::from_str_unchecked(rest)),
)
},
None => (key_expr, None),
};
if ke_chunk.intersects(suffix) {
match ke_rest {
None => {
if ke_chunk.as_bytes() == b"**" {
push_all(from, matches)
} else {
if from.context.is_some() {
matches.push(Arc::downgrade(from));
}
}
if let Some(child) =
from.children.get("/**").or_else(|| from.children.get("**"))
{
if child.context.is_some() {
matches.push(Arc::downgrade(child))
if suffix.as_bytes() == b"**" {
for child in from.children.iter() {
nodes.push_back((key_expr, child));
}
}
if let Some(child) =
from.children.get("/**").or_else(|| from.children.get("**"))
{
if child.context.is_some() {
matches.push(Arc::downgrade(child))
}
}
}
}
}
Some(rest) if rest.as_bytes() == b"**" => recursive_push(from, matches),
Some(rest) => {
let recheck_keyexpr_one_level_lower =
ke_chunk.as_bytes() == b"**" || suffix.as_bytes() == b"**";
for child in from.children.iter() {
get_matches_from(rest, child, matches);
Some(rest) if rest.as_bytes() == b"**" => push_all(from, matches),
Some(rest) => {
let recheck_keyexpr_one_level_lower =
ke_chunk.as_bytes() == b"**" || suffix.as_bytes() == b"**";
for child in from.children.iter() {
nodes.push_back((rest, child));
if recheck_keyexpr_one_level_lower {
nodes.push_back((key_expr, child));
}
}
if recheck_keyexpr_one_level_lower {
get_matches_from(key_expr, child, matches)
nodes.push_back((rest, from));
}
}
if recheck_keyexpr_one_level_lower {
get_matches_from(rest, from, matches)
}
}
};
}
};
}
})
}
let mut matches = Vec::new();
get_matches_from(key_expr, &tables.root_res, &mut matches);
let mut i = 0;
while i < matches.len() {
let current = matches[i].as_ptr();
let mut j = i + 1;
while j < matches.len() {
if std::ptr::eq(current, matches[j].as_ptr()) {
matches.swap_remove(j);
} else {
j += 1
}
}
i += 1
}
matches.sort_unstable_by_key(Weak::as_ptr);
matches.dedup_by_key(|res| Weak::as_ptr(res));
matches
}

Expand Down
45 changes: 36 additions & 9 deletions zenoh/src/net/tests/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@ use zenoh_protocol::{
zenoh::{PushBody, Put},
};

use crate::net::{
primitives::{DummyPrimitives, EPrimitives, Primitives},
routing::{
dispatcher::{
face::{Face, FaceState},
pubsub::SubscriberInfo,
tables::Tables,
use crate::{
key_expr::KeyExpr,
net::{
primitives::{DummyPrimitives, EPrimitives, Primitives},
routing::{
dispatcher::{
face::{Face, FaceState},
pubsub::SubscriberInfo,
tables::Tables,
},
router::*,
RoutingContext,
},
router::*,
RoutingContext,
},
};

Expand Down Expand Up @@ -889,3 +892,27 @@ fn get_best_key_test() {
assert_wire_expr!(get_best_key("a", "/d", &face2), { scope: 0, suffix: "a/d" });
assert_wire_expr!(get_best_key("a/b", "", &face2), { scope: 2, suffix: "" });
}

#[test]
fn big_key_expr() {
let config = Config::default();
let router = Router::new(
ZenohIdProto::try_from([1]).unwrap(),
WhatAmI::Client,
None,
&config,
)
.unwrap();

let primitives = Arc::new(DummyPrimitives {});
let face = router.new_primitives(primitives.clone());

let root = zread!(router.tables.tables)._get_root().clone();
let key_expr = KeyExpr::new(vec!["a/"; 10000].concat() + "a").unwrap();
let wire_expr = WireExpr::from(&**key_expr);
register_expr(&router.tables, &mut face.state.clone(), 1, &wire_expr);
let res = Resource::get_resource(&root, &key_expr).unwrap();
root.get_best_key(&key_expr, face.state.id);
res.get_best_key("/a", face.state.id + 1);
Resource::get_matches(&face.tables.tables.read().unwrap(), &key_expr);
}