From 9445088c17d5b69fb8b8a80064d63cc929256a54 Mon Sep 17 00:00:00 2001 From: eereiter Date: Mon, 11 Nov 2024 06:13:05 -0500 Subject: [PATCH 1/2] CMR-10141: Fixing the ability to subscribe SQS queues correctly to the local and AWS topics. Moving subscription ARN to CMR_Subscriptions table. (#2188) * CMR-10141: Fixing the ability to subscribe SQS queus correctly to the internal and AWS topics. Moving subscription ARN to CMR_Subscriptions table. * CMR-10141: Fixing unit test. --- .../src/cmr/message_queue/topic/aws_topic.clj | 23 ++++++++++-------- .../data/oracle/concepts/subscription.clj | 8 ++++--- .../data/oracle/sub_notifications.clj | 21 ++++------------ .../092_update_cmr_subscriptions_table.clj | 18 ++++++++++++++ .../metadata_db/services/concept_service.clj | 24 ++++++++++++------- .../services/concept_validations.clj | 2 +- .../services/sub_notifications.clj | 22 ----------------- .../metadata_db/services/subscriptions.clj | 11 +++++---- .../test/services/subscriptions_test.clj | 24 +++++++++++++------ 9 files changed, 80 insertions(+), 73 deletions(-) create mode 100644 metadata-db-app/src/cmr/metadata_db/migrations/092_update_cmr_subscriptions_table.clj diff --git a/message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj b/message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj index 3a7d13d847..237bccb27f 100644 --- a/message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj +++ b/message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj @@ -4,6 +4,7 @@ [cheshire.core :as json] [cmr.common.dev.record-pretty-printer :as record-pretty-printer] [cmr.common.log :refer [error]] + [cmr.common.services.errors :as errors] [cmr.common.util :as util] [cmr.message-queue.config :as config] [cmr.message-queue.queue.aws-queue :as aws-queue] @@ -72,13 +73,13 @@ filter policy is a hash map - for example: {\"collection-concept-id\": \"C12345-PROV1\" \"mode\": [\"New\", \"Update\"]}" - [sns-client subscription-arn subscription] + [sns-client subscription-arn subscription-metadata] ;; Turn the clojure filter policy to json - (when (or (:CollectionConceptId subscription) - (:Mode subscription)) + (when (or (:CollectionConceptId subscription-metadata) + (:Mode subscription-metadata)) (let [filters (util/remove-nil-keys - {:collection-concept-id (:CollectionConceptId subscription) - :mode (:Mode subscription)}) + {:collection-concept-id [(:CollectionConceptId subscription-metadata)] + :mode (:Mode subscription-metadata)}) filter-json (json/generate-string filters) sub-filter-request (-> (SetSubscriptionAttributesRequest/builder) (.subscriptionArn subscription-arn) @@ -118,14 +119,16 @@ (try (let [subscription-arn (subscribe-sqs-to-sns sns-client topic-arn (get-in subscription [:metadata :EndPoint]))] (when subscription-arn - (set-filter-policy sns-client subscription-arn subscription) + (set-filter-policy sns-client subscription-arn (:metadata subscription)) (set-redrive-policy sns-client subscription-arn subscription-dead-letter-queue-arn)) subscription-arn) (catch Exception e - (error (format "Exception caught trying to subscribe the queue %s to the %s SNS Topic. Exception: %s" - (:EndPoint subscription) - topic-arn - (.getMessage e)))))) + (let [msg (format "Exception caught trying to subscribe the queue %s to the %s SNS Topic. Exception: %s" + (get-in subscription [:metadata :EndPoint]) + topic-arn + (.getMessage e))] + (error msg) + (errors/throw-service-error :bad-aws-subscription msg))))) (unsubscribe [_this subscription-id] diff --git a/metadata-db-app/src/cmr/metadata_db/data/oracle/concepts/subscription.clj b/metadata-db-app/src/cmr/metadata_db/data/oracle/concepts/subscription.clj index 258504d94a..53563271f5 100644 --- a/metadata-db-app/src/cmr/metadata_db/data/oracle/concepts/subscription.clj +++ b/metadata-db-app/src/cmr/metadata_db/data/oracle/concepts/subscription.clj @@ -22,6 +22,7 @@ (assoc-in [:extra-fields :subscription-type] (:subscription_type result)) (assoc-in [:extra-fields :subscription-name] (:subscription_name result)) (assoc-in [:extra-fields :subscriber-id] (:subscriber_id result)) + (assoc-in [:extra-fields :aws-arn] (:awsarn result)) (add-last-notified-at-if-present result db) (assoc-in [:extra-fields :collection-concept-id] (:collection_concept_id result)))) @@ -32,16 +33,17 @@ subscriber-id collection-concept-id normalized-query - subscription-type]} :extra-fields + subscription-type + aws-arn]} :extra-fields user-id :user-id provider-id :provider-id} concept [cols values] (concepts/concept->common-insert-args concept)] [(concat cols ["provider_id" "user_id" "subscription_name" "subscriber_id" "collection_concept_id" "normalized_query" - "subscription_type"]) + "subscription_type" "aws_arn"]) (concat values [provider-id user-id subscription-name subscriber-id collection-concept-id normalized-query - subscription-type])])) + subscription-type aws-arn])])) (defmethod concepts/concept->insert-args [:subscription false] [concept _] diff --git a/metadata-db-app/src/cmr/metadata_db/data/oracle/sub_notifications.clj b/metadata-db-app/src/cmr/metadata_db/data/oracle/sub_notifications.clj index e14df1d118..59301773ab 100644 --- a/metadata-db-app/src/cmr/metadata_db/data/oracle/sub_notifications.clj +++ b/metadata-db-app/src/cmr/metadata_db/data/oracle/sub_notifications.clj @@ -12,12 +12,11 @@ (defn dbresult->sub-notification "Converts a map result from the database to a provider map" [db data] - (let [{:keys [subscription_concept_id last_notified_at aws_arn]} data] + (let [{:keys [subscription_concept_id last_notified_at]} data] (j/with-db-transaction [conn db] {:subscription-concept-id subscription_concept_id :last-notified-at (when last_notified_at - (oracle/oracle-timestamp->str-time conn last_notified_at)) - :aws-arn aws_arn}))) + (oracle/oracle-timestamp->str-time conn last_notified_at))}))) (defn subscription-exists? "Check to see if the subscription exists" @@ -39,7 +38,7 @@ (defn get-sub-notification "Get subscription notification from Oracle." [db subscription-id] - (let [sql (str "SELECT id, subscription_concept_id, last_notified_at, aws_arn " + (let [sql (str "SELECT id, subscription_concept_id, last_notified_at " "FROM cmr_sub_notifications " "WHERE subscription_concept_id = ?") results (first (j/query db [sql subscription-id]))] @@ -61,17 +60,6 @@ "WHERE subscription_concept_id = ?")] (j/db-do-prepared db sql [(cr/to-sql-time last-notified-time) subscription-id]))) -(defn update-sub-not-with-aws-arn - "Updates the subscription notification with the subscription arn. - If the subscription doesn't exist then create it as well." - [db subscription-id aws-arn] - (when-not (sub-notification-exists? db subscription-id) - (save-sub-notification db subscription-id)) - (let [sql (str "UPDATE cmr_sub_notifications " - "SET aws_arn = ? " - "WHERE subscription_concept_id = ?")] - (j/db-do-prepared db sql [aws-arn subscription-id]))) - (defn delete-sub-notification "Delete a subscription notification record by id" [db subscription-id] @@ -90,5 +78,4 @@ (println (sub-notification-exists? db "SUB1234-test")) (println (get-sub-notification db "SUB1234-test")) (println (update-sub-notification db "SUB1234-test" "2024-11-01T02:17:09.749Z")) - (println (update-sub-not-with-aws-arn db "SUB1234-test" "arn:aws:sns:us-east-1:1234455667:SometestSubscription")) - (println (delete-sub-notification db "SUB1234-test")) ) + (println (delete-sub-notification db "SUB1234-test"))) diff --git a/metadata-db-app/src/cmr/metadata_db/migrations/092_update_cmr_subscriptions_table.clj b/metadata-db-app/src/cmr/metadata_db/migrations/092_update_cmr_subscriptions_table.clj new file mode 100644 index 0000000000..9dabd079a0 --- /dev/null +++ b/metadata-db-app/src/cmr/metadata_db/migrations/092_update_cmr_subscriptions_table.clj @@ -0,0 +1,18 @@ +(ns cmr.metadata-db.migrations.092-update-cmr-subscriptions-table + "Move the AWS subscription arn column from the cmr_sub_notifications table to the cmr_subscriptions table." + (:require + [config.mdb-migrate-helper :as h])) + +(defn up + "Migrates the database up to version 92." + [] + (println "cmr.metadata-db.migrations.092-update-cmr-sub-notifications-table up...") + (h/sql "ALTER TABLE METADATA_DB.CMR_SUB_NOTIFICATIONS DROP COLUMN AWS_ARN") + (h/sql "ALTER TABLE METADATA_DB.CMR_SUBSCRIPTIONS ADD AWS_ARN VARCHAR(2048) NULL")) + +(defn down + "Migrates the database down from version 92." + [] + (println "cmr.metadata-db.migrations.092-update-cmr-sub-notifications-table down.") + (h/sql "ALTER TABLE METADATA_DB.CMR_SUB_NOTIFICATIONS ADD AWS_ARN VARCHAR(2048) NULL") + (h/sql "ALTER TABLE METADATA_DB.CMR_SUBSCRIPTIONS DROP COLUMN AWS_ARN")) diff --git a/metadata-db-app/src/cmr/metadata_db/services/concept_service.clj b/metadata-db-app/src/cmr/metadata_db/services/concept_service.clj index 715e2d66d7..abf1035d15 100644 --- a/metadata-db-app/src/cmr/metadata_db/services/concept_service.clj +++ b/metadata-db-app/src/cmr/metadata_db/services/concept_service.clj @@ -793,9 +793,6 @@ (= :subscription concept-type)) (:metadata previous-revision) "") - subscription-arn (when (and subscriptions/subscriptions-enabled? - (= :subscription concept-type)) - (sub-not/get-subscription-aws-arn context concept-id)) tombstone (create-tombstone-concept metadata concept previous-revision)] (cv/validate-concept tombstone) (validate-concept-revision-id db provider tombstone previous-revision) @@ -843,7 +840,7 @@ context (ingest-events/concept-delete-event revisioned-tombstone))) (when (and subscriptions/subscriptions-enabled? (= :subscription concept-type)) - (subscriptions/delete-subscription context revisioned-tombstone subscription-arn)) + (subscriptions/delete-subscription context revisioned-tombstone)) (subscriptions/work-potential-notification context revisioned-tombstone) revisioned-tombstone))) (if revision-id @@ -893,6 +890,17 @@ context (ingest-events/associations-update-event associations)))))) +(defn set-subscription-arn + "Subscribes a subscription request to the CMR external topic and + saves the subscription ARN to the concept and returns the new concept." + [context concept-type concept] + (if (and subscriptions/subscriptions-enabled? + (= :subscription concept-type)) + (if-let [subscription-arn (subscriptions/add-subscription context concept)] + (assoc-in concept [:extra-fields :aws-arn] subscription-arn) + concept) + concept)) + ;; false implies creation of a non-tombstone revision (defmethod save-concept-revision false [context concept] @@ -913,6 +921,7 @@ {:keys [concept-type concept-id]} concept] (validate-concept-revision-id db provider concept) (let [concept (->> concept + (set-subscription-arn context concept-type) (set-or-generate-revision-id db provider) (set-deleted-flag false) (try-to-save db provider context)) @@ -931,10 +940,9 @@ (ingest-events/publish-event context (ingest-events/concept-update-event concept)) - (when (and subscriptions/subscriptions-enabled? - (= :subscription concept-type)) - (when-let [subscription-arn (subscriptions/add-subscription context concept)] - (sub-not/update-subscription-with-aws-arn context concept-id subscription-arn))) + ;; Add the ingest subscriptions to the cache. The subscriptions were saved to the database + ;; above so now we can put it into the cache. + (subscriptions/add-delete-subscription context concept) (subscriptions/work-potential-notification context concept) concept))) diff --git a/metadata-db-app/src/cmr/metadata_db/services/concept_validations.clj b/metadata-db-app/src/cmr/metadata_db/services/concept_validations.clj index 156592fd74..ed59a6973b 100644 --- a/metadata-db-app/src/cmr/metadata_db/services/concept_validations.clj +++ b/metadata-db-app/src/cmr/metadata_db/services/concept_validations.clj @@ -96,7 +96,7 @@ can sometimes be nil." [concept] (nil-fields-validation (apply dissoc (:extra-fields concept) - [:delete-time :version-id :source-revision-id :associated-revision-id :target-provider-id :collection-concept-id :endpoint :mode :method]))) + [:delete-time :version-id :source-revision-id :associated-revision-id :target-provider-id :collection-concept-id :endpoint :mode :method :aws-arn]))) (defn concept-id-validation [concept] diff --git a/metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj b/metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj index b66d0d4f70..2e3620a3e9 100644 --- a/metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj +++ b/metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj @@ -29,25 +29,3 @@ (errors/throw-service-error :not-found (msg/subscription-not-found subscription-id))))) - -(defn update-subscription-with-aws-arn - "Update the sub_notifications DB table with the subscription arn value." - [context subscription-id subscription-arn] - (let [errors (common-concepts/concept-id-validation subscription-id) - db (mdb-util/context->db context)] - (if (nil? errors) - (sub-note/update-sub-not-with-aws-arn db subscription-id subscription-arn) - (errors/throw-service-error - :not-found - (msg/subscription-not-found subscription-id))))) - -(defn get-subscription-aws-arn - "Get the subscription ARN value from the sub_notifications DB table." - [context subscription-id] - (let [errors (common-concepts/concept-id-validation subscription-id) - db (mdb-util/context->db context)] - (if (nil? errors) - (:aws-arn (sub-note/get-sub-notification db subscription-id)) - (errors/throw-service-error - :not-found - (msg/subscription-not-found subscription-id))))) diff --git a/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj b/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj index 7230c31f70..2b2732ebb6 100644 --- a/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj +++ b/metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj @@ -106,25 +106,26 @@ (when (and subscriptions-enabled? (= :subscription (:concept-type concept))) (let [concept-edn (convert-concept-to-edn concept)] - (change-subscription context concept-edn) - concept-edn))) + (when (ingest-subscription-concept? concept-edn) + (change-subscription context concept-edn) + concept-edn)))) (defn add-subscription "Add the subscription to the cache and subscribe the subscription to the topic." [context concept] - (when-let [concept-edn (add-delete-subscription context concept)] + (when-let [concept-edn (convert-concept-to-edn concept)] (let [topic (get-in context [:system :sns :external])] (topic-protocol/subscribe topic concept-edn)))) (defn delete-subscription "Remove the subscription from the cache and unsubscribe the subscription from the topic." - [context concept subscription-arn] + [context concept] (when-let [concept-edn (add-delete-subscription context concept)] (let [topic (get-in context [:system :sns :external])] (topic-protocol/unsubscribe topic {:concept-id (:concept-id concept-edn) - :subscription-arn subscription-arn})))) + :subscription-arn (get-in concept-edn [:extra-fields :aws-arn])})))) ;; ;; The functions below are for refreshing the subscription cache if needed. diff --git a/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj b/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj index e398179ea8..2b2e340619 100644 --- a/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj +++ b/metadata-db-app/test/cmr/metadata_db/test/services/subscriptions_test.clj @@ -229,9 +229,15 @@ (testing "adding and deleting subscriptions from the cache calling add-delete-subscription" (let [db-contents '()] (with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] db-contents)} - (is (= {:metadata {:CollectionConceptId "C12345-PROV1"} + (is (= {:metadata {:CollectionConceptId "C12345-PROV1" + :EndPoint "ARN" + :Mode ["New" "Delete"] + :Method "ingest"} :concept-type :subscription} - (subscriptions/add-delete-subscription test-context {:metadata "{\"CollectionConceptId\":\"C12345-PROV1\"}" + (subscriptions/add-delete-subscription test-context {:metadata "{\"CollectionConceptId\":\"C12345-PROV1\", + \"EndPoint\":\"ARN\", + \"Mode\":[\"New\", \"Delete\"], + \"Method\":\"ingest\"}" :concept-type :subscription})))))))) (def db-result-1 @@ -567,9 +573,11 @@ :extra-fields {:parent-collection-id "C1200000002-PROV1"}}] ;; if successful, the subscription concept-id is returned for local topic. + (subscriptions/add-delete-subscription test-context sub-concept) (is (= (:concept-id sub-concept) (subscriptions/add-subscription test-context sub-concept))) ;; the subscription is replaced when the subscription already exists. + (subscriptions/add-delete-subscription test-context sub-concept) (is (= (:concept-id sub-concept) (subscriptions/add-subscription test-context sub-concept))) ;; For this test add the subscription to the internal topic to test publishing. @@ -609,9 +617,8 @@ (let [sub-concept {:metadata concept-metadata :deleted true :concept-type :subscription - :concept-id "SUB1200000005-PROV1"} - subscription-arn nil] - (is (= (:concept-id sub-concept) (subscriptions/delete-subscription test-context sub-concept subscription-arn))) + :concept-id "SUB1200000005-PROV1"}] + (is (= (:concept-id sub-concept) (subscriptions/delete-subscription test-context sub-concept))) ;; Also remove subscription from internal queue. (let [topic (get-in test-context [:system :sns :internal])] (topic-protocol/unsubscribe topic sub-concept))))))) @@ -654,10 +661,13 @@ \"DataGranule\": {\"Identifiers\": [{\"IdentifierType\": \"ProducerGranuleId\", \"Identifier\": \"Algorithm-1\"}]}}" :extra-fields {:parent-collection-id "C1200000002-PROV1"}} - subscription-arn (subscriptions/add-subscription test-context sub-concept)] + _ (subscriptions/add-delete-subscription test-context sub-concept) + subscription-arn (subscriptions/add-subscription test-context sub-concept) + sub-concept (assoc-in sub-concept [:extra-fields :aws-arn] subscription-arn)] + (is (some? subscription-arn)) (when subscription-arn - (is (some? (subscriptions/delete-subscription test-context sub-concept subscription-arn)))) + (is (some? (subscriptions/delete-subscription test-context sub-concept)))) ;; publish message. this should publish to the internal queue (is (some? (subscriptions/work-potential-notification test-context granule-concept))) From 6a7e0696f935e76d7195f8a90da1149411b2e180 Mon Sep 17 00:00:00 2001 From: eereiter Date: Tue, 12 Nov 2024 14:20:48 -0500 Subject: [PATCH 2/2] CMR-10141-1: Fixing broken build (#2189) * CMR-10141-1: Fixing broken build * CMR-10141-1: Fixing broken build --- metadata-db-app/int-test/cmr/metadata_db/int_test/utility.clj | 4 +++- .../src/cmr/metadata_db/services/sub_notifications.clj | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/metadata-db-app/int-test/cmr/metadata_db/int_test/utility.clj b/metadata-db-app/int-test/cmr/metadata_db/int_test/utility.clj index 1028bf6dc6..d0191b41d0 100644 --- a/metadata-db-app/int-test/cmr/metadata_db/int_test/utility.clj +++ b/metadata-db-app/int-test/cmr/metadata_db/int_test/utility.clj @@ -391,7 +391,9 @@ (is (= (dissoc (expected-concept concept) :coll-concept-id) (dissoc stored-concept :revision-date :transaction-id :created-at :coll-concept-id))) (is (= (expected-concept concept) - (dissoc stored-concept :revision-date :transaction-id :created-at)))))) + (util/dissoc-in + (dissoc stored-concept :revision-date :transaction-id :created-at) + [:extra-fields :aws-arn])))))) (defn is-tag-association-deleted? "Returns if the ta is marked as deleted in metadata-db" diff --git a/metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj b/metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj index 2e3620a3e9..990b29750b 100644 --- a/metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj +++ b/metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj @@ -15,7 +15,9 @@ (if (sub-note/subscription-exists? db subscription-id) (if (sub-note/sub-notification-exists? db subscription-id) (sub-note/update-sub-notification db subscription-id last-notified-time) - (sub-note/save-sub-notification db subscription-id)) + (do + (sub-note/save-sub-notification db subscription-id) + (sub-note/update-sub-notification db subscription-id last-notified-time))) (errors/throw-service-error :not-found (msg/subscription-not-found subscription-id)))) (defn update-subscription-notification