The objective of this project is to perform Named Entity Recognition (NER) on Reddit comments streamed through Kafka and processed with PySpark. The extracted entities are pushed to a second Kafka topic, and the top 10 entity words are displayed as a bar chart in Kibana. The pipeline is: Reddit API -> comments_producer.py -> Kafka topic reddit-comments -> PySpark NER (spark_connector.py) -> Kafka topic word-counts -> Logstash -> Elasticsearch -> Kibana.
The project involves setting up Kafka, the ELK stack, and PySpark, along with the code for NER processing and a Kibana dashboard for visualization.
Download Kafka 3.4.0 from https://downloads.apache.org/kafka/3.4.0/kafka-3.4.0-src.tgz
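Before creating topics, extract the archive and start a broker. (The URL above points to the source archive, which needs to be built first; the binary distribution is ready to run as-is.) The commands below assume the standard layout of a Kafka installation with a ZooKeeper-based setup, each run in its own terminal:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties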
bin/kafka-topics.sh --create --topic reddit-comments --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic word-counts --bootstrap-server localhost:9092
bin/kafka-topics.sh --bootstrap-server=localhost:9092 --list
bin/kafka-topics.sh --version
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic reddit-comments --from-beginning
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic reddit-comments
Download the ELK packages from the URLs below:
- Kibana: https://www.elastic.co/downloads/kibana
- Elastic Search: https://www.elastic.co/downloads/elasticsearch
- LogStash: https://www.elastic.co/downloads/logstash
Disable SSL in config/elasticsearch.yml:
xpack.security.enabled: false
xpack.security.http.ssl:
  enabled: false
Run: bin/elasticsearch
In config/kibana.yml, point Kibana at Elasticsearch over http (change https to http):
elasticsearch.hosts: ['http://192.168.10.21:9200']
Run: bin/kibana
Create a new file logstash.conf
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["word-counts"]
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "word-counts"
    workers => 1
  }
}
Run: bin/logstash -f logstash.conf
To relax the disk-based shard allocation watermarks (useful on machines with little free disk), send a PUT request to http://localhost:9200/_cluster/settings with the body:
{
  "transient": {
    "cluster.routing.allocation.disk.threshold_enabled": true,
    "cluster.routing.allocation.disk.watermark.low": "500mb",
    "cluster.routing.allocation.disk.watermark.high": "1gb",
    "cluster.info.update.interval": "1m"
  }
}
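If you prefer sending this request from Python instead of curl or Kibana Dev Tools, a minimal sketch using the requests library (an assumption; any HTTP client works):

# Relax the disk watermarks via the Elasticsearch cluster settings API
import requests

settings = {
    "transient": {
        "cluster.routing.allocation.disk.threshold_enabled": True,
        "cluster.routing.allocation.disk.watermark.low": "500mb",
        "cluster.routing.allocation.disk.watermark.high": "1gb",
        "cluster.info.update.interval": "1m",
    }
}
resp = requests.put("http://localhost:9200/_cluster/settings", json=settings)
print(resp.json())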
pip install pyspark==3.1.1
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 --conf spark.sql.streaming.forceDeleteTempCheckpointLocation=true spark_connector.py <CHECKPOINT_DIR> <BOOTSTRAP_SERVER> <READ_TOPIC> <WRITE_TOPIC>
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 --conf spark.sql.streaming.forceDeleteTempCheckpointLocation=true spark_connector.py /tmp/checkpoint localhost:9092 reddit-comments word-counts
bash start_pyspark_connector.sh
NOTE: Make sure the spark-sql-kafka package version matches that of PySpark.
comments_producer.py keeps streaming comments from the AskReddit subreddit and pushes them to the reddit-comments topic on the Kafka broker running at localhost:9092:
python -u comments_producer.py reddit-comments localhost:9092
To set up Reddit API credentials for streaming, refer to https://www.geeksforgeeks.org/how-to-get-client_id-and-client_secret-for-python-reddit-api-registration/
NOTE: Make sure you have a .env file configured properly as below:
user_name=
password=
client_id=
secret=
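For reference, a minimal sketch of what such a producer can look like, assuming praw, kafka-python, and python-dotenv (the actual comments_producer.py in this repo may differ in detail):

# comments_producer.py (sketch): stream AskReddit comments into a Kafka topic
import json
import os
import sys

import praw
from dotenv import load_dotenv
from kafka import KafkaProducer

load_dotenv()  # reads user_name, password, client_id, secret from .env

topic, bootstrap_server = sys.argv[1], sys.argv[2]

reddit = praw.Reddit(
    client_id=os.environ["client_id"],
    client_secret=os.environ["secret"],
    username=os.environ["user_name"],
    password=os.environ["password"],
    user_agent="reddit-ner-stream",
)

producer = KafkaProducer(
    bootstrap_servers=bootstrap_server,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Stream new comments from r/AskReddit and push each one to the Kafka topic
for comment in reddit.subreddit("AskReddit").stream.comments(skip_existing=True):
    producer.send(topic, {"body": comment.body})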
spark_connector.py reads messages from the reddit-comments topic, extracts the named entities from each comment, and pushes them to the word-counts topic:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 --conf spark.sql.streaming.forceDeleteTempCheckpointLocation=true spark_connector.py /tmp/checkpoint localhost:9092 reddit-comments word-counts
bash start_pyspark_connector.sh
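For reference, a minimal sketch of the streaming job, assuming spaCy with the en_core_web_sm model as the NER backend (the actual spark_connector.py may differ in detail):

# spark_connector.py (sketch): Kafka -> NER -> Kafka with Structured Streaming
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def extract_entities(text):
    # Lazily load spaCy once per executor process and cache it on the function
    import spacy
    if not hasattr(extract_entities, "nlp"):
        extract_entities.nlp = spacy.load("en_core_web_sm")
    doc = extract_entities.nlp(text or "")
    return " ".join(ent.text for ent in doc.ents)

if __name__ == "__main__":
    checkpoint_dir, bootstrap_server, read_topic, write_topic = sys.argv[1:5]

    spark = SparkSession.builder.appName("reddit-ner").getOrCreate()
    ner_udf = udf(extract_entities, StringType())

    # Read raw comments from the reddit-comments topic
    comments = (spark.readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", bootstrap_server)
                .option("subscribe", read_topic)
                .load()
                .selectExpr("CAST(value AS STRING) AS comment"))

    # Run NER on each comment; the Kafka sink expects a "value" column
    entities = comments.select(ner_udf(col("comment")).alias("value"))

    # Push the extracted entities to the word-counts topic
    query = (entities.writeStream
             .format("kafka")
             .option("kafka.bootstrap.servers", bootstrap_server)
             .option("topic", write_topic)
             .option("checkpointLocation", checkpoint_dir)
             .start())
    query.awaitTermination()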