Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ private object CosmosTableSchemaInferrer
ResourceIdAttributeName,
AttachmentsAttributeName)

private val notNullableProperties = List(
IdAttributeName,
ETagAttributeName,
SelfAttributeName,
ResourceIdAttributeName,
TimestampAttributeName,
AttachmentsAttributeName)

private[spark] def inferSchema(
inferredItems: Seq[ObjectNode],
includeSystemProperties: Boolean,
Expand Down Expand Up @@ -125,7 +133,7 @@ private object CosmosTableSchemaInferrer
case anyType: DataType => field.getKey -> StructField(
field.getKey,
anyType,
nullable= !systemProperties.contains(field.getKey) && allowNullForInferredProperties)
nullable= !notNullableProperties.contains(field.getKey) && allowNullForInferredProperties)
})
.toSeq)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class CosmosTableSchemaInferrerSpec extends UnitSpec {
schema.fields(1).name shouldBe "otherProperty"
schema.fields(0).dataType shouldBe StringType
schema.fields(1).dataType shouldBe StringType
schema.fields(0).nullable shouldBe true
schema.fields(0).nullable shouldBe false
schema.fields(1).nullable shouldBe true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ class SparkE2EQueryITest
fieldNames.contains(CosmosTableSchemaInferrer.ResourceIdAttributeName) shouldBe true
fieldNames.contains(CosmosTableSchemaInferrer.ETagAttributeName) shouldBe true
fieldNames.contains(CosmosTableSchemaInferrer.AttachmentsAttributeName) shouldBe true

rowWithInference.schema(CosmosTableSchemaInferrer.SelfAttributeName).nullable shouldBe false
rowWithInference.schema(CosmosTableSchemaInferrer.TimestampAttributeName).nullable shouldBe false
rowWithInference.schema(CosmosTableSchemaInferrer.ResourceIdAttributeName).nullable shouldBe false
rowWithInference.schema(CosmosTableSchemaInferrer.ETagAttributeName).nullable shouldBe false
rowWithInference.schema(CosmosTableSchemaInferrer.AttachmentsAttributeName).nullable shouldBe false
}

"spark query" can "use schema inference with just timestamp" in {
Expand Down Expand Up @@ -438,7 +444,7 @@ class SparkE2EQueryITest
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.inferSchema.enabled" -> "true",
"spark.cosmos.read.inferSchema.query" -> "select TOP 1 c.type, c.age, c.isAlive, c._ts from c",
"spark.cosmos.read.inferSchema.query" -> "select TOP 1 c.type, c.age, c.isAlive, c._ts, c.id from c",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive"
)

Expand All @@ -455,9 +461,16 @@ class SparkE2EQueryITest
val fieldNames = rowWithInference.schema.fields.map(field => field.name)
fieldNames.contains(CosmosTableSchemaInferrer.SelfAttributeName) shouldBe false
fieldNames.contains(CosmosTableSchemaInferrer.TimestampAttributeName) shouldBe true
fieldNames.contains(CosmosTableSchemaInferrer.IdAttributeName) shouldBe true
fieldNames.contains(CosmosTableSchemaInferrer.ResourceIdAttributeName) shouldBe false
fieldNames.contains(CosmosTableSchemaInferrer.ETagAttributeName) shouldBe false
fieldNames.contains(CosmosTableSchemaInferrer.AttachmentsAttributeName) shouldBe false

rowWithInference.schema(CosmosTableSchemaInferrer.TimestampAttributeName).nullable shouldBe false
rowWithInference.schema(CosmosTableSchemaInferrer.IdAttributeName).nullable shouldBe false
rowWithInference.schema("type").nullable shouldBe true
rowWithInference.schema("age").nullable shouldBe true
rowWithInference.schema("isAlive").nullable shouldBe true
}

"spark query" can "when forceNullableProperties is false and rows have different schema" in {
Expand Down