Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARXIVNG-419 refactored indexing agent in Python #154

Merged
merged 7 commits into from
Apr 16, 2018

Conversation

erickpeirson
Copy link
Contributor

It was kind of a drag to have to depend on the Java-based MultiLangDaemon provided by AWS to run Kinesis consumers, especially when there is such good support for Kinesis in Python (via boto3). It seemed like the right thing to do to implement a consumer entirely in Python, so that we have direct visibility onto the whole thing. It also gives us a lot more flexibility for how we implement the consumer, and frees us up from some pointless dependencies (MultiLangDaemon used DynamoDB for checkpointing, and CloudWatch for metrics, with little flexibility).

If we're happy with the BaseConsumer implementation, that's something that we could think about making part of arXiv base for future Kinesis integrations.

I added the following to the README:

Running the indexing agent.

The indexing agent is responsible for updating the search index as new papers
are published. By default, docker-compose will also start the search index
and a service called Localstack
that provides a local Kinesis stream for testing/development purposes.

To disable the agent and localstack, just comment out those services in
docker-compose.yml.

The agent takes a little longer than the other services to start. Early in the
startup, you'll see something like:

agent            | application 12/Apr/2018:15:43:13 +0000 - search.agent.base - None - [arxiv:null] - INFO: "New consumer for MetadataIsAvailable (0)"
agent            | application 12/Apr/2018:15:43:13 +0000 - search.agent.base - None - [arxiv:null] - INFO: "Getting a new connection to Kinesis at https://localstack:4568 in region us-east-1, with SSL verification=False"
agent            | application 12/Apr/2018:15:43:13 +0000 - search.agent.base - None - [arxiv:null] - INFO: "Waiting for MetadataIsAvailable to be available"
agent            | application 12/Apr/2018:15:43:13 +0000 - search.agent.base - None - [arxiv:null] - ERROR: "Waiting for stream MetadataIsAvailable"

A little while later, when localstack and the indexing agent are running, you
should see something like:

agent            | application 12/Apr/2018:15:44:14 +0000 - search.agent.base - None - [arxiv:null] - ERROR: "Failed to get stream while waiting"
agent            | application 12/Apr/2018:15:44:14 +0000 - search.agent.base - None - [arxiv:null] - INFO: "Could not connect to stream; attempting to create"
agent            | application 12/Apr/2018:15:44:14 +0000 - search.agent.base - None - [arxiv:null] - INFO: "Created; waiting for MetadataIsAvailable again"
agent            | application 12/Apr/2018:15:44:14 +0000 - search.agent.base - None - [arxiv:null] - ERROR: "Waiting for stream MetadataIsAvailable"
localstack       | Ready.
agent            | application 12/Apr/2018:15:44:24 +0000 - search.agent.base - None - [arxiv:null] - INFO: "Ready to start"
agent            | application 12/Apr/2018:15:44:24 +0000 - search.agent.base - None - [arxiv:null] - INFO: "Starting processing from position 49583482132750299344823207796409748205413425533752967170 on stream MetadataIsAvailable and shard 0"

Note that Kinesis will be mounted locally on port 5586. It will be using SSL,
but with an invalid certificate. You can connect to this local Kinesis using:

import boto3

client = boto3.client(
    'kinesis',
    region_name='us-east-1',
    endpoint_url="https://localhost:5568",
    aws_access_key_id='foo',
    aws_secret_access_key='bar',
    verify=False
)

To verify that the agent is working correctly, try adding some records to
the stream.

import json

to_index = [
    "1712.04442",
    "1511.07473",
    "1604.04228",
    "1403.6219",
    "1404.3450",
    "1703.09067",
    "1408.6682",
    "1607.05107",
    "1509.08727",
    "1710.01597",
    "1708.07156",
    "1401.1012",
]

for document_id in to_index:
    data = bytes(json.dumps({'document_id': document_id}), encoding='utf-8')
    client.put_record(
        StreamName='MetadataIsAvailable',
        Data=data,
        PartitionKey='0'
    )

You should see these records being processed in the agent log output almost
immediately. For example:

agent            | application 12/Apr/2018:15:49:18 +0000 - search.agent.consumer - None - [arxiv:null] - INFO: "Processing record 49583482484923667520018808447528512983060659815298629634"
agent            | application 12/Apr/2018:15:49:19 +0000 - search.agent.consumer - None - [arxiv:null] - INFO: "Processing record 49583482484923667520018808447529721908880274444473335810"
agent            | application 12/Apr/2018:15:49:20 +0000 - search.agent.consumer - None - [arxiv:null] - INFO: "Processing record 49583482484923667520018808447530930834699889073648041986"
agent            | application 12/Apr/2018:15:49:20 +0000 - search.agent.consumer - None - [arxiv:null] - INFO: "Processing record 49583482484923667520018808447532139760519503702822748162"
agent            | application 12/Apr/2018:15:49:21 +0000 - search.agent.consumer - None - [arxiv:null] - INFO: "Processing record 49583482484923667520018808447533348686339118400716931074"
agent            | application 12/Apr/2018:15:49:22 +0000 - search.agent.consumer - None - [arxiv:null] - INFO: "Processing record 49583482484923667520018808447534557612158733029891637250"
agent            | application 12/Apr/2018:15:49:23 +0000 - search.agent.consumer - None - [arxiv:null] - INFO: "Processing record 49583482484923667520018808447535766537978347659066343426"
agent            | application 12/Apr/2018:15:49:24 +0000 - search.agent.consumer - None - [arxiv:null] - INFO: "Processing record 49583482484923667520018808447536975463797962288241049602"
agent            | application 12/Apr/2018:15:49:24 +0000 - search.agent.consumer - None - [arxiv:null] - INFO: "Processing record 49583482484923667520018808447538184389617576917415755778"
agent            | application 12/Apr/2018:15:49:25 +0000 - search.agent.consumer - None - [arxiv:null] - INFO: "Processing record 49583482484923667520018808447539393315437191546590461954"
agent            | application 12/Apr/2018:15:49:25 +0000 - search.agent.consumer - None - [arxiv:null] - INFO: "Processing record 49583482484923667520018808447540602241256806175765168130"
agent            | application 12/Apr/2018:15:49:25 +0000 - search.agent.consumer - None - [arxiv:null] - INFO: "Processing record 49583482484923667520018808447541811167076420804939874306"

@erickpeirson
Copy link
Contributor Author

By way of vindicating myself for taking the extra time: +1,092 −998 lines in this PR. And that includes a whole lot of documentation. Read: using the MultiLangDaemon as a consumer wasn't saving us much.

@erickpeirson
Copy link
Contributor Author

Whoops; I see that this breaks populate_test_metadata.py -- working on a fix.

@erickpeirson
Copy link
Contributor Author

Fixed.

Copy link
Contributor

@eawoods eawoods left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It LIIIIIIIVES! I'm lagging behind in Python-savviness and had a bit of trouble getting started, but I did get Kinesis running and doing the test indexing for me. Glad to have gone through that step now, so that future feature testing will be smooth.

@erickpeirson erickpeirson merged commit a656a9f into develop Apr 16, 2018
@erickpeirson erickpeirson deleted the task/ARXIVNG-419 branch April 16, 2018 12:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants