Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ PORT=3000
# CLI can override with --quiet (error) or --verbose (debug)
# LOG_LEVEL=info

# Observability (Optional - Langfuse)
# AI model call tracing via Langfuse (https://langfuse.com or self-hosted)
# When not set, observability is disabled with zero overhead
# LANGFUSE_PUBLIC_KEY=pk-lf-...
# LANGFUSE_SECRET_KEY=sk-lf-...
# LANGFUSE_BASE_URL=https://cloud.langfuse.com

# Concurrency
MAX_CONCURRENT_CONVERSATIONS=10 # Maximum concurrent AI conversations (default: 10)

Expand Down
113 changes: 113 additions & 0 deletions bun.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions packages/cli/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ import {
BUNDLED_VERSION,
} from '@archon/paths';
import * as git from '@archon/git';
import { initLangfuse, shutdownLangfuse, isLangfuseEnabled } from '@archon/providers';

/** Lazy-initialized logger (deferred so test mocks can intercept createLogger) */
let cachedLog: ReturnType<typeof createLogger> | undefined;
Expand Down Expand Up @@ -253,6 +254,11 @@ async function main(): Promise<number> {
setLogLevel('debug');
}

// Initialize Langfuse observability (no-op when env vars not set)
if (isLangfuseEnabled()) {
await initLangfuse();
}

// Note: orphaned run cleanup moved to `workflow cleanup` command only.
// Running it on every CLI startup killed parallel workflow runs (all
// 'running' status rows were marked failed by each new process).
Expand Down Expand Up @@ -573,6 +579,7 @@ async function main(): Promise<number> {
}
return 1;
} finally {
await shutdownLangfuse();
// Always close database connection
await closeDb();
}
Expand Down
13 changes: 13 additions & 0 deletions packages/core/src/orchestrator/orchestrator-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import type { MergedConfig } from '../config/config-types';
import { generateAndSetTitle } from '../services/title-generator';
import { validateAndResolveIsolation, dispatchBackgroundWorkflow } from './orchestrator';
import { IsolationBlockedError } from '@archon/isolation';
import { withObservabilityContext } from '@archon/providers/observability';
import { buildOrchestratorPrompt, buildProjectScopedPrompt } from './prompt-builder';
import * as workflowDb from '../db/workflows';
import * as workflowEventDb from '../db/workflow-events';
Expand Down Expand Up @@ -500,6 +501,18 @@ export async function handleMessage(
conversationId: string,
message: string,
context?: HandleMessageContext
): Promise<void> {
return withObservabilityContext(
{ conversationId, platformType: platform.getPlatformType() },
() => handleMessageInner(platform, conversationId, message, context)
);
}

