Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ Heimdall supports a growing set of pluggable command types:
| `snowflake` | [Query execution in Snowflake](https://github.com/patterninc/heimdall/blob/main/plugins/snowflake/README.md) | Async |
| `spark` | [SparkSQL query execution on EMR on EKS](https://github.com/patterninc/heimdall/blob/main/plugins/spark/README.md) | Async |
| `trino` | [Query execution in Trino](https://github.com/patterninc/heimdall/blob/main/plugins/trino/README.md) | Async |
| `clickhouse` | [Query execution in ClickHouse](https://github.com/patterninc/heimdall/blob/main/plugins/clickhouse/README.md) | Sync |

---

Expand Down
26 changes: 17 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module github.com/patterninc/heimdall
go 1.24.6

require (
github.com/ClickHouse/clickhouse-go/v2 v2.40.3
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/aws/aws-sdk-go-v2 v1.38.0
github.com/aws/aws-sdk-go-v2/config v1.30.3
github.com/aws/aws-sdk-go-v2/credentials v1.18.3
Expand All @@ -20,6 +22,7 @@ require (
github.com/kubeflow/spark-operator/v2 v2.3.0
github.com/lib/pq v1.10.9
github.com/linkedin/goavro v2.1.0+incompatible
github.com/shopspring/decimal v1.4.0
github.com/snowflakedb/gosnowflake v1.15.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.33.4
Expand All @@ -35,8 +38,8 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 // indirect
github.com/BurntSushi/toml v1.5.0 // indirect
github.com/ClickHouse/ch-go v0.68.0 // indirect
github.com/andybalholm/brotli v1.2.0 // indirect
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
github.com/apache/arrow-go/v18 v18.4.0 // indirect
github.com/apache/thrift v0.22.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 // indirect
Expand All @@ -61,6 +64,8 @@ require (
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.9 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
Expand All @@ -86,29 +91,32 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.22.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/otel v1.37.0 // indirect
go.opentelemetry.io/otel/trace v1.37.0 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
go.opentelemetry.io/otel/trace v1.38.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/crypto v0.41.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.42.0 // indirect
golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 // indirect
golang.org/x/mod v0.27.0 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/net v0.44.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/term v0.34.0 // indirect
golang.org/x/text v0.28.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/sys v0.36.0 // indirect
golang.org/x/term v0.35.0 // indirect
golang.org/x/text v0.29.0 // indirect
golang.org/x/time v0.9.0 // indirect
golang.org/x/tools v0.36.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
Expand Down
89 changes: 65 additions & 24 deletions go.sum

Large diffs are not rendered by default.

183 changes: 183 additions & 0 deletions internal/pkg/object/command/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package clickhouse

import (
"context"
"fmt"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/hladush/go-telemetry/pkg/telemetry"
hdctx "github.com/patterninc/heimdall/pkg/context"
"github.com/patterninc/heimdall/pkg/object/cluster"
"github.com/patterninc/heimdall/pkg/object/job"
"github.com/patterninc/heimdall/pkg/object/job/status"
"github.com/patterninc/heimdall/pkg/plugin"
"github.com/patterninc/heimdall/pkg/result"
"github.com/patterninc/heimdall/pkg/result/column"
)

type commandContext struct {
Username string `yaml:"username,omitempty" json:"username,omitempty"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Username and password are stored at the command level because different teams might eventually have different access permissions, which should be managed at the command level rather than at the cluster level.
Please share your thoughts

Copy link
Contributor

@sanketjadhavSF sanketjadhavSF Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought is that ideally it should be a connection string at the cluster level, something like this => clickhouse://username:password@host:port/database?ssl=true, and then at the command level we control access for different teams using RBAC roles.
WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought is that having a second line of defense is always nice, especially when companies don't have RBAC.

Password string `yaml:"password,omitempty" json:"password,omitempty"`
}

// clusterContext holds the cluster-level settings unmarshalled from the
// cluster's context in createJobContext.
type clusterContext struct {
// Endpoints is passed as the Addr list when the ClickHouse connection is opened.
Endpoints []string `yaml:"endpoints" json:"endpoints"`
// Database is passed as Auth.Database when the ClickHouse connection is opened.
Database string `yaml:"database,omitempty" json:"database,omitempty"`
}

// jobContext holds the per-job settings unmarshalled from the job's context,
// plus the connection the job runs on.
type jobContext struct {
// Query is the statement handed to conn.Query or conn.Exec.
Query string `yaml:"query" json:"query"`
// Params are turned into clickhouse.Named arguments (key -> value) by execute.
Params map[string]string `yaml:"params,omitempty" json:"params,omitempty"`
// ReturnResult selects conn.Query (true) over conn.Exec (false) in execute.
ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"`
// conn is assigned by createJobContext after the connection has been opened.
conn driver.Conn
}

const (
// serviceName is the service label passed to telemetry.NewMethod for every
// telemetry method declared in this package.
serviceName = "clickhouse"
)

var (
// dummyRowsInstance is the shared placeholder driver.Rows that execute returns
// when the job does not request a result set (ReturnResult is false).
dummyRowsInstance = dummyRows()
// Telemetry handles used for error counting/logging in handler,
// createJobContext and collectResults respectively.
handleMethod = telemetry.NewMethod("handle", serviceName)
createExcMethod = telemetry.NewMethod("createExc", serviceName)
collectResultsMethod = telemetry.NewMethod("collectResults", serviceName)
)

// New builds the ClickHouse plugin handler.
//
// When ctx is non-nil, the command-level settings are unmarshalled from it
// into a fresh commandContext; an unmarshal failure is returned as-is. With a
// nil ctx the handler is bound to a zero-valued commandContext.
func New(ctx *hdctx.Context) (plugin.Handler, error) {
	cmd := &commandContext{}

	// Nothing to load: hand back a handler bound to the zero-valued settings.
	if ctx == nil {
		return cmd.handler, nil
	}

	if err := ctx.Unmarshal(cmd); err != nil {
		return nil, err
	}
	return cmd.handler, nil
}

// handler runs a single job against ClickHouse.
//
// It builds the job context (which opens a connection), executes the query,
// collects the rows into a result and, on success, stores that result in
// j.Result and marks the job as status.Succeeded. Any failure is logged and
// counted through handleMethod and returned unchanged.
//
// The runtime argument r is not used by this plugin.
func (cmd *commandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.Cluster) error {
	ctx := context.Background()

	jobCtx, err := cmd.createJobContext(j, c)
	if err != nil {
		handleMethod.LogAndCountError(err, "create_job_context")
		return err
	}
	// createJobContext opens a new connection for every job; release it once
	// the job is done so connections are not leaked across jobs. The deferred
	// close runs after collectResults has already closed the rows.
	defer func() {
		if closeErr := jobCtx.conn.Close(); closeErr != nil {
			handleMethod.LogAndCountError(closeErr, "close_connection")
		}
	}()

	rows, err := jobCtx.execute(ctx)
	if err != nil {
		handleMethod.LogAndCountError(err, "execute")
		return err
	}

	res, err := collectResults(rows)
	if err != nil {
		handleMethod.LogAndCountError(err, "collect_results")
		return err
	}

	j.Result = res
	j.Status = status.Succeeded

	return nil
}

func (cmd *commandContext) createJobContext(j *job.Job, c *cluster.Cluster) (*jobContext, error) {
// get cluster context
clusterCtx := &clusterContext{}
if c.Context != nil {
if err := c.Context.Unmarshal(clusterCtx); err != nil {
createExcMethod.CountError("unmarshal_cluster_context")
return nil, fmt.Errorf("failed to unmarshal cluster context: %v", err)
}
}

// get job context
jobCtx := &jobContext{}
if j.Context != nil {
if err := j.Context.Unmarshal(jobCtx); err != nil {
createExcMethod.CountError("unmarshal_job_context")
return nil, fmt.Errorf("failed to unmarshal job context: %v", err)
}
}

conn, err := clickhouse.Open(&clickhouse.Options{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apart from the basic username and password, the user should be able to override or control the connection settings from either the command or the cluster level, as it only supports single query execution at a time.

Reference for enabling settings in connection open() method: https://github.com/ClickHouse/clickhouse-go/blob/main/examples/clickhouse_api/connect_settings.go

Addr: clusterCtx.Endpoints,
Auth: clickhouse.Auth{
Database: clusterCtx.Database,
Username: cmd.Username,
Password: cmd.Password,
},
})
if err != nil {
createExcMethod.CountError("open_connection")
return nil, fmt.Errorf("failed to open ClickHouse connection: %v", err)
}
jobCtx.conn = conn
return jobCtx, nil
}

// execute runs the job's query on the job's connection.
//
// Every entry of j.Params is passed to the driver as a named argument. When
// j.ReturnResult is set the query is issued with conn.Query and its rows are
// returned; otherwise it is issued with conn.Exec and the shared
// dummyRowsInstance is returned alongside Exec's error.
func (j *jobContext) execute(ctx context.Context) (driver.Rows, error) {
	var namedArgs []any
	for name, value := range j.Params {
		namedArgs = append(namedArgs, clickhouse.Named(name, value))
	}

	if !j.ReturnResult {
		err := j.conn.Exec(ctx, j.Query, namedArgs...)
		return dummyRowsInstance, err
	}
	return j.conn.Query(ctx, j.Query, namedArgs...)
}

// collectResults drains rows into a result.Result and closes rows before
// returning.
//
// Column metadata is derived once from rows.ColumnTypes(): each database type
// name is unwrapped with unwrapCHType, and the base type is translated through
// chTypeToResultTypeName when a mapping exists; otherwise the raw database
// type name is kept as the column type.
//
// For every row, a scan target and a reader are obtained per column from
// chTypeHandlers (falling back to handleDefault for unknown base types), the
// row is scanned, and the readers' values are appended to the result data.
//
// It returns an error if scanning a row fails or if the driver reports an
// error once iteration stops, so a partially read result set is never
// returned as a success.
func collectResults(rows driver.Rows) (*result.Result, error) {
	defer rows.Close()

	cols := rows.Columns()
	colTypes := rows.ColumnTypes()

	out := &result.Result{
		Columns: make([]*column.Column, len(cols)),
		Data:    make([][]any, 0, 128),
	}

	// Column types do not change between rows, so unwrap each one exactly once
	// instead of re-parsing the type name for every row.
	bases := make([]string, len(colTypes))
	nullables := make([]bool, len(colTypes))
	for i, ct := range colTypes {
		bases[i], nullables[i] = unwrapCHType(ct.DatabaseTypeName())
	}

	for i, name := range cols {
		typeName := colTypes[i].DatabaseTypeName()
		if mapped, ok := chTypeToResultTypeName[bases[i]]; ok {
			typeName = mapped
		}
		out.Columns[i] = &column.Column{
			Name: name,
			Type: column.Type(typeName),
		}
	}

	for rows.Next() {
		// Scan targets are allocated per row: the handlers are defined
		// elsewhere, so reusing targets across rows is not assumed to be safe.
		scanTargets := make([]any, len(cols))
		readers := make([]func() any, len(cols))

		for i := range colTypes {
			if handler, ok := chTypeHandlers[bases[i]]; ok {
				scanTargets[i], readers[i] = handler(nullables[i])
			} else {
				// Fallback (covers unknown + legacy decimal detection)
				scanTargets[i], readers[i] = handleDefault(nullables[i])
			}
		}

		if err := rows.Scan(scanTargets...); err != nil {
			collectResultsMethod.CountError("row_scan")
			return nil, fmt.Errorf("row scan error: %w", err)
		}

		row := make([]any, len(cols))
		for i := range readers {
			row[i] = readers[i]()
		}
		out.Data = append(out.Data, row)
	}

	// Next() returning false can mean "no more rows" or "the stream failed";
	// only Err() tells the two apart.
	if err := rows.Err(); err != nil {
		collectResultsMethod.CountError("rows_iteration")
		return nil, fmt.Errorf("rows iteration error: %w", err)
	}

	return out, nil
}
Loading