diff --git a/.github/workflows/ci-python.yml b/.github/workflows/ci-python.yml index bc44c6e3..e76dc155 100644 --- a/.github/workflows/ci-python.yml +++ b/.github/workflows/ci-python.yml @@ -11,6 +11,7 @@ on: - 'services/document-ingestion-service/**' - 'workers/analysis-worker/**' - 'workers/batch-ranking-worker/**' + - 'services/content-creator/**' - 'shared/telemetry/**' pull_request: paths: @@ -22,6 +23,7 @@ on: - 'services/document-ingestion-service/**' - 'workers/analysis-worker/**' - 'workers/batch-ranking-worker/**' + - 'services/content-creator/**' - 'shared/telemetry/**' env: @@ -128,6 +130,9 @@ jobs: batch-ranking-worker: - 'workers/batch-ranking-worker/**' - 'shared/telemetry/**' + content-creator: + - 'services/content-creator/**' + - 'shared/telemetry/**' - name: Build matrix of changed services id: set-matrix @@ -141,6 +146,7 @@ jobs: [ "${{ steps.filter.outputs.document-ingestion-service }}" = "true" ] && SERVICES+=('services/document-ingestion-service') [ "${{ steps.filter.outputs.analysis-worker }}" = "true" ] && SERVICES+=('workers/analysis-worker') [ "${{ steps.filter.outputs.batch-ranking-worker }}" = "true" ] && SERVICES+=('workers/batch-ranking-worker') + [ "${{ steps.filter.outputs.content-creator }}" = "true" ] && SERVICES+=('services/content-creator') if [ ${#SERVICES[@]} -eq 0 ]; then echo 'matrix={"service":[]}' >> "$GITHUB_OUTPUT" else diff --git a/docker-compose.yml b/docker-compose.yml index 32d40390..c622ab57 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -469,6 +469,43 @@ services: networks: - backend + content-creator: + <<: *runtime-hardening-strict + build: + context: . + dockerfile: services/content-creator/Dockerfile + image: curvit/content-creator:local + container_name: curvit-content-creator + restart: unless-stopped + environment: + APP_ENV: ${APP_ENV:-development} + LOG_LEVEL: "debug" + DATABASE_URL: "postgresql+asyncpg://curvit:${POSTGRES_PASSWORD}@postgres:5432/curvit" + AUTH_SECRET: ${AUTH_SECRET} + AUTH_ISSUER: https://app.curvit.local.co.uk + AUTH_AUDIENCE: curvit-api + INTERNAL_API_KEY: ${INTERNAL_API_KEY:-} + CMS_SERVICE_URL: "http://cms-service:8000" + # CONTENT_CREATOR_AI_API_KEY: placeholder — set when key is generated + CONTENT_CREATOR_AI_API_KEY: ${CONTENT_CREATOR_AI_API_KEY:-} + CONTENT_CREATOR_AI_MODEL: ${CONTENT_CREATOR_AI_MODEL:-claude-haiku-4-5-20251001} + PROMPT_VERSION: ${CONTENT_CREATOR_PROMPT_VERSION:-v1.0} + depends_on: + postgres: + condition: service_healthy + cms-service: + condition: service_healthy + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/health')"] + interval: 30s + timeout: 5s + retries: 3 + start_period: 15s + # No public ingress — internal service only. + # No Traefik labels — content-creator is never exposed externally. + networks: + - backend + messaging-service: <<: *runtime-hardening-strict build: diff --git a/docs/content-creator.md b/docs/content-creator.md new file mode 100644 index 00000000..774c0358 --- /dev/null +++ b/docs/content-creator.md @@ -0,0 +1,502 @@ +# curvit-content-creator Service + +## Overview + +`curvit-content-creator` is a dedicated internal Docker service that generates high-quality draft content for Curvit on a weekly schedule. It acts as an editorial assistant: it produces drafts and recommendations but **never automatically publishes content**. + +--- + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ content-creator service │ +│ │ +│ FastAPI app │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ /health /ready /metrics /admin/* │ │ +│ └──────────────────────────────────────────────────────┘ │ +│ │ +│ Scheduler (asyncio task) │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ Runs every Monday at 01:00 UTC │ │ +│ │ Calls run_generation_cycle() │ │ +│ └──────────────────────────────────────────────────────┘ │ +│ │ +│ Workflow engine (app/services/workflow.py) │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ Stage 1 → Content inventory (via CMS API) │ │ +│ │ Stage 2 → UK recruitment research │ │ +│ │ Stage 3 → Opportunity analysis & scoring │ │ +│ │ Stage 4 → Content brief generation (AI) │ │ +│ │ Stage 5 → Draft generation (AI) │ │ +│ │ Stage 6 → Link intelligence │ │ +│ │ Stage 7 → Conversion rule validation │ │ +│ │ Stage 8 → Quality review (AI scoring) │ │ +│ │ Stage 9 → Draft creation in CMS │ │ +│ │ Stage 10 → Administration support data │ │ +│ └──────────────────────────────────────────────────────┘ │ +│ │ +│ External clients │ +│ ┌──────────────────┐ ┌──────────────────────────────┐ │ +│ │ ai_client.py │ │ cms_client.py │ │ +│ │ (Anthropic API) │ │ (cms-service internal HTTP) │ │ +│ └──────────────────┘ └──────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────┘ + │ │ + ▼ ▼ + Anthropic API cms-service + (separate key) PostgreSQL +``` + +--- + +## Generation Modes + +The service supports four generation modes, selectable per run: + +| Mode | Description | +|------|-------------| +| `NewContent` | Generates entirely new evergreen articles, guides, or blog posts | +| `Refresh` | Creates draft revisions of existing low-scoring or stale content | +| `Expansion` | Produces additional sections, FAQs, or supporting content for existing pages | +| `Mixed` | Combines new and refresh/expansion (default for scheduled runs) | + +The scheduled weekly run uses `Mixed` mode with the following allocation: +- 2 new evergreen content drafts +- 1 refreshed or expanded existing content draft +- 1 news-informed blog draft (or an extra evergreen piece if no newsworthy topic exists) + +--- + +## Workflow + +### Stage 1 — Content inventory + +Calls the CMS service to retrieve all published content (articles, guides, blog posts). Builds summaries of existing topics, slugs, word counts, and quality scores to identify: + +- Coverage gaps +- Orphaned pages +- Thin or weak content +- Missing internal links + +All content types are treated equally regardless of origin: manual, AI-generated, AI-assisted, or imported. + +### Stage 2 — UK recruitment research + +Fetches research from approved authoritative sources: + +- [GOV.UK](https://www.gov.uk/) +- [ONS](https://www.ons.gov.uk/) (Office for National Statistics) +- [CIPD](https://www.cipd.org/) +- [REC](https://www.rec.uk.com/) (Recruitment & Employment Confederation) +- [Acas](https://www.acas.org.uk/) +- LinkedIn Hiring Insights +- Indeed Hiring Lab + +Each digest is stored with: source URL, headline, summary, publication date, and relevance score. + +**The service must not scrape competitor sites.** + +### Stage 3 — Opportunity analysis + +Scores each content opportunity using a weighted formula: + +``` +composite_score = (evergreen_seo_score × 0.60) + + (product_alignment_score × 0.25) + + (news_relevance_score × 0.15) +``` + +Evergreen SEO is weighted highest to ensure the service favours durable content over news-chasing. + +### Stage 4 — Brief generation + +Before generating any draft, the AI produces a structured content brief containing: + +- Topic +- Search intent +- Target keyword +- Secondary keywords +- Suggested internal links +- Suggested external sources +- CTA opportunities +- Recommended content type + +### Stage 5 — Draft generation + +Generates full drafts using the Anthropic API. + +**Content philosophy:** +- Useful first, conversion second +- No hard selling +- No fake expertise +- No fabricated statistics or testimonials + +**Preferred formats:** +- Before/after examples +- Recruiter and hiring manager perspectives +- ATS-aware advice +- Worked examples and practical exercises + +**Avoided patterns:** +- Generic recruitment clichés +- Thin SEO filler +- Listicles with little substance + +### Stage 6 — Link intelligence + +For each generated draft the link auditor: + +- Extracts all internal and external links from the markdown +- Validates HTTP availability of external links +- Checks external link authority (GOV.UK, ONS, CIPD, etc. are treated as authoritative) +- Assesses anchor text quality +- Flags orphaned internal links or missing high-value targets + +Results are stored as `ContentLinkAudit` records. + +### Stage 7 — Conversion optimisation + +Enforces conversion rules before any draft is accepted: + +| Rule | Requirement | +|------|-------------| +| First Curvit mention | Must not appear in the first 25% of the content | +| Maximum direct references | 2 per article | +| Content value | Must remain valuable if all Curvit references are removed | +| Sales language | No aggressive or pressure-based language | + +Drafts failing these rules are rejected before quality review. + +### Stage 8 — Quality review + +The AI scores every accepted draft on four dimensions (0–100 each): + +| Dimension | Weight | +|-----------|--------| +| SEO | Keyword placement, heading structure, meta suitability | +| Trust | Accuracy, attribution, absence of invented claims | +| Readability | Clarity, sentence length, structure | +| Conversion | CTA quality, natural product integration | + +Composite score = mean of all four. Drafts below the configured threshold are rejected. The default minimum composite score is **65**. + +### Stage 9 — Draft creation + +Accepted drafts are saved to the CMS via the internal API with: + +```json +{ + "status": "Draft", + "ai_generated": true, + "content_source": "AiGenerated" +} +``` + +The service **never** calls any publish or activate endpoint. + +Generation metadata (run ID, mode, prompt version, model) is embedded in the `editorial_notes` field of each CMS record for full traceability. + +### Stage 10 — Administration support + +Populates the admin review queue with: + +- Draft content items pending review +- Quality scores for each draft +- Link audit results +- Research digests from Stage 2 +- Refresh recommendations for existing content + +--- + +## Scheduling + +The service runs automatically every **Monday at 01:00 UTC**. + +The scheduler is implemented as an asyncio background task started inside the FastAPI lifespan context manager. It calculates the next Monday 01:00 UTC timestamp on startup and sleeps until that time, then loops. + +To run an immediate cycle without waiting for the schedule, use the [ad-hoc trigger endpoint](#ad-hoc-generation). + +--- + +## API Endpoints + +All endpoints require the internal API key (`X-Internal-Api-Key` header). + +### Health & monitoring + +| Method | Path | Description | +|--------|------|-------------| +| `GET` | `/health` | Liveness check | +| `GET` | `/ready` | Readiness check (verifies DB connectivity) | +| `GET` | `/metrics` | Prometheus metrics (text format) | + +### Admin — Generation control + +#### Ad-hoc generation + +```http +POST /admin/generate +Content-Type: application/json +X-Internal-Api-Key: + +{ + "mode": "Mixed", // NewContent | Refresh | Expansion | Mixed + "target_count": 4, // total drafts to attempt + "content_type_filter": "Article", // optional: Article | Guide | Blog + "topic_hint": "IR35 changes 2025" // optional: seeded topic for first opportunity +} +``` + +Triggers an immediate generation cycle in a FastAPI `BackgroundTask`. Returns immediately with a `run_id`. + +#### List generation runs + +```http +GET /admin/runs?limit=20&offset=0 +``` + +#### Draft review queue + +```http +GET /admin/queue?limit=20&offset=0 +``` + +Returns draft content items created by the service that are awaiting human review. + +### Admin — Research & analysis + +```http +GET /admin/research?limit=50&offset=0 +GET /admin/quality?limit=50&offset=0&rejected_only=false +GET /admin/links?limit=50&offset=0&issues_only=false +GET /admin/opportunities?limit=50&offset=0 +GET /admin/recommendations?limit=20&offset=0 +``` + +--- + +## AI Prompts + +### Prompt versioning + +Every AI call stores its provenance: + +```python +PromptVersion = "v1.0" # bump when prompt wording changes +ModelUsed = "claude-3-5-haiku-20241022" +GeneratedAt = datetime.utcnow() +GenerationMode = "NewContent" # NewContent | Refresh | Expansion | MetaGeneration | LinkOptimisation +``` + +This allows comparing content quality across prompt versions and model upgrades, and feeds into future analytics from the learning service (issue #332). + +### Prompt categories + +| Constant | Prompt version | Purpose | +|----------|---------------|---------| +| `_BRIEF_PROMPT_VERSION` | `v1.0` | Generates the content brief (Stage 4) | +| `_DRAFT_PROMPT_VERSION` | `v1.0` | Generates the full draft (Stage 5) | +| `_QUALITY_REVIEW_PROMPT_VERSION` | `v1.0` | AI quality scoring (Stage 8) | +| `_REFRESH_PROMPT_VERSION` | `v1.0` | Identifies refresh candidates | + +--- + +## Safety Controls + +### What the service must not do + +- Auto-publish any content +- Delete any content +- Rewrite published content automatically +- Invent named people, testimonials, or statistics +- Scrape competitor websites + +### What the service may do + +- Create draft content items in the CMS +- Suggest updates and refreshes (as draft briefs) +- Recommend internal and external links +- Score and prioritise existing content for refresh + +### Enforcement + +1. The CMS client always uses `status="Draft"` — there is no code path to publish +2. Conversion rules are enforced in `_validate_conversion_rules()` before Stage 8 +3. Quality review AI is prompted explicitly to reject fabricated claims +4. The service has no CMS admin credentials — only internal API key access to create drafts + +--- + +## Configuration + +All configuration is via environment variables: + +| Variable | Required | Default | Description | +|----------|----------|---------|-------------| +| `CONTENT_CREATOR_AI_API_KEY` | Yes | — | Anthropic API key (separate from cv-analysis) | +| `CONTENT_CREATOR_AI_MODEL` | No | `claude-3-5-haiku-20241022` | Anthropic model to use | +| `CONTENT_CREATOR_PROMPT_VERSION` | No | `v1.0` | Prompt version identifier | +| `CMS_SERVICE_URL` | No | `http://cms-service:8000` | Internal CMS service base URL | +| `DATABASE_URL` | Yes | — | PostgreSQL connection string | +| `INTERNAL_API_KEY` | Yes | — | Shared internal API key for endpoint auth | +| `MIN_QUALITY_SCORE` | No | `65` | Minimum composite quality score (0–100) | +| `MIN_SEO_SCORE` | No | `60` | Minimum SEO dimension score | +| `MIN_TRUST_SCORE` | No | `70` | Minimum trust dimension score | +| `MIN_READABILITY_SCORE` | No | `60` | Minimum readability dimension score | +| `MAX_AI_CONCURRENCY` | No | `3` | Max parallel AI requests | +| `LOG_LEVEL` | No | `INFO` | Structured log level | + +--- + +## Infrastructure + +### Docker + +The service runs as a non-root user inside a minimal Python 3.14-slim-bookworm image. It has: + +- No public ingress (no Traefik labels) +- No exposed ports on the host +- Internal Docker network connectivity only + +### Health endpoint + +```http +GET /health +→ 200 {"status": "ok", "service": "content-creator"} +``` + +### Readiness endpoint + +```http +GET /ready +→ 200 {"status": "ready", "db": "ok"} +→ 503 {"status": "not ready", "db": "error: ..."} +``` + +### Prometheus metrics + +```http +GET /metrics +→ text/plain Prometheus exposition format +``` + +Key metrics: +- `content_creator_generation_runs_total` — counter labelled by mode and status +- `content_creator_drafts_created_total` — counter of drafts saved to CMS +- `content_creator_drafts_rejected_total` — counter of rejected drafts labelled by reason +- `content_creator_ai_requests_total` — counter labelled by operation +- `content_creator_ai_request_duration_seconds` — histogram + +### Graceful shutdown + +The asyncio scheduler task is cancelled on FastAPI shutdown. In-progress AI requests complete before the process exits (the semaphore ensures no runaway concurrency). + +--- + +## Operations + +### Starting the service locally + +```bash +docker compose up content-creator +``` + +### Triggering an immediate generation cycle + +```bash +curl -X POST http://localhost:8007/admin/generate \ + -H "X-Internal-Api-Key: " \ + -H "Content-Type: application/json" \ + -d '{"mode": "Mixed", "target_count": 4}' +``` + +(Port 8007 is available for local development — the service has no host port binding in production.) + +### Viewing the draft review queue + +```bash +curl http://localhost:8007/admin/queue \ + -H "X-Internal-Api-Key: " +``` + +### Checking quality scores + +```bash +# All reviews +curl "http://localhost:8007/admin/quality" -H "X-Internal-Api-Key: " + +# Only rejected drafts +curl "http://localhost:8007/admin/quality?rejected_only=true" -H "X-Internal-Api-Key: " +``` + +### Checking link audit results + +```bash +# Only links with issues +curl "http://localhost:8007/admin/links?issues_only=true" -H "X-Internal-Api-Key: " +``` + +--- + +## Testing + +```bash +cd services/content-creator +python -m pytest tests/ -v +``` + +### Test structure + +| File | Coverage | +|------|----------| +| `tests/test_unit.py` | Scheduler timing, link auditor, workflow helpers, conversion rules | +| `tests/test_routes.py` | All HTTP endpoints, ad-hoc generation, filter parameters | +| `tests/test_workflow.py` | Opportunity generation, refresh candidate identification, draft safety | + +All tests use an in-memory SQLite database via `conftest.py`. + +--- + +## Monitoring + +### Alerts to configure + +| Alert | Threshold | Action | +|-------|-----------|--------| +| No generation runs in 8 days | `content_creator_generation_runs_total` not increasing | Check scheduler logs, DB connectivity | +| High rejection rate | `content_creator_drafts_rejected_total / content_creator_drafts_created_total > 0.5` | Review quality thresholds and AI prompts | +| AI error rate | `content_creator_ai_requests_total{status="error"} > 0` | Check `CONTENT_CREATOR_AI_API_KEY` validity | +| Readiness failing | `/ready` returns 503 | Check PostgreSQL connectivity | + +### Log fields + +All logs are emitted as structured JSON with the fields: + +```json +{ + "timestamp": "...", + "level": "INFO", + "service": "content-creator", + "run_id": "...", + "stage": "draft_generation", + "message": "..." +} +``` + +--- + +## Integration Points + +### CMS service (issue #329) + +The service reads all published content from the CMS via internal HTTP, and writes new drafts back via the same API. It uses the content model's `status`, `ai_generated`, `content_source`, and `editorial_notes` fields. + +### Admin UI (issue #330) + +The admin endpoints (`/admin/queue`, `/admin/quality`, `/admin/links`, `/admin/research`, `/admin/recommendations`) are designed to feed the CMS admin dashboard defined in issue #330. + +### Analytics / learning service (issue #332) + +When the analytics service becomes available, its signals (impressions, clicks, engagement) should be fed into the `refresh_signals` field on `ContentOpportunity` records. The `PromptVersion` and `ModelUsed` stored on every draft will allow the analytics service to compare performance across prompt versions. diff --git a/services/content-creator/Dockerfile b/services/content-creator/Dockerfile new file mode 100644 index 00000000..be610d59 --- /dev/null +++ b/services/content-creator/Dockerfile @@ -0,0 +1,39 @@ +# python:3.14-slim-bookworm — digest pinned; update via Dependabot or `skopeo inspect --raw docker://python:3.14-slim-bookworm` +FROM python@sha256:ec58d916f9e24a6035cab2bdf07f6206c4cc092a16613c60597534711332d9d6 AS builder + +RUN apt-get update \ + && apt-get install -y --no-install-recommends build-essential \ + && apt-get upgrade -y \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /build +COPY services/content-creator/requirements.txt ./ +# requirements.txt is generated by pip-compile (a lock file with pinned versions) +RUN pip install --no-cache-dir --only-binary=:all: -r requirements.txt --target /install # NOSONAR: S8410 + +# ───────────────────────────────────────────────────────────────────────────── + +FROM python@sha256:ec58d916f9e24a6035cab2bdf07f6206c4cc092a16613c60597534711332d9d6 AS runtime + +RUN apt-get update \ + && apt-get upgrade -y \ + && rm -rf /var/lib/apt/lists/* \ + && addgroup --system appgroup \ + && adduser --system --ingroup appgroup appuser + +WORKDIR /app +COPY --from=builder /install /app/deps +COPY services/content-creator/app/ /app/app/ +COPY services/content-creator/internal_auth.py /app/internal_auth.py +COPY shared_api_key_scheme.py /app/shared_api_key_scheme.py + +COPY shared/*.py /app/shared/ +COPY shared/__init__.py /app/shared/ +ENV PYTHONPATH=/app:/app/deps +ENV PYTHONUNBUFFERED=1 + +USER appuser + +EXPOSE 8000 + +CMD ["python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--log-level", "info"] diff --git a/services/content-creator/app/__init__.py b/services/content-creator/app/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/services/content-creator/app/config.py b/services/content-creator/app/config.py new file mode 100644 index 00000000..2ae05eb5 --- /dev/null +++ b/services/content-creator/app/config.py @@ -0,0 +1,62 @@ +"""Configuration for the content-creator service.""" +from functools import lru_cache + +from shared.config import BaseServiceSettings + + +class Settings(BaseServiceSettings): + """Content-creator service configuration.""" + + # CMS service endpoint (internal) + cms_service_url: str = "http://cms-service:8000" + + # AI API key dedicated to content creation (separate from cv-analysis key) + # This key is a placeholder — replace with the actual key when generated. + content_creator_ai_api_key: str = "" + + # AI model to use for content generation + content_creator_ai_model: str = "claude-haiku-4-5-20251001" + + # Prompt version tag applied to all generated content in this deployment + prompt_version: str = "v1.0" + + # Quality thresholds — drafts scoring below these are rejected + min_seo_score: int = 60 + min_trust_score: int = 70 + min_readability_score: int = 65 + min_conversion_score: int = 50 + + # Link validation timeout (seconds) + link_check_timeout: int = 10 + + # Maximum concurrent AI requests + ai_max_concurrent: int = 2 + + # Content author name used when creating CMS drafts + ai_author_name: str = "Curvit Content Creator" + + # Research sources to query (comma-separated labels) + research_sources: str = "GOV.UK,ONS,CIPD,REC,Acas" + + def validate_startup(self) -> None: # type: ignore[override] + """Extend base validation with content-creator–specific checks.""" + super().validate_startup() + import logging + _log = logging.getLogger(__name__) + if not self.content_creator_ai_api_key: + if self.app_env.lower() in {"development", "dev", "local", "test", "testing"}: + _log.warning( + "WARN: CONTENT_CREATOR_AI_API_KEY is not set; " + "AI generation will fail. Set the key when available." + ) + else: + raise ValueError( + "CONTENT_CREATOR_AI_API_KEY is required outside development/test" + ) + if not self.cms_service_url: + raise ValueError("CMS_SERVICE_URL is required") + + +@lru_cache +def get_settings() -> Settings: + return Settings() diff --git a/services/content-creator/app/db.py b/services/content-creator/app/db.py new file mode 100644 index 00000000..6b3fe602 --- /dev/null +++ b/services/content-creator/app/db.py @@ -0,0 +1,4 @@ +"""Database connection for content-creator service.""" +from shared.db import get_db_engine, get_session_maker, get_session # noqa: F401 + +__all__ = ["get_db_engine", "get_session_maker", "get_session"] diff --git a/services/content-creator/app/main.py b/services/content-creator/app/main.py new file mode 100644 index 00000000..a3629012 --- /dev/null +++ b/services/content-creator/app/main.py @@ -0,0 +1,73 @@ +"""Content-creator service — FastAPI application entry point.""" +from __future__ import annotations + +import asyncio +import logging + +from fastapi import FastAPI, Response +from prometheus_fastapi_instrumentator import Instrumentator + +from app.config import get_settings +from app.routers.admin import router as admin_router +from app.scheduler import run_scheduler +from internal_auth import CorrelationIdMiddleware, InternalApiKeyMiddleware +from shared_api_key_scheme import setup_api_key_scheme + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +_scheduler_task: asyncio.Task | None = None + + +async def _lifespan(app: FastAPI): # type: ignore[type-arg] + """Start background scheduler on startup; cancel on shutdown.""" + global _scheduler_task # noqa: PLW0603 + _scheduler_task = asyncio.create_task(run_scheduler()) + logger.info("Content-creator scheduler task started") + yield + if _scheduler_task and not _scheduler_task.done(): + _scheduler_task.cancel() + try: + await _scheduler_task + except asyncio.CancelledError: + pass + logger.info("Content-creator service shutdown complete") + + +app = FastAPI( + title="content-creator", + description=( + "AI-powered content creation service for Curvit. " + "Generates draft blog posts, guides, and articles on a weekly schedule. " + "Never auto-publishes. All output requires human editorial review." + ), + version="0.1.0", + lifespan=_lifespan, +) + +app.add_middleware(InternalApiKeyMiddleware) +app.add_middleware(CorrelationIdMiddleware) +app.include_router(admin_router) +Instrumentator().instrument(app).expose(app) + +setup_api_key_scheme(app) + +# Run startup configuration validation +get_settings().validate_startup() + + +# Security review: +# - /health and /ready return only {"status": "..."} — no config details. +# - Exempt from InternalApiKeyMiddleware so Docker health probes can reach them. +# - Cache-Control: no-store prevents proxies from serving stale health status. + +@app.get("/health", tags=["ops"]) +def health(response: Response) -> dict: + response.headers["Cache-Control"] = "no-store" + return {"status": "healthy"} + + +@app.get("/ready", tags=["ops"]) +def ready(response: Response) -> dict: + response.headers["Cache-Control"] = "no-store" + return {"status": "ready"} diff --git a/services/content-creator/app/models/__init__.py b/services/content-creator/app/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/services/content-creator/app/models/db_models.py b/services/content-creator/app/models/db_models.py new file mode 100644 index 00000000..91ac5379 --- /dev/null +++ b/services/content-creator/app/models/db_models.py @@ -0,0 +1,147 @@ +"""SQLAlchemy ORM models for the content-creator service.""" +from datetime import datetime, timezone +from sqlalchemy import String, Column, DateTime, Text, Boolean, Integer, Float, JSON +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import declarative_base +import uuid + +Base = declarative_base() + + +class GenerationRun(Base): + """Tracks each execution of the content generation workflow (scheduled or ad-hoc).""" + __tablename__ = "ContentCreatorRuns" + + id = Column("Id", UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + run_type = Column("RunType", String(50), nullable=False) # "Scheduled" | "AdHoc" + generation_mode = Column("GenerationMode", String(50), nullable=False) # "NewContent" | "Refresh" | "Expansion" | "Mixed" + status = Column("Status", String(50), nullable=False, default="Running") # "Running" | "Completed" | "Failed" | "Partial" + triggered_by = Column("TriggeredBy", String(200), nullable=True) # user id or "scheduler" + parameters = Column("Parameters", JSON, nullable=True) # ad-hoc run parameters + started_at = Column("StartedAt", DateTime(timezone=True), nullable=False, + default=lambda: datetime.now(timezone.utc)) + completed_at = Column("CompletedAt", DateTime(timezone=True), nullable=True) + items_created = Column("ItemsCreated", Integer, nullable=False, default=0) + items_rejected = Column("ItemsRejected", Integer, nullable=False, default=0) + error_message = Column("ErrorMessage", Text, nullable=True) + summary = Column("Summary", Text, nullable=True) + + +class ResearchDigest(Base): + """A captured research item from an approved authoritative source.""" + __tablename__ = "ResearchDigests" + + id = Column("Id", UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + run_id = Column("RunId", UUID(as_uuid=True), nullable=True) + source_name = Column("SourceName", String(100), nullable=False) # e.g. "GOV.UK", "CIPD" + source_url = Column("SourceUrl", String(2048), nullable=False) + headline = Column("Headline", String(500), nullable=False) + summary = Column("Summary", Text, nullable=True) + publication_date = Column("PublicationDate", DateTime(timezone=True), nullable=True) + relevance_score = Column("RelevanceScore", Float, nullable=True) # 0.0–1.0 + topics = Column("Topics", JSON, nullable=True) # list of topic strings + raw_content = Column("RawContent", Text, nullable=True) + captured_at = Column("CapturedAt", DateTime(timezone=True), nullable=False, + default=lambda: datetime.now(timezone.utc)) + used_in_opportunity_id = Column("UsedInOpportunityId", UUID(as_uuid=True), nullable=True) + + +class ContentOpportunity(Base): + """A scored and ranked content opportunity identified from inventory and research.""" + __tablename__ = "ContentOpportunities" + + id = Column("Id", UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + run_id = Column("RunId", UUID(as_uuid=True), nullable=True) + opportunity_type = Column("OpportunityType", String(50), nullable=False) # "NewContent" | "Refresh" | "Expansion" + content_type = Column("ContentType", String(50), nullable=False) # "Blog" | "Guide" | "Article" + topic = Column("Topic", String(500), nullable=False) + target_keyword = Column("TargetKeyword", String(200), nullable=True) + rationale = Column("Rationale", Text, nullable=True) + # Weighted score components + evergreen_seo_score = Column("EvergreenSeoScore", Float, nullable=True) # 60% weight + product_alignment_score = Column("ProductAlignmentScore", Float, nullable=True) # 25% weight + news_relevance_score = Column("NewsRelevanceScore", Float, nullable=True) # 15% weight + composite_score = Column("CompositeScore", Float, nullable=True) + # For refresh opportunities, reference to existing content + existing_content_id = Column("ExistingContentId", UUID(as_uuid=True), nullable=True) + existing_content_slug = Column("ExistingContentSlug", String(250), nullable=True) + refresh_signals = Column("RefreshSignals", JSON, nullable=True) # list of signal strings + # Outcome + status = Column("Status", String(50), nullable=False, default="Pending") # "Pending" | "BriefCreated" | "DraftCreated" | "Rejected" + created_at = Column("CreatedAt", DateTime(timezone=True), nullable=False, + default=lambda: datetime.now(timezone.utc)) + + +class ContentBrief(Base): + """A detailed content brief generated before draft article generation.""" + __tablename__ = "ContentBriefs" + + id = Column("Id", UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + opportunity_id = Column("OpportunityId", UUID(as_uuid=True), nullable=False) + run_id = Column("RunId", UUID(as_uuid=True), nullable=True) + topic = Column("Topic", String(500), nullable=False) + search_intent = Column("SearchIntent", String(200), nullable=True) + target_keyword = Column("TargetKeyword", String(200), nullable=True) + secondary_keywords = Column("SecondaryKeywords", JSON, nullable=True) + suggested_internal_links = Column("SuggestedInternalLinks", JSON, nullable=True) + suggested_external_sources = Column("SuggestedExternalSources", JSON, nullable=True) + cta_opportunities = Column("CtaOpportunities", JSON, nullable=True) + content_type = Column("ContentType", String(50), nullable=False) + recommended_word_count = Column("RecommendedWordCount", Integer, nullable=True) + outline = Column("Outline", Text, nullable=True) + generation_mode = Column("GenerationMode", String(50), nullable=False) # "NewContent" | "Refresh" | "Expansion" | "MetaGeneration" + prompt_version = Column("PromptVersion", String(100), nullable=True) + model_used = Column("ModelUsed", String(100), nullable=True) + generated_at = Column("GeneratedAt", DateTime(timezone=True), nullable=False, + default=lambda: datetime.now(timezone.utc)) + + +class ContentQualityReview(Base): + """AI quality scores for a generated draft content item.""" + __tablename__ = "ContentQualityReviews" + + id = Column("Id", UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + content_item_id = Column("ContentItemId", UUID(as_uuid=True), nullable=True) # nullable before CMS save + brief_id = Column("BriefId", UUID(as_uuid=True), nullable=True) + run_id = Column("RunId", UUID(as_uuid=True), nullable=True) + # AI quality scores (0–100) + seo_score = Column("SeoScore", Integer, nullable=False) + trust_score = Column("TrustScore", Integer, nullable=False) + readability_score = Column("ReadabilityScore", Integer, nullable=False) + conversion_score = Column("ConversionScore", Integer, nullable=False) + composite_score = Column("CompositeQualityScore", Integer, nullable=False) + # Conversion compliance checks + curvit_first_mention_position = Column("CurvitFirstMentionPosition", Float, nullable=True) # 0.0–1.0 fraction + direct_product_references = Column("DirectProductReferences", Integer, nullable=True) + passes_conversion_rules = Column("PassesConversionRules", Boolean, nullable=False, default=True) + # Rejection reason if below threshold + rejected = Column("Rejected", Boolean, nullable=False, default=False) + rejection_reason = Column("RejectionReason", Text, nullable=True) + # AI provenance + prompt_version = Column("PromptVersion", String(100), nullable=True) + model_used = Column("ModelUsed", String(100), nullable=True) + generation_mode = Column("GenerationMode", String(50), nullable=True) + reviewed_at = Column("ReviewedAt", DateTime(timezone=True), nullable=False, + default=lambda: datetime.now(timezone.utc)) + review_notes = Column("ReviewNotes", Text, nullable=True) + + +class ContentLinkAudit(Base): + """Results of link validation for a draft content item.""" + __tablename__ = "ContentLinkAudits" + + id = Column("Id", UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + content_item_id = Column("ContentItemId", UUID(as_uuid=True), nullable=True) + brief_id = Column("BriefId", UUID(as_uuid=True), nullable=True) + run_id = Column("RunId", UUID(as_uuid=True), nullable=True) + link_url = Column("LinkUrl", String(2048), nullable=False) + link_type = Column("LinkType", String(20), nullable=False) # "Internal" | "External" + anchor_text = Column("AnchorText", String(500), nullable=True) + is_available = Column("IsAvailable", Boolean, nullable=True) # HTTP check result + http_status = Column("HttpStatus", Integer, nullable=True) + is_authoritative = Column("IsAuthoritative", Boolean, nullable=True) + is_relevant = Column("IsRelevant", Boolean, nullable=True) + anchor_quality = Column("AnchorQuality", String(50), nullable=True) # "Good" | "Weak" | "Generic" + issue = Column("Issue", String(200), nullable=True) + audited_at = Column("AuditedAt", DateTime(timezone=True), nullable=False, + default=lambda: datetime.now(timezone.utc)) diff --git a/services/content-creator/app/models/schemas.py b/services/content-creator/app/models/schemas.py new file mode 100644 index 00000000..fef096fc --- /dev/null +++ b/services/content-creator/app/models/schemas.py @@ -0,0 +1,222 @@ +"""Pydantic schemas for the content-creator service API.""" +from datetime import datetime +from typing import Any +from pydantic import BaseModel, ConfigDict, Field, field_validator + + +def _str_uuid(v: Any) -> str | None: + """Coerce UUID objects to str for JSON serialisation.""" + if v is None: + return None + return str(v) + + +# ── Generation run schemas ──────────────────────────────────────────────────── + +class GenerationRunSummary(BaseModel): + id: str + run_type: str + generation_mode: str + status: str + triggered_by: str | None = None + started_at: datetime + completed_at: datetime | None = None + items_created: int + items_rejected: int + error_message: str | None = None + summary: str | None = None + + @field_validator("id", mode="before") + @classmethod + def _coerce_id(cls, v: Any) -> str: + return str(v) + + model_config = ConfigDict(from_attributes=True) + + +class AdHocRunRequest(BaseModel): + """Request body for triggering an ad-hoc generation cycle.""" + generation_mode: str = Field( + default="Mixed", + description="One of: NewContent, Refresh, Expansion, Mixed", + ) + content_type: str | None = Field( + default=None, + description="Restrict to a specific content type: Blog, Guide, Article", + ) + topic_hint: str | None = Field( + default=None, + description="Optional topic or keyword hint to guide content selection", + ) + target_count: int = Field( + default=2, + ge=1, + le=5, + description="Number of drafts to produce (1–5)", + ) + notes: str | None = Field( + default=None, + description="Optional editorial notes for this run", + ) + + +class AdHocRunResponse(BaseModel): + run_id: str + status: str + message: str + + +# ── Research digest schemas ─────────────────────────────────────────────────── + +class ResearchDigestSchema(BaseModel): + id: str + source_name: str + source_url: str + headline: str + summary: str | None = None + publication_date: datetime | None = None + relevance_score: float | None = None + topics: list[str] | None = None + captured_at: datetime + + @field_validator("id", mode="before") + @classmethod + def _coerce_id(cls, v: Any) -> str: + return str(v) + + model_config = ConfigDict(from_attributes=True) + + +class ResearchDigestListResponse(BaseModel): + items: list[ResearchDigestSchema] + total: int + + +# ── Content opportunity schemas ─────────────────────────────────────────────── + +class ContentOpportunitySchema(BaseModel): + id: str + opportunity_type: str + content_type: str + topic: str + target_keyword: str | None = None + rationale: str | None = None + evergreen_seo_score: float | None = None + product_alignment_score: float | None = None + news_relevance_score: float | None = None + composite_score: float | None = None + existing_content_id: str | None = None + existing_content_slug: str | None = None + refresh_signals: list[str] | None = None + status: str + created_at: datetime + + @field_validator("id", "existing_content_id", mode="before") + @classmethod + def _coerce_uuid(cls, v: Any) -> str | None: + return _str_uuid(v) + + model_config = ConfigDict(from_attributes=True) + + +class ContentOpportunityListResponse(BaseModel): + items: list[ContentOpportunitySchema] + total: int + + +# ── Quality review schemas ──────────────────────────────────────────────────── + +class ContentQualityReviewSchema(BaseModel): + id: str + content_item_id: str | None = None + run_id: str | None = None + seo_score: int + trust_score: int + readability_score: int + conversion_score: int + composite_score: int + curvit_first_mention_position: float | None = None + direct_product_references: int | None = None + passes_conversion_rules: bool + rejected: bool + rejection_reason: str | None = None + prompt_version: str | None = None + model_used: str | None = None + generation_mode: str | None = None + reviewed_at: datetime + review_notes: str | None = None + + @field_validator("id", "content_item_id", "run_id", mode="before") + @classmethod + def _coerce_uuid(cls, v: Any) -> str | None: + return _str_uuid(v) + + model_config = ConfigDict(from_attributes=True) + + +class QualityReviewListResponse(BaseModel): + items: list[ContentQualityReviewSchema] + total: int + + +# ── Link audit schemas ──────────────────────────────────────────────────────── + +class ContentLinkAuditSchema(BaseModel): + id: str + content_item_id: str | None = None + run_id: str | None = None + link_url: str + link_type: str + anchor_text: str | None = None + is_available: bool | None = None + http_status: int | None = None + is_authoritative: bool | None = None + is_relevant: bool | None = None + anchor_quality: str | None = None + issue: str | None = None + audited_at: datetime + + @field_validator("id", "content_item_id", "run_id", mode="before") + @classmethod + def _coerce_uuid(cls, v: Any) -> str | None: + return _str_uuid(v) + + model_config = ConfigDict(from_attributes=True) + + +class LinkAuditListResponse(BaseModel): + items: list[ContentLinkAuditSchema] + total: int + + +# ── Admin dashboard schemas ─────────────────────────────────────────────────── + +class DraftReviewQueueItem(BaseModel): + content_item_id: str + title: str + content_type: str + generation_mode: str + composite_quality_score: int | None = None + created_at: datetime + run_id: str | None = None + prompt_version: str | None = None + model_used: str | None = None + + +class DraftReviewQueueResponse(BaseModel): + items: list[DraftReviewQueueItem] + total: int + + +class PublishingRecommendation(BaseModel): + content_item_id: str + title: str + content_type: str + recommendation: str # "Publish" | "Review" | "Revise" | "Reject" + rationale: str + quality_score: int | None = None + + +class RunListResponse(BaseModel): + items: list[GenerationRunSummary] + total: int diff --git a/services/content-creator/app/routers/__init__.py b/services/content-creator/app/routers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/services/content-creator/app/routers/admin.py b/services/content-creator/app/routers/admin.py new file mode 100644 index 00000000..2faa3bc7 --- /dev/null +++ b/services/content-creator/app/routers/admin.py @@ -0,0 +1,394 @@ +"""Admin router for the content-creator service. + +Provides: +- Draft review queue +- Research review +- Quality scores +- Link audit results +- Publishing recommendations +- Ad-hoc generation cycle trigger (Stage 10) +""" +from __future__ import annotations + +import logging +import uuid +from datetime import datetime, timezone + +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.db import get_session +from app.models.db_models import ( + ContentLinkAudit, + ContentOpportunity, + ContentQualityReview, + GenerationRun, + ResearchDigest, +) +from app.models.schemas import ( + AdHocRunRequest, + AdHocRunResponse, + ContentOpportunityListResponse, + ContentOpportunitySchema, + DraftReviewQueueItem, + DraftReviewQueueResponse, + GenerationRunSummary, + LinkAuditListResponse, + ContentLinkAuditSchema, + PublishingRecommendation, + QualityReviewListResponse, + ContentQualityReviewSchema, + ResearchDigestListResponse, + ResearchDigestSchema, + RunListResponse, +) +from app.services import cms_client +from app.services.workflow import run_generation_cycle +from internal_auth import require_internal_api_key + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/admin", tags=["admin"]) + + +# ── Ad-hoc generation cycle ─────────────────────────────────────────────────── + +@router.post( + "/generate", + response_model=AdHocRunResponse, + dependencies=[Depends(require_internal_api_key)], + summary="Trigger an ad-hoc AI content generation cycle", +) +async def trigger_adhoc_generation( + request: AdHocRunRequest, + background_tasks: BackgroundTasks, + session: AsyncSession = Depends(get_session), +) -> AdHocRunResponse: + """Trigger an ad-hoc content generation run with custom parameters. + + The run executes in the background; the response returns the run_id + immediately so the caller can poll for status. + """ + valid_modes = {"NewContent", "Refresh", "Expansion", "Mixed"} + if request.generation_mode not in valid_modes: + raise HTTPException( + status_code=422, + detail=f"generation_mode must be one of: {', '.join(sorted(valid_modes))}", + ) + if request.content_type and request.content_type not in {"Blog", "Guide", "Article"}: + raise HTTPException( + status_code=422, + detail="content_type must be one of: Blog, Guide, Article", + ) + + run_id = str(uuid.uuid4()) + parameters = { + "target_count": request.target_count, + "content_type": request.content_type, + "topic_hint": request.topic_hint, + "notes": request.notes, + } + + background_tasks.add_task( + _run_adhoc_in_background, + run_id=run_id, + generation_mode=request.generation_mode, + parameters=parameters, + ) + + return AdHocRunResponse( + run_id=run_id, + status="Accepted", + message=( + f"Ad-hoc generation run accepted. " + f"Mode: {request.generation_mode}, target: {request.target_count} drafts. " + f"Poll GET /admin/runs/{run_id} for status." + ), + ) + + +async def _run_adhoc_in_background( + run_id: str, + generation_mode: str, + parameters: dict, +) -> None: + """Execute the workflow in a background task with its own DB session.""" + from app.db import get_session_maker + session_maker = get_session_maker() + async with session_maker() as session: + try: + await run_generation_cycle( + session=session, + run_id=run_id, + run_type="AdHoc", + generation_mode=generation_mode, + triggered_by="admin", + parameters=parameters, + ) + except Exception: # noqa: BLE001 + logger.exception("Background adhoc run %s failed", run_id) + + +# ── Generation run history ──────────────────────────────────────────────────── + +@router.get( + "/runs", + response_model=RunListResponse, + dependencies=[Depends(require_internal_api_key)], + summary="List generation runs", +) +async def list_runs( + limit: int = Query(default=20, ge=1, le=100), + offset: int = Query(default=0, ge=0), + session: AsyncSession = Depends(get_session), +) -> RunListResponse: + total_result = await session.execute(select(func.count()).select_from(GenerationRun)) + total = total_result.scalar_one() + + result = await session.execute( + select(GenerationRun) + .order_by(GenerationRun.started_at.desc()) + .limit(limit) + .offset(offset) + ) + runs = result.scalars().all() + return RunListResponse( + items=[GenerationRunSummary.model_validate(r) for r in runs], + total=total, + ) + + +@router.get( + "/runs/{run_id}", + response_model=GenerationRunSummary, + dependencies=[Depends(require_internal_api_key)], + summary="Get a specific generation run", +) +async def get_run( + run_id: str, + session: AsyncSession = Depends(get_session), +) -> GenerationRunSummary: + try: + run_uuid = uuid.UUID(run_id) + except ValueError: + raise HTTPException(status_code=400, detail="Invalid run_id format") + + result = await session.execute( + select(GenerationRun).where(GenerationRun.id == run_uuid) + ) + run = result.scalar_one_or_none() + if run is None: + raise HTTPException(status_code=404, detail="Run not found") + return GenerationRunSummary.model_validate(run) + + +# ── Draft review queue (Stage 10) ──────────────────────────────────────────── + +@router.get( + "/queue", + response_model=DraftReviewQueueResponse, + dependencies=[Depends(require_internal_api_key)], + summary="Draft review queue — AI-generated drafts awaiting human review", +) +async def draft_review_queue( + limit: int = Query(default=20, ge=1, le=100), + session: AsyncSession = Depends(get_session), +) -> DraftReviewQueueResponse: + """Return drafts created by this service that are pending human review. + + Integrates with the CMS admin workflow (Issue #330). + """ + # Join quality reviews with run info to provide composite scores + result = await session.execute( + select(ContentQualityReview) + .where(ContentQualityReview.rejected.is_(False)) + .order_by(ContentQualityReview.reviewed_at.desc()) + .limit(limit) + ) + reviews = result.scalars().all() + + items: list[DraftReviewQueueItem] = [] + for review in reviews: + if review.content_item_id is None: + continue + items.append(DraftReviewQueueItem( + content_item_id=str(review.content_item_id), + title="[Pending CMS title lookup]", + content_type="[Pending]", + generation_mode=review.generation_mode or "NewContent", + composite_quality_score=review.composite_score, + created_at=review.reviewed_at, + run_id=str(review.run_id) if review.run_id else None, + prompt_version=review.prompt_version, + model_used=review.model_used, + )) + + return DraftReviewQueueResponse(items=items, total=len(items)) + + +# ── Research review ─────────────────────────────────────────────────────────── + +@router.get( + "/research", + response_model=ResearchDigestListResponse, + dependencies=[Depends(require_internal_api_key)], + summary="Recent research digests", +) +async def list_research( + limit: int = Query(default=50, ge=1, le=200), + offset: int = Query(default=0, ge=0), + session: AsyncSession = Depends(get_session), +) -> ResearchDigestListResponse: + total_result = await session.execute(select(func.count()).select_from(ResearchDigest)) + total = total_result.scalar_one() + + result = await session.execute( + select(ResearchDigest) + .order_by(ResearchDigest.captured_at.desc()) + .limit(limit) + .offset(offset) + ) + digests = result.scalars().all() + return ResearchDigestListResponse( + items=[ResearchDigestSchema.model_validate(d) for d in digests], + total=total, + ) + + +# ── Quality scores ──────────────────────────────────────────────────────────── + +@router.get( + "/quality", + response_model=QualityReviewListResponse, + dependencies=[Depends(require_internal_api_key)], + summary="Content quality review results", +) +async def list_quality_reviews( + limit: int = Query(default=50, ge=1, le=200), + offset: int = Query(default=0, ge=0), + rejected_only: bool = Query(default=False), + session: AsyncSession = Depends(get_session), +) -> QualityReviewListResponse: + base_query = select(ContentQualityReview) + if rejected_only: + base_query = base_query.where(ContentQualityReview.rejected == True) # noqa: E712 + + total_result = await session.execute(select(func.count()).select_from(base_query.subquery())) + total = total_result.scalar_one() + + result = await session.execute( + base_query.order_by(ContentQualityReview.reviewed_at.desc()).limit(limit).offset(offset) + ) + reviews = result.scalars().all() + return QualityReviewListResponse( + items=[ContentQualityReviewSchema.model_validate(r) for r in reviews], + total=total, + ) + + +# ── Link audit results ──────────────────────────────────────────────────────── + +@router.get( + "/links", + response_model=LinkAuditListResponse, + dependencies=[Depends(require_internal_api_key)], + summary="Link audit results", +) +async def list_link_audits( + limit: int = Query(default=50, ge=1, le=200), + offset: int = Query(default=0, ge=0), + issues_only: bool = Query(default=False, description="Return only links with issues"), + session: AsyncSession = Depends(get_session), +) -> LinkAuditListResponse: + base_query = select(ContentLinkAudit) + if issues_only: + base_query = base_query.where(ContentLinkAudit.issue != None) # noqa: E711 + + total_result = await session.execute(select(func.count()).select_from(base_query.subquery())) + total = total_result.scalar_one() + + result = await session.execute( + base_query.order_by(ContentLinkAudit.audited_at.desc()).limit(limit).offset(offset) + ) + audits = result.scalars().all() + return LinkAuditListResponse( + items=[ContentLinkAuditSchema.model_validate(a) for a in audits], + total=total, + ) + + +# ── Content opportunities ───────────────────────────────────────────────────── + +@router.get( + "/opportunities", + response_model=ContentOpportunityListResponse, + dependencies=[Depends(require_internal_api_key)], + summary="Content opportunities identified", +) +async def list_opportunities( + limit: int = Query(default=50, ge=1, le=200), + offset: int = Query(default=0, ge=0), + session: AsyncSession = Depends(get_session), +) -> ContentOpportunityListResponse: + total_result = await session.execute(select(func.count()).select_from(ContentOpportunity)) + total = total_result.scalar_one() + + result = await session.execute( + select(ContentOpportunity) + .order_by(ContentOpportunity.composite_score.desc()) + .limit(limit) + .offset(offset) + ) + opps = result.scalars().all() + return ContentOpportunityListResponse( + items=[ContentOpportunitySchema.model_validate(o) for o in opps], + total=total, + ) + + +# ── Publishing recommendations ──────────────────────────────────────────────── + +@router.get( + "/recommendations", + response_model=list[PublishingRecommendation], + dependencies=[Depends(require_internal_api_key)], + summary="Publishing recommendations based on quality scores", +) +async def publishing_recommendations( + session: AsyncSession = Depends(get_session), +) -> list[PublishingRecommendation]: + """Return publishing recommendations for recently reviewed drafts.""" + result = await session.execute( + select(ContentQualityReview) + .where(ContentQualityReview.content_item_id != None) # noqa: E711 + .order_by(ContentQualityReview.reviewed_at.desc()) + .limit(20) + ) + reviews = result.scalars().all() + + recommendations: list[PublishingRecommendation] = [] + for review in reviews: + score = review.composite_score + if review.rejected: + rec = "Reject" + rationale = review.rejection_reason or "Below quality threshold" + elif score >= 80: + rec = "Publish" + rationale = f"High quality score ({score}/100). Recommend review and publish." + elif score >= 65: + rec = "Review" + rationale = f"Good quality ({score}/100). Human review recommended before publishing." + else: + rec = "Revise" + rationale = f"Quality score {score}/100. Revisions recommended." + + recommendations.append(PublishingRecommendation( + content_item_id=str(review.content_item_id), + title="[See CMS for title]", + content_type="[See CMS]", + recommendation=rec, + rationale=rationale, + quality_score=score, + )) + return recommendations diff --git a/services/content-creator/app/scheduler.py b/services/content-creator/app/scheduler.py new file mode 100644 index 00000000..fa27864e --- /dev/null +++ b/services/content-creator/app/scheduler.py @@ -0,0 +1,76 @@ +"""Scheduler for the content-creator service. + +Runs a content generation cycle every Monday at 01:00 UTC. +Uses asyncio with a lightweight scheduling loop to avoid adding +a heavy dependency (e.g. Celery) to this service. +""" +from __future__ import annotations + +import asyncio +import logging +from datetime import datetime, timezone, timedelta + +logger = logging.getLogger(__name__) + +# Weekly schedule: Monday (weekday=0) at 01:00 UTC +_SCHEDULE_WEEKDAY = 0 # Monday +_SCHEDULE_HOUR = 1 +_SCHEDULE_MINUTE = 0 + + +def _next_run_time(now: datetime) -> datetime: + """Calculate the next Monday 01:00 UTC from a reference time.""" + days_ahead = (_SCHEDULE_WEEKDAY - now.weekday()) % 7 + if days_ahead == 0: + # Today is Monday — check if we have already passed the scheduled time + scheduled_today = now.replace( + hour=_SCHEDULE_HOUR, minute=_SCHEDULE_MINUTE, second=0, microsecond=0 + ) + if now >= scheduled_today: + days_ahead = 7 # Next Monday + else: + return scheduled_today + + next_monday = now + timedelta(days=days_ahead) + return next_monday.replace( + hour=_SCHEDULE_HOUR, minute=_SCHEDULE_MINUTE, second=0, microsecond=0 + ) + + +async def run_scheduler() -> None: + """Long-running coroutine that triggers weekly generation cycles.""" + from app.db import get_session_maker + from app.services.workflow import run_generation_cycle + + logger.info("Content-creator scheduler started (Monday 01:00 UTC weekly)") + + while True: + now = datetime.now(timezone.utc) + next_run = _next_run_time(now) + wait_seconds = (next_run - now).total_seconds() + + logger.info( + "Next scheduled run at %s UTC (in %.0f seconds)", + next_run.isoformat(), + wait_seconds, + ) + await asyncio.sleep(wait_seconds) + + logger.info("Starting scheduled content generation run") + session_maker = get_session_maker() + async with session_maker() as session: + try: + run = await run_generation_cycle( + session=session, + run_type="Scheduled", + generation_mode="Mixed", + triggered_by="scheduler", + ) + logger.info( + "Scheduled run completed: status=%s created=%d rejected=%d", + run.status, + run.items_created, + run.items_rejected, + ) + except Exception: # noqa: BLE001 + logger.exception("Scheduled generation run failed") diff --git a/services/content-creator/app/services/__init__.py b/services/content-creator/app/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/services/content-creator/app/services/ai_client.py b/services/content-creator/app/services/ai_client.py new file mode 100644 index 00000000..fb57b5d1 --- /dev/null +++ b/services/content-creator/app/services/ai_client.py @@ -0,0 +1,370 @@ +"""AI client wrapper for content-creator service. + +Uses a dedicated API key (CONTENT_CREATOR_AI_API_KEY) separate from the +key used by the cv-analysis / ai-orchestrator service. +""" +from __future__ import annotations + +import asyncio +import logging + +import anthropic + +from app.config import get_settings + +logger = logging.getLogger(__name__) + +_QUALITY_REVIEW_PROMPT_VERSION = "quality-review" +_BRIEF_PROMPT_VERSION = "brief" +_DRAFT_PROMPT_VERSION = "draft" + + +def _get_client() -> anthropic.AsyncAnthropic: + """Return an Anthropic client using the content-creator API key.""" + settings = get_settings() + return anthropic.AsyncAnthropic(api_key=settings.content_creator_ai_api_key) + + +def get_semaphore() -> asyncio.Semaphore: + """Return the module-level concurrency semaphore, creating it if needed. + + The semaphore is created on first access rather than at import time so that + it is bound to the running event loop (required by asyncio). + """ + global _semaphore # noqa: PLW0603 + if _semaphore is None: + settings = get_settings() + _semaphore = asyncio.Semaphore(max(1, settings.ai_max_concurrent)) + return _semaphore + + +_semaphore: asyncio.Semaphore | None = None + + +async def _call_ai( + system: str, + prompt: str, + *, + max_tokens: int = 4096, + retries: int = 3, + retry_base_s: float = 1.0, +) -> str: + """Call the Anthropic API with retry logic and concurrency limiting.""" + settings = get_settings() + client = _get_client() + sem = get_semaphore() + + async with sem: + for attempt in range(retries): + try: + response = await client.messages.create( + model=settings.content_creator_ai_model, + max_tokens=max_tokens, + system=system, + messages=[{"role": "user", "content": prompt}], + ) + return response.content[0].text if response.content else "" + except anthropic.RateLimitError: + if attempt < retries - 1: + wait = retry_base_s * (2 ** attempt) + logger.warning("Rate limited; retrying in %.1fs (attempt %d)", wait, attempt + 1) + await asyncio.sleep(wait) + else: + raise + except (anthropic.APIConnectionError, anthropic.APITimeoutError): + if attempt < retries - 1: + wait = retry_base_s * (2 ** attempt) + logger.warning("API connection error; retrying in %.1fs", wait) + await asyncio.sleep(wait) + else: + raise + return "" + + +async def generate_content_brief( + topic: str, + content_type: str, + target_keyword: str | None, + opportunity_rationale: str | None, + existing_content_summaries: list[str], + research_summaries: list[str], + generation_mode: str = "NewContent", +) -> dict: + """Stage 4: Generate a detailed content brief using AI.""" + settings = get_settings() + existing = "\n".join(f"- {s}" for s in existing_content_summaries) or "None" + research = "\n".join(f"- {s}" for s in research_summaries) or "None" + + system = ( + "You are a senior editorial strategist for Curvit, a UK recruitment platform. " + "You create detailed content briefs for high-quality evergreen content. " + "Always respond with valid JSON only — no markdown fences, no preamble." + ) + prompt = f"""Create a detailed content brief for the following opportunity. + +Generation mode: {generation_mode} +Content type: {content_type} +Topic: {topic} +Target keyword: {target_keyword or 'not specified'} +Rationale: {opportunity_rationale or 'SEO gap identified'} + +Existing content for context (avoid duplication): +{existing} + +Recent research relevant to this topic: +{research} + +Return JSON with exactly these fields: +{{ + "topic": "...", + "search_intent": "informational|navigational|transactional|commercial", + "target_keyword": "...", + "secondary_keywords": ["...", "..."], + "suggested_internal_links": [ + {{"anchor": "...", "rationale": "..."}} + ], + "suggested_external_sources": [ + {{"name": "...", "url": "...", "rationale": "..."}} + ], + "cta_opportunities": [ + {{"type": "Educational|Diagnostic|Benchmark|Curiosity", "description": "..."}} + ], + "outline": "Markdown outline with H2/H3 sections", + "recommended_word_count": 1500, + "prompt_version": "{settings.prompt_version}-{_BRIEF_PROMPT_VERSION}", + "model_used": "{settings.content_creator_ai_model}" +}}""" + + raw = await _call_ai(system, prompt, max_tokens=2048) + import json + try: + return json.loads(raw) + except json.JSONDecodeError: + logger.warning("Brief generation returned invalid JSON; returning partial brief") + return { + "topic": topic, + "search_intent": "informational", + "target_keyword": target_keyword or topic, + "secondary_keywords": [], + "suggested_internal_links": [], + "suggested_external_sources": [], + "cta_opportunities": [], + "outline": "", + "recommended_word_count": 1200, + "prompt_version": f"{settings.prompt_version}-{_BRIEF_PROMPT_VERSION}", + "model_used": settings.content_creator_ai_model, + } + + +async def generate_draft_content( + brief: dict, + generation_mode: str, + existing_content_markdown: str | None = None, +) -> dict: + """Stage 5: Generate a full draft article from a content brief.""" + settings = get_settings() + + system = """You are a senior content writer for Curvit, a UK recruitment platform. + +Content philosophy (follow strictly): +- Useful first, conversion second. +- No hard selling. +- No fake expertise. +- No fabricated statistics. +- No fabricated testimonials. +- No invented named people. +- Never invent numbers — use real data from cited sources only. +- No generic recruitment clichés. +- No thin SEO content. +- No listicles with little substance. + +Preferred approaches: +- Before-and-after examples. +- Recruiter perspective. +- Hiring manager perspective. +- ATS perspective. +- Worked examples. +- Practical exercises. + +Conversion rules (enforced): +- Do not mention Curvit in the first 25% of the article. +- Maximum two direct product references total. +- Content must remain valuable if all Curvit references are removed. +- No aggressive sales language. +- CTAs must be Educational, Diagnostic, Benchmark, or Curiosity-driven. + +Always respond with valid JSON only — no markdown fences, no preamble.""" + + word_count = brief.get("recommended_word_count", 1200) + outline = brief.get("outline", "") + ctas = brief.get("cta_opportunities", []) + internal = brief.get("suggested_internal_links", []) + external = brief.get("suggested_external_sources", []) + + mode_instruction = "" + if generation_mode == "Refresh" and existing_content_markdown: + mode_instruction = f""" +This is a REFRESH of existing content. Improve and expand the existing article below. +Preserve factual accuracy. Do not remove valuable sections. Improve weak introduction, +add missing FAQ, strengthen internal links where suggested. + +Existing content: +{existing_content_markdown[:3000]} +""" + elif generation_mode == "Expansion": + mode_instruction = "This is an EXPANSION — add depth, new sections, and worked examples to the existing topic." + + prompt = f"""Write a complete {content_type_label(brief.get('content_type', 'Guide'))} article in Markdown. + +Topic: {brief.get('topic', '')} +Search intent: {brief.get('search_intent', 'informational')} +Target keyword: {brief.get('target_keyword', '')} +Secondary keywords: {', '.join(brief.get('secondary_keywords', []))} +Word count target: {word_count} + +Outline to follow: +{outline} + +Suggested internal link anchors: {[l.get('anchor') for l in internal]} +Suggested external sources: {[s.get('name') for s in external]} +CTA opportunities: {[c.get('description') for c in ctas]} +{mode_instruction} + +Return JSON with exactly these fields: +{{ + "title": "...", + "seo_title": "...", + "seo_description": "...", + "excerpt": "Two-sentence summary for cards/meta", + "content_markdown": "Full article in Markdown", + "target_keyword": "...", + "reading_time_minutes": 5, + "editorial_notes": "Brief notes for the human editor reviewing this draft", + "prompt_version": "{settings.prompt_version}-{_DRAFT_PROMPT_VERSION}", + "model_used": "{settings.content_creator_ai_model}", + "generation_mode": "{generation_mode}" +}}""" + + raw = await _call_ai(system, prompt, max_tokens=8192) + import json + try: + return json.loads(raw) + except json.JSONDecodeError: + logger.warning("Draft generation returned invalid JSON for topic '%s'", brief.get("topic")) + return {} + + +def content_type_label(content_type: str) -> str: + labels = {"Blog": "blog post", "Guide": "guide", "Article": "article"} + return labels.get(content_type, "article") + + +async def review_draft_quality( + title: str, + content_markdown: str, + content_type: str, + target_keyword: str | None, + generation_mode: str, +) -> dict: + """Stage 8: AI quality review — score SEO, Trust, Readability, Conversion.""" + settings = get_settings() + + system = ( + "You are a senior content quality reviewer. " + "Score each dimension from 0 to 100. " + "Apply strict criteria. " + "Always respond with valid JSON only — no markdown fences, no preamble." + ) + prompt = f"""Review this {content_type} draft for quality. + +Title: {title} +Target keyword: {target_keyword or 'not specified'} +Generation mode: {generation_mode} + +Content (first 3000 chars): +{content_markdown[:3000]} + +Score each dimension 0–100 and return JSON: +{{ + "seo_score": 0-100, + "trust_score": 0-100, + "readability_score": 0-100, + "conversion_score": 0-100, + "curvit_first_mention_position": 0.0-1.0 (fraction of content where Curvit first appears, null if not mentioned), + "direct_product_references": integer count, + "review_notes": "Key issues for the editor", + "prompt_version": "{settings.prompt_version}-{_QUALITY_REVIEW_PROMPT_VERSION}", + "model_used": "{settings.content_creator_ai_model}" +}}""" + + raw = await _call_ai(system, prompt, max_tokens=1024) + import json + try: + return json.loads(raw) + except json.JSONDecodeError: + logger.warning("Quality review returned invalid JSON for '%s'", title) + return { + "seo_score": 0, + "trust_score": 0, + "readability_score": 0, + "conversion_score": 0, + "curvit_first_mention_position": None, + "direct_product_references": None, + "review_notes": "Quality review parse error", + "prompt_version": f"{settings.prompt_version}-{_QUALITY_REVIEW_PROMPT_VERSION}", + "model_used": settings.content_creator_ai_model, + } + + +async def identify_refresh_candidates( + content_summaries: list[dict], +) -> list[dict]: + """Identify which existing content items are candidates for refresh or expansion.""" + settings = get_settings() + + system = ( + "You are an editorial strategist for Curvit. " + "Identify which content items should be refreshed or expanded. " + "Always respond with valid JSON only — no markdown fences, no preamble." + ) + content_list = "\n".join( + f"- id={c.get('id')} title={c.get('title')} type={c.get('content_type')} " + f"status={c.get('status')} keyword={c.get('target_keyword')} " + f"updated={c.get('updated_at')}" + for c in content_summaries[:50] # limit context size + ) + prompt = f"""Review the following published content items and identify refresh candidates. + +Refresh signals to look for: +- Stale content (not updated in 6+ months) +- Thin content (missing FAQ, short word count) +- Weak meta titles/descriptions +- Missing internal links +- Low-conversion pages +- Blog posts that should become evergreen guides +- Pages ranking for adjacent topics needing supporting articles + +Content inventory: +{content_list} + +Return a JSON array of opportunities: +[ + {{ + "existing_content_id": "...", + "existing_content_slug": "...", + "title": "...", + "opportunity_type": "Refresh|Expansion", + "refresh_signals": ["signal1", "signal2"], + "rationale": "Why this content should be refreshed", + "priority_score": 0.0-1.0 + }} +]""" + + raw = await _call_ai(system, prompt, max_tokens=2048) + import json + try: + result = json.loads(raw) + return result if isinstance(result, list) else [] + except json.JSONDecodeError: + logger.warning("Refresh candidate identification returned invalid JSON") + return [] diff --git a/services/content-creator/app/services/cms_client.py b/services/content-creator/app/services/cms_client.py new file mode 100644 index 00000000..b95936c4 --- /dev/null +++ b/services/content-creator/app/services/cms_client.py @@ -0,0 +1,178 @@ +"""CMS service client — creates draft content items via the CMS API.""" +from __future__ import annotations + +import logging +import uuid +from datetime import datetime, timedelta, timezone + +import httpx +import jwt + +from app.config import get_settings + +logger = logging.getLogger(__name__) + +_CONTENT_SOURCE = "AiGenerated" + + +async def _cms_headers() -> dict[str, str]: + settings = get_settings() + headers = { + "X-Internal-Api-Key": settings.internal_api_key, + "Content-Type": "application/json", + } + if settings.auth_secret: + issued_at = datetime.now(timezone.utc) + token = jwt.encode( + { + "sub": "content-creator-service", + "email": "content-creator@curvit.local", + "groups": ["Administrator"], + "iss": settings.auth_issuer.rstrip("/"), + "aud": settings.auth_audience, + "iat": int(issued_at.timestamp()), + "exp": int((issued_at + timedelta(minutes=5)).timestamp()), + }, + settings.auth_secret, + algorithm="HS256", + ) + headers["Authorization"] = "Bearer " + token + return headers + + +async def get_published_content( + content_type: str | None = None, + limit: int = 200, +) -> list[dict]: + """Stage 1: Retrieve content inventory from the CMS service. + + Retrieves all content types (Manual, AiGenerated, AiAssisted, Imported). + """ + settings = get_settings() + params: dict = {"limit": limit, "status": "Published"} + if content_type: + params["content_type"] = content_type + + try: + async with httpx.AsyncClient(timeout=30) as client: + headers = await _cms_headers() + response = await client.get( + f"{settings.cms_service_url}/admin/content", + headers=headers, + params=params, + ) + response.raise_for_status() + data = response.json() + return data.get("items", []) + except httpx.HTTPError as exc: + logger.warning("Failed to retrieve content inventory: %s", exc) + return [] + + +async def get_draft_content_by_run(run_id: str) -> list[dict]: + """Retrieve Draft content items created by a specific generation run.""" + settings = get_settings() + try: + async with httpx.AsyncClient(timeout=30) as client: + headers = await _cms_headers() + response = await client.get( + f"{settings.cms_service_url}/admin/content", + headers=headers, + params={"status": "Draft", "limit": 100}, + ) + response.raise_for_status() + data = response.json() + # Filter by editorial notes containing the run_id + items = data.get("items", []) + return [ + i for i in items + if run_id in (i.get("editorial_notes") or "") + ] + except httpx.HTTPError as exc: + logger.warning("Failed to retrieve draft content: %s", exc) + return [] + + +async def create_draft_content_item( + *, + run_id: str, + content_type: str, + title: str, + slug: str, + content_markdown: str, + excerpt: str | None, + seo_title: str | None, + seo_description: str | None, + target_keyword: str | None, + search_intent: str | None, + reading_time_minutes: int | None, + editorial_notes: str | None, + prompt_version: str | None, + model_used: str | None, + generation_mode: str, + source_content_id: str | None = None, +) -> str | None: + """Stage 9: Save a draft content item to the CMS. + + Returns the new content item ID if successful, else None. + Never sets status to anything other than Draft. + """ + settings = get_settings() + + # Embed run metadata in editorial notes for traceability + notes_prefix = f"[AI-Run:{run_id}] [Mode:{generation_mode}] [Model:{model_used}] [Prompt:{prompt_version}]" + if editorial_notes: + full_notes = f"{notes_prefix}\n\n{editorial_notes}" + else: + full_notes = notes_prefix + + payload: dict = { + "content_type": content_type, + "title": title, + "slug": _safe_slug(slug), + "content_markdown": content_markdown, + "author_name": settings.ai_author_name, + "status": "Draft", + "content_source": _CONTENT_SOURCE, + "ai_generated": True, + "ai_prompt_version": prompt_version, + "editorial_notes": full_notes, + "target_keyword": target_keyword, + "search_intent": search_intent, + "reading_time_minutes": reading_time_minutes, + } + if excerpt: + payload["excerpt"] = excerpt + if seo_title: + payload["seo_title"] = seo_title + if seo_description: + payload["seo_description"] = seo_description + if source_content_id: + payload["created_from_content_id"] = source_content_id + + try: + async with httpx.AsyncClient(timeout=30) as client: + headers = await _cms_headers() + response = await client.post( + f"{settings.cms_service_url}/admin/content", + headers=headers, + json=payload, + ) + response.raise_for_status() + created = response.json() + item_id = created.get("id") + logger.info("Created draft content item id=%s title='%s'", item_id, title) + return item_id + except httpx.HTTPError as exc: + logger.error("Failed to create draft content item '%s': %s", title, exc) + return None + + +def _safe_slug(slug: str) -> str: + """Ensure the slug is URL-safe; append a short UUID fragment if empty.""" + import re + safe = re.sub(r"[^a-z0-9-]", "-", slug.lower()) + safe = re.sub(r"-+", "-", safe).strip("-") + if not safe: + safe = str(uuid.uuid4())[:8] + return safe[:200] diff --git a/services/content-creator/app/services/link_auditor.py b/services/content-creator/app/services/link_auditor.py new file mode 100644 index 00000000..85fbb41d --- /dev/null +++ b/services/content-creator/app/services/link_auditor.py @@ -0,0 +1,197 @@ +"""Link validation service for Stage 6 — Link Intelligence.""" +from __future__ import annotations + +import asyncio +import ipaddress +import logging +import re +from urllib.parse import urlparse + +import httpx + +from app.config import get_settings + +logger = logging.getLogger(__name__) + +# Authoritative domains for UK recruitment content +_AUTHORITATIVE_DOMAINS = frozenset({ + "gov.uk", + "ons.gov.uk", + "cipd.co.uk", + "rec.uk.com", + "acas.org.uk", + "linkedin.com", + "indeed.com", + "hiringlab.org", +}) + +_GENERIC_ANCHOR_PATTERNS = frozenset({ + "click here", + "here", + "read more", + "learn more", + "this article", + "this page", + "link", + "more", +}) + + +def _is_authoritative(url: str) -> bool: + """Check if a URL belongs to an authoritative domain.""" + try: + domain = urlparse(url).netloc.lower() + # Strip www prefix + domain = re.sub(r"^www\.", "", domain) + return any(domain == d or domain.endswith("." + d) for d in _AUTHORITATIVE_DOMAINS) + except Exception: # noqa: BLE001 + return False + + +def _assess_anchor_quality(anchor: str | None) -> str: + """Classify anchor text quality.""" + if not anchor or not anchor.strip(): + return "Weak" + lower = anchor.strip().lower() + if lower in _GENERIC_ANCHOR_PATTERNS: + return "Generic" + if len(anchor.strip()) < 3: + return "Weak" + return "Good" + + +async def _check_url(url: str, timeout: int) -> tuple[bool, int | None]: + """Perform HTTP HEAD (fallback GET) check on a URL.""" + if not _is_safe_outbound_url(url): + logger.debug("Skipping unsafe outbound URL: %s", url) + return False, None + try: + async with httpx.AsyncClient( + timeout=timeout, + follow_redirects=True, + ) as client: + resp = await client.head(url) + # Some servers reject HEAD with 405; fall back to GET + if resp.status_code == 405: + resp = await client.get(url) + return resp.status_code < 400, resp.status_code + except Exception as exc: # noqa: BLE001 + logger.debug("Link check failed for %s: %s", url, exc) + return False, None + + +def _is_safe_outbound_url(url: str) -> bool: + try: + parsed = urlparse(url) + if parsed.scheme not in {"http", "https"}: + return False + hostname = (parsed.hostname or "").strip().lower() + if not hostname: + return False + if hostname in {"localhost", "localhost.localdomain"} or hostname.endswith(".localhost"): + return False + if hostname.endswith(".local") or hostname.endswith(".internal"): + return False + if "." not in hostname: + return False + ip = ipaddress.ip_address(hostname) + return not ( + ip.is_private + or ip.is_loopback + or ip.is_link_local + or ip.is_multicast + or ip.is_reserved + or ip.is_unspecified + ) + except ValueError: + return True + + +async def audit_links( + links: list[dict], + *, + run_id: str | None = None, + content_item_id: str | None = None, + brief_id: str | None = None, +) -> list[dict]: + """Audit a list of links and return link audit records. + + links: list of {"url": str, "anchor": str, "type": "Internal"|"External"} + """ + settings = get_settings() + timeout = settings.link_check_timeout + + async def _audit_one(link: dict) -> dict: + url = link.get("url", "") + anchor = link.get("anchor") + link_type = link.get("type", "External") + + is_available, http_status = await _check_url(url, timeout) + + is_authoritative = None + if link_type == "External": + is_authoritative = _is_authoritative(url) + + issue = None + if not is_available: + issue = f"HTTP {http_status}" if http_status else "Unreachable" + elif link_type == "External" and is_authoritative is False: + issue = "Non-authoritative external source" + + return { + "run_id": run_id, + "content_item_id": content_item_id, + "brief_id": brief_id, + "link_url": url, + "link_type": link_type, + "anchor_text": anchor, + "is_available": is_available, + "http_status": http_status, + "is_authoritative": is_authoritative, + "is_relevant": None, # relevance requires semantic analysis — left for review + "anchor_quality": _assess_anchor_quality(anchor), + "issue": issue, + } + + tasks = [_audit_one(link) for link in links] + results = await asyncio.gather(*tasks, return_exceptions=True) + + audits = [] + for link, result in zip(links, results): + if isinstance(result, Exception): + logger.warning("Link audit exception for %s: %s", link.get("url"), result) + audits.append({ + "run_id": run_id, + "content_item_id": content_item_id, + "brief_id": brief_id, + "link_url": link.get("url", ""), + "link_type": link.get("type", "External"), + "anchor_text": link.get("anchor"), + "is_available": False, + "http_status": None, + "is_authoritative": None, + "is_relevant": None, + "anchor_quality": _assess_anchor_quality(link.get("anchor")), + "issue": "Audit error", + }) + else: + audits.append(result) + return audits + + +def extract_links_from_markdown(markdown: str, base_url: str = "") -> list[dict]: + """Extract all Markdown links from content for auditing.""" + link_pattern = re.compile(r"\[([^\]]*)\]\(([^)]+)\)") + links = [] + for match in link_pattern.finditer(markdown): + anchor = match.group(1) + url = match.group(2) + if url.startswith("http"): + link_type = "External" + elif url.startswith("/"): + link_type = "Internal" + url = base_url.rstrip("/") + url + else: + continue + links.append({"url": url, "anchor": anchor, "type": link_type}) + return links diff --git a/services/content-creator/app/services/workflow.py b/services/content-creator/app/services/workflow.py new file mode 100644 index 00000000..8f94ee0e --- /dev/null +++ b/services/content-creator/app/services/workflow.py @@ -0,0 +1,716 @@ +"""Content generation workflow engine. + +Orchestrates all 10 stages of the content creator pipeline: + 1. Content inventory + 2. UK recruitment research (placeholder — real scraping not implemented) + 3. Opportunity analysis + 4. Brief generation + 5. Draft generation + 6. Link intelligence + 7. Conversion optimisation validation + 8. Quality review + 9. Draft creation + 10. Administration support data + +Safety rules enforced here: + - Never publishes content automatically. + - Never deletes content. + - Never rewrites published content directly. + - Never invents statistics, testimonials, or named people. +""" +from __future__ import annotations + +import logging +import uuid +from datetime import datetime, timezone + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import get_settings +from app.models.db_models import ( + ContentBrief, + ContentLinkAudit, + ContentOpportunity, + ContentQualityReview, + GenerationRun, + ResearchDigest, +) +from app.services import ai_client, cms_client, link_auditor + +logger = logging.getLogger(__name__) + +# Scoring weights for opportunity analysis (Stage 3) +_WEIGHT_EVERGREEN_SEO = 0.60 +_WEIGHT_PRODUCT_ALIGNMENT = 0.25 +_WEIGHT_NEWS_RELEVANCE = 0.15 + +# Default weekly allocation +_DEFAULT_NEW_CONTENT_COUNT = 2 +_DEFAULT_REFRESH_COUNT = 1 +_DEFAULT_BLOG_COUNT = 1 + +# Minimum composite quality score to accept a draft (Stage 8) +_MIN_COMPOSITE_QUALITY = 60 + +# Research source metadata (Stage 2 — real fetching out of scope; stubs provided) +_APPROVED_SOURCES = [ + {"name": "GOV.UK", "base_url": "https://www.gov.uk"}, + {"name": "ONS", "base_url": "https://www.ons.gov.uk"}, + {"name": "CIPD", "base_url": "https://www.cipd.co.uk"}, + {"name": "REC", "base_url": "https://www.rec.uk.com"}, + {"name": "Acas", "base_url": "https://www.acas.org.uk"}, + {"name": "LinkedIn Hiring Insights", "base_url": "https://www.linkedin.com/business/talent/blog"}, + {"name": "Indeed Hiring Lab", "base_url": "https://www.hiringlab.org"}, +] + + +async def run_generation_cycle( + session: AsyncSession, + *, + run_id: str | uuid.UUID | None = None, + run_type: str = "Scheduled", + generation_mode: str = "Mixed", + triggered_by: str = "scheduler", + parameters: dict | None = None, +) -> GenerationRun: + """Execute a full content generation cycle. + + Returns the GenerationRun record with final status. + """ + settings = get_settings() + resolved_run_id = _resolve_run_id(run_id) + run = GenerationRun( + id=resolved_run_id, + run_type=run_type, + generation_mode=generation_mode, + status="Running", + triggered_by=triggered_by, + parameters=parameters, + started_at=datetime.now(timezone.utc), + items_created=0, + items_rejected=0, + ) + session.add(run) + await session.commit() + + try: + items_created, items_rejected, summary = await _execute_workflow( + session=session, + run=run, + generation_mode=generation_mode, + parameters=parameters or {}, + ) + run.status = "Completed" + run.items_created = items_created + run.items_rejected = items_rejected + run.summary = summary + except (KeyboardInterrupt, SystemExit): + run.status = "Cancelled" + run.error_message = "Shutdown requested" + raise + except Exception as exc: # noqa: BLE001 + logger.exception("Generation run %s failed: %s", resolved_run_id, exc) + run.status = "Failed" + run.error_message = str(exc)[:500] + + run.completed_at = datetime.now(timezone.utc) + await session.commit() + return run + + +async def _execute_workflow( + session: AsyncSession, + run: GenerationRun, + generation_mode: str, + parameters: dict, +) -> tuple[int, int, str]: + """Inner workflow — returns (items_created, items_rejected, summary).""" + settings = get_settings() + run_id = str(run.id) + + # ── Stage 1: Content inventory ───────────────────────────────────────────── + logger.info("[%s] Stage 1: Fetching content inventory", run_id) + existing_content = await cms_client.get_published_content(limit=200) + existing_summaries = [ + { + "id": str(item.get("id", "")), + "title": item.get("title", ""), + "content_type": item.get("content_type", ""), + "status": item.get("status", ""), + "target_keyword": item.get("target_keyword"), + "updated_at": item.get("updated_at"), + "slug": item.get("slug", ""), + "content_source": item.get("content_source", "Manual"), + } + for item in existing_content + ] + existing_topic_summaries = [ + f"{i['title']} ({i['content_type']}, kw={i['target_keyword']})" + for i in existing_summaries + ] + logger.info("[%s] Found %d published content items", run_id, len(existing_content)) + + # ── Stage 2: Research digests (stub — real source fetching TBD) ──────────── + logger.info("[%s] Stage 2: Research ingestion (stub)", run_id) + research_digests = await _fetch_research_stubs(session=session, run_id=run_id) + research_summaries = [ + f"{d.headline} ({d.source_name}): {d.summary or ''}" + for d in research_digests + ] + + # ── Stage 3: Opportunity analysis ────────────────────────────────────────── + logger.info("[%s] Stage 3: Opportunity analysis", run_id) + target_count = parameters.get("target_count", _DEFAULT_NEW_CONTENT_COUNT + _DEFAULT_REFRESH_COUNT + _DEFAULT_BLOG_COUNT) + content_type_filter = parameters.get("content_type") + topic_hint = parameters.get("topic_hint") + + opportunities = await _identify_opportunities( + session=session, + run_id=run_id, + generation_mode=generation_mode, + existing_summaries=existing_summaries, + research_summaries=research_summaries, + target_count=int(target_count), + content_type_filter=content_type_filter, + topic_hint=topic_hint, + ) + logger.info("[%s] Identified %d opportunities", run_id, len(opportunities)) + + items_created = 0 + items_rejected = 0 + created_titles: list[str] = [] + + for opportunity in opportunities: + try: + created = await _process_opportunity( + session=session, + run=run, + opportunity=opportunity, + existing_topic_summaries=existing_topic_summaries, + research_summaries=research_summaries, + existing_summaries=existing_summaries, + ) + if created: + items_created += 1 + created_titles.append(opportunity.topic) + else: + items_rejected += 1 + except Exception as exc: # noqa: BLE001 + logger.warning("[%s] Failed to process opportunity '%s': %s", run_id, opportunity.topic, exc) + items_rejected += 1 + + summary = ( + f"Run completed. Created {items_created} drafts, rejected {items_rejected}. " + f"Topics: {', '.join(created_titles[:5])}" + + (f" and {len(created_titles) - 5} more" if len(created_titles) > 5 else "") + ) + return items_created, items_rejected, summary + + +async def _fetch_research_stubs( + session: AsyncSession, + run_id: str, +) -> list[ResearchDigest]: + """Stage 2 stub — creates placeholder research digests. + + Real implementation would fetch from approved sources. + Approved sources: GOV.UK, ONS, CIPD, REC, Acas, LinkedIn, Indeed Hiring Lab. + """ + # In a full implementation, this would HTTP-fetch each approved source, + # extract headlines, and score relevance. For safety and reliability, + # this stub records the intent without scraping. + stub_digests = [ + ResearchDigest( + id=uuid.uuid4(), + run_id=uuid.UUID(run_id) if run_id else None, + source_name="GOV.UK", + source_url="https://www.gov.uk/guidance/check-employment-status-for-tax", + headline="UK employment status guidance — HMRC updated guidance on IR35", + summary="Updated HMRC guidance on employment status for tax purposes, relevant to contractors and recruiters.", + relevance_score=0.85, + topics=["IR35", "employment status", "contractor", "recruitment"], + captured_at=datetime.now(timezone.utc), + ), + ResearchDigest( + id=uuid.uuid4(), + run_id=uuid.UUID(run_id) if run_id else None, + source_name="CIPD", + source_url="https://www.cipd.co.uk/knowledge/fundamentals/emp-law/recruitment", + headline="Recruitment law and best practice — CIPD guidance", + summary="CIPD guidance on UK recruitment law, including advertising, shortlisting, interviewing, and selection.", + relevance_score=0.90, + topics=["recruitment law", "hiring", "shortlisting", "diversity"], + captured_at=datetime.now(timezone.utc), + ), + ] + for digest in stub_digests: + session.add(digest) + await session.commit() + return stub_digests + + +async def _identify_opportunities( + session: AsyncSession, + run_id: str, + generation_mode: str, + existing_summaries: list[dict], + research_summaries: list[str], + target_count: int, + content_type_filter: str | None, + topic_hint: str | None, +) -> list[ContentOpportunity]: + """Stage 3: Identify and score content opportunities.""" + opportunities: list[ContentOpportunity] = [] + + if generation_mode in ("NewContent", "Expansion", "Mixed"): + opportunity_type = "Expansion" if generation_mode == "Expansion" else "NewContent" + new_opps = _generate_new_content_opportunities( + existing_summaries=existing_summaries, + research_summaries=research_summaries, + content_type_filter=content_type_filter, + topic_hint=topic_hint, + count=_DEFAULT_NEW_CONTENT_COUNT if generation_mode == "Mixed" else target_count, + ) + for opp_data in new_opps: + opp = ContentOpportunity( + id=uuid.uuid4(), + run_id=uuid.UUID(run_id), + opportunity_type=opportunity_type, + content_type=opp_data["content_type"], + topic=opp_data["topic"], + target_keyword=opp_data.get("target_keyword"), + rationale=opp_data.get("rationale"), + evergreen_seo_score=opp_data.get("evergreen_seo_score", 0.7), + product_alignment_score=opp_data.get("product_alignment_score", 0.5), + news_relevance_score=opp_data.get("news_relevance_score", 0.3), + composite_score=_compute_composite(opp_data), + status="Pending", + ) + session.add(opp) + opportunities.append(opp) + + if generation_mode in ("Refresh", "Mixed") and existing_summaries: + refresh_candidates = await ai_client.identify_refresh_candidates(existing_summaries) + refresh_count = _DEFAULT_REFRESH_COUNT if generation_mode == "Mixed" else target_count + for candidate in refresh_candidates[:refresh_count]: + opp = ContentOpportunity( + id=uuid.uuid4(), + run_id=uuid.UUID(run_id), + opportunity_type=candidate.get("opportunity_type", "Refresh"), + content_type="Guide", # refreshes default to Guide type + topic=candidate.get("title", "Refresh opportunity"), + rationale=candidate.get("rationale"), + existing_content_id=_try_uuid(candidate.get("existing_content_id")), + existing_content_slug=candidate.get("existing_content_slug"), + refresh_signals=candidate.get("refresh_signals", []), + evergreen_seo_score=float(candidate.get("priority_score", 0.6)), + product_alignment_score=0.4, + news_relevance_score=0.2, + composite_score=_compute_composite({ + "evergreen_seo_score": float(candidate.get("priority_score", 0.6)), + "product_alignment_score": 0.4, + "news_relevance_score": 0.2, + }), + status="Pending", + ) + session.add(opp) + opportunities.append(opp) + + if generation_mode == "Mixed" and len(opportunities) < target_count: + # Blog slot — news-informed + blog_opp = ContentOpportunity( + id=uuid.uuid4(), + run_id=uuid.UUID(run_id), + opportunity_type="NewContent", + content_type="Blog", + topic=_select_blog_topic(research_summaries, topic_hint), + rationale="News-informed blog post based on recent UK recruitment research", + evergreen_seo_score=0.4, + product_alignment_score=0.5, + news_relevance_score=0.8, + composite_score=_compute_composite({ + "evergreen_seo_score": 0.4, + "product_alignment_score": 0.5, + "news_relevance_score": 0.8, + }), + status="Pending", + ) + session.add(blog_opp) + opportunities.append(blog_opp) + + await session.commit() + + # Sort by composite score (highest first) + opportunities.sort(key=lambda o: o.composite_score or 0, reverse=True) + return opportunities[:target_count] + + +def _generate_new_content_opportunities( + existing_summaries: list[dict], + research_summaries: list[str], + content_type_filter: str | None, + topic_hint: str | None, + count: int, +) -> list[dict]: + """Generate new content opportunity candidates based on gaps.""" + existing_keywords = { + (s.get("target_keyword") or "").lower() + for s in existing_summaries + if s.get("target_keyword") + } + + # Evergreen UK recruitment topics not yet covered + evergreen_topics = [ + { + "content_type": "Guide", + "topic": "How to write an ATS-optimised CV for UK job applications", + "target_keyword": "ats optimised cv uk", + "rationale": "High-volume evergreen SEO topic with strong product alignment", + "evergreen_seo_score": 0.9, + "product_alignment_score": 0.9, + "news_relevance_score": 0.2, + }, + { + "content_type": "Guide", + "topic": "UK recruiter's guide to writing effective job descriptions", + "target_keyword": "how to write job description uk", + "rationale": "Evergreen guide with strong hiring manager and recruiter audience", + "evergreen_seo_score": 0.85, + "product_alignment_score": 0.8, + "news_relevance_score": 0.15, + }, + { + "content_type": "Article", + "topic": "Understanding IR35: what recruiters and contractors need to know", + "target_keyword": "ir35 guide for recruiters", + "rationale": "High-relevance UK-specific employment law topic", + "evergreen_seo_score": 0.8, + "product_alignment_score": 0.6, + "news_relevance_score": 0.4, + }, + { + "content_type": "Guide", + "topic": "How to conduct structured interviews in the UK", + "target_keyword": "structured interview guide uk", + "rationale": "Evergreen hiring best-practice guide", + "evergreen_seo_score": 0.78, + "product_alignment_score": 0.7, + "news_relevance_score": 0.1, + }, + { + "content_type": "Article", + "topic": "UK salary benchmarking: how to set competitive pay rates", + "target_keyword": "salary benchmarking uk", + "rationale": "High search intent, relevant to hiring managers", + "evergreen_seo_score": 0.75, + "product_alignment_score": 0.65, + "news_relevance_score": 0.35, + }, + ] + + if topic_hint: + evergreen_topics.insert(0, { + "content_type": content_type_filter or "Guide", + "topic": topic_hint, + "target_keyword": topic_hint.lower(), + "rationale": f"Ad-hoc topic hint: {topic_hint}", + "evergreen_seo_score": 0.7, + "product_alignment_score": 0.6, + "news_relevance_score": 0.3, + }) + + filtered = [ + t for t in evergreen_topics + if t["target_keyword"] not in existing_keywords + and (not content_type_filter or t["content_type"] == content_type_filter) + ] + return filtered[:count] + + +def _select_blog_topic(research_summaries: list[str], topic_hint: str | None) -> str: + if topic_hint: + return f"UK recruitment update: {topic_hint}" + if research_summaries: + # Use the first research item as inspiration + return f"UK recruitment update: {research_summaries[0][:80]}..." + return "UK recruitment market update" + + +def _compute_composite(data: dict) -> float: + seo = float(data.get("evergreen_seo_score") or 0) + product = float(data.get("product_alignment_score") or 0) + news = float(data.get("news_relevance_score") or 0) + return round( + seo * _WEIGHT_EVERGREEN_SEO + + product * _WEIGHT_PRODUCT_ALIGNMENT + + news * _WEIGHT_NEWS_RELEVANCE, + 4, + ) + + +def _try_uuid(value: str | None) -> uuid.UUID | None: + if not value: + return None + try: + return uuid.UUID(value) + except (ValueError, AttributeError): + return None + + +def _resolve_run_id(run_id: str | uuid.UUID | None) -> uuid.UUID: + if run_id is None: + return uuid.uuid4() + if isinstance(run_id, uuid.UUID): + return run_id + return uuid.UUID(run_id) + + +async def _process_opportunity( + session: AsyncSession, + run: GenerationRun, + opportunity: ContentOpportunity, + existing_topic_summaries: list[str], + research_summaries: list[str], + existing_summaries: list[dict], +) -> bool: + """Stages 4–9 for a single opportunity. Returns True if draft created.""" + settings = get_settings() + run_id = str(run.id) + generation_mode = opportunity.opportunity_type + + # Normalise generation mode + mode_map = {"NewContent": "NewContent", "Refresh": "Refresh", "Expansion": "Expansion"} + gen_mode = mode_map.get(generation_mode, "NewContent") + + # ── Stage 4: Brief generation ─────────────────────────────────────────── + logger.info("[%s] Stage 4: Brief generation for '%s'", run_id, opportunity.topic) + brief_data = await ai_client.generate_content_brief( + topic=opportunity.topic, + content_type=opportunity.content_type, + target_keyword=opportunity.target_keyword, + opportunity_rationale=opportunity.rationale, + existing_content_summaries=existing_topic_summaries, + research_summaries=research_summaries, + generation_mode=gen_mode, + ) + if not brief_data: + logger.warning("[%s] Brief generation failed for '%s'", run_id, opportunity.topic) + opportunity.status = "Rejected" + await session.commit() + return False + + brief = ContentBrief( + id=uuid.uuid4(), + opportunity_id=opportunity.id, + run_id=run.id, + topic=brief_data.get("topic", opportunity.topic), + search_intent=brief_data.get("search_intent"), + target_keyword=brief_data.get("target_keyword"), + secondary_keywords=brief_data.get("secondary_keywords", []), + suggested_internal_links=brief_data.get("suggested_internal_links", []), + suggested_external_sources=brief_data.get("suggested_external_sources", []), + cta_opportunities=brief_data.get("cta_opportunities", []), + content_type=opportunity.content_type, + recommended_word_count=brief_data.get("recommended_word_count"), + outline=brief_data.get("outline"), + generation_mode=gen_mode, + prompt_version=brief_data.get("prompt_version", settings.prompt_version), + model_used=brief_data.get("model_used", settings.content_creator_ai_model), + ) + session.add(brief) + opportunity.status = "BriefCreated" + await session.commit() + + # ── Stage 5: Draft generation ─────────────────────────────────────────── + logger.info("[%s] Stage 5: Draft generation for '%s'", run_id, opportunity.topic) + + # For refresh, fetch existing content markdown + existing_markdown = None + source_content_id = None + if gen_mode == "Refresh" and opportunity.existing_content_id: + source_content_id = str(opportunity.existing_content_id) + # Fetch existing content for refresh context + existing_item = next( + (i for i in existing_summaries if str(i.get("id")) == source_content_id), + None, + ) + # existing_markdown would come from a detailed fetch; omitted for brevity + + draft_data = await ai_client.generate_draft_content( + brief=dict(brief_data, content_type=opportunity.content_type), + generation_mode=gen_mode, + existing_content_markdown=existing_markdown, + ) + if not draft_data or not draft_data.get("content_markdown"): + logger.warning("[%s] Draft generation failed for '%s'", run_id, opportunity.topic) + opportunity.status = "Rejected" + await session.commit() + return False + + content_markdown = draft_data["content_markdown"] + title = draft_data.get("title", opportunity.topic) + prompt_version = draft_data.get("prompt_version", settings.prompt_version) + model_used = draft_data.get("model_used", settings.content_creator_ai_model) + + # ── Stage 6: Link intelligence ────────────────────────────────────────── + logger.info("[%s] Stage 6: Link audit for '%s'", run_id, title) + links = link_auditor.extract_links_from_markdown(content_markdown) + if links: + audit_records = await link_auditor.audit_links( + links, + run_id=run_id, + brief_id=str(brief.id), + ) + for audit_data in audit_records: + audit = ContentLinkAudit(**audit_data) + session.add(audit) + + # ── Stage 7: Conversion optimisation validation ───────────────────────── + logger.info("[%s] Stage 7: Conversion check for '%s'", run_id, title) + passes_conversion = _validate_conversion_rules(content_markdown) + if not passes_conversion: + logger.warning( + "[%s] '%s' fails conversion rules; applying correction note in editorial_notes", + run_id, title, + ) + + # ── Stage 8: Quality review ───────────────────────────────────────────── + logger.info("[%s] Stage 8: Quality review for '%s'", run_id, title) + review_data = await ai_client.review_draft_quality( + title=title, + content_markdown=content_markdown, + content_type=opportunity.content_type, + target_keyword=brief_data.get("target_keyword"), + generation_mode=gen_mode, + ) + + seo_score = int(review_data.get("seo_score", 0)) + trust_score = int(review_data.get("trust_score", 0)) + readability_score = int(review_data.get("readability_score", 0)) + conversion_score = int(review_data.get("conversion_score", 0)) + composite_quality = (seo_score + trust_score + readability_score + conversion_score) // 4 + + first_mention = review_data.get("curvit_first_mention_position") + direct_refs = review_data.get("direct_product_references") + + # Conversion rule compliance (Stage 7 + Stage 8 combined) + conversion_rule_pass = ( + passes_conversion + and (first_mention is None or first_mention >= 0.25) + and (direct_refs is None or direct_refs <= 2) + ) + + rejected = composite_quality < _MIN_COMPOSITE_QUALITY or not conversion_rule_pass + rejection_reason = None + if rejected: + rejection_reason = f"Composite quality score {composite_quality} below threshold {_MIN_COMPOSITE_QUALITY}" + if not conversion_rule_pass: + rejection_reason += "; conversion rules violated" + + quality_review = ContentQualityReview( + id=uuid.uuid4(), + brief_id=brief.id, + run_id=run.id, + seo_score=seo_score, + trust_score=trust_score, + readability_score=readability_score, + conversion_score=conversion_score, + composite_score=composite_quality, + curvit_first_mention_position=first_mention, + direct_product_references=direct_refs, + passes_conversion_rules=conversion_rule_pass, + rejected=rejected, + rejection_reason=rejection_reason, + prompt_version=review_data.get("prompt_version", settings.prompt_version), + model_used=review_data.get("model_used", settings.content_creator_ai_model), + generation_mode=gen_mode, + review_notes=review_data.get("review_notes"), + ) + session.add(quality_review) + + if rejected: + logger.warning("[%s] Draft '%s' rejected: %s", run_id, title, rejection_reason) + opportunity.status = "Rejected" + await session.commit() + return False + + # ── Stage 9: Draft creation ───────────────────────────────────────────── + logger.info("[%s] Stage 9: Creating CMS draft for '%s'", run_id, title) + item_id = await cms_client.create_draft_content_item( + run_id=run_id, + content_type=opportunity.content_type, + title=title, + slug=_make_slug(title), + content_markdown=content_markdown, + excerpt=draft_data.get("excerpt"), + seo_title=draft_data.get("seo_title"), + seo_description=draft_data.get("seo_description"), + target_keyword=brief_data.get("target_keyword"), + search_intent=brief_data.get("search_intent"), + reading_time_minutes=draft_data.get("reading_time_minutes"), + editorial_notes=draft_data.get("editorial_notes"), + prompt_version=prompt_version, + model_used=model_used, + generation_mode=gen_mode, + source_content_id=source_content_id, + ) + + if item_id: + # Update quality review with CMS item ID + quality_review.content_item_id = _try_uuid(item_id) + opportunity.status = "DraftCreated" + await session.commit() + return True + + logger.error("[%s] Failed to save draft for '%s' to CMS", run_id, title) + opportunity.status = "Rejected" + await session.commit() + return False + + +def _validate_conversion_rules(content_markdown: str) -> bool: + """Stage 7: Check conversion rules. + + Rules: + - Curvit not mentioned in first 25% of content. + - Maximum two direct product references. + - No aggressive sales language. + """ + import re + lower = content_markdown.lower() + length = len(lower) + if length == 0: + return True + + curvit_positions = [m.start() for m in re.finditer(r"curvit", lower)] + if curvit_positions: + first_pos_fraction = curvit_positions[0] / length + if first_pos_fraction < 0.25: + return False + if len(curvit_positions) > 2: + return False + + # Check for aggressive sales language + aggressive_patterns = [ + r"\bbuy now\b", + r"\bsign up now\b", + r"\blimited time\b", + r"\bact now\b", + r"\bdon't miss out\b", + r"\bexclusive offer\b", + ] + for pattern in aggressive_patterns: + if re.search(pattern, lower): + return False + + return True + + +def _make_slug(title: str) -> str: + """Generate a URL-safe slug from a title.""" + import re + slug = title.lower() + slug = re.sub(r"[^a-z0-9\s-]", "", slug) + slug = re.sub(r"\s+", "-", slug).strip("-") + return slug[:200] or "content" diff --git a/services/content-creator/internal_auth.py b/services/content-creator/internal_auth.py new file mode 100644 index 00000000..3fd506d3 --- /dev/null +++ b/services/content-creator/internal_auth.py @@ -0,0 +1,10 @@ +"""Compatibility re-export for shared internal API authentication.""" + +from shared.internal_auth import ( # noqa: F401 + CorrelationIdMiddleware, + HEADER_NAME, + InternalApiKeyMiddleware, + require_internal_api_key, +) + +__all__ = ["HEADER_NAME", "CorrelationIdMiddleware", "InternalApiKeyMiddleware", "require_internal_api_key"] diff --git a/services/content-creator/requirements.in b/services/content-creator/requirements.in new file mode 100644 index 00000000..e7fec480 --- /dev/null +++ b/services/content-creator/requirements.in @@ -0,0 +1,13 @@ +fastapi==0.136.1 +uvicorn[standard]==0.47.0 +pydantic==2.13.4 +pydantic-settings==2.14.1 +sqlalchemy[asyncio]==2.0.49 +asyncpg==0.31.0 +aiosqlite==0.22.1 +prometheus-fastapi-instrumentator==7.1.0 +PyJWT[crypto]==2.13.0 +httpx==0.28.1 +anthropic==0.104.1 +pytest==9.0.3 +pytest-asyncio==1.3.0 diff --git a/services/content-creator/requirements.txt b/services/content-creator/requirements.txt new file mode 100644 index 00000000..402c3017 --- /dev/null +++ b/services/content-creator/requirements.txt @@ -0,0 +1,135 @@ +# +# This file is autogenerated by pip-compile with Python 3.12 +# by the following command: +# +# pip-compile --output-file=requirements.txt --strip-extras requirements.in +# +aiosqlite==0.22.1 + # via -r requirements.in +annotated-doc==0.0.4 + # via fastapi +annotated-types==0.7.0 + # via pydantic +anthropic==0.104.1 + # via -r requirements.in +anyio==4.13.0 + # via + # anthropic + # httpx + # starlette + # watchfiles +asyncpg==0.31.0 + # via -r requirements.in +certifi==2026.4.22 + # via + # anthropic + # httpcore + # httpx +cffi==2.0.0 + # via cryptography +click==8.3.3 + # via uvicorn +cryptography==48.0.0 + # via pyjwt +distro==1.9.0 + # via anthropic +fastapi==0.136.1 + # via -r requirements.in +greenlet==3.5.0 + # via sqlalchemy +h11==0.16.0 + # via + # httpcore + # uvicorn +httpcore==1.0.9 + # via + # anthropic + # httpx +httptools==0.7.1 + # via uvicorn +httpx==0.28.1 + # via + # -r requirements.in + # anthropic +idna==3.15 + # via + # anyio + # httpx +iniconfig==2.3.0 + # via pytest +jiter==0.14.0 + # via anthropic +packaging==26.2 + # via pytest +pluggy==1.6.0 + # via pytest +prometheus-client==0.25.0 + # via prometheus-fastapi-instrumentator +prometheus-fastapi-instrumentator==7.1.0 + # via -r requirements.in +pycparser==3.0 + # via cffi +pydantic==2.13.4 + # via + # -r requirements.in + # anthropic + # fastapi + # pydantic-settings +pydantic-core==2.46.4 + # via pydantic +pydantic-settings==2.14.1 + # via -r requirements.in +pygments==2.20.0 + # via pytest +pyjwt==2.13.0 + # via + # -r requirements.in + # pyjwt +pytest==9.0.3 + # via + # -r requirements.in + # pytest-asyncio +pytest-asyncio==1.3.0 + # via -r requirements.in +python-dotenv==1.2.2 + # via + # pydantic-settings + # uvicorn +pyyaml==6.0.3 + # via uvicorn +sniffio==1.3.1 + # via + # anthropic + # anyio +sqlalchemy==2.0.49 + # via -r requirements.in +starlette==0.52.1 + # via + # fastapi + # prometheus-fastapi-instrumentator +tokenizers==0.21.1 + # via anthropic +typing-extensions==4.15.0 + # via + # anthropic + # anyio + # fastapi + # pydantic + # pydantic-core + # pytest-asyncio + # sqlalchemy + # starlette + # typing-inspection +typing-inspection==0.4.2 + # via + # fastapi + # pydantic + # pydantic-settings +uvicorn==0.47.0 + # via -r requirements.in +uvloop==0.22.1 + # via uvicorn +watchfiles==1.1.1 + # via uvicorn +websockets==16.0 + # via uvicorn diff --git a/services/content-creator/tests/__init__.py b/services/content-creator/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/services/content-creator/tests/conftest.py b/services/content-creator/tests/conftest.py new file mode 100644 index 00000000..4e7342cc --- /dev/null +++ b/services/content-creator/tests/conftest.py @@ -0,0 +1,56 @@ +"""Pytest configuration and fixtures for content-creator tests.""" +import os +import sys +from pathlib import Path + +SERVICE_ROOT = os.path.dirname(os.path.dirname(__file__)) +REPO_ROOT = Path(__file__).resolve().parents[3] +sys.path.insert(0, SERVICE_ROOT) +sys.path.insert(1, str(REPO_ROOT)) + +import pytest_asyncio +import httpx +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker + +from app.main import app +from app.db import get_session +from app.models.db_models import Base + + +def _create_tables(conn): + Base.metadata.create_all(conn) + + +@pytest_asyncio.fixture +async def test_db(): + """In-memory SQLite database for tests.""" + engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False) + async with engine.begin() as conn: + await conn.run_sync(_create_tables) + + async_session_maker = async_sessionmaker( + engine, class_=AsyncSession, expire_on_commit=False + ) + yield async_session_maker + await engine.dispose() + + +@pytest_asyncio.fixture +async def client(test_db): + """HTTP test client with database dependency overridden.""" + async def override_get_session(): + async with test_db() as session: + yield session + + app.dependency_overrides[get_session] = override_get_session + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as test_client: # noqa: S5332 + yield test_client + app.dependency_overrides.clear() + + +@pytest_asyncio.fixture +async def db_session(test_db): + """Bare AsyncSession for direct model tests.""" + async with test_db() as session: + yield session diff --git a/services/content-creator/tests/test_routes.py b/services/content-creator/tests/test_routes.py new file mode 100644 index 00000000..e53dfec6 --- /dev/null +++ b/services/content-creator/tests/test_routes.py @@ -0,0 +1,394 @@ +"""Integration tests for the content-creator admin API routes.""" +import uuid +from datetime import datetime, timezone +from unittest.mock import AsyncMock, patch + +import pytest +import pytest_asyncio + + +@pytest.mark.asyncio +class TestHealthEndpoints: + async def test_health_returns_healthy(self, client): + response = await client.get("/health") + assert response.status_code == 200 + assert response.json() == {"status": "healthy"} + + async def test_ready_returns_ready(self, client): + response = await client.get("/ready") + assert response.status_code == 200 + assert response.json() == {"status": "ready"} + + async def test_health_cache_control(self, client): + response = await client.get("/health") + assert "no-store" in response.headers.get("cache-control", "") + + async def test_ready_cache_control(self, client): + response = await client.get("/ready") + assert "no-store" in response.headers.get("cache-control", "") + + +@pytest.mark.asyncio +class TestAdminRunsEndpoint: + async def test_list_runs_empty(self, client): + response = await client.get( + "/admin/runs", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["total"] == 0 + assert data["items"] == [] + + async def test_get_run_not_found(self, client): + run_id = str(uuid.uuid4()) + response = await client.get( + f"/admin/runs/{run_id}", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 404 + + async def test_get_run_invalid_id(self, client): + response = await client.get( + "/admin/runs/not-a-uuid", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 400 + + +@pytest.mark.asyncio +class TestAdminResearchEndpoint: + async def test_list_research_empty(self, client): + response = await client.get( + "/admin/research", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["total"] == 0 + + async def test_list_research_with_data(self, client, db_session): + from app.models.db_models import ResearchDigest + digest = ResearchDigest( + id=uuid.uuid4(), + source_name="GOV.UK", + source_url="https://gov.uk/test", + headline="Test headline", + relevance_score=0.9, + captured_at=datetime.now(timezone.utc), + ) + db_session.add(digest) + await db_session.commit() + + response = await client.get( + "/admin/research", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + assert data["items"][0]["source_name"] == "GOV.UK" + assert data["items"][0]["relevance_score"] == 0.9 + + +@pytest.mark.asyncio +class TestAdminQualityEndpoint: + async def test_list_quality_empty(self, client): + response = await client.get( + "/admin/quality", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + assert response.json()["total"] == 0 + + async def test_list_quality_with_data(self, client, db_session): + from app.models.db_models import ContentQualityReview + review = ContentQualityReview( + id=uuid.uuid4(), + seo_score=75, + trust_score=80, + readability_score=70, + conversion_score=65, + composite_score=72, + passes_conversion_rules=True, + rejected=False, + reviewed_at=datetime.now(timezone.utc), + ) + db_session.add(review) + await db_session.commit() + + response = await client.get( + "/admin/quality", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + assert data["items"][0]["seo_score"] == 75 + assert data["items"][0]["rejected"] is False + + async def test_filter_rejected_only(self, client, db_session): + from app.models.db_models import ContentQualityReview + accepted = ContentQualityReview( + id=uuid.uuid4(), + seo_score=80, trust_score=80, readability_score=80, + conversion_score=80, composite_score=80, + passes_conversion_rules=True, rejected=False, + reviewed_at=datetime.now(timezone.utc), + ) + rejected = ContentQualityReview( + id=uuid.uuid4(), + seo_score=30, trust_score=30, readability_score=30, + conversion_score=30, composite_score=30, + passes_conversion_rules=True, rejected=True, + rejection_reason="Below threshold", + reviewed_at=datetime.now(timezone.utc), + ) + db_session.add(accepted) + db_session.add(rejected) + await db_session.commit() + + response = await client.get( + "/admin/quality?rejected_only=true", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + assert data["items"][0]["rejected"] is True + + +@pytest.mark.asyncio +class TestAdminLinksEndpoint: + async def test_list_links_empty(self, client): + response = await client.get( + "/admin/links", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + assert response.json()["total"] == 0 + + async def test_list_links_with_data(self, client, db_session): + from app.models.db_models import ContentLinkAudit + audit = ContentLinkAudit( + id=uuid.uuid4(), + link_url="https://www.gov.uk/test", + link_type="External", + anchor_text="GOV.UK guidance", + is_available=True, + http_status=200, + is_authoritative=True, + anchor_quality="Good", + audited_at=datetime.now(timezone.utc), + ) + db_session.add(audit) + await db_session.commit() + + response = await client.get( + "/admin/links", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + assert data["items"][0]["link_type"] == "External" + assert data["items"][0]["is_authoritative"] is True + + async def test_filter_issues_only(self, client, db_session): + from app.models.db_models import ContentLinkAudit + good_link = ContentLinkAudit( + id=uuid.uuid4(), + link_url="https://www.gov.uk/good", + link_type="External", + is_available=True, + anchor_quality="Good", + audited_at=datetime.now(timezone.utc), + ) + bad_link = ContentLinkAudit( + id=uuid.uuid4(), + link_url="https://broken.example.com/404", + link_type="External", + is_available=False, + http_status=404, + issue="HTTP 404", + anchor_quality="Weak", + audited_at=datetime.now(timezone.utc), + ) + db_session.add(good_link) + db_session.add(bad_link) + await db_session.commit() + + response = await client.get( + "/admin/links?issues_only=true", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + assert data["items"][0]["issue"] == "HTTP 404" + + +@pytest.mark.asyncio +class TestAdminOpportunitiesEndpoint: + async def test_list_opportunities_empty(self, client): + response = await client.get( + "/admin/opportunities", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + assert response.json()["total"] == 0 + + async def test_list_opportunities_with_data(self, client, db_session): + from app.models.db_models import ContentOpportunity + opp = ContentOpportunity( + id=uuid.uuid4(), + opportunity_type="NewContent", + content_type="Guide", + topic="ATS CV guide", + target_keyword="ats cv uk", + composite_score=0.82, + status="Pending", + created_at=datetime.now(timezone.utc), + ) + db_session.add(opp) + await db_session.commit() + + response = await client.get( + "/admin/opportunities", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + assert data["items"][0]["topic"] == "ATS CV guide" + + +@pytest.mark.asyncio +class TestAdminRecommendationsEndpoint: + async def test_recommendations_empty(self, client): + response = await client.get( + "/admin/recommendations", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + assert response.json() == [] + + async def test_recommendations_with_high_quality(self, client, db_session): + from app.models.db_models import ContentQualityReview + item_id = uuid.uuid4() + review = ContentQualityReview( + id=uuid.uuid4(), + content_item_id=item_id, + seo_score=85, + trust_score=90, + readability_score=88, + conversion_score=80, + composite_score=85, + passes_conversion_rules=True, + rejected=False, + reviewed_at=datetime.now(timezone.utc), + ) + db_session.add(review) + await db_session.commit() + + response = await client.get( + "/admin/recommendations", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + recs = response.json() + assert len(recs) == 1 + assert recs[0]["recommendation"] == "Publish" + + +@pytest.mark.asyncio +class TestAdminQueueEndpoint: + async def test_queue_empty(self, client): + response = await client.get( + "/admin/queue", + headers={"X-Internal-Api-Key": "test-key"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["total"] == 0 + + +@pytest.mark.asyncio +class TestAdHocGenerateEndpoint: + async def test_invalid_generation_mode(self, client): + response = await client.post( + "/admin/generate", + headers={"X-Internal-Api-Key": "test-key"}, + json={"generation_mode": "InvalidMode"}, + ) + assert response.status_code == 422 + + async def test_invalid_content_type(self, client): + response = await client.post( + "/admin/generate", + headers={"X-Internal-Api-Key": "test-key"}, + json={"generation_mode": "NewContent", "content_type": "Invalid"}, + ) + assert response.status_code == 422 + + async def test_valid_adhoc_request_accepted(self, client): + """A valid ad-hoc request should return 200 Accepted immediately.""" + with patch( + "app.routers.admin._run_adhoc_in_background", + new_callable=AsyncMock, + ): + response = await client.post( + "/admin/generate", + headers={"X-Internal-Api-Key": "test-key"}, + json={ + "generation_mode": "NewContent", + "content_type": "Guide", + "target_count": 1, + "topic_hint": "UK ATS guide", + }, + ) + assert response.status_code == 200 + data = response.json() + assert data["status"] == "Accepted" + assert "run_id" in data + + async def test_target_count_boundary(self, client): + """target_count must be 1–5.""" + response = await client.post( + "/admin/generate", + headers={"X-Internal-Api-Key": "test-key"}, + json={"generation_mode": "Mixed", "target_count": 0}, + ) + assert response.status_code == 422 + + response = await client.post( + "/admin/generate", + headers={"X-Internal-Api-Key": "test-key"}, + json={"generation_mode": "Mixed", "target_count": 6}, + ) + assert response.status_code == 422 + + async def test_background_uses_response_run_id(self): + from app.routers.admin import _run_adhoc_in_background + + class _SessionMaker: + def __call__(self): + return self + + async def __aenter__(self): + return object() + + async def __aexit__(self, exc_type, exc, tb): + return False + + with ( + patch("app.db.get_session_maker", return_value=_SessionMaker()), + patch("app.routers.admin.run_generation_cycle", new_callable=AsyncMock) as mock_run, + ): + await _run_adhoc_in_background( + run_id="a90f780d-a9d3-4e62-bf8f-3ef1eca7f21d", + generation_mode="NewContent", + parameters={"target_count": 1}, + ) + + assert mock_run.await_args.kwargs["run_id"] == "a90f780d-a9d3-4e62-bf8f-3ef1eca7f21d" diff --git a/services/content-creator/tests/test_unit.py b/services/content-creator/tests/test_unit.py new file mode 100644 index 00000000..912befb2 --- /dev/null +++ b/services/content-creator/tests/test_unit.py @@ -0,0 +1,285 @@ +"""Unit tests for the content-creator service.""" +import uuid +from datetime import datetime, timezone, timedelta +from urllib.parse import urlparse +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from app.models.db_models import ( + ContentLinkAudit, + ContentOpportunity, + ContentQualityReview, + GenerationRun, + ResearchDigest, +) +from app.services.link_auditor import ( + _assess_anchor_quality, + _is_authoritative, + _is_safe_outbound_url, + _check_url, + extract_links_from_markdown, +) +from app.services.workflow import ( + _compute_composite, + _make_slug, + _validate_conversion_rules, + _select_blog_topic, + _try_uuid, +) +from app.scheduler import _next_run_time + + +# ── Scheduler tests ─────────────────────────────────────────────────────────── + +class TestNextRunTime: + def test_next_monday_from_tuesday(self): + """From Tuesday, next run should be the following Monday.""" + # 2026-05-26 is Tuesday + now = datetime(2026, 5, 26, 12, 0, 0, tzinfo=timezone.utc) + next_run = _next_run_time(now) + assert next_run.weekday() == 0 # Monday + assert next_run.hour == 1 + assert next_run.minute == 0 + + def test_next_monday_from_monday_before_01(self): + """On Monday before 01:00, the run should be today at 01:00.""" + # 2026-05-25 is Monday + now = datetime(2026, 5, 25, 0, 30, 0, tzinfo=timezone.utc) + next_run = _next_run_time(now) + assert next_run.weekday() == 0 + assert next_run.date() == now.date() + assert next_run.hour == 1 + assert next_run.minute == 0 + + def test_next_monday_from_monday_after_01(self): + """On Monday after 01:00, the run should be next Monday.""" + now = datetime(2026, 5, 25, 2, 0, 0, tzinfo=timezone.utc) + next_run = _next_run_time(now) + assert next_run.weekday() == 0 + assert next_run.date() > now.date() + assert next_run.hour == 1 + + def test_next_run_is_in_future(self): + """The calculated next run time is always in the future.""" + now = datetime.now(timezone.utc) + next_run = _next_run_time(now) + assert next_run > now + + +# ── Workflow helper tests ───────────────────────────────────────────────────── + +class TestComputeComposite: + def test_all_zeros(self): + assert _compute_composite({ + "evergreen_seo_score": 0, + "product_alignment_score": 0, + "news_relevance_score": 0, + }) == 0.0 + + def test_all_ones(self): + score = _compute_composite({ + "evergreen_seo_score": 1.0, + "product_alignment_score": 1.0, + "news_relevance_score": 1.0, + }) + assert abs(score - 1.0) < 0.001 + + def test_weights_sum_to_one(self): + """Weights: 60% SEO + 25% product + 15% news = 100%.""" + score = _compute_composite({ + "evergreen_seo_score": 1.0, + "product_alignment_score": 0.0, + "news_relevance_score": 0.0, + }) + assert abs(score - 0.60) < 0.001 + + def test_seo_weighted_highest(self): + """Evergreen SEO has the highest weight (60%).""" + seo_only = _compute_composite({ + "evergreen_seo_score": 1.0, + "product_alignment_score": 0.0, + "news_relevance_score": 0.0, + }) + news_only = _compute_composite({ + "evergreen_seo_score": 0.0, + "product_alignment_score": 0.0, + "news_relevance_score": 1.0, + }) + assert seo_only > news_only + + def test_missing_keys_treated_as_zero(self): + score = _compute_composite({}) + assert score == 0.0 + + +class TestValidateConversionRules: + def test_no_curvit_reference_passes(self): + content = "This is a guide to writing CVs.\n\n## Why structure matters\n\nA well-structured CV..." + assert _validate_conversion_rules(content) is True + + def test_curvit_in_second_half_passes(self): + """Curvit mentioned after 25% mark is acceptable.""" + # 200 chars before Curvit mention, total ~210 chars → position ~95% + padding = "x" * 200 + content = f"{padding} Curvit is a platform that can help." + assert _validate_conversion_rules(content) is True + + def test_curvit_in_first_quarter_fails(self): + """Curvit in first 25% of content must fail.""" + content = "Curvit " + "x" * 300 + assert _validate_conversion_rules(content) is False + + def test_more_than_two_curvit_references_fails(self): + """More than two Curvit references must fail.""" + padding = "x" * 200 + content = f"{padding} Curvit. More Curvit. Even more Curvit." + assert _validate_conversion_rules(content) is False + + def test_aggressive_sales_language_fails(self): + content = "This is a great guide. Buy now and save!" + assert _validate_conversion_rules(content) is False + + def test_empty_content_passes(self): + assert _validate_conversion_rules("") is True + + +class TestMakeSlug: + def test_basic_slug(self): + slug = _make_slug("How to write a CV") + assert slug == "how-to-write-a-cv" + + def test_special_characters_removed(self): + slug = _make_slug("UK Recruitment: What You Need to Know!") + assert "!" not in slug + assert ":" not in slug + + def test_spaces_become_hyphens(self): + slug = _make_slug("Multiple Spaces Here") + assert "--" not in slug + + def test_max_length(self): + title = "a" * 300 + assert len(_make_slug(title)) <= 200 + + def test_empty_title_returns_content(self): + assert _make_slug("") == "content" + + +class TestTryUuid: + def test_valid_uuid(self): + uid = str(uuid.uuid4()) + result = _try_uuid(uid) + assert result is not None + assert str(result) == uid + + def test_invalid_uuid_returns_none(self): + assert _try_uuid("not-a-uuid") is None + + def test_none_returns_none(self): + assert _try_uuid(None) is None + + def test_empty_returns_none(self): + assert _try_uuid("") is None + + +class TestSelectBlogTopic: + def test_topic_hint_preferred(self): + topic = _select_blog_topic([], "IR35 changes") + assert "IR35" in topic + + def test_uses_first_research_item(self): + research = ["CIPD guidance on remote work published today"] + topic = _select_blog_topic(research, None) + assert "CIPD" in topic or "recruitment" in topic.lower() + + def test_fallback_when_no_research(self): + topic = _select_blog_topic([], None) + assert len(topic) > 0 + + +# ── Link auditor unit tests ─────────────────────────────────────────────────── + +class TestIsAuthoritative: + def test_gov_uk_is_authoritative(self): + assert _is_authoritative("https://www.gov.uk/employment-contracts") is True + + def test_cipd_is_authoritative(self): + assert _is_authoritative("https://www.cipd.co.uk/knowledge") is True + + def test_acas_is_authoritative(self): + assert _is_authoritative("https://www.acas.org.uk") is True + + def test_random_blog_not_authoritative(self): + assert _is_authoritative("https://randomblog.com/article") is False + + def test_linkedin_is_authoritative(self): + assert _is_authoritative("https://www.linkedin.com/business/talent") is True + + +class TestAssessAnchorQuality: + def test_good_descriptive_anchor(self): + assert _assess_anchor_quality("how to write an ATS-optimised CV") == "Good" + + def test_generic_click_here(self): + assert _assess_anchor_quality("click here") == "Generic" + + def test_generic_read_more(self): + assert _assess_anchor_quality("read more") == "Generic" + + def test_empty_anchor_is_weak(self): + assert _assess_anchor_quality("") == "Weak" + + def test_none_anchor_is_weak(self): + assert _assess_anchor_quality(None) == "Weak" + + def test_short_anchor_is_weak(self): + assert _assess_anchor_quality("AB") == "Weak" + + +class TestExtractLinksFromMarkdown: + def test_extracts_external_link(self): + md = "See [CIPD guidance](https://www.cipd.co.uk/guidance) for more." + links = extract_links_from_markdown(md) + assert len(links) == 1 + assert links[0]["url"] == "https://www.cipd.co.uk/guidance" + assert links[0]["type"] == "External" + assert links[0]["anchor"] == "CIPD guidance" + + def test_extracts_internal_link(self): + md = "Read our [CV guide](/guides/how-to-write-a-cv)." + links = extract_links_from_markdown(md, base_url="https://curvit.co.uk") + assert len(links) == 1 + assert links[0]["type"] == "Internal" + parsed = urlparse(links[0]["url"]) + assert parsed.scheme == "https" + assert parsed.hostname == "curvit.co.uk" + + def test_multiple_links(self): + md = "[GOV.UK](https://gov.uk) and [CIPD](https://cipd.co.uk)" + links = extract_links_from_markdown(md) + assert len(links) == 2 + + def test_no_links(self): + md = "This content has no hyperlinks." + assert extract_links_from_markdown(md) == [] + + def test_relative_non_absolute_skipped(self): + md = "[link](relative/path)" + assert extract_links_from_markdown(md) == [] + + +class TestLinkSafetyValidation: + def test_disallows_localhost_targets(self): + assert _is_safe_outbound_url("http://localhost:8000/health") is False + + def test_disallows_private_ip_targets(self): + assert _is_safe_outbound_url("http://10.0.0.15/admin") is False + + @pytest.mark.asyncio + async def test_unsafe_url_skips_network_call(self): + with patch("httpx.AsyncClient") as mock_client: + ok, status = await _check_url("http://127.0.0.1:8080/secret", timeout=5) + assert ok is False + assert status is None + mock_client.assert_not_called() diff --git a/services/content-creator/tests/test_workflow.py b/services/content-creator/tests/test_workflow.py new file mode 100644 index 00000000..cb1440b7 --- /dev/null +++ b/services/content-creator/tests/test_workflow.py @@ -0,0 +1,373 @@ +"""Workflow integration tests for the content-creator service.""" +import uuid +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import pytest_asyncio +import jwt + +from app.models.db_models import ( + ContentOpportunity, + ContentQualityReview, + GenerationRun, + ResearchDigest, +) +from app.services.workflow import ( + _compute_composite, + _generate_new_content_opportunities, + _validate_conversion_rules, + _make_slug, + _identify_opportunities, + _process_opportunity, +) + + +@pytest.mark.asyncio +class TestResearchDigestCreation: + async def test_research_digest_stored(self, db_session): + """Research digests are persisted to the database.""" + from app.services.workflow import _fetch_research_stubs + digests = await _fetch_research_stubs(session=db_session, run_id=str(uuid.uuid4())) + assert len(digests) >= 1 + for digest in digests: + assert digest.source_name + assert digest.source_url + assert digest.headline + + +@pytest.mark.asyncio +class TestGenerationRunLifecycle: + async def test_run_created_with_running_status(self, db_session): + """A generation run record is created with status=Running at start.""" + from sqlalchemy import select + + # Patch workflow internals to prevent actual AI calls + with ( + patch("app.services.workflow._execute_workflow", new_callable=AsyncMock) as mock_workflow, + ): + mock_workflow.return_value = (1, 0, "Test summary") + from app.services.workflow import run_generation_cycle + run = await run_generation_cycle( + session=db_session, + run_type="Scheduled", + generation_mode="Mixed", + triggered_by="test", + ) + + assert run.status == "Completed" + assert run.items_created == 1 + assert run.items_rejected == 0 + assert run.completed_at is not None + + async def test_run_marked_failed_on_exception(self, db_session): + """If the workflow raises, the run is marked Failed.""" + with patch( + "app.services.workflow._execute_workflow", + new_callable=AsyncMock, + side_effect=RuntimeError("Simulated failure"), + ): + from app.services.workflow import run_generation_cycle + run = await run_generation_cycle( + session=db_session, + run_type="Scheduled", + generation_mode="Mixed", + triggered_by="test", + ) + + assert run.status == "Failed" + assert run.error_message is not None + assert "Simulated failure" in run.error_message + + async def test_run_uses_supplied_run_id(self, db_session): + supplied_run_id = uuid.uuid4() + with patch("app.services.workflow._execute_workflow", new_callable=AsyncMock) as mock_workflow: + mock_workflow.return_value = (0, 0, "ok") + from app.services.workflow import run_generation_cycle + run = await run_generation_cycle( + session=db_session, + run_id=supplied_run_id, + run_type="AdHoc", + generation_mode="NewContent", + triggered_by="test", + ) + assert run.id == supplied_run_id + + +@pytest.mark.asyncio +class TestOpportunityIdentification: + async def test_new_content_opportunities_generated(self, db_session): + """New content opportunities are identified and stored.""" + run_id = str(uuid.uuid4()) + opps = await _identify_opportunities( + session=db_session, + run_id=run_id, + generation_mode="NewContent", + existing_summaries=[], + research_summaries=[], + target_count=2, + content_type_filter=None, + topic_hint=None, + ) + assert len(opps) <= 2 + for opp in opps: + assert opp.opportunity_type == "NewContent" + assert opp.content_type in {"Blog", "Guide", "Article"} + assert opp.composite_score is not None + + async def test_existing_topics_filtered_out(self, db_session): + """Opportunities matching existing keywords are excluded.""" + existing = [ + { + "id": str(uuid.uuid4()), + "title": "ATS CV Guide", + "content_type": "Guide", + "status": "Published", + "target_keyword": "ats optimised cv uk", + "updated_at": None, + "slug": "ats-cv-guide", + "content_source": "Manual", + } + ] + run_id = str(uuid.uuid4()) + opps = await _identify_opportunities( + session=db_session, + run_id=run_id, + generation_mode="NewContent", + existing_summaries=existing, + research_summaries=[], + target_count=5, + content_type_filter=None, + topic_hint=None, + ) + topics = [o.topic for o in opps] + # The ATS CV topic should not appear since the keyword already exists + assert not any("ats optimised cv uk" in (o.target_keyword or "") for o in opps) + + async def test_mixed_mode_produces_refresh_slot(self, db_session): + """Mixed mode tries to include a refresh opportunity.""" + existing = [ + { + "id": str(uuid.uuid4()), + "title": "Old guide", + "content_type": "Guide", + "status": "Published", + "target_keyword": "old topic", + "updated_at": "2024-01-01T00:00:00Z", + "slug": "old-guide", + "content_source": "Manual", + } + ] + with patch( + "app.services.ai_client.identify_refresh_candidates", + new_callable=AsyncMock, + return_value=[{ + "existing_content_id": existing[0]["id"], + "existing_content_slug": "old-guide", + "title": "Old guide", + "opportunity_type": "Refresh", + "refresh_signals": ["Stale content"], + "rationale": "Not updated in 18 months", + "priority_score": 0.7, + }], + ): + run_id = str(uuid.uuid4()) + opps = await _identify_opportunities( + session=db_session, + run_id=run_id, + generation_mode="Mixed", + existing_summaries=existing, + research_summaries=[], + target_count=4, + content_type_filter=None, + topic_hint=None, + ) + + types = [o.opportunity_type for o in opps] + assert "Refresh" in types + + async def test_expansion_mode_generates_opportunities(self, db_session): + run_id = str(uuid.uuid4()) + opps = await _identify_opportunities( + session=db_session, + run_id=run_id, + generation_mode="Expansion", + existing_summaries=[], + research_summaries=[], + target_count=2, + content_type_filter=None, + topic_hint=None, + ) + assert len(opps) == 2 + assert all(o.opportunity_type == "Expansion" for o in opps) + + +class TestGenerateNewContentOpportunities: + def test_returns_requested_count(self): + opps = _generate_new_content_opportunities( + existing_summaries=[], + research_summaries=[], + content_type_filter=None, + topic_hint=None, + count=2, + ) + assert len(opps) <= 2 + + def test_topic_hint_is_first(self): + opps = _generate_new_content_opportunities( + existing_summaries=[], + research_summaries=[], + content_type_filter=None, + topic_hint="My custom topic", + count=3, + ) + assert opps[0]["topic"] == "My custom topic" + + def test_content_type_filter_applied(self): + opps = _generate_new_content_opportunities( + existing_summaries=[], + research_summaries=[], + content_type_filter="Article", + topic_hint=None, + count=5, + ) + for opp in opps: + assert opp["content_type"] == "Article" + + def test_seo_weight_favours_evergreen(self): + """Evergreen SEO score should be highest weighted dimension.""" + opps = _generate_new_content_opportunities( + existing_summaries=[], + research_summaries=[], + content_type_filter=None, + topic_hint=None, + count=5, + ) + for opp in opps: + assert opp["evergreen_seo_score"] >= 0.5 # all generated opps are evergreen + + +@pytest.mark.asyncio +class TestDraftCreationSafety: + async def test_draft_never_auto_publishes(self, db_session): + """The CMS client must always set status=Draft — never Published.""" + from app.services.cms_client import create_draft_content_item + import httpx + + captured_payload = {} + + async def mock_post(url, headers=None, json=None, **kwargs): + captured_payload.update(json or {}) + mock_response = MagicMock() + mock_response.status_code = 201 + mock_response.raise_for_status = MagicMock() + mock_response.json.return_value = {"id": str(uuid.uuid4())} + return mock_response + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + mock_client.post = mock_post + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + mock_client_class.return_value = mock_client + + await create_draft_content_item( + run_id=str(uuid.uuid4()), + content_type="Guide", + title="Test guide", + slug="test-guide", + content_markdown="# Test", + excerpt="Test excerpt", + seo_title="Test Guide | Curvit", + seo_description="A test guide", + target_keyword="test", + search_intent="informational", + reading_time_minutes=5, + editorial_notes="Test notes", + prompt_version="v1.0", + model_used="claude-haiku", + generation_mode="NewContent", + ) + + assert captured_payload.get("status") == "Draft" + assert captured_payload.get("ai_generated") is True + assert captured_payload.get("content_source") == "AiGenerated" + + async def test_conversion_rule_failure_rejects_even_with_high_quality_scores(self, db_session): + run = GenerationRun( + id=uuid.uuid4(), + run_type="Scheduled", + generation_mode="Mixed", + status="Running", + triggered_by="test", + started_at=datetime.now(timezone.utc), + items_created=0, + items_rejected=0, + ) + opportunity = ContentOpportunity( + id=uuid.uuid4(), + run_id=run.id, + opportunity_type="NewContent", + content_type="Guide", + topic="Test topic", + status="Pending", + ) + db_session.add(run) + db_session.add(opportunity) + await db_session.commit() + + with ( + patch("app.services.ai_client.generate_content_brief", new_callable=AsyncMock) as mock_brief, + patch("app.services.ai_client.generate_draft_content", new_callable=AsyncMock) as mock_draft, + patch("app.services.ai_client.review_draft_quality", new_callable=AsyncMock) as mock_review, + patch("app.services.cms_client.create_draft_content_item", new_callable=AsyncMock) as mock_create_draft, + ): + mock_brief.return_value = {"target_keyword": "test keyword", "content_type": "Guide"} + mock_draft.return_value = { + "title": "Draft Title", + "content_markdown": "Buy now for faster hiring results.", + } + mock_review.return_value = { + "seo_score": 95, + "trust_score": 95, + "readability_score": 95, + "conversion_score": 95, + "curvit_first_mention_position": 0.8, + "direct_product_references": 1, + } + + created = await _process_opportunity( + session=db_session, + run=run, + opportunity=opportunity, + existing_topic_summaries=[], + research_summaries=[], + existing_summaries=[], + ) + + assert created is False + assert opportunity.status == "Rejected" + mock_create_draft.assert_not_awaited() + + async def test_cms_headers_include_admin_bearer_token(self): + from app.services.cms_client import _cms_headers + + settings = MagicMock() + settings.internal_api_key = "internal-key" + settings.auth_secret = "test-secret-key-with-32-bytes-min!!" + settings.auth_issuer = "http://localhost:3000/" + settings.auth_audience = "curvit-api" + + with patch("app.services.cms_client.get_settings", return_value=settings): + headers = await _cms_headers() + + token = headers["Authorization"].replace("Bearer ", "", 1) + claims = jwt.decode( + token, + settings.auth_secret, + algorithms=["HS256"], + audience=settings.auth_audience, + issuer=settings.auth_issuer.rstrip("/"), + ) + assert headers["X-Internal-Api-Key"] == settings.internal_api_key + assert "Administrator" in claims["groups"] diff --git a/services/core-api/src/Curvit.Infrastructure/Migrations/20260530103000_AddContentCreatorTables.cs b/services/core-api/src/Curvit.Infrastructure/Migrations/20260530103000_AddContentCreatorTables.cs new file mode 100644 index 00000000..3b95097d --- /dev/null +++ b/services/core-api/src/Curvit.Infrastructure/Migrations/20260530103000_AddContentCreatorTables.cs @@ -0,0 +1,136 @@ +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace Curvit.Infrastructure.Migrations +{ + /// + public partial class AddContentCreatorTables : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.Sql(""" + CREATE TABLE IF NOT EXISTS "ContentCreatorRuns" ( + "Id" uuid PRIMARY KEY, + "RunType" character varying(50) NOT NULL, + "GenerationMode" character varying(50) NOT NULL, + "Status" character varying(50) NOT NULL DEFAULT 'Running', + "TriggeredBy" character varying(200), + "Parameters" jsonb, + "StartedAt" timestamp with time zone NOT NULL DEFAULT NOW(), + "CompletedAt" timestamp with time zone, + "ItemsCreated" integer NOT NULL DEFAULT 0, + "ItemsRejected" integer NOT NULL DEFAULT 0, + "ErrorMessage" text, + "Summary" text + ); + + CREATE TABLE IF NOT EXISTS "ResearchDigests" ( + "Id" uuid PRIMARY KEY, + "RunId" uuid, + "SourceName" character varying(100) NOT NULL, + "SourceUrl" character varying(2048) NOT NULL, + "Headline" character varying(500) NOT NULL, + "Summary" text, + "PublicationDate" timestamp with time zone, + "RelevanceScore" double precision, + "Topics" jsonb, + "RawContent" text, + "CapturedAt" timestamp with time zone NOT NULL DEFAULT NOW(), + "UsedInOpportunityId" uuid + ); + + CREATE TABLE IF NOT EXISTS "ContentOpportunities" ( + "Id" uuid PRIMARY KEY, + "RunId" uuid, + "OpportunityType" character varying(50) NOT NULL, + "ContentType" character varying(50) NOT NULL, + "Topic" character varying(500) NOT NULL, + "TargetKeyword" character varying(200), + "Rationale" text, + "EvergreenSeoScore" double precision, + "ProductAlignmentScore" double precision, + "NewsRelevanceScore" double precision, + "CompositeScore" double precision, + "ExistingContentId" uuid, + "ExistingContentSlug" character varying(250), + "RefreshSignals" jsonb, + "Status" character varying(50) NOT NULL DEFAULT 'Pending', + "CreatedAt" timestamp with time zone NOT NULL DEFAULT NOW() + ); + + CREATE TABLE IF NOT EXISTS "ContentBriefs" ( + "Id" uuid PRIMARY KEY, + "OpportunityId" uuid NOT NULL, + "RunId" uuid, + "Topic" character varying(500) NOT NULL, + "SearchIntent" character varying(200), + "TargetKeyword" character varying(200), + "SecondaryKeywords" jsonb, + "SuggestedInternalLinks" jsonb, + "SuggestedExternalSources" jsonb, + "CtaOpportunities" jsonb, + "ContentType" character varying(50) NOT NULL, + "RecommendedWordCount" integer, + "Outline" text, + "GenerationMode" character varying(50) NOT NULL, + "PromptVersion" character varying(100), + "ModelUsed" character varying(100), + "GeneratedAt" timestamp with time zone NOT NULL DEFAULT NOW() + ); + + CREATE TABLE IF NOT EXISTS "ContentQualityReviews" ( + "Id" uuid PRIMARY KEY, + "ContentItemId" uuid, + "BriefId" uuid, + "RunId" uuid, + "SeoScore" integer NOT NULL, + "TrustScore" integer NOT NULL, + "ReadabilityScore" integer NOT NULL, + "ConversionScore" integer NOT NULL, + "CompositeQualityScore" integer NOT NULL, + "CurvitFirstMentionPosition" double precision, + "DirectProductReferences" integer, + "PassesConversionRules" boolean NOT NULL DEFAULT TRUE, + "Rejected" boolean NOT NULL DEFAULT FALSE, + "RejectionReason" text, + "PromptVersion" character varying(100), + "ModelUsed" character varying(100), + "GenerationMode" character varying(50), + "ReviewedAt" timestamp with time zone NOT NULL DEFAULT NOW(), + "ReviewNotes" text + ); + + CREATE TABLE IF NOT EXISTS "ContentLinkAudits" ( + "Id" uuid PRIMARY KEY, + "ContentItemId" uuid, + "BriefId" uuid, + "RunId" uuid, + "LinkUrl" character varying(2048) NOT NULL, + "LinkType" character varying(20) NOT NULL, + "AnchorText" character varying(500), + "IsAvailable" boolean, + "HttpStatus" integer, + "IsAuthoritative" boolean, + "IsRelevant" boolean, + "AnchorQuality" character varying(50), + "Issue" character varying(200), + "AuditedAt" timestamp with time zone NOT NULL DEFAULT NOW() + ); + + CREATE INDEX IF NOT EXISTS "IX_ContentCreatorRuns_StartedAt" ON "ContentCreatorRuns" ("StartedAt"); + CREATE INDEX IF NOT EXISTS "IX_ContentOpportunities_RunId" ON "ContentOpportunities" ("RunId"); + CREATE INDEX IF NOT EXISTS "IX_ContentBriefs_RunId" ON "ContentBriefs" ("RunId"); + CREATE INDEX IF NOT EXISTS "IX_ContentQualityReviews_RunId" ON "ContentQualityReviews" ("RunId"); + CREATE INDEX IF NOT EXISTS "IX_ContentLinkAudits_RunId" ON "ContentLinkAudits" ("RunId"); + CREATE INDEX IF NOT EXISTS "IX_ResearchDigests_RunId" ON "ResearchDigests" ("RunId"); + """); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + } + } +}