News Entity Tracker is a real-time Big Data analysis project focused on extracting and analyzing named entities from current news articles using Spark, Kafka, and the ELK stack.
You can read my summarized findings here.
This project was completed in the following setting:
- University: University of Texas at Dallas
- Course: CS 6350 (Big Data Management and Analytics)
- Professor: Dr. Anurag Nagar
- Semester: Fall 2023
- Get news data from NewsAPI and send it to the `RawDataTopic` (`newsapi.py`).
- Read the news data from `RawDataTopic` into a PySpark streaming dataframe, transform it to keep a running count of the named entities found, and send these tallies to the `NamedEntitiesCountTopic` in JSON format, e.g. `{"named_entity":"Tesla","count":"4"}` (`streamer.py`; a sketch of this job follows the list).
- Set up Logstash to parse the data stored in `NamedEntitiesCountTopic` and send it to a pre-defined index in Elasticsearch.
- Use Kibana to explore the data and create bar plots of the top 10 named entities by count, over several time intervals.
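The heart of the pipeline is the Spark Structured Streaming job. The sketch below is a simplified illustration of that step, not the project's actual `streamer.py`: the message layout (plain article text in the Kafka value), the column names, and the checkpoint path are my assumptions.

```python
# Illustrative sketch of a Structured Streaming job that counts named entities
# flowing through Kafka. Assumes a local broker on localhost:9092, article text
# in the Kafka message value, and the en_core_web_sm spaCy model installed.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.appName("NewsEntityTracker").getOrCreate()

_NLP = None

def _get_nlp():
    # Lazily load the spaCy model once per Python worker process.
    global _NLP
    if _NLP is None:
        import spacy
        _NLP = spacy.load("en_core_web_sm")
    return _NLP

@F.udf(returnType=ArrayType(StringType()))
def extract_entities(text):
    # Return the raw text of every named entity spaCy finds in the article.
    if not text:
        return []
    return [ent.text for ent in _get_nlp()(text).ents]

# Read raw articles from RawDataTopic as a streaming dataframe.
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "RawDataTopic")
       .load())

# One row per entity, then a running count per entity.
counts = (raw.select(F.col("value").cast("string").alias("text"))
          .select(F.explode(extract_entities("text")).alias("named_entity"))
          .groupBy("named_entity")
          .count())

# Publish tallies like {"named_entity":"Tesla","count":"4"} to the output topic.
query = (counts
         .select(F.to_json(F.struct(
             F.col("named_entity"),
             F.col("count").cast("string").alias("count"))).alias("value"))
         .writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("topic", "NamedEntitiesCountTopic")
         .option("checkpointLocation", "/tmp/news_entity_checkpoint")
         .outputMode("update")
         .start())

query.awaitTermination()
```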
The project setup is done locally. These instructions assume a Linux environment.
- Install Apache Kafka and start the Zookeeper and Kafka servers.
  - Create 2 Kafka topics, `RawDataTopic` and `NamedEntitiesCountTopic` (a quick Python check of the result follows this step):
    - `~/kafka_*/bin/kafka-topics.sh --create --topic <topic-name> --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1`
  - To see what topics are available on your Kafka server, you can use the following command:
    - `~/kafka_*/bin/kafka-topics.sh --list --bootstrap-server localhost:9092`
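  If you prefer to verify from Python instead of the shell (purely optional; whether the project's own scripts use `kafka-python` is my assumption), a minimal connectivity check looks like this:

  ```python
  from kafka import KafkaConsumer  # pip install kafka-python

  # Connect to the local broker and list the topics it knows about.
  consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
  print(consumer.topics())  # should include RawDataTopic and NamedEntitiesCountTopic
  consumer.close()
  ```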
- Install Apache Spark.
- Install Elasticsearch, Kibana, and Logstash.
  - Article Helps
  - In `/etc/elasticsearch/elasticsearch.yml`, do the following:
    - Set `network.host` to `localhost`.
    - Set `xpack.security.enabled` to `false`, ONLY if you face authentication issues with Elasticsearch.
  - Make sure the `/usr/share/logstash/data` folder is owned by the `logstash` user and group. If not, use the following commands to fix it:
    - `chown -R logstash:logstash /usr/share/logstash`
    - `chmod 777 /usr/share/logstash/data`
- Get an API Key from NewsAPI.
- Install project dependencies.
  - Additionally, download the spaCy model (a quick check of the model follows this step): `python -m spacy download en_core_web_sm`
  - NOTE: For this project, I am using Python 3.11; however, Python 3.9+ should work.
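  To confirm the model is usable, this short snippet (purely illustrative, not one of the project files) runs spaCy's NER over a sample headline, which is the same extraction `streamer.py` relies on:

  ```python
  import spacy

  # Load the model downloaded above and extract entities from a sample headline.
  nlp = spacy.load("en_core_web_sm")
  doc = nlp("Tesla and Ford announced new factories in Texas on Monday.")

  for ent in doc.ents:
      print(ent.text, ent.label_)  # e.g. Tesla ORG, Texas GPE, Monday DATE
  ```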
- Set up environment variables.
  - Rename `config/config.env` to `config/.env` and fill in the variables.
- Set up the Logstash script.
  - Copy `config/logstash.conf` to the config directory of Logstash.
    - Example: `cp config/logstash.conf /etc/logstash/conf.d/`
- Start up the Elasticsearch and Kibana services.
  - `sudo service elasticsearch start`
    - To check if Elasticsearch has started: `sudo service elasticsearch status`
    - Verify Elasticsearch is working: `curl http://localhost:9200/`
  - `sudo service kibana start`
    - To check if Kibana has started: `sudo service kibana status`
- Create an index in Elasticsearch for the named entity data to be sent to.
  - `curl -X PUT "localhost:9200/named_entities?pretty"`
- Go to the home directory of Logstash and run Logstash with the configuration file:
  - `bin/logstash -f /etc/logstash/conf.d/logstash.conf`
  - To find the home directory of Logstash: `whereis -b logstash`
- Start the streaming script by submitting it to the Spark cluster.
  - `spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<your-spark-version> src/streamer.py`
- Start the NewsAPI script (a rough sketch of its loop follows this step): `python src/newsapi.py`
  - It is currently configured to fetch from the API every 60 minutes, as fetching more frequently will only return duplicate news.
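  For orientation, here is roughly what such a fetch-and-publish loop can look like. This is not the project's `newsapi.py`: the environment variable name, the choice of the top-headlines endpoint, and the use of `requests` and `kafka-python` are assumptions.

  ```python
  import json
  import os
  import time

  import requests
  from kafka import KafkaProducer  # pip install kafka-python

  API_KEY = os.environ["NEWS_API_KEY"]          # assumed to come from config/.env
  URL = "https://newsapi.org/v2/top-headlines"  # one of NewsAPI's endpoints

  # JSON-serialize each article before publishing it to Kafka.
  producer = KafkaProducer(
      bootstrap_servers="localhost:9092",
      value_serializer=lambda v: json.dumps(v).encode("utf-8"),
  )

  while True:
      # Fetch the latest headlines from NewsAPI.
      response = requests.get(URL, params={"country": "us", "apiKey": API_KEY})
      response.raise_for_status()

      # Publish each article to RawDataTopic for the Spark job to consume.
      for article in response.json().get("articles", []):
          producer.send("RawDataTopic", article)
      producer.flush()

      time.sleep(60 * 60)  # wait 60 minutes to avoid duplicate news
  ```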
- Data should slowly start populating in Elasticsearch. You can access the data in Kibana at http://localhost:5601/.
- Currently, the `count` field in Kibana is mapped as text, so aggregations won't work. In order to do proper visualizations (like bar plots), you need to re-index the data and cast the `count` field to an integer.
  - Create the conversion pipeline:
    - `curl -XPUT "http://localhost:9200/_ingest/pipeline/convert_pipeline" -H 'Content-Type: application/json' -d '{"description":"Converts count field to integer","processors":[{"convert":{"field":"count","type":"integer"}}]}'`
  - Re-index (an example query against the converted index follows):
    - `curl -XPOST "http://localhost:9200/_reindex" -H 'Content-Type: application/json' -d '{"source":{"index":"named_entities"},"dest":{"index":"named_entities_converted","pipeline":"convert_pipeline"}}'`
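Once the re-index completes, you can pull the top 10 entities straight from Elasticsearch to confirm the conversion worked. The query below is an illustration: it assumes the default dynamic mapping (hence the `named_entity.keyword` sub-field) and that the stream publishes cumulative tallies, so the maximum `count` per entity is its latest total.

```python
import requests

# Top 10 named entities by their highest recorded count in the converted index.
query = {
    "size": 0,
    "aggs": {
        "top_entities": {
            "terms": {
                "field": "named_entity.keyword",
                "size": 10,
                "order": {"max_count": "desc"},
            },
            "aggs": {"max_count": {"max": {"field": "count"}}},
        }
    },
}

resp = requests.get("http://localhost:9200/named_entities_converted/_search", json=query)
resp.raise_for_status()

for bucket in resp.json()["aggregations"]["top_entities"]["buckets"]:
    print(bucket["key"], int(bucket["max_count"]["value"]))
```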