Skip to content

Commit 6e88952

Browse files
committed
move MonolithDiscoverer trait and implementations to ott-common crate
1 parent ea8574b commit 6e88952

File tree

11 files changed

+103
-83
lines changed

11 files changed

+103
-83
lines changed

Cargo.lock

+7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ tokio-tungstenite = "0.21.0"
3333
tokio-util = "0.7.8"
3434
tracing = "0.1.40"
3535
tracing-subscriber = "0.3.17"
36+
trust-dns-resolver = { version = "0.22.0", features = ["system-config"] }
3637
tungstenite = "0.21.0"
3738
typeshare = "1.0.0"
3839
url = "2.3.1"

crates/ott-balancer/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ route-recognizer = "0.3.1"
3333
once_cell.workspace = true
3434
pin-project.workspace = true
3535
prometheus.workspace = true
36-
trust-dns-resolver = { version = "0.22.0", features = ["system-config"] }
36+
trust-dns-resolver.workspace = true
3737

3838
[dev-dependencies]
3939
criterion.workspace = true

crates/ott-balancer/src/discovery.rs

+2-82
Original file line numberDiff line numberDiff line change
@@ -1,88 +1,8 @@
1-
//! Handles discovery of Monoliths.
1+
use std::{collections::HashSet, time::Duration};
22

3-
use std::net::IpAddr;
4-
use std::time::Duration;
5-
use std::{collections::HashSet, net::SocketAddr};
6-
7-
mod dns;
8-
mod fly;
9-
mod harness;
10-
mod manual;
11-
12-
pub use dns::*;
13-
pub use fly::*;
14-
pub use harness::*;
15-
pub use manual::*;
16-
17-
use async_trait::async_trait;
18-
use serde::Deserialize;
3+
pub use ott_common::discovery::*;
194
use tokio::task::JoinHandle;
205
use tracing::{debug, error, warn};
21-
use url::Url;
22-
23-
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize)]
24-
pub struct MonolithConnectionConfig {
25-
pub host: HostOrIp,
26-
pub port: u16,
27-
}
28-
29-
impl MonolithConnectionConfig {
30-
pub fn uri(&self) -> Url {
31-
let mut url = Url::parse("ws://localhost").unwrap();
32-
match self.host {
33-
HostOrIp::Host(ref host) => {
34-
url.set_host(Some(host)).unwrap();
35-
}
36-
HostOrIp::Ip(ip) => {
37-
url.set_ip_host(ip).unwrap();
38-
}
39-
}
40-
url.set_port(Some(self.port)).unwrap();
41-
42-
url
43-
}
44-
}
45-
46-
impl From<SocketAddr> for MonolithConnectionConfig {
47-
fn from(addr: SocketAddr) -> Self {
48-
Self {
49-
host: HostOrIp::Ip(addr.ip()),
50-
port: addr.port(),
51-
}
52-
}
53-
}
54-
55-
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
56-
pub enum HostOrIp {
57-
Host(String),
58-
Ip(IpAddr),
59-
}
60-
61-
impl<'de> Deserialize<'de> for HostOrIp {
62-
fn deserialize<D>(deserializer: D) -> Result<HostOrIp, D::Error>
63-
where
64-
D: serde::Deserializer<'de>,
65-
{
66-
let s = String::deserialize(deserializer)?;
67-
if let Ok(ip) = s.parse::<IpAddr>() {
68-
Ok(HostOrIp::Ip(ip))
69-
} else {
70-
Ok(HostOrIp::Host(s))
71-
}
72-
}
73-
}
74-
75-
#[async_trait]
76-
pub trait MonolithDiscoverer {
77-
/// In polling mode, this function should immediately return the current list of monoliths. In continuous mode, this function should wait until the list of monoliths changes, then return the new list.
78-
async fn discover(&mut self) -> anyhow::Result<Vec<MonolithConnectionConfig>>;
79-
fn mode(&self) -> DiscoveryMode;
80-
}
81-
82-
pub enum DiscoveryMode {
83-
Polling(Duration),
84-
Continuous,
85-
}
866

