Backend: Replace RabbitMQ with SQS (Cloud-CV#1752)
deshraj authored Sep 4, 2018
1 parent 4080092 commit 1b6b854
Showing 19 changed files with 175 additions and 139 deletions.
11 changes: 8 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -8,8 +8,14 @@ matrix:
allow_failures:
- python: "3.6"

env:
- AWS_SECRET_ACCESS_KEY=x AWS_ACCESS_KEY_ID=x AWS_SQS_ENDPOINT="http://localhost:9324"

addons:
postgresql: "9.4"
apt:
packages:
- oracle-java8-set-default

cache:
directories:
@@ -20,14 +26,13 @@ before_cache:

before_script:
- psql -c "CREATE DATABASE evalai" -U postgres
- wget https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-0.14.2.jar
- java -jar elasticmq-server-0.14.2.jar &

install:
- pip install -r requirements/dev.txt
- pip install coveralls

services:
- rabbitmq

script:
- flake8 ./
- py.test --cov . --cov-config .coveragerc
6 changes: 3 additions & 3 deletions README.md
@@ -60,7 +60,7 @@ You can also use Docker Compose to run all the components of EvalAI together. Th

### Using Virtual Environment

1. Install [python] 2.7.10 or above, [git], [postgresql] version >= 10.1, [RabbitMQ] and [virtualenv], in your computer, if you don't have it already.
1. Install [python] 2.7.10 or above, [git], [postgresql] version >= 10.1 and [virtualenv], and have ElasticMQ installed (Amazon SQS is used in production), if you don't have them already.
*If you are having trouble with postgresql on Windows check this link [postgresqlhelp].*
2. Get the source code on your machine via git.
@@ -136,7 +136,7 @@ You can also use Docker Compose to run all the components of EvalAI together. Th

10. That's it, Open web browser and hit the url [http://127.0.0.1:8888].
11. (Optional) If you want to see the whole game into play, then start the RabbitMQ worker in a new terminal window using the following command that consumes the submissions done for every challenge:
11. (Optional) If you want to see the whole pipeline in action, then install the ElasticMQ queue service and start the worker in a new terminal window using the following command, which consumes the submissions made for every challenge:
```
python scripts/workers/submission_worker.py
```
@@ -155,6 +155,6 @@ If you are interested in contributing to EvalAI, follow our [contribution guidel
[virtualenv]: https://virtualenv.pypa.io/
[postgresql]: http://www.postgresql.org/download/
[postgresqlhelp]: http://bobbyong.com/blog/installing-postgresql-on-windoes/
[rabbitmq]: https://www.rabbitmq.com/
[amazon SQS]: https://aws.amazon.com/sqs/
[http://127.0.0.1:8888]: http://127.0.0.1:8888
[http://127.0.0.1:8000]: http://127.0.0.1:8000
75 changes: 57 additions & 18 deletions apps/jobs/sender.py
@@ -1,31 +1,70 @@
from __future__ import absolute_import

from django.conf import settings

import json
import pika
import logging
import os

import botocore
import boto3


logger = logging.getLogger(__name__)

def publish_submission_message(challenge_id, phase_id, submission_id):

connection = pika.BlockingConnection(pika.ConnectionParameters(
host=settings.RABBITMQ_PARAMETERS['HOST']))
channel = connection.channel()
channel.exchange_declare(exchange='evalai_submissions', type='topic')
def get_or_create_sqs_queue():
"""
Returns:
Returns the SQS Queue object
"""
sqs = boto3.resource('sqs',
endpoint_url=os.environ.get('AWS_SQS_ENDPOINT', 'http://sqs:9324'),
region_name=os.environ.get('AWS_DEFAULT_REGION', 'us-east-1'))

# though worker is creating the queue(queue creation is idempotent too)
# but lets create the queue here again, so that messages dont get missed
# later on we can apply a check on queue message length to raise some alert
# this way we will be notified of worker being up or not
channel.queue_declare(queue='submission_task_queue', durable=True)
AWS_SQS_QUEUE_NAME = os.environ.get('AWS_SQS_QUEUE_NAME', 'evalai_submission_queue')
# Check if the FIFO queue exists. If no, then create one
try:
queue = sqs.get_queue_by_name(QueueName=AWS_SQS_QUEUE_NAME)
except botocore.exceptions.ClientError as ex:
if ex.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
if settings.DEBUG:
queue = sqs.create_queue(QueueName=AWS_SQS_QUEUE_NAME)
else:
# create a FIFO queue in the production environment
name = AWS_SQS_QUEUE_NAME + '.fifo'
queue = sqs.create_queue(
QueueName=name,
Attributes={
'FifoQueue': 'true',
'ContentBasedDeduplication': 'true'
}
)
else:
logger.exception("Cannot get or create Queue")
raise  # without this, `queue` is unbound and `return queue` fails
return queue


def publish_submission_message(challenge_id, phase_id, submission_id):
"""
Args:
challenge_id: Challenge Id
phase_id: Challenge Phase Id
submission_id: Submission Id
Returns:
Returns SQS response
"""
message = {
'challenge_id': challenge_id,
'phase_id': phase_id,
'submission_id': submission_id
'submission_id': submission_id,
}
channel.basic_publish(exchange='evalai_submissions',
routing_key='submission.*.*',
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2)) # make message persistent

print(" [x] Sent %r" % message)
connection.close()
queue = get_or_create_sqs_queue()
AWS_SQS_MESSAGE_GROUP_ID = os.environ.get('AWS_SQS_MESSAGE_GROUP_ID', 'evalai_msg_group')
response = queue.send_message(
MessageBody=json.dumps(message),
MessageGroupId=AWS_SQS_MESSAGE_GROUP_ID,
)
return response
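The consumer side of this change is symmetric: the worker long-polls the same queue with boto3 and decodes the JSON body that `publish_submission_message` produces. Below is a minimal sketch, not code from this commit: `parse_submission_message`, `poll_queue`, and the `RUN_SQS_WORKER` opt-in flag are illustrative names, and the polling parameters are assumptions.

```python
import json
import os


def parse_submission_message(body):
    """Decode the JSON body produced by publish_submission_message."""
    message = json.loads(body)
    return (message["challenge_id"], message["phase_id"],
            message["submission_id"])


def poll_queue(queue):
    """Long-poll an SQS queue and hand each submission off for evaluation.

    `queue` is a boto3 SQS Queue resource, e.g. the object returned by
    get_or_create_sqs_queue() above.
    """
    while True:
        # WaitTimeSeconds=20 enables long polling; each call returns 0..1 messages
        for sqs_message in queue.receive_messages(WaitTimeSeconds=20,
                                                  MaxNumberOfMessages=1):
            challenge_id, phase_id, submission_id = \
                parse_submission_message(sqs_message.body)
            print("Processing submission %s (challenge %s, phase %s)"
                  % (submission_id, challenge_id, phase_id))
            # ... evaluate the submission here ...
            sqs_message.delete()  # delete only after successful processing


if os.environ.get("RUN_SQS_WORKER"):  # opt-in: needs boto3 and a reachable queue
    import boto3

    sqs = boto3.resource(
        "sqs",
        endpoint_url=os.environ.get("AWS_SQS_ENDPOINT", "http://sqs:9324"),
        region_name=os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
    )
    queue_name = os.environ.get("AWS_SQS_QUEUE_NAME", "evalai_submission_queue")
    poll_queue(sqs.get_queue_by_name(QueueName=queue_name))
```

Deleting the message only after processing means a crashed worker lets the message reappear once its visibility timeout expires, which is the at-least-once behaviour the pika `delivery_mode=2` setting was approximating.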
25 changes: 5 additions & 20 deletions docker-compose.yml
@@ -5,17 +5,12 @@ services:
image: postgres
ports:
- "5432:5432"
networks:
- evalai

rabbitmq:
image: rabbitmq
hostname: rabbitmq
sqs:
image: pakohan/elasticmq
hostname: sqs
ports:
- "5672:5672"
- "15672:15672" # here, we can access rabbitmq management plugin
networks:
- evalai
- 9324:9324

django:
container_name: django
@@ -29,11 +24,8 @@ services:
- "8000:8000"
depends_on:
- db
- rabbitmq
volumes:
- .:/code
networks:
- evalai

worker:
env_file:
@@ -42,12 +34,10 @@
context: ./
dockerfile: docker/dev/worker/Dockerfile
depends_on:
- rabbitmq
- sqs
- django
volumes:
- .:/code
networks:
- evalai

nodejs:
container_name: nodejs
@@ -63,8 +53,3 @@
- .:/code
- /code/node_modules
- /code/bower_components
networks:
- evalai

networks:
evalai:
7 changes: 5 additions & 2 deletions docker/dev.env
@@ -1,11 +1,14 @@
AWS_SECRET_ACCESS_KEY=x
AWS_ACCESS_KEY_ID=x

DEBUG=True
DJANGO_SETTINGS_MODULE=settings.dev
DJANGO_SERVER=django

POSTGRES_NAME=postgres
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_HOST=db
POSTGRES_PORT=5432

DJANGO_SERVER=django

RABBITMQ_HOST=rabbitmq
3 changes: 2 additions & 1 deletion docker/dev/worker/Dockerfile
@@ -11,5 +11,6 @@ ADD requirements/* /code/

RUN pip install -U cffi service_identity
RUN pip install -r dev.txt
RUN pip install -r worker.txt

CMD ["./docker/wait-for-it.sh", "django:8000", "--", "python", "scripts/workers/submission_worker.py" ]
CMD ["./docker/wait-for-it.sh", "django:8000", "--", "python", "-m", "scripts.workers.submission_worker"]
4 changes: 2 additions & 2 deletions docs/source/architecture.md
@@ -12,9 +12,9 @@ Django is the heart of the application, which powers our backend. We use Django

We use Django Rest Framework for writing and providing REST APIs. Its permission and serializers have helped write a maintainable codebase.

#### RabbitMQ
#### Amazon SQS

We currently use RabbitMQ for queueing submission messages which are then later on processed by a Python worker.
We currently use Amazon SQS for queueing submission messages which are then later on processed by a Python worker.

#### PostgreSQL

6 changes: 3 additions & 3 deletions docs/source/architecture_decisions.md
@@ -38,13 +38,13 @@ One way to process the submission is to evaluate it as soon as it is made, hence

Hence we decided to process and evaluate submission message in an asynchronous manner. To process the messages this way, we need to change our architecture a bit and add a Message Framework, along with a worker so that it can process the message.

Out of all the awesome messaging frameworks available, we have chosen RabbitMQ because of its transactional nature and reliability. Also, RabbitMQ is easily horizontally scalable, which means we can easily handle the heavy load by simply adding more nodes to the cluster.
Out of all the awesome messaging frameworks available, we have chosen Amazon Simple Queue Service (SQS) because it can support decoupled environments. It allows developers to focus on application development, rather than creating their own sophisticated message-based applications. It also eliminates queuing management tasks, such as storage. SQS also works with AWS resources, so you can use it to make reliable and scalable applications on top of an AWS infrastructure.

For the worker, we went ahead with a normal python worker, which simply runs a process and loads all the required data in its memory. As soon as the worker starts, it listens on a RabbitMQ queue named `submission_task_queue` for new submission messages.
For the worker, we went ahead with a normal Python worker, which simply runs a process and loads all the required data into its memory. As soon as the worker starts, it listens on an SQS queue named `evalai_submission_queue` for new submission messages.

### Submission Worker

The submission worker are responsible for processing submission messages. It listens on a queue named `submission_task_queue`, and on receiving a message for a submission, it processes and evaluates the submission.
The submission worker is responsible for processing submission messages. It listens on a queue named `evalai_submission_queue`, and on receiving a message for a submission, it processes and evaluates the submission.

One of the major design changes that we decided to implement in the submission worker was to load all the data related to the challenge in the worker's memory, instead of fetching it every time a new submission message arrives. So the worker, when starting, fetches the list of active challenges from the database and then loads it into memory by maintaining the map `EVALUATION_SCRIPTS` on challenge id. This was actually a major performance improvement.
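The startup cache described above can be sketched roughly as follows; the module layout, the `load_active_challenges` helper, and the sample data are illustrative stand-ins for the real worker's database queries, only the `EVALUATION_SCRIPTS` name and keying-by-challenge-id come from the text.

```python
# Illustrative sketch of the worker's in-memory cache, keyed on challenge id.
EVALUATION_SCRIPTS = {}


def load_active_challenges(active_challenges):
    """Populate EVALUATION_SCRIPTS once at worker startup.

    `active_challenges` stands in for the database query the real worker
    performs; each entry is a (challenge_id, evaluation_callable) pair.
    """
    for challenge_id, evaluation_callable in active_challenges:
        EVALUATION_SCRIPTS[challenge_id] = evaluation_callable


def evaluate(challenge_id, submission):
    """Look the script up in memory instead of hitting the DB per message."""
    script = EVALUATION_SCRIPTS[challenge_id]
    return script(submission)


if __name__ == "__main__":
    # `len` stands in for a real evaluation function
    load_active_challenges([(1, len)])
    print(evaluate(1, "abcd"))  # → 4
```

The trade-off is the one the paragraph implies: submissions are evaluated without a database round-trip, but challenges added after startup are invisible until the cache is refreshed or the worker restarts.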

33 changes: 17 additions & 16 deletions docs/source/setup.md
@@ -34,12 +34,16 @@ sudo apt-get install git
sudo apt-get install postgresql libpq-dev
```

* Install rabbitmq
* Register and configure Amazon SQS

Follow [these
instructions](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-setting-up.html) for the detailed steps on how to set up Amazon SQS for the production environment.


For setting up a queue service for the development environment, download the stand-alone [ElasticMQ distribution](https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-0.14.2.jar). Java 8 or above is required to run the server. Run the following command to start the ElasticMQ queue service, which binds to localhost:9324 and mocks the Amazon SQS functionality.

```shell
echo 'deb http://www.rabbitmq.com/debian/ stable main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
sudo apt-get update
sudo apt-get install rabbitmq-server
java -jar elasticmq-server-0.14.2.jar
```
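ElasticMQ can also be started with a custom configuration file (`java -Dconfig.file=custom.conf -jar elasticmq-server-0.14.2.jar`) to pin the bind address and pre-create the queue the worker listens on. The fragment below is an optional illustration, not part of this commit; the key names follow ElasticMQ's documented HOCON format, and the timeout values shown are arbitrary examples.

```
include classpath("application.conf")

// Address advertised in queue URLs; matches AWS_SQS_ENDPOINT above
node-address {
    protocol = http
    host = localhost
    port = 9324
}

rest-sqs {
    enabled = true
    bind-port = 9324
    bind-hostname = "0.0.0.0"
}

// Pre-create the queue so the first send/receive cannot race queue creation
queues {
    evalai_submission_queue {
        defaultVisibilityTimeout = 10 seconds
        receiveMessageWait = 0 seconds
    }
}
```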

* Install virtualenv
@@ -140,19 +144,16 @@ export PATH=$PATH:/usr/pgsql-x.x/bin
```
where `x.x` is your version, such as /usr/pgsql-9.5./bin.

* Install rabbitmq
* Register and configure Amazon SQS

Follow [these
instructions](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-setting-up.html) for the detailed steps on how to set up Amazon SQS for the production environment.


For setting up a queue service for the development environment, download the stand-alone [ElasticMQ distribution](https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-0.14.2.jar). Java 8 or above is required to run the server. Run the following command to start the ElasticMQ queue service, which binds to localhost:9324 and mocks the Amazon SQS functionality.

```shell
# use the below commands to get Erlang on our system:
wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
wget http://rpms.famillecollet.com/enterprise/remi-release-6.rpm
sudo rpm -Uvh remi-release-6*.rpm epel-release-6*.rpm
# Finally, download and install Erlang:
sudo yum install -y erlang
# Once we have Erlang, we can continue with installing RabbitMQ:
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.2.2/rabbitmq-server-3.2.2-1.noarch.rpm
rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
sudo yum install rabbitmq-server-3.2.2-1.noarch.rpm
java -jar elasticmq-server-0.14.2.jar
```

* Install virtualenv
@@ -263,7 +264,7 @@ Follow this guide to setup your development machine.

### Step 1: Install prerequisites

* Install Python 2.x, Git, PostgreSQL version >= 9.4, RabbitMQ and virtualenv, in your computer, if you don't have it already.
* Install Python 2.x, Git, PostgreSQL version >= 9.4 and virtualenv, and have Amazon SQS configured or ElasticMQ installed, if you don't have them already.

### Step 2: Get EvalAI Code

Binary file added elasticmq-server-0.14.2.jar
Binary file not shown.
Binary file added examples/example1/sample_evaluation_script.zip
Binary file not shown.
Binary file removed examples/example1/string_matching.zip
Binary file not shown.
2 changes: 1 addition & 1 deletion examples/example1/test_annotation.txt
@@ -7,4 +7,4 @@
7
8
9
10
10
2 changes: 2 additions & 0 deletions requirements/common.txt
@@ -1,4 +1,6 @@
asgi-redis==1.4.3
boto3==1.7.31
botocore==1.10.12
commonmark==0.5.4
django==1.11
django-import-export==0.5.1
4 changes: 4 additions & 0 deletions requirements/worker.txt
@@ -0,0 +1,4 @@
networkx==2.1
numpy==1.14.5
scipy==1.1.0
sklearn==0.0
4 changes: 3 additions & 1 deletion scripts/seed.py
@@ -33,6 +33,7 @@ def check_database():
print("Are you sure you want to wipe the existing development database and reseed it? (Y/N)")
if settings.TEST or raw_input().lower() == "y":
destroy_database()
return True
else:
return False
else:
@@ -131,7 +132,8 @@ def create_challenge(title, start_date, end_date, host_team):
"""
Creates a challenge.
"""
evaluation_script = open(os.path.join(settings.BASE_DIR, 'examples', 'example1', 'string_matching.zip'), 'rb')
evaluation_script = open(
os.path.join(settings.BASE_DIR, 'examples', 'example1', 'sample_evaluation_script.zip'), 'rb')
Challenge.objects.create(
title=title,
short_description=fake.paragraph(),
Empty file added scripts/workers/__init__.py
Empty file.
