10 changes: 9 additions & 1 deletion docs/commands.md
@@ -220,7 +220,7 @@ An array of alternating key-value pairs as follows:
1. **MINBATCHSIZE**: The minimum size of any batch of incoming requests.
1. **INPUTS**: array reply with one or more names of the model's input nodes (applicable only for TensorFlow models)
1. **OUTPUTS**: array reply with one or more names of the model's output nodes (applicable only for TensorFlow models)
1. **BLOB**: a blob containing the serialized model (when called with the `BLOB` argument) as a String
1. **BLOB**: a blob containing the serialized model (when called with the `BLOB` argument) as a String. If the size of the serialized model exceeds `MODEL_CHUNK_SIZE` (see `AI.CONFIG` command), then an array of chunks is returned. The full serialized model can be obtained by concatenating the chunks.

**Examples**
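For illustration, a hiredis-based client could reassemble a chunked `BLOB` reply as follows. This is a minimal sketch, not part of the command reference: the connection details and the key name `mymodel` are assumptions.

```
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <hiredis/hiredis.h>

int main(void) {
    redisContext *c = redisConnect("127.0.0.1", 6379);
    if (c == NULL || c->err) return 1;

    redisReply *r = redisCommand(c, "AI.MODELGET %s BLOB", "mymodel");
    if (r == NULL) return 1;

    char *blob = NULL;
    size_t len = 0;
    if (r->type == REDIS_REPLY_STRING) {
        /* The model fit in a single chunk: the reply is one bulk string. */
        len = r->len;
        blob = malloc(len);
        memcpy(blob, r->str, len);
    } else if (r->type == REDIS_REPLY_ARRAY) {
        /* Chunked reply: concatenate the chunks in order. */
        for (size_t i = 0; i < r->elements; i++) len += r->element[i]->len;
        blob = malloc(len);
        size_t off = 0;
        for (size_t i = 0; i < r->elements; i++) {
            memcpy(blob + off, r->element[i]->str, r->element[i]->len);
            off += r->element[i]->len;
        }
    }
    printf("model blob: %zu bytes\n", len);
    free(blob);
    freeReplyObject(r);
    redisFree(c);
    return 0;
}
```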

@@ -719,6 +719,7 @@ _Arguments_
* **TFLITE**: The TensorFlow Lite backend
* **TORCH**: The PyTorch backend
* **ONNX**: ONNXRuntime backend
* **MODEL_CHUNK_SIZE**: Sets the size of chunks (in bytes) in which model payloads are split for serialization, replication and `MODELGET`. Default is `511 * 1024 * 1024`.

_Return_

@@ -746,3 +747,10 @@ This loads the PyTorch backend with a full path:
redis> AI.CONFIG LOADBACKEND TORCH /usr/lib/redis/modules/redisai/backends/redisai_torch/redisai_torch.so
OK
```

This sets the model chunk size to one megabyte (not recommended):

```
redis> AI.CONFIG MODEL_CHUNK_SIZE 1048576
OK
```
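The code in this change computes `n_chunks = len / chunk_size + 1`, with every chunk except the last being exactly `chunk_size` bytes. A small self-contained sketch of that arithmetic, using the one-megabyte chunk size from the example above and a hypothetical 2.5 MiB blob:

```
#include <stdio.h>

int main(void) {
    long long len = 2621440;        /* hypothetical 2.5 MiB blob */
    long long chunk_size = 1048576; /* 1 MiB, as configured above */
    long long n_chunks = len / chunk_size + 1; /* = 3 */
    for (long long i = 0; i < n_chunks; i++) {
        long long chunk_len = i < n_chunks - 1 ? chunk_size : len % chunk_size;
        printf("chunk %lld: %lld bytes\n", i, chunk_len); /* 1 MiB, 1 MiB, 0.5 MiB */
    }
    return 0;
}
```

Note that when `len` is an exact multiple of `chunk_size`, this arithmetic produces a trailing zero-length chunk.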
81 changes: 61 additions & 20 deletions src/config.c
@@ -20,6 +20,7 @@ long long backends_intra_op_parallelism; // number of threads used within an
long long
backends_inter_op_parallelism; // number of threads used for parallelism
// between independent operations.
long long model_chunk_size; // size of chunks used to break up model payloads.

/**
*
@@ -69,6 +70,30 @@ int setBackendsIntraOpParallelism(long long num_threads) {
return result;
}

/**
* @return size of chunks (in bytes) in which models are split for
* set, get, serialization and replication.
*/
long long getModelChunkSize() {
return model_chunk_size;
}

/**
* Set size of chunks (in bytes) in which models are split for set,
* get, serialization and replication.
*
* @param size new chunk size in bytes; must be positive
* @return 0 on success, or 1 if failed
*/
int setModelChunkSize(long long size) {
int result = 1;
if (size > 0) {
model_chunk_size = size;
result = 0;
}
return result;
}

/**
* Helper method for AI.CONFIG LOADBACKEND <backend_identifier>
* <location_of_backend_library>
@@ -175,6 +200,26 @@ int RedisAI_Config_IntraOperationParallelism(
return result;
}

/**
* Set size of chunks in which model payloads are split for set,
* get, serialization and replication.
*
* @param chunk_size_string string containing chunk size (in bytes)
* @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed
*/
int RedisAI_Config_ModelChunkSize(RedisModuleString *chunk_size_string) {
long long temp;
int result = RedisModule_StringToLongLong(chunk_size_string, &temp);
// make sure chunk size is a positive integer;
// if not, fall back to the default value
if (result != REDISMODULE_OK || temp < 1) {
temp = REDISAI_DEFAULT_MODEL_CHUNK_SIZE;
}
result = setModelChunkSize(temp);
return result;
}

/**
*
* @param ctx Context in which Redis modules operate
@@ -199,34 +244,30 @@ int RAI_configParamParse(RedisModuleCtx *ctx, const char *key,
else if (strcasecmp((key), "THREADS_PER_QUEUE") == 0) {
ret = RedisAI_Config_QueueThreads(rsval);
if (ret == REDISMODULE_OK) {
char *buffer = RedisModule_Alloc(
(3 + strlen(REDISAI_INFOMSG_THREADS_PER_QUEUE) + strlen((val))) *
sizeof(*buffer));
sprintf(buffer, "%s: %s", REDISAI_INFOMSG_THREADS_PER_QUEUE, (val));
RedisModule_Log(ctx, "notice", buffer);
RedisModule_Free(buffer);
RedisModule_Log(ctx, "notice", "%s: %s",
REDISAI_INFOMSG_THREADS_PER_QUEUE,
(val));
}
} else if (strcasecmp((key), "INTRA_OP_PARALLELISM") == 0) {
ret = RedisAI_Config_IntraOperationParallelism(rsval);
if (ret == REDISMODULE_OK) {
char *buffer = RedisModule_Alloc(
(3 + strlen(REDISAI_INFOMSG_INTRA_OP_PARALLELISM) + strlen((val))) *
sizeof(*buffer));
sprintf(buffer, "%s: %lld", REDISAI_INFOMSG_INTRA_OP_PARALLELISM,
getBackendsIntraOpParallelism());
RedisModule_Log(ctx, "notice", buffer);
RedisModule_Free(buffer);
RedisModule_Log(ctx, "notice", "%s: %lld",
REDISAI_INFOMSG_INTRA_OP_PARALLELISM,
getBackendsIntraOpParallelism());
}
} else if (strcasecmp((key), "INTER_OP_PARALLELISM") == 0) {
ret = RedisAI_Config_InterOperationParallelism(rsval);
if (ret == REDISMODULE_OK) {
char *buffer = RedisModule_Alloc(
(3 + strlen(REDISAI_INFOMSG_INTER_OP_PARALLELISM) + strlen((val))) *
sizeof(*buffer));
sprintf(buffer, "%s: %lld", REDISAI_INFOMSG_INTER_OP_PARALLELISM,
getBackendsInterOpParallelism());
RedisModule_Log(ctx, "notice", buffer);
RedisModule_Free(buffer);
RedisModule_Log(ctx, "notice", "%s: %lld",
REDISAI_INFOMSG_INTER_OP_PARALLELISM,
getBackendsInterOpParallelism());
}
} else if (strcasecmp((key), "MODEL_CHUNK_SIZE") == 0) {
ret = RedisAI_Config_ModelChunkSize(rsval);
if (ret == REDISMODULE_OK) {
RedisModule_Log(ctx, "notice", "%s: %lld",
REDISAI_INFOMSG_MODEL_CHUNK_SIZE,
getModelChunkSize());
}
} else if (strcasecmp((key), "BACKENDSPATH") == 0) {
// already taken care of
28 changes: 28 additions & 0 deletions src/config.h
@@ -23,6 +23,7 @@ typedef enum { RAI_DEVICE_CPU = 0, RAI_DEVICE_GPU = 1 } RAI_Device;
#define REDISAI_DEFAULT_THREADS_PER_QUEUE 1
#define REDISAI_DEFAULT_INTRA_OP_PARALLELISM 0
#define REDISAI_DEFAULT_INTER_OP_PARALLELISM 0
#define REDISAI_DEFAULT_MODEL_CHUNK_SIZE 535822336 // (511 * 1024 * 1024)
#define REDISAI_ERRORMSG_PROCESSING_ARG "ERR error processing argument"
#define REDISAI_ERRORMSG_THREADS_PER_QUEUE \
"ERR error setting THREADS_PER_QUEUE to"
@@ -37,6 +38,8 @@ typedef enum { RAI_DEVICE_CPU = 0, RAI_DEVICE_GPU = 1 } RAI_Device;
"Setting INTRA_OP_PARALLELISM parameter to"
#define REDISAI_INFOMSG_INTER_OP_PARALLELISM \
"Setting INTER_OP_PARALLELISM parameter to"
#define REDISAI_INFOMSG_MODEL_CHUNK_SIZE \
"Setting MODEL_CHUNK_SIZE parameter to"

/**
* Get number of threads used for parallelism between independent operations, by
@@ -72,6 +75,21 @@ long long getBackendsIntraOpParallelism();
*/
int setBackendsIntraOpParallelism(long long num_threads);

/**
* @return size of chunks (in bytes) in which models are split for
* set, get, serialization and replication.
*/
long long getModelChunkSize();

/**
* Set size of chunks (in bytes) in which models are split for set,
* get, serialization and replication.
*
* @param size new chunk size in bytes; must be positive
* @return 0 on success, or 1 if failed
*/
int setModelChunkSize(long long size);

/**
* Helper method for AI.CONFIG LOADBACKEND <backend_identifier>
* <location_of_backend_library>
@@ -123,6 +141,16 @@ int RedisAI_Config_InterOperationParallelism(
int RedisAI_Config_IntraOperationParallelism(
RedisModuleString *num_threads_string);

/**
* Set size of chunks in which model payloads are split for set,
* get, serialization and replication.
*
* @param chunk_size_string string containing chunk size (in bytes)
* @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed
*/
int RedisAI_Config_ModelChunkSize(
RedisModuleString *chunk_size_string);

/**
*
* @param ctx Context in which Redis modules operate
57 changes: 46 additions & 11 deletions src/model.c
@@ -58,8 +58,24 @@ static void* RAI_Model_RdbLoad(struct RedisModuleIO *io, int encver) {
};

size_t len;
char *buffer = NULL;

char *buffer = RedisModule_LoadStringBuffer(io, &len);
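/* Blobs written with encver > 100 use a chunked layout: total length
 * (unsigned), chunk count (unsigned), then that many string buffers whose
 * lengths sum to the total. Older encodings store a single string buffer. */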
if (encver <= 100) {
buffer = RedisModule_LoadStringBuffer(io, &len);
}
else {
len = RedisModule_LoadUnsigned(io);
buffer = RedisModule_Alloc(len);
const size_t n_chunks = RedisModule_LoadUnsigned(io);
long long chunk_offset = 0;
for (size_t i=0; i<n_chunks; i++) {
size_t chunk_len;
char *chunk_buffer = RedisModule_LoadStringBuffer(io, &chunk_len);
memcpy(buffer + chunk_offset, chunk_buffer, chunk_len);
chunk_offset += chunk_len;
RedisModule_Free(chunk_buffer);
}
}

RAI_Error err = {0};

@@ -136,7 +152,14 @@ static void RAI_Model_RdbSave(RedisModuleIO *io, void *value) {
for (size_t i=0; i<model->noutputs; i++) {
RedisModule_SaveStringBuffer(io, model->outputs[i], strlen(model->outputs[i]) + 1);
}
RedisModule_SaveStringBuffer(io, buffer, len);
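/* Save the blob in MODEL_CHUNK_SIZE chunks: total length first, then the
 * chunk count, then each chunk. len / chunk_size + 1 leaves a zero-length
 * trailing chunk when len is an exact multiple of chunk_size; the loader's
 * zero-byte memcpy makes that harmless. */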
long long chunk_size = getModelChunkSize();
const size_t n_chunks = len / chunk_size + 1;
RedisModule_SaveUnsigned(io, len);
RedisModule_SaveUnsigned(io, n_chunks);
for (size_t i=0; i<n_chunks; i++) {
size_t chunk_len = i < n_chunks - 1 ? chunk_size : len % chunk_size;
RedisModule_SaveStringBuffer(io, buffer + i * chunk_size, chunk_len);
}

if (buffer) {
RedisModule_Free(buffer);
@@ -177,32 +200,44 @@ static void RAI_Model_AofRewrite(RedisModuleIO *aof, RedisModuleString *key, voi
array_append(outputs_, RedisModule_CreateString(ctx, model->outputs[i], strlen(model->outputs[i])));
}
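
/* Split the blob into MODEL_CHUNK_SIZE chunks so the rewritten AI.MODELSET
 * passes the BLOB as a vector of string arguments (the final "cv" pair in
 * the format string below). */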

long long chunk_size = getModelChunkSize();
const size_t n_chunks = len / chunk_size + 1;
RedisModuleString **buffers_ = array_new(RedisModuleString*, n_chunks);

for (size_t i=0; i<n_chunks; i++) {
size_t chunk_len = i < n_chunks - 1 ? chunk_size : len % chunk_size;
array_append(buffers_, RedisModule_CreateString(ctx, buffer + i * chunk_size, chunk_len));
}

if (buffer) {
RedisModule_Free(buffer);
}

const char* backendstr = RAI_BackendName(model->backend);

RedisModule_EmitAOF(aof, "AI.MODELSET", "slccclclcvcvb",
RedisModule_EmitAOF(aof, "AI.MODELSET", "slccclclcvcvcv",
key,
backendstr, model->devicestr, model->tag,
"BATCHSIZE", model->opts.batchsize,
"MINBATCHSIZE", model->opts.minbatchsize,
"INPUTS", inputs_, model->ninputs,
"OUTPUTS", outputs_, model->noutputs,
buffer, len);

if (buffer) {
RedisModule_Free(buffer);
}
"BLOB", buffers_, n_chunks);

for (size_t i=0; i<model->ninputs; i++) {
RedisModule_FreeString(ctx, inputs_[i]);
}

array_free(inputs_);

for (size_t i=0; i<model->noutputs; i++) {
RedisModule_FreeString(ctx, outputs_[i]);
}

array_free(outputs_);

for (size_t i=0; i<n_chunks; i++) {
RedisModule_FreeString(ctx, buffers_[i]);
}
array_free(buffers_);
}


@@ -248,7 +283,7 @@ int RAI_ModelInit(RedisModuleCtx* ctx) {
.digest = NULL
};

RedisAI_ModelType = RedisModule_CreateDataType(ctx, "AI__MODEL", 0, &tmModel);
RedisAI_ModelType = RedisModule_CreateDataType(ctx, "AI__MODEL", RAI_ENC_VER_MM, &tmModel);
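/* The encoding version bump from 0 to RAI_ENC_VER_MM (assumed to be > 100)
 * is what routes newly written RDB payloads through the chunked branch in
 * RAI_Model_RdbLoad above. */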
return RedisAI_ModelType != NULL;
}

34 changes: 31 additions & 3 deletions src/redisai.c
@@ -360,6 +360,21 @@ int RedisAI_ModelSet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
return REDISMODULE_OK;
}

void RAI_ReplyWithChunks(RedisModuleCtx *ctx, const char* buffer, long long len) {
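/* A blob that fits in a single chunk is returned as one bulk string;
 * anything larger is returned as an array of chunks. When len is an exact
 * multiple of the chunk size, the array ends with a zero-length chunk. */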
long long chunk_size = getModelChunkSize();
const size_t n_chunks = len / chunk_size + 1;
if (n_chunks > 1) {
RedisModule_ReplyWithArray(ctx, (long)n_chunks);
for (size_t i=0; i<n_chunks; i++) {
size_t chunk_len = i < n_chunks - 1 ? chunk_size : len % chunk_size;
RedisModule_ReplyWithStringBuffer(ctx, buffer + i * chunk_size, chunk_len);
}
}
else {
RedisModule_ReplyWithStringBuffer(ctx, buffer, len);
}
}

/**
* AI.MODELGET model_key [META] [BLOB]
*/
@@ -412,7 +427,7 @@ int RedisAI_ModelGet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
}

if (!meta && blob) {
RedisModule_ReplyWithStringBuffer(ctx, buffer, len);
RAI_ReplyWithChunks(ctx, buffer, len);
RedisModule_Free(buffer);
RedisModule_CloseKey(key);
return REDISMODULE_OK;
@@ -456,7 +471,7 @@ int RedisAI_ModelGet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,

if (meta && blob) {
RedisModule_ReplyWithCString(ctx, "blob");
RedisModule_ReplyWithStringBuffer(ctx, buffer, len);
RAI_ReplyWithChunks(ctx, buffer, len);
RedisModule_Free(buffer);
}

@@ -874,7 +889,9 @@ int RedisAI_Info_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
}

/**
* AI.CONFIG [BACKENDSPATH <default_location_of_backend_libraries> | LOADBACKEND <backend_identifier> <location_of_backend_library>]
* AI.CONFIG [BACKENDSPATH <default_location_of_backend_libraries> |
* LOADBACKEND <backend_identifier> <location_of_backend_library> |
* MODEL_CHUNK_SIZE <chunk_size>]
*/
int RedisAI_Config_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc < 2) return RedisModule_WrongArity(ctx);
@@ -894,6 +911,16 @@ int RedisAI_Config_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, i
}
}

if (!strcasecmp(subcommand, "MODEL_CHUNK_SIZE")) {
if (argc > 2) {
RedisAI_Config_ModelChunkSize(argv[2]);
return RedisModule_ReplyWithSimpleString(ctx, "OK");
} else {
return RedisModule_ReplyWithError(
ctx, "ERR MODEL_CHUNK_SIZE: missing chunk size");
}
}

return RedisModule_ReplyWithError(ctx, "ERR unsupported subcommand");
}

@@ -1111,6 +1138,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
perqueueThreadPoolSize = REDISAI_DEFAULT_THREADS_PER_QUEUE;
setBackendsInterOpParallelism(REDISAI_DEFAULT_INTER_OP_PARALLELISM);
setBackendsIntraOpParallelism(REDISAI_DEFAULT_INTRA_OP_PARALLELISM);
setModelChunkSize(REDISAI_DEFAULT_MODEL_CHUNK_SIZE);

RAI_loadTimeConfig(ctx,argv,argc);
