Skip to content

Commit

Permalink
[#24789] Handle missing FullValue wrapper.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Dec 27, 2022
1 parent 14c3584 commit 93dbd73
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions sdks/go/pkg/beam/core/runtime/exec/sdf.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,22 +175,25 @@ func (n *SplitAndSizeRestrictions) StartBundle(ctx context.Context, id string, d
//
// Output Diagram:
//
// *FullValue {
// Elm: *FullValue {
// Elm: *FullValue (original input)
// Elm2: *FullValue {
// Elm: Restriction
// Elm2: Watermark estimator state
// }
// *FullValue {
// Elm: *FullValue {
// Elm: *FullValue (original input)
// Elm2: *FullValue {
// Elm: Restriction
// Elm2: Watermark estimator state
// }
// Elm2: float64 (size)
// Windows
// Timestamps
// }
// Elm2: float64 (size)
// Windows
// Timestamps
// }
func (n *SplitAndSizeRestrictions) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
rest := elm.Elm2.(*FullValue).Elm
ws := elm.Elm2.(*FullValue).Elm2
mainElm := elm.Elm.(*FullValue)

// If receiving directly from a datasource,
// the element may not be wrapped in a *FullValue
mainElm := convertIfNeeded(elm.Elm, &FullValue{})

splitRests := n.splitInv.Invoke(mainElm, rest)

Expand Down

0 comments on commit 93dbd73

Please sign in to comment.