diff --git a/doc/src/site/sphinx/First_Steps.rst b/doc/src/site/sphinx/First_Steps.rst
index b0ef756..d315eaf 100644
--- a/doc/src/site/sphinx/First_Steps.rst
+++ b/doc/src/site/sphinx/First_Steps.rst
@@ -91,7 +91,7 @@ Configuration parameters
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
 | schema_samplingRatio | 1.0 | No |
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
-| writeConcern | mongodb.WriteConcern.ACKNOWLEDGED | No |
+| writeConcern | "safe" | No |
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
 | splitSize | 10 | No |
 +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+
@@ -212,6 +212,18 @@ Examples
 Scala API
 ---------
 
+Launch the Spark shell:
+::
+
+    $ bin/spark-shell --packages com.stratio.datasource:spark-mongodb_2.10:
+
+If you are using the Spark shell, a SQLContext is already created and available as the variable 'sqlContext'.
+Alternatively, you can create a SQLContext instance in your Spark application code:
+
+::
+
+    val sqlContext = new SQLContext(sc)
+
 To read a DataFrame from a Mongo collection, you can use the library by loading the implicits from `com.stratio.datasource.mongodb._`.
 
 To save a DataFrame in MongoDB you should use the saveToMongodb() function as follows:
@@ -249,8 +261,8 @@ In the example we can see how to use the fromMongoDB() function to read from Mon
     val readConfig = builder.build()
     val mongoRDD = sqlContext.fromMongoDB(readConfig)
     mongoRDD.registerTempTable("students")
-    sqlContext.sql("SELECT name, age FROM students")
-
+    val dataFrame = sqlContext.sql("SELECT name, age FROM students")
+    dataFrame.show
 
 If you want to use a SSL connection, you need to add this 'import', and add 'SSLOptions' to the MongodbConfigBuilder:
 
@@ -268,9 +280,9 @@ Using StructType:
 
     import org.apache.spark.sql.types._
     val schemaMongo = StructType(StructField("name", StringType, true) :: StructField("age", IntegerType, true ) :: Nil)
-    sqlContext.createExternalTable("mongoTable", "com.stratio.datasource.mongodb", schemaMongo, Map("host" -> "localhost:27017", "database" -> "highschool", "collection" -> "students"))
+    val df = sqlContext.read.schema(schemaMongo).format("com.stratio.datasource.mongodb").options(Map("host" -> "localhost:27017", "database" -> "highschool", "collection" -> "students")).load
+    df.registerTempTable("mongoTable")
     sqlContext.sql("SELECT * FROM mongoTable WHERE name = 'Torcuato'").show()
-    sqlContext.sql("DROP TABLE mongoTable")
 
 Using DataFrameWriter:
 
@@ -319,9 +331,19 @@ Then:
 ::
 
     from pyspark.sql import SQLContext
-    sqlContext.sql("CREATE TEMPORARY TABLE students_table USING com.stratio.datasource.mongodb OPTIONS (host 'host:port', database 'highschool', collection 'students')")
+    sqlContext.sql("CREATE TEMPORARY TABLE students_table USING com.stratio.datasource.mongodb OPTIONS (host 'localhost:27017', database 'highschool', collection 'students')")
     sqlContext.sql("SELECT * FROM students_table").collect()
 
+Using DataFrameReader and DataFrameWriter:
+::
+
+    df = sqlContext.read.format('com.stratio.datasource.mongodb').options(host='localhost:27017', database='highschool', collection='students').load()
+    df.select("name").collect()
+
+    df.select("name").write.format("com.stratio.datasource.mongodb").mode('overwrite').options(host='localhost:27017', database='highschool', collection='studentsview').save()
+    dfView = sqlContext.read.format('com.stratio.datasource.mongodb').options(host='localhost:27017', database='highschool', collection='studentsview').load()
+    dfView.show()
+
 Java API
 --------
 
diff --git a/spark-mongodb-examples/pom.xml b/spark-mongodb-examples/pom.xml
index d869a1c..7ec504e 100644
--- a/spark-mongodb-examples/pom.xml
+++ b/spark-mongodb-examples/pom.xml
@@ -40,7 +40,7 @@
         2.10
 
-        0.11.0-SNAPSHOT
+        0.11.2-RC1-SNAPSHOT
         1.5.2
 
diff --git a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/client/MongodbClientFactory.scala b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/client/MongodbClientFactory.scala
index 0df48c1..885a8d9 100644
--- a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/client/MongodbClientFactory.scala
+++ b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/client/MongodbClientFactory.scala
@@ -155,8 +155,9 @@ object MongodbClientFactory {
     }
   }
 
+  // TODO Review when refactoring config
   def extractValue[T](options: Map[String, Any], key: String): Option[T] =
-    options.get(key).map(_.asInstanceOf[T])
+    options.get(key.toLowerCase).map(_.asInstanceOf[T])
 
   def sslBuilder(optionSSLOptions: Option[MongodbSSLOptions]): Boolean =
     optionSSLOptions.exists(sslOptions => {
diff --git a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala
index d95be7b..8810585 100644
--- a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala
+++ b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala
@@ -23,6 +23,7 @@ import com.stratio.datasource.util.Config._
 /**
  * Values and Functions for access and parse the configuration parameters
  */
+// TODO Review when refactoring config
 object MongodbConfig {
 
   // Parameter names
@@ -93,6 +94,7 @@ object MongodbConfig {
    * @param parameters List of parameters
    * @return List of parameters parsed to correct mongoDb configurations
    */
+  // TODO Review when refactoring config
  def parseParameters(parameters : Map[String,String]): Map[String, Any] = {
 
    // required properties
@@ -137,6 +139,7 @@ object MongodbConfig {
    * @param readPreference string key for identify the correct object
    * @return readPreference object
    */
+  // TODO Review when refactoring config
  def parseReadPreference(readPreference: String): ReadPreference = {
    readPreference.toUpperCase match {
      case "PRIMARY" => com.mongodb.casbah.ReadPreference.Primary
@@ -153,18 +156,16 @@ object MongodbConfig {
    * @param writeConcern string key for identify the correct object
    * @return writeConcern object
    */
+  // TODO Review when refactoring config
  def parseWriteConcern(writeConcern: String): WriteConcern = {
    writeConcern.toUpperCase match {
-      case "FSYNC_SAFE" => com.mongodb.WriteConcern.FSYNC_SAFE
-      case "FSYNCED" => com.mongodb.WriteConcern.FSYNCED
-      case "JOURNAL_SAFE" => com.mongodb.WriteConcern.JOURNAL_SAFE
-      case "JOURNALED" => com.mongodb.WriteConcern.JOURNALED
+      case "SAFE" | "ACKNOWLEDGED" => com.mongodb.WriteConcern.SAFE
+      case "NORMAL" | "UNACKNOWLEDGED" => com.mongodb.WriteConcern.NORMAL
+      case "REPLICAS_SAFE" | "REPLICA_ACKNOWLEDGED" => com.mongodb.WriteConcern.REPLICAS_SAFE
+      case "FSYNC_SAFE" | "FSYNCED" => com.mongodb.WriteConcern.FSYNC_SAFE
      case "MAJORITY" => com.mongodb.WriteConcern.MAJORITY
-      case "NORMAL" => com.mongodb.WriteConcern.NORMAL
-      case "REPLICA_ACKNOWLEDGED" => com.mongodb.WriteConcern.REPLICA_ACKNOWLEDGED
-      case "REPLICAS_SAFE" => com.mongodb.WriteConcern.REPLICAS_SAFE
-      case "SAFE" => com.mongodb.WriteConcern.SAFE
-      case "UNACKNOWLEDGED" => com.mongodb.WriteConcern.UNACKNOWLEDGED
+      case "JOURNAL_SAFE" | "JOURNALED" => com.mongodb.WriteConcern.JOURNAL_SAFE
+      case "NONE" | "ERRORS_IGNORED" => com.mongodb.WriteConcern.NONE
      case _ => DefaultWriteConcern
    }
  }
diff --git a/spark-mongodb/src/main/scala/com/stratio/datasource/util/Config.scala b/spark-mongodb/src/main/scala/com/stratio/datasource/util/Config.scala
index 4c26377..088dc28 100644
--- a/spark-mongodb/src/main/scala/com/stratio/datasource/util/Config.scala
+++ b/spark-mongodb/src/main/scala/com/stratio/datasource/util/Config.scala
@@ -62,12 +62,14 @@ abstract class ConfigBuilder[Builder<:ConfigBuilder[Builder] ](
    */
   def build(): Config = new Config {
 
+    // TODO Review when refactoring config
     val properties = builder.properties.map { case (k, v) => k.toLowerCase -> v }
+    val reqProperties = requiredProperties.map(_.toLowerCase)
 
     require(
-      requiredProperties.forall(properties.isDefinedAt),
+      reqProperties.forall(properties.isDefinedAt),
       s"Not all properties are defined! : ${
-        requiredProperties.diff(
+        reqProperties.diff(
          properties.keys.toList.intersect(requiredProperties))
       }")
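
For reference, a minimal usage sketch of how the relaxed writeConcern handling can be exercised from the Scala API. This is illustrative only and not part of the patch: the format name and the host/database/collection/writeConcern option keys come from First_Steps.rst above, "majority" is one of the aliases accepted by the new parseWriteConcern, and keys are treated case-insensitively after the extractValue/ConfigBuilder changes:
::

    // Assumes a SQLContext named sqlContext, as created in First_Steps.rst.
    // Read the students collection, then write a projection back with an explicit write concern.
    // Any alias handled by parseWriteConcern ("safe", "journaled", "majority", ...) should work, in any letter case.
    val studentsDF = sqlContext.read.format("com.stratio.datasource.mongodb")
      .options(Map("host" -> "localhost:27017", "database" -> "highschool", "collection" -> "students"))
      .load()
    studentsDF.select("name").write.format("com.stratio.datasource.mongodb")
      .mode("overwrite")
      .options(Map("host" -> "localhost:27017", "database" -> "highschool", "collection" -> "studentsview", "writeConcern" -> "majority"))
      .save()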