Skip to content

Commit 977b037

Browse files
harisoraulb
andauthored
Allow a connector's or processor's plugin name to be updated (#1938)
* allow plugin change * tests * update tests * tests * log * update processors * fix tests * update test --------- Co-authored-by: Raúl Barroso <[email protected]>
1 parent 048a99c commit 977b037

30 files changed

+275
-290
lines changed

Diff for: pkg/connector/service.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,17 @@ func (s *Service) Delete(ctx context.Context, id string, dispenserFetcher Plugin
191191
}
192192

193193
// Update updates the connector config.
194-
func (s *Service) Update(ctx context.Context, id string, data Config) (*Instance, error) {
194+
func (s *Service) Update(ctx context.Context, id string, plugin string, data Config) (*Instance, error) {
195195
conn, err := s.Get(ctx, id)
196196
if err != nil {
197197
return nil, err
198198
}
199199

200+
if conn.Plugin != plugin {
201+
s.logger.Warn(ctx).Msgf("connector plugin changing from %v to %v, "+
202+
"this may lead to unexpected behavior and configuration issues.", conn.Plugin, plugin)
203+
}
204+
conn.Plugin = plugin
200205
conn.Config = data
201206
conn.UpdatedAt = time.Now().UTC()
202207

Diff for: pkg/connector/service_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,7 @@ func TestService_UpdateSuccess(t *testing.T) {
514514
Name: "changed-name",
515515
Settings: map[string]string{"foo": "bar"},
516516
}
517+
wantPlugin := "changed-plugin"
517518

518519
conn, err := service.Create(
519520
ctx,
@@ -530,10 +531,11 @@ func TestService_UpdateSuccess(t *testing.T) {
530531
is.NoErr(err)
531532

532533
beforeUpdate := time.Now()
533-
got, err := service.Update(ctx, conn.ID, want)
534+
got, err := service.Update(ctx, conn.ID, wantPlugin, want)
534535
is.NoErr(err)
535536

536537
is.Equal(got.Config, want)
538+
is.Equal(got.Plugin, wantPlugin)
537539
is.True(!got.UpdatedAt.Before(beforeUpdate))
538540
}
539541

@@ -545,7 +547,7 @@ func TestService_UpdateInstanceNotFound(t *testing.T) {
545547

546548
service := NewService(logger, db, nil)
547549
// update connector that does not exist
548-
got, err := service.Update(ctx, uuid.NewString(), Config{})
550+
got, err := service.Update(ctx, uuid.NewString(), "foo-plugin", Config{})
549551
is.True(err != nil)
550552
is.True(cerrors.Is(err, ErrInstanceNotFound))
551553
is.Equal(got, nil)

Diff for: pkg/orchestrator/connectors.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (c *ConnectorOrchestrator) Delete(ctx context.Context, id string) error {
151151
return nil
152152
}
153153

154-
func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, config connector.Config) (*connector.Instance, error) {
154+
func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, plugin string, config connector.Config) (*connector.Instance, error) {
155155
var r rollback.R
156156
defer r.MustExecute()
157157
txn, ctx, err := c.db.NewTransaction(ctx, true)
@@ -181,12 +181,12 @@ func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, config co
181181
}
182182

183183
oldConfig := conn.Config
184-
conn, err = c.connectors.Update(ctx, id, config)
184+
conn, err = c.connectors.Update(ctx, id, plugin, config)
185185
if err != nil {
186186
return nil, err
187187
}
188188
r.Append(func() error {
189-
_, err = c.connectors.Update(ctx, id, oldConfig)
189+
_, err = c.connectors.Update(ctx, id, conn.Plugin, oldConfig)
190190
return err
191191
})
192192
err = txn.Commit()

Diff for: pkg/orchestrator/connectors_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -485,11 +485,11 @@ func TestConnectorOrchestrator_Update_Success(t *testing.T) {
485485
ValidateSourceConfig(gomock.Any(), conn.Plugin, newConfig.Settings).
486486
Return(nil)
487487
consMock.EXPECT().
488-
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, newConfig).
488+
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, conn.Plugin, newConfig).
489489
Return(want, nil)
490490

491491
orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
492-
got, err := orc.Connectors.Update(ctx, conn.ID, newConfig)
492+
got, err := orc.Connectors.Update(ctx, conn.ID, conn.Plugin, newConfig)
493493
is.NoErr(err)
494494
is.Equal(got, want)
495495
}
@@ -507,7 +507,7 @@ func TestConnectorOrchestrator_Update_ConnectorNotExist(t *testing.T) {
507507
Return(nil, wantErr)
508508

509509
orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
510-
got, err := orc.Connectors.Update(ctx, id, connector.Config{})
510+
got, err := orc.Connectors.Update(ctx, id, "test-plugin", connector.Config{})
511511
is.True(got == nil)
512512
is.True(err != nil)
513513
is.True(cerrors.Is(err, wantErr))
@@ -537,7 +537,7 @@ func TestConnectorOrchestrator_Update_PipelineRunning(t *testing.T) {
537537
Return(pl, nil)
538538

539539
orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
540-
got, err := orc.Connectors.Update(ctx, conn.ID, connector.Config{})
540+
got, err := orc.Connectors.Update(ctx, conn.ID, conn.Plugin, connector.Config{})
541541
is.True(got == nil)
542542
is.True(err != nil)
543543
is.Equal(pipeline.ErrPipelineRunning, err)
@@ -571,11 +571,11 @@ func TestConnectorOrchestrator_Update_Fail(t *testing.T) {
571571
ValidateDestinationConfig(gomock.Any(), conn.Plugin, conn.Config.Settings).
572572
Return(nil)
573573
consMock.EXPECT().
574-
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, connector.Config{}).
574+
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, conn.Plugin, connector.Config{}).
575575
Return(nil, wantErr)
576576

577577
orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
578-
got, err := orc.Connectors.Update(ctx, conn.ID, connector.Config{})
578+
got, err := orc.Connectors.Update(ctx, conn.ID, conn.Plugin, connector.Config{})
579579
is.True(got == nil)
580580
is.True(err != nil)
581581
is.True(cerrors.Is(err, wantErr))

Diff for: pkg/orchestrator/mock/orchestrator.go

+12-12
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: pkg/orchestrator/orchestrator.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ type ConnectorService interface {
9797
Get(ctx context.Context, id string) (*connector.Instance, error)
9898
Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, c connector.Config, p connector.ProvisionType) (*connector.Instance, error)
9999
Delete(ctx context.Context, id string, dispenserFetcher connector.PluginDispenserFetcher) error
100-
Update(ctx context.Context, id string, c connector.Config) (*connector.Instance, error)
100+
Update(ctx context.Context, id string, plugin string, c connector.Config) (*connector.Instance, error)
101101

102102
AddProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error)
103103
RemoveProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error)
@@ -108,7 +108,7 @@ type ProcessorService interface {
108108
Get(ctx context.Context, id string) (*processor.Instance, error)
109109
Create(ctx context.Context, id string, plugin string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType, condition string) (*processor.Instance, error)
110110
MakeRunnableProcessor(ctx context.Context, i *processor.Instance) (*processor.RunnableProcessor, error)
111-
Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error)
111+
Update(ctx context.Context, id string, plugin string, cfg processor.Config) (*processor.Instance, error)
112112
Delete(ctx context.Context, id string) error
113113
}
114114

