@@ -8,7 +8,6 @@ import { EJSON, EJSONOptions, ObjectId } from "bson";
88import { Transform } from "stream" ;
99import { pipeline } from "stream/promises" ;
1010import { MongoLogId } from "mongodb-log-writer" ;
11- import { RWLock } from "async-rwlock" ;
1211
1312import { UserConfig } from "./config.js" ;
1413import { LoggerBase , LogId } from "./logger.js" ;
@@ -57,43 +56,26 @@ type StoredExport = ReadyExport | InProgressExport;
5756 * JIRA: https://jira.mongodb.org/browse/MCP-104 */
5857type AvailableExport = Pick < StoredExport , "exportName" | "exportTitle" | "exportURI" | "exportPath" > ;
5958
60- export type ExportsManagerConfig = Pick < UserConfig , "exportsPath" | "exportTimeoutMs" | "exportCleanupIntervalMs" > & {
61- // The maximum number of milliseconds to wait for in-flight operations to
62- // settle before shutting down ExportsManager.
63- activeOpsDrainTimeoutMs ?: number ;
64-
65- // The maximum number of milliseconds to wait before timing out queued reads
66- readTimeout ?: number ;
67-
68- // The maximum number of milliseconds to wait before timing out queued writes
69- writeTimeout ?: number ;
70- } ;
59+ export type ExportsManagerConfig = Pick < UserConfig , "exportsPath" | "exportTimeoutMs" | "exportCleanupIntervalMs" > ;
7160
7261type ExportsManagerEvents = {
7362 closed : [ ] ;
7463 "export-expired" : [ string ] ;
7564 "export-available" : [ string ] ;
7665} ;
7766
78- class OperationAbortedError extends Error { }
79-
8067export class ExportsManager extends EventEmitter < ExportsManagerEvents > {
8168 private storedExports : Record < StoredExport [ "exportName" ] , StoredExport > = { } ;
8269 private exportsCleanupInProgress : boolean = false ;
8370 private exportsCleanupInterval ?: NodeJS . Timeout ;
8471 private readonly shutdownController : AbortController = new AbortController ( ) ;
85- private readonly readTimeoutMs : number ;
86- private readonly writeTimeoutMs : number ;
87- private readonly exportLocks : Map < string , RWLock > = new Map ( ) ;
8872
8973 private constructor (
9074 private readonly exportsDirectoryPath : string ,
9175 private readonly config : ExportsManagerConfig ,
9276 private readonly logger : LoggerBase
9377 ) {
9478 super ( ) ;
95- this . readTimeoutMs = this . config . readTimeout ?? 30_0000 ; // 30 seconds is the default timeout for an MCP request
96- this . writeTimeoutMs = this . config . writeTimeout ?? 120_000 ; // considering that writes can take time
9779 }
9880
9981 public get availableExports ( ) : AvailableExport [ ] {
@@ -121,6 +103,7 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
121103 ) ;
122104 }
123105 }
106+
124107 public async close ( ) : Promise < void > {
125108 if ( this . shutdownController . signal . aborted ) {
126109 return ;
@@ -143,29 +126,18 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
143126 try {
144127 this . assertIsNotShuttingDown ( ) ;
145128 exportName = decodeURIComponent ( exportName ) ;
146- return await this . withLock (
147- {
148- exportName,
149- mode : "read" ,
150- callbackName : "readExport" ,
151- } ,
152- async ( ) : Promise < string > => {
153- const exportHandle = this . storedExports [ exportName ] ;
154- if ( ! exportHandle ) {
155- throw new Error ( "Requested export has either expired or does not exist!" ) ;
156- }
129+ const exportHandle = this . storedExports [ exportName ] ;
130+ if ( ! exportHandle ) {
131+ throw new Error ( "Requested export has either expired or does not exist." ) ;
132+ }
157133
158- // This won't happen because of lock synchronization but
159- // keeping it here to make TS happy.
160- if ( exportHandle . exportStatus === "in-progress" ) {
161- throw new Error ( "Requested export is still being generated!" ) ;
162- }
134+ if ( exportHandle . exportStatus === "in-progress" ) {
135+ throw new Error ( "Requested export is still being generated. Try again later." ) ;
136+ }
163137
164- const { exportPath } = exportHandle ;
138+ const { exportPath } = exportHandle ;
165139
166- return fs . readFile ( exportPath , { encoding : "utf8" , signal : this . shutdownController . signal } ) ;
167- }
168- ) ;
140+ return fs . readFile ( exportPath , { encoding : "utf8" , signal : this . shutdownController . signal } ) ;
169141 } catch ( error ) {
170142 this . logger . error ( {
171143 id : LogId . exportReadError ,
@@ -190,32 +162,23 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
190162 try {
191163 this . assertIsNotShuttingDown ( ) ;
192164 const exportNameWithExtension = validateExportName ( ensureExtension ( exportName , "json" ) ) ;
193- return await this . withLock (
194- {
195- exportName : exportNameWithExtension ,
196- mode : "write" ,
197- callbackName : "createJSONExport" ,
198- } ,
199- ( ) : Promise < AvailableExport > => {
200- if ( this . storedExports [ exportNameWithExtension ] ) {
201- return Promise . reject (
202- new Error ( "Export with same name is either already available or being generated." )
203- ) ;
204- }
205- const exportURI = `exported-data://${ encodeURIComponent ( exportNameWithExtension ) } ` ;
206- const exportFilePath = path . join ( this . exportsDirectoryPath , exportNameWithExtension ) ;
207- const inProgressExport : InProgressExport = ( this . storedExports [ exportNameWithExtension ] = {
208- exportName : exportNameWithExtension ,
209- exportTitle,
210- exportPath : exportFilePath ,
211- exportURI : exportURI ,
212- exportStatus : "in-progress" ,
213- } ) ;
214-
215- void this . startExport ( { input, jsonExportFormat, inProgressExport } ) ;
216- return Promise . resolve ( inProgressExport ) ;
217- }
218- ) ;
165+ if ( this . storedExports [ exportNameWithExtension ] ) {
166+ return Promise . reject (
167+ new Error ( "Export with same name is either already available or being generated." )
168+ ) ;
169+ }
170+ const exportURI = `exported-data://${ encodeURIComponent ( exportNameWithExtension ) } ` ;
171+ const exportFilePath = path . join ( this . exportsDirectoryPath , exportNameWithExtension ) ;
172+ const inProgressExport : InProgressExport = ( this . storedExports [ exportNameWithExtension ] = {
173+ exportName : exportNameWithExtension ,
174+ exportTitle,
175+ exportPath : exportFilePath ,
176+ exportURI : exportURI ,
177+ exportStatus : "in-progress" ,
178+ } ) ;
179+
180+ void this . startExport ( { input, jsonExportFormat, inProgressExport } ) ;
181+ return Promise . resolve ( inProgressExport ) ;
219182 } catch ( error ) {
220183 this . logger . error ( {
221184 id : LogId . exportCreationError ,
@@ -236,49 +199,41 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
236199 inProgressExport : InProgressExport ;
237200 } ) : Promise < void > {
238201 try {
239- await this . withLock (
240- {
241- exportName : inProgressExport . exportName ,
242- mode : "write" ,
243- callbackName : "startExport" ,
244- } ,
245- async ( ) : Promise < void > => {
246- let pipeSuccessful = false ;
247- try {
248- await fs . mkdir ( this . exportsDirectoryPath , { recursive : true } ) ;
249- const outputStream = createWriteStream ( inProgressExport . exportPath ) ;
250- await pipeline (
251- [
252- input . stream ( ) ,
253- this . docToEJSONStream ( this . getEJSONOptionsForFormat ( jsonExportFormat ) ) ,
254- outputStream ,
255- ] ,
256- { signal : this . shutdownController . signal }
257- ) ;
258- pipeSuccessful = true ;
259- } catch ( error ) {
260- // If the pipeline errors out then we might end up with
261- // partial and incorrect export so we remove it entirely.
262- await this . silentlyRemoveExport (
263- inProgressExport . exportPath ,
264- LogId . exportCreationCleanupError ,
265- `Error when removing incomplete export ${ inProgressExport . exportName } `
266- ) ;
267- delete this . storedExports [ inProgressExport . exportName ] ;
268- throw error ;
269- } finally {
270- if ( pipeSuccessful ) {
271- this . storedExports [ inProgressExport . exportName ] = {
272- ...inProgressExport ,
273- exportCreatedAt : Date . now ( ) ,
274- exportStatus : "ready" ,
275- } ;
276- this . emit ( "export-available" , inProgressExport . exportURI ) ;
277- }
278- void input . close ( ) ;
279- }
202+ let pipeSuccessful = false ;
203+ try {
204+ await fs . mkdir ( this . exportsDirectoryPath , { recursive : true } ) ;
205+ const outputStream = createWriteStream ( inProgressExport . exportPath ) ;
206+ await pipeline (
207+ [
208+ input . stream ( ) ,
209+ this . docToEJSONStream ( this . getEJSONOptionsForFormat ( jsonExportFormat ) ) ,
210+ outputStream ,
211+ ] ,
212+ { signal : this . shutdownController . signal }
213+ ) ;
214+ pipeSuccessful = true ;
215+ } catch ( error ) {
216+ // If the pipeline errors out then we might end up with
217+ // partial and incorrect export so we remove it entirely.
218+ delete this . storedExports [ inProgressExport . exportName ] ;
219+ // do not block the user, just delete the file in the background
220+ void this . silentlyRemoveExport (
221+ inProgressExport . exportPath ,
222+ LogId . exportCreationCleanupError ,
223+ `Error when removing incomplete export ${ inProgressExport . exportName } `
224+ ) ;
225+ throw error ;
226+ } finally {
227+ if ( pipeSuccessful ) {
228+ this . storedExports [ inProgressExport . exportName ] = {
229+ ...inProgressExport ,
230+ exportCreatedAt : Date . now ( ) ,
231+ exportStatus : "ready" ,
232+ } ;
233+ this . emit ( "export-available" , inProgressExport . exportURI ) ;
280234 }
281- ) ;
235+ void input . close ( ) ;
236+ }
282237 } catch ( error ) {
283238 this . logger . error ( {
284239 id : LogId . exportCreationError ,
@@ -335,33 +290,31 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
335290
336291 this . exportsCleanupInProgress = true ;
337292 try {
338- const exportsForCleanup = Object . values ( { ...this . storedExports } ) . filter (
339- ( storedExport ) : storedExport is ReadyExport => storedExport . exportStatus === "ready"
340- ) ;
293+ // first, unregister all exports that are expired, so they are not considered anymore for reading
294+ const exportsForCleanup : ReadyExport [ ] = [ ] ;
295+ for ( const expiredExport of Object . values ( this . storedExports ) ) {
296+ if (
297+ expiredExport . exportStatus === "ready" &&
298+ isExportExpired ( expiredExport . exportCreatedAt , this . config . exportTimeoutMs )
299+ ) {
300+ exportsForCleanup . push ( expiredExport ) ;
301+ delete this . storedExports [ expiredExport . exportName ] ;
302+ }
303+ }
341304
342- await Promise . allSettled (
343- exportsForCleanup . map ( async ( { exportPath, exportCreatedAt, exportURI, exportName } ) => {
344- if ( isExportExpired ( exportCreatedAt , this . config . exportTimeoutMs ) ) {
345- await this . withLock (
346- {
347- exportName,
348- mode : "write" ,
349- finalize : true ,
350- callbackName : "cleanupExpiredExport" ,
351- } ,
352- async ( ) : Promise < void > => {
353- delete this . storedExports [ exportName ] ;
354- await this . silentlyRemoveExport (
355- exportPath ,
356- LogId . exportCleanupError ,
357- `Considerable error when removing export ${ exportName } `
358- ) ;
359- this . emit ( "export-expired" , exportURI ) ;
360- }
361- ) ;
362- }
363- } )
364- ) ;
305+ // and then remove them (slow operation potentially) from disk.
306+ const allDeletionPromises : Promise < void > [ ] = [ ] ;
307+ for ( const { exportPath, exportName } of exportsForCleanup ) {
308+ allDeletionPromises . push (
309+ this . silentlyRemoveExport (
310+ exportPath ,
311+ LogId . exportCleanupError ,
312+ `Considerable error when removing export ${ exportName } `
313+ )
314+ ) ;
315+ }
316+
317+ await Promise . allSettled ( allDeletionPromises ) ;
365318 } catch ( error ) {
366319 this . logger . error ( {
367320 id : LogId . exportCleanupError ,
@@ -396,69 +349,6 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
396349 }
397350 }
398351
399- private async withLock < CallbackResult extends Promise < unknown > > (
400- lockConfig : {
401- exportName : string ;
402- mode : "read" | "write" ;
403- finalize ?: boolean ;
404- callbackName ?: string ;
405- } ,
406- callback : ( ) => CallbackResult
407- ) : Promise < Awaited < CallbackResult > > {
408- const { exportName, mode, finalize = false , callbackName } = lockConfig ;
409- const operationName = callbackName ? `${ callbackName } - ${ exportName } ` : exportName ;
410- let lock = this . exportLocks . get ( exportName ) ;
411- if ( ! lock ) {
412- lock = new RWLock ( ) ;
413- this . exportLocks . set ( exportName , lock ) ;
414- }
415-
416- let lockAcquired : boolean = false ;
417- const acquireLock = async ( ) : Promise < void > => {
418- if ( mode === "read" ) {
419- await lock . readLock ( this . readTimeoutMs ) ;
420- } else {
421- await lock . writeLock ( this . writeTimeoutMs ) ;
422- }
423- lockAcquired = true ;
424- } ;
425-
426- try {
427- await Promise . race ( [
428- this . operationAbortedPromise ( `Acquire ${ mode } lock for ${ operationName } ` ) ,
429- acquireLock ( ) ,
430- ] ) ;
431- return await Promise . race ( [ this . operationAbortedPromise ( operationName ) , callback ( ) ] ) ;
432- } finally {
433- if ( lockAcquired ) {
434- lock . unlock ( ) ;
435- }
436- if ( finalize ) {
437- this . exportLocks . delete ( exportName ) ;
438- }
439- }
440- }
441-
442- private operationAbortedPromise ( operationName ?: string ) : Promise < never > {
443- return new Promise ( ( _ , reject ) => {
444- const rejectIfAborted = ( ) : void => {
445- if ( this . shutdownController . signal . aborted ) {
446- // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
447- const abortReason = this . shutdownController . signal . reason ;
448- const abortMessage =
449- typeof abortReason === "string"
450- ? abortReason
451- : `${ operationName ?? "Operation" } aborted - ExportsManager shutting down!` ;
452- reject ( new OperationAbortedError ( abortMessage ) ) ;
453- this . shutdownController . signal . removeEventListener ( "abort" , rejectIfAborted ) ;
454- }
455- } ;
456-
457- rejectIfAborted ( ) ;
458- this . shutdownController . signal . addEventListener ( "abort" , rejectIfAborted ) ;
459- } ) ;
460- }
461-
462352 static init (
463353 config : ExportsManagerConfig ,
464354 logger : LoggerBase ,
0 commit comments