Skip to content

Commit

Permalink
better nats context logic (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
jordan-rash authored Apr 30, 2024
1 parent 60c834a commit 855a866
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 75 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,13 @@ nex node up \

Valid exporters are `http` and `prometheus`. The file exporter will write metrics to a file in the current working directory called `metrics.log`.

## Using NATS Context with `nex`
In order to use your NATS context with Nex, you will need to set the `XDG_CONFIG_HOME` environment variable. On linux, this is typically `$HOME/.config`, but specifically, it will be wherever your `nats/` configuration directory is located. This will allow `nex` to use the same configuration as your NATS context.

```bash
// Example usage
XDG_CONFIG_HOME=/home/jordan/.config sudo -E nex node up --loglevel debug --context default
```

## Contributing
For information on how to contribute to Nex, please read our [contributing](./CONTRIBUTING.md) guide.
99 changes: 44 additions & 55 deletions internal/models/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package models

import (
"fmt"
"log/slog"
"net/url"
"os"
"strings"
"time"

"github.com/nats-io/jsm.go/natscontext"
Expand Down Expand Up @@ -64,10 +63,6 @@ type Options struct {
LogJSON bool
// Name or path to a configuration context
ConfigurationContext string
// Effective configuration
Configuration *natscontext.Context
// Indicates whether contexts should not be used
SkipContexts bool
}

type RunOptions struct {
Expand Down Expand Up @@ -131,70 +126,64 @@ func (c *NodeOptions) Validate() bool {
return len(c.Errors) == 0
}

func GenerateConnectionFromOpts(opts *Options) (*nats.Conn, error) {
ctxOpts := []natscontext.Option{
natscontext.WithServerURL(opts.Servers),
natscontext.WithCreds(opts.Creds),
natscontext.WithNKey(opts.Nkey),
natscontext.WithCertificate(opts.TlsCert),
natscontext.WithKey(opts.TlsKey),
natscontext.WithCA(opts.TlsCA),
}

if opts.TlsFirst {
ctxOpts = append(ctxOpts, natscontext.WithTLSHandshakeFirst())
}
func GenerateConnectionFromOpts(opts *Options, logger *slog.Logger) (*nats.Conn, error) {
if opts.ConfigurationContext != "" {
if !natscontext.IsKnown(opts.ConfigurationContext) {
logger.Error("Unknown nats context provided", slog.String("context", opts.ConfigurationContext))
return nil, fmt.Errorf("unknown context provided")
}

if opts.Username != "" && opts.Password == "" {
ctxOpts = append(ctxOpts, natscontext.WithToken(opts.Username))
} else {
ctxOpts = append(ctxOpts, natscontext.WithUser(opts.Username), natscontext.WithPassword(opts.Password))
}
p, _ := natscontext.ContextPath(opts.ConfigurationContext)
logger.Debug("Using nats context for connection details", slog.String("path", p))

var err error
conn, err := natscontext.Connect(opts.ConfigurationContext, nats.Name(opts.ConnectionName))
logger.Info("Connected to NATS server", slog.String("server", conn.ConnectedUrlRedacted()), slog.String("nats_context", opts.ConfigurationContext))

exist, _ := fileAccessible(opts.ConfigurationContext)
return conn, err
}

if exist && strings.HasSuffix(opts.ConfigurationContext, ".json") {
opts.Configuration, err = natscontext.NewFromFile(opts.ConfigurationContext, ctxOpts...)
} else {
opts.Configuration, err = natscontext.New(opts.ConfigurationContext, !opts.SkipContexts, ctxOpts...)
natsOpts := []nats.Option{
nats.Name(opts.ConnectionName),
}

if err != nil {
return nil, err
if opts.Creds != "" {
natsOpts = append(natsOpts, nats.UserCredentials(opts.Creds))
}

conn, err := opts.Configuration.Connect(nats.Name(
func() string {
if opts.ConnectionName == "" {
return "nex"
}
return opts.ConnectionName
}(),
))
if err != nil {
return nil, err
if opts.Nkey != "" {
natsOpts = append(natsOpts, nats.Nkey(opts.Nkey, nats.DefaultOptions.SignatureCB))
}

return conn, nil
}
if opts.TlsCert != "" && opts.TlsKey != "" {
natsOpts = append(natsOpts, nats.ClientCert(opts.TlsCert, opts.TlsKey))
}

func fileAccessible(f string) (bool, error) {
stat, err := os.Stat(f)
if err != nil {
return false, err
if opts.TlsCA != "" {
natsOpts = append(natsOpts, nats.RootCAs(opts.TlsCA))
}

if stat.IsDir() {
return false, fmt.Errorf("is a directory")
if opts.TlsFirst {
natsOpts = append(natsOpts, nats.TLSHandshakeFirst())
}

file, err := os.Open(f)
if err != nil {
return false, err
if opts.Username != "" && opts.Password == "" {
natsOpts = append(natsOpts, nats.Token(opts.Username))
} else {
natsOpts = append(natsOpts, nats.UserInfo(opts.Username, opts.Password))
}
file.Close()

return true, nil
conn, err := nats.Connect(opts.Servers, natsOpts...)
logger.Debug("Connected to NATS server",
slog.String("server", conn.ConnectedUrlRedacted()),
slog.String("name", opts.ConnectionName),
slog.String("creds", opts.Creds),
slog.String("nkey", opts.Nkey),
slog.String("tls_cert", opts.TlsCert),
slog.String("tls_key", opts.TlsKey),
slog.String("tls_ca", opts.TlsCA),
slog.Bool("tls_first", opts.TlsFirst),
slog.String("username", opts.Username),
)

return conn, err
}
15 changes: 11 additions & 4 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (n *Node) Start() {
case <-heartbeat.C:
_ = n.publishHeartbeat()
case sig := <-n.sigs:
n.log.Debug("received signal: %s", sig)
n.log.Debug("received signal", slog.Any("signal", sig))
n.shutdown()
case <-n.ctx.Done():
n.shutdown()
Expand Down Expand Up @@ -267,7 +267,7 @@ func (n *Node) init() error {
}

// setup NATS connection
n.nc, _err = models.GenerateConnectionFromOpts(n.opts)
n.nc, _err = models.GenerateConnectionFromOpts(n.opts, n.log)
if _err != nil {
n.log.Error("Failed to connect to NATS server", slog.Any("err", _err))
err = errors.Join(err, fmt.Errorf("failed to connect to NATS server: %s", _err))
Expand Down Expand Up @@ -321,7 +321,6 @@ func (n *Node) startInternalNATS() error {
if err != nil {
return err
}

n.natsint.Start()

clientUrl, err := url.Parse(n.natsint.ClientURL())
Expand All @@ -335,11 +334,19 @@ func (n *Node) startInternalNATS() error {
}
n.config.InternalNodePort = &p

n.ncint, err = nats.Connect(n.natsint.ClientURL())
n.ncint, err = nats.Connect("", nats.InProcessServer(n.natsint))
if err != nil {
n.log.Error("Failed to connect to internal nats", slog.Any("err", err), slog.Any("internal_url", clientUrl), slog.Bool("with_jetstream", n.natsint.JetStreamEnabled()))
return fmt.Errorf("failed to connect to internal nats: %s", err)
}

rtt, err := n.ncint.RTT()
if err != nil {
n.log.Warn("Failed get internal nats RTT", slog.Any("err", err), slog.Any("internal_url", clientUrl))
} else {
n.log.Debug("Internal NATS RTT", slog.String("rtt", rtt.String()), slog.Bool("with_jetstream", n.natsint.JetStreamEnabled()))
}

jsCtx, err := n.ncint.JetStream()
if err != nil {
return fmt.Errorf("failed to establish jetstream connection to internal nats: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion nex/devrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func init() {
// Attempts to "deploy a file" by finding a suitable target and publishing the workload to an ad-hoc created bucket
// and using default issuer and publisher keys stored in ~/.nex. This should be as easy as typing "nex devrun ./amazingapp env1=foo env2=bar"
func RunDevWorkload(ctx context.Context, logger *slog.Logger) error {
nc, err := models.GenerateConnectionFromOpts(Opts)
nc, err := models.GenerateConnectionFromOpts(Opts, logger)
if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions nex/nex.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func init() {
ncli.Flag("logcolor", "Prints text logs with color").Envar("NEX_LOG_COLORIZED").Default("false").UnNegatableBoolVar(&Opts.LogsColorized)
ncli.Flag("timeformat", "How time is formatted in logger").Envar("NEX_LOG_TIMEFORMAT").Default("DateTime").EnumVar(&Opts.LogTimeFormat, "DateOnly", "DateTime", "Stamp", "RFC822", "RFC3339")
ncli.Flag("context", "Configuration context").Envar("NATS_CONTEXT").PlaceHolder("NAME").StringVar(&Opts.ConfigurationContext)
ncli.Flag("no-context", "Disable NATS context discovery").UnNegatableBoolVar(&Opts.SkipContexts)
ncli.Flag("conn-name", "Name of NATS connection").Default(func() string {
if VERSION != "development" {
return "nex-" + VERSION
Expand Down Expand Up @@ -165,17 +164,17 @@ func main() {
handlerOpts = append(handlerOpts, shandler.WithTimeFormat(time.DateTime))
}

var logger *slog.Logger
if Opts.LogJSON {
handlerOpts = append(handlerOpts, shandler.WithJSON())
}
if Opts.LogsColorized {
handlerOpts = append(handlerOpts, shandler.WithColor())
}

logger := slog.New(shandler.NewHandler(handlerOpts...))
switch Opts.Logger {
case "nats":
nc, err := models.GenerateConnectionFromOpts(Opts)
nc, err := models.GenerateConnectionFromOpts(Opts, logger)
if err != nil {
break
}
Expand Down
16 changes: 8 additions & 8 deletions nex/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (

// Uses a control API client to request a node list from a NATS environment
func ListNodes(ctx context.Context) error {
nc, err := models.GenerateConnectionFromOpts(Opts)
log := slog.New(slog.NewJSONHandler(io.Discard, nil))
nc, err := models.GenerateConnectionFromOpts(Opts, log)
if err != nil {
return err
}
log := slog.New(slog.NewJSONHandler(io.Discard, nil))

nodeClient := controlapi.NewApiClient(nc, Opts.Timeout, log)
nodes, err := nodeClient.ListAllNodes()
Expand All @@ -33,11 +33,11 @@ func ListNodes(ctx context.Context) error {
}

func ListWorkloads(ctx context.Context) error {
nc, err := models.GenerateConnectionFromOpts(Opts)
log := slog.New(slog.NewJSONHandler(io.Discard, nil))
nc, err := models.GenerateConnectionFromOpts(Opts, log)
if err != nil {
return err
}
log := slog.New(slog.NewJSONHandler(io.Discard, nil))
nodeClient := controlapi.NewApiClientWithNamespace(nc, Opts.Timeout, Opts.Namespace, log)
nodes, err := nodeClient.ListWorkloads(strings.TrimSpace(RunOpts.Name))
if err != nil {
Expand All @@ -49,12 +49,12 @@ func ListWorkloads(ctx context.Context) error {
}

func LameDuck(ctx context.Context, logger *slog.Logger) error {
log := slog.New(slog.NewJSONHandler(io.Discard, nil))
nodeId := RunOpts.TargetNode
nc, err := models.GenerateConnectionFromOpts(Opts)
nc, err := models.GenerateConnectionFromOpts(Opts, log)
if err != nil {
return err
}
log := slog.New(slog.NewJSONHandler(io.Discard, nil))
nodeClient := controlapi.NewApiClientWithNamespace(nc, Opts.Timeout, Opts.Namespace, log)
_, err = nodeClient.EnterLameDuck(nodeId)
if err != nil {
Expand All @@ -68,11 +68,11 @@ func LameDuck(ctx context.Context, logger *slog.Logger) error {

// Uses a control API client to retrieve info on a single node
func NodeInfo(ctx context.Context, nodeid string) error {
nc, err := models.GenerateConnectionFromOpts(Opts)
log := slog.New(slog.NewJSONHandler(io.Discard, nil))
nc, err := models.GenerateConnectionFromOpts(Opts, log)
if err != nil {
return err
}
log := slog.New(slog.NewJSONHandler(io.Discard, nil))
nodeClient := controlapi.NewApiClientWithNamespace(nc, Opts.Timeout, Opts.Namespace, log)
nodeInfo, err := nodeClient.NodeInfo(nodeid)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions nex/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// Issues a request to stop a running workload
func StopWorkload(ctx context.Context, logger *slog.Logger) error {
nc, err := models.GenerateConnectionFromOpts(Opts)
nc, err := models.GenerateConnectionFromOpts(Opts, logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -47,7 +47,7 @@ func StopWorkload(ctx context.Context, logger *slog.Logger) error {

// Submits a run request for the given workload to the specified node
func RunWorkload(ctx context.Context, logger *slog.Logger) error {
nc, err := models.GenerateConnectionFromOpts(Opts)
nc, err := models.GenerateConnectionFromOpts(Opts, logger)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions nex/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func WatchEvents(ctx context.Context, logger *slog.Logger) error {
nc, err := models.GenerateConnectionFromOpts(Opts)
nc, err := models.GenerateConnectionFromOpts(Opts, logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -44,7 +44,7 @@ func WatchEvents(ctx context.Context, logger *slog.Logger) error {
}

func WatchLogs(ctx context.Context, logger *slog.Logger) error {
nc, err := models.GenerateConnectionFromOpts(Opts)
nc, err := models.GenerateConnectionFromOpts(Opts, logger)
if err != nil {
return err
}
Expand Down

0 comments on commit 855a866

Please sign in to comment.