Skip to content

Commit

Permalink
CMR-10141: Fixing the ability to subscribe SQS queues correctly to th…
Browse files Browse the repository at this point in the history
…e local and AWS topics. Moving subscription ARN to CMR_Subscriptions table. (#2188)

* CMR-10141: Fixing the ability to subscribe SQS queus correctly to the internal and AWS topics. Moving subscription ARN to CMR_Subscriptions table.

* CMR-10141: Fixing unit test.
  • Loading branch information
eereiter authored Nov 11, 2024
1 parent dd8ee96 commit 9445088
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 73 deletions.
23 changes: 13 additions & 10 deletions message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
[cheshire.core :as json]
[cmr.common.dev.record-pretty-printer :as record-pretty-printer]
[cmr.common.log :refer [error]]
[cmr.common.services.errors :as errors]
[cmr.common.util :as util]
[cmr.message-queue.config :as config]
[cmr.message-queue.queue.aws-queue :as aws-queue]
Expand Down Expand Up @@ -72,13 +73,13 @@
filter policy is a hash map - for example:
{\"collection-concept-id\": \"C12345-PROV1\"
\"mode\": [\"New\", \"Update\"]}"
[sns-client subscription-arn subscription]
[sns-client subscription-arn subscription-metadata]
;; Turn the clojure filter policy to json
(when (or (:CollectionConceptId subscription)
(:Mode subscription))
(when (or (:CollectionConceptId subscription-metadata)
(:Mode subscription-metadata))
(let [filters (util/remove-nil-keys
{:collection-concept-id (:CollectionConceptId subscription)
:mode (:Mode subscription)})
{:collection-concept-id [(:CollectionConceptId subscription-metadata)]
:mode (:Mode subscription-metadata)})
filter-json (json/generate-string filters)
sub-filter-request (-> (SetSubscriptionAttributesRequest/builder)
(.subscriptionArn subscription-arn)
Expand Down Expand Up @@ -118,14 +119,16 @@
(try
(let [subscription-arn (subscribe-sqs-to-sns sns-client topic-arn (get-in subscription [:metadata :EndPoint]))]
(when subscription-arn
(set-filter-policy sns-client subscription-arn subscription)
(set-filter-policy sns-client subscription-arn (:metadata subscription))
(set-redrive-policy sns-client subscription-arn subscription-dead-letter-queue-arn))
subscription-arn)
(catch Exception e
(error (format "Exception caught trying to subscribe the queue %s to the %s SNS Topic. Exception: %s"
(:EndPoint subscription)
topic-arn
(.getMessage e))))))
(let [msg (format "Exception caught trying to subscribe the queue %s to the %s SNS Topic. Exception: %s"
(get-in subscription [:metadata :EndPoint])
topic-arn
(.getMessage e))]
(error msg)
(errors/throw-service-error :bad-aws-subscription msg)))))

(unsubscribe
[_this subscription-id]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
(assoc-in [:extra-fields :subscription-type] (:subscription_type result))
(assoc-in [:extra-fields :subscription-name] (:subscription_name result))
(assoc-in [:extra-fields :subscriber-id] (:subscriber_id result))
(assoc-in [:extra-fields :aws-arn] (:awsarn result))
(add-last-notified-at-if-present result db)
(assoc-in [:extra-fields :collection-concept-id]
(:collection_concept_id result))))
Expand All @@ -32,16 +33,17 @@
subscriber-id
collection-concept-id
normalized-query
subscription-type]} :extra-fields
subscription-type
aws-arn]} :extra-fields
user-id :user-id
provider-id :provider-id} concept
[cols values] (concepts/concept->common-insert-args concept)]
[(concat cols ["provider_id" "user_id" "subscription_name"
"subscriber_id" "collection_concept_id" "normalized_query"
"subscription_type"])
"subscription_type" "aws_arn"])
(concat values [provider-id user-id subscription-name
subscriber-id collection-concept-id normalized-query
subscription-type])]))
subscription-type aws-arn])]))

