diff --git a/cmd/standby_cleaner/main.go b/cmd/standby_cleaner/main.go index 9625b896..70d31907 100644 --- a/cmd/standby_cleaner/main.go +++ b/cmd/standby_cleaner/main.go @@ -9,12 +9,12 @@ import ( "github.com/fly-apps/postgres-flex/pkg/flypg" "github.com/fly-apps/postgres-flex/pkg/flypg/admin" "github.com/jackc/pgx/v4" + + "golang.org/x/exp/maps" ) var ( monitorFrequency = time.Minute * 5 - // TODO - Make this configurable and/or extend this to 12-24 hours. - deadMemberRemovalThreshold = time.Hour * 1 ) func main() { @@ -33,11 +33,33 @@ func main() { os.Exit(1) } + internal, err := flypg.ReadFromFile("/data/flypg.internal.conf") + if err != nil { + fmt.Printf("failed to open config: %s\n", err) + os.Exit(1) + } + + user, err := flypg.ReadFromFile("/data/flypg.user.conf") + if err != nil { + fmt.Printf("failed to open config: %s\n", err) + os.Exit(1) + } + + maps.Copy(user, internal) + + deadMemberRemovalThreshold, err := time.ParseDuration(fmt.Sprint(internal["standby_clean_interval"])) + if err != nil { + fmt.Printf(fmt.Sprintf("Failed to parse config: %s", err)) + os.Exit(1) + } + seenAt := map[int]time.Time{} ticker := time.NewTicker(monitorFrequency) defer ticker.Stop() + fmt.Printf("Pruning every %s...\n", deadMemberRemovalThreshold) + for { select { case <-ticker.C: diff --git a/pkg/flypg/flypg.go b/pkg/flypg/flypg.go new file mode 100644 index 00000000..515eb3e3 --- /dev/null +++ b/pkg/flypg/flypg.go @@ -0,0 +1,74 @@ +package flypg + +import ( + "fmt" + "os" + "time" +) + +type FlyPGConfig struct { + internalConfigFilePath string + userConfigFilePath string + + internalConfig ConfigMap + userConfig ConfigMap + + configPath string +} + +func (c *FlyPGConfig) SetDefaults() { + c.internalConfig = ConfigMap{ + "standby_clean_interval": time.Hour * 24, + } +} + +func NewInternalConfig(configPath string) *FlyPGConfig { + return &FlyPGConfig{ + internalConfigFilePath: fmt.Sprintf("%s/flypg.internal.conf", configPath), + userConfigFilePath: fmt.Sprintf("%s/flypg.user.conf", configPath), + configPath: configPath, + internalConfig: ConfigMap{}, + userConfig: ConfigMap{}, + } +} +func (c *FlyPGConfig) InternalConfig() ConfigMap { + return c.internalConfig +} + +func (c *FlyPGConfig) UserConfig() ConfigMap { + return c.userConfig +} + +func (c *FlyPGConfig) ConsulKey() string { + return "FlyPGConfig" +} + +func (c *FlyPGConfig) SetUserConfig(newConfig ConfigMap) { + c.userConfig = newConfig +} + +func (c *FlyPGConfig) InternalConfigFile() string { + return c.internalConfigFilePath +} + +func (c *FlyPGConfig) UserConfigFile() string { + return c.userConfigFilePath +} + +func (c *FlyPGConfig) initialize() error { + c.SetDefaults() + + internal, err := os.Create(c.internalConfigFilePath) + if err != nil { + return err + } + defer internal.Close() + + user, err := os.Create(c.userConfigFilePath) + if err != nil { + return err + } + defer user.Close() + + return nil +} diff --git a/pkg/flypg/node.go b/pkg/flypg/node.go index b9105c24..cf4a261a 100644 --- a/pkg/flypg/node.go +++ b/pkg/flypg/node.go @@ -35,8 +35,9 @@ type Node struct { OperatorCredentials Credentials ReplCredentials Credentials - PGBouncer PGBouncer - RepMgr RepMgr + PGBouncer PGBouncer + RepMgr RepMgr + InternalConfig FlyPGConfig } func NewNode() (*Node, error) { @@ -106,6 +107,8 @@ func NewNode() (*Node, error) { Credentials: node.ReplCredentials, } + node.InternalConfig = *NewInternalConfig("/data") + return node, nil } @@ -127,6 +130,22 @@ func (n *Node) Init(ctx context.Context) error { repmgr := n.RepMgr pgbouncer := n.PGBouncer PGConfig := n.PGConfig + InternalConfig := n.InternalConfig + + fmt.Println("Initializing internal config") + if err := InternalConfig.initialize(); err != nil { + fmt.Printf("Failed to initialize internal config: %s\n", err.Error()) + } + + err = SyncUserConfig(&InternalConfig, cs.Store) + if err != nil { + fmt.Printf("Failed to sync user config from consul for internal config: %s\n", err.Error()) + } + + err = WriteConfigFiles(&InternalConfig) + if err != nil { + fmt.Printf("Failed to write config files for internal config: %s\n", err.Error()) + } fmt.Println("Initializing replication manager") if err := repmgr.initialize(); err != nil {