Skip to content
146 changes: 139 additions & 7 deletions batchOperations.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ var log_level = process.env['LOG_LEVEL'] || 'info';
const winston = require('winston');

// Shared console logger. When `debug` is exactly true the level is forced to
// 'debug'; otherwise the level configured in `log_level` is used.
const logger = winston.createLogger({
    level: debug === true ? 'debug' : log_level,
    transports: [
        new winston.transports.Console({
            format: winston.format.simple()
        })
    ]
});

/**
Expand Down Expand Up @@ -89,6 +89,138 @@ function getBatch(setRegion, s3Prefix, batchId, callback) {

exports.getBatch = getBatch;

/**
 * Deletes all batches stored under an S3 prefix, starting from the first page
 * of the batch query.
 *
 * @param {string} setRegion - region passed to init() and on to the segment cleaner
 * @param {string} s3Prefix - prefix whose batches should be removed (required)
 * @param {function} callback - called with (err) on failure, otherwise with
 *     whatever cleanBatchesSegments reports as its result
 */
function cleanBatches(setRegion, s3Prefix, callback) {
    // The query coerces the prefix with "" + s3Prefix, so a missing value would
    // silently search for the literal text "undefined"/"null" and report success.
    if (s3Prefix === undefined || s3Prefix === null || s3Prefix === '') {
        callback(new Error("An s3Prefix must be provided to clean batches"));
        return;
    }

    init(setRegion);
    cleanBatchesSegments(setRegion, s3Prefix, null, callback);
}

/**
 * Cleans one page of batches for the given prefix, then recurses for every
 * following page until the query no longer returns a continuation key.
 *
 * The final result covers ALL pages: deletions from this page are merged with
 * the deletions reported by the recursive call for the remaining pages.
 *
 * @param {string} setRegion - region forwarded to the query and to cleanBatch
 * @param {string} s3Prefix - prefix whose batches are being removed
 * @param {Object|null} lastEvaluatedKey - continuation key of the previous page,
 *     or null for the first page
 * @param {function} callback - called with (err) or
 *     (null, { batchCountDeleted, batchesDeleted })
 */
function cleanBatchesSegments(setRegion, s3Prefix, lastEvaluatedKey, callback) {
    // query for batches based on given s3Prefix
    queryBatchByPrefix(setRegion, s3Prefix, lastEvaluatedKey, function (err, data) {
        if (err) {
            callback(err);
            return;
        }

        // the query may answer without an item list when nothing was found
        var batchItems = (data && data.items) || [];

        // clean found batches one by one
        async.map(batchItems, function (batchItem, asyncCallback) {
            cleanBatch(setRegion, batchItem, asyncCallback);
        }, function (err, results) {
            if (err) {
                callback(err);
                return;
            }

            if (!data || !data.lastEvaluatedKey) {
                // deletions are completed
                callback(null, {
                    batchCountDeleted: results.length,
                    batchesDeleted: results
                });
                return;
            }

            // more pages to go: fold their deletions into this page's results so
            // the caller sees the total rather than only the last page
            cleanBatchesSegments(setRegion, s3Prefix, data.lastEvaluatedKey, function (err, remaining) {
                if (err) {
                    callback(err);
                    return;
                }

                var allDeleted = results.concat(remaining.batchesDeleted);
                callback(null, {
                    batchCountDeleted: allDeleted.length,
                    batchesDeleted: allDeleted
                });
            });
        });
    });
}

/**
 * Removes one batch and then the file entries it lists.
 *
 * The batch record is deleted first through deleteBatch(). If the batch item
 * carries a non-empty `entries` list, each entry is then passed to
 * common.deleteFile(); the outcome is attached to the deleteBatch result as
 * `processedFilesCountDeleted` and `processedFilesDeleted`.
 *
 * @param {string} setRegion - region forwarded to common.deleteFile
 * @param {Object} batchItem - batch descriptor with s3Prefix, batchId and an
 *     optional entries list
 * @param {function} callback - called with (err) or (null, deleteBatchResult)
 */
function cleanBatch(setRegion, batchItem, callback) {
    // delete batch entry
    deleteBatch(batchItem.s3Prefix, batchItem.batchId, function (batchErr, batchResult) {
        if (batchErr) {
            callback(batchErr);
            return;
        }

        var nothingToDelete = !batchItem.entries || batchItem.entries.length <= 0;
        if (nothingToDelete) {
            callback(null, batchResult);
            return;
        }

        // delete related entries in filesTable
        var removeFile = function (processedFile, fileDone) {
            common.deleteFile(dynamoDB, setRegion, processedFile, function (fileErr, fileResult) {
                if (fileErr) {
                    fileDone(fileErr);
                    return;
                }
                fileDone(null, fileResult);
            });
        };

        var onFilesRemoved = function (mapErr, deletedFiles) {
            if (mapErr) {
                callback(mapErr);
                return;
            }
            batchResult.processedFilesCountDeleted = deletedFiles.length;
            batchResult.processedFilesDeleted = deletedFiles;
            callback(null, batchResult);
        };

        async.map(batchItem.entries, removeFile, onFilesRemoved);
    });
}

