@@ -8,7 +8,6 @@ import { EJSON, EJSONOptions, ObjectId } from "bson";
88import { Transform } from "stream" ;
99import { pipeline } from "stream/promises" ;
1010import { MongoLogId } from "mongodb-log-writer" ;
11- import { RWLock } from "async-rwlock" ;
1211
1312import { UserConfig } from "./config.js" ;
1413import { LoggerBase , LogId } from "./logger.js" ;
@@ -57,43 +56,26 @@ type StoredExport = ReadyExport | InProgressExport;
5756 * JIRA: https://jira.mongodb.org/browse/MCP-104 */
5857type AvailableExport = Pick < StoredExport , "exportName" | "exportTitle" | "exportURI" | "exportPath" > ;
5958
60- export type ExportsManagerConfig = Pick < UserConfig , "exportsPath" | "exportTimeoutMs" | "exportCleanupIntervalMs" > & {
61- // The maximum number of milliseconds to wait for in-flight operations to
62- // settle before shutting down ExportsManager.
63- activeOpsDrainTimeoutMs ?: number ;
64-
65- // The maximum number of milliseconds to wait before timing out queued reads
66- readTimeout ?: number ;
67-
68- // The maximum number of milliseconds to wait before timing out queued writes
69- writeTimeout ?: number ;
70- } ;
59+ export type ExportsManagerConfig = Pick < UserConfig , "exportsPath" | "exportTimeoutMs" | "exportCleanupIntervalMs" > ;
7160
7261type ExportsManagerEvents = {
7362 closed : [ ] ;
7463 "export-expired" : [ string ] ;
7564 "export-available" : [ string ] ;
7665} ;
7766
78- class OperationAbortedError extends Error { }
79-
8067export class ExportsManager extends EventEmitter < ExportsManagerEvents > {
8168 private storedExports : Record < StoredExport [ "exportName" ] , StoredExport > = { } ;
8269 private exportsCleanupInProgress : boolean = false ;
8370 private exportsCleanupInterval ?: NodeJS . Timeout ;
8471 private readonly shutdownController : AbortController = new AbortController ( ) ;
85- private readonly readTimeoutMs : number ;
86- private readonly writeTimeoutMs : number ;
87- private readonly exportLocks : Map < string , RWLock > = new Map ( ) ;
8872
8973 private constructor (
9074 private readonly exportsDirectoryPath : string ,
9175 private readonly config : ExportsManagerConfig ,
9276 private readonly logger : LoggerBase
9377 ) {
9478 super ( ) ;
95- this . readTimeoutMs = this . config . readTimeout ?? 30_0000 ; // 30 seconds is the default timeout for an MCP request
96- this . writeTimeoutMs = this . config . writeTimeout ?? 120_000 ; // considering that writes can take time
9779 }
9880
9981 public get availableExports ( ) : AvailableExport [ ] {
@@ -143,29 +125,18 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
143125 try {
144126 this . assertIsNotShuttingDown ( ) ;
145127 exportName = decodeURIComponent ( exportName ) ;
146- return await this . withLock (
147- {
148- exportName,
149- mode : "read" ,
150- callbackName : "readExport" ,
151- } ,
152- async ( ) : Promise < string > => {
153- const exportHandle = this . storedExports [ exportName ] ;
154- if ( ! exportHandle ) {
155- throw new Error ( "Requested export has either expired or does not exist!" ) ;
156- }
128+ const exportHandle = this . storedExports [ exportName ] ;
129+ if ( ! exportHandle ) {
130+ throw new Error ( "Requested export has either expired or does not exist." ) ;
131+ }
157132
158- // This won't happen because of lock synchronization but
159- // keeping it here to make TS happy.
160- if ( exportHandle . exportStatus === "in-progress" ) {
161- throw new Error ( "Requested export is still being generated!" ) ;
162- }
133+ if ( exportHandle . exportStatus === "in-progress" ) {
134+ throw new Error ( "Requested export is still being generated. Try again later." ) ;
135+ }
163136
164- const { exportPath } = exportHandle ;
137+ const { exportPath } = exportHandle ;
165138
166- return fs . readFile ( exportPath , { encoding : "utf8" , signal : this . shutdownController . signal } ) ;
167- }
168- ) ;
139+ return fs . readFile ( exportPath , { encoding : "utf8" , signal : this . shutdownController . signal } ) ;
169140 } catch ( error ) {
170141 this . logger . error ( {
171142 id : LogId . exportReadError ,
@@ -190,32 +161,23 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
190161 try {
191162 this . assertIsNotShuttingDown ( ) ;
192163 const exportNameWithExtension = validateExportName ( ensureExtension ( exportName , "json" ) ) ;
193- return await this . withLock (
194- {
195- exportName : exportNameWithExtension ,
196- mode : "write" ,
197- callbackName : "createJSONExport" ,
198- } ,
199- ( ) : Promise < AvailableExport > => {
200- if ( this . storedExports [ exportNameWithExtension ] ) {
201- return Promise . reject (
202- new Error ( "Export with same name is either already available or being generated." )
203- ) ;
204- }
205- const exportURI = `exported-data://${ encodeURIComponent ( exportNameWithExtension ) } ` ;
206- const exportFilePath = path . join ( this . exportsDirectoryPath , exportNameWithExtension ) ;
207- const inProgressExport : InProgressExport = ( this . storedExports [ exportNameWithExtension ] = {
208- exportName : exportNameWithExtension ,
209- exportTitle,
210- exportPath : exportFilePath ,
211- exportURI : exportURI ,
212- exportStatus : "in-progress" ,
213- } ) ;
214-
215- void this . startExport ( { input, jsonExportFormat, inProgressExport } ) ;
216- return Promise . resolve ( inProgressExport ) ;
217- }
218- ) ;
164+ if ( this . storedExports [ exportNameWithExtension ] ) {
165+ return Promise . reject (
166+ new Error ( "Export with same name is either already available or being generated." )
167+ ) ;
168+ }
169+ const exportURI = `exported-data://${ encodeURIComponent ( exportNameWithExtension ) } ` ;
170+ const exportFilePath = path . join ( this . exportsDirectoryPath , exportNameWithExtension ) ;
171+ const inProgressExport : InProgressExport = ( this . storedExports [ exportNameWithExtension ] = {
172+ exportName : exportNameWithExtension ,
173+ exportTitle,
174+ exportPath : exportFilePath ,
175+ exportURI : exportURI ,
176+ exportStatus : "in-progress" ,
177+ } ) ;
178+
179+ void this . startExport ( { input, jsonExportFormat, inProgressExport } ) ;
180+ return Promise . resolve ( inProgressExport ) ;
219181 } catch ( error ) {
220182 this . logger . error ( {
221183 id : LogId . exportCreationError ,
@@ -236,49 +198,40 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
236198 inProgressExport : InProgressExport ;
237199 } ) : Promise < void > {
238200 try {
239- await this . withLock (
240- {
241- exportName : inProgressExport . exportName ,
242- mode : "write" ,
243- callbackName : "startExport" ,
244- } ,
245- async ( ) : Promise < void > => {
246- let pipeSuccessful = false ;
247- try {
248- await fs . mkdir ( this . exportsDirectoryPath , { recursive : true } ) ;
249- const outputStream = createWriteStream ( inProgressExport . exportPath ) ;
250- await pipeline (
251- [
252- input . stream ( ) ,
253- this . docToEJSONStream ( this . getEJSONOptionsForFormat ( jsonExportFormat ) ) ,
254- outputStream ,
255- ] ,
256- { signal : this . shutdownController . signal }
257- ) ;
258- pipeSuccessful = true ;
259- } catch ( error ) {
260- // If the pipeline errors out then we might end up with
261- // partial and incorrect export so we remove it entirely.
262- await this . silentlyRemoveExport (
263- inProgressExport . exportPath ,
264- LogId . exportCreationCleanupError ,
265- `Error when removing incomplete export ${ inProgressExport . exportName } `
266- ) ;
267- delete this . storedExports [ inProgressExport . exportName ] ;
268- throw error ;
269- } finally {
270- if ( pipeSuccessful ) {
271- this . storedExports [ inProgressExport . exportName ] = {
272- ...inProgressExport ,
273- exportCreatedAt : Date . now ( ) ,
274- exportStatus : "ready" ,
275- } ;
276- this . emit ( "export-available" , inProgressExport . exportURI ) ;
277- }
278- void input . close ( ) ;
279- }
201+ let pipeSuccessful = false ;
202+ try {
203+ await fs . mkdir ( this . exportsDirectoryPath , { recursive : true } ) ;
204+ const outputStream = createWriteStream ( inProgressExport . exportPath ) ;
205+ await pipeline (
206+ [
207+ input . stream ( ) ,
208+ this . docToEJSONStream ( this . getEJSONOptionsForFormat ( jsonExportFormat ) ) ,
209+ outputStream ,
210+ ] ,
211+ { signal : this . shutdownController . signal }
212+ ) ;
213+ pipeSuccessful = true ;
214+ } catch ( error ) {
215+ // If the pipeline errors out then we might end up with
216+ // partial and incorrect export so we remove it entirely.
217+ await this . silentlyRemoveExport (
218+ inProgressExport . exportPath ,
219+ LogId . exportCreationCleanupError ,
220+ `Error when removing incomplete export ${ inProgressExport . exportName } `
221+ ) ;
222+ delete this . storedExports [ inProgressExport . exportName ] ;
223+ throw error ;
224+ } finally {
225+ if ( pipeSuccessful ) {
226+ this . storedExports [ inProgressExport . exportName ] = {
227+ ...inProgressExport ,
228+ exportCreatedAt : Date . now ( ) ,
229+ exportStatus : "ready" ,
230+ } ;
231+ this . emit ( "export-available" , inProgressExport . exportURI ) ;
280232 }
281- ) ;
233+ void input . close ( ) ;
234+ }
282235 } catch ( error ) {
283236 this . logger . error ( {
284237 id : LogId . exportCreationError ,
@@ -335,33 +288,29 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
335288
336289 this . exportsCleanupInProgress = true ;
337290 try {
338- const exportsForCleanup = Object . values ( { ...this . storedExports } ) . filter (
339- ( storedExport ) : storedExport is ReadyExport => storedExport . exportStatus === "ready"
340- ) ;
291+ // first, unregister all exports that are expired, so they are not considered anymore for reading
292+ const exportsForCleanup : ReadyExport [ ] = [ ] ;
293+ for ( const expiredExport of Object . values ( this . storedExports ) ) {
294+ if (
295+ expiredExport . exportStatus === "ready" &&
296+ isExportExpired ( expiredExport . exportCreatedAt , this . config . exportTimeoutMs )
297+ ) {
298+ exportsForCleanup . push ( expiredExport ) ;
299+ delete this . storedExports [ expiredExport . exportName ] ;
300+ }
301+ }
341302
342- await Promise . allSettled (
343- exportsForCleanup . map ( async ( { exportPath, exportCreatedAt, exportURI, exportName } ) => {
344- if ( isExportExpired ( exportCreatedAt , this . config . exportTimeoutMs ) ) {
345- await this . withLock (
346- {
347- exportName,
348- mode : "write" ,
349- finalize : true ,
350- callbackName : "cleanupExpiredExport" ,
351- } ,
352- async ( ) : Promise < void > => {
353- delete this . storedExports [ exportName ] ;
354- await this . silentlyRemoveExport (
355- exportPath ,
356- LogId . exportCleanupError ,
357- `Considerable error when removing export ${ exportName } `
358- ) ;
359- this . emit ( "export-expired" , exportURI ) ;
360- }
361- ) ;
362- }
363- } )
364- ) ;
303+ // and then remove them (slow operation potentially) from disk.
304+ const allDeletionPromises : Promise < void > [ ] = [ ] ;
305+ for ( const { exportPath, exportName } of exportsForCleanup ) {
306+ allDeletionPromises . push (
307+ this . silentlyRemoveExport (
308+ exportPath ,
309+ LogId . exportCleanupError ,
310+ `Considerable error when removing export ${ exportName } `
311+ )
312+ ) ;
313+ }
365314 } catch ( error ) {
366315 this . logger . error ( {
367316 id : LogId . exportCleanupError ,
@@ -396,69 +345,6 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
396345 }
397346 }
398347
399- private async withLock < CallbackResult extends Promise < unknown > > (
400- lockConfig : {
401- exportName : string ;
402- mode : "read" | "write" ;
403- finalize ?: boolean ;
404- callbackName ?: string ;
405- } ,
406- callback : ( ) => CallbackResult
407- ) : Promise < Awaited < CallbackResult > > {
408- const { exportName, mode, finalize = false , callbackName } = lockConfig ;
409- const operationName = callbackName ? `${ callbackName } - ${ exportName } ` : exportName ;
410- let lock = this . exportLocks . get ( exportName ) ;
411- if ( ! lock ) {
412- lock = new RWLock ( ) ;
413- this . exportLocks . set ( exportName , lock ) ;
414- }
415-
416- let lockAcquired : boolean = false ;
417- const acquireLock = async ( ) : Promise < void > => {
418- if ( mode === "read" ) {
419- await lock . readLock ( this . readTimeoutMs ) ;
420- } else {
421- await lock . writeLock ( this . writeTimeoutMs ) ;
422- }
423- lockAcquired = true ;
424- } ;
425-
426- try {
427- await Promise . race ( [
428- this . operationAbortedPromise ( `Acquire ${ mode } lock for ${ operationName } ` ) ,
429- acquireLock ( ) ,
430- ] ) ;
431- return await Promise . race ( [ this . operationAbortedPromise ( operationName ) , callback ( ) ] ) ;
432- } finally {
433- if ( lockAcquired ) {
434- lock . unlock ( ) ;
435- }
436- if ( finalize ) {
437- this . exportLocks . delete ( exportName ) ;
438- }
439- }
440- }
441-
442- private operationAbortedPromise ( operationName ?: string ) : Promise < never > {
443- return new Promise ( ( _ , reject ) => {
444- const rejectIfAborted = ( ) : void => {
445- if ( this . shutdownController . signal . aborted ) {
446- // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
447- const abortReason = this . shutdownController . signal . reason ;
448- const abortMessage =
449- typeof abortReason === "string"
450- ? abortReason
451- : `${ operationName ?? "Operation" } aborted - ExportsManager shutting down!` ;
452- reject ( new OperationAbortedError ( abortMessage ) ) ;
453- this . shutdownController . signal . removeEventListener ( "abort" , rejectIfAborted ) ;
454- }
455- } ;
456-
457- rejectIfAborted ( ) ;
458- this . shutdownController . signal . addEventListener ( "abort" , rejectIfAborted ) ;
459- } ) ;
460- }
461-
462348 static init (
463349 config : ExportsManagerConfig ,
464350 logger : LoggerBase ,
0 commit comments