PR: feat: NEXUS infra-gateway — Kafka pipeline & cloud deployment#1
Conversation
- Add Docker Compose orchestration with onprem and cloud profiles - Add cloud ingestor FastAPI service with Kafka producer (aiokafka) - Add on-prem vault aggregator and sync service - Add Zookeeper and Kafka broker configuration (Confluent 7.6.0) - Add RBAC configuration for role-based access control - Add Render deployment configuration for cloud ingestor - Add SSL/TLS support via Aiven cert integration - Add health checks for all services - Configure gitignore for secrets, certs, and environment files
📝 WalkthroughWalkthroughThis pull request introduces a complete infrastructure gateway system for NEXUS, comprising a cloud-based FastAPI ingestor service, on-premises vault aggregation and synchronization services, Docker Compose orchestration, and comprehensive test coverage for multi-tenant telemetry event ingestion and processing. Changes
Sequence Diagram(s)sequenceDiagram
participant SDK as SDK/Client
participant CloudIng as Cloud Ingestor<br/>(FastAPI)
participant Kafka as Kafka Broker
participant OnPremAgg as On-Prem Aggregator<br/>(SQLite)
participant OnPremSync as On-Prem Sync<br/>Service
SDK->>CloudIng: POST /api/v1/events<br/>(EventBatchRequest + API Key)
CloudIng->>CloudIng: Verify API Key
CloudIng->>CloudIng: Ensure tenant topics exist
CloudIng->>Kafka: Produce event to<br/>nexus.events.{tenant_id}
CloudIng-->>SDK: 202 Accepted
OnPremAgg->>OnPremAgg: Poll SQLite raw_events<br/>where is_processed=0
OnPremAgg->>OnPremAgg: Group by tenant_id
OnPremAgg->>OnPremAgg: Aggregate per-feature summaries<br/>(counts, durations, user dedup)
OnPremAgg->>OnPremAgg: Compute SHA-256 checksum
OnPremAgg->>OnPremAgg: Insert InsightPacket<br/>Mark raw_events processed
OnPremSync->>OnPremSync: Fetch unsynced packets<br/>where is_synced=0
OnPremSync->>OnPremSync: Verify checksum per packet
OnPremSync->>CloudIng: POST /api/v1/ingest<br/>(InsightPacketBatch + Auth)
CloudIng->>CloudIng: Validate API Key
CloudIng->>CloudIng: Ensure tenant topics
CloudIng->>Kafka: Produce packet to<br/>nexus.packets.{tenant_id}
CloudIng-->>OnPremSync: IngestResponse<br/>(acknowledged_ids, rejected_count)
OnPremSync->>OnPremSync: Mark acknowledged packets<br/>is_synced=1
Estimated code review effort🎯 4 (Complex) | ⏱️ ~70 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 14
🧹 Nitpick comments (12)
infra-gateway/tests/conftest.py (1)
1-4: Remove redundantsys.pathsetup in favor ofpytest.ini.The
pythonpathconfiguration ininfra-gateway/pytest.ini(line 4) already includes bothon_prem_vaultandcloud_ingestor, making the manualsys.path.insert()calls here redundant. This duplication can lead to unintended import ordering when the same paths are added multiple times. Usepytest.inias the single source of truth for path setup and delete lines 3–4.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infra-gateway/tests/conftest.py` around lines 1 - 4, Remove the redundant sys.path modifications in conftest.py: delete the two sys.path.insert(0, os.path.join(_infra, "on_prem_vault")) and sys.path.insert(0, os.path.join(_infra, "cloud_ingestor")) calls (the _infra variable may remain if used elsewhere) so pytest.ini's pythonpath is the single source of truth; ensure you only rely on pytest.ini for adding "on_prem_vault" and "cloud_ingestor" to sys.path and do not re-add them programmatically.infra-gateway/cloud_ingestor/requirements.txt (1)
1-22: Consolidate three duplicate requirement manifests to prevent further drift.This file is byte-for-byte identical to
infra-gateway/requirements.txtandinfra-gateway/on_prem_vault/requirements.txt. Version drift is already present: Dockerfiles pinaiokafka==0.11.0while all three requirements.txt files specifyaiokafka==0.10.0. Use a shared base or constraints file to maintain consistency across all copies.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infra-gateway/cloud_ingestor/requirements.txt` around lines 1 - 22, This requirements file is duplicated across three manifests and mismatches Dockerfile pins for aiokafka; consolidate by creating a single shared requirements/constraints file (e.g., a central requirements.txt or constraints.txt) and update these three copies to reference it via -r or -c, ensure the shared file pins aiokafka to the Dockerfile-compatible version (aiokafka==0.11.0) and keep the other packages (requests, fastapi, uvicorn, pydantic, jsonschema, pytest, pytest-asyncio, httpx) as needed so all three manifests pull from the same source of truth.infra-gateway/on_prem_vault/Dockerfile.sync (2)
1-22: Consider using requirements.txt for dependency management.The Dockerfile hardcodes dependencies inline rather than using the
requirements.txtfile in the same directory. This creates a maintenance burden where versions must be synchronized in multiple places.♻️ Proposed alternative using requirements.txt
WORKDIR /app +COPY requirements.txt . -RUN pip install --no-cache-dir \ - requests==2.31.0 \ - aiokafka==0.11.0 \ - fastapi==0.111.0 \ - "uvicorn[standard]==0.29.0" \ - "pydantic>=2.10,<3" \ - jsonschema==4.22.0 +RUN pip install --no-cache-dir -r requirements.txt COPY sync_service.py .🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infra-gateway/on_prem_vault/Dockerfile.sync` around lines 1 - 22, The Dockerfile currently pins Python packages inline in the RUN pip install ... layer, which duplicates dependency data and complicates updates; replace the inline pip install list with a single COPY of the existing requirements.txt into the image and run pip install --no-cache-dir -r requirements.txt (keep existing flags like --no-cache-dir and the slim base, preserve USER nexus and CMD "python sync_service.py"); update the Dockerfile steps around pip install and COPY to ensure requirements.txt is added before pip install so the build uses the file and removes duplicated version management.
10-16: Unnecessary dependencies and version mismatch with requirements.txt.This Dockerfile installs
aiokafka==0.11.0,fastapi,uvicorn, andjsonschema, butsync_service.pyonly usesrequestsfor HTTP calls. These extra packages bloat the image unnecessarily.Additionally,
aiokafka==0.11.0here conflicts withaiokafka==0.10.0inrequirements.txt.♻️ Proposed fix: Install only required dependencies
RUN pip install --no-cache-dir \ - requests==2.31.0 \ - aiokafka==0.11.0 \ - fastapi==0.111.0 \ - "uvicorn[standard]==0.29.0" \ - "pydantic>=2.10,<3" \ - jsonschema==4.22.0 + requests==2.31.0🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infra-gateway/on_prem_vault/Dockerfile.sync` around lines 10 - 16, The Dockerfile's pip install line installs unused packages and a conflicting aiokafka version; update the RUN pip install block in infra-gateway/on_prem_vault/Dockerfile.sync to only install the real runtime dependency used by sync_service.py (requests==2.31.0) and either remove aiokafka/fastapi/uvicorn/jsonschema entirely or change aiokafka to match the version in requirements.txt (aiokafka==0.10.0) if other services require it; ensure the final pip install list mirrors requirements.txt to avoid version drift and reduce image size.infra-gateway/cloud_ingestor/Dockerfile.ingestor (2)
11-18: Version mismatch: aiokafka 0.11.0 vs requirements.txt 0.10.0.The Dockerfile pins
aiokafka==0.11.0whilerequirements.txtspecifiesaiokafka==0.10.0. This inconsistency can cause confusion and unexpected behavior differences between local development and containerized deployments.♻️ Proposed fix: Align versions or use requirements.txt
Option 1 - Align to 0.11.0 in requirements.txt:
# Kafka (Python-native, no C build issues) -aiokafka==0.10.0 +aiokafka==0.11.0Option 2 - Use requirements.txt in Dockerfile:
+COPY cloud_ingestor/requirements.txt . -RUN pip install --no-cache-dir \ - requests==2.31.0 \ - aiokafka==0.11.0 \ - ... +RUN pip install --no-cache-dir -r requirements.txt🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infra-gateway/cloud_ingestor/Dockerfile.ingestor` around lines 11 - 18, The Dockerfile currently pins aiokafka==0.11.0 while requirements.txt pins aiokafka==0.10.0; pick one canonical source and make them consistent: either update requirements.txt to aiokafka==0.11.0 or change the Dockerfile to install from requirements.txt (remove the explicit aiokafka pin and run pip install -r requirements.txt) so both local and container installs use the same aiokafka version; adjust the chosen file (requirements.txt or Dockerfile.ingestor) and ensure any CI/build steps use the same reference.
22-24: Shell form CMD prevents signal propagation.Using shell form (
CMD uvicorn ...) wraps the command in/bin/sh -c, which doesn't forward signals (SIGTERM) properly to uvicorn. This can cause slow container shutdowns and potential data loss during graceful termination.♻️ Proposed fix: Use exec form
USER nexus EXPOSE 8080 -CMD uvicorn kafka_config:app --host 0.0.0.0 --port ${PORT:-8080} --workers 2 +CMD ["sh", "-c", "exec uvicorn kafka_config:app --host 0.0.0.0 --port ${PORT:-8080} --workers 2"]The
execreplaces the shell process with uvicorn, allowing proper signal handling while retaining env var substitution.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infra-gateway/cloud_ingestor/Dockerfile.ingestor` around lines 22 - 24, The Dockerfile uses shell-form CMD which prevents signal propagation; replace the shell form CMD that runs "uvicorn kafka_config:app --host 0.0.0.0 --port ${PORT:-8080} --workers 2" with the exec/JSON form so the shell is not spawned and signals reach uvicorn directly (keep the same arguments and env var substitution semantics), updating the CMD instruction in the Dockerfile.ingestor near the existing USER nexus line.infra-gateway/on_prem_vault/requirements.txt (1)
19-22: Separate dev/test dependencies from production requirements.Mixing
pytest,pytest-asyncio, andhttpxwith production dependencies means they'll be installed in production containers, increasing attack surface and image size.♻️ Proposed fix: Create separate requirements-dev.txt
Create
requirements-dev.txt:-r requirements.txt pytest==8.2.0 pytest-asyncio==0.23.6 httpx==0.27.0Then remove from
requirements.txt:# JSON schema validation jsonschema==4.22.0 - -# Dev/testing only -pytest==8.2.0 -pytest-asyncio==0.23.6 -httpx==0.27.0🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infra-gateway/on_prem_vault/requirements.txt` around lines 19 - 22, The requirements.txt currently includes dev/test packages (pytest, pytest-asyncio, httpx); extract these into a new requirements-dev.txt that begins with "-r requirements.txt" and lists pytest==8.2.0, pytest-asyncio==0.23.6, and httpx==0.27.0, then remove those three entries from infra-gateway/on_prem_vault/requirements.txt so production installs no longer pull test/dev deps.infra-gateway/on_prem_vault/aggregator.py (2)
92-106: CPU usage calculation may be inaccurate.The current implementation reads
/proc/statonce and calculates instantaneous CPU usage. However, CPU percentage is typically calculated as the delta between two readings over a time interval. The current approach gives cumulative usage since boot, not current load.♻️ Proposed fix: Calculate delta-based CPU usage
+_prev_idle = 0 +_prev_total = 0 + def get_cpu_usage() -> float: """ Reads CPU usage from /proc/stat for a lightweight check. Returns a float percentage (0.0 - 100.0). Falls back to 0.0 if /proc/stat is unavailable (e.g., Windows dev machines). """ + global _prev_idle, _prev_total try: with open("/proc/stat", "r") as f: line = f.readline() fields = [float(x) for x in line.strip().split()[1:]] idle = fields[3] total = sum(fields) - return 100.0 * (1.0 - idle / total) if total > 0 else 0.0 + + idle_delta = idle - _prev_idle + total_delta = total - _prev_total + _prev_idle, _prev_total = idle, total + + if total_delta > 0: + return 100.0 * (1.0 - idle_delta / total_delta) + return 0.0 except FileNotFoundError: return 0.0🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infra-gateway/on_prem_vault/aggregator.py` around lines 92 - 106, The get_cpu_usage function currently reads /proc/stat once and returns cumulative since boot; change it to perform two readings of the CPU fields (parse the same fields from /proc/stat in function get_cpu_usage), separated by a short sleep (e.g., 0.1-0.5s), compute delta_total and delta_idle and return 100.0 * (1 - delta_idle / delta_total) when delta_total > 0; preserve the FileNotFoundError fallback to return 0.0 and ensure parsing still uses fields[3] as idle and sum(fields) as total for both samples.
137-139: Addstrict=Trueto zip() for safety.Same issue as in
sync_service.py: withoutstrict=True, mismatched lengths betweencursor.descriptionandrowwould cause silent data truncation.♻️ Proposed fix
columns = [desc[0] for desc in cursor.description] - rows = [dict(zip(columns, row)) for row in cursor.fetchall()] + rows = [dict(zip(columns, row, strict=True)) for row in cursor.fetchall()]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infra-gateway/on_prem_vault/aggregator.py` around lines 137 - 139, The current rows construction uses dict(zip(columns, row)) which can silently drop values if cursor.description and row lengths differ; update the list comprehension in aggregator.py that builds rows (the variables columns, cursor.description and the comprehension using cursor.fetchall()) to use zip(columns, row, strict=True) so mismatches raise an error and surface the issue during execution.infra-gateway/tests/test_person_b.py (1)
241-242: Module-level patching of aiokafka is fragile.This approach patches aiokafka before importing
kafka_config, which works but is order-dependent and fragile. If imports are reordered or another test file importskafka_configfirst, tests may fail unexpectedly.♻️ Consider using pytest fixtures for cleaner isolation
`@pytest.fixture`(autouse=True) def mock_kafka(): with patch("aiokafka.AIOKafkaProducer"), \ patch("aiokafka.admin.AIOKafkaAdminClient"): # Import inside the patch context import importlib import kafka_config importlib.reload(kafka_config) yield kafka_configAlternatively, structure
kafka_config.pyto allow dependency injection for testing.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infra-gateway/tests/test_person_b.py` around lines 241 - 242, Replace the fragile module-level patching of aiokafka in test_person_b.py with a pytest fixture that applies the patches and reloads the kafka_config module inside the patch context so imports are deterministic: create an autouse fixture (e.g., mock_kafka) that patches aiokafka.AIOKafkaProducer and aiokafka.admin.AIOKafkaAdminClient, imports and importlib.reload(kafka_config) while patches are active, yields the module for tests, and remove the top-level with patch(...) import kafka_config usage so tests no longer depend on import order.infra-gateway/docker-compose.yml (1)
183-183: Inconsistent worker count: 4 here vs 2 in Dockerfile.The
commandoverride specifies--workers 4, but the Dockerfile'sCMDuses--workers 2. This creates confusion about the intended configuration and means the Dockerfile default is never used in local development.Either remove the override to use the Dockerfile default, or update the Dockerfile to match if 4 workers is the intended production setting.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infra-gateway/docker-compose.yml` at line 183, The docker-compose service command overrides the Dockerfile CMD by starting uvicorn with "--workers 4" while the Dockerfile's CMD uses "--workers 2", causing an inconsistency; fix by either removing the overridden command line from docker-compose.yml so the container uses the Dockerfile CMD, or update the Dockerfile CMD to match the compose override (change the CMD invoking "uvicorn kafka_config:app --host 0.0.0.0 --port 8080 --workers 4") so both specify the same worker count.infra-gateway/on_prem_vault/sync_service.py (1)
190-192: Addstrict=Trueto zip() for safety.Without
strict=True, ifcursor.descriptionandrowhave mismatched lengths, data would be silently truncated or misaligned, potentially causing hard-to-debug issues.♻️ Proposed fix
columns = [desc[0] for desc in cursor.description] - return [dict(zip(columns, row)) for row in cursor.fetchall()] + return [dict(zip(columns, row, strict=True)) for row in cursor.fetchall()]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infra-gateway/on_prem_vault/sync_service.py` around lines 190 - 192, The list comprehension building rows uses dict(zip(columns, row)) which silently truncates or misaligns when cursor.description (used to build columns) and each row have different lengths; update the comprehension to use zip(columns, row, strict=True) so a ValueError is raised on length mismatch, i.e., replace dict(zip(columns, row)) with dict(zip(columns, row, strict=True)) where columns is built from cursor.description and rows come from cursor.fetchall().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@infra-gateway/cloud_ingestor/kafka_config.py`:
- Around line 247-252: The InsightPacketBatch model currently uses packets:
list[dict], which skips validation; define a Pydantic model InsightPacket
(including the fields your handlers rely on such as tenant_id: str and topic:
str and any timestamp/ids used by ensure_tenant_topics() and produce_to_kafka())
and replace packets: list[dict] with packets: list[InsightPacket]; set
appropriate validation rules (e.g., types, non-empty strings, extra=forbid if
desired) and update imports/usages so ensure_tenant_topics() and
produce_to_kafka() receive validated InsightPacket instances.
- Around line 203-213: produce_to_kafka currently swallows broker failures and
returns False so upstream HTTP routes still return 200/202; change it to surface
failures by logging and re-raising the exception (or raising a custom exception)
instead of returning success/False so callers can return an error status;
specifically update produce_to_kafka to catch KafkaError only to log and then
raise, ensure the await kafka_producer.send_and_wait call is not treated as
success on exception, and update the other similar send helper blocks in the
same module (the other produce/send wrappers around
kafka_producer.send_and_wait) to follow the same pattern so HTTP handlers can
convert exceptions into 5xx/503 responses.
- Around line 234-239: The validate_user_hash field validator
(validate_user_hash on user_hash) only checks length; update it to also verify
the string contains only hex digits (0-9, a-f, A-F) to ensure it's a valid
SHA-256 hex digest; modify the validator to return v only if v is None or
(len(v) == 64 and all characters are hex) — use a regex like
r'^[0-9a-fA-F]{64}$' or Python's string.hexdigits check and raise the same
ValueError message when it fails.
- Around line 44-47: API_KEY_SECRET currently falls back to a hardcoded default
which silently allows missing/misspelled env vars; change the code that defines
API_KEY_SECRET to require the NEXUS_API_KEY environment variable and fail fast
if it's not set (e.g., read via os.environ[...] or explicitly check
os.environ.get("NEXUS_API_KEY") and raise a RuntimeError/ValueError during
startup), so update the API_KEY_SECRET initialization to validate presence and
abort with a clear error message rather than using the
"dev-secret-key-replace-in-prod" default.
- Around line 106-111: The TLS contexts currently disable hostname verification
by setting ssl_ctx.check_hostname = False; locate the two SSL context
constructions (the one guarded by KAFKA_USE_SSL that assigns ssl_ctx and the
similar one for the admin/topic calls, e.g., KAFKA_ADMIN_USE_SSL) and remove the
lines that set check_hostname = False so hostname verification remains enabled
(keep ssl_ctx.load_verify_locations, load_cert_chain and ssl_ctx.verify_mode =
_ssl.CERT_REQUIRED unchanged).
- Around line 45-46: The KAFKA_RETENTION_MS constant is parsed but never applied
to new topics; update the topic creation code that constructs NewTopic (look for
usages of NewTopic or the function that creates tenant topics) to include the
retention window by adding the topic config "retention.ms" set to the
KAFKA_RETENTION_MS value (stringified) in the NewTopic config dict/parameters;
ensure the NewTopic call uses this config so newly created tenant topics use the
configured retention instead of the broker default.
In `@infra-gateway/docker-compose.yml`:
- Around line 92-93: The docker-compose depends_on for nexus-vault-sync
currently lists nexus-vault-aggregator without a health requirement; update the
nexus-vault-sync service's depends_on to require nexus-vault-aggregator:
condition: service_healthy and ensure the nexus-vault-aggregator service defines
a proper healthcheck that confirms the SQLite DB/schema is ready (e.g., a
command that checks DB connectivity or schema migration status). Modify the
depends_on block for nexus-vault-sync and add/verify a healthcheck stanza on
nexus-vault-aggregator (targeting the health command used to indicate
readiness).
- Around line 157-159: The docker-compose build context is set to
./cloud_ingestor while render.yaml uses context: . and dockerfilePath:
./cloud_ingestor/Dockerfile.ingestor, causing the Dockerfile's COPY
cloud_ingestor/kafka_config.py . to fail; fix by aligning contexts—either change
the docker-compose build.context to . and keep dockerfile:
./cloud_ingestor/Dockerfile.ingestor, or modify the Dockerfile COPY paths to
match build.context ./cloud_ingestor; update the docker-compose build block
(build.context and dockerfile) or adjust Dockerfile COPY references
(cloud_ingestor/kafka_config.py) so both environments use the same context.
In `@infra-gateway/on_prem_vault/Dockerfile.aggregator`:
- Around line 10-16: The aiokafka version is inconsistent (aiokafka==0.11.0 in
this Dockerfile vs 0.10.0 in repo requirements), causing divergent runtimes;
pick the canonical version used across the repo (or the secure/compatible one
you intend) and make versions consistent: update the RUN pip install line in
this Dockerfile (the aiokafka pin) to match the chosen version and then update
all corresponding requirements.txt (or the canonical requirements) for services
like on_prem_vault and cloud_ingestor so every deployment uses the same aiokafka
version; ensure you run dependency tests after the change to verify
compatibility.
In `@infra-gateway/on_prem_vault/sync_service.py`:
- Around line 108-113: The current code disables TLS verification by setting
session.verify = False when TLS_CA_PATH is missing; instead, change this to fail
fast in non-development environments: check an explicit dev flag (e.g.,
os.environ["ENV"] == "development" or a config like settings.DEV_MODE) and if
not in dev mode, log an error mentioning TLS_CA_PATH and raise an exception or
exit the process (RuntimeError/SystemExit) so the service won’t start without a
CA; only allow session.verify = False when the explicit dev flag is true.
In `@infra-gateway/requirements.txt`:
- Around line 10-22: The requirements lock pulls vulnerable transitive packages;
update requirements.txt to remove the strict fastapi==0.111.0 pin (or bump it to
a FastAPI release that allows Starlette >=0.47.2) and explicitly raise the
transitive packages: set python-multipart>=0.0.22, orjson>=3.11.6, and
ujson>=5.12.0; ensure pydantic remains compatible (pydantic>=2.10,<3) and then
run your dependency resolver/lockfile to verify starlette resolves to >=0.47.2
and no vulnerable versions remain before merging.
In `@infra-gateway/security/rbac_config.json`:
- Around line 81-85: The wildcard permission "*" in the NEXUS_SUPER_ADMIN role
can accidentally override global forbidden rules (e.g., the raw-event API export
forbidden entry), so update the RBAC config to enforce deny-overrides-allow
semantics: either add a global directive like "deny_overrides_allow": true (or
"deny_precedence": "deny_overrides_allow") at the top-level of the JSON, or
explicitly add the raw-event export resource/action to the NEXUS_SUPER_ADMIN
"forbidden" array (and repeat for other roles that use "*"); reference the
NEXUS_SUPER_ADMIN role, the permission "*" entry, and the raw-event export
forbidden rule when making the change.
- Around line 91-117: The ingest path
(infra-gateway/cloud_ingestor/kafka_config.py) must load and enforce the RBAC
rules from rbac_config.json: update the Bearer auth flow (e.g.,
authenticate_bearer / validate_token) to resolve caller identity and role, then
in the message processing path (e.g., CloudIngestor.handle_message or
route_to_topic) enforce tenant isolation by rejecting or filtering messages that
don't match caller_tenant_id (inject/validate tenant_id on the message and
drop/transform if mismatch), strip PII fields (user_hash, session_id) from
payloads before any downstream routing, and emit audit logs + return appropriate
denial responses (HTTP 403 or a rejected message/error event) when
enforcement_rules are violated; ensure the code reads rbac_config.json once at
startup and uses its enforcement/violation_action values to drive behavior and
logging (use audit_logger/audit_event helpers).
---
Nitpick comments:
In `@infra-gateway/cloud_ingestor/Dockerfile.ingestor`:
- Around line 11-18: The Dockerfile currently pins aiokafka==0.11.0 while
requirements.txt pins aiokafka==0.10.0; pick one canonical source and make them
consistent: either update requirements.txt to aiokafka==0.11.0 or change the
Dockerfile to install from requirements.txt (remove the explicit aiokafka pin
and run pip install -r requirements.txt) so both local and container installs
use the same aiokafka version; adjust the chosen file (requirements.txt or
Dockerfile.ingestor) and ensure any CI/build steps use the same reference.
- Around line 22-24: The Dockerfile uses shell-form CMD which prevents signal
propagation; replace the shell form CMD that runs "uvicorn kafka_config:app
--host 0.0.0.0 --port ${PORT:-8080} --workers 2" with the exec/JSON form so the
shell is not spawned and signals reach uvicorn directly (keep the same arguments
and env var substitution semantics), updating the CMD instruction in the
Dockerfile.ingestor near the existing USER nexus line.
In `@infra-gateway/cloud_ingestor/requirements.txt`:
- Around line 1-22: This requirements file is duplicated across three manifests
and mismatches Dockerfile pins for aiokafka; consolidate by creating a single
shared requirements/constraints file (e.g., a central requirements.txt or
constraints.txt) and update these three copies to reference it via -r or -c,
ensure the shared file pins aiokafka to the Dockerfile-compatible version
(aiokafka==0.11.0) and keep the other packages (requests, fastapi, uvicorn,
pydantic, jsonschema, pytest, pytest-asyncio, httpx) as needed so all three
manifests pull from the same source of truth.
In `@infra-gateway/docker-compose.yml`:
- Line 183: The docker-compose service command overrides the Dockerfile CMD by
starting uvicorn with "--workers 4" while the Dockerfile's CMD uses "--workers
2", causing an inconsistency; fix by either removing the overridden command line
from docker-compose.yml so the container uses the Dockerfile CMD, or update the
Dockerfile CMD to match the compose override (change the CMD invoking "uvicorn
kafka_config:app --host 0.0.0.0 --port 8080 --workers 4") so both specify the
same worker count.
In `@infra-gateway/on_prem_vault/aggregator.py`:
- Around line 92-106: The get_cpu_usage function currently reads /proc/stat once
and returns cumulative since boot; change it to perform two readings of the CPU
fields (parse the same fields from /proc/stat in function get_cpu_usage),
separated by a short sleep (e.g., 0.1-0.5s), compute delta_total and delta_idle
and return 100.0 * (1 - delta_idle / delta_total) when delta_total > 0; preserve
the FileNotFoundError fallback to return 0.0 and ensure parsing still uses
fields[3] as idle and sum(fields) as total for both samples.
- Around line 137-139: The current rows construction uses dict(zip(columns,
row)) which can silently drop values if cursor.description and row lengths
differ; update the list comprehension in aggregator.py that builds rows (the
variables columns, cursor.description and the comprehension using
cursor.fetchall()) to use zip(columns, row, strict=True) so mismatches raise an
error and surface the issue during execution.
In `@infra-gateway/on_prem_vault/Dockerfile.sync`:
- Around line 1-22: The Dockerfile currently pins Python packages inline in the
RUN pip install ... layer, which duplicates dependency data and complicates
updates; replace the inline pip install list with a single COPY of the existing
requirements.txt into the image and run pip install --no-cache-dir -r
requirements.txt (keep existing flags like --no-cache-dir and the slim base,
preserve USER nexus and CMD "python sync_service.py"); update the Dockerfile
steps around pip install and COPY to ensure requirements.txt is added before pip
install so the build uses the file and removes duplicated version management.
- Around line 10-16: The Dockerfile's pip install line installs unused packages
and a conflicting aiokafka version; update the RUN pip install block in
infra-gateway/on_prem_vault/Dockerfile.sync to only install the real runtime
dependency used by sync_service.py (requests==2.31.0) and either remove
aiokafka/fastapi/uvicorn/jsonschema entirely or change aiokafka to match the
version in requirements.txt (aiokafka==0.10.0) if other services require it;
ensure the final pip install list mirrors requirements.txt to avoid version
drift and reduce image size.
In `@infra-gateway/on_prem_vault/requirements.txt`:
- Around line 19-22: The requirements.txt currently includes dev/test packages
(pytest, pytest-asyncio, httpx); extract these into a new requirements-dev.txt
that begins with "-r requirements.txt" and lists pytest==8.2.0,
pytest-asyncio==0.23.6, and httpx==0.27.0, then remove those three entries from
infra-gateway/on_prem_vault/requirements.txt so production installs no longer
pull test/dev deps.
In `@infra-gateway/on_prem_vault/sync_service.py`:
- Around line 190-192: The list comprehension building rows uses
dict(zip(columns, row)) which silently truncates or misaligns when
cursor.description (used to build columns) and each row have different lengths;
update the comprehension to use zip(columns, row, strict=True) so a ValueError
is raised on length mismatch, i.e., replace dict(zip(columns, row)) with
dict(zip(columns, row, strict=True)) where columns is built from
cursor.description and rows come from cursor.fetchall().
In `@infra-gateway/tests/conftest.py`:
- Around line 1-4: Remove the redundant sys.path modifications in conftest.py:
delete the two sys.path.insert(0, os.path.join(_infra, "on_prem_vault")) and
sys.path.insert(0, os.path.join(_infra, "cloud_ingestor")) calls (the _infra
variable may remain if used elsewhere) so pytest.ini's pythonpath is the single
source of truth; ensure you only rely on pytest.ini for adding "on_prem_vault"
and "cloud_ingestor" to sys.path and do not re-add them programmatically.
In `@infra-gateway/tests/test_person_b.py`:
- Around line 241-242: Replace the fragile module-level patching of aiokafka in
test_person_b.py with a pytest fixture that applies the patches and reloads the
kafka_config module inside the patch context so imports are deterministic:
create an autouse fixture (e.g., mock_kafka) that patches
aiokafka.AIOKafkaProducer and aiokafka.admin.AIOKafkaAdminClient, imports and
importlib.reload(kafka_config) while patches are active, yields the module for
tests, and remove the top-level with patch(...) import kafka_config usage so
tests no longer depend on import order.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f7e47f96-ba65-49f9-ab53-5c8ac525cc32
📒 Files selected for processing (20)
.gitignoreinfra-gateway/cloud_ingestor/Dockerfile.ingestorinfra-gateway/cloud_ingestor/__init__.pyinfra-gateway/cloud_ingestor/kafka_config.pyinfra-gateway/cloud_ingestor/requirements.txtinfra-gateway/docker-compose.ymlinfra-gateway/on-prem-vault/sync_service.pyinfra-gateway/on_prem_vault/Dockerfile.aggregatorinfra-gateway/on_prem_vault/Dockerfile.syncinfra-gateway/on_prem_vault/__init__.pyinfra-gateway/on_prem_vault/aggregator.pyinfra-gateway/on_prem_vault/requirements.txtinfra-gateway/on_prem_vault/sync_service.pyinfra-gateway/pytest.iniinfra-gateway/render.yamlinfra-gateway/requirements.txtinfra-gateway/security/rbac_config.jsoninfra-gateway/test.jsoninfra-gateway/tests/conftest.pyinfra-gateway/tests/test_person_b.py
| KAFKA_BOOTSTRAP_SERVERS = os.environ.get("NEXUS_KAFKA_BROKERS", "kafka:9092") | ||
| KAFKA_REPLICATION_FACTOR = int(os.environ.get("NEXUS_KAFKA_REPLICATION", 1)) | ||
| KAFKA_RETENTION_MS = int(os.environ.get("NEXUS_KAFKA_RETENTION_MS", 604_800_000)) | ||
| API_KEY_SECRET = os.environ.get("NEXUS_API_KEY", "dev-secret-key-replace-in-prod") |
There was a problem hiding this comment.
Fail fast when NEXUS_API_KEY is unset.
The fallback secret makes a missing or misspelled env var equivalent to a public, known bearer token. For an internet-facing ingest API, that's an authentication bypass.
🔐 Proposed fix
-API_KEY_SECRET = os.environ.get("NEXUS_API_KEY", "dev-secret-key-replace-in-prod")
+API_KEY_SECRET = os.environ.get("NEXUS_API_KEY")
+if not API_KEY_SECRET:
+ raise RuntimeError("NEXUS_API_KEY must be set")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| KAFKA_BOOTSTRAP_SERVERS = os.environ.get("NEXUS_KAFKA_BROKERS", "kafka:9092") | |
| KAFKA_REPLICATION_FACTOR = int(os.environ.get("NEXUS_KAFKA_REPLICATION", 1)) | |
| KAFKA_RETENTION_MS = int(os.environ.get("NEXUS_KAFKA_RETENTION_MS", 604_800_000)) | |
| API_KEY_SECRET = os.environ.get("NEXUS_API_KEY", "dev-secret-key-replace-in-prod") | |
| KAFKA_BOOTSTRAP_SERVERS = os.environ.get("NEXUS_KAFKA_BROKERS", "kafka:9092") | |
| KAFKA_REPLICATION_FACTOR = int(os.environ.get("NEXUS_KAFKA_REPLICATION", 1)) | |
| KAFKA_RETENTION_MS = int(os.environ.get("NEXUS_KAFKA_RETENTION_MS", 604_800_000)) | |
| API_KEY_SECRET = os.environ.get("NEXUS_API_KEY") | |
| if not API_KEY_SECRET: | |
| raise RuntimeError("NEXUS_API_KEY must be set") |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@infra-gateway/cloud_ingestor/kafka_config.py` around lines 44 - 47,
API_KEY_SECRET currently falls back to a hardcoded default which silently allows
missing/misspelled env vars; change the code that defines API_KEY_SECRET to
require the NEXUS_API_KEY environment variable and fail fast if it's not set
(e.g., read via os.environ[...] or explicitly check
os.environ.get("NEXUS_API_KEY") and raise a RuntimeError/ValueError during
startup), so update the API_KEY_SECRET initialization to validate presence and
abort with a clear error message rather than using the
"dev-secret-key-replace-in-prod" default.
| KAFKA_REPLICATION_FACTOR = int(os.environ.get("NEXUS_KAFKA_REPLICATION", 1)) | ||
| KAFKA_RETENTION_MS = int(os.environ.get("NEXUS_KAFKA_RETENTION_MS", 604_800_000)) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
find . -type f -name "kafka_config.py" | head -20Repository: meharkp7/NEXUS
Length of output: 104
🏁 Script executed:
wc -l ./infra-gateway/cloud_ingestor/kafka_config.pyRepository: meharkp7/NEXUS
Length of output: 108
🏁 Script executed:
cat -n ./infra-gateway/cloud_ingestor/kafka_config.py | head -200Repository: meharkp7/NEXUS
Length of output: 8746
🏁 Script executed:
# Check aiokafka NewTopic API documentation or implementation
python3 << 'EOF'
try:
from aiokafka.admin import NewTopic
import inspect
sig = inspect.signature(NewTopic.__init__)
print("NewTopic.__init__ signature:")
print(sig)
print("\nNewTopic class docstring:")
print(NewTopic.__doc__)
except Exception as e:
print(f"Error: {e}")
EOFRepository: meharkp7/NEXUS
Length of output: 91
🏁 Script executed:
# Find requirements or setup files to confirm aiokafka version/dependency
fd -e txt -e txt -e cfg -e toml -e ini -e lock | grep -E "(requirements|setup|poetry|pipfile)" | head -20Repository: meharkp7/NEXUS
Length of output: 205
🏁 Script executed:
cat ./infra-gateway/cloud_ingestor/requirements.txtRepository: meharkp7/NEXUS
Length of output: 511
🌐 Web query:
aiokafka 0.10.0 NewTopic topic_configs API documentation
💡 Result:
In aiokafka 0.10.0, the NewTopic class from aiokafka.admin is used with AIOKafkaAdminClient.create_topics to create Kafka topics asynchronously. It supports specifying topic name, number of partitions, replication factor, and topic configurations. NewTopic constructor: - name (str): The topic name (required). - num_partitions (int): Number of partitions (default broker default). - replication_factor (int): Replication factor (default broker default). - topic_configs (dict[str, str] | None): Optional dictionary of topic configurations, e.g., {'cleanup.policy': 'compact', 'retention.ms': '86400000'}. Maps Kafka topic config names to string values. Example usage: from aiokafka.admin import AIOKafkaAdminClient, NewTopic import asyncio async def create_topic: admin = AIOKafkaAdminClient(bootstrap_servers='localhost:9092') await admin.start try: topic = NewTopic( name='my_topic', num_partitions=3, replication_factor=1, topic_configs={'cleanup.policy': 'delete', 'retention.ms': '3600000'} ) result = await admin.create_topics([topic]) print(result.topic_errors) finally: await admin.stop asyncio.run(create_topic) Official documentation for aiokafka 0.10.0 is at https://dokk.org/documentation/aiokafka/v0.10.0/, though detailed API pages focus on producer/consumer; admin client details are in source code and examples. The API mirrors kafka-python's NewTopic, which explicitly supports topic_configs for Kafka's CreateTopicsRequest configs. Usage confirmed in code examples and GitHub issues for version 0.10.0.
Citations:
- 1: https://dokk.org/documentation/aiokafka/v0.10.0/
- 2: https://pypi.org/project/aiokafka/0.10.0/
- 3: IncompatibleBrokerVersion: Kafka broker does not support the 'CreateTopicsRequest_v0' Kafka protocol. aio-libs/aiokafka#1008
- 4: https://dfrojas.com/software/creating-kafka-topics-with-aiokafka-python.html
- 5: admin client - failure to create topics (error code 41) aio-libs/aiokafka#995
- 6: https://github.com/aio-libs/aiokafka
🏁 Script executed:
# Verify the actual aiokafka source for NewTopic to confirm topic_configs support in 0.10.0
python3 << 'EOF'
import urllib.request
import json
# Fetch aiokafka 0.10.0 package info from PyPI
url = "https://pypi.org/pypi/aiokafka/0.10.0/json"
try:
with urllib.request.urlopen(url) as response:
data = json.loads(response.read().decode())
# Get the package details
info = data['info']
print(f"Package: {info['name']} {info['version']}")
print(f"Summary: {info['summary']}")
print(f"Home Page: {info['home_page']}")
print("\nReleased: ", info.get('release_url', 'N/A'))
except Exception as e:
print(f"Error fetching PyPI data: {e}")
EOFRepository: meharkp7/NEXUS
Length of output: 211
🏁 Script executed:
# Check if KAFKA_RETENTION_MS is used anywhere else in the file
rg "KAFKA_RETENTION_MS" ./infra-gateway/cloud_ingestor/kafka_config.pyRepository: meharkp7/NEXUS
Length of output: 139
🏁 Script executed:
# Check if KAFKA_RETENTION_MS is referenced anywhere else in the entire infra-gateway directory
rg "KAFKA_RETENTION_MS" ./infra-gateway/ -A 2 -B 2Repository: meharkp7/NEXUS
Length of output: 613
Apply KAFKA_RETENTION_MS when topics are created.
This value is parsed at line 46 but never passed into NewTopic, so new tenant topics inherit the broker default instead of the configured window. That can violate retention expectations without any visible config error.
Proposed fix
topics_to_create = [
- NewTopic(name=f"nexus.events.{tenant_id}", num_partitions=6,
- replication_factor=KAFKA_REPLICATION_FACTOR),
- NewTopic(name=f"nexus.packets.{tenant_id}", num_partitions=3,
- replication_factor=KAFKA_REPLICATION_FACTOR),
+ NewTopic(
+ name=f"nexus.events.{tenant_id}",
+ num_partitions=6,
+ replication_factor=KAFKA_REPLICATION_FACTOR,
+ topic_configs={"retention.ms": str(KAFKA_RETENTION_MS)},
+ ),
+ NewTopic(
+ name=f"nexus.packets.{tenant_id}",
+ num_partitions=3,
+ replication_factor=KAFKA_REPLICATION_FACTOR,
+ topic_configs={"retention.ms": str(KAFKA_RETENTION_MS)},
+ ),
]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@infra-gateway/cloud_ingestor/kafka_config.py` around lines 45 - 46, The
KAFKA_RETENTION_MS constant is parsed but never applied to new topics; update
the topic creation code that constructs NewTopic (look for usages of NewTopic or
the function that creates tenant topics) to include the retention window by
adding the topic config "retention.ms" set to the KAFKA_RETENTION_MS value
(stringified) in the NewTopic config dict/parameters; ensure the NewTopic call
uses this config so newly created tenant topics use the configured retention
instead of the broker default.
| if KAFKA_USE_SSL: | ||
| ssl_ctx = _ssl.create_default_context() | ||
| ssl_ctx.load_verify_locations(cafile=KAFKA_SSL_CAFILE) | ||
| ssl_ctx.load_cert_chain(certfile=KAFKA_SSL_CERTFILE, keyfile=KAFKA_SSL_KEYFILE) | ||
| ssl_ctx.check_hostname = False | ||
| ssl_ctx.verify_mode = _ssl.CERT_REQUIRED |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
git ls-files | grep -E "kafka_config\.py"Repository: meharkp7/NEXUS
Length of output: 102
🏁 Script executed:
cat -n infra-gateway/cloud_ingestor/kafka_config.py | head -200Repository: meharkp7/NEXUS
Length of output: 8746
Keep broker hostname verification enabled.
Both SSL contexts set check_hostname = False (lines 110 and 183), which means the CA is trusted but the broker identity is not verified. This weakens the TLS guarantee for both produce and topic-admin calls, leaving the connection vulnerable to MITM attacks even with a valid certificate.
Remove check_hostname = False from both contexts to restore hostname verification:
🔒 Proposed fix
ssl_ctx.load_verify_locations(cafile=KAFKA_SSL_CAFILE)
ssl_ctx.load_cert_chain(certfile=KAFKA_SSL_CERTFILE, keyfile=KAFKA_SSL_KEYFILE)
- ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = _ssl.CERT_REQUIREDand
ssl_ctx.load_verify_locations(cafile=KAFKA_SSL_CAFILE)
ssl_ctx.load_cert_chain(certfile=KAFKA_SSL_CERTFILE, keyfile=KAFKA_SSL_KEYFILE)
- ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_REQUIRED📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if KAFKA_USE_SSL: | |
| ssl_ctx = _ssl.create_default_context() | |
| ssl_ctx.load_verify_locations(cafile=KAFKA_SSL_CAFILE) | |
| ssl_ctx.load_cert_chain(certfile=KAFKA_SSL_CERTFILE, keyfile=KAFKA_SSL_KEYFILE) | |
| ssl_ctx.check_hostname = False | |
| ssl_ctx.verify_mode = _ssl.CERT_REQUIRED | |
| if KAFKA_USE_SSL: | |
| ssl_ctx = _ssl.create_default_context() | |
| ssl_ctx.load_verify_locations(cafile=KAFKA_SSL_CAFILE) | |
| ssl_ctx.load_cert_chain(certfile=KAFKA_SSL_CERTFILE, keyfile=KAFKA_SSL_KEYFILE) | |
| ssl_ctx.verify_mode = _ssl.CERT_REQUIRED |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@infra-gateway/cloud_ingestor/kafka_config.py` around lines 106 - 111, The TLS
contexts currently disable hostname verification by setting
ssl_ctx.check_hostname = False; locate the two SSL context constructions (the
one guarded by KAFKA_USE_SSL that assigns ssl_ctx and the similar one for the
admin/topic calls, e.g., KAFKA_ADMIN_USE_SSL) and remove the lines that set
check_hostname = False so hostname verification remains enabled (keep
ssl_ctx.load_verify_locations, load_cert_chain and ssl_ctx.verify_mode =
_ssl.CERT_REQUIRED unchanged).
| async def produce_to_kafka(topic: str, key: str, payload: dict) -> bool: | ||
| """Sends a message to a Kafka topic asynchronously. Returns True on success.""" | ||
| if kafka_producer is None: | ||
| log.error("Kafka producer not available.") | ||
| return False | ||
| try: | ||
| await kafka_producer.send_and_wait(topic, value=payload, key=key) | ||
| return True | ||
| except KafkaError as e: | ||
| log.error("Kafka produce error for topic %s: %s", topic, e) | ||
| return False |
There was a problem hiding this comment.
Don't return success when nothing reached Kafka.
produce_to_kafka() converts broker failures into False, but the routes still return 202/200 even when every message was rejected. Clients that key off HTTP status will stop retrying and lose data.
🚦 Proposed fix
`@app.post`("/api/v1/events", status_code=202)
async def ingest_cloud_events(
batch: EventBatchRequest,
authorization: Optional[str] = Header(None),
):
@@
verify_api_key(authorization)
+ if kafka_producer is None:
+ raise HTTPException(
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+ detail="Kafka producer unavailable",
+ )
queued, failed = 0, 0
@@
log.info("Cloud event batch: %d queued, %d failed.", queued, failed)
+ if queued == 0 and failed > 0:
+ raise HTTPException(
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+ detail="Failed to queue events",
+ )
return {"queued": queued, "failed": failed, "message": "Batch processed"}
@@
`@app.post`("/api/v1/ingest", response_model=IngestResponse)
async def ingest_onprem_packets(
batch: InsightPacketBatch,
authorization: Optional[str] = Header(None),
):
@@
verify_api_key(authorization)
+ if kafka_producer is None:
+ raise HTTPException(
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+ detail="Kafka producer unavailable",
+ )
acknowledged_ids, rejected_count = [], 0
@@
log.info("On-prem batch [%s]: %d ACKed, %d rejected.", batch.batch_id, len(acknowledged_ids), rejected_count)
+ if not acknowledged_ids and rejected_count > 0:
+ raise HTTPException(
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+ detail="Failed to queue packets",
+ )
return IngestResponse(Also applies to: 300-325, 328-364
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@infra-gateway/cloud_ingestor/kafka_config.py` around lines 203 - 213,
produce_to_kafka currently swallows broker failures and returns False so
upstream HTTP routes still return 200/202; change it to surface failures by
logging and re-raising the exception (or raising a custom exception) instead of
returning success/False so callers can return an error status; specifically
update produce_to_kafka to catch KafkaError only to log and then raise, ensure
the await kafka_producer.send_and_wait call is not treated as success on
exception, and update the other similar send helper blocks in the same module
(the other produce/send wrappers around kafka_producer.send_and_wait) to follow
the same pattern so HTTP handlers can convert exceptions into 5xx/503 responses.
| @field_validator("user_hash") | ||
| @classmethod | ||
| def validate_user_hash(cls, v): | ||
| if v and len(v) != 64: | ||
| raise ValueError("user_hash must be a 64-character SHA-256 hex string") | ||
| return v |
There was a problem hiding this comment.
Validate user_hash as hex, not just length.
Any 64-character string currently passes, even though the field claims to be a SHA-256 hex digest.
🔎 Proposed fix
`@field_validator`("user_hash")
`@classmethod`
def validate_user_hash(cls, v):
- if v and len(v) != 64:
+ if v and (len(v) != 64 or any(ch not in "0123456789abcdefABCDEF" for ch in v)):
raise ValueError("user_hash must be a 64-character SHA-256 hex string")
return v🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@infra-gateway/cloud_ingestor/kafka_config.py` around lines 234 - 239, The
validate_user_hash field validator (validate_user_hash on user_hash) only checks
length; update it to also verify the string contains only hex digits (0-9, a-f,
A-F) to ensure it's a valid SHA-256 hex digest; modify the validator to return v
only if v is None or (len(v) == 64 and all characters are hex) — use a regex
like r'^[0-9a-fA-F]{64}$' or Python's string.hexdigits check and raise the same
ValueError message when it fails.
| RUN pip install --no-cache-dir \ | ||
| requests==2.31.0 \ | ||
| aiokafka==0.11.0 \ | ||
| fastapi==0.111.0 \ | ||
| "uvicorn[standard]==0.29.0" \ | ||
| "pydantic>=2.10,<3" \ | ||
| jsonschema==4.22.0 |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Compare aiokafka pins across repo manifests.
rg -n -C1 'aiokafka=='Repository: meharkp7/NEXUS
Length of output: 1302
Dependency version mismatch: aiokafka inconsistent across Dockerfiles and requirements files.
This Dockerfile pins aiokafka==0.11.0, but corresponding requirements.txt files throughout the repository pin 0.10.0. The same pattern appears across multiple services (on_prem_vault, cloud_ingestor). This divergence creates inconsistent runtime behavior and complicates security patching across deployments.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@infra-gateway/on_prem_vault/Dockerfile.aggregator` around lines 10 - 16, The
aiokafka version is inconsistent (aiokafka==0.11.0 in this Dockerfile vs 0.10.0
in repo requirements), causing divergent runtimes; pick the canonical version
used across the repo (or the secure/compatible one you intend) and make versions
consistent: update the RUN pip install line in this Dockerfile (the aiokafka
pin) to match the chosen version and then update all corresponding
requirements.txt (or the canonical requirements) for services like on_prem_vault
and cloud_ingestor so every deployment uses the same aiokafka version; ensure
you run dependency tests after the change to verify compatibility.
| else: | ||
| log.warning( | ||
| "CA cert not found at %s — TLS verification disabled (dev mode only!).", | ||
| TLS_CA_PATH | ||
| ) | ||
| session.verify = False |
There was a problem hiding this comment.
Security: Disabling TLS verification in production could be dangerous.
When the CA cert is missing, session.verify = False is set, which disables all TLS certificate verification. While intended for dev mode, this could accidentally run in production if cert provisioning fails, exposing the service to MITM attacks.
🔒 Proposed fix: Fail fast if CA cert is missing in production
# Verify cloud server against our CA bundle
if os.path.exists(TLS_CA_PATH):
session.verify = TLS_CA_PATH
else:
- log.warning(
- "CA cert not found at %s — TLS verification disabled (dev mode only!).",
- TLS_CA_PATH
- )
- session.verify = False
+ # Only allow insecure mode in explicit dev environments
+ if os.environ.get("NEXUS_DEV_MODE", "").lower() == "true":
+ log.warning(
+ "CA cert not found at %s — TLS verification disabled (dev mode).",
+ TLS_CA_PATH
+ )
+ session.verify = False
+ else:
+ raise RuntimeError(
+ f"CA cert not found at {TLS_CA_PATH}. "
+ "Set NEXUS_DEV_MODE=true to disable TLS verification in development."
+ )📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| else: | |
| log.warning( | |
| "CA cert not found at %s — TLS verification disabled (dev mode only!).", | |
| TLS_CA_PATH | |
| ) | |
| session.verify = False | |
| else: | |
| # Only allow insecure mode in explicit dev environments | |
| if os.environ.get("NEXUS_DEV_MODE", "").lower() == "true": | |
| log.warning( | |
| "CA cert not found at %s — TLS verification disabled (dev mode).", | |
| TLS_CA_PATH | |
| ) | |
| session.verify = False | |
| else: | |
| raise RuntimeError( | |
| f"CA cert not found at {TLS_CA_PATH}. " | |
| "Set NEXUS_DEV_MODE=true to disable TLS verification in development." | |
| ) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@infra-gateway/on_prem_vault/sync_service.py` around lines 108 - 113, The
current code disables TLS verification by setting session.verify = False when
TLS_CA_PATH is missing; instead, change this to fail fast in non-development
environments: check an explicit dev flag (e.g., os.environ["ENV"] ==
"development" or a config like settings.DEV_MODE) and if not in dev mode, log an
error mentioning TLS_CA_PATH and raise an exception or exit the process
(RuntimeError/SystemExit) so the service won’t start without a CA; only allow
session.verify = False when the explicit dev flag is true.
| fastapi==0.111.0 | ||
| uvicorn[standard]==0.29.0 | ||
|
|
||
| # ✅ Updated for Python 3.13 compatibility | ||
| pydantic>=2.10,<3 | ||
|
|
||
| # JSON schema validation | ||
| jsonschema==4.22.0 | ||
|
|
||
| # Dev/testing only | ||
| pytest==8.2.0 | ||
| pytest-asyncio==0.23.6 | ||
| httpx==0.27.0 No newline at end of file |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
For Python dependencies fastapi==0.111.0 and uvicorn[standard]==0.29.0, what transitive versions of starlette, python-multipart, orjson, and ujson are resolved, and what are the minimum fixed versions for GHSA-2c2j-9gv5-cj73, GHSA-f96h-pmfr-66vw, GHSA-59g5-xgcq-4qw3, GHSA-wp53-j4wj-2cfg, GHSA-hx9q-6w63-j58v, GHSA-pwr2-4v36-6qpr, GHSA-c8rr-9gxc-jprv, GHSA-wgvc-ghv9-3pmm?
💡 Result:
With fastapi==0.111.0 and uvicorn[standard]==0.29.0, pip resolves the following transitive versions for the specified packages (based on dependency constraints from PyPI metadata, GitHub discussions showing real-world installs, and release notes): - starlette: 0.37.2 (direct dependency of fastapi==0.111.0: >=0.37.2,<0.38.0) - python-multipart: 0.0.9 (part of fastapi[standard] extras >=0.0.7; observed in installs) - orjson: 3.10.4 (optional extra >=3.2.1 in fastapi[standard]; observed in installs) - ujson: 5.10.0 (optional extra >=4.0.1 with exclusions; observed in installs) These are the latest compatible versions pip would select as of the package release dates (FastAPI 0.111.0 ~May 2024, Uvicorn 0.29.0 March 2024). Uvicorn[standard] does not directly depend on these packages. No sources explicitly state minimum fixed versions for the listed GitHub advisories (GHSA--), as searches returned unrelated vulnerabilities (e.g., fast-xml-parser, .NET). These GHSA IDs likely relate to CVEs in starlette/python-multipart/orjson/ujson, but no matching advisory details or fixed versions were found in results. Recommend checking GitHub Advisory Database directly for each ID or running pip-audit on the resolved lockfile.
Citations:
- 1: https://pypi.org/project/fastapi/0.111.1/
- 2: FastAPI 0.110.3 "extra" dependencies are now in default dependencies for 0.111.0 fastapi/fastapi#11567
- 3: Instructions in the "Run a FastAPI app" speedrun differ from those in the repo's readme and neither of them work superfly/docs#1965
- 4: https://github.com/fastapi/fastapi/blob/master/pyproject.toml
- 5: https://github.com/encode/uvicorn/blob/master/pyproject.toml
- 6: https://newreleases.io/project/github/fastapi/fastapi/release/0.111.0
- 7: https://uvicorn.dev/release-notes/
🌐 Web query:
GHSA-2c2j-9gv5-cj73 Python CVE starlette python-multipart orjson ujson
💡 Result:
GHSA-2c2j-9gv5-cj73 is a security vulnerability in the Starlette Python web framework (CVE-2025-54121), published July 2025. It allows a possible denial-of-service (DoS) when parsing multipart forms with large files exceeding the default max spool size. Starlette blocks the main event loop thread while rolling over the file to disk, preventing new connections. Affected: Starlette versions < 0.47.2 (specifically up to 0.47.1). Patched: Starlette 0.47.2 via commit Kludex/starlette@9f7ec2e, fixing a bug in UploadFile._in_memory property check and write method. CVSS v3.1: 5.3 MEDIUM (AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:L) CWE-770: Allocation of Resources Without Limits or Throttling. No direct relation to python-multipart (used by Starlette for multipart parsing, has separate vulns like GHSA-2jv5-9r88-3w3p ReDoS), orjson, or ujson (JSON libraries with their own unrelated vulns like recursion limits or memory leaks). The query likely references ecosystem packages involved in file/form handling in Starlette/FastAPI apps. Impact low; requires large file uploads via multipart forms (e.g., request.form or UploadFile). Upgrade to Starlette >=0.47.2.
Citations:
- 1: GHSA-2c2j-9gv5-cj73
- 2: https://nvd.nist.gov/vuln/detail/CVE-2025-54121
- 3: https://cve.circl.lu/vuln/ghsa-2c2j-9gv5-cj73
- 4: GHSA-2c2j-9gv5-cj73
- 5: https://osv.dev/GHSA-2c2j-9gv5-cj73
🌐 Web query:
GitHub Advisory Database GHSA-f96h-pmfr-66vw GHSA-59g5-xgcq-4qw3 GHSA-wp53-j4wj-2cfg
💡 Result:
The GitHub Advisory Database entries GHSA-f96h-pmfr-66vw, GHSA-59g5-xgcq-4qw3, and GHSA-wp53-j4wj-2cfg are all security vulnerabilities related to handling multipart/form-data in Python web frameworks, specifically affecting Starlette/FastAPI applications. They form a chain of related DoS and file handling issues. GHSA-f96h-pmfr-66vw (Starlette DoS via multipart/form-data, CVE-2024-47874, published Oct 15, 2024): - Package: starlette (pip) < 0.40.0 - Patched: >= 0.40.0 - Severity: High (8.7 CVSS) - Issue: Buffers large multipart parts without filename in byte strings, leading to memory exhaustion DoS. - Affected: All Starlette/FastAPI apps accepting form requests. - CWE-770 (Allocation of Resources Without Limits or Throttling) GHSA-59g5-xgcq-4qw3 (python-multipart DoS via malformed multipart boundary, CVE-2024-53981, published Nov 30, 2024): - Package: python-multipart (pip) < 0.0.18 - Patched: >= 0.0.18 - Severity: High (7.5-8.7 CVSS) - Issue: Skips malformed line breaks or trailing bytes one-by-one with logging, causing high CPU and event loop stall DoS. - Affected: Apps using python-multipart (e.g., FastAPI, Starlette). - CWE-770 GHSA-wp53-j4wj-2cfg (python-multipart Arbitrary File Write via Path Traversal, CVE-2026-24486, published Jan 25, 2026): - Package: python-multipart (pip) < 0.0.22 - Patched: >= 0.0.22 - Severity: High (8.6 CVSS) - Issue: Path traversal when using non-default UPLOAD_DIR and UPLOAD_KEEP_FILENAME=True, allowing arbitrary file writes. - Affected: Apps with those config options enabled. - CWE-22 (Path Traversal); requires specific non-default config. Mitigation: Update starlette to >=0.40.0 and python-multipart to >=0.0.22. Avoid non-default file upload configs if possible. These primarily impact ASGI apps like FastAPI processing untrusted multipart uploads.
Citations:
- 1: GHSA-f96h-pmfr-66vw
- 2: https://osv.dev/vulnerability/GHSA-f96h-pmfr-66vw
- 3: GHSA-59g5-xgcq-4qw3
- 4: https://www.sentinelone.com/vulnerability-database/cve-2024-53981/
- 5: GHSA-59g5-xgcq-4qw3
- 6: GHSA-wp53-j4wj-2cfg
- 7: https://osv.dev/GHSA-wp53-j4wj-2cfg
- 8: https://nvd.nist.gov/vuln/detail/CVE-2026-24486
🌐 Web query:
pip-audit starlette 0.37.2 python-multipart 0.0.9 orjson ujson vulnerabilities
💡 Result:
pip-audit is a tool for auditing Python packages against known vulnerabilities listed in the PyPA advisory database and OSV. No vulnerabilities are reported for starlette==0.37.2, python-multipart==0.0.9, orjson (any recent version like 3.11+), or ujson in pip-audit's primary sources as of 2026-04-03. However, third-party scanners like Snyk report potential issues for older versions: - starlette 0.37.2: No direct vulnerabilities listed; earlier versions (<0.40.0, <0.47.2, <0.49.1) had ReDoS and resource allocation issues in multipart/form-data and FileResponse, fixed before 0.37.2 (released Mar 2024). - python-multipart 0.0.9: Vulnerable to high-severity issues including Directory Traversal (fixed in 0.0.22), Allocation of Resources Without Limits (fixed in 0.0.18), and ReDoS in Content-Type header (CVE-2024-24762, fixed later). These affect versions <0.0.18/0.0.22. - orjson: Vulnerabilities like CVE-2025-67221 (unbounded recursion in dumps, affects <=3.11.4) and CVE-2024-27454 (parsing recursion, fixed in 3.9.15) exist but only for specific older versions; recent versions (3.11.7+) are clean per Snyk. - ujson: No vulnerabilities mentioned in results. To verify with pip-audit, run: pip install pip-audit; pip-audit starlette==0.37.2 python-multipart==0.0.9 orjson ujson. If issues appear, upgrade python-multipart to >=0.0.22. No PyPA advisories found for these exact versions/packages, indicating clean per official Python audit DB.
Citations:
- 1: https://github.com/pypa/pip-audit
- 2: https://security.snyk.io/package/pip/starlette/0.37.0
- 3: https://security.snyk.io/package/pip/starlette/versions?page=3
- 4: https://security.snyk.io/package/pip/python-multipart/0.0.9
- 5: https://github.com/advisories/GHSA-2jv5-9r88-3w3p/dependabot
- 6: https://nvd.nist.gov/vuln/detail/CVE-2025-67221
- 7: https://security.snyk.io/package/pip/orjson
- 8: https://advisories.gitlab.com/pkg/pypi/starlette/
- 9: https://security.snyk.io/package/pip/starlette
- 10: https://security.snyk.io/package/pip/python-multipart
- 11: https://github.com/pypa/advisory-database
🌐 Web query:
GHSA-hx9q-6w63-j58v GHSA-pwr2-4v36-6qpr GHSA-c8rr-9gxc-jprv GHSA-wgvc-ghv9-3pmm CVE
💡 Result:
These are four GitHub Security Advisories (GHSA) related to vulnerabilities in fast JSON libraries for Python, all published in early 2026. No direct CVE for GHSA-hx9q-6w63-j58v was found in search results, but the others have assigned CVEs. GHSA-hx9q-6w63-j58v: orjson does not limit recursion for deeply nested JSON documents. Affects orjson. Allows potential DoS via stack overflow from deeply nested structures. Published Jan 2026. Official advisory at GitHub Advisory Database. GHSA-pwr2-4v36-6qpr: Not found in searches. Likely another ujson or similar JSON library vulnerability, possibly unpublished or internal. No details or CVE identified. GHSA-c8rr-9gxc-jprv: Integer overflow handling large indent leads to buffer overflow or infinite loop in ujson.dumps. Affects ujson >=5.1 <=5.11.0. Patched in 5.12.0. CVE-2026-32875. High severity (CVSS 7.5). Allows DoS (segfault or infinite loop) if untrusted control over indent. GHSA-wgvc-ghv9-3pmm: Memory leak parsing large integers allows DoS in ujson.load(s)/decode. Affects ujson >=5.4 <=5.11.0. Patched in 5.12.0. CVE-2026-32874. High severity (CVSS 7.5). Accumulating memory leak from out-of-range integers. These ujson issues were fixed in v5.12.0 (released Mar 2026). Upgrade recommended. All primarily DoS vectors via untrusted JSON input.
Citations:
- 1: GHSA-hx9q-6w63-j58v
- 2: https://github.com/github/advisory-database/blob/main/advisories/github-reviewed/2026/01/GHSA-hx9q-6w63-j58v/GHSA-hx9q-6w63-j58v.json
- 3: https://osv.dev/vulnerability/GHSA-c8rr-9gxc-jprv
- 4: GHSA-wgvc-ghv9-3pmm
- 5: GHSA-c8rr-9gxc-jprv
- 6: https://github.com/ultrajson/ultrajson/releases/tag/5.12.0
- 7: https://advisory.eventussecurity.com/advisory/ultrajson-vulnerabilities-originate-buffer-overflow-and-memory-leak/
🌐 Web query:
GHSA-hx9q-6w63-j58v orjson affected versions 3.10 fixed minimum
💡 Result:
GHSA-hx9q-6w63-j58v (CVE-2025-67221) affects orjson versions up to and including 3.11.5. The vulnerability is a denial-of-service issue where orjson.dumps does not limit recursion for deeply nested JSON documents, leading to crashes or high resource usage on deeply nested inputs. It was fixed in version 3.11.6, released on 2026-01-29, with the fix commit 62bb185b70785ded49c79c26f8c9781f1e6fe370 addressing sporadic crashes during serialization of deeply nested list-of-dict structures.
Citations:
- 1: GHSA-hx9q-6w63-j58v
- 2: https://mondoo.com/vulnerability-intelligence/vulnerability/GHSA-hx9q-6w63-j58v
- 3: https://advisories.gitlab.com/pkg/pypi/orjson/CVE-2025-67221/
- 4: https://nvd.nist.gov/vuln/detail/CVE-2025-67221
- 5: https://osv.dev/vulnerability/CVE-2025-67221
- 6: https://security.snyk.io/vuln/SNYK-PYTHON-ORJSON-15123465
- 7: ijl/orjson@62bb185
Transitive dependencies are vulnerable to multiple flagged CVEs; versions must be upgraded before release.
The resolved versions contain vulnerabilities in Starlette, python-multipart, orjson, and ujson:
- starlette 0.37.2: Vulnerable to GHSA-2c2j-9gv5-cj73 (DoS, <0.47.2) and GHSA-f96h-pmfr-66vw (multipart DoS, <0.40.0). Requires ≥0.47.2.
- python-multipart 0.0.9: Vulnerable to GHSA-59g5-xgcq-4qw3 (DoS, <0.0.18) and GHSA-wp53-j4wj-2cfg (path traversal, <0.0.22). Requires ≥0.0.22.
- orjson 3.10.4: Vulnerable to GHSA-hx9q-6w63-j58v (recursion DoS, ≤3.11.5). Requires ≥3.11.6.
- ujson 5.10.0: Vulnerable to GHSA-c8rr-9gxc-jprv (buffer overflow, 5.1-5.11.0) and GHSA-wgvc-ghv9-3pmm (memory leak, 5.4-5.11.0). Requires ≥5.12.0.
Note: fastapi==0.111.0 pins starlette to >=0.37.2,<0.38.0, locking in the vulnerable version. Unpin fastapi or upgrade to a newer version that allows starlette ≥0.47.2. (GHSA-pwr2-4v36-6qpr could not be verified as a valid advisory.)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@infra-gateway/requirements.txt` around lines 10 - 22, The requirements lock
pulls vulnerable transitive packages; update requirements.txt to remove the
strict fastapi==0.111.0 pin (or bump it to a FastAPI release that allows
Starlette >=0.47.2) and explicitly raise the transitive packages: set
python-multipart>=0.0.22, orjson>=3.11.6, and ujson>=5.12.0; ensure pydantic
remains compatible (pydantic>=2.10,<3) and then run your dependency
resolver/lockfile to verify starlette resolves to >=0.47.2 and no vulnerable
versions remain before merging.
| "NEXUS_SUPER_ADMIN": { | ||
| "description": "NEXUS platform engineers only. Full access for debugging. All actions are audited.", | ||
| "data_scope": "ALL", | ||
| "permissions": ["*"], | ||
| "forbidden": [], |
There was a problem hiding this comment.
Define explicit deny-precedence semantics for wildcard permissions.
Line 84 grants "*" to NEXUS_SUPER_ADMIN, while Lines 103-104 state raw-event API export is forbidden for all roles. Without explicit deny-overrides-allow semantics, evaluators may interpret wildcard allow as bypass.
🔧 Suggested config hardening
"enforcement_rules": {
+ "evaluation_order": "DENY_OVERRIDES_ALLOW",
"tenant_isolation": {Also applies to: 102-105
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@infra-gateway/security/rbac_config.json` around lines 81 - 85, The wildcard
permission "*" in the NEXUS_SUPER_ADMIN role can accidentally override global
forbidden rules (e.g., the raw-event API export forbidden entry), so update the
RBAC config to enforce deny-overrides-allow semantics: either add a global
directive like "deny_overrides_allow": true (or "deny_precedence":
"deny_overrides_allow") at the top-level of the JSON, or explicitly add the
raw-event export resource/action to the NEXUS_SUPER_ADMIN "forbidden" array (and
repeat for other roles that use "*"); reference the NEXUS_SUPER_ADMIN role, the
permission "*" entry, and the raw-event export forbidden rule when making the
change.
| "enforcement_rules": { | ||
| "tenant_isolation": { | ||
| "description": "NEXUS_TENANT_ADMIN can NEVER access another tenant's data.", | ||
| "implementation": "Inject 'WHERE tenant_id = :caller_tenant_id' into ALL data queries when role is NEXUS_TENANT_ADMIN. Enforced in the data layer, not just the API layer.", | ||
| "violation_action": "HTTP 403 + audit log entry" | ||
| }, | ||
| "pii_access": { | ||
| "description": "No role can access raw user_hash values or session_ids via the dashboard.", | ||
| "implementation": "Strip user_hash and session_id fields in the API serialization layer before any response is sent.", | ||
| "violation_action": "Strip field silently + audit log warning" | ||
| }, | ||
| "raw_event_access": { | ||
| "description": "No role (including SUPER_ADMIN) can bulk-export raw_events via the API.", | ||
| "implementation": "The /api/v1/raw-events endpoint is disabled. Raw data access requires direct DB access with MFA.", | ||
| "violation_action": "HTTP 403 + alert to compliance officer" | ||
| } | ||
| }, | ||
|
|
||
| "telemetry_consent": { | ||
| "description": "Per-tenant opt-out control. When a tenant opts out, telemetry collection stops immediately.", | ||
| "states": { | ||
| "FULL_OPT_IN": "All telemetry collected and synced to cloud.", | ||
| "ON_PREM_ONLY": "Telemetry collected but InsightPackets NOT synced to cloud. Stays on-prem only.", | ||
| "OPT_OUT": "Ghost SDK stops buffering. No data collected. On-prem vault is paused." | ||
| }, | ||
| "enforcement": "On consent change, push new state to Ghost SDK via a signed config packet. SDK must re-check consent on every startup." | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify whether rbac_config.json is actually loaded/enforced anywhere.
rg -n -C3 'rbac_config\.json|enforcement_rules|tenant_isolation|pii_access|raw_event_access|NEXUS_TENANT_ADMIN|NEXUS_SUPER_ADMIN'Repository: meharkp7/NEXUS
Length of output: 3059
🏁 Script executed:
# Search for imports/references to rbac_config or where this JSON is loaded
rg -l 'rbac_config|load.*config|from_config' --type py --type ts --type jsRepository: meharkp7/NEXUS
Length of output: 74
🏁 Script executed:
# Check if kafka_config.py exists and examine it
fd 'kafka_config' -type fRepository: meharkp7/NEXUS
Length of output: 228
🏁 Script executed:
# Search for tenant_isolation or tenant filtering in data layer
rg -n 'tenant_id.*filter|WHERE.*tenant_id|tenant_isolation' --type py --type tsRepository: meharkp7/NEXUS
Length of output: 184
🏁 Script executed:
# Find kafka_config file (fix fd command - remove --type)
fd kafka_configRepository: meharkp7/NEXUS
Length of output: 102
🏁 Script executed:
# Check ai-brain/main.py to see if/how rbac_config is loaded
cat -n ai-brain/main.py | head -100Repository: meharkp7/NEXUS
Length of output: 3760
🏁 Script executed:
# Search for where rbac_config.json is actually loaded
rg -n 'rbac_config|json\.load|open.*config' | head -30Repository: meharkp7/NEXUS
Length of output: 410
🏁 Script executed:
# Look for enforcement implementation - search for tenant_id filter logic
rg -n 'caller_tenant_id|inject.*tenant|tenant_id.*filter' -A 2 -B 2Repository: meharkp7/NEXUS
Length of output: 1057
🏁 Script executed:
# Read the actual kafka_config.py file
cat infra-gateway/cloud_ingestor/kafka_config.pyRepository: meharkp7/NEXUS
Length of output: 12759
🏁 Script executed:
# Search specifically for rbac_config being imported or loaded
rg -n 'rbac_config|import.*security' infra-gateway/ -A 3Repository: meharkp7/NEXUS
Length of output: 40
🏁 Script executed:
# Search for actual tenant filtering in query/data layer
rg -n 'def.*filter|tenant_id.*=' infra-gateway/ --type pyRepository: meharkp7/NEXUS
Length of output: 1526
🏁 Script executed:
# Check sync_service.py to see if it applies any RBAC/tenant filtering
cat -n infra-gateway/on_prem_vault/sync_service.py | head -100Repository: meharkp7/NEXUS
Length of output: 4599
🏁 Script executed:
# Search entire codebase for any reference to rbac_config
rg 'rbac_config|enforcement_rules|NEXUS_TENANT_ADMIN|NEXUS_SUPER_ADMIN' --type py -lRepository: meharkp7/NEXUS
Length of output: 40
🏁 Script executed:
# Check if any Python file loads the security config from JSON
rg 'security|rbac|config.*load|json\.load.*security' infra-gateway/ --type py -B 2 -A 2Repository: meharkp7/NEXUS
Length of output: 861
RBAC policy in rbac_config.json is not enforced by the ingest path.
The enforcement_rules in rbac_config.json (lines 91–117) describe required controls: tenant isolation via WHERE tenant_id filters, PII field stripping (user_hash, session_id), and raw event access restrictions. However, infra-gateway/cloud_ingestor/kafka_config.py implements only simple Bearer token authentication and does not load or apply this config. It routes messages to tenant-isolated Kafka topics but never:
- Validates the caller's role against RBAC rules
- Applies enforcement rules (query filtering, field stripping)
- Implements violation actions (HTTP 403, audit logs)
This creates a compliance gap: policy declares enforcement at the data and API layers, but the ingest code path enforces neither.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@infra-gateway/security/rbac_config.json` around lines 91 - 117, The ingest
path (infra-gateway/cloud_ingestor/kafka_config.py) must load and enforce the
RBAC rules from rbac_config.json: update the Bearer auth flow (e.g.,
authenticate_bearer / validate_token) to resolve caller identity and role, then
in the message processing path (e.g., CloudIngestor.handle_message or
route_to_topic) enforce tenant isolation by rejecting or filtering messages that
don't match caller_tenant_id (inject/validate tenant_id on the message and
drop/transform if mismatch), strip PII fields (user_hash, session_id) from
payloads before any downstream routing, and emit audit logs + return appropriate
denial responses (HTTP 403 or a rejected message/error event) when
enforcement_rules are violated; ensure the code reads rbac_config.json once at
startup and uses its enforcement/violation_action values to drive behavior and
logging (use audit_logger/audit_event helpers).
Overview Implements the complete "Bridge" infrastructure layer for the NEXUS platform. This PR delivers the Kafka-based telemetry pipeline, on-prem vault services, and cloud ingestor API — fully deployed to production on Render with Aiven Kafka (SSL).
What's included
Cloud Ingestor (
cloud_ingestor/)nexus.events.<tenant_id>,nexus.packets.<tenant_id>)On-Prem Vault (
on_prem_vault/)Infrastructure (
docker-compose.yml)onpremandcloudnexus_internal— no internet,nexus_cloud)Security (
security/rbac_config.json)NEXUS_PRODUCT_MANAGER,NEXUS_SALES,NEXUS_COMPLIANCE,NEXUS_TENANT_ADMIN,NEXUS_SUPER_ADMINDeployment
render.yaml) for cloud ingestorhttps://nexus-zofl.onrender.comSummary by CodeRabbit
New Features
Tests
Chores