Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ WORKDIR /go/src/github.com/fly-examples/fly-postgres
COPY . .

RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/event_handler ./cmd/event_handler
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/standby_cleaner ./cmd/standby_cleaner
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/start ./cmd/start
COPY ./bin/* /fly/bin/

Expand Down
63 changes: 63 additions & 0 deletions cmd/standby_cleaner/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"context"
"fmt"
"github.com/fly-apps/postgres-flex/pkg/flypg"
"os"
"time"
)

var Minute int64 = 60

func main() {
ctx := context.Background()
flypgNode, err := flypg.NewNode()
if err != nil {
fmt.Printf("failed to reference node: %s\n", err)
os.Exit(1)
}

conn, err := flypgNode.RepMgr.NewLocalConnection(ctx)
if err != nil {
fmt.Printf("failed to open local connection: %s\n", err)
os.Exit(1)
}

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

seenAt := map[int]int64{}

for _ = range ticker.C {
role, err := flypgNode.RepMgr.CurrentRole(ctx, conn)
if err != nil {
fmt.Printf("Failed to check role: %s", err)
continue
}
if role != "primary" {
continue
}
standbys, err := flypgNode.RepMgr.Standbys(ctx, conn)
if err != nil {
fmt.Printf("Failed to get standbys: %s", err)
continue
}
for _, standby := range standbys {
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
if err != nil {
if time.Now().Unix()-seenAt[standby.Id] >= 10*Minute {
err := flypgNode.RepMgr.UnregisterStandby(standby.Id)
if err != nil {
fmt.Printf("Failed to unregister %d: %s", standby.Id, err)
continue
}
delete(seenAt, standby.Id)
}
} else {
seenAt[standby.Id] = time.Now().Unix()
newConn.Close(ctx)
}
}
}
}
1 change: 1 addition & 0 deletions cmd/start/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func main() {
svisor.AddProcess("repmgrd", fmt.Sprintf("gosu postgres repmgrd -f %s --daemonize=false", node.RepMgr.ConfigPath),
supervisor.WithRestart(0, 5*time.Second),
)
svisor.AddProcess("standby_cleaner", "/usr/local/bin/standby_cleaner", supervisor.WithRestart(0, 5*time.Second))

exporterEnv := map[string]string{
"DATA_SOURCE_URI": fmt.Sprintf("[%s]:%d/postgres?sslmode=disable", node.PrivateIP, node.Port),
Expand Down
36 changes: 36 additions & 0 deletions pkg/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func (r *RepMgr) CurrentRole(ctx context.Context, pg *pgx.Conn) (string, error)
return r.memberRole(ctx, pg, int(r.ID))
}

func (r *RepMgr) Standbys(ctx context.Context, pg *pgx.Conn) ([]Standby, error) {
return r.standbyStatuses(ctx, pg, int(r.ID))
}

func (r *RepMgr) writeManagerConf() error {
file, err := os.OpenFile(r.ConfigPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
Expand Down Expand Up @@ -145,6 +149,15 @@ func (r *RepMgr) registerStandby() error {
return nil
}

func (r *RepMgr) UnregisterStandby(id int) error {
cmdStr := fmt.Sprintf("repmgr standby unregister -f %s --node-id=%d", r.ConfigPath, id)
if err := runCommand(cmdStr); err != nil {
fmt.Printf("failed to unregister standby: %s", err)
}

return nil
}

func (r *RepMgr) clonePrimary(ipStr string) error {
cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir)
if err := runCommand(cmdStr); err != nil {
Expand Down Expand Up @@ -184,6 +197,29 @@ func (r *RepMgr) writePasswdConf() error {
return nil
}

type Standby struct {
Id int
Ip string
}

func (r *RepMgr) standbyStatuses(ctx context.Context, pg *pgx.Conn, id int) ([]Standby, error) {
sql := fmt.Sprintf("select node_id, node_name from repmgr.show_nodes where type = 'standby' and upstream_node_id = '%d';", id)
var standbys []Standby
rows, err := pg.Query(ctx, sql)
if err != nil {
return nil, err
}
for rows.Next() {
var s Standby
err := rows.Scan(&s.Id, &s.Ip)
if err != nil {
return nil, err
}
standbys = append(standbys, s)
}
return standbys, nil
}

func (r *RepMgr) memberRole(ctx context.Context, pg *pgx.Conn, id int) (string, error) {
sql := fmt.Sprintf("select n.type from repmgr.nodes n LEFT JOIN repmgr.nodes un ON un.node_id = n.upstream_node_id WHERE n.node_id = '%d';", id)
var role string
Expand Down