Skip to content

Conversation

@HeartSaVioR
Copy link
Contributor

@HeartSaVioR HeartSaVioR commented Feb 21, 2020

What changes were proposed in this pull request?

This patch adds the new method getLatestBatchId() in CompactibleFileStreamLog in complement of getLatest() which doesn't read the content of the latest batch metadata log file, and apply to both FileStreamSource and FileStreamSink to avoid unnecessary latency on reading log file.

Why are the changes needed?

Once compacted metadata log file becomes huge, writing outputs for the compact + 1 batch is also affected due to unnecessarily reading the compacted metadata log file. This unnecessary latency can be simply avoided.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New UT. Also manually tested under query which has huge metadata log on file stream sink:

before applying the patch

Screen Shot 2020-02-21 at 4 20 19 PM

after applying the patch

Screen Shot 2020-02-21 at 4 06 18 PM

Peaks are compact batches - please compare the next batch after compact batches, especially the area of "light brown".

val batchIds = fileManager.list(metadataPath, batchFilesFilter)
.map(f => pathToBatchId(f.getPath))
.sorted(Ordering.Long.reverse)
for (batchId <- batchIds) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just simply remove reading file here, but as we already get batch IDs from "listing" files, it may not even need to check for existence. It won't be the outstanding latency, though.

log.allFiles()
}

private def withCountOpenLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code regarding FileSystem I add here is very similar with what I add in #27620. When either one gets merged, I'll rebase and deduplicate it.

))
}

test("getLatestBatchId") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't add E2E test to simplify the test code, but if we prefer E2E than I'll try to add a new test to FileStreamSinkSuite.

@HeartSaVioR
Copy link
Contributor Author

cc. @tdas @zsxwing @gaborgsomogyi

@SparkQA
Copy link

SparkQA commented Feb 21, 2020

Test build #118766 has finished for PR 27664 at commit 0cd8cda.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@gaborgsomogyi gaborgsomogyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've called both functions in the unit test 10k times and the difference looks good:

17:17:19.436 ERROR org.apache.spark.sql.execution.streaming.FileStreamSinkLogSuite: getLatestBatchId: 1449956803 ns
17:17:22.074 ERROR org.apache.spark.sql.execution.streaming.FileStreamSinkLogSuite: getLatest: 2638354455 ns