877
pub struct DiscoveryTask {
888
discovery: Box<dyn MonolithDiscoverer + Send + Sync>,

crates/ott-common/Cargo.toml

+7
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,22 @@ version = "0.10.0"
44
edition = "2021"
55

66
[dependencies]
7+
anyhow.workspace = true
78
async-trait.workspace = true
89
futures-util.workspace = true
910
hyper.workspace = true
1011
hyper-util.workspace = true
1112
http-body-util.workspace = true
13+
ott-balancer-protocol.workspace = true
1214
pin-project.workspace = true
15+
serde.workspace = true
16+
serde_json.workspace = true
17+
serde_path_to_error.workspace = true
1318
tracing.workspace = true
1419
tracing-subscriber.workspace = true
1520
tokio.workspace = true
1621
tokio-tungstenite.workspace = true
1722
tokio-util.workspace = true
23+
trust-dns-resolver.workspace = true
1824
tungstenite.workspace = true
25+
url.workspace = true

crates/ott-common/src/discovery.rs

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
//! Handles discovery of Monoliths.
2+
3+
use std::net::IpAddr;
4+
use std::net::SocketAddr;
5+
use std::time::Duration;
6+
7+
mod dns;
8+
mod fly;
9+
mod harness;
10+
mod manual;
11+
12+
pub use dns::*;
13+
pub use fly::*;
14+
pub use harness::*;
15+
pub use manual::*;
16+
17+
use async_trait::async_trait;
18+
use serde::Deserialize;
19+
use tokio::task::JoinHandle;
20+
use url::Url;
21+
22+
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize)]
23+
pub struct MonolithConnectionConfig {
24+
pub host: HostOrIp,
25+
pub port: u16,
26+
}
27+
28+
impl MonolithConnectionConfig {
29+
pub fn uri(&self) -> Url {
30+
let mut url = Url::parse("ws://localhost").unwrap();
31+
match self.host {
32+
HostOrIp::Host(ref host) => {
33+
url.set_host(Some(host)).unwrap();
34+
}
35+
HostOrIp::Ip(ip) => {
36+
url.set_ip_host(ip).unwrap();
37+
}
38+
}
39+
url.set_port(Some(self.port)).unwrap();
40+
41+
url
42+
}
43+
}
44+
45+
impl From<SocketAddr> for MonolithConnectionConfig {
46+
fn from(addr: SocketAddr) -> Self {
47+
Self {
48+
host: HostOrIp::Ip(addr.ip()),
49+
port: addr.port(),
50+
}
51+
}
52+
}
53+
54+
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
55+
pub enum HostOrIp {
56+
Host(String),
57+
Ip(IpAddr),
58+
}
59+
60+
impl<'de> Deserialize<'de> for HostOrIp {
61+
fn deserialize<D>(deserializer: D) -> Result<HostOrIp, D::Error>
62+
where
63+
D: serde::Deserializer<'de>,
64+
{
65+
let s = String::deserialize(deserializer)?;
66+
if let Ok(ip) = s.parse::<IpAddr>() {
67+
Ok(HostOrIp::Ip(ip))
68+
} else {
69+
Ok(HostOrIp::Host(s))
70+
}
71+
}
72+
}
73+
74+
#[async_trait]
75+
pub trait MonolithDiscoverer {
76+
/// In polling mode, this function should immediately return the current list of monoliths. In continuous mode, this function should wait until the list of monoliths changes, then return the new list.
77+
async fn discover(&mut self) -> anyhow::Result<Vec<MonolithConnectionConfig>>;
78+
fn mode(&self) -> DiscoveryMode;
79+
}
80+
81+
pub enum DiscoveryMode {
82+
Polling(Duration),
83+
Continuous,
84+
}

crates/ott-common/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
pub mod discovery;
12
pub mod websocket;

0 commit comments

Comments
 (0)