Skip to content

Commit

Permalink
Merge pull request #31 from DavidZbarsky-at/master
Browse files Browse the repository at this point in the history
Add some knobs to configure logging levels
  • Loading branch information
dzbarsky authored Sep 20, 2024
2 parents 865c7ad + 55125a1 commit 6d19cda
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 38 deletions.
6 changes: 6 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,9 @@ bool_flag(
build_setting_default = False,
visibility = ["//visibility:public"],
)

# Build setting that toggles terse svcinit logging. Off by default; its value is
# forwarded to svcinit through the SVCINIT_TERSE_OUTPUT environment variable.
bool_flag(
name = "terse_svcinit_output",
build_setting_default = False,
visibility = ["//visibility:public"],
)
10 changes: 9 additions & 1 deletion cmd/svcinit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ func must(err error) {
}
}

// terseOutput is true when the SVCINIT_TERSE_OUTPUT environment variable is
// exactly "True". When set, the per-port "Assigning port" log line is skipped.
var terseOutput = os.Getenv("SVCINIT_TERSE_OUTPUT") == "True"

func main() {
log.SetFlags(log.Ltime | log.Lmicroseconds)

serviceSpecsPath, err := runfiles.Rlocation(os.Getenv("SVCINIT_SERVICE_SPECS_RLOCATION_PATH"))
must(err)

Expand All @@ -47,6 +51,7 @@ func main() {

enablePerServiceReload := os.Getenv("SVCINIT_ENABLE_PER_SERVICE_RELOAD") == "True"
allowConfiguringTmpdir := os.Getenv("SVCINIT_ALLOW_CONFIGURING_TMPDIR") == "True"

shouldHotReload := os.Getenv("IBAZEL_NOTIFY_CHANGES") == "y"
testLabel := os.Getenv("TEST_TARGET")

Expand Down Expand Up @@ -356,7 +361,10 @@ func assignPorts(
qualifiedPortName += ":" + portName
}

fmt.Printf("Assigning port %s to %s\n", port, qualifiedPortName)
if !terseOutput {
log.Printf("Assigning port %s to %s\n", port, qualifiedPortName)
}

ports.Set(qualifiedPortName, port)

if !spec.SoReuseportAware {
Expand Down
24 changes: 9 additions & 15 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"encoding/binary"
"io"
"log"
"strconv"
)

Expand Down Expand Up @@ -101,15 +102,13 @@ func Colorize(s string) string {

func New(prefix string, color string, out io.Writer) io.WriteCloser {
return &Logger{
prefix: []byte(color + prefix + Reset),
out: out,
out: log.New(out, color+prefix+Reset, log.Ltime|log.Lmicroseconds|log.Lmsgprefix),
}
}

type Logger struct {
prefix []byte
out io.Writer
buf bytes.Buffer
out *log.Logger
buf bytes.Buffer
}

func (l *Logger) Write(data []byte) (int, error) {
Expand All @@ -119,15 +118,11 @@ func (l *Logger) Write(data []byte) (int, error) {
for i, b := range data {
if b == '\n' {
line := append(
append(l.prefix, l.buf.Bytes()...),
l.buf.Bytes(),
data[lastNewline:i+1]...,
)
n, err := l.out.Write(line)
written += n
if err != nil {
return written, err
}

l.out.Print(string(line))
written += len(line)
l.buf.Reset()
lastNewline = i + 1
}
Expand All @@ -138,7 +133,6 @@ func (l *Logger) Write(data []byte) (int, error) {
}

func (l *Logger) Close() error {
data := append(l.prefix, l.buf.Bytes()...)
_, err := l.out.Write(data)
return err
l.out.Print(l.buf.String())
return nil
}
10 changes: 10 additions & 0 deletions private/itest.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def _run_environment(ctx, service_specs_file):
"SVCINIT_ENABLE_PER_SERVICE_RELOAD": str(ctx.attr._enable_per_service_reload[BuildSettingInfo].value),
"SVCINIT_GET_ASSIGNED_PORT_BIN_RLOCATION_PATH": to_rlocation_path(ctx, ctx.executable._get_assigned_port),
"SVCINIT_SERVICE_SPECS_RLOCATION_PATH": to_rlocation_path(ctx, service_specs_file),
"SVCINIT_TERSE_OUTPUT": str(ctx.attr._terse_svcinit_output[BuildSettingInfo].value),
}

def _services_runfiles(ctx, services_attr_name = "services"):
Expand Down Expand Up @@ -85,6 +86,9 @@ _svcinit_attrs = {
"_allow_configuring_tmpdir": attr.label(
default = "//:allow_configuring_tmpdir",
),
"_terse_svcinit_output": attr.label(
default = "//:terse_svcinit_output",
),
}

_itest_binary_attrs = {
Expand Down Expand Up @@ -171,6 +175,7 @@ def _validate_duration(name, s):
fail("Invalid unit for %s: %s" % (name, unit))

def _itest_service_impl(ctx):
_validate_duration("expected_start_duration", ctx.attr.expected_start_duration)
_validate_duration("health_check_interval", ctx.attr.health_check_interval)

if ctx.attr.health_check_timeout:
Expand All @@ -186,6 +191,7 @@ def _itest_service_impl(ctx):
"so_reuseport_aware": ctx.attr.so_reuseport_aware,
"named_ports": ctx.attr.named_ports,
"hot_reloadable": ctx.attr.hot_reloadable,
"expected_start_duration": ctx.attr.expected_start_duration,
"health_check_interval": ctx.attr.health_check_interval,
"health_check_timeout": ctx.attr.health_check_timeout,
}
Expand Down Expand Up @@ -240,6 +246,10 @@ _itest_service_attrs = _itest_binary_attrs | {
"health_check_args": attr.string_list(
doc = """Arguments to pass to the health_check binary. The various defined ports will be substituted prior to being given to the health_check binary.""",
),
"expected_start_duration": attr.string(
default = "0s",
doc = "How long the service is expected to take before passing a healthcheck. Any failing health checks before this duration elapses will not be logged.",
),
"health_check_interval": attr.string(
default = "200ms",
doc = "The duration between each health check. The syntax is based on common time duration with a number, followed by the time unit. For example, `200ms`, `1s`, `2m`, `3h`, `4d`.",
Expand Down
9 changes: 8 additions & 1 deletion runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
// test process (svcinit) and all its children.
// If we were to start new process groups in tests, we could leak children (at least on Mac).
var shouldUseProcessGroups = runtime.GOOS != "windows" && os.Getenv("BAZEL_TEST") != "1"
// terseOutput is true when the SVCINIT_TERSE_OUTPUT environment variable is
// exactly "True". When set, the "Starting" and "CMD Healthchecking" log lines
// print only the service label and leave out arguments and process details.
var terseOutput = os.Getenv("SVCINIT_TERSE_OUTPUT") == "True"

type ServiceSpecs = map[string]svclib.VersionedServiceSpec

Expand Down Expand Up @@ -55,7 +56,13 @@ func (r *Runner) StartAll(serviceErrCh chan error) ([]topological.Task, error) {
if service.Type == "group" {
return nil
}
log.Printf("Starting %s %v\n", colorize(service.VersionedServiceSpec), service.cmd.Args[1:])

if terseOutput {
log.Printf("Starting %s\n", colorize(service.VersionedServiceSpec))
} else {
log.Printf("Starting %s %v\n", colorize(service.VersionedServiceSpec), service.cmd.Args[1:])
}

startErr := service.Start(ctx)
if startErr != nil {
return startErr
Expand Down
77 changes: 59 additions & 18 deletions runner/service_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package runner

import (
"context"
"fmt"
"io"
"log"
"net/http"
Expand All @@ -28,9 +27,10 @@ type ServiceInstance struct {
startErrFn func() error
waitErrFn func() error

mu sync.Mutex
runErr error
killed bool
mu sync.Mutex
runErr error
killed bool
healthcheckAttempted bool
}

func (s *ServiceInstance) Start(ctx context.Context) error {
Expand Down Expand Up @@ -62,13 +62,18 @@ func (s *ServiceInstance) WaitUntilHealthy(ctx context.Context) error {
return err
}

sleepDuration, err := time.ParseDuration(s.VersionedServiceSpec.HealthCheckInterval)
sleepDuration, err := time.ParseDuration(s.HealthCheckInterval)
if err != nil {
log.Printf("failed to parse health check time duration, falling back to 200ms: %v", err)
// This should really not happen if we validate it properly in starlark
sleepDuration = time.Duration(200) * time.Millisecond
}

expectedStartDuration, err := time.ParseDuration(s.ExpectedStartDuration)
if err != nil {
log.Print("failed to parse expected start duration")
}

for {
err := s.Error()
if err != nil {
Expand All @@ -80,14 +85,11 @@ func (s *ServiceInstance) WaitUntilHealthy(ctx context.Context) error {
return err
}

err = s.HealthCheck(ctx)
if err == nil {
if s.HealthCheck(ctx, expectedStartDuration) {
log.Printf("%s healthy!\n", coloredLabel)
break
}

fmt.Println(err)

time.Sleep(sleepDuration)
}

Expand All @@ -101,36 +103,75 @@ var httpClient = http.Client{
Timeout: 50 * time.Millisecond,
}

func (s *ServiceInstance) HealthCheck(ctx context.Context) error {
func (s *ServiceInstance) HealthCheck(ctx context.Context, expectedStartDuration time.Duration) bool {
httpHealthCheckReq, _ := http.NewRequestWithContext(ctx, "GET", s.HttpHealthCheckAddress, nil)
coloredLabel := s.Colorize(s.Label)

shouldSilence := s.startTime.Add(expectedStartDuration).After(time.Now())

isHealthy := true
var err error
if s.HttpHealthCheckAddress != "" {
log.Printf("HTTP Healthchecking %s (pid %d) : %s\n", coloredLabel, s.Pid(), s.HttpHealthCheckAddress)
if !s.HealthcheckAttempted() || !shouldSilence {
log.Printf("HTTP Healthchecking %s (pid %d) : %s\n", coloredLabel, s.Pid(), s.HttpHealthCheckAddress)
}

logFunc := log.Printf
if shouldSilence {
logFunc = func(format string, v ...any) {}
}

var resp *http.Response
resp, err = httpClient.Do(httpHealthCheckReq)
if resp != nil {
if err != nil {
logFunc("healthcheck for %s failed: %v\n", coloredLabel, err)
isHealthy = false
} else if resp != nil {
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("healthcheck for %s failed: %v", coloredLabel, resp)
logFunc("healthcheck for %s failed: %v\n", coloredLabel, resp)
isHealthy = false
}

closeErr := resp.Body.Close()
if closeErr != nil {
log.Printf("error closing http body %v", closeErr)
logFunc("error closing http body %v", closeErr)
}
}

} else if s.ServiceSpec.HealthCheck != "" {
log.Printf("CMD Healthchecking %s (pid %d) : %s %v\n", coloredLabel, s.Pid(), s.Colorize(s.HealthCheckLabel), strings.Join(s.HealthCheckArgs, " "))
if !s.HealthcheckAttempted() || !shouldSilence {
if terseOutput {
log.Printf("CMD Healthchecking %s\n", coloredLabel)
} else {
log.Printf("CMD Healthchecking %s (pid %d) : %s %v\n", coloredLabel, s.Pid(), s.Colorize(s.HealthCheckLabel), strings.Join(s.HealthCheckArgs, " "))
}
}

cmd := exec.CommandContext(ctx, s.ServiceSpec.HealthCheck, s.HealthCheckArgs...)
cmd.Stdout = logger.New(s.Label+"? ", s.Color, os.Stdout)
cmd.Stderr = logger.New(s.Label+"? ", s.Color, os.Stderr)
if shouldSilence {
cmd.Stdout = io.Discard
cmd.Stderr = io.Discard
} else {
cmd.Stdout = logger.New(s.Label+"? ", s.Color, os.Stdout)
cmd.Stderr = logger.New(s.Label+"? ", s.Color, os.Stderr)
}
err = cmd.Run()
if err != nil {
cmd.Stdout.Write([]byte(err.Error()))
isHealthy = false
}
}

return err
s.mu.Lock()
defer s.mu.Unlock()
s.healthcheckAttempted = true
return isHealthy
}

// HealthcheckAttempted reports whether the healthcheckAttempted flag has been
// set on this instance. The flag is read while holding s.mu so the result is
// consistent with concurrent writers that take the same lock.
func (s *ServiceInstance) HealthcheckAttempted() bool {
	s.mu.Lock()
	attempted := s.healthcheckAttempted
	s.mu.Unlock()

	return attempted
}

func (s *ServiceInstance) StartTime() time.Time {
Expand Down
6 changes: 3 additions & 3 deletions svcctl/svcctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ func handleHealthCheck(ctx context.Context, r *runner.Runner, _ chan error, w ht
return
}

err = s.HealthCheck(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
isHealthy := s.HealthCheck(ctx, 0)
if !isHealthy {
http.Error(w, "Healthcheck failed", http.StatusServiceUnavailable)
return
}

Expand Down
1 change: 1 addition & 0 deletions svclib/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type ServiceSpec struct {
Env map[string]string `json:"env"`
Exe string `json:"exe"`
HttpHealthCheckAddress string `json:"http_health_check_address"`
ExpectedStartDuration string `json:"expected_start_duration"`
HealthCheck string `json:"health_check"`
HealthCheckLabel string `json:"health_check_label"`
HealthCheckArgs []string `json:"health_check_args"`
Expand Down

0 comments on commit 6d19cda

Please sign in to comment.