Diff for: pkg/orchestrator/processors.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (p *ProcessorOrchestrator) Get(ctx context.Context, id string) (*processor.
137137
return p.processors.Get(ctx, id)
138138
}
139139

140-
func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error) {
140+
func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, plugin string, cfg processor.Config) (*processor.Instance, error) {
141141
var r rollback.R
142142
defer r.MustExecute()
143143

@@ -157,6 +157,7 @@ func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, cfg proce
157157
return nil, cerrors.Errorf("processor %q cannot be updated: %w", proc.ID, ErrImmutableProvisionedByConfig)
158158
}
159159
// provisioned by API
160+
oldPlugin := proc.Plugin
160161
oldConfig := proc.Config
161162

162163
pl, err := p.getProcessorsPipeline(ctx, proc.Parent)
@@ -168,12 +169,12 @@ func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, cfg proce
168169
return nil, pipeline.ErrPipelineRunning
169170
}
170171

171-
proc, err = p.processors.Update(ctx, id, cfg)
172+
proc, err = p.processors.Update(ctx, id, plugin, cfg)
172173
if err != nil {
173174
return nil, err
174175
}
175176
r.Append(func() error {
176-
_, err = p.processors.Update(ctx, proc.ID, oldConfig)
177+
_, err = p.processors.Update(ctx, proc.ID, oldPlugin, oldConfig)
177178
return err
178179
})
179180

