Skip to content

Commit

Permalink
Create database schema in its own command
Browse files Browse the repository at this point in the history
  • Loading branch information
pkolaczk committed Jan 7, 2022
1 parent 9bf4111 commit eb81826
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 28 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Latte is still early stage software under intensive development.
Start a Cassandra cluster somewhere (can be a local node). Then run:

```shell
latte schema <workload.rn> [<node address>] # create the database schema
latte load <workload.rn> [<node address>] # populate the database with data
latte run <workload.rn> [<node address>] # execute the workload and measure the performance
```
Expand Down Expand Up @@ -124,13 +125,16 @@ Instance functions on `ctx` are asynchronous, so you should call `await` on them

### Schema creation

You can create your own keyspaces and tables in the `schema` function:
You can (re)create your own keyspaces and tables needed by the benchmark in the `schema` function.
The `schema` function should also drop the old schema if present.
The `schema` function is executed by running `latte schema` command.

```rust
pub async fn schema(ctx) {
ctx.execute("CREATE KEYSPACE IF NOT EXISTS test \
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }").await?;
ctx.execute("CREATE TABLE IF NOT EXISTS test.test(id bigint, data varchar").await?;
ctx.execute("DROP TABLE IF NOT EXISTS test.test").await?;
ctx.execute("CREATE TABLE test.test(id bigint, data varchar)").await?;
}
```

Expand Down
36 changes: 32 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,26 @@ pub struct ConnectionConf {
pub addresses: Vec<String>,
}

#[derive(Parser, Debug, Serialize, Deserialize)]
#[clap(
setting(AppSettings::NextLineHelp),
setting(AppSettings::DeriveDisplayOrder)
)]
pub struct SchemaCommand {
/// Parameter values passed to the workload, accessible through param! macro.
#[clap(short('P'), parse(try_from_str = parse_key_val),
number_of_values = 1, multiple_occurrences = true)]
pub params: Vec<(String, String)>,

/// Path to the workload definition file.
#[clap(name = "workload", required = true, value_name = "PATH")]
pub workload: PathBuf,

// Cassandra connection settings.
#[clap(flatten)]
pub connection: ConnectionConf,
}

#[derive(Parser, Debug, Serialize, Deserialize)]
#[clap(
setting(AppSettings::NextLineHelp),
Expand Down Expand Up @@ -271,7 +291,17 @@ pub struct HdrCommand {
#[derive(Parser, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Command {
/// Generates the data needed for the benchmark (typically needed by read benchmarks).
/// Creates the database schema by invoking the `schema` function of the workload script.
///
/// The function should remove the old schema if present.
/// Calling this is likely to remove data from the database.
Schema(SchemaCommand),

/// Erases and generates fresh data needed for the benchmark by invoking the `erase` and `load`
/// functions of the workload script.
///
/// Running this command is typically needed by read benchmarks.
/// You need to create the schema before.
Load(LoadCommand),

/// Runs the benchmark.
Expand All @@ -285,13 +315,11 @@ pub enum Command {
/// Can compare two runs.
Show(ShowCommand),

/// Exports call- and response-time histograms as a compressed HDR interval log.
/// Exports histograms as a compressed HDR interval log.
///
/// To be used with HdrHistogram (https://github.com/HdrHistogram/HdrHistogram).
/// Timestamps are given in seconds since Unix epoch.
/// Response times are recorded in nanoseconds.
///
/// Each histogram is tagged by the benchmark name, parameters and benchmark tags.
Hdr(HdrCommand),
}

Expand Down
40 changes: 23 additions & 17 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use tokio::runtime::{Builder, Runtime};
use config::RunCommand;

use crate::config::{
AppConfig, Command, ConnectionConf, HdrCommand, Interval, LoadCommand, ShowCommand,
AppConfig, Command, ConnectionConf, HdrCommand, Interval, LoadCommand, SchemaCommand,
ShowCommand,
};
use crate::context::*;
use crate::context::{CassError, CassErrorKind, Context, SessionStats};
Expand Down Expand Up @@ -105,17 +106,29 @@ async fn connect(conf: &ConnectionConf) -> Result<(Context, Option<ClusterInfo>)
Ok((session, cluster_info))
}

