Now that prism has keyed stateful processing (#28543), it's possible to add timers.
Timers come in two flavours: EventTime and ProcessingTime. They are described in the Programming Guide: https://beam.apache.org/documentation/programming-guide/#timers but I'll reiterate the key points here.
EventTime timers fire when the input watermark passes their set time in event time. ProcessingTime timers fire when the real (wall-clock) time reaches the time that was set. Both invoke arbitrary user callbacks associated with the transform.
The output watermark can be held back by the timer using watermark holds; otherwise, elements emitted by the timer callback are emitted at the timer's firing time.
The important point is that holds only affect the output watermark, not the input watermark.
Like State API use, timer processing relies on keyed stateful processing: timers are associated with a user key (and thus with that key's user state).
Timers are passed to the SDK along the Data Channel, in their own "union'd" field alongside the data field.
In practice, to ensure the SDK has processed all data elements that precede a firing timer in event time, data and timers are split into separate batches.
Reference: `beam/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto`, line 769 at commit bc42a63.
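A minimal sketch of that splitting, assuming a hypothetical `Item` type that tags each encoded entry as data or timer (not prism's actual types): consecutive entries of the same kind are grouped, and each group would become its own Elements message, so a timer group never shares a message with the data before or after it.

```go
package main

import "fmt"

// Kind marks whether an encoded item is a data element or a timer.
type Kind int

const (
	DataKind Kind = iota
	TimerKind
)

// Item is a hypothetical, already-encoded element or timer.
type Item struct {
	Kind  Kind
	Bytes []byte
}

// Batch is a run of same-kind items that would be sent as one Elements message.
type Batch struct {
	Kind  Kind
	Items [][]byte
}

// batchByKind splits an ordered sequence into maximal contiguous runs of a
// single kind, preserving order.
func batchByKind(items []Item) []Batch {
	var out []Batch
	for _, it := range items {
		if n := len(out); n > 0 && out[n-1].Kind == it.Kind {
			// Same kind as the current run: extend it.
			out[n-1].Items = append(out[n-1].Items, it.Bytes)
			continue
		}
		// Kind changed (or first item): start a new run.
		out = append(out, Batch{Kind: it.Kind, Items: [][]byte{it.Bytes}})
	}
	return out
}

func main() {
	items := []Item{
		{DataKind, []byte("a")}, {DataKind, []byte("b")},
		{TimerKind, []byte("t1")},
		{DataKind, []byte("c")},
	}
	for _, b := range batchByKind(items) {
		fmt.Println(b.Kind, len(b.Items))
	}
}
```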
Being split into separate batches doesn't mean timers and data can't be processed in the same bundle, however; timers just need to be sent in a separate Elements message on the data channel from both the preceding and the succeeding data.
At the time of writing, this means we need to adjust how data is moved from the ElementManager to the Bundle so it's available to the DataChannel for processing. Further, the change must not prevent a later implementation of Elements on ProcessBundleRequest (and Response).
Data is currently joined together and sent to the SDK in a single blob, no matter how many elements are present.
Reference: `beam/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go`, line 136 at commit bc42a63.
But otherwise, it's looked up from the element manager here:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/stage.go#L81
It's done this way to ensure an identical path for both Runner transforms and SDK transforms. Notionally, with stateful handling, we could handle GBKs eagerly instead of on demand, since the aggregation could pre-sort the received bytes by key for the stage. Currently, we must re-parse the data when the stage executes: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L308 Flattens are already "parsed", so any additional re-copying of their data is wasteful.
If we assume the runner transform handling will change a little, we can instead improve handling for SDK-bound bundles. Right now, as linked above, we only handle data, not data and timers. That means we need to update `worker.B` to hold an ordered list of potentially interleaved Data and Timer blocks. The element-sending loop then iterates over that list, handling each block according to its type. This moves the consolidation logic into the ElementManager.
Data & Timers (Elements) from here are sent directly to the SDK in the Data stream handler:
Reference: `beam/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go`, line 378 at commit bc42a63.
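The ordered-block send loop could look roughly like the sketch below. `Block`, `ElementsMsg`, `DataEntry`, `TimersEntry`, and `sendBlocks` are hypothetical, simplified stand-ins; the real loop would write Fn API `Elements` protos, whose data entries carry an instruction ID and transform ID and whose timer entries additionally carry a timer family ID.

```go
package main

import "fmt"

// BlockKind says whether a block carries data elements or timers.
type BlockKind int

const (
	DataBlock BlockKind = iota
	TimerBlock
)

// Block is a hypothetical entry in worker.B's ordered block list.
type Block struct {
	Kind          BlockKind
	TransformID   string
	TimerFamilyID string // only meaningful for TimerBlock
	Payload       []byte
}

// Simplified stand-ins for the Elements message and its two entry kinds.
type DataEntry struct {
	InstructionID, TransformID string
	Data                       []byte
}

type TimersEntry struct {
	InstructionID, TransformID, TimerFamilyID string
	Timers                                    []byte
}

type ElementsMsg struct {
	Data   []DataEntry
	Timers []TimersEntry
}

// sendBlocks walks the ordered blocks and emits one message per block, so a
// message holds either data or timers, never both.
func sendBlocks(instID string, blocks []Block, send func(ElementsMsg)) {
	for _, b := range blocks {
		var msg ElementsMsg
		switch b.Kind {
		case DataBlock:
			msg.Data = []DataEntry{{InstructionID: instID, TransformID: b.TransformID, Data: b.Payload}}
		case TimerBlock:
			msg.Timers = []TimersEntry{{InstructionID: instID, TransformID: b.TransformID, TimerFamilyID: b.TimerFamilyID, Timers: b.Payload}}
		}
		send(msg)
	}
}

func main() {
	blocks := []Block{
		{Kind: DataBlock, TransformID: "pardo", Payload: []byte("elements")},
		{Kind: TimerBlock, TransformID: "pardo", TimerFamilyID: "fam", Payload: []byte("timers")},
	}
	sendBlocks("inst-1", blocks, func(m ElementsMsg) {
		fmt.Printf("data=%d timers=%d\n", len(m.Data), len(m.Timers))
	})
}
```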
Response elements from the SDK will need a new handler to loop over `resp.GetTimers()`. We loop over data responses here:
Reference: `beam/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go`, line 354 at commit bc42a63.
The TentativeData structure in the bundle would need to cache timers as part of the output data, keyed from producing transform, to timer family, to timer tag, to timer. A new method should be added to TentativeData for this, which avoids additional plumbing for timers.
`engine.ElementManager.PersistBundle` will need to handle timers, routing them to the pending elements of the same stage, since timers loop back within that stage. This is handy, as it means we don't need to maintain a transform -> stage lookup structure in the ElementManager like we do for data outputs.
EventTime timers could be treated as "elements" if we ensure the per-key pendingHeaps are ordered by event time. This permits the timers to fire in event-time order.
References: `beam/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go`, line 684 at commit bc42a63; and the same file, line 37 at commit bc42a63.
With that approach, `startBundle` then needs to handle each element individually for the stateful keys and check whether it's a timer. Not the most efficient, but probably fine as an initial approach.
Note: Prism aims to be correct first; optimizations can come later. We could probably pre-block out a meta heap (a heap of heaps), using the heap invariant to ensure that each sub-heap holds a contiguous run of one type. The optimization goal would be to put as much work as possible on the PersistBundle goroutine, so that the bundle scheduling goroutine does less when extracting work.
However, timers in particular must not be emitted until the stage's input watermark is greater than their event time. Timers are unique in that they are set in the future (in event time) and loop back into the same stage. This would be handled in `startBundle`.
Finally, timer/state holds are important for processing correctness. We need to plumb the hold through from PersistBundle to the watermark refresh and retain it there. This is mostly the same as for other persisted elements (or residual handling for SDFs), but there's additional plumbing here, since holds need to be available for watermark refreshes.
What I missed in the above discussion is that for a given timer FamilyID + Tag + Key, only a single timer can exist. So simply inlining the timers isn't entirely right, since only the last set timer may fire (e.g. see this Java integration test).
We could store the timers separately, but that requires checking the most recent timer event time against the element event times, which feels too heavyweight.
Or we can update the holding struct at the user-key level to contain the heap of elements and timers as described, plus a map recording the event time of the last set timer for each timer family + tag. We only look up this time when the element is a timer, and confirm that the two match; if they don't, we discard that timer. The main downside of this approach is that it's possible to accumulate a large set of never-firing timers in memory, though that would require poor behavior from both the SDK and the user code.
SDKs processing bundles with timers wait until they receive a "Done" signal for every transform (not every timer) being processed; otherwise the SDK will hang. So bundle tracking needs to know which transform(s) use timers before the SDK writes any, so that the "is_last" bit can be sent to the SDK for all applicable transforms.
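A sketch of that bookkeeping, following the description above (one "Done" per transform, not per timer). `bundleInfo`, `doneSignal`, and `closeSignals` are hypothetical names; the point is that the set of timer-using transforms is known when the bundle is created, so an is_last signal goes out for each of them even if the SDK never wrote a timer.

```go
package main

import (
	"fmt"
	"sort"
)

// doneSignal is a simplified stand-in for a final Elements entry with is_last set.
type doneSignal struct {
	TransformID string
	IsTimer     bool
}

// bundleInfo describes what a bundle must close out, known up front.
type bundleInfo struct {
	InputTransform  string
	TimerTransforms []string // transforms in the stage that declare timers
}

// closeSignals lists every is_last signal the runner must send: one for the
// data input, and one per timer-using transform regardless of timer output.
func closeSignals(b bundleInfo) []doneSignal {
	out := []doneSignal{{TransformID: b.InputTransform}}
	ids := append([]string(nil), b.TimerTransforms...) // copy before sorting
	sort.Strings(ids)                                   // deterministic order
	for _, id := range ids {
		out = append(out, doneSignal{TransformID: id, IsTimer: true})
	}
	return out
}

func main() {
	for _, s := range closeSignals(bundleInfo{InputTransform: "impulse", TimerTransforms: []string{"statefulDoFn"}}) {
		fmt.Printf("%s timer=%v\n", s.TransformID, s.IsTimer)
	}
}
```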
Note: Most runners don't support timers in Merging Windows, so this is an acceptable implementation limitation if it comes down to it. See https://beam.apache.org/documentation/runners/capability-matrix/when-in-processing-time/