Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,7 @@ DBUSERS:

func (c *Cluster) syncDatabases() error {
c.setProcessName("syncing databases")

errors := make([]string, 0)
createDatabases := make(map[string]string)
alterOwnerDatabases := make(map[string]string)
preparedDatabases := make([]string, 0)
Expand Down Expand Up @@ -1097,12 +1097,12 @@ func (c *Cluster) syncDatabases() error {

for databaseName, owner := range createDatabases {
if err = c.executeCreateDatabase(databaseName, owner); err != nil {
return err
errors = append(errors, err.Error())
}
}
for databaseName, owner := range alterOwnerDatabases {
if err = c.executeAlterDatabaseOwner(databaseName, owner); err != nil {
return err
errors = append(errors, err.Error())
}
}

Expand All @@ -1118,24 +1118,32 @@ func (c *Cluster) syncDatabases() error {
// set default privileges for prepared database
for _, preparedDatabase := range preparedDatabases {
if err := c.initDbConnWithName(preparedDatabase); err != nil {
return fmt.Errorf("could not init database connection to %s", preparedDatabase)
errors = append(errors, fmt.Sprintf("could not init database connection to %s", preparedDatabase))
continue
}

for _, owner := range c.getOwnerRoles(preparedDatabase, c.Spec.PreparedDatabases[preparedDatabase].DefaultUsers) {
if err = c.execAlterGlobalDefaultPrivileges(owner, preparedDatabase); err != nil {
return err
errors = append(errors, err.Error())
}
}
}

if len(errors) > 0 {
return fmt.Errorf("error(s) while syncing databases: %v", strings.Join(errors, `', '`))
}

return nil
}

func (c *Cluster) syncPreparedDatabases() error {
c.setProcessName("syncing prepared databases")
errors := make([]string, 0)

for preparedDbName, preparedDB := range c.Spec.PreparedDatabases {
if err := c.initDbConnWithName(preparedDbName); err != nil {
return fmt.Errorf("could not init connection to database %s: %v", preparedDbName, err)
errors = append(errors, fmt.Sprintf("could not init connection to database %s: %v", preparedDbName, err))
continue
}

c.logger.Debugf("syncing prepared database %q", preparedDbName)
Expand All @@ -1145,24 +1153,30 @@ func (c *Cluster) syncPreparedDatabases() error {
preparedSchemas = map[string]acidv1.PreparedSchema{"data": {DefaultRoles: util.True()}}
}
if err := c.syncPreparedSchemas(preparedDbName, preparedSchemas); err != nil {
return err
errors = append(errors, err.Error())
continue
}

// install extensions
if err := c.syncExtensions(preparedDB.Extensions); err != nil {
return err
errors = append(errors, err.Error())
}

if err := c.closeDbConn(); err != nil {
c.logger.Errorf("could not close database connection: %v", err)
}
}

if len(errors) > 0 {
return fmt.Errorf("error(s) while syncing prepared databases: %v", strings.Join(errors, `', '`))
}

return nil
}

func (c *Cluster) syncPreparedSchemas(databaseName string, preparedSchemas map[string]acidv1.PreparedSchema) error {
c.setProcessName("syncing prepared schemas")
errors := make([]string, 0)

currentSchemas, err := c.getSchemas()
if err != nil {
Expand All @@ -1185,17 +1199,21 @@ func (c *Cluster) syncPreparedSchemas(databaseName string, preparedSchemas map[s
owner = dbOwner
}
if err = c.executeCreateDatabaseSchema(databaseName, schemaName, dbOwner, owner); err != nil {
return err
errors = append(errors, err.Error())
}
}
}

if len(errors) > 0 {
return fmt.Errorf("error(s) while syncing schemas of prepared databases: %v", strings.Join(errors, `', '`))
}

return nil
}

func (c *Cluster) syncExtensions(extensions map[string]string) error {
c.setProcessName("syncing database extensions")

errors := make([]string, 0)
createExtensions := make(map[string]string)
alterExtensions := make(map[string]string)

Expand All @@ -1215,15 +1233,19 @@ func (c *Cluster) syncExtensions(extensions map[string]string) error {

for extName, schema := range createExtensions {
if err = c.executeCreateExtension(extName, schema); err != nil {
return err
errors = append(errors, err.Error())
}
}
for extName, schema := range alterExtensions {
if err = c.executeAlterExtension(extName, schema); err != nil {
return err
errors = append(errors, err.Error())
}
}

if len(errors) > 0 {
return fmt.Errorf("error(s) while syncing database extensions: %v", strings.Join(errors, `', '`))
}

return nil
}

Expand Down