diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java
index b588d4c06e6e..74140d707c25 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java
@@ -33,7 +33,6 @@
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.connector.SimpleCounter;
 import org.apache.spark.sql.connector.TestingV2Source;
-import org.apache.spark.sql.connector.catalog.SessionConfigSupport;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -50,12 +49,7 @@
  * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`.
  * Each job moves files from `target/_temporary/uniqueId/` to `target`.
  */
-public class JavaSimpleWritableDataSource implements TestingV2Source, SessionConfigSupport {
-
-  @Override
-  public String keyPrefix() {
-    return "javaSimpleWritableDataSource";
-  }
+public class JavaSimpleWritableDataSource implements TestingV2Source {
 
   static class MyScanBuilder extends JavaSimpleScanBuilder {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
index f9306ba28e7f..065ba4caebf3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
@@ -27,12 +27,10 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, ScanBuilder}
 import org.apache.spark.sql.connector.write._
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.SerializableConfiguration
 
@@ -41,11 +39,7 @@ import org.apache.spark.util.SerializableConfiguration
  * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`.
  * Each job moves files from `target/_temporary/uniqueId/` to `target`.
  */
-class SimpleWritableDataSource extends SimpleTableProvider with SessionConfigSupport {
-
-  private val tableSchema = new StructType().add("i", "long").add("j", "long")
-
-  override def keyPrefix: String = "simpleWritableDataSource"
+class SimpleWritableDataSource extends TestingV2Source {
 
   class MyScanBuilder(path: String, conf: Configuration) extends SimpleScanBuilder {
     override def planInputPartitions(): Array[InputPartition] = {
@@ -67,8 +61,6 @@ class SimpleWritableDataSource extends SimpleTableProvider with SessionConfigSup
       val serializableConf = new SerializableConfiguration(conf)
       new CSVReaderFactory(serializableConf)
     }
-
-    override def readSchema(): StructType = tableSchema
   }
 
   class MyWriteBuilder(path: String, info: LogicalWriteInfo)
@@ -134,8 +126,6 @@ class SimpleWritableDataSource extends SimpleTableProvider with SessionConfigSup
     private val path = options.get("path")
     private val conf = SparkContext.getActive.get.hadoopConfiguration
 
-    override def schema(): StructType = tableSchema
-
     override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
       new MyScanBuilder(new Path(path).toUri.toString, conf)
     }
@@ -179,7 +169,7 @@ class CSVReaderFactory(conf: SerializableConfiguration)
     }
   }
 
-  override def get(): InternalRow = InternalRow(currentLine.split(",").map(_.trim.toLong): _*)
+  override def get(): InternalRow = InternalRow(currentLine.split(",").map(_.trim.toInt): _*)
 
   override def close(): Unit = {
     inputStream.close()
@@ -222,7 +212,7 @@ class CSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[InternalRow]
   private val out = fs.create(file)
 
   override def write(record: InternalRow): Unit = {
-    out.writeBytes(s"${record.getLong(0)},${record.getLong(1)}\n")
+    out.writeBytes(s"${record.getInt(0)},${record.getInt(1)}\n")
   }
 
   override def commit(): WriterCommitMessage = {
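Note on the schema change: with SessionConfigSupport and the per-source long-typed tableSchema removed, both test sources appear to pick up their schema from the shared TestingV2Source test trait, which is presumably why the reader and writer now use toInt/getInt instead of toLong/getLong. Below is a minimal Scala sketch of what such a shared provider trait could look like; the name HypotheticalTestingV2Source and the int/int schema are assumptions inferred from this patch, not the actual Spark test code.

```scala
// Minimal sketch only -- not the actual Spark test trait. The name
// HypotheticalTestingV2Source and the int/int schema are assumptions
// inferred from the patch (the removed long-typed tableSchema and the
// toInt/getInt conversions above).
import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

trait HypotheticalTestingV2Source extends TableProvider {

  // One shared schema definition: two int columns, i and j.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    new StructType().add("i", "int").add("j", "int")

  // Concrete test sources only implement this simplified hook.
  def getTable(options: CaseInsensitiveStringMap): Table

  // The standard TableProvider entry point delegates to the hook above,
  // ignoring any user-specified schema and partitioning.
  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = {
    getTable(new CaseInsensitiveStringMap(properties))
  }
}
```

Centralizing the schema in one trait keeps every V2 test source consistent and removes the need for each source to carry its own schema()/readSchema() overrides, which is exactly what the hunks above delete.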