Skip to content

Commit 6a5498d

Browse files
authored
split ott-balancer-bin into 2 crates (#1190)
* rename `ott-balancer-bin` to `ott-balancer` * add new `ott-balancer-bin` that just calls in to the `ott-balancer` crate
1 parent f65eaf6 commit 6a5498d

18 files changed

+175
-153
lines changed

Cargo.lock

+10-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ hyper-util = { git = "https://github.com/hyperium/hyper-util.git", rev = "f89801
1717
http-body-util = "0.1.0-rc.2"
1818
once_cell = "1.17.1"
1919
ott-common = { path = "crates/ott-common" }
20+
ott-balancer = { path = "crates/ott-balancer" }
2021
ott-balancer-protocol = { path = "crates/ott-balancer-protocol" }
2122
pin-project = "1.0.12"
2223
prometheus = { version = "0.13.3", features = ["process"] }

crates/ott-balancer-bin/Cargo.toml

+1-29
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,7 @@ name = "ott-balancer-bin"
33
version = "0.7.0"
44
edition = "2021"
55

6-
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7-
86
[dependencies]
97
anyhow.workspace = true
10-
async-trait.workspace = true
11-
bytes.workspace = true
12-
clap.workspace = true
13-
console-subscriber.workspace = true
14-
figment.workspace = true
15-
futures-util.workspace = true
16-
hyper.workspace = true
17-
hyper-util.workspace = true
18-
http-body-util.workspace = true
19-
rand.workspace = true
20-
reqwest.workspace = true
21-
serde.workspace = true
22-
serde_json.workspace = true
23-
tracing.workspace = true
24-
tracing-subscriber.workspace = true
8+
ott-balancer.workspace = true
259
tokio.workspace = true
26-
tokio-tungstenite.workspace = true
27-
tokio-util.workspace = true
28-
tungstenite.workspace = true
29-
uuid.workspace = true
30-
url.workspace = true
31-
ott-common.workspace = true
32-
ott-balancer-protocol.workspace = true
33-
route-recognizer = "0.3.1"
34-
once_cell.workspace = true
35-
pin-project.workspace = true
36-
prometheus.workspace = true
37-
trust-dns-resolver = { version = "0.22.0", features = ["system-config"] }

crates/ott-balancer-bin/src/main.rs

+1-123
Original file line numberDiff line numberDiff line change
@@ -1,126 +1,4 @@
1-
use std::net::Ipv6Addr;
2-
use std::{net::SocketAddr, sync::Arc};
3-
4-
use balancer::{start_dispatcher, Balancer, BalancerContext};
5-
use clap::Parser;
6-
use hyper::server::conn::http1;
7-
use tokio::net::TcpListener;
8-
use tokio::sync::RwLock;
9-
use tracing::{error, info};
10-
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
11-
use tracing_subscriber::util::SubscriberInitExt;
12-
use tracing_subscriber::EnvFilter;
13-
14-
use crate::config::{BalancerConfig, DiscoveryConfig};
15-
use crate::discovery::start_discovery_task;
16-
use crate::service::BalancerService;
17-
18-
mod balancer;
19-
mod client;
20-
mod config;
21-
mod connection;
22-
mod discovery;
23-
mod messages;
24-
mod monolith;
25-
mod room;
26-
mod service;
27-
281
#[tokio::main]
292
async fn main() -> anyhow::Result<()> {
30-
let args = config::Cli::parse();
31-
32-
BalancerConfig::load(&args.config_path)?;
33-
let config = BalancerConfig::get();
34-
35-
let console_layer = console_subscriber::spawn();
36-
let fmt_layer = tracing_subscriber::fmt::layer();
37-
let filter = args.build_tracing_filter();
38-
let filter_layer = EnvFilter::try_from_default_env().or_else(|_| EnvFilter::try_new(filter))?;
39-
tracing_subscriber::registry()
40-
.with(console_layer)
41-
.with(filter_layer)
42-
.with(fmt_layer)
43-
.init();
44-
info!("Loaded config: {:?}", config);
45-
46-
let (discovery_tx, discovery_rx) = tokio::sync::mpsc::channel(2);
47-
48-
info!("Starting balancer");
49-
let ctx = Arc::new(RwLock::new(BalancerContext::new()));
50-
let balancer = Balancer::new(ctx.clone());
51-
let service_link = balancer.new_link();
52-
let conman_link = balancer.new_link();
53-
let _dispatcher_handle = start_dispatcher(balancer)?;
54-
info!("Dispatcher started");
55-
56-
info!("Starting monolith discovery");
57-
let _discovery_handle = match &config.discovery {
58-
DiscoveryConfig::Fly(config) => {
59-
let discovery = discovery::FlyMonolithDiscoverer::new(config.clone());
60-
start_discovery_task(discovery, discovery_tx)
61-
}
62-
DiscoveryConfig::Manual(config) => {
63-
let discovery = discovery::ManualMonolithDiscoverer::new(config.clone());
64-
start_discovery_task(discovery, discovery_tx)
65-
}
66-
DiscoveryConfig::Harness(config) => {
67-
let discovery = discovery::HarnessMonolithDiscoverer::new(config.clone());
68-
start_discovery_task(discovery, discovery_tx)
69-
}
70-
};
71-
info!("Monolith discovery started");
72-
73-
info!("Starting connection manager");
74-
let mut conman = connection::MonolithConnectionManager::new(discovery_rx, conman_link);
75-
let _conman_handle = tokio::task::Builder::new()
76-
.name("connection manager")
77-
.spawn(async move {
78-
loop {
79-
if let Err(err) = conman.do_connection_job().await {
80-
error!("Error in connection manager: {:?}", err);
81-
}
82-
}
83-
});
84-
85-
let bind_addr6: SocketAddr =
86-
SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), config.port);
87-
88-
let service = BalancerService {
89-
ctx,
90-
link: service_link,
91-
addr: bind_addr6,
92-
};
93-
94-
// on linux, binding ipv6 will also bind ipv4
95-
let listener6 = TcpListener::bind(bind_addr6).await?;
96-
97-
info!("Serving on {}", bind_addr6);
98-
loop {
99-
let (stream, addr) = tokio::select! {
100-
stream = listener6.accept() => {
101-
let (stream, addr) = stream?;
102-
(stream, addr)
103-
}
104-
};
105-
106-
let mut service = service.clone();
107-
service.addr = addr;
108-
109-
let io = hyper_util::rt::TokioIo::new(stream);
110-
111-
// Spawn a tokio task to serve multiple connections concurrently
112-
let result = tokio::task::Builder::new()
113-
.name("serve http")
114-
.spawn(async move {
115-
let conn = http1::Builder::new()
116-
.serve_connection(io, service)
117-
.with_upgrades();
118-
if let Err(err) = conn.await {
119-
error!("Error serving connection: {:?}", err);
120-
}
121-
});
122-
if let Err(err) = result {
123-
error!("Error spawning task to serve http: {:?}", err);
124-
}
125-
}
3+
ott_balancer::run().await
1264
}

