-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-17159][STREAM] Significant speed up for running spark streaming against Object store. #22339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,19 +17,19 @@ | |
|
|
||
| package org.apache.spark.streaming.dstream | ||
|
|
||
| import java.io.{IOException, ObjectInputStream} | ||
| import java.io.{FileNotFoundException, IOException, ObjectInputStream} | ||
|
|
||
| import scala.collection.mutable | ||
| import scala.reflect.ClassTag | ||
|
|
||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} | ||
| import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} | ||
| import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} | ||
|
|
||
| import org.apache.spark.rdd.{RDD, UnionRDD} | ||
| import org.apache.spark.streaming._ | ||
| import org.apache.spark.streaming.scheduler.StreamInputInfo | ||
| import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils} | ||
| import org.apache.spark.util.{SerializableConfiguration, Utils} | ||
|
|
||
| /** | ||
| * This class represents an input stream that monitors a Hadoop-compatible filesystem for new | ||
|
|
@@ -122,9 +122,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( | |
| // Set of files that were selected in the remembered batches | ||
| @transient private var recentlySelectedFiles = new mutable.HashSet[String]() | ||
|
|
||
| // Read-through cache of file mod times, used to speed up mod time lookups | ||
| @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true) | ||
|
|
||
| // Timestamp of the last round of finding files | ||
| @transient private var lastNewFileFindingTime = 0L | ||
|
|
||
|
|
@@ -140,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( | |
| * a union RDD out of them. Note that this maintains the list of files that were processed | ||
| * in the latest modification time in the previous call to this method. This is because the | ||
| * modification time returned by the FileStatus API seems to return times only at the | ||
| * granularity of seconds. And new files may have the same modification time as the | ||
| * granularity of seconds in HDFS. And new files may have the same modification time as the | ||
| * latest modification time in the previous call to this method yet was not reported in | ||
| * the previous call. | ||
| */ | ||
|
|
@@ -173,8 +170,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( | |
| logDebug("Cleared files are:\n" + | ||
| oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n")) | ||
| } | ||
| // Delete file mod times that weren't accessed in the last round of getting new files | ||
| fileToModTime.clearOldValues(lastNewFileFindingTime - 1) | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( | |
| logDebug(s"Getting new files for time $currentTime, " + | ||
| s"ignoring files older than $modTimeIgnoreThreshold") | ||
|
|
||
| val newFileFilter = new PathFilter { | ||
| def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) | ||
| } | ||
| val directoryFilter = new PathFilter { | ||
| override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory | ||
| } | ||
| val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath) | ||
| val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus]) | ||
| .filter(_.isDirectory) | ||
| .map(_.getPath) | ||
| val newFiles = directories.flatMap(dir => | ||
| fs.listStatus(dir, newFileFilter).map(_.getPath.toString)) | ||
| fs.listStatus(dir) | ||
| .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold)) | ||
|
||
| .map(_.getPath.toString)) | ||
| val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime | ||
| logInfo("Finding new files took " + timeTaken + " ms") | ||
| logDebug("# cached file times = " + fileToModTime.size) | ||
| logInfo(s"Finding new files took $timeTaken ms") | ||
|
||
| if (timeTaken > slideDuration.milliseconds) { | ||
| logWarning( | ||
| "Time taken to find new files exceeds the batch size. " + | ||
| s"Time taken to find new files $timeTaken exceeds the batch size. " + | ||
| "Consider increasing the batch size or reducing the number of " + | ||
| "files in the monitored directory." | ||
| "files in the monitored directories." | ||
| ) | ||
| } | ||
| newFiles | ||
| } catch { | ||
| case e: FileNotFoundException => | ||
| logWarning(s"No directory to scan: $directoryPath: $e") | ||
| Array.empty | ||
| case e: Exception => | ||
| logWarning("Error finding new files", e) | ||
| logWarning(s"Error finding new files under $directoryPath", e) | ||
| reset() | ||
| Array.empty | ||
| } | ||
|
|
@@ -241,16 +236,24 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( | |
| * The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1). | ||
| * Hence they can get selected as new files again. To prevent this, files whose mod time is more | ||
| * than current batch time are not considered. | ||
| * @param fileStatus file status | ||
| * @param currentTime time of the batch | ||
| * @param modTimeIgnoreThreshold the ignore threshold | ||
| * @return true if the file has been modified within the batch window | ||
| */ | ||
| private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = { | ||
| private def isNewFile( | ||
| fileStatus: FileStatus, | ||
| currentTime: Long, | ||
| modTimeIgnoreThreshold: Long): Boolean = { | ||
| val path = fileStatus.getPath | ||
| val pathStr = path.toString | ||
| // Reject file if it does not satisfy filter | ||
| if (!filter(path)) { | ||
| logDebug(s"$pathStr rejected by filter") | ||
| return false | ||
| } | ||
| // Reject file if it was created before the ignore time | ||
| val modTime = getFileModTime(path) | ||
| val modTime = fileStatus.getModificationTime() | ||
| if (modTime <= modTimeIgnoreThreshold) { | ||
| // Use <= instead of < to avoid SPARK-4518 | ||
| logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold") | ||
|
|
@@ -292,11 +295,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( | |
| new UnionRDD(context.sparkContext, fileRDDs) | ||
| } | ||
|
|
||
| /** Get file mod time from cache or fetch it from the file system */ | ||
| private def getFileModTime(path: Path) = { | ||
| fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime()) | ||
| } | ||
|
|
||
| private def directoryPath: Path = { | ||
| if (_path == null) _path = new Path(directory) | ||
| _path | ||
|
|
@@ -318,7 +316,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( | |
| generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]() | ||
| batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]] | ||
| recentlySelectedFiles = new mutable.HashSet[String]() | ||
| fileToModTime = new TimeStampedHashMap[String, Long](true) | ||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the `.getOrElse` could come at the end, but it hardly matters.