Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions cmd/standby_cleaner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"github.com/fly-apps/postgres-flex/pkg/flypg"
"github.com/fly-apps/postgres-flex/pkg/flypg/admin"
"github.com/jackc/pgx/v4"

"golang.org/x/exp/maps"
)

var (
monitorFrequency = time.Minute * 5
// TODO - Make this configurable and/or extend this to 12-24 hours.
deadMemberRemovalThreshold = time.Hour * 1
)

func main() {
Expand All @@ -33,11 +33,33 @@ func main() {
os.Exit(1)
}

internal, err := flypg.ReadFromFile("/data/flypg.internal.conf")
if err != nil {
fmt.Printf("failed to open config: %s\n", err)
os.Exit(1)
}

user, err := flypg.ReadFromFile("/data/flypg.user.conf")
if err != nil {
fmt.Printf("failed to open config: %s\n", err)
os.Exit(1)
}

maps.Copy(user, internal)

deadMemberRemovalThreshold, err := time.ParseDuration(fmt.Sprint(internal["standby_clean_interval"]))
if err != nil {
fmt.Printf(fmt.Sprintf("Failed to parse config: %s", err))
os.Exit(1)
}

seenAt := map[int]time.Time{}

ticker := time.NewTicker(monitorFrequency)
defer ticker.Stop()

fmt.Printf("Pruning every %s...\n", deadMemberRemovalThreshold)

for {
select {
case <-ticker.C:
Expand Down
74 changes: 74 additions & 0 deletions pkg/flypg/flypg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package flypg

import (
"fmt"
"os"
"time"
)

type FlyPGConfig struct {
internalConfigFilePath string
userConfigFilePath string

internalConfig ConfigMap
userConfig ConfigMap

configPath string
}

func (c *FlyPGConfig) SetDefaults() {
c.internalConfig = ConfigMap{
"standby_clean_interval": time.Hour * 24,
}
}

func NewInternalConfig(configPath string) *FlyPGConfig {
return &FlyPGConfig{
internalConfigFilePath: fmt.Sprintf("%s/flypg.internal.conf", configPath),
userConfigFilePath: fmt.Sprintf("%s/flypg.user.conf", configPath),
configPath: configPath,
internalConfig: ConfigMap{},
userConfig: ConfigMap{},
}
}
func (c *FlyPGConfig) InternalConfig() ConfigMap {
return c.internalConfig
}

func (c *FlyPGConfig) UserConfig() ConfigMap {
return c.userConfig
}

func (c *FlyPGConfig) ConsulKey() string {
return "FlyPGConfig"
}

func (c *FlyPGConfig) SetUserConfig(newConfig ConfigMap) {
c.userConfig = newConfig
}

func (c *FlyPGConfig) InternalConfigFile() string {
return c.internalConfigFilePath
}

func (c *FlyPGConfig) UserConfigFile() string {
return c.userConfigFilePath
}

func (c *FlyPGConfig) initialize() error {
c.SetDefaults()

internal, err := os.Create(c.internalConfigFilePath)
if err != nil {
return err
}
defer internal.Close()

user, err := os.Create(c.userConfigFilePath)
if err != nil {
return err
}
defer user.Close()

return nil
}
23 changes: 21 additions & 2 deletions pkg/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ type Node struct {
OperatorCredentials Credentials
ReplCredentials Credentials

PGBouncer PGBouncer
RepMgr RepMgr
PGBouncer PGBouncer
RepMgr RepMgr
InternalConfig FlyPGConfig
}

func NewNode() (*Node, error) {
Expand Down Expand Up @@ -106,6 +107,8 @@ func NewNode() (*Node, error) {
Credentials: node.ReplCredentials,
}

node.InternalConfig = *NewInternalConfig("/data")

return node, nil
}

Expand All @@ -127,6 +130,22 @@ func (n *Node) Init(ctx context.Context) error {
repmgr := n.RepMgr
pgbouncer := n.PGBouncer
PGConfig := n.PGConfig
InternalConfig := n.InternalConfig

fmt.Println("Initializing internal config")
if err := InternalConfig.initialize(); err != nil {
fmt.Printf("Failed to initialize internal config: %s\n", err.Error())
}

err = SyncUserConfig(&InternalConfig, cs.Store)
if err != nil {
fmt.Printf("Failed to sync user config from consul for internal config: %s\n", err.Error())
}

err = WriteConfigFiles(&InternalConfig)
if err != nil {
fmt.Printf("Failed to write config files for internal config: %s\n", err.Error())
}

fmt.Println("Initializing replication manager")
if err := repmgr.initialize(); err != nil {
Expand Down