Skip to content

Commit bd365b5

Browse files
authored
chore: enabling gateway to ingest events even when sharedDB is down (#4262)
* chore: enabling gateway to ingest events even when sharedDB is down
* addressed comments
* fix unit tests
1 parent b928bfe commit bd365b5

12 files changed

+94
-70
lines changed

app/apphandlers/embeddedAppHandler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
127127

128128
fileUploaderProvider := fileuploader.NewProvider(ctx, backendconfig.DefaultBackendConfig)
129129

130-
rsourcesService, err := NewRsourcesService(deploymentType)
130+
rsourcesService, err := NewRsourcesService(deploymentType, true)
131131
if err != nil {
132132
return err
133133
}

app/apphandlers/gatewayAppHandler.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
121121
if err != nil {
122122
return fmt.Errorf("failed to create rate limiter: %w", err)
123123
}
124-
rsourcesService, err := NewRsourcesService(deploymentType)
124+
rsourcesService, err := NewRsourcesService(deploymentType, false)
125125
if err != nil {
126126
return err
127127
}
@@ -132,23 +132,23 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
132132
})
133133
drainConfigManager, err := drain_config.NewDrainConfigManager(config, a.log.Child("drain-config"))
134134
if err != nil {
135-
return fmt.Errorf("drain config manager setup: %v", err)
136-
}
137-
defer drainConfigManager.Stop()
138-
g.Go(misc.WithBugsnag(func() (err error) {
139-
return drainConfigManager.DrainConfigRoutine(ctx)
140-
}))
141-
g.Go(misc.WithBugsnag(func() (err error) {
142-
return drainConfigManager.CleanupRoutine(ctx)
143-
}))
135+
a.log.Errorw("drain config manager setup failed while starting gateway", "error", err)
136+
}
137+
138+
drainConfigHttpHandler := drain_config.ErrorResponder("unable to start drain config http handler")
139+
if drainConfigManager != nil {
140+
defer drainConfigManager.Stop()
141+
drainConfigHttpHandler = drainConfigManager.DrainConfigHttpHandler()
142+
}
143+
144144
err = gw.Setup(
145145
ctx,
146146
config, logger.NewLogger().Child("gateway"), stats.Default,
147147
a.app, backendconfig.DefaultBackendConfig, gatewayDB, errDB,
148148
rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
149149
gateway.WithInternalHttpHandlers(
150150
map[string]http.Handler{
151-
"/drain": drainConfigManager.DrainConfigHttpHandler(),
151+
"/drain": drainConfigHttpHandler,
152152
},
153153
),
154154
)

app/apphandlers/processorAppHandler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
134134

135135
fileUploaderProvider := fileuploader.NewProvider(ctx, backendconfig.DefaultBackendConfig)
136136

137-
rsourcesService, err := NewRsourcesService(deploymentType)
137+
rsourcesService, err := NewRsourcesService(deploymentType, true)
138138
if err != nil {
139139
return err
140140
}

app/apphandlers/setup.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func rudderCoreWorkSpaceTableSetup() error {
5757
}
5858

5959
// NewRsourcesService produces a rsources.JobService through environment configuration (env variables & config file); shouldSetupSharedDB is copied into the service config's ShouldSetupSharedDB field.
60-
func NewRsourcesService(deploymentType deployment.Type) (rsources.JobService, error) {
60+
func NewRsourcesService(deploymentType deployment.Type, shouldSetupSharedDB bool) (rsources.JobService, error) {
6161
var rsourcesConfig rsources.JobServiceConfig
6262
rsourcesConfig.MaxPoolSize = config.GetInt("Rsources.MaxPoolSize", 3)
6363
rsourcesConfig.MinPoolSize = config.GetInt("Rsources.MinPoolSize", 1)
@@ -82,6 +82,8 @@ func NewRsourcesService(deploymentType deployment.Type) (rsources.JobService, er
8282
}
8383
}
8484

85+
rsourcesConfig.ShouldSetupSharedDB = shouldSetupSharedDB
86+
8587
return rsources.NewJobService(rsourcesConfig)
8688
}
8789

internal/drain-config/drainConfig.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -39,23 +39,25 @@ type drainConfigManager struct {
3939
wg sync.WaitGroup
4040
}
4141

42+
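// NewDrainConfigManager sets up the DB connection, runs the migrations and returns a drainConfigManager; it returns nil and an error if the DB connection setup fails.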
43+
// If the migration fails while setting up drain config, a non-nil drainConfigManager is still returned along with the error.
44+
// Consumers must handle both the error and the non-nil drainConfigManager according to their use case.
4245
func NewDrainConfigManager(conf *config.Config, log logger.Logger) (*drainConfigManager, error) {
4346
db, err := setupDBConn(conf)
4447
if err != nil {
4548
log.Errorw("db setup", "error", err)
4649
return nil, fmt.Errorf("db setup: %v", err)
4750
}
48-
if err := migrate(db); err != nil {
51+
if err = migrate(db); err != nil {
4952
log.Errorw("db migrations", "error", err)
50-
return nil, fmt.Errorf("db migrations: %v", err)
5153
}
5254
return &drainConfigManager{
5355
log: log,
5456
conf: conf,
5557
db: db,
5658

5759
done: &atomic.Bool{},
58-
}, nil
60+
}, err
5961
}
6062

6163
func (d *drainConfigManager) CleanupRoutine(ctx context.Context) error {
@@ -189,9 +191,6 @@ func setupDBConn(conf *config.Config) (*sql.DB, error) {
189191
if err != nil {
190192
return nil, fmt.Errorf("db open: %v", err)
191193
}
192-
if err := db.Ping(); err != nil {
193-
return nil, fmt.Errorf("db ping: %v", err)
194-
}
195194
db.SetMaxIdleConns(conf.GetInt("drainConfig.maxIdleConns", 1))
196195
db.SetMaxOpenConns(conf.GetInt("drainConfig.maxOpenConns", 2))
197196
return db, nil

internal/drain-config/http.go

+6
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,9 @@ func (dcm *drainConfigManager) insert(ctx context.Context, key, value string) er
4141
}
4242
return nil
4343
}
44+
45+
func ErrorResponder(errMsg string) http.Handler {
46+
return http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
47+
http.Error(w, errMsg, http.StatusInternalServerError)
48+
}))
49+
}

services/rsources/failed_records_pagination_test.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,11 @@ func TestFailedRecords(t *testing.T) {
4343
require.NoError(t, err)
4444

4545
service, err := NewJobService(JobServiceConfig{
46-
LocalHostname: postgresContainer.Host,
47-
MaxPoolSize: 1,
48-
LocalConn: postgresContainer.DBDsn,
49-
Log: logger.NOP,
46+
LocalHostname: postgresContainer.Host,
47+
MaxPoolSize: 1,
48+
LocalConn: postgresContainer.DBDsn,
49+
Log: logger.NOP,
50+
ShouldSetupSharedDB: true,
5051
})
5152
require.NoError(t, err)
5253
// Create 2 different job run ids with 10 records each
@@ -85,10 +86,11 @@ func RunFailedRecordsPerformanceTest(t testing.TB, recordCount, pageSize int) ti
8586
require.NoError(t, err)
8687

8788
service, err := NewJobService(JobServiceConfig{
88-
LocalHostname: postgresContainer.Host,
89-
MaxPoolSize: 1,
90-
LocalConn: postgresContainer.DBDsn,
91-
Log: logger.NOP,
89+
LocalHostname: postgresContainer.Host,
90+
MaxPoolSize: 1,
91+
LocalConn: postgresContainer.DBDsn,
92+
Log: logger.NOP,
93+
ShouldSetupSharedDB: true,
9294
})
9395
require.NoError(t, err)
9496

services/rsources/handler.go

+30-30
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (sh *sourcesHandler) getStatusInternal(ctx context.Context, db *sql.DB, job
4444
filters, filterParams := sqlFilters(jobRunId, filter)
4545

4646
sqlStatement := fmt.Sprintf(
47-
`SELECT
47+
`SELECT
4848
source_id,
4949
destination_id,
5050
task_run_id,
@@ -96,7 +96,7 @@ func (*sourcesHandler) IncrementStats(ctx context.Context, tx *sql.Tx, jobRunId
9696
failed_count
9797
) values ($1, $2, $3, $4, $5, $6, $7)
9898
on conflict(db_name, job_run_id, task_run_id, source_id, destination_id)
99-
do update set
99+
do update set
100100
in_count = "rsources_stats".in_count + excluded.in_count,
101101
out_count = "rsources_stats".out_count + excluded.out_count,
102102
failed_count = "rsources_stats".failed_count + excluded.failed_count,
@@ -116,7 +116,7 @@ func (sh *sourcesHandler) AddFailedRecords(ctx context.Context, tx *sql.Tx, jobR
116116
return nil
117117
}
118118
row := tx.QueryRow(`INSERT INTO rsources_failed_keys_v2 (id, job_run_id, task_run_id, source_id, destination_id)
119-
VALUES ($1, $2, $3, $4, $5)
119+
VALUES ($1, $2, $3, $4, $5)
120120
ON CONFLICT (job_run_id, task_run_id, source_id, destination_id, db_name) DO UPDATE SET ts = NOW()
121121
RETURNING id`, ksuid.New().String(), jobRunId, key.TaskRunID, key.SourceID, key.DestinationID)
122122
var id string
@@ -196,8 +196,8 @@ func (sh *sourcesHandler) GetFailedRecords(ctx context.Context, jobRunId string,
196196
r.id,
197197
r.record_id,
198198
r.code
199-
FROM "rsources_failed_keys_v2_records" r
200-
JOIN "rsources_failed_keys_v2" k ON r.id = k.id %[1]s
199+
FROM "rsources_failed_keys_v2_records" r
200+
JOIN "rsources_failed_keys_v2" k ON r.id = k.id %[1]s
201201
ORDER BY r.id, r.record_id ASC %[2]s`,
202202
filters, limit)
203203

@@ -291,8 +291,8 @@ func (sh *sourcesHandler) GetFailedRecordsV1(ctx context.Context, jobRunId strin
291291
k.destination_id,
292292
r.id,
293293
r.record_id
294-
FROM "rsources_failed_keys_v2_records" r
295-
JOIN "rsources_failed_keys_v2" k ON r.id = k.id %[1]s
294+
FROM "rsources_failed_keys_v2_records" r
295+
JOIN "rsources_failed_keys_v2" k ON r.id = k.id %[1]s
296296
ORDER BY r.id, r.record_id ASC %[2]s`,
297297
filters, limit)
298298

@@ -430,7 +430,7 @@ func (sh *sourcesHandler) doCleanupTables(ctx context.Context) error {
430430
}
431431
before := time.Now().Add(-config.GetDuration("Rsources.retention", defaultRetentionPeriodInHours, time.Hour))
432432
if _, err := tx.ExecContext(ctx, `delete from "rsources_stats" where job_run_id in (
433-
select lastUpdateToJobRunId.job_run_id from
433+
select lastUpdateToJobRunId.job_run_id from
434434
(select job_run_id, max(ts) as mts from "rsources_stats" group by job_run_id) lastUpdateToJobRunId
435435
where lastUpdateToJobRunId.mts <= $1
436436
)`, before); err != nil {
@@ -444,7 +444,7 @@ func (sh *sourcesHandler) doCleanupTables(ctx context.Context) error {
444444
JOIN "rsources_failed_keys_v2_records" r on r.id = k.id
445445
GROUP BY k.job_run_id
446446
) lastUpdateToJobRunId WHERE lastUpdateToJobRunId.mts <= $1
447-
)
447+
)
448448
),
449449
deleted AS (
450450
DELETE FROM "rsources_failed_keys_v2" WHERE id IN (SELECT id FROM to_delete) RETURNING id
@@ -487,7 +487,7 @@ func (sh *sourcesHandler) init() error {
487487
return err
488488
}
489489

490-
if sh.sharedDB != nil {
490+
if sh.config.ShouldSetupSharedDB && sh.sharedDB != nil {
491491
if err := withAdvisoryLock(ctx, sh.sharedDB, lockID, func(_ *sql.Tx) error {
492492
sh.log.Debugf("setting up rsources tables for shared db %s", sh.config.SharedConn)
493493
if err := setupTables(ctx, sh.sharedDB, "shared", sh.log); err != nil {
@@ -536,42 +536,42 @@ func migrateFailedKeysTable(ctx context.Context, tx *sql.Tx) error {
536536
v_alphabet char array[62] := array[
537537
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
538538
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
539-
'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
540-
'U', 'V', 'W', 'X', 'Y', 'Z',
541-
'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
539+
'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
540+
'U', 'V', 'W', 'X', 'Y', 'Z',
541+
'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
542542
'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
543543
'u', 'v', 'w', 'x', 'y', 'z'];
544544
i integer := 0;
545545
begin
546-
546+
547547
-- Get the current time
548548
v_time := clock_timestamp();
549-
549+
550550
-- Extract epoch seconds
551551
v_seconds := EXTRACT(EPOCH FROM v_time) - v_epoch;
552-
552+
553553
-- Generate a KSUID in a numeric variable
554554
v_numeric := v_seconds * pow(2::numeric(50), 128) -- 32 bits for seconds and 128 bits for randomness
555555
+ ((random()::numeric(70,20) * pow(2::numeric(70,20), 48))::numeric(50) * pow(2::numeric(50), 80)::numeric(50))
556556
+ ((random()::numeric(70,20) * pow(2::numeric(70,20), 40))::numeric(50) * pow(2::numeric(50), 40)::numeric(50))
557557
+ (random()::numeric(70,20) * pow(2::numeric(70,20), 40))::numeric(50);
558-
558+
559559
-- Encode it to base-62
560560
while v_numeric <> 0 loop
561561
v_base62 := v_base62 || v_alphabet[mod(v_numeric, 62) + 1];
562562
v_numeric := div(v_numeric, 62);
563563
end loop;
564564
v_base62 := reverse(v_base62);
565565
v_base62 := lpad(v_base62, 27, '0');
566-
566+
567567
return v_base62;
568-
568+
569569
end $$ language plpgsql;`); err != nil {
570570
return fmt.Errorf("failed to create ksuid function: %w", err)
571571
}
572572

573573
if _, err := tx.ExecContext(ctx, `WITH new_keys AS (
574-
INSERT INTO "rsources_failed_keys_v2"
574+
INSERT INTO "rsources_failed_keys_v2"
575575
(id, job_run_id, task_run_id, source_id, destination_id, db_name)
576576
SELECT ksuid(), t.* FROM (
577577
SELECT DISTINCT job_run_id, task_run_id, source_id, destination_id, db_name from "rsources_failed_keys"
@@ -580,12 +580,12 @@ func migrateFailedKeysTable(ctx context.Context, tx *sql.Tx) error {
580580
)
581581
INSERT INTO "rsources_failed_keys_v2_records" (id, record_id, ts)
582582
SELECT n.id, o.record_id::text, min(o.ts) FROM new_keys n
583-
JOIN rsources_failed_keys o
584-
on o.db_name = n.db_name
585-
and o.destination_id = n.destination_id
586-
and o.source_id = n.source_id
587-
and o.task_run_id = n.task_run_id
588-
and o.job_run_id = n.job_run_id
583+
JOIN rsources_failed_keys o
584+
on o.db_name = n.db_name
585+
and o.destination_id = n.destination_id
586+
and o.source_id = n.source_id
587+
and o.task_run_id = n.task_run_id
588+
and o.job_run_id = n.job_run_id
589589
group by n.id, o.record_id
590590
`); err != nil {
591591
return fmt.Errorf("failed to migrate rsources_failed_keys table: %w", err)
@@ -605,7 +605,7 @@ func migrateFailedKeysTable(ctx context.Context, tx *sql.Tx) error {
605605

606606
// TODO: Remove this after a few releases
607607
func setupFailedKeysTableV0(ctx context.Context, db *sql.DB, defaultDbName string, log logger.Logger) error {
608-
sqlStatement := fmt.Sprintf(`create table "rsources_failed_keys" (
608+
sqlStatement := fmt.Sprintf(`create table "rsources_failed_keys" (
609609
id BIGSERIAL,
610610
db_name text not null default '%s',
611611
job_run_id text not null,
@@ -631,13 +631,13 @@ func setupFailedKeysTableV0(ctx context.Context, db *sql.DB, defaultDbName strin
631631
}
632632

633633
func setupFailedKeysTable(ctx context.Context, db *sql.DB, defaultDbName string, log logger.Logger) error {
634-
if _, err := db.ExecContext(ctx, fmt.Sprintf(`create table "rsources_failed_keys_v2" (
634+
if _, err := db.ExecContext(ctx, fmt.Sprintf(`create table "rsources_failed_keys_v2" (
635635
id VARCHAR(27) COLLATE "C",
636636
db_name text not null default '%s',
637637
job_run_id text not null,
638638
task_run_id text not null,
639639
source_id text not null,
640-
destination_id text not null,
640+
destination_id text not null,
641641
primary key (id),
642642
unique (job_run_id, task_run_id, source_id, destination_id, db_name)
643643
)`, defaultDbName)); err != nil {
@@ -648,7 +648,7 @@ func setupFailedKeysTable(ctx context.Context, db *sql.DB, defaultDbName string,
648648
}
649649
}
650650

651-
if _, err := db.ExecContext(ctx, `create table "rsources_failed_keys_v2_records" (
651+
if _, err := db.ExecContext(ctx, `create table "rsources_failed_keys_v2_records" (
652652
id VARCHAR(27) COLLATE "C",
653653
record_id text not null,
654654
ts timestamp not null default NOW(),

0 commit comments

Comments
 (0)