1
1
import logging
2
- from multiprocessing .sharedctypes import Value
3
2
import re
4
3
from copy import deepcopy
5
4
from typing import Any , Dict , List , Union
10
9
from .config import config
11
10
from .context_syntax import GLOBAL_CONTEXT
12
11
from .executor import Executor
13
- from .io import (PVC , ArgoVar , FutureLen , InputArtifact , InputParameter , OutputArtifact ,
12
+ from .io import (PVC , ArgoVar , InputArtifact , InputParameter , OutputArtifact ,
14
13
OutputParameter )
15
14
from .op_template import OPTemplate , PythonScriptOPTemplate , ShellOPTemplate
16
15
from .resource import Resource
30
29
uploaded_python_packages = []
31
30
32
31
32
+ class FutureLen :
33
+ '''
34
+ To solve the problem of delayed acquisition for length of output
35
+ parameters in the debug mode
36
+ '''
37
+
38
+ def __init__ (self , par ):
39
+ self .par = par
40
+
41
+ def get (self ):
42
+ return len (self .par .value )
43
+
44
+
45
+ class FutureRange :
46
+ def __init__ (self , * args ):
47
+ self .args = args
48
+
49
+ def get (self ):
50
+ args = []
51
+ for i in self .args :
52
+ if isinstance (i , FutureLen ):
53
+ args .append (i .get ())
54
+ elif isinstance (i , (InputParameter , OutputParameter )):
55
+ args .append (i .value )
56
+ else :
57
+ args .append (i )
58
+ args = tuple (args )
59
+ return list (range (* args ))
60
+
61
+
33
62
def argo_range (
34
63
* args ,
35
64
) -> ArgoVar :
@@ -40,12 +69,7 @@ def argo_range(
40
69
Each argument can be Argo parameter
41
70
"""
42
71
if config ["mode" ] == "debug" :
43
- args = tuple (i .value if isinstance (i , (InputParameter , OutputParameter
44
- )) else i for i in args )
45
- for i in range (len (args )):
46
- if isinstance (args [i ], (InputParameter , OutputParameter )):
47
- args [i ] = args [i ].value
48
- return list (range (* args ))
72
+ return FutureRange (* args )
49
73
start = 0
50
74
step = 1
51
75
if len (args ) == 1 :
@@ -86,19 +110,13 @@ def argo_sequence(
86
110
with count, can be an Argo parameter
87
111
format: a printf format string to format the value in the sequence
88
112
"""
89
- if isinstance (count ,FutureLen ):
90
- method = argo_sequence .__name__
91
- if format != None :
92
- #the writing style may not be compliant
93
- method = method + ".{{%s}}" % format
94
- count .set_method (method = method )
95
- return count
96
- if isinstance (count , ArgoVar ):
97
- count = "{{=%s}}" % count .expr
98
- if isinstance (start , ArgoVar ):
99
- start = "{{=%s}}" % start .expr
100
- if isinstance (end , ArgoVar ):
101
- end = "{{=%s}}" % end .expr
113
+ if config ["mode" ] != "debug" :
114
+ if isinstance (count , ArgoVar ):
115
+ count = "{{=%s}}" % count .expr
116
+ if isinstance (start , ArgoVar ):
117
+ start = "{{=%s}}" % start .expr
118
+ if isinstance (end , ArgoVar ):
119
+ end = "{{=%s}}" % end .expr
102
120
return V1alpha1Sequence (count = count , start = start , end = end , format = format )
103
121
104
122
@@ -112,11 +130,7 @@ def argo_len(
112
130
param: the Argo parameter which is a list
113
131
"""
114
132
if config ["mode" ] == "debug" :
115
- par = param .value
116
- if isinstance (par ,FutureLen ):
117
- par .set_method (argo_len .__name__ )
118
- return par
119
- return len (par )
133
+ return FutureLen (param )
120
134
if isinstance (param , S3Artifact ):
121
135
try :
122
136
path_list = catalog_of_artifact (param )
@@ -533,8 +547,7 @@ def __init__(
533
547
self .inputs .parameters ["dflow_with_param" ] = InputParameter (
534
548
value = self .with_param )
535
549
self .with_param = None
536
- if self .with_sequence is not None and \
537
- not isinstance (self .with_sequence ,FutureLen ):
550
+ if self .with_sequence is not None :
538
551
if self .with_sequence .start is not None :
539
552
steps .inputs .parameters ["dflow_sequence_start" ] = \
540
553
InputParameter ()
@@ -683,6 +696,7 @@ def run(self, context):
683
696
684
697
import os
685
698
from copy import copy
699
+
686
700
from .dag import DAG
687
701
from .steps import Steps
688
702
@@ -750,25 +764,32 @@ def run(self, context):
750
764
return
751
765
752
766
if self .with_param is not None or self .with_sequence is not None :
753
- if isinstance (self .with_param , (InputParameter , OutputParameter )):
767
+ if isinstance (self .with_param , FutureRange ):
768
+ item_list = self .with_param .get ()
769
+ elif isinstance (self .with_param , (InputParameter ,
770
+ OutputParameter )):
754
771
item_list = self .with_param .value
755
772
elif isinstance (self .with_param , list ):
756
773
item_list = self .with_param
757
774
elif self .with_sequence is not None :
758
- if isinstance (self .with_sequence ,FutureLen ):
759
- self .with_sequence = self .with_sequence .get ()
760
775
start = 0
761
776
if self .with_sequence .start is not None :
762
777
start = self .with_sequence .start
778
+ if isinstance (start , FutureLen ):
779
+ start = start .get ()
763
780
if isinstance (start , (InputParameter , OutputParameter )):
764
781
start = start .value
765
782
if self .with_sequence .count is not None :
766
783
count = self .with_sequence .count
784
+ if isinstance (count , FutureLen ):
785
+ count = count .get ()
767
786
if isinstance (count , (InputParameter , OutputParameter )):
768
787
count = count .value
769
788
sequence = list (range (start , start + count ))
770
789
if self .with_sequence .end is not None :
771
790
end = self .with_sequence .end
791
+ if isinstance (end , FutureLen ):
792
+ end = end .get ()
772
793
if isinstance (end , (InputParameter , OutputParameter )):
773
794
end = end .value
774
795
sequence = list (range (start , end + 1 ))
0 commit comments