Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 67 additions & 49 deletions maxun-core/src/interpret.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ export default class Interpreter extends EventEmitter {
scrapeSchema: {}
};

private scrapeListCounter: number = 0;

constructor(workflow: WorkflowFile, options?: Partial<InterpreterOptions>) {
super();
this.workflow = workflow.workflow;
Expand Down Expand Up @@ -484,7 +486,7 @@ export default class Interpreter extends EventEmitter {
await this.options.serializableCallback(scrapeResults);
},

scrapeSchema: async (schema: Record<string, { selector: string; tag: string, attribute: string; shadow: string}>) => {
scrapeSchema: async (schema: Record<string, { selector: string; tag: string, attribute: string; shadow: string}>, actionName: string = "") => {
if (this.isAborted) {
this.log('Workflow aborted, stopping scrapeSchema', Level.WARN);
return;
Expand Down Expand Up @@ -540,25 +542,25 @@ export default class Interpreter extends EventEmitter {
}

const actionType = "scrapeSchema";
const actionName = (schema as any).__name || "Texts";
const name = actionName || "Texts";

if (!this.namedResults[actionType]) this.namedResults[actionType] = {};
this.namedResults[actionType][actionName] = this.cumulativeResults;
this.namedResults[actionType][name] = this.cumulativeResults;

if (!this.serializableDataByType[actionType]) this.serializableDataByType[actionType] = {};
if (!this.serializableDataByType[actionType][actionName]) {
this.serializableDataByType[actionType][actionName] = [];
if (!this.serializableDataByType[actionType][name]) {
this.serializableDataByType[actionType][name] = [];
}

this.serializableDataByType[actionType][actionName] = [...this.cumulativeResults];
this.serializableDataByType[actionType][name] = [...this.cumulativeResults];

await this.options.serializableCallback({
scrapeList: this.serializableDataByType.scrapeList,
scrapeSchema: this.serializableDataByType.scrapeSchema
});
},

scrapeList: async (config: { listSelector: string, fields: any, limit?: number, pagination: any }) => {
scrapeList: async (config: { listSelector: string, fields: any, limit?: number, pagination: any }, actionName: string = "") => {
if (this.isAborted) {
this.log('Workflow aborted, stopping scrapeList', Level.WARN);
return;
Expand All @@ -575,12 +577,13 @@ export default class Interpreter extends EventEmitter {

try {
await this.ensureScriptsLoaded(page);

if (this.options.debugChannel?.incrementScrapeListIndex) {
this.options.debugChannel.incrementScrapeListIndex();
}

let scrapeResults = [];
let paginationUsed = false;

if (!config.pagination) {
scrapeResults = await page.evaluate((cfg) => {
Expand All @@ -592,38 +595,53 @@ export default class Interpreter extends EventEmitter {
}
}, config);
} else {
scrapeResults = await this.handlePagination(page, config);
paginationUsed = true;
scrapeResults = await this.handlePagination(page, config, actionName);
}

if (!Array.isArray(scrapeResults)) {
scrapeResults = [];
}

const actionType = "scrapeList";
const actionName = (config as any).__name || "List";
console.log(`ScrapeList completed with ${scrapeResults.length} results`);

if (!this.serializableDataByType[actionType]) this.serializableDataByType[actionType] = {};
if (!this.serializableDataByType[actionType][actionName]) {
this.serializableDataByType[actionType][actionName] = [];
}
if (!paginationUsed) {
const actionType = "scrapeList";
let name = actionName || "";

this.serializableDataByType[actionType][actionName].push(...scrapeResults);
if (!name || name.trim() === "") {
this.scrapeListCounter++;
name = `List ${this.scrapeListCounter}`;
}

await this.options.serializableCallback({
scrapeList: this.serializableDataByType.scrapeList,
scrapeSchema: this.serializableDataByType.scrapeSchema
});
if (!this.serializableDataByType[actionType]) this.serializableDataByType[actionType] = {};
if (!this.serializableDataByType[actionType][name]) {
this.serializableDataByType[actionType][name] = [];
}

this.serializableDataByType[actionType][name].push(...scrapeResults);

await this.options.serializableCallback({
scrapeList: this.serializableDataByType.scrapeList,
scrapeSchema: this.serializableDataByType.scrapeSchema
});
}
} catch (error) {
console.error('ScrapeList action failed completely:', error.message);

const actionType = "scrapeList";
const actionName = (config as any).__name || "List";
let name = actionName || "";

if (!name || name.trim() === "") {
this.scrapeListCounter++;
name = `List ${this.scrapeListCounter}`;
}

if (!this.namedResults[actionType]) this.namedResults[actionType] = {};
this.namedResults[actionType][actionName] = [];
this.namedResults[actionType][name] = [];

if (!this.serializableDataByType[actionType]) this.serializableDataByType[actionType] = {};
this.serializableDataByType[actionType][actionName] = [];
this.serializableDataByType[actionType][name] = [];

await this.options.serializableCallback({
scrapeList: this.serializableDataByType.scrapeList,
Expand Down Expand Up @@ -718,26 +736,7 @@ export default class Interpreter extends EventEmitter {
debug.setActionType(String(step.action));
}

if ((step as any)?.name) {
stepName = (step as any).name;
} else if (
Array.isArray((step as any)?.args) &&
(step as any).args.length > 0 &&
typeof (step as any).args[0] === "object" &&
"__name" in (step as any).args[0]
) {
stepName = (step as any).args[0].__name;
} else if (
typeof (step as any)?.args === "object" &&
step?.args !== null &&
"__name" in (step as any).args
) {
stepName = (step as any).args.__name;
}

if (!stepName) {
stepName = String(step.action);
}
stepName = (step as any)?.name || String(step.action);

if (debug && typeof (debug as any).setActionName === "function") {
(debug as any).setActionName(stepName);
Expand All @@ -751,6 +750,9 @@ export default class Interpreter extends EventEmitter {
const params = !step.args || Array.isArray(step.args) ? step.args : [step.args];
if (step.action === 'screenshot') {
await (wawActions.screenshot as any)(...(params ?? []), stepName ?? undefined);
} else if (step.action === 'scrapeList' || step.action === 'scrapeSchema') {
const actionName = (step as any).name || "";
await wawActions[step.action as CustomFunctions](...(params ?? []), actionName);
} else {
await wawActions[step.action as CustomFunctions](...(params ?? []));
}
Expand Down Expand Up @@ -812,26 +814,39 @@ export default class Interpreter extends EventEmitter {
fields: any,
limit?: number,
pagination: any
}) {
}, providedActionName: string = "") {
if (this.isAborted) {
this.log('Workflow aborted, stopping pagination', Level.WARN);
return [];
}

const actionType = "scrapeList";
let actionName = providedActionName || "";
if (!actionName || actionName.trim() === "") {
this.scrapeListCounter++;
actionName = `List ${this.scrapeListCounter}`;
}

if (!this.serializableDataByType[actionType]) {
this.serializableDataByType[actionType] = {};
}
if (!this.serializableDataByType[actionType][actionName]) {
this.serializableDataByType[actionType][actionName] = [];
}

let allResults: Record<string, any>[] = [];
let previousHeight = 0;
let scrapedItems: Set<string> = new Set<string>();
let visitedUrls: Set<string> = new Set<string>();
const MAX_RETRIES = 3;
const RETRY_DELAY = 1000; // 1 second delay between retries
const RETRY_DELAY = 1000;
const MAX_UNCHANGED_RESULTS = 5;

const debugLog = (message: string, ...args: any[]) => {
console.log(`[Page ${visitedUrls.size}] [URL: ${page.url()}] ${message}`, ...args);
};

const scrapeCurrentPage = async () => {
// Check abort flag before scraping current page
if (this.isAborted) {
debugLog("Workflow aborted, stopping scrapeCurrentPage");
return;
Expand All @@ -849,7 +864,6 @@ export default class Interpreter extends EventEmitter {
debugLog(`Page evaluation failed: ${error.message}`);
return;
}

const newResults = results.filter(item => {
const uniqueKey = JSON.stringify(item);
if (scrapedItems.has(uniqueKey)) return false;
Expand All @@ -859,7 +873,11 @@ export default class Interpreter extends EventEmitter {
allResults = allResults.concat(newResults);
debugLog("Results collected:", allResults.length);

await this.options.serializableCallback(allResults);
this.serializableDataByType[actionType][actionName] = [...allResults];
await this.options.serializableCallback({
scrapeList: this.serializableDataByType.scrapeList,
scrapeSchema: this.serializableDataByType.scrapeSchema
});
};

const checkLimit = () => {
Expand Down
38 changes: 28 additions & 10 deletions server/src/workflow-management/classes/Interpreter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -567,37 +567,55 @@ export class WorkflowInterpreter {
typeKey = "scrapeSchema";
}

if (this.currentActionType === "scrapeList" && data.scrapeList) {
if (typeKey === "scrapeList" && data.scrapeList) {
data = data.scrapeList;
} else if (this.currentActionType === "scrapeSchema" && data.scrapeSchema) {
} else if (typeKey === "scrapeSchema" && data.scrapeSchema) {
data = data.scrapeSchema;
}

let actionName = this.currentActionName || "";
if (typeKey === "scrapeList") {
actionName = this.getUniqueActionName(typeKey, this.currentActionName);
let actionName = "";
if (typeKey === "scrapeList" && data && typeof data === "object" && !Array.isArray(data)) {
const keys = Object.keys(data);
if (keys.length === 1) {
actionName = keys[0];
data = data[actionName];
} else if (keys.length > 1) {
actionName = keys[keys.length - 1];
data = data[actionName];
}
}

if (!actionName) {
actionName = this.currentActionName || "";
if (typeKey === "scrapeList" && !actionName) {
actionName = this.getUniqueActionName(typeKey, "");
}
}

const flattened = Array.isArray(data)
? data
: (data?.List ?? (data && typeof data === 'object' ? Object.values(data).flat?.() ?? data : []));
: (
data?.List ??
(data && typeof data === "object"
? Object.values(data).flat?.() ?? data
: [])
);

if (!this.serializableDataByType[typeKey]) {
this.serializableDataByType[typeKey] = {};
}

this.serializableDataByType[typeKey][actionName] = flattened;

await this.persistDataToDatabase(typeKey, { [actionName]: flattened });
await this.persistDataToDatabase(typeKey, {
[actionName]: flattened,
});

this.socket.emit("serializableCallback", {
type: typeKey,
name: actionName,
data: flattened,
});

this.currentActionType = null;
this.currentActionName = null;
} catch (err: any) {
logger.log('error', `serializableCallback handler failed: ${err.message}`);
}
Expand Down