Skip to content

Commit

Permalink
Refactor job error handling
Browse files Browse the repository at this point in the history
Reorganized a bit the code to also record more failures in
`bgw_job_stat_history` table.

Also improved a bit the code coverage and added regression tests for
`failed do start job` case.
  • Loading branch information
fabriziomello committed Sep 27, 2024
1 parent bf74aa6 commit 3bdbf32
Show file tree
Hide file tree
Showing 14 changed files with 284 additions and 72 deletions.
60 changes: 4 additions & 56 deletions src/bgw/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include "extension.h"
#include "job.h"
#include "job_stat.h"
#include "jsonb_utils.h"
#include "license_guc.h"
#include "scan_iterator.h"
#include "scanner.h"
Expand Down Expand Up @@ -162,57 +161,6 @@ job_config_check(BgwJob *job, Jsonb *config)
job->fd.id);
}

/* this function fills in a jsonb with the non-null fields of
the error data and also includes the proc name and schema in the jsonb
we include these here to avoid adding these fields to the table */
static Jsonb *
ts_errdata_to_jsonb(ErrorData *edata, Name proc_schema, Name proc_name)
{
JsonbParseState *parse_state = NULL;
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
if (edata->sqlerrcode)
ts_jsonb_add_str(parse_state, "sqlerrcode", unpack_sql_state(edata->sqlerrcode));
if (edata->message)
ts_jsonb_add_str(parse_state, "message", edata->message);
if (edata->detail)
ts_jsonb_add_str(parse_state, "detail", edata->detail);
if (edata->hint)
ts_jsonb_add_str(parse_state, "hint", edata->hint);
if (edata->filename)
ts_jsonb_add_str(parse_state, "filename", edata->filename);
if (edata->lineno)
ts_jsonb_add_int32(parse_state, "lineno", edata->lineno);
if (edata->funcname)
ts_jsonb_add_str(parse_state, "funcname", edata->funcname);
if (edata->domain)
ts_jsonb_add_str(parse_state, "domain", edata->domain);
if (edata->context_domain)
ts_jsonb_add_str(parse_state, "context_domain", edata->context_domain);
if (edata->context)
ts_jsonb_add_str(parse_state, "context", edata->context);
if (edata->schema_name)
ts_jsonb_add_str(parse_state, "schema_name", edata->schema_name);
if (edata->table_name)
ts_jsonb_add_str(parse_state, "table_name", edata->table_name);
if (edata->column_name)
ts_jsonb_add_str(parse_state, "column_name", edata->column_name);
if (edata->datatype_name)
ts_jsonb_add_str(parse_state, "datatype_name", edata->datatype_name);
if (edata->constraint_name)
ts_jsonb_add_str(parse_state, "constraint_name", edata->constraint_name);
if (edata->internalquery)
ts_jsonb_add_str(parse_state, "internalquery", edata->internalquery);
if (edata->detail_log)
ts_jsonb_add_str(parse_state, "detail_log", edata->detail_log);
if (strlen(NameStr(*proc_schema)) > 0)
ts_jsonb_add_str(parse_state, "proc_schema", NameStr(*proc_schema));
if (strlen(NameStr(*proc_name)) > 0)
ts_jsonb_add_str(parse_state, "proc_name", NameStr(*proc_name));
/* we add the schema qualified name here as well*/
JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
return JsonbValueToJsonb(result);
}

static BgwJob *
bgw_job_from_tupleinfo(TupleInfo *ti, size_t alloc_size)
{
Expand Down Expand Up @@ -1037,7 +985,7 @@ ts_is_telemetry_job(BgwJob *job)
}
#endif

bool
JobResult
ts_bgw_job_execute(BgwJob *job)
{
#ifdef USE_TELEMETRY
Expand Down Expand Up @@ -1112,7 +1060,7 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
Oid db_oid = DatumGetObjectId(MyBgworkerEntry->bgw_main_arg);
BgwParams params;
BgwJob *job;
JobResult res = JOB_FAILURE;
JobResult res = JOB_FAILURE_IN_EXECUTION;
bool got_lock;
instr_time start;
instr_time duration;
Expand Down Expand Up @@ -1228,7 +1176,7 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
job->job_history.execution_start = params.job_history_execution_start;

ts_bgw_job_stat_mark_end(job,
JOB_FAILURE,
JOB_FAILURE_IN_EXECUTION,
ts_errdata_to_jsonb(edata, &proc_schema, &proc_name));
ts_bgw_job_check_max_retries(job);
pfree(job);
Expand Down Expand Up @@ -1315,7 +1263,7 @@ ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, int64 initial
result = func();

if (mark)
ts_bgw_job_stat_mark_end(job, result ? JOB_SUCCESS : JOB_FAILURE, NULL);
ts_bgw_job_stat_mark_end(job, result ? JOB_SUCCESS : JOB_FAILURE_IN_EXECUTION, NULL);

/* Now update next_start. */
job_stat = ts_bgw_job_stat_find(job->fd.id);
Expand Down
10 changes: 9 additions & 1 deletion src/bgw/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ typedef struct BgwJob
BgwJobHistory job_history;
} BgwJob;

