Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .coderabbit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ reviews:
Verify eventId and tenantId are always set.
chat:
auto_reply: true

103 changes: 103 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
.PHONY: up down restart logs build seed test lint clean help

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== Targets defined in Makefile =="
rg -n '^[a-zA-Z_-]+:.*$' Makefile

echo
echo "== .PHONY declaration =="
rg -n '^\.(PHONY):' Makefile

echo
echo "== Check whether required default target exists =="
rg -n '^all:' Makefile || true

Repository: pahuldeepp/GrainGuard-

Length of output: 1021


Add a default all target and complete the .PHONY list.

The .PHONY declaration on line 1 is incomplete, missing 12 targets that are actually defined later in the Makefile: logs-gateway, logs-bff, logs-kafka, migrate, test-go, test-react, test-load, lint-go, lint-ts, clean-cache, ps, and health. Additionally, the all target is missing entirely, which is a standard Makefile convention. These omissions can cause target/file name collisions and potential CI lint failures.

Proposed fix
-.PHONY: up down restart logs build seed test lint clean help
+.PHONY: all up down restart logs logs-gateway logs-bff logs-kafka build seed migrate test test-go test-react test-load lint lint-go lint-ts clean clean-cache ps health help
+
+all: up ## Default target
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
.PHONY: up down restart logs build seed test lint clean help
.PHONY: all up down restart logs logs-gateway logs-bff logs-kafka build seed migrate test test-go test-react test-load lint lint-go lint-ts clean clean-cache ps health help
all: up ## Default target
🧰 Tools
🪛 checkmake (0.2.2)

[warning] 1-1: Missing required phony target "all"

(minphony)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@Makefile` at line 1, Update the .PHONY declaration to include the missing
targets and add a default all target: extend the existing .PHONY list to include
all, logs-gateway, logs-bff, logs-kafka, migrate, test-go, test-react,
test-load, lint-go, lint-ts, clean-cache, ps, and health; then add a top-level
all target (e.g., all: up or all: build up — choose the appropriate default
behavior for this Makefile) so running make with no args uses the standard
default; ensure the new all name doesn't collide with any file and that the
added phony names match the actual targets (logs-gateway, logs-bff, logs-kafka,
migrate, test-go, test-react, test-load, lint-go, lint-ts, clean-cache, ps,
health).


# ============================================
# GrainGuard — Developer Makefile
# ============================================

help: ## Show this help
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'

# ============================================
# Docker Compose
# ============================================

up: ## Start all services
docker compose -f infra/docker/docker-compose.yml up -d

down: ## Stop all services
docker compose -f infra/docker/docker-compose.yml down

restart: ## Restart all services
docker compose -f infra/docker/docker-compose.yml restart

logs: ## Tail logs for all services
docker compose -f infra/docker/docker-compose.yml logs -f

logs-gateway: ## Tail gateway logs
docker compose -f infra/docker/docker-compose.yml logs -f gateway

logs-bff: ## Tail BFF logs
docker compose -f infra/docker/docker-compose.yml logs -f bff

logs-kafka: ## Tail Kafka logs
docker compose -f infra/docker/docker-compose.yml logs -f kafka

build: ## Build all Docker images
docker compose -f infra/docker/docker-compose.yml build

# ============================================
# Database
# ============================================

seed: ## Seed dev data (Postgres + Elasticsearch)
cd scripts/seed && bash seed-postgres.sh
cd scripts/seed && bash seed-elasticsearch.sh

migrate: ## Run database migrations
go run libs/migrate/migrate.go

# ============================================
# Testing
# ============================================

test: ## Run all tests
$(MAKE) test-go
$(MAKE) test-react

test-go: ## Run Go tests
go test ./...

test-react: ## Run React/Node tests
cd apps/dashboard && npm test -- --run
cd apps/bff && npm test -- --run

test-load: ## Run k6 load tests
k6 run scripts/load-tests/gateway-load-test.js
k6 run scripts/load-tests/bff-load-test.js

# ============================================
# Linting
# ============================================

lint: ## Lint all services
$(MAKE) lint-go
$(MAKE) lint-ts

lint-go: ## Lint Go services
golangci-lint run ./...

lint-ts: ## Lint TypeScript services
cd apps/gateway && npm run lint
cd apps/bff && npm run lint
cd apps/dashboard && npm run lint

# ============================================
# Cleanup
# ============================================

clean: ## Remove all containers and volumes
docker compose -f infra/docker/docker-compose.yml down -v --remove-orphans

clean-cache: ## Clear Redis cache
docker compose -f infra/docker/docker-compose.yml exec redis redis-cli FLUSHALL

Comment on lines +91 to +93

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, find and verify the Makefile exists
fd -type f -name "Makefile" | head -20

Repository: pahuldeepp/GrainGuard-

Length of output: 236


🏁 Script executed:

# Once located, read the specific lines mentioned (91-93)
cat -n Makefile | sed -n '85,100p'

Repository: pahuldeepp/GrainGuard-

Length of output: 660


🏁 Script executed:

# Check the docker-compose configuration for Redis setup
fd -type f -name "docker-compose.yml" | head -10

Repository: pahuldeepp/GrainGuard-

Length of output: 236


🏁 Script executed:

# Find docker-compose.yml with correct syntax
find . -name "docker-compose.yml" -type f

Repository: pahuldeepp/GrainGuard-

Length of output: 99


🏁 Script executed:

# Look at the docker-compose configuration
cat infra/docker/docker-compose.yml

Repository: pahuldeepp/GrainGuard-

Length of output: 14710


🏁 Script executed:

# Check if there are any references to Redis DB selection in the codebase
rg "redis|Redis|REDIS" --max-count=20 -i

Repository: pahuldeepp/GrainGuard-

Length of output: 28172


Use FLUSHDB instead of FLUSHALL in the clean-cache target.

Line 92 executes FLUSHALL, which clears all Redis databases. Multiple services (bff, gateway, read-model-builder) depend on this Redis instance for caching, rate-limiting, and idempotency locks. Use FLUSHDB to clear only the current database, limiting the scope of cache cleanup.

Fix
 clean-cache: ## Clear Redis cache
-	docker compose -f infra/docker/docker-compose.yml exec redis redis-cli FLUSHALL
+	docker compose -f infra/docker/docker-compose.yml exec redis redis-cli FLUSHDB
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
clean-cache: ## Clear Redis cache
docker compose -f infra/docker/docker-compose.yml exec redis redis-cli FLUSHALL
clean-cache: ## Clear Redis cache
docker compose -f infra/docker/docker-compose.yml exec redis redis-cli FLUSHDB
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@Makefile` around lines 91 - 93, The Makefile target clean-cache currently
runs redis-cli FLUSHALL which wipes all Redis databases; update the target
(clean-cache) to use redis-cli FLUSHDB so only the current DB is cleared, i.e.,
replace the FLUSHALL argument with FLUSHDB in the docker compose exec command
that invokes redis-cli.

