Commit c7080c4

feat(api): feed a specific kafka topic with jobs (#6070)
Signed-off-by: francois samin <[email protected]>
1 parent 008ec54 commit c7080c4

21 files changed (+296 / -97 lines)
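
For reference, the new event.Config introduced in engine/api/event/event.go below groups two optional Kafka sinks: globalKafka, which keeps receiving every event as before, and jobSummaryKafka, which only receives EventJobSummary messages. A minimal Go sketch of the configuration shape the API now consumes (broker addresses, topics and the package alias are illustrative placeholders, not values from this commit):

package example

import (
	"github.com/ovh/cds/engine/api/event"
	sdkevent "github.com/ovh/cds/sdk/event"
)

// exampleEventBusConfig builds the new event.Config; at startup the API now
// passes this struct to event.Initialize(ctx, a.mustDB(), a.Cache, &a.Config.EventBus)
// instead of a variadic list of KafkaConfig values.
func exampleEventBusConfig() event.Config {
	return event.Config{
		GlobalKafka: sdkevent.KafkaConfig{
			Enabled:         true,
			BrokerAddresses: "kafka-1:9092,kafka-2:9092", // placeholder
			Topic:           "cds-events",                // placeholder
		},
		JobSummaryKafka: sdkevent.KafkaConfig{
			Enabled:         true,
			BrokerAddresses: "kafka-1:9092,kafka-2:9092", // placeholder
			Topic:           "cds-job-summaries",         // placeholder
		},
	}
}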

engine/api/api.go

+2 -4

@@ -210,9 +210,7 @@ type Configuration struct {
         DefaultRetentionPolicy string `toml:"defaultRetentionPolicy" comment:"Default rule for workflow run retention policy, this rule can be overridden on each workflow.\n Example: 'return run_days_before < 365' keeps runs for one year." json:"defaultRetentionPolicy" default:"return run_days_before < 365"`
         DisablePurgeDeletion bool `toml:"disablePurgeDeletion" comment:"Allow you to disable the deletion part of the purge. Workflow run will only be marked as delete" json:"disablePurgeDeletion" default:"false"`
     } `toml:"workflow" comment:"######################\n 'Workflow' global configuration \n######################" json:"workflow"`
-    EventBus struct {
-        GlobalKafka event.KafkaConfig `toml:"globalKafka" default:"false" json:"globalKafka" mapstructure:"globalKafka"`
-    } `toml:"events" comment:"######################\n Event bus configuration \n######################" json:"events" mapstructure:"events"`
+    EventBus event.Config `toml:"events" comment:"######################\n Event bus configuration \n######################" json:"events" mapstructure:"events"`
 }

 // DefaultValues is the struc for API Default configuration default values
@@ -686,7 +684,7 @@ func (a *API) Serve(ctx context.Context) error {
     }

     log.Info(ctx, "Initializing event broker...")
-    if err := event.Initialize(ctx, a.mustDB(), a.Cache, a.Config.EventBus.GlobalKafka); err != nil {
+    if err := event.Initialize(ctx, a.mustDB(), a.Cache, &a.Config.EventBus); err != nil {
         log.Error(ctx, "error while initializing event system: %s", err)
     }

engine/api/application/dao_test.go

+1 -1

@@ -21,7 +21,7 @@ import (
 func TestLoadByNameAsAdmin(t *testing.T) {
     db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

-    _ = event.Initialize(context.Background(), db.DbMap, cache)
+    _ = event.Initialize(context.Background(), db.DbMap, cache, nil)
     key := sdk.RandomString(10)
     proj := assets.InsertTestProject(t, db, cache, key, key)
     app := sdk.Application{

engine/api/application_test.go

+1 -1

@@ -79,7 +79,7 @@ func TestUpdateAsCodeApplicationHandler(t *testing.T) {
     api, db, tsURL := newTestServer(t)

     event.OverridePubSubKey("events_pubsub_test")
-    require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache))
+    require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache, nil))
     require.NoError(t, api.initWebsocket("events_pubsub_test"))

     u, jwt := assets.InsertAdminUser(t, db)

engine/api/environment_ascode_test.go

+1 -1

@@ -31,7 +31,7 @@ func TestUpdateAsCodeEnvironmentHandler(t *testing.T) {
     api, db, tsURL := newTestServer(t)

     event.OverridePubSubKey("events_pubsub_test")
-    require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache))
+    require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache, nil))
     require.NoError(t, api.initWebsocket("events_pubsub_test"))

     u, jwt := assets.InsertAdminUser(t, db)

engine/api/event/event.go

+59 -25

@@ -2,7 +2,7 @@ package event

 import (
     "context"
-    "errors"
+    "encoding/json"
     "fmt"
     "os"
     "strconv"
@@ -15,15 +15,22 @@ import (

     "github.com/ovh/cds/engine/api/integration"
     "github.com/ovh/cds/sdk"
+    "github.com/ovh/cds/sdk/event"
     "github.com/ovh/cds/sdk/namesgenerator"
 )

+type Config struct {
+    GlobalKafka     event.KafkaConfig `toml:"globalKafka" json:"globalKafka" mapstructure:"globalKafka"`
+    JobSummaryKafka event.KafkaConfig `toml:"jobSummaryKafka" json:"jobSummaryKafka" mapstructure:"jobSummaryKafka"`
+}
+
 // cache with go cache
 var (
     brokersConnectionCache = gocache.New(10*time.Minute, 6*time.Hour)
     hostname, cdsname      string
     brokers                []Broker
     globalBroker           Broker
+    jobSummaryBroker       Broker
     subscribers            []chan<- sdk.Event
 )

@@ -34,7 +41,7 @@ func init() {
 // Broker event typed
 type Broker interface {
     initialize(ctx context.Context, options interface{}) (Broker, error)
-    sendEvent(event *sdk.Event) error
+    sendEvent(event interface{}) error
     status() string
     close(ctx context.Context)
 }
@@ -48,8 +55,8 @@ func getBroker(ctx context.Context, t string, option interface{}) (Broker, error)
     return nil, fmt.Errorf("invalid Broker Type %s", t)
 }

-func getKafkaConfig(cfg sdk.IntegrationConfig) KafkaConfig {
-    kafkaCfg := KafkaConfig{
+func getKafkaConfig(cfg sdk.IntegrationConfig) event.KafkaConfig {
+    kafkaCfg := event.KafkaConfig{
         Enabled:         true,
         BrokerAddresses: cfg["broker url"].Value,
         Topic:           cfg["topic"].Value,
@@ -100,11 +107,7 @@ func ResetEventIntegration(ctx context.Context, db gorp.SqlExecutor, eventIntegr
 }

 // Initialize initializes event system
-func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, glolbalKafkaConfigs ...KafkaConfig) error {
-    if len(glolbalKafkaConfigs) > 1 {
-        return errors.New("only one global kafka global config is supported")
-    }
-
+func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, config *Config) error {
     store = cache
     var err error
     hostname, err = os.Hostname()
@@ -121,11 +124,27 @@ func Initialize(ctx context.Context, db *gorp.DbMap, cache Store, glolbalKafkaCo
         }
     }

-    if len(glolbalKafkaConfigs) == 1 && glolbalKafkaConfigs[0].BrokerAddresses != "" {
-        globalBroker, err = getBroker(ctx, "kafka", glolbalKafkaConfigs[0])
+    if config == nil {
+        return nil
+    }
+
+    if config.GlobalKafka.BrokerAddresses != "" {
+        globalBroker, err = getBroker(ctx, "kafka", config.GlobalKafka)
         if err != nil {
             ctx = log.ContextWithStackTrace(ctx, err)
             log.Error(ctx, "unable to init builtin kafka broker from config: %v", err)
+        } else {
+            log.Info(ctx, "client to broker %s:%s ready", config.GlobalKafka.BrokerAddresses, config.GlobalKafka.Topic)
+        }
+    }
+
+    if config.JobSummaryKafka.BrokerAddresses != "" {
+        jobSummaryBroker, err = getBroker(ctx, "kafka", config.JobSummaryKafka)
+        if err != nil {
+            ctx = log.ContextWithStackTrace(ctx, err)
+            log.Error(ctx, "unable to init builtin kafka broker from config: %v", err)
+        } else {
+            log.Info(ctx, "client to broker %s:%s ready", config.JobSummaryKafka.BrokerAddresses, config.GlobalKafka.Topic)
         }
     }

@@ -150,32 +169,47 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
             return
         }

-        for _, s := range subscribers {
-            s <- e
+        // Filter "EventJobSummary" for globalKafka Broker
+        if e.EventType != "sdk.EventJobSummary" {
+            for _, s := range subscribers {
+                s <- e
+            }
+            if globalBroker != nil {
+                log.Info(ctx, "sending event %q to global broker", e.EventType)
+                if err := globalBroker.sendEvent(&e); err != nil {
+                    log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
+                }
+            }
+            continue
+            // we don't send other events than EventJobSummary to users kafka
         }

-        if globalBroker != nil {
-            log.Info(ctx, "sending event %q to global broker", e.EventType)
-            if err := globalBroker.sendEvent(&e); err != nil {
-                log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
+        // We now only send "EventJobSummary" in the jobSummary Broker in project integrations
+        // if the users send specific kafka integration on their workflows
+        var ejs sdk.EventJobSummary
+        if err := json.Unmarshal(e.Payload, &ejs); err != nil {
+            ctx := log.ContextWithStackTrace(ctx, err)
+            log.Error(ctx, "unable to unmarshal EventJobSummary")
+            continue
+        }
+        if jobSummaryBroker != nil {
+            log.Info(ctx, "sending event %+v to job summary broker", ejs)
+            if err := jobSummaryBroker.sendEvent(ejs); err != nil {
+                log.Error(ctx, "Error while sending message %s: %v", string(e.Payload), err)
             }
         }

         for _, eventIntegrationID := range e.EventIntegrationsID {
             brokerConnectionKey := strconv.FormatInt(eventIntegrationID, 10)
             brokerConnection, ok := brokersConnectionCache.Get(brokerConnectionKey)
-            var brokerConfig KafkaConfig
+            var brokerConfig event.KafkaConfig
             if !ok {
                 projInt, err := integration.LoadProjectIntegrationByIDWithClearPassword(ctx, db, eventIntegrationID)
                 if err != nil {
                     log.Error(ctx, "Event.DequeueEvent> Cannot load project integration for project %s and id %d and type event: %v", e.ProjectKey, eventIntegrationID, err)
                     continue
                 }

-                if projInt.Model.Public {
-                    continue
-                }
-
                 kafkaCfg := getKafkaConfig(projInt.Config)
                 kafkaBroker, err := getBroker(ctx, "kafka", kafkaCfg)
                 if err != nil {
@@ -197,9 +231,9 @@ func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
             }

             // Send into external brokers
-            log.Info(ctx, "sending event %q to %s", e.EventType, brokerConfig.BrokerAddresses)
-            if err := broker.sendEvent(&e); err != nil {
-                log.Warn(ctx, "Error while sending message [%s: %s/%s/%s/%s/%s]: %s", e.EventType, e.ProjectKey, e.WorkflowName, e.ApplicationName, e.PipelineName, e.EnvironmentName, err)
+            log.Info(ctx, "sending event %q to integration broker: %s", e.EventType, brokerConfig.BrokerAddresses)
+            if err := broker.sendEvent(ejs); err != nil {
+                log.Warn(ctx, "Error while sending message %s: %v", string(e.Payload), err)
             }
         }
     }
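
Downstream of this change, the jobSummaryKafka topic carries one JSON document per EventJobSummary, serialized by KafkaClient.sendEvent below. A minimal consumer sketch using the same sarama library (plaintext connection; the broker address and topic are placeholders and would need to match the jobSummaryKafka settings, including any TLS/SASL options of KafkaConfig):

package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Broker address and topic are placeholders; they must match the
	// jobSummaryKafka section of the CDS API configuration.
	consumer, err := sarama.NewConsumer([]string{"kafka-1:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("cds-job-summaries", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		// Each message is one JSON-encoded sdk.EventJobSummary; decode it
		// generically here since the struct fields are not shown in this diff.
		var summary map[string]interface{}
		if err := json.Unmarshal(msg.Value, &summary); err != nil {
			log.Printf("skipping malformed message: %v", err)
			continue
		}
		fmt.Printf("job summary: %v\n", summary)
	}
}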

engine/api/event/kafka.go

+9 -23

@@ -7,34 +7,20 @@ import (
     "strings"

     "github.com/Shopify/sarama"
+    "github.com/ovh/cds/sdk/event"
+    "github.com/pkg/errors"
     "github.com/rockbears/log"
-
-    "github.com/ovh/cds/sdk"
 )

 // KafkaClient enbeddes the Kafka connecion
 type KafkaClient struct {
-    options  KafkaConfig
+    options  event.KafkaConfig
     producer sarama.SyncProducer
 }

-// KafkaConfig handles all config to connect to Kafka
-type KafkaConfig struct {
-    Enabled         bool   `toml:"enabled" json:"-" default:"false" mapstructure:"enabled"`
-    BrokerAddresses string `toml:"broker" json:"-" mapstructure:"broker"`
-    User            string `toml:"user" json:"-" mapstructure:"user"`
-    Password        string `toml:"password" json:"-" mapstructure:"password"`
-    Version         string `toml:"version" json:"-" mapstructure:"version"`
-    Topic           string `toml:"topic" json:"-" mapstructure:"topic"`
-    MaxMessageByte  int    `toml:"maxMessageByte" json:"-" mapstructure:"maxMessageByte"`
-    DisableTLS      bool   `toml:"disableTLS" json:"-" mapstructure:"disableTLS"`
-    DisableSASL     bool   `toml:"disableSASL" json:"-" mapstructure:"disableSASL"`
-    ClientID        string `toml:"clientID" json:"-" mapstructure:"clientID"`
-}
-
 // initialize returns broker, isInit and err if
 func (c *KafkaClient) initialize(ctx context.Context, options interface{}) (Broker, error) {
-    conf, ok := options.(KafkaConfig)
+    conf, ok := options.(event.KafkaConfig)
     if !ok {
         return nil, fmt.Errorf("invalid Kafka Initialization")
     }
@@ -104,15 +90,15 @@ func (c *KafkaClient) initProducer() error {
 }

 // sendOnKafkaTopic send a hook on a topic kafka
-func (c *KafkaClient) sendEvent(event *sdk.Event) error {
-    data, errm := json.Marshal(event)
-    if errm != nil {
-        return errm
+func (c *KafkaClient) sendEvent(event interface{}) error {
+    data, err := json.Marshal(event)
+    if err != nil {
+        return errors.WithStack(err)
     }

     msg := &sarama.ProducerMessage{Topic: c.options.Topic, Value: sarama.ByteEncoder(data)}
     if _, _, err := c.producer.SendMessage(msg); err != nil {
-        return err
+        return errors.WithStack(err)
     }
     return nil
 }

engine/api/event/publish_workflow_run.go

+18

@@ -219,3 +219,21 @@ func PublishWorkflowNodeJobRun(ctx context.Context, pkey string, wr sdk.Workflow
     }
     publishRunWorkflow(ctx, e, data)
 }
+
+func PublishEventJobSummary(ctx context.Context, e sdk.EventJobSummary, integrations []sdk.WorkflowProjectIntegration) {
+    eventIntegrationsID := make([]int64, len(integrations))
+    for i, eventIntegration := range integrations {
+        eventIntegrationsID[i] = eventIntegration.ProjectIntegrationID
+    }
+
+    bts, _ := json.Marshal(e)
+    event := sdk.Event{
+        Timestamp:           time.Now(),
+        Hostname:            hostname,
+        CDSName:             cdsname,
+        EventType:           fmt.Sprintf("%T", e),
+        Payload:             bts,
+        EventIntegrationsID: eventIntegrationsID,
+    }
+    _ = publishEvent(ctx, event)
+}
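
A note on the new publisher: PublishEventJobSummary sets EventType with fmt.Sprintf("%T", e), which yields "sdk.EventJobSummary", exactly the string DequeueEvent filters on above to route these events to the job-summary broker and to per-project Kafka integrations rather than to subscribers and the global broker. A hypothetical call site, not part of this commit, could look like the sketch below (sdk.EventJobSummary fields are left unset because they are not shown in this diff):

package example

import (
	"context"

	"github.com/ovh/cds/engine/api/event"
	"github.com/ovh/cds/sdk"
)

// publishJobSummary is a hypothetical helper: the workflow engine would fill
// an sdk.EventJobSummary when a job ends and pass the event integrations
// attached to the workflow, so project-level Kafka integrations also receive
// the summary.
func publishJobSummary(ctx context.Context, summary sdk.EventJobSummary, integrations []sdk.WorkflowProjectIntegration) {
	event.PublishEventJobSummary(ctx, summary, integrations)
}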

engine/api/pipeline/pipeline_importer_test.go

+1 -1

@@ -76,7 +76,7 @@ func testImportUpdate(t *testing.T, db gorp.SqlExecutor, store cache.Store, tt t
 func TestImportUpdate(t *testing.T) {
     db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

-    _ = event.Initialize(context.Background(), db.DbMap, cache)
+    _ = event.Initialize(context.Background(), db.DbMap, cache, nil)

     if db == nil {
         t.FailNow()

engine/api/pipeline_test.go

+1 -1

@@ -30,7 +30,7 @@ func TestUpdateAsCodePipelineHandler(t *testing.T) {
     api, db, tsURL := newTestServer(t)

     event.OverridePubSubKey("events_pubsub_test")
-    require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache))
+    require.NoError(t, event.Initialize(context.Background(), api.mustDB(), api.Cache, nil))
     require.NoError(t, api.initWebsocket("events_pubsub_test"))

     u, jwt := assets.InsertAdminUser(t, db)

engine/api/project/dao_test.go

+1 -1

@@ -55,7 +55,7 @@ func TestExist(t *testing.T) {
 func TestLoadAllByRepo(t *testing.T) {
     db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

-    _ = event.Initialize(context.Background(), db.DbMap, cache)
+    _ = event.Initialize(context.Background(), db.DbMap, cache, nil)

     app, _ := application.LoadByName(db, "TestLoadAllByRepo", "TestLoadAllByRepo")
     if app != nil {

engine/api/websocket_test.go

+1 -1

@@ -254,7 +254,7 @@ func TestWebsocketNoEventLoose(t *testing.T) {

     pubSubKey := "events_pubsub_test_" + sdk.RandomString(10)
     event.OverridePubSubKey(pubSubKey)
-    require.NoError(t, event.Initialize(context.TODO(), api.mustDB(), api.Cache))
+    require.NoError(t, event.Initialize(context.TODO(), api.mustDB(), api.Cache, nil))
     require.NoError(t, api.initWebsocket(pubSubKey))

     _, jwt := assets.InsertAdminUser(t, db)

engine/api/workflow/dao_run_test.go

+6 -6

@@ -81,7 +81,7 @@ func TestCanBeRun(t *testing.T) {
 func TestPurgeWorkflowRun(t *testing.T) {
     db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

-    _ = event.Initialize(context.TODO(), db.DbMap, cache)
+    _ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

     mockVCSSservice, _ := assets.InsertService(t, db, "TestManualRunBuildParameterMultiApplication", sdk.TypeVCS)
     defer func() {
@@ -257,7 +257,7 @@ vcs_ssh_key: proj-blabla
 func TestPurgeWorkflowRunWithRunningStatus(t *testing.T) {
     db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

-    _ = event.Initialize(context.TODO(), db.DbMap, cache)
+    _ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

     u, _ := assets.InsertAdminUser(t, db)
     consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser)
@@ -346,7 +346,7 @@ func TestPurgeWorkflowRunWithRunningStatus(t *testing.T) {
 func TestPurgeWorkflowRunWithOneSuccessWorkflowRun(t *testing.T) {
     db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

-    _ = event.Initialize(context.TODO(), db.DbMap, cache)
+    _ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

     mockVCSSservice, _ := assets.InsertService(t, db, "TestManualRunBuildParameterMultiApplication", sdk.TypeVCS)
     defer func() {
@@ -546,7 +546,7 @@ vcs_ssh_key: proj-blabla
 func TestPurgeWorkflowRunWithNoSuccessWorkflowRun(t *testing.T) {
     db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

-    _ = event.Initialize(context.TODO(), db.DbMap, cache)
+    _ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

     mockVCSSservice, _ := assets.InsertService(t, db, "TestManualRunBuildParameterMultiApplication", sdk.TypeVCS)
     defer func() {
@@ -718,7 +718,7 @@ vcs_ssh_key: proj-blabla
 func TestPurgeWorkflowRunWithoutTags(t *testing.T) {
     db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

-    _ = event.Initialize(context.TODO(), db.DbMap, cache)
+    _ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

     u, _ := assets.InsertAdminUser(t, db)
     consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser)
@@ -804,7 +804,7 @@ func TestPurgeWorkflowRunWithoutTags(t *testing.T) {
 func TestPurgeWorkflowRunWithoutTagsBiggerHistoryLength(t *testing.T) {
     db, cache := test.SetupPG(t, bootstrap.InitiliazeDB)

-    _ = event.Initialize(context.TODO(), db.DbMap, cache)
+    _ = event.Initialize(context.TODO(), db.DbMap, cache, nil)

     u, _ := assets.InsertAdminUser(t, db)
     consumer, _ := authentication.LoadConsumerByTypeAndUserID(context.TODO(), db, sdk.ConsumerLocal, u.ID, authentication.LoadConsumerOptions.WithAuthentifiedUser)
