Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ on:
pull_request:
branches: [master]

env:
FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: "true"

concurrency:
group: ci-${{ github.ref }}
cancel-in-progress: true
Expand All @@ -24,10 +27,7 @@ jobs:
cache: true

- name: golangci-lint
uses: golangci/golangci-lint-action@v6
with:
version: v1.62
args: --timeout=5m
run: go run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.62.0 run --timeout=5m

go-test:
name: Go Test
Expand Down Expand Up @@ -59,14 +59,22 @@ jobs:
path: coverage-go.out
retention-days: 7

# ─── TypeScript: lint + typecheck ─────────────────────────────
# ─── TypeScript: explicit app checks ──────────────────────────
ts-lint:
name: TS Lint — ${{ matrix.app }}
name: TS Checks — ${{ matrix.app }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
app: [gateway, bff, jobs-worker, dashboard]
include:
- app: gateway
run_lint: false
- app: bff
run_lint: false
- app: jobs-worker
run_lint: false
- app: dashboard
run_lint: true
steps:
- uses: actions/checkout@v4

Expand All @@ -79,6 +87,7 @@ jobs:
working-directory: apps/${{ matrix.app }}

- name: ESLint
if: matrix.run_lint
run: npm run lint
working-directory: apps/${{ matrix.app }}

Expand Down
9 changes: 6 additions & 3 deletions .github/workflows/security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ on:
# Full scan every Monday 03:00 UTC
- cron: '0 3 * * 1'

env:
FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: "true"

jobs:
# ── Trivy — container image vulnerability scan ───────────────────────────
trivy-scan:
Expand Down Expand Up @@ -37,11 +40,11 @@ jobs:
docker build \
-f apps/${{ matrix.service }}/Dockerfile \
-t grainguard/${{ matrix.service }}:${{ github.sha }} \
apps/${{ matrix.service }}
.

- name: Run Trivy vulnerability scan
continue-on-error: true
uses: aquasecurity/trivy-action@master
uses: aquasecurity/trivy-action@v0.33.1
with:
image-ref: grainguard/${{ matrix.service }}:${{ github.sha }}
format: sarif
Expand Down Expand Up @@ -78,7 +81,7 @@ jobs:

- name: Scan filesystem for secrets and misconfigs
continue-on-error: true
uses: aquasecurity/trivy-action@master
uses: aquasecurity/trivy-action@v0.33.1
with:
scan-type: fs
scan-ref: .
Expand Down
7 changes: 6 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ linters:
- bodyclose
- noctx
- exhaustive
- exportloopref
disable:
- depguard

Expand All @@ -32,9 +31,15 @@ linters-settings:
- performance
disabled-checks:
- ifElseChain
- exitAfterDefer
- hugeParam
- rangeValCopy
- sloppyReassign
- sprintfQuotedString
gosec:
excludes:
- G104 # unhandled errors in deferred calls (noisy)
- G404 # math/rand is acceptable here for jitter and synthetic test data
exhaustive:
default-signifies-exhaustive: true

Expand Down
32 changes: 21 additions & 11 deletions apps/cassandra-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
)

