Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import scala.collection.mutable
*
* 2、If the partition columns size is not equal to the partition path level, but the partition
* column size is "1" (e.g. partition column is "dt", but the partition path is "2021/03/10"
* who'es directory level is 3).We can still read it as a partitioned table. We will mapping the
* whose directory level is 3). We can still read it as a partitioned table. We will map the
* partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt").
*
* 3、Else the partition columns size is not equal to the partition directory level and the
Expand Down Expand Up @@ -256,7 +256,7 @@ case class HoodieFileIndex(
.iterator().asScala.toSeq
(p._1, fileSlices)
})
cachedFileSize = cachedAllInputFileSlices.values.flatten.map(_.getBaseFile.get().getFileLen).sum
cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSliceSize).sum
}

// If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
Expand All @@ -266,6 +266,15 @@ case class HoodieFileIndex(
s" spend: $flushSpend ms")
}

/**
  * Computes the total size of a single file slice.
  *
  * The result is the sum of the sizes of the slice's log files (only strictly
  * positive sizes are counted) plus the base file length when a base file is
  * present. A slice without a base file contributes only its log files.
  *
  * @param fileSlice the file slice to measure
  * @return the combined size of the slice's base file (if any) and log files
  */
private def fileSliceSize(fileSlice: FileSlice): Long = {
  // Log files reporting a non-positive size are skipped rather than summed.
  val logBytes = fileSlice.getLogFiles.iterator().asScala
    .map(logFile => logFile.getFileSize)
    .filter(size => size > 0)
    .sum
  // A slice may consist of log files only, in which case the base file adds nothing.
  val baseBytes =
    if (fileSlice.getBaseFile.isPresent) fileSlice.getBaseFile.get().getFileLen else 0L
  baseBytes + logBytes
}

/**
  * Returns the cached total file size held in `cachedFileSize`.
  *
  * @return the value of `cachedFileSize`
  */
override def sizeInBytes: Long = cachedFileSize
Expand Down