Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/flycheck/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package flycheck

import (
"context"
"fmt"
"errors"
"io"
"net/http"
"time"
Expand Down Expand Up @@ -93,7 +93,7 @@ func handleCheckResponse(w http.ResponseWriter, suite *check.CheckSuite, raw boo
result = suite.Result()
}
if !suite.Passed() {
handleError(w, fmt.Errorf(result))
handleError(w, errors.New(result))
return
}
io.WriteString(w, result)
Expand Down
51 changes: 50 additions & 1 deletion internal/flycheck/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"github.com/superfly/fly-checks/check"
)

// Primary will be made read-only when disk usage exceeds this percentage.
const diskCapacityPercentageThreshold = 90.0

Comment on lines +13 to +15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been meaning to make health-check stuff configurable but that can be out of scope of this PR

// CheckPostgreSQL health, replication, etc
func CheckPostgreSQL(ctx context.Context, checks *check.CheckSuite) (*check.CheckSuite, error) {

node, err := flypg.NewNode()
if err != nil {
return checks, errors.Wrap(err, "failed to initialize node")
Expand All @@ -23,18 +25,65 @@ func CheckPostgreSQL(ctx context.Context, checks *check.CheckSuite) (*check.Chec
return checks, errors.Wrap(err, "failed to connect with local node")
}

repConn, err := node.RepMgr.NewLocalConnection(ctx)
if err != nil {
return checks, fmt.Errorf("failed to connect to repmgr node: %s", err)
}

member, err := node.RepMgr.Member(ctx, repConn)
if err != nil {
return checks, fmt.Errorf("failed to resolve local member role: %s", err)
}

// Cleanup connections
checks.OnCompletion = func() {
localConn.Close(ctx)
repConn.Close(ctx)
}

checks.AddCheck("connections", func() (string, error) {
return connectionCount(ctx, localConn)
})

if member.Role == flypg.PrimaryRoleName && member.Active {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to expose this healthcheck on replicas without setting them to readonly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like it might be useful info

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a VM check that communicates general capacity that should cover that. It makes me think though that maybe we need a new name for the check.

// Check that provides additional insight into disk capacity and
// how close we are to hitting the readonly threshold.
checks.AddCheck("disk-capacity", func() (string, error) {
return diskCapacityCheck(ctx, localConn, node)
})
}

return checks, nil
}

// diskCapacityCheck reports the percentage of disk used under /data/ and
// toggles read-only mode based on diskCapacityPercentageThreshold.
//
// When usage is above the threshold it calls flypg.SetReadOnly and returns an
// error describing the usage, so the check is reported as failing. Otherwise
// it calls flypg.UnsetReadOnly (unless flypg.ZombieLockExists reports true)
// and returns a message with the current usage and the threshold.
//
// localConn and node are passed through to the flypg read-only helpers.
func diskCapacityCheck(ctx context.Context, localConn *pgx.Conn, node *flypg.Node) (string, error) {
	// Calculate current disk usage
	size, available, err := diskUsage("/data/")
	if err != nil {
		return "", fmt.Errorf("failed to calculate disk usage: %s", err)
	}

	// A zero size would turn the division below into NaN. NaN compares false
	// against the threshold, so the check would silently fall through to the
	// read/write branch and report "NaN%". Fail the check instead.
	if size == 0 {
		return "", fmt.Errorf("failed to calculate disk usage: reported size is zero")
	}

	usedPercentage := float64(size-available) / float64(size) * 100

	// Turn primary read-only
	if usedPercentage > diskCapacityPercentageThreshold {
		if err := flypg.SetReadOnly(ctx, node, localConn); err != nil {
			return "", fmt.Errorf("failed to turn primary readonly: %s", err)
		}

		// Returned as an error so the health check surfaces as failing.
		return "", fmt.Errorf("%0.1f%% - readonly mode enabled, extend your volume to re-enable writes", usedPercentage)
	}

	// Don't attempt to turn read/write if zombie lock exists.
	if !flypg.ZombieLockExists() {
		if err := flypg.UnsetReadOnly(ctx, node, localConn); err != nil {
			return "", fmt.Errorf("failed to turn primary read/write: %s", err)
		}
	}

	return fmt.Sprintf("%0.1f%% - readonly mode will be enabled at %0.1f%%", usedPercentage, diskCapacityPercentageThreshold), nil
}

func connectionCount(ctx context.Context, local *pgx.Conn) (string, error) {
sql := `select used, res_for_super as reserved, max_conn as max from
(select count(*) used from pg_stat_activity) q1,
Expand Down
42 changes: 0 additions & 42 deletions internal/flypg/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,45 +330,3 @@ func GetSetting(ctx context.Context, pg *pgx.Conn, setting string) (*PGSetting,
}
return &out, nil
}

// SetReadOnly sets default_transaction_read_only = true on every database
// returned by ListDatabases, skipping the "repmgr" and "postgres" databases.
//
// It returns the first error from ListDatabases or from executing an
// ALTER DATABASE statement; databases altered before the failure are not
// reverted.
func SetReadOnly(ctx context.Context, conn *pgx.Conn) error {
	databases, err := ListDatabases(ctx, conn)
	if err != nil {
		return err
	}

	for _, db := range databases {
		if db.Name == "repmgr" || db.Name == "postgres" {
			continue
		}

		// Quote the identifier so names that need quoting (mixed case,
		// hyphens, embedded quotes, ...) produce a valid statement.
		name := pgx.Identifier{db.Name}.Sanitize()
		sql := fmt.Sprintf("ALTER DATABASE %s set default_transaction_read_only = true;", name)
		if _, err := conn.Exec(ctx, sql); err != nil {
			return err
		}
	}

	return nil
}

// UnsetReadOnly sets default_transaction_read_only = false on every database
// returned by ListDatabases, skipping the "repmgr" and "postgres" databases.
//
// It returns the first error from ListDatabases or from executing an
// ALTER DATABASE statement; databases altered before the failure are not
// reverted.
func UnsetReadOnly(ctx context.Context, conn *pgx.Conn) error {
	databases, err := ListDatabases(ctx, conn)
	if err != nil {
		return err
	}

	for _, db := range databases {
		if db.Name == "repmgr" || db.Name == "postgres" {
			continue
		}

		// Quote the identifier so names that need quoting (mixed case,
		// hyphens, embedded quotes, ...) produce a valid statement.
		name := pgx.Identifier{db.Name}.Sanitize()
		sql := fmt.Sprintf("ALTER DATABASE %s set default_transaction_read_only = false;", name)
		if _, err := conn.Exec(ctx, sql); err != nil {
			return err
		}
	}

	return nil
}
16 changes: 12 additions & 4 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (n *Node) PostInit(ctx context.Context) error {
}

fmt.Println("Turning all user-created databases readonly.")
if err := admin.SetReadOnly(ctx, conn); err != nil {
if err := SetReadOnly(ctx, n, conn); err != nil {
return fmt.Errorf("failed to set read-only: %s", err)
}

Expand All @@ -439,7 +439,7 @@ func (n *Node) PostInit(ctx context.Context) error {
}

fmt.Println("Turning user-created databases read-only")
if err := admin.SetReadOnly(ctx, conn); err != nil {
if err := SetReadOnly(ctx, n, conn); err != nil {
return fmt.Errorf("failed to set read-only: %s", err)
}

Expand All @@ -457,8 +457,11 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to reconfigure pgbouncer: %s", err)
}

if err := admin.UnsetReadOnly(ctx, conn); err != nil {
return fmt.Errorf("failed to unset read-only")
// Readonly lock is set by healthchecks when disk capacity is dangerously high.
if !ReadOnlyLockExists() {
if err := UnsetReadOnly(ctx, n, conn); err != nil {
return fmt.Errorf("failed to unset read-only: %s", err)
}
}

default:
Expand Down Expand Up @@ -498,6 +501,11 @@ func (n *Node) NewLocalConnection(ctx context.Context, database string) (*pgx.Co
return openConnection(ctx, host, database, n.OperatorCredentials)
}

// NewPrimaryConnection opens a connection to the given database at this
// node's private IP on the PGBouncer port, authenticating with the operator
// credentials.
func (n *Node) NewPrimaryConnection(ctx context.Context, database string) (*pgx.Conn, error) {
	port := strconv.Itoa(n.PGBouncer.Port)
	addr := net.JoinHostPort(n.PrivateIP, port)

	return openConnection(ctx, addr, database, n.OperatorCredentials)
}

func (n *Node) initializePG() error {
if n.isPGInitialized() {
return nil
Expand Down
142 changes: 142 additions & 0 deletions internal/flypg/readonly.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package flypg

import (
"context"
"fmt"
"os"
"time"

"github.com/fly-apps/postgres-flex/internal/flypg/admin"
"github.com/jackc/pgx/v5"
)

const (
readOnlyLockFile = "/data/readonly.lock"
readOnlyEnabled = "on"
readOnlyDisabled = "off"
)

// SetReadOnly writes the readonly lock file and then enables
// default_transaction_read_only on every database returned by
// admin.ListDatabases, skipping the "repmgr" and "postgres" databases.
//
// conn is used only to list databases; each change is applied over a separate
// connection obtained from n.NewPrimaryConnection. The first failure is
// returned immediately and the lock file is left in place.
func SetReadOnly(ctx context.Context, n *Node, conn *pgx.Conn) error {
	if err := writeReadOnlyLock(); err != nil {
		return fmt.Errorf("failed to set readonly lock: %s", err)
	}

	databases, err := admin.ListDatabases(ctx, conn)
	if err != nil {
		return err
	}

	for _, db := range databases {
		// exclude administrative dbs
		if db.Name == "repmgr" || db.Name == "postgres" {
			continue
		}

		if err := enableReadOnlyOnDatabase(ctx, n, db.Name); err != nil {
			return err
		}
	}

	return nil
}

// enableReadOnlyOnDatabase connects to the named database, sets
// default_transaction_read_only to true and confirms the reported value.
func enableReadOnlyOnDatabase(ctx context.Context, n *Node, name string) error {
	// Route configuration change through PGBouncer
	dbConn, err := n.NewPrimaryConnection(ctx, name)
	if err != nil {
		return fmt.Errorf("failed to establish connection to db %s: %s", name, err)
	}
	// Deferred here so the connection is released once per database instead
	// of accumulating until SetReadOnly returns.
	defer dbConn.Close(ctx)

	// Set readonly
	if _, err := dbConn.Exec(ctx, "SET default_transaction_read_only=true;"); err != nil {
		return fmt.Errorf("failed to set readonly on db %s: %s", name, err)
	}

	// Query configuration value and confirm the value change.
	var status string
	if err := dbConn.QueryRow(ctx, "SHOW default_transaction_read_only;").Scan(&status); err != nil {
		return fmt.Errorf("failed to verify readonly was set: %s", err)
	}

	if status == readOnlyDisabled {
		return fmt.Errorf("failed to turn database '%s' readonly", name)
	}

	return nil
}

// UnsetReadOnly disables default_transaction_read_only on every database
// returned by admin.ListDatabases, skipping the "repmgr" and "postgres"
// databases, and then removes the readonly lock file.
//
// It is a no-op when ReadOnlyLockExists reports false. conn is used only to
// list databases; each change is applied over a separate connection obtained
// from n.NewPrimaryConnection. The first failure is returned immediately and
// the lock file is left in place.
func UnsetReadOnly(ctx context.Context, n *Node, conn *pgx.Conn) error {
	// Skip if there's no readonly lock present
	if !ReadOnlyLockExists() {
		return nil
	}

	databases, err := admin.ListDatabases(ctx, conn)
	if err != nil {
		return err
	}

	for _, db := range databases {
		// exclude administrative dbs
		if db.Name == "repmgr" || db.Name == "postgres" {
			continue
		}

		if err := disableReadOnlyOnDatabase(ctx, n, db.Name); err != nil {
			return err
		}
	}

	if err := removeReadOnlyLock(); err != nil {
		return fmt.Errorf("failed to remove readonly lock: %s", err)
	}

	return nil
}

// disableReadOnlyOnDatabase connects to the named database, sets
// default_transaction_read_only to false and confirms the reported value.
func disableReadOnlyOnDatabase(ctx context.Context, n *Node, name string) error {
	// Route configuration change through PGBouncer
	dbConn, err := n.NewPrimaryConnection(ctx, name)
	if err != nil {
		return fmt.Errorf("failed to establish connection to db %s: %s", name, err)
	}
	// Deferred here so the connection is released once per database instead
	// of accumulating until UnsetReadOnly returns.
	defer dbConn.Close(ctx)

	// Disable readonly
	if _, err := dbConn.Exec(ctx, "SET default_transaction_read_only=false;"); err != nil {
		return fmt.Errorf("failed to unset readonly on db %s: %s", name, err)
	}

	// Query configuration value and confirm the value change.
	var status string
	if err := dbConn.QueryRow(ctx, "SHOW default_transaction_read_only;").Scan(&status); err != nil {
		return fmt.Errorf("failed to verify readonly was unset: %s", err)
	}

	// No underlying error exists on this path, so none is formatted.
	if status == readOnlyEnabled {
		return fmt.Errorf("failed to turn database '%s' read/write", name)
	}

	return nil
}

// ReadOnlyLockExists reports whether the readonly lock file is present.
// It returns false only when os.Stat reports that the file does not exist;
// any other outcome, including other stat errors, yields true.
func ReadOnlyLockExists() bool {
	_, statErr := os.Stat(readOnlyLockFile)
	return !os.IsNotExist(statErr)
}

// writeReadOnlyLock creates the readonly lock file, storing the current time
// as its contents. If ReadOnlyLockExists already reports true the file is
// left untouched and nil is returned.
func writeReadOnlyLock() error {
	if !ReadOnlyLockExists() {
		stamp := []byte(time.Now().String())
		return os.WriteFile(readOnlyLockFile, stamp, 0644)
	}

	return nil
}

// removeReadOnlyLock deletes the readonly lock file. If ReadOnlyLockExists
// reports false there is nothing to remove and nil is returned.
func removeReadOnlyLock() error {
	if ReadOnlyLockExists() {
		return os.Remove(readOnlyLockFile)
	}

	return nil
}