Skip to content
This repository has been archived by the owner on May 27, 2020. It is now read-only.

Commit

Permalink
Merge pull request #125 from pfcoperez/issue114
Browse files Browse the repository at this point in the history
Treat the element type of empty arrays as String (arrays of Strings) #124
  • Loading branch information
darroyo-stratio committed Apr 29, 2016
2 parents f041cf6 + dd0c007 commit 8d35cbf
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,14 @@ case class MongodbSchema[T <: RDD[DBObject]](
val fields = doc.mapValues(f => convertToStruct(f))
fields
}
}.reduceByKey(compatibleType).aggregate(Seq[StructField]())(
(fields, newField) => fields :+ StructField(newField._1, newField._2),
(oldFields, newFields) => oldFields ++ newFields)
}.reduceByKey(compatibleType).aggregate(Seq[StructField]())({
case (fields, (name, tpe)) =>
val newType = tpe match {
case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
case other => other
}
fields :+ StructField(name, newType)
}, (oldFields, newFields) => oldFields ++ newFields)
StructType(structFields)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ trait TestBsonData {
"arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308],
"arrayOfBoolean":[true, false, true],
"arrayOfNull":[null, null, null, null],
"arrayEmpty":[],
"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
"arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
"arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import com.stratio.datasource.mongodb.partitioner.MongodbPartitioner
import com.stratio.datasource.mongodb.rdd.MongodbRDD
import com.stratio.datasource.mongodb._
import org.apache.spark.sql.mongodb.{TemporaryTestSQLContext, TestSQLContext}

import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, TimestampType}
import org.junit.runner.RunWith
import org.scalatest._
import org.scalatest.junit.JUnitRunner
Expand Down Expand Up @@ -70,7 +69,12 @@ with MongodbTestConstants {
withEmbedMongoFixture(complexFieldAndType1) { mongodProc =>
val schema = MongodbSchema(mongodbRDD, 1.0).schema()

schema.fields should have size 12
schema.fields should have size 13

schema.fields filter {
case StructField(name, ArrayType(StringType, _), _, _) => Set("arrayOfNull", "arrayEmpty") contains name
case _ => false
} should have size 2

schema.printTreeString()
}
Expand Down

0 comments on commit 8d35cbf

Please sign in to comment.