Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: server panics during shutdown with reporting metrics: failed to store jobs: context canceled #4228

Merged
merged 2 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions enterprise/reporting/error_index/error_index_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewErrorIndexReporter(ctx context.Context, log logger.Logger, configSubscri
}

// Report reports the metrics to the errorIndex JobsDB
func (eir *ErrorIndexReporter) Report(metrics []*types.PUReportedMetric, tx *Tx) error {
func (eir *ErrorIndexReporter) Report(ctx context.Context, metrics []*types.PUReportedMetric, tx *Tx) error {
failedAt := eir.now()

var jobs []*jobsdb.JobT
Expand Down Expand Up @@ -171,8 +171,8 @@ func (eir *ErrorIndexReporter) Report(metrics []*types.PUReportedMetric, tx *Tx)
if err != nil {
return fmt.Errorf("failed to resolve jobsdb: %w", err)
}
if err := db.WithStoreSafeTxFromTx(eir.ctx, tx, func(tx jobsdb.StoreSafeTx) error {
return db.StoreInTx(eir.ctx, tx, jobs)
if err := db.WithStoreSafeTxFromTx(ctx, tx, func(tx jobsdb.StoreSafeTx) error {
return db.StoreInTx(ctx, tx, jobs)
}); err != nil {
return fmt.Errorf("failed to store jobs: %w", err)
}
Expand Down
12 changes: 6 additions & 6 deletions enterprise/reporting/error_index/error_index_reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestErrorIndexReporter(t *testing.T) {
sqlTx, err := postgresContainer.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqlTx}
err = eir.Report(tc.reports, tx)
err = eir.Report(context.Background(), tc.reports, tx)
require.NoError(t, err)
require.NoError(t, tx.Commit())
db, err := eir.resolveJobsDB(tx)
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestErrorIndexReporter(t *testing.T) {
sqlTx, err := postgresContainer.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqlTx}
err = eir.Report([]*types.PUReportedMetric{}, tx)
err = eir.Report(context.Background(), []*types.PUReportedMetric{}, tx)
require.NoError(t, err)
require.NoError(t, tx.Commit())

Expand Down Expand Up @@ -342,7 +342,7 @@ func TestErrorIndexReporter(t *testing.T) {
sqlTx, err := pg2.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqlTx}
err = eir.Report([]*types.PUReportedMetric{
err = eir.Report(context.Background(), []*types.PUReportedMetric{
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
Expand Down Expand Up @@ -409,7 +409,7 @@ func TestErrorIndexReporter(t *testing.T) {
sqlTx, err := pg1.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqlTx}
err = eir.Report([]*types.PUReportedMetric{
err = eir.Report(context.Background(), []*types.PUReportedMetric{
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
Expand Down Expand Up @@ -443,7 +443,7 @@ func TestErrorIndexReporter(t *testing.T) {
sqlTx, err := pg3.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqlTx}
err = eir.Report([]*types.PUReportedMetric{
err = eir.Report(context.Background(), []*types.PUReportedMetric{
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
Expand Down Expand Up @@ -571,7 +571,7 @@ func TestErrorIndexReporter(t *testing.T) {
require.NoError(t, err)

tx := &Tx{Tx: sqlTx}
err = eir.Report(reports, tx)
err = eir.Report(context.Background(), reports, tx)
require.NoError(t, err)
require.NoError(t, tx.Commit())

Expand Down
8 changes: 4 additions & 4 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ func (edr *ErrorDetailReporter) GetSyncer(syncerKey string) *types.SyncSource {
return edr.syncers[syncerKey]
}

func (edr *ErrorDetailReporter) Report(metrics []*types.PUReportedMetric, txn *Tx) error {
func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUReportedMetric, txn *Tx) error {
edr.log.Debug("[ErrorDetailReport] Report method called\n")
if len(metrics) == 0 {
return nil
}

stmt, err := txn.Prepare(pq.CopyIn(ErrorDetailReportsTable, ErrorDetailReportsColumns...))
stmt, err := txn.PrepareContext(ctx, pq.CopyIn(ErrorDetailReportsTable, ErrorDetailReportsColumns...))
if err != nil {
edr.log.Errorf("Failed during statement preparation: %v", err)
return fmt.Errorf("preparing statement: %v", err)
Expand All @@ -180,7 +180,7 @@ func (edr *ErrorDetailReporter) Report(metrics []*types.PUReportedMetric, txn *T

// extract error-message & error-code
errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse)
_, err = stmt.Exec(
_, err = stmt.ExecContext(ctx,
workspaceID,
edr.namespace,
edr.instanceID,
Expand All @@ -203,7 +203,7 @@ func (edr *ErrorDetailReporter) Report(metrics []*types.PUReportedMetric, txn *T
}
}

_, err = stmt.Exec()
_, err = stmt.ExecContext(ctx)
if err != nil {
edr.log.Errorf("Failed during statement preparation: %v", err)
return fmt.Errorf("executing final statement: %v", err)
Expand Down
3 changes: 2 additions & 1 deletion enterprise/reporting/event_stats.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package reporting

import (
"context"
"strconv"

"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -39,7 +40,7 @@ func (es *EventStatsReporter) Record(metrics []*types.PUReportedMetric) {
}
}

func (es *EventStatsReporter) Report(metrics []*types.PUReportedMetric, tx *Tx) error {
func (es *EventStatsReporter) Report(_ context.Context, metrics []*types.PUReportedMetric, tx *Tx) error {
tx.AddSuccessListener(func() {
es.Record(metrics)
})
Expand Down
4 changes: 2 additions & 2 deletions enterprise/reporting/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke
return rm
}

func (rm *Mediator) Report(metrics []*types.PUReportedMetric, txn *Tx) error {
func (rm *Mediator) Report(ctx context.Context, metrics []*types.PUReportedMetric, txn *Tx) error {
for _, reporter := range rm.reporters {
if err := reporter.Report(metrics, txn); err != nil {
if err := reporter.Report(ctx, metrics, txn); err != nil {
return err
}
}
Expand Down
4 changes: 3 additions & 1 deletion enterprise/reporting/noop.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package reporting

import (
"context"

. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)

// NOOP reporting implementation that does nothing
type NOOP struct{}

func (*NOOP) Report(_ []*types.PUReportedMetric, _ *Tx) error {
func (*NOOP) Report(_ context.Context, _ []*types.PUReportedMetric, _ *Tx) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,12 +542,12 @@ func transformMetricForPII(metric types.PUReportedMetric, piiColumns []string) t
return metric
}

func (r *DefaultReporter) Report(metrics []*types.PUReportedMetric, txn *Tx) error {
func (r *DefaultReporter) Report(ctx context.Context, metrics []*types.PUReportedMetric, txn *Tx) error {
if len(metrics) == 0 {
return nil
}

stmt, err := txn.Prepare(pq.CopyIn(ReportsTable,
stmt, err := txn.PrepareContext(ctx, pq.CopyIn(ReportsTable,
"workspace_id", "namespace", "instance_id",
"source_definition_id",
"source_category",
Expand Down Expand Up @@ -617,7 +617,7 @@ func (r *DefaultReporter) Report(metrics []*types.PUReportedMetric, txn *Tx) err
return fmt.Errorf("executing statement: %v", err)
}
}
if _, err = stmt.Exec(); err != nil {
if _, err = stmt.ExecContext(ctx); err != nil {
return fmt.Errorf("executing final statement: %v", err)
}

Expand Down
9 changes: 5 additions & 4 deletions mocks/utils/types/mock_types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2222,13 +2222,13 @@ func (proc *Handle) Store(partition string, in *storeMessage) {
if err != nil {
return fmt.Errorf("publishing rsources stats: %w", err)
}
err = proc.saveDroppedJobs(in.droppedJobs, tx.Tx())
err = proc.saveDroppedJobs(ctx, in.droppedJobs, tx.Tx())
if err != nil {
return fmt.Errorf("saving dropped jobs: %w", err)
}

if proc.isReportingEnabled() {
if err = proc.reporting.Report(in.reportMetrics, tx.Tx()); err != nil {
if err = proc.reporting.Report(ctx, in.reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down Expand Up @@ -2684,7 +2684,7 @@ func (proc *Handle) transformSrcDest(
}
}

func (proc *Handle) saveDroppedJobs(droppedJobs []*jobsdb.JobT, tx *Tx) error {
func (proc *Handle) saveDroppedJobs(ctx context.Context, droppedJobs []*jobsdb.JobT, tx *Tx) error {
if len(droppedJobs) > 0 {
for i := range droppedJobs { // each dropped job should have a unique jobID in the scope of the batch
droppedJobs[i].JobID = int64(i)
Expand All @@ -2694,7 +2694,7 @@ func (proc *Handle) saveDroppedJobs(droppedJobs []*jobsdb.JobT, tx *Tx) error {
rsources.IgnoreDestinationID(),
)
rsourcesStats.JobsDropped(droppedJobs)
return rsourcesStats.Publish(context.TODO(), tx.Tx)
return rsourcesStats.Publish(ctx, tx.Tx)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion router/batchrouter/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err
}

if brt.reporting != nil && brt.reportingEnabled {
if err = brt.reporting.Report(reportMetrics, tx.Tx()); err != nil {
if err = brt.reporting.Report(ctx, reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions router/batchrouter/handle_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (brt *Handle) updateJobStatuses(ctx context.Context, destinationID string,
}

if brt.reporting != nil && brt.reportingEnabled {
if err = brt.reporting.Report(reportMetrics, tx.Tx()); err != nil {
if err = brt.reporting.Report(ctx, reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down Expand Up @@ -688,7 +688,7 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
}

if brt.reporting != nil && brt.reportingEnabled {
if err = brt.reporting.Report(reportMetrics, tx.Tx()); err != nil {
if err = brt.reporting.Report(ctx, reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion router/batchrouter/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin
return fmt.Errorf("marking %s job statuses as aborted: %w", brt.destType, err)
}
if brt.reporting != nil && brt.reportingEnabled {
if err = brt.reporting.Report(reportMetrics, tx.Tx()); err != nil {
if err = brt.reporting.Report(ctx, reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down
4 changes: 3 additions & 1 deletion router/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package router

import (
"context"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
Expand Down Expand Up @@ -50,5 +52,5 @@ func (f *Factory) New(destination *backendconfig.DestinationT) *Handle {
}

type reporter interface {
Report(metrics []*utilTypes.PUReportedMetric, txn *Tx) error
Report(ctx context.Context, metrics []*utilTypes.PUReportedMetric, txn *Tx) error
}
2 changes: 1 addition & 1 deletion router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
if err != nil {
return err
}
if err = rt.Reporting.Report(reportMetrics, tx.Tx()); err != nil {
if err = rt.Reporting.Report(ctx, reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion utils/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package types

import (
"context"
"net/http"
"time"

Expand Down Expand Up @@ -62,7 +63,7 @@ type ConfigEnvI interface {
// Reporting is interface to report metrics
type Reporting interface {
// Report reports metrics to reporting service
Report(metrics []*PUReportedMetric, tx *Tx) error
Report(ctx context.Context, metrics []*PUReportedMetric, tx *Tx) error

// DatabaseSyncer creates reporting tables in the database and returns a function to periodically sync the data
DatabaseSyncer(c SyncerConfig) ReportingSyncer
Expand Down
3 changes: 2 additions & 1 deletion warehouse/router/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ func (job *UploadJob) setUploadStatus(statusOpts UploadStatusOpts) (err error) {
}
if job.config.reportingEnabled {
err = job.reporting.Report(
job.ctx,
[]*types.PUReportedMetric{&statusOpts.ReportingMetric},
tx.Tx,
)
Expand Down Expand Up @@ -717,7 +718,7 @@ func (job *UploadJob) setUploadError(statusError error, state string) (string, e
})
}
if job.config.reportingEnabled {
if err = job.reporting.Report(reportingMetrics, txn.Tx); err != nil {
if err = job.reporting.Report(job.ctx, reportingMetrics, txn.Tx); err != nil {
return "", fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down
Loading