Skip to content
forked from octolab/breaker

🚧 Flexible mechanism to make execution flow interruptible.

License

Notifications You must be signed in to change notification settings

kamilsk/breaker

Β 
Β 

Repository files navigation

🚧 breaker Awesome Go

Flexible mechanism to make execution flow interruptible.

Build Documentation Quality Template Coverage Mirror

πŸ’‘ Idea

The breaker carries a cancellation signal to interrupt an action execution.

var NewYear = time.Time{}.AddDate(time.Now().Year(), 0, 0)

interrupter := breaker.Multiplex(
	breaker.BreakByContext(context.WithTimeout(req.Context(), time.Minute)),
	breaker.BreakByDeadline(NewYear),
	breaker.BreakBySignal(os.Interrupt),
)
defer interrupter.Close()

<-interrupter.Done() // wait context cancellation, timeout or interrupt signal

A full description of the idea is available here.

πŸ† Motivation

I have to make modules github.com/kamilsk/retry/v5:

if err := retry.Retry(breaker.BreakByTimeout(time.Minute), action); err != nil {
	log.Fatal(err)
}

and github.com/kamilsk/semaphore/v5:

if err := semaphore.Acquire(breaker.BreakByTimeout(time.Minute), 5); err != nil {
	log.Fatal(err)
}

more consistent and reliable.

Additionally, I want to implement a Graceful Shutdown on the same mechanism.

πŸ€Όβ€β™‚οΈ How to

Do HTTP request with retries

interrupter := breaker.Multiplex(
	breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
	breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()

ctx := breaker.ToContext(interrupter)
ctx = context.WithValue(ctx, header, "...")

req, err := http.NewRequestWithContext(ctx, http.MethodGet, server.URL, nil)
if err != nil {
	panic(err)
}

var resp *http.Response
action := func(ctx context.Context) (err error) {
	req = req.Clone(ctx)

	source := ctx.Value(header).(string)
	req.Header.Set(header, source)

	resp, err = http.DefaultClient.Do(req)
	return err
}

if err := retry.Do(ctx, action); err != nil {
	panic(err)
}
Full example
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"os"
	"syscall"
	"time"

	"github.com/kamilsk/breaker"
	"github.com/kamilsk/retry/v5"
)

func main() {
	const (
		header  = "X-Message"
		timeout = time.Minute
	)

	server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		time.Sleep(timeout / 10)
		_, _ = rw.Write([]byte(req.Header.Get(header)))
	}))
	defer server.Close()

	interrupter := breaker.Multiplex(
		breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
		breaker.BreakByTimeout(timeout),
	)
	defer interrupter.Close()

	ctx := breaker.ToContext(interrupter)
	ctx = context.WithValue(ctx, header, "flexible mechanism to make execution flow interruptible")

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, server.URL, nil)
	if err != nil {
		panic(err)
	}

	var resp *http.Response
	action := func(ctx context.Context) (err error) {
		req = req.Clone(ctx)

		source := ctx.Value(header).(string)
		req.Header.Set(header, source)

		resp, err = http.DefaultClient.Do(req)
		return err
	}

	if err := retry.Do(ctx, action); err != nil {
		fmt.Println("error:", err)
		return
	}
	_, _ = io.Copy(os.Stdout, resp.Body)
}

Play it!

Graceful Shutdown HTTP server

interrupter := breaker.Multiplex(
	breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
	breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()

server := http.Server{
	BaseContext: func(net.Listener) context.Context {
		return breaker.ToContext(interrupter)
	},
}
go func() {
	if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
		log.Fatal(err)
	}
}()

<-interrupter.Done()
if errors.Is(interrupter.Err(), breaker.Interrupted) {
	if err := server.Shutdown(context.TODO()); err != nil {
		panic(err)
	}
}
Full example
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"syscall"
	"time"

	"github.com/kamilsk/breaker"
)

func main() {
	const timeout = time.Minute

	interrupter := breaker.Multiplex(
		breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
		breaker.BreakByTimeout(timeout),
	)
	defer interrupter.Close()

	server := http.Server{
		Addr:    ":8080",
		Handler: http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}),
		BaseContext: func(net.Listener) context.Context {
			return breaker.ToContext(interrupter)
		},
	}
	go func() {
		if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
			log.Fatal(err)
		}
	}()

	<-interrupter.Done()
	if err := interrupter.Err(); errors.Is(err, breaker.Interrupted) {
		if err := server.Shutdown(context.TODO()); err != nil {
			panic(err)
		}
	}
	fmt.Println("graceful shutdown")
}

Play it!

🧩 Integration

The library uses SemVer for versioning, and it is not BC-safe through major releases. You can use go modules to manage its version.

$ go get github.com/kamilsk/breaker@latest

🀲 Outcomes

Console tool to execute commands for a limited time

The example shows how to execute console commands for ten minutes.

$ date
# Thu Jan  7 21:02:21
$ breakit after 10m -- server run --port=8080
$ breakit ps
# +--------------------------+----------------------------+----------+----------+
# | Process                  | Status                     | Since    | Until    |
# +--------------------------+----------------------------+----------+----------+
# | server run --port=8080   | exit 1; panic: database... | 21:02:36 | -        |
# +--------------------------+----------------------------+----------+----------+
# |                          |                            |    Total |        1 |
# +--------------------------+----------------------------+----------+----------+
$ breakit after 10m -- database run --port=5432
$ breakit after 10m delay 5s -- server run --port=8080
$ breakit ps
# +--------------------------+----------------------------+----------+----------+
# | Process                  | Status                     | Since    | Until    |
# +--------------------------+----------------------------+----------+----------+
# | database run --port=5432 | running                    | 21:04:09 | 21:14:09 |
# | server run --port=8080   | delayed                    | 21:04:30 | 21:14:25 |
# +--------------------------+----------------------------+----------+----------+
# |                          |                            |    Total |        2 |
# +--------------------------+----------------------------+----------+----------+

See more details here.

made with ❀️ for everyone

About

🚧 Flexible mechanism to make execution flow interruptible.

Topics

Resources

License

Code of conduct

Security policy

Stars

Watchers

Forks

Languages

  • Go 75.7%
  • Makefile 24.3%