From 890f1d0aafec96e8cd841abab4319ca119c3498f Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Tue, 28 Nov 2023 13:05:09 +0100 Subject: [PATCH 01/11] local-setup: Use ElasticMQ instead of fake_sqs for speed --- deploy/dockerephemeral/docker-compose.yaml | 6 +- deploy/dockerephemeral/docker/elasticmq.conf | 80 +++++++++++++++++++ .../dockerephemeral/federation-v0/brig.yaml | 2 +- .../dockerephemeral/federation-v0/galley.yaml | 2 +- deploy/dockerephemeral/init.sh | 24 ++---- integration/test/Testlib/ResourcePool.hs | 8 +- libs/types-common-aws/src/Util/Test/SQS.hs | 22 ++--- 7 files changed, 111 insertions(+), 33 deletions(-) create mode 100644 deploy/dockerephemeral/docker/elasticmq.conf diff --git a/deploy/dockerephemeral/docker-compose.yaml b/deploy/dockerephemeral/docker-compose.yaml index 564141eebc..b8bdebb4d9 100644 --- a/deploy/dockerephemeral/docker-compose.yaml +++ b/deploy/dockerephemeral/docker-compose.yaml @@ -34,10 +34,12 @@ services: fake_sqs: container_name: demo_wire_sqs -# image: airdock/fake-sqs:0.3.1 - image: julialongtin/airdock_fakesqs:0.0.9 + image: softwaremill/elasticmq-native:1.5.2 ports: - 127.0.0.1:4568:4568 + - 127.0.0.1:9325:9325 + volumes: + - ./docker/elasticmq.conf:/opt/elasticmq.conf networks: - demo_wire diff --git a/deploy/dockerephemeral/docker/elasticmq.conf b/deploy/dockerephemeral/docker/elasticmq.conf new file mode 100644 index 0000000000..7cd41d7317 --- /dev/null +++ b/deploy/dockerephemeral/docker/elasticmq.conf @@ -0,0 +1,80 @@ +include classpath("application.conf") + +# What is the outside visible address of this ElasticMQ node +# Used to create the queue URL (may be different from bind address!) 
+node-address { + protocol = http + host = localhost + port = 4568 + context-path = "" +} + +rest-sqs { + enabled = true + bind-port = 4568 + bind-hostname = "0.0.0.0" + # Possible values: relaxed, strict + sqs-limits = strict +} + +rest-stats { + enabled = true + bind-port = 9325 + bind-hostname = "0.0.0.0" +} + +# Should the node-address be generated from the bind port/hostname +# Set this to true e.g. when assigning port automatically by using port 0. +generate-node-address = false + +queues { + default-queue-template { + defaultVisibilityTimeout = 1s + } + + fifo-queue-template { + defaultVisibilityTimeout = 1s + fifo = true + } + + integration-brig-events = ${queues.default-queue-template} + integration-brig-events2 = ${queues.default-queue-template} + integration-brig-events3 = ${queues.default-queue-template} + integration-brig-events4 = ${queues.default-queue-template} + integration-brig-events5 = ${queues.default-queue-template} + integration-brig-events-federation-v0 = ${queues.default-queue-template} + + integration-brig-events-internal = ${queues.default-queue-template} + integration-brig-events-internal2 = ${queues.default-queue-template} + integration-brig-events-internal3 = ${queues.default-queue-template} + integration-brig-events-internal4 = ${queues.default-queue-template} + integration-brig-events-internal5 = ${queues.default-queue-template} + integration-brig-events-internal-federation-v0 = ${queues.default-queue-template} + + "integration-user-events.fifo" = ${queues.fifo-queue-template} + "integration-user-events2.fifo" = ${queues.fifo-queue-template} + "integration-user-events3.fifo" = ${queues.fifo-queue-template} + "integration-user-events4.fifo" = ${queues.fifo-queue-template} + "integration-user-events5.fifo" = ${queues.fifo-queue-template} + "integration-user-events-federation-v0.fifo" = ${queues.fifo-queue-template} + + integration-gundeck-events = ${queues.default-queue-template} + integration-gundeck-events2 = 
${queues.default-queue-template} + integration-gundeck-events3 = ${queues.default-queue-template} + integration-gundeck-events4 = ${queues.default-queue-template} + integration-gundeck-events5 = ${queues.default-queue-template} + integration-gundeck-events-federation-v0 = ${queues.default-queue-template} + + "integration-team-events.fifo" = ${queues.fifo-queue-template} + "integration-team-events2.fifo" = ${queues.fifo-queue-template} + "integration-team-events3.fifo" = ${queues.fifo-queue-template} + "integration-team-events4.fifo" = ${queues.fifo-queue-template} + "integration-team-events5.fifo" = ${queues.fifo-queue-template} + "integration-team-events-federation-v0.fifo" = ${queues.fifo-queue-template} +} + +# Region and accountId which will be included in resource ids +aws { + region = eu-west-1 + accountId = 000000000000 +} \ No newline at end of file diff --git a/deploy/dockerephemeral/federation-v0/brig.yaml b/deploy/dockerephemeral/federation-v0/brig.yaml index 693ba49278..06dfefe80e 100644 --- a/deploy/dockerephemeral/federation-v0/brig.yaml +++ b/deploy/dockerephemeral/federation-v0/brig.yaml @@ -36,7 +36,7 @@ federatorInternal: # You can set up local SQS/Dynamo running e.g. 
`../../deploy/dockerephemeral/run.sh` aws: - userJournalQueue: integration-user-events.fifo-federation-v0 + userJournalQueue: integration-user-events-federation-v0.fifo # ^ Comment this out if you don't want to journal user events prekeyTable: integration-brig-prekeys-federation-v0 sqsEndpoint: http://fake_sqs:4568 # https://sqs.eu-west-1.amazonaws.com diff --git a/deploy/dockerephemeral/federation-v0/galley.yaml b/deploy/dockerephemeral/federation-v0/galley.yaml index 6879901c48..ab2644a8ef 100644 --- a/deploy/dockerephemeral/federation-v0/galley.yaml +++ b/deploy/dockerephemeral/federation-v0/galley.yaml @@ -88,6 +88,6 @@ logLevel: Warn logNetStrings: false journal: # if set, journals; if not set, disables journaling - queueName: integration-team-events.fifo-federation-v0 + queueName: integration-team-events-federation-v0.fifo endpoint: http://demo_wire_sqs:4568 # https://sqs.eu-west-1.amazonaws.com region: eu-west-1 diff --git a/deploy/dockerephemeral/init.sh b/deploy/dockerephemeral/init.sh index f10067319a..7f11fc7ee0 100755 --- a/deploy/dockerephemeral/init.sh +++ b/deploy/dockerephemeral/init.sh @@ -22,25 +22,10 @@ for suffix in "" "2" "3" "4" "5" "-federation-v0"; do aws --endpoint-url=http://dynamodb:8000 dynamodb delete-table --table-name integration-brig-userkey-blacklist$suffix || true aws --endpoint-url=http://dynamodb:8000 dynamodb delete-table --table-name integration-brig-prekeys$suffix || true - # Create Dynamo/SQS resources + # Create Dynamo resources exec_until_ready "aws --endpoint-url=http://dynamodb:8000 dynamodb create-table --table-name integration-brig-userkey-blacklist$suffix --attribute-definitions AttributeName=key,AttributeType=S --key-schema AttributeName=key,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5" exec_until_ready "aws --endpoint-url=http://dynamodb:8000 dynamodb create-table --table-name integration-brig-prekeys$suffix --attribute-definitions AttributeName=client,AttributeType=S --key-schema 
AttributeName=client,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs create-queue --queue-name integration-brig-events$suffix" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs set-queue-attributes --queue-url http://sqs:4568/integration-brig-events$suffix --attributes VisibilityTimeout=1" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs create-queue --queue-name integration-brig-events-internal$suffix" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs set-queue-attributes --queue-url http://sqs:4568/integration-brig-events-internal$suffix --attributes VisibilityTimeout=1" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs create-queue --queue-name integration-user-events.fifo$suffix" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs set-queue-attributes --queue-url http://sqs:4568/integration-user-events.fifo$suffix --attributes VisibilityTimeout=1" - - # Gundeck's feedback queue - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs create-queue --queue-name integration-gundeck-events$suffix" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs set-queue-attributes --queue-url http://sqs:4568/integration-gundeck-events$suffix --attributes VisibilityTimeout=1" - - # Galley's team event queue - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs create-queue --queue-name integration-team-events.fifo$suffix" - exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs set-queue-attributes --queue-url http://sqs:4568/integration-team-events.fifo$suffix --attributes VisibilityTimeout=1" - # Verify sender's email address (ensure the sender address is in sync with the config in brig) exec_until_ready "aws --endpoint-url=http://ses:4579 ses verify-email-identity --email-address backend-integration$suffix@wire.com" @@ -53,6 +38,13 @@ for suffix in "" "2" "3" "4" "5" "-federation-v0"; do # TODO: Lifecycle configuration for the 
bucket, if supported. aws --endpoint-url=http://s3:9000 s3api create-bucket --bucket "dummy-bucket$suffix" aws --endpoint-url=http://s3:9000 s3api wait bucket-exists --bucket "dummy-bucket$suffix" + + # Check that SQS resources are created + exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs get-queue-url --queue-name integration-brig-events$suffix" + exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs get-queue-url --queue-name integration-brig-events-internal$suffix" + exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs get-queue-url --queue-name integration-user-events$suffix.fifo" + exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs get-queue-url --queue-name integration-gundeck-events$suffix" + exec_until_ready "aws --endpoint-url=http://sqs:4568 sqs get-queue-url --queue-name integration-team-events$suffix.fifo" done echo 'AWS resources created successfully!' diff --git a/integration/test/Testlib/ResourcePool.hs b/integration/test/Testlib/ResourcePool.hs index c7483ca947..f8e72ccd2e 100644 --- a/integration/test/Testlib/ResourcePool.hs +++ b/integration/test/Testlib/ResourcePool.hs @@ -94,14 +94,14 @@ backendResources dynConfs = berFederatorInternal = Ports.portForDyn (Ports.ServiceInternal FederatorInternal) i, berFederatorExternal = dynConf.federatorExternalPort, berDomain = dynConf.domain, - berAwsUserJournalQueue = "integration-user-events.fifo" <> suffix i, + berAwsUserJournalQueue = "integration-user-events" <> suffix i <> ".fifo", berAwsPrekeyTable = "integration-brig-prekeys" <> suffix i, berAwsS3Bucket = "dummy-bucket" <> suffix i, berAwsQueueName = "integration-gundeck-events" <> suffix i, berBrigInternalEvents = "integration-brig-events-internal" <> suffix i, berEmailSMSSesQueue = "integration-brig-events" <> suffix i, berEmailSMSEmailSender = "backend-integration" <> suffix i <> "@wire.com", berGalleyJournal = "integration-team-events" <> suffix i <> ".fifo", 
berVHost = dynConf.domain, berNginzSslPort = Ports.portForDyn Ports.NginzSSL i, berNginzHttp2Port = Ports.portForDyn Ports.NginzHttp2 i, @@ -151,14 +151,14 @@ backendB = berFederatorInternal = Ports.port (Ports.ServiceInternal FederatorInternal) BackendB, berFederatorExternal = Ports.port Ports.FederatorExternal BackendB, berDomain = "b.example.com", - berAwsUserJournalQueue = "integration-user-events.fifo2", + berAwsUserJournalQueue = "integration-user-events2.fifo", berAwsPrekeyTable = "integration-brig-prekeys2", berAwsS3Bucket = "dummy-bucket2", berAwsQueueName = "integration-gundeck-events2", berBrigInternalEvents = "integration-brig-events-internal2", berEmailSMSSesQueue = "integration-brig-events2", berEmailSMSEmailSender = "backend-integration2@wire.com", - berGalleyJournal = "integration-team-events.fifo2", + berGalleyJournal = "integration-team-events2.fifo", -- FUTUREWORK: set up vhosts in dev/ci for example.com and b.example.com -- in case we want backendA and backendB to federate with a third backend -- (because otherwise both queues will overlap) diff --git a/libs/types-common-aws/src/Util/Test/SQS.hs b/libs/types-common-aws/src/Util/Test/SQS.hs index c94188c2ae..9f5dc7f1ec 100644 --- a/libs/types-common-aws/src/Util/Test/SQS.hs +++ b/libs/types-common-aws/src/Util/Test/SQS.hs @@ -54,14 +54,18 @@ data SQSWatcher a = SQSWatcher -- the queue has too many things in it before the tests start. -- Note that the purgeQueue command is not guaranteed to be instant (can take up to 60 seconds) -- Hopefully, the fake-aws implementation used during tests is fast enough. -watchSQSQueue :: Message a => AWS.Env -> Text -> IO (SQSWatcher a) -watchSQSQueue env queueUrl = do +watchSQSQueue :: (Message a) => AWS.Env -> Text -> IO (SQSWatcher a) +watchSQSQueue env queueName = do eventsRef <- newIORef [] - ensureEmpty - process <- async $ recieveLoop eventsRef + + queueUrlRes <- execute env . 
sendEnv $ SQS.newGetQueueUrl queueName + let queueUrl = view SQS.getQueueUrlResponse_queueUrl queueUrlRes + + ensureEmpty queueUrl + process <- async $ recieveLoop queueUrl eventsRef pure $ SQSWatcher process eventsRef where - recieveLoop ref = do + recieveLoop queueUrl ref = do let rcvReq = SQS.newReceiveMessage queueUrl & set SQS.receiveMessage_waitTimeSeconds (Just 100) @@ -74,10 +78,10 @@ watchSQSQueue env queueUrl = do [] -> pure () _ -> atomicModifyIORef ref $ \xs -> (parsedMsgs <> xs, ()) - recieveLoop ref + recieveLoop queueUrl ref - ensureEmpty :: IO () - ensureEmpty = void $ execute env $ sendEnv (SQS.newPurgeQueue queueUrl) + ensureEmpty :: Text -> IO () + ensureEmpty queueUrl = void $ execute env $ sendEnv (SQS.newPurgeQueue queueUrl) -- | Waits for a message matching a predicate for a given number of seconds. waitForMessage :: (MonadUnliftIO m, Eq a) => SQSWatcher a -> Int -> (a -> Bool) -> m (Maybe a) @@ -95,7 +99,7 @@ waitForMessage watcher seconds predicate = timeout (seconds * 1_000_000) poll -- an assertion on such a message. 
assertMessage :: (MonadUnliftIO m, Eq a, HasCallStack) => SQSWatcher a -> String -> (a -> Bool) -> (String -> Maybe a -> m ()) -> m () assertMessage watcher label predicate callback = do - matched <- waitForMessage watcher 10 predicate + matched <- waitForMessage watcher 5 predicate callback label matched ----------------------------------------------------------------------------- From c999638513a466d8edd95b1f1d578c8ae235c34e Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Thu, 30 Nov 2023 12:00:49 +0100 Subject: [PATCH 02/11] charts/fake-aws-sqs: Use ElasticMQ --- charts/fake-aws-sqs/templates/configmap.yaml | 60 +++++++++++++++++++ charts/fake-aws-sqs/templates/deployment.yaml | 47 ++++----------- charts/fake-aws-sqs/values.yaml | 4 +- 3 files changed, 75 insertions(+), 36 deletions(-) create mode 100644 charts/fake-aws-sqs/templates/configmap.yaml diff --git a/charts/fake-aws-sqs/templates/configmap.yaml b/charts/fake-aws-sqs/templates/configmap.yaml new file mode 100644 index 0000000000..baac9d15bc --- /dev/null +++ b/charts/fake-aws-sqs/templates/configmap.yaml @@ -0,0 +1,60 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ template "fullname" . }} + labels: + app: {{ template "fullname" . }} + chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" + release: "{{ .Release.Name }}" + heritage: "{{ .Release.Service }}" +data: + elasticmq.conf: | + include classpath("application.conf") + + # What is the outside visible address of this ElasticMQ node + # Used to create the queue URL (may be different from bind address!) 
+ node-address { + protocol = http + host = localhost + port = {{ .Values.service.httpPort }} + context-path = "" + } + + rest-sqs { + enabled = true + bind-port = {{ .Values.service.httpPort }} + bind-hostname = "0.0.0.0" + # Possible values: relaxed, strict + sqs-limits = strict + } + + rest-stats { + enabled = true + bind-port = 9325 + bind-hostname = "0.0.0.0" + } + + # Should the node-address be generated from the bind port/hostname + # Set this to true e.g. when assigning port automatically by using port 0. + generate-node-address = false + + queues { + {{- range $i, $queueName := .Values.queueNames }} + "{{ $queueName }}" { + {{- if hasSuffix ".fifo" $queueName }} + fifo = true + {{- end }} + } + {{- end }} + } + + messages-storage { + enabled = true + uri = "jdbc:h2:/data/elasticmq.db" + } + + # Region and accountId which will be included in resource ids + aws { + region = eu-west-1 + accountId = 000000000000 + } diff --git a/charts/fake-aws-sqs/templates/deployment.yaml b/charts/fake-aws-sqs/templates/deployment.yaml index 24055e126a..39848020c9 100644 --- a/charts/fake-aws-sqs/templates/deployment.yaml +++ b/charts/fake-aws-sqs/templates/deployment.yaml @@ -16,6 +16,8 @@ spec: metadata: labels: app: {{ template "fullname" . }} + annotations: + checksum/configmap: {{ include (print .Template.BasePath "/configmap.yaml") . 
| sha256sum }} spec: containers: - name: fake-aws-sqs @@ -24,45 +26,22 @@ spec: - containerPort: {{ .Values.service.httpPort }} name: http protocol: TCP + command: + - /sbin/tini + - -- + - /opt/elasticmq/bin/elasticmq-native-server + - -Dconfig.file=/config/elasticmq.conf + - -Dlogback.configurationFile=/opt/logback.xml volumeMounts: - name: storage mountPath: /data + - name: config + mountPath: /config resources: {{ toYaml .Values.resources | indent 12 }} - - name: initiate-fake-aws-sqs - image: mesosphere/aws-cli:1.14.5 - command: [/bin/sh] - args: - - -c - - | - exec_until_ready() { - until $1; do echo 'service not ready yet'; sleep 1; done - } - queue_exists() { - # NOTE: we use the '"' to match the queue name more exactly (otherwise there is some overlap) - OUTPUT=$(aws --endpoint-url=http://localhost:{{ $.Values.service.httpPort }} sqs list-queues | grep $1'"' | wc -l) - echo $OUTPUT - } - - echo 'Creating AWS resources' - aws configure set aws_access_key_id dummy - aws configure set aws_secret_access_key dummy - aws configure set region eu-west-1 - - while true - do - # Recreate resources if needed - {{ range $i, $queueName := .Values.queueNames }} - QUEUE=$(queue_exists "{{ $queueName }}") - if [ "$QUEUE" == "1" ] - then echo "Queue {{ $queueName }} exists, no need to re-create" - else exec_until_ready "aws --endpoint-url=http://localhost:{{ $.Values.service.httpPort }} sqs create-queue --queue-name {{ $queueName }}" - fi - {{ end }} - - echo 'Sleeping 10' - sleep 10 - done volumes: - emptyDir: {} name: "storage" + - name: config + configMap: + name: {{ template "fullname" . 
}} diff --git a/charts/fake-aws-sqs/values.yaml b/charts/fake-aws-sqs/values.yaml index 4f46cd50d9..a516170698 100644 --- a/charts/fake-aws-sqs/values.yaml +++ b/charts/fake-aws-sqs/values.yaml @@ -1,6 +1,6 @@ image: - repository: airdock/fake-sqs - tag: 0.3.1 + repository: softwaremill/elasticmq-native + tag: 1.5.2 # TODO: in a wire-server chart, these queue names should match the ones defined in galley/brig/gundeck (i.e. only be defined once) queueNames: From 8b784baecfd3c9b6c0d3de916e0fb356ec793277 Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Thu, 30 Nov 2023 18:00:42 +0100 Subject: [PATCH 03/11] Changelog --- changelog.d/0-release-notes/elasticmq | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 changelog.d/0-release-notes/elasticmq diff --git a/changelog.d/0-release-notes/elasticmq b/changelog.d/0-release-notes/elasticmq new file mode 100644 index 0000000000..3ce140d0c4 --- /dev/null +++ b/changelog.d/0-release-notes/elasticmq @@ -0,0 +1,8 @@ +* Replace fake-sqs with ElasticMQ + +ElasticMQ is an actively maintained project, fake-sqs hasn't seen a commit since +2018. This is not expected to have any noticeable effect on deployments that +don't have any extra configurations for the SQS queues. If the fake-aws-sqs +chart had configured custom queue names, they have a couple of extra limitations: +- The queue names must only contain alphanumeric characters and hyphens. +- The FIFO queue names must end in `.fifo`. \ No newline at end of file From f190a0f17d7be612977acbdd7140c40089fd98b5 Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Mon, 4 Dec 2023 14:55:23 +0100 Subject: [PATCH 04/11] CI Setup: Create SQS queues using config Not sure why we created the script, perhaps people didn't know about the existence of this config value. 
--- .../templates/integration-integration.yaml | 1 - hack/helm_vars/fake-aws/values.yaml | 26 +++++++++++++++++++ .../integration-dynamic-backends-sqs.sh | 26 ------------------- nix/wire-server.nix | 8 ------ 4 files changed, 26 insertions(+), 35 deletions(-) delete mode 100755 integration/scripts/integration-dynamic-backends-sqs.sh diff --git a/charts/integration/templates/integration-integration.yaml b/charts/integration/templates/integration-integration.yaml index 044b63b3b9..2fe7718fa5 100644 --- a/charts/integration/templates/integration-integration.yaml +++ b/charts/integration/templates/integration-integration.yaml @@ -113,7 +113,6 @@ spec: # FUTUREWORK: Do all of this in the integration test binary integration-dynamic-backends-db-schemas.sh --host {{ .Values.config.cassandra.host }} --port {{ .Values.config.cassandra.port }} --replication-factor {{ .Values.config.cassandra.replicationFactor }} integration-dynamic-backends-brig-index.sh --elasticsearch-server http://{{ .Values.config.elasticsearch.host }}:9200 - integration-dynamic-backends-sqs.sh {{ .Values.config.sqsEndpointUrl }} integration-dynamic-backends-ses.sh {{ .Values.config.sesEndpointUrl }} integration-dynamic-backends-s3.sh {{ .Values.config.s3EndpointUrl }} {{- range $name, $dynamicBackend := .Values.config.dynamicBackends }} diff --git a/hack/helm_vars/fake-aws/values.yaml b/hack/helm_vars/fake-aws/values.yaml index 543867c48c..9b5f18ad87 100644 --- a/hack/helm_vars/fake-aws/values.yaml +++ b/hack/helm_vars/fake-aws/values.yaml @@ -1,3 +1,29 @@ fake-aws-ses: enabled: true sesSender: "backend-integrationk8s@wire.com" + +fake-aws-sqs: + queueNames: + - "integration-brig-events" + - "integration-brig-events-internal" + - "integration-gundeck-events" + - "integration-user-events.fifo" + - "integration-team-events.fifo" + + - "integration-brig-events3" + - "integration-brig-events-internal3" + - "integration-gundeck-events3" + - "integration-user-events3.fifo" + - "integration-team-events3.fifo" 
+ + - "integration-brig-events4" + - "integration-brig-events-internal4" + - "integration-gundeck-events4" + - "integration-user-events4.fifo" + - "integration-team-events4.fifo" + + - "integration-brig-events5" + - "integration-brig-events-internal5" + - "integration-gundeck-events5" + - "integration-user-events5.fifo" + - "integration-team-events5.fifo" diff --git a/integration/scripts/integration-dynamic-backends-sqs.sh b/integration/scripts/integration-dynamic-backends-sqs.sh deleted file mode 100755 index d85a3c85f0..0000000000 --- a/integration/scripts/integration-dynamic-backends-sqs.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env bash - -set -eo pipefail - -ENDPOINT_URL=$1 - -for i in $(seq "$INTEGRATION_DYNAMIC_BACKENDS_POOLSIZE"); do - suffix=$((i + 2)) - aws --endpoint-url="$ENDPOINT_URL" sqs create-queue --queue-name integration-brig-events$suffix - aws --endpoint-url="$ENDPOINT_URL" sqs set-queue-attributes --queue-url "$ENDPOINT_URL/integration-brig-events$suffix" --attributes VisibilityTimeout=1 - aws --endpoint-url="$ENDPOINT_URL" sqs create-queue --queue-name integration-brig-events-internal$suffix - aws --endpoint-url="$ENDPOINT_URL" sqs set-queue-attributes --queue-url "$ENDPOINT_URL/integration-brig-events-internal$suffix" --attributes VisibilityTimeout=1 - aws --endpoint-url="$ENDPOINT_URL" sqs create-queue --queue-name integration-user-events.fifo$suffix - aws --endpoint-url="$ENDPOINT_URL" sqs set-queue-attributes --queue-url "$ENDPOINT_URL/integration-user-events.fifo$suffix" --attributes VisibilityTimeout=1 - - # Gundeck's feedback queue - aws --endpoint-url="$ENDPOINT_URL" sqs create-queue --queue-name "integration-gundeck-events$suffix" - aws --endpoint-url="$ENDPOINT_URL" sqs set-queue-attributes --queue-url "$ENDPOINT_URL/integration-gundeck-events$suffix" --attributes VisibilityTimeout=1 - - # Galley's team event queue - aws --endpoint-url="$ENDPOINT_URL" sqs create-queue --queue-name "integration-team-events.fifo$suffix" - aws 
--endpoint-url="$ENDPOINT_URL" sqs set-queue-attributes --queue-url "$ENDPOINT_URL/integration-team-events.fifo$suffix" --attributes VisibilityTimeout=1 -done - -echo 'AWS sqs queues created successfully!' - diff --git a/nix/wire-server.nix b/nix/wire-server.nix index 82a0211e42..89356ed00c 100644 --- a/nix/wire-server.nix +++ b/nix/wire-server.nix @@ -223,13 +223,6 @@ let checkPhase = ""; }; - integration-dynamic-backends-sqs = pkgs.writeShellApplication { - name = "integration-dynamic-backends-sqs.sh"; - text = "${builtins.readFile ../integration/scripts/integration-dynamic-backends-sqs.sh}"; - runtimeInputs = [ pkgs.parallel pkgs.awscli2 ]; - checkPhase = ""; - }; - integration-dynamic-backends-ses = pkgs.writeShellApplication { name = "integration-dynamic-backends-ses.sh"; text = "${builtins.readFile ../integration/scripts/integration-dynamic-backends-ses.sh}"; @@ -286,7 +279,6 @@ let pkgs.awscli2 integration-dynamic-backends-db-schemas integration-dynamic-backends-brig-index - integration-dynamic-backends-sqs integration-dynamic-backends-ses integration-dynamic-backends-s3 integration-dynamic-backends-vhosts From fead4132f858764c32929acc3bd47a161f3f3351 Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Mon, 4 Dec 2023 15:47:03 +0100 Subject: [PATCH 05/11] SQSWatcher: Use smaller wait time ElasticMQ allows max 20 seconds. --- libs/types-common-aws/src/Util/Test/SQS.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/types-common-aws/src/Util/Test/SQS.hs b/libs/types-common-aws/src/Util/Test/SQS.hs index 9f5dc7f1ec..da365d8cd9 100644 --- a/libs/types-common-aws/src/Util/Test/SQS.hs +++ b/libs/types-common-aws/src/Util/Test/SQS.hs @@ -68,7 +68,7 @@ watchSQSQueue env queueName = do recieveLoop queueUrl ref = do let rcvReq = SQS.newReceiveMessage queueUrl - & set SQS.receiveMessage_waitTimeSeconds (Just 100) + & set SQS.receiveMessage_waitTimeSeconds (Just 10) . set SQS.receiveMessage_maxNumberOfMessages (Just 1) . 
set SQS.receiveMessage_visibilityTimeout (Just 1) rcvRes <- execute env $ sendEnv rcvReq From d9d31efb2fa09860f53d768a79bf522b1519f87d Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Mon, 4 Dec 2023 16:05:45 +0100 Subject: [PATCH 06/11] SQSWatcher: Ensure thread being killed is flagged properly --- libs/types-common-aws/src/Util/Test/SQS.hs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/libs/types-common-aws/src/Util/Test/SQS.hs b/libs/types-common-aws/src/Util/Test/SQS.hs index da365d8cd9..6554a953cb 100644 --- a/libs/types-common-aws/src/Util/Test/SQS.hs +++ b/libs/types-common-aws/src/Util/Test/SQS.hs @@ -5,8 +5,6 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE UndecidableInstances #-} --- Disabling for HasCallStack -{-# OPTIONS_GHC -Wno-redundant-constraints #-} -- This file is part of the Wire Server implementation. -- @@ -37,7 +35,8 @@ import Data.ProtoLens import Data.Text.Encoding qualified as Text import Imports import Safe (headDef) -import UnliftIO (Async, async) +import UnliftIO (Async, async, throwIO) +import UnliftIO.Async qualified as Async import UnliftIO.Resource (MonadResource, ResourceT) import UnliftIO.Timeout (timeout) @@ -83,11 +82,22 @@ watchSQSQueue env queueName = do ensureEmpty :: Text -> IO () ensureEmpty queueUrl = void $ execute env $ sendEnv (SQS.newPurgeQueue queueUrl) +data SQSWatcherError = BackgroundThreadNotRunning String + deriving (Show) + +instance Exception SQSWatcherError + -- | Waits for a message matching a predicate for a given number of seconds. -waitForMessage :: (MonadUnliftIO m, Eq a) => SQSWatcher a -> Int -> (a -> Bool) -> m (Maybe a) +waitForMessage :: forall m a. (MonadUnliftIO m, Eq a, HasCallStack) => SQSWatcher a -> Int -> (a -> Bool) -> m (Maybe a) waitForMessage watcher seconds predicate = timeout (seconds * 1_000_000) poll where + poll :: (HasCallStack) => m a poll = do + -- Check if the background thread is still alive. 
If not fail with a nicer error + Async.poll watcher.watcherProcess >>= \case + Nothing -> pure () + Just (Left err) -> throwIO $ BackgroundThreadNotRunning $ "Thread finished with exception: " <> show err + Just (Right ()) -> throwIO $ BackgroundThreadNotRunning "Thread finished without any exceptions when it was supposed to run forever" matched <- atomicModifyIORef (events watcher) $ \events -> case filter predicate events of [] -> (events, Nothing) From a01e1228c28648cbf9b4de025bbb3d8053aaf249 Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Mon, 4 Dec 2023 16:51:08 +0100 Subject: [PATCH 07/11] SQSWatcher: Use 5 concurrent loops to increase throughput Each receive takes 300ms. When 16 tests run in parallel, this poor thread cannot keep up and causes timeouts. Instead of increasing the timeout, increasing threads will ensure tests don't fail. --- libs/types-common-aws/src/Util/Test/SQS.hs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/libs/types-common-aws/src/Util/Test/SQS.hs b/libs/types-common-aws/src/Util/Test/SQS.hs index 6554a953cb..020274cd06 100644 --- a/libs/types-common-aws/src/Util/Test/SQS.hs +++ b/libs/types-common-aws/src/Util/Test/SQS.hs @@ -61,7 +61,15 @@ watchSQSQueue env queueName = do let queueUrl = view SQS.getQueueUrlResponse_queueUrl queueUrlRes ensureEmpty queueUrl - process <- async $ recieveLoop queueUrl eventsRef + process <- async $ do + loop1 <- async $ recieveLoop queueUrl eventsRef + loop2 <- async $ recieveLoop queueUrl eventsRef + loop3 <- async $ recieveLoop queueUrl eventsRef + loop4 <- async $ recieveLoop queueUrl eventsRef + loop5 <- async $ recieveLoop queueUrl eventsRef + _ <- Async.waitAny [loop1, loop2, loop3, loop4, loop5] + throwIO $ BackgroundThreadNotRunning $ "One of the SQS recieve loops finished, all of them are supposed to run forever" + pure $ SQSWatcher process eventsRef where recieveLoop queueUrl ref = do From a35424b196e38d2121505e9e53ff9c1a5dd6c1b1 Mon Sep 17 00:00:00 2001 From: 
Akshay Mankar Date: Tue, 5 Dec 2023 11:18:59 +0100 Subject: [PATCH 08/11] Add comment to explain 5 receive loops --- libs/types-common-aws/src/Util/Test/SQS.hs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libs/types-common-aws/src/Util/Test/SQS.hs b/libs/types-common-aws/src/Util/Test/SQS.hs index 020274cd06..cd2d810a2f 100644 --- a/libs/types-common-aws/src/Util/Test/SQS.hs +++ b/libs/types-common-aws/src/Util/Test/SQS.hs @@ -62,6 +62,10 @@ watchSQSQueue env queueName = do ensureEmpty queueUrl process <- async $ do + -- Every receive request takes ~300ms (on my machine). This puts a limit of + -- ~3 notifications per second. Which makes tests really slow. SQS scales + -- pretty well with multiple consumers, so we start 5 consumers here to bump + -- the max throughput to about ~15 notifications per second. loop1 <- async $ recieveLoop queueUrl eventsRef loop2 <- async $ recieveLoop queueUrl eventsRef loop3 <- async $ recieveLoop queueUrl eventsRef From 6066dd218cd6ab877c9e0439d588fbff7b07e883 Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Tue, 5 Dec 2023 11:19:52 +0100 Subject: [PATCH 09/11] Fix typo --- libs/types-common-aws/src/Util/Test/SQS.hs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/libs/types-common-aws/src/Util/Test/SQS.hs b/libs/types-common-aws/src/Util/Test/SQS.hs index cd2d810a2f..f91b649d89 100644 --- a/libs/types-common-aws/src/Util/Test/SQS.hs +++ b/libs/types-common-aws/src/Util/Test/SQS.hs @@ -66,17 +66,17 @@ watchSQSQueue env queueName = do -- ~3 notifications per second. Which makes tests really slow. SQS scales -- pretty well with multiple consumers, so we start 5 consumers here to bump -- the max throughput to about ~15 notifications per second. 
- loop1 <- async $ recieveLoop queueUrl eventsRef - loop2 <- async $ recieveLoop queueUrl eventsRef - loop3 <- async $ recieveLoop queueUrl eventsRef - loop4 <- async $ recieveLoop queueUrl eventsRef - loop5 <- async $ recieveLoop queueUrl eventsRef + loop1 <- async $ receiveLoop queueUrl eventsRef + loop2 <- async $ receiveLoop queueUrl eventsRef + loop3 <- async $ receiveLoop queueUrl eventsRef + loop4 <- async $ receiveLoop queueUrl eventsRef + loop5 <- async $ receiveLoop queueUrl eventsRef _ <- Async.waitAny [loop1, loop2, loop3, loop4, loop5] - throwIO $ BackgroundThreadNotRunning $ "One of the SQS recieve loops finished, all of them are supposed to run forever" + throwIO $ BackgroundThreadNotRunning $ "One of the SQS receive loops finished, all of them are supposed to run forever" pure $ SQSWatcher process eventsRef where - recieveLoop queueUrl ref = do + receiveLoop queueUrl ref = do let rcvReq = SQS.newReceiveMessage queueUrl & set SQS.receiveMessage_waitTimeSeconds (Just 10) @@ -89,7 +89,7 @@ watchSQSQueue env queueName = do [] -> pure () _ -> atomicModifyIORef ref $ \xs -> (parsedMsgs <> xs, ()) - recieveLoop queueUrl ref + receiveLoop queueUrl ref ensureEmpty :: Text -> IO () ensureEmpty queueUrl = void $ execute env $ sendEnv (SQS.newPurgeQueue queueUrl) From 1bf79cf796a44944e9fc3058437549f4bbd97c1b Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Tue, 5 Dec 2023 11:22:18 +0100 Subject: [PATCH 10/11] Explain lack of "2" set of queues --- hack/helm_vars/fake-aws/values.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/hack/helm_vars/fake-aws/values.yaml b/hack/helm_vars/fake-aws/values.yaml index 9b5f18ad87..57bb23acb0 100644 --- a/hack/helm_vars/fake-aws/values.yaml +++ b/hack/helm_vars/fake-aws/values.yaml @@ -10,6 +10,13 @@ fake-aws-sqs: - "integration-user-events.fifo" - "integration-team-events.fifo" + # No need for the set of queues with "2" as the suffix because the second + # deployment of wire-server runs in a separate namespace 
(the "-fed2" + # namespace) with its own fake-aws-sqs. + # + # In that namespace these extra queues will be unused, but it's easier to + # create them and not use them than to not create them. It shouldn't create + # any significant performance degradation. - "integration-brig-events3" - "integration-brig-events-internal3" - "integration-gundeck-events3" From 0e3fa4645e0d39fb92c9d1257b4cdc9fab6d3ccc Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Mon, 11 Dec 2023 09:37:58 +0100 Subject: [PATCH 11/11] brig-integration: Use the queue name for SQSWatcher Galley uses the queue name, brig was using the queue-url, this is not correct. With the old fake-sqs implementation it still worked. --- services/brig/test/integration/Run.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/brig/test/integration/Run.hs b/services/brig/test/integration/Run.hs index a987096b8c..7fe1b37bda 100644 --- a/services/brig/test/integration/Run.hs +++ b/services/brig/test/integration/Run.hs @@ -145,7 +145,7 @@ runTests iConf brigOpts otherArgs = do let fedGalleyClient = FedClient @'Galley mg (galley iConf) emailAWSOpts <- parseEmailAWSOpts awsEnv <- AWS.mkEnv lg awsOpts emailAWSOpts mg - mUserJournalWatcher <- for (view AWS.userJournalQueue awsEnv) $ SQS.watchSQSQueue (view AWS.amazonkaEnv awsEnv) + mUserJournalWatcher <- for (Opts.userJournalQueue awsOpts) $ SQS.watchSQSQueue (view AWS.amazonkaEnv awsEnv) userApi <- User.tests brigOpts fedBrigClient fedGalleyClient mg b c ch g n awsEnv db mUserJournalWatcher providerApi <- Provider.tests localDomain (provider iConf) mg db b c g n searchApis <- Search.tests brigOpts mg g b