diff --git a/pkg/flypg/node.go b/pkg/flypg/node.go index 88677422..2a5052d7 100644 --- a/pkg/flypg/node.go +++ b/pkg/flypg/node.go @@ -112,17 +112,15 @@ func (n *Node) Init() error { } // Writes or updates the replication manager configuration. - if err := InitializeManager(*n); err != nil { + if err := initializeRepmgr(*n); err != nil { fmt.Printf("Failed to initialize replmgr: %s\n", err.Error()) } - // We are done here if we are the primary. - if primaryIP == n.PrivateIP { - return nil - } - - // If there's no primary then we should initialize ourself as the primary. - if primaryIP == "" { + switch primaryIP { + case n.PrivateIP: + // Noop + case "": + // Initialize ourselves as the primary. fmt.Println("Initializing postgres") if err := n.initializePostgres(); err != nil { return fmt.Errorf("failed to initialize postgres %s", err) @@ -132,16 +130,31 @@ func (n *Node) Init() error { if err := n.setDefaultHBA(); err != nil { return fmt.Errorf("failed updating pg_hba.conf: %s", err) } - } else { - // TODO If the Postgresql directory exists, then we know that we have already been initialized. - // If we have already been intialized we are either a current standby, or a demoted - // primary that's coming back online. - - // TODO - It may be necessary to flag the Primary node within Consul to let - // us know what we need to actually do here. - fmt.Println("Cloning from primary") - if err := cloneFromPrimary(*n, primaryIP); err != nil { - return fmt.Errorf("failed to clone primary: %s", err) + default: + // If we are here we are either a standby, new node or primary coming back from the dead. + clonePrimary := true + if n.isInitialized() { + // Attempt to resolve our role by querying the primary. + remoteConn, err := n.NewRepRemoteConnection(context.TODO(), primaryIP) + if err != nil { + return fmt.Errorf("failed to resolve my role according to the primary: %s", err) + } + role, err := memberRoleByHostname(context.TODO(), remoteConn, n.PrivateIP) + if err != nil { + return fmt.Errorf("failed to resolve role for %s: %s", primaryIP, err) + } + + fmt.Printf("My role is: %s\n", role) + if role == standbyRoleName { + clonePrimary = false + } + } + + if clonePrimary { + fmt.Println("Cloning from primary") + if err := cloneFromPrimary(*n, primaryIP); err != nil { + return fmt.Errorf("failed to clone primary: %s", err) + } } } @@ -158,15 +171,15 @@ func (n *Node) Init() error { return nil } -func (n *Node) ValidPrimary() bool { - if n.Region == os.Getenv("PRIMARY_REGION") { - return true - } - return false -} - // PostInit are operations that should be executed against a running Postgres on boot. func (n *Node) PostInit() error { + // Ensure local PG is up before establishing connection with + // consul. + conn, err := n.NewLocalConnection(context.TODO()) + if err != nil { + return fmt.Errorf("failed to establish connection to local node: %s", err) + } + client, err := state.NewConsulClient() if err != nil { return fmt.Errorf("failed to establish connection with consul: %s", err) @@ -178,17 +191,19 @@ func (n *Node) PostInit() error { } switch primaryIP { + case n.PrivateIP: + // Re-register the primary in order to pick up any changes made to the + // configuration file. + fmt.Println("Updating primary record") + if err := registerPrimary(*n); err != nil { + fmt.Printf("failed to register primary: %s", err) + } case "": // Check if we can be a primary - if !n.ValidPrimary() { + if !n.validPrimary() { return fmt.Errorf("no primary to follow and can't configure self as primary because primary region is '%s' and we are in '%s'", n.Region, os.Getenv("PRIMARY_REGION")) } - conn, err := n.NewLocalConnection(context.TODO()) - if err != nil { - return err - } - // Initialize ourselves as the primary. if err := n.createRequiredUsers(conn); err != nil { return fmt.Errorf("failed to create required users: %s", err) @@ -215,12 +230,9 @@ func (n *Node) PostInit() error { if err := client.RegisterNode(n.ID, n.PrivateIP); err != nil { return fmt.Errorf("failed to register member with consul: %s", err) } - case n.PrivateIP: - // We are an already initialized primary. default: // If we are here, we are a new node, a standby or a demoted primary who needs // to be reconfigured as a standby. - conn, err := n.NewRepLocalConnection(context.TODO()) if err != nil { return err @@ -231,29 +243,17 @@ func (n *Node) PostInit() error { return err } - if role == "" { - fmt.Printf("Configuring a new node\n") - } else { - fmt.Printf("Reconfiguring a %s node as healthy\n", role) - } - - if role == "primary" { + if role == primaryRoleName { fmt.Println("Unregistering primary") if err := unregisterPrimary(*n); err != nil { fmt.Printf("failed to unregister primary: %s\n", err) } } - // TODO - Verify if there are any issues with attempting to re-register - // an already registered standby. I don't think so, but need to verify. - fmt.Println("Registering standby") if err := registerStandby(*n); err != nil { fmt.Printf("failed to register standby: %s\n", err) } - // TODO - Verify if there are any issues with re-following a primary we are - // already following. I don't think so, but need to verify. - fmt.Println("Follow the primary") if err := standbyFollow(*n); err != nil { fmt.Printf("failed to register standby: %s\n", err) } @@ -293,6 +293,30 @@ func (n *Node) NewRepLocalConnection(ctx context.Context) (*pgx.Conn, error) { return openConnection(ctx, host, "repmgr", n.ManagerCredentials) } +func (n *Node) NewRepRemoteConnection(ctx context.Context, hostname string) (*pgx.Conn, error) { + host := net.JoinHostPort(hostname, strconv.Itoa(n.PGPort)) + return openConnection(ctx, host, "repmgr", n.ManagerCredentials) +} + +func (n *Node) isInitialized() bool { + _, err := os.Stat(n.DataDir) + if os.IsNotExist(err) { + return false + } + return true +} + +func (n *Node) currentRole(ctx context.Context, pg *pgx.Conn) (string, error) { + return memberRole(ctx, pg, int(n.ID)) +} + +func (n *Node) validPrimary() bool { + if n.Region == os.Getenv("PRIMARY_REGION") { + return true + } + return false +} + func (n *Node) createRequiredUsers(conn *pgx.Conn) error { curUsers, err := admin.ListUsers(context.TODO(), conn) if err != nil { @@ -330,21 +354,20 @@ func (n *Node) createRequiredUsers(conn *pgx.Conn) error { } func (n *Node) initializePostgres() error { - _, err := os.Stat(n.DataDir) - if os.IsNotExist(err) { - if err := ioutil.WriteFile("/data/.default_password", []byte(os.Getenv("OPERATOR_PASSWORD")), 0644); err != nil { - return err - } - cmd := exec.Command("gosu", "postgres", "initdb", "--pgdata", n.DataDir, "--pwfile=/data/.default_password") - _, err := cmd.CombinedOutput() - if err != nil { - return err - } - + if n.isInitialized() { return nil } - return err + if err := ioutil.WriteFile("/data/.default_password", []byte(os.Getenv("OPERATOR_PASSWORD")), 0644); err != nil { + return err + } + cmd := exec.Command("gosu", "postgres", "initdb", "--pgdata", n.DataDir, "--pwfile=/data/.default_password") + _, err := cmd.CombinedOutput() + if err != nil { + return err + } + + return nil } func (n *Node) ConfigurePGBouncerAuth() error { diff --git a/pkg/flypg/repmgr.go b/pkg/flypg/repmgr.go index 362cdf4a..3578a8e7 100644 --- a/pkg/flypg/repmgr.go +++ b/pkg/flypg/repmgr.go @@ -3,11 +3,18 @@ package flypg import ( "context" "fmt" - "github.com/jackc/pgx/v4" "os" + + "github.com/jackc/pgx/v4" +) + +const ( + primaryRoleName = "primary" + standbyRoleName = "standby" + unknownRoleName = "" ) -func InitializeManager(node Node) error { +func initializeRepmgr(node Node) error { // Write conf file. if err := writeManagerConf(node); err != nil { return fmt.Errorf("failed to write repmgr config file: %s", err) @@ -26,7 +33,7 @@ func InitializeManager(node Node) error { } func registerPrimary(node Node) error { - cmdStr := fmt.Sprintf("repmgr -f %s primary register", + cmdStr := fmt.Sprintf("repmgr -f %s primary register -F", node.ManagerConfigPath, ) if err := runCommand(cmdStr); err != nil { @@ -57,7 +64,8 @@ func standbyFollow(node Node) error { } func registerStandby(node Node) error { - cmdStr := fmt.Sprintf("repmgr -f %s standby register", node.ManagerConfigPath) + // Force re-registry to ensure the standby picks up any new configuration changes. + cmdStr := fmt.Sprintf("repmgr -f %s standby register -F", node.ManagerConfigPath) if err := runCommand(cmdStr); err != nil { fmt.Printf("failed to register standby: %s", err) } @@ -105,7 +113,7 @@ func writeManagerConf(node Node) error { "location": node.Region, } - if !node.ValidPrimary() { + if !node.validPrimary() { conf["priority"] = "0" } @@ -142,8 +150,21 @@ func writePasswdConf(node Node) error { return nil } -func (n *Node) currentRole(ctx context.Context, pg *pgx.Conn) (string, error) { - sql := fmt.Sprintf("select n.type from repmgr.nodes n LEFT JOIN repmgr.nodes un ON un.node_id = n.upstream_node_id WHERE n.node_id = '%d';", n.ID) +func memberRole(ctx context.Context, pg *pgx.Conn, id int) (string, error) { + sql := fmt.Sprintf("select n.type from repmgr.nodes n LEFT JOIN repmgr.nodes un ON un.node_id = n.upstream_node_id WHERE n.node_id = '%d';", id) + var role string + err := pg.QueryRow(ctx, sql).Scan(&role) + if err != nil { + if err == pgx.ErrNoRows { + return "", nil + } + return "", err + } + return role, nil +} + +func memberRoleByHostname(ctx context.Context, pg *pgx.Conn, hostname string) (string, error) { + sql := fmt.Sprintf("select n.type from repmgr.nodes n LEFT JOIN repmgr.nodes un ON un.node_id = n.upstream_node_id where n.connInfo LIKE '%%%s%%';", hostname) var role string err := pg.QueryRow(ctx, sql).Scan(&role) if err != nil {