Skip to content

Commit

Permalink
Add periodic.Sequence and periodic.Impulse transforms
Browse files Browse the repository at this point in the history
  • Loading branch information
hnnsgstfssn committed Mar 11, 2023
1 parent b1ea4d3 commit 09a518e
Show file tree
Hide file tree
Showing 3 changed files with 343 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
## New Features / Improvements

* The Flink runner now supports Flink 1.16.x ([#25046](https://github.com/apache/beam/issues/25046)).
* The Go SDK adds new transforms periodic.Impulse and periodic.Sequence that extends support for slowly updating side input patterns. ([#23106](https://github.com/apache/beam/issues/23106))

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package main

import (
"context"
"flag"
"strings"
"time"

"cloud.google.com/go/pubsub"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
)

func init() {
register.Function4x0(update)
register.Function4x0(process)
register.Emitter2[int, string]()
register.Iter1[string]()
}

// update simulates an external call to get data for the side input.
func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, string)) {
log.Infof(ctx, "Making external call %d at %s", i, t.ToTime().Format(time.RFC3339))

// zero is the key used in beam.AddFixedKey which will be applied on the main input.
id, externalData := 0, "some fake data that changed at "+time.Now().Format(time.RFC3339)

emit(id, externalData)
}

// process simulates processing of main input. It reads side input by key
func process(ctx context.Context, k int, v []byte, side func(int) func(*string) bool) {
log.Infof(ctx, "Processing (key:%d,value:%q)", k, v)

iter := side(k)

var externalData []string
var externalDatum string
for iter(&externalDatum) {
externalData = append(externalData, externalDatum)
}

log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, v, strings.Join(externalData, ","))
}

func fatalf(err error, format string, args ...interface{}) {
if err != nil {
log.Fatalf(context.TODO(), format, args...)
}
}

func main() {
var inputTopic, periodicSequenceStart, periodicSequenceEnd string
var periodicSequenceInterval time.Duration

now := time.Now()

flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", now.Add(-1*time.Hour).Format(time.RFC3339),
"The time at which to start the periodic sequence.")

flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", now.Add(100*time.Hour).Format(time.RFC3339),
"The time at which to end the periodic sequence.")

flag.DurationVar(&periodicSequenceInterval, "periodic_sequence_interval", 1*time.Minute,
"The interval between periodic sequence output.")

flag.StringVar(&inputTopic, "input_topic", "input",
"The PubSub topic from which to read the main input data.")

flag.Parse()
beam.Init()
ctx := context.Background()
p, s := beam.NewPipelineWithRoot()

project := gcpopts.GetProject(ctx)
client, err := pubsub.NewClient(ctx, project)
fatalf(err, "Failed to create client: %v", err)
_, err = pubsubx.EnsureTopic(ctx, client, inputTopic)
fatalf(err, "Failed to ensure topic: %v", err)

mainInput := beam.WindowInto(
s,
window.NewFixedWindows(periodicSequenceInterval),
beam.AddFixedKey( // simulate keyed data by adding a fixed key
s,
pubsubio.Read(
s,
project,
inputTopic,
nil,
),
),
beam.Trigger(trigger.Repeat(trigger.Always())),
beam.PanesDiscard(),
)

startTime, _ := time.Parse(time.RFC3339, periodicSequenceStart)
endTime, _ := time.Parse(time.RFC3339, periodicSequenceEnd)
sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
beam.ParDo(
s,
update,
periodic.Impulse(
s,
startTime,
endTime,
periodicSequenceInterval,
),
),
beam.Trigger(trigger.Repeat(trigger.Always())),
beam.PanesDiscard(),
)

beam.ParDo0(s, process, mainInput,
beam.SideInput{
Input: sideInput,
},
)

if _, err := beam.Run(context.Background(), "dataflow", p); err != nil {
log.Exitf(ctx, "Failed to run job: %v", err)
}
}
211 changes: 211 additions & 0 deletions sdks/go/pkg/beam/transforms/periodic/periodic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package periodic contains transformations for generating periodic sequences.
package periodic

import (
"context"
"fmt"
"math"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

func init() {
register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
func(beam.EventTime, int64),
sdf.ProcessContinuation, error](&sequenceGenDoFn{})
register.Emitter2[beam.EventTime, int64]()
beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
}

// SequenceDefinition holds the configuration for generating a sequence of
// timestamped elements at an interval.
type SequenceDefinition struct {
Interval time.Duration
Start time.Time
End time.Time
}

type sequenceGenDoFn struct {
now func() time.Time
}

func (fn *sequenceGenDoFn) Setup() {
if fn.now == nil {
fn.now = time.Now
}
}

