Commit
[SPARK-3138][SQL] sqlContext.parquetFile should be able to take a single file as parameter
chutium committed Aug 19, 2014
1 parent cbfc26b commit 4ae477f
Showing 2 changed files with 26 additions and 8 deletions.
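
The diff below removes the directory-only check from ParquetTypesConverter.readMetaData and adds a test that reads a single part file. A minimal usage sketch of the behavior this commit enables, assuming a local SparkContext and an illustrative file path (neither is part of the commit):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local", "parquet-single-file")  // hypothetical setup
val sqlContext = new SQLContext(sc)

// Before this change, parquetFile rejected anything that was not a directory;
// with it, a path to one Parquet part file loads directly.
val data = sqlContext.parquetFile("/tmp/out.parquet/part-r-1.parquet")  // illustrative path
data.registerTempTable("single_file")
sqlContext.sql("SELECT COUNT(*) FROM single_file").collect()
```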
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -370,17 +370,14 @@ private[parquet] object ParquetTypesConverter extends Logging {
       throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
     }
     val path = origPath.makeQualified(fs)
-    if (!fs.getFileStatus(path).isDir) {
-      throw new IllegalArgumentException(
-        s"Expected $path for be a directory with Parquet files/metadata")
-    }
-    ParquetRelation.enableLogForwarding()
 
     val children = fs.listStatus(path).filterNot { status =>
       val name = status.getPath.getName
       name(0) == '.' || name == FileOutputCommitter.SUCCEEDED_FILE_NAME
     }
 
+    ParquetRelation.enableLogForwarding()
+
     // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
     // groups. Since Parquet schema is replicated among all row groups, we only need to touch a
     // single row group to read schema related metadata. Notice that we are making assumptions that
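Dropping the isDir guard is enough here because Hadoop's FileSystem.listStatus, when given a path to a regular file, returns a one-element array holding that file's own status, so the hidden-file/_SUCCESS filter covers both the directory and single-file cases. A short sketch of that behavior, with an illustrative path:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())

// For a directory, listStatus returns its children's statuses;
// for a plain file, it returns a single-element array for that file itself.
val children = fs.listStatus(new Path("/tmp/example.parquet"))  // illustrative path
  .filterNot { status =>
    val name = status.getPath.getName
    // Same filter as readMetaData: skip hidden files and the _SUCCESS marker.
    name(0) == '.' || name == "_SUCCESS"
  }
children.foreach(status => println(status.getPath))
```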
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.util.Utils
 
-
 case class TestRDDEntry(key: Int, value: String)
 
 case class NullReflectData(
@@ -318,8 +317,30 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
     val rdd_copy = sql("SELECT * FROM tmpx").collect()
     val rdd_orig = rdd.collect()
     for(i <- 0 to 99) {
-      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
-      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i")
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
     }
     Utils.deleteRecursively(file)
   }
+
+  test("Read a parquet file instead of a directory") {
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val fsPath = new Path(path)
+    val fs: FileSystem = fsPath.getFileSystem(TestSQLContext.sparkContext.hadoopConfiguration)
+    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+    rdd.coalesce(1).saveAsParquetFile(path)
+
+    val children = fs.listStatus(fsPath).filter(_.getPath.getName.endsWith(".parquet"))
+    assert(children.length > 0)
+    val readFile = parquetFile(path + "/" + children(0).getPath.getName)
+    readFile.registerTempTable("tmpx")
+    val rdd_copy = sql("SELECT * FROM tmpx").collect()
+    val rdd_orig = rdd.collect()
+    for(i <- 0 to 99) {
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
+    }
+    Utils.deleteRecursively(file)
+  }
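
The new test coalesces the RDD to a single partition before saveAsParquetFile, so the output directory is guaranteed to contain exactly one ".parquet" data part file, which the test then picks out by name and passes directly to parquetFile.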
