The main focus is the orchestration of these technologies, illustrated by an example that uses MLlib to classify the sentiment of Twitter messages.
The fundamental idea of sentiment classification used in this template is based on the paper by Alec Go et al.
Make sure that you have Java 8, either Sbt or Typesafe Activator, and Node.js already installed on your machine. You should have at least two cores available on this machine since Spark Streaming (used by the OnlineTrainer) will occupy one core. Hence, the application needs at least one more core to be able to process the data.
- Clone this repository:
git clone etc.
- Change into the newly created directory:
cd spark-mlib-scala-play
- Insert your Twitter access token and consumer key/secret pairs in application.conf. For generating a token, please refer to dev.twitter.com. By default the application runs in single-user mode, which means that the access tokens configured in your application.conf or local.conf are also used for querying Twitter by keyword. This is fine when you run the application locally. Note: If you want to run the application in production mode, you have to turn single-user mode off so that OAuth per user is used instead. To do so, change the line in your conf/application.conf to twitter.single-user-mode = no. Also make sure to provide an application secret.
- Launch SBT:
sbt run
- Navigate your browser to: http://localhost:9000
- If necessary, change twitter.redirect.url in application.conf to the URL the application actually uses.
- If necessary (if Twitter changes the URL of its tweet-fetching service), change twitter.fetch.url in application.conf to the new one. Ensure that the last URL parameter is the query string, because the application appends the keyword to the end of the URL.
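As a rough illustration of the settings mentioned in the steps above, the relevant part of the configuration might look like the fragment below. Only the key names come from this README; the URL values are placeholders, not the real defaults, and the credential keys are omitted because their names are not listed here.

```
# conf/application.conf (or local.conf): only the keys named above are shown
twitter {
  # "no" turns single-user mode off so that OAuth per user is used
  single-user-mode = no

  # Placeholder values (assumptions): replace with the URLs your setup uses
  redirect.url = "http://localhost:9000/<callback-path>"
  # The query parameter comes last because the keyword is appended to the URL
  fetch.url = "https://<twitter-search-endpoint>?<other-parameters>&q="
}
```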
If starting the application takes a very long time or even times out, it may be due to a known Activator issue.
In that case, do the following before starting with sbt run:
- Delete the project/sbt-fork-run.sbt file.
- Remove the line fork in run := true (added automatically when you start Activator) from the bottom of build.sbt.
Without the fork option, which is needed by Activator, the application should start within a few seconds.
The following outline shows what the actor communication workflow for classification looks like:
The Application controller serves HTTP requests from the client/browser and obtains ActorRefs for EventServer, StatisticsServer and Director.
The Director is the root of the actor hierarchy and creates all other durable (long-lived) actors except StatisticsServer and EventServer. Besides supervising its child actors, it builds the bridge between the Play Framework and Akka by handing over the Classifier ActorRefs to the controller. Moreover, when training of the estimators within BatchTrainer and OnlineTrainer has finished, this actor passes the latest machine learning models to the StatisticsServer (see Figure below). For the OnlineTrainer, statistics generation is scheduled every 5 seconds.
The Classifier creates a FetchResponseHandler actor and sends the TwitterHandler a Fetch message (including the ActorRef of the FetchResponseHandler) to get the latest Tweets for a given keyword or query.
Once the TwitterHandler has fetched some Tweets, the FetchResponse is sent to the FetchResponseHandler.
The FetchResponseHandler creates a TrainingModelResponseHandler actor and tells the BatchTrainer and OnlineTrainer to pass their latest models to the TrainingModelResponseHandler. It registers itself as a monitor for the TrainingModelResponseHandler, and when that actor terminates it stops itself as well.
The TrainingModelResponseHandler collects the models and the vectorized Tweets, makes predictions, and sends the results to the original sender (the Application controller). The original sender is passed through the ephemeral (short-lived) actors, indicated by the yellow dotted line in the figure above.
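The hand-off of the original sender through the short-lived actors can be sketched roughly as follows. This is a minimal sketch assuming classic Akka actors (akka-actor on the classpath), not the template's actual code: only the actor names and the Fetch and FetchResponse messages appear in this README, while Classify, GetLatestModel, LatestModel, Predictions and all constructor parameters are hypothetical.

```scala
import akka.actor.{Actor, ActorRef, Props, Terminated}

object EphemeralActors {
  final case class Classify(keyword: String)                              // hypothetical
  final case class Fetch(keyword: String, replyTo: ActorRef)
  final case class FetchResponse(keyword: String, tweets: Seq[String])
  final case class GetLatestModel(replyTo: ActorRef)                      // hypothetical
  final case class LatestModel(predict: String => Double)                 // hypothetical
  final case class Predictions(keyword: String, scores: Seq[Seq[Double]]) // hypothetical

  // Long-lived actor: remembers who asked and spawns one handler per request.
  class Classifier(twitterHandler: ActorRef, trainers: Seq[ActorRef]) extends Actor {
    def receive = {
      case Classify(keyword) =>
        val originalSender = sender() // capture before handing off
        val handler = context.actorOf(Props(new FetchResponseHandler(originalSender, trainers)))
        twitterHandler ! Fetch(keyword, handler)
    }
  }

  // Short-lived actor: waits for the Tweets, then asks every trainer for its model.
  class FetchResponseHandler(originalSender: ActorRef, trainers: Seq[ActorRef]) extends Actor {
    def receive = {
      case FetchResponse(keyword, tweets) =>
        val collector = context.actorOf(Props(
          new TrainingModelResponseHandler(originalSender, keyword, tweets, trainers.size)))
        context.watch(collector) // monitor the collector
        trainers.foreach(_ ! GetLatestModel(collector))
      case Terminated(_) =>
        context.stop(self) // the collector is done, so this handler is done too
    }
  }

  // Short-lived actor: collects one model per trainer, predicts, replies, stops.
  class TrainingModelResponseHandler(originalSender: ActorRef, keyword: String,
                                     tweets: Seq[String], expected: Int) extends Actor {
    private var models = Vector.empty[LatestModel]
    def receive = {
      case model: LatestModel =>
        models :+= model
        if (models.size == expected) {
          originalSender ! Predictions(keyword, models.map(m => tweets.map(m.predict)))
          context.stop(self)
        }
    }
  }
}
```

Passing the original sender as a constructor argument keeps the long-lived Classifier free of per-request state, which is the main reason for using ephemeral actors in a flow like this.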
The following outline shows how the actors involved in training the machine learning estimators and serving statistics about their predictive performance work together:
The BatchTrainer receives a Train message as soon as a corpus (a collection of labeled Tweets) has been initialized. This corpus is initialized by the CorpusInitializer and can either be created on the fly via Spark's TwitterUtils.createStream (with automatic labeling based on the emoticons ":)" and ":(") or be a static corpus provided by Sentiment140, which is read from a CSV file. Which one to use can be configured via ml.corpus.initialization.streamed in application.conf. For batch training we use the high-level org.apache.spark.ml API. We use grid search with cross-validation to find the best hyperparameters for our LogisticRegression model.
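Grid search with cross-validation in the org.apache.spark.ml API can look roughly like the sketch below. It assumes a DataFrame with a String column "text" and a Double column "label"; the column names, the Tokenizer/HashingTF feature pipeline, the parameter grid and the number of folds are illustrative choices, not necessarily those used by the BatchTrainer.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
import org.apache.spark.sql.DataFrame

object BatchTrainingSketch {
  // corpus is assumed to have the columns "text" (String) and "label" (Double)
  def train(corpus: DataFrame): CrossValidatorModel = {
    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
    val lr = new LogisticRegression().setMaxIter(10)
    val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

    // Every combination of these values is tried (2 x 2 = 4 candidates)
    val grid = new ParamGridBuilder()
      .addGrid(hashingTF.numFeatures, Array(1000, 10000))
      .addGrid(lr.regParam, Array(0.1, 0.01))
      .build()

    // Each candidate is scored by k-fold cross-validation (area under ROC by default)
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(new BinaryClassificationEvaluator())
      .setEstimatorParamMaps(grid)
      .setNumFolds(3)

    cv.fit(corpus) // the returned model wraps the best pipeline found
  }
}
```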
The OnlineTrainer receives a Train message with a corpus (an RDD[Tweet]) upon successful initialization, just like the BatchTrainer. For the online learning approach we use the experimental StreamingLogisticRegressionWithSGD estimator which, as the name implies, uses stochastic gradient descent to continually update the model on each mini-batch (RDD) of the DStream created via TwitterUtils.createStream.
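A minimal sketch of online training with StreamingLogisticRegressionWithSGD is shown below. It assumes a DStream of (label, text) pairs that has already been derived from the Twitter stream; the hashing-based featurization, the feature dimension and the step size are illustrative assumptions rather than the OnlineTrainer's actual settings.

```scala
import org.apache.spark.mllib.classification.StreamingLogisticRegressionWithSGD
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.dstream.DStream

object OnlineTrainingSketch {
  val numFeatures = 1000 // illustrative feature dimension
  private val hashingTF = new HashingTF(numFeatures)

  // Turn a labeled Tweet text into a sparse term-frequency vector
  def featurize(label: Double, text: String): LabeledPoint =
    LabeledPoint(label, hashingTF.transform(text.toLowerCase.split("\\s+").toSeq))

  // Registers training on the stream; the model is updated on every mini-batch
  // once the owning StreamingContext has been started.
  def startTraining(labeled: DStream[(Double, String)]): StreamingLogisticRegressionWithSGD = {
    val learner = new StreamingLogisticRegressionWithSGD()
      .setInitialWeights(Vectors.zeros(numFeatures)) // weights must be initialized
      .setNumIterations(10)
      .setStepSize(0.1)
    learner.trainOn(labeled.map { case (label, text) => featurize(label, text) })
    learner
  }
}
```

At any time after the stream has started, learner.latestModel() returns the current model, which is the kind of snapshot that can be handed to other actors for prediction or evaluation.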
The StatisticsServer receives {Online,Batch}TrainerModel messages and computes performance metrics such as accuracy and area under the ROC curve, which in turn are forwarded to the subscribed EventListeners and finally sent to the client (browser) via WebSocket.
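Metrics of this kind can be computed from (score, label) pairs roughly as in the sketch below, which uses MLlib's BinaryClassificationMetrics for the area under the ROC curve and a plain count for accuracy. The Statistics case class, the evaluate function and the 0.5 threshold are hypothetical and only serve the illustration.

```scala
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.rdd.RDD

object StatisticsSketch {
  final case class Statistics(accuracy: Double, areaUnderRoc: Double) // hypothetical

  // scoreAndLabels: predicted probability of the positive class and the true label (0.0 or 1.0)
  def evaluate(scoreAndLabels: RDD[(Double, Double)], threshold: Double = 0.5): Statistics = {
    val total = scoreAndLabels.count()
    val correct = scoreAndLabels.filter { case (score, label) =>
      val predicted = if (score >= threshold) 1.0 else 0.0
      predicted == label
    }.count()
    val accuracy = if (total == 0) 0.0 else correct.toDouble / total
    val areaUnderRoc = new BinaryClassificationMetrics(scoreAndLabels).areaUnderROC()
    Statistics(accuracy, areaUnderRoc)
  }
}
```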
The EventListeners are created for each client via the Play Framework's built-in WebSocket.acceptWithActor. EventListeners subscribe to EventServer and StatisticsServer. When a connection terminates (e.g. the browser window is closed), the respective EventListener shuts down and unsubscribes from EventServer and/or StatisticsServer via postStop().
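The subscribe and unsubscribe lifecycle can be sketched as follows, assuming a Play version that still provides WebSocket.acceptWithActor (it was deprecated in later releases) and classic Akka actors. The Subscribe and Unsubscribe messages, the constructor parameters and the SocketController class are hypothetical names chosen for the illustration.

```scala
import akka.actor.{Actor, ActorRef, Props}
import play.api.Play.current
import play.api.mvc.{Controller, WebSocket}

object Subscriptions { // hypothetical protocol
  case object Subscribe
  case object Unsubscribe
}

// One instance per connected client; `out` is the actor Play uses to push frames to the browser.
class EventListener(out: ActorRef, servers: Seq[ActorRef]) extends Actor {
  override def preStart(): Unit = servers.foreach(_ ! Subscriptions.Subscribe)
  // Play stops this actor when the socket closes, so cleanup belongs in postStop()
  override def postStop(): Unit = servers.foreach(_ ! Subscriptions.Unsubscribe)
  def receive = {
    case event: String => out ! event // forward server events to the client
  }
}

class SocketController(eventServer: ActorRef, statisticsServer: ActorRef) extends Controller {
  def socket = WebSocket.acceptWithActor[String, String] { request => out =>
    Props(new EventListener(out, Seq(eventServer, statisticsServer)))
  }
}
```

Because the servers receive both messages with the listener as sender, they can keep a plain set of subscriber ActorRefs and broadcast to it.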
The EventServer is created by the Application controller and forwards event messages (progress of corpus initialization) to the client (also via WebSocket).