# Pinterest Data Pipeline
Pinterest performs daily experiments on historical and newly acquired data to create more value for its customers. The goal of this project is to replicate that workflow using AWS cloud infrastructure.

The following figure shows a high-level overview of the pipeline:
The `user_posting_emulation.py` script connects to an RDS database containing data resembling Pinterest API data and emulates user activity by sending it to the pipeline with POST requests; a sketch of such a request follows the table below.
| Table Name | Columns |
|---|---|
| `pinterest_data` | `['index', 'unique_id', 'title', 'description', 'poster_name', 'follower_count', 'tag_list', 'is_image_or_video', 'image_src', 'downloaded', 'save_location', 'category']` |
| `geolocation_data` | `['ind', 'timestamp', 'latitude', 'longitude', 'country']` |
| `user_data` | `['ind', 'first_name', 'last_name', 'age', 'date_joined']` |
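A minimal sketch of how such an emulated post could be sent to the Kafka REST proxy through the API Gateway invoke URL (the URL, topic name and example record are assumptions, not the exact implementation):

```python
import json
import requests

# Assumed invoke URL of the deployed API Gateway that proxies the Kafka REST proxy.
INVOKE_URL = "https://<API_ID>.execute-api.us-east-1.amazonaws.com/<STAGE>"

def send_to_topic(topic: str, record: dict) -> None:
    """POST a single record to a Kafka topic via the REST proxy."""
    payload = json.dumps({"records": [{"value": record}]}, default=str)
    headers = {"Content-Type": "application/vnd.kafka.json.v2+json"}
    response = requests.post(f"{INVOKE_URL}/topics/{topic}", data=payload, headers=headers)
    response.raise_for_status()

# Example: send one row shaped like the pinterest_data table above.
send_to_topic("<userID>.pin", {"index": 1, "title": "example", "category": "diy"})
```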
I built a Databricks dashboard to present the analytics queries to stakeholders. The results to date can be viewed here: Analytics Dashboard
If you can't see the interactive HTML, please see the snapshot of the analytics dashboard here:
- Save the identity file `*.pem` locally, using the secret for the SSH connection stored in Parameter Store in the AWS account.
- Using the public IP, the `ec2-user` username and the identity file above, connect via SSH with `ssh -i <identity file> ec2-user@<EC2 instance public IP>`.
- Log in to the EC2 shell via SSH.
- Install Java 11.
- Install and configure Kafka 2.8.1:
  - Download the Kafka archive and extract it.
  - Download the IAM MSK authentication package and add it to `CLASSPATH`.
- Configure EC2 client authentication for MSK:
  - Add the EC2 access role ARN as a principal to the trust relationship in IAM Roles.
  - Update `client.properties` on the EC2 Kafka instance to configure the bootstrap server.
- Copy the Kafka connector to your user's S3 bucket under `kafka-connect-s3`.
- Create a custom plugin using the MSK Connect console with the name `<userID>-plugin`.
- Create an MSK connector with the name `<userID>-connector` using the following configuration:

  ```properties
  connector.class=io.confluent.connect.s3.S3SinkConnector
  s3.region=us-east-1
  flush.size=1
  schema.compatibility=NONE
  tasks.max=3
  topics.regex=<YOUR_UUID>.*
  format.class=io.confluent.connect.s3.format.json.JsonFormat
  partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
  value.converter.schemas.enable=false
  value.converter=org.apache.kafka.connect.json.JsonConverter
  storage.class=io.confluent.connect.s3.storage.S3Storage
  key.converter=org.apache.kafka.connect.storage.StringConverter
  s3.bucket.name=<BUCKET_NAME>
  ```
- Create an AWS API Gateway API with a `{proxy+}` resource and an HTTP proxy `ANY` method.
- Deploy the API using the old AWS console interface and note down the public invoke URL.
- On the EC2 Kafka instance, set up the Confluent Kafka REST proxy client:
  - Download and extract Confluent.
  - Download the `aws-msk-iam-auth` Java library and place it inside the Kafka `libs` directory.
  - Add the library to the Java `$CLASSPATH` environment variable.

    IMPORTANT: Make sure to set `CLASSPATH` and run `kafka-rest` as the same user, either `root` or `ec2-user`.
  - Run the Kafka REST server, then send JSON data via POST requests:

    ```bash
    ./kafka-rest-start /home/ec2-user/confluent-7.2.0/etc/kafka-rest/kafka-rest.properties
    ```

    Run this with `sudo` if `CLASSPATH` was set as `root`.
When a user sends a request via the public API Gateway, the API calls the backend service running on our EC2 instance. This connects to the MSK cluster, and the data is stored in the designated S3 bucket.
We use a data lake in Databricks for processing the Pinterest posts data.
- Connect the S3 bucket to Databricks using the credentials CSV file.
  - Note: for this project, creating the credentials and uploading them to Databricks has already been done.
- Mount the S3 bucket in Databricks using a notebook and the credentials CSV file.
- Prepare paths for each of the `pin`, `geo` and `user` JSON data files.
- Load these datasets into the dataframes `df_pin`, `df_geo` and `df_user`; a sketch of this step follows the list.
  - The notebook for this process is `PinterestPipeline/0a1d8948160f_PinterestDataPipeline.ipynb`.
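A minimal sketch of the mount-and-load step, assuming the connector writes JSON under `topics/<userID>.<topic>/partition=0/` (the bucket name, mount point, key names and paths are assumptions):

```python
from urllib.parse import quote

# Placeholders for the keys read from the credentials CSV file.
ACCESS_KEY = "<AWS access key from the credentials CSV>"
SECRET_KEY = "<AWS secret key from the credentials CSV>"

# Mount the S3 bucket on DBFS (dbutils and spark are provided by the Databricks notebook).
AWS_S3_BUCKET = "user-<userID>-bucket"
MOUNT_NAME = "/mnt/pinterest_data"
dbutils.fs.mount(
    source=f"s3a://{ACCESS_KEY}:{quote(SECRET_KEY, safe='')}@{AWS_S3_BUCKET}",
    mount_point=MOUNT_NAME,
)

# Load each topic's JSON files into a dataframe.
df_pin = spark.read.json(f"{MOUNT_NAME}/topics/<userID>.pin/partition=0/*.json")
df_geo = spark.read.json(f"{MOUNT_NAME}/topics/<userID>.geo/partition=0/*.json")
df_user = spark.read.json(f"{MOUNT_NAME}/topics/<userID>.user/partition=0/*.json")
```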
Databricks provides a pre-configured Apache Spark environment in notebook format. It supports automatic state persistence, so you can always resume where you left off. This avoids rerunning any previous ETL/ELT steps.
For this data pipeline, we first clean the three datasets loaded into `df_pin`, `df_geo` and `df_user`.
- Filter the `unique_id` column to rows containing a UUID of length 36.
- Clean and transform `follower_count` (see the sketch after this list):
  - First, we filter the rows which match the valid follower count regex `r'[0-9]{1,}[kM]?'`.
  - Then we transform follower counts given in `k` to thousands and in `M` to millions, so that all values are in integer format.
- Rename the `index` column to `ind`.
- Reorder the columns in the dataframe as: `'ind', 'unique_id', 'title', 'description', 'follower_count', 'poster_name', 'tag_list', 'is_image_or_video', 'image_src', 'save_location', 'category'`
- Finally, we replace empty and not-applicable cells with None and then drop duplicate rows.
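A sketch of the `follower_count` cleaning step using PySpark column functions (the exact implementation in the notebook may differ; the anchored regex is an assumption):

```python
from pyspark.sql.functions import col, regexp_replace

# Keep only rows whose follower_count matches the expected pattern.
df_pin = df_pin.filter(col("follower_count").rlike(r"^[0-9]+[kM]?$"))

# Expand the k/M suffixes into zeros, then cast the column to an integer.
df_pin = df_pin.withColumn("follower_count", regexp_replace("follower_count", "k", "000")) \
               .withColumn("follower_count", regexp_replace("follower_count", "M", "000000")) \
               .withColumn("follower_count", col("follower_count").cast("int"))
```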
- Create a new column `coordinates` as an array of `longitude` and `latitude`, then drop the constituent columns (see the sketch after this list).
- Convert `timestamp` from string to timestamp format.
- Reorder the columns of the dataframe in the following order: `'ind', 'country', 'coordinates', 'timestamp'`
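A sketch of these `df_geo` transformations (the column order inside `coordinates` follows the list above; the notebook implementation may differ):

```python
from pyspark.sql.functions import array, col, to_timestamp

# Combine longitude/latitude into a single coordinates column, then drop the originals.
df_geo = df_geo.withColumn("coordinates", array(col("longitude"), col("latitude"))) \
               .drop("longitude", "latitude")

# Parse the string timestamp into a timestamp type and reorder the columns.
df_geo = df_geo.withColumn("timestamp", to_timestamp(col("timestamp"))) \
               .select("ind", "country", "coordinates", "timestamp")
```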
- Create a new column `user_name` by concatenating `first_name` and `last_name`, then drop the constituent columns (see the sketch after this list).
- Convert `date_joined` from string to timestamp format.
- Reorder the columns of the dataframe in the following order: `'ind', 'user_name', 'age', 'date_joined'`.
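A sketch of these `df_user` transformations (the space separator in `user_name` is an assumption):

```python
from pyspark.sql.functions import col, concat_ws, to_timestamp

# Build user_name from the first and last names, then drop the constituent columns.
df_user = df_user.withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name"))) \
                 .drop("first_name", "last_name")

# Parse date_joined into a timestamp type and reorder the columns.
df_user = df_user.withColumn("date_joined", to_timestamp(col("date_joined"))) \
                 .select("ind", "user_name", "age", "date_joined")
```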
All the required analytics queries are written in the Databricks notebook using the PySpark SQL API, since the Pandas API on Spark is not as well supported as doing the analysis directly in Pandas.
A few key observations on the Databricks PySpark SQL API:

- Syntactically, it is organised in a functional programming style.
- Operations can be chained, as in the functional paradigm, where the output of one function becomes the input of the next.
- The operations are lazily evaluated:
  - The query is prepared and checked, which provides the outcome's structure without reading the data.
  - It is executed only when the data actually needs to be evaluated, such as when exporting or displaying the output.
- The most commonly used functions on a DataFrame are `withColumn`, `groupBy`, `agg`, `filter`, `alias` and `orderBy`.
- `join` is used to combine two dataframes on a common key, identical to SQL joins.
- `display(dataframe)` is used to show the output dataframe in an interactive table format (see the example below).
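An illustrative chained query in this style, joining the cleaned dataframes and aggregating per country (the metric and threshold are made up for the example, not one of the project's queries):

```python
from pyspark.sql.functions import avg, col

top_follower_countries = (
    df_pin.join(df_geo, on="ind")                               # combine posts with their geolocation
          .groupBy("country")                                   # one row per country
          .agg(avg("follower_count").alias("avg_followers"))    # aggregate the metric
          .filter(col("avg_followers") > 1000)                  # keep only countries above the threshold
          .orderBy(col("avg_followers").desc())                 # largest first
)
display(top_follower_countries)                                 # interactive table in the notebook
```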
Databricks provides a feature to place the output of cells in a live dashboard. The following figure shows a partial dashboard for our Pinterest data pipeline.

The dashboard tables and graphs can be updated by re-running the queries. The analysis can also be scheduled, for example weekly per region, to check the outcome of A/B experiments, and it can be used to present the analysis to stakeholders. In contrast to data warehousing, this approach provides faster development and analysis: because the queries are performed directly on the data store without additional ETL steps, experiments can be iterated rapidly.
Airflow is an orchestrator for data engineering workflows. A step in a workflow is defined as a task, and tasks are performed in the order defined by a directed acyclic graph (DAG). The DAG is written as a Python script, which is then scheduled and executed by the Airflow scheduler.

Airflow monitors the tasks, saves logs of their outcomes and also provides a web interface for administration.
Data can be transferred between tasks using the `XCom` API, and Airflow `Variables` can be used to set environment information.
Airflow supports connectors for various software and services such as PostgreSQL, MongoDB, Databricks, AWS and Azure. These are called 'Apache Airflow Providers' and are listed at: Apache Airflow Providers
AWS provides Managed Workflows for Apache Airflow (MWAA) to simplify deployment and orchestration of the service. A minimal DAG sketch is shown below.
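The sketch below assumes the Databricks provider is installed and a `databricks_default` connection is configured in MWAA; the DAG id, schedule, cluster ID and notebook path are assumptions:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

with DAG(
    dag_id="pinterest_pipeline_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",   # run the analysis once a day
    catchup=False,
) as dag:
    # Single task: run the analytics notebook on an existing Databricks cluster.
    run_notebook = DatabricksSubmitRunOperator(
        task_id="run_pinterest_notebook",
        databricks_conn_id="databricks_default",
        existing_cluster_id="<CLUSTER_ID>",
        notebook_task={"notebook_path": "/Repos/<user>/PinterestPipeline/0a1d8948160f_PinterestDataPipeline"},
    )
```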
In Databricks, we can use the `spark.readStream` API for reading streaming data:
```python
(spark.readStream
    .format('json')                          # data source format, e.g. kafka, parquet, csv
    .option('option_name', 'option_value')   # customisation, e.g. 'maxFilesPerTrigger', 1, or the Kafka topic when using Kafka as source
    .schema(data_schema)                     # specifying a schema speeds up reading
    .load('source_path'))                    # path on DBFS, a URL or a directory
```
- Import the necessary types from `pyspark.sql.types`:

  ```python
  from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
  ```

- Define the schema using `StructType`, e.g.:

  ```python
  StructType([
      StructField('field-1', IntegerType(), nullable=True),
      StructField('field-2', StringType(), nullable=True),
      StructField('field-3', TimestampType(), nullable=True),
  ])
  ```
The sample dataset used in the example below can be listed with:

```
%fs ls /databricks-datasets/structured-streaming/events
```
NOTE: The Kinesis stream source currently does not support a predefined schema.
```python
data_path = r'/databricks-datasets/structured-streaming/events/'

data_schema = StructType([
    StructField("time", TimestampType(), True),
    StructField("action", StringType(), True)
])

streamed_data = spark.readStream \
    .format('json').schema(data_schema) \
    .option('maxFilesPerTrigger', 1).load(data_path)
```
```python
clean_stream = perform_cleaning(streamed_data)

# Write the cleaned stream to a Delta table; a checkpoint location is required for streaming writes
# (the path here is just an example).
clean_stream.writeStream \
    .format('delta') \
    .option('checkpointLocation', '/tmp/checkpoints/pinterest_posts') \
    .toTable('pinterest_posts')
```
https://docs.databricks.com/en/notebooks/notebook-workflows.html
Only available in Databricks Runtime (DBR) 11.2+: https://docs.databricks.com/en/notebooks/share-code.html
We set up three streams in Kinesis for the three topics `pin`, `geo` and `user`, as shown below in the AWS Kinesis console.
These streams are publicly accessible via an AWS API Gateway. The HTTP methods `POST`, `PUT` and `DELETE` are used for streaming data in JSON format through the data pathway. A sketch of consuming one of these streams in Databricks follows.
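A sketch of reading one of these streams with the Databricks Kinesis connector (the stream name, region and schema are assumptions, and credentials are assumed to come from the cluster's instance profile); as noted earlier, the schema has to be applied after reading because the Kinesis source does not accept a predefined schema:

```python
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical schema for the pin records (built with StructType as shown earlier).
pin_schema = StructType([
    StructField("ind", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("follower_count", IntegerType(), True),
])

# Read the raw Kinesis records; the payload arrives in the binary 'data' column.
kinesis_stream = spark.readStream \
    .format('kinesis') \
    .option('streamName', 'streaming-<userID>-pin') \
    .option('region', 'us-east-1') \
    .option('initialPosition', 'earliest') \
    .load()

# Deserialise the JSON payload into typed columns.
df_pin_stream = kinesis_stream \
    .selectExpr("CAST(data AS STRING) AS json_value") \
    .select(from_json(col("json_value"), pin_schema).alias("payload")) \
    .select("payload.*")
```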
An overview of the API tree is below: