Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 138 additions & 131 deletions pkg/flypg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,164 +17,177 @@ import (
"github.com/shirou/gopsutil/v3/mem"
)

type pgConfig map[string]interface{}
type PGConfig map[string]interface{}

type Config struct {
configFilePath string
customConfigFilePath string
dataDir string
configFilePath string

pgConfig pgConfig
internalConfigFilePath string
userConfigFilePath string
dataDir string

internalConfig PGConfig
userConfig PGConfig
}

func NewConfig(dataDir string) *Config {
return &Config{
dataDir: dataDir,
configFilePath: fmt.Sprintf("%s/postgresql.conf", dataDir),
customConfigFilePath: fmt.Sprintf("%s/postgresql.custom.conf", dataDir),
pgConfig: pgConfig{},
dataDir: dataDir,
configFilePath: fmt.Sprintf("%s/postgresql.conf", dataDir),

internalConfigFilePath: fmt.Sprintf("%s/postgresql.internal.conf", dataDir),
userConfigFilePath: fmt.Sprintf("%s/postgresql.user.conf", dataDir),

internalConfig: PGConfig{},
userConfig: PGConfig{},
}
}

// SetDefaults will apply the default configuration settings to the config struct.
// Warning - it's important that this is called prior to any new settings, otherwise
// they may be overwritten.
func (c *Config) SetDefaults() error {
mem, err := memTotal()
// Print will output the local configuration data to stdout.
func (c *Config) Print(w io.Writer) error {
internalCfg, err := c.pullFromConfig(c.internalConfigFilePath)
if err != nil {
return fmt.Errorf("failed to fetch total system memory: %s", err)
return fmt.Errorf("failed to read internal config: %s", err)
}

c.pgConfig = map[string]interface{}{
"shared_buffers": fmt.Sprintf("%dMB", mem/4),
"max_wal_senders": 10,
"max_connections": 300,
"wal_level": "hot_standby",
"hot_standby": true,
"archive_mode": true,
"archive_command": "'/bin/true'",
"shared_preload_libraries": "repmgr",
userCfg, err := c.pullFromConfig(c.userConfigFilePath)
if err != nil {
return fmt.Errorf("failed to read internal config: %s", err)
}

return nil
}
cfg := PGConfig{}

// SaveOffline will write our configuration data to Consul and to our local configuration
// file. This is safe to run when Postgres is not running.
func (c Config) SaveOffline(consul *state.ConsulClient) error {
// Don't persist an empty config
if c.pgConfig == nil {
return nil
}
// Push configuration to Consul.
if err := c.pushToConsul(consul); err != nil {
return fmt.Errorf("failed to write to consul: %s", err)
for k, v := range internalCfg {
cfg[k] = v
}

// Write configuration to local file.
if err := c.writeToFile(); err != nil {
return fmt.Errorf("failed to write to pg config file: %s", err)
for k, v := range userCfg {
cfg[k] = v
}

return nil
e := json.NewEncoder(w)
e.SetIndent("", " ")

return e.Encode(cfg)
}

// SaveOnline will write our configuration information to Consul, local configuration
// and will attempt to apply eligible changes at runtime.
func (c Config) SaveOnline(ctx context.Context, conn *pgx.Conn, consul *state.ConsulClient) error {
// Don't persist an empty config
if c.pgConfig == nil {
return fmt.Errorf("unable to save an empty config")
// Setup will ensure the required configuration files are created and that the parent
// postgresql.conf file is including them.
func (c Config) Setup() error {
if _, err := os.Stat(c.internalConfigFilePath); err != nil {
if os.IsNotExist(err) {
if err := runCommand(fmt.Sprintf("touch %s", c.internalConfigFilePath)); err != nil {
return err
}
} else {
return err
}
}

if err := c.SaveOffline(consul); err != nil {
return err
if _, err := os.Stat(c.userConfigFilePath); err != nil {
if os.IsNotExist(err) {
if err := runCommand(fmt.Sprintf("touch %s", c.userConfigFilePath)); err != nil {
return err
}
} else {
return err
}
}

// Attempt to set configurations ettings at runtime.
if err := c.applyPGConfigAtRuntime(ctx, conn); err != nil {
return fmt.Errorf("faield to write to pg runtime: %s", err)
b, err := ioutil.ReadFile(c.configFilePath)
if err != nil {
return err
}

return nil
}

// SyncOffline will pull the latest Postgres configuration information from Consul and
// write it to our local configuration file.
func (c *Config) SyncOffline(ctx context.Context, consul *state.ConsulClient) error {
// Apply Consul configuration.
if err := c.pullConfigFromConsul(consul); err != nil {
return fmt.Errorf("failed to pull config from consul: %s", err)
}
// Write configuration to local file.
if err := c.writeToFile(); err != nil {
return fmt.Errorf("failed to write to pg config file: %s", err)
var entries []string
if !strings.Contains(string(b), "postgresql.internal.conf") {
entries = append(entries, "include 'postgresql.internal.conf'\n")
}

return nil
}

// SyncOnline will pull the latest Postgres configuration information from Consul and
// write it to our local configuration file and attempt to apply any new changes at runtime.
func (c *Config) SyncOnline(ctx context.Context, conn *pgx.Conn, consul *state.ConsulClient) error {
if err := c.SyncOffline(ctx, consul); err != nil {
return err
if !strings.Contains(string(b), "postgresql.user.conf") {
entries = append(entries, "include 'postgresql.user.conf'\n")
}

fmt.Println("Applying config at runtime")
// Attempt to set configuration settings at runtime.
if err := c.applyPGConfigAtRuntime(ctx, conn); err != nil {
return fmt.Errorf("faield to write to pg runtime: %s", err)
if len(entries) > 0 {
f, err := os.OpenFile(c.configFilePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return nil
}
defer f.Close()

for _, entry := range entries {
if _, err := f.WriteString(entry); err != nil {
return fmt.Errorf("failed append configuration entry: %s", err)
}
}
}

return nil
}

// Print will output the local configuration data to stdout.
func (c *Config) Print(w io.Writer) error {
if err := c.SetDefaults(); err != nil {
return err
// WriteDefaults will resolve the default configuration settings and write them to the
// internal config file.
func (c Config) WriteDefaults() error {
mem, err := memTotal()
if err != nil {
return fmt.Errorf("failed to fetch total system memory: %s", err)
}
if err := c.pullFromFile(); err != nil {
return err

conf := PGConfig{
"shared_buffers": fmt.Sprintf("%dMB", mem/4),
"max_wal_senders": 10,
"max_connections": 300,
"wal_level": "hot_standby",
"hot_standby": true,
"archive_mode": true,
"archive_command": "'/bin/true'",
"shared_preload_libraries": "repmgr",
}

e := json.NewEncoder(w)
e.SetIndent("", " ")
// Write configuration to local file.
if err := c.writeToConfig(c.internalConfigFilePath, conf); err != nil {
return fmt.Errorf("failed to write to pg config file: %s", err)
}

return e.Encode(c.pgConfig)
return nil
}

func (c Config) EnableCustomConfig() error {
if err := runCommand(fmt.Sprintf("touch %s", c.customConfigFilePath)); err != nil {
return err
}
// WriteUserConfig will push any user-defined configuration to Consul and write it to the user config file.
func (c Config) WriteUserConfig(ctx context.Context, conn *pgx.Conn, consul *state.ConsulClient, cfg PGConfig) error {
if c.userConfig != nil {
if err := c.pushToConsul(consul, cfg); err != nil {
return fmt.Errorf("failed to write to consul: %s", err)
}

// read the whole file at once
b, err := ioutil.ReadFile(c.configFilePath)
if err != nil {
return err
// Write configuration to local file.
if err := c.writeToConfig(c.userConfigFilePath, cfg); err != nil {
return fmt.Errorf("failed to write to pg config file: %s", err)
}
}

if strings.Contains(string(b), "postgresql.custom.conf") {
return nil
}
return nil
}

f, err := os.OpenFile(c.configFilePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
// SyncUserConfig will pull the latest user-defined configuration data from Consul and
// write it to the user config file.
func (c Config) SyncUserConfig(ctx context.Context, consul *state.ConsulClient) error {
// Apply Consul configuration.
cfg, err := c.pullConfigFromConsul(consul)
if err != nil {
return nil
return fmt.Errorf("failed to pull config from consul: %s", err)
}
defer f.Close()

if _, err := f.WriteString("include 'postgresql.custom.conf'"); err != nil {
return fmt.Errorf("failed append to conf file: %s", err)
// Write configuration to local file.
if err := c.writeToConfig(c.userConfigFilePath, cfg); err != nil {
return fmt.Errorf("failed to write to pg config file: %s", err)
}

return nil
}

func (c Config) applyPGConfigAtRuntime(ctx context.Context, conn *pgx.Conn) error {
for key, value := range c.pgConfig {
// ApplyUserConfigAtRuntime will take a config and attempt to set it at runtime.
func (c Config) ApplyUserConfigAtRuntime(ctx context.Context, conn *pgx.Conn, conf PGConfig) error {
for key, value := range conf {
if err := admin.SetConfigurationSetting(ctx, conn, key, value); err != nil {
fmt.Printf("failed to set configuration setting %s -> %s: %s", key, value, err)
}
Expand All @@ -183,75 +196,69 @@ func (c Config) applyPGConfigAtRuntime(ctx context.Context, conn *pgx.Conn) erro
return nil
}

func (c Config) pushToConsul(consul *state.ConsulClient) error {
configBytes, err := json.Marshal(c.pgConfig)
if err != nil {
return err
func (c Config) pushToConsul(consul *state.ConsulClient, conf PGConfig) error {
if conf == nil {
return nil
}

if consul == nil {
consul, err = state.NewConsulClient()
if err != nil {
return err
}
configBytes, err := json.Marshal(conf)
if err != nil {
return err
}

if err := consul.PushConfig(configBytes); err != nil {
if err := consul.PushUserConfig(configBytes); err != nil {
return err
}

return nil
}

func (c Config) writeToFile() error {
file, err := os.OpenFile(c.customConfigFilePath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
func (c Config) writeToConfig(pathToConfig string, conf PGConfig) error {
file, err := os.OpenFile(pathToConfig, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer file.Close()

for key, value := range c.pgConfig {
for key, value := range conf {
entry := fmt.Sprintf("%s = %v\n", key, value)
file.Write([]byte(entry))
}

return nil
}

func (c *Config) pullFromFile() error {
file, err := os.Open(c.customConfigFilePath)
func (c Config) pullFromConfig(pathToFile string) (PGConfig, error) {
file, err := os.Open(pathToFile)
if err != nil {
return err
return nil, err
}
defer file.Close()

pgConf := PGConfig{}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lineArr := strings.Split(scanner.Text(), "=")
key := strings.TrimSpace(lineArr[0])
value := strings.TrimSpace(lineArr[1])
c.pgConfig[key] = value
pgConf[key] = value
}

return nil
return pgConf, nil
}

func (c *Config) pullConfigFromConsul(consul *state.ConsulClient) error {
configBytes, err := consul.PullConfig()
func (c Config) pullConfigFromConsul(consul *state.ConsulClient) (PGConfig, error) {
configBytes, err := consul.PullUserConfig()
if err != nil {
return err
return nil, err
}

var storeCfg pgConfig
var storeCfg PGConfig
if err = json.Unmarshal(configBytes, &storeCfg); err != nil {
return err
return nil, err
}

for key, value := range storeCfg {
c.pgConfig[key] = value
}

return nil
return storeCfg, nil
}

func memTotal() (memoryMb int64, err error) {
Expand Down
Loading