/* dispatcher.h -- shared macros, types and function declarations of the dispatcher */
#ifndef __DISPATCHER_DISPATCHER_H__
#define __DISPATCHER_DISPATCHER_H__
#include "config.h"
/* standard headers */
#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#include <stdbool.h> /* _Bool */
#include <stdint.h> /* int32_t */
#include <time.h> /* time */
#include <ctype.h> /* isspace */
/* other headers */
#include <unistd.h> /* sleep, fork, getopt */
#include <signal.h> /* SIGCHLD handling */
#include <sys/wait.h> /* wait */
#include <syslog.h> /* system log */
#include <mysql/mysql.h> /* mysql */
#include <mysql/errmsg.h>
#include <mysql/mysqld_error.h> /* mysql errno's */
#include <libgearman/gearman.h> /* gearman */
#include <yaml.h> /* yaml */
/*
 * Compiler check macros.
 *
 * When __GNUC__ is defined these expand to function attributes
 * (printf-style format checking, sentinel-terminated argument lists,
 * non-null pointer arguments).  Otherwise they expand to nothing, so
 * declarations using them remain valid.
 */
#if defined(__GNUC__)
#  define ATTRIBUTE_PRINTF(A1, A2) __attribute__((format(printf, A1, A2)))
#  define ATTRIBUTE_SENTINEL       __attribute__((sentinel))
#  define ATTRIBUTE_NONNULL(...)   __attribute__((nonnull(__VA_ARGS__)))
#else
#  define ATTRIBUTE_PRINTF(A1, A2)
#  define ATTRIBUTE_SENTINEL
#  define ATTRIBUTE_NONNULL(...)
#endif /* defined(__GNUC__) */
/* global defines */
#define BUFFER_QUERY 8192 /* initial and minimal size of query buffer */
#define BUFFER_YAML 4096 /* initial and minimal size of yaml buffer */
#define BUFFER_SIZE_MAX 1024 /* length of internal buffers */
#define FORCE_TERMINATE_COUNT 3 /* number of terminate signals that force quit */
/*
 * 0.1 sec as a struct timespec initializer.
 * The members are designated by name because the C standard allows
 * struct timespec to declare tv_sec and tv_nsec in any order, so a
 * positional initializer is not portable.
 */
#define TIMESPEC_0_1_SEC { .tv_sec = 0, .tv_nsec = 100000000L }
#define NSEC_IN_SEC 1000000000L /* number of nsec in 1 sec */
/* internal errors for MySQL results */
/* NOTE: proper escaping for MySQL */
#define RESULT_ERROR_GEARMAN "---\\n:status: :fail\\n:message: gearman work not successful\\n"
#define RESULT_ERROR_YAML "---\\n:status: :fail\\n:message: yaml parsing error\\n"
/*
 * Disabled block: when the condition is changed to non-zero, LOG_WARNING,
 * LOG_DEBUG and LOG_INFO are redefined as LOG_ERR.
 * NOTE(review): presumably a debugging aid to surface all messages at error
 * priority -- confirm before enabling.
 */
#if 0
#undef LOG_WARNING
#undef LOG_DEBUG
#undef LOG_INFO
#define LOG_WARNING LOG_ERR
#define LOG_DEBUG LOG_ERR
#define LOG_INFO LOG_ERR
#endif
/*
 * Dispatcher configuration.
 *
 * Settings are grouped into nested structures; every member has a matching
 * DP_CONFIG_* field id in dp_config_val.
 */
