This repository has been archived by the owner on May 27, 2020. It is now read-only.

Commit

Merge pull request #129 from darroyocazorla/doc/updateExamples
[CROSSDATA-451] Update doc/examples
darroyo-stratio committed May 5, 2016
2 parents 8d35cbf + 04b29cd commit 1f8e399
Showing 5 changed files with 45 additions and 19 deletions.
34 changes: 28 additions & 6 deletions doc/src/site/sphinx/First_Steps.rst
@@ -91,7 +91,7 @@ Configuration parameters
+-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
| schema_samplingRatio | 1.0 | No |
+-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
- | writeConcern | mongodb.WriteConcern.ACKNOWLEDGED | No |
+ | writeConcern | "safe" | No |
+-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
| splitSize | 10 | No |
+-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
@@ -212,6 +212,18 @@ Examples
Scala API
---------

Launch the Spark shell:
::

$ bin/spark-shell --packages com.stratio.datasource:spark-mongodb_2.10:<VERSION>

If you are using the Spark shell, a SQLContext is already created and available as the variable 'sqlContext'.
Alternatively, you can create a SQLContext instance in your Spark application code:

::

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

To read a DataFrame from a MongoDB collection, you can use the library by importing the implicits from `com.stratio.datasource.mongodb._`.

To save a DataFrame in MongoDB, use the saveToMongodb() function.
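
A sketch of such a call, using the configuration keys defined in MongodbConfig (host, database, and collection values are placeholders; 'dataFrame' is an existing DataFrame):

::

import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.config._
import com.stratio.datasource.mongodb.config.MongodbConfig._

val saveConfig = MongodbConfigBuilder(Map(Host -> List("localhost:27017"),
  Database -> "highschool", Collection -> "students",
  SamplingRatio -> 1.0, WriteConcern -> "safe"))
dataFrame.saveToMongodb(saveConfig.build())
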
@@ -249,8 +261,8 @@ In the example we can see how to use the fromMongoDB() function to read from Mon
val readConfig = builder.build()
val mongoRDD = sqlContext.fromMongoDB(readConfig)
mongoRDD.registerTempTable("students")
sqlContext.sql("SELECT name, age FROM students")

val dataFrame = sqlContext.sql("SELECT name, age FROM students")
dataFrame.show


If you want to use an SSL connection, you need to add an import and an 'SSLOptions' entry to the MongodbConfigBuilder.
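
A hypothetical sketch (the import path and the MongodbSSLOptions arguments are assumptions; verify them against your version of the library):

::

// Hypothetical: the package and constructor may differ between versions
import com.stratio.datasource.mongodb.config.MongodbSSLOptions

val sslReadConfig = MongodbConfigBuilder(Map(Host -> List("localhost:27017"),
  Database -> "highschool", Collection -> "students",
  SSLOptions -> MongodbSSLOptions(Some("/path/to/keystore"), Some("storePassword"),
    "/path/to/truststore", Some("storePassword"))))
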
@@ -268,9 +280,9 @@ Using StructType:

import org.apache.spark.sql.types._
val schemaMongo = StructType(StructField("name", StringType, true) :: StructField("age", IntegerType, true ) :: Nil)
- sqlContext.createExternalTable("mongoTable", "com.stratio.datasource.mongodb", schemaMongo, Map("host" -> "localhost:27017", "database" -> "highschool", "collection" -> "students"))
+ val df = sqlContext.read.schema(schemaMongo).format("com.stratio.datasource.mongodb").options(Map("host" -> "localhost:27017", "database" -> "highschool", "collection" -> "students")).load
+ df.registerTempTable("mongoTable")
sqlContext.sql("SELECT * FROM mongoTable WHERE name = 'Torcuato'").show()
- sqlContext.sql("DROP TABLE mongoTable")


Using DataFrameWriter:
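
A sketch of this pattern with the standard Spark DataFrameWriter API (option values are placeholders; 'df' is the DataFrame from the previous example):

::

import org.apache.spark.sql.SaveMode

val writeOptions = Map("host" -> "localhost:27017", "database" -> "highschool", "collection" -> "studentsview")
df.write.format("com.stratio.datasource.mongodb").mode(SaveMode.Overwrite).options(writeOptions).save()
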
@@ -319,9 +331,19 @@ Then:
::

from pyspark.sql import SQLContext
sqlContext.sql("CREATE TEMPORARY TABLE students_table USING com.stratio.datasource.mongodb OPTIONS (host 'host:port', database 'highschool', collection 'students')")
sqlContext.sql("CREATE TEMPORARY TABLE students_table USING com.stratio.datasource.mongodb OPTIONS (host 'localhost:27017', database 'highschool', collection 'students')")
sqlContext.sql("SELECT * FROM students_table").collect()

Using DataFrameReader and DataFrameWriter:
::

df = sqlContext.read.format('com.stratio.datasource.mongodb').options(host='localhost:27017', database='highschool', collection='students').load()
df.select("name").collect()

df.select("name").write.format("com.stratio.datasource.mongodb").mode('overwrite').options(host='localhost:27017', database='highschool', collection='studentsview').save()
dfView = sqlContext.read.format('com.stratio.datasource.mongodb').options(host='localhost:27017', database='highschool', collection='studentsview').load()
dfView.show()

Java API
--------

2 changes: 1 addition & 1 deletion spark-mongodb-examples/pom.xml
@@ -40,7 +40,7 @@

<properties>
<scala.binary.version>2.10</scala.binary.version>
- <mongodb.datasource.version>0.11.0-SNAPSHOT</mongodb.datasource.version>
+ <mongodb.datasource.version>0.11.2-RC1-SNAPSHOT</mongodb.datasource.version>
<spark.version>1.5.2</spark.version>
</properties>

MongodbClientFactory.scala
@@ -155,8 +155,9 @@ object MongodbClientFactory {
}
}

// TODO Review when refactoring config
def extractValue[T](options: Map[String, Any], key: String): Option[T] =
- options.get(key).map(_.asInstanceOf[T])
+ options.get(key.toLowerCase).map(_.asInstanceOf[T])
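// For illustration: property keys are stored lower-cased (see ConfigBuilder below),
// so lookups become case-insensitive, e.g.
//   extractValue[String](Map("database" -> "highschool"), "Database") == Some("highschool")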

def sslBuilder(optionSSLOptions: Option[MongodbSSLOptions]): Boolean =
optionSSLOptions.exists(sslOptions => {
MongodbConfig.scala
@@ -23,6 +23,7 @@ import com.stratio.datasource.util.Config._
/**
* Values and functions to access and parse the configuration parameters
*/
// TODO Review when refactoring config
object MongodbConfig {

// Parameter names
@@ -93,6 +94,7 @@ object MongodbConfig {
* @param parameters List of parameters
* @return List of parameters parsed into the correct MongoDB configurations
*/
// TODO Review when refactoring config
def parseParameters(parameters : Map[String,String]): Map[String, Any] = {

// required properties
@@ -137,6 +139,7 @@ object MongodbConfig {
* @param readPreference string key to identify the correct object
* @return readPreference object
*/
// TODO Review when refactoring config
def parseReadPreference(readPreference: String): ReadPreference = {
readPreference.toUpperCase match {
case "PRIMARY" => com.mongodb.casbah.ReadPreference.Primary
@@ -153,18 +156,16 @@
* @param writeConcern string key to identify the correct object
* @return writeConcern object
*/
// TODO Review when refactoring config
def parseWriteConcern(writeConcern: String): WriteConcern = {
writeConcern.toUpperCase match {
case "FSYNC_SAFE" => com.mongodb.WriteConcern.FSYNC_SAFE
case "FSYNCED" => com.mongodb.WriteConcern.FSYNCED
case "JOURNAL_SAFE" => com.mongodb.WriteConcern.JOURNAL_SAFE
case "JOURNALED" => com.mongodb.WriteConcern.JOURNALED
case "SAFE" | "ACKNOWLEDGED" => com.mongodb.WriteConcern.SAFE
case "NORMAL" | "UNACKNOWLEDGED" => com.mongodb.WriteConcern.NORMAL
case "REPLICAS_SAFE" | "REPLICA_ACKNOWLEDGED" => com.mongodb.WriteConcern.REPLICAS_SAFE
case "FSYNC_SAFE" | "FSYNCED" => com.mongodb.WriteConcern.FSYNC_SAFE
case "MAJORITY" => com.mongodb.WriteConcern.MAJORITY
case "NORMAL" => com.mongodb.WriteConcern.NORMAL
case "REPLICA_ACKNOWLEDGED" => com.mongodb.WriteConcern.REPLICA_ACKNOWLEDGED
case "REPLICAS_SAFE" => com.mongodb.WriteConcern.REPLICAS_SAFE
case "SAFE" => com.mongodb.WriteConcern.SAFE
case "UNACKNOWLEDGED" => com.mongodb.WriteConcern.UNACKNOWLEDGED
case "JOURNAL_SAFE" | "JOURNALED" => com.mongodb.WriteConcern.JOURNAL_SAFE
case "NONE" | "ERRORS_IGNORED" => com.mongodb.WriteConcern.NONE
case _ => DefaultWriteConcern
}
}
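// For illustration, given the cases above:
//   parseWriteConcern("journaled") == com.mongodb.WriteConcern.JOURNAL_SAFE
//   parseWriteConcern("anything")  == DefaultWriteConcern  (fallback case)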
ConfigBuilder.scala
@@ -62,12 +62,14 @@ abstract class ConfigBuilder[Builder<:ConfigBuilder[Builder] ](
*/
def build(): Config = new Config {

// TODO Review when refactoring config
val properties = builder.properties.map { case (k, v) => k.toLowerCase -> v }
val reqProperties = requiredProperties.map(_.toLowerCase)

require(
- requiredProperties.forall(properties.isDefinedAt),
+ reqProperties.forall(properties.isDefinedAt),
s"Not all properties are defined! : ${
- requiredProperties.diff(
+ reqProperties.diff(
properties.keys.toList.intersect(requiredProperties))
}")

