Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ab71414
wip
Pijukatel Jul 24, 2025
d328ada
More wip, todo figure out streaming buffer
Pijukatel Aug 11, 2025
a8ae55f
WIP, polish the tests. Add those that exists in Python version.
Pijukatel Aug 18, 2025
4f2146d
Do I have to split logger into formater + logger?
Pijukatel Aug 25, 2025
cc7aba9
Some working draft
Pijukatel Oct 21, 2025
50b2732
Try to exclude files, to make it possible to install/build from branch
Pijukatel Oct 21, 2025
f25935c
Add log option to ActorClient.call
Pijukatel Oct 22, 2025
4aa0125
Update type hint
Pijukatel Oct 22, 2025
c410190
Format and lint and add test
Pijukatel Oct 22, 2025
d9a900a
Merge remote-tracking branch 'origin/master' into log-redirection
Pijukatel Oct 22, 2025
1c293da
Add Actor call test, 3 scenarios
Pijukatel Oct 23, 2025
0456c20
Add more comments
Pijukatel Oct 23, 2025
dc88c2b
Remove unused import
Pijukatel Oct 23, 2025
ec0d60e
Fix failing tests
Pijukatel Oct 23, 2025
fc7bb6c
Do not run log redirection when in Browser
Pijukatel Oct 23, 2025
bbeeee4
Review comments
Pijukatel Oct 27, 2025
00bf7ea
Update tests to pas on Node 16
Pijukatel Oct 27, 2025
5fd0fc7
Properlyhnadle multibyte characters that are split by chunking
Pijukatel Oct 29, 2025
5d4c1f4
Fix awaiting logs without calling run
Pijukatel Oct 29, 2025
51dd246
Make it possible to set response sequence per each test if needed
Pijukatel Oct 29, 2025
71d7fb5
Review comments
Pijukatel Nov 3, 2025
0b2e6e6
Simplify handling of multibyte chars
Pijukatel Nov 4, 2025
0440e39
Basic review feedback
Pijukatel Nov 5, 2025
0b84a32
Simplify loging and add more docs
Pijukatel Nov 5, 2025
bca27b2
Cleanup using finally chain
Pijukatel Nov 5, 2025
8eb6e71
Use setTimeout from node
Pijukatel Nov 6, 2025
dfa5dcd
Review comments
Pijukatel Nov 11, 2025
3e48c60
Add `default` alternative to `undefined`
Pijukatel Nov 11, 2025
f560b3b
Test review comments
Pijukatel Nov 19, 2025
bc2f1e2
Remove test related tmp edit
Pijukatel Nov 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"@apify/utilities": "^2.18.0",
"@crawlee/types": "^3.3.0",
"agentkeepalive": "^4.2.1",
"ansi-colors": "^4.1.1",
"async-retry": "^1.3.3",
"axios": "^1.6.7",
"content-type": "^1.0.5",
Expand Down
24 changes: 22 additions & 2 deletions src/resource_clients/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import ow from 'ow';

import type { RUN_GENERAL_ACCESS } from '@apify/consts';
import { ACT_JOB_STATUSES, ACTOR_PERMISSION_LEVEL, META_ORIGINS } from '@apify/consts';
import { Log } from '@apify/log';

import type { ApiClientSubResourceOptions } from '../base/api_client';
import { ResourceClient } from '../base/resource_client';
Expand Down Expand Up @@ -139,18 +140,28 @@ export class ActorClient extends ResourceClient {
webhooks: ow.optional.array.ofType(ow.object),
maxItems: ow.optional.number.not.negative,
maxTotalChargeUsd: ow.optional.number.not.negative,
log: ow.optional.any(ow.null, ow.object.instanceOf(Log), ow.string.equals('default')),
restartOnError: ow.optional.boolean,
forcePermissionLevel: ow.optional.string.oneOf(Object.values(ACTOR_PERMISSION_LEVEL)),
}),
);

const { waitSecs, ...startOptions } = options;
const { waitSecs, log, ...startOptions } = options;
const { id } = await this.start(input, startOptions);

// Calling root client because we need access to top level API.
// Creating a new instance of RunClient here would only allow
// setting it up as a nested route under actor API.
return this.apifyClient.run(id).waitForFinish({ waitSecs });
const newRunClient = this.apifyClient.run(id);

const streamedLog = await newRunClient.getStreamedLog({ toLog: options?.log });
streamedLog?.start();
return this.apifyClient
.run(id)
.waitForFinish({ waitSecs })
.finally(async () => {
await streamedLog?.stop();
});
}

/**
Expand Down Expand Up @@ -425,7 +436,16 @@ export interface ActorStartOptions {
}

/**
 * Options of `ActorClient.call`: every start option except `waitForFinish`,
 * plus the settings that control waiting and log redirection.
 */
