Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 52 additions & 18 deletions maxun-core/src/interpret.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ export default class Interpreter extends EventEmitter {
scrapeSchema: {}
};

private scrapeListCounter: number = 0;

constructor(workflow: WorkflowFile, options?: Partial<InterpreterOptions>) {
super();
this.workflow = workflow.workflow;
Expand Down Expand Up @@ -575,12 +577,13 @@ export default class Interpreter extends EventEmitter {

try {
await this.ensureScriptsLoaded(page);

if (this.options.debugChannel?.incrementScrapeListIndex) {
this.options.debugChannel.incrementScrapeListIndex();
}

let scrapeResults = [];
let paginationUsed = false;

if (!config.pagination) {
scrapeResults = await page.evaluate((cfg) => {
Expand All @@ -592,32 +595,47 @@ export default class Interpreter extends EventEmitter {
}
}, config);
} else {
paginationUsed = true;
scrapeResults = await this.handlePagination(page, config);
}

if (!Array.isArray(scrapeResults)) {
scrapeResults = [];
}

const actionType = "scrapeList";
const actionName = (config as any).__name || "List";
console.log(`ScrapeList completed with ${scrapeResults.length} results`);

if (!this.serializableDataByType[actionType]) this.serializableDataByType[actionType] = {};
if (!this.serializableDataByType[actionType][actionName]) {
this.serializableDataByType[actionType][actionName] = [];
}
if (!paginationUsed) {
const actionType = "scrapeList";
let actionName = (config as any).__name || "";

this.serializableDataByType[actionType][actionName].push(...scrapeResults);
if (!actionName || actionName.trim() === "") {
this.scrapeListCounter++;
actionName = `List ${this.scrapeListCounter}`;
}

await this.options.serializableCallback({
scrapeList: this.serializableDataByType.scrapeList,
scrapeSchema: this.serializableDataByType.scrapeSchema
});
if (!this.serializableDataByType[actionType]) this.serializableDataByType[actionType] = {};
if (!this.serializableDataByType[actionType][actionName]) {
this.serializableDataByType[actionType][actionName] = [];
}

this.serializableDataByType[actionType][actionName].push(...scrapeResults);

await this.options.serializableCallback({
scrapeList: this.serializableDataByType.scrapeList,
scrapeSchema: this.serializableDataByType.scrapeSchema
});
}
} catch (error) {
console.error('ScrapeList action failed completely:', error.message);

const actionType = "scrapeList";
const actionName = (config as any).__name || "List";
let actionName = (config as any).__name || "";

if (!actionName || actionName.trim() === "") {
this.scrapeListCounter++;
actionName = `List ${this.scrapeListCounter}`;
}

if (!this.namedResults[actionType]) this.namedResults[actionType] = {};
this.namedResults[actionType][actionName] = [];
Expand Down Expand Up @@ -818,20 +836,33 @@ export default class Interpreter extends EventEmitter {
return [];
}

const actionType = "scrapeList";
let actionName = (config as any).__name || "";
if (!actionName || actionName.trim() === "") {
this.scrapeListCounter++;
actionName = `List ${this.scrapeListCounter}`;
}

if (!this.serializableDataByType[actionType]) {
this.serializableDataByType[actionType] = {};
}
if (!this.serializableDataByType[actionType][actionName]) {
this.serializableDataByType[actionType][actionName] = [];
}

let allResults: Record<string, any>[] = [];
let previousHeight = 0;
let scrapedItems: Set<string> = new Set<string>();
let visitedUrls: Set<string> = new Set<string>();
const MAX_RETRIES = 3;
const RETRY_DELAY = 1000; // 1 second delay between retries
const RETRY_DELAY = 1000;
const MAX_UNCHANGED_RESULTS = 5;

const debugLog = (message: string, ...args: any[]) => {
console.log(`[Page ${visitedUrls.size}] [URL: ${page.url()}] ${message}`, ...args);
};

const scrapeCurrentPage = async () => {
// Check abort flag before scraping current page
if (this.isAborted) {
debugLog("Workflow aborted, stopping scrapeCurrentPage");
return;
Expand All @@ -849,7 +880,6 @@ export default class Interpreter extends EventEmitter {
debugLog(`Page evaluation failed: ${error.message}`);
return;
}

const newResults = results.filter(item => {
const uniqueKey = JSON.stringify(item);
if (scrapedItems.has(uniqueKey)) return false;
Expand All @@ -859,7 +889,11 @@ export default class Interpreter extends EventEmitter {
allResults = allResults.concat(newResults);
debugLog("Results collected:", allResults.length);

await this.options.serializableCallback(allResults);
this.serializableDataByType[actionType][actionName] = [...allResults];
await this.options.serializableCallback({
scrapeList: this.serializableDataByType.scrapeList,
scrapeSchema: this.serializableDataByType.scrapeSchema
});
};

const checkLimit = () => {
Expand Down
38 changes: 28 additions & 10 deletions server/src/workflow-management/classes/Interpreter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -567,37 +567,55 @@ export class WorkflowInterpreter {
typeKey = "scrapeSchema";
}

if (this.currentActionType === "scrapeList" && data.scrapeList) {
if (typeKey === "scrapeList" && data.scrapeList) {
data = data.scrapeList;
} else if (this.currentActionType === "scrapeSchema" && data.scrapeSchema) {
} else if (typeKey === "scrapeSchema" && data.scrapeSchema) {
data = data.scrapeSchema;
}

let actionName = this.currentActionName || "";
if (typeKey === "scrapeList") {
actionName = this.getUniqueActionName(typeKey, this.currentActionName);
let actionName = "";
if (typeKey === "scrapeList" && data && typeof data === "object" && !Array.isArray(data)) {
const keys = Object.keys(data);
if (keys.length === 1) {
actionName = keys[0];
data = data[actionName];
} else if (keys.length > 1) {
actionName = keys[keys.length - 1];
data = data[actionName];
}
}

if (!actionName) {
actionName = this.currentActionName || "";
if (typeKey === "scrapeList" && !actionName) {
actionName = this.getUniqueActionName(typeKey, "");
}
}

const flattened = Array.isArray(data)
? data
: (data?.List ?? (data && typeof data === 'object' ? Object.values(data).flat?.() ?? data : []));
: (
data?.List ??
(data && typeof data === "object"
? Object.values(data).flat?.() ?? data
: [])
);

if (!this.serializableDataByType[typeKey]) {
this.serializableDataByType[typeKey] = {};
}

this.serializableDataByType[typeKey][actionName] = flattened;

await this.persistDataToDatabase(typeKey, { [actionName]: flattened });
await this.persistDataToDatabase(typeKey, {
[actionName]: flattened,
});

this.socket.emit("serializableCallback", {
type: typeKey,
name: actionName,
data: flattened,
});

this.currentActionType = null;
this.currentActionName = null;
} catch (err: any) {
logger.log('error', `serializableCallback handler failed: ${err.message}`);
}
Expand Down