diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md index a18590f59165..66680eb6342f 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md @@ -1,6 +1,8 @@ ## Release History ### 4.2.0-beta.1 (Unreleased) +#### Configuration Changes +* Changed the default value of `spark.cosmos.read.inferSchema.forceNullableProperties` from `false` to `true` based on user feedback, see [PR](https://github.com/Azure/azure-sdk-for-java/pull/22049). ### 4.1.0 (2021-05-27) #### New Features diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md index 3a0afb9a4a09..40265f5a4179 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md @@ -48,7 +48,7 @@ When doing read operations, users can specify a custom schema or allow the conne | `spark.cosmos.read.inferSchema.samplingSize` | `1000` | Sampling size to use when inferring schema and not using a query. | | `spark.cosmos.read.inferSchema.includeSystemProperties` | `false` | When schema inference is enabled, whether the resulting schema will include all [Cosmos DB system properties](https://docs.microsoft.com/azure/cosmos-db/account-databases-containers-items#properties-of-an-item). | | `spark.cosmos.read.inferSchema.includeTimestamp` | `false` | When schema inference is enabled, whether the resulting schema will include the document Timestamp (`_ts`). Not required if `spark.cosmos.read.inferSchema.includeSystemProperties` is enabled, as it will already include all system properties. | -| `spark.cosmos.read.inferSchema.forceNullableProperties` | `false` | When schema inference is enabled, whether the resulting schema will make all columns nullable. By default whether inferred columns are treated as nullable or not will depend on whether any record in the sample set has null-values within a column. If set to `true` all columns will be treated as nullable even if all rows within the sample set have non-null values. | +| `spark.cosmos.read.inferSchema.forceNullableProperties` | `true` | When schema inference is enabled, whether the resulting schema will make all columns nullable. By default, all columns (except cosmos system properties) will be treated as nullable even if all rows within the sample set have non-null values. When disabled, the inferred columns are treated as nullable or not depending on whether any record in the sample set has null-values within a column. | #### Json conversion configuration diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala index 38ded7ceb95d..48f1152bde45 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala @@ -533,7 +533,7 @@ private object CosmosSchemaInferenceConfig { private val inferSchemaForceNullableProperties = CosmosConfigEntry[Boolean]( key = CosmosConfigNames.ReadInferSchemaForceNullableProperties, mandatory = false, - defaultValue = Some(false), + defaultValue = Some(true), parseFromStringFunction = include => include.toBoolean, helpMessage = "Whether schema inference should enforce inferred properties to be nullable - even when no null-values are contained in the sample set") diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala index d16cd05e6d79..bae610c7bf0e 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala @@ -460,6 +460,57 @@ class SparkE2EQueryITest fieldNames.contains(CosmosTableSchemaInferrer.AttachmentsAttributeName) shouldBe false } + "spark query" can "when forceNullableProperties is false and rows have different schema" in { + val cosmosEndpoint = TestConfigurations.HOST + val cosmosMasterKey = TestConfigurations.MASTER_KEY + val samplingSize = 100 + val expectedResults = samplingSize * 2 + val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer) + + // Inserting documents with slightly different schema + for( _ <- 1 to expectedResults) { + val objectNode = Utils.getSimpleObjectMapper.createObjectNode() + val arr = objectNode.putArray("object_array") + val nested = Utils.getSimpleObjectMapper.createObjectNode() + nested.put("A", "test") + nested.put("B", "test") + arr.add(nested) + objectNode.put("id", UUID.randomUUID().toString) + container.createItem(objectNode).block() + } + + for( _ <- 1 to samplingSize) { + val objectNode2 = Utils.getSimpleObjectMapper.createObjectNode() + val arr = objectNode2.putArray("object_array") + val nested = Utils.getSimpleObjectMapper.createObjectNode() + nested.put("A", "test") + arr.add(nested) + objectNode2.put("id", UUID.randomUUID().toString) + container.createItem(objectNode2).block() + } + + val cfgWithInference = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint, + "spark.cosmos.accountKey" -> cosmosMasterKey, + "spark.cosmos.database" -> cosmosDatabase, + "spark.cosmos.container" -> cosmosContainer, + "spark.cosmos.read.inferSchema.enabled" -> "true", + "spark.cosmos.read.inferSchema.forceNullableProperties" -> "false", + "spark.cosmos.read.inferSchema.samplingSize" -> samplingSize.toString, + "spark.cosmos.read.inferSchema.query" -> "SELECT * FROM c ORDER BY c._ts", + "spark.cosmos.read.partitioning.strategy" -> "Restrictive" + ) + + val dfWithInference = spark.read.format("cosmos.oltp").options(cfgWithInference).load() + try { + dfWithInference.collect() + fail("Should have thrown an exception") + } + catch { + case inner: Exception => + inner.toString.contains("The 1th field 'B' of input row cannot be null") shouldBe true + } + } + "spark query" can "use custom sampling size" in { val cosmosEndpoint = TestConfigurations.HOST val cosmosMasterKey = TestConfigurations.MASTER_KEY @@ -580,6 +631,7 @@ class SparkE2EQueryITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, + "spark.cosmos.read.inferSchema.forceNullableProperties" -> "false", "spark.cosmos.read.partitioning.strategy" -> "Restrictive" ) @@ -652,6 +704,62 @@ class SparkE2EQueryITest fieldNames.contains(CosmosTableSchemaInferrer.AttachmentsAttributeName) shouldBe false } + "spark query" can "return proper Cosmos specific query plan on explain with nullable properties" in { + val cosmosEndpoint = TestConfigurations.HOST + val cosmosMasterKey = TestConfigurations.MASTER_KEY + + val id = UUID.randomUUID().toString + + val rawItem = s""" + | { + | "id" : "${id}", + | "nestedObject" : { + | "prop1" : 5, + | "prop2" : "6" + | } + | } + |""".stripMargin + + val objectNode = objectMapper.readValue(rawItem, classOf[ObjectNode]) + + val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer) + container.createItem(objectNode).block() + + val cfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint, + "spark.cosmos.accountKey" -> cosmosMasterKey, + "spark.cosmos.database" -> cosmosDatabase, + "spark.cosmos.container" -> cosmosContainer, + "spark.cosmos.read.inferSchema.forceNullableProperties" -> "true", + "spark.cosmos.read.partitioning.strategy" -> "Restrictive" + ) + + val df = spark.read.format("cosmos.oltp").options(cfg).load() + val rowsArray = df.where("nestedObject.prop2 = '6'").collect() + rowsArray should have size 1 + + var output = new java.io.ByteArrayOutputStream() + Console.withOut(output) { + df.explain() + } + var queryPlan = output.toString.replaceAll("#\\d+", "#x") + logInfo(s"Query Plan: $queryPlan") + queryPlan.contains("Cosmos Query: SELECT * FROM r") shouldEqual true + + output = new java.io.ByteArrayOutputStream() + Console.withOut(output) { + df.where("nestedObject.prop2 = '6'").explain() + } + queryPlan = output.toString.replaceAll("#\\d+", "#x") + logInfo(s"Query Plan: $queryPlan") + val expected = s"Cosmos Query: SELECT * FROM r WHERE NOT(IS_NULL(r['nestedObject'])) " + + s"AND r['nestedObject']['prop2']=" + + s"@param0${System.getProperty("line.separator")} > param: @param0 = 6" + queryPlan.contains(expected) shouldEqual true + + val item = rowsArray(0) + item.getAs[String]("id") shouldEqual id + } + //scalastyle:on magic.number //scalastyle:on multiple.string.literals }