Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Timer Batch Limit in DoFnOp #139

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin<Project> {

// Automatically use the official release version if we are performing a release
// otherwise append '-SNAPSHOT'
project.version = '2.45.34'
project.version = '2.45.35'
if (isLinkedin(project)) {
project.ext.mavenGroupId = 'com.linkedin.beam'
}
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true
# buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.
# To build a custom Beam version make sure you change it in both places, see
# https://github.com/apache/beam/issues/21302.
version=2.45.34
sdk_version=2.45.34
version=2.45.35
sdk_version=2.45.35

javaVersion=1.8

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
})
public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
private static final Logger LOG = LoggerFactory.getLogger(DoFnOp.class);
public static final String TIMER_BATCH_LIMIT_CONFIG = "beam.samza.dofnop.timerBatchLimit";

private final TupleTag<FnOutT> mainOutputTag;
private final DoFn<InT, FnOutT> doFn;
Expand Down Expand Up @@ -124,6 +125,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
private transient ExecutableStageContext stageContext;
private transient StageBundleFactory stageBundleFactory;
private transient boolean bundleDisabled;
private transient int timerBatchLimit;

private final DoFnSchemaInformation doFnSchemaInformation;
private final Map<?, PCollectionView<?>> sideInputMapping;
Expand Down Expand Up @@ -183,6 +185,7 @@ public void open(
Context context,
Scheduler<KeyedTimerData<Void>> timerRegistry,
OpEmitter<OutT> emitter) {
this.timerBatchLimit = config.getInt(TIMER_BATCH_LIMIT_CONFIG, Integer.MAX_VALUE);
this.inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
this.sideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
Expand Down Expand Up @@ -330,9 +333,12 @@ private void doProcessWatermark(Instant watermark, OpEmitter<OutT> emitter) {
timerInternalsFactory.setInputWatermark(actualInputWatermark);

Collection<? extends KeyedTimerData<?>> readyTimers = timerInternalsFactory.removeReadyTimers();
if (!readyTimers.isEmpty()) {
// Process timers in chunks
Iterator<? extends KeyedTimerData<?>> timerIterator = readyTimers.iterator();
while (timerIterator.hasNext()) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might not work as expected. Even the timeres are processed in chunks, the output will be collected for all the trunks, since they are processed in a single Samza flatmap function. Seems to me we have to break this DoFnOp into two steps, first is to emit the timers in batches, and then the second is to process the timers in batches.

pushbackFnRunner.startBundle();
for (KeyedTimerData<?> keyedTimerData : readyTimers) {
for (int i = 0; i < timerBatchLimit && timerIterator.hasNext(); i++) {
KeyedTimerData<?> keyedTimerData = timerIterator.next();
fireTimer(keyedTimerData);
}
pushbackFnRunner.finishBundle();
Expand Down
Loading