Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Google Batch hang when internal error during scheduling #5567

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
private volatile CloudMachineInfo machineInfo

private volatile long timestamp
/**
 * Flag indicating that the job has failed without creating any tasks
 */
private volatile boolean noTaskJobfailure

GoogleBatchTaskHandler(TaskRun task, GoogleBatchExecutor executor) {
super(task)
Expand Down Expand Up @@ -445,9 +449,10 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
*/
protected String getTaskState() {
final tasks = client.listTasks(jobId)
if( !tasks.iterator().hasNext() )
return 'PENDING'

if( !tasks.iterator().hasNext() ) {
// if there are no tasks, check the job status instead
return checkJobStatus()
}
final now = System.currentTimeMillis()
final delta = now - timestamp;
if( !taskState || delta >= 1_000) {
Expand All @@ -468,6 +473,21 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
return taskState
}

/**
 * Resolve the task state from the status of the job itself.
 *
 * Asks the client for the status of {@code jobId}. When a state is
 * reported it is stored in {@code taskState}, {@code timestamp} is
 * refreshed, and {@code noTaskJobfailure} is raised if that state is
 * {@code FAILED}.
 *
 * @return the state reported for the job, or {@code PENDING} when the
 *         client returns no status or no state
 */
String checkJobStatus() {
    final jobState = client.getJobStatus(jobId)?.state as String
    // nothing reported yet: keep waiting
    if( !jobState )
        return "PENDING"

    taskState = jobState
    timestamp = System.currentTimeMillis()
    // remember that the failure was reported by the job, not by a task
    if( jobState == "FAILED" )
        noTaskJobfailure = true
    return taskState
}

static private final List<String> RUNNING_OR_COMPLETED = ['RUNNING', 'SUCCEEDED', 'FAILED']

static private final List<String> COMPLETED = ['SUCCEEDED', 'FAILED']
Expand Down Expand Up @@ -510,9 +530,8 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {

protected Throwable getJobError() {
try {
final status = client.getTaskStatus(jobId, taskId)
final eventsCount = status.getStatusEventsCount()
final lastEvent = eventsCount > 0 ? status.getStatusEvents(eventsCount - 1) : null
final events = noTaskJobfailure ? client.getJobStatus(jobId).getStatusEventsList() : client.getTaskStatus(jobId, taskId).getStatusEventsList()
bentsherman marked this conversation as resolved.
Show resolved Hide resolved
final lastEvent = events?.get( events.size() -1 )
bentsherman marked this conversation as resolved.
Show resolved Hide resolved
log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - last event: ${lastEvent}; exit code: ${lastEvent?.taskExecution?.exitCode}"

final error = lastEvent?.description
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.google.cloud.batch.v1.BatchServiceClient
import com.google.cloud.batch.v1.BatchServiceSettings
import com.google.cloud.batch.v1.Job
import com.google.cloud.batch.v1.JobName
import com.google.cloud.batch.v1.JobStatus
import com.google.cloud.batch.v1.LocationName
import com.google.cloud.batch.v1.Task
import com.google.cloud.batch.v1.TaskGroupName
Expand Down Expand Up @@ -123,6 +124,10 @@ class BatchClient {
return describeTask(jobId, taskId).getStatus()
}

/**
 * Retrieve the status of a job.
 *
 * @param jobId the identifier passed to {@code describeJob}
 * @return the {@link JobStatus} of the job returned by {@code describeJob}
 */
JobStatus getJobStatus(String jobId) {
    final job = describeJob(jobId)
    return job.getStatus()
}

String getTaskState(String jobId, String taskId) {
final status = getTaskStatus(jobId, taskId)
return status ? status.getState().toString() : null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package nextflow.cloud.google.batch

import com.google.cloud.batch.v1.JobStatus
import com.google.cloud.batch.v1.Task

import java.nio.file.Path

import com.google.cloud.batch.v1.GCS
Expand Down Expand Up @@ -581,4 +584,40 @@ class GoogleBatchTaskHandlerTest extends Specification {
and:
0 * client.deleteJob('job1') >> null
}

/**
 * Build a {@link JobStatus} fixture for the tests.
 *
 * @param state the state to set on the status
 * @param desc optional description; when given, a single status event
 *             carrying it is added to the status
 * @return the built {@link JobStatus}
 */
JobStatus makeJobStatus(JobStatus.State state, String desc = null) {
    final statusBuilder = JobStatus.newBuilder().setState(state)
    if( desc ) {
        final event = StatusEvent.newBuilder().setDescription(desc)
        statusBuilder.addStatusEvents(event)
    }
    return statusBuilder.build()
}

/*
 * When the job lists no tasks, the handler must report the state of the
 * job itself: PENDING while the client returns no job status, and the
 * job state once one is available.
 */
def 'should check job status when no tasks in job'() {

    given:
    def jobId = 'job-id'
    def taskId = 'task-id'
    def client = Mock(BatchClient)
    def task = Mock(TaskRun) {
        lazyName() >> 'foo (1)'
    }
    def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task))

    and: 'the job never lists any task'
    client.listTasks(jobId) >>> [new LinkedList<Task>(), new LinkedList<Task>()]

    and: 'the job status is missing on the first poll and FAILED on the second'
    client.getJobStatus(jobId) >>> [
        null,
        makeJobStatus(JobStatus.State.FAILED, 'Scheduling Failed'),
    ]

    expect:
    handler.getTaskState() == "PENDING"
    handler.getTaskState() == "FAILED"
}
}
Loading