Skip to content

Commit

Permalink
Tune StreamingModeExecutionContext allocations (#22142)
Browse files Browse the repository at this point in the history
  • Loading branch information
steveniemitz authored Jul 6, 2022
1 parent 5465f38 commit ec47b12
Showing 1 changed file with 11 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,12 @@ public Iterable<Windmill.GlobalDataId> getSideInputNotifications() {
return StreamingModeExecutionContext.this.getSideInputNotifications();
}

private void ensureStateful(String errorPrefix) {
if (stateFamily == null) {
throw new IllegalStateException(errorPrefix + " for stateless step: " + getNameContext());
}
}

@Override
public <T, W extends BoundedWindow> void writePCollectionViewData(
TupleTag<?> tag,
Expand All @@ -738,10 +744,7 @@ public <T, W extends BoundedWindow> void writePCollectionViewData(
ByteString.Output windowStream = ByteString.newOutput();
windowCoder.encode(window, windowStream, Coder.Context.OUTER);

if (stateFamily == null) {
throw new IllegalStateException(
"Tried to write view data for stateless step: " + getNameContext());
}
ensureStateful("Tried to write view data");

Windmill.GlobalData.Builder builder =
Windmill.GlobalData.newBuilder()
Expand All @@ -768,9 +771,7 @@ public boolean issueSideInputFetch(
/** Note that there is data on the current key that is blocked on the given side input. */
@Override
public void addBlockingSideInput(Windmill.GlobalDataRequest sideInput) {
checkState(
stateFamily != null,
"Tried to set global data request for stateless step: " + getNameContext());
ensureStateful("Tried to set global data request");
sideInput =
Windmill.GlobalDataRequest.newBuilder(sideInput).setStateFamily(stateFamily).build();
outputBuilder.addGlobalDataRequests(sideInput);
Expand All @@ -787,22 +788,18 @@ public void addBlockingSideInputs(Iterable<Windmill.GlobalDataRequest> sideInput

@Override
public StateInternals stateInternals() {
checkState(
stateFamily != null, "Tried to access state for stateless step: " + getNameContext());
ensureStateful("Tried to access state");
return checkNotNull(stateInternals);
}

@Override
public TimerInternals timerInternals() {
checkState(
stateFamily != null, "Tried to access timers for stateless step: " + getNameContext());
ensureStateful("Tried to access timers");
return checkNotNull(systemTimerInternals);
}

public TimerInternals userTimerInternals() {
checkState(
stateFamily != null,
"Tried to access user timers for stateless step: " + getNameContext());
ensureStateful("Tried to access user timers");
return checkNotNull(userTimerInternals);
}
}
Expand Down

0 comments on commit ec47b12

Please sign in to comment.