-
Notifications
You must be signed in to change notification settings - Fork 17
DynamoDB support (WIP) #71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
3e7d23e
38f5fad
560df0c
c5705b7
882c899
3306303
94bdf77
85a5330
5d28f89
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,8 +14,14 @@ const create = () => { | |
switch (vendor.toLowerCase()) { | ||
|
||
case 'aws': | ||
internalConfigReader = new aws.AwsConfigReader(secretId || DefaultSecretId, region) | ||
break; | ||
switch(type) { | ||
case 'dynamo': | ||
internalConfigReader = new aws.AwsDynamoConfigReader(region) | ||
break | ||
default: | ||
internalConfigReader = new aws.AwsConfigReader(secretId || DefaultSecretId, region) | ||
} | ||
break | ||
|
||
case 'gcp': | ||
switch (type) { | ||
|
@@ -58,6 +64,9 @@ const create = () => { | |
case 'airtable': | ||
internalConfigReader = new gcp.GcpAirtableConfigReader() | ||
break; | ||
case 'dynamo': | ||
internalConfigReader = new aws.AwsDynamoConfigReader(region) | ||
break | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. gcp doesn't have dynamo There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's not in gcp, it's in azure for the e2e test. |
||
case 'mssql': | ||
case 'mysql': | ||
case 'postgres': | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package-lock.json | ||
node_modules | ||
config.json | ||
schemas.json | ||
.gcloudignore | ||
.eslint | ||
.idea |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
module.exports = { | ||
clearMocks: true, | ||
verbose: true, | ||
roots: ['<rootDir>/lib'], | ||
preset: "ts-jest", | ||
testRegex: '(.*\\.spec\\.)js$', | ||
testEnvironment: "node" | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
const SchemaProvider = require('./dynamo_schema_provider') | ||
const DataProvider = require('./dynamo_data_provider') | ||
const FilterParser = require('./sql_filter_transformer') | ||
const DatabaseOperations = require('./dynamo_operations') | ||
const { DynamoDB } = require('@aws-sdk/client-dynamodb') | ||
|
||
const extraOptions = (cfg) => { | ||
if (cfg.endpoint) | ||
return { endpoint: cfg.endpoint } | ||
} | ||
|
||
const init = async(cfg, _cfgOptions) => { //remove async | ||
//cfg : region , endpoint(optional) , | ||
const options = _cfgOptions || {} | ||
const client = new DynamoDB({region:cfg.region, ...extraOptions(cfg), ...options}) | ||
const databaseOperations = new DatabaseOperations(client) | ||
|
||
const filterParser = new FilterParser() | ||
const dataProvider = new DataProvider(client, filterParser) | ||
const schemaProvider = new SchemaProvider(client) | ||
|
||
return { dataProvider, schemaProvider, databaseOperations, connection: client, cleanup: () => {}} | ||
} | ||
|
||
module.exports = init |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
const { patchDateTime, updateFieldsFor } = require('velo-external-db-commons') | ||
const { DynamoDBDocument } = require ('@aws-sdk/lib-dynamodb') | ||
const { validateTable, patchFixDates } = require('./dynamo_utils') | ||
|
||
class DataProvider { | ||
constructor(client, filterParser) { | ||
this.filterParser = filterParser | ||
this.client = client | ||
this.docClient = DynamoDBDocument.from(client) | ||
|
||
} | ||
|
||
async find(collectionName, filter, sort, skip, limit) { | ||
const {filterExpr} = this.filterParser.transform(filter) | ||
const response = await this.docClient | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's better to write
|
||
.scan({TableName: collectionName, | ||
...filterExpr, | ||
Limit:limit | ||
}) | ||
return response.Items.map(patchFixDates) | ||
} | ||
|
||
async count(collectionName, filter) { | ||
const {filterExpr} = this.filterParser.transform(filter) | ||
const response = await this.docClient | ||
.scan({TableName: collectionName, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's discuss the difference between scan and query |
||
...filterExpr, | ||
Select: 'COUNT' | ||
}) | ||
return response.Count | ||
} | ||
|
||
async insert(collectionName, items) { | ||
validateTable() | ||
await this.docClient | ||
.batchWrite(this.batchPutItemsExpression(collectionName, items.map(patchDateTime))) | ||
return items.length //check if there is a way to figure how many deleted/inserted with batchWrite | ||
} | ||
|
||
async update(collectionName, items) { | ||
await this.docClient.transactWrite({ | ||
TransactItems:items.map(item=>this.updateSingleItemExpression(collectionName, patchDateTime(item))) | ||
}) | ||
return items.length | ||
} | ||
|
||
async delete(collectionName, itemIds) { | ||
validateTable() | ||
await this.docClient | ||
.batchWrite(this.batchDeleteItemsExpression(collectionName, itemIds)) | ||
return itemIds.length | ||
} | ||
|
||
async truncate(collectionName) { | ||
validateTable(collectionName) | ||
const rows = await this.docClient | ||
.scan({ | ||
TableName: collectionName, | ||
AttributesToGet: ['_id'] | ||
}) | ||
|
||
await this.docClient | ||
.batchWrite(this.batchDeleteItemsExpression(collectionName, rows.Items.map(item=>item._id))) | ||
} | ||
|
||
batchPutItemsExpression(collectionName, items) { | ||
return { | ||
RequestItems: { | ||
[collectionName]: items.map(this.putSingleItemExpression) | ||
} | ||
} | ||
} | ||
|
||
putSingleItemExpression(item) { | ||
return { | ||
PutRequest: { | ||
Item: { | ||
...item | ||
} | ||
} | ||
} | ||
} | ||
|
||
batchDeleteItemsExpression(collectionName, itemIds) { | ||
return { | ||
RequestItems: { | ||
[collectionName]: itemIds.map(this.deleteSingleItemExpression) | ||
} | ||
} | ||
} | ||
|
||
deleteSingleItemExpression(id) { | ||
return { | ||
DeleteRequest: { | ||
Key: { | ||
_id : id | ||
} | ||
} | ||
} | ||
} | ||
|
||
updateSingleItemExpression(collectionName, item) { | ||
const updateFields = updateFieldsFor(item) | ||
const updateExpression = `SET ${updateFields.map(f => `#${f} = :${f}`).join(', ')}` | ||
const expressionAttributeNames = updateFields.reduce((pv, cv)=> ({ ...pv, [`#${cv}`]: cv }), {}) | ||
const expressionAttributeValues = updateFields.reduce((pv, cv)=> ({ ...pv, [`:${cv}`]: item[cv]}), {}) | ||
|
||
return { | ||
Update: { | ||
TableName: collectionName, | ||
Key: { | ||
_id: item._id | ||
}, | ||
UpdateExpression: updateExpression, | ||
ExpressionAttributeNames: expressionAttributeNames, | ||
ExpressionAttributeValues: expressionAttributeValues | ||
} | ||
} | ||
} | ||
|
||
} | ||
|
||
module.exports = DataProvider |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
const { notThrowingTranslateErrorCodes } = require('./sql_exception_translator') | ||
|
||
class DatabaseOperations { | ||
constructor(client) { | ||
this.client = client | ||
} | ||
|
||
async validateConnection() { | ||
return await this.client.listTables({}).then(() => { return { valid: true } }) | ||
.catch((e) => { return { valid: false, error: notThrowingTranslateErrorCodes(e) } }) | ||
} | ||
} | ||
|
||
module.exports = DatabaseOperations |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's rename the AwsConfigReader to something else