
[SEDONA-670] Fix GeoJSON reader for DBR (#1662)
Kontinuation authored Oct 30, 2024
1 parent ef5f7c7 commit bf11a3c
Showing 2 changed files with 45 additions and 2 deletions.
@@ -151,8 +151,8 @@ class GeoJSONFileFormat extends TextBasedFileFormat with DataSourceRegister {
         allowArrayAsStructs = true)
       val dataSource = JsonDataSource(parsedOptions)
 
-      dataSource
-        .readFile(broadcastedHadoopConf.value.value, file, parser, actualSchema)
+      SparkCompatUtil
+        .readFile(dataSource, broadcastedHadoopConf.value.value, file, parser, actualSchema)
         .map(row => {
           val newRow = GeoJSONUtils.convertGeoJsonToGeometry(row, alteredSchema)
           newRow
@@ -18,10 +18,14 @@
  */
 package org.apache.spark.sql.sedona_sql.io.geojson
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.json.{JSONOptions, JacksonParser}
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.LegacyDateFormat
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 
 import scala.reflect.runtime.{universe => ru}
@@ -158,4 +162,43 @@ object SparkCompatUtil {
       }
     }
   }
+
+  def readFile(
+      jsonDataSource: JsonDataSource,
+      conf: Configuration,
+      file: PartitionedFile,
+      parser: JacksonParser,
+      schema: StructType): Iterator[InternalRow] = {
+    val readFileMethods =
+      jsonDataSource.getClass.getDeclaredMethods.filter(_.getName == "readFile")
+    // Get the number of input arguments of the readFile method
+    readFileMethods.find(_.getParameterCount == 4) match {
+      case Some(readFileMethod) =>
+        // The readFile method defined by open-source Apache Spark:
+        // def readFile(
+        //     conf: Configuration,
+        //     file: PartitionedFile,
+        //     parser: JacksonParser,
+        //     schema: StructType): Iterator[InternalRow]
+        readFileMethod
+          .invoke(jsonDataSource, conf, file, parser, schema)
+          .asInstanceOf[Iterator[InternalRow]]
+      case None =>
+        readFileMethods.find(_.getParameterCount == 5) match {
+          case Some(readFileMethod) =>
+            // The readFile method defined by DBR:
+            // def readFile(
+            //     conf: Configuration,
+            //     file: PartitionedFile,
+            //     parser: JacksonParser,
+            //     schema: StructType,
+            //     badRecordsWriter: Option[BadRecordsWriter]): Iterator[InternalRow]
+            readFileMethod
+              .invoke(jsonDataSource, conf, file, parser, schema, None)
+              .asInstanceOf[Iterator[InternalRow]]
+          case None =>
+            throw new Exception("No suitable readFile method found in JsonDataSource")
+        }
+    }
+  }
 }
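
The SparkCompatUtil.readFile helper added above resolves JsonDataSource.readFile by name and dispatches on parameter count, passing None for DBR's extra badRecordsWriter argument, presumably so that one Sedona build works whether the runtime exposes the open-source 4-argument signature or the DBR 5-argument one. The self-contained sketch below illustrates the same arity-based reflective dispatch pattern outside Spark; RuntimeA, RuntimeB, and their read methods are hypothetical stand-ins, not Sedona or Spark APIs.

// Standalone sketch (not Sedona/Spark code) of the arity-based reflective
// dispatch pattern used by SparkCompatUtil.readFile. RuntimeA, RuntimeB and
// their read(...) methods are hypothetical stand-ins for the open-source and
// DBR variants of JsonDataSource.readFile.
object ReflectiveDispatchSketch {
  class RuntimeA {
    def read(path: String, limit: Int): String = s"A read $limit rows from $path"
  }
  class RuntimeB {
    def read(path: String, limit: Int, badRecordsPath: Option[String]): String =
      s"B read $limit rows from $path (badRecordsPath=$badRecordsPath)"
  }

  def read(target: AnyRef, path: String, limit: Int): String = {
    // Collect every declared method named "read" on the runtime class.
    val methods = target.getClass.getDeclaredMethods.filter(_.getName == "read")
    // Prefer the 2-argument signature; otherwise fall back to the 3-argument
    // one, supplying None for the extra trailing Option parameter.
    methods.find(_.getParameterCount == 2) match {
      case Some(m) =>
        m.invoke(target, path, Int.box(limit)).asInstanceOf[String]
      case None =>
        methods.find(_.getParameterCount == 3) match {
          case Some(m) =>
            m.invoke(target, path, Int.box(limit), None).asInstanceOf[String]
          case None =>
            throw new Exception("No suitable read method found on " + target.getClass)
        }
    }
  }

  def main(args: Array[String]): Unit = {
    println(read(new RuntimeA, "/tmp/data.json", 10)) // dispatches to the 2-arg overload
    println(read(new RuntimeB, "/tmp/data.json", 10)) // dispatches to the 3-arg overload
  }
}

Compared with linking against a single signature at compile time, the reflective lookup trades a small per-call reflection cost for not needing separate builds per runtime.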
