Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions internal/pkg/object/command/postgres/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package postgres

import (
"fmt"
"strings"
"sync"

"github.com/patterninc/heimdall/internal/pkg/database"
pkgcontext "github.com/patterninc/heimdall/pkg/context"
"github.com/patterninc/heimdall/pkg/object/cluster"
"github.com/patterninc/heimdall/pkg/object/job"
"github.com/patterninc/heimdall/pkg/plugin"
"github.com/patterninc/heimdall/pkg/result"
"github.com/patterninc/heimdall/pkg/result/column"
)

// postgresJobContext represents the context for a PostgreSQL job
type postgresJobContext struct {
Query string `yaml:"query,omitempty" json:"query,omitempty"`
ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"`
}

type postgresClusterContext struct {
ConnectionString string `yaml:"connection_string,omitempty" json:"connection_string,omitempty"`
}

type postgresCommandContext struct {
mu sync.Mutex
}

// New creates a new PostgreSQL plugin handler.
func New(_ *pkgcontext.Context) (plugin.Handler, error) {
p := &postgresCommandContext{}
return p.handler, nil
}

// Handler for the PostgreSQL query execution.
func (p *postgresCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.Cluster) error {
jobContext, err := validateJobContext(j)
if err != nil {
return err
}

clusterContext, err := validateClusterContext(c)
if err != nil {
return err
}

db := &database.Database{ConnectionString: clusterContext.ConnectionString}
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connection string is being passed directly without validation or sanitization. Consider validating the connection string format and ensuring it doesn't contain malicious parameters that could lead to security vulnerabilities.

Copilot uses AI. Check for mistakes.

if jobContext.ReturnResult {
return executeSyncQuery(db, jobContext.Query, j)
}
return p.executeAsyncQueries(db, jobContext.Query, j)
}

func validateJobContext(j *job.Job) (*postgresJobContext, error) {
jobContext := &postgresJobContext{}
if j.Context != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe let's move it to 2 functions

  • create and validate job context
  • Create an validate cluster context

In that case method vill be a little bit clearer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done added separate functions for job and cluster context validation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also added the test results from local to PR description

if err := j.Context.Unmarshal(jobContext); err != nil {
return nil, fmt.Errorf("failed to unmarshal job context: %w", err)
}
}
if jobContext.Query == "" {
return nil, fmt.Errorf("query is required in job context")
}
return jobContext, nil
}

func validateClusterContext(c *cluster.Cluster) (*postgresClusterContext, error) {
clusterContext := &postgresClusterContext{}
if c.Context != nil {
if err := c.Context.Unmarshal(clusterContext); err != nil {
return nil, fmt.Errorf("failed to unmarshal cluster context: %w", err)
}
}
if clusterContext.ConnectionString == "" {
return nil, fmt.Errorf("connection_string is required in cluster context")
}
return clusterContext, nil
}

func executeSyncQuery(db *database.Database, query string, j *job.Job) error {
// Allow a single query, even if it ends with a semicolon
queries := splitAndTrimQueries(query)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does postres not allow to call query with semicolon at the end?

Copy link
Contributor Author

@sanketjadhavSF sanketjadhavSF Jul 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this splitAndTrimQueries() function is used to parse the query param and find if there are multiple queries based on ; separator and route the query execution to different function executeSyncQuery() or executeAsyncQueries()
executeSyncQuery - this function is executed when return_result is set to true and only single query is executed.
executeAsyncQueries - this is used to run .sql scripts with multiple sql statements. this will return only errors if any and not results are returned.

if len(queries) != 1 {
return fmt.Errorf("multiple queries are not allowed when return_result is true")
}

sess, err := db.NewSession(false)
if err != nil {
return fmt.Errorf("failed to open PostgreSQL connection: %w", err)
}
defer sess.Close()

rows, err := sess.Query(queries[0])
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Executing raw SQL queries without prepared statements or input validation could expose the system to SQL injection attacks. Consider using parameterized queries or implementing input validation for the query parameter.

Copilot uses AI. Check for mistakes.
if err != nil {
return fmt.Errorf("PostgreSQL query execution failed: %w", err)
}
defer rows.Close()

rowsResult, err := result.FromRows(rows)
if err != nil {
return fmt.Errorf("failed to process PostgreSQL query results: %w", err)
}

j.Result = rowsResult
return nil
}