export interface ActorCallOptions extends Omit<ActorStartOptions, 'waitForFinish'> {
    /**
     * Where the logs of the started run are redirected while waiting for it.
     * - `Log` instance: run logs are redirected to this instance.
     * - `undefined` or `'default'`: a pre-defined `Log` is created and used.
     * - `null`: no log redirection takes place.
     */
    log?: 'default' | Log | null;
    /**
     * How many seconds to wait for the Actor run to finish.
     */
    waitSecs?: number;
}

export interface ActorRunListItem {
Expand Down
180 changes: 177 additions & 3 deletions src/resource_clients/log.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
// eslint-disable-next-line max-classes-per-file
import type { Readable } from 'node:stream';

import c from 'ansi-colors';

import type { Log } from '@apify/log';
import { Logger, LogLevel } from '@apify/log';

import type { ApifyApiError } from '../apify_api_error';
import type { ApiClientSubResourceOptions } from '../base/api_client';
import { ResourceClient } from '../base/resource_client';
Expand All @@ -20,11 +26,11 @@ export class LogClient extends ResourceClient {
/**
* https://docs.apify.com/api/v2#/reference/logs/log/get-log
*/
async get(): Promise<string | undefined> {
async get(options: LogOptions = {}): Promise<string | undefined> {
const requestOpts: ApifyRequestConfig = {
url: this._url(),
method: 'GET',
params: this._params(),
params: this._params(options),
};

try {
Expand All @@ -41,9 +47,10 @@ export class LogClient extends ResourceClient {
* Gets the log in a Readable stream format. Only works in Node.js.
* https://docs.apify.com/api/v2#/reference/logs/log/get-log
*/
async stream(): Promise<Readable | undefined> {
async stream(options: LogOptions = {}): Promise<Readable | undefined> {
const params = {
stream: true,
raw: options.raw,
};

const requestOpts: ApifyRequestConfig = {
Expand All @@ -63,3 +70,170 @@ export class LogClient extends ResourceClient {
return undefined;
}
}

/**
 * Options shared by the log retrieval methods.
 */
export interface LogOptions {
    /**
     * Forwarded to the API as the `raw` query parameter.
     * NOTE(review): presumably selects the unprocessed log output - confirm against the API docs.
     *
     * @default false
     */
    raw?: boolean;
}

/**
* Logger for redirected actor logs.
*/
export class LoggerActorRedirect extends Logger {
/**
 * Builds the redirect logger with `skipTime: true` and `LogLevel.DEBUG` as defaults.
 * Any key present in `options` takes precedence over these defaults.
 */
constructor(options = {}) {
    super(Object.assign({ skipTime: true, level: LogLevel.DEBUG }, options));
}

override _log(level: LogLevel, message: string, data?: any, exception?: unknown, opts: Record<string, any> = {}) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering, what's the biggest difference between this implementation and the _log from LoggerText (link)? Could we subclass LoggerText instead of abstract Logger and reduce the amount of code here?

Copy link
Contributor Author

@Pijukatel Pijukatel Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logger in our codebase is kind of a combination of a formater and a handler in Python terms. While LoggerActorRedirect has the same handler as LoggerText (they are both logging to console), they have completely different formater. So there is no point in inheriting from LoggerText as none of its methods would be reused in LoggerActorRedirect.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to adjust the shared logger to support this use case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not really needed for this change. This change works with the current version of the codebase. Changing Log and Logger from the upstream repo to be more similar to the Python counterpart would be quite a refactoring.

if (level > this.options.level) {
return;
}
if (data || exception) {
throw new Error('Redirect logger does not use other arguments than level and message');
}
let { prefix } = opts;
prefix = prefix ? `${prefix}` : '';

let maybeDate = '';
if (!this.options.skipTime) {
maybeDate = `${new Date().toISOString().replace('Z', '').replace('T', ' ')} `;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to repeat the timestamps?

Copy link
Contributor Author

@Pijukatel Pijukatel Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those timestamps will not be the same. One is the timestamp of origin, and the other is the timestamp of redirection.

For example>
2025-10-31T09:04:03.380Z redirect-log-tester runId:V4OVPjPbmjflPDeMP -> 2025-10-31T09:03:26.899Z ACTOR: Pulling container image of build KNlNPqgcBBivdtNCr from registry.

It is important to

  • keep logs formatted in the same way. (Otherwise, redirected logs would be offset by not having the timestamp) - which is used to detect the actual log lines separations.
  • to keep it clear what the time of log redirection is and what the time of the actual log origin is (for redirected logs from actors that were started before we started redirection and to which we just attached)

}

const line = `${c.gray(maybeDate)}${c.cyan(prefix)}${message || ''}`;

// All redirected logs are logged at info level to avoid any console specific formatting for non-info levels,
// which have already been applied once to the original log. (For example error stack traces etc.)
this._outputWithConsole(LogLevel.INFO, line);
return line;
}
}

/**
 * Helper class for redirecting streamed Actor logs to another log.
 *
 * The run log is read as a byte stream, cut at line ends, split into messages by the ISO timestamp
 * that starts each message, and every message is passed to `info()` of the destination log.
 */
export class StreamedLog {
    private destinationLog: Log;
    private streamBuffer: Buffer[] = [];
    // A message starts with an ISO timestamp placed at the very beginning of the data or right after a line end.
    private splitMarker = /(?:\n|^)(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)/g;
    // Messages with a timestamp older than this limit are skipped; `null` means no limit.
    private relevancyTimeLimit: Date | null;

    private logClient: LogClient;
    private streamingTask: Promise<void> | null = null;
    private stopLogging = false;

    /**
     * @param options Log client to read from, destination log, and `fromStart` (defaults to `true`).
     *  With `fromStart: false` only messages time-stamped at or after the construction time are redirected.
     */
    constructor(options: StreamedLogOptions) {
        const { toLog, logClient, fromStart = true } = options;
        this.destinationLog = toLog;
        this.logClient = logClient;
        this.relevancyTimeLimit = fromStart ? null : new Date();
    }

    /**
     * Start log redirection.
     *
     * @throws Error when the redirection is already running.
     */
    public start(): void {
        if (this.streamingTask) {
            throw new Error('Streaming task already active');
        }
        this.stopLogging = false;
        this.streamingTask = this.streamLog();
    }

    /**
     * Stop log redirection and wait for the streaming task to finish.
     * The stop flag is checked after each received chunk, so the task ends with the next chunk
     * or when the stream itself ends.
     *
     * @throws Error when the redirection is not running; errors of the streaming task other than
     *  `AbortError` are re-thrown.
     */
    public async stop(): Promise<void> {
        if (!this.streamingTask) {
            throw new Error('Streaming task is not active');
        }
        this.stopLogging = true;
        try {
            await this.streamingTask;
        } catch (err) {
            if (!(err instanceof Error && err.name === 'AbortError')) {
                throw err;
            }
        } finally {
            this.streamingTask = null;
        }
    }

    /**
     * Get log stream from response and redirect it to another log.
     */
    private async streamLog(): Promise<void> {
        const logStream = await this.logClient.stream({ raw: true });
        if (!logStream) {
            return;
        }
        const lastChunkRemainder = await this.logStreamChunks(logStream);
        // Process whatever is left when exiting. Maybe it is incomplete, maybe it is last log without EOL.
        const lastMessage = Buffer.from(lastChunkRemainder).toString().trim();
        if (lastMessage.length) {
            this.destinationLog.info(lastMessage);
        }
    }

    /**
     * Consume the stream chunk by chunk and log every complete line.
     *
     * @returns Bytes received after the last line end (including that line end), i.e. the unfinished tail.
     */
    private async logStreamChunks(logStream: Readable): Promise<Uint8Array> {
        // Chunk may be incomplete. Keep remainder for next chunk.
        let previousChunkRemainder: Uint8Array = new Uint8Array();

        for await (const chunk of logStream) {
            // Handle possible leftover incomplete line from previous chunk.
            // Working on bytes keeps multibyte characters split between chunks intact.
            const chunkWithPreviousRemainder = new Uint8Array(previousChunkRemainder.length + chunk.length);
            chunkWithPreviousRemainder.set(previousChunkRemainder, 0);
            chunkWithPreviousRemainder.set(chunk, previousChunkRemainder.length);

            // Everything before last end of line is complete.
            const lastCompleteMessageIndex = chunkWithPreviousRemainder.lastIndexOf(0x0a);
            if (lastCompleteMessageIndex === -1) {
                // No line end received yet. Nothing is complete, so keep all the data for the next chunk.
                // (Slicing with -1 would log a truncated line and lose the rest of it.)
                previousChunkRemainder = chunkWithPreviousRemainder;
            } else {
                previousChunkRemainder = chunkWithPreviousRemainder.slice(lastCompleteMessageIndex);

                // Push complete part of the chunk to the buffer
                this.streamBuffer.push(Buffer.from(chunkWithPreviousRemainder.slice(0, lastCompleteMessageIndex)));
                this.logBufferContent();
            }

            // Keep processing the new data until stopped
            if (this.stopLogging) {
                break;
            }
        }
        return previousChunkRemainder;
    }

    /**
     * Parse the buffer and log complete messages.
     */
    private logBufferContent(): void {
        // Splitting by a pattern with one capture group yields [text before first marker, marker, content, ...].
        // The text before the first marker is dropped.
        const allParts = Buffer.concat(this.streamBuffer).toString().split(this.splitMarker).slice(1);
        this.streamBuffer = [];

        for (let i = 0; i + 1 < allParts.length; i += 2) {
            const marker = allParts[i];
            const content = allParts[i + 1];

            // Log only relevant messages. Ignore too old log messages.
            if (this.relevancyTimeLimit && new Date(marker) < this.relevancyTimeLimit) {
                continue;
            }

            // Original log level information is not available. Log all on info level. Log level could be guessed for
            // some logs, but for any multiline logs such guess would be probably correct only for the first line.
            this.destinationLog.info((marker + content).trim());
        }
    }
}

/**
 * Constructor options of `StreamedLog`.
 */
export interface StreamedLogOptions {
    /** Destination `Log` that receives the redirected Actor run logs. */
    toLog: Log;
    /** Client through which the run log is read from the Apify API. */
    logClient: LogClient;
    /** Redirect the whole log from the Actor run start, including messages from the past. */
    fromStart?: boolean;
}
38 changes: 36 additions & 2 deletions src/resource_clients/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ import type { AxiosRequestConfig } from 'axios';
import ow from 'ow';

import type { RUN_GENERAL_ACCESS } from '@apify/consts';
import { LEVELS, Log } from '@apify/log';

import type { ApiClientOptionsWithOptionalResourcePath } from '../base/api_client';
import { ResourceClient } from '../base/resource_client';
import type { ApifyResponse } from '../http_client';
import { cast, parseDateFields, pluckData } from '../utils';
import { cast, isNode, parseDateFields, pluckData } from '../utils';
import type { ActorRun } from './actor';
import { DatasetClient } from './dataset';
import { KeyValueStoreClient } from './key_value_store';
import { LogClient } from './log';
import { LogClient, LoggerActorRedirect, StreamedLog } from './log';
import { RequestQueueClient } from './request_queue';

const RUN_CHARGE_IDEMPOTENCY_HEADER = 'idempotency-key';
Expand Down Expand Up @@ -266,6 +267,39 @@ export class RunClient extends ResourceClient {
}),
);
}

/**
 * Get StreamedLog for convenient streaming of the run log and its redirection.
 *
 * @param options `toLog` selects the destination: a `Log` instance is used as is, `undefined` or
 *  `'default'` creates a `Log` prefixed with the Actor name and the run id, `null` turns the redirection off.
 *  `fromStart` (defaults to `true`) is passed on to the created `StreamedLog`.
 * @returns The `StreamedLog`, or `undefined` when `toLog` is `null` or when not running in Node.js.
 */
async getStreamedLog(options: GetStreamedLogOptions = {}): Promise<StreamedLog | undefined> {
    const { fromStart = true } = options;
    let { toLog } = options;
    if (toLog === null || !isNode()) {
        // Explicitly no logging or not in Node.js
        return undefined;
    }
    if (toLog === undefined || toLog === 'default') {
        // Create default StreamedLog
        // Get actor name and run id
        const runData = await this.get();
        const runId = runData?.id ?? '';
        const actorId = runData?.actId ?? '';

        // An empty id cannot identify an Actor, so the lookup is skipped when the run data
        // (and with it the Actor id) is not available. The prefix then contains only the run id part.
        const actorData = actorId ? await this.apifyClient.actor(actorId).get() : undefined;
        const actorName = actorData?.name ?? '';

        const name = [actorName, `runId:${runId}`].filter(Boolean).join(' ');

        toLog = new Log({ level: LEVELS.DEBUG, prefix: `${name} -> `, logger: new LoggerActorRedirect() });
    }

    return new StreamedLog({ logClient: this.log(), toLog, fromStart });
}
}

/**
 * Options of `RunClient.getStreamedLog`.
 */
export interface GetStreamedLogOptions {
    /**
     * Passed on to the created `StreamedLog`.
     *
     * @default true
     */
    fromStart?: boolean;
    /**
     * Destination of the redirected run logs: a `Log` instance, `undefined` or `'default'` for
     * a pre-defined `Log`, or `null` for no redirection at all.
     */
    toLog?: 'default' | Log | null;
}

export interface RunGetOptions {
Expand Down
2 changes: 1 addition & 1 deletion test/_helper.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const { launchPuppeteer, puppeteerUtils } = require('@crawlee/puppeteer');

const mockServer = require('./mock_server/server');
const { mockServer } = require('./mock_server/server');

class Browser {
async start() {
Expand Down
Loading