Skip to content

Commit

Permalink
query via index should return LastEvaKey etc, so client can handle pa…
Browse files Browse the repository at this point in the history
…gination
  • Loading branch information
frankleng committed Apr 30, 2021
1 parent d29c0a7 commit 9378f59
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 92 deletions.
174 changes: 83 additions & 91 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import {
QueryCommand,
UpdateItemCommand,
UpdateItemCommandInput,
} from "@aws-sdk/client-dynamodb";
import { marshall, unmarshall } from "@aws-sdk/util-dynamodb";
import { captureAWSv3Client } from "aws-xray-sdk-core";
} from '@aws-sdk/client-dynamodb';
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb';
import { captureAWSv3Client } from 'aws-xray-sdk-core';

export const BATCH_WRITE_RETRY_THRESHOLD = 10;

Expand All @@ -32,12 +32,12 @@ export function chunkList(list: any[], size: number) {
/**
* @param table
*/
export function logTableNameUndefined(table: string = "") {
console.error("Table name is undefined. ", table);
export function logTableNameUndefined(table = '') {
console.error('Table name is undefined. ', table);
console.trace();
}

export type KeyCondMap = { op: "=" | ">" | "<" | ">=" | "<="; value: string | number };
export type KeyCondMap = { op: '=' | '>' | '<' | '>=' | '<='; value: string | number };
export type KeyCondExpressionMap = {
[key: string]: string | number | KeyCondMap;
};
Expand All @@ -46,16 +46,14 @@ export type FilterExpressionMap = {
| string
| number
| (
| KeyCondMap
| {
op: KeyCondMap["op"] & { op: "<>" };
value: KeyCondMap["value"];
}
);
| KeyCondMap
| {
op: KeyCondMap['op'] & { op: '<>' };
value: KeyCondMap['value'];
}
);
};
function getExpressionFromMap(
type: "FilterExpression" | "KeyConditionExpression"
) {
function getExpressionFromMap(type: 'FilterExpression' | 'KeyConditionExpression') {
return (map: KeyCondExpressionMap | FilterExpressionMap) => {
const keyCondExpList = [];
const ExpressionAttributeValues: { [key: string]: string | number } = {};
Expand All @@ -66,7 +64,7 @@ function getExpressionFromMap(
const attribute = `#${key}`;
const anchor = `:${key}`;
ExpressionAttributeNames[attribute] = key;
if (typeof v === "string" || typeof v === "number") {
if (typeof v === 'string' || typeof v === 'number') {
keyCondExpList.push(`${attribute} = ${anchor}`);
ExpressionAttributeValues[anchor] = v;
} else {
Expand All @@ -76,7 +74,7 @@ function getExpressionFromMap(
}
}
return {
[type]: keyCondExpList.join(" and "),
[type]: keyCondExpList.join(' and '),
ExpressionAttributeValues: marshall(ExpressionAttributeValues, {
removeUndefinedValues: true,
}),
Expand All @@ -86,16 +84,16 @@ function getExpressionFromMap(
}

export const getKeyCondExpressionFromMap = (map: KeyCondExpressionMap) =>
getExpressionFromMap("KeyConditionExpression")(map) as {
ExpressionAttributeValues: QueryCommandInput["ExpressionAttributeValues"];
KeyConditionExpression: QueryCommandInput["KeyConditionExpression"];
ExpressionAttributeNames: QueryCommandInput["ExpressionAttributeNames"];
getExpressionFromMap('KeyConditionExpression')(map) as {
ExpressionAttributeValues: QueryCommandInput['ExpressionAttributeValues'];
KeyConditionExpression: QueryCommandInput['KeyConditionExpression'];
ExpressionAttributeNames: QueryCommandInput['ExpressionAttributeNames'];
};
export const getFilterExpressionFromMap = (map: FilterExpressionMap) =>
getExpressionFromMap("FilterExpression")(map) as {
ExpressionAttributeValues: QueryCommandInput["ExpressionAttributeValues"];
FilterExpression: QueryCommandInput["FilterExpression"];
ExpressionAttributeNames: QueryCommandInput["ExpressionAttributeNames"];
getExpressionFromMap('FilterExpression')(map) as {
ExpressionAttributeValues: QueryCommandInput['ExpressionAttributeValues'];
FilterExpression: QueryCommandInput['FilterExpression'];
ExpressionAttributeNames: QueryCommandInput['ExpressionAttributeNames'];
};

/**
Expand All @@ -105,10 +103,10 @@ export const getFilterExpressionFromMap = (map: FilterExpressionMap) =>
* @param consistent
*/
export async function getTableRow(
TableName: GetItemInput["TableName"],
TableName: GetItemInput['TableName'],
keys: { [x: string]: any },
projection?: string[],
consistent?: boolean
consistent?: boolean,
) {
if (!TableName) return logTableNameUndefined();
try {
Expand All @@ -117,8 +115,8 @@ export async function getTableRow(
TableName,
Key: marshall(keys),
};
if (projection) query["ProjectionExpression"] = projection.join(",");
if (typeof consistent !== "undefined") query["ConsistentRead"] = consistent;
if (projection) query['ProjectionExpression'] = projection.join(',');
if (typeof consistent !== 'undefined') query['ConsistentRead'] = consistent;
const result = await ddb.send(new GetItemCommand(query));
return result.Item ? unmarshall(result.Item) : null;
} catch (e) {
Expand All @@ -134,16 +132,14 @@ export async function getTableRow(
* @param retryCount
*/
async function batchWriteTable(
RequestItems: BatchWriteItemInput["RequestItems"],
retryCount = 0
RequestItems: BatchWriteItemInput['RequestItems'],
retryCount = 0,
): Promise<BatchWriteItemCommandOutput | null> {
const ddb = captureAWSv3Client(new DynamoDBClient({}));
const query: BatchWriteItemInput = {
RequestItems,
};
let result: BatchWriteItemCommandOutput | null = await ddb.send(
new BatchWriteItemCommand(query)
);
let result: BatchWriteItemCommandOutput | null = await ddb.send(new BatchWriteItemCommand(query));

if (
retryCount < BATCH_WRITE_RETRY_THRESHOLD &&
Expand All @@ -160,7 +156,7 @@ async function batchWriteTable(
result?.UnprocessedItems &&
Object.keys(result.UnprocessedItems).length
) {
console.log("Unprocessed Items:", result.UnprocessedItems);
console.log('Unprocessed Items:', result.UnprocessedItems);
throw `Batch Write failed to ${process.env.ORDER_SUMMARY_TABLE_NAME}`;
}
return result;
Expand All @@ -169,11 +165,11 @@ async function batchWriteTable(
/**
* @param request
*/
function getBatchWriteRequest(request: "PutRequest" | "DeleteRequest") {
function getBatchWriteRequest(request: 'PutRequest' | 'DeleteRequest') {
return async function (
TableName: PutItemInput["TableName"],
TableName: PutItemInput['TableName'],
unmarshalledList: any[],
predicate?: (item: any) => any
predicate?: (item: any) => any,
) {
if (!TableName) return logTableNameUndefined();
const results = [];
Expand All @@ -182,25 +178,25 @@ function getBatchWriteRequest(request: "PutRequest" | "DeleteRequest") {
// so we have to chunk the list, and create separate requests
const chunkedList = chunkList(unmarshalledList, 25);

console.log("list length", unmarshalledList.length);
console.log("# of chunks", chunkedList.length);
console.log('list length', unmarshalledList.length);
console.log('# of chunks', chunkedList.length);

for (const chunk of chunkedList) {
console.log("chunk length", chunk.length);
const putRequests: BatchWriteItemInput["RequestItems"] = {
console.log('chunk length', chunk.length);
const putRequests: BatchWriteItemInput['RequestItems'] = {
[TableName]: chunk.map((item: any) => {
const row = predicate ? predicate(item) : item;
const marshalledRow = marshall(row, {
removeUndefinedValues: true,
});
if (request === "DeleteRequest") {
if (request === 'DeleteRequest') {
return {
DeleteRequest: {
Key: marshalledRow,
},
};
}
if (request === "PutRequest") {
if (request === 'PutRequest') {
return {
PutRequest: {
Item: marshalledRow,
Expand All @@ -217,50 +213,50 @@ function getBatchWriteRequest(request: "PutRequest" | "DeleteRequest") {
};
}

export const batchPutTable = getBatchWriteRequest("PutRequest");
export const batchDelTable = getBatchWriteRequest("DeleteRequest");
export const batchPutTable = getBatchWriteRequest('PutRequest');
export const batchDelTable = getBatchWriteRequest('DeleteRequest');

/**
* @param request
*/
function getWriteRequest(request: "PutItem" | "DeleteItem") {
return async function (TableName: PutItemInput["TableName"], data: any) {
function getWriteRequest(request: 'PutItem' | 'DeleteItem') {
return async function (TableName: PutItemInput['TableName'], data: any) {
const client = captureAWSv3Client(new DynamoDBClient({}));
if (request === "PutItem") {
if (request === 'PutItem') {
return client.send(
new PutItemCommand({
TableName,
Item: marshall(data, { removeUndefinedValues: true }),
})
}),
);
}
if (request === "DeleteItem") {
if (request === 'DeleteItem') {
return client.send(
new DeleteItemCommand({
TableName,
Key: marshall(data, { removeUndefinedValues: true }),
})
}),
);
}
return null;
};
}

export const putTableRow = getWriteRequest("PutItem");
export const delTableRow = getWriteRequest("DeleteItem");
export const putTableRow = getWriteRequest('PutItem');
export const delTableRow = getWriteRequest('DeleteItem');

/**
* @param TableName
* @param IndexName
* @param params
*/
export async function queryTableIndex(
TableName: QueryCommandInput["TableName"],
IndexName: QueryCommandInput["IndexName"],
params?: Partial<Omit<QueryCommandInput, "TableName" | "IndexName">> & {
TableName: QueryCommandInput['TableName'],
IndexName: QueryCommandInput['IndexName'],
params?: Partial<Omit<QueryCommandInput, 'TableName' | 'IndexName'>> & {
keyCondExpressionMap?: KeyCondExpressionMap;
filterExpressionMap?: FilterExpressionMap;
}
},
) {
try {
const client = captureAWSv3Client(new DynamoDBClient({}));
Expand All @@ -277,58 +273,54 @@ export async function queryTableIndex(
ExpressionAttributeValues,
ExpressionAttributeNames,
} = getKeyCondExpressionFromMap(keyCondExpressionMap);
query["KeyConditionExpression"] = KeyConditionExpression;
query["ExpressionAttributeNames"] = query["ExpressionAttributeNames"]
query['KeyConditionExpression'] = KeyConditionExpression;
query['ExpressionAttributeNames'] = query['ExpressionAttributeNames']
? {
...query["ExpressionAttributeNames"],
...ExpressionAttributeNames,
}
...query['ExpressionAttributeNames'],
...ExpressionAttributeNames,
}
: ExpressionAttributeNames;
query["ExpressionAttributeValues"] = query["ExpressionAttributeValues"]
query['ExpressionAttributeValues'] = query['ExpressionAttributeValues']
? {
...query["ExpressionAttributeValues"],
...ExpressionAttributeValues,
}
...query['ExpressionAttributeValues'],
...ExpressionAttributeValues,
}
: ExpressionAttributeValues;
}
if (filterExpressionMap) {
const {
FilterExpression,
ExpressionAttributeValues,
ExpressionAttributeNames,
} = getFilterExpressionFromMap(filterExpressionMap);
query["FilterExpression"] = FilterExpression;
query["ExpressionAttributeNames"] = query["ExpressionAttributeNames"]
const { FilterExpression, ExpressionAttributeValues, ExpressionAttributeNames } = getFilterExpressionFromMap(
filterExpressionMap,
);
query['FilterExpression'] = FilterExpression;
query['ExpressionAttributeNames'] = query['ExpressionAttributeNames']
? {
...query["ExpressionAttributeNames"],
...ExpressionAttributeNames,
}
...query['ExpressionAttributeNames'],
...ExpressionAttributeNames,
}
: ExpressionAttributeNames;
query["ExpressionAttributeValues"] = query["ExpressionAttributeValues"]
query['ExpressionAttributeValues'] = query['ExpressionAttributeValues']
? {
...query["ExpressionAttributeValues"],
...ExpressionAttributeValues,
}
...query['ExpressionAttributeValues'],
...ExpressionAttributeValues,
}
: ExpressionAttributeValues;
}
}
const result = await client.send(new QueryCommand(query));
return result.Count && result.Items
? result.Items.map((row) => unmarshall(row))
: null;
const { Items, ...rest } = await client.send(new QueryCommand(query));
return { list: Items?.length ? Items.map((row) => unmarshall(row)) : [], ...rest };
} catch (e) {
console.error(e);
return null;
}
}

export async function updateTableRow(
TableName: UpdateItemCommandInput["TableName"],
TableName: UpdateItemCommandInput['TableName'],
keys: { [x: string]: any },
UpdateExpression: string,
expressionAttributeValues: { [x: string]: any },
ExpressionAttributeNames?: { [x: string]: string },
ReturnValues = "ALL_NEW"
ReturnValues = 'ALL_NEW',
) {
if (!TableName) return logTableNameUndefined();
const ddb = captureAWSv3Client(new DynamoDBClient({}));
Expand All @@ -353,9 +345,9 @@ export async function updateTableRow(
* @param row
*/
export async function shallowUpdateTableRow(
TableName: UpdateItemCommandInput["TableName"],
TableName: UpdateItemCommandInput['TableName'],
keys: { [x: string]: any },
row: { [x: string]: any }
row: { [x: string]: any },
) {
const updateExpressions = [];
const expressionAttributeValues: {
Expand All @@ -375,8 +367,8 @@ export async function shallowUpdateTableRow(
return updateTableRow(
TableName,
keys,
`SET ${updateExpressions.join(", ")}`,
`SET ${updateExpressions.join(', ')}`,
expressionAttributeValues,
expressionAttributeNames
expressionAttributeNames,
);
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dynadash",
"version": "1.0.1",
"version": "1.0.2",
"description": "DynamoDb helpers",
"main": "dist/index.js",
"scripts": {
Expand Down

0 comments on commit 9378f59

Please sign in to comment.