(defmethod concepts/concept->insert-args [:subscription false]
[concept _]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
(defn dbresult->sub-notification
"Converts a map result from the database to a provider map"
[db data]
(let [{:keys [subscription_concept_id last_notified_at aws_arn]} data]
(let [{:keys [subscription_concept_id last_notified_at]} data]
(j/with-db-transaction [conn db]
{:subscription-concept-id subscription_concept_id
:last-notified-at (when last_notified_at
(oracle/oracle-timestamp->str-time conn last_notified_at))
:aws-arn aws_arn})))
(oracle/oracle-timestamp->str-time conn last_notified_at))})))

(defn subscription-exists?
"Check to see if the subscription exists"
Expand All @@ -39,7 +38,7 @@
(defn get-sub-notification
"Get subscription notification from Oracle."
[db subscription-id]
(let [sql (str "SELECT id, subscription_concept_id, last_notified_at, aws_arn "
(let [sql (str "SELECT id, subscription_concept_id, last_notified_at "
"FROM cmr_sub_notifications "
"WHERE subscription_concept_id = ?")
results (first (j/query db [sql subscription-id]))]
Expand All @@ -61,17 +60,6 @@
"WHERE subscription_concept_id = ?")]
(j/db-do-prepared db sql [(cr/to-sql-time last-notified-time) subscription-id])))

(defn update-sub-not-with-aws-arn
"Updates the subscription notification with the subscription arn.
If the subscription doesn't exist then create it as well."
[db subscription-id aws-arn]
(when-not (sub-notification-exists? db subscription-id)
(save-sub-notification db subscription-id))
(let [sql (str "UPDATE cmr_sub_notifications "
"SET aws_arn = ? "
"WHERE subscription_concept_id = ?")]
(j/db-do-prepared db sql [aws-arn subscription-id])))

(defn delete-sub-notification
"Delete a subscription notification record by id"
[db subscription-id]
Expand All @@ -90,5 +78,4 @@
(println (sub-notification-exists? db "SUB1234-test"))
(println (get-sub-notification db "SUB1234-test"))
(println (update-sub-notification db "SUB1234-test" "2024-11-01T02:17:09.749Z"))
(println (update-sub-not-with-aws-arn db "SUB1234-test" "arn:aws:sns:us-east-1:1234455667:SometestSubscription"))
(println (delete-sub-notification db "SUB1234-test")) )
(println (delete-sub-notification db "SUB1234-test")))
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
(ns cmr.metadata-db.migrations.092-update-cmr-subscriptions-table
"Move the AWS subscription arn column from the cmr_sub_notifications table to the cmr_subscriptions table."
(:require
[config.mdb-migrate-helper :as h]))

(defn up
"Migrates the database up to version 92."
[]
(println "cmr.metadata-db.migrations.092-update-cmr-sub-notifications-table up...")
(h/sql "ALTER TABLE METADATA_DB.CMR_SUB_NOTIFICATIONS DROP COLUMN AWS_ARN")
(h/sql "ALTER TABLE METADATA_DB.CMR_SUBSCRIPTIONS ADD AWS_ARN VARCHAR(2048) NULL"))

(defn down
"Migrates the database down from version 92."
[]
(println "cmr.metadata-db.migrations.092-update-cmr-sub-notifications-table down.")
(h/sql "ALTER TABLE METADATA_DB.CMR_SUB_NOTIFICATIONS ADD AWS_ARN VARCHAR(2048) NULL")
(h/sql "ALTER TABLE METADATA_DB.CMR_SUBSCRIPTIONS DROP COLUMN AWS_ARN"))
24 changes: 16 additions & 8 deletions metadata-db-app/src/cmr/metadata_db/services/concept_service.clj
Original file line number Diff line number Diff line change
Expand Up @@ -793,9 +793,6 @@
(= :subscription concept-type))
(:metadata previous-revision)
"")
subscription-arn (when (and subscriptions/subscriptions-enabled?
(= :subscription concept-type))
(sub-not/get-subscription-aws-arn context concept-id))
tombstone (create-tombstone-concept metadata concept previous-revision)]
(cv/validate-concept tombstone)
(validate-concept-revision-id db provider tombstone previous-revision)
Expand Down Expand Up @@ -843,7 +840,7 @@
context (ingest-events/concept-delete-event revisioned-tombstone)))
(when (and subscriptions/subscriptions-enabled?
(= :subscription concept-type))
(subscriptions/delete-subscription context revisioned-tombstone subscription-arn))
(subscriptions/delete-subscription context revisioned-tombstone))
(subscriptions/work-potential-notification context revisioned-tombstone)
revisioned-tombstone)))
(if revision-id
Expand Down Expand Up @@ -893,6 +890,17 @@
context
(ingest-events/associations-update-event associations))))))

