33
33
import org .apache .flink .core .io .InputStatus ;
34
34
import org .apache .flink .core .io .SimpleVersionedSerializer ;
35
35
import org .apache .flink .metrics .MetricGroup ;
36
- import org .apache .flink .runtime .jobgraph .OperatorID ;
37
36
import org .apache .flink .runtime .operators .coordination .OperatorEvent ;
38
37
import org .apache .flink .runtime .operators .coordination .OperatorEventGateway ;
39
38
import org .apache .flink .runtime .operators .coordination .OperatorEventHandler ;
47
46
import org .apache .flink .streaming .api .operators .source .TimestampsAndWatermarks ;
48
47
import org .apache .flink .streaming .api .operators .util .SimpleVersionedListState ;
49
48
import org .apache .flink .streaming .runtime .io .PushingAsyncDataInput ;
50
- import org .apache .flink .streaming .runtime .streamrecord .LatencyMarker ;
51
49
import org .apache .flink .streaming .runtime .tasks .ProcessingTimeService ;
52
50
import org .apache .flink .util .CollectionUtil ;
53
51
import org .apache .flink .util .FlinkRuntimeException ;
58
56
import java .io .IOException ;
59
57
import java .util .List ;
60
58
import java .util .concurrent .CompletableFuture ;
61
- import java .util .concurrent .ScheduledFuture ;
62
59
63
60
import static org .apache .flink .util .Preconditions .checkNotNull ;
64
- import static org .apache .flink .util .Preconditions .checkState ;
65
61
66
62
/**
67
63
* Base source operator only used for integrating the source reader which is proposed by FLIP-27. It
@@ -136,7 +132,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
136
132
/** Indicating whether the source operator has been closed. */
137
133
private boolean closed ;
138
134
139
- private LatencyMarkerEmitter <OUT > latencyMarerEmitter ;
135
+ private @ Nullable LatencyMarkerEmitter <OUT > latencyMarerEmitter ;
140
136
141
137
public SourceOperator (
142
138
FunctionWithException <SourceReaderContext , SourceReader <OUT , SplitT >, Exception >
@@ -237,19 +233,6 @@ public void open() throws Exception {
237
233
watermarkStrategy , getMetricGroup ());
238
234
}
239
235
240
- latencyMarerEmitter =
241
- new LatencyMarkerEmitter <>(
242
- getProcessingTimeService (),
243
- getExecutionConfig ().isLatencyTrackingConfigured ()
244
- ? getExecutionConfig ().getLatencyTrackingInterval ()
245
- : getContainingTask ()
246
- .getEnvironment ()
247
- .getTaskManagerInfo ()
248
- .getConfiguration ()
249
- .getLong (MetricOptions .LATENCY_INTERVAL ),
250
- getOperatorID (),
251
- getRuntimeContext ().getIndexOfThisSubtask ());
252
-
253
236
// restore the state if necessary.
254
237
final List <SplitT > splits = CollectionUtil .iterableToList (readerState .get ());
255
238
if (!splits .isEmpty ()) {
@@ -263,7 +246,6 @@ public void open() throws Exception {
263
246
sourceReader .start ();
264
247
265
248
eventTimeLogic .startPeriodicWatermarkEmits ();
266
- latencyMarerEmitter .startLatencyMarkerEmit ();
267
249
}
268
250
269
251
@ Override
@@ -272,7 +254,7 @@ public void close() throws Exception {
272
254
eventTimeLogic .stopPeriodicWatermarkEmits ();
273
255
}
274
256
if (latencyMarerEmitter != null ) {
275
- latencyMarerEmitter .stopLatencyMarkerEmit ();
257
+ latencyMarerEmitter .close ();
276
258
}
277
259
if (sourceReader != null ) {
278
260
sourceReader .close ();
@@ -289,7 +271,7 @@ public void dispose() throws Exception {
289
271
sourceReader .close ();
290
272
}
291
273
if (latencyMarerEmitter != null ) {
292
- latencyMarerEmitter .stopLatencyMarkerEmit ();
274
+ latencyMarerEmitter .close ();
293
275
}
294
276
}
295
277
@@ -306,11 +288,31 @@ public InputStatus emitNext(DataOutput<OUT> output) throws Exception {
306
288
307
289
// this creates a batch or streaming output based on the runtime mode
308
290
currentMainOutput = eventTimeLogic .createMainOutput (output );
309
- latencyMarerEmitter . emitMainOutput (output );
291
+ initializeLatencyMarkerEmitter (output );
310
292
lastInvokedOutput = output ;
311
293
return sourceReader .pollNext (currentMainOutput );
312
294
}
313
295
296
+ private void initializeLatencyMarkerEmitter (DataOutput <OUT > output ) {
297
+ long latencyTrackingInterval =
298
+ getExecutionConfig ().isLatencyTrackingConfigured ()
299
+ ? getExecutionConfig ().getLatencyTrackingInterval ()
300
+ : getContainingTask ()
301
+ .getEnvironment ()
302
+ .getTaskManagerInfo ()
303
+ .getConfiguration ()
304
+ .getLong (MetricOptions .LATENCY_INTERVAL );
305
+ if (latencyTrackingInterval > 0 ) {
306
+ latencyMarerEmitter =
307
+ new org .apache .flink .streaming .api .operators .LatencyMarkerEmitter <>(
308
+ getProcessingTimeService (),
309
+ output ::emitLatencyMarker ,
310
+ latencyTrackingInterval ,
311
+ getOperatorID (),
312
+ getRuntimeContext ().getIndexOfThisSubtask ());
313
+ }
314
+ }
315
+
314
316
@ Override
315
317
public void snapshotState (StateSnapshotContext context ) throws Exception {
316
318
long checkpointId = context .getCheckpointId ();
@@ -377,80 +379,4 @@ public SourceReader<OUT, SplitT> getSourceReader() {
377
379
ListState <SplitT > getReaderState () {
378
380
return readerState ;
379
381
}
380
-
381
- private static class LatencyMarkerEmitter <OUT > {
382
-
383
- private final ProcessingTimeService timeService ;
384
-
385
- private final long latencyTrackingInterval ;
386
-
387
- private final OperatorID operatorId ;
388
-
389
- private final int subtaskIndex ;
390
-
391
- @ Nullable private DataOutput <OUT > currentMainOutput ;
392
-
393
- @ Nullable
394
- private ScheduledFuture <?> latencyMarkerTimer ;
395
-
396
- public LatencyMarkerEmitter (
397
- final ProcessingTimeService timeService ,
398
- long latencyTrackingInterval ,
399
- final OperatorID operatorId ,
400
- final int subtaskIndex ) {
401
- this .timeService = timeService ;
402
- this .latencyTrackingInterval = latencyTrackingInterval ;
403
- this .operatorId = operatorId ;
404
- this .subtaskIndex = subtaskIndex ;
405
- }
406
-
407
- // ------------------------------------------------------------------------
408
-
409
- public void emitMainOutput (PushingAsyncDataInput .DataOutput <OUT > output ) {
410
- // At the moment, we assume only one output is ever created!
411
- // This assumption is strict, currently, because many of the classes in this
412
- // implementation
413
- // do not support re-assigning the underlying output
414
- checkState (currentMainOutput == null , "Main output has already been set." );
415
- currentMainOutput = output ;
416
- }
417
-
418
- public void startLatencyMarkerEmit () {
419
- checkState (
420
- latencyMarkerTimer == null , "Latency marker emitter has already been started" );
421
- if (latencyTrackingInterval == 0 ) {
422
- // a value of zero means not activated
423
- return ;
424
- }
425
- latencyMarkerTimer =
426
- timeService .scheduleWithFixedDelay (
427
- this ::triggerLatencyMarkerEmit , 0L , latencyTrackingInterval );
428
- }
429
-
430
- public void stopLatencyMarkerEmit () {
431
- if (latencyMarkerTimer != null ) {
432
- latencyMarkerTimer .cancel (false );
433
- latencyMarkerTimer = null ;
434
- }
435
- }
436
-
437
- void triggerLatencyMarkerEmit (@ SuppressWarnings ("unused" ) long wallClockTimestamp ) {
438
- if (currentMainOutput != null ) {
439
- try {
440
- // ProcessingTimeService callbacks are executed under the
441
- // checkpointing lock
442
- currentMainOutput .emitLatencyMarker (
443
- new LatencyMarker (
444
- timeService .getCurrentProcessingTime (),
445
- operatorId ,
446
- subtaskIndex ));
447
- } catch (Throwable t ) {
448
- // we catch the Throwable here so that we don't trigger the
449
- // processing
450
- // timer services async exception handler
451
- LOG .warn ("Error while emitting latency marker." , t );
452
- }
453
- }
454
- }
455
- }
456
382
}
0 commit comments