diff --git a/.devcontainer-features/peerbot-integration/devcontainer-feature.json b/.devcontainer-features/peerbot-integration/devcontainer-feature.json new file mode 100644 index 000000000..09edae74d --- /dev/null +++ b/.devcontainer-features/peerbot-integration/devcontainer-feature.json @@ -0,0 +1,20 @@ +{ + "id": "peerbot-integration", + "version": "1.0.0", + "name": "Peerbot Worker Integration", + "description": "Adds Peerbot worker packages and Claude Code CLI to devcontainer environment", + "documentationURL": "https://github.com/buremba/peerbot", + "options": { + "version": { + "type": "string", + "default": "latest", + "description": "Version of Claude Code CLI to install" + } + }, + "installsAfter": ["ghcr.io/devcontainers/features/common-utils"], + "postCreateCommand": [ + "npm install -g @anthropic-ai/claude-code", + "npm install -g /app/packages/worker", + "npm install -g /app/packages/shared" + ] +} \ No newline at end of file diff --git a/.devcontainer-features/peerbot-integration/install.sh b/.devcontainer-features/peerbot-integration/install.sh new file mode 100644 index 000000000..bf9193e71 --- /dev/null +++ b/.devcontainer-features/peerbot-integration/install.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +set -e + +# Peerbot Integration Feature Installation Script + +echo "Installing Peerbot Worker Integration..." + +# Install Claude Code CLI globally +echo "Installing Claude Code CLI..." +npm install -g "@anthropic-ai/claude-code@${VERSION:-latest}" + +# Verify installation +if ! command -v claude >/dev/null 2>&1; then + echo "❌ Error: Claude CLI installation failed" + exit 1 +fi + +# Setup git configuration for peerbot workers +echo "Configuring git for Peerbot workers..." +git config --global user.name "Claude Code Worker" 2>/dev/null || true +git config --global user.email "claude-code-worker@noreply.github.com" 2>/dev/null || true +git config --global init.defaultBranch main 2>/dev/null || true +git config --global pull.rebase false 2>/dev/null || true +git config --global safe.directory '*' 2>/dev/null || true + +# Create peerbot directories +mkdir -p /app/packages +mkdir -p /home/${_REMOTE_USER:-vscode}/.claude + +echo "✅ Peerbot Integration feature installed successfully" \ No newline at end of file diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index d2f237016..898ab1286 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -113,10 +113,10 @@ graph TB ### How Conversations Continue 1. **Persistent Volume**: Single 10GB PVC shared across all workers -2. **User Isolation**: Each user gets `/workspace/user-{username}/` directory -3. **Claude Sessions**: All conversation history stored in `.claude/` subdirectory +2. **User Isolation**: Each user gets `/workspace/user-{username}/` directory +3. **Claude Sessions**: Global session data stored in `/workspace/.claude-sessions/` and symlinked to `~/.claude` 4. **Auto-Resume**: Workers use `claude --resume ` to continue conversations -5. **No Data Loss**: Data persists even when worker pods terminate +5. **No Data Loss**: Both project data and Claude sessions persist across worker pod restarts ### Background Process Management Workers include a background process management MCP server. @@ -124,25 +124,23 @@ Workers include a background process management MCP server. ### Directory Structure ``` /workspace/ # PVC mount point (K8s) or local workspaces dir (Docker) +├── .claude-sessions/ # Claude CLI global session data (symlinked from ~/.claude) +│ ├── projects/ # Project contexts across all threads +│ ├── sessions/ # Conversation history for all sessions +│ └── settings.mcp.json # MCP server configuration ├── U095ZLHKP98/ # Per-userId directory │ ├── 1756492073.980799/ # Per-thread workspace (threadId/timestamp) │ │ ├── .git/ # Cloned repository -│ │ ├── .claude/ # Claude session data (auto-resume support) -│ │ │ ├── projects/ # Project context -│ │ │ └── sessions/ # Conversation history │ │ └── [project files] # User's code from repository │ └── 1756491379.629309/ # Another thread workspace │ ├── .git/ -│ ├── .claude/ │ └── [project files] └── U09513HH1N1/ # Another user's workspace ├── 1756479858.121779/ # Thread-specific workspace │ ├── .git/ - │ ├── .claude/ │ └── [project files] └── 1756491388.020389/ # Another thread workspace ├── .git/ - ├── .claude/ └── [project files] ``` diff --git a/Dockerfile.worker b/Dockerfile.worker deleted file mode 100644 index 4f22ab496..000000000 --- a/Dockerfile.worker +++ /dev/null @@ -1,114 +0,0 @@ -FROM oven/bun:1.2.9 - -# Build argument to control dev/prod behavior -ARG BUILD_MODE=prod - -WORKDIR /app - -# Install runtime dependencies including Node.js -RUN apt-get update && apt-get install -y \ - git \ - curl \ - bash \ - openssh-client \ - jq \ - sudo \ - ca-certificates \ - procps \ - htop \ - net-tools \ - iputils-ping \ - && curl -fsSL https://deb.nodesource.com/setup_18.x | bash - \ - && apt-get install -y nodejs \ - && rm -rf /var/lib/apt/lists/* \ - && ln -sf /bin/bash /bin/sh - -# Install cloudflared for multiple architectures -RUN ARCH=$(dpkg --print-architecture) && \ - case ${ARCH} in \ - amd64) CF_ARCH="amd64" ;; \ - arm64) CF_ARCH="arm64" ;; \ - armhf) CF_ARCH="arm" ;; \ - *) CF_ARCH="amd64" ;; \ - esac && \ - curl -L "https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-${CF_ARCH}" -o /usr/local/bin/cloudflared && \ - chmod +x /usr/local/bin/cloudflared - -# Create a non-root user with home directory and sudo access -RUN addgroup --gid 1001 claude && \ - adduser --system --uid 1001 --ingroup claude --home /home/claude claude && \ - echo 'claude ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers - -# Install Claude Code CLI globally and make it accessible to all users - using npm instead -RUN npm install -g @anthropic-ai/claude-code && \ - echo "Npm global packages:" && \ - npm list -g --depth=0 && \ - echo "Looking for claude binary..." && \ - which claude && \ - echo "Testing claude command..." && \ - claude --version - -# Ensure the non-root user can also access bun and claude -RUN echo 'export PATH="/home/bun/.bun/bin:$PATH"' >> /home/claude/.bashrc && \ - echo 'export BUN_INSTALL="/home/bun/.bun"' >> /home/claude/.bashrc - -# Copy package files -COPY package.json ./ -COPY tsconfig.json ./ -COPY packages/worker/package.json ./packages/worker/ -COPY packages/shared/package.json ./packages/shared/ -COPY packages/dispatcher/package.json ./packages/dispatcher/ -COPY packages/orchestrator/package.json ./packages/orchestrator/ - -# Install all dependencies including devDependencies for building -RUN bun install - -# Copy source code (needed for both dev and prod) -COPY packages/ ./packages/ -COPY scripts/ ./scripts/ - -# For production mode, build during image creation -# For dev mode, we'll build at startup to allow for live code changes -RUN if [ "$BUILD_MODE" = "prod" ]; then \ - echo "Building for production..."; \ - cd /app/packages/shared && bun run build; \ - cd /app/packages/worker && bun run build; \ - ls -la /app/packages/worker/dist/ || echo "No dist directory found"; \ - else \ - echo "Setting up for development mode..."; \ - fi - -WORKDIR /app - -# Create workspace directory and set permissions -RUN mkdir -p /workspace && \ - chown -R claude:claude /app && \ - chown -R claude:claude /workspace && \ - chmod 755 /workspace - -# Copy .claude/projects directory if it exists -COPY --chown=claude:claude .claude/ /home/claude/.claude/ - -# Ensure .claude directory structure exists for claude user and make scripts executable -RUN mkdir -p /home/claude/.claude/projects && \ - mkdir -p /home/claude/bin && \ - chown -R claude:claude /home/claude/.claude && \ - chown -R claude:claude /home/claude && \ - chmod -R 755 /home/claude/.claude && \ - echo 'export PATH="/home/claude/bin:$PATH"' >> /home/claude/.bashrc - -# Copy the entrypoint script and make it executable -COPY packages/worker/scripts/worker-entrypoint.sh /app/entrypoint.sh -RUN chmod +x /app/entrypoint.sh && \ - chown claude:claude /app/entrypoint.sh - -# Switch to non-root user -USER claude - -# Set working directory to worker -WORKDIR /app/packages/worker - -# Set the build mode as an environment variable -ENV BUILD_MODE=${BUILD_MODE} - -ENTRYPOINT ["/app/entrypoint.sh"] \ No newline at end of file diff --git a/Makefile b/Makefile index f86337362..97254151e 100644 --- a/Makefile +++ b/Makefile @@ -16,8 +16,8 @@ setup: @echo "🚀 Starting PeerBot development setup..." @./bin/setup-slack.sh -# Start local development -dev: build-worker +# Start local development +dev: @if [ ! -f .env ]; then \ echo "❌ .env file not found!"; \ echo ""; \ @@ -28,9 +28,8 @@ dev: build-worker fi @echo "🚀 Starting local development mode..." @echo " This will:" - @echo " - Build worker Docker image" @echo " - Start orchestrator and dispatcher with hot reload" - @echo " - Use Docker containers for workers" + @echo " - Use devcontainer-built images for workers (built on demand)" @echo "" @if grep -q "DEPLOYMENT_MODE=" .env 2>/dev/null; then \ DEPLOYMENT_MODE=$$(grep "DEPLOYMENT_MODE=" .env | cut -d'=' -f2); \ @@ -55,16 +54,10 @@ dev: build-worker bun --watch packages/orchestrator/src/index.ts & \ bun --watch packages/dispatcher/src/index.ts -# Build worker image for Docker mode +# Build base worker image for development (optional fallback) build-worker: - @echo "🔨 Building worker Docker image..." - @if [ "$$NODE_ENV" = "development" ]; then \ - echo "📦 Building development image with volume mounts..."; \ - docker build -f Dockerfile.worker.dev -t peerbot-worker:latest .; \ - else \ - echo "📦 Building production image..."; \ - docker build -f Dockerfile.worker -t peerbot-worker:latest .; \ - fi + @echo "ℹ️ Worker images are now built dynamically using devcontainers" + @echo " Base images will be built on-demand when workers are created" # Catch-all target to prevent errors when passing arguments %: diff --git a/packages/dispatcher/src/queue/slack-thread-processor.ts b/packages/dispatcher/src/queue/slack-thread-processor.ts index 9514754cc..4ffbd8a30 100644 --- a/packages/dispatcher/src/queue/slack-thread-processor.ts +++ b/packages/dispatcher/src/queue/slack-thread-processor.ts @@ -197,6 +197,10 @@ interface ThreadResponsePayload { originalMessageTs?: string; // User's original message timestamp for reactions gitBranch?: string; // Current git branch for Edit button URLs botResponseTs?: string; // Bot's response message timestamp for updates + envChanges?: boolean; // Flag indicating environment was modified + modifiedFiles?: string[]; // Files changed (e.g. [".devcontainer/devcontainer.json"]) + currentCommit?: string; // Current git HEAD after changes + repositoryUrl?: string; // Repository URL for rebuild context } /** @@ -340,6 +344,12 @@ export class ThreadResponseConsumer { await this.handleError(data, isFirstResponse, botMessageTs); } + // Handle environment changes - forward rebuild request to orchestrator + if (data.envChanges && data.currentCommit && data.repositoryUrl) { + logger.info(`Environment change detected for thread ${data.threadTs}: ${data.modifiedFiles?.join(', ')}`); + await this.handleEnvironmentChange(data); + } + // Log completion if (data.isDone) { logger.info(`Thread processing completed for message ${data.messageId}`); @@ -602,6 +612,49 @@ export class ThreadResponseConsumer { } } + /** + * Handle environment change by forwarding rebuild request to orchestrator + */ + private async handleEnvironmentChange(data: ThreadResponsePayload): Promise { + try { + // Create rebuild request message for orchestrator + const rebuildRequest = { + type: 'rebuild_request', + threadId: data.threadTs, + userId: data.userId, + channelId: data.channelId, + repositoryUrl: data.repositoryUrl, + commitId: data.currentCommit, + modifiedFiles: data.modifiedFiles, + originalMessageTs: data.originalMessageTs, + platformMetadata: { + teamId: '', // Will be filled in by dispatcher if needed + repositoryUrl: data.repositoryUrl, + botResponseTs: data.botResponseTs, + originalMessageTs: data.originalMessageTs + }, + timestamp: Date.now() + }; + + // Send rebuild request to orchestrator via messages queue + const jobId = await this.pgBoss.send('messages', rebuildRequest, { + retryLimit: 3, + retryDelay: 5000, + priority: 5 // High priority for rebuild requests + }); + + if (jobId) { + logger.info(`Sent rebuild request to orchestrator: job ${jobId} for thread ${data.threadTs}`); + } else { + logger.error(`Failed to send rebuild request for thread ${data.threadTs}: pgBoss returned null`); + } + + } catch (error) { + logger.error(`Failed to handle environment change for thread ${data.threadTs}:`, error); + // Don't throw - environment changes are not critical to message processing + } + } + /** * Check if consumer is running and healthy */ diff --git a/packages/orchestrator/package.json b/packages/orchestrator/package.json index edf3c79b1..dbc3b9f3c 100644 --- a/packages/orchestrator/package.json +++ b/packages/orchestrator/package.json @@ -10,6 +10,7 @@ "test": "bun test" }, "dependencies": { + "@devcontainers/cli": "^0.60.0", "@kubernetes/client-node": "^0.21.0", "@sentry/node": "^10.6.0", "dockerode": "^4.0.7", diff --git a/packages/orchestrator/src/base/BaseDeploymentManager.ts b/packages/orchestrator/src/base/BaseDeploymentManager.ts index dc7dafe55..df72aada6 100644 --- a/packages/orchestrator/src/base/BaseDeploymentManager.ts +++ b/packages/orchestrator/src/base/BaseDeploymentManager.ts @@ -2,6 +2,7 @@ import { DatabasePool } from '../db-connection-pool'; import { DatabaseManager } from '../db-operations'; import { BaseSecretManager } from './BaseSecretManager'; import { OrchestratorConfig, OrchestratorError, ErrorCode } from '../types'; +import { DevcontainerBuilder } from '../devcontainer-builder'; export interface DeploymentInfo { deploymentName: string; @@ -17,19 +18,21 @@ export interface DeploymentInfo { export abstract class BaseDeploymentManager { protected config: OrchestratorConfig; protected dbPool: DatabasePool; - protected databaseManager: DatabaseManager; + public databaseManager: DatabaseManager; protected secretManager: BaseSecretManager; + public devcontainerBuilder: DevcontainerBuilder; constructor(config: OrchestratorConfig, dbPool: DatabasePool, secretManager: BaseSecretManager) { this.config = config; this.dbPool = dbPool; this.databaseManager = new DatabaseManager(dbPool); this.secretManager = secretManager; + this.devcontainerBuilder = new DevcontainerBuilder(); } // Abstract methods that must be implemented by concrete classes abstract listDeployments(): Promise; - abstract createDeployment(deploymentName: string, username: string, userId: string, messageData?: any): Promise; + abstract createDeployment(deploymentName: string, username: string, userId: string, messageData?: any, imageName?: string): Promise; abstract scaleDeployment(deploymentName: string, replicas: number): Promise; abstract deleteDeployment(deploymentId: string): Promise; abstract updateDeploymentActivity(deploymentName: string): Promise; @@ -37,14 +40,19 @@ export abstract class BaseDeploymentManager { /** * Create worker deployment for handling messages */ - async createWorkerDeployment(userId: string, threadId: string, teamId?: string, messageData?: any): Promise { + async createWorkerDeployment( + userId: string, + threadId: string, + teamId?: string, + messageData?: any, + onProgress?: (message: string) => void + ): Promise { const deploymentName = `peerbot-worker-${threadId}`; try { // Always ensure user credentials exist first const username = this.databaseManager.generatePostgresUsername(userId); - // Check if secret already exists and get existing password, or generate new one await this.secretManager.getOrCreateUserCredentials(username, (username: string, password: string) => this.databaseManager.createPostgresUser(username, password)); @@ -58,7 +66,24 @@ export abstract class BaseDeploymentManager { return; } - await this.createDeployment(deploymentName, username, userId, messageData); + let imageName: string | undefined; + + // Build custom image if repository URL is provided + const repositoryUrl = messageData?.platformMetadata?.repositoryUrl; + if (repositoryUrl) { + try { + const buildResult = await this.devcontainerBuilder.build(repositoryUrl, onProgress); + imageName = buildResult.imageName; + + onProgress?.(`✅ Built custom image: ${buildResult.imageName} (${buildResult.hasDevcontainer ? 'devcontainer' : 'default'})`); + } catch (error) { + console.warn(`Failed to build custom image for ${repositoryUrl}, using default:`, error); + onProgress?.(`⚠️ Using default image (custom build failed)`); + // Continue with default image + } + } + + await this.createDeployment(deploymentName, username, userId, messageData, imageName); } catch (error) { throw new OrchestratorError( diff --git a/packages/orchestrator/src/devcontainer-builder.ts b/packages/orchestrator/src/devcontainer-builder.ts new file mode 100644 index 000000000..adc44ab39 --- /dev/null +++ b/packages/orchestrator/src/devcontainer-builder.ts @@ -0,0 +1,467 @@ +import { spawn } from 'child_process'; +import { promises as fs } from 'fs'; +import * as path from 'path'; +import * as crypto from 'crypto'; +import { OrchestratorError, ErrorCode } from './types'; + +export interface DevcontainerBuildResult { + imageName: string; + imageTag: string; + hasDevcontainer: boolean; + repoHash: string; + commitId: string; + repoUrlHash: string; +} + +export interface DevcontainerConfig { + name?: string; + image?: string; + build?: { + dockerfile?: string; + context?: string; + }; + features?: Record; + postCreateCommand?: string | string[]; + postStartCommand?: string | string[]; + customizations?: { + vscode?: { + extensions?: string[]; + settings?: Record; + }; + }; +} + +export class DevcontainerBuilder { + private tempDir: string; + + constructor(tempDir: string = '/tmp') { + this.tempDir = tempDir; + } + + /** + * Build worker image with full devcontainer support + */ + async build(repoUrl: string, onProgress?: (message: string) => void, commitId?: string): Promise { + const progress = (msg: string) => { + console.log(`[DevcontainerBuilder] ${msg}`); + onProgress?.(msg); + }; + + progress('🔧 Building your development environment...'); + + // Generate unique temp directory + const repoName = this.extractRepoName(repoUrl); + const tempRepoDir = path.join(this.tempDir, `peerbot-build-${repoName}-${Date.now()}`); + + try { + // Shallow clone repository + progress('📋 Applying devcontainer configuration...'); + await this.cloneRepository(repoUrl, tempRepoDir, commitId); + + // Get commit ID and generate hashes + const fullCommitId = commitId || await this.getCommitId(tempRepoDir); + const repoUrlHash = this.generateRepoUrlHash(repoUrl); + const repoHash = await this.generateRepoHash(tempRepoDir); + + // Use new snapshot naming scheme: peerbot-snapshot-{repoUrlHash}-{commitId} + const imageTag = `peerbot-snapshot-${repoUrlHash}-${fullCommitId}`; + + // Check if devcontainer exists + const devcontainerPath = path.join(tempRepoDir, '.devcontainer', 'devcontainer.json'); + const hasDevcontainer = await this.fileExists(devcontainerPath); + + if (hasDevcontainer) { + progress('📦 Installing project dependencies...'); + + // Parse devcontainer configuration + const config = await this.parseDevcontainerConfig(devcontainerPath); + + // Build using devcontainers CLI + const imageName = await this.buildWithDevcontainerCli(tempRepoDir, imageTag, config, repoUrl, fullCommitId); + + // Add peerbot integration to the built image + await this.addPeerbotIntegration(imageName, progress); + + progress('✅ Environment ready! Starting Claude Code...'); + + return { + imageName, + imageTag, + hasDevcontainer: true, + repoHash, + commitId: fullCommitId, + repoUrlHash + }; + } else { + // Use default Bun environment + progress('📦 Setting up default Bun environment...'); + + const imageName = await this.buildDefaultImage(tempRepoDir, imageTag, repoUrl, fullCommitId); + + progress('✅ Environment ready! Starting Claude Code...'); + + return { + imageName, + imageTag, + hasDevcontainer: false, + repoHash, + commitId: fullCommitId, + repoUrlHash + }; + } + } finally { + // Cleanup temp directory + try { + await fs.rm(tempRepoDir, { recursive: true }); + } catch (error) { + console.warn('Failed to cleanup temp directory:', error); + } + } + } + + /** + * Check if image already exists with given tag + */ + async imageExists(imageTag: string): Promise { + try { + const result = await this.runCommand('docker', ['images', '-q', imageTag]); + return result.stdout.trim().length > 0; + } catch { + return false; + } + } + + private async cloneRepository(repoUrl: string, targetDir: string, commitId?: string): Promise { + if (commitId) { + // Clone specific commit + await this.runCommand('git', ['clone', repoUrl, targetDir]); + await this.runCommand('git', ['checkout', commitId], targetDir); + } else { + // Shallow clone latest + await this.runCommand('git', [ + 'clone', + '--depth', '1', + '--single-branch', + repoUrl, + targetDir + ]); + } + } + + private async getCommitId(repoDir: string): Promise { + const result = await this.runCommand('git', ['rev-parse', 'HEAD'], repoDir); + return result.stdout.trim(); + } + + private generateRepoUrlHash(repoUrl: string): string { + return crypto.createHash('sha256').update(repoUrl).digest('hex').substring(0, 8); + } + + private async generateRepoHash(repoDir: string): Promise { + // Create hash based on repo content and devcontainer config + const gitRevision = await this.runCommand('git', ['rev-parse', 'HEAD'], repoDir); + const devcontainerPath = path.join(repoDir, '.devcontainer', 'devcontainer.json'); + + let devcontainerContent = ''; + if (await this.fileExists(devcontainerPath)) { + devcontainerContent = await fs.readFile(devcontainerPath, 'utf8'); + } + + const hashContent = `${gitRevision.stdout.trim()}:${devcontainerContent}`; + return crypto.createHash('sha256').update(hashContent).digest('hex').substring(0, 12); + } + + private async parseDevcontainerConfig(configPath: string): Promise { + try { + const content = await fs.readFile(configPath, 'utf8'); + // Remove JSON5 comments and parse + const jsonContent = content.replace(/\/\/.*$/gm, '').replace(/\/\*[\s\S]*?\*\//g, ''); + return JSON.parse(jsonContent); + } catch (error) { + throw new OrchestratorError( + ErrorCode.DEPLOYMENT_CREATE_FAILED, + `Failed to parse devcontainer.json: ${error instanceof Error ? error.message : String(error)}`, + { configPath, error } + ); + } + } + + private async buildWithDevcontainerCli( + repoDir: string, + imageTag: string, + config: DevcontainerConfig, + repoUrl: string, + commitId: string + ): Promise { + const imageName = `peerbot-worker:${imageTag}`; + + await this.runCommand('npx', [ + '@devcontainers/cli', + 'build', + '--workspace-folder', repoDir, + '--image-name', imageName, + '--cache-from', 'peerbot-worker:latest' + ], repoDir, { + timeout: 600000, // 10-minute timeout for complex builds + isolateNetwork: false // DevContainer CLI needs network for base images + }); + + // Add git metadata labels to the image + await this.addGitLabelsToImage(imageName, repoUrl, commitId); + + return imageName; + } + + private async buildDefaultImage(repoDir: string, imageTag: string, repoUrl: string, commitId: string): Promise { + // Create a minimal Dockerfile for default Bun environment + const dockerfile = ` +FROM oven/bun:1.2.9 + +# Install basic tools +RUN apt-get update && apt-get install -y \\ + git curl bash openssh-client jq sudo ca-certificates \\ + procps htop net-tools iputils-ping && \\ + curl -fsSL https://deb.nodesource.com/setup_18.x | bash - && \\ + apt-get install -y nodejs && \\ + rm -rf /var/lib/apt/lists/* + +# Create non-root user +RUN addgroup --gid 1001 claude && \\ + adduser --system --uid 1001 --ingroup claude --home /home/claude claude && \\ + echo 'claude ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers + +# Copy repository content +COPY . /workspace +WORKDIR /workspace +RUN chown -R claude:claude /workspace + +USER claude +`; + + const dockerfilePath = path.join(repoDir, 'Dockerfile.peerbot'); + await fs.writeFile(dockerfilePath, dockerfile); + + const imageName = `peerbot-worker:${imageTag}`; + await this.runCommand('docker', [ + 'build', + '-f', dockerfilePath, + '-t', imageName, + repoDir + ], undefined, { + timeout: 600000, // 10-minute timeout + isolateNetwork: false // Docker build needs network for base images + }); + + // Clean up dockerfile + await fs.unlink(dockerfilePath); + + // Add git metadata labels to the image + await this.addGitLabelsToImage(imageName, repoUrl, commitId); + + return imageName; + } + + private async addPeerbotIntegration(imageName: string, progress: (msg: string) => void): Promise { + progress('🔧 Adding Peerbot integration...'); + + // Create restricted build context (Option 3: Security) + const restrictedContextDir = path.join(this.tempDir, `peerbot-context-${Date.now()}`); + await fs.mkdir(restrictedContextDir, { recursive: true }); + + // Copy only necessary peerbot files to restricted context + const peerbotRoot = process.cwd(); + await this.copyDirectory(path.join(peerbotRoot, 'packages'), path.join(restrictedContextDir, 'packages')); + await this.copyDirectory(path.join(peerbotRoot, 'scripts'), path.join(restrictedContextDir, 'scripts')); + + // Create a new image layer with peerbot packages + const dockerfile = ` +FROM ${imageName} + +# Install Claude Code CLI if not already present +RUN if ! command -v claude >/dev/null 2>&1; then \\ + npm install -g @anthropic-ai/claude-code; \\ + fi + +# Copy peerbot packages from restricted context +COPY packages /app/packages +COPY scripts /app/scripts + +# Set up MCP configuration with persistent storage symlink +RUN USER_HOME=\\$(getent passwd \\$(whoami) | cut -d: -f6) && \\ + mkdir -p "/workspace/.claude-sessions" && \\ + mkdir -p "\\$USER_HOME" && \\ + ln -sf "/workspace/.claude-sessions" "\\$USER_HOME/.claude" && \\ + if [ -f "/app/packages/worker/mcp-config.json" ]; then \\ + cp /app/packages/worker/mcp-config.json "/workspace/.claude-sessions/settings.mcp.json"; \\ + fi + +# Install dependencies and build peerbot packages +RUN cd /app && \\ + if [ -f "package.json" ]; then bun install; fi && \\ + cd /app/packages/shared && bun run build && \\ + cd /app/packages/worker && bun run build && \\ + chmod +x /app/packages/worker/dist/mcp/process-manager-server.mjs 2>/dev/null || true + +# Set up git configuration +RUN git config --global user.name "Claude Code Worker" && \\ + git config --global user.email "claude-code-worker@noreply.github.com" && \\ + git config --global init.defaultBranch main && \\ + git config --global pull.rebase false && \\ + git config --global safe.directory '*' + +WORKDIR /workspace +`; + + const tempDockerfile = path.join(this.tempDir, `Dockerfile.peerbot-${Date.now()}`); + await fs.writeFile(tempDockerfile, dockerfile); + + try { + await this.runCommand('docker', [ + 'build', + '-f', tempDockerfile, + '-t', imageName, + restrictedContextDir // Use restricted context instead of full peerbot root + ], undefined, { + timeout: 600000, // 10-minute timeout + isolateNetwork: false // Docker build needs network for npm install + }); + } finally { + await fs.unlink(tempDockerfile); + // Cleanup restricted context + await fs.rm(restrictedContextDir, { recursive: true }).catch(() => {}); + } + } + + private async fileExists(filePath: string): Promise { + try { + await fs.access(filePath); + return true; + } catch { + return false; + } + } + + private extractRepoName(repoUrl: string): string { + const match = repoUrl.match(/\/([^\/]+?)(?:\.git)?$/); + return match?.[1] || 'unknown'; + } + + private async copyDirectory(src: string, dest: string): Promise { + try { + await fs.access(src); + await fs.mkdir(path.dirname(dest), { recursive: true }); + await fs.cp(src, dest, { recursive: true }); + } catch (error) { + // Skip if source doesn't exist (some repos might not have scripts folder) + console.warn(`Failed to copy ${src} to ${dest}:`, error); + } + } + + private async addGitLabelsToImage(imageName: string, repoUrl: string, commitId: string): Promise { + try { + // Extract branch name from git repo (will be 'HEAD' for detached) + const branchResult = await this.runCommand('docker', [ + 'run', '--rm', imageName, + 'git', 'rev-parse', '--abbrev-ref', 'HEAD' + ]); + const branch = branchResult.stdout.trim(); + + // Add labels with git metadata + const dockerfile = ` +FROM ${imageName} + +LABEL "peerbot.repository.url"="${repoUrl}" +LABEL "peerbot.repository.commit"="${commitId}" +LABEL "peerbot.repository.branch"="${branch}" +LABEL "peerbot.image.type"="snapshot" +LABEL "peerbot.build.timestamp"="${new Date().toISOString()}" +`; + + const tempDockerfile = path.join(this.tempDir, `Dockerfile.labels-${Date.now()}`); + await fs.writeFile(tempDockerfile, dockerfile); + + try { + await this.runCommand('docker', [ + 'build', + '-f', tempDockerfile, + '-t', imageName, + '.' + ], undefined, { + timeout: 300000, // 5-minute timeout for labels + isolateNetwork: true // Label builds don't need network + }); + } finally { + await fs.unlink(tempDockerfile); + } + } catch (error) { + console.warn('Failed to add git labels to image:', error); + // Don't throw - labels are not critical + } + } + + private async runCommand( + command: string, + args: string[], + cwd?: string, + options?: { timeout?: number; isolateNetwork?: boolean } + ): Promise<{ stdout: string; stderr: string }> { + const timeout = options?.timeout || 300000; // 5-minute default timeout + + return new Promise((resolve, reject) => { + // Create filtered environment (Option 1: Security) + const safeEnv = { + PATH: process.env.PATH || '', + HOME: process.env.HOME || '/tmp', + USER: process.env.USER || 'claude', + SHELL: process.env.SHELL || '/bin/bash', + TERM: process.env.TERM || 'xterm', + LANG: process.env.LANG || 'en_US.UTF-8' + }; + + // Add network isolation for builds (Option 1: Network isolation) + if (options?.isolateNetwork) { + safeEnv.no_proxy = '*'; + safeEnv.NO_PROXY = '*'; + safeEnv.HTTP_PROXY = ''; + safeEnv.HTTPS_PROXY = ''; + } + + const process = spawn(command, args, { + cwd, + stdio: 'pipe', + env: safeEnv + }); + + // Option 2: Build timeout protection + const timeoutHandle = setTimeout(() => { + process.kill('SIGKILL'); + reject(new Error(`Command timed out after ${timeout}ms: ${command} ${args.join(' ')}`)); + }, timeout); + + let stdout = ''; + let stderr = ''; + + process.stdout.on('data', (data) => { + stdout += data.toString(); + }); + + process.stderr.on('data', (data) => { + stderr += data.toString(); + }); + + process.on('close', (code) => { + clearTimeout(timeoutHandle); + if (code === 0) { + resolve({ stdout, stderr }); + } else { + reject(new Error(`Command failed: ${command} ${args.join(' ')}\nCode: ${code}\nStderr: ${stderr}`)); + } + }); + + process.on('error', (error) => { + clearTimeout(timeoutHandle); + reject(error); + }); + }); + } +} \ No newline at end of file diff --git a/packages/orchestrator/src/docker/DockerDeploymentManager.ts b/packages/orchestrator/src/docker/DockerDeploymentManager.ts index 4bc5bf41c..2e9320144 100644 --- a/packages/orchestrator/src/docker/DockerDeploymentManager.ts +++ b/packages/orchestrator/src/docker/DockerDeploymentManager.ts @@ -69,7 +69,7 @@ export class DockerDeploymentManager extends BaseDeploymentManager { } } - async createDeployment(deploymentName: string, username: string, userId: string, messageData?: any): Promise { + async createDeployment(deploymentName: string, username: string, userId: string, messageData?: any, imageName?: string): Promise { try { // Create workspace directory for this user (use absolute path) const workspaceDir = `${process.cwd()}/workspaces/${userId}`; @@ -98,7 +98,7 @@ export class DockerDeploymentManager extends BaseDeploymentManager { const createOptions: Docker.ContainerCreateOptions = { name: deploymentName, - Image: `${this.config.worker.image.repository}:${this.config.worker.image.tag}`, + Image: imageName || `${this.config.worker.image.repository}:${this.config.worker.image.tag}`, Env: envVars, Labels: { 'app.kubernetes.io/name': 'peerbot', diff --git a/packages/orchestrator/src/task-queue-consumer.ts b/packages/orchestrator/src/task-queue-consumer.ts index 98a14783a..ee12abf4a 100644 --- a/packages/orchestrator/src/task-queue-consumer.ts +++ b/packages/orchestrator/src/task-queue-consumer.ts @@ -114,9 +114,36 @@ export class QueueConsumer { try { const deploymentName = `peerbot-worker-${data.threadId}`; const isNewThread = !data.routingMetadata?.targetThreadId; // New thread if no parent thread + const isRebuildRequest = data.type === 'rebuild_request'; const teamId = data.platformMetadata?.teamId; - if (isNewThread) { + if (isRebuildRequest) { + // Handle rebuild request - recreate deployment with new image + console.log(`Rebuild request for thread ${data.threadId} - commit: ${data.commitId}`); + + await Sentry.startSpan( + { + name: "orchestrator.rebuild_worker_deployment", + op: "orchestrator.deployment_management", + attributes: { + "user.id": data.userId, + "thread.id": data.threadId, + "deployment.name": deploymentName, + "commit.id": data.commitId + } + }, + async () => { + // Create progress callback for status updates + const onProgress = async (message: string) => { + await this.sendStatusMessage(data, message); + }; + + await this.rebuildWorkerDeployment(deploymentName, data, onProgress); + } + ); + console.log(`✅ Rebuilt deployment: ${deploymentName}`); + + } else if (isNewThread) { // New thread - create deployment console.log(`New thread ${data.threadId} - creating deployment ${deploymentName}`); @@ -131,7 +158,12 @@ export class QueueConsumer { } }, async () => { - await this.deploymentManager.createWorkerDeployment(data.userId, data.threadId, teamId, data); + // Create progress callback for status updates + const onProgress = async (message: string) => { + await this.sendStatusMessage(data, message); + }; + + await this.deploymentManager.createWorkerDeployment(data.userId, data.threadId, teamId, data, onProgress); } ); console.log(`✅ Created deployment: ${deploymentName}`); @@ -149,7 +181,12 @@ export class QueueConsumer { } catch (error) { // Deployment doesn't exist, recreate it console.log(`Deployment ${deploymentName} doesn't exist, recreating...`); - await this.deploymentManager.createWorkerDeployment(data.userId, data.threadId, teamId, data); + + const onProgress = async (message: string) => { + await this.sendStatusMessage(data, message); + }; + + await this.deploymentManager.createWorkerDeployment(data.userId, data.threadId, teamId, data, onProgress); console.log(`✅ Recreated deployment: ${deploymentName}`); // Reconcile deployments after recreating @@ -157,21 +194,23 @@ export class QueueConsumer { } } - // Send message to worker queue - await Sentry.startSpan( - { - name: "orchestrator.send_to_worker_queue", - op: "orchestrator.message_routing", - attributes: { - "user.id": data.userId, - "thread.id": data.threadId, - "deployment.name": deploymentName + // Send message to worker queue (skip for rebuild requests) + if (!isRebuildRequest) { + await Sentry.startSpan( + { + name: "orchestrator.send_to_worker_queue", + op: "orchestrator.message_routing", + attributes: { + "user.id": data.userId, + "thread.id": data.threadId, + "deployment.name": deploymentName + } + }, + async () => { + await this.sendToWorkerQueue(data, deploymentName); } - }, - async () => { - await this.sendToWorkerQueue(data, deploymentName); - } - ); + ); + } // Update deployment activity annotation for simplified tracking await this.deploymentManager.updateDeploymentActivity(deploymentName); @@ -262,6 +301,64 @@ export class QueueConsumer { + /** + * Rebuild worker deployment with new image based on commit changes + */ + private async rebuildWorkerDeployment(deploymentName: string, data: any, onProgress?: (message: string) => void): Promise { + try { + onProgress?.('🔄 Environment changes detected, rebuilding...'); + + // Generate new image with specific commit + const buildResult = await this.deploymentManager.devcontainerBuilder.build( + data.repositoryUrl, + onProgress, + data.commitId + ); + + onProgress?.(`✅ Built new image: ${buildResult.imageName} (commit: ${data.commitId.substring(0, 7)})`); + + // Delete existing deployment + try { + const deployments = await this.deploymentManager.listDeployments(); + const existingDeployment = deployments.find(d => d.deploymentName === deploymentName); + + if (existingDeployment) { + await this.deploymentManager.deleteDeployment(existingDeployment.deploymentId); + onProgress?.('🗑️ Removed old deployment'); + } + } catch (deleteError) { + console.warn('Failed to delete existing deployment during rebuild:', deleteError); + } + + // Create new deployment with updated image + const username = this.deploymentManager.databaseManager.generatePostgresUsername(data.userId); + await this.deploymentManager.createDeployment(deploymentName, username, data.userId, data, buildResult.imageName); + + onProgress?.('✅ Deployment recreated with updated environment'); + + } catch (error) { + console.error(`Failed to rebuild deployment ${deploymentName}:`, error); + throw error; + } + } + + /** + * Send status message to Slack (placeholder implementation) + */ + private async sendStatusMessage(messageData: any, statusMessage: string): Promise { + try { + // For now, just log the status message + // In a real implementation, this would send a message to Slack + console.log(`[Status Update] Thread ${messageData.threadId}: ${statusMessage}`); + + // TODO: Implement actual Slack API call to update thread with status + // This could use the Slack Web API to post an ephemeral message or update the original message + } catch (error) { + console.error('Failed to send status message:', error); + // Don't throw - status messages are non-critical + } + } + /** * Get queue statistics */ diff --git a/packages/worker/package.json b/packages/worker/package.json index 62712deec..0dc7fccdb 100644 --- a/packages/worker/package.json +++ b/packages/worker/package.json @@ -13,6 +13,7 @@ "typecheck": "tsc --noEmit" }, "dependencies": { + "@anthropic-ai/claude-code": "^1.0.0", "@kubernetes/client-node": "^1.3.0", "@modelcontextprotocol/sdk": "^1.0.5", "@octokit/rest": "^21.1.1", diff --git a/packages/worker/scripts/worker-entrypoint.sh b/packages/worker/scripts/worker-entrypoint.sh index af4ab9039..e377e778c 100644 --- a/packages/worker/scripts/worker-entrypoint.sh +++ b/packages/worker/scripts/worker-entrypoint.sh @@ -1,19 +1,14 @@ #!/bin/bash set -e -# Container entrypoint script for Claude Worker -echo "🚀 Starting Claude Code Worker container..." +# Simplified Claude Code Worker entrypoint for devcontainer-built images +echo "🚀 Starting Claude Code Worker..." # Function to handle cleanup on exit cleanup() { - echo "📦 Container shutting down, performing cleanup..." - - # Kill any background processes + echo "📦 Container shutting down..." jobs -p | xargs -r kill || true - - # Give processes time to exit gracefully - sleep 2 - + sleep 1 echo "✅ Cleanup completed" exit 0 } @@ -21,153 +16,63 @@ cleanup() { # Setup signal handlers for graceful shutdown trap cleanup SIGTERM SIGINT -echo "🔍 Environment variables provided by orchestrator:" -echo " - SESSION_KEY: ${SESSION_KEY:-not set}" -echo " - USER_ID: ${USER_ID:-not set}" -echo " - CHANNEL_ID: ${CHANNEL_ID:-not set}" -echo " - REPOSITORY_URL: ${REPOSITORY_URL:-not set}" -echo " - DEPLOYMENT_NAME: ${DEPLOYMENT_NAME:-not set}" - # Basic validation for critical variables if [[ -z "${SESSION_KEY:-}" ]]; then echo "❌ Error: SESSION_KEY is required" exit 1 fi -echo "✅ Critical environment variables are set" - -# Setup Google Cloud credentials if provided -if [[ -n "${GOOGLE_APPLICATION_CREDENTIALS:-}" ]]; then - echo "🔑 Setting up Google Cloud credentials..." - - # Ensure the credentials file exists - if [[ -f "$GOOGLE_APPLICATION_CREDENTIALS" ]]; then - echo "✅ Google Cloud credentials file found" - - # Set proper permissions - chmod 600 "$GOOGLE_APPLICATION_CREDENTIALS" - - # Test credentials - if command -v gcloud >/dev/null 2>&1; then - echo "🧪 Testing Google Cloud credentials..." - if gcloud auth application-default print-access-token >/dev/null 2>&1; then - echo "✅ Google Cloud credentials are valid" - else - echo "⚠️ Warning: Google Cloud credentials test failed" - fi - fi - else - echo "⚠️ Warning: Google Cloud credentials file not found at $GOOGLE_APPLICATION_CREDENTIALS" - fi -fi +echo "✅ Environment: Session ${SESSION_KEY}, User ${USER_ID:-unknown}" # Setup workspace directory -echo "📁 Setting up workspace directory..." WORKSPACE_DIR="/workspace" mkdir -p "$WORKSPACE_DIR" cd "$WORKSPACE_DIR" -# Set proper permissions for workspace -chmod 755 "$WORKSPACE_DIR" +# Ensure Claude CLI sessions persist in the workspace volume +# Create persistent Claude directory in workspace +CLAUDE_PERSISTENT_DIR="/workspace/.claude-sessions" +mkdir -p "$CLAUDE_PERSISTENT_DIR" -echo "✅ Workspace directory ready: $WORKSPACE_DIR" +# Get user's home directory and setup symlink for Claude CLI +USER_HOME=$(getent passwd "$(whoami)" | cut -d: -f6) +mkdir -p "$USER_HOME" -# Log container information -echo "📊 Container Information:" -echo " - Session Key: $SESSION_KEY" -echo " - Repository: $REPOSITORY_URL" -echo " - Recovery Mode: ${RECOVERY_MODE:-false}" -echo " - Working Directory: $(pwd)" -echo " - Container Hostname: $(hostname)" -echo " - Container Memory Limit: $(cat /sys/fs/cgroup/memory.max 2>/dev/null || echo 'unknown')" -echo " - Container CPU Limit: $(cat /sys/fs/cgroup/cpu.max 2>/dev/null || echo 'unknown')" +# Remove existing .claude if it's not a symlink, then create symlink +if [[ -d "$USER_HOME/.claude" ]] && [[ ! -L "$USER_HOME/.claude" ]]; then + echo "🔄 Moving existing Claude config to persistent storage..." + mv "$USER_HOME/.claude"/* "$CLAUDE_PERSISTENT_DIR/" 2>/dev/null || true + rm -rf "$USER_HOME/.claude" +fi -# Check available tools -echo "🔧 Checking available tools..." -tools_to_check=( - "node" - "bun" - "git" - "claude" - "curl" - "jq" -) +# Create symlink if it doesn't exist +if [[ ! -L "$USER_HOME/.claude" ]]; then + echo "🔗 Creating Claude session persistence symlink..." + ln -sf "$CLAUDE_PERSISTENT_DIR" "$USER_HOME/.claude" +fi -for tool in "${tools_to_check[@]}"; do - if command -v "$tool" >/dev/null 2>&1; then - version=$(timeout 5 "$tool" --version 2>/dev/null | head -1 || echo "unknown") - echo " ✅ $tool: $version" - else - echo " ❌ $tool: not available" - fi -done +echo "✅ Claude sessions will persist in: $CLAUDE_PERSISTENT_DIR" -# Check Claude CLI specifically -echo "🤖 Checking Claude CLI installation..." -if command -v claude >/dev/null 2>&1; then - claude_version=$(timeout 10 claude --version 2>/dev/null || echo "unknown") - echo " ✅ Claude CLI: $claude_version" - - # Test Claude CLI basic functionality - if timeout 10 claude --help >/dev/null 2>&1; then - echo " ✅ Claude CLI is functional" - else - echo " ⚠️ Warning: Claude CLI help test failed" - fi - - # Setup MCP server configuration for Claude Code - echo "🔧 Configuring MCP servers for Claude Code..." - if [ -f "/app/packages/worker/mcp-config.json" ]; then - mkdir -p /home/claude/.claude - cp /app/packages/worker/mcp-config.json /home/claude/.claude/settings.mcp.json - echo " ✅ MCP server configuration deployed to /home/claude/.claude/settings.mcp.json" - - # Also ensure the MCP server is executable - if [ -f "/app/packages/worker/dist/mcp/process-manager-server.mjs" ]; then - chmod +x /app/packages/worker/dist/mcp/process-manager-server.mjs - echo " ✅ MCP server made executable" - fi - else - echo " ⚠️ Warning: MCP config file not found" - fi +# Repository is pre-cloned in the image, just pull for updates +if [[ -n "${REPOSITORY_URL:-}" ]] && [[ -d ".git" ]]; then + echo "🔄 Pulling latest changes..." + timeout 30 git pull origin main 2>/dev/null || { + echo "⚠️ Git pull failed or timed out, continuing with existing code" + } else - echo " ❌ Error: Claude CLI not found in PATH" - echo " PATH: $PATH" - exit 1 + echo "ℹ️ No git repository to update" fi -# Setup git global configuration -echo "⚙️ Setting up git configuration..." -git config --global user.name "Claude Code Worker" -git config --global user.email "claude-code-worker@noreply.github.com" -git config --global init.defaultBranch main -git config --global pull.rebase false -git config --global safe.directory '*' - -echo "✅ Git configuration completed" - -# Display final status -echo "🎯 Starting worker execution..." -echo " - Session: ${SESSION_KEY:-unknown}" -echo " - User ID: ${USER_ID:-unknown}" -echo " - Timeout: 5 minutes (managed by orchestrator)" -echo " - Recovery: ${RECOVERY_MODE:-false}" - -# Make scripts executable -chmod +x /app/scripts/*.sh 2>/dev/null || true - -# Setup MCP server -/app/packages/worker/scripts/setup-mcp-server.sh || echo "⚠️ MCP server setup failed or not found" - -# Check if we need to build (dev mode or if dist does not exist) -if [ "${BUILD_MODE:-prod}" = "dev" ] || [ ! -d "/app/packages/worker/dist" ]; then - echo "Building packages..." - cd /app/packages/shared && bun run build - cd /app/packages/worker && bun run build - chmod +x /app/packages/worker/dist/mcp/process-manager-server.mjs 2>/dev/null || true +# MCP servers and dependencies are pre-installed in the image +# Just verify Claude CLI is available +if ! command -v claude >/dev/null 2>&1; then + echo "❌ Error: Claude CLI not found" + exit 1 fi +echo "✅ Claude CLI ready" + # Start the worker process -echo "🚀 Executing Claude Worker..." +echo "🚀 Starting worker..." cd /app/packages/worker exec bun run dist/index.js \ No newline at end of file diff --git a/packages/worker/src/claude-worker.ts b/packages/worker/src/claude-worker.ts index 6e482f5a4..2e06d6c4b 100644 --- a/packages/worker/src/claude-worker.ts +++ b/packages/worker/src/claude-worker.ts @@ -265,7 +265,8 @@ export class ClaudeWorker { : "✅ Task completed successfully"; logger.info(`Sending final message via queue: ${finalMessage}...`); - await this.queueIntegration.updateProgress(finalMessage); + await this.queueIntegration.updateProgressWithEnvCheck(finalMessage, true); + await this.queueIntegration.signalDone(finalMessage); // Hide stop button and update reaction to success diff --git a/packages/worker/src/task-queue-integration.ts b/packages/worker/src/task-queue-integration.ts index 4cdb7a1af..90101acc0 100644 --- a/packages/worker/src/task-queue-integration.ts +++ b/packages/worker/src/task-queue-integration.ts @@ -22,6 +22,10 @@ interface ThreadResponsePayload { originalMessageTs?: string; // User's original message timestamp for reactions gitBranch?: string; // Current git branch for Edit button URLs botResponseTs?: string; // Bot's response message timestamp for updates + envChanges?: boolean; // Flag indicating environment was modified + modifiedFiles?: string[]; // Files changed (e.g. [".devcontainer/devcontainer.json"]) + currentCommit?: string; // Current git HEAD after changes + repositoryUrl?: string; // Repository URL for rebuild context } export class QueueIntegration { @@ -134,6 +138,41 @@ export class QueueIntegration { } } + /** + * Update progress with optional environment change detection + */ + async updateProgressWithEnvCheck(content: string, checkForEnvChanges: boolean = false): Promise { + try { + // Ensure we always have content to update with + if (!content || content.trim() === "") { + logger.warn("updateProgressWithEnvCheck called with empty content, using default message"); + content = "✅ Task completed"; + } + + // Check for devcontainer changes if requested + let envChangeInfo = null; + if (checkForEnvChanges) { + envChangeInfo = await this.detectDevcontainerChanges(); + } + + // Rate limiting: don't update more than once every 2 seconds + const now = Date.now(); + if (now - this.lastUpdateTime < 2000) { + // Queue the update with environment info + this.updateQueue.push(JSON.stringify({ content, envChangeInfo })); + this.processQueue(); + return; + } + + await this.performUpdateWithEnv(content, envChangeInfo); + this.lastUpdateTime = now; + + } catch (error) { + logger.error("Failed to send progress update with env check to queue:", error); + // Don't throw - worker should continue even if queue updates fail + } + } + /** * Stream progress updates (for real-time Claude output) */ @@ -210,7 +249,18 @@ export class QueueIntegration { this.updateQueue = []; // Clear queue if (latestUpdate) { - await this.performUpdate(latestUpdate); + // Handle both string and JSON updates (for environment changes) + try { + const updateObj = JSON.parse(latestUpdate); + if (updateObj.content && updateObj.envChangeInfo !== undefined) { + await this.performUpdateWithEnv(updateObj.content, updateObj.envChangeInfo); + } else { + await this.performUpdate(latestUpdate); + } + } catch { + // Not JSON, treat as regular string update + await this.performUpdate(latestUpdate); + } this.lastUpdateTime = Date.now(); } @@ -302,6 +352,13 @@ export class QueueIntegration { * Perform the actual queue update */ private async performUpdate(content: string): Promise { + await this.performUpdateWithEnv(content, null); + } + + /** + * Perform queue update with optional environment change information + */ + private async performUpdateWithEnv(content: string, envChangeInfo: { modifiedFiles: string[], currentCommit: string, repositoryUrl?: string } | null): Promise { if (!this.isConnected) { logger.warn("Queue not connected, skipping update"); return; @@ -310,7 +367,7 @@ export class QueueIntegration { try { // Final safety check - ensure we have content if (!content || content.trim() === "") { - logger.warn("performUpdate called with empty content, using fallback"); + logger.warn("performUpdateWithEnv called with empty content, using fallback"); content = "✅ Task completed"; } @@ -327,15 +384,28 @@ export class QueueIntegration { botResponseTs: this.botResponseTs // Bot's response message for updates }; + // Add environment change information if present + if (envChangeInfo) { + payload.envChanges = true; + payload.modifiedFiles = envChangeInfo.modifiedFiles; + payload.currentCommit = envChangeInfo.currentCommit; + payload.repositoryUrl = envChangeInfo.repositoryUrl; + logger.info(`Including environment changes in progress update: ${envChangeInfo.modifiedFiles.join(', ')}`); + } + // Send to thread_response queue + const priority = envChangeInfo ? 10 : 0; // Higher priority for environment changes const jobId = await this.pgBoss.send('thread_response', payload, { - priority: 0, + priority, retryLimit: 3, retryDelay: 5, expireInHours: 1, }); - - logger.info(`Sent progress update to queue with job id: ${jobId}`); + + const logMessage = envChangeInfo ? + `Sent progress update with env changes to queue with job id: ${jobId} - files: ${envChangeInfo.modifiedFiles.join(', ')}` : + `Sent progress update to queue with job id: ${jobId}`; + logger.info(logMessage); } catch (error: any) { logger.error("Failed to send update to thread_response queue:", error); @@ -595,6 +665,161 @@ export class QueueIntegration { this.currentToolExecution = ""; } + /** + * Signal environment changes that require rebuild + */ + async signalEnvironmentChange(modifiedFiles: string[], repositoryUrl?: string): Promise { + if (!this.isConnected) { + logger.warn("Queue not connected, skipping environment change signal"); + return; + } + + try { + // Get current commit ID + const currentCommit = await this.getCurrentCommitId(); + if (!currentCommit) { + logger.warn("Could not determine current commit ID, skipping environment change signal"); + return; + } + + const payload: ThreadResponsePayload = { + messageId: `env-change-${Date.now()}`, + channelId: this.responseChannel, + threadTs: this.responseTs, + userId: process.env.USER_ID || 'unknown', + isDone: false, + timestamp: Date.now(), + envChanges: true, + modifiedFiles, + currentCommit, + repositoryUrl: repositoryUrl || process.env.REPOSITORY_URL, + originalMessageTs: process.env.ORIGINAL_MESSAGE_TS, + botResponseTs: this.botResponseTs + }; + + const jobId = await this.pgBoss.send('thread_response', payload, { + priority: 10, // Very high priority for environment changes + retryLimit: 5, + retryDelay: 5, + expireInHours: 1, + }); + + logger.info(`Sent environment change signal to queue with job id: ${jobId} - files: ${modifiedFiles.join(', ')}`); + + } catch (error: any) { + logger.error("Failed to send environment change signal to queue:", error); + // Don't throw - worker should continue even if signal fails + } + } + + /** + * Detect devcontainer changes and return change info (without sending separate signal) + */ + async detectDevcontainerChanges(): Promise<{ modifiedFiles: string[], currentCommit: string, repositoryUrl?: string } | null> { + try { + const devcontainerFiles = [ + '.devcontainer/devcontainer.json', + '.devcontainer.json', + 'devcontainer.json' + ]; + + const modifiedFiles: string[] = []; + + // Check if any devcontainer files were modified + for (const file of devcontainerFiles) { + if (await this.isFileRecentlyModified(file)) { + logger.info(`Devcontainer change detected in ${file}`); + modifiedFiles.push(file); + } + } + + if (modifiedFiles.length === 0) { + return null; + } + + // Get current commit ID + const currentCommit = await this.getCurrentCommitId(); + if (!currentCommit) { + logger.warn("Could not determine current commit ID for devcontainer changes"); + return null; + } + + return { + modifiedFiles, + currentCommit, + repositoryUrl: process.env.REPOSITORY_URL + }; + + } catch (error) { + logger.warn('Failed to detect devcontainer changes:', error); + return null; + } + } + + /** + * Check for devcontainer changes and signal if needed (deprecated - use updateProgressWithEnvCheck) + */ + async checkAndSignalDevcontainerChanges(): Promise { + try { + const envChangeInfo = await this.detectDevcontainerChanges(); + if (envChangeInfo) { + await this.signalEnvironmentChange(envChangeInfo.modifiedFiles, envChangeInfo.repositoryUrl); + } + } catch (error) { + logger.warn('Failed to check for devcontainer changes:', error); + } + } + + /** + * Get current git commit ID + */ + private async getCurrentCommitId(): Promise { + try { + const workspaceDir = process.env.USER_ID ? `/workspace/${process.env.USER_ID}` : process.cwd(); + + const commitId = execSync('git rev-parse HEAD', { + encoding: 'utf-8', + cwd: workspaceDir + }).trim(); + + return commitId || null; + } catch (error) { + logger.warn('Could not get current commit ID:', error); + return null; + } + } + + /** + * Check if a file was recently modified (within the last session) + */ + private async isFileRecentlyModified(filePath: string): Promise { + try { + const workspaceDir = process.env.USER_ID ? `/workspace/${process.env.USER_ID}` : process.cwd(); + const fullPath = `${workspaceDir}/${filePath}`; + + // Check if file exists + try { + await import('fs').then(fs => fs.promises.access(fullPath)); + } catch { + return false; // File doesn't exist + } + + // Check git status to see if file has uncommitted changes + const status = execSync(`git status --porcelain "${filePath}"`, { + encoding: 'utf-8', + cwd: workspaceDir, + stdio: 'pipe' + }).trim(); + + // File is modified if it appears in git status + return status.length > 0; + + } catch (error) { + logger.debug(`Could not check modification status for ${filePath}:`, error); + return false; + } + } + /** * Check if queue integration is connected */