Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 42 additions & 35 deletions cmd/standby_cleaner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,6 @@ func main() {
os.Exit(1)
}

// TODO - We should connect using the flypgadmin user so we can differentiate between
// internal admin connection usage and the actual repmgr process.
conn, err := flypgNode.RepMgr.NewLocalConnection(ctx)
if err != nil {
fmt.Printf("failed to open local connection: %s\n", err)
os.Exit(1)
}
defer conn.Close(ctx)

internal, err := flypg.ReadFromFile("/data/flypg.internal.conf")
if err != nil {
fmt.Printf("failed to open config: %s\n", err)
Expand Down Expand Up @@ -64,45 +55,61 @@ func main() {
for {
select {
case <-ticker.C:
role, err := flypgNode.RepMgr.CurrentRole(ctx, conn)
if err != nil {
fmt.Printf("Failed to check role: %s\n", err)
continue
if err := handleTick(ctx, flypgNode, seenAt, deadMemberRemovalThreshold); err != nil {
fmt.Println(err)
}
}
}
}

if role != flypg.PrimaryRoleName {
continue
}
func handleTick(ctx context.Context, node *flypg.Node, seenAt map[int]time.Time, deadMemberRemovalThreshold time.Duration) error {
// TODO - We should connect using the flypgadmin user so we can differentiate between
// internal admin connection usage and the actual repmgr process.
conn, err := node.RepMgr.NewLocalConnection(ctx)
if err != nil {
fmt.Printf("failed to open local connection: %s\n", err)
os.Exit(1)
}
defer conn.Close(ctx)

standbys, err := flypgNode.RepMgr.Standbys(ctx, conn)
if err != nil {
fmt.Printf("Failed to query standbys: %s\n", err)
continue
}
role, err := node.RepMgr.CurrentRole(ctx, conn)
if err != nil {
return fmt.Errorf("failed to check role: %s", err)
}

for _, standby := range standbys {
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
defer newConn.Close(ctx)
if err != nil {
// TODO - Verify the exception that's getting thrown.
if time.Now().Sub(seenAt[standby.Id]) >= deadMemberRemovalThreshold {
if err := flypgNode.UnregisterMemberByID(ctx, int32(standby.Id)); err != nil {
fmt.Printf("failed to unregister member %d: %v\n", standby.Id, err.Error())
continue
}
if role != flypg.PrimaryRoleName {
return nil
}

delete(seenAt, standby.Id)
}
standbys, err := node.RepMgr.Standbys(ctx, conn)
if err != nil {
return fmt.Errorf("failed to query standbys: %s", err)
}

for _, standby := range standbys {
// Wrap this in a function so connections are properly closed.
sConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Ip)
if err != nil {
// TODO - Verify the exception that's getting thrown.
if time.Now().Sub(seenAt[standby.Id]) >= deadMemberRemovalThreshold {
if err := node.UnregisterMemberByID(ctx, int32(standby.Id)); err != nil {
fmt.Printf("failed to unregister member %d: %v", standby.Id, err)
continue
}

seenAt[standby.Id] = time.Now()
delete(seenAt, standby.Id)
}

removeOrphanedReplicationSlots(ctx, conn, standbys)
continue
}
defer sConn.Close(ctx)

seenAt[standby.Id] = time.Now()
}

removeOrphanedReplicationSlots(ctx, conn, standbys)

return nil
}

func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standbys []flypg.Standby) {
Expand Down