Skip to content
Closed
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions examples/src/main/r/data-manipulation.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


# Load SparkR library into your R session
library(SparkR)

## Initialize SparkContext on your local PC
sc <- sparkR.init(master = "local", appName = "MyApp")

## Initialize SQLContext
sqlContext <- SparkRSQL.init(sc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be sparkRSQL and not SparkRSQL


# For this example, we shall use the "flights" dataset
# The data can be downloaded from: https://s3-us-west-2.amazonaws.com/sparkr-data/flights.csv
# The dataset consists of every flight departing Houston in 2011.
# The data set is made up of 227,496 rows x 14 columns.


# Option 1: Create an R data frame and then convert it to a SparkR DataFrame -------

## Create R dataframe
install.packages("data.table") #We want to use the fread() function to read the dataset
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would read.csv that is part of base R also work for this ? I know that data.table is more efficient, but I would like to avoid installing new of packages in the example.

library(data.table)

flights_df <- fread("flights.csv")
flights_df$date <- as.Date(flights_df$date)

## Convert the local data frame into a SparkR DataFrame
flightsDF <- createDataFrame(sqlContext, flights_df)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I tried to run this locally and this step is very slow for the dataset we are using here (I filed https://issues.apache.org/jira/browse/SPARK-8277) due to the way we convert local data frames to lists.

I see two options here: (1) Use fewer rows in the example file, so that this runs fast or (2) use a different dataset to demonstrate creating a SparkR DataFrame from a local dataframe (the CSV reader is fine)

Let me know which you think is better.


# Option 2: Alternatively, directly create a SparkR DataFrame from the source data
flightsDF <- read.df(sqlContext, "flights.csv", source = "csv", header = "true")

# Print the schema of this Spark DataFrame
printSchema(flightsDF)

# Cache the DataFrame
cache(flightsDF)


# Install the magrittr pipeline operator
install.packages("magrittr")
library(magrittr)

# Print the first 6 rows of the DataFrame
showDF(flightsDF, numRows = 6) ## Or
head(flightsDF)

# Show the column names in the DataFrame
columns(flightsDF)

# Show the number of rows in the DataFrame
count(flightsDF)

# Show summary statistics for numeric colums
describe(flightsDF)

# Select specific columns
destDF <- select(flightsDF, "dest", "cancelled")

# Using SQL to select columns of data
# First, register the flights DataFrame as a table
registerTempTable(flightsDF, "flightsTable")
destDF <- sql(sqlContext, "SELECT dest, cancelled FROM flightsTable")

# Use collect to create a local R data frame
dest_df <- collect(destDF)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets name this local_df just to be clear ?


# Print the newly created local data frame
print(dest_df)

# Filter flights whose destination is JFK
jfkDF <- filter(flightsDF, "dest == JFK") ##OR
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "dest == JFK" doesn't work as the string JFK needs to be quoted. So this should be something like jfkDF <- filter(flightsDF, "dest = \"JFK\""). Note that this needs to be a single = for things to work with this syntax.

cc @davies we should probably make = and == work here ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we shouldn't, the expression should be either SQL or R, if we support == here, it's not SQL or R, will introduce other corner cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see - so right now the convention is that the string syntax is SQL and the normal syntax is R ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes:)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically we can be error tolerant and support == in the parser though. I can't think of a corner case that'd be problematic.

jfkDF <- filter(flightsDF, flightsDF$dest == JFK)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "JFK" should be in quotes here


# Group the flights by date and then find the average daily delay
# Write the result into a DataFrame
groupBy(flightsDF, "date") %>%
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line should also go inside the if block

avg(dep_delay = "avg", arr_delay = "avg") -> dailyDelayDF
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be agg(dep_delay... or to be more clear summarize(dep_delay = ...


# Stop the SparkContext now
sparkR.stop()