Skip to content

Commit

Permalink
Merge branch 'master' into 4920_azure_tasks_support_disk_directive
Browse files Browse the repository at this point in the history
  • Loading branch information
adamrtalbot authored Dec 9, 2024
2 parents 5392e16 + c6434d4 commit 5b1568c
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 65 deletions.
2 changes: 1 addition & 1 deletion modules/nextflow/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencies {
api "com.beust:jcommander:1.35"
api("com.esotericsoftware.kryo:kryo:2.24.0") { exclude group: 'com.esotericsoftware.minlog', module: 'minlog' }
api('org.iq80.leveldb:leveldb:0.12')
api('org.eclipse.jgit:org.eclipse.jgit:6.10.0.202406032230-r')
api('org.eclipse.jgit:org.eclipse.jgit:7.1.0.202411261347-r')
api ('javax.activation:activation:1.1.1')
api ('javax.mail:mail:1.4.7')
api ('org.yaml:snakeyaml:2.2')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ class AssetManager {

protected Map refToMap(Ref ref, Map<String,Ref> remote) {
final entry = new HashMap(2)
final peel = git.getRepository().peel(ref)
final peel = git.getRepository().getRefDatabase().peel(ref)
final objId = peel.getPeeledObjectId() ?: peel.getObjectId()
// the branch or tag name
entry.name = shortenRefName(ref.name)
Expand Down Expand Up @@ -867,7 +867,7 @@ class AssetManager {
result << (name == current ? '*' : ' ')

if( level ) {
def peel = git.getRepository().peel(ref)
def peel = git.getRepository().getRefDatabase().peel(ref)
def obj = peel.getPeeledObjectId() ?: peel.getObjectId()
result << ' '
result << formatObjectId(obj, level == 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,26 +114,28 @@ nxf_mem_watch() {
done

## result struct: pid %mem vmem rss peak_vmem peak_rss
echo "%mem=${nxf_stat_ret[1]}" >> $trace_file
echo "vmem=${nxf_stat_ret[2]}" >> $trace_file
echo "rss=${nxf_stat_ret[3]}" >> $trace_file
echo "peak_vmem=${nxf_stat_ret[4]}" >> $trace_file
echo "peak_rss=${nxf_stat_ret[5]}" >> $trace_file
echo "vol_ctxt=${nxf_stat_ret[6]}" >> $trace_file
echo "inv_ctxt=${nxf_stat_ret[7]}" >> $trace_file
printf "%s\n" \
"%mem=${nxf_stat_ret[1]}" \
"vmem=${nxf_stat_ret[2]}" \
"rss=${nxf_stat_ret[3]}" \
"peak_vmem=${nxf_stat_ret[4]}" \
"peak_rss=${nxf_stat_ret[5]}" \
"vol_ctxt=${nxf_stat_ret[6]}" \
"inv_ctxt=${nxf_stat_ret[7]}" >> "$trace_file" || >&2 echo "Error: Failed to append to file: $trace_file"
}

nxf_write_trace() {
    ## Write the collected task metrics to the trace file in `nextflow.trace/v2` format.
    ## NOTE(review): the merged diff left both the old unquoted `echo >>` sequence and
    ## the new `printf` block in this function, so the file was written twice; only the
    ## single atomic `printf` write (with a quoted file name and an explicit error
    ## message on failure) is kept here.
    ## `$trace_file`, `$wall_time`, `$ucpu`, `$cpu_model` and `io_stat1` are globals
    ## set by the enclosing wrapper script — TODO confirm against the full template.
    printf "%s\n" \
        "nextflow.trace/v2" \
        "realtime=$wall_time" \
        "%cpu=$ucpu" \
        "cpu_model=$cpu_model" \
        "rchar=${io_stat1[0]}" \
        "wchar=${io_stat1[1]}" \
        "syscr=${io_stat1[2]}" \
        "syscw=${io_stat1[3]}" \
        "read_bytes=${io_stat1[4]}" \
        "write_bytes=${io_stat1[5]}" > "$trace_file" || >&2 echo "Error: Failed to write to file: $trace_file"
}

nxf_trace_mac() {
Expand Down Expand Up @@ -199,16 +201,17 @@ nxf_trace_linux() {
local wall_time=$((end_millis-start_millis))
[ $NXF_DEBUG = 1 ] && echo "+++ STATS %CPU=$ucpu TIME=$wall_time I/O=${io_stat1[*]}"

echo "nextflow.trace/v2" > $trace_file
echo "realtime=$wall_time" >> $trace_file
echo "%cpu=$ucpu" >> $trace_file
echo "cpu_model=$cpu_model" >> $trace_file
echo "rchar=${io_stat1[0]}" >> $trace_file
echo "wchar=${io_stat1[1]}" >> $trace_file
echo "syscr=${io_stat1[2]}" >> $trace_file
echo "syscw=${io_stat1[3]}" >> $trace_file
echo "read_bytes=${io_stat1[4]}" >> $trace_file
echo "write_bytes=${io_stat1[5]}" >> $trace_file
printf "%s\n" \
"nextflow.trace/v2" \
"realtime=$wall_time" \
"%cpu=$ucpu" \
"cpu_model=$cpu_model" \
"rchar=${io_stat1[0]}" \
"wchar=${io_stat1[1]}" \
"syscr=${io_stat1[2]}" \
"syscw=${io_stat1[3]}" \
"read_bytes=${io_stat1[4]}" \
"write_bytes=${io_stat1[5]}" > "$trace_file" || >&2 echo "Error: Failed to write to file: $trace_file"

## join nxf_mem_watch
[ -e /proc/$mem_proc ] && eval "echo 'DONE' >&$mem_fd" || true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,26 +98,28 @@ nxf_mem_watch() {
count=$((count+1))
done

echo "%mem=${nxf_stat_ret[1]}" >> $trace_file
echo "vmem=${nxf_stat_ret[2]}" >> $trace_file
echo "rss=${nxf_stat_ret[3]}" >> $trace_file
echo "peak_vmem=${nxf_stat_ret[4]}" >> $trace_file
echo "peak_rss=${nxf_stat_ret[5]}" >> $trace_file
echo "vol_ctxt=${nxf_stat_ret[6]}" >> $trace_file
echo "inv_ctxt=${nxf_stat_ret[7]}" >> $trace_file
printf "%s\n" \
"%mem=${nxf_stat_ret[1]}" \
"vmem=${nxf_stat_ret[2]}" \
"rss=${nxf_stat_ret[3]}" \
"peak_vmem=${nxf_stat_ret[4]}" \
"peak_rss=${nxf_stat_ret[5]}" \
"vol_ctxt=${nxf_stat_ret[6]}" \
"inv_ctxt=${nxf_stat_ret[7]}" >> "$trace_file" || >&2 echo "Error: Failed to append to file: $trace_file"
}

nxf_write_trace() {
    ## Write the collected task metrics to the trace file in `nextflow.trace/v2` format.
    ## NOTE(review): the merged diff left both the old unquoted `echo >>` sequence and
    ## the new `printf` block in this function, so the file was written twice; only the
    ## single atomic `printf` write (with a quoted file name and an explicit error
    ## message on failure) is kept here.
    ## `$trace_file`, `$wall_time`, `$ucpu`, `$cpu_model` and `io_stat1` are globals
    ## set by the enclosing wrapper script — TODO confirm against the full template.
    printf "%s\n" \
        "nextflow.trace/v2" \
        "realtime=$wall_time" \
        "%cpu=$ucpu" \
        "cpu_model=$cpu_model" \
        "rchar=${io_stat1[0]}" \
        "wchar=${io_stat1[1]}" \
        "syscr=${io_stat1[2]}" \
        "syscw=${io_stat1[3]}" \
        "read_bytes=${io_stat1[4]}" \
        "write_bytes=${io_stat1[5]}" > "$trace_file" || >&2 echo "Error: Failed to write to file: $trace_file"
}

nxf_trace_mac() {
Expand Down Expand Up @@ -173,16 +175,17 @@ nxf_trace_linux() {
local wall_time=$((end_millis-start_millis))
[ $NXF_DEBUG = 1 ] && echo "+++ STATS %CPU=$ucpu TIME=$wall_time I/O=${io_stat1[*]}"

echo "nextflow.trace/v2" > $trace_file
echo "realtime=$wall_time" >> $trace_file
echo "%cpu=$ucpu" >> $trace_file
echo "cpu_model=$cpu_model" >> $trace_file
echo "rchar=${io_stat1[0]}" >> $trace_file
echo "wchar=${io_stat1[1]}" >> $trace_file
echo "syscr=${io_stat1[2]}" >> $trace_file
echo "syscw=${io_stat1[3]}" >> $trace_file
echo "read_bytes=${io_stat1[4]}" >> $trace_file
echo "write_bytes=${io_stat1[5]}" >> $trace_file
printf "%s\n" \
"nextflow.trace/v2" \
"realtime=$wall_time" \
"%cpu=$ucpu" \
"cpu_model=$cpu_model" \
"rchar=${io_stat1[0]}" \
"wchar=${io_stat1[1]}" \
"syscr=${io_stat1[2]}" \
"syscw=${io_stat1[3]}" \
"read_bytes=${io_stat1[4]}" \
"write_bytes=${io_stat1[5]}" > "$trace_file" || >&2 echo "Error: Failed to write to file: $trace_file"

[ -e /proc/$mem_proc ] && eval "echo 'DONE' >&$mem_fd" || true
wait $mem_proc 2>/dev/null || true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ import nextflow.trace.TraceRecord
@CompileStatic
class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {

private static Pattern EXIT_CODE_REGEX = ~/exit code 500(\d\d)/
private static final Pattern EXIT_CODE_REGEX = ~/exit code 500(\d\d)/

private static final Pattern BATCH_ERROR_REGEX = ~/Batch Error: code/

private GoogleBatchExecutor executor

Expand Down Expand Up @@ -98,6 +100,11 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {

private volatile long timestamp

/**
* A flag to indicate that the job has failed without launching any tasks
*/
private volatile boolean noTaskJobfailure

GoogleBatchTaskHandler(TaskRun task, GoogleBatchExecutor executor) {
super(task)
this.client = executor.getClient()
Expand Down Expand Up @@ -445,9 +452,10 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
*/
protected String getTaskState() {
final tasks = client.listTasks(jobId)
if( !tasks.iterator().hasNext() )
return 'PENDING'

if( !tasks.iterator().hasNext() ) {
// if there are no tasks checks the job status
return checkJobStatus()
}
final now = System.currentTimeMillis()
final delta = now - timestamp;
if( !taskState || delta >= 1_000) {
Expand All @@ -468,6 +476,21 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
return taskState
}

/**
 * Fallback used when the job has no tasks yet: derive the task state
 * from the job-level status instead of the per-task status.
 *
 * @return the job state reported by Batch, or {@code PENDING} when no
 *         job-level state is available yet
 */
protected String checkJobStatus() {
    final status = client.getJobStatus(jobId)
    final state = status?.state as String
    // no job-level state yet — treat the job as still pending
    if( !state )
        return "PENDING"
    // cache the observed state along with the time it was seen
    taskState = state
    timestamp = System.currentTimeMillis()
    // the job reported FAILED before any task could be launched
    if( state == "FAILED" )
        noTaskJobfailure = true
    return taskState
}

static private final List<String> RUNNING_OR_COMPLETED = ['RUNNING', 'SUCCEEDED', 'FAILED']

static private final List<String> COMPLETED = ['SUCCEEDED', 'FAILED']
Expand Down Expand Up @@ -510,13 +533,14 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {

protected Throwable getJobError() {
try {
final status = client.getTaskStatus(jobId, taskId)
final eventsCount = status.getStatusEventsCount()
final lastEvent = eventsCount > 0 ? status.getStatusEvents(eventsCount - 1) : null
final events = noTaskJobfailure
? client.getJobStatus(jobId).getStatusEventsList()
: client.getTaskStatus(jobId, taskId).getStatusEventsList()
final lastEvent = events?.get(events.size() - 1)
log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - last event: ${lastEvent}; exit code: ${lastEvent?.taskExecution?.exitCode}"

final error = lastEvent?.description
if( error && EXIT_CODE_REGEX.matcher(error).find() ) {
if( error && (EXIT_CODE_REGEX.matcher(error).find() || BATCH_ERROR_REGEX.matcher(error).find()) ) {
return new ProcessException(error)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.google.cloud.batch.v1.BatchServiceClient
import com.google.cloud.batch.v1.BatchServiceSettings
import com.google.cloud.batch.v1.Job
import com.google.cloud.batch.v1.JobName
import com.google.cloud.batch.v1.JobStatus
import com.google.cloud.batch.v1.LocationName
import com.google.cloud.batch.v1.Task
import com.google.cloud.batch.v1.TaskGroupName
Expand Down Expand Up @@ -123,6 +124,10 @@ class BatchClient {
return describeTask(jobId, taskId).getStatus()
}

/**
 * Job-level status (as opposed to the per-task status).
 *
 * @param jobId the Batch job identifier
 * @return the {@link JobStatus} of the described job
 */
JobStatus getJobStatus(String jobId) {
    final job = describeJob(jobId)
    return job.getStatus()
}

String getTaskState(String jobId, String taskId) {
final status = getTaskStatus(jobId, taskId)
return status ? status.getState().toString() : null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package nextflow.cloud.google.batch

import com.google.cloud.batch.v1.JobStatus
import com.google.cloud.batch.v1.Task

import java.nio.file.Path

import com.google.cloud.batch.v1.GCS
Expand Down Expand Up @@ -581,4 +584,39 @@ class GoogleBatchTaskHandlerTest extends Specification {
and:
0 * client.deleteJob('job1') >> null
}

/**
 * Test helper: assemble a {@link JobStatus} in the given state, optionally
 * carrying a single status event with the given description.
 */
JobStatus makeJobStatus(JobStatus.State state, String desc = null) {
    def result = JobStatus.newBuilder().setState(state)
    // Groovy-truth check: a null or empty description adds no event
    if( desc )
        result.addStatusEvents(StatusEvent.newBuilder().setDescription(desc))
    return result.build()
}

def 'should check job status when no tasks in job '() {

given:
// handler wired with a mocked Batch client so no real API calls are made
def jobId = 'job-id'
def taskId = 'task-id'
def client = Mock(BatchClient)
def task = Mock(TaskRun) {
lazyName() >> 'foo (1)'
}
def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task))
final message = 'Job failed when Batch tries to schedule it: Batch Error: code - CODE_MACHINE_TYPE_NOT_FOUND'
when:
// the job never produces any tasks, forcing the handler onto the
// job-level status path on every poll
client.listTasks(jobId) >>> [new LinkedList<Task>(), new LinkedList<Task>()]
// three consecutive job statuses: none yet, then FAILED (scheduling),
// then FAILED carrying the Batch error message
client.getJobStatus(jobId) >>> [
null,
makeJobStatus(JobStatus.State.FAILED, 'Scheduling Failed'),
makeJobStatus(JobStatus.State.FAILED, message)
]
then:
// no job status yet -> reported as PENDING
handler.getTaskState() == "PENDING"
// job-level FAILED is surfaced as the task state
handler.getTaskState() == "FAILED"
// the error is taken from the job's last status event
handler.getJobError().message == message
}
}

0 comments on commit 5b1568c

Please sign in to comment.