Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bigquery service account #3128

Merged
merged 7 commits into from
Apr 18, 2022
86 changes: 75 additions & 11 deletions packages/nodes-base/nodes/Google/BigQuery/GenericFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,18 @@ import {

import {
IDataObject,
JsonObject,
NodeApiError,
NodeOperationError
} from 'n8n-workflow';

import moment from 'moment-timezone';

import * as jwt from 'jsonwebtoken';

export async function googleApiRequest(this: IExecuteFunctions | IExecuteSingleFunctions | ILoadOptionsFunctions, method: string, resource: string, body: any = {}, qs: IDataObject = {}, uri?: string, headers: IDataObject = {}): Promise<any> { // tslint:disable-line:no-any
const authenticationMethod = this.getNodeParameter('authentication', 0, 'serviceAccount') as string;

const options: OptionsWithUri = {
headers: {
'Content-Type': 'application/json',
Expand All @@ -30,20 +39,28 @@ export async function googleApiRequest(this: IExecuteFunctions | IExecuteSingleF
if (Object.keys(body).length === 0) {
delete options.body;
}
//@ts-ignore
return await this.helpers.requestOAuth2.call(this, 'googleBigQueryOAuth2Api', options);
} catch (error) {
if (error.response && error.response.body && error.response.body.error) {

let errors = error.response.body.error.errors;
if (authenticationMethod === 'serviceAccount') {
const credentials = await this.getCredentials('googleApi');

errors = errors.map((e: IDataObject) => e.message);
// Try to return the error prettier
throw new Error(
`Google BigQuery error response [${error.statusCode}]: ${errors.join('|')}`,
);
if (credentials === undefined) {
throw new NodeOperationError(this.getNode(), 'No credentials got returned!');
}

const { access_token } = await getAccessToken.call(this, credentials as IDataObject);

options.headers!.Authorization = `Bearer ${access_token}`;
return await this.helpers.request!(options);
} else {
//@ts-ignore
return await this.helpers.requestOAuth2.call(this, 'googleBigQueryOAuth2Api', options);
}
throw error;
} catch (error) {
if (error.code === 'ERR_OSSL_PEM_NO_START_LINE') {
error.statusCode = '401';
}

throw new NodeApiError(this.getNode(), error as JsonObject);
}
}

Expand All @@ -66,6 +83,53 @@ export async function googleApiRequestAllItems(this: IExecuteFunctions | ILoadOp
return returnData;
}

function getAccessToken(this: IExecuteFunctions | IExecuteSingleFunctions | ILoadOptionsFunctions, credentials: IDataObject): Promise<IDataObject> {
//https://developers.google.com/identity/protocols/oauth2/service-account#httprest

const privateKey = (credentials.privateKey as string).replace(/\\n/g, '\n').trim();

const scopes = [
'https://www.googleapis.com/auth/bigquery',
];

const now = moment().unix();

const signature = jwt.sign(
{
'iss': credentials.email as string,
'sub': credentials.delegatedEmail || credentials.email as string,
'scope': scopes.join(' '),
'aud': `https://oauth2.googleapis.com/token`,
'iat': now,
'exp': now + 3600,
},
privateKey,
{
algorithm: 'RS256',
header: {
'kid': privateKey,
'typ': 'JWT',
'alg': 'RS256',
},
},
);

const options: OptionsWithUri = {
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
},
method: 'POST',
form: {
grant_type: 'urn:ietf:params:oauth:grant-type:jwt-bearer',
assertion: signature,
},
uri: 'https://oauth2.googleapis.com/token',
json: true,
};

return this.helpers.request!(options);
}

export function simplify(rows: IDataObject[], fields: string[]) {
const results = [];
for (const row of rows) {
Expand Down
131 changes: 92 additions & 39 deletions packages/nodes-base/nodes/Google/BigQuery/GoogleBigQuery.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
INodePropertyOptions,
INodeType,
INodeTypeDescription,
NodeApiError,
} from 'n8n-workflow';

import {
Expand Down Expand Up @@ -39,24 +40,60 @@ export class GoogleBigQuery implements INodeType {
inputs: ['main'],
outputs: ['main'],
credentials: [
{
name: 'googleApi',
required: true,
displayOptions: {
show: {
authentication: [
'serviceAccount',
],
},
},
},
{
name: 'googleBigQueryOAuth2Api',
required: true,
displayOptions: {
show: {
authentication: [
'oAuth2',
],
},
},
},
],
properties: [
{
displayName: 'Authentication',
name: 'authentication',
type: 'options',
noDataExpression: true,
options: [
{
name: 'Service Account',
value: 'serviceAccount',
},
{
name: 'OAuth2',
value: 'oAuth2',
},
],
default: 'oAuth2',
},
{
displayName: 'Resource',
name: 'resource',
type: 'options',
noDataExpression: true,
options: [
{
name: 'Record',
value: 'record',
},
],
default: 'record',
description: 'The resource to operate on.',
description: 'The resource to operate on',
},
...recordOperations,
...recordFields,
Expand Down Expand Up @@ -171,14 +208,22 @@ export class GoogleBigQuery implements INodeType {
}

body.rows = rows;
responseData = await googleApiRequest.call(
this,
'POST',
`/v2/projects/${projectId}/datasets/${datasetId}/tables/${tableId}/insertAll`,
body,
);
returnData.push(responseData);

try {
responseData = await googleApiRequest.call(
this,
'POST',
`/v2/projects/${projectId}/datasets/${datasetId}/tables/${tableId}/insertAll`,
body,
);
returnData.push(responseData);
} catch (error) {
if (this.continueOnFail()) {
returnData.push({ error: error.message });
} else {
throw new NodeApiError(this.getNode(), error);
}
}
} else if (operation === 'getAll') {

// ----------------------------------
Expand All @@ -205,40 +250,48 @@ export class GoogleBigQuery implements INodeType {
}

for (let i = 0; i < length; i++) {
const options = this.getNodeParameter('options', i) as IDataObject;
Object.assign(qs, options);
try {
const options = this.getNodeParameter('options', i) as IDataObject;
Object.assign(qs, options);

// if (qs.useInt64Timestamp !== undefined) {
// qs.formatOptions = {
// useInt64Timestamp: qs.useInt64Timestamp,
// };
// delete qs.useInt64Timestamp;
// }
// if (qs.useInt64Timestamp !== undefined) {
// qs.formatOptions = {
// useInt64Timestamp: qs.useInt64Timestamp,
// };
// delete qs.useInt64Timestamp;
// }

if (qs.selectedFields) {
fields = (qs.selectedFields as string).split(',');
}
if (qs.selectedFields) {
fields = (qs.selectedFields as string).split(',');
}

if (returnAll) {
responseData = await googleApiRequestAllItems.call(
this,
'rows',
'GET',
`/v2/projects/${projectId}/datasets/${datasetId}/tables/${tableId}/data`,
{},
qs,
);
returnData.push.apply(returnData, (simple) ? simplify(responseData, fields) : responseData);
} else {
qs.maxResults = this.getNodeParameter('limit', i) as number;
responseData = await googleApiRequest.call(
this,
'GET',
`/v2/projects/${projectId}/datasets/${datasetId}/tables/${tableId}/data`,
{},
qs,
);
returnData.push.apply(returnData, (simple) ? simplify(responseData.rows, fields) : responseData.rows);
if (returnAll) {
responseData = await googleApiRequestAllItems.call(
this,
'rows',
'GET',
`/v2/projects/${projectId}/datasets/${datasetId}/tables/${tableId}/data`,
{},
qs,
);
returnData.push.apply(returnData, (simple) ? simplify(responseData, fields) : responseData);
} else {
qs.maxResults = this.getNodeParameter('limit', i) as number;
responseData = await googleApiRequest.call(
this,
'GET',
`/v2/projects/${projectId}/datasets/${datasetId}/tables/${tableId}/data`,
{},
qs,
);
returnData.push.apply(returnData, (simple) ? simplify(responseData.rows, fields) : responseData.rows);
}
} catch (error) {
if (this.continueOnFail()) {
returnData.push({ error: error.message });
continue;
}
throw new NodeApiError(this.getNode(), error);
}
}
}
Expand Down
Loading