Implement log replication prefix filtering (#8087)
Implement support for object prefix filtering in log-based replication optimization
Signed-off-by: Ben <[email protected]>
Neon-White committed May 30, 2024
1 parent 540e84b commit 426e7c7
Showing 3 changed files with 34 additions and 24 deletions.
48 changes: 29 additions & 19 deletions src/server/bg_services/replication_log_parser.js
@@ -39,6 +39,7 @@ async function get_log_candidates(source_bucket_id, rule_id, replication_config,

 async function get_aws_log_candidates(source_bucket_id, rule_id, replication_config, candidates_limit, sync_deletions) {
     const aws_log_replication_info = replication_config.log_replication_info.aws_log_replication_info;
+    const obj_prefix_filter = _get_obj_prefix_filter_for_rule(rule_id, replication_config);
     const { logs_bucket, prefix } = aws_log_replication_info.logs_location;
     const s3 = _get_source_bucket_aws_connection(source_bucket_id, aws_log_replication_info);
     let log_object_continuation_token = _get_log_object_continuation_token_for_rule(rule_id, replication_config);
@@ -60,7 +61,7 @@ async function get_aws_log_candidates(source_bucket_id, rule_id, replication_config,
     }
 
     const next_log_data = await _aws_get_next_log(s3, logs_bucket, next_log_entry.Contents[0].Key);
-    aws_parse_log_object(logs, next_log_data, sync_deletions);
+    aws_parse_log_object(logs, next_log_data, sync_deletions, obj_prefix_filter);
 
     dbg.log1("get_aws_log_candidates: parsed logs ", logs);
 
@@ -85,7 +86,7 @@ async function get_azure_log_candidates(source_bucket_id, rule_id, replication_config,
     const namespace_resource = source_bucket.namespace.write_resource.resource;
     const src_storage_account = namespace_resource.connection.access_key;
     const src_container_name = namespace_resource.connection.target_bucket;
-    const prefix = replication_config.log_replication_info.azure_log_replication_info.prefix || '';
+    const obj_prefix_filter = _get_obj_prefix_filter_for_rule(rule_id, replication_config) || '';
     const { logs_query_client, monitor_workspace_id } = _get_source_bucket_azure_connection(source_bucket_id);
     let candidates;
 
@@ -126,7 +127,7 @@ async function get_azure_log_candidates(source_bucket_id, rule_id, replication_config,
         | project Time=_TimeReceived, Action=substring(Category, 7), Key=ObjectKey
         | sort by Time asc
         | where Action == "Write" or Action == "Delete"
-        | where Key startswith "/${src_storage_account.unwrap()}/${src_container_name}/${prefix}"
+        | where Key startswith "/${src_storage_account.unwrap()}/${src_container_name}/${obj_prefix_filter}"
         | where Key !contains "test-delete-non-existing-"
         | parse Key with * "/" StorageAccount "/" Container "/" Key
         | project Time, Action, Key`;
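
For context, Azure Monitor reports each object key as a full path of the form /<storage-account>/<container>/<object-key> (hence the parse clause after the filter), so for a hypothetical rule whose filter prefix is 'backup/' the startswith clause above would render as:

    | where Key startswith "/mystorageaccount/mycontainer/backup/"

The account and container names here are illustrative. An empty filter falls back to '' (the || '' default above), which makes startswith match every key in the container.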
@@ -281,30 +282,33 @@ async function _aws_get_next_log(s3, bucket, key) {
  * @param {nb.ReplicationLogs} logs - Log array
  * @param {*} log_object - AWS log object
  * @param {boolean} sync_deletions - Whether deletions should be synced or not
+ * @param {string} obj_prefix_filter - Object prefix filter
  */
-function aws_parse_log_object(logs, log_object, sync_deletions) {
+function aws_parse_log_object(logs, log_object, sync_deletions, obj_prefix_filter) {
     const log_string = log_object.Body.toString();
     const log_array = log_string.split("\n");
 
     for (const line of log_array) {
         if (line !== '') {
             const log = _parse_aws_log_entry(line);
             if (log.operation) {
-                if (log.operation.includes('PUT.OBJECT') || log.operation.includes('POST.OBJECT')) {
-                    logs.push({
-                        key: log.key,
-                        action: 'copy',
-                        time: log.time,
-                    });
-                    dbg.log2('aws_parse_log_object:: key', log.key, 'contain copy (PUT or POST) entry');
-                }
-                if (log.operation.includes('DELETE.OBJECT') && sync_deletions && log.http_status === 204) {
-                    logs.push({
-                        key: log.key,
-                        action: 'delete',
-                        time: log.time,
-                    });
-                    dbg.log2('aws_parse_log_object:: key', log.key, 'contain delete (DELETE) entry');
+                if (obj_prefix_filter === undefined || log.key?.startsWith(obj_prefix_filter)) {
+                    if (log.operation.includes('PUT.OBJECT') || log.operation.includes('POST.OBJECT')) {
+                        logs.push({
+                            key: log.key,
+                            action: 'copy',
+                            time: log.time,
+                        });
+                        dbg.log2('aws_parse_log_object:: key', log.key, 'contain copy (PUT or POST) entry');
+                    }
+                    if (log.operation.includes('DELETE.OBJECT') && sync_deletions && log.http_status === 204) {
+                        logs.push({
+                            key: log.key,
+                            action: 'delete',
+                            time: log.time,
+                        });
+                        dbg.log2('aws_parse_log_object:: key', log.key, 'contain delete (DELETE) entry');
+                    }
                 }
             }
         }
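
A minimal usage sketch of the extended parser (the log lines reuse the S3 access-log format from the test fixtures below; the keys and the 'backup/' prefix are made up for illustration):

    const logs = [];
    // Two delete entries; only the first key falls under the 'backup/' prefix
    const example_log = { Body: `
    aaa test.bucket [13/Feb/2023:15:08:28 +0000] 1.1.1.1 arn:aws:iam::111:user/user AAA BATCH.DELETE.OBJECT backup/a.txt - 204 - - 1 - - - - - AAA SigV4 ECDHE-RSA-AES128-GCM-SHA256 AuthHeader s3.us-east-2.amazonaws.com TLSv1.2 - -
    aaa test.bucket [13/Feb/2023:15:08:28 +0000] 1.1.1.1 arn:aws:iam::111:user/user AAA BATCH.DELETE.OBJECT other/b.txt - 204 - - 1 - - - - - AAA SigV4 ECDHE-RSA-AES128-GCM-SHA256 AuthHeader s3.us-east-2.amazonaws.com TLSv1.2 - -
    ` };
    aws_parse_log_object(logs, example_log, true, 'backup/');
    // logs now holds a single { key: 'backup/a.txt', action: 'delete', time: ... } entry;
    // passing undefined instead of 'backup/' would keep both keys (no filtering)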
@@ -537,6 +541,12 @@ function _get_log_object_continuation_token_for_rule(rule_id, replication_config) {
     return replication_rule?.rule_log_status?.log_marker?.continuation_token;
 }
 
+function _get_obj_prefix_filter_for_rule(rule_id, replication_config) {
+    dbg.log1('_get_obj_prefix_filter_for_rule: ', rule_id, 'replication_config: ', replication_config);
+    const replication_rule = replication_config.rules.find(rule => rule.rule_id === rule_id);
+    return replication_rule?.filter?.prefix;
+}
+
 // EXPORTS
 exports.get_log_candidates = get_log_candidates;
 exports.aws_parse_log_object = aws_parse_log_object;
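
For reference, a sketch of the replication_config shape these helpers traverse (the field names come from this file; the concrete values are made up):

    const replication_config = {
        rules: [{
            rule_id: 'rule-1',
            // The new per-rule object prefix filter read by _get_obj_prefix_filter_for_rule
            filter: { prefix: 'backup/' },
            rule_log_status: { log_marker: { continuation_token: 'token-abc' } },
        }],
        log_replication_info: {
            aws_log_replication_info: { logs_location: { logs_bucket: 'my-logs-bucket', prefix: 'access-logs/' } },
        },
    };
    // _get_obj_prefix_filter_for_rule('rule-1', replication_config) === 'backup/'
    // _get_log_object_continuation_token_for_rule('rule-1', replication_config) === 'token-abc'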
4 changes: 2 additions & 2 deletions src/server/system_services/replication_store.js
@@ -144,8 +144,8 @@ class ReplicationStore {
         dbg.log1('find_log_based_replication_rules: ');
         const replications = await this._replicationconfigs.find({ deleted: null });
         const reduced_replications = _.filter(
-            replications, repl =>
-            repl.log_replication_info?.azure_log_replication_info || repl.log_replication_info?.aws_log_replication_info
+            replications, repl => repl.log_replication_info?.endpoint_type ||
+            repl.log_replication_info?.aws_log_replication_info
         );
         // TODO: Further transformation of the data can be done here - refer to find_rules_updated_longest_time_ago
         dbg.log1('find_log_based_replication_rules: ', reduced_replications);
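
Under the reworked predicate, a config counts as log-based when its log_replication_info carries either an endpoint_type or an aws_log_replication_info. An illustrative sketch (these shapes and the 'AZURE' value are assumptions, not from this commit):

    // { log_replication_info: { endpoint_type: 'AZURE' } }            -> kept
    // { log_replication_info: { aws_log_replication_info: { ... } } } -> kept
    // { log_replication_info: undefined }                             -> dropped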
@@ -29,7 +29,7 @@ describe('AWS S3 server log parsing tests', () => {
 aaa test.bucket [13/Feb/2023:15:08:28 +0000] 1.1.1.1 arn:aws:iam::111:user/user AAA BATCH.DELETE.OBJECT text.txt - 204 - - 1 - - - - - AAA SigV4 ECDHE-RSA-AES128-GCM-SHA256 AuthHeader s3.us-east-2.amazonaws.com TLSv1.2 - -
 ` };
         const action_dictionary = { 'test': 'delete', 'test.js': 'delete', 'code2': 'copy', 'test2': 'delete', 'testfile.js': 'delete', 'empty': 'copy', 'text.txt': 'delete' };
-        log_parser.aws_parse_log_object(logs, example_log, true);
+        log_parser.aws_parse_log_object(logs, example_log, true, '');
         // Make sure the test doesn't pass in case the parsing fails
         expect(logs.length).toEqual(Object.keys(action_dictionary).length);
         // Make sure all expected actions are mapped to the appropriate keys
@@ -38,7 +38,7 @@ });
         });
         // Test with sync_deletions set to false
         logs.length = 0;
-        log_parser.aws_parse_log_object(logs, example_log, false);
+        log_parser.aws_parse_log_object(logs, example_log, false, '');
         // Delete all action_dictionary keys whose value is delete
         Object.keys(action_dictionary).forEach(key => {
             if (action_dictionary[key] === 'delete') {
@@ -55,7 +55,7 @@ describe('AWS S3 server log parsing tests', () => {
 aaa test.bucket [13/Feb/2023:09:08:28 +0000] 1.1.1.1 arn:aws:iam::111:user/user AAA BATCH.DELETE.OBJECT other_obj - 204 - - 1 - - - - - AAA SigV4 ECDHE-RSA-AES128-GCM-SHA256 AuthHeader s3.us-east-2.amazonaws.com TLSv1.2 - -
 aaa test.bucket [13/Feb/2023:09:08:56 +0000] 0.0.0.0 arn:aws:iam::111:user/user AAA REST.PUT.OBJECT test "PUT /test.bucket/test?X-Amz-Security-Token=AAAAAAAAAAAAAAA=20230213T160856Z&X-Amz-AAAAAA HTTP/1.1" 200 - - 1 1 1 "https://s3.console.aws.amazon.com/s3/upload/test.bucket?region=us-east-2" "AAA/5.0 (AAA 1.1; AAA; AAA) AAA/1.1 (KHTML, like Gecko) AAA/1.1 AAA/1.1" - AAAA SigV4 ECDHE-RSA-AES128-GCM-SHA256 QueryString s3.us-east-2.amazonaws.com TLSv1.2 - -
 ` };
-        log_parser.aws_parse_log_object(logs, example_log, true);
+        log_parser.aws_parse_log_object(logs, example_log, true, '');
         const candidates = log_parser.create_candidates(logs);
         // DELETE log should be the latest log present inside the candidate, as candidate storing only latest log per key
         expect(candidates.test.action).toEqual('delete');
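
A hypothetical follow-up assertion could exercise the new filter argument against the same fixture ('other_obj' does not start with 'test', so it should be dropped); this is a sketch, not part of the commit:

    logs.length = 0;
    log_parser.aws_parse_log_object(logs, example_log, true, 'test');
    const filtered_candidates = log_parser.create_candidates(logs);
    // The 'test' key still resolves to its latest (delete) entry, while 'other_obj' is filtered out
    expect(filtered_candidates.test.action).toEqual('delete');
    expect(filtered_candidates.other_obj).toBeUndefined();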
