Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 82 additions & 59 deletions pkg/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,15 @@ func (n *Node) Init() error {
}

// Writes or updates the replication manager configuration.
if err := InitializeManager(*n); err != nil {
if err := initializeRepmgr(*n); err != nil {
fmt.Printf("Failed to initialize replmgr: %s\n", err.Error())
}

// We are done here if we are the primary.
if primaryIP == n.PrivateIP {
return nil
}

// If there's no primary then we should initialize ourself as the primary.
if primaryIP == "" {
switch primaryIP {
case n.PrivateIP:
// Noop
case "":
// Initialize ourselves as the primary.
fmt.Println("Initializing postgres")
if err := n.initializePostgres(); err != nil {
return fmt.Errorf("failed to initialize postgres %s", err)
Expand All @@ -132,16 +130,31 @@ func (n *Node) Init() error {
if err := n.setDefaultHBA(); err != nil {
return fmt.Errorf("failed updating pg_hba.conf: %s", err)
}
} else {
// TODO If the Postgresql directory exists, then we know that we have already been initialized.
// If we have already been initialized we are either a current standby, or a demoted
// primary that's coming back online.

// TODO - It may be necessary to flag the Primary node within Consul to let
// us know what we need to actually do here.
fmt.Println("Cloning from primary")
if err := cloneFromPrimary(*n, primaryIP); err != nil {
return fmt.Errorf("failed to clone primary: %s", err)
default:
// If we are here we are either a standby, new node or primary coming back from the dead.
clonePrimary := true
if n.isInitialized() {
// Attempt to resolve our role by querying the primary.
remoteConn, err := n.NewRepRemoteConnection(context.TODO(), primaryIP)
if err != nil {
return fmt.Errorf("failed to resolve my role according to the primary: %s", err)
}
role, err := memberRoleByHostname(context.TODO(), remoteConn, n.PrivateIP)
if err != nil {
return fmt.Errorf("failed to resolve role for %s: %s", primaryIP, err)
}

fmt.Printf("My role is: %s\n", role)
if role == standbyRoleName {
clonePrimary = false
}
}

if clonePrimary {
fmt.Println("Cloning from primary")
if err := cloneFromPrimary(*n, primaryIP); err != nil {
return fmt.Errorf("failed to clone primary: %s", err)
}
}
}

Expand All @@ -158,15 +171,15 @@ func (n *Node) Init() error {
return nil
}

// ValidPrimary reports whether this node's Region equals the value of the
// PRIMARY_REGION environment variable.
func (n *Node) ValidPrimary() bool {
	primaryRegion := os.Getenv("PRIMARY_REGION")
	return n.Region == primaryRegion
}

// PostInit are operations that should be executed against a running Postgres on boot.
func (n *Node) PostInit() error {
// Ensure local PG is up before establishing connection with
// consul.
conn, err := n.NewLocalConnection(context.TODO())
if err != nil {
return fmt.Errorf("failed to establish connection to local node: %s", err)
}

client, err := state.NewConsulClient()
if err != nil {
return fmt.Errorf("failed to establish connection with consul: %s", err)
Expand All @@ -178,17 +191,19 @@ func (n *Node) PostInit() error {
}

switch primaryIP {
case n.PrivateIP:
// Re-register the primary in order to pick up any changes made to the
// configuration file.
fmt.Println("Updating primary record")
if err := registerPrimary(*n); err != nil {
fmt.Printf("failed to register primary: %s", err)
}
case "":
// Check if we can be a primary
if !n.ValidPrimary() {
if !n.validPrimary() {
return fmt.Errorf("no primary to follow and can't configure self as primary because primary region is '%s' and we are in '%s'", n.Region, os.Getenv("PRIMARY_REGION"))
}

conn, err := n.NewLocalConnection(context.TODO())
if err != nil {
return err
}

// Initialize ourselves as the primary.
if err := n.createRequiredUsers(conn); err != nil {
return fmt.Errorf("failed to create required users: %s", err)
Expand All @@ -215,12 +230,9 @@ func (n *Node) PostInit() error {
if err := client.RegisterNode(n.ID, n.PrivateIP); err != nil {
return fmt.Errorf("failed to register member with consul: %s", err)
}
case n.PrivateIP:
// We are an already initialized primary.
default:
// If we are here, we are a new node, a standby or a demoted primary who needs
// to be reconfigured as a standby.

conn, err := n.NewRepLocalConnection(context.TODO())
if err != nil {
return err
Expand All @@ -231,29 +243,17 @@ func (n *Node) PostInit() error {
return err
}

if role == "" {
fmt.Printf("Configuring a new node\n")
} else {
fmt.Printf("Reconfiguring a %s node as healthy\n", role)
}

if role == "primary" {
if role == primaryRoleName {
fmt.Println("Unregistering primary")
if err := unregisterPrimary(*n); err != nil {
fmt.Printf("failed to unregister primary: %s\n", err)
}
}

// TODO - Verify if there are any issues with attempting to re-register
// an already registered standby. I don't think so, but need to verify.
fmt.Println("Registering standby")
if err := registerStandby(*n); err != nil {
fmt.Printf("failed to register standby: %s\n", err)
}

// TODO - Verify if there are any issues with re-following a primary we are
// already following. I don't think so, but need to verify.
fmt.Println("Follow the primary")
if err := standbyFollow(*n); err != nil {
fmt.Printf("failed to register standby: %s\n", err)
}
Expand Down Expand Up @@ -293,6 +293,30 @@ func (n *Node) NewRepLocalConnection(ctx context.Context) (*pgx.Conn, error) {
return openConnection(ctx, host, "repmgr", n.ManagerCredentials)
}

// NewRepRemoteConnection opens a connection to the node at hostname, on this
// node's PGPort, using this node's ManagerCredentials. It passes "repmgr" to
// openConnection (presumably the database name; confirm against
// openConnection's signature).
func (n *Node) NewRepRemoteConnection(ctx context.Context, hostname string) (*pgx.Conn, error) {
	port := strconv.Itoa(n.PGPort)
	addr := net.JoinHostPort(hostname, port)
	return openConnection(ctx, addr, "repmgr", n.ManagerCredentials)
}

// isInitialized reports whether the node's data directory path exists.
//
// Only a "does not exist" result from os.Stat yields false; any other outcome
// (success or a different error) is treated as initialized.
func (n *Node) isInitialized() bool {
	_, statErr := os.Stat(n.DataDir)
	return !os.IsNotExist(statErr)
}

// currentRole looks up this node's role over pg by handing its own ID to
// memberRole, and returns whatever memberRole returns.
func (n *Node) currentRole(ctx context.Context, pg *pgx.Conn) (string, error) {
	nodeID := int(n.ID)
	return memberRole(ctx, pg, nodeID)
}

// validPrimary reports whether this node's Region matches the region named by
// the PRIMARY_REGION environment variable.
func (n *Node) validPrimary() bool {
	return os.Getenv("PRIMARY_REGION") == n.Region
}

func (n *Node) createRequiredUsers(conn *pgx.Conn) error {
curUsers, err := admin.ListUsers(context.TODO(), conn)
if err != nil {
Expand Down Expand Up @@ -330,21 +354,20 @@ func (n *Node) createRequiredUsers(conn *pgx.Conn) error {
}

func (n *Node) initializePostgres() error {
_, err := os.Stat(n.DataDir)
if os.IsNotExist(err) {
if err := ioutil.WriteFile("/data/.default_password", []byte(os.Getenv("OPERATOR_PASSWORD")), 0644); err != nil {
return err
}
cmd := exec.Command("gosu", "postgres", "initdb", "--pgdata", n.DataDir, "--pwfile=/data/.default_password")
_, err := cmd.CombinedOutput()
if err != nil {
return err
}

if n.isInitialized() {
return nil
}

return err
if err := ioutil.WriteFile("/data/.default_password", []byte(os.Getenv("OPERATOR_PASSWORD")), 0644); err != nil {
return err
}
cmd := exec.Command("gosu", "postgres", "initdb", "--pgdata", n.DataDir, "--pwfile=/data/.default_password")
_, err := cmd.CombinedOutput()
if err != nil {
return err
}

return nil
}

func (n *Node) ConfigurePGBouncerAuth() error {
Expand Down
35 changes: 28 additions & 7 deletions pkg/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@ package flypg
import (
"context"
"fmt"
"github.com/jackc/pgx/v4"
"os"

"github.com/jackc/pgx/v4"
)

// Role names that this package compares against the `type` value it reads
// from the repmgr.nodes table.
const (
// primaryRoleName is the role string checked to detect a node that is
// recorded as a primary.
primaryRoleName = "primary"
// standbyRoleName is the role string checked to detect a node that is
// recorded as a standby.
standbyRoleName = "standby"
// unknownRoleName is the empty role string. NOTE(review): it equals the
// value memberRole returns when no row matches, which is presumably what it
// stands for; no use of it is visible here.
unknownRoleName = ""
)

func InitializeManager(node Node) error {
func initializeRepmgr(node Node) error {
// Write conf file.
if err := writeManagerConf(node); err != nil {
return fmt.Errorf("failed to write repmgr config file: %s", err)
Expand All @@ -26,7 +33,7 @@ func InitializeManager(node Node) error {
}

func registerPrimary(node Node) error {
cmdStr := fmt.Sprintf("repmgr -f %s primary register",
cmdStr := fmt.Sprintf("repmgr -f %s primary register -F",
node.ManagerConfigPath,
)
if err := runCommand(cmdStr); err != nil {
Expand Down Expand Up @@ -57,7 +64,8 @@ func standbyFollow(node Node) error {
}

func registerStandby(node Node) error {
cmdStr := fmt.Sprintf("repmgr -f %s standby register", node.ManagerConfigPath)
// Force re-registry to ensure the standby picks up any new configuration changes.
cmdStr := fmt.Sprintf("repmgr -f %s standby register -F", node.ManagerConfigPath)
if err := runCommand(cmdStr); err != nil {
fmt.Printf("failed to register standby: %s", err)
}
Expand Down Expand Up @@ -105,7 +113,7 @@ func writeManagerConf(node Node) error {
"location": node.Region,
}

if !node.ValidPrimary() {
if !node.validPrimary() {
conf["priority"] = "0"
}

Expand Down Expand Up @@ -142,8 +150,21 @@ func writePasswdConf(node Node) error {
return nil
}

func (n *Node) currentRole(ctx context.Context, pg *pgx.Conn) (string, error) {
sql := fmt.Sprintf("select n.type from repmgr.nodes n LEFT JOIN repmgr.nodes un ON un.node_id = n.upstream_node_id WHERE n.node_id = '%d';", n.ID)
// memberRole returns the `type` recorded in repmgr.nodes for the row whose
// node_id equals id, queried over pg.
//
// When no row matches it returns an empty string and a nil error; any other
// query or scan failure is returned unchanged alongside an empty string.
func memberRole(ctx context.Context, pg *pgx.Conn, id int) (string, error) {
	query := fmt.Sprintf("select n.type from repmgr.nodes n LEFT JOIN repmgr.nodes un ON un.node_id = n.upstream_node_id WHERE n.node_id = '%d';", id)

	var role string
	switch err := pg.QueryRow(ctx, query).Scan(&role); err {
	case nil:
		return role, nil
	case pgx.ErrNoRows:
		// An unregistered node is not an error: report it as "no role".
		return "", nil
	default:
		return "", err
	}
}

func memberRoleByHostname(ctx context.Context, pg *pgx.Conn, hostname string) (string, error) {
sql := fmt.Sprintf("select n.type from repmgr.nodes n LEFT JOIN repmgr.nodes un ON un.node_id = n.upstream_node_id where n.connInfo LIKE '%%%s%%';", hostname)
var role string
err := pg.QueryRow(ctx, sql).Scan(&role)
if err != nil {
Expand Down