Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix #228 - Abstract the output format and clean up
Browse files Browse the repository at this point in the history
Lars T Hansen committed Jan 14, 2025
1 parent bc3f29d commit b263d36
Showing 5 changed files with 711 additions and 282 deletions.
91 changes: 78 additions & 13 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ mod interrupt;
mod jobs;
mod log;
mod nvidia;
mod output;
mod procfs;
mod procfsapi;
mod ps;
@@ -17,6 +18,8 @@ mod time;
mod users;
mod util;

use std::io;

const TIMEOUT_SECONDS: u64 = 5; // For subprocesses
const USAGE_ERROR: i32 = 2; // clap, Python, Go

@@ -57,9 +60,15 @@ enum Commands {
/// One output record per Sonar invocation will contain a load= field with an encoding of
/// the per-cpu usage since boot.
load: bool,

/// Output JSON, not CSV
json: bool,
},
/// Extract system information
Sysinfo {},
Sysinfo {
/// Output CSV, not JSON
csv: bool,
},
/// Extract slurm job information
Slurmjobs {
/// Set the sacct start time to now-`window` and the end time to now, and dump records that
@@ -70,6 +79,9 @@ enum Commands {
/// From-to dates on the form yyyy-mm-dd,yyyy-mm-dd (with the comma); from is inclusive,
/// to is exclusive. Precludes --window.
span: Option<String>,

/// Output JSON, not CSV
json: bool,
},
Version {},
}
@@ -83,6 +95,9 @@ fn main() {

log::init();

let mut stdout = io::stdout();
let writer: &mut dyn io::Write = &mut stdout;

match &command_line() {
Commands::PS {
rollup,
@@ -95,6 +110,7 @@ fn main() {
exclude_commands,
lockdir,
load,
json,
} => {
let opts = ps::PsOptions {
rollup: *rollup,
@@ -115,30 +131,33 @@ fn main() {
vec![]
},
lockdir: lockdir.clone(),
json: *json,
};
if *batchless {
let mut jm = batchless::BatchlessJobManager::new();
ps::create_snapshot(&mut jm, &opts, &timestamp);
ps::create_snapshot(writer, &mut jm, &opts, &timestamp);
} else {
let mut jm = slurm::SlurmJobManager {};
ps::create_snapshot(&mut jm, &opts, &timestamp);
ps::create_snapshot(writer, &mut jm, &opts, &timestamp);
}
}
Commands::Sysinfo {} => {
sysinfo::show_system(&timestamp);
Commands::Sysinfo { csv } => {
sysinfo::show_system(writer, &timestamp, *csv);
}
Commands::Slurmjobs { window, span } => {
slurmjobs::show_slurm_jobs(window, span);
Commands::Slurmjobs { window, span, json } => {
slurmjobs::show_slurm_jobs(writer, window, span, *json);
}
Commands::Version {} => {
show_version(&mut std::io::stdout());
show_version(writer);
}
}
let _ = writer.flush();
}

// For the sake of simplicity:
// - allow repeated options to overwrite earlier values
// - all error reporting is via a generic "usage" message, without specificity as to what was wrong
// - both --json and --csv are accepted to all commands

fn command_line() -> Commands {
let args = std::env::args().collect::<Vec<String>>();
@@ -158,6 +177,8 @@ fn command_line() -> Commands {
let mut exclude_commands = None;
let mut lockdir = None;
let mut load = false;
let mut json = false;
let mut csv = false;
while next < args.len() {
let arg = args[next].as_ref();
next += 1;
@@ -167,6 +188,10 @@ fn command_line() -> Commands {
(next, rollup) = (new_next, true);
} else if let Some(new_next) = bool_arg(arg, &args, next, "--load") {
(next, load) = (new_next, true);
} else if let Some(new_next) = bool_arg(arg, &args, next, "--json") {
(next, json) = (new_next, true);
} else if let Some(new_next) = bool_arg(arg, &args, next, "--csv") {
(next, csv) = (new_next, true);
} else if let Some(new_next) =
bool_arg(arg, &args, next, "--exclude-system-jobs")
{
@@ -210,6 +235,10 @@ fn command_line() -> Commands {
eprintln!("--rollup and --batchless are incompatible");
std::process::exit(USAGE_ERROR);
}
if json && csv {
eprintln!("--csv and --json are incompatible");
std::process::exit(USAGE_ERROR);
}

Commands::PS {
batchless,
@@ -222,12 +251,34 @@ fn command_line() -> Commands {
exclude_commands,
lockdir,
load,
json,
}
}
"sysinfo" => Commands::Sysinfo {},
"sysinfo" => {
let mut json = false;
let mut csv = false;
while next < args.len() {
let arg = args[next].as_ref();
next += 1;
if let Some(new_next) = bool_arg(arg, &args, next, "--json") {
(next, json) = (new_next, true);
} else if let Some(new_next) = bool_arg(arg, &args, next, "--csv") {
(next, csv) = (new_next, true);
} else {
usage(true);
}
}
if json && csv {
eprintln!("--csv and --json are incompatible");
std::process::exit(USAGE_ERROR);
}
Commands::Sysinfo { csv }
}
"slurm" => {
let mut window = None;
let mut span = None;
let mut json = false;
let mut csv = false;
while next < args.len() {
let arg = args[next].as_ref();
next += 1;
@@ -237,14 +288,22 @@ fn command_line() -> Commands {
(next, window) = (new_next, Some(value));
} else if let Some((new_next, value)) = string_arg(arg, &args, next, "--span") {
(next, span) = (new_next, Some(value));
} else if let Some(new_next) = bool_arg(arg, &args, next, "--json") {
(next, json) = (new_next, true);
} else if let Some(new_next) = bool_arg(arg, &args, next, "--csv") {
(next, csv) = (new_next, true);
} else {
usage(true);
}
}
if window.is_some() && span.is_some() {
usage(true);
}
Commands::Slurmjobs { window, span }
if json && csv {
eprintln!("--csv and --json are incompatible");
std::process::exit(USAGE_ERROR);
}
Commands::Slurmjobs { window, span, json }
}
"version" => Commands::Version {},
"help" => {
@@ -316,9 +375,9 @@ fn usage(is_error: bool) -> ! {
Usage: sonar <COMMAND>
Commands:
ps Take a snapshot of the currently running processes
sysinfo Extract system information
slurm Extract slurm job information for a [start,end) time interval
ps Print process and load information
sysinfo Print system information
slurm Print slurm job information for a [start,end) time interval
help Print this message
Options for `ps`:
@@ -345,6 +404,10 @@ Options for `ps`:
--lockdir directory
Create a per-host lockfile in this directory and exit early if the file
exists on startup [default: none]
--load
Print per-cpu and per-gpu load data
--json
Format output as JSON, not CSV
Options for `slurm`:
--window minutes
@@ -353,6 +416,8 @@ Options for `slurm`:
--span start,end
Both `start` and `end` are on the form yyyy-mm-dd. Mostly useful for seeding a
database with older data. Precludes --window
--json
Format output as JSON, not CSV
",
);
let _ = out.flush();
416 changes: 416 additions & 0 deletions src/output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,416 @@
// Define a nested data structure of arrays, objects, and scalar values that can subsequently be
// serialized, currently as CSV and JSON, following conventions that are backward compatible with
// the older ad-hoc Sonar formatting code.
//
// Adding eg a compact binary serialization form would be very simple.

use crate::util;

use std::io;

// A dynamically typed value: scalar, array, or object, roughly mirroring the
// JSON data model with the addition of an explicit "empty" marker.
pub enum Value {
    A(Array),
    O(Object),
    S(String),
    U(u64),
    I(i64),
    F(f64),
    E(), // Empty array element only, never a field or toplevel value
}

// A single tagged member of an Object.
struct Field {
    tag: String,
    value: Value,
}

// An ordered collection of tagged fields; insertion order is preserved and
// determines serialization order.
pub struct Object {
    fields: Vec<Field>,
}

#[allow(dead_code)]
impl Object {
    /// Create an object with no fields.
    pub fn new() -> Object {
        Object { fields: vec![] }
    }

    /// True iff the object has no fields.
    pub fn is_empty(&self) -> bool {
        self.fields.is_empty()
    }

    /// Append `value` under `tag` after all existing fields.
    pub fn push(&mut self, tag: &str, value: Value) {
        self.fields.push(Field {
            tag: tag.to_string(),
            value,
        })
    }

    /// Insert `value` under `tag` before all existing fields.
    pub fn prepend(&mut self, tag: &str, value: Value) {
        self.fields.insert(
            0,
            Field {
                tag: tag.to_string(),
                value,
            },
        )
    }

    // Typed convenience wrappers around push/prepend.

    pub fn push_o(&mut self, tag: &str, o: Object) {
        self.push(tag, Value::O(o));
    }

    pub fn push_a(&mut self, tag: &str, a: Array) {
        self.push(tag, Value::A(a));
    }

    pub fn push_s(&mut self, tag: &str, s: String) {
        self.push(tag, Value::S(s));
    }

    pub fn prepend_s(&mut self, tag: &str, s: String) {
        self.prepend(tag, Value::S(s));
    }

    pub fn push_u(&mut self, tag: &str, u: u64) {
        self.push(tag, Value::U(u));
    }

    pub fn push_i(&mut self, tag: &str, i: i64) {
        self.push(tag, Value::I(i));
    }

    pub fn push_f(&mut self, tag: &str, f: f64) {
        self.push(tag, Value::F(f));
    }
}

// An ordered sequence of values, with some CSV-specific encoding controls.
pub struct Array {
    elements: Vec<Value>,
    nonempty_base45: bool, // encode as offsetted little-endian base45, see below
    sep: String,           // CSV element separator, "," by default
}

#[allow(dead_code)]
impl Array {
    /// Create an empty array with default encoding (plain, comma-separated).
    pub fn new() -> Array {
        Array {
            elements: vec![],
            nonempty_base45: false,
            sep: ",".to_string(),
        }
    }

    /// Create an array from existing elements, with default encoding.
    pub fn from_vec(elements: Vec<Value>) -> Array {
        Array {
            elements,
            nonempty_base45: false,
            sep: ",".to_string(),
        }
    }

    /// Append one element.
    pub fn push(&mut self, value: Value) {
        self.elements.push(value)
    }

    /// Number of elements.
    pub fn len(&self) -> usize {
        self.elements.len()
    }

    /// True iff the array has no elements (companion to `len`, per clippy's
    /// `len_without_is_empty`).
    pub fn is_empty(&self) -> bool {
        self.elements.is_empty()
    }

    // Typed convenience wrappers around push.

    pub fn push_o(&mut self, o: Object) {
        self.push(Value::O(o));
    }

    pub fn push_s(&mut self, s: String) {
        self.push(Value::S(s));
    }

    pub fn push_u(&mut self, u: u64) {
        self.push(Value::U(u));
    }

    pub fn push_i(&mut self, i: i64) {
        self.push(Value::I(i));
    }

    pub fn push_f(&mut self, f: f64) {
        self.push(Value::F(f));
    }

    pub fn push_e(&mut self) {
        self.push(Value::E());
    }

    // This creates a constraint that:
    //
    // - there must be at least one element
    // - all elements must be Value::U
    // - the array is encoded as an offsetted little-endian base45 string (below).
    //
    // This is an efficient and CSV-friendly encoding of a typical array of cpu-second data.
    pub fn set_encode_nonempty_base45(&mut self) {
        self.nonempty_base45 = true;
    }

    // Use sep as a CSV array separator instead of the default ",".
    pub fn set_csv_separator(&mut self, sep: String) {
        self.sep = sep;
    }
}

// Write all of `s` to `writer` and ignore errors.
//
// `Write::write` may perform a short write and silently drop the tail of the
// data, so use `write_all`, which loops until everything is written.  Errors
// are still deliberately discarded: there is nothing sensible to do if e.g. a
// downstream pipe has been closed.

fn write_chars(writer: &mut dyn io::Write, s: &str) {
    let _ = writer.write_all(s.as_bytes());
}

// JSON output follows the standard.

// Serialize `v` to `writer` as a single line of JSON, terminated by a newline.
pub fn write_json(writer: &mut dyn io::Write, v: &Value) {
    write_json_int(writer, v);
    let _ = writer.write(b"\n");
}

// Serialize one value without a trailing newline, dispatching on the variant.
// Value::E deliberately emits nothing at all.
fn write_json_int(writer: &mut dyn io::Write, v: &Value) {
    match v {
        Value::A(arr) => write_json_array(writer, arr),
        Value::O(obj) => write_json_object(writer, obj),
        Value::S(s) => write_json_string(writer, s),
        Value::U(u) => write_chars(writer, &u.to_string()),
        Value::I(i) => write_chars(writer, &i.to_string()),
        Value::F(f) => write_chars(writer, &f.to_string()),
        Value::E() => {}
    }
}

// Serialize an array.
//
// An array flagged with set_encode_nonempty_base45 is emitted as a single
// base45 string (see encode_cpu_secs_base45el); that string is wrapped in
// double quotes here so the result remains valid JSON.  The base45 alphabets
// contain no '"' or '\', so no escaping is needed.  Panics if such an array
// contains anything other than Value::U, per the documented constraint.
fn write_json_array(writer: &mut dyn io::Write, a: &Array) {
    if a.nonempty_base45 {
        let us = a
            .elements
            .iter()
            .map(|x| {
                if let Value::U(u) = x {
                    *u
                } else {
                    panic!("Not a Value::U")
                }
            })
            .collect::<Vec<u64>>();
        let _ = writer.write(&[b'"']);
        write_chars(writer, &encode_cpu_secs_base45el(&us));
        let _ = writer.write(&[b'"']);
        return;
    }

    let _ = writer.write(&[b'[']);
    let mut first = true;
    for elt in &a.elements {
        if !first {
            let _ = writer.write(&[b',']);
        }
        write_json_int(writer, elt);
        first = false;
    }
    let _ = writer.write(&[b']']);
}

// Serialize an object as {"tag":value,...} with fields in insertion order.
fn write_json_object(writer: &mut dyn io::Write, o: &Object) {
    let _ = writer.write(&[b'{']);
    for (ix, fld) in o.fields.iter().enumerate() {
        if ix > 0 {
            let _ = writer.write(&[b',']);
        }
        write_json_string(writer, &fld.tag);
        let _ = writer.write(&[b':']);
        write_json_int(writer, &fld.value);
    }
    let _ = writer.write(&[b'}']);
}

// Serialize a string with surrounding double quotes and JSON escaping of the
// content (escaping is delegated to util::json_quote).
fn write_json_string(writer: &mut dyn io::Write, s: &String) {
    write_chars(writer, "\"");
    write_chars(writer, &util::json_quote(s));
    write_chars(writer, "\"");
}

// End-to-end check of the JSON serializer: nested object, nested array,
// string escaping, and all scalar variants.  Note the Value::E element
// serializes as nothing, which is why the expected output contains ",,"
// (intentionally lax JSON, matching the historical format).
#[test]
pub fn test_json() {
    let mut a = Array::new();
    let mut o = Object::new();
    o.push_o("o", Object::new());
    o.push_a("a", Array::new());
    o.push_s("s", r#"hello, "sir""#.to_string());
    o.push_u("u", 123);
    o.push_i("i", -12);
    o.push_f("f", 12.5);
    a.push_o(o);
    a.push_e();
    // The backslash in the input must come out doubled in the JSON text.
    a.push_s(r#"stri\ng"#.to_string());
    let expect =
        concat!(r#"[{"o":{},"a":[],"s":"hello, \"sir\"","u":123,"i":-12,"f":12.5},,"stri\\ng"]"#,
        "\n",
    );
    let mut output = Vec::new();
    write_json(&mut output, &Value::A(a));
    let got = String::from_utf8_lossy(&output);
    assert!(expect == got);
}

// CSV:
//
// - an object is a comma-separated list of FIELDs
// - an array is an X-separated list of VALUEs (where X is comma by default but can be changed)
// - a TAG is an unquoted string
// - each FIELD is {TAG}={VALUE}
// - a VALUE is the string representation of the value
// - if the FIELD of an object or the VALUE of an array contains ',' or '"', then the FIELD or VALUE
// is prefixed and suffixed by '"' and any '"' in the original string is doubled.
//
// Note that the bare representation of a value of any kind is just the string representation of the
// value itself (unquoted), it's the inclusion in an object or array that forces the quoting.
//
// The format allows nesting but the number of " grows exponentially with the nesting level if array
// separators are not managed carefully. Also, custom array element separators are not handled
// specially by the quoting mechanism, effectively requiring each nesting level to have its own
// custom quoting mechanism and to avoid quoting chars used at outer levels. For data nested more
// than one level, and especially when those data include arbitrary strings, use JSON.

// Serialize `v` to `writer` as a single line of CSV, terminated by a newline.
pub fn write_csv(writer: &mut dyn io::Write, v: &Value) {
    let mut line = format_csv_value(v);
    line.push('\n');
    write_chars(writer, &line);
}

// Render one value in its bare (unquoted) CSV string form; quoting is applied
// by the enclosing object/array formatter, not here.
pub fn format_csv_value(v: &Value) -> String {
    match v {
        Value::A(arr) => format_csv_array(arr),
        Value::O(obj) => format_csv_object(obj),
        Value::S(s) => s.clone(),
        Value::U(u) => u.to_string(),
        Value::I(i) => i.to_string(),
        Value::F(f) => f.to_string(),
        Value::E() => String::new(),
    }
}

// Render an object as comma-separated tag=value fields, CSV-quoting each
// whole field (so a ',' or '"' inside tag or value triggers quoting).
fn format_csv_object(o: &Object) -> String {
    let mut s = String::new();
    for (ix, fld) in o.fields.iter().enumerate() {
        if ix > 0 {
            s.push(',');
        }
        let field = format!("{}={}", fld.tag, format_csv_value(&fld.value));
        s += &util::csv_quote(&field);
    }
    s
}

// Render an array either as its base45 encoding (in which case every element
// must be Value::U, see set_encode_nonempty_base45) or as sep-separated,
// individually CSV-quoted elements.
fn format_csv_array(a: &Array) -> String {
    if a.nonempty_base45 {
        let us = a
            .elements
            .iter()
            .map(|x| match x {
                Value::U(u) => *u,
                _ => panic!("Not a Value::U"),
            })
            .collect::<Vec<u64>>();
        return encode_cpu_secs_base45el(&us);
    }
    let mut s = String::new();
    for (ix, elt) in a.elements.iter().enumerate() {
        if ix > 0 {
            s += &a.sep;
        }
        s += &util::csv_quote(&format_csv_value(elt));
    }
    s
}

// End-to-end check of the CSV formatter: empty nested object, custom array
// separator, field quoting when a value contains ',' or '"', all scalar
// variants, and the base45-encoded array path.
#[test]
pub fn test_csv() {
    // The common (really only truly supported) case for CSV is that there's an object outermost.
    let mut o = Object::new();
    o.push_o("o", Object::new());
    let mut aa = Array::new();
    aa.push_i(1);
    aa.push_e(); // Renders as an empty element: "1||2" below.
    aa.push_i(2);
    aa.set_csv_separator("|".to_string());
    o.push_a("a", aa);
    o.push_s("s", r#"hello, "sir""#.to_string());
    o.push_u("u", 123);
    o.push_i("i", -12);
    o.push_f("f", 12.5);
    let mut ab = Array::new();
    ab.set_encode_nonempty_base45();
    // See the encoding test further down for an explanation of the encoded value.
    for x in vec![1, 30, 89, 12] {
        ab.push_u(x);
    }
    o.push_a("x", ab);
    let expect = concat!(r#"o=,a=1||2,"s=hello, ""sir""",u=123,i=-12,f=12.5,x=)(t*1b"#, "\n");
    let mut output = Vec::new();
    write_csv(&mut output, &Value::O(o));
    let got = String::from_utf8_lossy(&output);
    assert!(expect == got);
}

// Encode a nonempty u64 array compactly.
//
// The output must be ASCII text (32 <= c < 128), ideally without ',' or '"' or '\' or ' ' to not
// make it difficult for the various output formats we use. Also avoid DEL, because it is a weird
// control character.
//
// We have many encodings to choose from, see https://github.com/NordicHPC/sonar/issues/178.
//
// The values to be represented are always cpu seconds of active time since boot, one item per cpu,
// and it is assumed that they are roughly in the vicinity of each other (the largest is rarely more
// than 4x the smallest, say). The assumption does not affect correctness, only compactness.
//
// The encoding first finds the minimum input value and subtracts that from all entries. The
// minimum value, and all the entries, are then emitted as unsigned little-endian base-45 with the
// initial digit chosen from a different character set to indicate that it is initial.
//
// Panics if `cpu_secs` is empty.

fn encode_cpu_secs_base45el(cpu_secs: &[u64]) -> String {
    // Iterator::min is the idiomatic spelling of reduce(std::cmp::min).
    let base = *cpu_secs.iter().min().expect("Must have a non-empty array");
    let mut s = encode_u64_base45el(base);
    for x in cpu_secs {
        s += encode_u64_base45el(*x - base).as_str();
    }
    s
}

// The only character unused by the encoding, other than the ones we're not allowed to use, is '='.
const BASE: u64 = 45;
const INITIAL: &[u8] = "(){}[]<>+-abcdefghijklmnopqrstuvwxyz!@#$%^&*_".as_bytes();
const SUBSEQUENT: &[u8] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ~|';:.?/`".as_bytes();

// Emit `x` in little-endian base-45: the least significant digit is drawn
// from the INITIAL alphabet (marking the start of a number so the decoder can
// find digit-group boundaries), all further digits from SUBSEQUENT.
fn encode_u64_base45el(mut x: u64) -> String {
    let mut s = String::from(INITIAL[(x % BASE) as usize] as char);
    x /= BASE;
    while x > 0 {
        s.push(SUBSEQUENT[(x % BASE) as usize] as char);
        x /= BASE;
    }
    s
}

// Sanity checks: each digit alphabet covers exactly BASE characters, and a
// known input vector produces the expected encoding.  (A leftover debug
// println! was removed; tests should not print on success.)
#[test]
pub fn test_encoding() {
    assert!(INITIAL.len() == BASE as usize);
    assert!(SUBSEQUENT.len() == BASE as usize);
    // This should be *1, *0, *29, *43, 1, *11 with * denoting an INITIAL char.
    let v = vec![1, 30, 89, 12];
    assert!(encode_cpu_secs_base45el(&v) == ")(t*1b");
}
357 changes: 149 additions & 208 deletions src/ps.rs
Original file line number Diff line number Diff line change
@@ -6,12 +6,13 @@ use crate::hostname;
use crate::interrupt;
use crate::jobs;
use crate::log;
use crate::output;
use crate::procfs;
use crate::procfsapi;
use crate::util::{csv_quote, three_places};
use crate::util::three_places;

use std::collections::{HashMap, HashSet};
use std::io::{self, Result, Write};
use std::io::{self, Write};
use std::path::PathBuf;

// The GpuSet has three states:
@@ -175,9 +176,15 @@ pub struct PsOptions<'a> {
pub exclude_commands: Vec<&'a str>,
pub lockdir: Option<String>,
pub load: bool,
pub json: bool,
}

pub fn create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timestamp: &str) {
pub fn create_snapshot(
writer: &mut dyn io::Write,
jobs: &mut dyn jobs::JobManager,
opts: &PsOptions,
timestamp: &str,
) {
// If a lock file was requested, create one before the operation, exit early if it already
// exists, and if we performed the operation, remove the file afterwards. Otherwise, just
// perform the operation.
@@ -234,7 +241,7 @@ pub fn create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timest
}

if !failed && !skip {
do_create_snapshot(jobs, opts, timestamp);
do_create_snapshot(writer, jobs, opts, timestamp);

// Testing code: If we got the lockfile and produced a report, wait 10s after producing
// it while holding onto the lockfile. It is then possible to run sonar in that window
@@ -262,11 +269,16 @@ pub fn create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timest
log::error("Unable to properly manage or delete lockfile");
}
} else {
do_create_snapshot(jobs, opts, timestamp);
do_create_snapshot(writer, jobs, opts, timestamp);
}
}

fn do_create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timestamp: &str) {
fn do_create_snapshot(
writer: &mut dyn io::Write,
jobs: &mut dyn jobs::JobManager,
opts: &PsOptions,
timestamp: &str,
) {
let no_gpus = empty_gpuset();
let mut proc_by_pid = ProcTable::new();

@@ -340,7 +352,7 @@ fn do_create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timesta
let mut gpu_status = GpuStatus::Ok;

let gpu_utilization: Vec<gpu::Process>;
let mut gpu_info: String = "".to_string();
let mut gpu_info: Option<output::Object> = None;
match gpu::probe() {
None => {}
Some(mut gpu) => {
@@ -349,18 +361,20 @@ fn do_create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timesta
gpu_status = GpuStatus::UnknownFailure;
}
Ok(ref cards) => {
let mut s = "".to_string();
let mut s = output::Object::new();
s = add_key(s, "fan%", cards, |c: &gpu::CardState| {
nonzero(c.fan_speed_pct as i64)
});
s = add_key(s, "mode", cards, |c: &gpu::CardState| {
if c.compute_mode == "Default" {
"".to_string()
output::Value::E()
} else {
c.compute_mode.clone()
output::Value::S(c.compute_mode.clone())
}
});
s = add_key(s, "perf", cards, |c: &gpu::CardState| c.perf_state.clone());
s = add_key(s, "perf", cards, |c: &gpu::CardState| {
output::Value::S(c.perf_state.clone())
});
// Reserved memory is really not interesting, it's possible it would have been
// interesting as part of the card configuration.
//s = add_key(s, "mreskib", cards, |c: &gpu::CardState| nonzero(c.mem_reserved_kib));
@@ -388,7 +402,9 @@ fn do_create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timesta
s = add_key(s, "memz", cards, |c: &gpu::CardState| {
nonzero(c.mem_clock_mhz.into())
});
gpu_info = s;
if !s.is_empty() {
gpu_info = Some(s);
}
}
}
match gpu.get_process_utilization(&user_by_pid) {
@@ -449,8 +465,6 @@ fn do_create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timesta

// Once we start printing we'll print everything and not check the interrupted flag any more.

let mut writer = io::stdout();

let hostname = hostname::get();
const VERSION: &str = env!("CARGO_PKG_VERSION");
let print_params = PrintParameters {
@@ -531,106 +545,123 @@ fn do_create_snapshot(jobs: &mut dyn jobs::JobManager, opts: &PsOptions, timesta
.filter(|proc_info| filter_proc(proc_info, &print_params))
.collect::<Vec<ProcInfo>>();

let mut did_print = false;
// Here JSON and CSV will diverge a little. The trick is this:
//
// - first generate all output lines w/o version, timestamp, hostname, per_cpu_secs, and gpu_info
// - then:
// - for json, create an object that has those common fields and an array of per-process data,
// this object is also the heartbeat / synthetic record
// - for csv:
// - if there are no records and must_print is true, synthesize one
// - if we want them, insert per_cpu_secs and gpu_info fields in the first record (there will be one)
// - attach version, timestamp and hostname to all records (ideally we would prepend...)
//
// Then print.

let mut records: Vec<output::Object> = vec![];
for c in candidates {
match print_record(
&mut writer,
&print_params,
&c,
if !did_print {
Some(&per_cpu_secs)
records.push(generate_candidate(&c));
}

if !opts.json {
let have_load_data = {
if print_params.opts.load {
!per_cpu_secs.is_empty() || gpu_info.is_some()
} else {
None
},
if !did_print { Some(&gpu_info) } else { None },
) {
Ok(did_print_one) => did_print = did_print_one || did_print,
Err(_) => {
// Discard the error: there's nothing very sensible we can do at this point if the
// write failed, and it will fail if we cut off a pipe, for example, see #132. I
// guess one can argue whether we should try the next record, but it seems sensible
// to just bail out and hope for the best.
break;
false
}
}
}
};

if !did_print && must_print {
// Print a synthetic record
let synth = ProcInfo {
user: "_sonar_",
_uid: 0,
command: "_heartbeat_",
pid: 0,
ppid: 0,
rolledup: 0,
is_system_job: true,
has_children: false,
job_id: 0,
cpu_percentage: 0.0,
cputime_sec: 0,
mem_percentage: 0.0,
mem_size_kib: 0,
rssanon_kib: 0,
gpu_cards: empty_gpuset(),
gpu_percentage: 0.0,
gpu_mem_percentage: 0.0,
gpu_mem_size_kib: 0,
gpu_status: GpuStatus::Ok,
if (must_print || have_load_data) && records.len() == 0 {
let mut fields = output::Object::new();
fields.push_s("user", "_sonar_".to_string());
fields.push_s("cmd", "_heartbeat_".to_string());
records.push(fields);
};
// Discard the error, see above.
let _ = print_record(
&mut writer,
&print_params,
&synth,
if !did_print {
Some(&per_cpu_secs)
} else {
None
},
if !did_print { Some(&gpu_info) } else { None },
);
}

// Discard the error code, see above.
let _ = writer.flush();
if print_params.opts.load {
if !per_cpu_secs.is_empty() {
let mut a = output::Array::from_vec(
per_cpu_secs
.iter()
.map(|x| output::Value::U(*x))
.collect::<Vec<output::Value>>(),
);
a.set_encode_nonempty_base45();
records[0].push_a("load", a);
}
if let Some(info) = gpu_info {
records[0].push_o("gpuinfo", info);
}
}

// Historically, these three fields were always first, and we have test cases that depend on
// "v=" being the very first field.
for r in &mut records {
r.prepend_s("host", print_params.hostname.to_string());
r.prepend_s("time", print_params.timestamp.to_string());
r.prepend_s("v", print_params.version.to_string());
}

for v in records {
output::write_csv(writer, &output::Value::O(v));
}
} else {
let mut datum = output::Object::new();
datum.push_s("v", print_params.version.to_string());
datum.push_s("time", print_params.timestamp.to_string());
datum.push_s("host", print_params.hostname.to_string());
if print_params.opts.load {
if !per_cpu_secs.is_empty() {
let a = output::Array::from_vec(
per_cpu_secs
.iter()
.map(|x| output::Value::U(*x))
.collect::<Vec<output::Value>>(),
);
datum.push_a("load", a);
}
if let Some(info) = gpu_info {
datum.push_o("gpuinfo", info);
}
}
let mut samples = output::Array::new();
for o in records {
samples.push_o(o);
}
datum.push_a("samples", samples);
output::write_json(writer, &output::Value::O(datum));
}
}

fn add_key(
mut s: String,
fn add_key<'a>(
mut s: output::Object,
key: &str,
cards: &[gpu::CardState],
extract: fn(&gpu::CardState) -> String,
) -> String {
let mut vs = "".to_string();
let mut any = false;
let mut first = true;
extract: fn(&gpu::CardState) -> output::Value,
) -> output::Object {
let mut vs = output::Array::new();
let mut any_nonempty = false;
vs.set_csv_separator("|".to_string());
for c in cards {
let v = extract(c);
if !first {
vs += "|";
}
if !v.is_empty() {
any = true;
vs = vs + &v;
if let output::Value::E() = v {
} else {
any_nonempty = true;
}
first = false;
vs.push(v);
}
if any {
if !s.is_empty() {
s += ",";
}
s + key + "=" + &vs
} else {
s
if any_nonempty {
s.push(key, output::Value::A(vs));
}
s
}

fn nonzero(x: i64) -> String {
fn nonzero(x: i64) -> output::Value {
if x == 0 {
"".to_string()
output::Value::E()
} else {
format!("{:?}", x)
output::Value::I(x)
}
}

@@ -700,160 +731,70 @@ struct PrintParameters<'a> {
opts: &'a PsOptions<'a>,
}

fn print_record(
writer: &mut dyn io::Write,
params: &PrintParameters,
proc_info: &ProcInfo,
per_cpu_secs: Option<&[u64]>,
gpu_info: Option<&str>,
) -> Result<bool> {
// Mandatory fields.

let mut fields = vec![
format!("v={}", params.version),
format!("time={}", params.timestamp),
format!("host={}", params.hostname),
format!("user={}", proc_info.user),
format!("cmd={}", proc_info.command),
];
fn generate_candidate(proc_info: &ProcInfo) -> output::Object {
let mut fields = output::Object::new();

fields.push_s("user", proc_info.user.to_string());
fields.push_s("cmd", proc_info.command.to_string());

// Only print optional fields whose values are not their defaults. The defaults are defined in
// README.md. The values there must agree with those used by Jobanalyzer's parser.

if proc_info.job_id != 0 {
fields.push(format!("job={}", proc_info.job_id));
fields.push_u("job", proc_info.job_id as u64);
}
if proc_info.rolledup == 0 && proc_info.pid != 0 {
// pid must be 0 for rolledup > 0 as there is no guarantee that there is any fixed
// representative pid for a rolled-up set of processes: the set can change from run to run,
// and sonar has no history.
fields.push(format!("pid={}", proc_info.pid));
fields.push_u("pid", proc_info.pid as u64);
}
if proc_info.ppid != 0 {
fields.push(format!("ppid={}", proc_info.ppid));
fields.push_u("ppid", proc_info.ppid as u64);
}
if proc_info.cpu_percentage != 0.0 {
fields.push(format!("cpu%={}", three_places(proc_info.cpu_percentage)));
fields.push_f("cpu%", three_places(proc_info.cpu_percentage));
}
if proc_info.mem_size_kib != 0 {
fields.push(format!("cpukib={}", proc_info.mem_size_kib));
fields.push_u("cpukib", proc_info.mem_size_kib as u64);
}
if proc_info.rssanon_kib != 0 {
fields.push(format!("rssanonkib={}", proc_info.rssanon_kib));
fields.push_u("rssanonkib", proc_info.rssanon_kib as u64);
}
if let Some(ref cards) = proc_info.gpu_cards {
if cards.is_empty() {
// Nothing
} else {
fields.push(format!(
"gpus={}",
fields.push_s(
"gpus",
cards
.iter()
.map(|&num| num.to_string())
.collect::<Vec<String>>()
.join(",")
))
.join(","),
);
}
} else {
fields.push("gpus=unknown".to_string());
fields.push_s("gpus", "unknown".to_string());
}
if proc_info.gpu_percentage != 0.0 {
fields.push(format!("gpu%={}", three_places(proc_info.gpu_percentage)));
fields.push_f("gpu%", three_places(proc_info.gpu_percentage));
}
if proc_info.gpu_mem_percentage != 0.0 {
fields.push(format!(
"gpumem%={}",
three_places(proc_info.gpu_mem_percentage)
));
fields.push_f("gpumem%", three_places(proc_info.gpu_mem_percentage));
}
if proc_info.gpu_mem_size_kib != 0 {
fields.push(format!("gpukib={}", proc_info.gpu_mem_size_kib));
fields.push_u("gpukib", proc_info.gpu_mem_size_kib as u64);
}
if proc_info.cputime_sec != 0 {
fields.push(format!("cputime_sec={}", proc_info.cputime_sec));
fields.push_u("cputime_sec", proc_info.cputime_sec as u64);
}
if proc_info.gpu_status != GpuStatus::Ok {
fields.push(format!("gpufail={}", proc_info.gpu_status as i32));
fields.push_u("gpufail", proc_info.gpu_status as u64);
}
if proc_info.rolledup > 0 {
fields.push(format!("rolledup={}", proc_info.rolledup));
}
if params.opts.load {
if let Some(cpu_secs) = per_cpu_secs {
if !cpu_secs.is_empty() {
fields.push(format!("load={}", encode_cpu_secs_base45el(cpu_secs)));
}
}
if let Some(gpu_info) = gpu_info {
if !gpu_info.is_empty() {
fields.push(format!("gpuinfo={gpu_info}"));
}
}
fields.push_u("rolledup", proc_info.rolledup as u64);
}

let mut s = "".to_string();
for f in fields {
if !s.is_empty() {
s += ","
}
s += &csv_quote(&f);
}
s += "\n";

let _ = writer.write(s.as_bytes())?;

Ok(true)
}

// Encode a nonempty u64 array compactly.
//
// The output must be ASCII text (32 <= c < 128), ideally without ',' or '"' or '\' or ' ' to not
// make it difficult for the various output formats we use. Also avoid DEL, because it is a weird
// control character.
//
// We have many encodings to choose from, see https://github.com/NordicHPC/sonar/issues/178.
//
// The values to be represented are always cpu seconds of active time since boot, one item per cpu,
// and it is assumed that they are roughly in the vicinity of each other (the largest is rarely more
// than 4x the smallest, say). The assumption does not affect correctness, only compactness.
//
// The encoding first finds the minimum input value and subtracts that from all entries. The
// minimum value, and all the entries, are then emitted as unsigned little-endian base-45 with the
// initial digit chosen from a different character set to indicate that it is initial.

fn encode_cpu_secs_base45el(cpu_secs: &[u64]) -> String {
let base = *cpu_secs
.iter()
.reduce(std::cmp::min)
.expect("Must have a non-empty array");
let mut s = encode_u64_base45el(base);
for x in cpu_secs {
s += encode_u64_base45el(*x - base).as_str();
}
s
}

// The only character unused by the encoding, other than the ones we're not allowed to use, is '='.
const BASE: u64 = 45;
const INITIAL: &[u8] = "(){}[]<>+-abcdefghijklmnopqrstuvwxyz!@#$%^&*_".as_bytes();
const SUBSEQUENT: &[u8] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ~|';:.?/`".as_bytes();

fn encode_u64_base45el(mut x: u64) -> String {
let mut s = String::from(INITIAL[(x % BASE) as usize] as char);
x /= BASE;
while x > 0 {
s.push(SUBSEQUENT[(x % BASE) as usize] as char);
x /= BASE;
}
s
}

#[test]
pub fn test_encoding() {
assert!(INITIAL.len() == BASE as usize);
assert!(SUBSEQUENT.len() == BASE as usize);
// This should be *1, *0, *29, *43, 1, *11 with * denoting an INITIAL char.
let v = vec![1, 30, 89, 12];
println!("{}", encode_cpu_secs_base45el(&v));
assert!(encode_cpu_secs_base45el(&v) == ")(t*1b");
fields
}
34 changes: 24 additions & 10 deletions src/slurmjobs.rs
Original file line number Diff line number Diff line change
@@ -2,8 +2,8 @@

use crate::command;
use crate::log;
use crate::output;
use crate::time;
use crate::util;

#[cfg(test)]
use std::cmp::min;
@@ -19,7 +19,12 @@ const TIMEOUT_S: u64 = 180;
// Same output format as sacctd, which uses this version number.
const VERSION: &str = "0.1.0";

pub fn show_slurm_jobs(window: &Option<u32>, span: &Option<String>) {
pub fn show_slurm_jobs(
writer: &mut dyn io::Write,
window: &Option<u32>,
span: &Option<String>,
json: bool,
) {
let (job_states, field_names) = parameters();

// Parse the options to compute the time range to pass to sacct.
@@ -59,9 +64,8 @@ pub fn show_slurm_jobs(window: &Option<u32>, span: &Option<String>) {
log::error(&format!("sacct failed: {:?}", e));
}
Ok(sacct_output) => {
let mut writer = io::stdout();
let local = time::now_local();
format_jobs(&mut writer, &sacct_output, &field_names, &local);
format_jobs(writer, &sacct_output, &field_names, &local, json);
}
}
}
@@ -139,6 +143,7 @@ fn format_jobs(
sacct_output: &str,
field_names: &[&str],
local: &libc::tm,
json: bool,
) {
// Fields that are dates that may be reinterpreted before transmission.
let date_fields = HashSet::from(["Start", "End", "Submit"]);
@@ -160,7 +165,8 @@ fn format_jobs(
field_store[field_names.len() - 1] = &jobname;
let fields = &field_store[..field_names.len()];

let mut output_line = "v=".to_string() + VERSION;
let mut output_line = output::Object::new();
output_line.push_s("v", VERSION.to_string());
for (i, name) in field_names.iter().enumerate() {
let mut val = fields[i].to_string();
let is_zero = val.is_empty()
@@ -179,12 +185,14 @@ fn format_jobs(
val = time::format_iso8601(&t).to_string()
}
}
output_line += ",";
output_line += &util::csv_quote(&(name.to_string() + "=" + &val));
output_line.push_s(name, val);
}
}
output_line += "\n";
let _ = writer.write(output_line.as_bytes());
if json {
output::write_json(writer, &output::Value::O(output_line));
} else {
output::write_csv(writer, &output::Value::O(output_line));
}
}
}

@@ -204,7 +212,7 @@ pub fn test_format_jobs() {
// The output below depends on us being in UTC+01:00 and not in dst so mock that.
local.tm_gmtoff = 3600;
local.tm_isdst = 0;
format_jobs(&mut output, sacct_output, &field_names, &local);
format_jobs(&mut output, sacct_output, &field_names, &local, false);
if output != expected.as_bytes() {
let xs = &output;
let ys = expected.as_bytes();
@@ -224,6 +232,12 @@ pub fn test_format_jobs() {
break;
}
}
println!("{} {}", xs.len(), ys.len());
if xs.len() > ys.len() {
println!("{}", String::from_utf8_lossy(&xs[ys.len()..]));
} else {
println!("{}", String::from_utf8_lossy(&ys[xs.len()..]));
}
assert!(false);
}
}
95 changes: 44 additions & 51 deletions src/sysinfo.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use crate::gpu;
use crate::hostname;
use crate::log;
use crate::output;
use crate::procfs;
use crate::procfsapi;
use crate::util;

use std::io;

pub fn show_system(timestamp: &str) {
pub fn show_system(writer: &mut dyn io::Write, timestamp: &str, csv: bool) {
let fs = procfsapi::RealFS::new();
let mut writer = io::stdout();
match do_show_system(&mut writer, &fs, timestamp) {
match do_show_system(writer, &fs, timestamp, csv) {
Ok(_) => {}
Err(e) => {
log::error(&format!("sysinfo failed: {e}"));
@@ -24,6 +23,7 @@ fn do_show_system(
writer: &mut dyn io::Write,
fs: &dyn procfsapi::ProcfsAPI,
timestamp: &str,
csv: bool,
) -> Result<(), String> {
let (model, sockets, cores_per_socket, threads_per_core) = procfs::get_cpu_info(fs)?;
let mem_by = procfs::get_memtotal_kib(fs)? * 1024;
@@ -41,7 +41,9 @@ fn do_show_system(
} else {
""
};
let (gpu_desc, gpu_cards, gpumem_gb, gpu_info) = if !cards.is_empty() {

let mut gpu_info = output::Array::new();
let (gpu_desc, gpu_cards, gpumem_gb) = if !cards.is_empty() {
// Sort cards
cards.sort_by(|a: &gpu::Card, b: &gpu::Card| {
if a.model == b.model {
@@ -78,11 +80,7 @@ fn do_show_system(
}

// Compute the info blobs
let mut gpu_info = "".to_string();
for c in &cards {
if !gpu_info.is_empty() {
gpu_info += ","
}
let gpu::Card {
bus_addr,
index,
@@ -98,56 +96,51 @@ fn do_show_system(
max_ce_clock_mhz,
max_mem_clock_mhz,
} = c;
let manufacturer = util::json_quote(&manufacturer);
let bus_addr = util::json_quote(bus_addr);
let model = util::json_quote(model);
let arch = util::json_quote(arch);
let driver = util::json_quote(driver);
let firmware = util::json_quote(firmware);
gpu_info += &format!(
r###"
{{"bus_addr":"{bus_addr}", "index":{index}, "uuid":"{uuid}",
"manufacturer":"{manufacturer}", "model":"{model}", "arch":"{arch}", "driver":"{driver}", "firmware":"{firmware}",
"mem_size_kib":{mem_size_kib},
"power_limit_watt":{power_limit_watt}, "max_power_limit_watt":{max_power_limit_watt}, "min_power_limit_watt":{min_power_limit_watt},
"max_ce_clock_mhz":{max_ce_clock_mhz}, "max_mem_clock_mhz":{max_mem_clock_mhz}}}"###
);
let mut gpu = output::Object::new();
gpu.push_s("bus_addr", bus_addr.to_string());
gpu.push_i("index", *index as i64);
gpu.push_s("uuid", uuid.to_string());
gpu.push_s("manufacturer", manufacturer.clone());
gpu.push_s("model", model.to_string());
gpu.push_s("arch", arch.to_string());
gpu.push_s("driver", driver.to_string());
gpu.push_s("firmware", firmware.to_string());
gpu.push_i("mem_size_kib", *mem_size_kib);
gpu.push_i("power_limit_watt", *power_limit_watt as i64);
gpu.push_i("max_power_limit_watt", *max_power_limit_watt as i64);
gpu.push_i("min_power_limit_watt", *min_power_limit_watt as i64);
gpu.push_i("max_ce_clock_mhz", *max_ce_clock_mhz as i64);
gpu.push_i("max_mem_clock_mhz", *max_mem_clock_mhz as i64);
gpu_info.push_o(gpu);
}

(gpu_desc, gpu_cards, total_mem_by / GIB as i64, gpu_info)
(gpu_desc, gpu_cards, total_mem_by / GIB as i64)
} else {
("".to_string(), 0, 0, "".to_string())
("".to_string(), 0, 0)
};
let timestamp = util::json_quote(timestamp);
let hostname = util::json_quote(&hostname);
let description = util::json_quote(&format!(
"{sockets}x{cores_per_socket}{ht} {model}, {mem_gib} GiB{gpu_desc}"
));
let cpu_cores = sockets * cores_per_socket * threads_per_core;

// Note the field names here are used by decoders that are developed separately, and they should
// be considered set in stone.

let version = util::json_quote(env!("CARGO_PKG_VERSION"));
let s = format!(
r#"{{
"version": "{version}",
"timestamp": "{timestamp}",
"hostname": "{hostname}",
"description": "{description}",
"cpu_cores": {cpu_cores},
"mem_gb": {mem_gib},
"gpu_cards": {gpu_cards},
"gpumem_gb": {gpumem_gb},
"gpu_info": [{gpu_info}]
}}
"#
let mut sysinfo = output::Object::new();
sysinfo.push_s("version", env!("CARGO_PKG_VERSION").to_string());
sysinfo.push_s("timestamp", timestamp.to_string());
sysinfo.push_s("hostname", hostname);
sysinfo.push_s(
"description",
format!("{sockets}x{cores_per_socket}{ht} {model}, {mem_gib} GiB{gpu_desc}"),
);
sysinfo.push_i("cpu_cores", cpu_cores as i64);
sysinfo.push_i("mem_gb", mem_gib);
sysinfo.push_i("gpu_cards", gpu_cards as i64);
sysinfo.push_i("gpumem_gb", gpumem_gb);
if gpu_info.len() > 0 {
sysinfo.push_a("gpu_info", gpu_info);
}

// Ignore I/O errors.

let _ = writer.write(s.as_bytes());
let _ = writer.flush();
if csv {
output::write_csv(writer, &output::Value::O(sysinfo));
} else {
output::write_json(writer, &output::Value::O(sysinfo));
}
Ok(())
}

0 comments on commit b263d36

Please sign in to comment.