(defn set-subscription-arn
"Subscribes a subscription request to the CMR external topic and
saves the subscription ARN to the concept and returns the new concept."
[context concept-type concept]
(if (and subscriptions/subscriptions-enabled?
(= :subscription concept-type))
(if-let [subscription-arn (subscriptions/add-subscription context concept)]
(assoc-in concept [:extra-fields :aws-arn] subscription-arn)
concept)
concept))

;; false implies creation of a non-tombstone revision
(defmethod save-concept-revision false
[context concept]
Expand All @@ -913,6 +921,7 @@
{:keys [concept-type concept-id]} concept]
(validate-concept-revision-id db provider concept)
(let [concept (->> concept
(set-subscription-arn context concept-type)
(set-or-generate-revision-id db provider)
(set-deleted-flag false)
(try-to-save db provider context))
Expand All @@ -931,10 +940,9 @@
(ingest-events/publish-event
context
(ingest-events/concept-update-event concept))
(when (and subscriptions/subscriptions-enabled?
(= :subscription concept-type))
(when-let [subscription-arn (subscriptions/add-subscription context concept)]
(sub-not/update-subscription-with-aws-arn context concept-id subscription-arn)))
;; Add the ingest subscriptions to the cache. The subscriptions were saved to the database
;; above so now we can put it into the cache.
(subscriptions/add-delete-subscription context concept)
(subscriptions/work-potential-notification context concept)
concept)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
can sometimes be nil."
[concept]
(nil-fields-validation (apply dissoc (:extra-fields concept)
[:delete-time :version-id :source-revision-id :associated-revision-id :target-provider-id :collection-concept-id :endpoint :mode :method])))
[:delete-time :version-id :source-revision-id :associated-revision-id :target-provider-id :collection-concept-id :endpoint :mode :method :aws-arn])))

(defn concept-id-validation
[concept]
Expand Down
22 changes: 0 additions & 22 deletions metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,3 @@
(errors/throw-service-error
:not-found
(msg/subscription-not-found subscription-id)))))

(defn update-subscription-with-aws-arn
"Update the sub_notifications DB table with the subscription arn value."
[context subscription-id subscription-arn]
(let [errors (common-concepts/concept-id-validation subscription-id)
db (mdb-util/context->db context)]
(if (nil? errors)
(sub-note/update-sub-not-with-aws-arn db subscription-id subscription-arn)
(errors/throw-service-error
:not-found
(msg/subscription-not-found subscription-id)))))

(defn get-subscription-aws-arn
"Get the subscription ARN value from the sub_notifications DB table."
[context subscription-id]
(let [errors (common-concepts/concept-id-validation subscription-id)
db (mdb-util/context->db context)]
(if (nil? errors)
(:aws-arn (sub-note/get-sub-notification db subscription-id))
(errors/throw-service-error
:not-found
(msg/subscription-not-found subscription-id)))))
11 changes: 6 additions & 5 deletions metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -106,25 +106,26 @@
(when (and subscriptions-enabled?
(= :subscription (:concept-type concept)))
(let [concept-edn (convert-concept-to-edn concept)]
(change-subscription context concept-edn)
concept-edn)))
(when (ingest-subscription-concept? concept-edn)
(change-subscription context concept-edn)
concept-edn))))

