@@ -11,7 +11,12 @@ const { S3Client,
11
11
PutObjectTaggingCommand,
12
12
DeleteObjectTaggingCommand,
13
13
CopyObjectCommand,
14
- GetBucketLocationCommand } = require ( '@aws-sdk/client-s3' ) ;
14
+ GetBucketLocationCommand,
15
+ GetBucketVersioningCommand,
16
+ NoSuchKeyException,
17
+ NotFoundException,
18
+ NoSuchVersionException,
19
+ AccessDeniedException } = require ( '@aws-sdk/client-s3' ) ;
15
20
const werelogs = require ( 'werelogs' ) ;
16
21
17
22
const errors = require ( '../../../errors' ) . default ;
@@ -68,7 +73,7 @@ class AwsClient {
68
73
return cb ( ) ;
69
74
} )
70
75
. catch ( err => {
71
- if ( err && err . code !== 'AuthorizationHeaderMalformed' ) {
76
+ if ( err . name !== 'AuthorizationHeaderMalformed' ) {
72
77
this . _logger . error ( 'error during setup' , {
73
78
error : err ,
74
79
method : 'AwsClient.setup' ,
@@ -169,7 +174,7 @@ class AwsClient {
169
174
. catch ( err => {
170
175
let logLevel ;
171
176
let retError ;
172
- if ( err . code === 'NotFound' ) {
177
+ if ( err instanceof NotFoundException ) {
173
178
logLevel = 'info' ;
174
179
retError = errors . LocationNotFound ;
175
180
} else {
@@ -194,15 +199,23 @@ class AwsClient {
194
199
this . _client . send ( new GetObjectCommand ( params ) ) . then ( data => {
195
200
// Always return an object with .createReadStream for test compatibility
196
201
const stream = data . Body ;
197
- stream . abort = ( ) => { } ;
198
- const response = {
202
+ let isAborted = false ;
203
+ const destroy = ( ) => {
204
+ if ( isAborted ) return ;
205
+ isAborted = true ;
206
+ ( stream ?. destroy || stream ?. abort || stream ?. close || stream ?. end || stream ?. removeAllListeners ) ?. ( ) ;
207
+ } ;
208
+ if ( ! stream ?. abort ) {
209
+ stream . abort = abort ;
210
+ }
211
+ return callback ( null , {
199
212
createReadStream : ( ) => stream ,
200
213
Body : stream ,
201
- abort : ( ) => { } ,
202
- } ;
203
- return callback ( null , response ) ;
214
+ abort,
215
+ destroy ,
216
+ } ) ;
204
217
} ) . catch ( err => {
205
- if ( err . code === 'NoSuchKey' || err . code === 'NotFound' ) {
218
+ if ( err instanceof NoSuchKeyException || err instanceof NotFoundException ) {
206
219
logHelper ( log , 'info' , 'object not found' , err , this . _dataStoreName ) ;
207
220
} else {
208
221
logHelper ( log , 'error' , 'error getting object from datastore' , err , this . _dataStoreName ) ;
@@ -220,12 +233,10 @@ class AwsClient {
220
233
if ( deleteVersion ) {
221
234
params . VersionId = dataStoreVersionId ;
222
235
}
223
- return this . _client . send ( new DeleteObjectCommand ( params ) ) . then ( ( ) => {
224
- return callback ( ) ;
225
- } ) . catch ( err => {
236
+ return this . _client . send ( new DeleteObjectCommand ( params ) ) . then ( ( ) => callback ( ) ) . catch ( err => {
226
237
logHelper ( log , 'error' , 'error deleting object from ' +
227
- 'datastore' , err , this . _dataStoreName , this . clientType ) ;
228
- if ( err . code === 'NoSuchVersion' || err . code === 'NoSuchKey' ) {
238
+ 'datastore' , err , this . _dataStoreName , this . clientType ) ;
239
+ if ( err instanceof NoSuchVersionException || err instanceof NoSuchKeyException ) {
229
240
// data may have been deleted directly from the AWS backend
230
241
// don't want to retry the delete and errors are not
231
242
// sent back to client anyway, so no need to return err
@@ -241,22 +252,19 @@ class AwsClient {
241
252
healthcheck ( location , callback ) {
242
253
const awsResp = { } ;
243
254
this . _client . send ( new HeadObjectCommand ( { Bucket : this . _awsBucketName } ) )
244
- . catch ( err => {
245
-
246
- awsResp [ location ] = { error : err , external : true } ;
247
- return callback ( null , awsResp ) ;
248
- } ) . then ( ( ) => {
255
+ . then ( ( ) => {
249
256
if ( ! this . _supportsVersioning ) {
250
257
awsResp [ location ] = {
251
258
message : 'Congrats! You own the bucket' ,
252
259
} ;
253
260
return callback ( null , awsResp ) ;
254
261
}
255
- return this . _client . send ( new GetObjectCommand ( {
262
+ return this . _client . send ( new GetBucketVersioningCommand ( {
256
263
Bucket : this . _awsBucketName } ) ) . catch ( err => {
257
264
awsResp [ location ] = { error : err , external : true } ;
258
265
} ) . then ( data => {
259
- if ( ! data . VersionId ) {
266
+ if ( ! data . Status ||
267
+ data . Status === 'Suspended' ) {
260
268
awsResp [ location ] = {
261
269
versioningStatus : data . Status ,
262
270
error : 'Versioning must be enabled' ,
@@ -270,6 +278,9 @@ class AwsClient {
270
278
}
271
279
return callback ( null , awsResp ) ;
272
280
} ) ;
281
+ } ) . catch ( err => {
282
+ awsResp [ location ] = { error : err , external : true } ;
283
+ return callback ( null , awsResp ) ;
273
284
} ) ;
274
285
}
275
286
@@ -419,10 +430,10 @@ class AwsClient {
419
430
const completeObjData = { key : awsKey } ;
420
431
return this . _client . send ( new CompleteMultipartUploadCommand ( mpuParams ) ) . catch ( err => {
421
432
422
- if ( mpuError [ err . code ] ) {
433
+ if ( mpuError [ err . name ] ) {
423
434
logHelper ( log , 'trace' , 'err from data backend on ' +
424
435
'completeMPU' , err , this . _dataStoreName , this . clientType ) ;
425
- return callback ( errors [ err . code ] ) ;
436
+ return callback ( errors [ err . name ] ) ;
426
437
}
427
438
logHelper ( log , 'error' , 'err from data backend on ' +
428
439
'completeMPU' , err , this . _dataStoreName , this . clientType ) ;
@@ -551,7 +562,7 @@ class AwsClient {
551
562
awsParams . ServerSideEncryption = 'AES256' ;
552
563
}
553
564
return this . _client . send ( new CopyObjectCommand ( awsParams ) ) . catch ( err => {
554
- if ( err . code === 'AccessDenied' ) {
565
+ if ( err instanceof AccessDeniedException ) {
555
566
logHelper ( log , 'error' , 'Unable to access ' +
556
567
`${ sourceAwsBucketName } ${ this . type } bucket` , err ,
557
568
this . _dataStoreName , this . clientType ) ;
@@ -608,7 +619,7 @@ class AwsClient {
608
619
UploadId : uploadId ,
609
620
} ;
610
621
return this . _client . send ( new CopyObjectCommand ( params ) ) . catch ( err => {
611
- if ( err . code === 'AccessDenied' ) {
622
+ if ( err instanceof AccessDeniedException ) {
612
623
logHelper ( log , 'error' , 'Unable to access ' +
613
624
`${ sourceAwsBucketName } AWS bucket` , err ,
614
625
this . _dataStoreName , this . clientType ) ;
0 commit comments