Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
df.sparkSession.sessionState.conf)
val options = sessionOptions ++ extraOptions

val relation = DataSourceV2Relation.create(source, options)
if (mode == SaveMode.Append) {
val relation = DataSourceV2Relation.create(source, options)
runCommand(df.sparkSession, "save") {
AppendData.byName(relation, df.logicalPlan)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,24 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-25700: do not read schema when writing in other modes except append mode") {
withTempPath { file =>
val cls = classOf[SimpleWriteOnlyDataSource]
val path = file.getCanonicalPath
val df = spark.range(5).select('id as 'i, -'id as 'j)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The write path looks requiring two columns:

out.writeBytes(s"${record.getLong(0)},${record.getLong(1)}\n")

try {
df.write.format(cls.getName).option("path", path).mode("error").save()
df.write.format(cls.getName).option("path", path).mode("overwrite").save()
df.write.format(cls.getName).option("path", path).mode("ignore").save()
} catch {
case e: SchemaReadAttemptException => fail("Schema read was attempted.", e)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To validate new code path line 250, could you add intercept[SchemaReadAttemptException] and do append, too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup

intercept[SchemaReadAttemptException] {
df.write.format(cls.getName).option("path", path).mode("append").save()
}
}
}
}


Expand Down Expand Up @@ -640,3 +658,14 @@ object SpecificReaderFactory extends PartitionReaderFactory {
}
}
}

class SchemaReadAttemptException(m: String) extends RuntimeException(m)

class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
override def fullSchema(): StructType = {
// This is a bit hacky since this source implements read support but throws
// during schema retrieval. Might have to rewrite but it's done
// such so for minimised changes.
throw new SchemaReadAttemptException("read is not supported")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ class SimpleWritableDataSource extends DataSourceV2
with BatchWriteSupportProvider
with SessionConfigSupport {

private val schema = new StructType().add("i", "long").add("j", "long")
protected def fullSchema(): StructType = new StructType().add("i", "long").add("j", "long")

override def keyPrefix: String = "simpleWritableDataSource"

class ReadSupport(path: String, conf: Configuration) extends SimpleReadSupport {

override def fullSchema(): StructType = schema
override def fullSchema(): StructType = SimpleWritableDataSource.this.fullSchema()

override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
val dataPath = new Path(path)
Expand Down Expand Up @@ -116,7 +116,6 @@ class SimpleWritableDataSource extends DataSourceV2
schema: StructType,
mode: SaveMode,
options: DataSourceOptions): Optional[BatchWriteSupport] = {
assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable))
Copy link
Member

@viirya viirya Oct 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For modes other than Append, I think we still need this assert, don't we?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea .. but it's in test code and just sanity check..

assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false))

val path = new Path(options.get("path").get())
Expand Down