Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…sitory into CMR-10000
  • Loading branch information
jmaeng72 committed Nov 12, 2024
2 parents e867da8 + 6a7e069 commit 20cd68d
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 75 deletions.
23 changes: 13 additions & 10 deletions message-queue-lib/src/cmr/message_queue/topic/aws_topic.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
[cheshire.core :as json]
[cmr.common.dev.record-pretty-printer :as record-pretty-printer]
[cmr.common.log :refer [error]]
[cmr.common.services.errors :as errors]
[cmr.common.util :as util]
[cmr.message-queue.config :as config]
[cmr.message-queue.queue.aws-queue :as aws-queue]
Expand Down Expand Up @@ -72,13 +73,13 @@
filter policy is a hash map - for example:
{\"collection-concept-id\": \"C12345-PROV1\"
\"mode\": [\"New\", \"Update\"]}"
[sns-client subscription-arn subscription]
[sns-client subscription-arn subscription-metadata]
;; Turn the clojure filter policy to json
(when (or (:CollectionConceptId subscription)
(:Mode subscription))
(when (or (:CollectionConceptId subscription-metadata)
(:Mode subscription-metadata))
(let [filters (util/remove-nil-keys
{:collection-concept-id (:CollectionConceptId subscription)
:mode (:Mode subscription)})
{:collection-concept-id [(:CollectionConceptId subscription-metadata)]
:mode (:Mode subscription-metadata)})
filter-json (json/generate-string filters)
sub-filter-request (-> (SetSubscriptionAttributesRequest/builder)
(.subscriptionArn subscription-arn)
Expand Down Expand Up @@ -118,14 +119,16 @@
(try
(let [subscription-arn (subscribe-sqs-to-sns sns-client topic-arn (get-in subscription [:metadata :EndPoint]))]
(when subscription-arn
(set-filter-policy sns-client subscription-arn subscription)
(set-filter-policy sns-client subscription-arn (:metadata subscription))
(set-redrive-policy sns-client subscription-arn subscription-dead-letter-queue-arn))
subscription-arn)
(catch Exception e
(error (format "Exception caught trying to subscribe the queue %s to the %s SNS Topic. Exception: %s"
(:EndPoint subscription)
topic-arn
(.getMessage e))))))
(let [msg (format "Exception caught trying to subscribe the queue %s to the %s SNS Topic. Exception: %s"
(get-in subscription [:metadata :EndPoint])
topic-arn
(.getMessage e))]
(error msg)
(errors/throw-service-error :bad-aws-subscription msg)))))

(unsubscribe
[_this subscription-id]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,9 @@
(is (= (dissoc (expected-concept concept) :coll-concept-id)
(dissoc stored-concept :revision-date :transaction-id :created-at :coll-concept-id)))
(is (= (expected-concept concept)
(dissoc stored-concept :revision-date :transaction-id :created-at))))))
(util/dissoc-in
(dissoc stored-concept :revision-date :transaction-id :created-at)
[:extra-fields :aws-arn]))))))

(defn is-tag-association-deleted?
"Returns if the ta is marked as deleted in metadata-db"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
(assoc-in [:extra-fields :subscription-type] (:subscription_type result))
(assoc-in [:extra-fields :subscription-name] (:subscription_name result))
(assoc-in [:extra-fields :subscriber-id] (:subscriber_id result))
(assoc-in [:extra-fields :aws-arn] (:awsarn result))
(add-last-notified-at-if-present result db)
(assoc-in [:extra-fields :collection-concept-id]
(:collection_concept_id result))))
Expand All @@ -32,16 +33,17 @@
subscriber-id
collection-concept-id
normalized-query
subscription-type]} :extra-fields
subscription-type
aws-arn]} :extra-fields
user-id :user-id
provider-id :provider-id} concept
[cols values] (concepts/concept->common-insert-args concept)]
[(concat cols ["provider_id" "user_id" "subscription_name"
"subscriber_id" "collection_concept_id" "normalized_query"
"subscription_type"])
"subscription_type" "aws_arn"])
(concat values [provider-id user-id subscription-name
subscriber-id collection-concept-id normalized-query
subscription-type])]))
subscription-type aws-arn])]))

