Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unified output #68

Merged
merged 13 commits into from
Apr 27, 2019
3 changes: 2 additions & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ authors = ["Nikolas Göbel <[email protected]>"]
edition = "2018"

[dependencies]
declarative-dataflow = { path = "../" }
declarative-dataflow = { path = "../", features = ["graphql"] }
serde = "1"
serde_json = "1"
rmp-serde = "0.13.7"
log = "0.4"
env_logger = "0.5.6"
clap = { version = "~2.33.0", features = ["yaml"] }
ws = "*"
uuid = { version = "0.7", features = ["serde", "v4"] }

[profile.release]
opt-level = 3
Expand Down
32 changes: 32 additions & 0 deletions cli/src/cli.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,38 @@ name: 3dfctl
version: "0.1.0"
author: Nikolas Göbel <[email protected]>
about: Consumer / producer CLI to 3DF.
args:
- host:
long: host
value_name: HOST
help: hostname of a peer
takes_value: true
- port:
long: port
value_name: PORT
help: port at which 3DF is listening
takes_value: true
subcommands:
- ping:
about: attempts to retrieve a heartbeat from the cluster
- req:
about: pushes arbitrary requests to the cluster
args:
- REQUEST:
help: request description in json
required: false
index: 1
- tx:
about: pushes transaction data to the cluster
args:
- TXDATA:
help: transaction data
required: false
index: 1
- gql:
about: subscribes to a GraphQL query
args:
- QUERY:
help: a GraphQL query
required: false
index: 1
146 changes: 140 additions & 6 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,164 @@ extern crate log;
#[macro_use]
extern crate clap;

use clap::App;
use std::io::Read;
use std::time::Duration;

use clap::App;
use uuid::Uuid;
use ws::{connect, CloseCode};

use declarative_dataflow::server::Request;
use declarative_dataflow::plan::{GraphQl, Plan};
use declarative_dataflow::server::{Interest, Register, Request};
use declarative_dataflow::sinks::{AssocIn, Sink};
use declarative_dataflow::{Output, Rule, TxData};

fn main() {
env_logger::init();

let cli_config = load_yaml!("cli.yml");
let matches = App::from_yaml(cli_config).get_matches();

if let Some(matches) = matches.subcommand_matches("ping") {
connect("ws://127.0.0.1:6262", |out| {
let host = matches.value_of("host").unwrap_or("127.0.0.1");
let port = matches.value_of("port").unwrap_or("6262");
let addr = format!("ws://{}:{}", host, port);

if let Some(_) = matches.subcommand_matches("ping") {
connect(addr.clone(), |out| {
let req = serde_json::to_string::<Vec<Request>>(&vec![Request::Status])
.expect("failed to serialize request");
.expect("failed to serialize requests");

out.send(req).unwrap();

move |msg| {
handle_message(msg)?;
out.close(CloseCode::Normal)
}
})
.expect("failed to connect");
}

if let Some(matches) = matches.subcommand_matches("req") {
connect(addr.clone(), |out| {
let reqs: Vec<Request> = match matches.value_of("REQUEST") {
None => {
let mut buf = String::new();
std::io::stdin()
.read_to_string(&mut buf)
.expect("failed to read from stdin");

serde_json::from_str(&buf).expect("failed to parse requests")
}
Some(arg) => serde_json::from_str(arg).expect("failed to parse requests"),
};

let req =
serde_json::to_string::<Vec<Request>>(&reqs).expect("failed to serialize requests");

debug!("{:?}", req);

out.send(req).unwrap();

move |msg| {
handle_message(msg)?;
out.close(CloseCode::Normal)
}
})
.expect("failed to connect");
}

if let Some(matches) = matches.subcommand_matches("tx") {
connect(addr.clone(), |out| {
let tx_data: Vec<TxData> = match matches.value_of("TXDATA") {
None => {
let mut buf = String::new();
std::io::stdin()
.read_to_string(&mut buf)
.expect("failed to read from stdin");

serde_json::from_str(&buf).expect("failed to parse tx data")
}
Some(tx_in) => serde_json::from_str(tx_in).expect("failed to parse tx data"),
};

let req = serde_json::to_string::<Vec<Request>>(&vec![Request::Transact(tx_data)])
.expect("failed to serialize requests");

debug!("{:?}", req);

out.send(req).unwrap();

move |msg| {
println!("Got message: {}", msg);
handle_message(msg)?;
out.close(CloseCode::Normal)
}
})
.expect("failed to connect");
}

if let Some(matches) = matches.subcommand_matches("gql") {
connect(addr.clone(), |out| {
let query: String = match matches.value_of("QUERY") {
None => {
let mut buf = String::new();
std::io::stdin()
.read_to_string(&mut buf)
.expect("failed to read from stdin");

buf
}
Some(query) => query.to_string(),
};

let name = Uuid::new_v4();

let req = serde_json::to_string::<Vec<Request>>(&vec![
Request::Register(Register {
rules: vec![Rule {
name: name.to_string(),
plan: Plan::GraphQl(GraphQl::new(query)),
}],
publish: vec![name.to_string()],
}),
Request::Interest(Interest {
name: name.to_string(),
tenant: None,
granularity: None,
sink: Some(Sink::AssocIn(AssocIn {})),
disable_logging: None,
}),
])
.expect("failed to serialize requests");

debug!("{:?}", req);

out.send(req).unwrap();

move |msg| handle_message(msg)
})
.expect("failed to connect");
}
}

fn handle_message(msg: ws::Message) -> ws::Result<()> {
match msg {
ws::Message::Text(msg) => {
trace!("{:?}", msg);

match serde_json::from_str::<Output<Duration>>(&msg) {
Err(err) => error!("{:?}", err),
Ok(out) => match out {
Output::Json(_, v, _, _) => {
let pprinted = serde_json::to_string_pretty(&v).expect("failed to pprint");
info!("{}", pprinted);
}
Output::Error(_, err, tx_id) => error!("{:?} @ {}", err, tx_id),
_ => info!("{:?}", out),
},
}
}
ws::Message::Binary(_) => unimplemented!(),
}

Ok(())
}
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.34.0
1.34.1
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ blocking = []
real-time = ["declarative-dataflow/real-time"]
csv-source = ["declarative-dataflow/csv-source"]
json-source = ["declarative-dataflow/json-source"]
graphql = ["declarative-dataflow/graphql"]

[profile.release]
opt-level = 3
Expand Down
Loading