Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nginx upsync 1.8.x #93

Open
wants to merge 4 commits into
base: nginx-upsync-1.8.x
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 302 additions & 4 deletions src/ngx_http_upsync_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ typedef struct {

#define NGX_HTTP_UPSYNC_CONSUL 0x0001
#define NGX_HTTP_UPSYNC_ETCD 0x0002
#define NGX_HTTP_UPSYNC_ZK 0x0003


typedef ngx_int_t (*ngx_http_upsync_packet_init_pt)
Expand Down Expand Up @@ -186,6 +187,7 @@ static void ngx_http_upsync_timeout_handler(ngx_event_t *event);
static void ngx_http_upsync_clean_event(void *upsync_server);
static ngx_int_t ngx_http_upsync_etcd_parse_init(void *upsync_server);
static ngx_int_t ngx_http_upsync_consul_parse_init(void *upsync_server);
static ngx_int_t ngx_http_upsync_zk_parse_init(void *upsync_server);
static ngx_int_t ngx_http_upsync_dump_server(
ngx_http_upsync_server_t *upsync_server);
static ngx_int_t ngx_http_upsync_init_server(ngx_event_t *event);
Expand Down Expand Up @@ -218,6 +220,7 @@ static ngx_int_t ngx_http_upsync_check_index(
ngx_http_upsync_server_t *upsync_server);
static ngx_int_t ngx_http_upsync_consul_parse_json(void *upsync_server);
static ngx_int_t ngx_http_upsync_etcd_parse_json(void *upsync_server);
static ngx_int_t ngx_http_upsync_zk_parse_json(void *upsync_server);
static ngx_int_t ngx_http_upsync_check_key(u_char *key);
static void *ngx_http_upsync_servers(ngx_cycle_t *cycle,
ngx_http_upsync_server_t *upsync_server, ngx_flag_t flag);
Expand Down Expand Up @@ -347,6 +350,14 @@ static ngx_upsync_conf_t ngx_upsync_types[] = {
ngx_http_upsync_etcd_parse_init,
ngx_http_upsync_etcd_parse_json,
ngx_http_upsync_clean_event },

{ ngx_string("zk"),
NGX_HTTP_UPSYNC_ZK,
ngx_http_upsync_send_handler,
ngx_http_upsync_recv_handler,
ngx_http_upsync_zk_parse_init,
ngx_http_upsync_zk_parse_json,
ngx_http_upsync_clean_event },

{ ngx_null_string,
0,
Expand Down Expand Up @@ -708,6 +719,8 @@ ngx_http_upsync_check_index(ngx_http_upsync_server_t *upsync_server)
ngx_upsync_conf_t *upsync_type_conf;

upsync_type_conf = upsync_server->upscf->upsync_type_conf;

//todo do we need to check the zk index?

if (upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_CONSUL) {
for (i = 0; i < state.num_headers; i++) {
Expand Down Expand Up @@ -1587,6 +1600,226 @@ ngx_http_upsync_etcd_parse_json(void *data)
return NGX_OK;
}

static ngx_int_t
ngx_http_upsync_zk_parse_json(void *data)
{
u_char *p;
ngx_buf_t *buf;
ngx_int_t max_fails=2, backup=0, down=0;
ngx_str_t src, dst;
ngx_http_upsync_ctx_t *ctx;
ngx_http_upsync_conf_t *upstream_conf = NULL;
ngx_http_upsync_server_t *upsync_server = data;

ctx = &upsync_server->ctx;
buf = &ctx->body;

src.len = 0, src.data = NULL;
dst.len = 0, dst.data = NULL;

cJSON *root = cJSON_Parse((char *)buf->pos);
if (root == NULL) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"upsync_parse_json: root error");
return NGX_ERROR;
}

if (ngx_array_init(&ctx->upstream_conf, ctx->pool, 16,
sizeof(*upstream_conf)) != NGX_OK)
{
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"upsync_parse_json: array init error");
cJSON_Delete(root);
return NGX_ERROR;
}

cJSON *children = cJSON_GetObjectItem(root, "children");
if (children == NULL) {
cJSON_Delete(root);
return NGX_ERROR;
}

cJSON *server_next;
cJSON *node = cJSON_GetObjectItem(children, "path");


server_next = node == NULL ? children->child : children;
for (; server_next != NULL;
server_next = server_next->next)
{
cJSON *temp1 = cJSON_GetObjectItem(server_next, "path");
if (temp1 != NULL && temp1->valuestring != NULL) {
p = (u_char *)ngx_strrchr(temp1->valuestring, '/');
if (p == NULL) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"upsync_parse_json: %s key format is illegal, "
"contains no slash ('/')", temp1->valuestring);
continue;
} else if (ngx_http_upsync_check_key(p) != NGX_OK) {
continue;
}

upstream_conf = ngx_array_push(&ctx->upstream_conf);
ngx_memzero(upstream_conf, sizeof(*upstream_conf));
ngx_sprintf(upstream_conf->sockaddr, "%*s", ngx_strlen(p + 1), p + 1);
}
temp1 = NULL;

