diff --git a/packages/api/src/filestorage/drive/sync/sync.service.ts b/packages/api/src/filestorage/drive/sync/sync.service.ts index 7ef892a7b..02e394248 100644 --- a/packages/api/src/filestorage/drive/sync/sync.service.ts +++ b/packages/api/src/filestorage/drive/sync/sync.service.ts @@ -33,7 +33,7 @@ export class SyncService implements OnModuleInit, IBaseSync { this.registry.registerService('filestorage', 'drive', this); } onModuleInit() { -// + // } @Cron('0 */8 * * *') // every 8 hours diff --git a/packages/api/src/filestorage/file/services/googledrive/index.ts b/packages/api/src/filestorage/file/services/googledrive/index.ts index 7b42d2d52..340d04705 100644 --- a/packages/api/src/filestorage/file/services/googledrive/index.ts +++ b/packages/api/src/filestorage/file/services/googledrive/index.ts @@ -40,6 +40,8 @@ export class GoogleDriveService implements IFileService { async ingestFiles( sourceData: GoogleDriveFileOutput[], connectionId: string, + drive: ReturnType, + remote_cursor?: string, customFieldMappings?: { slug: string; remote_id: string; @@ -47,16 +49,18 @@ export class GoogleDriveService implements IFileService { extraParams?: { [key: string]: any }, ): Promise { // Extract all permissions from the files - const allPermissions: GoogledrivePermissionOutput[] = sourceData.reduce< - GoogledrivePermissionOutput[] - >((accumulator, file) => { - if (file.permissions?.length) { - accumulator.push(...file.permissions); - } - return accumulator; - }, []); + const permissionsIds: string[] = Array.from( + new Set( + sourceData.reduce((accumulator, file) => { + if (file.permissionIds?.length) { + accumulator.push(...file.permissionIds); + } + return accumulator; + }, []), + ), + ); - if (allPermissions.length === 0) { + if (permissionsIds.length === 0) { this.logger.log('No permissions found in the provided files.'); return this.ingestService.ingestData< UnifiedFilestorageFileOutput, @@ -72,11 +76,10 @@ export class GoogleDriveService implements IFileService { ); } - // Remove duplicate permissions based on 'id' - const uniquePermissions: GoogledrivePermissionOutput[] = Array.from( - new Map( - allPermissions.map((permission) => [permission.id, permission]), - ).values(), + const uniquePermissions = await this.fetchPermissions( + permissionsIds, + sourceData, + drive, ); // Ingest permissions using the ingestService @@ -105,9 +108,9 @@ export class GoogleDriveService implements IFileService { // Update each file's permissions with the synced permission IDs sourceData.forEach((file) => { - if (file.permissions?.length) { - file.permissions = file.permissions - .map((permission) => permissionIdMap.get(permission.id)) + if (file.permissionIds?.length) { + file.internal_permissions = file.permissionIds + .map((permissionId) => permissionIdMap.get(permissionId)) .filter( (permissionId): permissionId is string => permissionId !== undefined, @@ -133,6 +136,17 @@ export class GoogleDriveService implements IFileService { `Ingested a batch of ${syncedFiles.length} googledrive files.`, ); + if (remote_cursor) { + await this.prisma.fs_drives.updateMany({ + where: { + id_connection: connectionId, + }, + data: { + remote_cursor: remote_cursor, + }, + }); + } + return syncedFiles; } @@ -175,6 +189,8 @@ export class GoogleDriveService implements IFileService { await this.ingestFiles( files, connection.id_connection, + drive, + null, custom_field_mappings, ingestParams, ); @@ -194,12 +210,14 @@ export class GoogleDriveService implements IFileService { } } else { // incremental sync using changes api - const { filesToSync, moreChangesToFetch } = + const { filesToSync, moreChangesToFetch, remote_cursor } = await this.getFilesToSyncFromChangesApi(drive, connection); await this.ingestFiles( filesToSync, connection.id_connection, + drive, + remote_cursor, custom_field_mappings, ingestParams, ); @@ -232,73 +250,84 @@ export class GoogleDriveService implements IFileService { async getFilesToSyncFromChangesApi( drive: ReturnType, connection: any, + maxApiCalls = 10, // number of times we use nextPageToken ) { - let moreChangesToFetch = false; // becomes true if there are more changes to fetch in any drive const filesToSync: GoogleDriveFileOutput[] = []; + let apiCallCount = 0; + let pageToken: string | undefined; + let newRemoteCursor: string | undefined; - const remoteCursor = await this.getRemoteCursor(connection); - - const response = await this.rateLimitedRequest(() => - drive.changes.list({ - pageToken: remoteCursor, - supportsAllDrives: true, - includeItemsFromAllDrives: true, - pageSize: 1000, - fields: - 'nextPageToken, newStartPageToken, changes(file(id,name,mimeType,createdTime,modifiedTime,size,parents,webViewLink,driveId,trashed,permissions))', - }), - ); + // Get initial cursor + pageToken = await this.getRemoteCursor(drive, connection.id_connection); - const { changes, nextPageToken, newStartPageToken } = response.data; + do { + const response = await this.rateLimitedRequest(() => + drive.changes.list({ + pageToken, + supportsAllDrives: true, + includeItemsFromAllDrives: true, + pageSize: 1000, + fields: + 'nextPageToken, newStartPageToken, changes(file(id,name,mimeType,createdTime,modifiedTime,size,parents,webViewLink,driveId,trashed,permissionIds))', + }), + ); - const batchFiles = changes - .filter( - (change) => - change.file?.mimeType !== 'application/vnd.google-apps.folder', - ) - .map((change) => change.file); + const { changes, nextPageToken, newStartPageToken } = response.data; - filesToSync.push(...(batchFiles as GoogleDriveFileOutput[])); + const batchFiles = changes + .filter( + (change) => + change.file?.mimeType !== 'application/vnd.google-apps.folder', + ) + .map((change) => change.file); - if (nextPageToken) { - moreChangesToFetch = true; - } + filesToSync.push(...(batchFiles as GoogleDriveFileOutput[])); - const nextCursor = newStartPageToken ? newStartPageToken : nextPageToken; + // Update pageToken for next iteration + pageToken = nextPageToken; + apiCallCount++; - // all drives share the same cursor (might update this in the future) - await this.prisma.fs_drives.updateMany({ - where: { - id_connection: connection.id_connection, - }, - data: { - remote_cursor: nextCursor, - }, - }); + if (!nextPageToken || apiCallCount >= maxApiCalls) { + newRemoteCursor = newStartPageToken || nextPageToken; + } + } while (pageToken && apiCallCount < maxApiCalls); return { filesToSync, - moreChangesToFetch, + moreChangesToFetch: !!pageToken, // true if there's still a next page + remote_cursor: newRemoteCursor, }; } - private async getRemoteCursor(connection: any) { + private async getRemoteCursor( + drive: ReturnType, + connectionId: string, + ): Promise { const internalDrive = await this.prisma.fs_drives.findFirst({ - where: { - id_connection: connection.id_connection, - }, - select: { - remote_cursor: true, - id_fs_drive: true, - }, + where: { id_connection: connectionId }, // all drives share the same cursor for now + select: { id_fs_drive: true, remote_cursor: true }, }); - return internalDrive?.remote_cursor; + let remoteCursor = internalDrive?.remote_cursor; + if (!remoteCursor) { + const startPageToken = await this.rateLimitedRequest(() => + drive.changes + .getStartPageToken({ supportsAllDrives: true }) // one cursor for all drives + .then((response) => response.data.startPageToken), + ); + remoteCursor = startPageToken; + + await this.prisma.fs_drives.updateMany({ + where: { id_connection: connectionId }, + data: { remote_cursor: remoteCursor }, + }); + } + return remoteCursor; } async getFilesToSync( drive: any, pageToken?: string, - pages = 20, // number of times we use nextPageToken + pages = 5, // number of times we use nextPageToken ) { interface DriveResponse { data: { @@ -316,7 +345,7 @@ export class GoogleDriveService implements IFileService { drive.files.list({ q: 'mimeType != "application/vnd.google-apps.folder"', fields: - 'nextPageToken, files(id, name, mimeType, createdTime, modifiedTime, size, parents, webViewLink, driveId, permissions, trashed)', + 'nextPageToken, files(id, name, mimeType, createdTime, modifiedTime, size, parents, webViewLink, driveId, permissionIds, trashed)', pageSize: BATCH_SIZE, pageToken: nextPageToken, includeItemsFromAllDrives: true, @@ -353,6 +382,51 @@ export class GoogleDriveService implements IFileService { ); } + private async fetchPermissions( + permissionIds: string[], + files: GoogleDriveFileOutput[], + drive: ReturnType, + ): Promise { + const permissionIdToFiles = new Map(); + + for (const file of files) { + if (file.permissionIds?.length) { + for (const permissionId of file.permissionIds) { + if (permissionIdToFiles.has(permissionId)) { + // only need one file_id to get the permission + continue; + } else { + permissionIdToFiles.set(permissionId, [file.id]); + } + } + } + } + + const permissions: GoogledrivePermissionOutput[] = []; + const entries = Array.from(permissionIdToFiles.entries()); + + // do in batches of 10 + for (let i = 0; i < entries.length; i += 10) { + const batch = entries.slice(i, i + 10); + const batchPromises = batch.map(([permissionId, fileIds]) => + drive.permissions.get({ + permissionId, + fileId: fileIds[0], + supportsAllDrives: true, + }), + ); + + const batchResults = await Promise.all(batchPromises); + permissions.push( + ...batchResults.map( + (result) => result.data as unknown as GoogledrivePermissionOutput, + ), + ); + } + + return permissions; + } + private async rateLimitedRequest(request: () => Promise): Promise { let attempt = 0; let backoff = INITIAL_BACKOFF; diff --git a/packages/api/src/filestorage/file/services/googledrive/mappers.ts b/packages/api/src/filestorage/file/services/googledrive/mappers.ts index 63f27b529..dafaa223e 100644 --- a/packages/api/src/filestorage/file/services/googledrive/mappers.ts +++ b/packages/api/src/filestorage/file/services/googledrive/mappers.ts @@ -93,7 +93,7 @@ export class GoogleDriveFileMapper implements IFileMapper { file_url: file.webViewLink || file.webContentLink || null, mime_type: file.mimeType || null, size: file.size || null, - permissions: file.permissions, + permissions: file.internal_permissions, shared_link: null, ...opts, field_mappings, diff --git a/packages/api/src/filestorage/file/services/googledrive/types.ts b/packages/api/src/filestorage/file/services/googledrive/types.ts index 657b0c0ec..758445a77 100644 --- a/packages/api/src/filestorage/file/services/googledrive/types.ts +++ b/packages/api/src/filestorage/file/services/googledrive/types.ts @@ -1,4 +1,4 @@ -export type GoogleDriveFileInput = Partial +export type GoogleDriveFileInput = Partial; export interface GoogleDriveFileOutput { kind?: string; @@ -105,4 +105,7 @@ export interface GoogleDriveFileOutput { }; sha1Checksum?: string; sha256Checksum?: string; + + // Internal fields + internal_permissions?: string[]; // Permissions ID in panora db } diff --git a/packages/api/src/filestorage/folder/services/googledrive/index.ts b/packages/api/src/filestorage/folder/services/googledrive/index.ts index 5a9f8b371..e4cc56b3b 100644 --- a/packages/api/src/filestorage/folder/services/googledrive/index.ts +++ b/packages/api/src/filestorage/folder/services/googledrive/index.ts @@ -138,7 +138,11 @@ export class GoogleDriveFolderService implements IFolderService { ); // Sync permissions for folders - await this.ingestPermissionsForFolders(folders, connection.id_connection); + await this.ingestPermissionsForFolders( + folders, + connection.id_connection, + auth, + ); this.logger.log(`Synced ${folders.length} Google Drive folders!`); @@ -190,7 +194,7 @@ export class GoogleDriveFolderService implements IFolderService { drive.files.list({ q: buildQuery(parentId, driveId), fields: - 'nextPageToken, files(id, name, parents, createdTime, modifiedTime, driveId, webViewLink, permissions, trashed)', + 'nextPageToken, files(id, name, parents, createdTime, modifiedTime, driveId, webViewLink, permissionIds, trashed)', pageToken, includeItemsFromAllDrives: true, supportsAllDrives: true, @@ -307,6 +311,7 @@ export class GoogleDriveFolderService implements IFolderService { async ingestPermissionsForFolders( folders: GoogleDriveFolderOutput[], connectionId: string, + auth: OAuth2Client, ): Promise { if (folders.length === 0) { this.logger.log('No folders provided for ingesting permissions.'); @@ -315,29 +320,26 @@ export class GoogleDriveFolderService implements IFolderService { try { // Extract all permissions from the folders - const allPermissions: GoogledrivePermissionOutput[] = folders.reduce< - GoogledrivePermissionOutput[] - >((accumulator, folder) => { - if (folder.permissions?.length) { - accumulator.push(...folder.permissions); - } - return accumulator; - }, []); + const permissionsIds: string[] = Array.from( + new Set( + folders.reduce((accumulator, folder) => { + if (folder.permissionIds?.length) { + accumulator.push(...folder.permissionIds); + } + return accumulator; + }, []), + ), + ); - if (allPermissions.length === 0) { + if (permissionsIds.length === 0) { this.logger.log('No permissions found in the provided folders.'); return folders; } - // Remove duplicate permissions based on 'id' - const uniquePermissions: GoogledrivePermissionOutput[] = Array.from( - new Map( - allPermissions.map((permission) => [permission.id, permission]), - ).values(), - ); - - this.logger.log( - `Ingesting ${uniquePermissions.length} unique permissions.`, + const uniquePermissions = await this.fetchPermissions( + permissionsIds, + folders, + auth, ); // Ingest permissions using the ingestService @@ -362,9 +364,9 @@ export class GoogleDriveFolderService implements IFolderService { // Update each folder's permissions with the synced permission IDs folders.forEach((folder) => { - if (folder.permissions?.length) { - folder.permissions = folder.permissions - .map((permission) => permissionIdMap.get(permission.id)) + if (folder.permissionIds?.length) { + folder.internal_permissions = folder.permissionIds + .map((permissionId) => permissionIdMap.get(permissionId)) .filter( (permissionId): permissionId is string => permissionId !== undefined, @@ -373,7 +375,7 @@ export class GoogleDriveFolderService implements IFolderService { }); this.logger.log( - `Ingested ${syncedPermissions.length} googledrive permissions.`, + `Ingested ${syncedPermissions.length} googledrive permissions for folders.`, ); return folders; } catch (error) { @@ -515,6 +517,47 @@ export class GoogleDriveFolderService implements IFolderService { return pending.length === remaining.length; } + private async fetchPermissions(permissionIds, folders, auth) { + const drive = google.drive({ version: 'v3', auth }); + const permissionIdToFolders = new Map(); + + for (const folder of folders) { + if (folder.permissionIds?.length) { + for (const permissionId of folder.permissionIds) { + if (permissionIdToFolders.has(permissionId)) { + permissionIdToFolders.get(permissionId)?.push(folder.id); + } else { + permissionIdToFolders.set(permissionId, [folder.id]); + } + } + } + } + + const permissions: GoogledrivePermissionOutput[] = []; + const entries = Array.from(permissionIdToFolders.entries()); + + // do in batches of 10 + for (let i = 0; i < entries.length; i += 10) { + const batch = entries.slice(i, i + 10); + const batchPromises = batch.map(([permissionId, folderIds]) => + drive.permissions.get({ + permissionId, + fileId: folderIds[0], + supportsAllDrives: true, + }), + ); + + const batchResults = await Promise.all(batchPromises); + permissions.push( + ...batchResults.map( + (result) => result.data as unknown as GoogledrivePermissionOutput, + ), + ); + } + + return permissions; + } + private async handleUnresolvedFolders( pending: GoogleDriveFolderOutput[], output: GoogleDriveFolderOutput[], @@ -650,7 +693,7 @@ export class GoogleDriveFolderService implements IFolderService { supportsAllDrives: true, pageSize: 1000, fields: - 'nextPageToken, newStartPageToken, changes(file(id,name,mimeType,createdTime,modifiedTime,size,parents,webViewLink,driveId,trashed,permissions))', + 'nextPageToken, newStartPageToken, changes(file(id,name,mimeType,createdTime,modifiedTime,size,parents,webViewLink,driveId,trashed,permissionIds))', }); const batchFolders = response.data.changes @@ -685,6 +728,8 @@ export class GoogleDriveFolderService implements IFolderService { .then((response) => response.data.startPageToken), ); remoteCursor = startPageToken; + + // for first incremental sync await this.updateRemoteCursor(remoteCursor, connectionId); } return remoteCursor; diff --git a/packages/api/src/filestorage/folder/services/googledrive/mappers.ts b/packages/api/src/filestorage/folder/services/googledrive/mappers.ts index 94421dc7e..8866f8b9e 100644 --- a/packages/api/src/filestorage/folder/services/googledrive/mappers.ts +++ b/packages/api/src/filestorage/folder/services/googledrive/mappers.ts @@ -105,7 +105,7 @@ export class GoogleDriveFolderMapper implements IFolderMapper { : null, folder_url: folder.webViewLink || null, name: folder.name, - permissions: folder.permissions, + permissions: folder.internal_permissions, ...opts, field_mappings, }; diff --git a/packages/api/src/filestorage/folder/services/googledrive/types.ts b/packages/api/src/filestorage/folder/services/googledrive/types.ts index eb8ca0c32..0124f2a46 100644 --- a/packages/api/src/filestorage/folder/services/googledrive/types.ts +++ b/packages/api/src/filestorage/folder/services/googledrive/types.ts @@ -35,7 +35,10 @@ export interface GoogleDriveFolderOutput { explicitlyTrashed?: boolean; spaces?: string[]; driveId?: string; + permissionIds?: string[]; + // Internal fields internal_id?: string | null; // Folder ID in panora db internal_parent_folder_id?: string | null; // Parent Folder ID in panora db + internal_permissions?: string[]; // Permissions ID in panora db } diff --git a/packages/api/src/filestorage/folder/sync/sync.service.ts b/packages/api/src/filestorage/folder/sync/sync.service.ts index f9ba7d4b3..acd1225b1 100644 --- a/packages/api/src/filestorage/folder/sync/sync.service.ts +++ b/packages/api/src/filestorage/folder/sync/sync.service.ts @@ -93,6 +93,7 @@ export class SyncService implements OnModuleInit, IBaseSync { ): Promise { try { const folders_results: FileStorageFolder[] = []; + const driveLookupCache = new Map(); const updateOrCreateFolder = async ( folder: UnifiedFilestorageFolderOutput, @@ -107,16 +108,26 @@ export class SyncService implements OnModuleInit, IBaseSync { let drive_id_by_remote_drive_id = null; if (folder.remote_drive_id) { - const drive = await this.prisma.fs_drives.findFirst({ - where: { - remote_id: folder.remote_drive_id, - id_connection: connection_id, - }, - select: { - id_fs_drive: true, - }, - }); - drive_id_by_remote_drive_id = drive?.id_fs_drive; + if (driveLookupCache.has(folder.remote_drive_id)) { + drive_id_by_remote_drive_id = driveLookupCache.get( + folder.remote_drive_id, + ); + } else { + const drive = await this.prisma.fs_drives.findFirst({ + where: { + remote_id: folder.remote_drive_id, + id_connection: connection_id, + }, + select: { + id_fs_drive: true, + }, + }); + drive_id_by_remote_drive_id = drive?.id_fs_drive ?? null; + driveLookupCache.set( + folder.remote_drive_id, + drive_id_by_remote_drive_id, + ); + } } const baseData: any = {