Commit

* Adds a simple scanner to the social-media app that uses Twitter's
  public stream API to search for tweets that match a simple filter and
  puts them through the classification pipeline

* Moves the configuration for the social-media app from multiple json
  files into a single python file and documents it

* Fixes typos and other mistakes in README files as well as the
  mechanical turk template

* Renames a function in the lambda code to reflect what the function
  actually does

* Fixes a small bug where the create-lambda-function script would fail
  if a kinesis stream already existed with the configured name
Cesar Romero committed Oct 1, 2015
1 parent 45095c6 commit d7cf7c3
Showing 15 changed files with 294 additions and 273 deletions.
7 changes: 3 additions & 4 deletions README.md
@@ -1,12 +1,12 @@
# Machine Learning Samples
# Amazon Machine Learning Samples

Each subdirectory contains sample code for using Amazon Machine Learning.
Refer to the `README.md` file in each sub-directory for details on using
each sample.

## Targeted Marketing Samples

These samples show how to use the Amazon Machine Learning API for a
targeted marketing application. This follows the "banking" dataset example
described in the Developer Guide. There are three versions available:

@@ -31,7 +31,7 @@ allowing delivery to email, SMS text messages, or other software services.

## Mobile Prediction Samples

These samples show how to use the Amazon Machine Learning API to make
real-time predictions from a mobile device. There are two versions available:

* [Real-time Machine Learning Predictions from iOS](mobile-ios/)
@@ -43,4 +43,3 @@ real-time predictions from a mobile device. There are two versions available:
A collection of simple scripts to help with common tasks.

* [Machine Learning Tools (python)](ml-tools-python/)

79 changes: 50 additions & 29 deletions social-media/README.md
@@ -14,7 +14,7 @@ Turk, Amazon Kinesis, AWS Lambda, and Amazon Simple Notification Service
The example walks through the following steps:

1. [Gather training data](#step1)
2. [Label training data with Mechanical Turk](#step2)
2. [Label training data with Amazon Mechanical Turk](#step2)
3. [Create the ML Model](#step3)
4. [Configure the model](#step4)
5. [Set up continuous monitoring](#step5)
@@ -50,7 +50,12 @@ Once those are installed, execute
The script uses `npm` and Python's `virtualenv` to set up the required
dependencies and environment variables in the current shell.

Following scripts depend on python boto library. See [instructions](http://boto.readthedocs.org/en/latest/boto_config_tut.html) on how setup credentials for boto in ~/.aws/credentials. See [instructions](http://blogs.aws.amazon.com/security/post/Tx1R9KDN9ISZ0HF/Where-s-my-secret-access-key) on how to get AWS credentials. The AWS user that you choose, needs access to a subset of the following policy to run the scripts:
The following scripts depend on the Python boto library. See
[instructions](http://boto.readthedocs.org/en/latest/boto_config_tut.html)
on how to set up credentials for boto in `~/.aws/credentials` (a sample
credentials file is sketched after the policy below). See
[instructions](http://blogs.aws.amazon.com/security/post/Tx1R9KDN9ISZ0HF/Where-s-my-secret-access-key)
on how to get AWS credentials. The AWS user that you choose needs
access to a subset of the following policy to run the scripts:

{
"Statement": [
@@ -61,6 +66,7 @@
"machinelearning:*",
"kinesis:*",
"lambda:*",
"s3:*",
"sns:*"
],
"Resource": [
@@ -72,18 +78,17 @@

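For reference, boto reads the credentials above from `~/.aws/credentials`;
a minimal file, with placeholder values only, looks like this:

    [default]
    aws_access_key_id = YOUR_ACCESS_KEY_ID
    aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
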
## <a name="step1"></a> Step 1: Gathering training data

To gather the training data, run the `gather-data.sh` command as follows:
To gather the training data, run the following command:

python gather-data.py @awscloud

Substituting your company's Twitter handle instead of `@awscloud`.
This will write the tweets to a file called `line_separated_tweets_json.txt`.

Also, you need to create a file called `twitter.credentials.json` with your
own Twitter API keys. See
[https://dev.twitter.com/oauth/overview/application-owner-access-tokens](https://dev.twitter.com/oauth/overview/application-owner-access-tokens)
for how to set these values up.
Substitute your company's Twitter handle for `@awscloud` and configure
your Twitter API credentials in `config.py`. Learn how to obtain your
credentials
[here](https://dev.twitter.com/oauth/overview/application-owner-access-tokens).

This will produce a file called `line_separated_tweets_json.txt` that
other scripts will read later.
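
Each line of that file holds one tweet as a standalone JSON object, so
a few lines of Python suffice to load it back (a minimal sketch):

    import json

    # One JSON-encoded tweet per line; skip any blank lines.
    with open('line_separated_tweets_json.txt') as f:
        tweets = [json.loads(line) for line in f if line.strip()]
    print('%d tweets loaded' % len(tweets))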

## <a name="step2"></a> Step 2: Label training data with Mechanical Turk

@@ -107,12 +112,12 @@ convert it to a CSV format that Mechanical Turk can use. Do this by running:
python build-mturk-csv.py

This will consume the `line_separated_tweets_json.txt` file and output a file called
`mturk_unlabeled_dataset.csv`.
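
Conceptually the conversion is just JSON in, CSV out. A rough sketch of
the idea (the column layout is illustrative; the real script defines
its own):

    import csv
    import json

    with open('line_separated_tweets_json.txt') as fin, \
            open('mturk_unlabeled_dataset.csv', 'w') as fout:
        writer = csv.writer(fout)
        writer.writerow(['tweet_text'])  # illustrative header
        for line in fin:
            tweet = json.loads(line)
            writer.writerow([tweet['text'].encode('utf-8')])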

### Step 2b: Submit the job to MTurk

Use the [Mechanical Turk Console](https://www.mturk.com/mturk/welcome) to create a set
of Human Intelligence Tasks (HITs) to assign labels to these tweets. Turkers will be
asked to pick which label best applies to the tweet amongst:

* Request
@@ -153,8 +158,8 @@ between three different human opinions on the tweet.

### Step 2c: Processing the output from MTurk

Once all of your Turk HITs are complete, [download the results](https://requester.mturk.com/batches) into a file
called `mturk_labeled_dataset.csv`. Then run the script

python build-aml-training-dataset.py

@@ -165,7 +170,7 @@ to convert the 3 HIT responses for each tweet into a single dataset with a binary
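
A natural way to collapse the three opinions into one label is a
majority vote. As a sketch of the idea (illustrative only, not the
script's actual code):

    from collections import Counter

    def majority_label(labels):
        """Return the label at least two of the three Turkers chose."""
        label, count = Counter(labels).most_common(1)[0]
        return label if count >= 2 else None  # None: no agreement

    print(majority_label(['Request', 'Question', 'Request']))  # Request
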
Once you have your labelled training data in CSV format, creating the ML model requires a few
API calls, which are automated in this script:

python create-aml-model.py labeled.csv aml_training_dataset.csv aml_training_dataset.csv.schema s3-bucket-name s3-key-name
python create-aml-model.py aml_training_dataset.csv aml_training_dataset.csv.schema s3-bucket-name s3-key-name

This utility creates a machine learning model that performs binary classification.
It requires the input dataset and its corresponding schema, specified through file names in
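
Under the hood this boils down to a couple of Amazon Machine Learning
API calls. A sketch of the idea using boto (the entity ids here are
illustrative; the script generates its own):

    import boto

    ml = boto.connect_machinelearning()

    # Point Amazon ML at the training data uploaded to S3.
    ml.create_data_source_from_s3(
        'tweet-training-ds',
        {'DataLocationS3': 's3://s3-bucket-name/s3-key-name',
         'DataSchema': open('aml_training_dataset.csv.schema').read()},
        compute_statistics=True)

    # Train a binary classification model on that data source.
    ml.create_ml_model('tweet-model', 'BINARY', 'tweet-training-ds')
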
@@ -220,15 +225,11 @@ appropriate values. Description of the configuration required in
constraints.
* *lambdaFunctionName* : The name being given to the Lambda function. See
[docs](http://docs.aws.amazon.com/lambda/latest/dg/API_UploadFunction.html) for constraints.
* *lambdaExecutionPolicyName* : The name being given to the execution policy
* *lambdaExecutionRole* : The name being given to the execution role
used by the lambda function. See
[docs](http://docs.aws.amazon.com/lambda/latest/dg/lambda-introduction.html#lambda-intro-execution-role)
for details. See [docs](http://docs.aws.amazon.com/IAM/latest/APIReference/API_CreateRole.html)
for constraints.
* *lambdaInvocationPolicyName* : The name being given to the invocation policy
used by the lambda function. See
[docs](http://docs.aws.amazon.com/lambda/latest/dg/lambda-introduction.html#lambda-intro-invocation-role)
for details. See [docs](http://docs.aws.amazon.com/IAM/latest/APIReference/API_CreateRole.html)
for details. See
[docs](http://docs.aws.amazon.com/IAM/latest/APIReference/API_CreateRole.html)
for constraints.
* *mlModelId* : The id of the machine learning model which is used to
perform predictions on the tweets. This is the id of the model that is
@@ -247,7 +248,7 @@ tweets data. Use the following script to test that the setup works.

python push-json-to-kinesis.py line_separated_json.txt kinesisStreamName interval

Following parameters are needed to run this script
The following parameters are needed to run this script:

* *line_separated_json.txt* : File that contains line-separated JSON data.
* *kinesisStreamName* : Name of the stream to which the data is pushed.
@@ -256,10 +257,30 @@ Following parameters are needed to run this script
This script merely pushes JSON data to the given Kinesis stream. Since the
file of line-separated tweet JSON from the previous steps is already on hand,
we reuse it for testing.
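
Under the hood this amounts to a small loop over the file. A minimal
sketch using boto's Kinesis client (the stream name and partition key
here are illustrative):

    import time

    import boto

    kinesis = boto.connect_kinesis()

    with open('line_separated_json.txt') as f:
        for line in f:
            # Each line is one JSON record; the partition key simply
            # spreads records across shards (this stream has one).
            kinesis.put_record('tweetStream', line, 'partitionkey')
            time.sleep(1)  # the `interval` argument, in seconds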

### Step 5c: Pushing tweets into Kinesis using Streaming Api
### Step 5c: Pushing tweets into Kinesis using Twitter's Streaming APIs

This sample doesn't currently include a production-level system to push tweets into Kinesis.
However, you can work with [GNIP](http://www.gnip.com) to connect the Twitter API to Kinesis.
Refer to their [blog post](http://support.gnip.com/code/gnip-kinesis-ami.html)
on the subject, or their
This project includes a sample app that pushes tweets matching a simple
filter into Kinesis using Twitter's
[public stream API](https://dev.twitter.com/streaming/public). For a
production system, you can work with [GNIP](http://www.gnip.com) to
consume streams. Refer to their
[blog post](http://support.gnip.com/code/gnip-kinesis-ami.html) on the
subject, or their
[open source code on github](https://github.com/gnip/sample-kinesis-connector).

You'll need a Twitter client library that supports streaming:

pip install twitter

Modify `config.py` to add a Kinesis partition name, the Twitter text
filter you'd like to search for, and your Twitter credentials if you
haven't already done so. Then simply run the sample scanner:

python scanner.py

Tweets that match your filter will be processed in real time and
pushed to the Kinesis stream. The Lambda function will use the ML
model to classify these tweets and publish a notification to the
configured SNS topic with a link to any tweet that is considered
actionable. The easiest way to get these notifications is to
[subscribe your email address to the SNS topic](http://docs.aws.amazon.com/sns/latest/dg/SubscribeTopic.html).
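
For reference, the core of such a scanner is compact. A condensed
sketch using the `twitter` package installed above (the actual
`scanner.py` may differ in its details):

    import json

    import boto
    from twitter import OAuth, TwitterStream

    import config

    auth = OAuth(config.ACCESS_TOKEN, config.ACCESS_TOKEN_SECRET,
                 config.CONSUMER_KEY, config.CONSUMER_SECRET)
    stream = TwitterStream(auth=auth)
    kinesis = boto.connect_kinesis()

    # Forward every matching tweet to Kinesis as raw JSON.
    for tweet in stream.statuses.filter(track=config.TWITTER_FILTER):
        if 'text' in tweet:  # skip keep-alives and limit notices
            kinesis.put_record(config.KINESIS_STREAM, json.dumps(tweet),
                               config.KINESIS_PARTITION)
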
2 changes: 1 addition & 1 deletion social-media/build-mturk-csv.py
@@ -17,7 +17,7 @@
This consumes the file line_separated_tweets_json.txt which is produced by
gather-data.py and produces mturk_unlabeled_dataset.csv which can be used to
generate labels using Mechanical Turk Service.
generate labels using Amazon Mechanical Turk.
"""

import codecs
31 changes: 31 additions & 0 deletions social-media/config.py
@@ -0,0 +1,31 @@
# The following parameters are used to create the Amazon Kinesis
# stream, the AWS Lambda function that reads from the stream, and the
# Amazon SNS topic that Lambda will publish to whenever a tweet is
# classified as actionable.
AWS = {
'awsAccountId' : "",
'kinesisStream' : "tweetStream",
'lambdaExecutionRole' : "tweetStreamExecutionRole",
'lambdaFunctionName' : "classifyTweet",
'mlModelId' : "",
'region' : "",
'snsTopic' : "actionableTweets"
}

# The parameters below are used by scanner.py to read tweets from the
# Twitter public stream API[1] and push them to a Kinesis stream.
# Configure this with Twitter credentials for your own application[2].
#
# [1] https://dev.twitter.com/streaming/reference/post/statuses/filter
# [2] https://apps.twitter.com
CONSUMER_KEY = ''
CONSUMER_SECRET = ''
ACCESS_TOKEN = ''
ACCESS_TOKEN_SECRET = ''
# Learn more about how this filter is used by Twitter's API
# https://dev.twitter.com/streaming/reference/post/statuses/filter
TWITTER_FILTER = 'aws'
# Learn more about Kinesis partitions
# http://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-producers.html
KINESIS_STREAM = AWS['kinesisStream']
KINESIS_PARTITION = 'demo'
10 changes: 0 additions & 10 deletions social-media/create-lambda-function.config

This file was deleted.

75 changes: 32 additions & 43 deletions social-media/create-lambda-function.py
@@ -11,43 +11,34 @@
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express
# or implied. See the License for the specific language governing permissions
# and limitations under the License.
import boto
import json
import os
from time import sleep
from zipfile import ZipFile

# To enable logging:
# boto.set_stream_logger('boto')
import boto
from boto.kinesis.exceptions import ResourceInUseException

import config

def read_file_as_string_then_format(file_name, args):
with open(file_name) as file_handle:
return file_handle.read().format(**args)
# To enable logging:
# boto.set_stream_logger('boto')

# Initialize the AWS clients.
sns = boto.connect_sns()
kinesis = boto.connect_kinesis()
aws_lambda = boto.connect_awslambda()
ml = boto.connect_machinelearning()

# Load the config file.
with open('create-lambda-function.config') as config:
config = json.load(config)
lambda_execution_policy = open('lambdaExecutionPolicyTemplate.json').read().format(**config.AWS)

lambda_execution_policy = read_file_as_string_then_format('lambdaExecutionPolicyTemplate.json', config)
lambda_invocation_policy = read_file_as_string_then_format('lambdaInvocationPolicyTemplate.json', config)
aws_account_id = config.AWS["awsAccountId"]
region = config.AWS["region"]
kinesis_stream = config.AWS["kinesisStream"]
sns_topic = config.AWS["snsTopic"]

aws_account_id = config["awsAccountId"]

region = config["region"]

kinesis_stream = config["kinesisStream"]
sns_topic = config["snsTopic"]

lambda_function_name = config["lambdaFunctionName"]
lambda_execution_policy_name = config["lambdaExecutionPolicyName"]
lambda_invocation_policy_name = config["lambdaInvocationPolicyName"]
lambda_function_name = config.AWS["lambdaFunctionName"]
lambda_execution_role = config.AWS["lambdaExecutionRole"]
lambda_trust_policy = '{"Statement":[{"Effect":"Allow","Principal":{"Service":"lambda.amazonaws.com"},"Action":"sts:AssumeRole"}]}'


@@ -59,7 +50,7 @@ def role_exists(iam, role_name):
return True


def create_policy(policy_name, assume_role_policy_document, policy_str):
def create_role(policy_name, assume_role_policy_document, policy_str):
iam = boto.connect_iam()
if role_exists(iam, policy_name):
print('Role "{0}" already exists. Assuming correct values.'.format(policy_name))
@@ -69,26 +60,26 @@ def create_policy(policy_name, assume_role_policy_document, policy_str):
iam.put_role_policy(policy_name, 'inlinepolicy', policy_str)


def create_roles_required_for_lambda():
# Create Lambda Execution Policy
create_policy(lambda_execution_policy_name, lambda_trust_policy, lambda_execution_policy)
# Create Lambda Invocation Policy
create_policy(lambda_invocation_policy_name, lambda_trust_policy, lambda_invocation_policy)
def create_stream(stream):
print('Creating Amazon Kinesis Stream: ' + stream)
try:
kinesis.create_stream(kinesis_stream, 1)
except ResourceInUseException, e:
print(e.message + ' Continuing.')


def main():
# Create roles
create_roles_required_for_lambda()
# Create Kinesis Stream
print('Creating Kinesis Stream: ' + kinesis_stream)
kinesis.create_stream(kinesis_stream, 1)
# Create SNS Topic
print('Creating SNS topic: ' + sns_topic)
# Create execution role
create_role(lambda_execution_role, lambda_trust_policy, lambda_execution_policy)
# Create Amazon Kinesis Stream
create_stream(kinesis_stream)
# Create Amazon SNS Topic
print('Creating Amazon SNS topic: ' + sns_topic)
sns.create_topic(sns_topic)
# Create and upload lambda function
# Create and upload AWS Lambda function
create_lambda_function(lambda_function_name)
# Create realtime endpoint for the ml model
ml.create_realtime_endpoint(config['mlModelId'])
ml.create_realtime_endpoint(config.AWS['mlModelId'])
# Wait for kinesis
pause_until_kinesis_active()
# Wait for 5 seconds
@@ -97,10 +88,9 @@ def main():
add_kinesis_as_source_to_lambda()
print('Kinesis stream is active now. You can start calling it.')


def create_lambda_function_zip():
with open('index.js.template') as lambda_function_template:
lf_string = lambda_function_template.read().format(**config)
lf_string = lambda_function_template.read().format(**config.AWS)
with open('index.js', 'w') as lambda_function_file:
lambda_function_file.write(lf_string)

@@ -114,11 +104,10 @@ def create_lambda_function_zip():

return zip_file_name


def upload_lambda_function(zip_file_name):
with open(zip_file_name) as zip_blob:
lambda_execution_role_arn = 'arn:aws:iam::' + \
aws_account_id + ':role/' + lambda_execution_policy_name
aws_account_id + ':role/' + lambda_execution_role
aws_lambda.upload_function(
lambda_function_name,
zip_blob.read(),
@@ -148,10 +137,10 @@ def add_kinesis_as_source_to_lambda():
# Add Kinesis as event source to the lambda function
print('Adding Kinesis as event source for Lambda function.')
response_add_event_source = aws_lambda.add_event_source(
event_source='arn:aws:kinesis:' + region + ':' +
aws_account_id + ':stream/' + kinesis_stream,
event_source='arn:aws:kinesis:' + region + ':' + aws_account_id
+ ':stream/' + kinesis_stream,
function_name=lambda_function_name,
role='arn:aws:iam::' + aws_account_id + ':role/' + lambda_invocation_policy_name
role='arn:aws:iam::' + aws_account_id + ':role/' + lambda_execution_role
)
event_source_id = response_add_event_source['UUID']
