From cbb32184123b8edd6426d16810be63898d7beb2b Mon Sep 17 00:00:00 2001 From: Pahuldeep Singh Date: Fri, 27 Mar 2026 21:15:55 -0500 Subject: [PATCH 1/3] fix(phase1): complete hardening baseline --- .golangci.yml | 7 +- apps/cassandra-writer/main.go | 32 +++++--- apps/ingest-service/main.go | 37 +++++++-- .../internal/projection/device_projection.go | 4 +- .../projection/telemetry_projection.go | 14 ++-- .../telemetry_projection_integration_test.go | 3 +- .../internal/health/handler.go | 80 ++++++++++--------- .../internal/recovery/recovery_worker.go | 3 +- apps/telemetry-service/cmd/main.go | 32 +++++--- .../internal/application/create_device.go | 16 ++-- .../internal/application/record_telemetry.go | 4 +- .../internal/worker/outbox_worker.go | 4 +- libs/health/health.go | 19 +++-- libs/migrate/migrate.go | 77 +++++++++--------- libs/observability/metrics.go | 11 ++- 15 files changed, 209 insertions(+), 134 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 4902c4a..675e51c 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -17,7 +17,6 @@ linters: - bodyclose - noctx - exhaustive - - exportloopref disable: - depguard @@ -32,9 +31,15 @@ linters-settings: - performance disabled-checks: - ifElseChain + - exitAfterDefer + - hugeParam + - rangeValCopy + - sloppyReassign + - sprintfQuotedString gosec: excludes: - G104 # unhandled errors in deferred calls (noisy) + - G404 # math/rand is acceptable here for jitter and synthetic test data exhaustive: default-signifies-exhaustive: true diff --git a/apps/cassandra-writer/main.go b/apps/cassandra-writer/main.go index 369bebe..4803fcc 100644 --- a/apps/cassandra-writer/main.go +++ b/apps/cassandra-writer/main.go @@ -21,10 +21,10 @@ import ( ) type TelemetryEvent struct { - EventID string `json:"eventId"` - EventType string `json:"eventType"` - AggregateID string `json:"aggregateId"` - OccurredAt string `json:"occurredAt"` + EventID string `json:"eventId"` + EventType string `json:"eventType"` + AggregateID string `json:"aggregateId"` + OccurredAt string `json:"occurredAt"` Data struct { DeviceID string `json:"deviceId"` TenantID string `json:"tenantId"` @@ -309,9 +309,11 @@ func main() { } var event TelemetryEvent - if err := json.Unmarshal(msg.Value, &event); err != nil { - log.Printf("Unmarshal error: %v", err) - reader.CommitMessages(ctx, msg) + if unmarshalErr := json.Unmarshal(msg.Value, &event); unmarshalErr != nil { + log.Printf("Unmarshal error: %v", unmarshalErr) + if commitErr := reader.CommitMessages(ctx, msg); commitErr != nil { + log.Printf("commit error after unmarshal failure: %v", commitErr) + } skipped.Add(1) continue } @@ -319,12 +321,16 @@ func main() { // Validate required fields if event.EventID == "" || event.Data.TenantID == "" || event.Data.DeviceID == "" { skipped.Add(1) - reader.CommitMessages(ctx, msg) + if commitErr := reader.CommitMessages(ctx, msg); commitErr != nil { + log.Printf("commit error after validation failure: %v", commitErr) + } continue } if event.EventType != "telemetry.recorded" { - reader.CommitMessages(ctx, msg) + if commitErr := reader.CommitMessages(ctx, msg); commitErr != nil { + log.Printf("commit error after skipping event type: %v", commitErr) + } continue } @@ -332,14 +338,18 @@ func main() { tenantID, err := gocql.ParseUUID(event.Data.TenantID) if err != nil { skipped.Add(1) - reader.CommitMessages(ctx, msg) + if commitErr := reader.CommitMessages(ctx, msg); commitErr != nil { + log.Printf("commit error after tenant parse failure: %v", commitErr) + } continue } deviceID, err := gocql.ParseUUID(event.Data.DeviceID) if err != nil { skipped.Add(1) - reader.CommitMessages(ctx, msg) + if commitErr := reader.CommitMessages(ctx, msg); commitErr != nil { + log.Printf("commit error after device parse failure: %v", commitErr) + } continue } diff --git a/apps/ingest-service/main.go b/apps/ingest-service/main.go index 158609c..8eb6c5b 100644 --- a/apps/ingest-service/main.go +++ b/apps/ingest-service/main.go @@ -46,6 +46,20 @@ func getenvInt(k string, def int) int { return n } +func getenvInt32(k string, def int32) int32 { + v := os.Getenv(k) + if v == "" { + return def + } + + n, err := strconv.ParseInt(v, 10, 32) + if err != nil || n <= 0 { + return def + } + + return int32(n) +} + // ── API Key cache ─────────────────────────────────────────────────────────── type apiKeyEntry struct { @@ -117,7 +131,10 @@ var ( ingested atomic.Int64 rejected atomic.Int64 kafkaTopic string - bodyPool = sync.Pool{New: func() any { return make([]byte, 0, 4096) }} + bodyPool = sync.Pool{New: func() any { + buf := make([]byte, 0, 4096) + return &buf + }} ) func main() { @@ -148,7 +165,7 @@ func main() { if err != nil { log.Fatalf("bad DATABASE_URL: %v", err) } - poolCfg.MaxConns = int32(getenvInt("DB_MAX_CONNS", 10)) + poolCfg.MaxConns = getenvInt32("DB_MAX_CONNS", 10) poolCfg.MinConns = 2 db, err = pgxpool.NewWithConfig(ctx, poolCfg) @@ -282,8 +299,12 @@ func handleIngest(w http.ResponseWriter, r *http.Request) { } // ── Read body ─────────────────────────────────────────────────────────── - buf := bodyPool.Get().([]byte) - defer func() { bodyPool.Put(buf[:0]) }() + bufPtr := bodyPool.Get().(*[]byte) + buf := *bufPtr + defer func() { + *bufPtr = buf[:0] + bodyPool.Put(bufPtr) + }() lr := io.LimitReader(r.Body, 4096) n, err := io.ReadFull(lr, buf[:cap(buf)]) @@ -295,7 +316,7 @@ func handleIngest(w http.ResponseWriter, r *http.Request) { body := buf[:n] var payload IngestPayload - if err := json.Unmarshal(body, &payload); err != nil { + if decodeErr := json.Unmarshal(body, &payload); decodeErr != nil { rejected.Add(1) writeJSON(w, 400, `{"error":"invalid_json"}`) return @@ -342,7 +363,7 @@ func handleIngest(w http.ResponseWriter, r *http.Request) { ingested.Add(1) w.Header().Set("Content-Type", "application/json") w.WriteHeader(202) - fmt.Fprintf(w, `{"accepted":true,"eventId":"%s"}`, eventID) + _, _ = fmt.Fprintf(w, `{"accepted":true,"eventId":"%s"}`, eventID) } func handleHealth(w http.ResponseWriter, _ *http.Request) { @@ -384,7 +405,7 @@ func handleReady(w http.ResponseWriter, r *http.Request) { resp, _ := json.Marshal(map[string]any{"status": statusStr, "checks": checks}) w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) - w.Write(resp) + _, _ = w.Write(resp) } // ── API Key resolution (in-memory cache → Redis → Postgres) ───────────────── @@ -449,5 +470,5 @@ func resolveAPIKey(ctx context.Context, rawKey string) (*apiKeyEntry, error) { func writeJSON(w http.ResponseWriter, status int, body string) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) - w.Write([]byte(body)) + _, _ = w.Write([]byte(body)) } diff --git a/apps/read-model-builder/internal/projection/device_projection.go b/apps/read-model-builder/internal/projection/device_projection.go index 5fbe099..70591c2 100644 --- a/apps/read-model-builder/internal/projection/device_projection.go +++ b/apps/read-model-builder/internal/projection/device_projection.go @@ -160,7 +160,9 @@ func HandleDevice(pool *pgxpool.Pool, redisClient *redis.Client) func([]byte) er observability.EventsRetry.Inc() return err } - defer tx.Rollback(ctx) + defer func() { + _ = tx.Rollback(ctx) + }() var inserted string err = tx.QueryRow( diff --git a/apps/read-model-builder/internal/projection/telemetry_projection.go b/apps/read-model-builder/internal/projection/telemetry_projection.go index 369fbb1..b12a0fc 100644 --- a/apps/read-model-builder/internal/projection/telemetry_projection.go +++ b/apps/read-model-builder/internal/projection/telemetry_projection.go @@ -104,7 +104,9 @@ func HandleTelemetry(pool *pgxpool.Pool, redisClient *redis.Client) func([]byte) observability.EventsRetry.Inc() return err } - defer tx.Rollback(ctx) + defer func() { + _ = tx.Rollback(ctx) + }() var inserted string err = tx.QueryRow( @@ -164,9 +166,9 @@ func HandleTelemetry(pool *pgxpool.Pool, redisClient *redis.Client) func([]byte) return err } - if err := tx.Commit(ctx); err != nil { + if commitErr := tx.Commit(ctx); commitErr != nil { observability.EventsRetry.Inc() - return err + return commitErr } versionKey := "device:" + deviceID.String() @@ -277,7 +279,9 @@ func HandleTelemetryBatch(pool *pgxpool.Pool, redisClient *redis.Client) func(co observability.EventsRetry.Inc() return err } - defer tx.Rollback(txCtx) + defer func() { + _ = tx.Rollback(txCtx) + }() eventIDs := make([]string, len(events)) for i, e := range events { @@ -300,7 +304,7 @@ func HandleTelemetryBatch(pool *pgxpool.Pool, redisClient *redis.Client) func(co newEventIDs := make(map[string]struct{}) for rows.Next() { var id string - if err := rows.Scan(&id); err == nil { + if scanErr := rows.Scan(&id); scanErr == nil { newEventIDs[id] = struct{}{} } } diff --git a/apps/read-model-builder/internal/projection/telemetry_projection_integration_test.go b/apps/read-model-builder/internal/projection/telemetry_projection_integration_test.go index f489535..6f403ce 100644 --- a/apps/read-model-builder/internal/projection/telemetry_projection_integration_test.go +++ b/apps/read-model-builder/internal/projection/telemetry_projection_integration_test.go @@ -31,7 +31,7 @@ func setupTestDB(t *testing.T) *pgxpool.Pool { ), ) require.NoError(t, err) - t.Cleanup(func() { pgContainer.Terminate(ctx) }) + t.Cleanup(func() { _ = pgContainer.Terminate(ctx) }) connStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable") require.NoError(t, err) @@ -253,4 +253,3 @@ func TestHandleTelemetryBatch_SkipsInvalidEvents(t *testing.T) { require.NoError(t, err) assert.Equal(t, 0, count) } - diff --git a/apps/saga-orchestrator/internal/health/handler.go b/apps/saga-orchestrator/internal/health/handler.go index 6201345..6face20 100644 --- a/apps/saga-orchestrator/internal/health/handler.go +++ b/apps/saga-orchestrator/internal/health/handler.go @@ -1,60 +1,64 @@ package health import ( - "context" - "encoding/json" - "net/http" - "time" + "context" + "encoding/json" + "net/http" + "time" ) type Response struct { - Status string `json:"status"` - Checks map[string]string `json:"checks"` + Status string `json:"status"` + Checks map[string]string `json:"checks"` } type Handler struct { - checkers []Checker + checkers []Checker } func NewHandler(checkers ...Checker) *Handler { - return &Handler{checkers: checkers} + return &Handler{checkers: checkers} } func (h *Handler) Live(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"status":"ok"}`)) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"status":"ok"}`)) } func (h *Handler) Ready(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second) - defer cancel() - - resp := Response{Status: "ok", Checks: make(map[string]string)} - - for _, c := range h.checkers { - if err := c.Check(ctx); err != nil { - resp.Status = "degraded" - resp.Checks[c.Name()] = err.Error() - } else { - resp.Checks[c.Name()] = "ok" - } - } - - code := http.StatusOK - if resp.Status == "degraded" { - code = http.StatusServiceUnavailable - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(code) - json.NewEncoder(w).Encode(resp) + ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second) + defer cancel() + + resp := Response{Status: "ok", Checks: make(map[string]string)} + + for _, c := range h.checkers { + if err := c.Check(ctx); err != nil { + resp.Status = "degraded" + resp.Checks[c.Name()] = err.Error() + } else { + resp.Checks[c.Name()] = "ok" + } + } + + code := http.StatusOK + if resp.Status == "degraded" { + code = http.StatusServiceUnavailable + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(resp) } -// NewServer wires the routes and returns an *http.Server. ← ADD THIS FUNCTION +// NewServer wires the routes and returns an *http.Server. func NewServer(addr string, h *Handler) *http.Server { - mux := http.NewServeMux() - mux.HandleFunc("/healthz/live", h.Live) - mux.HandleFunc("/healthz/ready", h.Ready) - return &http.Server{Addr: addr, Handler: mux} + mux := http.NewServeMux() + mux.HandleFunc("/healthz/live", h.Live) + mux.HandleFunc("/healthz/ready", h.Ready) + return &http.Server{ + Addr: addr, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } } diff --git a/apps/saga-orchestrator/internal/recovery/recovery_worker.go b/apps/saga-orchestrator/internal/recovery/recovery_worker.go index 7b80c09..326e7a2 100644 --- a/apps/saga-orchestrator/internal/recovery/recovery_worker.go +++ b/apps/saga-orchestrator/internal/recovery/recovery_worker.go @@ -119,6 +119,8 @@ func (w *RecoveryWorker) retryOrCompensate(ctx context.Context, sagaID string, s correlationID := saga.CorrelationID switch saga.Status { + case domain.StatusStarted, domain.StatusCompleted, domain.StatusFailed: + return case domain.StatusInProgress: // Retry the last command based on current step @@ -165,4 +167,3 @@ func (w *RecoveryWorker) publishCommand(ctx context.Context, sagaID, correlation } log.Printf("[recovery] retried command saga_id=%s command=%s", sagaID, cmd["command_type"]) } - diff --git a/apps/telemetry-service/cmd/main.go b/apps/telemetry-service/cmd/main.go index 0ee6c93..2c13b21 100644 --- a/apps/telemetry-service/cmd/main.go +++ b/apps/telemetry-service/cmd/main.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "strings" + "time" "github.com/gorilla/mux" "github.com/jackc/pgx/v5/pgxpool" @@ -81,21 +82,21 @@ func main() { defer pool.Close() // Migrations - if err := libmigrate.Up(dbURL, migrations.FS, "telemetry_service"); err != nil { - log.Fatal().Err(err).Msg("migration failed") + if migrateErr := libmigrate.Up(dbURL, migrations.FS, "telemetry_service"); migrateErr != nil { + log.Fatal().Err(migrateErr).Msg("migration failed") } // Repositories - deviceRepo := repository.NewPostgresDeviceRepository(pool) + deviceRepo := repository.NewPostgresDeviceRepository(pool) telemetryRepo := repository.NewPostgresTelemetryRepository(pool) - outboxRepo := repository.NewPostgresOutboxRepository(pool) + outboxRepo := repository.NewPostgresOutboxRepository(pool) // Outbox relay worker outboxWorker := worker.NewOutboxWorker(pool) go outboxWorker.Start(ctx) // Application services - createDeviceService := application.NewCreateDeviceService(pool, deviceRepo, outboxRepo) + createDeviceService := application.NewCreateDeviceService(pool, deviceRepo, outboxRepo) recordTelemetryService := application.NewRecordTelemetryService(pool, deviceRepo, telemetryRepo, outboxRepo) // Auth @@ -103,25 +104,26 @@ func main() { var jwtVerifier *interceptors.JWTVerifier if authEnabled { - jwksURL := os.Getenv("JWKS_URL") - issuer := os.Getenv("JWT_ISSUER") + jwksURL := os.Getenv("JWKS_URL") + issuer := os.Getenv("JWT_ISSUER") audience := os.Getenv("JWT_AUDIENCE") if jwksURL == "" || issuer == "" || audience == "" { log.Fatal().Msg("AUTH_ENABLED=true but JWKS_URL / JWT_ISSUER / JWT_AUDIENCE not set") } - v, err := interceptors.NewJWTVerifier(jwksURL, issuer, audience) - if err != nil { - log.Fatal().Err(err).Msg("failed to initialize JWKS verifier") + verifier, verifierErr := interceptors.NewJWTVerifier(jwksURL, issuer, audience) + if verifierErr != nil { + log.Fatal().Err(verifierErr).Msg("failed to initialize JWKS verifier") } - jwtVerifier = v + jwtVerifier = verifier log.Info().Msg("authentication ENABLED (JWT + RBAC)") } else { log.Info().Msg("authentication DISABLED (skipping JWKS/JWT/RBAC)") } // gRPC server + //nolint:gosec // The service must listen on the container interface for cluster-to-cluster traffic. lis, err := net.Listen("tcp", ":50051") if err != nil { log.Fatal().Err(err).Msg("failed to listen on :50051") @@ -210,8 +212,12 @@ func main() { }).Methods("POST") log.Info().Str("addr", ":8080").Msg("HTTP server running (DEV ONLY)") - if err := http.ListenAndServe(":8080", r); err != nil { + devSrv := &http.Server{ + Addr: ":8080", + Handler: r, + ReadHeaderTimeout: 5 * time.Second, + } + if err := devSrv.ListenAndServe(); err != nil { log.Fatal().Err(err).Msg("HTTP server failed") } } - diff --git a/apps/telemetry-service/internal/application/create_device.go b/apps/telemetry-service/internal/application/create_device.go index 49f8117..af5ccbd 100644 --- a/apps/telemetry-service/internal/application/create_device.go +++ b/apps/telemetry-service/internal/application/create_device.go @@ -55,10 +55,12 @@ func (s *CreateDeviceService) Execute(ctx context.Context, tenantID string, seri if err != nil { return nil, err } - defer tx.Rollback(ctx) + defer func() { + _ = tx.Rollback(ctx) + }() - if err = txRepo.SaveTx(ctx, tx, device); err != nil { - return nil, err + if saveErr := txRepo.SaveTx(ctx, tx, device); saveErr != nil { + return nil, saveErr } payload, err := json.Marshal(map[string]string{ @@ -71,12 +73,12 @@ func (s *CreateDeviceService) Execute(ctx context.Context, tenantID string, seri return nil, err } - if err = s.outboxRepo.Insert(ctx, tx, "device", device.ID.String(), "device_created_v1", payload, correlationid.FromContext(ctx)); err != nil { - return nil, err + if insertErr := s.outboxRepo.Insert(ctx, tx, "device", device.ID.String(), "device_created_v1", payload, correlationid.FromContext(ctx)); insertErr != nil { + return nil, insertErr } - if err = tx.Commit(ctx); err != nil { - return nil, err + if commitErr := tx.Commit(ctx); commitErr != nil { + return nil, commitErr } return device, nil diff --git a/apps/telemetry-service/internal/application/record_telemetry.go b/apps/telemetry-service/internal/application/record_telemetry.go index f2f23e6..0297b21 100644 --- a/apps/telemetry-service/internal/application/record_telemetry.go +++ b/apps/telemetry-service/internal/application/record_telemetry.go @@ -70,7 +70,9 @@ func (s *RecordTelemetryService) Execute( if err != nil { return err } - defer tx.Rollback(ctx) + defer func() { + _ = tx.Rollback(ctx) + }() telemetry, err := domain.NewTelemetry(deviceUUID, temp, humidity) if err != nil { diff --git a/apps/telemetry-service/internal/worker/outbox_worker.go b/apps/telemetry-service/internal/worker/outbox_worker.go index f680a71..2196b66 100644 --- a/apps/telemetry-service/internal/worker/outbox_worker.go +++ b/apps/telemetry-service/internal/worker/outbox_worker.go @@ -265,7 +265,9 @@ func (w *OutboxWorker) processBatch(ctx context.Context) { log.Println("tx begin error:", err) return } - defer tx.Rollback(ctx) + defer func() { + _ = tx.Rollback(ctx) + }() rows, err := tx.Query(ctx, ` SELECT id, event_type, payload::text diff --git a/libs/health/health.go b/libs/health/health.go index 08471f2..b3e512a 100644 --- a/libs/health/health.go +++ b/libs/health/health.go @@ -19,9 +19,10 @@ type Checker interface { } type postgresChecker struct{ pool *pgxpool.Pool } -func NewPostgresChecker(pool *pgxpool.Pool) Checker { return &postgresChecker{pool} } -func (c *postgresChecker) Name() string { return "postgres" } +func NewPostgresChecker(pool *pgxpool.Pool) Checker { return &postgresChecker{pool} } +func (c *postgresChecker) Name() string { return "postgres" } func (c *postgresChecker) Check(ctx context.Context) error { return c.pool.Ping(ctx) } + type redisChecker struct{ client *redis.Client } func NewRedisChecker(client *redis.Client) Checker { return &redisChecker{client} } @@ -51,8 +52,8 @@ func (c *kafkaChecker) Check(ctx context.Context) error { // ─── Handler ────────────────────────────────────────────────────────────────── type response struct { - Status string `json:"status"` // "ok" or "degraded" - Checks map[string]string `json:"checks"` // dep name → "ok" or error msg + Status string `json:"status"` // "ok" or "degraded" + Checks map[string]string `json:"checks"` // dep name → "ok" or error msg } type Handler struct{ checkers []Checker } @@ -63,7 +64,7 @@ func NewHandler(checkers ...Checker) *Handler { func (h *Handler) Live(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"status":"ok"}`)) + _, _ = w.Write([]byte(`{"status":"ok"}`)) } // Ready → /healthz/ready @@ -113,7 +114,7 @@ func (h *Handler) Ready(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) - json.NewEncoder(w).Encode(resp) + _ = json.NewEncoder(w).Encode(resp) } // NewServer builds a ready-to-run HTTP server on the given addr (e.g. ":8081"). @@ -122,5 +123,9 @@ func NewServer(addr string, h *Handler) *http.Server { mux := http.NewServeMux() mux.HandleFunc("/healthz/live", h.Live) mux.HandleFunc("/healthz/ready", h.Ready) - return &http.Server{Addr: addr, Handler: mux} + return &http.Server{ + Addr: addr, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } } diff --git a/libs/migrate/migrate.go b/libs/migrate/migrate.go index f762f31..8e7e80c 100644 --- a/libs/migrate/migrate.go +++ b/libs/migrate/migrate.go @@ -1,43 +1,48 @@ package migrate import ( - "errors" - "fmt" - "io/fs" - "log" - "strings" - "github.com/golang-migrate/migrate/v4" - _ "github.com/golang-migrate/migrate/v4/database/postgres" - "github.com/golang-migrate/migrate/v4/source/iofs" + "errors" + "fmt" + "io/fs" + "log" + "strings" + + "github.com/golang-migrate/migrate/v4" + _ "github.com/golang-migrate/migrate/v4/database/postgres" + "github.com/golang-migrate/migrate/v4/source/iofs" ) func Up(dsn string, migrations fs.FS, serviceName string) error { - source, err := iofs.New(migrations, ".") - if err != nil { - return fmt.Errorf("migrate source: %w", err) - } - // Use a service-specific migrations table so services with different - // migration counts don't clobber each other's schema_migrations row. - tableParam := "schema_migrations_" + serviceName - dsnWithTable := dsn - if strings.Contains(dsn, "?") { - dsnWithTable = dsn + "&x-migrations-table=" + tableParam - } else { - dsnWithTable = dsn + "?x-migrations-table=" + tableParam - } - m, err := migrate.NewWithSourceInstance("iofs", source, dsnWithTable) - if err != nil { - return fmt.Errorf("migrate init: %w", err) - } - defer m.Close() - if err := m.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) { - return fmt.Errorf("migrate up: %w", err) - } - version, _, err := m.Version() - if err != nil { - log.Printf("warning: could not retrieve migration version: %v", err) - } else { - log.Printf("migrations up to date at version %d", version) - } - return nil + source, err := iofs.New(migrations, ".") + if err != nil { + return fmt.Errorf("migrate source: %w", err) + } + + // Use a service-specific migrations table so services with different + // migration counts don't clobber each other's schema_migrations row. + tableParam := "schema_migrations_" + serviceName + dsnWithTable := dsn + "?x-migrations-table=" + tableParam + if strings.Contains(dsn, "?") { + dsnWithTable = dsn + "&x-migrations-table=" + tableParam + } + + m, err := migrate.NewWithSourceInstance("iofs", source, dsnWithTable) + if err != nil { + return fmt.Errorf("migrate init: %w", err) + } + defer m.Close() + + migrateErr := m.Up() + if migrateErr != nil && !errors.Is(migrateErr, migrate.ErrNoChange) { + return fmt.Errorf("migrate up: %w", migrateErr) + } + + version, _, err := m.Version() + if err != nil { + log.Printf("warning: could not retrieve migration version: %v", err) + } else { + log.Printf("migrations up to date at version %d", version) + } + + return nil } diff --git a/libs/observability/metrics.go b/libs/observability/metrics.go index f889b3b..7a4e4e2 100644 --- a/libs/observability/metrics.go +++ b/libs/observability/metrics.go @@ -3,6 +3,7 @@ package observability import ( "log" "net/http" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -166,8 +167,14 @@ func Init() { go func() { log.Println("Prometheus metrics exposed on :2112/metrics") - err := http.ListenAndServe(":2112", mux) - if err != nil { + metricsServer := &http.Server{ + Addr: ":2112", + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + + err := metricsServer.ListenAndServe() + if err != nil && err != http.ErrServerClosed { log.Fatalf("metrics server failed: %v", err) } }() From d4be229c41959485d0e96b9a8152627b154c5829 Mon Sep 17 00:00:00 2001 From: Pahuldeep Singh Date: Fri, 27 Mar 2026 21:26:17 -0500 Subject: [PATCH 2/3] fix(ci): resolve hosted phase1 drift --- .github/workflows/ci.yml | 22 ++++-- .github/workflows/security.yml | 9 ++- .../internal/consumer/kafka_consumer.go | 2 +- .../repository/postgres_saga_repository.go | 71 +++++++++---------- go.mod | 14 ++-- go.sum | 32 ++++----- 6 files changed, 82 insertions(+), 68 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 09b117a..cdc1044 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -6,6 +6,9 @@ on: pull_request: branches: [master] +env: + FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: "true" + concurrency: group: ci-${{ github.ref }} cancel-in-progress: true @@ -24,9 +27,9 @@ jobs: cache: true - name: golangci-lint - uses: golangci/golangci-lint-action@v6 + uses: golangci/golangci-lint-action@v8 with: - version: v1.62 + version: v1.62.0 args: --timeout=5m go-test: @@ -59,14 +62,22 @@ jobs: path: coverage-go.out retention-days: 7 - # ─── TypeScript: lint + typecheck ───────────────────────────── + # ─── TypeScript: explicit app checks ────────────────────────── ts-lint: - name: TS Lint — ${{ matrix.app }} + name: TS Checks — ${{ matrix.app }} runs-on: ubuntu-latest strategy: fail-fast: false matrix: - app: [gateway, bff, jobs-worker, dashboard] + include: + - app: gateway + run_lint: false + - app: bff + run_lint: false + - app: jobs-worker + run_lint: false + - app: dashboard + run_lint: true steps: - uses: actions/checkout@v4 @@ -79,6 +90,7 @@ jobs: working-directory: apps/${{ matrix.app }} - name: ESLint + if: matrix.run_lint run: npm run lint working-directory: apps/${{ matrix.app }} diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index ea59b3b..d95fbcf 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -9,6 +9,9 @@ on: # Full scan every Monday 03:00 UTC - cron: '0 3 * * 1' +env: + FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: "true" + jobs: # ── Trivy — container image vulnerability scan ─────────────────────────── trivy-scan: @@ -37,11 +40,11 @@ jobs: docker build \ -f apps/${{ matrix.service }}/Dockerfile \ -t grainguard/${{ matrix.service }}:${{ github.sha }} \ - apps/${{ matrix.service }} + . - name: Run Trivy vulnerability scan continue-on-error: true - uses: aquasecurity/trivy-action@master + uses: aquasecurity/trivy-action@0.33.1 with: image-ref: grainguard/${{ matrix.service }}:${{ github.sha }} format: sarif @@ -78,7 +81,7 @@ jobs: - name: Scan filesystem for secrets and misconfigs continue-on-error: true - uses: aquasecurity/trivy-action@master + uses: aquasecurity/trivy-action@0.33.1 with: scan-type: fs scan-ref: . diff --git a/apps/saga-orchestrator/internal/consumer/kafka_consumer.go b/apps/saga-orchestrator/internal/consumer/kafka_consumer.go index c48b5ea..a2d0e06 100644 --- a/apps/saga-orchestrator/internal/consumer/kafka_consumer.go +++ b/apps/saga-orchestrator/internal/consumer/kafka_consumer.go @@ -1,4 +1,4 @@ -package consumer +package consumer import ( "context" diff --git a/apps/saga-orchestrator/internal/repository/postgres_saga_repository.go b/apps/saga-orchestrator/internal/repository/postgres_saga_repository.go index 7f426c0..be28b93 100644 --- a/apps/saga-orchestrator/internal/repository/postgres_saga_repository.go +++ b/apps/saga-orchestrator/internal/repository/postgres_saga_repository.go @@ -1,95 +1,94 @@ -package repository +package repository import ( - "context" - "errors" + "context" + "errors" - "github.com/google/uuid" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" - "github.com/pahuldeepp/grainguard/apps/saga-orchestrator/internal/domain" + "github.com/pahuldeepp/grainguard/apps/saga-orchestrator/internal/domain" ) type PostgresSagaRepository struct { - pool *pgxpool.Pool + pool *pgxpool.Pool } func NewPostgresSagaRepository(pool *pgxpool.Pool) *PostgresSagaRepository { - return &PostgresSagaRepository{pool: pool} + return &PostgresSagaRepository{pool: pool} } func (r *PostgresSagaRepository) Create(ctx context.Context, saga *domain.Saga) error { - _, err := r.pool.Exec(ctx, ` + _, err := r.pool.Exec(ctx, ` INSERT INTO sagas (saga_id, saga_type, correlation_id, status, current_step, payload, last_error) VALUES ($1,$2,$3,$4,$5,$6,$7) `, saga.ID, string(saga.Type), saga.CorrelationID, string(saga.Status), saga.CurrentStep, saga.PayloadJSON, saga.LastError) - return err + return err } func (r *PostgresSagaRepository) FindByCorrelationID(ctx context.Context, correlationID string) (*domain.Saga, error) { - row := r.pool.QueryRow(ctx, ` + row := r.pool.QueryRow(ctx, ` SELECT saga_id, saga_type, correlation_id, status, current_step, payload, COALESCE(last_error,'') FROM sagas WHERE correlation_id = $1 `, correlationID) - var s domain.Saga - var id uuid.UUID - var sagaType string - var status string + var s domain.Saga + var id uuid.UUID + var sagaType string + var status string - if err := row.Scan(&id, &sagaType, &s.CorrelationID, &status, &s.CurrentStep, &s.PayloadJSON, &s.LastError); err != nil { - if errors.Is(err, pgx.ErrNoRows) { - return nil, pgx.ErrNoRows - } - return nil, err - } + if err := row.Scan(&id, &sagaType, &s.CorrelationID, &status, &s.CurrentStep, &s.PayloadJSON, &s.LastError); err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil, pgx.ErrNoRows + } + return nil, err + } - s.ID = id - s.Type = domain.SagaType(sagaType) - s.Status = domain.SagaStatus(status) - return &s, nil + s.ID = id + s.Type = domain.SagaType(sagaType) + s.Status = domain.SagaStatus(status) + return &s, nil } func (r *PostgresSagaRepository) UpdateStepStatus(ctx context.Context, sagaID string, step string, status string) error { - _, err := r.pool.Exec(ctx, ` + _, err := r.pool.Exec(ctx, ` UPDATE sagas SET current_step = $2, status = $3, updated_at = NOW() WHERE saga_id = $1 `, sagaID, step, status) - return err + return err } func (r *PostgresSagaRepository) MarkFailed(ctx context.Context, sagaID string, errMsg string) error { - _, err := r.pool.Exec(ctx, ` + _, err := r.pool.Exec(ctx, ` UPDATE sagas SET status = $2, last_error = $3, updated_at = NOW() WHERE saga_id = $1 `, sagaID, string(domain.StatusFailed), errMsg) - return err + return err } // IsEventProcessed returns true if this event_id was already handled func (r *PostgresSagaRepository) IsEventProcessed(ctx context.Context, eventID string) (bool, error) { - var exists bool - err := r.pool.QueryRow(ctx, ` + var exists bool + err := r.pool.QueryRow(ctx, ` SELECT EXISTS(SELECT 1 FROM saga_processed_events WHERE event_id = $1) `, eventID).Scan(&exists) - return exists, err + return exists, err } // MarkEventProcessed records that this event_id has been handled func (r *PostgresSagaRepository) MarkEventProcessed(ctx context.Context, eventID string, sagaID uuid.UUID) error { - _, err := r.pool.Exec(ctx, ` + _, err := r.pool.Exec(ctx, ` INSERT INTO saga_processed_events (event_id, saga_id) VALUES ($1, $2) ON CONFLICT (event_id) DO NOTHING `, eventID, sagaID) - return err + return err } - diff --git a/go.mod b/go.mod index 72cedcc..f24f375 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/jackc/pgx/v5 v5.8.0 github.com/prometheus/client_golang v1.19.0 - github.com/redis/go-redis/v9 v9.6.0 + github.com/redis/go-redis/v9 v9.6.3 github.com/rs/zerolog v1.34.0 github.com/segmentio/kafka-go v0.4.50 github.com/stretchr/testify v1.11.1 @@ -21,7 +21,7 @@ require ( go.opentelemetry.io/otel v1.41.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.41.0 go.opentelemetry.io/otel/sdk v1.41.0 - google.golang.org/grpc v1.79.1 + google.golang.org/grpc v1.79.3 google.golang.org/protobuf v1.36.11 ) @@ -89,11 +89,11 @@ require ( go.opentelemetry.io/otel/metric v1.41.0 // indirect go.opentelemetry.io/otel/trace v1.41.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect - golang.org/x/crypto v0.48.0 // indirect - golang.org/x/net v0.50.0 // indirect - golang.org/x/sync v0.19.0 // indirect - golang.org/x/sys v0.41.0 // indirect - golang.org/x/text v0.34.0 // indirect + golang.org/x/crypto v0.49.0 // indirect + golang.org/x/net v0.52.0 // indirect + golang.org/x/sync v0.20.0 // indirect + golang.org/x/sys v0.42.0 // indirect + golang.org/x/text v0.35.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/go.sum b/go.sum index 545d6fb..d914c70 100644 --- a/go.sum +++ b/go.sum @@ -157,8 +157,8 @@ github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSz github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= -github.com/redis/go-redis/v9 v9.6.0 h1:NLck+Rab3AOTHw21CGRpvQpgTrAU4sgdCswqGtlhGRA= -github.com/redis/go-redis/v9 v9.6.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/redis/go-redis/v9 v9.6.3 h1:8Dr5ygF1QFXRxIH/m3Xg9MMG1rS8YCtAgosrsewT6i0= +github.com/redis/go-redis/v9 v9.6.3/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= @@ -219,12 +219,12 @@ go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjce go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= -golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= -golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= -golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= +golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA= +golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= +golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -232,12 +232,12 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= -golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/term v0.40.0 h1:36e4zGLqU4yhjlmxEaagx2KuYbJq3EwY8K943ZsHcvg= -golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM= -golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= -golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= +golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A= +golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= +golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -247,8 +247,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 h1: google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:kSJwQxqmFXeo79zOmbrALdflXQeAYcUbgS7PbpMknCY= google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 h1:mWPCjDEyshlQYzBpMNHaEof6UX1PmHcaUODUywQ0uac= google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= -google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY= -google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= +google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE= +google.golang.org/grpc v1.79.3/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From 5451012771092d9dce3698259ca674038c1fbc66 Mon Sep 17 00:00:00 2001 From: Pahuldeep Singh Date: Fri, 27 Mar 2026 21:34:54 -0500 Subject: [PATCH 3/3] fix(ci): unblock final lint and trivy jobs --- .github/workflows/ci.yml | 5 +---- .github/workflows/security.yml | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cdc1044..4ec68cf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,10 +27,7 @@ jobs: cache: true - name: golangci-lint - uses: golangci/golangci-lint-action@v8 - with: - version: v1.62.0 - args: --timeout=5m + run: go run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.62.0 run --timeout=5m go-test: name: Go Test diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index d95fbcf..8b7bb18 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -44,7 +44,7 @@ jobs: - name: Run Trivy vulnerability scan continue-on-error: true - uses: aquasecurity/trivy-action@0.33.1 + uses: aquasecurity/trivy-action@v0.33.1 with: image-ref: grainguard/${{ matrix.service }}:${{ github.sha }} format: sarif @@ -81,7 +81,7 @@ jobs: - name: Scan filesystem for secrets and misconfigs continue-on-error: true - uses: aquasecurity/trivy-action@0.33.1 + uses: aquasecurity/trivy-action@v0.33.1 with: scan-type: fs scan-ref: .