Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions src/gcs-resumable-upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,20 @@ import {GoogleAuth, GoogleAuthOptions} from 'google-auth-library';
import {Readable, Writable} from 'stream';
import retry = require('async-retry');
import {RetryOptions, PreconditionOptions} from './storage';
import * as uuid from 'uuid';

const NOT_FOUND_STATUS_CODE = 404;
const TERMINATED_UPLOAD_STATUS_CODE = 410;
const RESUMABLE_INCOMPLETE_STATUS_CODE = 308;
const DEFAULT_API_ENDPOINT_REGEX = /.*\.googleapis\.com/;
let packageJson: ReturnType<JSON['parse']> = {};
try {
// if requiring from 'build' (default)
packageJson = require('../../package.json');
} catch (e) {
// if requiring directly from TypeScript context
packageJson = require('../package.json');
}

export const PROTOCOL_REGEX = /^(\w*):\/\//;

Expand Down Expand Up @@ -262,6 +271,11 @@ export class Upload extends Writable {
contentLength: number | '*';
retryOptions: RetryOptions;
timeOfFirstRequest: number;
private currentInvocationId = {
chunk: uuid.v4(),
uri: uuid.v4(),
offset: uuid.v4(),
};
private upstreamChunkBuffer: Buffer = Buffer.alloc(0);
private chunkBufferEncoding?: BufferEncoding = undefined;
private numChunksReadInRequest = 0;
Expand Down Expand Up @@ -530,6 +544,7 @@ export class Upload extends Writable {
protected async createURIAsync(): Promise<string> {
const metadata = this.metadata;

// Check if headers already exist before creating new ones
const reqOpts: GaxiosOptions = {
method: 'POST',
url: [this.baseURI, this.bucket, 'o'].join('/'),
Expand All @@ -541,7 +556,9 @@ export class Upload extends Writable {
this.params
),
data: metadata,
headers: {},
headers: {
'x-goog-api-client': `gl-node/${process.versions.node} gccl/${packageJson.version} gccl-invocation-id/${this.currentInvocationId.uri}`,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify-- I spoke with @frankyn and he mentioned that each request in a resumable upload should have its own invocation id (unless it is a retry of a request that failed). Is that reflective of what's happening here?

(Looks like it is from the test cases, but wanted to double check)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, that is what is happening here. The invocation ID is only changed after a successful response. If a failure case is hit the same ID will be used on the retry. Each request (create a URI, send a chunk, etc...) all utilize a new ID.

},
};

if (metadata.contentLength) {
Expand Down Expand Up @@ -572,6 +589,8 @@ export class Upload extends Writable {
async (bail: (err: Error) => void) => {
try {
const res = await this.makeRequest(reqOpts);
// We have successfully got a URI we can now create a new invocation id
this.currentInvocationId.uri = uuid.v4();
return res.headers.location;
} catch (err) {
const e = err as GaxiosError;
Expand Down Expand Up @@ -707,20 +726,20 @@ export class Upload extends Writable {
},
});

let headers: GaxiosOptions['headers'] = {};
const headers: GaxiosOptions['headers'] = {
'x-goog-api-client': `gl-node/${process.versions.node} gccl/${packageJson.version} gccl-invocation-id/${this.currentInvocationId.chunk}`,
};

// If using multiple chunk upload, set appropriate header
if (multiChunkMode && expectedUploadSize) {
// The '-1' is because the ending byte is inclusive in the request.
const endingByte = expectedUploadSize + this.numBytesWritten - 1;
headers = {
'Content-Length': expectedUploadSize,
'Content-Range': `bytes ${this.offset}-${endingByte}/${this.contentLength}`,
};
headers['Content-Length'] = expectedUploadSize;
headers[
'Content-Range'
] = `bytes ${this.offset}-${endingByte}/${this.contentLength}`;
} else {
headers = {
'Content-Range': `bytes ${this.offset}-*/${this.contentLength}`,
};
headers['Content-Range'] = `bytes ${this.offset}-*/${this.contentLength}`;
}

const reqOpts: GaxiosOptions = {
Expand Down Expand Up @@ -750,6 +769,9 @@ export class Upload extends Writable {
return;
}

// At this point we can safely create a new id for the chunk
this.currentInvocationId.chunk = uuid.v4();

const shouldContinueWithNextMultiChunkRequest =
this.chunkSize &&
resp.status === RESUMABLE_INCOMPLETE_STATUS_CODE &&
Expand Down Expand Up @@ -849,10 +871,16 @@ export class Upload extends Writable {
const opts: GaxiosOptions = {
method: 'PUT',
url: this.uri!,
headers: {'Content-Length': 0, 'Content-Range': 'bytes */*'},
headers: {
'Content-Length': 0,
'Content-Range': 'bytes */*',
'x-goog-api-client': `gl-node/${process.versions.node} gccl/${packageJson.version} gccl-invocation-id/${this.currentInvocationId.offset}`,
},
};
try {
const resp = await this.makeRequest(opts);
// Successfully got the offset we can now create a new offset invocation id
this.currentInvocationId.offset = uuid.v4();
if (resp.status === RESUMABLE_INCOMPLETE_STATUS_CODE) {
if (resp.headers.range) {
const range = resp.headers.range as string;
Expand Down
5 changes: 4 additions & 1 deletion src/nodejs-common/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import arrify = require('arrify');
import * as extend from 'extend';
import {AuthClient, GoogleAuth, GoogleAuthOptions} from 'google-auth-library';
import * as r from 'teeny-request';
import * as uuid from 'uuid';

import {Interceptor} from './service-object';
import {
Expand Down Expand Up @@ -242,7 +243,9 @@ export class Service {
}
reqOpts.headers = extend({}, reqOpts.headers, {
'User-Agent': userAgent,
'x-goog-api-client': `gl-node/${process.versions.node} gccl/${pkg.version}`,
'x-goog-api-client': `gl-node/${process.versions.node} gccl/${
pkg.version
} gccl-invocation-id/${uuid.v4()}`,
});

if (reqOpts.shouldReturnStream) {
Expand Down
16 changes: 14 additions & 2 deletions src/nodejs-common/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ import * as r from 'teeny-request';
import * as retryRequest from 'retry-request';
import {Duplex, DuplexOptions, Readable, Transform, Writable} from 'stream';
import {teenyRequest} from 'teeny-request';

import {Interceptor} from './service-object';
import * as uuid from 'uuid';
import * as packageJson from '../../package.json';

// eslint-disable-next-line @typescript-eslint/no-var-requires
const duplexify: DuplexifyConstructor = require('duplexify');

const requestDefaults = {
const requestDefaults: r.CoreOptions = {
timeout: 60000,
gzip: true,
forever: true,
Expand Down Expand Up @@ -522,6 +523,7 @@ export class Util {
return;
}

requestDefaults.headers = util._getDefaultHeaders();
const request = teenyRequest.defaults(requestDefaults);
request(authenticatedReqOpts!, (err, resp, body) => {
util.handleResp(err, resp, body, (err, data) => {
Expand Down Expand Up @@ -797,6 +799,7 @@ export class Util {
maxRetryValue = config.retryOptions.maxRetries;
}

requestDefaults.headers = this._getDefaultHeaders();
const options = {
request: teenyRequest.defaults(requestDefaults),
retries: autoRetryValue !== false ? maxRetryValue : 0,
Expand Down Expand Up @@ -945,6 +948,15 @@ export class Util {
? [{} as T, optionsOrCallback as C]
: [optionsOrCallback as T, cb as C];
}

_getDefaultHeaders() {
return {
'User-Agent': util.getUserAgentFromPackageJson(packageJson),
'x-goog-api-client': `gl-node/${process.versions.node} gccl/${
packageJson.version
} gccl-invocation-id/${uuid.v4()}`,
};
}
}

/**
Expand Down
Loading