def getCountForOpenOnMetadataFile(batchId: Long): Long = {
val path = sinkLog.batchIdToPath(batchId).toUri.getPath
CountOpenLocalFileSystem.pathToNumOpenCalled
.get(path).map(_.get()).getOrElse(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: no linebreak needed.

withCountOpenLocalFileSystemAsLocalFileSystem {
val scheme = CountOpenLocalFileSystem.scheme
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
withTempDir { file =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's a dir maybe we can call it dir or path.


val curCount = getCountForOpenOnMetadataFile(2)

assert(sinkLog.getLatestBatchId() === Some(2))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: s/2/2L/

.get(path).map(_.get()).getOrElse(0)
}

val curCount = getCountForOpenOnMetadataFile(2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: s/2/2L/


object CountOpenLocalFileSystem {
val scheme = s"FileStreamSinkLogSuite${math.abs(Random.nextInt)}fs"
val pathToNumOpenCalled = new mutable.HashMap[String, AtomicLong]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some reset functionality would be good to make it re-usable. This would also make curCount disappear.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: could you use a thread-safe map in case someone may reuse this in a streaming query?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I feel it's safer to make the map be thread-safe. I thought it's a contradiction to assume because we allow "resetting" the count, but it can be handled with caution.


assert(sinkLog.getLatestBatchId() === Some(2))
// getLatestBatchId doesn't open the latest metadata log file
assert(getCountForOpenOnMetadataFile(2L) === curCount)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth to check other batches as well.

@HeartSaVioR
Copy link
Contributor Author

HeartSaVioR commented Apr 3, 2020

Thanks for reviewing! Reflected review comments.

@SparkQA
Copy link

SparkQA commented Apr 3, 2020

Test build #120748 has finished for PR 27664 at commit d270961.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor

Seems unrelated.

@gaborgsomogyi
Copy link
Contributor

retest this please

Copy link
Contributor

@gaborgsomogyi gaborgsomogyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM (pending tests)

assert(sinkLog.getLatestBatchId() === Some(2L))
// getLatestBatchId doesn't open the latest metadata log file
(0L to 2L).foreach { batchId =>
assert(getCountForOpenOnMetadataFile(batchId) === 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Just to be consistent with the other parts s/0/0L. Same applies to the other 2 occurrence...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just replaced all constants where the type is Long. Thanks!

@SparkQA
Copy link

SparkQA commented Apr 3, 2020

Test build #120764 has finished for PR 27664 at commit d270961.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 3, 2020

Test build #120772 has finished for PR 27664 at commit 30338fb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

retest this, please

@SparkQA
Copy link

SparkQA commented Apr 14, 2020

Test build #121229 has finished for PR 27664 at commit 30338fb.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

retest this, please

@SparkQA
Copy link

SparkQA commented Apr 15, 2020

Test build #121301 has finished for PR 27664 at commit 30338fb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

val latestBatchId = getLatest().map(_._1).getOrElse(-1L)

Can these two places also be optimized in this way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think so. Nice finding. Thanks!

@HeartSaVioR HeartSaVioR changed the title [SPARK-30915][SS] FileStreamSink: Avoid reading the metadata log file when finding the latest batch ID [SPARK-30915][SS] CompactibleFileStreamLog: Avoid reading the metadata log file when finding the latest batch ID Apr 17, 2020
@SparkQA
Copy link

SparkQA commented Apr 17, 2020

Test build #121418 has finished for PR 27664 at commit ff9078e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

retest this, please

@SparkQA
Copy link

SparkQA commented Apr 18, 2020

Test build #121434 has finished for PR 27664 at commit ff9078e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@xuanyuanking xuanyuanking left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM
Just one more nit: since the new function shares most of the code with the original getLatest, how about modifying getLatest with a default false param to control the file reading behavior?

@HeartSaVioR
Copy link
Contributor Author

I'm sorry, but I think the name of the method should represent the thing what it does. That's why the boolean parameters have been considered as code smell and even we encourage to specify the name of boolean parameter while calling the method.

We may be able to try deduplicating the code via allowing some additional call on exists - so getLatest() can be implemented as a combination of getLatestBatchId() and get(). Does it make sense?

@xuanyuanking
Copy link
Member

so getLatest() can be implemented as a combination of getLatestBatchId() and get(). Does it make sense?

Yeah, that would be good enough. No more duplicated code and the two similar functions will be in the same class.

@SparkQA
Copy link

SparkQA commented Apr 29, 2020

Test build #122027 has finished for PR 27664 at commit f6078bb.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 29, 2020

Test build #122026 has finished for PR 27664 at commit fc00a0c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

retest this, please

@SparkQA
Copy link

SparkQA commented Apr 29, 2020

Test build #122046 has finished for PR 27664 at commit f6078bb.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

retest this, please

@SparkQA
Copy link

SparkQA commented Apr 29, 2020

Test build #122060 has finished for PR 27664 at commit f6078bb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@xuanyuanking xuanyuanking left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except for the new exception.
cc @zsxwing @brkyvz @tdas

val content = get(batchId).getOrElse {
// This only happens in odd case where the file exists when getLatestBatchId() is called,
// but get() doesn't find it.
throw new IllegalStateException(s"failed to read log file for batch $batchId")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit but maybe not to involve a new behavior change here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a part of the change in #25965 which should be dealt with. It shouldn't give the content with batch ID which is less than the latest batch ID - it should rather fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said, you might have interesting proposals on my old PRs, https://github.com/apache/spark/pulls/HeartSaVioR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reference, will take a look later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reference #25965, LGTM to this change.
I personally think the comment in https://github.com/apache/spark/pull/25965/files#diff-aaeb546880508bb771df502318c40a99R183 is clearer. Either way is fine though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just pulled the comment here. Either this or #25965 will have to resolve merge conflict but wanted to be sure the code comment is clear in any way.

@SparkQA
Copy link

SparkQA commented May 21, 2020

Test build #122933 has finished for PR 27664 at commit 83451c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@zsxwing zsxwing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks pretty good except some nits.

if (batch.isDefined) {
return Some((batchId, batch.get))
val batchMetadataFile = batchIdToPath(batchId)
if (fileManager.exists(batchMetadataFile)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ni: I think we can remove this check. Previously, it was calling get which returns Option, so it had an extra isDefine check.

throw new IllegalStateException(s"failed to read log file for batch $batchId")
}
(batchId, content)
}.orElse(None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: .orElse(None) is not needed.


object CountOpenLocalFileSystem {
val scheme = s"FileStreamSinkLogSuite${math.abs(Random.nextInt)}fs"
val pathToNumOpenCalled = new mutable.HashMap[String, AtomicLong]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: could you use a thread-safe map in case someone may reuse this in a streaming query?

Copy link
Member

@zsxwing zsxwing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM pending tests

@SparkQA
Copy link

SparkQA commented May 22, 2020

Test build #123009 has finished for PR 27664 at commit 4a5679e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member

zsxwing commented May 22, 2020

Thanks! Merging to master.

@asfgit asfgit closed this in 5a258b0 May 22, 2020
@HeartSaVioR
Copy link
Contributor Author

Thanks all for reviewing and merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-30915 branch May 23, 2020 00:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants