Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Google Batch hang when internal error during scheduling #5567

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ import nextflow.trace.TraceRecord
@CompileStatic
class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {

private static Pattern EXIT_CODE_REGEX = ~/exit code 500(\d\d)/
private static final Pattern EXIT_CODE_REGEX = ~/exit code 500(\d\d)/

private static final Pattern BATCH_ERROR_REGEX = ~/Batch Error: code/

private GoogleBatchExecutor executor

Expand Down Expand Up @@ -98,6 +100,11 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {

private volatile long timestamp

/**
bentsherman marked this conversation as resolved.
Show resolved Hide resolved
* A flag to indicate that the job has failed without launching any tasks
*/
private volatile boolean noTaskJobfailure

GoogleBatchTaskHandler(TaskRun task, GoogleBatchExecutor executor) {
super(task)
this.client = executor.getClient()
Expand Down Expand Up @@ -445,9 +452,10 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
*/
protected String getTaskState() {
final tasks = client.listTasks(jobId)
if( !tasks.iterator().hasNext() )
return 'PENDING'

if( !tasks.iterator().hasNext() ) {
// if there are no tasks checks the job status
return checkJobStatus()
}
final now = System.currentTimeMillis()
final delta = now - timestamp;
if( !taskState || delta >= 1_000) {
Expand All @@ -468,6 +476,21 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
return taskState
}

protected String checkJobStatus() {
final jobStatus = client.getJobStatus(jobId)
final newState = jobStatus?.state as String
if (newState) {
taskState = newState
timestamp = System.currentTimeMillis()
if (newState == "FAILED") {
noTaskJobfailure = true
}
return taskState
} else {
return "PENDING"
}
}

static private final List<String> RUNNING_OR_COMPLETED = ['RUNNING', 'SUCCEEDED', 'FAILED']

static private final List<String> COMPLETED = ['SUCCEEDED', 'FAILED']
Expand Down Expand Up @@ -510,13 +533,14 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {

protected Throwable getJobError() {
try {
final status = client.getTaskStatus(jobId, taskId)
final eventsCount = status.getStatusEventsCount()
final lastEvent = eventsCount > 0 ? status.getStatusEvents(eventsCount - 1) : null
final events = noTaskJobfailure
? client.getJobStatus(jobId).getStatusEventsList()
: client.getTaskStatus(jobId, taskId).getStatusEventsList()
final lastEvent = events?.get(events.size() - 1)
log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - last event: ${lastEvent}; exit code: ${lastEvent?.taskExecution?.exitCode}"

final error = lastEvent?.description
if( error && EXIT_CODE_REGEX.matcher(error).find() ) {
if( error && (EXIT_CODE_REGEX.matcher(error).find() || BATCH_ERROR_REGEX.matcher(error).find()) ) {
return new ProcessException(error)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.google.cloud.batch.v1.BatchServiceClient
import com.google.cloud.batch.v1.BatchServiceSettings
import com.google.cloud.batch.v1.Job
import com.google.cloud.batch.v1.JobName
import com.google.cloud.batch.v1.JobStatus
import com.google.cloud.batch.v1.LocationName
import com.google.cloud.batch.v1.Task
import com.google.cloud.batch.v1.TaskGroupName
Expand Down Expand Up @@ -123,6 +124,10 @@ class BatchClient {
return describeTask(jobId, taskId).getStatus()
}

JobStatus getJobStatus(String jobId) {
return describeJob(jobId).getStatus()
}

String getTaskState(String jobId, String taskId) {
final status = getTaskStatus(jobId, taskId)
return status ? status.getState().toString() : null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package nextflow.cloud.google.batch

import com.google.cloud.batch.v1.JobStatus
import com.google.cloud.batch.v1.Task

import java.nio.file.Path

import com.google.cloud.batch.v1.GCS
Expand Down Expand Up @@ -581,4 +584,39 @@ class GoogleBatchTaskHandlerTest extends Specification {
and:
0 * client.deleteJob('job1') >> null
}

JobStatus makeJobStatus(JobStatus.State state, String desc = null) {
final builder = JobStatus.newBuilder().setState(state)
if( desc ) {
builder.addStatusEvents(
StatusEvent.newBuilder()
.setDescription(desc)
)
}
builder.build()
}

def 'should check job status when no tasks in job '() {

given:
def jobId = 'job-id'
def taskId = 'task-id'
def client = Mock(BatchClient)
def task = Mock(TaskRun) {
lazyName() >> 'foo (1)'
}
def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task))
final message = 'Job failed when Batch tries to schedule it: Batch Error: code - CODE_MACHINE_TYPE_NOT_FOUND'
when:
client.listTasks(jobId) >>> [new LinkedList<Task>(), new LinkedList<Task>()]
client.getJobStatus(jobId) >>> [
null,
makeJobStatus(JobStatus.State.FAILED, 'Scheduling Failed'),
makeJobStatus(JobStatus.State.FAILED, message)
]
then:
handler.getTaskState() == "PENDING"
handler.getTaskState() == "FAILED"
handler.getJobError().message == message
}
}
Loading