(defmethod concepts/concept->insert-args [:subscription false]
[concept _]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
(defn dbresult->sub-notification
"Converts a map result from the database to a provider map"
[db data]
(let [{:keys [subscription_concept_id last_notified_at aws_arn]} data]
(let [{:keys [subscription_concept_id last_notified_at]} data]
(j/with-db-transaction [conn db]
{:subscription-concept-id subscription_concept_id
:last-notified-at (when last_notified_at
(oracle/oracle-timestamp->str-time conn last_notified_at))
:aws-arn aws_arn})))
(oracle/oracle-timestamp->str-time conn last_notified_at))})))

(defn subscription-exists?
"Check to see if the subscription exists"
Expand All @@ -39,7 +38,7 @@
(defn get-sub-notification
"Get subscription notification from Oracle."
[db subscription-id]
(let [sql (str "SELECT id, subscription_concept_id, last_notified_at, aws_arn "
(let [sql (str "SELECT id, subscription_concept_id, last_notified_at "
"FROM cmr_sub_notifications "
"WHERE subscription_concept_id = ?")
results (first (j/query db [sql subscription-id]))]
Expand All @@ -61,17 +60,6 @@
"WHERE subscription_concept_id = ?")]
(j/db-do-prepared db sql [(cr/to-sql-time last-notified-time) subscription-id])))

(defn update-sub-not-with-aws-arn
"Updates the subscription notification with the subscription arn.
If the subscription doesn't exist then create it as well."
[db subscription-id aws-arn]
(when-not (sub-notification-exists? db subscription-id)
(save-sub-notification db subscription-id))
(let [sql (str "UPDATE cmr_sub_notifications "
"SET aws_arn = ? "
"WHERE subscription_concept_id = ?")]
(j/db-do-prepared db sql [aws-arn subscription-id])))

(defn delete-sub-notification
"Delete a subscription notification record by id"
[db subscription-id]
Expand All @@ -90,5 +78,4 @@
(println (sub-notification-exists? db "SUB1234-test"))
(println (get-sub-notification db "SUB1234-test"))
(println (update-sub-notification db "SUB1234-test" "2024-11-01T02:17:09.749Z"))
(println (update-sub-not-with-aws-arn db "SUB1234-test" "arn:aws:sns:us-east-1:1234455667:SometestSubscription"))
(println (delete-sub-notification db "SUB1234-test")) )
(println (delete-sub-notification db "SUB1234-test")))
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
(ns cmr.metadata-db.migrations.092-update-cmr-subscriptions-table
"Move the AWS subscription arn column from the cmr_sub_notifications table to the cmr_subscriptions table."
(:require
[config.mdb-migrate-helper :as h]))

(defn up
"Migrates the database up to version 92."
[]
(println "cmr.metadata-db.migrations.092-update-cmr-sub-notifications-table up...")
(h/sql "ALTER TABLE METADATA_DB.CMR_SUB_NOTIFICATIONS DROP COLUMN AWS_ARN")
(h/sql "ALTER TABLE METADATA_DB.CMR_SUBSCRIPTIONS ADD AWS_ARN VARCHAR(2048) NULL"))

(defn down
"Migrates the database down from version 92."
[]
(println "cmr.metadata-db.migrations.092-update-cmr-sub-notifications-table down.")
(h/sql "ALTER TABLE METADATA_DB.CMR_SUB_NOTIFICATIONS ADD AWS_ARN VARCHAR(2048) NULL")
(h/sql "ALTER TABLE METADATA_DB.CMR_SUBSCRIPTIONS DROP COLUMN AWS_ARN"))
24 changes: 16 additions & 8 deletions metadata-db-app/src/cmr/metadata_db/services/concept_service.clj
Original file line number Diff line number Diff line change
Expand Up @@ -793,9 +793,6 @@
(= :subscription concept-type))
(:metadata previous-revision)
"")
subscription-arn (when (and subscriptions/subscriptions-enabled?
(= :subscription concept-type))
(sub-not/get-subscription-aws-arn context concept-id))
tombstone (create-tombstone-concept metadata concept previous-revision)]
(cv/validate-concept tombstone)
(validate-concept-revision-id db provider tombstone previous-revision)
Expand Down Expand Up @@ -843,7 +840,7 @@
context (ingest-events/concept-delete-event revisioned-tombstone)))
(when (and subscriptions/subscriptions-enabled?
(= :subscription concept-type))
(subscriptions/delete-subscription context revisioned-tombstone subscription-arn))
(subscriptions/delete-subscription context revisioned-tombstone))
(subscriptions/work-potential-notification context revisioned-tombstone)
revisioned-tombstone)))
(if revision-id
Expand Down Expand Up @@ -893,6 +890,17 @@
context
(ingest-events/associations-update-event associations))))))