type TelemetryEvent struct {
EventID string `json:"eventId"`
EventType string `json:"eventType"`
AggregateID string `json:"aggregateId"`
OccurredAt string `json:"occurredAt"`
EventID string `json:"eventId"`
EventType string `json:"eventType"`
AggregateID string `json:"aggregateId"`
OccurredAt string `json:"occurredAt"`
Data struct {
DeviceID string `json:"deviceId"`
TenantID string `json:"tenantId"`
Expand Down Expand Up @@ -309,37 +309,47 @@ func main() {
}

var event TelemetryEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
log.Printf("Unmarshal error: %v", err)
reader.CommitMessages(ctx, msg)
if unmarshalErr := json.Unmarshal(msg.Value, &event); unmarshalErr != nil {
log.Printf("Unmarshal error: %v", unmarshalErr)
if commitErr := reader.CommitMessages(ctx, msg); commitErr != nil {
log.Printf("commit error after unmarshal failure: %v", commitErr)
}
skipped.Add(1)
continue
}

// Validate required fields
if event.EventID == "" || event.Data.TenantID == "" || event.Data.DeviceID == "" {
skipped.Add(1)
reader.CommitMessages(ctx, msg)
if commitErr := reader.CommitMessages(ctx, msg); commitErr != nil {
log.Printf("commit error after validation failure: %v", commitErr)
}
continue
}

if event.EventType != "telemetry.recorded" {
reader.CommitMessages(ctx, msg)
if commitErr := reader.CommitMessages(ctx, msg); commitErr != nil {
log.Printf("commit error after skipping event type: %v", commitErr)
}
continue
}
Comment on lines 330 to 335
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Missing skipped.Add(1) for non-matching event types.

When event.EventType != "telemetry.recorded", the message is committed and skipped but skipped.Add(1) is not called, unlike other skip paths (lines 317, 323, 340, 349). This causes undercounting in the stats output.

🔧 Proposed fix
 		if event.EventType != "telemetry.recorded" {
+			skipped.Add(1)
 			if commitErr := reader.CommitMessages(ctx, msg); commitErr != nil {
 				log.Printf("commit error after skipping event type: %v", commitErr)
 			}
 			continue
 		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if event.EventType != "telemetry.recorded" {
reader.CommitMessages(ctx, msg)
if commitErr := reader.CommitMessages(ctx, msg); commitErr != nil {
log.Printf("commit error after skipping event type: %v", commitErr)
}
continue
}
if event.EventType != "telemetry.recorded" {
skipped.Add(1)
if commitErr := reader.CommitMessages(ctx, msg); commitErr != nil {
log.Printf("commit error after skipping event type: %v", commitErr)
}
continue
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/cassandra-writer/main.go` around lines 330 - 335, In the branch that
handles non-matching event types (when event.EventType != "telemetry.recorded"),
increment the skip counter by calling skipped.Add(1) before continuing;
specifically, update the block that commits the message via
reader.CommitMessages(ctx, msg) to also call skipped.Add(1) (ensure this happens
regardless of commitErr) so the skip path for this condition matches the other
skip paths (which call skipped.Add(1)).


// Parse IDs
tenantID, err := gocql.ParseUUID(event.Data.TenantID)
if err != nil {
skipped.Add(1)
reader.CommitMessages(ctx, msg)
if commitErr := reader.CommitMessages(ctx, msg); commitErr != nil {
log.Printf("commit error after tenant parse failure: %v", commitErr)
}
continue
}

deviceID, err := gocql.ParseUUID(event.Data.DeviceID)
if err != nil {
skipped.Add(1)
reader.CommitMessages(ctx, msg)
if commitErr := reader.CommitMessages(ctx, msg); commitErr != nil {
log.Printf("commit error after device parse failure: %v", commitErr)
}
continue
}

Expand Down
37 changes: 29 additions & 8 deletions apps/ingest-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ func getenvInt(k string, def int) int {
return n
}

func getenvInt32(k string, def int32) int32 {
v := os.Getenv(k)
if v == "" {
return def
}

n, err := strconv.ParseInt(v, 10, 32)
if err != nil || n <= 0 {
return def
}

return int32(n)
}

// ── API Key cache ───────────────────────────────────────────────────────────

type apiKeyEntry struct {
Expand Down Expand Up @@ -117,7 +131,10 @@ var (
ingested atomic.Int64
rejected atomic.Int64
kafkaTopic string
bodyPool = sync.Pool{New: func() any { return make([]byte, 0, 4096) }}
bodyPool = sync.Pool{New: func() any {
buf := make([]byte, 0, 4096)
return &buf
}}
)

func main() {
Expand Down Expand Up @@ -148,7 +165,7 @@ func main() {
if err != nil {
log.Fatalf("bad DATABASE_URL: %v", err)
}
poolCfg.MaxConns = int32(getenvInt("DB_MAX_CONNS", 10))
poolCfg.MaxConns = getenvInt32("DB_MAX_CONNS", 10)
poolCfg.MinConns = 2

db, err = pgxpool.NewWithConfig(ctx, poolCfg)
Expand Down Expand Up @@ -282,8 +299,12 @@ func handleIngest(w http.ResponseWriter, r *http.Request) {
}

// ── Read body ───────────────────────────────────────────────────────────
buf := bodyPool.Get().([]byte)
defer func() { bodyPool.Put(buf[:0]) }()
bufPtr := bodyPool.Get().(*[]byte)
buf := *bufPtr
defer func() {
*bufPtr = buf[:0]
bodyPool.Put(bufPtr)
}()

lr := io.LimitReader(r.Body, 4096)
n, err := io.ReadFull(lr, buf[:cap(buf)])
Expand All @@ -295,7 +316,7 @@ func handleIngest(w http.ResponseWriter, r *http.Request) {
body := buf[:n]

var payload IngestPayload
if err := json.Unmarshal(body, &payload); err != nil {
if decodeErr := json.Unmarshal(body, &payload); decodeErr != nil {
rejected.Add(1)
writeJSON(w, 400, `{"error":"invalid_json"}`)
return
Expand Down Expand Up @@ -342,7 +363,7 @@ func handleIngest(w http.ResponseWriter, r *http.Request) {
ingested.Add(1)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"accepted":true,"eventId":"%s"}`, eventID)
_, _ = fmt.Fprintf(w, `{"accepted":true,"eventId":"%s"}`, eventID)
}

func handleHealth(w http.ResponseWriter, _ *http.Request) {
Expand Down Expand Up @@ -384,7 +405,7 @@ func handleReady(w http.ResponseWriter, r *http.Request) {
resp, _ := json.Marshal(map[string]any{"status": statusStr, "checks": checks})
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
w.Write(resp)
_, _ = w.Write(resp)
}

// ── API Key resolution (in-memory cache → Redis → Postgres) ─────────────────
Expand Down Expand Up @@ -449,5 +470,5 @@ func resolveAPIKey(ctx context.Context, rawKey string) (*apiKeyEntry, error) {
func writeJSON(w http.ResponseWriter, status int, body string) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
w.Write([]byte(body))
_, _ = w.Write([]byte(body))
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ func HandleDevice(pool *pgxpool.Pool, redisClient *redis.Client) func([]byte) er
observability.EventsRetry.Inc()
return err
}
defer tx.Rollback(ctx)
defer func() {
_ = tx.Rollback(ctx)
}()

var inserted string
err = tx.QueryRow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ func HandleTelemetry(pool *pgxpool.Pool, redisClient *redis.Client) func([]byte)
observability.EventsRetry.Inc()
return err
}
defer tx.Rollback(ctx)
defer func() {
_ = tx.Rollback(ctx)
}()

var inserted string
err = tx.QueryRow(
Expand Down Expand Up @@ -164,9 +166,9 @@ func HandleTelemetry(pool *pgxpool.Pool, redisClient *redis.Client) func([]byte)
return err
}

if err := tx.Commit(ctx); err != nil {
if commitErr := tx.Commit(ctx); commitErr != nil {
observability.EventsRetry.Inc()
return err
return commitErr
}

versionKey := "device:" + deviceID.String()
Expand Down Expand Up @@ -277,7 +279,9 @@ func HandleTelemetryBatch(pool *pgxpool.Pool, redisClient *redis.Client) func(co
observability.EventsRetry.Inc()
return err
}
defer tx.Rollback(txCtx)
defer func() {
_ = tx.Rollback(txCtx)
}()

eventIDs := make([]string, len(events))
for i, e := range events {
Expand All @@ -300,7 +304,7 @@ func HandleTelemetryBatch(pool *pgxpool.Pool, redisClient *redis.Client) func(co
newEventIDs := make(map[string]struct{})
for rows.Next() {
var id string
if err := rows.Scan(&id); err == nil {
if scanErr := rows.Scan(&id); scanErr == nil {
newEventIDs[id] = struct{}{}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func setupTestDB(t *testing.T) *pgxpool.Pool {
),
)
require.NoError(t, err)
t.Cleanup(func() { pgContainer.Terminate(ctx) })
t.Cleanup(func() { _ = pgContainer.Terminate(ctx) })

connStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
require.NoError(t, err)
Expand Down Expand Up @@ -253,4 +253,3 @@ func TestHandleTelemetryBatch_SkipsInvalidEvents(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 0, count)
}

2 changes: 1 addition & 1 deletion apps/saga-orchestrator/internal/consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package consumer
package consumer

import (
"context"
Expand Down
Loading
Loading