Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ LABEL fly.pg-version=${PG_VERSION}
LABEL fly.pg-manager=repmgr

RUN apt-get update && apt-get install --no-install-recommends -y \
ca-certificates iproute2 postgresql-$PG_MAJOR_VERSION-repmgr curl bash dnsutils vim socat procps ssh gnupg \
ca-certificates iproute2 postgresql-$PG_MAJOR_VERSION-repmgr curl bash dnsutils vim socat procps ssh gnupg rsync barman-cli barman cron \
&& apt autoremove -y

# PostGIS
Expand Down
31 changes: 31 additions & 0 deletions cmd/start/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"syscall"
"time"

"github.com/fly-apps/postgres-flex/internal/flybarman"
"github.com/fly-apps/postgres-flex/internal/flypg"
"github.com/fly-apps/postgres-flex/internal/supervisor"
)
Expand All @@ -22,6 +23,36 @@ func main() {
}
}

if os.Getenv("IS_BARMAN") != "" {
node, err := flybarman.NewNode()
if err != nil {
panicHandler(err)
return
}

ctx := context.Background()

if err = node.Init(ctx); err != nil {
panicHandler(err)
return
}

svisor := supervisor.New("flybarman", 1*time.Minute)
svisor.AddProcess("barman", fmt.Sprintf("tail -f %s", node.LogFile))
svisor.AddProcess("admin", "/usr/local/bin/start_admin_server",
supervisor.WithRestart(0, 5*time.Second),
)

svisor.StopOnSignal(syscall.SIGINT, syscall.SIGTERM)

if err := svisor.Run(); err != nil {
fmt.Println(err)
os.Exit(1)
}

return
}

