Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .flyctl/cmd/pg_unregister/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,23 @@ func main() {
return
}

ctx := context.Background()

node, err := flypg.NewNode()
if err != nil {
utils.WriteError(err)
os.Exit(1)
return
}

if err := node.UnregisterMemberByHostname(context.Background(), string(hostnameBytes)); err != nil {
conn, err := node.RepMgr.NewLocalConnection(ctx)
if err != nil {
utils.WriteError(fmt.Errorf("failed to connect to local db: %s", err))
os.Exit(1)
return
}

if err := node.RepMgr.UnregisterMemberByHostname(ctx, conn, string(hostnameBytes)); err != nil {
utils.WriteError(fmt.Errorf("failed to unregister member: %v", err))
os.Exit(1)
return
Expand Down
17 changes: 17 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ jobs:
tags: |
flyio/postgres-flex:14
flyio/postgres-flex:14.6
-
name: Build and push Postgres 15
id: docker_build_15
uses: docker/build-push-action@v3
with:
build-args: |
PG_VERSION=15.1
VERSION=${{ steps.get-latest-tag.outputs.tag }}
context: .
file: ./Dockerfile
push: true
tags: |
flyio/postgres-flex:15
flyio/postgres-flex:15.1
-
name: Postgres 14 Image digest
run: echo ${{ steps.docker_build_14.outputs.digest }}
-
name: Postgres 15 Image digest
run: echo ${{ steps.docker_build_15.outputs.digest }}
72 changes: 30 additions & 42 deletions cmd/event_handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"strconv"

"github.com/fly-apps/postgres-flex/pkg/flypg"

"github.com/fly-apps/postgres-flex/pkg/flypg/state"
)

func main() {
Expand All @@ -29,56 +27,46 @@ func main() {
switch *event {
case "repmgrd_failover_promote", "standby_promote":
// TODO - Need to figure out what to do when success == 0.

cs, err := state.NewClusterState()
if err != nil {
fmt.Printf("failed initialize cluster state store. %v", err)
}

member, err := cs.FindMemberByID(int32(*nodeID))
if err != nil {
fmt.Printf("failed to find member %v: %s", *nodeID, err)
}

if err := cs.AssignPrimary(member.ID); err != nil {
fmt.Printf("failed to register primary with consul: %s", err)
if err := reconfigurePGBouncer(*nodeID); err != nil {
fmt.Println(err.Error())
return
}

flypgNode, err := flypg.NewNode()
if err != nil {
fmt.Printf("failed to reference node: %s\n", err)
}

fmt.Println("Reconfiguring pgbouncer primary")
if err := flypgNode.PGBouncer.ConfigurePrimary(context.TODO(), member.Hostname, true); err != nil {
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
}
case "standby_follow":
cs, err := state.NewClusterState()
if err != nil {
fmt.Printf("failed initialize cluster state store. %v", err)
}

newMemberID, err := strconv.Atoi(*newPrimary)
if err != nil {
fmt.Printf("failed to parse new member id: %s", err)
}

member, err := cs.FindMemberByID(int32(newMemberID))
if err != nil {
fmt.Printf("failed to find member in consul: %s", err)
}

flypgNode, err := flypg.NewNode()
if err != nil {
fmt.Printf("failed to reference member: %s\n", err)
}

fmt.Println("Reconfiguring pgbouncer primary")
if err := flypgNode.PGBouncer.ConfigurePrimary(context.TODO(), member.Hostname, true); err != nil {
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
if err := reconfigurePGBouncer(newMemberID); err != nil {
fmt.Println(err.Error())
return
}
default:
// noop
}
}

func reconfigurePGBouncer(id int) error {
node, err := flypg.NewNode()
if err != nil {
return fmt.Errorf("failed to reference node: %s", err)
}

conn, err := node.RepMgr.NewLocalConnection(context.TODO())
if err != nil {
return fmt.Errorf("failed to establish connection with local pg: %s", err)
}

member, err := node.RepMgr.MemberByID(context.TODO(), conn, id)
if err != nil {
return err
}

fmt.Println("Reconfiguring pgbouncer primary")
if err := node.PGBouncer.ConfigurePrimary(context.TODO(), member.Hostname, true); err != nil {
return fmt.Errorf("failed to reconfigure pgbouncer primary %s", err)
}

return nil
}
4 changes: 0 additions & 4 deletions cmd/failover_validation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ func main() {
totalNodes := flag.Int("total-nodes", 0, "The total number of nodes registered")
flag.Parse()

// TODO - This will ultimately remove HA from a 2-node cluster setup.
// This will be the case until we come up with a strategy to differentiate
// between a down node and a network partition.

if *visibleNodes == 0 || *visibleNodes < (*totalNodes/2+1) {
fmt.Printf("Unable to perform failover as quorum can not be met. Total nodes: %d, Visible nodes: %d\n", *totalNodes, *visibleNodes)
os.Exit(1)
Expand Down
24 changes: 12 additions & 12 deletions cmd/standby_cleaner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,47 +72,47 @@ func handleTick(ctx context.Context, node *flypg.Node, seenAt map[int]time.Time,
}
defer conn.Close(ctx)

role, err := node.RepMgr.CurrentRole(ctx, conn)
member, err := node.RepMgr.MemberByID(ctx, conn, int(node.RepMgr.ID))
if err != nil {
return fmt.Errorf("failed to check role: %s", err)
return err
}

if role != flypg.PrimaryRoleName {
if member.Role != flypg.PrimaryRoleName {
return nil
}

standbys, err := node.RepMgr.Standbys(ctx, conn)
standbys, err := node.RepMgr.StandbyMembers(ctx, conn)
if err != nil {
return fmt.Errorf("failed to query standbys: %s", err)
}

for _, standby := range standbys {
// Wrap this in a function so connections are properly closed.
sConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Ip)
sConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname)
if err != nil {
// TODO - Verify the exception that's getting thrown.
if time.Now().Sub(seenAt[standby.Id]) >= deadMemberRemovalThreshold {
if err := node.UnregisterMemberByID(ctx, int32(standby.Id)); err != nil {
fmt.Printf("failed to unregister member %d: %v", standby.Id, err)
if time.Now().Sub(seenAt[standby.ID]) >= deadMemberRemovalThreshold {
if err := node.RepMgr.UnregisterMember(ctx, standby); err != nil {
fmt.Printf("failed to unregister member %s: %v", standby.Hostname, err)
continue
}

delete(seenAt, standby.Id)
delete(seenAt, standby.ID)
}

continue
}
defer sConn.Close(ctx)

seenAt[standby.Id] = time.Now()
seenAt[standby.ID] = time.Now()
}

removeOrphanedReplicationSlots(ctx, conn, standbys)

return nil
}

func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standbys []flypg.Standby) {
func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standbys []flypg.Member) {
var orphanedSlots []admin.ReplicationSlot

slots, err := admin.ListReplicationSlots(ctx, conn)
Expand All @@ -125,7 +125,7 @@ func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standby
for _, slot := range slots {
matchFound := false
for _, standby := range standbys {
if slot.MemberID == int32(standby.Id) {
if slot.MemberID == int32(standby.ID) {
matchFound = true
}
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/api/handle_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,20 @@ func handleRole(w http.ResponseWriter, r *http.Request) {
return
}

role, err := node.RepMgr.CurrentRole(r.Context(), conn)
member, err := node.RepMgr.Member(r.Context(), conn)
if err != nil {
renderErr(w, err)
return
}

var alteredRole string
if role == flypg.PrimaryRoleName {

switch member.Role {
case flypg.PrimaryRoleName:
alteredRole = "primary"
} else if role == flypg.StandbyRoleName {
case flypg.StandbyRoleName:
alteredRole = "replica"
} else {
default:
alteredRole = "unknown"
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/flycheck/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ func PostgreSQLRole(ctx context.Context, checks *check.CheckSuite) (*check.Check
}

checks.AddCheck("role", func() (string, error) {
role, err := node.RepMgr.CurrentRole(ctx, conn)
member, err := node.RepMgr.Member(ctx, conn)
if err != nil {
return "failed", errors.Wrap(err, "failed to check role")
}

if role == flypg.PrimaryRoleName {
switch member.Role {
case flypg.PrimaryRoleName:
return "primary", nil
} else if role == flypg.StandbyRoleName {
case flypg.StandbyRoleName:
return "replica", nil
} else {
default:
return "unknown", nil
}
})
Expand Down
Loading