diff --git a/config/pgbouncer.ini b/config/pgbouncer.ini deleted file mode 100644 index 724d3702..00000000 --- a/config/pgbouncer.ini +++ /dev/null @@ -1,26 +0,0 @@ -[pgbouncer] - -listen_addr = * -listen_port = 5432 -unix_socket_dir = /tmp - -auth_user = postgres -auth_file = /data/pgbouncer/pgbouncer.auth - -admin_users = postgres - -user = postgres - -pool_mode = transaction - -max_client_conn = 100 -default_pool_size = 20 -min_pool_size = 5 -reserve_pool_size = 5 -reserve_pool_timeout = 3 - -log_connections = 1 -log_disconnections = 1 -log_pooler_errors = 1 - -%include /data/pgbouncer/pgbouncer.database.ini \ No newline at end of file diff --git a/pkg/flypg/config.go b/pkg/flypg/config.go index a023c4cc..f9cd68b0 100644 --- a/pkg/flypg/config.go +++ b/pkg/flypg/config.go @@ -2,320 +2,123 @@ package flypg import ( "bufio" - "context" "encoding/json" "fmt" - "io" - "io/ioutil" + "github.com/fly-apps/postgres-flex/pkg/flypg/state" "os" - "strconv" "strings" - "syscall" - - "github.com/fly-apps/postgres-flex/pkg/flypg/admin" - "github.com/fly-apps/postgres-flex/pkg/flypg/state" - "github.com/jackc/pgx/v4" ) -type PGConfig map[string]interface{} - -type Config struct { - configFilePath string - - internalConfigFilePath string - userConfigFilePath string - dataDir string - - internalConfig PGConfig - userConfig PGConfig -} - -func NewConfig(dataDir string) *Config { - return &Config{ - dataDir: dataDir, - configFilePath: fmt.Sprintf("%s/postgresql.conf", dataDir), - - internalConfigFilePath: fmt.Sprintf("%s/postgresql.internal.conf", dataDir), - userConfigFilePath: fmt.Sprintf("%s/postgresql.user.conf", dataDir), - - internalConfig: PGConfig{}, - userConfig: PGConfig{}, - } -} - -// Print outputs the interna/user config to stdout. -func (c *Config) Print(w io.Writer) error { - internalCfg, err := c.pullFromFile(c.internalConfigFilePath) - if err != nil { - return fmt.Errorf("failed to read internal config: %s", err) - } - - userCfg, err := c.pullFromFile(c.userConfigFilePath) - if err != nil { - return fmt.Errorf("failed to read internal config: %s", err) - } - - cfg := PGConfig{} - - for k, v := range internalCfg { - cfg[k] = v - } - - for k, v := range userCfg { - cfg[k] = v - } - - e := json.NewEncoder(w) - e.SetIndent("", " ") +type ConfigMap map[string]interface{} - return e.Encode(cfg) +type Config interface { + InternalConfigFile() string + UserConfigFile() string + InternalConfig() ConfigMap + UserConfig() ConfigMap + SetUserConfig(configMap ConfigMap) + ConsulKey() string } -// Setup will ensure the required configuration files are stubbed and the parent -// postgresql.conf file includes them. -func (c Config) Setup() error { - if _, err := os.Stat(c.internalConfigFilePath); err != nil { - if os.IsNotExist(err) { - if err := runCommand(fmt.Sprintf("touch %s", c.internalConfigFilePath)); err != nil { - return err - } - } else { - return err - } - } - - if _, err := os.Stat(c.userConfigFilePath); err != nil { - if os.IsNotExist(err) { - if err := runCommand(fmt.Sprintf("touch %s", c.userConfigFilePath)); err != nil { - return err - } - } else { - return err - } - } - - b, err := ioutil.ReadFile(c.configFilePath) - if err != nil { - return err - } - - var entries []string - if !strings.Contains(string(b), "postgresql.internal.conf") { - entries = append(entries, "include 'postgresql.internal.conf'\n") - } - - if !strings.Contains(string(b), "postgresql.user.conf") { - entries = append(entries, "include 'postgresql.user.conf'\n") - } - - if len(entries) > 0 { - f, err := os.OpenFile(c.configFilePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) - if err != nil { - return nil +func WriteUserConfig(c Config, consul *state.ConsulClient) error { + if c.UserConfig() != nil { + if err := pushToConsul(c, consul); err != nil { + return fmt.Errorf("failed to write to consul: %s", err) } - defer f.Close() - for _, entry := range entries { - if _, err := f.WriteString(entry); err != nil { - return fmt.Errorf("failed append configuration entry: %s", err) - } + if err := WriteConfigFiles(c); err != nil { + return fmt.Errorf("failed to write to pg config file: %s", err) } } return nil } -// WriteDefaults will resolve the default configuration settings and write them to the -// internal config file. -func (c Config) WriteDefaults() error { - // The default wal_segment_size in mb - const walSegmentSize = 16 - - // Calculate total allocated disk in bytes - diskSizeBytes, err := diskSizeInBytes() +func SyncUserConfig(c Config, consul *state.ConsulClient) error { + cfg, err := pullFromConsul(c, consul) if err != nil { - return fmt.Errorf("failed to fetch disk size: %s", err) - } - - // Calculate total allocated memory in bytes - memSizeInBytes, err := memTotalInBytes() - if err != nil { - return fmt.Errorf("failed to fetch total system memory: %s", err) - } - - // Set max_wal_size to 10% of disk capacity. - maxWalBytes := diskSizeBytes / 10 - maxWalMb := maxWalBytes / (1024 * 1024) - - // Set min_wal_size to 25% of max_wal_size - minWalBytes := maxWalBytes / 4 - minWalMb := minWalBytes / (1024 * 1024) - - // min_wal_size must be at least twice the size of wal_segment_size. - if minWalMb < (walSegmentSize * 2) { - minWalMb = walSegmentSize * 2 - } - - var sharedBuffersBytes int - // If total memory is greater than or equal to 1GB - if memSizeInBytes >= (1024 * 1024 * 1024) { - // Set shared_buffers to 25% of available memory - sharedBuffersBytes = int(memSizeInBytes) / 4 - } else { - // Set shared buffers to 10% of available memory - sharedBuffersBytes = int(memSizeInBytes) / 10 - } - sharedBuffersMb := sharedBuffersBytes / (1024 * 1024) - - conf := PGConfig{ - "random_page_cost": "1.1", - "shared_buffers": fmt.Sprintf("%dMB", sharedBuffersMb), - "max_connections": 300, - "max_replication_slots": 10, - "min_wal_size": fmt.Sprintf("%dMB", int(minWalMb)), - "max_wal_size": fmt.Sprintf("%dMB", int(maxWalMb)), - "wal_compression": "on", - "wal_level": "replica", - "hot_standby": true, - "archive_mode": true, - "archive_command": "'/bin/true'", - "shared_preload_libraries": "repmgr", + return fmt.Errorf("failed to pull config from consul: %s", err) } + c.SetUserConfig(cfg) - if err := c.writeToFile(c.internalConfigFilePath, conf); err != nil { + if err := WriteConfigFiles(c); err != nil { return fmt.Errorf("failed to write to pg config file: %s", err) } return nil } -// WriteUserConfig will push any user-defined configuration to Consul and write it to the user config file. -func (c Config) WriteUserConfig(ctx context.Context, conn *pgx.Conn, consul *state.ConsulClient, cfg PGConfig) error { - if c.userConfig != nil { - if err := c.pushToConsul(consul, cfg); err != nil { - return fmt.Errorf("failed to write to consul: %s", err) - } - - if err := c.writeToFile(c.userConfigFilePath, cfg); err != nil { - return fmt.Errorf("failed to write to pg config file: %s", err) - } +func pushToConsul(c Config, consul *state.ConsulClient) error { + if c.UserConfig() == nil { + return nil } - return nil -} - -// SyncUserConfig will pull the latest user-defined configuration data from Consul and -// write it to the user config file. -func (c Config) SyncUserConfig(ctx context.Context, consul *state.ConsulClient) error { - cfg, err := c.pullFromConsul(consul) + configBytes, err := json.Marshal(c.UserConfig()) if err != nil { - return fmt.Errorf("failed to pull config from consul: %s", err) + return err } - if err := c.writeToFile(c.userConfigFilePath, cfg); err != nil { - return fmt.Errorf("failed to write to pg config file: %s", err) + if err := consul.PushUserConfig(c.ConsulKey(), configBytes); err != nil { + return err } return nil } -// ApplyUserConfigAtRuntime will take a config and attempt to set it at runtime. -func (c Config) ApplyUserConfigAtRuntime(ctx context.Context, conn *pgx.Conn, conf PGConfig) error { - for key, value := range conf { - if err := admin.SetConfigurationSetting(ctx, conn, key, value); err != nil { - fmt.Printf("failed to set configuration setting %s -> %s: %s", key, value, err) - } +func pullFromConsul(c Config, consul *state.ConsulClient) (ConfigMap, error) { + configBytes, err := consul.PullUserConfig(c.ConsulKey()) + if err != nil { + return nil, err } - return nil -} - -func (c Config) pushToConsul(consul *state.ConsulClient, conf PGConfig) error { - if conf == nil { - return nil + var storeCfg ConfigMap + if err = json.Unmarshal(configBytes, &storeCfg); err != nil { + return nil, err } - configBytes, err := json.Marshal(conf) + return storeCfg, nil +} + +func WriteConfigFiles(c Config) error { + internalFile, err := os.OpenFile(c.InternalConfigFile(), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644) if err != nil { return err } - - if err := consul.PushUserConfig(configBytes); err != nil { + defer internalFile.Close() + userFile, err := os.OpenFile(c.UserConfigFile(), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644) + if err != nil { return err } + defer userFile.Close() - return nil -} - -func (c Config) writeToFile(pathToFile string, conf PGConfig) error { - file, err := os.OpenFile(pathToFile, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644) - if err != nil { - return err + for key, value := range c.InternalConfig() { + entry := fmt.Sprintf("%s = %v\n", key, value) + internalFile.Write([]byte(entry)) } - defer file.Close() - for key, value := range conf { + for key, value := range c.UserConfig() { entry := fmt.Sprintf("%s = %v\n", key, value) - file.Write([]byte(entry)) + userFile.Write([]byte(entry)) } return nil } -func (c Config) pullFromFile(pathToFile string) (PGConfig, error) { - file, err := os.Open(pathToFile) +func ReadFromFile(path string) (ConfigMap, error) { + file, err := os.Open(path) if err != nil { return nil, err } defer file.Close() - pgConf := PGConfig{} + conf := ConfigMap{} scanner := bufio.NewScanner(file) for scanner.Scan() { lineArr := strings.Split(scanner.Text(), "=") key := strings.TrimSpace(lineArr[0]) value := strings.TrimSpace(lineArr[1]) - pgConf[key] = value - } - - return pgConf, nil -} - -func (c Config) pullFromConsul(consul *state.ConsulClient) (PGConfig, error) { - configBytes, err := consul.PullUserConfig() - if err != nil { - return nil, err - } - - var storeCfg PGConfig - if err = json.Unmarshal(configBytes, &storeCfg); err != nil { - return nil, err + conf[key] = value } - return storeCfg, nil -} - -func memTotalInBytes() (int64, error) { - memoryStr := os.Getenv("FLY_VM_MEMORY_MB") - - if memoryStr == "" { - return 0, fmt.Errorf("FLY_VM_MEMORY_MB envvar has not been set") - } - - parsed, err := strconv.ParseInt(memoryStr, 10, 64) - if err != nil { - return 0, err - } - - memoryBytes := parsed * (1024 * 1024) - - return memoryBytes, nil -} - -func diskSizeInBytes() (uint64, error) { - var stat syscall.Statfs_t - if err := syscall.Statfs("/data", &stat); err != nil { - return 0, err - } - return stat.Blocks * uint64(stat.Bsize), nil + return conf, nil } diff --git a/pkg/flypg/node.go b/pkg/flypg/node.go index 5313273b..076d0eb4 100644 --- a/pkg/flypg/node.go +++ b/pkg/flypg/node.go @@ -11,7 +11,6 @@ import ( "os/exec" "os/user" "strconv" - "syscall" "time" "github.com/fly-apps/postgres-flex/pkg/flypg/admin" @@ -30,7 +29,7 @@ type Node struct { PrivateIP string DataDir string Port int - Config *Config + PGConfig *PGConfig SUCredentials Credentials OperatorCredentials Credentials @@ -62,7 +61,7 @@ func NewNode() (*Node, error) { } // Stub configuration - node.Config = NewConfig(node.DataDir) + node.PGConfig = NewConfig(node.DataDir) // Internal user node.SUCredentials = Credentials{ @@ -95,14 +94,16 @@ func NewNode() (*Node, error) { rand.Seed(int64(seed)) node.RepMgr = RepMgr{ - ID: rand.Int31(), - Region: os.Getenv("FLY_REGION"), - ConfigPath: "/data/repmgr.conf", - DataDir: node.DataDir, - PrivateIP: node.PrivateIP, - Port: 5433, - DatabaseName: "repmgr", - Credentials: node.ReplCredentials, + ID: rand.Int31(), + Region: os.Getenv("FLY_REGION"), + ConfigPath: "/data/repmgr.conf", + InternalConfigPath: "/data/repmgr.internal.conf", + UserConfigPath: "/data/repmgr.user.conf", + DataDir: node.DataDir, + PrivateIP: node.PrivateIP, + Port: 5433, + DatabaseName: "repmgr", + Credentials: node.ReplCredentials, } return node, nil @@ -125,11 +126,16 @@ func (n *Node) Init(ctx context.Context) error { repmgr := n.RepMgr pgbouncer := n.PGBouncer - config := n.Config + PGConfig := n.PGConfig fmt.Println("Initializing replication manager") if err := repmgr.initialize(); err != nil { - fmt.Printf("Failed to initialize replmgr: %s\n", err.Error()) + fmt.Printf("Failed to initialize repmgr: %s\n", err.Error()) + } + + err = WriteConfigFiles(&repmgr) + if err != nil { + fmt.Printf("Failed to write config files for repmgr: %s\n", err.Error()) } fmt.Println("Initializing pgbouncer") @@ -137,6 +143,11 @@ func (n *Node) Init(ctx context.Context) error { return err } + err = WriteConfigFiles(&pgbouncer) + if err != nil { + fmt.Printf("Failed to write config files for pgbouncer: %s\n", err.Error()) + } + switch primaryIP { case n.PrivateIP: // noop @@ -181,11 +192,11 @@ func (n *Node) Init(ctx context.Context) error { } } - fmt.Println("Resolving PG Configurtion settings.") - config.Setup() - config.WriteDefaults() + fmt.Println("Resolving PG configuration settings.") + PGConfig.Setup() + WriteConfigFiles(PGConfig) - config.Print(os.Stdout) + PGConfig.Print(os.Stdout) return nil } @@ -475,24 +486,3 @@ func setDirOwnership() error { _, err = cmd.Output() return err } - -func runCommand(cmdStr string) error { - pgUser, err := user.Lookup("postgres") - if err != nil { - return err - } - pgUID, err := strconv.Atoi(pgUser.Uid) - if err != nil { - return err - } - pgGID, err := strconv.Atoi(pgUser.Gid) - if err != nil { - return err - } - - cmd := exec.Command("sh", "-c", cmdStr) - cmd.SysProcAttr = &syscall.SysProcAttr{} - cmd.SysProcAttr.Credential = &syscall.Credential{Uid: uint32(pgUID), Gid: uint32(pgGID)} - _, err = cmd.Output() - return err -} diff --git a/pkg/flypg/pg.go b/pkg/flypg/pg.go new file mode 100644 index 00000000..6a7be59e --- /dev/null +++ b/pkg/flypg/pg.go @@ -0,0 +1,251 @@ +package flypg + +import ( + "context" + "encoding/json" + "fmt" + "github.com/fly-apps/postgres-flex/pkg/flypg/admin" + "github.com/fly-apps/postgres-flex/pkg/utils" + "github.com/jackc/pgx/v4" + "github.com/pkg/errors" + "io" + "os" + "strconv" + "strings" + "syscall" +) + +type PGConfig struct { + configFilePath string + + internalConfigFilePath string + userConfigFilePath string + dataDir string + + internalConfig ConfigMap + userConfig ConfigMap +} + +// type assertion +var _ Config = &PGConfig{} + +func (c *PGConfig) InternalConfig() ConfigMap { + return c.internalConfig +} + +func (c *PGConfig) UserConfig() ConfigMap { + return c.userConfig +} + +func (c *PGConfig) ConsulKey() string { + return "PGConfig" +} + +func (c *PGConfig) SetUserConfig(newConfig ConfigMap) { + c.userConfig = newConfig +} + +func (c *PGConfig) InternalConfigFile() string { + return c.internalConfigFilePath +} + +func (c *PGConfig) UserConfigFile() string { + return c.userConfigFilePath +} + +func NewConfig(dataDir string) *PGConfig { + return &PGConfig{ + dataDir: dataDir, + configFilePath: fmt.Sprintf("%s/postgresql.conf", dataDir), + + internalConfigFilePath: fmt.Sprintf("%s/postgresql.internal.conf", dataDir), + userConfigFilePath: fmt.Sprintf("%s/postgresql.user.conf", dataDir), + + internalConfig: ConfigMap{}, + userConfig: ConfigMap{}, + } +} + +// Print outputs the internal/user config to stdout. +func (c *PGConfig) Print(w io.Writer) error { + internalCfg, err := ReadFromFile(c.internalConfigFilePath) + if err != nil { + return fmt.Errorf("failed to read internal config: %s", err) + } + + userCfg, err := ReadFromFile(c.userConfigFilePath) + if err != nil { + return fmt.Errorf("failed to read internal config: %s", err) + } + + cfg := ConfigMap{} + + for k, v := range internalCfg { + cfg[k] = v + } + + for k, v := range userCfg { + cfg[k] = v + } + + e := json.NewEncoder(w) + e.SetIndent("", " ") + + return e.Encode(cfg) +} + +// Setup will ensure the required configuration files are stubbed and the parent +// postgresql.conf file includes them. +func (c *PGConfig) Setup() error { + if _, err := os.Stat(c.internalConfigFilePath); err != nil { + if os.IsNotExist(err) { + if err := utils.RunCommand(fmt.Sprintf("touch %s", c.internalConfigFilePath)); err != nil { + return err + } + } else { + return err + } + } + + if _, err := os.Stat(c.userConfigFilePath); err != nil { + if os.IsNotExist(err) { + if err := utils.RunCommand(fmt.Sprintf("touch %s", c.userConfigFilePath)); err != nil { + return err + } + } else { + return err + } + } + + b, err := os.ReadFile(c.configFilePath) + if err != nil { + return err + } + + var entries []string + if !strings.Contains(string(b), "postgresql.internal.conf") { + entries = append(entries, "include 'postgresql.internal.conf'\n") + } + + if !strings.Contains(string(b), "postgresql.user.conf") { + entries = append(entries, "include 'postgresql.user.conf'\n") + } + + if len(entries) > 0 { + f, err := os.OpenFile(c.configFilePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return nil + } + defer f.Close() + + for _, entry := range entries { + if _, err := f.WriteString(entry); err != nil { + return fmt.Errorf("failed append configuration entry: %s", err) + } + } + } + + err = c.SetDefaults() + if err != nil { + return errors.New("Failed to set PG defaults") + } + + return nil +} + +// SetDefaults WriteDefaults will resolve the default configuration settings and write them to the +// internal config file. +func (c *PGConfig) SetDefaults() error { + // The default wal_segment_size in mb + const walSegmentSize = 16 + + // Calculate total allocated disk in bytes + diskSizeBytes, err := diskSizeInBytes() + if err != nil { + return fmt.Errorf("failed to fetch disk size: %s", err) + } + + // Calculate total allocated memory in bytes + memSizeInBytes, err := memTotalInBytes() + if err != nil { + return fmt.Errorf("failed to fetch total system memory: %s", err) + } + + // Set max_wal_size to 10% of disk capacity. + maxWalBytes := diskSizeBytes / 10 + maxWalMb := maxWalBytes / (1024 * 1024) + + // Set min_wal_size to 25% of max_wal_size + minWalBytes := maxWalBytes / 4 + minWalMb := minWalBytes / (1024 * 1024) + + // min_wal_size must be at least twice the size of wal_segment_size. + if minWalMb < (walSegmentSize * 2) { + minWalMb = walSegmentSize * 2 + } + + var sharedBuffersBytes int + // If total memory is greater than or equal to 1GB + if memSizeInBytes >= (1024 * 1024 * 1024) { + // Set shared_buffers to 25% of available memory + sharedBuffersBytes = int(memSizeInBytes) / 4 + } else { + // Set shared buffers to 10% of available memory + sharedBuffersBytes = int(memSizeInBytes) / 10 + } + sharedBuffersMb := sharedBuffersBytes / (1024 * 1024) + + conf := ConfigMap{ + "random_page_cost": "1.1", + "shared_buffers": fmt.Sprintf("%dMB", sharedBuffersMb), + "max_connections": 300, + "max_replication_slots": 10, + "min_wal_size": fmt.Sprintf("%dMB", int(minWalMb)), + "max_wal_size": fmt.Sprintf("%dMB", int(maxWalMb)), + "wal_compression": "on", + "wal_level": "replica", + "hot_standby": true, + "archive_mode": true, + "archive_command": "'/bin/true'", + "shared_preload_libraries": "repmgr", + } + + c.internalConfig = conf + + return nil +} + +func (c *PGConfig) RuntimeApply(ctx context.Context, conn *pgx.Conn) error { + for key, value := range c.userConfig { + if err := admin.SetConfigurationSetting(ctx, conn, key, value); err != nil { + fmt.Printf("failed to set configuration setting %s -> %s: %s", key, value, err) + } + } + + return nil +} + +func memTotalInBytes() (int64, error) { + memoryStr := os.Getenv("FLY_VM_MEMORY_MB") + + if memoryStr == "" { + return 0, fmt.Errorf("FLY_VM_MEMORY_MB envvar has not been set") + } + + parsed, err := strconv.ParseInt(memoryStr, 10, 64) + if err != nil { + return 0, err + } + + memoryBytes := parsed * (1024 * 1024) + + return memoryBytes, nil +} + +func diskSizeInBytes() (uint64, error) { + var stat syscall.Statfs_t + if err := syscall.Statfs("/data", &stat); err != nil { + return 0, err + } + return stat.Blocks * uint64(stat.Bsize), nil +} diff --git a/pkg/flypg/pgbouncer.go b/pkg/flypg/pgbouncer.go index 3b11d807..d25674bc 100644 --- a/pkg/flypg/pgbouncer.go +++ b/pkg/flypg/pgbouncer.go @@ -3,6 +3,7 @@ package flypg import ( "context" "fmt" + "github.com/fly-apps/postgres-flex/pkg/utils" "net" "os" "strconv" @@ -16,6 +17,33 @@ type PGBouncer struct { ConfigPath string Port int ForwardPort int + + internalConfig ConfigMap + userConfig ConfigMap +} + +func (p *PGBouncer) InternalConfigFile() string { + return fmt.Sprintf("%s/pgbouncer.internal.ini", p.ConfigPath) +} + +func (p *PGBouncer) UserConfigFile() string { + return fmt.Sprintf("%s/pgbouncer.user.ini", p.ConfigPath) +} + +func (p *PGBouncer) InternalConfig() ConfigMap { + return p.internalConfig +} + +func (p *PGBouncer) UserConfig() ConfigMap { + return p.userConfig +} + +func (p *PGBouncer) SetUserConfig(configMap ConfigMap) { + p.userConfig = configMap +} + +func (p *PGBouncer) ConsulKey() string { + return "PGBouncer" } func (p *PGBouncer) ConfigurePrimary(ctx context.Context, primary string, reload bool) error { @@ -41,22 +69,31 @@ func (p *PGBouncer) ConfigurePrimary(ctx context.Context, primary string, reload func (p *PGBouncer) initialize() error { cmdStr := fmt.Sprintf("mkdir -p %s", p.ConfigPath) - if err := runCommand(cmdStr); err != nil { + if err := utils.RunCommand(cmdStr); err != nil { return err } - // If pgbouncer.ini file is not present, set defaults. - if _, err := os.Stat(fmt.Sprintf("%s/pgbouncer.ini", p.ConfigPath)); err != nil { - if os.IsNotExist(err) { - cmdStr := fmt.Sprintf("cp /fly/pgbouncer.ini %s", p.ConfigPath) - if err := runCommand(cmdStr); err != nil { - return err - } - } else { - return err + f, err := os.OpenFile(fmt.Sprintf("%s/pgbouncer.ini", p.ConfigPath), os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return nil + } + defer f.Close() + + entries := []string{ + "[pgbouncer]\n", + fmt.Sprintf("%%include %s/pgbouncer.internal.ini\n", p.ConfigPath), + fmt.Sprintf("%%include %s/pgbouncer.user.ini\n", p.ConfigPath), + fmt.Sprintf("%%include %s/pgbouncer.database.ini\n", p.ConfigPath), + } + + for _, entry := range entries { + if _, err := f.WriteString(entry); err != nil { + return fmt.Errorf("failed append configuration entry: %s", err) } } + p.setDefaults() + if err := p.configureAuth(); err != nil { return fmt.Errorf("failed to configure pgbouncer auth. %s", err) } @@ -64,6 +101,22 @@ func (p *PGBouncer) initialize() error { return nil } +func (p *PGBouncer) setDefaults() { + conf := ConfigMap{ + "listen_addr": "*", + "listen_port": "5432", + "auth_user": "postgres", + "auth_file": fmt.Sprintf("%s/pgbouncer.auth", p.ConfigPath), + "admin_users": "postgres", + "user": "postgres", + "pool_mode": "transaction", + "min_pool_size": "5", + "reserve_pool_size": "5", + "reserve_pool_timeout": "3", + } + p.internalConfig = conf +} + func (p *PGBouncer) configureAuth() error { path := fmt.Sprintf("%s/pgbouncer.auth", p.ConfigPath) file, err := os.OpenFile(path, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0644) diff --git a/pkg/flypg/repmgr.go b/pkg/flypg/repmgr.go index 5296a1af..91523e64 100644 --- a/pkg/flypg/repmgr.go +++ b/pkg/flypg/repmgr.go @@ -3,6 +3,7 @@ package flypg import ( "context" "fmt" + "github.com/fly-apps/postgres-flex/pkg/utils" "net" "os" "strconv" @@ -18,14 +19,43 @@ const ( ) type RepMgr struct { - ID int32 - Region string - PrivateIP string - DataDir string - DatabaseName string - Credentials Credentials - ConfigPath string - Port int + ID int32 + Region string + PrivateIP string + DataDir string + DatabaseName string + Credentials Credentials + ConfigPath string + UserConfigPath string + InternalConfigPath string + Port int + + internalConfig ConfigMap + userConfig ConfigMap +} + +func (r *RepMgr) InternalConfigFile() string { + return r.InternalConfigPath +} + +func (r *RepMgr) UserConfigFile() string { + return r.UserConfigPath +} + +func (r *RepMgr) InternalConfig() ConfigMap { + return r.internalConfig +} + +func (r *RepMgr) UserConfig() ConfigMap { + return r.userConfig +} + +func (r *RepMgr) SetUserConfig(configMap ConfigMap) { + r.userConfig = configMap +} + +func (r *RepMgr) ConsulKey() string { + return "repmgr" } func (r *RepMgr) NewLocalConnection(ctx context.Context) (*pgx.Conn, error) { @@ -39,8 +69,20 @@ func (r *RepMgr) NewRemoteConnection(ctx context.Context, hostname string) (*pgx } func (r *RepMgr) initialize() error { - if err := r.writeManagerConf(); err != nil { - return fmt.Errorf("failed to write repmgr config file: %s", err) + r.setDefaults() + + f, err := os.OpenFile(r.ConfigPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return nil + } + defer f.Close() + + entries := []string{"include 'repmgr.internal.conf'\n", "include 'repmgr.user.conf'\n"} + + for _, entry := range entries { + if _, err := f.WriteString(entry); err != nil { + return fmt.Errorf("failed append configuration entry: %s", err) + } } if err := r.writePasswdConf(); err != nil { @@ -78,13 +120,8 @@ func (r *RepMgr) Standbys(ctx context.Context, pg *pgx.Conn) ([]Standby, error) return r.standbyStatuses(ctx, pg, int(r.ID)) } -func (r *RepMgr) writeManagerConf() error { - file, err := os.OpenFile(r.ConfigPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644) - if err != nil { - return err - } - - conf := map[string]interface{}{ +func (r *RepMgr) setDefaults() { + conf := ConfigMap{ "node_id": fmt.Sprint(r.ID), "node_name": fmt.Sprintf("'%s'", r.PrivateIP), "conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=10'", r.PrivateIP, r.Port, r.Credentials.Username, r.DatabaseName), @@ -102,20 +139,12 @@ func (r *RepMgr) writeManagerConf() error { conf["priority"] = "0" } - for key, value := range conf { - str := fmt.Sprintf("%s=%s\n", key, value) - _, err := file.Write([]byte(str)) - if err != nil { - return err - } - } - - return nil + r.internalConfig = conf } func (r *RepMgr) registerPrimary() error { cmdStr := fmt.Sprintf("repmgr -f %s primary register -F -v", r.ConfigPath) - if err := runCommand(cmdStr); err != nil { + if err := utils.RunCommand(cmdStr); err != nil { return err } @@ -124,7 +153,7 @@ func (r *RepMgr) registerPrimary() error { func (r *RepMgr) unregisterPrimary() error { cmdStr := fmt.Sprintf("repmgr -f %s primary unregister", r.ConfigPath) - if err := runCommand(cmdStr); err != nil { + if err := utils.RunCommand(cmdStr); err != nil { return err } @@ -133,7 +162,7 @@ func (r *RepMgr) unregisterPrimary() error { func (r *RepMgr) followPrimary() error { cmdStr := fmt.Sprintf("repmgr -f %s standby follow", r.ConfigPath) - if err := runCommand(cmdStr); err != nil { + if err := utils.RunCommand(cmdStr); err != nil { fmt.Printf("failed to register standby: %s", err) } @@ -143,7 +172,7 @@ func (r *RepMgr) followPrimary() error { func (r *RepMgr) registerStandby() error { // Force re-registry to ensure the standby picks up any new configuration changes. cmdStr := fmt.Sprintf("repmgr -f %s standby register -F", r.ConfigPath) - if err := runCommand(cmdStr); err != nil { + if err := utils.RunCommand(cmdStr); err != nil { fmt.Printf("failed to register standby: %s", err) } @@ -152,7 +181,7 @@ func (r *RepMgr) registerStandby() error { func (r *RepMgr) UnregisterStandby(id int) error { cmdStr := fmt.Sprintf("repmgr standby unregister -f %s --node-id=%d", r.ConfigPath, id) - if err := runCommand(cmdStr); err != nil { + if err := utils.RunCommand(cmdStr); err != nil { fmt.Printf("failed to unregister standby: %s", err) } @@ -161,7 +190,7 @@ func (r *RepMgr) UnregisterStandby(id int) error { func (r *RepMgr) clonePrimary(ipStr string) error { cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir) - if err := runCommand(cmdStr); err != nil { + if err := utils.RunCommand(cmdStr); err != nil { return err } @@ -173,7 +202,7 @@ func (r *RepMgr) clonePrimary(ipStr string) error { r.ConfigPath) fmt.Println(cmdStr) - return runCommand(cmdStr) + return utils.RunCommand(cmdStr) } func (r *RepMgr) writePasswdConf() error { diff --git a/pkg/flypg/state/state.go b/pkg/flypg/state/state.go index f678a727..8c3143db 100644 --- a/pkg/flypg/state/state.go +++ b/pkg/flypg/state/state.go @@ -36,14 +36,14 @@ func NewConsulClient() (*ConsulClient, error) { }, nil } -func (c *ConsulClient) PushUserConfig(config []byte) error { - kv := &api.KVPair{Key: c.targetKey("Config"), Value: config} +func (c *ConsulClient) PushUserConfig(key string, config []byte) error { + kv := &api.KVPair{Key: c.targetKey(key), Value: config} _, err := c.client.KV().Put(kv, nil) return err } -func (c *ConsulClient) PullUserConfig() ([]byte, error) { - pair, _, err := c.client.KV().Get(c.targetKey("Config"), nil) +func (c *ConsulClient) PullUserConfig(key string) ([]byte, error) { + pair, _, err := c.client.KV().Get(c.targetKey(key), nil) if err != nil { return nil, err } diff --git a/pkg/utils/shell.go b/pkg/utils/shell.go new file mode 100644 index 00000000..1f708fcc --- /dev/null +++ b/pkg/utils/shell.go @@ -0,0 +1,29 @@ +package utils + +import ( + "os/exec" + "os/user" + "strconv" + "syscall" +) + +func RunCommand(cmdStr string) error { + pgUser, err := user.Lookup("postgres") + if err != nil { + return err + } + pgUID, err := strconv.Atoi(pgUser.Uid) + if err != nil { + return err + } + pgGID, err := strconv.Atoi(pgUser.Gid) + if err != nil { + return err + } + + cmd := exec.Command("sh", "-c", cmdStr) + cmd.SysProcAttr = &syscall.SysProcAttr{} + cmd.SysProcAttr.Credential = &syscall.Credential{Uid: uint32(pgUID), Gid: uint32(pgGID)} + _, err = cmd.Output() + return err +}