func (p *postgresCommandContext) executeAsyncQueries(db *database.Database, query string, j *job.Job) error {
sess, err := db.NewSession(false)
if err != nil {
j.Result = &result.Result{
Columns: []*column.Column{{
Name: "error",
Type: column.Type("string"),
}},
Data: [][]any{{fmt.Sprintf("Async PostgreSQL connection error: %v", err)}},
}
return fmt.Errorf("Async PostgreSQL connection error: %v", err)
}
defer sess.Close()

_, err = sess.Exec(query)
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Executing raw SQL queries without prepared statements or input validation could expose the system to SQL injection attacks. Consider using parameterized queries or implementing input validation for the query parameter.

Copilot uses AI. Check for mistakes.
if err != nil {
j.Result = &result.Result{
Columns: []*column.Column{{
Name: "error",
Type: column.Type("string"),
}},
Data: [][]any{{fmt.Sprintf("Async PostgreSQL query execution error: %v", err)}},
}
Comment on lines +111 to +133
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error result construction is duplicated in both error handling blocks (lines 114-120 and 127-133). Consider extracting this into a helper function to reduce code duplication and improve maintainability.

Suggested change
func (p *postgresCommandContext) executeAsyncQueries(db *database.Database, query string, j *job.Job) error {
sess, err := db.NewSession(false)
if err != nil {
j.Result = &result.Result{
Columns: []*column.Column{{
Name: "error",
Type: column.Type("string"),
}},
Data: [][]any{{fmt.Sprintf("Async PostgreSQL connection error: %v", err)}},
}
return fmt.Errorf("Async PostgreSQL connection error: %v", err)
}
defer sess.Close()
_, err = sess.Exec(query)
if err != nil {
j.Result = &result.Result{
Columns: []*column.Column{{
Name: "error",
Type: column.Type("string"),
}},
Data: [][]any{{fmt.Sprintf("Async PostgreSQL query execution error: %v", err)}},
}
// errorResult constructs a result.Result containing a single error message.
func errorResult(msg string) *result.Result {
return &result.Result{
Columns: []*column.Column{{
Name: "error",
Type: column.Type("string"),
}},
Data: [][]any{{msg}},
}
}
func (p *postgresCommandContext) executeAsyncQueries(db *database.Database, query string, j *job.Job) error {
sess, err := db.NewSession(false)
if err != nil {
j.Result = errorResult(fmt.Sprintf("Async PostgreSQL connection error: %v", err))
return fmt.Errorf("Async PostgreSQL connection error: %v", err)
}
defer sess.Close()
_, err = sess.Exec(query)
if err != nil {
j.Result = errorResult(fmt.Sprintf("Async PostgreSQL query execution error: %v", err))

Copilot uses AI. Check for mistakes.
return fmt.Errorf("Async PostgreSQL query execution error: %v", err)
}

j.Result = &result.Result{
Columns: []*column.Column{{
Name: "message",
Type: column.Type("string"),
}},
Data: [][]any{{"All queries executed successfully"}},
}
return nil
}

func splitAndTrimQueries(query string) []string {
queries := []string{}
for _, q := range strings.Split(query, ";") {
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query splitting logic using semicolon delimiter is naive and could incorrectly split queries that contain semicolons within string literals, comments, or function definitions. This could break valid SQL statements or create security vulnerabilities.

Copilot uses AI. Check for mistakes.
q = strings.TrimSpace(q)
if q != "" {
queries = append(queries, q)
}
}
return queries
}
115 changes: 115 additions & 0 deletions plugins/postgres/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# ⚡ PostgreSQL Plugin

The **PostgreSQL Plugin** enables Heimdall to run SQL queries on configured PostgreSQL databases. It supports direct SQL, SQL files, batch execution, and both synchronous and asynchronous modes.

---

## 🧩 Plugin Overview

* **Plugin Name:** `postgres`
* **Execution Modes:** Synchronous (return_result: true) and Asynchronous (return_result: false)
* **Use Case:** Running SQL queries (single or batch) against PostgreSQL databases

---

## ⚙️ Defining a Postgres Command

A Postgres command can specify execution mode and other preferences. Example:

```yaml
- name: postgres-0.0.1
status: active
plugin: postgres
version: 0.0.1
description: Execute queries against PostgreSQL databases
tags:
- type:postgres
cluster_tags:
- type:postgres
- data:local
```

---

## 🖥️ Cluster Configuration

Each Postgres cluster must define a `connection_string`:

```yaml
- name: postgres
status: active
version: 0.0.1
description: PostgreSQL Production Database
context:
connection_string: "postgresql://user:password@host:port/database"
tags:
- type:postgres
- data:local
```

---

## 🚀 Submitting a Postgres Job

A Postgres job provides the SQL query to be executed, and can specify execution mode:

```json
{
"name": "run-pg-query",
"version": "0.0.1",
"command_criteria": [
"type:postgres"
],
"cluster_criteria": [
"data:local"
],
"context": {
"query": "select * from employees limit 10;",
"return_result": true
}
}
```

---

## 📦 Job Context & Runtime

The Postgres plugin handles:

* Executing single or multiple SQL statements (batch)
* Supporting both direct query strings and SQL file execution
* Synchronous mode (`return_result: true`): returns query results, only one query allowed
* Asynchronous mode (`return_result: false`): executes all queries, returns success or error

### Job Context Example

```yaml
query: SELECT * FROM my_table # Required - SQL query to execute or path to .sql file
return_result: true # Optional - Whether to return query results (default: false)
```

### Cluster Context Example

```yaml
connection_string: postgresql://user:password@host:port/database # Required
```

---

## 📊 Returning Job Results

If enabled in the environment, Heimdall exposes query results via:

```
GET /api/v1/job/<job_id>/result
```

---

## 🧠 Best Practices

* Use synchronous mode for SELECT queries where results are needed
* Use asynchronous mode for DDL/DML or batch operations
* Always secure your connection strings and database credentials
* Use `type:postgres` tags to isolate command and cluster matching
* Use SQL files for large or complex batch operations
12 changes: 12 additions & 0 deletions plugins/postgres/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package main

import (
"github.com/patterninc/heimdall/internal/pkg/object/command/postgres"
"github.com/patterninc/heimdall/pkg/context"
"github.com/patterninc/heimdall/pkg/plugin"
)

// New creates a new instance of the postgres plugin.
func New(c *context.Context) (plugin.Handler, error) {
return postgres.New(c)
}