Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
*/

import { describe, expect, test } from "bun:test";
import type { QueuedMessage } from "@lobu/core";
import { MessageBatcher } from "../gateway/message-batcher";
import type { QueuedMessage } from "../gateway/types";

function makeMsg(
messageId: string,
Expand Down
7 changes: 0 additions & 7 deletions packages/agent-worker/src/gateway/gateway-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ export class HttpWorkerTransport implements WorkerTransport {
private botResponseTs?: string;
public processedMessageIds: string[] = [];
private jobId?: string;
private moduleData?: Record<string, unknown>;
private teamId: string;
private platform?: string;
private platformMetadata?: Record<string, unknown>;
Expand All @@ -52,10 +51,6 @@ export class HttpWorkerTransport implements WorkerTransport {
this.jobId = jobId;
}

setModuleData(moduleData: Record<string, unknown>): void {
this.moduleData = moduleData;
}

async signalDone(finalDelta?: string): Promise<void> {
// Send final delta if there is one
if (finalDelta) {
Expand Down Expand Up @@ -137,7 +132,6 @@ export class HttpWorkerTransport implements WorkerTransport {
await this.sendResponse(
this.buildBaseResponse({
delta: actualDelta,
moduleData: this.moduleData,
isFullReplacement,
})
);
Expand All @@ -147,7 +141,6 @@ export class HttpWorkerTransport implements WorkerTransport {
await this.sendResponse(
this.buildBaseResponse({
processedMessageIds: this.processedMessageIds,
moduleData: this.moduleData,
})
);
}
Expand Down
3 changes: 1 addition & 2 deletions packages/agent-worker/src/gateway/message-batcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
* Message batching for grouping rapid messages
*/

import { createLogger } from "@lobu/core";
import type { QueuedMessage } from "./types";
import { createLogger, type QueuedMessage } from "@lobu/core";

const logger = createLogger("message-batcher");

Expand Down
3 changes: 2 additions & 1 deletion packages/agent-worker/src/gateway/sse-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
createLogger,
extractTraceId,
flushTracing,
type MessagePayload,
type QueuedMessage,
SpanStatusCode,
stripEnv,
} from "@lobu/core";
Expand All @@ -16,7 +18,6 @@ import type { WorkerConfig, WorkerExecutor } from "../core/types";
import { SENSITIVE_WORKER_ENV_KEYS } from "../shared/worker-env-keys";
import { HttpWorkerTransport } from "./gateway-integration";
import { MessageBatcher } from "./message-batcher";
import type { MessagePayload, QueuedMessage } from "./types";

const logger = createLogger("sse-client");

Expand Down
82 changes: 6 additions & 76 deletions packages/agent-worker/src/gateway/types.ts
Original file line number Diff line number Diff line change
@@ -1,83 +1,13 @@
/**
* Shared types for gateway communication
* Worker-side response payload returned to the gateway over HTTP.
*
* The gateway↔worker wire contract (`MessagePayload`, `JobType`,
* `QueuedMessage`) is exported from `@lobu/core` — import from there
* directly, not from this file.
*/

import type { AgentOptions, ThreadResponsePayload } from "@lobu/core";
import type { ThreadResponsePayload } from "@lobu/core";

/**
* Platform-specific metadata (e.g., Slack team_id, channel, thread_ts)
*/
interface PlatformMetadata {
team_id?: string;
channel?: string;
ts?: string;
thread_ts?: string;
files?: unknown[];
traceId?: string; // Trace ID for end-to-end observability
[key: string]: unknown;
}

/**
* Job type for queue messages
* - message: Standard agent message execution
* - exec: Direct command execution in sandbox
*/
export type JobType = "message" | "exec";

/**
* Message payload for agent execution
*/
export interface MessagePayload {
botId: string;
userId: string;
agentId: string;
conversationId: string;
platform: string;
channelId: string;
messageId: string;
messageText: string;
platformMetadata: PlatformMetadata;
agentOptions: AgentOptions;
jobId?: string; // Optional job ID from gateway
teamId?: string; // Optional team ID (WhatsApp uses top-level, Slack uses platformMetadata)

// The runs.id of the row that dispatched this job. Set by the gateway
// MessageConsumer (stamped from the runs-queue claim's job.id) and
// threaded into WorkerConfig.runId. The worker's cleanup() uses it to
// attribute the agent_transcript_snapshot row to the correct run —
// see codex P1#1 on PR #865.
runId?: number;

// Per-run worker JWT bound to `runId` above. Minted by MessageConsumer
// and threaded into WorkerConfig.runJobToken. The worker uses THIS
// token (not the deployment-lifetime WORKER_TOKEN) when calling the
// snapshot endpoint, so the route's `tokenData.runId === body.runId`
// equality check can reject any cross-run impersonation — codex round
// 2 finding A on PR #865.
runJobToken?: string;

// Job type (default: "message")
jobType?: JobType;

// Exec-specific fields (only used when jobType === "exec")
execId?: string; // Unique ID for exec job (for response routing)
execCommand?: string; // Command to execute
execCwd?: string; // Working directory for command
execEnv?: Record<string, string>; // Additional environment variables
execTimeout?: number; // Timeout in milliseconds
}

/**
* Queued message with timestamp
*/
export interface QueuedMessage {
payload: MessagePayload;
timestamp: number;
}

/**
* Response data sent back to gateway
*/
export type ResponseData = ThreadResponsePayload & {
originalMessageId: string;
};
1 change: 0 additions & 1 deletion packages/agent-worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ async function main() {
logger.info(`Tracing initialized: lobu-worker -> ${otlpEndpoint}`);
}

await moduleRegistry.registerAvailableModules();
await moduleRegistry.initAll();
logger.info("✅ Modules initialized");

Expand Down
92 changes: 0 additions & 92 deletions packages/agent-worker/src/modules/lifecycle.ts

This file was deleted.

35 changes: 0 additions & 35 deletions packages/agent-worker/src/openclaw/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,6 @@ export class OpenClawWorker implements WorkerExecutor {
this.config.userId,
this.config.sessionKey
);

const { initModuleWorkspace } = await import("../modules/lifecycle");
await initModuleWorkspace({
workspaceDir: this.workspaceManager.getCurrentWorkingDirectory(),
username: this.config.userId,
sessionKey: this.config.sessionKey,
});
}
);

Expand All @@ -425,26 +418,6 @@ export class OpenClawWorker implements WorkerExecutor {
}
);

// Module hooks may modify the system prompt before agent execution.
try {
const { onSessionStart } = await import("../modules/lifecycle");
const moduleContext = await onSessionStart({
platform: this.config.platform,
channelId: this.config.channelId,
userId: this.config.userId,
conversationId: this.config.conversationId,
messageId: this.config.responseId,
workingDirectory: this.workspaceManager.getCurrentWorkingDirectory(),
customInstructions,
});
if (moduleContext.customInstructions) {
customInstructions = moduleContext.customInstructions;
}
} catch (error) {
logger.error("Failed to call onSessionStart hooks:", error);
}

// Add file I/O instructions AFTER module hooks so they aren't overwritten
customInstructions += this.getFileIOInstructions();

logger.info(
Expand Down Expand Up @@ -507,14 +480,6 @@ export class OpenClawWorker implements WorkerExecutor {
}
);

const { collectModuleData } = await import("../modules/lifecycle");
const moduleData = await collectModuleData({
workspaceDir: this.workspaceManager.getCurrentWorkingDirectory(),
userId: this.config.userId,
conversationId: this.config.conversationId,
});
this.workerTransport.setModuleData(moduleData);

if (result.success) {
// Snapshot writer in cleanup() reads this to discriminate the row.
// Hydrate skips non-completed snapshots, so getting this right is
Expand Down
Loading
Loading