Skip to content

Commit 279612d

Browse files
committed
feat: handle robot run for workers
1 parent de2d828 commit 279612d

File tree

1 file changed

+229
-0
lines changed

1 file changed

+229
-0
lines changed
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
// Import core dependencies
2+
import { chromium } from 'playwright-extra';
3+
import stealthPlugin from 'puppeteer-extra-plugin-stealth';
4+
import { Page } from "playwright";
5+
6+
// Import local utilities and services
7+
import { destroyRemoteBrowser } from '../browser-management/controller';
8+
import logger from '../logger';
9+
import { browserPool } from "../server";
10+
import { googleSheetUpdateTasks, processGoogleSheetUpdates } from "./integrations/gsheet";
11+
import { BinaryOutputService } from "../storage/mino";
12+
import { capture } from "../utils/analytics";
13+
14+
// Import models and types
15+
import Robot from "../models/Robot";
16+
import Run from "../models/Run";
17+
import { WorkflowFile } from "maxun-core";
18+
import { io, Socket } from 'socket.io-client';
19+
20+
// Enable stealth mode for chromium
21+
// Runs once at module load: registers the stealth plugin on the playwright-extra `chromium` export.
chromium.use(stealthPlugin());
22+
23+
/**
 * Executes the run identified by `id` and reports the outcome as a
 * `run-completed` event on the backend socket namespace named after the browser.
 *
 * @param browserId - Remote browser id; also used as the socket namespace path.
 * @param id - Run id forwarded to `executeRun`.
 * @returns The `interpretationInfo` of a successful run, otherwise `null`.
 */
async function readyForRunHandler(browserId: string, id: string) {
  try {
    const outcome = await executeRun(id);

    const backendUrl = process.env.BACKEND_URL ? process.env.BACKEND_URL : 'http://localhost:8080';
    // NOTE(review): this client socket is opened on every call and is never
    // closed in this function — confirm whether that is intentional.
    const socket = io(`${backendUrl}/${browserId}`, {
      transports: ['websocket'],
      rejectUnauthorized: false,
    });

    // Failure path first: notify the backend, tear the browser down, bail out.
    if (!outcome || !outcome.success) {
      logger.error(`Interpretation of ${id} failed`);
      socket.emit('run-completed', 'failed');
      await destroyRemoteBrowser(browserId);
      resetRecordingState(browserId, id);
      return null;
    }

    logger.info(`Interpretation of ${id} succeeded`);
    socket.emit('run-completed', 'success');
    resetRecordingState(browserId, id);
    return outcome.interpretationInfo;
  } catch (error: any) {
    // Anything thrown above still results in the remote browser being destroyed.
    logger.error(`Error during readyForRunHandler: ${error.message}`);
    await destroyRemoteBrowser(browserId);
    return null;
  }
}
51+
52+
/**
 * Hook invoked after a run has been reported; currently an intentional no-op.
 *
 * The previous body only reassigned its own parameters. Strings are passed by
 * value, so those assignments never changed anything for the caller; they were
 * dead code that suggested state was being cleared.
 *
 * @param browserId - Id of the browser whose run just finished (unused).
 * @param id - Id of the finished run (unused).
 */
function resetRecordingState(browserId: string, id: string): void {
  // Mark the parameters as deliberately unused; there is no state to reset here.
  void browserId;
  void id;
}
56+
57+
/**
 * Builds a deep copy of `workflow` in which every entry of `workflow.workflow`
 * has a `{ action: 'flag', args: ['generated'] }` step prepended to its `what` list.
 *
 * @param workflow - Workflow to copy; it is not modified.
 * @returns The flagged copy.
 */
function AddGeneratedFlags(workflow: WorkflowFile) {
  // JSON round-trip clone so the caller's object is left untouched.
  const flagged = JSON.parse(JSON.stringify(workflow));
  for (const step of flagged.workflow) {
    step.what.unshift({ action: 'flag', args: ['generated'] });
  }
  return flagged;
}
67+
68+
/**
 * Loads the run `id` and its robot recording, interprets the recording in the
 * run's remote browser, stores the output on the run, and reports analytics.
 *
 * Steps on the success path: interpret the flagged workflow, upload binary
 * output, destroy the remote browser, update the run to `success`, send the
 * analytics event, then queue and process a Google Sheets update (failures of
 * that last step are only logged).
 *
 * Any error thrown inside the main `try` is logged, the run (if it can be found)
 * is updated to `failed`, and a failure result is returned. Errors raised by the
 * lookup/update inside the `catch` itself are not handled here.
 *
 * @param id - The `runId` of the run to execute.
 * @returns `{ success: true, interpretationInfo }` with the updated run as JSON,
 *          or `{ success: false, error }` with a message.
 */