(defn set-subscription-arn
"Subscribes a subscription request to the CMR external topic and
saves the subscription ARN to the concept and returns the new concept."
[context concept-type concept]
(if (and subscriptions/subscriptions-enabled?
(= :subscription concept-type))
(if-let [subscription-arn (subscriptions/add-subscription context concept)]
(assoc-in concept [:extra-fields :aws-arn] subscription-arn)
concept)
concept))

;; false implies creation of a non-tombstone revision
(defmethod save-concept-revision false
[context concept]
Expand All @@ -913,6 +921,7 @@
{:keys [concept-type concept-id]} concept]
(validate-concept-revision-id db provider concept)
(let [concept (->> concept
(set-subscription-arn context concept-type)
(set-or-generate-revision-id db provider)
(set-deleted-flag false)
(try-to-save db provider context))
Expand All @@ -931,10 +940,9 @@
(ingest-events/publish-event
context
(ingest-events/concept-update-event concept))
(when (and subscriptions/subscriptions-enabled?
(= :subscription concept-type))
(when-let [subscription-arn (subscriptions/add-subscription context concept)]
(sub-not/update-subscription-with-aws-arn context concept-id subscription-arn)))
;; Add the ingest subscriptions to the cache. The subscriptions were saved to the database
;; above so now we can put it into the cache.
(subscriptions/add-delete-subscription context concept)
(subscriptions/work-potential-notification context concept)
concept)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
can sometimes be nil."
[concept]
(nil-fields-validation (apply dissoc (:extra-fields concept)
[:delete-time :version-id :source-revision-id :associated-revision-id :target-provider-id :collection-concept-id :endpoint :mode :method])))
[:delete-time :version-id :source-revision-id :associated-revision-id :target-provider-id :collection-concept-id :endpoint :mode :method :aws-arn])))

(defn concept-id-validation
[concept]
Expand Down
26 changes: 3 additions & 23 deletions metadata-db-app/src/cmr/metadata_db/services/sub_notifications.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
(if (sub-note/subscription-exists? db subscription-id)
(if (sub-note/sub-notification-exists? db subscription-id)
(sub-note/update-sub-notification db subscription-id last-notified-time)
(sub-note/save-sub-notification db subscription-id))
(do
(sub-note/save-sub-notification db subscription-id)
(sub-note/update-sub-notification db subscription-id last-notified-time)))
(errors/throw-service-error :not-found (msg/subscription-not-found subscription-id))))

(defn update-subscription-notification
Expand All @@ -29,25 +31,3 @@
(errors/throw-service-error
:not-found
(msg/subscription-not-found subscription-id)))))

(defn update-subscription-with-aws-arn
"Update the sub_notifications DB table with the subscription arn value."
[context subscription-id subscription-arn]
(let [errors (common-concepts/concept-id-validation subscription-id)
db (mdb-util/context->db context)]
(if (nil? errors)
(sub-note/update-sub-not-with-aws-arn db subscription-id subscription-arn)
(errors/throw-service-error
:not-found
(msg/subscription-not-found subscription-id)))))

(defn get-subscription-aws-arn
"Get the subscription ARN value from the sub_notifications DB table."
[context subscription-id]
(let [errors (common-concepts/concept-id-validation subscription-id)
db (mdb-util/context->db context)]
(if (nil? errors)
(:aws-arn (sub-note/get-sub-notification db subscription-id))
(errors/throw-service-error
:not-found
(msg/subscription-not-found subscription-id)))))
11 changes: 6 additions & 5 deletions metadata-db-app/src/cmr/metadata_db/services/subscriptions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -106,25 +106,26 @@
(when (and subscriptions-enabled?
(= :subscription (:concept-type concept)))
(let [concept-edn (convert-concept-to-edn concept)]
(change-subscription context concept-edn)
concept-edn)))
(when (ingest-subscription-concept? concept-edn)
(change-subscription context concept-edn)
concept-edn))))