typedef struct dp_config {
    /* MySQL settings */
    struct {
        char *host;
        char *db;
        char *user;
        char *passwd;
        char *table;
        int port;
    } mysql;

    /* gearman settings */
    struct {
        char *host;
        int port;
    } gearman;

    /* task handling settings */
    struct {
        uint16_t failed_delay;
        uint16_t timeout_delay;
        char *environment;
        bool priority;
    } task;

    /* logging settings */
    struct {
        char *dispatcher;
        char *worker;
        int level;
        int facility;
    } log;

    /* "sense" settings (loop / terminated / paused) */
    struct {
        uint16_t loop;
        uint16_t terminated;
        uint16_t paused;
    } sense;

    uint16_t sleep_loop;
} dp_config;
/* name/value pair; the type handled by dp_enum_name() and dp_enum_value() */
typedef struct dp_enum {
    const char *name; /* textual name of the entry */
    int value;        /* integer value of the entry */
} dp_enum;
/* simple string buffer */
typedef struct dp_buffer {
    char *str;   /* string associated with buffer */
    size_t size; /* length of the buffer string */
    size_t pool; /* allocated length of buffer */
} dp_buffer;
/* task definition structure */
typedef struct dp_task {
    int id;            /* id number of the task */
    int priority;      /* priority of the task */
    char *type;        /* call destination for gearman */
    char *description; /* call parameters for gearman */
    char *status;      /* task status, i.e. new, working, done */
    time_t run_after;  /* timestamp for delayed execution, timeouts handling */
} dp_task;
/* task result definition structure */
typedef struct dp_task_result {
    bool status;         /* status flag of the result */
    double time_elapsed; /* elapsed time reported for the task */
} dp_task_result;
/* child definition structure */
typedef struct dp_child {
    pid_t pid;    /* pid of child */
    dp_task task; /* task associated with child */
    time_t stamp; /* stamp associated with child */
    bool null;    /* indicate empty entry */
} dp_child;
/*
 * Configuration field ids.
 *
 * One id per dp_config member, plus DP_CONFIG_UNKNOWN (value 0).
 * Returned by dp_config_field() and accepted by dp_config_set().
 */
typedef enum dp_config_val {
    DP_CONFIG_UNKNOWN = 0,

    /* dp_config.mysql */
    DP_CONFIG_MYSQL_HOST,
    DP_CONFIG_MYSQL_DB,
    DP_CONFIG_MYSQL_USER,
    DP_CONFIG_MYSQL_PASSWD,
    DP_CONFIG_MYSQL_TABLE,
    DP_CONFIG_MYSQL_PORT,

    /* dp_config.gearman */
    DP_CONFIG_GEARMAN_HOST,
    DP_CONFIG_GEARMAN_PORT,

    /* dp_config.task */
    DP_CONFIG_TASK_FAILED_DELAY,
    DP_CONFIG_TASK_TIMEOUT_DELAY,
    DP_CONFIG_TASK_ENVIRONMENT,
    DP_CONFIG_TASK_PRIORITY,

    /* dp_config.log */
    DP_CONFIG_LOG_DISPATCHER,
    DP_CONFIG_LOG_WORKER,
    DP_CONFIG_LOG_LEVEL,
    DP_CONFIG_LOG_FACILITY,

    /* dp_config.sense */
    DP_CONFIG_SENSE_LOOP,
    DP_CONFIG_SENSE_TERMINATED,
    DP_CONFIG_SENSE_PAUSED,

    /* dp_config.sleep_loop */
    DP_CONFIG_SLEEP_LOOP
} dp_config_val;
/*
 * fork helper functions
 */

/*
 * forked child function
 *
 * worker: child entry this call operates on.
 * NOTE(review): meaning of the int result is not visible in this header --
 * confirm against the implementation.
 */
int dp_fork_exec(dp_child *worker);

/*
 * initialize signal handling (logged) for fork
 *
 * Declared with (void): before C23 an empty parameter list is not a
 * prototype, so calls passing stray arguments would go unchecked.
 * NOTE(review): presumably returns true on success -- confirm.
 */
bool dp_fork_signal_init(void);
/*
 * signal management functions
 *
 * NOTE(review): the bool results presumably mean true on success -- confirm
 * against the implementation.
 */