Diff for: pkg/orchestrator/processors_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -375,11 +375,11 @@ func TestProcessorOrchestrator_UpdateOnPipeline_Success(t *testing.T) {
375375
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
376376
Return(pl, nil)
377377
procsMock.EXPECT().
378-
Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Config).
378+
Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Plugin, want.Config).
379379
Return(want, nil)
380380

381381
orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
382-
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
382+
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
383383
is.NoErr(err)
384384
is.Equal(want, got)
385385
}
@@ -417,7 +417,7 @@ func TestProcessorOrchestrator_UpdateOnPipeline_ProcessorNotExist(t *testing.T)
417417
Return(nil, wantErr)
418418

419419
orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
420-
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
420+
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
421421
is.True(err != nil)
422422
is.True(cerrors.Is(err, wantErr)) // errors did not match")
423423
is.True(got == nil)
@@ -458,7 +458,7 @@ func TestProcessorOrchestrator_UpdateOnPipeline_PipelineRunning(t *testing.T) {
458458
Return(pl, nil)
459459

460460
orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
461-
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
461+
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
462462
is.True(err != nil)
463463
is.Equal(pipeline.ErrPipelineRunning, err)
464464
is.True(got == nil)
@@ -498,7 +498,7 @@ func TestProcessorOrchestrator_UpdateOnPipeline_ProcessorProvisionedByConfig(t *
498498
Return(before, nil)
499499

500500
orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
501-
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
501+
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
502502
is.Equal(got, nil)
503503
is.True(err != nil)
504504
is.True(cerrors.Is(err, ErrImmutableProvisionedByConfig)) // expected ErrImmutableProvisionedByConfig
@@ -550,11 +550,11 @@ func TestProcessorOrchestrator_UpdateOnPipeline_UpdateFail(t *testing.T) {
550550
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
551551
Return(pl, nil)
552552
procsMock.EXPECT().
553-
Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Config).
553+
Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Plugin, want.Config).
554554
Return(nil, wantErr)
555555

556556
orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
557-
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
557+
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
558558
is.True(err != nil)
559559
is.Equal(wantErr, err)
560560
is.True(got == nil)
@@ -586,7 +586,7 @@ func TestProcessorOrchestrator_UpdateOnConnector_ConnectorNotExist(t *testing.T)
586586
Return(nil, wantErr)
587587

588588
orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
589-
got, err := orc.Processors.Update(ctx, want.ID, processor.Config{})
589+
got, err := orc.Processors.Update(ctx, want.ID, want.Plugin, processor.Config{})
590590
is.True(err != nil)
591591
is.True(cerrors.Is(err, wantErr)) // errors did not match
592592
is.True(got == nil)

Diff for: pkg/processor/service.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -162,16 +162,25 @@ func (s *Service) Create(
162162
}
163163

164164
// Update will update a processor instance config.
165-
func (s *Service) Update(ctx context.Context, id string, cfg Config) (*Instance, error) {
165+
func (s *Service) Update(ctx context.Context, id string, plugin string, cfg Config) (*Instance, error) {
166166
instance, err := s.Get(ctx, id)
167167
if err != nil {
168168
return nil, err
169169
}
170+
if plugin == "" {
171+
return nil, cerrors.Errorf("could not update processor instance (ID: %s): plugin name is empty", id)
172+
}
170173

171174
if instance.running {
172175
return nil, cerrors.Errorf("could not update processor instance (ID: %s): %w", id, ErrProcessorRunning)
173176
}
174177

178+
if instance.Plugin != plugin {
179+
s.logger.Warn(ctx).Msgf("processor plugin changing from %v to %v, "+
180+
"this may lead to unexpected behavior and configuration issues.", instance.Plugin, plugin)
181+
}
182+
183+
instance.Plugin = plugin
175184
instance.Config = cfg
176185
instance.UpdatedAt = time.Now()
177186

0 commit comments

Comments
 (0)