/* Positive result numbers reserved for success */
typedef enum JobResult
{
JOB_FAILURE_TO_START = -1,
JOB_FAILURE_IN_EXECUTION = 0,
JOB_SUCCESS = 1,
} JobResult;

typedef bool job_main_func(void);
typedef bool (*scheduler_test_hook_type)(BgwJob *job);

Expand Down Expand Up @@ -59,7 +67,7 @@ extern TSDLLEXPORT void ts_bgw_job_permission_check(BgwJob *job, const char *cmd

extern TSDLLEXPORT void ts_bgw_job_validate_job_owner(Oid owner);

extern bool ts_bgw_job_execute(BgwJob *job);
extern JobResult ts_bgw_job_execute(BgwJob *job);
extern TSDLLEXPORT void ts_bgw_job_run_config_check(Oid check, int32 job_id, Jsonb *config);

extern TSDLLEXPORT Datum ts_bgw_job_entrypoint(PG_FUNCTION_ARGS);
Expand Down
2 changes: 1 addition & 1 deletion src/bgw/job_stat.c
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job, int32 consecutive_f
/* Update the errors table regarding the crash */
if (!ts_flags_are_set_32(jobstat->fd.flags, LAST_CRASH_REPORTED))
{
ts_bgw_job_stat_mark_crash_reported(job, JOB_FAILURE);
ts_bgw_job_stat_mark_crash_reported(job, JOB_FAILURE_IN_EXECUTION);
}

return calculate_next_start_on_crash(jobstat->fd.consecutive_crashes, job);
Expand Down
8 changes: 0 additions & 8 deletions src/bgw/job_stat.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,6 @@ typedef struct BgwJobStat
FormData_bgw_job_stat fd;
} BgwJobStat;

/* Positive result numbers reserved for success */
typedef enum JobResult
{
JOB_FAILURE_TO_START = -1,
JOB_FAILURE = 0,
JOB_SUCCESS = 1,
} JobResult;

extern TSDLLEXPORT BgwJobStat *ts_bgw_job_stat_find(int job_id);
extern void ts_bgw_job_stat_delete(int job_id);
extern TSDLLEXPORT void ts_bgw_job_stat_mark_start(BgwJob *job);
Expand Down
52 changes: 46 additions & 6 deletions src/bgw/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,45 @@ mark_job_as_started(ScheduledBgwJob *sjob)
}

static void
mark_job_as_ended(ScheduledBgwJob *sjob, JobResult res)
mark_job_as_ended(ScheduledBgwJob *sjob, JobResult res, Jsonb *edata)
{
Assert(sjob->may_need_mark_end);
ts_bgw_job_stat_mark_end(&sjob->job, res, NULL);
ts_bgw_job_stat_mark_end(&sjob->job, res, edata);
sjob->may_need_mark_end = false;
}

static ErrorData *
makeJobErrorData(ScheduledBgwJob *sjob, JobResult res)
{
ErrorData *edata = (ErrorData *) palloc0(sizeof(ErrorData));
edata->elevel = ERROR;
edata->sqlerrcode = ERRCODE_INTERNAL_ERROR;
edata->hint = NULL;

Assert(res != JOB_SUCCESS);

switch (res)
{
case JOB_FAILURE_TO_START:
edata->message = "failed to start job";
edata->detail = psprintf("Job %d (\"%s\") failed to start",
sjob->job.fd.id,
NameStr(sjob->job.fd.application_name));
break;
case JOB_FAILURE_IN_EXECUTION:
edata->message = "failed to execute job";
edata->detail = psprintf("Job %d (\"%s\") failed to execute.",
sjob->job.fd.id,
NameStr(sjob->job.fd.application_name));
break;
default:
pg_unreachable();

Check warning on line 201 in src/bgw/scheduler.c

View check run for this annotation

Codecov / codecov/patch

src/bgw/scheduler.c#L200-L201

Added lines #L200 - L201 were not covered by tests
break;
}

return edata;
}