/*
 * initialize signal handling (logged)
 *
 * Declared with (void): before C23 an empty parameter list is not a
 * prototype, so calls passing stray arguments would go unchecked.
 */
bool dp_signal_init(void);

/* block specific signal; signum is the signal number */
bool dp_signal_block(int signum);

/* unblock specific signal; signum is the signal number */
bool dp_signal_unblock(int signum);
/*
 * dispatcher config functions
 *
 * NOTE(review): the bool results presumably mean true on success -- confirm
 * against the implementation.
 */

/*
 * initialize configuration
 *
 * Declared with (void): before C23 an empty parameter list is not a
 * prototype, so calls passing stray arguments would go unchecked.
 * NOTE(review): takes no config pointer, so presumably works on a
 * configuration object owned by the implementation -- confirm.
 */
bool dp_config_init(void);

/* get config field id for the given field name */
dp_config_val dp_config_field(const char *name);

/*
 * assign field value in config
 *
 * config: configuration to modify
 * field:  id of the field to assign
 * value:  textual value for the field
 * if_dup: NOTE(review): presumably selects whether value is duplicated
 *         before being stored -- confirm.
 */
bool dp_config_set(dp_config *config,
                   dp_config_val field,
                   char *value,
                   bool if_dup);

/* free data associated with config */
void dp_config_free(dp_config *config);
/*
 * string buffer functions
 *
 * NOTE(review): the dp_buffer pointer results presumably report the buffer
 * operated on, or NULL on failure -- confirm against the implementation.
 */

/* allocate buffer with specific pool size */
dp_buffer *dp_buffer_new(size_t pool);

/* initialize buffer with specific pool size */
dp_buffer *dp_buffer_init(dp_buffer *buf, size_t pool);

/* free buffer */
void dp_buffer_free(dp_buffer *buf);

/* insert format string (printf-style, format-checked), grow as necessary */
dp_buffer *dp_buffer_printf(dp_buffer *buf, const char *format, ...)
    ATTRIBUTE_PRINTF(2, 3);

/* append buffer to buffer, grow as necessary */
dp_buffer *dp_buffer_append(dp_buffer *buf, const dp_buffer *append);

/* append format string (printf-style, format-checked), grow as necessary */
dp_buffer *dp_buffer_append_printf(dp_buffer *buf, const char *format, ...)
    ATTRIBUTE_PRINTF(2, 3);
/*
 * gearman helper functions
 */

/* initialize gearman (logged); client receives the gearman client handle */
bool dp_gearman_init(gearman_client_st **client);

/*
 * get status from gearman reply
 *
 * result:             filled from the reply
 * worker_result:      reply data received from the worker
 * worker_result_size: size of worker_result
 */
bool dp_gearman_get_result(dp_task_result *result,
                           const char *worker_result,
                           size_t worker_result_size);
/*
 * MySQL wrapper functions
 */

/* initialize MySQL (logged) */
bool dp_mysql_init(MYSQL **db);

/* connect to MySQL (logged) */
bool dp_mysql_connect(MYSQL *db);

/* execute MySQL query (logged), recover connection and retry if possible */
bool dp_mysql_query(MYSQL *db, const char *query, bool if_retry);

/* extract MySQL stored task (logged) from result into task */
bool dp_mysql_get_task(dp_task *task, MYSQL_RES *result);

/* extract MySQL int variable (logged) from result into value */
bool dp_mysql_get_int(int *value, MYSQL_RES *result);

/* free data associated with task */
void dp_mysql_task_free(dp_task *task);

/* clear data associated with task */
void dp_mysql_task_clear(dp_task *task);
/*
 * YAML parsing functions
 */

/* write handler for yaml emitters */
int dp_yaml_write_handler(void *data, unsigned char *buffer, size_t size);

/* process task description */
dp_buffer *dp_yaml_task_description(const dp_task *task);

/* add map node (string key, string value) at document root */
bool dp_yaml_add_map_node(yaml_document_t *document,
                          const char *key,
                          const char *value);

