Skip to content

Commit

Permalink
Read from standard input (#66)
Browse files Browse the repository at this point in the history
* Reapply "Read from standard input"

This reverts commit 88dee6a.

* Allow stdin or any other query type to exit

* Clean-up
  • Loading branch information
spacez320 authored Nov 16, 2024
1 parent 88dee6a commit c40d110
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 30 deletions.
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ shui -query 'whoami'
# Execute `uptime` continuously, printing results to the console, without using persistence.
shui \
-count -1 \
-query 'uptime' \
-store=none
-history=false \
-query 'uptime'

# Get the size of an NVME disk's used space and output it to a table with the specific label "NVME
# Used Space".
Expand All @@ -92,6 +92,12 @@ shui \
-query 'df -h | grep nvme0n1p2 | awk '\''{print $3}'\'''
```

You can also execute Shui from standard input, which works like query mode.

```sh
while true; do uptime; sleep 1; done | shui
```

### Integrations

Shui can send its data off to external systems, making it useful as an ad-hoc metrics or log
Expand Down Expand Up @@ -199,8 +205,7 @@ shui -query 'uptime | tr -d ","' -expr 'get(result, "9") * 10'

# Cumulatively sum 5m CPU average. Note that we need to account for prevResult being empty and we
# must convert the prevResult from a string to a float.
shui -query 'uptime | tr -d ","' -filters 9 -expr 'get(result, "0") + ("0" in prevResult?
float(get(prevResult, "0")) : 0)'
shui -query 'uptime | tr -d ","' -filters 9 -expr 'get(result, "0") + ("0" in prevResult? float(get(prevResult, "0")) : 0)'
```

See: <https://expr-lang.org/docs/language-definition>
Expand All @@ -222,7 +227,6 @@ Planned improvements include things like:
- [x] ... and Elasticsearch.
- [ ] More detailed and varied display modes.
- [ ] Historical querying.
- [ ] Beter management of textual data, including diffs.

Similar Projects
----------------
Expand Down
22 changes: 17 additions & 5 deletions cmd/shui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var (
promExporterAddr string // Address for Prometheus metrics page.
promPushgatewayAddr string // Address for Prometheus Pushgateway.
queries multiArg // Queries to execute.
readStdin bool // Whether input comes from standard input.
showHelp bool // Whether or not to show helpt
showLogs bool // Whether or not to show logs.
showStatus bool // Whether or not to show statuses.
Expand Down Expand Up @@ -156,11 +157,21 @@ func main() {
os.Exit(0)
}

// Check for required flags.
if len(queries) == 0 {
flag.Usage()
fmt.Fprintf(os.Stderr, "Missing required argument -query\n")
os.Exit(1)
// Detect if running from standard input.
f, err := os.Stdin.Stat()
if err != nil {
panic(err)
}
if f.Mode()&os.ModeNamedPipe != 0 {
// We are reading standard input.
readStdin = true
} else {
// There is no standard input--queries are needed.
if len(queries) == 0 {
flag.Usage()
fmt.Fprintf(os.Stderr, "Missing required argument -query\n")
os.Exit(1)
}
}

// Set-up logging.
Expand Down Expand Up @@ -207,6 +218,7 @@ func main() {
PrometheusExporterAddr: promExporterAddr,
PushgatewayAddr: promPushgatewayAddr,
Queries: queries,
ReadStdin: readStdin,
}

// Build display configuration.
Expand Down
2 changes: 1 addition & 1 deletion internal/lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Config struct {
Count, Delay, DisplayMode, Mode int
ElasticsearchAddr, ElasticsearchIndex, ElasticsearchPassword, ElasticsearchUser string
Expressions, Filters, Labels, Queries []string
History, LogMulti, Silent bool
History, LogMulti, ReadStdin, Silent bool
LogLevel string
Port string
PrometheusExporterAddr string
Expand Down
84 changes: 68 additions & 16 deletions internal/lib/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
package lib

import (
"bufio"
"io"
"log/slog"
"os"
"os/exec"
"strconv"
"time"
Expand All @@ -14,6 +16,11 @@ import (
const (
QUERY_MODE_COMMAND int = iota + 1 // Queries are commands.
QUERY_MODE_PROFILE // Queries are PIDs to profile.
QUERY_MODE_STDIN // Results are fron stdin.
)

var (
stdinScanner = bufio.NewScanner(os.Stdin) // Scanner for standard input queries.
)

// Wrapper for query execution.
Expand All @@ -22,7 +29,7 @@ func runQuery(
attempts, delay int,
history bool,
doneChan, pauseChan chan bool,
queryFunc func(string, bool),
queryFunc func(string, bool) bool,
) {
// This loop executes as long as attempts has not been reached, or indefinitely if attempts is
// less than zero.
Expand All @@ -33,7 +40,11 @@ func runQuery(
// pause channel.
<-pauseChan
default:
queryFunc(query, history)
if !queryFunc(query, history) {
// In the event that queryFunc returns false, allow this to signal query completion, even if
// attempts are not satisifed.
attempts = 0
}

// This is not the last execution--add a delay.
if i != attempts {
Expand All @@ -42,11 +53,12 @@ func runQuery(
}
}

slog.Debug("Query done", "query", query)
doneChan <- true
}

// Executes a query as a command to exec.
func runQueryExec(query string, history bool) {
func runQueryExec(query string, history bool) bool {
slog.Debug("Executing query", "query", query)

// Prepare query execution.
Expand Down Expand Up @@ -79,15 +91,34 @@ func runQueryExec(query string, history bool) {

// Clean-up.
cmd.Wait()

return true
}

// Executes a query as a process to profile.
func runQueryProfile(pid string, history bool) {
func runQueryProfile(pid string, history bool) bool {
slog.Debug("Profiling pid", "pid", pid)

pidInt, err := strconv.Atoi(pid)
e(err)
AddResult(pid, runProfile(pidInt), history)

return true
}

// Reads standard input for results.
func runQueryStdin(query string, history bool) bool {
var success = true

slog.Debug("Reading stdin")

if stdinScanner.Scan() {
AddResult(query, stdinScanner.Text(), history)
} else {
success = false
}

return success
}

// Entrypoint for 'query' mode.
Expand All @@ -107,21 +138,20 @@ func Query(
// Start the RPC server.
initServer(port)

for _, query := range queries {
// Initialize pause channels.
pauseQueryChans[query] = make(chan bool)
}

go func() {
// Wait for result consumption to become ready.
slog.Debug("Waiting for results readiness")
<-resultsReadyChan

for _, query := range queries {
// Execute the queries.
switch queryMode {
case QUERY_MODE_COMMAND:
slog.Debug("Executing in query mode command")
// Execute the queries.
switch queryMode {
case QUERY_MODE_COMMAND:
slog.Debug("Executing in query mode command")

for _, query := range queries {
// Initialize pause channels.
pauseQueryChans[query] = make(chan bool)

go runQuery(
query,
attempts,
Expand All @@ -131,8 +161,14 @@ func Query(
pauseQueryChans[query],
runQueryExec,
)
case QUERY_MODE_PROFILE:
slog.Debug("Executing in query mode profile")
}
case QUERY_MODE_PROFILE:
slog.Debug("Executing in query mode profile")

for _, query := range queries {
// Initialize pause channels.
pauseQueryChans[query] = make(chan bool)

go runQuery(
query,
attempts,
Expand All @@ -143,6 +179,22 @@ func Query(
runQueryProfile,
)
}
case QUERY_MODE_STDIN:
// When executing by reading standard input, there is only ever one "query".
slog.Debug("Executing in query mode stdin")

// Initialize pause channels.
pauseQueryChans[queries[0]] = make(chan bool)

go runQuery(
queries[0],
attempts,
delay,
history,
doneQueryChan,
pauseQueryChans[queries[0]],
runQueryStdin,
)
}
}()

Expand Down
27 changes: 24 additions & 3 deletions shui.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ const (
MODE_READ // For running in 'read' mode.
)

const (
STDIN_QUERY_NAME = "stdin" // Named query value for reading stdin.
)

var (
ctx = context.Background() // Initialize context.
)
Expand All @@ -37,8 +41,28 @@ func Run(config lib.Config, displayConfig lib.DisplayConfig) {
slog.Debug("Running with config", "config", config)
slog.Debug("Running with display config", "displayConfig", displayConfig)

// Define a special query value when reading standard input.
if config.ReadStdin {
config.Queries = []string{STDIN_QUERY_NAME}
}

// Execute the specified mode.
switch {
case config.ReadStdin:
slog.Debug("Reading from standard input")

doneQueriesChan, pauseQueryChans = lib.Query(
lib.QUERY_MODE_STDIN,
-1, // Stdin mode is always continuous and the query itself must detect EOF.
config.Delay,
config.Queries,
config.Port,
config.History,
resultsReadyChan,
)

// Use labels that match the defined value for queries.
ctx = context.WithValue(ctx, "labels", config.Labels)
case config.Mode == int(MODE_PROFILE):
slog.Debug("Executing in profile mode")

Expand Down Expand Up @@ -98,9 +122,6 @@ func Run(config lib.Config, displayConfig lib.DisplayConfig) {
)
}

// XXX This isn't strictly necessary, mainly because getting here shouldn't be possible
// (`lib.Results` does not have any intentional return condition), but it's being left here in
// case in the future we do want to control for query completion.
<-doneQueriesChan
slog.Debug("Received the last result, nothing left to do")
close(doneQueriesChan)
Expand Down

0 comments on commit c40d110

Please sign in to comment.