async function handleMessageInner(
platform: IPlatformAdapter,
conversationId: string,
message: string,
context?: HandleMessageContext
): Promise<void> {
const { issueContext, threadContext, parentConversationId, isolationHints, attachedFiles } =
context ?? {};
Expand Down
10 changes: 7 additions & 3 deletions packages/providers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@
"./codex/config": "./src/codex/config.ts",
"./codex/binary-resolver": "./src/codex/binary-resolver.ts",
"./errors": "./src/errors.ts",
"./registry": "./src/registry.ts"
"./registry": "./src/registry.ts",
"./observability": "./src/observability.ts"
},
"scripts": {
"test": "bun test src/claude/provider.test.ts && bun test src/codex/provider.test.ts && bun test src/registry.test.ts && bun test src/codex/binary-guard.test.ts && bun test src/codex/binary-resolver.test.ts && bun test src/codex/binary-resolver-dev.test.ts",
"test": "bun test src/claude/provider.test.ts && bun test src/codex/provider.test.ts && bun test src/registry.test.ts && bun test src/codex/binary-guard.test.ts && bun test src/codex/binary-resolver.test.ts && bun test src/codex/binary-resolver-dev.test.ts && bun test src/observability.test.ts",
"type-check": "bun x tsc --noEmit"
},
"dependencies": {
"@anthropic-ai/claude-agent-sdk": "^0.2.89",
"@archon/paths": "workspace:*",
"@openai/codex-sdk": "^0.116.0"
"@langfuse/otel": "^5.1.0",
"@langfuse/tracing": "^5.1.0",
"@openai/codex-sdk": "^0.116.0",
"@opentelemetry/sdk-node": "^0.214.0"
},
"devDependencies": {
"pino": "^9"
Expand Down
15 changes: 10 additions & 5 deletions packages/providers/src/claude/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
type HookCallback,
type HookCallbackMatcher,
} from '@anthropic-ai/claude-agent-sdk';
import { traceQuery } from '../observability';
import cliPath from '@anthropic-ai/claude-agent-sdk/embed';
import type {
IAgentProvider,
Expand Down Expand Up @@ -147,16 +148,17 @@ class FirstEventTimeoutError extends Error {}
* within `timeoutMs`. If it doesn't, aborts the controller and throws.
*/
export async function* withFirstMessageTimeout<T>(
gen: AsyncGenerator<T>,
iterable: AsyncIterable<T>,
controller: AbortController,
timeoutMs: number,
diagnostics: Record<string, unknown>
): AsyncGenerator<T> {
const iter = iterable[Symbol.asyncIterator]();
let timerId: ReturnType<typeof setTimeout> | undefined;
let firstValue: IteratorResult<T>;
try {
firstValue = await Promise.race([
gen.next(),
iter.next(),
new Promise<never>((_, reject) => {
timerId = setTimeout(() => {
reject(new FirstEventTimeoutError());
Expand All @@ -182,7 +184,10 @@ export async function* withFirstMessageTimeout<T>(

if (firstValue.done) return;
yield firstValue.value;
yield* gen;
// Continue yielding remaining items from the iterator
for await (const item of { [Symbol.asyncIterator]: () => iter }) {
yield item;
}
}

/**
Expand Down Expand Up @@ -916,8 +921,8 @@ export class ClaudeProvider implements IAgentProvider {
);
const events = withFirstMessageTimeout(rawEvents, controller, timeoutMs, diagnostics);

// 5. Stream normalized events
yield* streamClaudeMessages(events, toolResultQueue);
// 5. Stream normalized events (with optional Langfuse tracing)
yield* traceQuery(prompt, options.model, streamClaudeMessages(events, toolResultQueue));
return;
} catch (error) {
const err = error as Error;
Expand Down
15 changes: 10 additions & 5 deletions packages/providers/src/codex/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type {
ProviderCapabilities,
} from '../types';
import { parseCodexConfig } from './config';
import { traceQuery } from '../observability';
import { CODEX_CAPABILITIES } from './capabilities';
import { resolveCodexBinaryPath } from './binary-resolver';
import { createLogger } from '@archon/paths';
Expand Down Expand Up @@ -584,11 +585,15 @@ export class CodexProvider implements IAgentProvider {
const result = await thread.runStreamed(prompt, turnOptions);

// 5. Stream normalized events (fresh state per attempt to avoid dedup leaks)
yield* streamCodexEvents(
result.events as AsyncIterable<Record<string, unknown>>,
hasOutputFormat,
thread.id,
requestOptions?.abortSignal
yield* traceQuery(
prompt,
requestOptions?.model,
streamCodexEvents(
result.events as AsyncIterable<Record<string, unknown>>,
hasOutputFormat,
thread.id,
requestOptions?.abortSignal
)
);
return;
} catch (error) {
Expand Down
9 changes: 9 additions & 0 deletions packages/providers/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,12 @@ export { parseCodexConfig, type CodexProviderDefaults } from './codex/config';
// Utilities (needed by consumers)
export { resetCodexSingleton } from './codex/provider';
export { resolveCodexBinaryPath, fileExists } from './codex/binary-resolver';

// Observability (optional Langfuse integration)
export {
initLangfuse,
shutdownLangfuse,
isLangfuseEnabled,
withObservabilityContext,
type ObservabilityAttrs,
} from './observability';
Loading