Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package config

import (
"context"
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
"github.com/jackc/pgx/v4"
"io"
)

type ConfigMap map[string]interface{}

type ConfigModule interface {
Print(io.Writer) error
Setup() error
WriteDefaults() error
WriteUserConfig(context.Context, *pgx.Conn, *state.ConsulClient, ConfigMap) error
SyncUserConfig(ctx context.Context, consul *state.ConsulClient) error
RuntimeApply(ctx context.Context, conn *pgx.Conn) error
}
30 changes: 5 additions & 25 deletions pkg/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"encoding/binary"
"fmt"
"github.com/fly-apps/postgres-flex/pkg/config"
"github.com/fly-apps/postgres-flex/pkg/flypg/pg"
"io/ioutil"
"math/rand"
"net"
"os"
"os/exec"
"os/user"
"strconv"
"syscall"
"time"

"github.com/fly-apps/postgres-flex/pkg/flypg/admin"
Expand All @@ -30,7 +31,7 @@ type Node struct {
PrivateIP string
DataDir string
Port int
Config *Config
PGConfig config.ConfigModule

SUCredentials Credentials
OperatorCredentials Credentials
Expand Down Expand Up @@ -62,7 +63,7 @@ func NewNode() (*Node, error) {
}

// Stub configuration
node.Config = NewConfig(node.DataDir)
node.PGConfig = pg.NewConfig(node.DataDir)

// Internal user
node.SUCredentials = Credentials{
Expand Down Expand Up @@ -125,7 +126,7 @@ func (n *Node) Init(ctx context.Context) error {

repmgr := n.RepMgr
pgbouncer := n.PGBouncer
config := n.Config
config := n.PGConfig

fmt.Println("Initializing replication manager")
if err := repmgr.initialize(); err != nil {
Expand Down Expand Up @@ -475,24 +476,3 @@ func setDirOwnership() error {
_, err = cmd.Output()
return err
}

func runCommand(cmdStr string) error {
pgUser, err := user.Lookup("postgres")
if err != nil {
return err
}
pgUID, err := strconv.Atoi(pgUser.Uid)
if err != nil {
return err
}
pgGID, err := strconv.Atoi(pgUser.Gid)
if err != nil {
return err
}

cmd := exec.Command("sh", "-c", cmdStr)
cmd.SysProcAttr = &syscall.SysProcAttr{}
cmd.SysProcAttr.Credential = &syscall.Credential{Uid: uint32(pgUID), Gid: uint32(pgGID)}
_, err = cmd.Output()
return err
}
61 changes: 32 additions & 29 deletions pkg/flypg/config.go → pkg/flypg/pg/pg.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
package flypg
package pg

import (
"bufio"
"context"
"encoding/json"
"fmt"
"github.com/fly-apps/postgres-flex/pkg/config"
"github.com/fly-apps/postgres-flex/pkg/flypg/admin"
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
"github.com/fly-apps/postgres-flex/pkg/utils"
"github.com/jackc/pgx/v4"
"io"
"io/ioutil"
"os"
"strconv"
"strings"
"syscall"

"github.com/fly-apps/postgres-flex/pkg/flypg/admin"
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
"github.com/jackc/pgx/v4"
)

type PGConfig map[string]interface{}
var ConfKey = "PGConfig"

type Config struct {
configFilePath string
Expand All @@ -26,10 +27,12 @@ type Config struct {
userConfigFilePath string
dataDir string

internalConfig PGConfig
userConfig PGConfig
internalConfig config.ConfigMap
userConfig config.ConfigMap
}

var _ config.ConfigModule = &Config{}

func NewConfig(dataDir string) *Config {
return &Config{
dataDir: dataDir,
Expand All @@ -38,12 +41,12 @@ func NewConfig(dataDir string) *Config {
internalConfigFilePath: fmt.Sprintf("%s/postgresql.internal.conf", dataDir),
userConfigFilePath: fmt.Sprintf("%s/postgresql.user.conf", dataDir),

internalConfig: PGConfig{},
userConfig: PGConfig{},
internalConfig: config.ConfigMap{},
userConfig: config.ConfigMap{},
}
}

// Print outputs the interna/user config to stdout.
// Print outputs the internal/user config to stdout.
func (c *Config) Print(w io.Writer) error {
internalCfg, err := c.pullFromFile(c.internalConfigFilePath)
if err != nil {
Expand All @@ -55,7 +58,7 @@ func (c *Config) Print(w io.Writer) error {
return fmt.Errorf("failed to read internal config: %s", err)
}

cfg := PGConfig{}
cfg := config.ConfigMap{}

for k, v := range internalCfg {
cfg[k] = v
Expand All @@ -73,10 +76,10 @@ func (c *Config) Print(w io.Writer) error {

// Setup will ensure the required configuration files are stubbed and the parent
// postgresql.conf file includes them.
func (c Config) Setup() error {
func (c *Config) Setup() error {
if _, err := os.Stat(c.internalConfigFilePath); err != nil {
if os.IsNotExist(err) {
if err := runCommand(fmt.Sprintf("touch %s", c.internalConfigFilePath)); err != nil {
if err := utils.RunCommand(fmt.Sprintf("touch %s", c.internalConfigFilePath)); err != nil {
return err
}
} else {
Expand All @@ -86,7 +89,7 @@ func (c Config) Setup() error {

if _, err := os.Stat(c.userConfigFilePath); err != nil {
if os.IsNotExist(err) {
if err := runCommand(fmt.Sprintf("touch %s", c.userConfigFilePath)); err != nil {
if err := utils.RunCommand(fmt.Sprintf("touch %s", c.userConfigFilePath)); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -127,7 +130,7 @@ func (c Config) Setup() error {

// WriteDefaults will resolve the default configuration settings and write them to the
// internal config file.
func (c Config) WriteDefaults() error {
func (c *Config) WriteDefaults() error {
// The default wal_segment_size in mb
const walSegmentSize = 16

Expand Down Expand Up @@ -167,7 +170,7 @@ func (c Config) WriteDefaults() error {
}
sharedBuffersMb := sharedBuffersBytes / (1024 * 1024)

conf := PGConfig{
conf := config.ConfigMap{
"random_page_cost": "1.1",
"shared_buffers": fmt.Sprintf("%dMB", sharedBuffersMb),
"max_connections": 300,
Expand All @@ -190,7 +193,7 @@ func (c Config) WriteDefaults() error {
}

// WriteUserConfig will push any user-defined configuration to Consul and write it to the user config file.
func (c Config) WriteUserConfig(ctx context.Context, conn *pgx.Conn, consul *state.ConsulClient, cfg PGConfig) error {
func (c *Config) WriteUserConfig(ctx context.Context, conn *pgx.Conn, consul *state.ConsulClient, cfg config.ConfigMap) error {
if c.userConfig != nil {
if err := c.pushToConsul(consul, cfg); err != nil {
return fmt.Errorf("failed to write to consul: %s", err)
Expand All @@ -206,7 +209,7 @@ func (c Config) WriteUserConfig(ctx context.Context, conn *pgx.Conn, consul *sta

// SyncUserConfig will pull the latest user-defined configuration data from Consul and
// write it to the user config file.
func (c Config) SyncUserConfig(ctx context.Context, consul *state.ConsulClient) error {
func (c *Config) SyncUserConfig(ctx context.Context, consul *state.ConsulClient) error {
cfg, err := c.pullFromConsul(consul)
if err != nil {
return fmt.Errorf("failed to pull config from consul: %s", err)
Expand All @@ -220,8 +223,8 @@ func (c Config) SyncUserConfig(ctx context.Context, consul *state.ConsulClient)
}

// ApplyUserConfigAtRuntime will take a config and attempt to set it at runtime.
func (c Config) ApplyUserConfigAtRuntime(ctx context.Context, conn *pgx.Conn, conf PGConfig) error {
for key, value := range conf {
func (c *Config) RuntimeApply(ctx context.Context, conn *pgx.Conn) error {
for key, value := range c.userConfig {
if err := admin.SetConfigurationSetting(ctx, conn, key, value); err != nil {
fmt.Printf("failed to set configuration setting %s -> %s: %s", key, value, err)
}
Expand All @@ -230,7 +233,7 @@ func (c Config) ApplyUserConfigAtRuntime(ctx context.Context, conn *pgx.Conn, co
return nil
}

func (c Config) pushToConsul(consul *state.ConsulClient, conf PGConfig) error {
func (c *Config) pushToConsul(consul *state.ConsulClient, conf config.ConfigMap) error {
if conf == nil {
return nil
}
Expand All @@ -240,14 +243,14 @@ func (c Config) pushToConsul(consul *state.ConsulClient, conf PGConfig) error {
return err
}

if err := consul.PushUserConfig(configBytes); err != nil {
if err := consul.PushUserConfig(ConfKey, configBytes); err != nil {
return err
}

return nil
}

func (c Config) writeToFile(pathToFile string, conf PGConfig) error {
func (c *Config) writeToFile(pathToFile string, conf config.ConfigMap) error {
file, err := os.OpenFile(pathToFile, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
return err
Expand All @@ -262,14 +265,14 @@ func (c Config) writeToFile(pathToFile string, conf PGConfig) error {
return nil
}

func (c Config) pullFromFile(pathToFile string) (PGConfig, error) {
func (c *Config) pullFromFile(pathToFile string) (config.ConfigMap, error) {
file, err := os.Open(pathToFile)
if err != nil {
return nil, err
}
defer file.Close()

pgConf := PGConfig{}
pgConf := config.ConfigMap{}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lineArr := strings.Split(scanner.Text(), "=")
Expand All @@ -281,13 +284,13 @@ func (c Config) pullFromFile(pathToFile string) (PGConfig, error) {
return pgConf, nil
}

func (c Config) pullFromConsul(consul *state.ConsulClient) (PGConfig, error) {
configBytes, err := consul.PullUserConfig()
func (c *Config) pullFromConsul(consul *state.ConsulClient) (config.ConfigMap, error) {
configBytes, err := consul.PullUserConfig(ConfKey)
if err != nil {
return nil, err
}

var storeCfg PGConfig
var storeCfg config.ConfigMap
if err = json.Unmarshal(configBytes, &storeCfg); err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/flypg/pgbouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package flypg
import (
"context"
"fmt"
"github.com/fly-apps/postgres-flex/pkg/utils"
"net"
"os"
"strconv"
Expand Down Expand Up @@ -41,15 +42,15 @@ func (p *PGBouncer) ConfigurePrimary(ctx context.Context, primary string, reload

func (p *PGBouncer) initialize() error {
cmdStr := fmt.Sprintf("mkdir -p %s", p.ConfigPath)
if err := runCommand(cmdStr); err != nil {
if err := utils.RunCommand(cmdStr); err != nil {
return err
}

// If pgbouncer.ini file is not present, set defaults.
if _, err := os.Stat(fmt.Sprintf("%s/pgbouncer.ini", p.ConfigPath)); err != nil {
if os.IsNotExist(err) {
cmdStr := fmt.Sprintf("cp /fly/pgbouncer.ini %s", p.ConfigPath)
if err := runCommand(cmdStr); err != nil {
if err := utils.RunCommand(cmdStr); err != nil {
return err
}
} else {
Expand Down
15 changes: 8 additions & 7 deletions pkg/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package flypg
import (
"context"
"fmt"
"github.com/fly-apps/postgres-flex/pkg/utils"
"net"
"os"
"strconv"
Expand Down Expand Up @@ -115,7 +116,7 @@ func (r *RepMgr) writeManagerConf() error {

func (r *RepMgr) registerPrimary() error {
cmdStr := fmt.Sprintf("repmgr -f %s primary register -F -v", r.ConfigPath)
if err := runCommand(cmdStr); err != nil {
if err := utils.RunCommand(cmdStr); err != nil {
return err
}

Expand All @@ -124,7 +125,7 @@ func (r *RepMgr) registerPrimary() error {

func (r *RepMgr) unregisterPrimary() error {
cmdStr := fmt.Sprintf("repmgr -f %s primary unregister", r.ConfigPath)
if err := runCommand(cmdStr); err != nil {
if err := utils.RunCommand(cmdStr); err != nil {
return err
}

Expand All @@ -133,7 +134,7 @@ func (r *RepMgr) unregisterPrimary() error {

func (r *RepMgr) followPrimary() error {
cmdStr := fmt.Sprintf("repmgr -f %s standby follow", r.ConfigPath)
if err := runCommand(cmdStr); err != nil {
if err := utils.RunCommand(cmdStr); err != nil {
fmt.Printf("failed to register standby: %s", err)
}

Expand All @@ -143,7 +144,7 @@ func (r *RepMgr) followPrimary() error {
func (r *RepMgr) registerStandby() error {
// Force re-registry to ensure the standby picks up any new configuration changes.
cmdStr := fmt.Sprintf("repmgr -f %s standby register -F", r.ConfigPath)
if err := runCommand(cmdStr); err != nil {
if err := utils.RunCommand(cmdStr); err != nil {
fmt.Printf("failed to register standby: %s", err)
}

Expand All @@ -152,7 +153,7 @@ func (r *RepMgr) registerStandby() error {

func (r *RepMgr) UnregisterStandby(id int) error {
cmdStr := fmt.Sprintf("repmgr standby unregister -f %s --node-id=%d", r.ConfigPath, id)
if err := runCommand(cmdStr); err != nil {
if err := utils.RunCommand(cmdStr); err != nil {
fmt.Printf("failed to unregister standby: %s", err)
}

Expand All @@ -161,7 +162,7 @@ func (r *RepMgr) UnregisterStandby(id int) error {

func (r *RepMgr) clonePrimary(ipStr string) error {
cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir)
if err := runCommand(cmdStr); err != nil {
if err := utils.RunCommand(cmdStr); err != nil {
return err
}

Expand All @@ -173,7 +174,7 @@ func (r *RepMgr) clonePrimary(ipStr string) error {
r.ConfigPath)

fmt.Println(cmdStr)
return runCommand(cmdStr)
return utils.RunCommand(cmdStr)
}

func (r *RepMgr) writePasswdConf() error {
Expand Down
Loading