async fn load(conf: LoadCommand) -> Result<()> {
/// Runs the `schema` function of the workload script.
/// Exits with error if the `schema` function is not present or fails.
async fn schema(conf: SchemaCommand) -> Result<()> {
let mut program = load_workload_script(&conf.workload, &conf.params)?;
let (mut session, _) = connect(&conf.connection).await?;

if program.has_schema() {
eprintln!("info: Creating schema...");
if let Err(e) = program.schema(&mut session).await {
eprintln!("error: Failed to create schema: {}", e);
exit(255);
}
if !program.has_schema() {
eprintln!("error: Function `schema` not found in the workload script.");
exit(255);
}
eprintln!("info: Creating schema...");
if let Err(e) = program.schema(&mut session).await {
eprintln!("error: Failed to create schema: {}", e);
exit(255);
}
eprintln!("info: Schema created successfully");
Ok(())
}

/// Loads the data into the database.
/// Exits with error if the `load` function is not present or fails.
async fn load(conf: LoadCommand) -> Result<()> {
let mut program = load_workload_script(&conf.workload, &conf.params)?;
let (mut session, _) = connect(&conf.connection).await?;

if program.has_prepare() {
eprintln!("info: Preparing...");
Expand Down Expand Up @@ -184,14 +197,6 @@ async fn run(conf: RunCommand) -> Result<()> {
conf.cass_version = Some(cluster_info.cassandra_version);
}

if program.has_schema() {
eprintln!("info: Creating schema...");
if let Err(e) = program.schema(&mut session).await {
eprintln!("error: Failed to create schema: {}", e);
exit(255);
}
}

if program.has_prepare() {
eprintln!("info: Preparing...");
if let Err(e) = program.prepare(&mut session).await {
Expand Down Expand Up @@ -347,6 +352,7 @@ async fn export_hdr_log(conf: HdrCommand) -> Result<()> {

async fn async_main(command: Command) -> Result<()> {
match command {
Command::Schema(config) => schema(config).await?,
Command::Load(config) => load(config).await?,
Command::Run(config) => run(config).await?,
Command::Show(config) => show(config).await?,
Expand Down
3 changes: 2 additions & 1 deletion workloads/basic/read.rn
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ const TABLE = "basic";
pub async fn schema(ctx) {
ctx.execute(`CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE} \
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }`).await?;
ctx.execute(`CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY)`).await?;
ctx.execute(`DROP TABLE IF EXISTS ${KEYSPACE}.${TABLE}`).await?;
ctx.execute(`CREATE TABLE ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY)`).await?;
}

pub async fn erase(ctx) {
Expand Down
3 changes: 2 additions & 1 deletion workloads/basic/write-blob.rn
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ const TABLE = "blob";
pub async fn schema(db) {
db.execute(`CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE} \
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }`).await?;
db.execute(`CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY, data BLOB)`).await?;
db.execute(`DROP TABLE IF EXISTS ${KEYSPACE}.${TABLE}`).await?;
db.execute(`CREATE TABLE ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY, data BLOB)`).await?;
}

pub async fn erase(db) {
Expand Down
3 changes: 2 additions & 1 deletion workloads/basic/write.rn
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ const TABLE = "basic";
pub async fn schema(db) {
db.execute(`CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE} \
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }`).await?;
db.execute(`CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY)`).await?;
db.execute(`DROP TABLE IF EXISTS ${KEYSPACE}.${TABLE}`).await?;
db.execute(`CREATE TABLE ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY)`).await?;
}

pub async fn erase(db) {
Expand Down
4 changes: 3 additions & 1 deletion workloads/sai/new/common.rn
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ pub async fn init_schema(db) {
CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE}
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 }`).await?;
db.execute(`
CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE} (
DROP TABLE IF EXISTS ${KEYSPACE}.${TABLE}`).await?;
db.execute(`
CREATE TABLE ${KEYSPACE}.${TABLE} (
par_id bigint,
row_id uuid,
time1 timestamp,
Expand Down
4 changes: 3 additions & 1 deletion workloads/sai/orig/lib.rn
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ pub async fn schema(ctx) {
CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE}
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 }`).await?;
ctx.execute(`
CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE} (
DROP TABLE IF EXISTS ${KEYSPACE}.${TABLE}`).await?;
ctx.execute(`
CREATE TABLE ${KEYSPACE}.${TABLE} (
id uuid,
time timestamp,
value int,
Expand Down

0 comments on commit eb81826

Please sign in to comment.