diff --git a/FetchMigration/python/fetch_orchestrator.py b/FetchMigration/python/fetch_orchestrator.py index a624dd191e..8908678eeb 100644 --- a/FetchMigration/python/fetch_orchestrator.py +++ b/FetchMigration/python/fetch_orchestrator.py @@ -59,7 +59,7 @@ def write_inline_pipeline(pipeline_file_path: str, inline_pipeline: str, inline_ def write_inline_target_host(pipeline_file_path: str, inline_target_host: str): - with open(pipeline_file_path, 'rw') as pipeline_file: + with open(pipeline_file_path, 'r+') as pipeline_file: pipeline_yaml = yaml.safe_load(pipeline_file) update_target_host(pipeline_yaml, inline_target_host) # Note - this does not preserve comments @@ -84,7 +84,7 @@ def run(params: FetchOrchestratorParams) -> Optional[int]: report=True, dryrun=params.is_dry_run) logging.info("Running metadata migration...\n") metadata_migration_result = metadata_migration.run(metadata_migration_params) - if len(metadata_migration_result.created_indices) > 0 and not params.is_only_metadata_migration(): + if len(metadata_migration_result.migration_indices) > 0 and not params.is_only_metadata_migration(): # Kick off a subprocess for Data Prepper logging.info("Running Data Prepper...\n") proc = subprocess.Popen(dp_exec_path) diff --git a/FetchMigration/python/index_diff.py b/FetchMigration/python/index_diff.py new file mode 100644 index 0000000000..15d84f0800 --- /dev/null +++ b/FetchMigration/python/index_diff.py @@ -0,0 +1,35 @@ +import utils +from index_operations import SETTINGS_KEY, MAPPINGS_KEY + + +# Computes and captures differences in indices between a "source" cluster +# and a "target" cluster. Indices that exist on the source cluster but not +# on the target cluster are considered "to-create". "Conflicting" indices +# are present on both source and target clusters, but differ in their index +# settings or mappings. +class IndexDiff: + indices_to_create: set + identical_indices: set + identical_empty_indices: set + conflicting_indices: set + + def __init__(self, source: dict, target: dict): + self.identical_empty_indices = set() + self.conflicting_indices = set() + # Compute index names that are present in both the source and target + indices_intersection = set(source.keys()) & set(target.keys()) + # Check if these "common" indices are identical or have metadata conflicts + for index in indices_intersection: + # Check settings + if utils.has_differences(SETTINGS_KEY, source[index], target[index]): + self.conflicting_indices.add(index) + # Check mappings + if utils.has_differences(MAPPINGS_KEY, source[index], target[index]): + self.conflicting_indices.add(index) + # Identical indices are the subset that do not have metadata conflicts + self.identical_indices = set(indices_intersection) - set(self.conflicting_indices) + # Indices that are not already on the target need to be created + self.indices_to_create = set(source.keys()) - set(indices_intersection) + + def set_identical_empty_indices(self, indices: set): + self.identical_empty_indices = indices diff --git a/FetchMigration/python/index_doc_count.py b/FetchMigration/python/index_doc_count.py new file mode 100644 index 0000000000..90b8910661 --- /dev/null +++ b/FetchMigration/python/index_doc_count.py @@ -0,0 +1,8 @@ +from dataclasses import dataclass + + +# Captures the doc_count for indices in a cluster, and also computes a total +@dataclass +class IndexDocCount: + total: int + index_doc_count_map: dict diff --git a/FetchMigration/python/index_operations.py b/FetchMigration/python/index_operations.py index a311ec7e61..b889ae116a 100644 --- a/FetchMigration/python/index_operations.py +++ b/FetchMigration/python/index_operations.py @@ -1,14 +1,22 @@ +import jsonpath_ng import requests from endpoint_info import EndpointInfo # Constants +from index_doc_count import IndexDocCount + SETTINGS_KEY = "settings" MAPPINGS_KEY = "mappings" COUNT_KEY = "count" __INDEX_KEY = "index" __ALL_INDICES_ENDPOINT = "*" -__COUNT_ENDPOINT = "/_count" +__SEARCH_COUNT_PATH = "/_search?size=0" +__SEARCH_COUNT_PAYLOAD = {"aggs": {"count": {"terms": {"field": "_index"}}}} +__TOTAL_COUNT_JSONPATH = jsonpath_ng.parse("$.hits.total.value") +__INDEX_COUNT_JSONPATH = jsonpath_ng.parse("$.aggregations.count.buckets") +__BUCKET_INDEX_NAME_KEY = "key" +__BUCKET_DOC_COUNT_KEY = "doc_count" __INTERNAL_SETTINGS_KEYS = ["creation_date", "uuid", "provided_name", "version", "store"] @@ -43,9 +51,16 @@ def create_indices(indices_data: dict, endpoint: EndpointInfo): raise RuntimeError(f"Failed to create index [{index}] - {e!s}") -def doc_count(indices: set, endpoint: EndpointInfo) -> int: - count_endpoint_suffix: str = ','.join(indices) + __COUNT_ENDPOINT +def doc_count(indices: set, endpoint: EndpointInfo) -> IndexDocCount: + count_endpoint_suffix: str = ','.join(indices) + __SEARCH_COUNT_PATH doc_count_endpoint: str = endpoint.add_path(count_endpoint_suffix) - resp = requests.get(doc_count_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl()) + resp = requests.get(doc_count_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(), + json=__SEARCH_COUNT_PAYLOAD) + # TODO Handle resp.status_code for non successful requests result = dict(resp.json()) - return int(result[COUNT_KEY]) + total: int = __TOTAL_COUNT_JSONPATH.find(result)[0].value + counts_list: list = __INDEX_COUNT_JSONPATH.find(result)[0].value + count_map = dict() + for entry in counts_list: + count_map[entry[__BUCKET_INDEX_NAME_KEY]] = entry[__BUCKET_DOC_COUNT_KEY] + return IndexDocCount(total, count_map) diff --git a/FetchMigration/python/metadata_migration.py b/FetchMigration/python/metadata_migration.py index a86eb330ce..4cd2c3fc48 100644 --- a/FetchMigration/python/metadata_migration.py +++ b/FetchMigration/python/metadata_migration.py @@ -1,10 +1,12 @@ import argparse +import logging import yaml import endpoint_utils import index_operations import utils +from index_diff import IndexDiff from metadata_migration_params import MetadataMigrationParams from metadata_migration_result import MetadataMigrationResult @@ -14,13 +16,13 @@ INDEX_NAME_KEY = "index_name_regex" -def write_output(yaml_data: dict, new_indices: set, output_path: str): +def write_output(yaml_data: dict, indices_to_migrate: set, output_path: str): pipeline_config = next(iter(yaml_data.values())) # Result is a tuple of (type, config) source_config = endpoint_utils.get_supported_endpoint_config(pipeline_config, endpoint_utils.SOURCE_KEY)[1] source_indices = source_config.get(INDICES_KEY, dict()) included_indices = source_indices.get(INCLUDE_KEY, list()) - for index in new_indices: + for index in indices_to_migrate: included_indices.append({INDEX_NAME_KEY: index}) source_indices[INCLUDE_KEY] = included_indices source_config[INDICES_KEY] = source_indices @@ -28,36 +30,15 @@ def write_output(yaml_data: dict, new_indices: set, output_path: str): yaml.dump(yaml_data, out_file) -# Computes differences in indices between source and target. -# Returns a tuple with 3 elements: -# - The 1st element is the set of indices to create on the target -# - The 2nd element is a set of indices that are identical on source and target -# - The 3rd element is a set of indices that are present on both source and target, -# but differ in their settings or mappings. -def get_index_differences(source: dict, target: dict) -> tuple[set, set, set]: - index_conflicts = set() - indices_in_target = set(source.keys()) & set(target.keys()) - for index in indices_in_target: - # Check settings - if utils.has_differences(index_operations.SETTINGS_KEY, source[index], target[index]): - index_conflicts.add(index) - # Check mappings - if utils.has_differences(index_operations.MAPPINGS_KEY, source[index], target[index]): - index_conflicts.add(index) - identical_indices = set(indices_in_target) - set(index_conflicts) - indices_to_create = set(source.keys()) - set(indices_in_target) - return indices_to_create, identical_indices, index_conflicts - - -# The order of data in the tuple is: -# (indices to create), (identical indices), (indices with conflicts) -def print_report(index_differences: tuple[set, set, set], count: int): # pragma no cover - print("Identical indices in the target cluster (no changes will be made): " + - utils.string_from_set(index_differences[1])) - print("Indices in target cluster with conflicting settings/mappings: " + - utils.string_from_set(index_differences[2])) - print("Indices to create: " + utils.string_from_set(index_differences[0])) - print("Total documents to be moved: " + str(count)) +def print_report(diff: IndexDiff, total_doc_count: int): # pragma no cover + logging.info("Identical indices in the target cluster: " + utils.string_from_set(diff.identical_indices)) + logging.info("Identical empty indices in the target cluster (data will be migrated): " + + utils.string_from_set(diff.identical_empty_indices)) + logging.info("Indices present in both clusters with conflicting settings/mappings (data will not be migrated): " + + utils.string_from_set(diff.conflicting_indices)) + logging.info("Indices to be created in the target cluster (data will be migrated): " + + utils.string_from_set(diff.indices_to_create)) + logging.info("Total number of documents to be moved: " + str(total_doc_count)) def run(args: MetadataMigrationParams) -> MetadataMigrationResult: @@ -83,23 +64,29 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult: return result target_indices = index_operations.fetch_all_indices(target_endpoint_info) # Compute index differences and print report - diff = get_index_differences(source_indices, target_indices) - # The first element in the tuple is the set of indices to create - indices_to_create = diff[0] - if indices_to_create: - result.created_indices = indices_to_create - result.target_doc_count = index_operations.doc_count(indices_to_create, source_endpoint_info) + diff = IndexDiff(source_indices, target_indices) + if diff.identical_indices: + # Identical indices with zero documents on the target are eligible for migration + target_doc_count = index_operations.doc_count(diff.identical_indices, target_endpoint_info) + # doc_count only returns indices that have non-zero counts, so the difference in responses + # gives us the set of identical, empty indices + result.migration_indices = diff.identical_indices.difference(target_doc_count.index_doc_count_map.keys()) + diff.set_identical_empty_indices(result.migration_indices) + if diff.indices_to_create: + result.migration_indices.update(diff.indices_to_create) + if result.migration_indices: + doc_count_result = index_operations.doc_count(result.migration_indices, source_endpoint_info) + result.target_doc_count = doc_count_result.total if args.report: print_report(diff, result.target_doc_count) - if indices_to_create: + if result.migration_indices: # Write output YAML if len(args.output_file) > 0: - write_output(dp_config, indices_to_create, args.output_file) - if args.report: # pragma no cover - print("Wrote output YAML pipeline to: " + args.output_file) + write_output(dp_config, result.migration_indices, args.output_file) + logging.debug("Wrote output YAML pipeline to: " + args.output_file) if not args.dryrun: index_data = dict() - for index_name in indices_to_create: + for index_name in diff.indices_to_create: index_data[index_name] = source_indices[index_name] index_operations.create_indices(index_data, target_endpoint_info) return result diff --git a/FetchMigration/python/metadata_migration_result.py b/FetchMigration/python/metadata_migration_result.py index d2122f5df7..7bf325eaaf 100644 --- a/FetchMigration/python/metadata_migration_result.py +++ b/FetchMigration/python/metadata_migration_result.py @@ -4,4 +4,5 @@ @dataclass class MetadataMigrationResult: target_doc_count: int = 0 - created_indices: set = field(default_factory=set) + # Set of indices for which data needs to be migrated + migration_indices: set = field(default_factory=set) diff --git a/FetchMigration/python/migration_monitor.py b/FetchMigration/python/migration_monitor.py index 481d738964..24d81f6b36 100644 --- a/FetchMigration/python/migration_monitor.py +++ b/FetchMigration/python/migration_monitor.py @@ -162,6 +162,6 @@ def run(args: MigrationMonitorParams, dp_process: Optional[Popen] = None, poll_i help="Target doc_count to reach, after which the Data Prepper pipeline will be terminated" ) namespace = arg_parser.parse_args() - print("\n##### Starting monitor tool... #####\n") + logging.info("\n##### Starting monitor tool... #####\n") run(MigrationMonitorParams(namespace.target_count, namespace.data_prepper_endpoint)) - print("\n##### Ending monitor tool... #####\n") + logging.info("\n##### Ending monitor tool... #####\n") diff --git a/FetchMigration/python/requirements.txt b/FetchMigration/python/requirements.txt index a8f57b550c..b9352bd738 100644 --- a/FetchMigration/python/requirements.txt +++ b/FetchMigration/python/requirements.txt @@ -1,5 +1,6 @@ botocore>=1.31.70 jsondiff>=2.0.0 +jsonpath-ng>=1.6.0 prometheus-client>=0.17.1 pyyaml>=6.0.1 requests>=2.31.0 diff --git a/FetchMigration/python/tests/test_endpoint_utils.py b/FetchMigration/python/tests/test_endpoint_utils.py index 03486ed3d0..5a435442e8 100644 --- a/FetchMigration/python/tests/test_endpoint_utils.py +++ b/FetchMigration/python/tests/test_endpoint_utils.py @@ -8,6 +8,7 @@ from moto import mock_iam import endpoint_utils +from endpoint_info import EndpointInfo from tests import test_constants # Constants @@ -71,6 +72,10 @@ def test_is_insecure_missing_nested(self): test_input = {"key1": 123, CONNECTION_KEY: {"key2": "val"}} self.assertFalse(endpoint_utils.is_insecure(test_input)) + def test_auth_normalized_url(self): + val = EndpointInfo("test") + self.assertEqual("test/", val.get_url()) + def test_get_auth_returns_none(self): # The following inputs should not return an auth tuple: # - Empty input diff --git a/FetchMigration/python/tests/test_fetch_orchestrator.py b/FetchMigration/python/tests/test_fetch_orchestrator.py index 826d3baff0..96260073f0 100644 --- a/FetchMigration/python/tests/test_fetch_orchestrator.py +++ b/FetchMigration/python/tests/test_fetch_orchestrator.py @@ -157,7 +157,7 @@ def test_write_inline_target_host(self, mock_file_open: MagicMock, mock_yaml_loa mock_file_open.reset_mock() mock_yaml_dump.reset_mock() orchestrator.write_inline_target_host("test", val) - mock_file_open.assert_called_once_with("test", "rw") + mock_file_open.assert_called_once_with("test", "r+") mock_yaml_dump.assert_called_once_with(expected_pipeline, ANY) def test_update_target_host_bad_config(self): diff --git a/FetchMigration/python/tests/test_index_diff.py b/FetchMigration/python/tests/test_index_diff.py new file mode 100644 index 0000000000..32f3203862 --- /dev/null +++ b/FetchMigration/python/tests/test_index_diff.py @@ -0,0 +1,73 @@ +import copy +import unittest + +from index_diff import IndexDiff +from tests import test_constants + + +class TestIndexDiff(unittest.TestCase): + def test_index_diff_empty(self): + # Base case should return an empty list + diff = IndexDiff(dict(), dict()) + # All members should be empty + self.assertEqual(set(), diff.indices_to_create) + self.assertEqual(set(), diff.identical_indices) + self.assertEqual(set(), diff.conflicting_indices) + + def test_index_diff_empty_target(self): + diff = IndexDiff(test_constants.BASE_INDICES_DATA, dict()) + # No conflicts or identical indices + self.assertEqual(set(), diff.conflicting_indices) + self.assertEqual(set(), diff.identical_indices) + # Indices-to-create + self.assertEqual(3, len(diff.indices_to_create)) + self.assertTrue(test_constants.INDEX1_NAME in diff.indices_to_create) + self.assertTrue(test_constants.INDEX2_NAME in diff.indices_to_create) + self.assertTrue(test_constants.INDEX3_NAME in diff.indices_to_create) + + def test_index_diff_identical_index(self): + test_data = copy.deepcopy(test_constants.BASE_INDICES_DATA) + del test_data[test_constants.INDEX2_NAME] + del test_data[test_constants.INDEX3_NAME] + diff = IndexDiff(test_data, test_data) + # No indices to move, or conflicts + self.assertEqual(set(), diff.indices_to_create) + self.assertEqual(set(), diff.conflicting_indices) + # Identical indices + self.assertEqual(1, len(diff.identical_indices)) + self.assertTrue(test_constants.INDEX1_NAME in diff.identical_indices) + + def test_index_diff_settings_conflict(self): + test_data = copy.deepcopy(test_constants.BASE_INDICES_DATA) + # Set up conflict in settings + index_settings = test_data[test_constants.INDEX2_NAME][test_constants.SETTINGS_KEY] + index_settings[test_constants.INDEX_KEY][test_constants.NUM_REPLICAS_SETTING] += 1 + diff = IndexDiff(test_constants.BASE_INDICES_DATA, test_data) + # No indices to move + self.assertEqual(set(), diff.indices_to_create) + # Identical indices + self.assertEqual(2, len(diff.identical_indices)) + self.assertTrue(test_constants.INDEX1_NAME in diff.identical_indices) + self.assertTrue(test_constants.INDEX3_NAME in diff.identical_indices) + # Conflicting indices + self.assertEqual(1, len(diff.conflicting_indices)) + self.assertTrue(test_constants.INDEX2_NAME in diff.conflicting_indices) + + def test_index_diff_mappings_conflict(self): + test_data = copy.deepcopy(test_constants.BASE_INDICES_DATA) + # Set up conflict in mappings + test_data[test_constants.INDEX3_NAME][test_constants.MAPPINGS_KEY] = {} + diff = IndexDiff(test_constants.BASE_INDICES_DATA, test_data) + # No indices to move + self.assertEqual(set(), diff.indices_to_create) + # Identical indices + self.assertEqual(2, len(diff.identical_indices)) + self.assertTrue(test_constants.INDEX1_NAME in diff.identical_indices) + self.assertTrue(test_constants.INDEX2_NAME in diff.identical_indices) + # Conflicting indices + self.assertEqual(1, len(diff.conflicting_indices)) + self.assertTrue(test_constants.INDEX3_NAME in diff.conflicting_indices) + + +if __name__ == '__main__': + unittest.main() diff --git a/FetchMigration/python/tests/test_index_operations.py b/FetchMigration/python/tests/test_index_operations.py index 1378da5fda..8830b34e58 100644 --- a/FetchMigration/python/tests/test_index_operations.py +++ b/FetchMigration/python/tests/test_index_operations.py @@ -60,12 +60,18 @@ def test_create_indices_exception(self): @responses.activate def test_doc_count(self): test_indices = {test_constants.INDEX1_NAME, test_constants.INDEX2_NAME} - expected_count_endpoint = test_constants.SOURCE_ENDPOINT + ",".join(test_indices) + "/_count" - mock_count_response = {"count": "10"} + index_doc_count: int = 5 + test_buckets = list() + for index_name in test_indices: + test_buckets.append({"key": index_name, "doc_count": index_doc_count}) + total_docs: int = index_doc_count * len(test_buckets) + expected_count_endpoint = test_constants.SOURCE_ENDPOINT + ",".join(test_indices) + "/_search?size=0" + mock_count_response = {"hits": {"total": {"value": total_docs}}, + "aggregations": {"count": {"buckets": test_buckets}}} responses.get(expected_count_endpoint, json=mock_count_response) # Now send request - count_value = index_operations.doc_count(test_indices, EndpointInfo(test_constants.SOURCE_ENDPOINT)) - self.assertEqual(10, count_value) + doc_count_result = index_operations.doc_count(test_indices, EndpointInfo(test_constants.SOURCE_ENDPOINT)) + self.assertEqual(total_docs, doc_count_result.total) if __name__ == '__main__': diff --git a/FetchMigration/python/tests/test_metadata_migration.py b/FetchMigration/python/tests/test_metadata_migration.py index 1696c50d6a..d295af4b8a 100644 --- a/FetchMigration/python/tests/test_metadata_migration.py +++ b/FetchMigration/python/tests/test_metadata_migration.py @@ -4,6 +4,7 @@ from unittest.mock import patch, MagicMock, ANY import metadata_migration +from index_doc_count import IndexDocCount from metadata_migration_params import MetadataMigrationParams from tests import test_constants @@ -14,78 +15,6 @@ def setUp(self) -> None: with open(test_constants.PIPELINE_CONFIG_PICKLE_FILE_PATH, "rb") as f: self.loaded_pipeline_config = pickle.load(f) - def test_get_index_differences_empty(self): - # Base case should return an empty list - result_tuple = metadata_migration.get_index_differences(dict(), dict()) - # Invariant - self.assertEqual(3, len(result_tuple)) - # All diffs should be empty - self.assertEqual(set(), result_tuple[0]) - self.assertEqual(set(), result_tuple[1]) - self.assertEqual(set(), result_tuple[2]) - - def test_get_index_differences_empty_target(self): - result_tuple = metadata_migration.get_index_differences(test_constants.BASE_INDICES_DATA, dict()) - # Invariant - self.assertEqual(3, len(result_tuple)) - # No conflicts or identical indices - self.assertEqual(set(), result_tuple[1]) - self.assertEqual(set(), result_tuple[2]) - # Indices-to-create - self.assertEqual(3, len(result_tuple[0])) - self.assertTrue(test_constants.INDEX1_NAME in result_tuple[0]) - self.assertTrue(test_constants.INDEX2_NAME in result_tuple[0]) - self.assertTrue(test_constants.INDEX3_NAME in result_tuple[0]) - - def test_get_index_differences_identical_index(self): - test_data = copy.deepcopy(test_constants.BASE_INDICES_DATA) - del test_data[test_constants.INDEX2_NAME] - del test_data[test_constants.INDEX3_NAME] - result_tuple = metadata_migration.get_index_differences(test_data, test_data) - # Invariant - self.assertEqual(3, len(result_tuple)) - # No indices to move, or conflicts - self.assertEqual(set(), result_tuple[0]) - self.assertEqual(set(), result_tuple[2]) - # Identical indices - self.assertEqual(1, len(result_tuple[1])) - self.assertTrue(test_constants.INDEX1_NAME in result_tuple[1]) - - def test_get_index_differences_settings_conflict(self): - test_data = copy.deepcopy(test_constants.BASE_INDICES_DATA) - # Set up conflict in settings - index_settings = test_data[test_constants.INDEX2_NAME][test_constants.SETTINGS_KEY] - index_settings[test_constants.INDEX_KEY][test_constants.NUM_REPLICAS_SETTING] += 1 - result_tuple = metadata_migration.get_index_differences(test_constants.BASE_INDICES_DATA, test_data) - # Invariant - self.assertEqual(3, len(result_tuple)) - # No indices to move - self.assertEqual(set(), result_tuple[0]) - # Identical indices - self.assertEqual(2, len(result_tuple[1])) - self.assertTrue(test_constants.INDEX1_NAME in result_tuple[1]) - self.assertTrue(test_constants.INDEX3_NAME in result_tuple[1]) - # Conflicting indices - self.assertEqual(1, len(result_tuple[2])) - self.assertTrue(test_constants.INDEX2_NAME in result_tuple[2]) - - def test_get_index_differences_mappings_conflict(self): - test_data = copy.deepcopy(test_constants.BASE_INDICES_DATA) - # Set up conflict in mappings - test_data[test_constants.INDEX3_NAME][test_constants.MAPPINGS_KEY] = {} - result_tuple = metadata_migration.get_index_differences(test_constants.BASE_INDICES_DATA, test_data) - # Invariant - self.assertEqual(3, len(result_tuple)) - # No indices to move - self.assertEqual(set(), result_tuple[0]) - # Identical indices - self.assertEqual(2, len(result_tuple[1])) - self.assertTrue(test_constants.INDEX1_NAME in result_tuple[1]) - self.assertTrue(test_constants.INDEX2_NAME in result_tuple[1]) - # Conflicting indices - self.assertEqual(1, len(result_tuple[2])) - self.assertTrue(test_constants.INDEX3_NAME in result_tuple[2]) - @patch('index_operations.doc_count') @patch('metadata_migration.write_output') @patch('metadata_migration.print_report') @@ -94,15 +23,11 @@ def test_get_index_differences_mappings_conflict(self): # Note that mock objects are passed bottom-up from the patch order above def test_run_report(self, mock_fetch_indices: MagicMock, mock_create_indices: MagicMock, mock_print_report: MagicMock, mock_write_output: MagicMock, mock_doc_count: MagicMock): - mock_doc_count.return_value = 1 + mock_doc_count.return_value = IndexDocCount(1, dict()) index_to_create = test_constants.INDEX3_NAME index_with_conflict = test_constants.INDEX2_NAME - index_exact_match = test_constants.INDEX1_NAME # Set up expected arguments to mocks so we can verify expected_create_payload = {index_to_create: test_constants.BASE_INDICES_DATA[index_to_create]} - # Print report accepts a tuple. The elements of the tuple - # are in the order: to-create, exact-match, conflicts - expected_diff = {index_to_create}, {index_exact_match}, {index_with_conflict} # Create mock data for indices on target target_indices_data = copy.deepcopy(test_constants.BASE_INDICES_DATA) del target_indices_data[index_to_create] @@ -115,7 +40,7 @@ def test_run_report(self, mock_fetch_indices: MagicMock, mock_create_indices: Ma metadata_migration.run(test_input) mock_create_indices.assert_called_once_with(expected_create_payload, ANY) mock_doc_count.assert_called() - mock_print_report.assert_called_once_with(expected_diff, 1) + mock_print_report.assert_called_once_with(ANY, 1) mock_write_output.assert_not_called() @patch('index_operations.doc_count') @@ -125,8 +50,35 @@ def test_run_report(self, mock_fetch_indices: MagicMock, mock_create_indices: Ma # Note that mock objects are passed bottom-up from the patch order above def test_run_dryrun(self, mock_fetch_indices: MagicMock, mock_write_output: MagicMock, mock_print_report: MagicMock, mock_doc_count: MagicMock): + index_to_migrate = test_constants.INDEX1_NAME + expected_output_path = "dummy" + test_doc_counts = {test_constants.INDEX2_NAME: 2, test_constants.INDEX3_NAME: 3} + # Doc counts are first fetched for the target cluster, + # then then source cluster is queried only for identical, empty indices + mock_doc_count.side_effect = [IndexDocCount(5, test_doc_counts), + IndexDocCount(1, {index_to_migrate: 1})] + mock_fetch_indices.return_value = test_constants.BASE_INDICES_DATA + test_input = MetadataMigrationParams(test_constants.PIPELINE_CONFIG_RAW_FILE_PATH, expected_output_path, + dryrun=True) + test_result = metadata_migration.run(test_input) + self.assertEqual(1, test_result.target_doc_count) + self.assertEqual({index_to_migrate}, test_result.migration_indices) + mock_write_output.assert_called_once_with(self.loaded_pipeline_config, {index_to_migrate}, expected_output_path) + mock_doc_count.assert_called() + # Report should not be printed + mock_print_report.assert_not_called() + + @patch('index_operations.doc_count') + @patch('metadata_migration.print_report') + @patch('metadata_migration.write_output') + @patch('index_operations.fetch_all_indices') + # Note that mock objects are passed bottom-up from the patch order above + def test_identical_empty_index(self, mock_fetch_indices: MagicMock, mock_write_output: MagicMock, + mock_print_report: MagicMock, mock_doc_count: MagicMock): + test_index_doc_counts = {test_constants.INDEX2_NAME: 2, + test_constants.INDEX3_NAME: 3} + mock_doc_count.return_value = IndexDocCount(1, test_index_doc_counts) index_to_create = test_constants.INDEX1_NAME - mock_doc_count.return_value = 1 expected_output_path = "dummy" # Create mock data for indices on target target_indices_data = copy.deepcopy(test_constants.BASE_INDICES_DATA) @@ -136,8 +88,8 @@ def test_run_dryrun(self, mock_fetch_indices: MagicMock, mock_write_output: Magi test_input = MetadataMigrationParams(test_constants.PIPELINE_CONFIG_RAW_FILE_PATH, expected_output_path, dryrun=True) test_result = metadata_migration.run(test_input) - self.assertEqual(mock_doc_count.return_value, test_result.target_doc_count) - self.assertEqual({index_to_create}, test_result.created_indices) + self.assertEqual(mock_doc_count.return_value.total, test_result.target_doc_count) + self.assertEqual({index_to_create}, test_result.migration_indices) mock_write_output.assert_called_once_with(self.loaded_pipeline_config, {index_to_create}, expected_output_path) mock_doc_count.assert_called() # Report should not be printed @@ -182,7 +134,7 @@ def test_no_indices_in_source(self, mock_fetch_indices: MagicMock): test_result = metadata_migration.run(test_input) mock_fetch_indices.assert_called_once() self.assertEqual(0, test_result.target_doc_count) - self.assertEqual(0, len(test_result.created_indices)) + self.assertEqual(0, len(test_result.migration_indices)) if __name__ == '__main__': diff --git a/deployment/cdk/opensearch-service-migration/dp_pipeline_template.yaml b/deployment/cdk/opensearch-service-migration/dp_pipeline_template.yaml index 9baec2325c..a8d3b7b43c 100644 --- a/deployment/cdk/opensearch-service-migration/dp_pipeline_template.yaml +++ b/deployment/cdk/opensearch-service-migration/dp_pipeline_template.yaml @@ -1,18 +1,36 @@ +# Name of the Data Prepper pipeline historical-data-migration: + # Source cluster configuration source: opensearch: + # CDK code will replace this value, so DO NOT CHANGE this + # unless the file is being used outside of the CDK hosts: - + # Uncomment the following line to disable TLS + #insecure: true + # Example configuration on how to disable authentication (default: false) disable_authentication: true indices: + # Indices to exclude - exclude system indices by default exclude: - index_name_regex: \.* + # Target cluster configuration sink: - opensearch: - bulk_size: 10 + # Note - CDK code will replace this value with the target cluster endpoint hosts: - - https:// + - + # Derive index name from record metadata index: ${getMetadata("opensearch-index")} + # Use the same document ID as the source cluster document document_id: ${getMetadata("opensearch-document_id")} + # Example configuration for basic auth username: user password: pass + #disable_authentication: true + # Additional pipeline options/optimizations + # For maximum throughput, match workers to number of vCPUs (default: 1) + workers: 1 + # delay is how often the worker threads should process data (default: 3000 ms) + delay: 0 diff --git a/deployment/cdk/opensearch-service-migration/lib/fetch-migration-stack.ts b/deployment/cdk/opensearch-service-migration/lib/fetch-migration-stack.ts index de4c016f69..47ff189600 100644 --- a/deployment/cdk/opensearch-service-migration/lib/fetch-migration-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/fetch-migration-stack.ts @@ -38,8 +38,8 @@ export class FetchMigrationStack extends Stack { // ECS Task Definition const fetchMigrationFargateTask = new FargateTaskDefinition(this, "fetchMigrationFargateTask", { - memoryLimitMiB: 2048, - cpu: 512 + memoryLimitMiB: 4096, + cpu: 1024 }); new StringParameter(this, 'SSMParameterFetchMigrationTaskDefArn', {