(defn add-subscription
"Add the subscription to the cache and subscribe the subscription to
the topic."
[context concept]
(when-let [concept-edn (add-delete-subscription context concept)]
(when-let [concept-edn (convert-concept-to-edn concept)]
(let [topic (get-in context [:system :sns :external])]
(topic-protocol/subscribe topic concept-edn))))

(defn delete-subscription
"Remove the subscription from the cache and unsubscribe the subscription from
the topic."
[context concept subscription-arn]
[context concept]
(when-let [concept-edn (add-delete-subscription context concept)]
(let [topic (get-in context [:system :sns :external])]
(topic-protocol/unsubscribe topic {:concept-id (:concept-id concept-edn)
:subscription-arn subscription-arn}))))
:subscription-arn (get-in concept-edn [:extra-fields :aws-arn])}))))

;;
;; The functions below are for refreshing the subscription cache if needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,15 @@
(testing "adding and deleting subscriptions from the cache calling add-delete-subscription"
(let [db-contents '()]
(with-bindings {#'subscriptions/get-subscriptions-from-db (fn [_context _coll-concept-id] db-contents)}
(is (= {:metadata {:CollectionConceptId "C12345-PROV1"}
(is (= {:metadata {:CollectionConceptId "C12345-PROV1"
:EndPoint "ARN"
:Mode ["New" "Delete"]
:Method "ingest"}
:concept-type :subscription}
(subscriptions/add-delete-subscription test-context {:metadata "{\"CollectionConceptId\":\"C12345-PROV1\"}"
(subscriptions/add-delete-subscription test-context {:metadata "{\"CollectionConceptId\":\"C12345-PROV1\",
\"EndPoint\":\"ARN\",
\"Mode\":[\"New\", \"Delete\"],
\"Method\":\"ingest\"}"
:concept-type :subscription}))))))))

(def db-result-1
Expand Down Expand Up @@ -567,9 +573,11 @@
:extra-fields {:parent-collection-id "C1200000002-PROV1"}}]

;; if successful, the subscription concept-id is returned for local topic.
(subscriptions/add-delete-subscription test-context sub-concept)
(is (= (:concept-id sub-concept) (subscriptions/add-subscription test-context sub-concept)))

;; the subscription is replaced when the subscription already exists.
(subscriptions/add-delete-subscription test-context sub-concept)
(is (= (:concept-id sub-concept) (subscriptions/add-subscription test-context sub-concept)))

;; For this test add the subscription to the internal topic to test publishing.
Expand Down Expand Up @@ -609,9 +617,8 @@
(let [sub-concept {:metadata concept-metadata
:deleted true
:concept-type :subscription
:concept-id "SUB1200000005-PROV1"}
subscription-arn nil]
(is (= (:concept-id sub-concept) (subscriptions/delete-subscription test-context sub-concept subscription-arn)))
:concept-id "SUB1200000005-PROV1"}]
(is (= (:concept-id sub-concept) (subscriptions/delete-subscription test-context sub-concept)))
;; Also remove subscription from internal queue.
(let [topic (get-in test-context [:system :sns :internal])]
(topic-protocol/unsubscribe topic sub-concept)))))))
Expand Down Expand Up @@ -654,10 +661,13 @@
\"DataGranule\": {\"Identifiers\": [{\"IdentifierType\": \"ProducerGranuleId\",
\"Identifier\": \"Algorithm-1\"}]}}"
:extra-fields {:parent-collection-id "C1200000002-PROV1"}}
subscription-arn (subscriptions/add-subscription test-context sub-concept)]
_ (subscriptions/add-delete-subscription test-context sub-concept)
subscription-arn (subscriptions/add-subscription test-context sub-concept)
sub-concept (assoc-in sub-concept [:extra-fields :aws-arn] subscription-arn)]

(is (some? subscription-arn))
(when subscription-arn
(is (some? (subscriptions/delete-subscription test-context sub-concept subscription-arn))))
(is (some? (subscriptions/delete-subscription test-context sub-concept))))

;; publish message. this should publish to the internal queue
(is (some? (subscriptions/work-potential-notification test-context granule-concept)))
Expand Down

0 comments on commit 20cd68d

Please sign in to comment.