# ============================================
# Status
# ============================================

ps: ## Show running services
docker compose -f infra/docker/docker-compose.yml ps

health: ## Check health of all services
curl -s http://localhost:8086/health | jq
curl -s http://localhost:4000/health | jq
Comment on lines +101 to +103

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, locate and examine the Makefile
find . -name "Makefile" -type f | head -5

Repository: pahuldeepp/GrainGuard-

Length of output: 76


🏁 Script executed:

# Once we have the Makefile, check the health target
sed -n '95,110p' Makefile

Repository: pahuldeepp/GrainGuard-

Length of output: 332


🌐 Web query:

curl -s flag behavior HTTP error codes 4xx 5xx exit status

💡 Result:

  • -s / --silent only affects output: it suppresses the progress meter and curl’s error messages; it does not change how HTTP status codes map to curl’s exit status. Use -S / --show-error with -s if you want “silent except show errors”. [1][2]
  • For HTTP 4xx/5xx: by default, curl can still exit 0 even if the server responds 404, 500, etc., because the transfer itself succeeded at the HTTP layer. If you want HTTP >= 400 to be treated as a failure (non-zero exit), use -f / --fail (or --fail-with-body). [3]
  • With -f/--fail, curl uses exit code 22 for “HTTP error returned” (4xx/5xx). [4]

Common script pattern:

curl -fsS https://example.com/resource   # silent, show errors, fail (exit!=0) on 4xx/5xx

Sources: [1] [2] [3] [4]


Add -f flag to curl commands so the health target fails on HTTP errors.

Lines 102–103 use curl -s which exits 0 even on HTTP 4xx/5xx responses (the transfer itself succeeds). Without the -f flag, make health will report success when services are unhealthy. Use curl -fsS instead to fail fast on HTTP errors; optionally add --max-time 5 to prevent hangs on stalled connections.

Proposed fix
 health: ## Check health of all services
