From 75cacbe9e610578b480482795144d3628ce6776d Mon Sep 17 00:00:00 2001 From: sanketjadhavSF Date: Sat, 26 Jul 2025 01:27:23 +0530 Subject: [PATCH 1/4] Add PostgreSQL plugin with query execution capabilities and documentation --- .../pkg/object/command/postgres/postgres.go | 154 ++++++++++++++++++ plugins/postgres/README.md | 73 +++++++++ plugins/postgres/postgres.go | 12 ++ 3 files changed, 239 insertions(+) create mode 100644 internal/pkg/object/command/postgres/postgres.go create mode 100644 plugins/postgres/README.md create mode 100644 plugins/postgres/postgres.go diff --git a/internal/pkg/object/command/postgres/postgres.go b/internal/pkg/object/command/postgres/postgres.go new file mode 100644 index 0000000..5f8421a --- /dev/null +++ b/internal/pkg/object/command/postgres/postgres.go @@ -0,0 +1,154 @@ +package postgres + +import ( + "fmt" + "strings" + "sync" + + "github.com/patterninc/heimdall/internal/pkg/database" + pkgcontext "github.com/patterninc/heimdall/pkg/context" + "github.com/patterninc/heimdall/pkg/object/cluster" + "github.com/patterninc/heimdall/pkg/object/job" + "github.com/patterninc/heimdall/pkg/plugin" + "github.com/patterninc/heimdall/pkg/result" + "github.com/patterninc/heimdall/pkg/result/column" +) + +// postgresJobContext represents the context for a PostgreSQL job +type postgresJobContext struct { + Query string `yaml:"query,omitempty" json:"query,omitempty"` + ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"` +} + +type postgresClusterContext struct { + ConnectionString string `yaml:"connection_string,omitempty" json:"connection_string,omitempty"` +} + +type postgresCommandContext struct { + mu sync.Mutex +} + +// New creates a new PostgreSQL plugin handler. +func New(_ *pkgcontext.Context) (plugin.Handler, error) { + p := &postgresCommandContext{} + return p.handler, nil +} + +// Handler for the PostgreSQL query execution. 
+func (p *postgresCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.Cluster) error { + jobContext := &postgresJobContext{} + if j.Context != nil { + if err := j.Context.Unmarshal(jobContext); err != nil { + return fmt.Errorf("failed to unmarshal job context: %w", err) + } + } + if jobContext.Query == "" { + return fmt.Errorf("query is required in job context") + } + + clusterContext := &postgresClusterContext{} + if c.Context != nil { + if err := c.Context.Unmarshal(clusterContext); err != nil { + return fmt.Errorf("failed to unmarshal cluster context: %w", err) + } + } + if clusterContext.ConnectionString == "" { + return fmt.Errorf("connection_string is required in cluster context") + } + + db := &database.Database{ConnectionString: clusterContext.ConnectionString} + + if jobContext.ReturnResult { + return handleSyncQuery(db, jobContext.Query, j) + } + return p.handleAsyncQueries(db, jobContext.Query, j) +} + +func handleSyncQuery(db *database.Database, query string, j *job.Job) error { + // Allow a single query, even if it ends with a semicolon + queries := splitAndTrimQueries(query) + if len(queries) != 1 { + return fmt.Errorf("multiple queries are not allowed when return_result is true") + } + + sess, err := db.NewSession(false) + if err != nil { + return fmt.Errorf("failed to open PostgreSQL connection: %w", err) + } + defer sess.Close() + + rows, err := sess.Query(queries[0]) + if err != nil { + return fmt.Errorf("PostgreSQL query execution failed: %w", err) + } + defer rows.Close() + + rowsResult, err := result.FromRows(rows) + if err != nil { + return fmt.Errorf("failed to process PostgreSQL query results: %w", err) + } + + j.Result = rowsResult + return nil +} + +func (p *postgresCommandContext) handleAsyncQueries(db *database.Database, query string, j *job.Job) error { + queries := splitAndTrimQueries(query) + if len(queries) == 0 { + return fmt.Errorf("no valid queries found in job context") + } + + // Track error for async jobs and set 
job result if any query fails + errChan := make(chan error, 1) + p.mu.Lock() + go func(db *database.Database, queries []string, errChan chan error) { + defer p.mu.Unlock() + sess, err := db.NewSession(true) + if err != nil { + errChan <- fmt.Errorf("Async PostgreSQL connection error: %v", err) + return + } + defer sess.Close() + + for i, q := range queries { + _, err = sess.Exec(q) + if err != nil { + errChan <- fmt.Errorf("error at line %d: %s | query: %s", i+1, err.Error(), q) + return + } + } + errChan <- nil + }(db, queries, errChan) + + err := <-errChan + if err != nil { + j.Result = &result.Result{ + Columns: []*column.Column{{ + Name: "error", + Type: column.Type("string"), + }}, + Data: [][]any{{err.Error()}}, + } + return err + } + + j.Result = &result.Result{ + Columns: []*column.Column{{ + Name: "message", + Type: column.Type("string"), + }}, + Data: [][]any{{"All queries executed successfully"}}, + } + return nil +} + +func splitAndTrimQueries(query string) []string { + queries := []string{} + for _, q := range strings.Split(query, ";") { + q = strings.TrimSpace(q) + if q != "" { + queries = append(queries, q) + } + } + return queries +} diff --git a/plugins/postgres/README.md b/plugins/postgres/README.md new file mode 100644 index 0000000..f7bfaa5 --- /dev/null +++ b/plugins/postgres/README.md @@ -0,0 +1,73 @@ +# PostgreSQL Plugin + +This plugin provides an interface to PostgreSQL databases with support for direct SQL queries, SQL files, and batch execution. 
+ +## Features + +- Execute single or multiple SQL statements +- Support for both direct query strings and SQL file execution +- Synchronous (return_result: true) and asynchronous (return_result: false) execution modes +- Error reporting with line number and query for batch execution +- Transaction support for batch execution + +## Configuration + +### Cluster Context + +```yaml +connection_string: postgresql://user:password@host:port/database # Required +``` + +### Job Context + +```yaml +query: SELECT * FROM my_table # Required - SQL query to execute or path to .sql file +return_result: true # Optional - Whether to return query results (default: false) +``` + +#### Execution Modes + +1. **Return Results Mode** (`return_result: true`): + - Only a single query is allowed (trailing semicolon is fine) + - Returns query results as structured data + - Fails if multiple queries are provided + +2. **Execute Only Mode** (`return_result: false`): + - Multiple queries separated by `;` are allowed + - Executes all queries in order, stops and fails on first error + - Returns error message with line number and query if any query fails + - Returns a success message if all queries succeed + +## Usage + +```yaml +# Example 1: SELECT query with results +- name: postgresql-select + description: Execute a SELECT query and return results + command: postgres + cluster: postgres-cluster + context: + query: SELECT * FROM my_table WHERE status = 'active' + return_result: true + +# Example 2: Execute an UPDATE statement +- name: postgresql-update + description: Execute an UPDATE statement + command: postgres + cluster: postgres-cluster + context: + query: UPDATE my_table SET status = 'inactive' WHERE last_active < '2025-01-01' + return_result: false + + +# Example 3: Execute multiple statements in a batch +- name: postgresql-batch + description: Execute multiple SQL statements in a batch + command: postgres + cluster: postgres-cluster + context: + query: | + INSERT INTO orders (id, 
customer_id, amount) VALUES ('ORD001', 'CUST123', 299.99); + UPDATE inventory SET stock = stock - 1 WHERE product_id = 'PROD456'; + return_result: false +``` diff --git a/plugins/postgres/postgres.go b/plugins/postgres/postgres.go new file mode 100644 index 0000000..87300e8 --- /dev/null +++ b/plugins/postgres/postgres.go @@ -0,0 +1,12 @@ +package main + +import ( + "github.com/patterninc/heimdall/internal/pkg/object/command/postgres" + "github.com/patterninc/heimdall/pkg/context" + "github.com/patterninc/heimdall/pkg/plugin" +) + +// New creates a new instance of the postgres plugin. +func New(c *context.Context) (plugin.Handler, error) { + return postgres.New(c) +} From 5b2731da5d50bff11a50bde888ab01bb4a2dd87a Mon Sep 17 00:00:00 2001 From: sanketjadhavSF Date: Mon, 28 Jul 2025 19:31:35 +0530 Subject: [PATCH 2/4] simplify Async queries execution --- .../pkg/object/command/postgres/postgres.go | 50 +++++++------------ 1 file changed, 18 insertions(+), 32 deletions(-) diff --git a/internal/pkg/object/command/postgres/postgres.go b/internal/pkg/object/command/postgres/postgres.go index 5f8421a..25af3e6 100644 --- a/internal/pkg/object/command/postgres/postgres.go +++ b/internal/pkg/object/command/postgres/postgres.go @@ -59,12 +59,12 @@ func (p *postgresCommandContext) handler(r *plugin.Runtime, j *job.Job, c *clust db := &database.Database{ConnectionString: clusterContext.ConnectionString} if jobContext.ReturnResult { - return handleSyncQuery(db, jobContext.Query, j) + return executeSyncQuery(db, jobContext.Query, j) } - return p.handleAsyncQueries(db, jobContext.Query, j) + return p.executeAsyncQueries(db, jobContext.Query, j) } -func handleSyncQuery(db *database.Database, query string, j *job.Job) error { +func executeSyncQuery(db *database.Database, query string, j *job.Job) error { // Allow a single query, even if it ends with a semicolon queries := splitAndTrimQueries(query) if len(queries) != 1 { @@ -92,44 +92,30 @@ func handleSyncQuery(db *database.Database, 
query string, j *job.Job) error { return nil } -func (p *postgresCommandContext) handleAsyncQueries(db *database.Database, query string, j *job.Job) error { - queries := splitAndTrimQueries(query) - if len(queries) == 0 { - return fmt.Errorf("no valid queries found in job context") - } - - // Track error for async jobs and set job result if any query fails - errChan := make(chan error, 1) - p.mu.Lock() - go func(db *database.Database, queries []string, errChan chan error) { - defer p.mu.Unlock() - sess, err := db.NewSession(true) - if err != nil { - errChan <- fmt.Errorf("Async PostgreSQL connection error: %v", err) - return - } - defer sess.Close() - - for i, q := range queries { - _, err = sess.Exec(q) - if err != nil { - errChan <- fmt.Errorf("error at line %d: %s | query: %s", i+1, err.Error(), q) - return - } +func (p *postgresCommandContext) executeAsyncQueries(db *database.Database, query string, j *job.Job) error { + sess, err := db.NewSession(false) + if err != nil { + j.Result = &result.Result{ + Columns: []*column.Column{{ + Name: "error", + Type: column.Type("string"), + }}, + Data: [][]any{{fmt.Sprintf("Async PostgreSQL connection error: %v", err)}}, } - errChan <- nil - }(db, queries, errChan) + return fmt.Errorf("Async PostgreSQL connection error: %v", err) + } + defer sess.Close() - err := <-errChan + _, err = sess.Exec(query) if err != nil { j.Result = &result.Result{ Columns: []*column.Column{{ Name: "error", Type: column.Type("string"), }}, - Data: [][]any{{err.Error()}}, + Data: [][]any{{fmt.Sprintf("Async PostgreSQL query execution error: %v", err)}}, } - return err + return fmt.Errorf("Async PostgreSQL query execution error: %v", err) } j.Result = &result.Result{ From 437d3fed3b46c8bfb06494467352d220760b9425 Mon Sep 17 00:00:00 2001 From: sanketjadhavSF Date: Mon, 28 Jul 2025 22:37:52 +0530 Subject: [PATCH 3/4] Refactor PostgreSQL handler for readability --- .../pkg/object/command/postgres/postgres.go | 38 +++++++++++++------ 1 file changed, 
27 insertions(+), 11 deletions(-) diff --git a/internal/pkg/object/command/postgres/postgres.go b/internal/pkg/object/command/postgres/postgres.go index 25af3e6..baab46a 100644 --- a/internal/pkg/object/command/postgres/postgres.go +++ b/internal/pkg/object/command/postgres/postgres.go @@ -36,32 +36,48 @@ func New(_ *pkgcontext.Context) (plugin.Handler, error) { // Handler for the PostgreSQL query execution. func (p *postgresCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.Cluster) error { + jobContext, err := validateJobContext(j) + if err != nil { + return err + } + + clusterContext, err := validateClusterContext(c) + if err != nil { + return err + } + + db := &database.Database{ConnectionString: clusterContext.ConnectionString} + + if jobContext.ReturnResult { + return executeSyncQuery(db, jobContext.Query, j) + } + return p.executeAsyncQueries(db, jobContext.Query, j) +} + +func validateJobContext(j *job.Job) (*postgresJobContext, error) { jobContext := &postgresJobContext{} if j.Context != nil { if err := j.Context.Unmarshal(jobContext); err != nil { - return fmt.Errorf("failed to unmarshal job context: %w", err) + return nil, fmt.Errorf("failed to unmarshal job context: %w", err) } } if jobContext.Query == "" { - return fmt.Errorf("query is required in job context") + return nil, fmt.Errorf("query is required in job context") } + return jobContext, nil +} +func validateClusterContext(c *cluster.Cluster) (*postgresClusterContext, error) { clusterContext := &postgresClusterContext{} if c.Context != nil { if err := c.Context.Unmarshal(clusterContext); err != nil { - return fmt.Errorf("failed to unmarshal cluster context: %w", err) + return nil, fmt.Errorf("failed to unmarshal cluster context: %w", err) } } if clusterContext.ConnectionString == "" { - return fmt.Errorf("connection_string is required in cluster context") + return nil, fmt.Errorf("connection_string is required in cluster context") } - - db := &database.Database{ConnectionString: 
clusterContext.ConnectionString} - - if jobContext.ReturnResult { - return executeSyncQuery(db, jobContext.Query, j) - } - return p.executeAsyncQueries(db, jobContext.Query, j) + return clusterContext, nil } func executeSyncQuery(db *database.Database, query string, j *job.Job) error { From bb20133340286f337967cd9fab23e0935401e053 Mon Sep 17 00:00:00 2001 From: sanketjadhavSF Date: Mon, 28 Jul 2025 23:02:05 +0530 Subject: [PATCH 4/4] readme updates to make it consistent with other plugin docs --- plugins/postgres/README.md | 148 ++++++++++++++++++++++++------------- 1 file changed, 95 insertions(+), 53 deletions(-) diff --git a/plugins/postgres/README.md b/plugins/postgres/README.md index f7bfaa5..0a435fa 100644 --- a/plugins/postgres/README.md +++ b/plugins/postgres/README.md @@ -1,73 +1,115 @@ -# PostgreSQL Plugin +# ⚡ PostgreSQL Plugin -This plugin provides an interface to PostgreSQL databases with support for direct SQL queries, SQL files, and batch execution. +The **PostgreSQL Plugin** enables Heimdall to run SQL queries on configured PostgreSQL databases. It supports direct SQL, SQL files, batch execution, and both synchronous and asynchronous modes. -## Features +--- -- Execute single or multiple SQL statements -- Support for both direct query strings and SQL file execution -- Synchronous (return_result: true) and asynchronous (return_result: false) execution modes -- Error reporting with line number and query for batch execution -- Transaction support for batch execution +## 🧩 Plugin Overview -## Configuration +* **Plugin Name:** `postgres` +* **Execution Modes:** Synchronous (return_result: true) and Asynchronous (return_result: false) +* **Use Case:** Running SQL queries (single or batch) against PostgreSQL databases -### Cluster Context +--- + +## ⚙️ Defining a Postgres Command + +A Postgres command can specify execution mode and other preferences. 
Example: ```yaml -connection_string: postgresql://user:password@host:port/database # Required + - name: postgres-0.0.1 + status: active + plugin: postgres + version: 0.0.1 + description: Execute queries against PostgreSQL databases + tags: + - type:postgres + cluster_tags: + - type:postgres + - data:local +``` + +--- + +## 🖥️ Cluster Configuration + +Each Postgres cluster must define a `connection_string`: + +```yaml + - name: postgres + status: active + version: 0.0.1 + description: PostgreSQL Production Database + context: + connection_string: "postgresql://user:password@host:port/database" + tags: + - type:postgres + - data:local ``` -### Job Context +--- + +## 🚀 Submitting a Postgres Job + +A Postgres job provides the SQL query to be executed, and can specify execution mode: + +```json +{ + "name": "run-pg-query", + "version": "0.0.1", + "command_criteria": [ + "type:postgres" + ], + "cluster_criteria": [ + "data:local" + ], + "context": { + "query": "select * from employees limit 10;", + "return_result": true + } +} +``` + +--- + +## 📦 Job Context & Runtime + +The Postgres plugin handles: + +* Executing single or multiple SQL statements (batch) +* Supporting both direct query strings and SQL file execution +* Synchronous mode (`return_result: true`): returns query results, only one query allowed +* Asynchronous mode (`return_result: false`): executes all queries, returns success or error + +### Job Context Example ```yaml query: SELECT * FROM my_table # Required - SQL query to execute or path to .sql file return_result: true # Optional - Whether to return query results (default: false) ``` -#### Execution Modes +### Cluster Context Example + +```yaml +connection_string: postgresql://user:password@host:port/database # Required +``` -1. **Return Results Mode** (`return_result: true`): - - Only a single query is allowed (trailing semicolon is fine) - - Returns query results as structured data - - Fails if multiple queries are provided +--- -2. 
**Execute Only Mode** (`return_result: false`): - - Multiple queries separated by `;` are allowed - - Executes all queries in order, stops and fails on first error - - Returns error message with line number and query if any query fails - - Returns a success message if all queries succeed +## 📊 Returning Job Results -## Usage +If enabled in the environment, Heimdall exposes query results via: -```yaml -# Example 1: SELECT query with results -- name: postgresql-select - description: Execute a SELECT query and return results - command: postgres - cluster: postgres-cluster - context: - query: SELECT * FROM my_table WHERE status = 'active' - return_result: true - -# Example 2: Execute an UPDATE statement -- name: postgresql-update - description: Execute an UPDATE statement - command: postgres - cluster: postgres-cluster - context: - query: UPDATE my_table SET status = 'inactive' WHERE last_active < '2025-01-01' - return_result: false - - -# Example 3: Execute multiple statements in a batch -- name: postgresql-batch - description: Execute multiple SQL statements in a batch - command: postgres - cluster: postgres-cluster - context: - query: | - INSERT INTO orders (id, customer_id, amount) VALUES ('ORD001', 'CUST123', 299.99); - UPDATE inventory SET stock = stock - 1 WHERE product_id = 'PROD456'; - return_result: false ``` +GET /api/v1/job/<job_id>/result +``` + +--- + +## 🧠 Best Practices + +* Use synchronous mode for SELECT queries where results are needed +* Use asynchronous mode for DDL/DML or batch operations +* Always secure your connection strings and database credentials +* Use `type:postgres` tags to isolate command and cluster matching +* Use SQL files for large or complex batch operations