/**
 * Queries the batch table for one page of batches whose hash key equals the
 * given S3 prefix.
 *
 * @param {string} setRegion - region passed to init()
 * @param {string} s3Prefix - prefix to match; coerced to a string for the query
 * @param {Object|null} lastEvaluatedKey - continuation key from a previous page;
 *     when set it is sent as ExclusiveStartKey
 * @param {function} callback - called with (err) or
 *     (null, { items, lastEvaluatedKey }); `items` is always an array and each
 *     item has s3Prefix, batchId, status, entries, lastUpdateDate and lastUpdate
 */
function queryBatchByPrefix(setRegion, s3Prefix, lastEvaluatedKey, callback) {
    init(setRegion);

    // declared locally: assigning to an undeclared name would leak a global and
    // throws outright in strict mode
    var queryParams = {
        TableName: batchTable,
        KeyConditionExpression: "#s3Prefix = :s3Prefix",
        ExpressionAttributeNames: {
            "#s3Prefix": "s3Prefix"
        },
        ExpressionAttributeValues: {
            ":s3Prefix": {
                "S": "" + s3Prefix
            }
        }
    };
    if (lastEvaluatedKey) {
        queryParams.ExclusiveStartKey = lastEvaluatedKey;
    }

    if (debug == true) {
        console.log(queryParams);
    }

    dynamoDB.query(queryParams, function (err, data) {
        if (err) {
            callback(err);
            return;
        }

        if (!data || !data.Items) {
            // keep the same result shape as the populated case so callers can
            // always read .items and .lastEvaluatedKey
            callback(null, {
                items: [],
                lastEvaluatedKey: data ? data.LastEvaluatedKey : undefined
            });
            return;
        }

        var itemsToShow = data.Items.map(function (item) {
            return {
                s3Prefix: item.s3Prefix.S,
                batchId: item.batchId.S,
                status: item.status.S,
                // a batch may have no entries attribute at all
                entries: item.entries ? item.entries.SS : undefined,
                lastUpdateDate: common.readableTime(item.lastUpdate.N),
                lastUpdate: item.lastUpdate.N
            };
        });

        callback(null, {items: itemsToShow, lastEvaluatedKey: data.LastEvaluatedKey});
    });
}

exports.cleanBatches = cleanBatches;

/**
* Function which performs a batch query with the provided arguments
*
Expand Down Expand Up @@ -408,4 +540,4 @@ function updateBatchStatus(s3Prefix, thisBatchId, status, requireStatusArray, up
});
};

exports.updateBatchStatus = updateBatchStatus;
exports.updateBatchStatus = updateBatchStatus;
18 changes: 18 additions & 0 deletions cleanBatches.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
var batchOperations = require("./batchOperations");

var args = require('minimist')(process.argv.slice(2));

var setRegion = args.region;
var s3Prefix = args.s3Prefix;

// The prefix selects which batches get deleted, so refuse to run without one
// instead of letting the query search for the literal text "undefined".
if (s3Prefix === undefined || s3Prefix === null || s3Prefix === '') {
    console.error("Usage: node cleanBatches.js --region <region> --s3Prefix <s3Prefix>");
    process.exit(-1);
}

// Delete every batch under the prefix and print a summary of what was removed.
batchOperations.cleanBatches(setRegion, s3Prefix, function (err, data) {
    if (err) {
        // failures go to stderr so they are not mixed into the JSON report on stdout
        console.error("Error: " + err);
        process.exit(-1);
    } else {
        console.log("OK: Deletion of " + data.batchCountDeleted + " Batches");
        console.log("Deleted Batch Information:");
        console.log(JSON.stringify(data));
    }
});
135 changes: 64 additions & 71 deletions common.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,7 @@ function createTables(dynamoDB, callback) {
KeyType: 'HASH'
}],
TableName: filesTable,
ProvisionedThroughput: {
ReadCapacityUnits: 1,
WriteCapacityUnits: 5
}
BillingMode: "PAY_PER_REQUEST"
};
var configKey = s3prefix;
var configSpec = {
Expand All @@ -113,10 +110,7 @@ function createTables(dynamoDB, callback) {
KeyType: 'HASH'
}],
TableName: configTable,
ProvisionedThroughput: {
ReadCapacityUnits: 1,
WriteCapacityUnits: 5
}
BillingMode: "PAY_PER_REQUEST"
};

var batchKey = batchId;
Expand All @@ -143,10 +137,7 @@ function createTables(dynamoDB, callback) {
KeyType: 'RANGE'
}],
TableName: batchTable,
ProvisionedThroughput: {
ReadCapacityUnits: 1,
WriteCapacityUnits: 5
},
BillingMode: "PAY_PER_REQUEST",
GlobalSecondaryIndexes: [{
IndexName: batchStatusGSI,
KeySchema: [{
Expand All @@ -158,10 +149,6 @@ function createTables(dynamoDB, callback) {
}],
Projection: {
ProjectionType: 'ALL'
},
ProvisionedThroughput: {
ReadCapacityUnits: 1,
WriteCapacityUnits: 5
}
}]
};
Expand Down Expand Up @@ -459,65 +446,71 @@ function getS3Arn(bucket, prefix) {

exports.getS3Arn = getS3Arn;


/**
 * Ensures that S3 may invoke the given Lambda function for the given bucket,
 * adding the permission when the function's policy does not already grant it.
 *
 * The check can be bypassed by setting the SKIP_LAMBDA_BUCKET_PERMISSION_CHECK
 * environment variable to any non-empty value other than false/0/no/off.
 *
 * @param {Object} lambda - client exposing getPolicy() and addPermission()
 * @param {string} bucket - bucket name the source ARN is derived from via getS3Arn()
 * @param {string} prefix - accepted for interface compatibility; not used here
 * @param {string} functionName - name of the Lambda function to inspect/update
 * @param {string} functionArn - ARN of the function; its fifth ':' separated
 *     field is used as the source account
 * @param {function} callback - called with no arguments on success, or with
 *     (err) when reading the policy or adding the permission fails
 */