-	curl -s http://localhost:8086/health | jq
-	curl -s http://localhost:4000/health | jq
+	curl -fsS --max-time 5 http://localhost:8086/health | jq
+	curl -fsS --max-time 5 http://localhost:4000/health | jq
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@Makefile` around lines 101 - 103, The health Makefile target uses silent curl
commands that don't fail on HTTP errors; update the two curl invocations in the
"health" target to use curl -fsS (and optionally add --max-time 5) so the
command will exit non‑zero on 4xx/5xx responses and time out reliably; modify
the curl lines under the health target (the two curl commands calling
http://localhost:8086/health and http://localhost:4000/health) to include -fsS
(and --max-time 5 if desired).

1 change: 1 addition & 0 deletions apps/bff/src/datasources/cassandra.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,4 @@ export async function closeCassandraClient(): Promise<void> {
client = null;
}
}

1 change: 1 addition & 0 deletions apps/bff/src/datasources/elasticsearch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,4 @@ export const search = {
}
},
};

158 changes: 95 additions & 63 deletions apps/bff/src/datasources/postgres.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { Pool } from "pg";
import { postgresCircuitBreaker } from "../lib/circuitBreaker";
import { cache } from "./redis";

const pool = new Pool({
host: process.env.READ_DB_HOST || "postgres-read",
host: process.env.READ_DB_HOST || "pgbouncer",
port: parseInt(process.env.READ_DB_PORT || "5432"),
database: process.env.READ_DB_NAME || "grainguard_read",
user: process.env.READ_DB_USER || "postgres",
password: process.env.READ_DB_PASSWORD || "postgres",
max: 10,
max: 50,
});


// Circuit-breaker-wrapped query helper
async function cbQuery(text: string, values?: any[]): Promise<import("pg").QueryResult<any>> {
return postgresCircuitBreaker.execute(() => pool.query(text, values));
Expand All @@ -33,9 +33,6 @@ export async function tenantQuery(
});
}

// Tenant-scoped query — sets app.current_tenant_id for RLS enforcement
// Use this for all queries that should be tenant-isolated

export const db = {

async getDevice(deviceId: string) {
Expand All @@ -49,14 +46,29 @@ export const db = {
},

async getAllDevices(limit: number = 20) {
const result = await cbQuery(
`SELECT device_id, tenant_id, serial_number, created_at
FROM device_projections
ORDER BY created_at DESC
LIMIT $1`,
[limit]
);
return result.rows;
const cacheKey = `devices:all:${limit}`;
const cached = await cache.get<any[]>(cacheKey);
if (cached) return cached;

const locked = await cache.acquireLock(cacheKey, 5);
if (!locked) {
await new Promise(r => setTimeout(r, 100));
return await cache.get<any[]>(cacheKey) || [];
}

try {
const result = await cbQuery(
`SELECT device_id, tenant_id, serial_number, created_at
FROM device_projections
ORDER BY created_at DESC
LIMIT $1`,
[limit]
);
await cache.set(cacheKey, result.rows, 30);
Comment on lines +49 to +67

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Make this cache path tenant-aware or explicitly admin-only.

devices:all:${limit} is a shared cross-tenant key, and the query still returns rows for every tenant. If this method is reachable from normal tenant-scoped resolver flows, one caller can warm a payload that every other caller reuses. Thread tenantId through this API and key/query on it, or split it into an explicitly admin-only path. As per coding guidelines, apps/bff/src/**/*.ts: Check all queries use tenantId for RLS. Verify cache keys include tenantId.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/bff/src/datasources/postgres.ts` around lines 49 - 67, The cache key and
query are tenant-agnostic: update the call site and this function to accept a
tenantId (or an explicit isAdmin flag), include tenantId in the cache key (e.g.,
`devices:tenant:${tenantId}:all:${limit}`) and add a WHERE tenant_id = $2 (or
equivalent) to the cbQuery so the SQL filters by tenant; if this endpoint must
be admin-only instead, document and enforce that by checking an isAdmin guard
before using the shared key. Ensure the cache.get/set/acquireLock use the new
tenant-scoped key and thread tenantId through any callers that invoke this code
so RLS/tenant isolation is preserved.

