
Conversation

@zsxwing
Member

@zsxwing zsxwing commented Feb 1, 2018

What changes were proposed in this pull request?

Sort jobs/stages/tasks/queries by completion timestamp before cleaning them up, to make the behavior consistent with 2.2.

How was this patch tested?

  • Jenkins.
  • Manually ran the following code and checked the UI for jobs/stages/tasks/queries.
```
spark.ui.retainedJobs 10
spark.ui.retainedStages 10
spark.sql.ui.retainedExecutions 10
spark.ui.retainedTasks 10
```

```
new Thread() {
  override def run() {
    spark.range(1, 2).foreach { i =>
      Thread.sleep(10000)
    }
  }
}.start()

Thread.sleep(5000)

for (_ <- 1 to 20) {
  new Thread() {
    override def run() {
      spark.range(1, 2).foreach { i =>
      }
    }
  }.start()
}

Thread.sleep(15000)
spark.range(1, 2).foreach { i =>
}

sc.makeRDD(1 to 100, 100).foreach { i =>
}
```

@zsxwing
Member Author

zsxwing commented Feb 1, 2018

cc @vanzin @cloud-fan

```
 val iter = view.closeableIterator()
 try {
-  iter.asScala.filter(filter).take(max).toList
+  iter.asScala.filter(filter).toList.sortBy(sorter).take(max)
```
Contributor

So, aside from the two closure parameters making the calls super ugly, this is more expensive than the previous version.

Previously:

  • filter as you iterate over view
  • limit iteration
  • materialize "max" elements

Now:

  • filter as you iterate over view
  • materialize all elements that pass the filter
  • sort and take "max" elements

This will, at least, make replaying large apps a lot slower, given the filter in the task cleanup method.

```
// Try to delete finished tasks only.
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
  !live || t.status != TaskState.RUNNING.toString()
}
```

So, when replaying, every time you need to do a cleanup of tasks, you'll deserialize all tasks for the stage. If you have a stage with 10s of thousands of tasks, that's super expensive.

If all you want to change here is the sorting of jobs, I'd recommend adding a new index to JobDataWrapper that sorts them by end time. Then you can do the sorting before you even call this method, by setting up the view appropriately.

If you also want to sort the others (stages, tasks, and sql executions), you could also create indices for those.

Or you could find a way to do this that is not so expensive on the replay side...

If adding indices, though, I'd probably try to get this into 2.3.0 since it would change the data written to disk.
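
For concreteness, a rough sketch of the index approach (illustrative only, not the final patch; the annotation style and the `KVUtils.viewToSeq` usage mirror the surrounding diffs):

```scala
import com.fasterxml.jackson.annotation.JsonIgnore
import org.apache.spark.JobExecutionStatus
import org.apache.spark.util.kvstore.KVIndex

// In JobDataWrapper: expose the completion time as a secondary index so
// the store hands back jobs already sorted by it (running jobs sort last).
@JsonIgnore @KVIndex("completionTime")
private def completionTime: Long =
  info.completionTime.map(_.getTime).getOrElse(Long.MaxValue)

// In the cleanup path: iterate oldest-completed first, no in-memory sort.
val toDelete = KVUtils.viewToSeq(
  kvstore.view(classOf[JobDataWrapper]).index("completionTime"),
  countToDelete) { j => j.info.status != JobExecutionStatus.RUNNING }
```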

Member Author

@vanzin Yeah, I understand the expensive sort. However, adding indices needs more work. Do you have time to try it since I'm not familiar with LevelDB?


@zsxwing
Member Author

zsxwing commented Feb 2, 2018

@vanzin just updated it. I didn't fix the task order, as I think it's already using the stage index and I cannot iterate tasks using two indices.

@vanzin
Contributor

vanzin commented Feb 2, 2018

> I cannot iterate tasks using two indices.

You can actually; indices can have a parent index, and there's actually a bunch of examples in TaskDataWrapper.

Use them like this.

view.index("your index").parent(stageKey).blah blah blah
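
Spelled out (a sketch; the `TaskIndexNames.COMPLETION_TIME` constant and the `Array(stageId, stageAttemptId)` key shape are assumptions based on the diffs in this PR):

```scala
// Iterate one stage's tasks through a child index: parent() restricts
// the completion-time index to entries belonging to that stage.
val view = kvstore.view(classOf[TaskDataWrapper])
  .index(TaskIndexNames.COMPLETION_TIME)
  .parent(Array(stageId, stageAttemptId))
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
  !live || t.status != TaskState.RUNNING.toString()
}
```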

```
private def id: Int = info.jobId

@JsonIgnore @KVIndex("completionTime")
private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(Long.MaxValue)
```
Contributor

This is fine, but if you want you can probably replace the filters in the listener by setting this to -1 for running jobs / stages / others, and starting iteration at "0".

@zsxwing
Member Author

zsxwing commented Feb 2, 2018

> This is fine, but if you want you can probably replace the filters in the listener by setting this to -1 for running jobs / stages / others, and starting iteration at "0".

Not sure if I get it correctly. I just made completionTime return 0 for running jobs/... and started iteration at "0".
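
Concretely, the pattern being discussed looks something like this (a sketch, assuming the -1 sentinel for running items that the patch ends up using):

```scala
// Running items are indexed at -1, so starting at 0 visits only
// completed ones, already in completion-time order; no filter needed.
val toDelete = KVUtils.viewToSeq(
  kvstore.view(classOf[JobDataWrapper])
    .index("completionTime")
    .first(0L),
  countToDelete) { _ => true }
```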

```
final val STAGE = "stage"
final val STATUS = "sta"
final val TASK_INDEX = "idx"
final val COMPLETION_TIME = "completionTime"
```
Contributor

Could you use a shorter name like the others? It saves a little bit more space on disk because there are so many tasks in large apps.
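
For example, a short alias in the style of the neighboring constants (the exact name is whatever the patch settles on; `"ct"` is just illustrative):

```scala
final val COMPLETION_TIME = "ct"
```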

Contributor

+1

Contributor

I've asked this before: is it possible to put an ID instead of the index name into the kvstore? Then we could use long index names.

Contributor

No, right now there's no support for that for indices.

Member

+1

@vanzin
Contributor

vanzin commented Feb 2, 2018

The logic looks ok. Did you look at adding a test in AppStatusListenerSuite for this? There's already a test for the cleanup, it'd be nice if it were tweaked to cover the changes here.

@SparkQA

SparkQA commented Feb 2, 2018

Test build #86947 has finished for PR 20481 at commit 761f1ee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@jiangxb1987 jiangxb1987 left a comment

LGTM, only tiny nits. Also, it would be great to have a simple test case for this.

```
private def id: Int = info.jobId

@JsonIgnore @KVIndex("completionTime")
private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1)
```
Contributor

nit: -1 -> -1L

```
private def active: Boolean = info.status == StageStatus.ACTIVE

@JsonIgnore @KVIndex("completionTime")
private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1)
```
Contributor

nit: -1 -> -1L

@SparkQA

SparkQA commented Feb 2, 2018

Test build #86952 has finished for PR 20481 at commit f0de4be.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 2, 2018

Test build #86953 has finished for PR 20481 at commit 0424c1d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 2, 2018

Test build #86954 has finished for PR 20481 at commit 4c1080a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member

LGTM

@zsxwing
Member Author

zsxwing commented Feb 2, 2018

> There's already a test for the cleanup, it'd be nice if it were tweaked to cover the changes here.

I created separate tests for jobs/stages/tasks/executions, as the existing cleanup test is already pretty complicated and mixing more logic into it would make it much harder to understand.

@SparkQA

SparkQA commented Feb 3, 2018

Test build #87005 has finished for PR 20481 at commit b83b396.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master/2.3!

asfgit pushed a commit that referenced this pull request Feb 5, 2018
…d timestamp before cleaning up them

## What changes were proposed in this pull request?

Sort jobs/stages/tasks/queries by completion timestamp before cleaning them up, to make the behavior consistent with 2.2.

## How was this patch tested?

- Jenkins.
- Manually ran the following code and checked the UI for jobs/stages/tasks/queries.

```
spark.ui.retainedJobs 10
spark.ui.retainedStages 10
spark.sql.ui.retainedExecutions 10
spark.ui.retainedTasks 10
```

```
new Thread() {
  override def run() {
    spark.range(1, 2).foreach { i =>
      Thread.sleep(10000)
    }
  }
}.start()

Thread.sleep(5000)

for (_ <- 1 to 20) {
  new Thread() {
    override def run() {
      spark.range(1, 2).foreach { i =>
      }
    }
  }.start()
}

Thread.sleep(15000)
spark.range(1, 2).foreach { i =>
}

sc.makeRDD(1 to 100, 100).foreach { i =>
}
```

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #20481 from zsxwing/SPARK-23307.

(cherry picked from commit a6bf3db)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@asfgit asfgit closed this in a6bf3db Feb 5, 2018
@zsxwing zsxwing deleted the SPARK-23307 branch February 6, 2018 06:36
@zsxwing
Member Author

zsxwing commented Feb 20, 2018

I didn't know InMemoryStore doesn't have any indices... Just saw that the UI is very slow on a large cluster and it causes read timeouts.

@cloud-fan
Contributor

IIUC we use indices a lot in the UI code. @vanzin, is it possible to support indices for the in-memory kv store?

@vanzin
Contributor

vanzin commented Feb 20, 2018

Indices in the in-memory store are kinda dumb right now. It should be possible to do something smarter but that would increase memory usage.

It would also be good to know what "large cluster" means. Large clusters shouldn't affect the UI responsiveness. Large apps might; but I tried apps with 100k+ tasks on each stage and things seemed fine.

@zsxwing
Member Author

zsxwing commented Feb 20, 2018

@vanzin NVM. I was wrong. The issue is indeed not related to this PR. Please take a look at https://issues.apache.org/jira/browse/SPARK-23470
