diff --git a/cmd/monitor/main.go b/cmd/monitor/main.go
index 638a6748..f465f895 100644
--- a/cmd/monitor/main.go
+++ b/cmd/monitor/main.go
@@ -2,218 +2,43 @@ package main
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
-	"net/http"
-	"os"
+	"log"
 	"time"
 
 	"github.com/fly-apps/postgres-flex/internal/flypg"
-	"github.com/fly-apps/postgres-flex/internal/flypg/admin"
-	"github.com/jackc/pgx/v5"
-
-	"golang.org/x/exp/maps"
 )
 
 var (
-	deadMemberMonitorFrequency    = time.Minute * 5
-	readonlyStateMonitorFrequency = time.Minute * 1
+	deadMemberMonitorFrequency       = time.Hour * 1
+	replicationStateMonitorFrequency = time.Hour * 1
+	readonlyStateMonitorFrequency    = time.Minute * 1
+
+	defaultDeadMemberRemovalThreshold   = time.Hour * 12
+	defaultInactiveSlotRemovalThreshold = time.Hour * 12
 )
 
 func main() {
 	ctx := context.Background()
-	flypgNode, err := flypg.NewNode()
+
+	node, err := flypg.NewNode()
 	if err != nil {
-		fmt.Printf("failed to reference node: %s\n", err)
-		os.Exit(1)
+		panic(fmt.Sprintf("failed to reference node: %s\n", err))
 	}
 
 	// Dead member monitor
+	log.Println("Monitoring dead members")
 	go func() {
-		internal, err := flypg.ReadFromFile("/data/flypg.internal.conf")
-		if err != nil {
-			fmt.Printf("failed to open config: %s\n", err)
-			os.Exit(1)
-		}
-
-		user, err := flypg.ReadFromFile("/data/flypg.user.conf")
-		if err != nil {
-			fmt.Printf("failed to open config: %s\n", err)
-			os.Exit(1)
-		}
-
-		maps.Copy(user, internal)
-
-		deadMemberRemovalThreshold, err := time.ParseDuration(fmt.Sprint(internal["standby_clean_interval"]))
-		if err != nil {
-			fmt.Printf(fmt.Sprintf("Failed to parse config: %s", err))
-			os.Exit(1)
-		}
-
-		seenAt := map[int]time.Time{}
-
-		ticker := time.NewTicker(deadMemberMonitorFrequency)
-		defer ticker.Stop()
-
-		fmt.Printf("Pruning every %s...\n", deadMemberRemovalThreshold)
-
-		for range ticker.C {
-			err := handleDeadMemberMonitorTick(ctx, flypgNode, seenAt, deadMemberRemovalThreshold)
-			if err != nil {
-				fmt.Println(err)
-			}
+		if err := monitorDeadMembers(ctx, node); err != nil {
+			panic(err)
 		}
 	}()
 
 	// Readonly monitor
-	ticker := time.NewTicker(readonlyStateMonitorFrequency)
-	defer ticker.Stop()
-	for range ticker.C {
-		if err := handleReadonlyMonitorTick(ctx, flypgNode); err != nil {
-			fmt.Println(err)
-		}
-	}
-
-}
-
-type readonlyStateResponse struct {
-	Result bool
-}
-
-func handleReadonlyMonitorTick(ctx context.Context, node *flypg.Node) error {
-	conn, err := node.RepMgr.NewLocalConnection(ctx)
-	if err != nil {
-		return fmt.Errorf("failed to open local connection: %s", err)
-	}
-	defer conn.Close(ctx)
-
-	member, err := node.RepMgr.Member(ctx, conn)
-	if err != nil {
-		return fmt.Errorf("failed to query local member: %s", err)
-	}
-
-	if member.Role == flypg.PrimaryRoleName {
-		return nil
-	}
-
-	primary, err := node.RepMgr.PrimaryMember(ctx, conn)
-	if err != nil {
-		return fmt.Errorf("failed to query primary member: %s", err)
-	}
-
-	endpoint := fmt.Sprintf("http://[%s]:5500/%s", primary.Hostname, flypg.ReadOnlyStateEndpoint)
-	resp, err := http.Get(endpoint)
-	if err != nil {
-		return fmt.Errorf("failed to query primary readonly state: %s", err)
-	}
-	defer resp.Body.Close()
-
-	var state readonlyStateResponse
-	if err := json.NewDecoder(resp.Body).Decode(&state); err != nil {
-		return fmt.Errorf("failed to decode result: %s", err)
-	}
-
-	if state.Result {
-		if !flypg.ReadOnlyLockExists() {
-			fmt.Printf("Setting connections running under %s to readonly\n", node.PrivateIP)
-			if err := flypg.EnableReadonly(ctx, node); err != nil {
-				return fmt.Errorf("failed to set connection under %s to readonly: %s", node.PrivateIP, err)
-			}
-		}
-	} else {
-		if !flypg.ZombieLockExists() && flypg.ReadOnlyLockExists() {
-			fmt.Printf("Setting connections running under %s to read/write\n", node.PrivateIP)
-			if err := flypg.DisableReadonly(ctx, node); err != nil {
-				return fmt.Errorf("failed to set connections under %s read/write: %s", node.PrivateIP, err)
-			}
-		}
-	}
-
-	return nil
-}
-
-func handleDeadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int]time.Time, deadMemberRemovalThreshold time.Duration) error {
-	// TODO - We should connect using the flypgadmin user so we can differentiate between
-	// internal admin connection usage and the actual repmgr process.
-	conn, err := node.RepMgr.NewLocalConnection(ctx)
-	if err != nil {
-		fmt.Printf("failed to open local connection: %s\n", err)
-		os.Exit(1)
-	}
-	defer conn.Close(ctx)
-
-	member, err := node.RepMgr.MemberByID(ctx, conn, int(node.RepMgr.ID))
-	if err != nil {
-		return err
-	}
-
-	if member.Role != flypg.PrimaryRoleName {
-		return nil
-	}
-
-	standbys, err := node.RepMgr.StandbyMembers(ctx, conn)
-	if err != nil {
-		return fmt.Errorf("failed to query standbys: %s", err)
-	}
-
-	for _, standby := range standbys {
-		sConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname)
-		if err != nil {
-			// TODO - Verify the exception that's getting thrown.
-			if time.Since(seenAt[standby.ID]) >= deadMemberRemovalThreshold {
-				if err := node.RepMgr.UnregisterMember(ctx, standby); err != nil {
-					fmt.Printf("failed to unregister member %s: %v", standby.Hostname, err)
-					continue
-				}
-
-				delete(seenAt, standby.ID)
-			}
-
-			continue
-		}
-		defer sConn.Close(ctx)
+	log.Println("Monitoring readonly state")
+	go monitorReadOnly(ctx, node)
 
-		seenAt[standby.ID] = time.Now()
-	}
-
-	removeOrphanedReplicationSlots(ctx, conn, standbys)
-
-	return nil
-}
-
-func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standbys []flypg.Member) {
-	var orphanedSlots []admin.ReplicationSlot
-
-	slots, err := admin.ListReplicationSlots(ctx, conn)
-	if err != nil {
-		fmt.Printf("failed to list replication slots: %s", err)
-	}
-
-	// An orphaned replication slot is defined as an inactive replication slot that is no longer tied to
-	// and existing repmgr member.
-	for _, slot := range slots {
-		matchFound := false
-		for _, standby := range standbys {
-			if slot.MemberID == int32(standby.ID) {
-				matchFound = true
-			}
-		}
-
-		if !matchFound && !slot.Active {
-			orphanedSlots = append(orphanedSlots, slot)
-		}
-	}
-
-	if len(orphanedSlots) > 0 {
-		fmt.Printf("%d orphaned replication slot(s) detected\n", len(orphanedSlots))
-
-		for _, slot := range orphanedSlots {
-			fmt.Printf("Dropping replication slot: %s\n", slot.Name)
-
-			if err := admin.DropReplicationSlot(ctx, conn, slot.Name); err != nil {
-				fmt.Printf("failed to drop replication slot %s: %v\n", slot.Name, err)
-				continue
-			}
-		}
-	}
+	// Replication slot monitor
+	log.Println("Monitoring replication slots")
+	monitorReplicationSlots(ctx, node)
 }