async function executeRun(id: string) {
  try {
    const run = await Run.findOne({ where: { runId: id } });
    if (!run) {
      return {
        success: false,
        error: 'Run not found'
      };
    }

    const plainRun = run.toJSON();

    const recording = await Robot.findOne({
      where: { 'recording_meta.id': plainRun.robotMetaId },
      raw: true
    });
    if (!recording) {
      return {
        success: false,
        error: 'Recording not found'
      };
    }

    const browser = browserPool.getRemoteBrowser(plainRun.browserId);
    if (!browser) {
      throw new Error('Could not access browser');
    }

    let currentPage = await browser.getCurrentPage();
    if (!currentPage) {
      throw new Error('Could not create a new page');
    }

    const workflow = AddGeneratedFlags(recording.recording);
    const interpretationInfo = await browser.interpreter.InterpretRecording(
      workflow,
      currentPage,
      // Keep tracking the active page when the interpreter switches pages.
      (newPage: Page) => currentPage = newPage,
      plainRun.interpreterSettings
    );

    const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
    const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(
      run,
      interpretationInfo.binaryOutput
    );

    await destroyRemoteBrowser(plainRun.browserId);

    // NOTE(review): `...run` spreads the model instance itself into the update
    // payload; kept as-is, but confirm it is needed.
    const updatedRun = await run.update({
      ...run,
      status: 'success',
      finishedAt: new Date().toLocaleString(),
      browserId: plainRun.browserId,
      log: interpretationInfo.log.join('\n'),
      serializableOutput: interpretationInfo.serializableOutput,
      binaryOutput: uploadedBinaryOutput,
    });

    let totalRowsExtracted = 0;
    let extractedScreenshotsCount = 0;
    let extractedItemsCount = 0;

    if (updatedRun.dataValues.binaryOutput && updatedRun.dataValues.binaryOutput["item-0"]) {
      extractedScreenshotsCount = 1;
    }

    // The run is already persisted as successful at this point. Only compute the
    // counters when "item-0" really is an array: a throw here would land in the
    // outer catch and flip the stored status to 'failed'.
    const firstItems = updatedRun.dataValues.serializableOutput
      ? updatedRun.dataValues.serializableOutput["item-0"]
      : undefined;
    if (Array.isArray(firstItems)) {
      extractedItemsCount = firstItems.length;
      // Sums the number of keys of every item; null/undefined items count as 0.
      totalRowsExtracted = firstItems.reduce((total: number, item: any) => {
        return total + (item == null ? 0 : Object.keys(item).length);
      }, 0);
    }

    logger.info(`Extracted Items Count: ${extractedItemsCount}`);
    logger.info(`Extracted Screenshots Count: ${extractedScreenshotsCount}`);
    logger.info(`Total Rows Extracted: ${totalRowsExtracted}`);

    capture('maxun-oss-run-created-manual', {
      runId: id,
      created_at: new Date().toISOString(),
      status: 'success',
      extractedItemsCount,
      totalRowsExtracted,
      extractedScreenshotsCount,
    });

    // Handle Google Sheets integration; a failure here must not fail the run.
    try {
      googleSheetUpdateTasks[plainRun.runId] = {
        robotId: plainRun.robotMetaId,
        runId: plainRun.runId,
        status: 'pending',
        retries: 5,
      };
      await processGoogleSheetUpdates();
    } catch (err: any) {
      logger.error(`Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`);
    }

    return {
      success: true,
      interpretationInfo: updatedRun.toJSON()
    };

  } catch (error: any) {
    logger.error(`Error running robot: ${error.message}`);
    // Re-fetch the run: the failure may have happened before `run` was loaded.
    const run = await Run.findOne({ where: { runId: id } });
    if (run) {
      await run.update({
        status: 'failed',
        finishedAt: new Date().toLocaleString(),
      });
    }

    capture('maxun-oss-run-created-manual', {
      runId: id,
      created_at: new Date().toISOString(),
      status: 'failed',
      error_message: error.message,
    });

    return {
      success: false,
      error: error.message,
    };
  }
}
197+
198+
/**
 * Main function to handle running a recording through the worker process.
 *
 * Opens a socket.io client connection to the backend namespace named after the
 * browser id and starts the run whenever a `ready-for-run` event arrives. On
 * `disconnect` the listener cleanup helper is invoked.
 *
 * @param id - Browser id; used as the socket namespace and passed to the run handler.
 * @param userId - Only checked for presence in this function.
 * @param runId - Id of the run to execute once the backend signals readiness.
 * @throws Error when any argument is missing, or when socket setup throws; the
 *         error is logged and re-thrown.
 */
export async function handleRunRecording(id: string, userId: string, runId: string) {
  try {
    if (!id || !runId || !userId) {
      throw new Error('browserId or runId or userId is undefined');
    }

    const socket = io(`${process.env.BACKEND_URL ? process.env.BACKEND_URL : 'http://localhost:8080'}/${id}`, {
      transports: ['websocket'],
      rejectUnauthorized: false
    });

    socket.on('ready-for-run', () => {
      // The emitter ignores the listener's return value, and the handler can
      // still reject (its own catch awaits destroyRemoteBrowser). Handle the
      // rejection here so it does not surface as an unhandled promise rejection.
      readyForRunHandler(id, runId).catch((err: unknown) => {
        const reason = err instanceof Error ? err.message : String(err);
        logger.error(`Unhandled error in ready-for-run handler: ${reason}`);
      });
    });

    logger.info(`Running Robot: ${id}`);

    socket.on('disconnect', () => {
      cleanupSocketListeners(socket, id, runId);
    });

  } catch (error: any) {
    logger.error('Error running robot:', error);
    throw error;
  }
}
225+
226+
/**
 * Removes the `ready-for-run` listeners from `socket` and logs the cleanup.
 *
 * The event is removed by name. Passing a freshly created arrow function to
 * `off` (as before) can never match the listener that was registered, so it
 * removed nothing.
 *
 * NOTE(review): once removed, a later `ready-for-run` on the same socket (e.g.
 * after a reconnect) is no longer handled — confirm this is the desired behavior.
 *
 * @param socket - Client socket whose listeners should be removed.
 * @param browserId - Browser id, used for the log message.
 * @param id - Run id, used for the log message.
 */
function cleanupSocketListeners(socket: Socket, browserId: string, id: string) {
  socket.off('ready-for-run');
  logger.info(`Cleaned up listeners for browserId: ${browserId}, runId: ${id}`);
}

0 commit comments

Comments
 (0)