crates/ott-balancer/Cargo.toml

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
[package]
2+
name = "ott-balancer"
3+
version = "0.7.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
anyhow.workspace = true
10+
async-trait.workspace = true
11+
bytes.workspace = true
12+
clap.workspace = true
13+
console-subscriber.workspace = true
14+
figment.workspace = true
15+
futures-util.workspace = true
16+
hyper.workspace = true
17+
hyper-util.workspace = true
18+
http-body-util.workspace = true
19+
rand.workspace = true
20+
reqwest.workspace = true
21+
serde.workspace = true
22+
serde_json.workspace = true
23+
tracing.workspace = true
24+
tracing-subscriber.workspace = true
25+
tokio.workspace = true
26+
tokio-tungstenite.workspace = true
27+
tokio-util.workspace = true
28+
tungstenite.workspace = true
29+
uuid.workspace = true
30+
url.workspace = true
31+
ott-common.workspace = true
32+
ott-balancer-protocol.workspace = true
33+
route-recognizer = "0.3.1"
34+
once_cell.workspace = true
35+
pin-project.workspace = true
36+
prometheus.workspace = true
37+
trust-dns-resolver = { version = "0.22.0", features = ["system-config"] }

crates/ott-balancer/src/lib.rs

+125
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
use std::net::Ipv6Addr;
2+
use std::{net::SocketAddr, sync::Arc};
3+
4+
use balancer::{start_dispatcher, Balancer, BalancerContext};
5+
use clap::Parser;
6+
use hyper::server::conn::http1;
7+
use tokio::net::TcpListener;
8+
use tokio::sync::RwLock;
9+
use tracing::{error, info};
10+
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
11+
use tracing_subscriber::util::SubscriberInitExt;
12+
use tracing_subscriber::EnvFilter;
13+
14+
use crate::config::{BalancerConfig, DiscoveryConfig};
15+
use crate::discovery::start_discovery_task;
16+
use crate::service::BalancerService;
17+
18+
mod balancer;
19+
mod client;
20+
mod config;
21+
mod connection;
22+
mod discovery;
23+
mod messages;
24+
mod monolith;
25+
mod room;
26+
mod service;
27+
28+
pub async fn run() -> anyhow::Result<()> {
29+
let args = config::Cli::parse();
30+
31+
BalancerConfig::load(&args.config_path)?;
32+
let config = BalancerConfig::get();
33+
34+
let console_layer = console_subscriber::spawn();
35+
let fmt_layer = tracing_subscriber::fmt::layer();
36+
let filter = args.build_tracing_filter();
37+
let filter_layer = EnvFilter::try_from_default_env().or_else(|_| EnvFilter::try_new(filter))?;
38+
tracing_subscriber::registry()
39+
.with(console_layer)
40+
.with(filter_layer)
41+
.with(fmt_layer)
42+
.init();
43+
info!("Loaded config: {:?}", config);
44+
45+
let (discovery_tx, discovery_rx) = tokio::sync::mpsc::channel(2);
46+
47+
info!("Starting balancer");
48+
let ctx = Arc::new(RwLock::new(BalancerContext::new()));
49+
let balancer = Balancer::new(ctx.clone());
50+
let service_link = balancer.new_link();
51+
let conman_link = balancer.new_link();
52+
let _dispatcher_handle = start_dispatcher(balancer)?;
53+
info!("Dispatcher started");
54+
55+
info!("Starting monolith discovery");
56+
let _discovery_handle = match &config.discovery {
57+
DiscoveryConfig::Fly(config) => {
58+
let discovery = discovery::FlyMonolithDiscoverer::new(config.clone());
59+
start_discovery_task(discovery, discovery_tx)
60+
}
61+
DiscoveryConfig::Manual(config) => {
62+
let discovery = discovery::ManualMonolithDiscoverer::new(config.clone());
63+
start_discovery_task(discovery, discovery_tx)
64+
}
65+
DiscoveryConfig::Harness(config) => {
66+
let discovery = discovery::HarnessMonolithDiscoverer::new(config.clone());
67+
start_discovery_task(discovery, discovery_tx)
68+
}
69+
};
70+
info!("Monolith discovery started");
71+
72+
info!("Starting connection manager");
73+
let mut conman = connection::MonolithConnectionManager::new(discovery_rx, conman_link);
74+
let _conman_handle = tokio::task::Builder::new()
75+
.name("connection manager")
76+
.spawn(async move {
77+
loop {
78+
if let Err(err) = conman.do_connection_job().await {
79+
error!("Error in connection manager: {:?}", err);
80+
}
81+
}
82+
});
83+
84+
let bind_addr6: SocketAddr =
85+
SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), config.port);
86+
87+
let service = BalancerService {
88+
ctx,
89+
link: service_link,
90+
addr: bind_addr6,
91+
};
92+
93+
// on linux, binding ipv6 will also bind ipv4
94+
let listener6 = TcpListener::bind(bind_addr6).await?;
95+
96+
info!("Serving on {}", bind_addr6);
97+
loop {
98+
let (stream, addr) = tokio::select! {
99+
stream = listener6.accept() => {
100+
let (stream, addr) = stream?;
101+
(stream, addr)
102+
}
103+
};
104+
105+
let mut service = service.clone();
106+
service.addr = addr;
107+
108+
let io = hyper_util::rt::TokioIo::new(stream);
109+
110+
// Spawn a tokio task to serve multiple connections concurrently
111+
let result = tokio::task::Builder::new()
112+
.name("serve http")
113+
.spawn(async move {
114+
let conn = http1::Builder::new()
115+
.serve_connection(io, service)
116+
.with_upgrades();
117+
if let Err(err) = conn.await {
118+
error!("Error serving connection: {:?}", err);
119+
}
120+
});
121+
if let Err(err) = result {
122+
error!("Error spawning task to serve http: {:?}", err);
123+
}
124+
}
125+
}
File renamed without changes.

0 commit comments

Comments
 (0)