diff --git a/cmd/monitor/monitor_dead_members.go b/cmd/monitor/monitor_dead_members.go
new file mode 100644
index 00000000..5deff5a5
--- /dev/null
+++ b/cmd/monitor/monitor_dead_members.go
@@ -0,0 +1,95 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"time"
+
+	"github.com/fly-apps/postgres-flex/internal/flypg"
+	"golang.org/x/exp/maps"
+)
+
+func monitorDeadMembers(ctx context.Context, node *flypg.Node) error {
+	internal, err := flypg.ReadFromFile("/data/flypg.internal.conf")
+	if err != nil {
+		return fmt.Errorf("failed to open config: %s", err)
+	}
+
+	user, err := flypg.ReadFromFile("/data/flypg.user.conf")
+	if err != nil {
+		return fmt.Errorf("failed to open config: %s", err)
+	}
+
+	maps.Copy(user, internal)
+
+	removalThreshold := defaultDeadMemberRemovalThreshold
+
+	if internal["deadMemberRemovalThreshold"] != "" {
+		removalThreshold, err = time.ParseDuration(fmt.Sprint(internal["deadMemberRemovalThreshold"]))
+		if err != nil {
+			log.Printf("failed to parse deadMemberRemovalThreshold: %s\n", err)
+		}
+	}
+
+	seenAt := map[int]time.Time{}
+
+	ticker := time.NewTicker(deadMemberMonitorFrequency)
+	defer ticker.Stop()
+
+	log.Printf("Pruning every %s...\n", removalThreshold)
+
+	for range ticker.C {
+		err := deadMemberMonitorTick(ctx, node, seenAt, removalThreshold)
+		if err != nil {
+			log.Printf("deadMemberMonitorTick failed with: %s", err)
+		}
+	}
+
+	return nil
+}
+
+func deadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int]time.Time, deadMemberRemovalThreshold time.Duration) error {
+	// TODO - We should connect using the flypgadmin user so we can differentiate between
+	// internal admin connection usage and the actual repmgr process.
+	conn, err := node.RepMgr.NewLocalConnection(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to open local connection: %s", err)
+	}
+	defer conn.Close(ctx)
+
+	member, err := node.RepMgr.MemberByID(ctx, conn, int(node.RepMgr.ID))
+	if err != nil {
+		return err
+	}
+
+	if member.Role != flypg.PrimaryRoleName {
+		return nil
+	}
+
+	standbys, err := node.RepMgr.StandbyMembers(ctx, conn)
+	if err != nil {
+		return fmt.Errorf("failed to query standbys: %s", err)
+	}
+
+	for _, standby := range standbys {
+		sConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname)
+		if err != nil {
+			// TODO - Verify the exception that's getting thrown.
+			if time.Since(seenAt[standby.ID]) >= deadMemberRemovalThreshold {
+				log.Printf("Removing dead member: %s\n", standby.Hostname)
+				if err := node.RepMgr.UnregisterMember(ctx, standby); err != nil {
+					log.Printf("failed to unregister member %s: %v", standby.Hostname, err)
+					continue
+				}
+				delete(seenAt, standby.ID)
+			}
+
+			continue
+		}
+		defer sConn.Close(ctx)
+		seenAt[standby.ID] = time.Now()
+	}
+
+	return nil
+}
diff --git a/cmd/monitor/monitor_readonly.go b/cmd/monitor/monitor_readonly.go
new file mode 100644
index 00000000..31fe40f1
--- /dev/null
+++ b/cmd/monitor/monitor_readonly.go
@@ -0,0 +1,78 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"log"
+	"net/http"
+	"time"
+
+	"github.com/fly-apps/postgres-flex/internal/flypg"
+)
+
+type readonlyStateResponse struct {
+	Result bool
+}
+
+func monitorReadOnly(ctx context.Context, node *flypg.Node) {
+	ticker := time.NewTicker(readonlyStateMonitorFrequency)
+	defer ticker.Stop()
+	for range ticker.C {
+		if err := readonlyMonitorTick(ctx, node); err != nil {
+			log.Printf("readOnlyMonitorTick failed with: %s", err)
+		}
+	}
+}
+
+func readonlyMonitorTick(ctx context.Context, node *flypg.Node) error {
+	conn, err := node.RepMgr.NewLocalConnection(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to open local connection: %s", err)
+	}
+	defer conn.Close(ctx)
+
+	member, err := node.RepMgr.Member(ctx, conn)
+	if err != nil {
+		return fmt.Errorf("failed to query local member: %s", err)
+	}
+
+	if member.Role == flypg.PrimaryRoleName {
+		return nil
+	}
+
+	primary, err := node.RepMgr.PrimaryMember(ctx, conn)
+	if err != nil {
+		return fmt.Errorf("failed to query primary member: %s", err)
+	}
+
+	endpoint := fmt.Sprintf("http://[%s]:5500/%s", primary.Hostname, flypg.ReadOnlyStateEndpoint)
+	resp, err := http.Get(endpoint)
+	if err != nil {
+		return fmt.Errorf("failed to query primary readonly state: %s", err)
+	}
+	defer resp.Body.Close()
+
+	var state readonlyStateResponse
+	if err := json.NewDecoder(resp.Body).Decode(&state); err != nil {
+		return fmt.Errorf("failed to decode result: %s", err)
+	}
+
+	if state.Result {
+		if !flypg.ReadOnlyLockExists() {
+			log.Printf("Setting connections running under %s to readonly\n", node.PrivateIP)
+			if err := flypg.EnableReadonly(ctx, node); err != nil {
+				return fmt.Errorf("failed to set connection under %s to readonly: %s", node.PrivateIP, err)
+			}
+		}
+	} else {
+		if !flypg.ZombieLockExists() && flypg.ReadOnlyLockExists() {
+			log.Printf("Setting connections running under %s to read/write\n", node.PrivateIP)
+			if err := flypg.DisableReadonly(ctx, node); err != nil {
+				return fmt.Errorf("failed to set connections under %s read/write: %s", node.PrivateIP, err)
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/cmd/monitor/monitor_replication_slots.go b/cmd/monitor/monitor_replication_slots.go
new file mode 100644
index 00000000..167838db
--- /dev/null
+++ b/cmd/monitor/monitor_replication_slots.go
@@ -0,0 +1,86 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"github.com/fly-apps/postgres-flex/internal/flypg"
+	"github.com/fly-apps/postgres-flex/internal/flypg/admin"
+)
+
+func monitorReplicationSlots(ctx context.Context, node *flypg.Node) {
+	inactiveSlotStatus := map[int]time.Time{}
+
+	ticker := time.NewTicker(replicationStateMonitorFrequency)
+	defer ticker.Stop()
+	for range ticker.C {
+		if err := replicationSlotMonitorTick(ctx, node, inactiveSlotStatus); err != nil {
+			log.Printf("replicationSlotMonitorTick failed with: %s", err)
+		}
+	}
+}
+
+func replicationSlotMonitorTick(ctx context.Context, node *flypg.Node, inactiveSlotStatus map[int]time.Time) error {
+	conn, err := node.RepMgr.NewLocalConnection(ctx)
+	if err != nil {
+		log.Printf("failed to open local connection: %s\n", err)
+	}
+	defer conn.Close(ctx)
+
+	member, err := node.RepMgr.Member(ctx, conn)
+	if err != nil {
+		return err
+	}
+
+	// Only monitor replication slots on the primary.
+	// We need to check this per-tick as the role can change at runtime.
+	if member.Role != flypg.PrimaryRoleName {
+		return nil
+	}
+
+	slots, err := admin.ListReplicationSlots(ctx, conn)
+	if err != nil {
+		log.Printf("failed to list replication slots: %s\n", err)
+	}
+
+	for _, slot := range slots {
+		if slot.Active {
+			delete(inactiveSlotStatus, int(slot.MemberID))
+			continue
+		}
+
+		// Log warning if inactive replication slot is holding onto more than 50MB worth of WAL.
+		if slot.RetainedWalInBytes != 0 {
+			retainedWalInMB := slot.RetainedWalInBytes / 1024 / 1024
+			if retainedWalInMB > 50 {
+				log.Printf("Warning: Inactive replication slot %s is retaining %d MB of WAL", slot.Name, retainedWalInMB)
+			}
+		}
+
+		// Check to see if slot has already been registered as inactive.
+		if lastSeen, ok := inactiveSlotStatus[int(slot.MemberID)]; ok {
+			// TODO - Consider creating a separate threshold for when the member exists.
+			// TODO - Consider being more aggressive with removing replication slots if disk capacity is at dangerous levels.
+			// TODO - Make inactiveSlotRemovalThreshold configurable.
+
+			// Remove the replication slot if it has been inactive for longer than the defined threshold
+			if time.Since(lastSeen) > defaultInactiveSlotRemovalThreshold {
+				log.Printf("Dropping replication slot: %s\n", slot.Name)
+				if err := admin.DropReplicationSlot(ctx, conn, slot.Name); err != nil {
+					log.Printf("failed to drop replication slot %s: %v\n", slot.Name, err)
+					continue
+				}
+
+				delete(inactiveSlotStatus, int(slot.MemberID))
+				continue
+			}
+
+			log.Printf("Replication slot %s has been inactive for %v\n", slot.Name, time.Since(lastSeen).Round(time.Second))
+		} else {
+			inactiveSlotStatus[int(slot.MemberID)] = time.Now()
+		}
+	}
+
+	return nil
+}
diff --git a/internal/flypg/admin/admin.go b/internal/flypg/admin/admin.go
index a2f70351..267be42f 100644
--- a/internal/flypg/admin/admin.go
+++ b/internal/flypg/admin/admin.go
@@ -78,15 +78,15 @@ func DeleteDatabase(ctx context.Context, pg *pgx.Conn, name string) error {
 }
 
 type ReplicationSlot struct {
-	MemberID  int32
-	Name      string
-	Type      string
-	Active    bool
-	WalStatus string
+	MemberID           int32
+	Name               string
+	Active             bool
+	WalStatus          string
+	RetainedWalInBytes int
 }
 
 func ListReplicationSlots(ctx context.Context, pg *pgx.Conn) ([]ReplicationSlot, error) {
-	sql := "SELECT slot_name, slot_type, active, wal_status from pg_replication_slots;"
+	sql := "SELECT slot_name, active, wal_status, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_wal FROM pg_replication_slots;"
 	rows, err := pg.Query(ctx, sql)
 	if err != nil {
 		return nil, err
 	}
@@ -97,7 +97,7 @@ func ListReplicationSlots(ctx context.Context, pg *pgx.Conn) ([]ReplicationSlot,
 
 	for rows.Next() {
 		var slot ReplicationSlot
-		if err := rows.Scan(&slot.Name, &slot.Type, &slot.Active, &slot.WalStatus); err != nil {
+		if err := rows.Scan(&slot.Name, &slot.Active, &slot.WalStatus, &slot.RetainedWalInBytes); err != nil {
 			return nil, err
 		}
 
diff --git a/internal/flypg/flypg.go b/internal/flypg/flypg.go
index ef362526..dcfa8023 100644
--- a/internal/flypg/flypg.go
+++ b/internal/flypg/flypg.go
@@ -18,7 +18,7 @@ type FlyPGConfig struct {
 
 func (c *FlyPGConfig) SetDefaults() {
 	c.internalConfig = ConfigMap{
-		"standby_clean_interval": time.Hour * 24,
+		"deadMemberRemovalThreshold": time.Hour * 24,
 	}
 }
 