function ensureS3InvokePermisssions(lambda, bucket, prefix, functionName, functionArn, callback) {
    // Boolean("false") is true, so interpret the text of the flag rather than
    // its mere presence
    var skipSetting = process.env.SKIP_LAMBDA_BUCKET_PERMISSION_CHECK;
    var skipCheck = Boolean(skipSetting) && !/^(false|0|no|off)$/i.test(skipSetting.trim());

    if (skipCheck) {
        callback();
        return;
    }

    lambda.getPolicy({
        FunctionName: functionName
    }, function (err, data) {
        // a missing policy is fine (we will create the permission); anything else is fatal
        if (err && err.code !== 'ResourceNotFoundException') {
            // stop here - falling through would add a permission and invoke
            // the callback a second time
            callback(err);
            return;
        }

        var s3Arn = getS3Arn(bucket);
        var sourceAccount = functionArn.split(":")[4];
        var foundMatch = false;

        // process the existing permissions policy if there is one
        if (data && data.Policy) {
            var statements = JSON.parse(data.Policy).Statement || [];

            foundMatch = statements.some(function (item) {
                try {
                    // the service principal is reported as { Service: "..." };
                    // a bare string is tolerated as well
                    var principal = item.Principal && item.Principal.Service ?
                        item.Principal.Service : item.Principal;

                    // check that the source s3 bucket has rights to invoke the function
                    // in the correct source account and for the correct bucket
                    return principal === "s3.amazonaws.com" &&
                        item.Action === "lambda:InvokeFunction" &&
                        item.Resource === functionArn &&
                        item.Condition.StringEquals['AWS:SourceAccount'] === sourceAccount &&
                        item.Condition.ArnLike['AWS:SourceArn'] === s3Arn;
                } catch (e) {
                    // this is OK - just means that the policy structure doesn't
                    // match the above format
                    return false;
                }
            });
        }

        if (foundMatch === true) {
            logger.info("Found existing Policy match for S3 path to invoke " + functionName);
            callback();
            return;
        }

        var lambdaPermissions = {
            Action: "lambda:InvokeFunction",
            FunctionName: functionName,
            Principal: "s3.amazonaws.com",
            // only use internal account sources
            SourceAccount: sourceAccount,
            SourceArn: s3Arn,
            StatementId: uuid.v4()
        };

        lambda.addPermission(lambdaPermissions, function (err) {
            if (err) {
                logger.error(err);
                callback(err);
            } else {
                logger.info("Granted S3 permission to invoke " + functionArn);
                callback();
            }
        });
    });
}

exports.ensureS3InvokePermisssions = ensureS3InvokePermisssions;
Expand Down Expand Up @@ -870,4 +863,4 @@ function reprocessFile(dynamoDB, s3, region, file, callback) {
});
}

exports.reprocessFile = reprocessFile;
exports.reprocessFile = reprocessFile;
Binary file modified dist/AWSLambdaRedshiftLoader-2.7.8.zip
Binary file not shown.
2 changes: 1 addition & 1 deletion kmsCrypto.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var authContext = {
};

// module key alias to be used for this application; the
// S3_REDSHIFT_LOADER_KMS_KEY_NAME environment variable overrides the default
var moduleKeyName = process.env.S3_REDSHIFT_LOADER_KMS_KEY_NAME || "alias/LambdaRedshiftLoaderKey";

function setRegion(region) {
if (!region) {
Expand Down