2
2
# vi: set ft=python sts=4 ts=4 sw=4 et:
3
3
"""Tests for workflow callbacks
4
4
"""
5
+ from datetime import datetime
6
+ from pathlib import Path
5
7
from time import sleep
8
+ import json
6
9
import pytest
7
10
import nipype .interfaces .utility as niu
8
11
import nipype .pipeline .engine as pe
12
+ from nipype .utils .draw_gantt_chart import _convert_string_to_datetime
9
13
10
14
try :
11
15
import pandas
@@ -71,22 +75,22 @@ def test_callback_exception(tmpdir, plugin, stop_on_first_crash):
71
75
72
76
@pytest .mark .parametrize ("plugin" , ["Linear" , "MultiProc" , "LegacyMultiProc" ])
73
77
@pytest .mark .skipif (not has_pandas , reason = "Test requires pandas" )
74
- def test_callback_gantt (tmpdir , plugin ) :
78
+ def test_callback_gantt (tmp_path : Path , plugin : str ) -> None :
75
79
import logging
76
80
77
81
from os import path
78
82
79
83
from nipype .utils .profiler import log_nodes_cb
80
84
from nipype .utils .draw_gantt_chart import generate_gantt_chart
81
85
82
- log_filename = path . join ( tmpdir , "callback.log" )
86
+ log_filename = tmp_path / "callback.log"
83
87
logger = logging .getLogger ("callback" )
84
88
logger .setLevel (logging .DEBUG )
85
89
handler = logging .FileHandler (log_filename )
86
90
logger .addHandler (handler )
87
91
88
92
# create workflow
89
- wf = pe .Workflow (name = "test" , base_dir = tmpdir . strpath )
93
+ wf = pe .Workflow (name = "test" , base_dir = str ( tmp_path ) )
90
94
f_node = pe .Node (
91
95
niu .Function (function = func , input_names = [], output_names = []), name = "f_node"
92
96
)
@@ -98,7 +102,21 @@ def test_callback_gantt(tmpdir, plugin):
98
102
plugin_args ["n_procs" ] = 8
99
103
wf .run (plugin = plugin , plugin_args = plugin_args )
100
104
101
- generate_gantt_chart (
102
- path .join (tmpdir , "callback.log" ), 1 if plugin == "Linear" else 8
103
- )
104
- assert path .exists (path .join (tmpdir , "callback.log.html" ))
105
+ with open (log_filename , "r" ) as _f :
106
+ loglines = _f .readlines ()
107
+
108
+ # test missing duration
109
+ first_line = json .loads (loglines [0 ])
110
+ if "duration" in first_line :
111
+ del first_line ["duration" ]
112
+ loglines [0 ] = json .dumps (first_line )
113
+
114
+ # test duplicate timestamp warning
115
+ loglines .append (loglines [- 1 ])
116
+
117
+ with open (log_filename , "w" ) as _f :
118
+ _f .write ("" .join (loglines ))
119
+
120
+ with pytest .warns (Warning ):
121
+ generate_gantt_chart (str (log_filename ), 1 if plugin == "Linear" else 8 )
122
+ assert (tmp_path / "callback.log.html" ).exists ()
0 commit comments