func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction {
totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
return offsetrange.Restriction{
Start: int64(0),
End: int64(totalOutputs),
}
}

func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
}

func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 {
return rest.Size()
}

func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
return []offsetrange.Restriction{rest}
}

// TruncateRestriction immediately truncates the entire restrication.
func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction {
return offsetrange.Restriction{}
}

func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstimator {
return &sdf.ManualWatermarkEstimator{}
}

func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start
currentOutputTimestamp := sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
currentTime := fn.now()
we.UpdateWatermark(currentOutputTimestamp)
for currentOutputTimestamp.Before(currentTime) {
if rt.TryClaim(currentOutputIndex) {
emit(mtime.FromTime(currentOutputTimestamp), currentOutputTimestamp.UnixMilli())
currentOutputIndex += 1
currentOutputTimestamp = sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
currentTime = fn.now()
we.UpdateWatermark(currentOutputTimestamp)
} else if err := rt.GetError(); err != nil || rt.IsDone() {
// Stop processing on error or completion
return sdf.StopProcessing(), rt.GetError()
} else {
return sdf.ResumeProcessingIn(sd.Interval), nil
}
}

return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil
}

type impulseConfig struct {
ApplyWindow bool

now func() time.Time
}

type impulseOption func(*impulseConfig) error

// ImpulseOption is a function that configures an [Impulse] transform.
type ImpulseOption = impulseOption

// WithApplyWindow configures the [Impulse] transform to apply a fixed window
// transform to the output PCollection.
func WithApplyWindow() ImpulseOption {
return func(o *impulseConfig) error {
o.ApplyWindow = true
return nil
}
}

func withNowFunc(now func() time.Time) ImpulseOption {
return func(o *impulseConfig) error {
o.now = now
return nil
}
}

// Impulse is a PTransform which generates a sequence of timestamped
// elements at fixed runtime intervals. If [WithApplyWindow] is specified, each
// element will be assigned to its own fixed window of interval size.
//
// The transform behaves the same as [Sequence] transform, but can be
// used as the first transform in a pipeline.
//
// The following applies to the arguments.
// - if interval <= 0, interval is set to [math.MaxInt64]
// - if start is a zero value [time.Time], start is set to the current time
// - if start is after end, start is set to end
//
// The PCollection generated by Impulse is unbounded and the output elements
// are the [time.UnixMilli] int64 values of the output timestamp.
func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, opts ...ImpulseOption) beam.PCollection {
if interval <= 0 {
interval = math.MaxInt64
}
if start.IsZero() {
start = time.Now()
}
if start.After(end) {
start = end
}

conf := impulseConfig{}

for _, opt := range opts {
if err := opt(&conf); err != nil {
panic(fmt.Errorf("periodic.Impulse: invalid option: %v", err))
}
}

return genImpulse(s.Scope("periodic.Impulse"), start, end, interval, conf, &sequenceGenDoFn{now: conf.now})
}

func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, conf impulseConfig, fn *sequenceGenDoFn) beam.PCollection {
sd := SequenceDefinition{Interval: interval, Start: start, End: end}
imp := beam.Create(s.Scope("ImpulseElement"), sd)
col := genSequence(s, imp, fn)
if conf.ApplyWindow {
return beam.WindowInto(s.Scope("ApplyWindowing"),
window.NewFixedWindows(interval), col)
}
return col
}

// Sequence is a PTransform which generates a sequence of timestamped
// elements at fixed runtime intervals.
//
// The transform assigns each element a timestamp and will only output an
// element once the worker clock reach the output timestamp. Sequence is not
// able to guarantee that elements are output at the their exact timestamp, but
// it guarantees that elements will not be output prior to runtime timestamp.
//
// The transform will not output elements prior to the start time.
//
// Sequence receives [SequenceDefinition] elements and for each input element
// received, it will start generating output elements in the following pattern:
//
// - if element timestamp is less than current runtime then output element.
// - if element timestamp is greater than current runtime, wait until next
// element timestamp.
//
// The PCollection generated by Sequence is unbounded and the output elements
// are the [time.UnixMilli] int64 values of the output timestamp.
func Sequence(s beam.Scope, col beam.PCollection) beam.PCollection {
return genSequence(s.Scope("periodic.Sequence"), col, &sequenceGenDoFn{})
}

func genSequence(s beam.Scope, col beam.PCollection, fn *sequenceGenDoFn) beam.PCollection {
return beam.ParDo(s.Scope("GenSequence"), fn, col)
}

0 comments on commit 09a518e

Please sign in to comment.