return result.rows;
} finally {
await cache.releaseLock(cacheKey);
Comment on lines +49 to +70

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Do not let cache contention or Redis failures turn into empty/failed reads.

On a contested miss, the loser sleeps 100ms and then returns [] if the winner has not populated Redis yet. Separately, any cache.get / set / releaseLock error will reject the request, even when Postgres is healthy or the DB query already succeeded. These paths should fall back to the underlying DB query whenever the cache cannot serve a value.

Also applies to: 85-120, 165-205

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/bff/src/datasources/postgres.ts` around lines 49 - 70, The cache logic
currently returns [] on contested misses and lets any cache
(get/set/releaseLock) errors surface instead of falling back to Postgres; update
the flow around cacheKey, cache.get, cache.acquireLock, cache.set and
cache.releaseLock so that any cache error or a contested-miss timeout triggers a
direct cbQuery to Postgres and returns those rows, and ensure cache.releaseLock
errors are caught/logged and do not override the DB result. Concretely: on cache
miss, attempt acquireLock; if not locked, sleep then try cache.get again and if
still empty call cbQuery(...) and return its rows; wrap all
cache.get/set/releaseLock calls in try/catch to fall back to cbQuery on failure;
after a successful cbQuery, attempt cache.set but ignore/set-log any errors so
the DB result is still returned. This same pattern should be applied to the
other similar blocks indicated (lines ~85-120 and ~165-205).

Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
},

async getDeviceTelemetry(deviceId: string) {
Expand All @@ -70,26 +82,43 @@ export const db = {
},

async getAllTelemetry(limit: number = 20, tenantId?: string) {
if (tenantId) {
const cacheKey = `telemetry:all:${tenantId || "global"}:${limit}`;
const cached = await cache.get<any[]>(cacheKey);
if (cached) return cached;

const locked = await cache.acquireLock(cacheKey, 5);
if (!locked) {
await new Promise(r => setTimeout(r, 100));
return await cache.get<any[]>(cacheKey) || [];
}

try {
if (tenantId) {
const result = await cbQuery(
`SELECT t.device_id, t.temperature, t.humidity, t.recorded_at, t.updated_at, t.version
FROM device_telemetry_latest t
INNER JOIN device_projections d ON d.device_id = t.device_id
WHERE d.tenant_id = $1
ORDER BY t.updated_at DESC
LIMIT $2`,
[tenantId, limit]
);
Comment on lines +96 to +105

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use the tenant-scoped query path on these branches.

These branches already have a tenantId, but they still call cbQuery, so the new cached reads are bypassing the module's tenant/RLS path and relying only on ad hoc predicates. Route them through the tenant-scoped helper here, keeping the explicit tenant_id filter if you want defense in depth. As per coding guidelines, apps/bff/src/**/*.ts: Check all queries use tenantId for RLS.

Also applies to: 176-187

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/bff/src/datasources/postgres.ts` around lines 96 - 105, The branch is
calling cbQuery directly despite having a tenantId, bypassing the module's
tenant/RLS pathway; replace the cbQuery call in this block (the query selecting
from device_telemetry_latest joined to device_projections) with the module's
tenant-scoped query helper (e.g., the tenantScopedQuery/tenantQuery helper used
elsewhere) and pass tenantId and limit through that helper while keeping the
explicit WHERE d.tenant_id = $1 predicate for defense-in-depth; apply the same
replacement for the analogous block around lines 176-187 so all tenant-aware
reads use the tenant-scoped helper rather than cbQuery.

await cache.set(cacheKey, result.rows, 30);
return result.rows;
}

const result = await cbQuery(
`SELECT t.device_id, t.temperature, t.humidity, t.recorded_at, t.updated_at, t.version
FROM device_telemetry_latest t
INNER JOIN device_projections d ON d.device_id = t.device_id
WHERE d.tenant_id = $1
ORDER BY t.updated_at DESC
LIMIT $2`,
[tenantId, limit]
`SELECT device_id, temperature, humidity, recorded_at, updated_at, version
FROM device_telemetry_latest
ORDER BY updated_at DESC
LIMIT $1`,
[limit]
);
await cache.set(cacheKey, result.rows, 30);
return result.rows;
} finally {
await cache.releaseLock(cacheKey);
}
const result = await cbQuery(
`SELECT device_id, temperature, humidity, recorded_at, updated_at, version
FROM device_telemetry_latest
ORDER BY updated_at DESC
LIMIT $1`,
[limit]
);
return result.rows;
},

