From a08cf4c508d40c1f2aae7af7add8ebb433ce15f2 Mon Sep 17 00:00:00 2001 From: Dov Alperin Date: Wed, 21 Dec 2022 18:24:58 -0500 Subject: [PATCH] Deregister standbys after x mins of inactivity --- Dockerfile | 1 + cmd/standby_cleaner/main.go | 63 +++++++++++++++++++++++++++++++++++++ cmd/start/main.go | 1 + pkg/flypg/repmgr.go | 36 +++++++++++++++++++++ 4 files changed, 101 insertions(+) create mode 100644 cmd/standby_cleaner/main.go diff --git a/Dockerfile b/Dockerfile index 80fba8c8..add7deb0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,6 +7,7 @@ WORKDIR /go/src/github.com/fly-examples/fly-postgres COPY . . RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/event_handler ./cmd/event_handler +RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/standby_cleaner ./cmd/standby_cleaner RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/start ./cmd/start COPY ./bin/* /fly/bin/ diff --git a/cmd/standby_cleaner/main.go b/cmd/standby_cleaner/main.go new file mode 100644 index 00000000..51928b7e --- /dev/null +++ b/cmd/standby_cleaner/main.go @@ -0,0 +1,63 @@ +package main + +import ( + "context" + "fmt" + "github.com/fly-apps/postgres-flex/pkg/flypg" + "os" + "time" +) + +var Minute int64 = 60 + +func main() { + ctx := context.Background() + flypgNode, err := flypg.NewNode() + if err != nil { + fmt.Printf("failed to reference node: %s\n", err) + os.Exit(1) + } + + conn, err := flypgNode.RepMgr.NewLocalConnection(ctx) + if err != nil { + fmt.Printf("failed to open local connection: %s\n", err) + os.Exit(1) + } + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + seenAt := map[int]int64{} + + for _ = range ticker.C { + role, err := flypgNode.RepMgr.CurrentRole(ctx, conn) + if err != nil { + fmt.Printf("Failed to check role: %s", err) + continue + } + if role != "primary" { + continue + } + standbys, err := flypgNode.RepMgr.Standbys(ctx, conn) + if err != nil { + fmt.Printf("Failed to get standbys: %s", err) + continue + } + for _, standby := range standbys { + newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip) + if err != nil { + if time.Now().Unix()-seenAt[standby.Id] >= 10*Minute { + err := flypgNode.RepMgr.UnregisterStandby(standby.Id) + if err != nil { + fmt.Printf("Failed to unregister %d: %s", standby.Id, err) + continue + } + delete(seenAt, standby.Id) + } + } else { + seenAt[standby.Id] = time.Now().Unix() + newConn.Close(ctx) + } + } + } +} diff --git a/cmd/start/main.go b/cmd/start/main.go index 09c43357..98016f79 100644 --- a/cmd/start/main.go +++ b/cmd/start/main.go @@ -52,6 +52,7 @@ func main() { svisor.AddProcess("repmgrd", fmt.Sprintf("gosu postgres repmgrd -f %s --daemonize=false", node.RepMgr.ConfigPath), supervisor.WithRestart(0, 5*time.Second), ) + svisor.AddProcess("standby_cleaner", "/usr/local/bin/standby_cleaner", supervisor.WithRestart(0, 5*time.Second)) exporterEnv := map[string]string{ "DATA_SOURCE_URI": fmt.Sprintf("[%s]:%d/postgres?sslmode=disable", node.PrivateIP, node.Port), diff --git a/pkg/flypg/repmgr.go b/pkg/flypg/repmgr.go index c8c6fd22..4110a0aa 100644 --- a/pkg/flypg/repmgr.go +++ b/pkg/flypg/repmgr.go @@ -74,6 +74,10 @@ func (r *RepMgr) CurrentRole(ctx context.Context, pg *pgx.Conn) (string, error) return r.memberRole(ctx, pg, int(r.ID)) } +func (r *RepMgr) Standbys(ctx context.Context, pg *pgx.Conn) ([]Standby, error) { + return r.standbyStatuses(ctx, pg, int(r.ID)) +} + func (r *RepMgr) writeManagerConf() error { file, err := os.OpenFile(r.ConfigPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644) if err != nil { @@ -145,6 +149,15 @@ func (r *RepMgr) registerStandby() error { return nil } +func (r *RepMgr) UnregisterStandby(id int) error { + cmdStr := fmt.Sprintf("repmgr standby unregister -f %s --node-id=%d", r.ConfigPath, id) + if err := runCommand(cmdStr); err != nil { + fmt.Printf("failed to unregister standby: %s", err) + } + + return nil +} + func (r *RepMgr) clonePrimary(ipStr string) error { cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir) if err := runCommand(cmdStr); err != nil { @@ -184,6 +197,29 @@ func (r *RepMgr) writePasswdConf() error { return nil } +type Standby struct { + Id int + Ip string +} + +func (r *RepMgr) standbyStatuses(ctx context.Context, pg *pgx.Conn, id int) ([]Standby, error) { + sql := fmt.Sprintf("select node_id, node_name from repmgr.show_nodes where type = 'standby' and upstream_node_id = '%d';", id) + var standbys []Standby + rows, err := pg.Query(ctx, sql) + if err != nil { + return nil, err + } + for rows.Next() { + var s Standby + err := rows.Scan(&s.Id, &s.Ip) + if err != nil { + return nil, err + } + standbys = append(standbys, s) + } + return standbys, nil +} + func (r *RepMgr) memberRole(ctx context.Context, pg *pgx.Conn, id int) (string, error) { sql := fmt.Sprintf("select n.type from repmgr.nodes n LEFT JOIN repmgr.nodes un ON un.node_id = n.upstream_node_id WHERE n.node_id = '%d';", id) var role string