/* add int map node (string key, int value) at document root */
bool dp_yaml_add_map_node_int(yaml_document_t *document,
                              const char *key,
                              int value);

/* get field value (single-line) from string; both arguments are marked
 * non-null for the compiler */
char *dp_yaml_field_line(const char *yaml, const char *field)
    ATTRIBUTE_NONNULL(1, 2);
/*
 * logging functions
 */

/* initialize logging capabilities; ident is the identification string */
void dp_logger_init(const char *ident);

/* log message with specific priority; message is a printf-style format
 * checked against the variable arguments */
void dp_logger(int priority, const char *message, ...)
    ATTRIBUTE_PRINTF(2, 3);
/*
 * enum helpers
 *
 * NOTE(review): self is presumably a table of dp_enum entries and the result
 * the matching entry (NULL when nothing matches) -- confirm against the
 * implementation.
 */

/* extract specific enum by name */
dp_enum *dp_enum_name(dp_enum *self, const char *name);

/* extract specific enum by value */
dp_enum *dp_enum_value(dp_enum *self, int value);
/*
 * basic string helpers
 *
 * NOTE(review): for the "sized" variants, length presumably bounds how much
 * of str is considered -- confirm against the implementation.
 */

/* dup string helper */
char *dp_strdup(const char *str);

/* sized dup string helper */
char *dp_strudup(const char *str, size_t length);

/* sized strchr string helper */
char *dp_struchr(const char *str, size_t length, char character);

/* sized strstr string helper */
char *dp_strustr(const char *str, size_t length, const char *locate);
/* concatenate string helper; the sentinel attribute asks the compiler to
 * check that the variable argument list ends with a NULL pointer */
char *dp_strcat(const char *str, ...)
    ATTRIBUTE_SENTINEL;
/* escape string helper
 * NOTE(review): the escaping rules are not visible in this header. */
char *dp_strescape(const char *str);

/* sized escape string helper, taking an explicit length */
char *dp_struescape(const char *str, size_t length);
/*
 * timespec operations
 */

/* multiply timespec by multiplier */
void dp_timespec_mul(struct timespec *self, double multiplier);

/* check if timespec is more than value in sec */
bool dp_timespec_more(struct timespec *self, double value);

/* return floating point representation of timespec in secs */
double dp_timespec_double(struct timespec *self);
/*
 * signal handlers
 *
 * Each handler takes the signal number as its only argument.
 * NOTE(review): judging by the names, these presumably serve SIGCHLD,
 * SIGHUP, SIGTERM/SIGINT, SIGUSR1 and SIGUSR2 respectively -- confirm where
 * they are installed.
 */

void dp_sigchld(int signal);

void dp_sighup(int signal);

void dp_sigtermint(int signal);

void dp_sigusr1(int signal);

void dp_sigusr2(int signal);
/*
 * dispatcher status processing
 *
 * The parameterless functions are declared with (void): before C23 an empty
 * parameter list is not a prototype, so calls passing stray arguments would
 * go unchecked.
 */

/* initialize child_status table
 * NOTE(review): presumably returns true on success -- confirm. */
bool dp_status_init(void);

/* free data associated with child_status table */
void dp_status_free(void);

/* process child_status table */
void dp_status_update(void);

/* process child_status table timeouts; timestamp is the time to check
 * against */
void dp_status_timeout(time_t timestamp);
/*
 * dispatcher status search
 *
 * NOTE(review): both functions presumably return NULL when no matching
 * entry exists -- confirm against the implementation.
 */

/*
 * find first null entry in child_status array
 *
 * Declared with (void): before C23 an empty parameter list is not a
 * prototype, so calls passing stray arguments would go unchecked.
 */
dp_child *dp_child_null(void);

/* find child with pid in child_status array */
dp_child *dp_child_pid(pid_t pid);
#endif /* !__DISPATCHER_DISPATCHER_H__ */