Skip to content

Commit

Permalink
feat: add multi-query transaction support to snowflake piece
Browse files Browse the repository at this point in the history
  • Loading branch information
valentin-mourtialon committed Jan 24, 2025
1 parent 4d1dd96 commit 0f20e53
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 3 deletions.
2 changes: 1 addition & 1 deletion packages/pieces/community/snowflake/package.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"name": "@activepieces/piece-snowflake",
"version": "0.0.9"
"version": "0.0.10"
}
5 changes: 3 additions & 2 deletions packages/pieces/community/snowflake/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
PieceAuth,
Property,
} from '@activepieces/pieces-framework';
import { runMultipleQueries } from './lib/actions/run-multiple-queries';
import { runQuery } from './lib/actions/run-query';
import { PieceCategory } from '@activepieces/shared';
import { insertRowAction } from './lib/actions/insert-row';
Expand Down Expand Up @@ -53,7 +54,7 @@ export const snowflake = createPiece({
minimumSupportedRelease: '0.27.1',
logoUrl: 'https://cdn.activepieces.com/pieces/snowflake.png',
categories: [PieceCategory.DEVELOPER_TOOLS],
authors: ['AdamSelene', 'abuaboud'],
actions: [runQuery, insertRowAction],
authors: ['AdamSelene', 'abuaboud', 'valentin-mourtialon'],
actions: [runQuery, runMultipleQueries, insertRowAction],
triggers: [],
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import { createAction, Property } from '@activepieces/pieces-framework';
import snowflake, { Statement, SnowflakeError } from 'snowflake-sdk';
import { snowflakeAuth } from '../../index';

type QueryResult = unknown[] | undefined;
type QueryResults = { query: string; result: QueryResult }[];

const DEFAULT_APPLICATION_NAME = 'ActivePieces';
const DEFAULT_QUERY_TIMEOUT = 30000;

export const runMultipleQueries = createAction({
name: 'runMultipleQueries',
displayName: 'Run Multiple Queries',
description: 'Run Multiple Queries',
auth: snowflakeAuth,
props: {
sqlTexts: Property.Array({
displayName: 'SQL queries',
description:
'Array of SQL queries to execute in order, in the same transaction. Use :1, :2… or ? placeholders to use binding parameters.',
required: true,
}),
binds: Property.Array({
displayName: 'Parameters',
description:
'Binding parameters for the SQL query (to prevent SQL injection attacks)',
required: false,
}),
useTransaction: Property.Checkbox({
displayName: 'Use Transaction',
description:
'When enabled, all queries will be executed in a single transaction. If any query fails, all changes will be rolled back.',
required: false,
defaultValue: false,
}),
timeout: Property.Number({
displayName: 'Query timeout (ms)',
description:
'An integer indicating the maximum number of milliseconds to wait for a query to complete before timing out.',
required: false,
defaultValue: DEFAULT_QUERY_TIMEOUT,
}),
application: Property.ShortText({
displayName: 'Application name',
description:
'A string indicating the name of the client application connecting to the server.',
required: false,
defaultValue: DEFAULT_APPLICATION_NAME,
}),
},

async run(context) {
const { username, password, role, database, warehouse, account } =
context.auth;

const connection = snowflake.createConnection({
application: context.propsValue.application,
timeout: context.propsValue.timeout,
username,
password,
role,
database,
warehouse,
account,
});

return new Promise<QueryResults>((resolve, reject) => {
connection.connect(async function (err: SnowflakeError | undefined) {
if (err) {
reject(err);
return;
}

const { sqlTexts, binds, useTransaction } = context.propsValue;
const queryResults: QueryResults = [];

function handleError(err: SnowflakeError) {
if (useTransaction) {
connection.execute({
sqlText: 'ROLLBACK',
complete: () => {
connection.destroy(() => {
reject(err);
});
},
});
} else {
connection.destroy(() => {
reject(err);
});
}
}

async function executeQueriesSequentially() {
try {
if (useTransaction) {
await new Promise<void>((resolveBegin, rejectBegin) => {
connection.execute({
sqlText: 'BEGIN',
complete: (err: SnowflakeError | undefined) => {
if (err) rejectBegin(err);
else resolveBegin();
},
});
});
}
for (const sqlText of sqlTexts) {
const result = await new Promise<QueryResult>(
(resolveQuery, rejectQuery) => {
connection.execute({
sqlText: sqlText as string,
binds: binds as snowflake.Binds,
complete: (
err: SnowflakeError | undefined,
stmt: Statement,
rows: QueryResult
) => {
if (err) {
rejectQuery(err);
return;
}
resolveQuery(rows);
},
});
}
);

queryResults.push({
query: sqlText as string,
result,
});
}

if (useTransaction) {
await new Promise<void>((resolveCommit, rejectCommit) => {
connection.execute({
sqlText: 'COMMIT',
complete: (err: SnowflakeError | undefined) => {
if (err) rejectCommit(err);
else resolveCommit();
},
});
});
}

connection.destroy((err: SnowflakeError | undefined) => {
if (err) {
reject(err);
return;
}
resolve(queryResults);
});
} catch (err) {
handleError(err as SnowflakeError); // Reject with the original error!
}
}

executeQueriesSequentially();
});
});
},
});

0 comments on commit 0f20e53

Please sign in to comment.