node, err := flypg.NewNode()
if err != nil {
panicHandler(err)
Expand Down
194 changes: 194 additions & 0 deletions internal/flybarman/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package flybarman

import (
"context"
"fmt"
"log"
"os"
"os/exec"

"github.com/fly-apps/postgres-flex/internal/flypg"
"github.com/fly-apps/postgres-flex/internal/flypg/admin"
)

var dataDir = "/data"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just wrap this:

var (
  dataDir = "/data"
  barmanConfigFile = "..."
  ...
)

var barmanConfigFile = dataDir + "/barman.conf"
var barmanCronFile = dataDir + "/barman.cron"
var globalBarmanConfigFile = "/etc/barman.conf"

type Node struct {
AppName string
PrivateIP string
PrimaryRegion string
DataDir string
Port int

BarmanHome string
LogFile string
PasswordConfigPath string

SUCredentials admin.Credential
OperatorCredentials admin.Credential
ReplCredentials admin.Credential
}

func NewNode() (*Node, error) {
node := &Node{
AppName: "local",
BarmanHome: dataDir + "/barman.d",
LogFile: dataDir + "/barman.log",
PasswordConfigPath: "/root/.pgpass",
}

if appName := os.Getenv("FLY_APP_NAME"); appName != "" {
node.AppName = appName
}

// Internal user
node.SUCredentials = admin.Credential{
Username: "flypgadmin",
Password: os.Getenv("SU_PASSWORD"),
}

// Postgres user
node.OperatorCredentials = admin.Credential{
Username: "postgres",
Password: os.Getenv("OPERATOR_PASSWORD"),
}

// Repmgr user
node.ReplCredentials = admin.Credential{
Username: "repmgr",
Password: os.Getenv("REPL_PASSWORD"),
}

return node, nil
}

func (n *Node) Init(ctx context.Context) error {
err := flypg.WriteSSHKey()
if err != nil {
return fmt.Errorf("failed write ssh keys: %s", err)
}

if _, err := os.Stat(barmanConfigFile); os.IsNotExist(err) {
barmanConfigFileContent := fmt.Sprintf(`[barman]
barman_user = root
barman_home = /data/barman.d
log_level = info
log_file = /data/barman.log
[pg]
description = "Fly.io Postgres Cluster"
conninfo = host=%s.internal user=repmgr dbname=postgres
streaming_conninfo = host=%s.internal user=repmgr dbname=postgres
backup_method = postgres
streaming_archiver = on
slot_name = barman
create_slot = auto
retention_policy_mode = auto
retention_policy = RECOVERY WINDOW OF 7 days
wal_retention_policy = main
`, n.AppName, n.AppName)

if err := os.WriteFile(barmanConfigFile, []byte(barmanConfigFileContent), 0644); err != nil {
return fmt.Errorf("failed write %s: %s", barmanConfigFile, err)
}

log.Println(barmanConfigFile + " created successfully.")
}

if err := deleteGlobalBarmanFile(); err != nil {
return fmt.Errorf("failed delete /etc/barman.conf: %s", err)
}

if err := os.Symlink(barmanConfigFile, globalBarmanConfigFile); err != nil {
return fmt.Errorf("failed symlink %s to %s: %s", barmanConfigFile, globalBarmanConfigFile, err)
}

log.Println("Symbolic link to barman config created successfully.")

if err := os.MkdirAll(n.BarmanHome, os.ModePerm); err != nil {
return fmt.Errorf("failed to mkdir %s: %s", n.BarmanHome, err)
}

log.Println("Barman home directory successfully.")

passStr := fmt.Sprintf("*:*:*:%s:%s", n.ReplCredentials.Username, n.ReplCredentials.Password)
if err := os.WriteFile(n.PasswordConfigPath, []byte(passStr), 0700); err != nil {
return fmt.Errorf("failed to write file %s: %s", n.PasswordConfigPath, err)
}
// We need this in case the user ssh to the vm as root
if err := os.WriteFile("/.pgpass", []byte(passStr), 0700); err != nil {
return fmt.Errorf("failed to write file %s: %s", n.PasswordConfigPath, err)
}

if _, err := os.Stat(barmanCronFile); os.IsNotExist(err) {
barmanCronFileContent := `* * * * * /usr/bin/barman cron
`
if err := os.WriteFile(barmanCronFile, []byte(barmanCronFileContent), 0644); err != nil {
return fmt.Errorf("failed write %s: %s", barmanCronFile, err)
}

log.Println(barmanCronFile + " created successfully.")
}

if _, err := os.Stat(n.LogFile); os.IsNotExist(err) {
file, err := os.Create(n.LogFile)
if err != nil {
return fmt.Errorf("failed to touch %s: %s", n.LogFile, err)
}
defer func() { _ = file.Close() }()

log.Println(n.LogFile + " created successfully.")
}

crontabCommand := exec.Command("/usr/bin/crontab", barmanCronFile)
if _, err := crontabCommand.Output(); err != nil {
return fmt.Errorf("failed set crontab: %s", err)
}

log.Println("Crontab updated")

serviceCmd := exec.Command("/usr/sbin/service", "--version")
if err := serviceCmd.Run(); err != nil {
log.Println("service command not found, skipping initializing cron service")
} else {
serviceCronStartCommand := exec.Command("service", "cron", "start")
if _, err := serviceCronStartCommand.Output(); err != nil {
return fmt.Errorf("failed starting cron service: %s", err)
}
log.Println("Started cron service")
}

switchWalCommand := exec.Command("barman", "switch-wal", "--archive", "--force", "pg")
if _, err := switchWalCommand.Output(); err != nil {
log.Println(fmt.Errorf("failed switching WAL: %s", err))
log.Println("try running `barman switch-wal --archive --force pg` or wait for the next WAL")
} else {
log.Println("successfully switched WAL files to start barman")
}

cronCommand := exec.Command("barman", "cron")
if _, err := cronCommand.Output(); err != nil {
log.Println(fmt.Errorf("failed running barman cron: %s", err))
log.Println("try running `cronCommand` or wait for the next run")
} else {
log.Println("successfully ran `barman cron`")
}

return nil
}

func deleteGlobalBarmanFile() error {
if _, err := os.Stat(globalBarmanConfigFile); os.IsNotExist(err) {
return nil
}

if err := os.Remove(globalBarmanConfigFile); err != nil {
return err
}

log.Println(globalBarmanConfigFile + " deleted successfully")
return nil
}
54 changes: 54 additions & 0 deletions internal/flycheck/barman.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package flycheck

import (
"errors"
"os/exec"
"regexp"
"strings"

"github.com/superfly/fly-checks/check"
)

func CheckBarmanConnection(checks *check.CheckSuite) *check.CheckSuite {
cmd := exec.Command("barman", "check", "pg")

output, err := cmd.CombinedOutput()
if err != nil {
checks.AddCheck("connection", func() (string, error) {
msg := "failed running `barman check pg`"
return "", errors.New(msg)
})

return checks
}

// Each line besides the first represents a check and will include FAILED or OK
// We just separate those lines and create a health check entry of our own
// so it's uniform how we handle it
lines := strings.Split(string(output), "\n")

for _, line := range lines {
pattern := `\s*(.*?):(.*)$`
regex := regexp.MustCompile(pattern)
matches := regex.FindStringSubmatch(line)

if len(matches) == 3 {
left := matches[1]
right := strings.Trim(matches[2], "")

if right == "" {
continue
}

checks.AddCheck(left, func() (string, error) {
if strings.Contains(right, "FAILED") {
return "", errors.New(right)
}

return right, nil
})
}
}

return checks
}
49 changes: 46 additions & 3 deletions internal/flycheck/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"log"
"net/http"
"os"
"time"

"github.com/superfly/fly-checks/check"
Expand All @@ -16,9 +17,15 @@ const Port = 5500
func Handler() http.Handler {
r := http.NewServeMux()

r.HandleFunc("/flycheck/vm", runVMChecks)
r.HandleFunc("/flycheck/pg", runPGChecks)
r.HandleFunc("/flycheck/role", runRoleCheck)
if os.Getenv("IS_BARMAN") != "" {
r.HandleFunc("/flycheck/vm", runVMChecks)
r.HandleFunc("/flycheck/connection", runBarmanConnectionChecks)
r.HandleFunc("/flycheck/role", runBarmanRoleCheck)
} else {
r.HandleFunc("/flycheck/vm", runVMChecks)
r.HandleFunc("/flycheck/pg", runPGChecks)
r.HandleFunc("/flycheck/role", runRoleCheck)
}

return r
}
Expand Down Expand Up @@ -82,6 +89,42 @@ func runRoleCheck(w http.ResponseWriter, r *http.Request) {
handleCheckResponse(w, suite, true)
}

func runBarmanConnectionChecks(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), (5 * time.Second))
defer cancel()

