26
26
import java .io .Serializable ;
27
27
import java .nio .ByteBuffer ;
28
28
import java .nio .charset .StandardCharsets ;
29
+ import java .util .ArrayDeque ;
29
30
import java .util .ArrayList ;
30
31
import java .util .Arrays ;
31
32
import java .util .List ;
32
33
import java .util .Map ;
33
34
import java .util .Objects ;
35
+ import java .util .Queue ;
34
36
import java .util .concurrent .CountDownLatch ;
35
37
import java .util .concurrent .TimeUnit ;
36
38
import java .util .concurrent .atomic .AtomicBoolean ;
42
44
import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
43
45
import io .fabric8 .kubernetes .client .KubernetesClientTimeoutException ;
44
46
import org .apache .commons .io .output .TeeOutputStream ;
47
+ import org .apache .commons .lang .ArrayUtils ;
45
48
46
49
import com .google .common .io .NullOutputStream ;
47
50
@@ -124,21 +127,22 @@ public Proc launch(ProcStarter starter) throws IOException {
124
127
for (String cmd : cmdEnvs ) {
125
128
if (cmd .startsWith (JENKINS_HOME )) {
126
129
cmdEnvs = new String [0 ];
127
- LOGGER .info ("Skipping injection of procstarter cmdenvs due to JENKINS_HOME present" );
130
+ LOGGER .fine ("Skipping injection of procstarter cmdenvs due to JENKINS_HOME present" );
128
131
break ;
129
132
}
130
133
}
131
134
String [] commands = getCommands (starter );
132
- return doLaunch (quiet , cmdEnvs , starter .stdout (), pwd , commands );
135
+ return doLaunch (quiet , cmdEnvs , starter .stdout (), starter . stderr (), pwd , commands );
133
136
}
134
137
135
- private Proc doLaunch (boolean quiet , String [] cmdEnvs , OutputStream outputForCaller , FilePath pwd , String ... commands ) throws IOException {
138
+ private Proc doLaunch (boolean quiet , String [] cmdEnvs , OutputStream outputForCaller , OutputStream errorForCaller , FilePath pwd , String ... commands ) throws IOException {
136
139
waitUntilContainerIsReady ();
137
140
138
141
final CountDownLatch started = new CountDownLatch (1 );
139
142
final CountDownLatch finished = new CountDownLatch (1 );
140
143
final AtomicBoolean alive = new AtomicBoolean (false );
141
144
145
+ final List <FilterOutExitCodeOutputStream > streamsToFilter = new ArrayList <FilterOutExitCodeOutputStream >(5 );
142
146
143
147
PrintStream printStream = launcher .getListener ().getLogger ();
144
148
OutputStream stream = printStream ;
@@ -148,22 +152,31 @@ private Proc doLaunch(boolean quiet, String [] cmdEnvs, OutputStream outputForC
148
152
printStream = new PrintStream (stream , false , StandardCharsets .UTF_8 .toString ());
149
153
}
150
154
155
+ stream = new FilterOutExitCodeOutputStream (stream , streamsToFilter );
156
+
151
157
// we need to keep the last bytes in the stream to parse the exit code as it is printed there
152
158
// so we use a buffer
153
159
ExitCodeOutputStream exitCodeOutputStream = new ExitCodeOutputStream ();
154
160
// send container output both to the job output and our buffer
155
161
stream = new TeeOutputStream (exitCodeOutputStream , stream );
162
+
163
+ // don't throw away error, but don't let it interrupt the parsing of output
164
+ OutputStream errorStream = stream ;
165
+ if (errorForCaller != null ) {
166
+ errorStream = new TeeOutputStream (errorForCaller , stream );
167
+ }
168
+
156
169
// Send to proc caller as well if they sent one
157
170
if (outputForCaller != null ) {
158
- stream = new TeeOutputStream (outputForCaller , stream );
171
+ stream = new TeeOutputStream (new FilterOutExitCodeOutputStream ( outputForCaller , streamsToFilter ) , stream );
159
172
}
160
173
161
174
String msg = "Executing shell script inside container [" + containerName + "] of pod [" + podName + "]" ;
162
175
LOGGER .log (Level .FINEST , msg );
163
176
printStream .println (msg );
164
177
165
178
Execable <String , ExecWatch > execable = client .pods ().inNamespace (namespace ).withName (podName ).inContainer (containerName )
166
- .redirectingInput ().writingOutput (stream ).writingError (stream )
179
+ .redirectingInput ().writingOutput (stream ).writingError (errorStream )
167
180
.usingListener (new ExecListener () {
168
181
@ Override
169
182
public void onOpen (Response response ) {
@@ -238,7 +251,7 @@ public void onClose(int i, String s) {
238
251
239
252
this .setupEnvironmentVariable (envVars , watch );
240
253
doExec (watch , printStream , commands );
241
- ContainerExecProc proc = new ContainerExecProc (watch , alive , finished , exitCodeOutputStream ::getExitCode );
254
+ ContainerExecProc proc = new ContainerExecProc (watch , alive , finished , exitCodeOutputStream ::getExitCode , streamsToFilter );
242
255
closables .add (proc );
243
256
return proc ;
244
257
} catch (InterruptedException ie ) {
@@ -256,7 +269,7 @@ public void kill(Map<String, String> modelEnvVars) throws IOException, Interrupt
256
269
String cookie = modelEnvVars .get (COOKIE_VAR );
257
270
258
271
int exitCode = doLaunch (
259
- true , null , null , null ,
272
+ true , null , null , null , null ,
260
273
"sh" , "-c" , "kill \\ `grep -l '" + COOKIE_VAR + "=" + cookie +"' /proc/*/environ | cut -d / -f 3 \\ `"
261
274
).join ();
262
275
@@ -322,32 +335,37 @@ public void close() throws IOException {
322
335
}
323
336
}
324
337
325
- private static void doExec (ExecWatch watch , PrintStream out , String ... statements ) {
338
+ private static void doExec (ExecWatch watch , PrintStream out , String ... statements ) throws IOException {
326
339
try {
327
- out . print ( "Executing command: " );
340
+ LOGGER . log ( Level . FINE , "Executing command: " );
328
341
StringBuilder sb = new StringBuilder ();
329
342
for (String stmt : statements ) {
330
343
String s = String .format ("\" %s\" " , stmt );
331
344
sb .append (s );
332
- out . print ( s );
345
+ LOGGER . log ( Level . FINE , s );
333
346
watch .getInput ().write (s .getBytes (StandardCharsets .UTF_8 ));
334
347
}
335
348
sb .append (NEWLINE );
336
- out .println ();
337
349
watch .getInput ().write (NEWLINE .getBytes (StandardCharsets .UTF_8 ));
338
350
339
351
// get the command exit code and print it padded so it is easier to parse in ContainerExecProc
340
352
// We need to exit so that we know when the command has finished.
341
353
sb .append (ExitCodeOutputStream .EXIT_COMMAND );
342
- out . print ( ExitCodeOutputStream .EXIT_COMMAND );
354
+ LOGGER . log ( Level . FINE , ExitCodeOutputStream .EXIT_COMMAND );
343
355
LOGGER .log (Level .FINEST , "Executing command: {0}" , sb );
344
- watch .getInput ().write (ExitCodeOutputStream .EXIT_COMMAND .getBytes (StandardCharsets .UTF_8 ));
356
+ // a hack only for sshagent. it writes out output after the exitcode has been printed if we don't wait a bit
357
+ if ((statements .length >= 1 && statements [0 ].equals ("ssh-add" ))
358
+ || (statements .length == 2 && statements [0 ].equals ("ssh-agent" ) && statements [1 ].equals ("-k" ))) {
359
+ String sleepExitCommand = "tmp_exit_status=$?; sleep 3; printf \" " + ExitCodeOutputStream .EXIT_COMMAND_TXT + " %3d\" $tmp_exit_status; " + EXIT + NEWLINE ;
360
+ watch .getInput ().write (sleepExitCommand .getBytes (StandardCharsets .UTF_8 ));
361
+ } else {
362
+ watch .getInput ().write (ExitCodeOutputStream .EXIT_COMMAND .getBytes (StandardCharsets .UTF_8 ));
363
+ }
345
364
346
- out .flush ();
347
365
watch .getInput ().flush ();
348
366
} catch (IOException e ) {
349
- e . printStackTrace ( out );
350
- throw new RuntimeException ( e ) ;
367
+ LOGGER . log ( Level . WARNING , "IOException during executing command" , e );
368
+ throw e ;
351
369
}
352
370
}
353
371
@@ -394,8 +412,6 @@ public ExitCodeOutputStream() {
394
412
@ Override
395
413
public void write (int b ) throws IOException {
396
414
queue .add (b );
397
- byte [] bb = new byte []{(byte ) b };
398
- System .out .print (new String (bb , StandardCharsets .UTF_8 ));
399
415
}
400
416
401
417
public int getExitCode () {
@@ -422,4 +438,37 @@ public int getExitCode() {
422
438
return i ;
423
439
}
424
440
}
441
+
442
+ static class FilterOutExitCodeOutputStream extends OutputStream {
443
+
444
+ public FilterOutExitCodeOutputStream (OutputStream sink , List <FilterOutExitCodeOutputStream > streamsToFilter ) {
445
+ this .sink = sink ;
446
+ streamsToFilter .add (this );
447
+ }
448
+
449
+ public final static byte [] EXIT_COMMAND_TXT_BYTES ;
450
+
451
+ private final static int QUEUE_SIZE = 20 ;
452
+ private final OutputStream sink ;
453
+ private final Queue <Byte > queue = new ArrayDeque <Byte >(QUEUE_SIZE );
454
+
455
+ static {
456
+ byte [] newLine = new byte [1 ];
457
+ Arrays .fill (newLine , "\n " .getBytes (StandardCharsets .UTF_8 )[0 ]);
458
+ EXIT_COMMAND_TXT_BYTES = ArrayUtils .addAll (newLine , ExitCodeOutputStream .EXIT_COMMAND_TXT .getBytes (StandardCharsets .UTF_8 ));
459
+ }
460
+
461
+ @ Override
462
+ public void write (int b ) throws IOException {
463
+ if (queue .size () >= QUEUE_SIZE )
464
+ sink .write (queue .poll ());
465
+ queue .offer ((byte ) b );
466
+ }
467
+
468
+ public void writeOutBuffer () throws IOException {
469
+ byte [] q = ArrayUtils .toPrimitive (queue .toArray (new Byte [queue .size ()]));
470
+ byte [] partToWriteOut = ContainerExecCutExitCodeUtil .getPartToWriteOut (q , EXIT_COMMAND_TXT_BYTES );
471
+ sink .write (partToWriteOut );
472
+ }
473
+ }
425
474
}
0 commit comments