Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions cmd/monitor/monitor_dead_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,27 +67,27 @@ func deadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int
return nil
}

standbys, err := node.RepMgr.StandbyMembers(ctx, conn)
votingMembers, err := node.RepMgr.VotingMembers(ctx, conn)
if err != nil {
return fmt.Errorf("failed to query standbys: %s", err)
}

for _, standby := range standbys {
sConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname)
for _, voter := range votingMembers {
sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.Hostname)
if err != nil {
// TODO - Verify the exception that's getting thrown.
if time.Since(seenAt[standby.ID]) >= deadMemberRemovalThreshold {
log.Printf("Removing dead member: %s\n", standby.Hostname)
if err := node.RepMgr.UnregisterMember(standby); err != nil {
log.Printf("failed to unregister member %s: %v", standby.Hostname, err)
if time.Since(seenAt[voter.ID]) >= deadMemberRemovalThreshold {
log.Printf("Removing dead member: %s\n", voter.Hostname)
if err := node.RepMgr.UnregisterMember(voter); err != nil {
log.Printf("failed to unregister member %s: %v", voter.Hostname, err)
continue
}
delete(seenAt, standby.ID)
delete(seenAt, voter.ID)
}

continue
}
seenAt[standby.ID] = time.Now()
seenAt[voter.ID] = time.Now()

if err := sConn.Close(ctx); err != nil {
return fmt.Errorf("failed to close connection: %s", err)
Expand Down
2 changes: 2 additions & 0 deletions internal/flycheck/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func PostgreSQLRole(ctx context.Context, checks *check.CheckSuite) (*check.Check

case flypg.StandbyRoleName:
return "replica", nil
case flypg.WitnessRoleName:
return "witness", nil
default:
return "unknown", nil
}
Expand Down
60 changes: 51 additions & 9 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Node struct {
PrimaryRegion string
DataDir string
Port int
Witness bool

SUCredentials admin.Credential
OperatorCredentials admin.Credential
Expand Down Expand Up @@ -61,6 +62,12 @@ func NewNode() (*Node, error) {
node.Port = port
}

if os.Getenv("WITNESS") != "" {
node.Witness = true
} else {
node.Witness = false
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extremely unimportant nit: if you're just checking for existence, you could use os.LookupEnv.


// Internal user
node.SUCredentials = admin.Credential{
Username: "flypgadmin",
Expand Down Expand Up @@ -122,7 +129,6 @@ func (n *Node) Init(ctx context.Context) error {

// Check to see if we were just restored
if os.Getenv("FLY_RESTORED_FROM") != "" {
// Check to see if there's an active restore.
active, err := isRestoreActive()
if err != nil {
return fmt.Errorf("failed to verify active restore: %s", err)
Expand Down Expand Up @@ -163,9 +169,13 @@ func (n *Node) Init(ctx context.Context) error {
return fmt.Errorf("failed to verify cluster state %s", err)
}

if !clusterInitialized {
log.Println("Provisioning primary")
// TODO - This should probably run on boot in case the password changes.
if !clusterInitialized || n.Witness {
if n.Witness {
log.Println("Provisioning witness")
} else {
log.Println("Provisioning primary")
}

if err := n.PGConfig.writePasswordFile(n.OperatorCredentials.Password); err != nil {
return fmt.Errorf("failed to write pg password file: %s", err)
}
Expand Down Expand Up @@ -236,13 +246,13 @@ func (n *Node) PostInit(ctx context.Context) error {
}

if registered {
// Existing member
repConn, err := n.RepMgr.NewLocalConnection(ctx)
if err != nil {
return fmt.Errorf("failed to establish connection to local repmgr: %s", err)
}
defer func() { _ = repConn.Close(ctx) }()

// Existing member
member, err := n.RepMgr.Member(ctx, repConn)
if err != nil {
return fmt.Errorf("failed to resolve member role: %s", err)
Expand Down Expand Up @@ -289,6 +299,16 @@ func (n *Node) PostInit(ctx context.Context) error {
if err := n.RepMgr.registerStandby(); err != nil {
return fmt.Errorf("failed to register existing standby: %s", err)
}
case WitnessRoleName:
primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
if err != nil {
return fmt.Errorf("failed to resolve primary member when updating witness: %s", err)
}

// Register existing witness to take-on any configuration changes.
if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
return fmt.Errorf("failed to register existing witness: %s", err)
}
default:
return fmt.Errorf("member has unknown role: %q", member.Role)
}
Expand Down Expand Up @@ -350,10 +370,32 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to issue registration certificate: %s", err)
}
} else {
// Configure as standby
log.Println("Registering standby")
if err := n.RepMgr.registerStandby(); err != nil {
return fmt.Errorf("failed to register new standby: %s", err)
if n.Witness {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, have you tested switching a node's role between member and witness? Does that break things?

Copy link
Contributor Author

@davissp14 davissp14 Mar 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't, but it would break that node at the very least.

log.Println("Registering witness")

// Create required users
if err := n.setupCredentials(ctx, conn); err != nil {
return fmt.Errorf("failed to create required users: %s", err)
}

// Setup repmgr database and extension
if err := n.RepMgr.enable(ctx, conn); err != nil {
return fmt.Errorf("failed to enable repmgr: %s", err)
}

primary, err := n.RepMgr.ResolveMemberOverDNS(ctx)
if err != nil {
return fmt.Errorf("failed to resolve primary member: %s", err)
}

if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
return fmt.Errorf("failed to register witness: %s", err)
}
} else {
log.Println("Registering standby")
if err := n.RepMgr.registerStandby(); err != nil {
return fmt.Errorf("failed to register new standby: %s", err)
Comment on lines +380 to +399
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been thinking for a while that we should set up a better logging system (obviously it doesn't have to happen now) so we can annotate logs with the code path they are hitting. For example, instead of just logging:

failed to register new standby: %s

we could do something like

[witness-registration] failed to register new standby: %s

since that error can possibly happen in a few different places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zap and other logging frameworks provide the code path out of the box, but I'm not sure how useful it would be for the end user. I think the existing error hierarchy gets us pretty close, though, or at least I haven't yet run into an issue matching an error to a specific condition.

}
}

// Let the boot process know that we've already been configured.
Expand Down
1 change: 1 addition & 0 deletions internal/flypg/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func isRegistered(ctx context.Context, conn *pgx.Conn, n *Node) (bool, error) {
if errors.Is(err, pgx.ErrNoRows) {
return false, nil
}

return false, fmt.Errorf("failed to resolve member role: %s", err)
}

Expand Down
76 changes: 45 additions & 31 deletions internal/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
const (
PrimaryRoleName = "primary"
StandbyRoleName = "standby"
WitnessRoleName = "witness"
UnknownRoleName = ""

repmgrConsulKey = "repmgr"
Expand Down Expand Up @@ -216,7 +217,21 @@ func (r *RepMgr) resolveNodeID() (string, error) {
}

func (r *RepMgr) registerPrimary() error {
cmdStr := fmt.Sprintf("repmgr -f %s primary register -F -v", r.ConfigPath)
cmdStr := fmt.Sprintf("repmgr primary register -f %s -F", r.ConfigPath)
_, err := utils.RunCommand(cmdStr, "postgres")

return err
}

// registerStandby runs `repmgr standby register` against the config file at
// r.ConfigPath. The -F flag forces re-registration, so an already-registered
// standby picks up any new configuration changes.
//
// The command is handed to utils.RunCommand together with "postgres"
// (presumably the OS user to run as — confirm against utils.RunCommand).
// Any error from the command is returned unchanged.
func (r *RepMgr) registerStandby() error {
	registerCmd := fmt.Sprintf("repmgr standby register -f %s -F", r.ConfigPath)
	if _, err := utils.RunCommand(registerCmd, "postgres"); err != nil {
		return err
	}

	return nil
}

func (r *RepMgr) registerWitness(primaryHostname string) error {
cmdStr := fmt.Sprintf("repmgr witness register -f %s -h %s -F", r.ConfigPath, primaryHostname)
_, err := utils.RunCommand(cmdStr, "postgres")

return err
Expand All @@ -229,6 +244,20 @@ func (r *RepMgr) unregisterPrimary(id int) error {
return err
}

// unregisterStandby runs `repmgr standby unregister` for the node identified
// by id, using the config file at r.ConfigPath.
//
// The command is handed to utils.RunCommand together with "postgres"
// (presumably the OS user to run as — confirm against utils.RunCommand).
// Any error from the command is returned unchanged.
func (r *RepMgr) unregisterStandby(id int) error {
	unregisterCmd := fmt.Sprintf("repmgr standby unregister -f %s --node-id=%d", r.ConfigPath, id)
	if _, err := utils.RunCommand(unregisterCmd, "postgres"); err != nil {
		return err
	}

	return nil
}

// unregisterWitness runs `repmgr witness unregister` for the node identified
// by id, using the config file at r.ConfigPath.
//
// The command is handed to utils.RunCommand together with "postgres"
// (presumably the OS user to run as — confirm against utils.RunCommand).
// Any error from the command is returned unchanged.
func (r *RepMgr) unregisterWitness(id int) error {
	const runAs = "postgres"

	unregisterCmd := fmt.Sprintf("repmgr witness unregister -f %s --node-id=%d", r.ConfigPath, id)
	if _, err := utils.RunCommand(unregisterCmd, runAs); err != nil {
		return err
	}

	return nil
}

func (r *RepMgr) rejoinCluster(hostname string) error {
cmdStr := fmt.Sprintf("repmgr -f %s node rejoin -h %s -p %d -U %s -d %s --force-rewind --no-wait",
r.ConfigPath,
Expand All @@ -244,25 +273,6 @@ func (r *RepMgr) rejoinCluster(hostname string) error {
return err
}

func (r *RepMgr) registerStandby() error {
// Force re-registry to ensure the standby picks up any new configuration changes.
cmdStr := fmt.Sprintf("repmgr -f %s standby register -F", r.ConfigPath)
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
return err
}

return nil
}

func (r *RepMgr) unregisterStandby(id int) error {
cmdStr := fmt.Sprintf("repmgr standby unregister -f %s --node-id=%d", r.ConfigPath, id)
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
return err
}

return nil
}

func (r *RepMgr) clonePrimary(ipStr string) error {
cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir)
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
Expand Down Expand Up @@ -339,20 +349,20 @@ func (*RepMgr) PrimaryMember(ctx context.Context, pg *pgx.Conn) (*Member, error)
return &member, nil
}

func (r *RepMgr) StandbyMembers(ctx context.Context, conn *pgx.Conn) ([]Member, error) {
func (r *RepMgr) VotingMembers(ctx context.Context, conn *pgx.Conn) ([]Member, error) {
members, err := r.Members(ctx, conn)
if err != nil {
return nil, err
}

var standbys []Member
var voters []Member
for _, member := range members {
if member.Role == StandbyRoleName {
standbys = append(standbys, member)
if member.Role == StandbyRoleName || member.Role == WitnessRoleName {
voters = append(voters, member)
}
}

return standbys, nil
return voters, nil
}

func (*RepMgr) MemberByID(ctx context.Context, pg *pgx.Conn, id int) (*Member, error) {
Expand Down Expand Up @@ -441,15 +451,19 @@ func (r *RepMgr) HostInRegion(ctx context.Context, hostname string) (bool, error
}

func (r *RepMgr) UnregisterMember(member Member) error {
if member.Role == PrimaryRoleName {
switch member.Role {
case PrimaryRoleName:
if err := r.unregisterPrimary(member.ID); err != nil {
return fmt.Errorf("failed to unregister member %d: %s", member.ID, err)
}
return nil
}

if err := r.unregisterStandby(member.ID); err != nil {
return fmt.Errorf("failed to unregister member %d: %s", member.ID, err)
case StandbyRoleName:
if err := r.unregisterStandby(member.ID); err != nil {
return fmt.Errorf("failed to unregister standby %d: %s", member.ID, err)
}
case WitnessRoleName:
if err := r.unregisterWitness(member.ID); err != nil {
return fmt.Errorf("failed to unregister witness %d: %s", member.ID, err)
}
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions internal/flypg/zombie.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func ReadZombieLock() (string, error) {
}

func PerformScreening(ctx context.Context, conn *pgx.Conn, n *Node) (string, error) {
standbys, err := n.RepMgr.StandbyMembers(ctx, conn)
members, err := n.RepMgr.VotingMembers(ctx, conn)
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
return "", fmt.Errorf("failed to query standbys")
}
}

sample, err := TakeDNASample(ctx, n, standbys)
sample, err := TakeDNASample(ctx, n, members)
if err != nil {
return "", fmt.Errorf("failed to evaluate cluster data: %s", err)
}
Expand Down