static void
worker_state_cleanup(ScheduledBgwJob *sjob)
{
Expand Down Expand Up @@ -224,11 +256,14 @@ worker_state_cleanup(ScheduledBgwJob *sjob)
* Usually the job process will mark the end, but if the job gets
* a signal (cancel or terminate), it won't be able to so we
* should.
* TODO: Insert a record in the job_errors table informing of this failure
* Currently the SIGTERM case is not handled, there might be other cases as well
*/
elog(LOG, "job %d failed", sjob->job.fd.id);
mark_job_as_ended(sjob, JOB_FAILURE);
ErrorData *edata = makeJobErrorData(sjob, JOB_FAILURE_IN_EXECUTION);
mark_job_as_ended(sjob,
JOB_FAILURE_IN_EXECUTION,
ts_errdata_to_jsonb(edata,
&sjob->job.fd.proc_schema,
&sjob->job.fd.proc_name));
}
else
{
Expand Down Expand Up @@ -357,7 +392,12 @@ on_failure_to_start_job(ScheduledBgwJob *sjob)
/* restore the original next_start to maintain priority (it is unset during mark_start) */
if (sjob->next_start != DT_NOBEGIN)
ts_bgw_job_stat_set_next_start(sjob->job.fd.id, sjob->next_start);
mark_job_as_ended(sjob, JOB_FAILURE_TO_START);
ErrorData *edata = makeJobErrorData(sjob, JOB_FAILURE_TO_START);
mark_job_as_ended(sjob,
JOB_FAILURE_TO_START,
ts_errdata_to_jsonb(edata,
&sjob->job.fd.proc_schema,
&sjob->job.fd.proc_name));
}
scheduled_bgw_job_transition_state_to(sjob, JOB_STATE_SCHEDULED);
CommitTransactionCommand();
Expand Down
52 changes: 52 additions & 0 deletions src/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "debug_point.h"
#include "guc.h"
#include "hypertable_cache.h"
#include "jsonb_utils.h"
#include "time_utils.h"
#include "utils.h"

Expand Down Expand Up @@ -1822,3 +1823,54 @@ ts_is_hypercore_am(Oid amoid)

return amoid == hypercore_amoid;
}

/* this function fills in a jsonb with the non-null fields of
the error data and also includes the proc name and schema in the jsonb
we include these here to avoid adding these fields to the table */
Jsonb *
ts_errdata_to_jsonb(ErrorData *edata, Name proc_schema, Name proc_name)
{
JsonbParseState *parse_state = NULL;
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
if (edata->sqlerrcode)
ts_jsonb_add_str(parse_state, "sqlerrcode", unpack_sql_state(edata->sqlerrcode));
if (edata->message)
ts_jsonb_add_str(parse_state, "message", edata->message);
if (edata->detail)
ts_jsonb_add_str(parse_state, "detail", edata->detail);
if (edata->hint)
ts_jsonb_add_str(parse_state, "hint", edata->hint);
if (edata->filename)
ts_jsonb_add_str(parse_state, "filename", edata->filename);
if (edata->lineno)
ts_jsonb_add_int32(parse_state, "lineno", edata->lineno);
if (edata->funcname)
ts_jsonb_add_str(parse_state, "funcname", edata->funcname);
if (edata->domain)
ts_jsonb_add_str(parse_state, "domain", edata->domain);
if (edata->context_domain)
ts_jsonb_add_str(parse_state, "context_domain", edata->context_domain);
if (edata->context)
ts_jsonb_add_str(parse_state, "context", edata->context);
if (edata->schema_name)
ts_jsonb_add_str(parse_state, "schema_name", edata->schema_name);
if (edata->table_name)
ts_jsonb_add_str(parse_state, "table_name", edata->table_name);
if (edata->column_name)
ts_jsonb_add_str(parse_state, "column_name", edata->column_name);
if (edata->datatype_name)
ts_jsonb_add_str(parse_state, "datatype_name", edata->datatype_name);
if (edata->constraint_name)
ts_jsonb_add_str(parse_state, "constraint_name", edata->constraint_name);
if (edata->internalquery)
ts_jsonb_add_str(parse_state, "internalquery", edata->internalquery);
if (edata->detail_log)
ts_jsonb_add_str(parse_state, "detail_log", edata->detail_log);
if (strlen(NameStr(*proc_schema)) > 0)
ts_jsonb_add_str(parse_state, "proc_schema", NameStr(*proc_schema));
if (strlen(NameStr(*proc_name)) > 0)
ts_jsonb_add_str(parse_state, "proc_name", NameStr(*proc_name));
/* we add the schema qualified name here as well*/
JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
return JsonbValueToJsonb(result);
}
1 change: 1 addition & 0 deletions src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,3 +376,4 @@ extern TSDLLEXPORT void ts_get_rel_info_by_name(const char *relnamespace, const
Oid *relid, Oid *amoid, char *relkind);
extern TSDLLEXPORT void ts_get_rel_info(Oid relid, Oid *amoid, char *relkind);
extern TSDLLEXPORT bool ts_is_hypercore_am(Oid amoid);
extern TSDLLEXPORT Jsonb *ts_errdata_to_jsonb(ErrorData *edata, Name proc_schema, Name proc_name);
10 changes: 10 additions & 0 deletions test/expected/test_utils.out
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,13 @@ NOTICE: 4. IndexScan: "_timescaledb_internal._hyper_1_2_chunk"