(defn add-subscription
"Add the subscription to the cache and subscribe the subscription to
the topic."
[context concept]
(when-let [concept-edn (add-delete-subscription context concept)]
(when-let [concept-edn (convert-concept-to-edn concept)]
(let [topic (get-in context [:system :sns :external])]
(topic-protocol/subscribe topic concept-edn))))

(defn delete-subscription
"Remove the subscription from the cache and unsubscribe the subscription from
the topic."
[context concept subscription-arn]
[context concept]
(when-let [concept-edn (add-delete-subscription context concept)]
(let [topic (get-in context [:system :sns :external])]
(topic-protocol/unsubscribe topic {:concept-id (:concept-id concept-edn)
:subscription-arn subscription-arn}))))
:subscription-arn (get-in concept-edn [:extra-fields :aws-arn])}))))

;;
;; The functions below are for refreshing the subscription cache if needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,15 @@
(testing "adding and deleting subscriptions from the cache calling add-delete-subscription"
(let [db-contents '()]
(with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] db-contents)}
(is (= {:metadata {:CollectionConceptId "C12345-PROV1"}
(is (= {:metadata {:CollectionConceptId "C12345-PROV1"
:EndPoint "ARN"
:Mode ["New" "Delete"]
:Method "ingest"}
:concept-type :subscription}
(subscriptions/add-delete-subscription test-context {:metadata "{\"CollectionConceptId\":\"C12345-PROV1\"}"
(subscriptions/add-delete-subscription test-context {:metadata "{\"CollectionConceptId\":\"C12345-PROV1\",
\"EndPoint\":\"ARN\",
\"Mode\":[\"New\", \"Delete\"],
\"Method\":\"ingest\"}"
:concept-type :subscription}))))))))

(def db-result-1
Expand Down Expand Up @@ -567,9 +573,11 @@
:extra-fields {:parent-collection-id "C1200000002-PROV1"}}]

;; if successful, the subscription concept-id is returned for local topic.
(subscriptions/add-delete-subscription test-context sub-concept)
(is (= (:concept-id sub-concept) (subscriptions/add-subscription test-context sub-concept)))

;; the subscription is replaced when the subscription already exists.
(subscriptions/add-delete-subscription test-context sub-concept)
(is (= (:concept-id sub-concept) (subscriptions/add-subscription test-context sub-concept)))

;; For this test add the subscription to the internal topic to test publishing.
Expand Down Expand Up @@ -609,9 +617,8 @@
(let [sub-concept {:metadata concept-metadata
:deleted true
:concept-type :subscription
:concept-id "SUB1200000005-PROV1"}
subscription-arn nil]
(is (= (:concept-id sub-concept) (subscriptions/delete-subscription test-context sub-concept subscription-arn)))
:concept-id "SUB1200000005-PROV1"}]
(is (= (:concept-id sub-concept) (subscriptions/delete-subscription test-context sub-concept)))
;; Also remove subscription from internal queue.
(let [topic (get-in test-context [:system :sns :internal])]
(topic-protocol/unsubscribe topic sub-concept)))))))
Expand Down Expand Up @@ -654,10 +661,13 @@
\"DataGranule\": {\"Identifiers\": [{\"IdentifierType\": \"ProducerGranuleId\",
\"Identifier\": \"Algorithm-1\"}]}}"
:extra-fields {:parent-collection-id "C1200000002-PROV1"}}
subscription-arn (subscriptions/add-subscription test-context sub-concept)]
_ (subscriptions/add-delete-subscription test-context sub-concept)
subscription-arn (subscriptions/add-subscription test-context sub-concept)
sub-concept (assoc-in sub-concept [:extra-fields :aws-arn] subscription-arn)]

(is (some? subscription-arn))
(when subscription-arn
(is (some? (subscriptions/delete-subscription test-context sub-concept subscription-arn))))
(is (some? (subscriptions/delete-subscription test-context sub-concept))))

;; publish message. this should publish to the internal queue
(is (some? (subscriptions/work-potential-notification test-context granule-concept)))
Expand Down

0 comments on commit 9445088

Please sign in to comment.