
Commit 61c9842

Added context wrapper option for processors (#420)

* Added context wrapper option for processors
* remove explicit poll-timeout

1 parent: d17c843 · commit: 61c9842

15 files changed (+166, -61 lines)

examples/docker-compose.yml (+16)

@@ -10,10 +10,18 @@ services:
       ZOO_MY_ID: 1
       ZOO_PORT: 2181
       ZOO_SERVERS: server.1=zoo1:2888:3888
+    ulimits:
+      nofile:
+        soft: 65536
+        hard: 65536
   kafka1:
     image: confluentinc/cp-kafka:5.4.0
     hostname: kafka1
     container_name: kafka1
+    ulimits:
+      nofile:
+        soft: 65536
+        hard: 65536
     ports:
       - "9092:9092"
     environment:
@@ -33,6 +41,10 @@ services:
     image: confluentinc/cp-kafka:5.4.0
     hostname: kafka2
     container_name: kafka2
+    ulimits:
+      nofile:
+        soft: 65536
+        hard: 65536
     ports:
       - "9093:9093"
     environment:
@@ -52,6 +64,10 @@ services:
     image: confluentinc/cp-kafka:5.4.0
     hostname: kafka3
     container_name: kafka3
+    ulimits:
+      nofile:
+        soft: 65536
+        hard: 65536
     ports:
       - "9094:9094"
     environment:

graph.go (+2, -1)

@@ -168,7 +168,8 @@ func (gg *GroupGraph) joint(topic string) bool {
 // DefineGroup creates a group graph with a given group name and a list of
 // edges.
 func DefineGroup(group Group, edges ...Edge) *GroupGraph {
-    gg := GroupGraph{group: string(group),
+    gg := GroupGraph{
+        group:     string(group),
         codecs:    make(map[string]Codec),
         callbacks: make(map[string]ProcessCallback),
         joinCheck: make(map[string]bool),

integrationtest/processor_test.go (+70)

@@ -223,6 +223,76 @@ func TestProcessorVisit(t *testing.T) {
     require.NoError(t, err)
 }

+type (
+    gokaCtx = goka.Context
+    wrapper struct {
+        gokaCtx
+        value int64
+    }
+)
+
+func (w *wrapper) SetValue(value interface{}, options ...goka.ContextOption) {
+    val := value.(int64)
+    w.value = val
+    w.gokaCtx.SetValue(val + 1)
+}
+
+func TestProcessorContextWrapper(t *testing.T) {
+    gkt := tester.New(t)
+
+    // holds the last wrapper
+    var w *wrapper
+
+    // create a new processor, registering the tester
+    proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("proc",
+        goka.Input("input", new(codec.Int64), func(ctx goka.Context, msg interface{}) {
+            ctx.SetValue(msg)
+        }),
+        goka.Visitor("visit", func(ctx goka.Context, msg interface{}) {
+            ctx.SetValue(msg.(int64))
+        }),
+        goka.Persist(new(codec.Int64)),
+    ),
+        goka.WithTester(gkt),
+        goka.WithContextWrapper(func(ctx goka.Context) goka.Context {
+            w = &wrapper{
+                gokaCtx: ctx,
+            }
+            return w
+        }),
+    )
+
+    ctx, cancel := context.WithCancel(context.Background())
+    done := make(chan struct{})
+
+    // start it
+    go func() {
+        defer close(done)
+        err := proc.Run(ctx)
+        if err != nil {
+            t.Errorf("error running processor: %v", err)
+        }
+    }()
+
+    // send a message
+    gkt.Consume("input", "key", int64(23))
+
+    // both wrapper value and real value are set
+    require.EqualValues(t, 23, w.value)
+    require.EqualValues(t, 24, gkt.TableValue("proc-table", "key"))
+
+    // also the visitor should wrap the context
+    err := proc.VisitAll(ctx, "visit", int64(815))
+    require.NoError(t, err)
+
+    // both values are set again
+    require.EqualValues(t, 815, w.value)
+    require.EqualValues(t, 816, gkt.TableValue("proc-table", "key"))
+
+    cancel()
+    <-done
+}
+
 /*
 import (
     "context"

options.go (+12)

@@ -129,6 +129,7 @@ type poptions struct {

     updateCallback       UpdateCallback
     rebalanceCallback    RebalanceCallback
+    contextWrapper       ContextWrapper
     partitionChannelSize int
     hasher               func() hash.Hash32
     nilHandling          NilHandling
@@ -147,6 +148,15 @@ type poptions struct {
     }
 }

+// WithContextWrapper allows to intercept the context passed to each callback invocation.
+// The wrapper function will be called concurrently across all partitions the returned context
+// must not be shared.
+func WithContextWrapper(wrapper ContextWrapper) ProcessorOption {
+    return func(o *poptions, gg *GroupGraph) {
+        o.contextWrapper = wrapper
+    }
+}
+
 // WithUpdateCallback defines the callback called upon recovering a message
 // from the log.
 func WithUpdateCallback(cb UpdateCallback) ProcessorOption {
@@ -329,6 +339,8 @@ func (opt *poptions) applyOptions(gg *GroupGraph, opts ...ProcessorOption) error
     opt.log = defaultLogger
     opt.hasher = DefaultHasher()
     opt.backoffResetTime = defaultBackoffResetTime
+    // default context wrapper returns the original context
+    opt.contextWrapper = func(ctx Context) Context { return ctx }

     for _, o := range opts {
         o(opt, gg)
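The option's doc comment warns that the wrapper is invoked concurrently across partitions, so a wrapper should hand out a fresh context per invocation and only share state that is safe for concurrent use. Below is a minimal sketch of such a wrapper; it is not part of this commit, the countingCtx type and the updates counter are illustrative names, and the usual github.com/lovoo/goka import path is assumed.

package main

import (
    "context"
    "log"
    "sync/atomic"

    "github.com/lovoo/goka"
    "github.com/lovoo/goka/codec"
)

// countingCtx embeds the original goka.Context and counts state updates.
// A new instance is returned for every callback invocation, so nothing
// except the atomic counter is shared between partitions.
type countingCtx struct {
    goka.Context
    updates *int64
}

func (c *countingCtx) SetValue(value interface{}, options ...goka.ContextOption) {
    atomic.AddInt64(c.updates, 1) // count every state update
    c.Context.SetValue(value, options...)
}

func main() {
    var updates int64

    g := goka.DefineGroup("wrapper-example",
        goka.Input("input", new(codec.Int64), func(ctx goka.Context, msg interface{}) {
            ctx.SetValue(msg)
        }),
        goka.Persist(new(codec.Int64)),
    )

    proc, err := goka.NewProcessor([]string{"localhost:9092"}, g,
        // the wrapper runs once per message/visit callback
        goka.WithContextWrapper(func(ctx goka.Context) goka.Context {
            return &countingCtx{Context: ctx, updates: &updates}
        }),
    )
    if err != nil {
        log.Fatal(err)
    }
    if err := proc.Run(context.Background()); err != nil {
        log.Fatal(err)
    }
}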

partition_processor.go (+6, -5)

@@ -47,6 +47,8 @@ type visit struct {

 type commitCallback func(msg *message, meta string)

+type ContextWrapper func(ctx Context) Context
+
 // PartitionProcessor handles message processing of one partition by serializing
 // messages from different input topics.
 // It also handles joined tables as well as lookup views (managed by `Processor`).
@@ -595,7 +597,6 @@ func (pp *PartitionProcessor) processVisit(ctx context.Context, wg *sync.WaitGro
         emitterDefaultHeaders: pp.opts.producerDefaultHeaders,
         table:                 pp.table,
     }
-
     // start context and call the ProcessorCallback cb
     msgContext.start()

@@ -610,8 +611,8 @@ func (pp *PartitionProcessor) processVisit(ctx context.Context, wg *sync.WaitGro
         }
     }()

-    // now call cb
-    cb(msgContext, v.meta)
+    // now call cb, wrap the context
+    cb(pp.opts.contextWrapper(msgContext), v.meta)
     msgContext.finish(nil)
     return
 }
@@ -673,7 +674,7 @@ func (pp *PartitionProcessor) processMessage(ctx context.Context, wg *sync.WaitG
     msgContext.start()

     // now call cb
-    cb(msgContext, m)
+    cb(pp.opts.contextWrapper(msgContext), m)
     msgContext.finish(nil)
     return nil
 }
@@ -706,7 +707,7 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
     }

     defer it.Release()
-
+
     stopping, doneWaitingForStop := pp.stopping()
     defer doneWaitingForStop()

processor.go (+3, -2)

@@ -40,6 +40,7 @@ type Processor struct {
     log     logger
     brokers []string

+    // hook used to be notified whenever the processor has rebalanced to a new assignment
     rebalanceCallback RebalanceCallback

     // rwmutex protecting read/write of partitions and lookuptables.
@@ -737,8 +738,8 @@ func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error {
 // WaitForReady waits until the processor is ready to consume messages
 // (or is actually consuming messages)
 // i.e., it is done catching up all partition tables, joins and lookup tables
-func (g *Processor) WaitForReady() {
-    g.waitForReady(context.Background())
+func (g *Processor) WaitForReady() error {
+    return g.waitForReady(context.Background())
 }

 // WaitForReadyContext is context aware option of WaitForReady.
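Since WaitForReady now returns the error from waitForReady, callers can react to a failed or aborted recovery instead of waiting blindly. A short sketch of the adjusted call pattern follows; the group definition is illustrative and the github.com/lovoo/goka import path is assumed.

package main

import (
    "context"
    "log"

    "github.com/lovoo/goka"
    "github.com/lovoo/goka/codec"
)

func main() {
    g := goka.DefineGroup("ready-example",
        goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) {}),
        goka.Persist(new(codec.String)),
    )

    proc, err := goka.NewProcessor([]string{"localhost:9092"}, g)
    if err != nil {
        log.Fatal(err)
    }

    go func() {
        if err := proc.Run(context.Background()); err != nil {
            log.Printf("processor stopped: %v", err)
        }
    }()

    // WaitForReady now reports an error instead of returning nothing,
    // so a recovery that never completes no longer goes unnoticed.
    if err := proc.WaitForReady(); err != nil {
        log.Fatalf("processor did not become ready: %v", err)
    }
    log.Println("processor is ready")
}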

systemtest/commit_test.go (+4, -8)

@@ -12,10 +12,6 @@ import (
     "github.com/stretchr/testify/require"
 )

-const (
-    pollWaitSecs = 15.0
-)
-
 // TestAutoCommit tests/demonstrates the behavior of disabling the auto-commit functionality.
 // The autocommiter sends the offsets of the marked messages to the broker regularily. If the processor shuts down
 // (or the group rebalances), the offsets are sent one last time, so just turning it of is not enough.
@@ -78,7 +74,7 @@ func TestAutoCommit(t *testing.T) {

     // run the first processor
     _, cancel, done := runProc(createProc())
-    pollTimed(t, "all-received1", pollWaitSecs, func() bool {
+    pollTimed(t, "all-received1", func() bool {
         return len(offsets) == 10 && offsets[0] == 0
     })

@@ -96,7 +92,7 @@ func TestAutoCommit(t *testing.T) {

     // --> we'll receive all messages again
     // --> i.e., no offsets were committed
-    pollTimed(t, "all-received2", pollWaitSecs, func() bool {
+    pollTimed(t, "all-received2", func() bool {
         return len(offsets) == 10 && offsets[0] == 0
     })

@@ -153,7 +149,7 @@ func TestUnmarkedMessages(t *testing.T) {

     // run the first processor
     runProc(createProc())
-    pollTimed(t, "all-received1", pollWaitSecs, func() bool {
+    pollTimed(t, "all-received1", func() bool {
         return len(values) == 2 && values[0] == 1
     })

@@ -162,7 +158,7 @@ func TestUnmarkedMessages(t *testing.T) {

     // restart -> we'll only receive the second message
     runProc(createProc())
-    pollTimed(t, "all-received2", pollWaitSecs, func() bool {
+    pollTimed(t, "all-received2", func() bool {
         return len(values) == 1 && values[0] == 2
     })
 }
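With the explicit timeout argument removed here and in the other systemtest files below, the wait duration presumably lives inside the pollTimed helper itself, which is defined elsewhere in the systemtest package and not shown in this diff. A rough, hypothetical sketch of such a helper with a built-in default follows; the signature, the 10-second default and the poll interval are assumptions, not the actual implementation.

package systemtest

import (
    "testing"
    "time"
)

// pollTimed repeatedly evaluates all poll functions until every one of them
// returns true, or fails the test after a built-in default timeout.
// Hypothetical sketch only; the real helper may differ.
func pollTimed(t *testing.T, what string, polls ...func() bool) {
    t.Helper()
    const (
        defaultTimeout = 10 * time.Second
        pollInterval   = 50 * time.Millisecond
    )

    deadline := time.Now().Add(defaultTimeout)
    for time.Now().Before(deadline) {
        allTrue := true
        for _, poll := range polls {
            if !poll() {
                allTrue = false
                break
            }
        }
        if allTrue {
            return
        }
        time.Sleep(pollInterval)
    }
    t.Fatalf("waiting for %q timed out", what)
}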

systemtest/emitter_disconnect_test.go (+1, -1)

@@ -78,7 +78,7 @@ func TestEmitter_KafkaDisconnect(t *testing.T) {
         }
     }()

-    pollTimed(t, "emitter emitted something successfully", 10, func() bool {
+    pollTimed(t, "emitter emitted something successfully", func() bool {
         return atomic.LoadInt64(&success) > 0
     })


systemtest/multitopic_test.go (+2, -2)

@@ -85,7 +85,7 @@ func TestMultiTopics(t *testing.T) {
     })

     log.Printf("waiting for processor/view to be running")
-    pollTimed(t, "proc and view are recovered", 10.0, proc.Recovered, view.Recovered)
+    pollTimed(t, "proc and view are recovered", proc.Recovered, view.Recovered)
     log.Printf("...done")

     var sum int64
@@ -110,7 +110,7 @@ func TestMultiTopics(t *testing.T) {
     }

     // poll the view and the processor until we're sure that we have
-    pollTimed(t, "all messages have been transferred", 10.0,
+    pollTimed(t, "all messages have been transferred",
         func() bool {
             value, err := view.Get("key")
             require.NoError(t, err)

systemtest/proc_disconnect_test.go (+1, -1)

@@ -80,7 +80,7 @@ func TestProcessorShutdown_KafkaDisconnect(t *testing.T) {
     errg.Go(func() error {
         return proc.Run(ctx)
     })
-    pollTimed(t, "proc running", 10, proc.Recovered, func() bool {
+    pollTimed(t, "proc running", proc.Recovered, func() bool {
         if val, _ := proc.Get("key-15"); val != nil && val.(int64) > 0 {
             return true
         }