suite := &check.CheckSuite{Name: "Connection"}
suite = CheckBarmanConnection(suite)

go func(ctx context.Context) {
suite.Process(ctx)
cancel()
}(ctx)

<-ctx.Done()

handleCheckResponse(w, suite, false)
}

func runBarmanRoleCheck(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), (time.Second * 5))
defer cancel()

suite := &check.CheckSuite{Name: "Role"}
suite.AddCheck("role", func() (string, error) {
return "barman", nil
})

go func() {
suite.Process(ctx)
cancel()
}()

<-ctx.Done()

handleCheckResponse(w, suite, true)
}

func handleCheckResponse(w http.ResponseWriter, suite *check.CheckSuite, raw bool) {
if suite.ErrOnSetup != nil {
handleError(w, suite.ErrOnSetup)
Expand Down
2 changes: 1 addition & 1 deletion internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (n *Node) Init(ctx context.Context) error {
}
}

err := writeSSHKey()
err := WriteSSHKey()
if err != nil {
return fmt.Errorf("failed write ssh keys: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flypg/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (c *PGConfig) setDefaultHBA() error {
},
{
Type: "host",
Database: c.repmgrDatabase,
Database: fmt.Sprintf("replication,%s", c.repmgrDatabase),
User: c.repmgrUsername,
Address: "fdaa::/16",
Method: "trust",
Expand Down
Loading