
Commit 58284dc

lovromazgon, hariso, and raulb authored
Pipeline architecture v2 (preview) (#1913)
* proof of concept for funnel (new stream engine)
* tie code into the lifecycle service to test end-to-end
* expand the implementation
* improve performance
* minimize allocations
* start implementing processor
* ensure graceful stop
* implement processor task
* build worker tasks correctly
* do not use a sandbox for Run (#1886)
  Co-authored-by: Haris Osmanagić <[email protected]>
* make graceful shutdown work
* fix example
* update tests
* fix linter warnings
* rename introduced lifecycle package
* restore original lifecycle package
* fix code after merge
* add feature flag
* go mod tidy
* make acknowledgment fetching from destination stricter
* documentation
* make generate

---------

Co-authored-by: Haris Osmanagić <[email protected]>
Co-authored-by: Raúl Barroso <[email protected]>
1 parent 9a216a4 · commit 58284dc

26 files changed: +3915 -53 lines

Diff for: go.mod (+1 -1)

@@ -40,6 +40,7 @@ require (
 	github.com/prometheus/client_model v0.6.1
 	github.com/prometheus/common v0.60.1
 	github.com/rs/zerolog v1.33.0
+	github.com/sourcegraph/conc v0.3.0
 	github.com/spf13/cobra v1.8.1
 	github.com/stealthrocket/wazergo v0.19.1
 	github.com/tetratelabs/wazero v1.8.1
@@ -312,7 +313,6 @@ require (
 	github.com/sivchari/containedctx v1.0.3 // indirect
 	github.com/sivchari/tenv v1.10.0 // indirect
 	github.com/sonatard/noctx v0.0.2 // indirect
-	github.com/sourcegraph/conc v0.3.0 // indirect
 	github.com/sourcegraph/go-diff v0.7.0 // indirect
 	github.com/spf13/afero v1.11.0 // indirect
 	github.com/spf13/cast v1.7.0 // indirect
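
github.com/sourcegraph/conc moves from the indirect block to the direct requires, i.e. the new engine now imports it directly (the commit message mentions building worker tasks). A minimal sketch of the documented conc API for context; this is illustrative only, not code from this commit:

package main

import (
	"fmt"

	"github.com/sourcegraph/conc"
)

func main() {
	// conc.WaitGroup spawns the goroutine for you and, unlike
	// sync.WaitGroup, propagates panics from tasks to Wait.
	var wg conc.WaitGroup
	for i := 0; i < 3; i++ {
		id := i
		wg.Go(func() {
			fmt.Println("worker task", id)
		})
	}
	wg.Wait() // blocks until all tasks return
}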

Diff for: pkg/conduit/config.go (+5 -0)

@@ -112,6 +112,11 @@ type Config struct {
 		}
 	}

+	Preview struct {
+		// PipelineArchV2 enables the new pipeline architecture.
+		PipelineArchV2 bool
+	}
+
 	dev struct {
 		cpuprofile string
 		memprofile string

Diff for: pkg/conduit/entrypoint.go (+2 -0)

@@ -161,6 +161,8 @@ func Flags(cfg *Config) *flag.FlagSet {
 	flags.StringVar(&cfg.SchemaRegistry.Type, "schema-registry.type", cfg.SchemaRegistry.Type, "schema registry type; accepts builtin,confluent")
 	flags.StringVar(&cfg.SchemaRegistry.Confluent.ConnectionString, "schema-registry.confluent.connection-string", cfg.SchemaRegistry.Confluent.ConnectionString, "confluent schema registry connection string")

+	flags.BoolVar(&cfg.Preview.PipelineArchV2, "preview.pipeline-arch-v2", cfg.Preview.PipelineArchV2, "enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)")
+
 	// NB: flags with prefix dev.* are hidden from help output by default, they only show up using '-dev -help'
 	showDevHelp := flags.Bool("dev", false, "used together with the dev flag it shows dev flags")
 	flags.StringVar(&cfg.dev.cpuprofile, "dev.cpuprofile", "", "write cpu profile to file")
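
On the command line the new engine is therefore opt-in via "conduit -preview.pipeline-arch-v2". When embedding Conduit in a Go program, setting the new config field has the same effect. A minimal sketch, assuming the usual DefaultConfig/NewRuntime/Run embedding flow; only the Preview.PipelineArchV2 field comes from this commit:

package main

import (
	"context"
	"log"

	"github.com/conduitio/conduit/pkg/conduit"
)

func main() {
	cfg := conduit.DefaultConfig()
	cfg.Preview.PipelineArchV2 = true // same switch as -preview.pipeline-arch-v2

	runtime, err := conduit.NewRuntime(cfg)
	if err != nil {
		log.Fatal(err)
	}
	// Run blocks until the runtime shuts down.
	if err := runtime.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}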

Diff for: pkg/conduit/runtime.go (+116 -23)

@@ -46,6 +46,7 @@ import (
 	"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
 	"github.com/conduitio/conduit/pkg/foundation/metrics/prometheus"
 	"github.com/conduitio/conduit/pkg/lifecycle"
+	lifecycle_v2 "github.com/conduitio/conduit/pkg/lifecycle-poc"
 	"github.com/conduitio/conduit/pkg/orchestrator"
 	"github.com/conduitio/conduit/pkg/pipeline"
 	conn_plugin "github.com/conduitio/conduit/pkg/plugin/connector"
@@ -77,7 +78,7 @@ import (
 )

 const (
-	exitTimeout = 10 * time.Second
+	exitTimeout = 30 * time.Second
 )

 // Runtime sets up all services for serving and monitoring a Conduit instance.
@@ -95,7 +96,7 @@ type Runtime struct {
 	pipelineService  *pipeline.Service
 	connectorService *connector.Service
 	processorService *processor.Service
-	lifecycleService *lifecycle.Service
+	lifecycleService lifecycleService

 	connectorPluginService *conn_plugin.PluginService
 	processorPluginService *proc_plugin.PluginService
@@ -107,6 +108,14 @@ type Runtime struct {
 	logger log.CtxLogger
 }

+// lifecycleService is an interface that we use temporarily to allow for
+// both the old and new lifecycle services to be used interchangeably.
+type lifecycleService interface {
+	Start(ctx context.Context, pipelineID string) error
+	Stop(ctx context.Context, pipelineID string, force bool) error
+	Init(ctx context.Context) error
+}
+
 // NewRuntime sets up a Runtime instance and primes it for start.
 func NewRuntime(cfg Config) (*Runtime, error) {
 	if err := cfg.Validate(); err != nil {
@@ -203,21 +212,28 @@ func createServices(r *Runtime) error {
 		tokenService,
 	)

-	// Error recovery configuration
-	errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
-		MinDelay:         r.Config.Pipelines.ErrorRecovery.MinDelay,
-		MaxDelay:         r.Config.Pipelines.ErrorRecovery.MaxDelay,
-		BackoffFactor:    r.Config.Pipelines.ErrorRecovery.BackoffFactor,
-		MaxRetries:       r.Config.Pipelines.ErrorRecovery.MaxRetries,
-		MaxRetriesWindow: r.Config.Pipelines.ErrorRecovery.MaxRetriesWindow,
-	}
-
 	plService := pipeline.NewService(r.logger, r.DB)
 	connService := connector.NewService(r.logger, r.DB, r.connectorPersister)
 	procService := processor.NewService(r.logger, r.DB, procPluginService)
-	lifecycleService := lifecycle.NewService(r.logger, errRecoveryCfg, connService, procService, connPluginService, plService)
-	provisionService := provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, lifecycleService, r.Config.Pipelines.Path)

+	var lifecycleService lifecycleService
+	if r.Config.Preview.PipelineArchV2 {
+		r.logger.Info(context.Background()).Msg("using lifecycle service v2")
+		lifecycleService = lifecycle_v2.NewService(r.logger, connService, procService, connPluginService, plService)
+	} else {
+		// Error recovery configuration
+		errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
+			MinDelay:         r.Config.Pipelines.ErrorRecovery.MinDelay,
+			MaxDelay:         r.Config.Pipelines.ErrorRecovery.MaxDelay,
+			BackoffFactor:    r.Config.Pipelines.ErrorRecovery.BackoffFactor,
+			MaxRetries:       r.Config.Pipelines.ErrorRecovery.MaxRetries,
+			MaxRetriesWindow: r.Config.Pipelines.ErrorRecovery.MaxRetriesWindow,
+		}
+
+		lifecycleService = lifecycle.NewService(r.logger, errRecoveryCfg, connService, procService, connPluginService, plService)
+	}
+
+	provisionService := provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, lifecycleService, r.Config.Pipelines.Path)
 	orc := orchestrator.NewOrchestrator(r.DB, r.logger, plService, connService, procService, connPluginService, procPluginService, lifecycleService)

 	r.Orchestrator = orc
@@ -415,6 +431,15 @@ func (r *Runtime) initProfiling(ctx context.Context) (deferred func(), err error
 }

 func (r *Runtime) registerCleanup(t *tomb.Tomb) {
+	if r.Config.Preview.PipelineArchV2 {
+		r.registerCleanupV2(t)
+	} else {
+		r.registerCleanupV1(t)
+	}
+}
+
+func (r *Runtime) registerCleanupV1(t *tomb.Tomb) {
+	ls := r.lifecycleService.(*lifecycle.Service)
 	t.Go(func() error {
 		<-t.Dying()
 		// start cleanup with a fresh context
@@ -423,12 +448,12 @@ func (r *Runtime) registerCleanup(t *tomb.Tomb) {
 		// t.Err() can be nil, when we had a call: t.Kill(nil)
 		// t.Err() will be context.Canceled, if the tomb's context was canceled
 		if t.Err() == nil || cerrors.Is(t.Err(), context.Canceled) {
-			r.lifecycleService.StopAll(ctx, pipeline.ErrGracefulShutdown)
+			ls.StopAll(ctx, pipeline.ErrGracefulShutdown)
 		} else {
 			// tomb died due to a real error
-			r.lifecycleService.StopAll(ctx, cerrors.Errorf("conduit experienced an error: %w", t.Err()))
+			ls.StopAll(ctx, cerrors.Errorf("conduit experienced an error: %w", t.Err()))
 		}
-		err := r.lifecycleService.Wait(exitTimeout)
+		err := ls.Wait(exitTimeout)
 		t.Go(func() error {
 			r.connectorPersister.Wait()
 			return r.DB.Close()
@@ -437,6 +462,62 @@ func (r *Runtime) registerCleanup(t *tomb.Tomb) {
 	})
 }

+func (r *Runtime) registerCleanupV2(t *tomb.Tomb) {
+	ls := r.lifecycleService.(*lifecycle_v2.Service)
+	t.Go(func() error {
+		<-t.Dying()
+		// start cleanup with a fresh context
+		ctx := context.Background()
+
+		err := ls.StopAll(ctx, false)
+		if err != nil {
+			r.logger.Err(ctx, err).Msg("some pipelines stopped with an error")
+		}
+
+		// Wait for the pipelines to stop
+		const (
+			count    = 6
+			interval = exitTimeout / count
+		)
+
+		pipelinesStopped := make(chan struct{})
+		go func() {
+			for i := count; i > 0; i-- {
+				if i == 1 {
+					// on last try, stop forcefully
+					_ = ls.StopAll(ctx, true)
+				}
+
+				r.logger.Info(ctx).Msgf("waiting for pipelines to stop running (time left: %s)", time.Duration(i)*interval)
+				select {
+				case <-time.After(interval):
+				case <-pipelinesStopped:
+					return
+				}
+			}
+		}()
+
+		err = ls.Wait(exitTimeout)
+		switch {
+		case err != nil && err != context.DeadlineExceeded:
+			r.logger.Warn(ctx).Err(err).Msg("some pipelines stopped with an error")
+		case err == context.DeadlineExceeded:
+			r.logger.Warn(ctx).Msg("some pipelines did not stop in time")
+		default:
+			r.logger.Info(ctx).Msg("all pipelines stopped gracefully")
+		}
+
+		pipelinesStopped <- struct{}{}
+
+		t.Go(func() error {
+			r.connectorPersister.Wait()
+			return r.DB.Close()
+		})
+
+		return nil
+	})
+}
+
 func (r *Runtime) newHTTPMetricsHandler() http.Handler {
 	return promhttp.Handler()
 }
@@ -770,13 +851,25 @@ func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error {
 	}

 	if r.Config.Pipelines.ExitOnDegraded {
-		r.lifecycleService.OnFailure(func(e lifecycle.FailureEvent) {
-			r.logger.Warn(ctx).
-				Err(e.Error).
-				Str(log.PipelineIDField, e.ID).
-				Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled")
-			t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' error: %w", e.Error))
-		})
+		if r.Config.Preview.PipelineArchV2 {
+			ls := r.lifecycleService.(*lifecycle_v2.Service)
+			ls.OnFailure(func(e lifecycle_v2.FailureEvent) {
+				r.logger.Warn(ctx).
+					Err(e.Error).
+					Str(log.PipelineIDField, e.ID).
+					Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled")
+				t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' error: %w", e.Error))
+			})
+		} else {
+			ls := r.lifecycleService.(*lifecycle.Service)
+			ls.OnFailure(func(e lifecycle.FailureEvent) {
+				r.logger.Warn(ctx).
+					Err(e.Error).
+					Str(log.PipelineIDField, e.ID).
+					Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled")
+				t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' error: %w", e.Error))
+			})
+		}
 	}
 	err = r.pipelineService.Init(ctx)
 	if err != nil {
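
The v1 cleanup stops all pipelines and waits once for exitTimeout; registerCleanupV2 instead splits the (now 30s) timeout into six intervals, logs the remaining time each interval, and escalates to a forced StopAll on the last one. A standalone sketch of that countdown pattern; stopAll and the simulated shutdown below are stand-ins for the v2 lifecycle service, not code from this commit:

package main

import (
	"fmt"
	"time"
)

// stopAll stands in for the v2 lifecycle service's StopAll; the real
// method asks every pipeline to stop, forcefully if force is true.
func stopAll(force bool) {
	fmt.Println("stopping all pipelines, force:", force)
}

// waitDone stands in for the service's Wait; here a timer simulates
// pipelines that take 2s to shut down.
func waitDone() <-chan struct{} {
	done := make(chan struct{})
	go func() {
		time.Sleep(2 * time.Second)
		close(done)
	}()
	return done
}

func main() {
	const (
		exitTimeout = 6 * time.Second
		count       = 6
		interval    = exitTimeout / count
	)

	stopAll(false) // first, ask for a graceful stop
	done := waitDone()

	for i := count; i > 0; i-- {
		if i == 1 {
			stopAll(true) // last interval: escalate to a forced stop
		}
		fmt.Printf("waiting for pipelines to stop (time left: %s)\n", time.Duration(i)*interval)
		select {
		case <-time.After(interval):
		case <-done:
			fmt.Println("all pipelines stopped gracefully")
			return
		}
	}
	fmt.Println("some pipelines did not stop in time")
}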

Diff for: pkg/connector/source.go (+17 -6)

@@ -16,7 +16,9 @@ package connector

 import (
 	"context"
+	"strconv"
 	"sync"
+	"time"

 	"github.com/conduitio/conduit-commons/opencdc"
 	"github.com/conduitio/conduit-connector-protocol/pconnector"
@@ -153,6 +155,7 @@ func (s *Source) Teardown(ctx context.Context) error {
 		return plugin.ErrPluginNotRunning
 	}

+	s.Instance.logger.Debug(ctx).Msg("closing stream")
 	// close stream
 	if s.stopStream != nil {
 		s.stopStream()
@@ -192,8 +195,9 @@ func (s *Source) Read(ctx context.Context) ([]opencdc.Record, error) {
 		return nil, err
 	}

+	now := strconv.FormatInt(time.Now().UnixNano(), 10)
 	for _, r := range resp.Records {
-		s.sanitizeRecord(&r)
+		s.sanitizeRecord(&r, now)
 	}

 	s.Instance.inspector.Send(ctx, resp.Records)
@@ -375,7 +379,7 @@ func (s *Source) triggerLifecycleEvent(ctx context.Context, oldConfig, newConfig
 	}
 }

-func (s *Source) sanitizeRecord(r *opencdc.Record) {
+func (s *Source) sanitizeRecord(r *opencdc.Record, now string) {
 	if r.Key == nil {
 		r.Key = opencdc.RawData{}
 	}
@@ -385,12 +389,19 @@ func (s *Source) sanitizeRecord(r *opencdc.Record) {
 	if r.Payload.After == nil {
 		r.Payload.After = opencdc.RawData{}
 	}
-
 	if r.Metadata == nil {
-		r.Metadata = opencdc.Metadata{}
+		r.Metadata = opencdc.Metadata{
+			opencdc.MetadataReadAt:                   now,
+			opencdc.MetadataConduitSourceConnectorID: s.Instance.ID,
+		}
+	} else {
+		if r.Metadata[opencdc.MetadataReadAt] == "" {
+			r.Metadata[opencdc.MetadataReadAt] = now
+		}
+		if r.Metadata[opencdc.MetadataConduitSourceConnectorID] == "" {
+			r.Metadata[opencdc.MetadataConduitSourceConnectorID] = s.Instance.ID
+		}
 	}
-	// source connector ID is added to all records
-	r.Metadata.SetConduitSourceConnectorID(s.Instance.ID)
 }

 func (*Source) isEqual(cfg1, cfg2 map[string]string) bool {
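
Read now formats the readAt timestamp once per batch and passes it into sanitizeRecord, which fills in readAt and the source connector ID only when the connector did not set them, instead of unconditionally calling SetConduitSourceConnectorID on every record (the commit message mentions minimizing allocations). A self-contained sketch of that defaulting logic; the constant values are assumed stand-ins for the real opencdc metadata keys:

package main

import (
	"fmt"
	"strconv"
	"time"
)

// Assumed stand-ins for opencdc.MetadataReadAt and
// opencdc.MetadataConduitSourceConnectorID.
const (
	metaReadAt      = "opencdc.readAt"
	metaConnectorID = "conduit.source.connectorID"
)

// sanitize fills in metadata defaults without overwriting values the
// connector already set, mirroring the defaulting in the diff above.
func sanitize(metadata map[string]string, now, connectorID string) map[string]string {
	if metadata == nil {
		return map[string]string{
			metaReadAt:      now,
			metaConnectorID: connectorID,
		}
	}
	if metadata[metaReadAt] == "" {
		metadata[metaReadAt] = now
	}
	if metadata[metaConnectorID] == "" {
		metadata[metaConnectorID] = connectorID
	}
	return metadata
}

func main() {
	// one timestamp for the whole batch, exactly like Read does
	now := strconv.FormatInt(time.Now().UnixNano(), 10)

	batch := []map[string]string{
		nil,                        // record without metadata
		{metaReadAt: "1730000000"}, // connector already set readAt
	}
	for _, md := range batch {
		fmt.Println(sanitize(md, now, "my-source-connector"))
	}
}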

Diff for: pkg/foundation/metrics/metrics.go (+7 -3)

@@ -405,14 +405,18 @@ func (mt *labeledHistogram) WithValues(vs ...string) Histogram {
 // RecordBytesHistogram wraps a histogram metric and allows to observe record
 // sizes in bytes.
 type RecordBytesHistogram struct {
-	h Histogram
+	H Histogram
 }

 func NewRecordBytesHistogram(h Histogram) RecordBytesHistogram {
-	return RecordBytesHistogram{h}
+	return RecordBytesHistogram{H: h}
 }

 func (m RecordBytesHistogram) Observe(r opencdc.Record) {
+	m.H.Observe(m.SizeOf(r))
+}
+
+func (m RecordBytesHistogram) SizeOf(r opencdc.Record) float64 {
 	// TODO for now we call method Bytes() on key and payload to get the
 	// bytes representation. In case of a structured payload or key it
 	// is marshaled into JSON, which might not be the correct way to
@@ -429,5 +433,5 @@ func (m RecordBytesHistogram) Observe(r opencdc.Record) {
 	if r.Payload.After != nil {
 		bytes += len(r.Payload.After.Bytes())
 	}
-	m.h.Observe(float64(bytes))
+	return float64(bytes)
 }
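
Exporting the wrapped histogram as H and splitting SizeOf out of Observe lets callers measure a record's byte size without recording an observation, for example to sum a batch and observe it once. A simplified, self-contained sketch of that split; the Histogram interface and record shape below are pared-down stand-ins for the real package:

package main

import "fmt"

// Histogram mirrors the metrics.Histogram interface from the diff;
// stdoutHistogram is a toy implementation for the sketch.
type Histogram interface {
	Observe(float64)
}

type stdoutHistogram struct{}

func (stdoutHistogram) Observe(v float64) { fmt.Println("observed:", v) }

// recordBytes mirrors RecordBytesHistogram with SizeOf split out of
// Observe; records are simplified to raw key/payload bytes.
type recordBytes struct {
	H Histogram
}

func (m recordBytes) SizeOf(key, payload []byte) float64 {
	return float64(len(key) + len(payload))
}

func (m recordBytes) Observe(key, payload []byte) {
	m.H.Observe(m.SizeOf(key, payload))
}

func main() {
	m := recordBytes{H: stdoutHistogram{}}

	// A caller can now size a whole batch and observe it once, instead
	// of being forced into one observation per record.
	batch := [][2][]byte{
		{[]byte("k1"), []byte("v1")},
		{[]byte("k2"), []byte("value-2")},
	}
	var total float64
	for _, r := range batch {
		total += m.SizeOf(r[0], r[1])
	}
	m.H.Observe(total)
}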