temp1 = cJSON_GetObjectItem(server_next, "data64");
if (temp1 != NULL && temp1->valuestring != NULL) {

src.data = (u_char *)temp1->valuestring;
src.len = ngx_strlen(temp1->valuestring);

if (dst.data == NULL) {
dst.data = ngx_pcalloc(ctx->pool, 1024);

} else {
ngx_memzero(dst.data, 1024);
}
dst.len = 0;

ngx_decode_base64(&dst, &src);
}
temp1 = NULL;

/* default value, server attribute */
upstream_conf->weight = 1;
upstream_conf->max_fails = 2;
upstream_conf->fail_timeout = 10;

upstream_conf->down = 0;
upstream_conf->backup = 0;

p = NULL;

if (dst.data != NULL && dst.len != 0) {

p = dst.data;
cJSON *sub_root = cJSON_Parse((char *)p);
if (sub_root == NULL) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"upsync_parse_json: parse attribute json failed,"
"setting server attribute to default value");
continue;
}

cJSON *sub_attribute = sub_root;
cJSON *temp1 = cJSON_GetObjectItem(sub_attribute, "weight");
if (temp1 != NULL) {

if (temp1->valuestring != NULL) {
upstream_conf->weight = ngx_atoi((u_char *)temp1->valuestring,
(size_t)ngx_strlen(temp1->valuestring));

} else if (temp1->valueint >= 0) {
upstream_conf->weight = temp1->valueint;
}
}
temp1 = NULL;

temp1 = cJSON_GetObjectItem(sub_attribute, "max_fails");
if (temp1 != NULL) {

if (temp1->valuestring != NULL) {
max_fails = ngx_atoi((u_char *)temp1->valuestring,
(size_t)ngx_strlen(temp1->valuestring));

} else if (temp1->valueint >= 0) {
max_fails = temp1->valueint;
}
}
temp1 = NULL;

temp1 = cJSON_GetObjectItem(sub_attribute, "fail_timeout");
if (temp1 != NULL){

if (temp1->valuestring != NULL) {

upstream_conf->fail_timeout = ngx_atoi((u_char *)temp1->valuestring,
(size_t)ngx_strlen(temp1->valuestring));

} else if (temp1->valueint >= 0) {
upstream_conf->fail_timeout = temp1->valueint;
}
}
temp1 = NULL;

temp1 = cJSON_GetObjectItem(sub_attribute, "down");
if (temp1 != NULL) {

if (temp1->valueint != 0) {
down = temp1->valueint;

} else if (temp1->valuestring != NULL) {
down = ngx_atoi((u_char *)temp1->valuestring,
(size_t)ngx_strlen(temp1->valuestring));
}
}
temp1 = NULL;

temp1 = cJSON_GetObjectItem(sub_attribute, "backup");
if (temp1 != NULL) {

if (temp1->valueint != 0) {
backup = temp1->valueint;

} else if (temp1->valuestring != NULL) {
backup = ngx_atoi((u_char *)temp1->valuestring,
(size_t)ngx_strlen(temp1->valuestring));
}
}
temp1 = NULL;

dst.len = 0;
cJSON_Delete(sub_root);
}

if (upstream_conf->weight <= 0) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"upsync_parse_json: \"weight\" value is invalid"
", setting default value 1");
upstream_conf->weight = 1;
}

if (max_fails < 0) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"upsync_parse_json: \"max_fails\" value is invalid"
", setting default value 2");
} else {
upstream_conf->max_fails = (ngx_uint_t)max_fails;
}

