1+ import Airtable from 'airtable' ;
2+ import logger from '../../logger' ;
3+ import Run from '../../models/Run' ;
4+ import Robot from '../../models/Robot' ;
5+
6+ interface AirtableUpdateTask {
7+ robotId : string ;
8+ runId : string ;
9+ status : 'pending' | 'completed' | 'failed' ;
10+ retries : number ;
11+ }
12+
13+ const MAX_RETRIES = 5 ;
14+
15+ export let airtableUpdateTasks : { [ runId : string ] : AirtableUpdateTask } = { } ;
16+
17+ /**
18+ * Updates Airtable with data from a successful run.
19+ * @param robotId - The ID of the robot.
20+ * @param runId - The ID of the run.
21+ */
22+ export async function updateAirtable ( robotId : string , runId : string ) {
23+ try {
24+ const run = await Run . findOne ( { where : { runId } } ) ;
25+
26+ if ( ! run ) {
27+ throw new Error ( `Run not found for runId: ${ runId } ` ) ;
28+ }
29+
30+ const plainRun = run . toJSON ( ) ;
31+
32+ if ( plainRun . status === 'success' ) {
33+ let data : { [ key : string ] : any } [ ] = [ ] ;
34+ if ( plainRun . serializableOutput && Object . keys ( plainRun . serializableOutput ) . length > 0 ) {
35+ data = plainRun . serializableOutput [ 'item-0' ] as { [ key : string ] : any } [ ] ;
36+ } else if ( plainRun . binaryOutput && plainRun . binaryOutput [ 'item-0' ] ) {
37+ const binaryUrl = plainRun . binaryOutput [ 'item-0' ] as string ;
38+ data = [ { "Screenshot URL" : binaryUrl } ] ;
39+ }
40+
41+ const robot = await Robot . findOne ( { where : { 'recording_meta.id' : robotId } } ) ;
42+
43+ if ( ! robot ) {
44+ throw new Error ( `Robot not found for robotId: ${ robotId } ` ) ;
45+ }
46+
47+ const plainRobot = robot . toJSON ( ) ;
48+
49+ const tableName = plainRobot . airtable_table_name ;
50+ const baseId = plainRobot . airtable_base_id ;
51+ const personalAccessToken = plainRobot . airtable_personal_access_token ;
52+
53+ if ( tableName && baseId && personalAccessToken ) {
54+ console . log ( `Preparing to write data to Airtable for robot: ${ robotId } , table: ${ tableName } ` ) ;
55+
56+ await writeDataToAirtable ( baseId , tableName , personalAccessToken , data ) ;
57+ console . log ( `Data written to Airtable successfully for Robot: ${ robotId } and Run: ${ runId } ` ) ;
58+ } else {
59+ console . log ( 'Airtable integration not configured.' ) ;
60+ }
61+ } else {
62+ console . log ( 'Run status is not success or serializableOutput is missing.' ) ;
63+ }
64+ } catch ( error : any ) {
65+ console . error ( `Failed to write data to Airtable for Robot: ${ robotId } and Run: ${ runId } : ${ error . message } ` ) ;
66+ }
67+ }
68+
69+ /**
70+ * Writes data to Airtable.
71+ * @param baseId - The ID of the Airtable base.
72+ * @param tableName - The name of the Airtable table.
73+ * @param personalAccessToken - The Airtable Personal Access Token.
74+ * @param data - The data to write to Airtable.
75+ */
76+ export async function writeDataToAirtable ( baseId : string , tableName : string , personalAccessToken : string , data : any [ ] ) {
77+ try {
78+ // Initialize Airtable with Personal Access Token
79+ const base = new Airtable ( { apiKey : personalAccessToken } ) . base ( baseId ) ;
80+
81+ const table = base ( tableName ) ;
82+
83+ // Prepare records for Airtable
84+ const records = data . map ( ( row ) => ( { fields : row } ) ) ;
85+
86+ // Write data to Airtable
87+ const response = await table . create ( records ) ;
88+
89+ if ( response ) {
90+ console . log ( 'Data successfully appended to Airtable.' ) ;
91+ } else {
92+ console . error ( 'Airtable append failed:' , response ) ;
93+ }
94+
95+ logger . log ( `info` , `Data written to Airtable: ${ tableName } ` ) ;
96+ } catch ( error : any ) {
97+ logger . log ( `error` , `Error writing data to Airtable: ${ error . message } ` ) ;
98+ throw error ;
99+ }
100+ }
101+
102+ /**
103+ * Processes pending Airtable update tasks.
104+ */
105+ export const processAirtableUpdates = async ( ) => {
106+ while ( true ) {
107+ let hasPendingTasks = false ;
108+ for ( const runId in airtableUpdateTasks ) {
109+ const task = airtableUpdateTasks [ runId ] ;
110+ console . log ( `Processing task for runId: ${ runId } , status: ${ task . status } ` ) ;
111+
112+ if ( task . status === 'pending' ) {
113+ hasPendingTasks = true ;
114+ try {
115+ await updateAirtable ( task . robotId , task . runId ) ;
116+ console . log ( `Successfully updated Airtable for runId: ${ runId } ` ) ;
117+ delete airtableUpdateTasks [ runId ] ;
118+ } catch ( error : any ) {
119+ console . error ( `Failed to update Airtable for run ${ task . runId } :` , error ) ;
120+ if ( task . retries < MAX_RETRIES ) {
121+ airtableUpdateTasks [ runId ] . retries += 1 ;
122+ console . log ( `Retrying task for runId: ${ runId } , attempt: ${ task . retries } ` ) ;
123+ } else {
124+ airtableUpdateTasks [ runId ] . status = 'failed' ;
125+ console . log ( `Max retries reached for runId: ${ runId } . Marking task as failed.` ) ;
126+ }
127+ }
128+ }
129+ }
130+
131+ if ( ! hasPendingTasks ) {
132+ console . log ( 'No pending tasks. Exiting loop.' ) ;
133+ break ;
134+ }
135+
136+ console . log ( 'Waiting for 5 seconds before checking again...' ) ;
137+ await new Promise ( ( resolve ) => setTimeout ( resolve , 5000 ) ) ;
138+ }
139+ } ;
0 commit comments