Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP(incomplete) support meta-text protocol #60

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion conf/nutcracker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ gamma:
server_failure_limit: 3
servers:
- 127.0.0.1:11212:1
- 127.0.0.1:11213:1

delta:
listen: 127.0.0.1:22124
Expand Down
27 changes: 27 additions & 0 deletions src/nc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ typedef enum msg_parse_result {
ACTION( REQ_MC_DECR ) \
ACTION( REQ_MC_TOUCH ) /* memcache touch request */ \
ACTION( REQ_MC_QUIT ) /* memcache quit request */ \
ACTION( REQ_MC_VERSION ) /* memcache version request */ \
ACTION( REQ_MC_MG ) /* memcache meta commands - meta-get */ \
ACTION( REQ_MC_MS ) /* meta-set */ \
ACTION( REQ_MC_ME ) /* meta-debug */ \
ACTION( REQ_MC_MN ) /* no-op */ \
ACTION( RSP_MC_NUM ) /* memcache arithmetic response */ \
ACTION( RSP_MC_STORED ) /* memcache cas and storage response */ \
ACTION( RSP_MC_NOT_STORED ) \
Expand All @@ -57,6 +62,12 @@ typedef enum msg_parse_result {
ACTION( RSP_MC_ERROR ) /* memcache error responses */ \
ACTION( RSP_MC_CLIENT_ERROR ) \
ACTION( RSP_MC_SERVER_ERROR ) \
ACTION( RSP_MC_VERSION ) \
ACTION( RSP_MC_VA ) /* memcache meta responses - value*/ \
ACTION( RSP_MC_EN ) /* meta response end */ \
ACTION( RSP_MC_MN ) /* meta response end */ \
ACTION( RSP_MC_ME ) /* meta debug end */ \
ACTION( RSP_MC_OK ) /* meta response ok */ \
ACTION( REQ_REDIS_COPY ) /* redis commands - keys */ \
ACTION( REQ_REDIS_DEL ) \
ACTION( REQ_REDIS_EXISTS ) \
Expand Down Expand Up @@ -411,4 +422,20 @@ inline static rstatus_t msg_fragment(struct msg *m, uint32_t nservers, struct ms
}
}

/*
* Set a placeholder key for a command with no key
* that is forwarded to an arbitrary backend.
*/
inline static bool set_placeholder_key(struct msg *r)
{
struct keypos *kpos;
ASSERT(array_n(r->keys) == 0);
kpos = array_push(r->keys);
if (kpos == NULL) {
return false;
}
kpos->start = (uint8_t *)"placeholder";
kpos->end = kpos->start + sizeof("placeholder") - 1;
return true;
}
#endif
7 changes: 5 additions & 2 deletions src/nc_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -684,8 +684,11 @@ req_recv_done(struct context *ctx, struct conn *conn, struct msg *msg,
return;
}

ASSERT(msg->redis);
status = redis_reply(msg);
if (msg->redis) {
status = redis_reply(msg);
} else {
status = memcache_reply(msg);
}
if (status != NC_OK) {
conn->err = errno;
return;
Expand Down
134 changes: 124 additions & 10 deletions src/proto/nc_memcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
#include <nc_core.h>
#include <nc_proto.h>

#define RSP_STRING(ACTION) \
ACTION( mn, "MN\r\n" ) \

#define DEFINE_ACTION(_var, _str) static struct string rsp_##_var = string(_str);
RSP_STRING( DEFINE_ACTION )
#undef DEFINE_ACTION

/*
* From memcache protocol specification:
*
Expand All @@ -32,6 +39,7 @@
*/
#define MEMCACHE_MAX_KEY_LENGTH 250


/*
* Return true, if the memcache command is a storage command, otherwise
* return false
Expand Down Expand Up @@ -79,6 +87,12 @@ memcache_retrieval(struct msg *r)
switch (r->type) {
case MSG_REQ_MC_GET:
case MSG_REQ_MC_GETS:
/*
* NOTE: For convenience, meta-get is also marked as retrieval
* (instead of variable keys in a request, it has variable flag counts)
*/
case MSG_REQ_MC_MG:
case MSG_REQ_MC_ME:
return true;

default:
Expand Down Expand Up @@ -236,7 +250,37 @@ memcache_parse_req(struct msg *r)
r->narg++;

switch (p - m) {
case 2:
/*
* All commands of length 2 are meta-commands
* and start with 'm'.
*/
if (m[0] == 'm') {
if (m[1] == 'g') {
r->type = MSG_REQ_MC_MG;
break;
}

/* TODO: Implement meta-set support, this is likely to change in https://github.com/memcached/memcached/pull/795
if (m[1] == 's') {
r->type = MSG_REQ_MC_MS;
break;
}
*/

if (m[1] == 'n') {
r->type = MSG_REQ_MC_MN;
r->noforward = 1;
break;
}

if (m[1] == 'e') {
r->type = MSG_REQ_MC_ME;
break;
}
}

break;
case 3:
if (str4cmp(m, 'g', 'e', 't', ' ')) {
r->type = MSG_REQ_MC_GET;
Expand Down Expand Up @@ -316,6 +360,14 @@ memcache_parse_req(struct msg *r)
break;
}

if (str7cmp(m, 'v', 'e', 'r', 's', 'i', 'o', 'n')) {
r->type = MSG_REQ_MC_VERSION;
if (!set_placeholder_key(r)) {
goto enomem;
}
break;
}

break;
}

Expand All @@ -332,12 +384,16 @@ memcache_parse_req(struct msg *r)
case MSG_REQ_MC_INCR:
case MSG_REQ_MC_DECR:
case MSG_REQ_MC_TOUCH:

case MSG_REQ_MC_MG: /* New meta commands */
case MSG_REQ_MC_ME:
if (ch == CR) {
goto error;
}
state = SW_SPACES_BEFORE_KEY;
break;

case MSG_REQ_MC_MN: /* no-op, currently takes no flags/args */
case MSG_REQ_MC_VERSION:
case MSG_REQ_MC_QUIT:
p = p - 1; /* go back by 1 byte */
state = SW_CRLF;
Expand Down Expand Up @@ -798,6 +854,7 @@ memcache_parse_rsp(struct msg *r)
SW_RUNTO_CRLF,
SW_CRLF,
SW_ALMOST_DONE, /* 15 */
SW_SPACES_BEFORE_META_VLEN, /* 15 */
SW_SENTINEL
} state;

Expand Down Expand Up @@ -862,6 +919,34 @@ memcache_parse_rsp(struct msg *r)
r->type = MSG_UNKNOWN;

switch (p - m) {
case 2:
if (str2cmp(m, 'O', 'K')) {
r->type = MSG_RSP_MC_OK;
break;
}

if (str2cmp(m, 'V', 'A')) {
r->type = MSG_RSP_MC_VA;
break;
}

if (str2cmp(m, 'E', 'N')) {
r->type = MSG_RSP_MC_EN;
break;
}

if (str2cmp(m, 'M', 'N')) {
r->type = MSG_RSP_MC_MN;
break;
}

if (str2cmp(m, 'M', 'E')) {
r->type = MSG_RSP_MC_ME;
break;
}

break;

case 3:
if (str4cmp(m, 'E', 'N', 'D', '\r')) {
r->type = MSG_RSP_MC_END;
Expand Down Expand Up @@ -913,6 +998,11 @@ memcache_parse_rsp(struct msg *r)
break;
}

if (str7cmp(m, 'V', 'E', 'R', 'S', 'I', 'O', 'N')) {
r->type = MSG_RSP_MC_VERSION;
break;
}

break;

case 9:
Expand Down Expand Up @@ -955,27 +1045,32 @@ memcache_parse_rsp(struct msg *r)
case MSG_RSP_MC_NOT_FOUND:
case MSG_RSP_MC_DELETED:
case MSG_RSP_MC_TOUCHED:
state = SW_CRLF;
break;

case MSG_RSP_MC_END:
case MSG_RSP_MC_ERROR:
case MSG_RSP_MC_MN:
case MSG_RSP_MC_EN:
state = SW_CRLF;
break;

case MSG_RSP_MC_VALUE:
state = SW_SPACES_BEFORE_KEY;
break;

case MSG_RSP_MC_ERROR:
state = SW_CRLF;
case MSG_RSP_MC_VA:
r->vlen = 0;
state = SW_SPACES_BEFORE_VLEN;
break;

case MSG_RSP_MC_CLIENT_ERROR:
case MSG_RSP_MC_SERVER_ERROR:
case MSG_RSP_MC_VERSION:
case MSG_RSP_MC_OK:
case MSG_RSP_MC_ME:
state = SW_RUNTO_CRLF;
break;

default:
fprintf(stderr, "DEBUG NOT REACHED state=%d\n", (int)r->type);
NOT_REACHED();
}

Expand Down Expand Up @@ -1096,7 +1191,12 @@ memcache_parse_rsp(struct msg *r)
switch (ch) {
case LF:
/* state = SW_END; */
state = SW_RSP_STR;
if (r->type == MSG_RSP_MC_VA) {
/* Meta-commands just have a value without "END\r\n". */
state = SW_START;
} else {
state = SW_RSP_STR;
}
break;

default:
Expand Down Expand Up @@ -1135,7 +1235,7 @@ memcache_parse_rsp(struct msg *r)
case SW_RUNTO_CRLF:
switch (ch) {
case CR:
if (r->type == MSG_RSP_MC_VALUE) {
if (r->type == MSG_RSP_MC_VALUE || r->type == MSG_RSP_MC_VA) {
state = SW_RUNTO_VAL;
} else {
state = SW_ALMOST_DONE;
Expand Down Expand Up @@ -1244,6 +1344,10 @@ memcache_append_key(struct msg *r, uint8_t *key, uint32_t keylen)
return NC_ENOMEM;
}

/*
* TODO probably need to support meta protocol 'b' flag for sharding
* (interpret key as base64 encoded binary value)
*/
kpos = array_push(r->keys);
if (kpos == NULL) {
return NC_ENOMEM;
Expand Down Expand Up @@ -1632,7 +1736,17 @@ memcache_add_auth(struct context *ctx, struct conn *c_conn, struct conn *s_conn)
rstatus_t
memcache_reply(struct msg *r)
{
NOT_REACHED();
return NC_OK;
struct msg *response = r->peer;

ASSERT(response != NULL && response->owner != NULL);

switch (r->type) {
case MSG_REQ_MC_MN:
return msg_append(response, rsp_mn.data, rsp_mn.len);

default:
NOT_REACHED();
return NC_OK;
}
}

7 changes: 7 additions & 0 deletions src/proto/nc_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

#ifdef NC_LITTLE_ENDIAN

#define str2cmp(m, c0, c1) \
(*(uint16_t *) m == ((c1 << 8) | c0))

#define str4cmp(m, c0, c1, c2, c3) \
(*(uint32_t *) m == ((c3 << 24) | (c2 << 16) | (c1 << 8) | c0))

Expand Down Expand Up @@ -55,6 +58,10 @@

#else

/* TODO: Could speed up a tiny bit with bswap in modern compilers */
#define str2cmp(m, c0, c1) \
(m[0] == c0 && m[1] == c1)

#define str4cmp(m, c0, c1, c2, c3) \
(m[0] == c0 && m[1] == c1 && m[2] == c2 && m[3] == c3)

Expand Down
15 changes: 0 additions & 15 deletions src/proto/nc_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -409,21 +409,6 @@ redis_error(struct msg *r)
return false;
}

// Set a placeholder key for a command with no key that is forwarded to an arbitrary backend.
static bool
set_placeholder_key(struct msg *r)
{
struct keypos *kpos;
ASSERT(array_n(r->keys) == 0);
kpos = array_push(r->keys);
if (kpos == NULL) {
return false;
}
kpos->start = (uint8_t *)"placeholder";
kpos->end = kpos->start + sizeof("placeholder") - 1;
return true;
}

/*
* Reference: http://redis.io/topics/protocol
*
Expand Down
Loading