async getDeviceWithTelemetry(deviceId: string) {
Expand Down Expand Up @@ -133,45 +162,48 @@ export const db = {
},

async getAllDevicesWithTelemetry(limit: number = 20, tenantId?: string) {
if (tenantId) {
const cacheKey = `devices:telemetry:${tenantId || "global"}:${limit}`;
const cached = await cache.get<any[]>(cacheKey);
if (cached) return cached;

const locked = await cache.acquireLock(cacheKey, 5);
if (!locked) {
await new Promise(r => setTimeout(r, 100));
return await cache.get<any[]>(cacheKey) || [];
}

try {
if (tenantId) {
const result = await cbQuery(
`SELECT
d.device_id, d.tenant_id, d.serial_number, d.created_at,
t.temperature, t.humidity, t.recorded_at, t.version
FROM device_projections d
LEFT JOIN device_telemetry_latest t ON d.device_id = t.device_id
WHERE d.tenant_id = $1
ORDER BY d.created_at DESC
LIMIT $2`,
[tenantId, limit]
);
await cache.set(cacheKey, result.rows, 30);
return result.rows;
}

const result = await cbQuery(
`SELECT
d.device_id,
d.tenant_id,
d.serial_number,
d.created_at,
t.temperature,
t.humidity,
t.recorded_at,
t.version
d.device_id, d.tenant_id, d.serial_number, d.created_at,
t.temperature, t.humidity, t.recorded_at, t.version
FROM device_projections d
LEFT JOIN device_telemetry_latest t
ON d.device_id = t.device_id
WHERE d.tenant_id = $1
LEFT JOIN device_telemetry_latest t ON d.device_id = t.device_id
ORDER BY d.created_at DESC
LIMIT $2`,
[tenantId, limit]
LIMIT $1`,
[limit]
);
await cache.set(cacheKey, result.rows, 30);
return result.rows;
} finally {
await cache.releaseLock(cacheKey);
}
const result = await cbQuery(
`SELECT
d.device_id,
d.tenant_id,
d.serial_number,
d.created_at,
t.temperature,
t.humidity,
t.recorded_at,
t.version
FROM device_projections d
LEFT JOIN device_telemetry_latest t
ON d.device_id = t.device_id
ORDER BY d.created_at DESC
LIMIT $1`,
[limit]
);
return result.rows;
},

async getDevicesWithCursor(first: number = 20, after: string | null = null, tenantId?: string) {
Expand Down Expand Up @@ -264,4 +296,4 @@ export const db = {
},
};
},
};
};
2 changes: 1 addition & 1 deletion apps/bff/src/datasources/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ export const cache = {
async releaseLock(key: string): Promise<void> {
await client.del(`lock:${key}`);
}
};
};
1 change: 1 addition & 0 deletions apps/bff/src/lib/circuitBreaker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,4 @@ export const postgresCircuitBreaker = new CircuitBreaker({
successThreshold: 2,
timeout: 30_000,
});

1 change: 1 addition & 0 deletions apps/bff/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ export const pubsub = new PubSub();

export const TELEMETRY_UPDATED = "TELEMETRY_UPDATED";
export const TENANT_TELEMETRY_UPDATED = "TENANT_TELEMETRY_UPDATED";

1 change: 1 addition & 0 deletions apps/bff/src/resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,4 @@ export const resolvers = {
},
},
};

1 change: 1 addition & 0 deletions apps/bff/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,4 @@
tenantTelemetryUpdated(tenantId: String!): Telemetry!
}
`;

1 change: 1 addition & 0 deletions apps/bff/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,4 @@ startServer().catch((err) => {
console.error("Failed to start BFF:", err);
process.exit(1);
});

2 changes: 1 addition & 1 deletion apps/bff/src/telemetryWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,4 @@ export async function startTelemetryWatcher() {
});

console.log("Telemetry watcher started — LISTEN telemetry_updated");
}
}
1 change: 1 addition & 0 deletions apps/cassandra-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,4 @@ func init() {
// Ensure uuid package is used
_ = uuid.New()
}

1 change: 1 addition & 0 deletions apps/cdc-transformer/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,4 @@ func isSkippable(err error) bool {
}
return errors.Is(err, context.Canceled)
}

1 change: 1 addition & 0 deletions apps/cdc-transformer/internal/consumer/cdc_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,4 @@ func isSkippable(err error) bool {
return errors.Is(err, context.Canceled) ||
strings.HasPrefix(s, "skip:")
}

2 changes: 1 addition & 1 deletion apps/cdc-transformer/internal/idempotency/dedupe.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ func (d *Deduper) Confirm(ctx context.Context, key string) error {
// Cancel removes reservation when publish fails so it can retry.
func (d *Deduper) Cancel(ctx context.Context, key string) error {
return d.rdb.Del(ctx, key).Err()
}
}
Loading
Loading