Skip to content

Commit

Permalink
feat(postgresqldatabase-controller): Add support for type ownership r…
Browse files Browse the repository at this point in the history
…ecover
  • Loading branch information
oxyno-zeta committed Jun 11, 2024
1 parent 55b7e89 commit c33f95a
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 1 deletion.
61 changes: 60 additions & 1 deletion internal/controller/postgresql/postgres/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,15 @@ const (
DefaultPrivsSchemaSQLTemplate = `ALTER DEFAULT PRIVILEGES FOR ROLE "%s" IN SCHEMA "%s" GRANT %s ON TABLES TO "%s"`
GetTablesFromSchemaSQLTemplate = `SELECT table_name FROM information_schema.tables WHERE table_schema = '%s'`
ChangeTableOwnerSQLTemplate = `ALTER TABLE IF EXISTS "%s" OWNER TO "%s"`
DuplicateDatabaseErrorCode = "42P04"
ChangeTypeOwnerSQLTemplate = `ALTER TYPE "%s"."%s" OWNER TO "%s"`
// Got and edited from : https://stackoverflow.com/questions/3660787/how-to-list-custom-types-using-postgres-information-schema
GetTypesFromSchemaSQLTemplate = `SELECT t.typname as type
FROM pg_type t
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
WHERE (t.typrelid = 0 OR (SELECT c.relkind = 'c' FROM pg_catalog.pg_class c WHERE c.oid = t.typrelid))
AND NOT EXISTS(SELECT 1 FROM pg_catalog.pg_type el WHERE el.oid = t.typelem AND el.typarray = t.oid)
AND n.nspname = '%s';`
DuplicateDatabaseErrorCode = "42P04"
)

func (c *pg) IsDatabaseExist(dbname string) (bool, error) {
Expand Down Expand Up @@ -157,6 +165,57 @@ func (c *pg) ChangeTableOwner(db, table, owner string) error {
return nil
}

func (c *pg) GetTypesInSchema(db, schema string) ([]string, error) {
err := c.connect(db)
if err != nil {
return nil, err
}

rows, err := c.db.Query(fmt.Sprintf(GetTypesFromSchemaSQLTemplate, schema))
if err != nil {
return nil, err
}

defer rows.Close()

res := []string{}

for rows.Next() {
typeName := ""
// Scan
err = rows.Scan(&typeName)
// Check error
if err != nil {
return nil, err
}
// Save
res = append(res, typeName)
}

// Rows error
err = rows.Err()
// Check error
if err != nil {
return nil, err
}

return res, nil
}

func (c *pg) ChangeTypeOwnerInSchema(db, schema, typeName, owner string) error {
err := c.connect(db)
if err != nil {
return err
}

_, err = c.db.Exec(fmt.Sprintf(ChangeTypeOwnerSQLTemplate, schema, typeName, owner))
if err != nil {
return err
}

return nil
}

func (c *pg) DropDatabase(database string) error {
err := c.connect(c.defaultDatabase)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/postgresql/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type PG interface { //nolint:interfacebloat // This is needed
GetRoleMembership(role string) ([]string, error)
GetTablesInSchema(db, schema string) ([]string, error)
ChangeTableOwner(db, table, owner string) error
GetTypesInSchema(db, schema string) ([]string, error)
ChangeTypeOwnerInSchema(db, schema, typeName, owner string) error
GetUser() string
GetHost() string
GetPort() int
Expand Down
15 changes: 15 additions & 0 deletions internal/controller/postgresql/postgresqldatabase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,21 @@ func (*PostgresqlDatabaseReconciler) manageSchemas(pg postgres.PG, instance *pos
}
}

// Get list of types inside schema
types, err := pg.GetTypesInSchema(instance.Spec.Database, schema)
if err != nil {
return err
}

// Loop over all types to force owner
for _, typeName := range types {
// Force table owner
err = pg.ChangeTypeOwnerInSchema(instance.Spec.Database, schema, typeName, owner)
if err != nil {
return err
}
}

// Check if schema was created. Skip if already added
if !funk.ContainsString(instance.Status.Schemas, schema) {
instance.Status.Schemas = append(instance.Status.Schemas, schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,46 @@ var _ = Describe("PostgresqlDatabase tests", func() {
Should(Succeed())
})

It("should be ok to recover a wrong type owner", func() {
// Create pgec
setupPGEC("10s", false)

// Create pgdb
item := setupPGDB(false)

Expect(len(item.Status.Schemas)).To(Equal(1))
Expect(item.Status.Schemas).To(ContainElement(pgPublicSchemaName))

// Schema should be in sql db
exists, err := isSQLSchemaExists(pgPublicSchemaName)
Expect(err).ToNot(HaveOccurred())
Expect(exists).To(BeTrue())

// Add table to schema
typeName := "mytype"
err = createTypeInSchemaAsAdmin(pgPublicSchemaName, typeName)
Expect(err).ToNot(HaveOccurred())

Eventually(
func() error {
owner, err := getTypeOwner(pgdbDBName, typeName)
if err != nil {
return err
}

// Check owner
if owner != item.Status.Roles.Owner {
return errors.New("operator didn't change owner: " + owner)
}

return nil
},
generalEventuallyTimeout,
generalEventuallyInterval,
).
Should(Succeed())
})

It("should be ok to remove a schema with drop on delete without cascade", func() {
// Create pgec
prov, _ := setupPGEC("10s", false)
Expand Down
62 changes: 62 additions & 0 deletions internal/controller/postgresql/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,68 @@ func createTableInSchemaAsAdmin(schema, table string) error {
return nil
}

// Here we are considering that type cannot be in another schema just for test.
// This is easier for test cases.
func getTypeOwner(dbName, typeName string) (string, error) {
// Connect
db, err := sql.Open("postgres", fmt.Sprintf(postgresUrlWithDbTemplate, postgresUser, postgresPassword, dbName))
// Check error
if err != nil {
return "", err
}

defer db.Close()

sqlTemplate := `SELECT typowner::regrole FROM pg_type WHERE typname = '%s';`
res, err := db.Query(fmt.Sprintf(sqlTemplate, typeName))
if err != nil {
return "", err
}

var owner string
for res.Next() {
err = res.Scan(&owner)
if err != nil {
return "", err
}
}

// Rows error
err = res.Err()
// Check error
if err != nil {
return "", err
}

// Clean member to remove extra "
owner = strings.ReplaceAll(owner, `"`, "")

return owner, nil
}

func createTypeInSchemaAsAdmin(schema, typeName string) error {
// Query template
CreateTypeInSchemaTemplate := `CREATE TYPE "%s"."%s" AS ENUM ('new', 'open', 'closed');`

// Connect
db, err := sql.Open("postgres", postgresUrlToDB)
// Check error
if err != nil {
return err
}

defer func() error {
return db.Close()
}()

_, err = db.Exec(fmt.Sprintf(CreateTypeInSchemaTemplate, schema, typeName))
if err != nil {
return err
}

return nil
}

func checkRoleInSQLDb(role string) {
roleExists, roleErr := isSQLRoleExists(role)
Expect(roleErr).ToNot(HaveOccurred())
Expand Down

0 comments on commit c33f95a

Please sign in to comment.