diff --git a/cmd/start/main.go b/cmd/start/main.go index 98016f79..46bfb40d 100644 --- a/cmd/start/main.go +++ b/cmd/start/main.go @@ -65,7 +65,7 @@ func main() { svisor.AddProcess("exporter", "postgres_exporter", supervisor.WithEnv(exporterEnv), supervisor.WithRestart(0, 1*time.Second)) svisor.StopOnSignal(syscall.SIGINT, syscall.SIGTERM) - svisor.StartHttpListener() + svisor.StartHttpListener(node) if err := svisor.Run(); err != nil { fmt.Println(err) diff --git a/go.mod b/go.mod index 6be77fbd..5583d50d 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/pkg/term v1.1.0 github.com/superfly/fly-checks v0.0.0-20221220181621-bcbf6f4dc6d7 + golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) diff --git a/go.sum b/go.sum index 928dca68..dc288d7c 100644 --- a/go.sum +++ b/go.sum @@ -220,6 +220,8 @@ golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWP golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 h1:/UOmuWzQfxxo9UtlXMwuQU8CMgg1eZXqTRwkSQJWKOI= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3 h1:fJwx88sMf5RXwDwziL0/Mn9Wqs+efMSo/RYcL+37W9c= +golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= diff --git a/pkg/api/handle_admin.go b/pkg/api/handle_admin.go index 64545eeb..914e6f98 100644 --- a/pkg/api/handle_admin.go +++ b/pkg/api/handle_admin.go @@ -1,9 +1,14 @@ package api import ( - "net/http" - + "encoding/json" + "fmt" "github.com/fly-apps/postgres-flex/pkg/flypg" + "github.com/fly-apps/postgres-flex/pkg/flypg/admin" + "github.com/fly-apps/postgres-flex/pkg/flypg/state" + "golang.org/x/exp/slices" + "net/http" + "strings" ) func handleRole(w http.ResponseWriter, r *http.Request) { @@ -39,3 +44,209 @@ func handleRole(w http.ResponseWriter, r *http.Request) { renderJSON(w, res, http.StatusOK) } + +type SettingsUpdate struct { + Message string `json:"message"` + RestartRequired bool `json:"restart_required"` +} + +func (s *Server) handleUpdatePostgresSettings(w http.ResponseWriter, r *http.Request) { + conn, close, err := localConnection(r.Context(), "postgres") + if err != nil { + renderErr(w, err) + return + } + defer close() + + user := s.node.PGConfig.UserConfig() + + var in map[string]interface{} + + if err := json.NewDecoder(r.Body).Decode(&in); err != nil { + renderErr(w, err) + return + } + + for k, v := range in { + exists, err := admin.SettingExists(r.Context(), conn, k) + if err != nil { + renderErr(w, err) + return + } + if !exists { + renderErr(w, fmt.Errorf("invalid config option: %s", k)) + return + } + user[k] = v + } + + s.node.PGConfig.SetUserConfig(user) + + var requiresRestart []string + + for k, _ := range user { + restart, err := admin.SettingRequiresRestart(r.Context(), conn, k) + if err != nil { + renderErr(w, err) + return + } + if restart { + requiresRestart = append(requiresRestart, k) + } + } + + res := &Response{Result: SettingsUpdate{ + Message: "Updated", + RestartRequired: false, + }} + + if len(requiresRestart) > 0 { + res = &Response{Result: SettingsUpdate{ + Message: fmt.Sprintf("Updated, but settings %s need a restart to apply", strings.Join(requiresRestart, ", ")), + RestartRequired: true, + }} + } + + renderJSON(w, res, http.StatusOK) +} + +func (s *Server) handleApplyConfig(w http.ResponseWriter, r *http.Request) { + conn, close, err := localConnection(r.Context(), "postgres") + if err != nil { + renderErr(w, err) + return + } + defer close() + + consul, err := state.NewConsulClient() + if err != nil { + renderErr(w, err) + return + } + + err = flypg.WriteUserConfig(s.node.PGConfig, consul) + if err != nil { + renderErr(w, err) + return + } + + err = admin.ReloadPostgresConfig(r.Context(), conn) + if err != nil { + renderErr(w, err) + return + } +} + +type PGSettingsResponse struct { + Settings []admin.PGSetting `json:"settings"` +} + +func (s *Server) handleViewPostgresSettings(w http.ResponseWriter, r *http.Request) { + conn, close, err := localConnection(r.Context(), "postgres") + if err != nil { + renderErr(w, err) + return + } + + defer close() + internal := s.node.PGConfig.InternalConfig() + user := s.node.PGConfig.UserConfig() + + all := map[string]interface{}{} + + for k, v := range internal { + all[k] = v + } + for k, v := range user { + all[k] = v + } + + var in []string + + if err := json.NewDecoder(r.Body).Decode(&in); err != nil { + renderErr(w, err) + return + } + + var out []admin.PGSetting + + for key, _ := range all { + if slices.Contains(in, key) { + setting, err := admin.GetSetting(r.Context(), conn, key) + if err != nil { + renderErr(w, err) + return + } + out = append(out, *setting) + } + } + + resp := &Response{Result: PGSettingsResponse{Settings: out}} + renderJSON(w, resp, http.StatusOK) +} + +func (s *Server) handleViewBouncerSettings(w http.ResponseWriter, r *http.Request) { + internal := s.node.PGBouncer.InternalConfig() + user := s.node.PGBouncer.UserConfig() + + all := map[string]interface{}{} + + for k, v := range internal { + all[k] = v + } + for k, v := range user { + all[k] = v + } + + var in []string + + if err := json.NewDecoder(r.Body).Decode(&in); err != nil { + renderErr(w, err) + return + } + + out := map[string]interface{}{} + + for key, _ := range all { + val, _ := all[key] + if slices.Contains(in, key) { + out[key] = val + } + } + + resp := &Response{Result: out} + renderJSON(w, resp, http.StatusOK) +} + +func (s *Server) handleViewRepmgrSettings(w http.ResponseWriter, r *http.Request) { + internal := s.node.RepMgr.InternalConfig() + user := s.node.RepMgr.UserConfig() + + all := map[string]interface{}{} + + for k, v := range internal { + all[k] = v + } + for k, v := range user { + all[k] = v + } + + var in []string + + if err := json.NewDecoder(r.Body).Decode(&in); err != nil { + renderErr(w, err) + return + } + + out := map[string]interface{}{} + + for key, _ := range all { + val, _ := all[key] + if slices.Contains(in, key) { + out[key] = val + } + } + + resp := &Response{Result: out} + renderJSON(w, resp, http.StatusOK) +} diff --git a/pkg/api/handler.go b/pkg/api/handler.go index 9f6754b0..cb162d8b 100644 --- a/pkg/api/handler.go +++ b/pkg/api/handler.go @@ -13,16 +13,21 @@ import ( const Port = 5500 -func StartHttpServer() { +type Server struct { + node *flypg.Node +} + +func StartHttpServer(node *flypg.Node) { + server := &Server{node: node} r := chi.NewMux() r.Mount("/flycheck", flycheck.Handler()) - r.Mount("/commands", Handler()) + r.Mount("/commands", server.Handler()) http.ListenAndServe(fmt.Sprintf(":%d", Port), r) } -func Handler() http.Handler { +func (s *Server) Handler() http.Handler { r := chi.NewRouter() r.Route("/users", func(r chi.Router) { @@ -41,9 +46,11 @@ func Handler() http.Handler { r.Route("/admin", func(r chi.Router) { r.Get("/role", handleRole) - // r.Get("/failover/trigger", handleFailoverTrigger) - // r.Get("/settings/view", handleViewSettings) - // r.Post("/settings/update", handleUpdateSettings) + r.Get("/settings/view/postgres", s.handleViewPostgresSettings) + r.Get("/settings/view/pgbouncer", s.handleViewBouncerSettings) + r.Get("/settings/view/repmgr", s.handleViewRepmgrSettings) + r.Post("/settings/update/postgres", s.handleUpdatePostgresSettings) + r.Post("/settings/apply", s.handleApplyConfig) }) return r diff --git a/pkg/flypg/admin/admin.go b/pkg/flypg/admin/admin.go index f48f8c84..2218c347 100644 --- a/pkg/flypg/admin/admin.go +++ b/pkg/flypg/admin/admin.go @@ -224,3 +224,53 @@ func SetConfigurationSetting(ctx context.Context, conn *pgx.Conn, key string, va _, err := conn.Exec(ctx, sql) return err } + +func ReloadPostgresConfig(ctx context.Context, pg *pgx.Conn) error { + sql := "SELECT pg_reload_conf()" + + _, err := pg.Exec(ctx, sql) + return err +} + +func SettingExists(ctx context.Context, pg *pgx.Conn, setting string) (bool, error) { + sql := fmt.Sprintf("SELECT EXISTS(SELECT 1 FROM pg_settings WHERE name='%s')", setting) + var out bool + if err := pg.QueryRow(ctx, sql).Scan(&out); err != nil { + return false, err + } + return out, nil +} + +func SettingRequiresRestart(ctx context.Context, pg *pgx.Conn, setting string) (bool, error) { + sql := fmt.Sprintf("SELECT pending_restart FROM pg_settings WHERE name='%s'", setting) + row := pg.QueryRow(ctx, sql) + var out bool + if err := row.Scan(&out); err != nil { + return false, err + } + return out, nil +} + +type PGSetting struct { + Name string `json:"name,omitempty"` + Setting string `json:"setting,omitempty"` + VarType *string `json:"vartype,omitempty"` + MinVal *string `json:"min_val,omitempty"` + MaxVal *string `json:"max_val,omitempty"` + EnumVals *[]string `json:"enumvals,omitempty"` + Context *string `json:"context,omitempty"` + Unit *string `json:"unit,omitempty"` + Desc *string `json:"short_desc,omitempty"` + PendingChange *string `json:"pending_change,omitempty"` + PendingRestart *bool `json:"pending_restart,omitempty"` +} + +func GetSetting(ctx context.Context, pg *pgx.Conn, setting string) (*PGSetting, error) { + sql := fmt.Sprintf("SELECT name, setting, vartype, min_val, max_val, enumvals, context, unit, short_desc, pending_restart FROM pg_settings WHERE name='%s'", setting) + row := pg.QueryRow(ctx, sql) + out := PGSetting{} + if err := row.Scan(&out.Name, &out.Setting, &out.VarType, &out.MinVal, &out.MaxVal, &out.EnumVals, &out.Context, &out.Unit, &out.Desc, &out.PendingRestart); err != nil { + return nil, err + } + return &out, nil +} diff --git a/pkg/flypg/config.go b/pkg/flypg/config.go index f9cd68b0..66ea6208 100644 --- a/pkg/flypg/config.go +++ b/pkg/flypg/config.go @@ -39,6 +39,9 @@ func SyncUserConfig(c Config, consul *state.ConsulClient) error { if err != nil { return fmt.Errorf("failed to pull config from consul: %s", err) } + if cfg == nil { + return nil + } c.SetUserConfig(cfg) if err := WriteConfigFiles(c); err != nil { @@ -70,6 +73,9 @@ func pullFromConsul(c Config, consul *state.ConsulClient) (ConfigMap, error) { if err != nil { return nil, err } + if configBytes == nil { + return nil, nil + } var storeCfg ConfigMap if err = json.Unmarshal(configBytes, &storeCfg); err != nil { @@ -91,16 +97,21 @@ func WriteConfigFiles(c Config) error { } defer userFile.Close() - for key, value := range c.InternalConfig() { - entry := fmt.Sprintf("%s = %v\n", key, value) - internalFile.Write([]byte(entry)) - } + internal := c.InternalConfig() for key, value := range c.UserConfig() { entry := fmt.Sprintf("%s = %v\n", key, value) + if _, ok := internal[key]; ok { + delete(internal, key) + } userFile.Write([]byte(entry)) } + for key, value := range internal { + entry := fmt.Sprintf("%s = %v\n", key, value) + internalFile.Write([]byte(entry)) + } + return nil } diff --git a/pkg/flypg/node.go b/pkg/flypg/node.go index 076d0eb4..dda620dc 100644 --- a/pkg/flypg/node.go +++ b/pkg/flypg/node.go @@ -133,6 +133,11 @@ func (n *Node) Init(ctx context.Context) error { fmt.Printf("Failed to initialize repmgr: %s\n", err.Error()) } + err = SyncUserConfig(&repmgr, consul) + if err != nil { + fmt.Printf("Failed to sync user config from consul for repmgr: %s\n", err.Error()) + } + err = WriteConfigFiles(&repmgr) if err != nil { fmt.Printf("Failed to write config files for repmgr: %s\n", err.Error()) @@ -143,6 +148,11 @@ func (n *Node) Init(ctx context.Context) error { return err } + err = SyncUserConfig(&pgbouncer, consul) + if err != nil { + fmt.Printf("Failed to sync user config from consul for pgbouncer: %s\n", err.Error()) + } + err = WriteConfigFiles(&pgbouncer) if err != nil { fmt.Printf("Failed to write config files for pgbouncer: %s\n", err.Error()) @@ -194,6 +204,12 @@ func (n *Node) Init(ctx context.Context) error { fmt.Println("Resolving PG configuration settings.") PGConfig.Setup() + + err = SyncUserConfig(PGConfig, consul) + if err != nil { + fmt.Printf("Failed to sync user config from consul for pgbouncer: %s\n", err.Error()) + } + WriteConfigFiles(PGConfig) PGConfig.Print(os.Stdout) @@ -244,7 +260,7 @@ func (n *Node) PostInit(ctx context.Context) error { // Setup repmgr database, extension, and register ourselves as the primary fmt.Println("Perform Repmgr setup") if err := repmgr.setup(ctx, conn); err != nil { - return fmt.Errorf("failed to setup repmgr: %s", err) + fmt.Printf("failed to setup repmgr: %s\n", err) } if err := consul.RegisterPrimary(n.PrivateIP); err != nil { diff --git a/pkg/flypg/state/state.go b/pkg/flypg/state/state.go index 8c3143db..7e87057c 100644 --- a/pkg/flypg/state/state.go +++ b/pkg/flypg/state/state.go @@ -47,6 +47,9 @@ func (c *ConsulClient) PullUserConfig(key string) ([]byte, error) { if err != nil { return nil, err } + if pair == nil { + return nil, nil + } return pair.Value, nil } diff --git a/pkg/supervisor/supervisor.go b/pkg/supervisor/supervisor.go index d3d5610f..c9086731 100644 --- a/pkg/supervisor/supervisor.go +++ b/pkg/supervisor/supervisor.go @@ -3,6 +3,7 @@ package supervisor import ( "context" "fmt" + "github.com/fly-apps/postgres-flex/pkg/flypg" "os" "os/exec" "os/signal" @@ -126,8 +127,8 @@ func (h *Supervisor) waitForExit(ctx context.Context) { } } -func (h *Supervisor) StartHttpListener() { - go api.StartHttpServer() +func (h *Supervisor) StartHttpListener(node *flypg.Node) { + go api.StartHttpServer(node) } func (h *Supervisor) Run() error {