(1 row)

-- Test errdata_to_jsonb
RESET ROLE;
CREATE OR REPLACE FUNCTION test.errdata_to_jsonb() RETURNS JSONB
AS :MODULE_PATHNAME, 'ts_test_errdata_to_jsonb' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
SELECT test.errdata_to_jsonb();
errdata_to_jsonb
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"hint": "test error hint", "detail": "test error detail", "domain": "test error domain", "lineno": 123, "context": "test error context", "message": "test error message", "filename": "test error filename", "funcname": "test error function", "proc_name": "proc_name", "detail_log": "test error detail log", "sqlerrcode": "22023", "table_name": "test error table", "column_name": "test error column", "proc_schema": "proc_schema", "schema_name": "test error schema", "datatype_name": "test error datatype", "internalquery": "test error internal query", "context_domain": "test error context domain", "constraint_name": "test error constraint"}
(1 row)

6 changes: 6 additions & 0 deletions test/sql/test_utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,9 @@ CREATE TABLE hyper (time timestamptz, temp float);
SELECT create_hypertable('hyper', 'time');
INSERT INTO hyper VALUES ('2021-01-01', 1.0), ('2022-01-01', 2.0);
SELECT test.scanner();

-- Test errdata_to_jsonb
RESET ROLE;
CREATE OR REPLACE FUNCTION test.errdata_to_jsonb() RETURNS JSONB
AS :MODULE_PATHNAME, 'ts_test_errdata_to_jsonb' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
SELECT test.errdata_to_jsonb();
21 changes: 21 additions & 0 deletions test/sql/utils/testsupport.sql
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,24 @@ BEGIN
RETURN false;
END
$BODY$;

-- Wait for job to run or fail
CREATE OR REPLACE FUNCTION test.wait_for_job_to_run_or_fail(job_param_id INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS)
RETURNS BOOLEAN LANGUAGE PLPGSQL AS
$BODY$
DECLARE
r RECORD;
BEGIN
FOR i in 1..spins
LOOP
SELECT total_runs FROM _timescaledb_internal.bgw_job_stat WHERE job_id=job_param_id INTO r;
IF (r.total_runs > 0) THEN
RETURN true;
ELSE
PERFORM pg_sleep(0.1);
END IF;
END LOOP;
RAISE INFO 'wait_for_job_to_run_or_fail: timeout after % tries', spins;
RETURN false;
END
$BODY$;
42 changes: 42 additions & 0 deletions test/src/test_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "debug_point.h"
#include "extension_constants.h"
#include "utils.h"

TS_FUNCTION_INFO_V1(ts_test_error_injection);
TS_FUNCTION_INFO_V1(ts_debug_shippable_error_after_n_rows);
Expand Down Expand Up @@ -335,3 +336,44 @@ ts_debug_allocated_bytes(PG_FUNCTION_ARGS)

PG_RETURN_UINT64(MemoryContextMemAllocated(context, /* recurse = */ true));
}

TS_TEST_FN(ts_test_errdata_to_jsonb)
{
ErrorData *edata = (ErrorData *) palloc(sizeof(ErrorData));
edata->elevel = ERROR;
edata->output_to_server = true;
edata->output_to_client = true;
edata->hide_stmt = false;
edata->hide_ctx = false;
edata->filename = "test error filename";
edata->lineno = 123;
edata->funcname = "test error function";
edata->domain = "test error domain";
edata->context_domain = "test error context domain";
edata->sqlerrcode = ERRCODE_INVALID_PARAMETER_VALUE;
edata->message = "test error message";
edata->detail = "test error detail";
edata->detail_log = "test error detail log";
edata->hint = "test error hint";
edata->context = "test error context";
edata->backtrace = "test error backtrace";
edata->message_id = "test error message id";
edata->schema_name = "test error schema";
edata->table_name = "test error table";
edata->column_name = "test error column";
edata->datatype_name = "test error datatype";
edata->constraint_name = "test error constraint";
edata->cursorpos = 42;
edata->internalpos = 42;
edata->internalquery = "test error internal query";
edata->saved_errno = 42;

NameData proc_schema = { .data = { 0 } };
NameData proc_name = { .data = { 0 } };
namestrcpy(&proc_schema, "proc_schema");
namestrcpy(&proc_name, "proc_name");

Jsonb *out = ts_errdata_to_jsonb(edata, &proc_schema, &proc_name);

PG_RETURN_JSONB_P(out);
}
Loading

0 comments on commit 3bdbf32

Please sign in to comment.