Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .deepsource.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version = 1

[[analyzers]]
name = "shell"

[[analyzers]]
name = "go"

[analyzers.meta]
import_root = "github.com/fly-apps/postgres-flex"
6 changes: 1 addition & 5 deletions cmd/admin_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@ package main

import (
"github.com/fly-apps/postgres-flex/internal/api"
"github.com/fly-apps/postgres-flex/internal/flypg"
)

func main() {
node, err := flypg.NewNode()
if err != nil {
if err := api.StartHttpServer(); err != nil {
panic(err)
}

api.StartHttpServer(node)
}
37 changes: 19 additions & 18 deletions cmd/event_handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,24 @@ import (
const eventLogFile = "/data/event.log"

func main() {
ctx := context.Background()

if err := processEvent(ctx); err != nil {
log.Println(err)
os.Exit(1)
}
}

func processEvent(ctx context.Context) error {
event := flag.String("event", "", "event type")
nodeID := flag.Int("node-id", 0, "the node id")
success := flag.String("success", "", "success (1) failure (0)")
details := flag.String("details", "", "details")
flag.Parse()

ctx := context.Background()

logFile, err := os.OpenFile(eventLogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
logFile, err := os.OpenFile(eventLogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
if err != nil {
fmt.Printf("failed to open event log: %s", err)
return fmt.Errorf("failed to open event log: %s", err)
}
defer logFile.Close()

Expand All @@ -34,46 +41,40 @@ func main() {

node, err := flypg.NewNode()
if err != nil {
log.Printf("failed to initialize node: %s", err)
os.Exit(1)
return fmt.Errorf("failed to initialize node: %s", err)
}

switch *event {
case "child_node_disconnect", "child_node_reconnect", "child_node_new_connect":
conn, err := node.RepMgr.NewLocalConnection(ctx)
if err != nil {
log.Printf("failed to open local connection: %s", err)
os.Exit(1)
return fmt.Errorf("failed to open local connection: %s", err)
}
defer conn.Close(ctx)

member, err := node.RepMgr.Member(ctx, conn)
if err != nil {
log.Printf("failed to resolve member: %s", err)
os.Exit(1)
return fmt.Errorf("failed to resolve member: %s", err)
}

if member.Role != flypg.PrimaryRoleName {
// We should never get here.
log.Println("skipping since we are not the primary")
os.Exit(0)
return nil
}

if err := evaluateClusterState(ctx, conn, node); err != nil {
log.Printf("failed to evaluate cluster state: %s", err)
os.Exit(0)
return fmt.Errorf("failed to evaluate cluster state: %s", err)
}

os.Exit(0)
default:
// noop
}

return nil
}

func evaluateClusterState(ctx context.Context, conn *pgx.Conn, node *flypg.Node) error {
primary, err := flypg.PerformScreening(ctx, conn, node)
if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) {
if err := flypg.Quarantine(ctx, conn, node, primary); err != nil {
if err := flypg.Quarantine(ctx, node, primary); err != nil {
return fmt.Errorf("failed to quarantine failed primary: %s", err)
}
return fmt.Errorf("primary has been quarantined: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/monitor/monitor_cluster_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func clusterStateMonitorTick(ctx context.Context, node *flypg.Node) error {

primary, err := flypg.PerformScreening(ctx, conn, node)
if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) {
if err := flypg.Quarantine(ctx, conn, node, primary); err != nil {
if err := flypg.Quarantine(ctx, node, primary); err != nil {
return fmt.Errorf("failed to quarantine failed primary: %s", err)
}
return fmt.Errorf("primary has been quarantined: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/monitor/monitor_dead_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func deadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int
// TODO - Verify the exception that's getting thrown.
if time.Since(seenAt[standby.ID]) >= deadMemberRemovalThreshold {
log.Printf("Removing dead member: %s\n", standby.Hostname)
if err := node.RepMgr.UnregisterMember(ctx, standby); err != nil {
if err := node.RepMgr.UnregisterMember(standby); err != nil {
log.Printf("failed to unregister member %s: %v", standby.Hostname, err)
continue
}
Expand Down
37 changes: 18 additions & 19 deletions cmd/pg_unregister/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,42 @@ import (
)

func main() {
ctx := context.Background()

if err := processUnregistration(ctx); err != nil {
utils.WriteError(err)
os.Exit(1)
}

utils.WriteOutput("Member has been succesfully unregistered", "")
}

func processUnregistration(ctx context.Context) error {
encodedArg := os.Args[1]
hostnameBytes, err := base64.StdEncoding.DecodeString(encodedArg)
if err != nil {
utils.WriteError(fmt.Errorf("failed to decode hostname: %v", err))
os.Exit(1)
return
return fmt.Errorf("failed to decode hostname: %v", err)
}

ctx := context.Background()

node, err := flypg.NewNode()
if err != nil {
utils.WriteError(err)
os.Exit(1)
return
return fmt.Errorf("faied to initialize node: %s", err)
}

conn, err := node.RepMgr.NewLocalConnection(ctx)
if err != nil {
utils.WriteError(fmt.Errorf("failed to connect to local db: %s", err))
os.Exit(1)
return
return fmt.Errorf("failed to connect to local db: %s", err)
}
defer conn.Close(ctx)

member, err := node.RepMgr.MemberByHostname(ctx, conn, string(hostnameBytes))
if err != nil {
utils.WriteError(fmt.Errorf("failed to resolve member: %s", err))
os.Exit(1)
return
return fmt.Errorf("failed to resolve member: %s", err)
}

if err := node.RepMgr.UnregisterMember(ctx, *member); err != nil {
utils.WriteError(fmt.Errorf("failed to unregister member: %v", err))
os.Exit(1)
return
if err := node.RepMgr.UnregisterMember(*member); err != nil {
return fmt.Errorf("failed to unregister member: %v", err)
}

utils.WriteOutput("Member has been succesfully unregistered", "")
return nil
}
65 changes: 44 additions & 21 deletions internal/api/handle_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"golang.org/x/exp/slices"
)

func handleReadonlyState(w http.ResponseWriter, r *http.Request) {
func handleReadonlyState(w http.ResponseWriter, _ *http.Request) {
res := &Response{
Result: false,
}
Expand All @@ -24,7 +24,7 @@ func handleReadonlyState(w http.ResponseWriter, r *http.Request) {
renderJSON(w, res, http.StatusOK)
}

func handleHaproxyRestart(w http.ResponseWriter, r *http.Request) {
func handleHaproxyRestart(w http.ResponseWriter, _ *http.Request) {
if err := flypg.RestartHaproxy(); err != nil {
renderErr(w, err)
return
Expand Down Expand Up @@ -78,18 +78,18 @@ func handleDisableReadonly(w http.ResponseWriter, r *http.Request) {
}

func handleRole(w http.ResponseWriter, r *http.Request) {
conn, close, err := localConnection(r.Context(), "postgres")
node, err := flypg.NewNode()
if err != nil {
renderErr(w, err)
return
}
defer close()

node, err := flypg.NewNode()
conn, err := localConnection(r.Context(), "postgres")
if err != nil {
renderErr(w, err)
return
}
defer conn.Close(r.Context())

member, err := node.RepMgr.Member(r.Context(), conn)
if err != nil {
Expand Down Expand Up @@ -118,21 +118,27 @@ type SettingsUpdate struct {
RestartRequired bool `json:"restart_required"`
}

func (s *Server) handleUpdatePostgresSettings(w http.ResponseWriter, r *http.Request) {
conn, close, err := localConnection(r.Context(), "postgres")
func handleUpdatePostgresSettings(w http.ResponseWriter, r *http.Request) {
node, err := flypg.NewNode()
if err != nil {
renderErr(w, err)
return
}
defer close()

conn, err := localConnection(r.Context(), "postgres")
if err != nil {
renderErr(w, err)
return
}
defer conn.Close(r.Context())

consul, err := state.NewStore()
if err != nil {
renderErr(w, err)
return
}

user, err := flypg.ReadFromFile(s.node.PGConfig.UserConfigFile())
user, err := flypg.ReadFromFile(node.PGConfig.UserConfigFile())
if err != nil {
renderErr(w, err)
return
Expand All @@ -158,7 +164,7 @@ func (s *Server) handleUpdatePostgresSettings(w http.ResponseWriter, r *http.Req
user[k] = v
}

s.node.PGConfig.SetUserConfig(user)
node.PGConfig.SetUserConfig(user)

var requiresRestart []string

Expand All @@ -185,7 +191,7 @@ func (s *Server) handleUpdatePostgresSettings(w http.ResponseWriter, r *http.Req
}}
}

err = flypg.PushUserConfig(s.node.PGConfig, consul)
err = flypg.PushUserConfig(node.PGConfig, consul)
if err != nil {
renderErr(w, err)
return
Expand All @@ -194,21 +200,27 @@ func (s *Server) handleUpdatePostgresSettings(w http.ResponseWriter, r *http.Req
renderJSON(w, res, http.StatusOK)
}

func (s *Server) handleApplyConfig(w http.ResponseWriter, r *http.Request) {
conn, close, err := localConnection(r.Context(), "postgres")
func handleApplyConfig(w http.ResponseWriter, r *http.Request) {
node, err := flypg.NewNode()
if err != nil {
renderErr(w, err)
return
}
defer close()

conn, err := localConnection(r.Context(), "postgres")
if err != nil {
renderErr(w, err)
return
}
defer conn.Close(r.Context())

consul, err := state.NewStore()
if err != nil {
renderErr(w, err)
return
}

err = flypg.SyncUserConfig(s.node.PGConfig, consul)
err = flypg.SyncUserConfig(node.PGConfig, consul)
if err != nil {
renderErr(w, err)
return
Expand All @@ -225,16 +237,21 @@ type PGSettingsResponse struct {
Settings []admin.PGSetting `json:"settings"`
}

func (s *Server) handleViewPostgresSettings(w http.ResponseWriter, r *http.Request) {
conn, close, err := localConnection(r.Context(), "postgres")
func handleViewPostgresSettings(w http.ResponseWriter, r *http.Request) {
node, err := flypg.NewNode()
if err != nil {
renderErr(w, err)
return
}

defer close()
conn, err := localConnection(r.Context(), "postgres")
if err != nil {
renderErr(w, err)
return
}
defer conn.Close(r.Context())

all, err := s.node.PGConfig.CurrentConfig()
all, err := node.PGConfig.CurrentConfig()
if err != nil {
renderErr(w, err)
return
Expand Down Expand Up @@ -264,8 +281,14 @@ func (s *Server) handleViewPostgresSettings(w http.ResponseWriter, r *http.Reque
renderJSON(w, resp, http.StatusOK)
}

func (s *Server) handleViewRepmgrSettings(w http.ResponseWriter, r *http.Request) {
all, err := s.node.RepMgr.CurrentConfig()
func handleViewRepmgrSettings(w http.ResponseWriter, r *http.Request) {
node, err := flypg.NewNode()
if err != nil {
renderErr(w, err)
return
}

all, err := node.RepMgr.CurrentConfig()
if err != nil {
renderErr(w, err)
return
Expand Down
Loading