if (upstream_conf->fail_timeout < 0) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"upsync_parse_json: \"fail_timeout\" value is invalid"
", setting default value 10");
upstream_conf->fail_timeout = 10;
}

if (down != 1 && down != 0) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"upsync_parse_json: \"down\" value is invalid"
", setting default value 0");
} else {
upstream_conf->down = (ngx_uint_t)down;
}

if (backup != 1 && backup != 0) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"upsync_parse_json: \"backup\" value is invalid"
", setting default value 0");
} else {
upstream_conf->backup = (ngx_uint_t)backup;
}

max_fails=2, backup=0, down=0;
}
cJSON_Delete(root);

return NGX_OK;
}


static ngx_int_t
ngx_http_upsync_check_key(u_char *key)
Expand Down Expand Up @@ -2582,6 +2815,12 @@ ngx_http_upsync_send_handler(ngx_event_t *event)

}
}

if (upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_ZK) {
ngx_sprintf(request, "GET %V?view=children&recursive=true"
" HTTP/1.0\r\nHost: %V\r\nAccept: */*\r\n\r\n",
&upscf->upsync_send, &upscf->conf_server.name);
}

ctx->send.pos = request;
ctx->send.last = ctx->send.pos + ngx_strlen(request);
Expand Down Expand Up @@ -2837,6 +3076,56 @@ ngx_http_upsync_etcd_parse_init(void *data)
}


static ngx_int_t
ngx_http_upsync_zk_parse_init(void *data)
{
char *buf;
size_t parsed;
ngx_http_upsync_ctx_t *ctx;
ngx_http_upsync_server_t *upsync_server = data;

ctx = &upsync_server->ctx;

if (ngx_http_parser_init() == NGX_ERROR) {
return NGX_ERROR;
}

buf = (char *)ctx->recv.pos;

ctx->body.pos = ctx->body.last = NULL;

parsed = http_parser_execute(parser, &settings, buf, ngx_strlen(buf));
if (parsed != ngx_strlen(buf)) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"upsync_consul_parse_init: parsed body size is wrong");
return NGX_ERROR;
}

if (ngx_strncmp(state.status, "OK", 2) == 0) {

if (ngx_strlen(state.http_body) != 0) {
ctx->body.pos = state.http_body;
ctx->body.last = state.http_body + ngx_strlen(state.http_body);

}
}

if (parser != NULL) {
ngx_free(parser);
parser = NULL;
}

if (ctx->body.pos != ctx->body.last) {
*(ctx->body.last + 1) = '\0';

} else {
return NGX_ERROR;
}

return NGX_OK;
}


static ngx_int_t
ngx_http_upsync_dump_server(ngx_http_upsync_server_t *upsync_server)
{
Expand Down Expand Up @@ -3411,7 +3700,8 @@ ngx_http_upsync_clean_event(void *data)
}

if (upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_CONSUL
|| upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_ETCD)
|| upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_ETCD
|| upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_ZK)
{
if (parser != NULL) {
ngx_free(parser);
Expand Down Expand Up @@ -3652,6 +3942,13 @@ ngx_http_client_send(ngx_http_conf_client *client,
"Accept: */*\r\n\r\n",
&upscf->upsync_send, &upscf->conf_server.name);
}

if (upsync_type_conf->upsync_type == NGX_HTTP_UPSYNC_ZK) {
ngx_sprintf(request, "GET %V?view=children&recursive=true"
" HTTP/1.0\r\nHost: %V\r\nAccept: */*\r\n\r\n",
&upscf->upsync_send, &upscf->conf_server.name);

}

size = ngx_strlen(request);
while(send_num < size) {
Expand Down Expand Up @@ -3819,12 +4116,12 @@ ngx_http_upsync_show(ngx_http_request_t *r)

goto end;
}
for (i = 0; i < umcf->upstreams.nelts; i++) {
for (i = 0; i < umcf->upstreams.nelts; i++) {
ngx_http_upsync_show_upstream(uscfp[i], b);
b->last = ngx_snprintf(b->last, b->end - b->last, "\n");
}
goto end;
}

Expand Down Expand Up @@ -3854,3 +4151,4 @@ ngx_http_upsync_show(ngx_http_request_t *r)

return ret;
}