Commit bc18afe

Anish-Malhotra committed May 16, 2023
1 parent 6cba6de
Showing 9 changed files with 51,722 additions and 0 deletions.
Empty file added __init__.py
Empty file.
12 changes: 12 additions & 0 deletions configuration.py
@@ -0,0 +1,12 @@
from dataclasses import dataclass


@dataclass
class IndexConfiguration:
    cluster: str
    index: str
    username: str
    password: str
    filepath: str
    verbose: bool
    exit_on_error: bool
Empty file added core/__init__.py
Empty file.
56 changes: 56 additions & 0 deletions core/document_models.py
@@ -0,0 +1,56 @@
from datetime import datetime

from dataclasses import dataclass, field
from dataclasses_json import config, dataclass_json


def _date_decoder(date_str: str) -> datetime:
    return datetime.strptime(date_str, "%d/%b/%Y:%H:%M:%S %z")

def _date_encoder(dt: datetime) -> str:
    return dt.strftime('%Y-%m-%dT%H:%M:%S%z')


@dataclass_json
@dataclass
class NginxLog:
    time: datetime = field(
        metadata=config(
            decoder=_date_decoder,
            encoder=_date_encoder
        )
    )
    remote_ip: str
    remote_user: str
    request: str
    response: int
    bytes: int
    referrer: str
    agent: str


_NGINX_LOG_MAPPING = {
"mappings": {
"properties": {
"time": {
"type": "date",
},
"remote_ip": {"type": "text"},
"remote_user": {"type": "text"},
"request": {"type": "text"},
"response": {"type": "long"},
"bytes": {"type": "long"},
"referrer": {"type": "text"},
"agent": {"type": "text"},
}
},
}


INDEX_NAME_TO_DOCUMENT_TYPE = {
    'nginx': NginxLog
}

INDEX_NAME_TO_DOCUMENT_MAPPING = {
    'nginx': _NGINX_LOG_MAPPING
}
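For reference, here is a minimal round-trip of a single record through `NginxLog`; the sample values below are hypothetical, but the `time` string follows the `%d/%b/%Y:%H:%M:%S %z` format the decoder expects:

```python
from core.document_models import NginxLog

# Hypothetical nginx log record in the shape the model expects
sample = (
    '{"time": "17/May/2015:08:05:32 +0000", "remote_ip": "93.180.71.3", '
    '"remote_user": "-", "request": "GET /downloads/product_1 HTTP/1.1", '
    '"response": 304, "bytes": 0, "referrer": "-", "agent": "Debian APT-HTTP/1.3"}'
)

log = NginxLog.from_json(sample)   # dataclasses_json applies _date_decoder to "time"
print(log.time.isoformat())        # 2015-05-17T08:05:32+00:00
print(log.to_json())               # "time" is re-serialized by _date_encoder as 2015-05-17T08:05:32+0000
```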
108 changes: 108 additions & 0 deletions core/runner.py
@@ -0,0 +1,108 @@
import asyncio
from itertools import islice

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

from core.document_models import INDEX_NAME_TO_DOCUMENT_MAPPING, INDEX_NAME_TO_DOCUMENT_TYPE
from configuration import IndexConfiguration


# The queue size is the number of documents indexed per call to Elasticsearch's bulk
# indexing interface. The input file has ~52K lines totalling roughly 12MB, so a batch
# of 21,500 documents comes to about ~5MB, the starting bulk size Elasticsearch recommends:
# https://www.elastic.co/guide/en/elasticsearch/guide/current/indexing-performance.html

# We also use this queue size as the chunk size (number of lines) when reading the input JSON file.
_MAX_QUEUE_SIZE = 21500


async def load_and_index(config: IndexConfiguration, client: Elasticsearch):
    # Exit early if the index name provided is invalid per our mappings in 'document_models.py'
    if config.index not in INDEX_NAME_TO_DOCUMENT_TYPE \
            or config.index not in INDEX_NAME_TO_DOCUMENT_MAPPING:
        raise ValueError(f"Configuration not found for index {config.index}")

    # Create the index if it doesn't already exist and we have a mapping for it
    client.indices.create(index=config.index, ignore=400, mappings=INDEX_NAME_TO_DOCUMENT_MAPPING[config.index])

    # create a shared queue
    queue = asyncio.Queue(maxsize=_MAX_QUEUE_SIZE)

    # start the consumer which reads from the queue
    _ = asyncio.create_task(consumer(config, queue, client))

    # start the producer which writes to the queue
    await asyncio.create_task(producer(config, queue))

    # wait for all items to be processed and then close the client
    client.close()


def data_generator(index: str, documents: list):
    docs = [doc.to_json() for doc in documents]
    for doc in docs:
        yield {
            "_index": index,
            "_source": doc
        }


# Called synchronously by the consumer coroutine. By the time it runs, the documents have
# already been marked as processed, so the producer can continue parsing the JSON file
# while we bulk-index this batch.
def index(index: str, documents: list, client: Elasticsearch):
    return bulk(client, data_generator(index, documents), stats_only=True)


# Consumes parsed documents from the queue asynchronously and synchronously triggers a bulk index
async def consumer(config: IndexConfiguration, queue: asyncio.Queue, client: Elasticsearch):
    documents_total = 0
    successful = 0
    while True:
        # Wait for the producer to tell us we're ready to process a batch of documents
        if not queue.full():
            await asyncio.sleep(1)
        else:
            documents = []
            while not queue.empty():
                # Pop the documents from the queue one-by-one and collect them into a list
                # for bulk indexing through Elasticsearch
                document = queue.get_nowait()
                if document:
                    documents.append(document)
                    documents_total += 1
                # Mark that we've processed this document (so the producer knows to continue loading from file)
                queue.task_done()
            try:
                successful += index(config.index, documents, client)[0]
            except Exception as exc:
                if config.exit_on_error:
                    raise exc
                else:
                    if config.verbose:
                        print(f"Encountered exception: {str(exc)}")
            print(f"Processed batch: {successful}/{documents_total} documents successfully indexed")


# Parses the input JSON file in chunks and adds the records to a queue for indexing.
# Although this is a fixed file, we treat it as a stream, so this should be easy to extend
# to a poller or webhook that processes new records arriving at an HTTP endpoint
async def producer(config: IndexConfiguration, queue: asyncio.Queue):
    with open(config.filepath, 'r') as f:
        while True:
            chunk = list(islice(f, _MAX_QUEUE_SIZE))
            if not chunk:
                # We've reached EOF, so exit the coroutine
                break
            for item in chunk:
                # Determine the appropriate document class for the specified index name,
                # convert the JSON entry to that document type, and add it to the queue
                document = INDEX_NAME_TO_DOCUMENT_TYPE[config.index].from_json(item)
                queue.put_nowait(document)
            while not queue.full():
                # If we've reached EOF before the queue is full, add dummy values
                # so that the consumer still gets the signal to process the remaining documents
                queue.put_nowait(None)
            # Block until the consumer has bulk-processed the documents loaded so far
            await queue.join()
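For clarity, a minimal sketch of the action stream that `data_generator` feeds to `elasticsearch.helpers.bulk`; the cluster URL, credentials, and document values are placeholders, and plain dicts stand in for the JSON strings produced by `to_json()` in the actual pipeline:

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

# Placeholder connection details
client = Elasticsearch(hosts=["http://localhost:9200"], basic_auth=("elastic", "changeme"))

# Each action names the target index and carries the document body as _source
actions = [
    {"_index": "nginx", "_source": {"remote_ip": "93.180.71.3", "response": 304, "bytes": 0}},
    {"_index": "nginx", "_source": {"remote_ip": "80.91.33.133", "response": 200, "bytes": 612}},
]

# With stats_only=True, bulk() returns a (successful, failed) tuple,
# which is what index() unpacks via [0] in the consumer above
successful, failed = bulk(client, actions, stats_only=True)
print(f"{successful} indexed, {failed} failed")
```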
37 changes: 37 additions & 0 deletions main.py
@@ -0,0 +1,37 @@
import argparse
import asyncio

from elasticsearch import Elasticsearch

from configuration import IndexConfiguration
from core.runner import load_and_index


def create_client(configuration: IndexConfiguration) -> Elasticsearch:
    return Elasticsearch(
        hosts=[configuration.cluster],
        http_auth=[configuration.username, configuration.password],
    )


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        prog="index.py",
        usage="%(prog)s [options]",
        description="Indexes NGINX request/response data to a given Elasticsearch cluster/index name",
    )

    parser.add_argument('-c', '--cluster', dest="cluster", type=str, required=True, help="Elasticsearch API URL")
    parser.add_argument('-i', '--index', dest="index", type=str, required=True, default='nginx', help="The Elasticsearch index name")
    parser.add_argument('-u', '--username', dest="username", type=str, required=True, help="API username")
    parser.add_argument('-p', '--password', dest="password", type=str, required=True, help="API password")
    parser.add_argument('-f', '--file', dest="filepath", type=str, required=True, default="nginx.json", help="Path to JSON file to index")
    parser.add_argument('-v', '--verbose', dest="verbose", action='store_true', default=False, help="Verbose output")
    parser.add_argument('-e', '--error', dest="exit_on_error", action='store_true', default=False, help="Break on error")

    config = IndexConfiguration(**vars(parser.parse_args()))

    client = create_client(config)

    asyncio.run(load_and_index(config, client))
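The same wiring can also be exercised programmatically; a minimal sketch, assuming a local cluster at http://localhost:9200, placeholder credentials, and an nginx.json file in the working directory:

```python
import asyncio

from configuration import IndexConfiguration
from core.runner import load_and_index
from main import create_client

# All values below are placeholders mirroring the CLI flags defined in main.py
config = IndexConfiguration(
    cluster="http://localhost:9200",
    index="nginx",
    username="elastic",
    password="changeme",
    filepath="nginx.json",
    verbose=True,
    exit_on_error=False,
)

asyncio.run(load_and_index(config, create_client(config)))
```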

26 changes: 26 additions & 0 deletions readme.md
@@ -0,0 +1,26 @@
Welcome to my attempt at the Data Engineer II take-home project.

Here I've created a CLI script that asynchronously loads a JSON file in chunks and bulk-indexes the resulting documents.

The approach is kept generic so that it can be extended to other document types or data sources (an API stream, polling, etc.).

Resolving the yellow status for the 'github-events' index was the most challenging part of this assignment for me.

Some research yielded the following:
* ```GET /_nodes``` -> reveals we have a total of 6 nodes in the cluster
* ```GET _cluster/settings?pretty``` -> reveals we don't have an allocation policy setup (auto-rebalance, etc)
* ```GET github-events/_settings``` -> reveals we only allow 1 shard per node
* ```GET /_cluster/allocation/explain``` -> shows that all of the unassigned shards are replicas, and that a replica cannot be allocated to the same node that already holds its primary shard

I increased the number of shards per node to 2 and enabled rebalancing for all indices, and that seemed to solve the issue.
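A sketch of those two settings changes using the Python client; the cluster URL and credentials are placeholders, and the calls mirror the description above rather than the exact commands that were run:

```python
from elasticsearch import Elasticsearch

# Placeholder connection details
client = Elasticsearch(hosts=["http://localhost:9200"], basic_auth=("elastic", "changeme"))

# Allow up to 2 shards of the index per node so the unassigned replicas have somewhere to go
client.indices.put_settings(
    index="github-events",
    settings={"index.routing.allocation.total_shards_per_node": 2},
)

# Enable rebalancing for all indices at the cluster level
client.cluster.put_settings(
    persistent={"cluster.routing.rebalance.enable": "all"},
)
```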

Resources used:
- https://www.datadoghq.com/blog/elasticsearch-unassigned-shards/#reason-3-you-need-to-re-enable-shard-allocation
- https://opster.com/guides/elasticsearch/glossary/elasticsearch-rebalance/
- https://bonsai.io/blog/the-importance-of-shard-math-in-elasticsearch

Potential improvements:
* Run the parse/transform of the input data in a separate thread
* Integrate with the async version of the Elasticsearch client (see the sketch after this list)
* Improve error messaging/verbosity
* Add unit tests
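On the async-client improvement noted above, a minimal sketch of what that swap might look like, assuming `aiohttp` is available (it is already pinned in requirements.txt); this is an illustration, not the project's implementation:

```python
import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk


async def index_batch(actions: list):
    # Placeholder connection details
    client = AsyncElasticsearch(hosts=["http://localhost:9200"], basic_auth=("elastic", "changeme"))
    try:
        # async_bulk mirrors helpers.bulk, so the consumer's index() call could
        # await this instead of blocking the event loop
        successful, failed = await async_bulk(client, actions, stats_only=True)
        print(f"{successful} indexed, {failed} failed")
    finally:
        await client.close()


asyncio.run(index_batch([{"_index": "nginx", "_source": {"response": 200, "bytes": 612}}]))
```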
22 changes: 22 additions & 0 deletions requirements.txt
@@ -0,0 +1,22 @@
aiohttp==3.8.4
aiosignal==1.3.1
async-timeout==4.0.2
asyncio==3.4.3
attrs==23.1.0
certifi==2023.5.7
charset-normalizer==3.1.0
dataclasses-json==0.5.7
elastic-transport==8.4.0
elasticsearch==8.7.0
frozenlist==1.3.3
idna==3.4
ijson==3.2.0.post0
marshmallow==3.19.0
marshmallow-enum==1.5.1
multidict==6.0.4
mypy-extensions==1.0.0
packaging==23.1
typing-inspect==0.8.0
typing_extensions==4.5.0
urllib3==1.26.15
yarl==1.9.2