Commit 008ec54

feat(api): don't send event to public event integrations (#6066)

Signed-off-by: francois samin <[email protected]>
1 parent: 2b56b04

10 files changed: +63, -171 lines

docs/content/docs/integrations/kafka/kafka_events.md (+1, -36)

@@ -56,42 +56,7 @@ Import the integration on your CDS Project with:
 cdsctl project integration import PROJECT_KEY project-configuration.yml
 ```
 
-Then, as a standard user, you can add a [Kafka Hook]({{<relref "/docs/concepts/workflow/hooks/kafka-hook.md">}}) on your workflow.
-
-### Create a Public Kafka Integration for whole CDS Projects
-
-You can also add a Kafka Integration with cdsctl. As a CDS Administrator,
-this allows you to propose a Public Kafka Integration, available on all CDS Projects.
-
-Create a file `public-configuration.yml`:
-
-```yml
-name: your-kafka-integration
-event: true
-public: true
-public_configurations:
-  name-of-integration:
-    "broker url":
-      type: string
-      value: "n1.o1.your-broker:9093,n2.o1.n1.o1.your-broker:9093,n3.o1.n1.o1.your-broker:9093"
-    "topic":
-      type: string
-      value: "your-topic.events"
-    "username":
-      type: string
-      value: "your-topic.cds-reader"
-    "password":
-      type: password
-      value: xxxxxxxx
-```
-
-Import the integration with:
-
-```bash
-cdsctl admin integration-model import public-configuration.yml
-```
-
-Then, as a standard user, you can add a [Kafka Hook]({{<relref "/docs/concepts/workflow/hooks/kafka-hook.md">}}) on your workflow.
+Then, as a standard user, you can use your Kafka integration for workflow notifications.
 
 ### One Integration, two use cases
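With this commit, the public event integration model removed above is replaced by a single event bus section in the API configuration file. A minimal sketch of what that section could look like, assuming the TOML keys declared in `engine/api/api.go` and on `KafkaConfig` in `engine/api/event/kafka.go` below (all broker, topic, and credential values are placeholders):

```toml
# Event bus configuration (sketch; key names taken from the toml tags in this commit)
[events]
  [events.globalKafka]
    enabled = true
    broker = "n1.o1.your-broker:9093,n2.o1.your-broker:9093"   # comma-separated broker list
    topic = "your-topic.events"
    user = "your-user"
    password = "xxxxxxxx"
```

The remaining keys (`version`, `maxMessageByte`, `disableTLS`, `disableSASL`, `clientID`) are optional; `kafka.go` falls back to defaults when they are omitted.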

engine/api/api.go (+4, -1)

@@ -210,6 +210,9 @@ type Configuration struct {
 		DefaultRetentionPolicy string `toml:"defaultRetentionPolicy" comment:"Default rule for workflow run retention policy, this rule can be overridden on each workflow.\n Example: 'return run_days_before < 365' keeps runs for one year." json:"defaultRetentionPolicy" default:"return run_days_before < 365"`
 		DisablePurgeDeletion   bool   `toml:"disablePurgeDeletion" comment:"Allow you to disable the deletion part of the purge. Workflow run will only be marked as delete" json:"disablePurgeDeletion" default:"false"`
 	} `toml:"workflow" comment:"######################\n 'Workflow' global configuration \n######################" json:"workflow"`
+	EventBus struct {
+		GlobalKafka event.KafkaConfig `toml:"globalKafka" default:"false" json:"globalKafka" mapstructure:"globalKafka"`
+	} `toml:"events" comment:"######################\n Event bus configuration \n######################" json:"events" mapstructure:"events"`
 }
 
 // DefaultValues is the struct for API Default configuration default values
@@ -683,7 +686,7 @@ func (a *API) Serve(ctx context.Context) error {
 	}
 
 	log.Info(ctx, "Initializing event broker...")
-	if err := event.Initialize(ctx, a.mustDB(), a.Cache); err != nil {
+	if err := event.Initialize(ctx, a.mustDB(), a.Cache, a.Config.EventBus.GlobalKafka); err != nil {
 		log.Error(ctx, "error while initializing event system: %s", err)
 	}
engine/api/event/elasticsearch.go (+1)

@@ -41,6 +41,7 @@ func PushInElasticSearch(ctx context.Context, db gorp.SqlExecutor, store cache.S
 			continue
 		}
 		e.Payload = nil
+		log.Info(ctx, "sending event %q to %s services", e.EventType, sdk.TypeElasticsearch)
 		_, code, errD := services.NewClient(db, esServices).DoJSONRequest(context.Background(), "POST", "/events", e, nil)
 		if code >= 400 || errD != nil {
 			log.Error(ctx, "PushInElasticSearch> Unable to send event %s to elasticsearch [%d]: %v", e.EventType, code, errD)

engine/api/event/event.go (+29, -34)

@@ -2,6 +2,7 @@ package event
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"strconv"
@@ -18,11 +19,13 @@ import (
 )
 
 // cache with go cache
-var brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour)
-var publicBrokersConnectionCache = []Broker{}
-var hostname, cdsname string
-var brokers []Broker
-var subscribers []chan<- sdk.Event
+var (
+	brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour)
+	hostname, cdsname      string
+	brokers                []Broker
+	globalBroker           Broker
+	subscribers            []chan<- sdk.Event
+)
 
 func init() {
 	subscribers = make([]chan<- sdk.Event, 0)
@@ -45,30 +48,6 @@ func getBroker(ctx context.Context, t string, option interface{}) (Broker, error
 	return nil, fmt.Errorf("invalid Broker Type %s", t)
 }
 
-// ResetPublicIntegrations load all integration of type Event and creates kafka brokers
-func ResetPublicIntegrations(ctx context.Context, db *gorp.DbMap) error {
-	publicBrokersConnectionCache = []Broker{}
-	filterType := sdk.IntegrationTypeEvent
-	integrations, err := integration.LoadPublicModelsByTypeWithDecryption(db, &filterType)
-	if err != nil {
-		return sdk.WrapError(err, "cannot load public models for event type")
-	}
-
-	for _, integration := range integrations {
-		for _, cfg := range integration.PublicConfigurations {
-			kafkaCfg := getKafkaConfig(cfg)
-			kafkaBroker, err := getBroker(ctx, "kafka", kafkaCfg)
-			if err != nil {
-				return sdk.WrapError(err, "cannot get broker for %s and user %s", cfg["broker url"].Value, cfg["username"].Value)
-			}
-
-			publicBrokersConnectionCache = append(publicBrokersConnectionCache, kafkaBroker)
-		}
-	}
-
-	return nil
-}
-
 func getKafkaConfig(cfg sdk.IntegrationConfig) KafkaConfig {
 	kafkaCfg := KafkaConfig{
 		Enabled: true,
@@ -121,7 +100,11 @@ func ResetEventIntegration(ctx context.Context, db gorp.SqlExecutor, eventIntegr
 }
 
 // Initialize initializes event system
-func Initialize(ctx context.Context, db *gorp.DbMap, cache Store) error {
+func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, globalKafkaConfigs ...KafkaConfig) error {
+	if len(globalKafkaConfigs) > 1 {
+		return errors.New("only one global kafka config is supported")
+	}
+
 	store = cache
 	var err error
 	hostname, err = os.Hostname()
@@ -138,7 +121,15 @@ func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, globalKafkaCon
 		}
 	}
 
-	return ResetPublicIntegrations(ctx, db)
+	if len(globalKafkaConfigs) == 1 && globalKafkaConfigs[0].BrokerAddresses != "" {
+		globalBroker, err = getBroker(ctx, "kafka", globalKafkaConfigs[0])
+		if err != nil {
+			ctx = log.ContextWithStackTrace(ctx, err)
+			log.Error(ctx, "unable to init builtin kafka broker from config: %v", err)
+		}
+	}
+
+	return nil
 }
 
 // Subscribe to CDS events
@@ -163,15 +154,17 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
 		s <- e
 	}
 
-	// Send into public brokers
-	for _, b := range publicBrokersConnectionCache {
-		if err := b.sendEvent(&e); err != nil {
+	if globalBroker != nil {
+		log.Info(ctx, "sending event %q to global broker", e.EventType)
+		if err := globalBroker.sendEvent(&e); err != nil {
 			log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
 		}
 	}
+
 	for _, eventIntegrationID := range e.EventIntegrationsID {
 		brokerConnectionKey := strconv.FormatInt(eventIntegrationID, 10)
 		brokerConnection, ok := brokersConnectionCache.Get(brokerConnectionKey)
+		var brokerConfig KafkaConfig
 		if !ok {
 			projInt, err := integration.LoadProjectIntegrationByIDWithClearPassword(ctx, db, eventIntegrationID)
 			if err != nil {
@@ -194,6 +187,7 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
 				continue
 			}
 			brokerConnection = kafkaBroker
+			brokerConfig = kafkaCfg
 		}
 
 		broker, ok := brokerConnection.(Broker)
@@ -203,6 +197,7 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
 		}
 
 		// Send into external brokers
+		log.Info(ctx, "sending event %q to %s", e.EventType, brokerConfig.BrokerAddresses)
 		if err := broker.sendEvent(&e); err != nil {
 			log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
 		}

engine/api/event/kafka.go (+22, -10)

@@ -20,16 +20,16 @@ type KafkaClient struct {
 
 // KafkaConfig handles all config to connect to Kafka
 type KafkaConfig struct {
-	Enabled         bool
-	BrokerAddresses string
-	User            string
-	Password        string
-	Version         string
-	Topic           string
-	MaxMessageByte  int
-	DisableTLS      bool
-	DisableSASL     bool
-	ClientID        string
+	Enabled         bool   `toml:"enabled" json:"-" default:"false" mapstructure:"enabled"`
+	BrokerAddresses string `toml:"broker" json:"-" mapstructure:"broker"`
+	User            string `toml:"user" json:"-" mapstructure:"user"`
+	Password        string `toml:"password" json:"-" mapstructure:"password"`
+	Version         string `toml:"version" json:"-" mapstructure:"version"`
+	Topic           string `toml:"topic" json:"-" mapstructure:"topic"`
+	MaxMessageByte  int    `toml:"maxMessageByte" json:"-" mapstructure:"maxMessageByte"`
+	DisableTLS      bool   `toml:"disableTLS" json:"-" mapstructure:"disableTLS"`
+	DisableSASL     bool   `toml:"disableSASL" json:"-" mapstructure:"disableSASL"`
+	ClientID        string `toml:"clientID" json:"-" mapstructure:"clientID"`
 }
 
 // initialize returns broker, isInit and err if
@@ -43,6 +43,18 @@ func (c *KafkaClient) initialize(ctx context.Context, options interface{}) (Brok
 		conf.Topic == "" {
 		return nil, fmt.Errorf("initKafka> Invalid Kafka Configuration")
 	}
+
+	if conf.MaxMessageByte == 0 {
+		conf.MaxMessageByte = 10000000
+	}
+
+	if conf.ClientID == "" {
+		conf.ClientID = conf.User
+	}
+	if conf.ClientID == "" {
+		conf.ClientID = "cds"
+	}
+
 	c.options = conf
 
 	if err := c.initProducer(); err != nil {
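The defaulting added to `initialize` above is worth calling out: when the config omits them, `MaxMessageByte` and `ClientID` are filled in rather than rejected. A standalone sketch of just those fallbacks (simplified struct and a hypothetical `applyDefaults` helper, not code from this commit):

```go
package main

import "fmt"

// kafkaConfig carries only the fields touched by the fallbacks
// (a simplified stand-in for event.KafkaConfig).
type kafkaConfig struct {
	User           string
	MaxMessageByte int
	ClientID       string
}

// applyDefaults reproduces the rules added in this commit:
// MaxMessageByte defaults to 10000000, and ClientID falls back
// to the SASL user, then to "cds".
func applyDefaults(c kafkaConfig) kafkaConfig {
	if c.MaxMessageByte == 0 {
		c.MaxMessageByte = 10000000
	}
	if c.ClientID == "" {
		c.ClientID = c.User
	}
	if c.ClientID == "" {
		c.ClientID = "cds"
	}
	return c
}

func main() {
	fmt.Println(applyDefaults(kafkaConfig{User: "cds-writer"}).ClientID) // cds-writer
	fmt.Println(applyDefaults(kafkaConfig{}).ClientID)                   // cds
}
```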

engine/api/integration.go (-18)

@@ -70,11 +70,6 @@ func (api *API) postIntegrationModelHandler() service.Handler {
 
 	if m.Public {
 		go propagatePublicIntegrationModel(ctx, api.mustDB(), api.Cache, *m, getAPIConsumer(ctx))
-		if m.Event {
-			if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil {
-				return sdk.WrapError(err, "error while resetting public integrations")
-			}
-		}
 	}
 
 	return service.WriteJSON(w, m, http.StatusCreated)
@@ -127,12 +122,6 @@ func (api *API) putIntegrationModelHandler() service.Handler {
 		api.GoRoutines.Exec(ctx, "propagatePublicIntegrationModel", func(ctx context.Context) {
 			propagatePublicIntegrationModel(ctx, api.mustDB(), api.Cache, *m, getAPIConsumer(ctx))
 		})
-
-		if m.Event {
-			if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil {
-				return sdk.WrapError(err, "error while resetting public integrations")
-			}
-		}
 	}
 
 	return service.WriteJSON(w, m, http.StatusOK)
@@ -231,13 +220,6 @@ func (api *API) deleteIntegrationModelHandler() service.Handler {
 			return sdk.WrapError(err, "Unable to commit tx")
 		}
 
-		if old.Event && old.Public {
-			// reset outside the transaction
-			if err := event.ResetPublicIntegrations(ctx, api.mustDB()); err != nil {
-				return sdk.WrapError(err, "error while resetting public integrations")
-			}
-		}
-
 		return nil
 	}
 }

engine/api/integration/dao_model.go (-40)

@@ -39,46 +39,6 @@ func LoadModels(db gorp.SqlExecutor) ([]sdk.IntegrationModel, error) {
 	return res, nil
 }
 
-func LoadPublicModelsByTypeWithDecryption(db gorp.SqlExecutor, integrationType *sdk.IntegrationType) ([]sdk.IntegrationModel, error) {
-	q := "SELECT * from integration_model WHERE public = true"
-	if integrationType != nil {
-		switch *integrationType {
-		case sdk.IntegrationTypeEvent:
-			q += " AND integration_model.event = true"
-		case sdk.IntegrationTypeCompute:
-			q += " AND integration_model.compute = true"
-		case sdk.IntegrationTypeStorage:
-			q += " AND integration_model.storage = true"
-		case sdk.IntegrationTypeHook:
-			q += " AND integration_model.hook = true"
-		case sdk.IntegrationTypeDeployment:
-			q += " AND integration_model.deployment = true"
-		}
-	}
-
-	query := gorpmapping.NewQuery(q)
-	var pms integrationModelSlice
-
-	if err := gorpmapping.GetAll(context.Background(), db, query, &pms, gorpmapping.GetOptions.WithDecryption); err != nil {
-		return nil, err
-	}
-
-	var res []sdk.IntegrationModel
-	for _, pm := range pms {
-		isValid, err := gorpmapping.CheckSignature(pm, pm.Signature)
-		if err != nil {
-			return nil, err
-		}
-		if !isValid {
-			log.Error(context.Background(), "integration.LoadModel> model %d data corrupted", pm.ID)
-			continue
-		}
-		res = append(res, pm.IntegrationModel)
-	}
-
-	return res, nil
-}
-
 // LoadModel Load a integration model by its ID
 func LoadModel(ctx context.Context, db gorp.SqlExecutor, modelID int64) (sdk.IntegrationModel, error) {
 	query := gorpmapping.NewQuery("SELECT * from integration_model where id = $1").Args(modelID)

engine/api/integration/dao_model_test.go (-4)

@@ -42,8 +42,4 @@ func TestCRUDModel(t *testing.T) {
 	require.NoError(t, err)
 
 	assert.True(t, len(models) > 1)
-
-	filter := sdk.IntegrationTypeEvent
-	_, err = LoadPublicModelsByTypeWithDecryption(db, &filter)
-	require.NoError(t, err)
 }

tests/05_sc_workflow_event_kafka.yml (+6, -5)

@@ -13,7 +13,6 @@ testcases:
 
 - name: import integrations
   steps:
-  - script: {{.cdsctl}} -f {{.cdsctl.config}} admin integration-model import ./fixtures/integrations/kafka-public.yml
   - script: {{.cdsctl}} -f {{.cdsctl.config}} project integration import ITSCWRKFLW18 ./fixtures/integrations/kafka.yml
   - script: {{.cdsctl}} -f {{.cdsctl.config}} project integration import ITSCWRKFLW18 ./fixtures/integrations/kafka-hook.yml
 
@@ -24,30 +23,32 @@
 - name: check if consumer kafka is started
   steps:
   - script: sleep 15 && {{.cdsctl}} -f {{.cdsctl.config}} admin services status --type=hooks|grep 'Hook Kafka Consumers' | grep OK
-    retry: 30
-    delay: 10
+    timeout: 30
 
 - name: run workflow by sending a kafka event
   steps:
   - script: kafkacat -b localhost:9092 -t test.hook -T -P -l ./fixtures/ITSCWRKFLW18/input-kafka.json
+    timeout: 30
 
 - name: check event in topic test.eventsproject
   steps:
   - script: kafkacat -b localhost:9092 -t test.eventsproject -C -o -1 -c 1
     assertions:
    - result.code ShouldEqual 0
    - "result.systemoutjson.type_event ShouldContainSubstring sdk.EventRunWorkflowJob"
-    retry: 30
+    retry: 10
     delay: 10
+    timeout: 100
 
 - name: check event in topic test.eventspublic
   steps:
   - script: kafkacat -b localhost:9092 -t test.eventspublic -C -o -1 -c 1
     assertions:
    - result.code ShouldEqual 0
    - "result.systemoutjson.type_event ShouldContainSubstring sdk.EventRunWorkflowJob"
-    retry: 30
+    retry: 10
     delay: 10
+    timeout: 100
 
 - name: check workflow
   steps:
