-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-34498][SQL][TESTS] fix the remaining problems in #31560 #31621
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,11 +27,10 @@ import org.apache.hadoop.fs.{FileSystem, Path} | |
|
|
||
| import org.apache.spark.SparkContext | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, SupportsWrite, Table, TableCapability} | ||
| import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability} | ||
| import org.apache.spark.sql.connector.catalog.TableCapability._ | ||
| import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, ScanBuilder} | ||
| import org.apache.spark.sql.connector.write._ | ||
| import org.apache.spark.sql.internal.connector.SimpleTableProvider | ||
| import org.apache.spark.sql.types.StructType | ||
| import org.apache.spark.sql.util.CaseInsensitiveStringMap | ||
| import org.apache.spark.util.SerializableConfiguration | ||
|
|
@@ -41,11 +40,7 @@ import org.apache.spark.util.SerializableConfiguration | |
| * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`. | ||
| * Each job moves files from `target/_temporary/uniqueId/` to `target`. | ||
| */ | ||
| class SimpleWritableDataSource extends SimpleTableProvider with SessionConfigSupport { | ||
|
|
||
| private val tableSchema = new StructType().add("i", "long").add("j", "long") | ||
|
|
||
| override def keyPrefix: String = "simpleWritableDataSource" | ||
| class SimpleWritableDataSource extends TestingV2Source { | ||
|
|
||
| class MyScanBuilder(path: String, conf: Configuration) extends SimpleScanBuilder { | ||
| override def planInputPartitions(): Array[InputPartition] = { | ||
|
|
@@ -68,7 +63,7 @@ class SimpleWritableDataSource extends SimpleTableProvider with SessionConfigSup | |
| new CSVReaderFactory(serializableConf) | ||
| } | ||
|
|
||
| override def readSchema(): StructType = tableSchema | ||
| override def readSchema(): StructType = TestingV2Source.schema | ||
| } | ||
|
|
||
| class MyWriteBuilder(path: String, info: LogicalWriteInfo) | ||
|
|
@@ -134,7 +129,7 @@ class SimpleWritableDataSource extends SimpleTableProvider with SessionConfigSup | |
| private val path = options.get("path") | ||
| private val conf = SparkContext.getActive.get.hadoopConfiguration | ||
|
|
||
| override def schema(): StructType = tableSchema | ||
| override def schema(): StructType = TestingV2Source.schema | ||
|
||
|
|
||
| override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { | ||
| new MyScanBuilder(new Path(path).toUri.toString, conf) | ||
|
|
@@ -179,7 +174,7 @@ class CSVReaderFactory(conf: SerializableConfiguration) | |
| } | ||
| } | ||
|
|
||
| override def get(): InternalRow = InternalRow(currentLine.split(",").map(_.trim.toLong): _*) | ||
| override def get(): InternalRow = InternalRow(currentLine.split(",").map(_.trim.toInt): _*) | ||
|
|
||
| override def close(): Unit = { | ||
| inputStream.close() | ||
|
|
@@ -222,7 +217,7 @@ class CSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[InternalRow] | |
| private val out = fs.create(file) | ||
|
|
||
| override def write(record: InternalRow): Unit = { | ||
| out.writeBytes(s"${record.getLong(0)},${record.getLong(1)}\n") | ||
| out.writeBytes(s"${record.getInt(0)},${record.getInt(1)}\n") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This change just matches the TestingV2Source.schema?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea |
||
| } | ||
|
|
||
| override def commit(): WriterCommitMessage = { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`SimpleScanBuilder.readSchema` is already `TestingV2Source.schema`, so we don't need to override it here. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's unnecessary to override the function.