Skip to content

Commit 52dd0e0

Browse files
committed
ubus: add support for channels
A channel is a context that is directly connected to a peer instead of going through ubusd. The use of this context is limited to calling ubus_invoke and receiving requests not bound to any registered object. The main use case for this is having a more stateful interaction between processes. A service using channels can attach metadata to each individual channel and keep track of its lifetime, which is not possible through the regular subscribe/notify mechanism. Using channels also improves request latency, since messages are passed directly between processes. A channel can either be opened by fd using ubus.open_channel(), or created from within a request by using req.new_channel(). When calling req.new_channel, the fd for the other side of the channel is automatically passed to the remote caller. Signed-off-by: Felix Fietkau <[email protected]>
1 parent eab7201 commit 52dd0e0

File tree

2 files changed

+231
-27
lines changed

2 files changed

+231
-27
lines changed

Diff for: CMakeLists.txt

+5-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ if(UBUS_SUPPORT)
172172
set_target_properties(ubus_lib PROPERTIES OUTPUT_NAME ubus PREFIX "")
173173
target_link_options(ubus_lib PRIVATE ${UCODE_MODULE_LINK_OPTIONS})
174174
target_link_libraries(ubus_lib ${libubus} ${libblobmsg_json})
175-
list(APPEND CMAKE_REQUIRED_LIBRARIES ${libubox})
175+
list(APPEND CMAKE_REQUIRED_LIBRARIES ${libubox} ${libubus})
176176
file(WRITE "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test.c" "
177177
#include <libubus.h>
178178
int main() { return UBUS_STATUS_NO_MEMORY; }
@@ -182,9 +182,13 @@ if(UBUS_SUPPORT)
182182
"${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test.c")
183183
check_symbol_exists(uloop_fd_set_cb "libubox/uloop.h" FD_SET_CB_EXISTS)
184184
check_function_exists(uloop_timeout_remaining64 REMAINING64_FUNCTION_EXISTS)
185+
check_function_exists(ubus_channel_connect HAVE_CHANNEL_SUPPORT)
185186
if(REMAINING64_FUNCTION_EXISTS)
186187
target_compile_definitions(ubus_lib PUBLIC HAVE_ULOOP_TIMEOUT_REMAINING64)
187188
endif()
189+
if(HAVE_CHANNEL_SUPPORT)
190+
target_compile_definitions(ubus_lib PUBLIC HAVE_UBUS_CHANNEL_SUPPORT)
191+
endif()
188192
if(HAVE_NEW_UBUS_STATUS_CODES)
189193
add_definitions(-DHAVE_NEW_UBUS_STATUS_CODES)
190194
endif()

Diff for: lib/ubus.c

+226-26
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ static uc_resource_type_t *notify_type;
130130
static uc_resource_type_t *object_type;
131131
static uc_resource_type_t *defer_type;
132132
static uc_resource_type_t *conn_type;
133+
static uc_resource_type_t *chan_type;
133134

134135
static uint64_t n_cb_active;
135136
static bool have_own_uloop;
@@ -140,6 +141,9 @@ typedef struct {
140141
struct ubus_context ctx;
141142
struct blob_buf buf;
142143
int timeout;
144+
145+
uc_vm_t *vm;
146+
int registry_index;
143147
} uc_ubus_connection_t;
144148

145149
typedef struct {
@@ -292,6 +296,16 @@ _uc_reg_clear(uc_vm_t *vm, const char *key, size_t idx, size_t nptrs)
292296
}
293297

294298

299+
#define connection_reg_add(vm, conn, cb, disconnect_cb) \
300+
_uc_reg_add(vm, "ubus.connections", 3, conn, cb, disconnect_cb)
301+
302+
#define connection_reg_get(vm, idx, conn, cb, disconnect_cb) \
303+
_uc_reg_get(vm, "ubus.connections", idx, 3, conn, cb, disconnect_cb)
304+
305+
#define connection_reg_clear(vm, idx) \
306+
_uc_reg_clear(vm, "ubus.connections", idx, 3)
307+
308+
295309
#define request_reg_add(vm, request, cb, fdcb, conn, fd) \
296310
_uc_reg_add(vm, "ubus.requests", 5, request, cb, fdcb, conn, fd)
297311

@@ -512,6 +526,7 @@ uc_ubus_connect(uc_vm_t *vm, size_t nargs)
512526
"timeout", UC_INTEGER, true, &timeout);
513527

514528
c = xalloc(sizeof(*c));
529+
c->registry_index = -1;
515530
c->timeout = timeout ? ucv_int64_get(timeout) : 30;
516531

517532
if (ubus_connect_ctx(&c->ctx, socket ? ucv_string_get(socket) : NULL)) {
@@ -555,6 +570,8 @@ _conn_get(uc_vm_t *vm, uc_ubus_connection_t **conn)
555570
{
556571
uc_ubus_connection_t *c = uc_fn_thisval("ubus.connection");
557572

573+
if (!c)
574+
c = uc_fn_thisval("ubus.channel");
558575
if (!c)
559576
err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid connection context");
560577

@@ -744,51 +761,36 @@ get_fd(uc_vm_t *vm, uc_value_t *val)
744761
return (int)n;
745762
}
746763

747-
static uc_value_t *
748-
uc_ubus_call(uc_vm_t *vm, size_t nargs)
764+
static int
765+
uc_ubus_call_common(uc_vm_t *vm, uc_ubus_connection_t *c, uc_ubus_call_res_t *res,
766+
uint32_t id, uc_value_t *funname, uc_value_t *funargs,
767+
uc_value_t *fd, uc_value_t *fdcb, uc_value_t *mret)
749768
{
750-
uc_value_t *objname, *funname, *funargs, *fd, *fdcb, *mret = NULL;
751-
uc_ubus_call_res_t res = { 0 };
752769
uc_ubus_deferred_t defer = {};
753-
uc_ubus_connection_t *c;
754770
enum ubus_msg_status rv;
755771
int fd_val = -1;
756-
uint32_t id;
757-
758-
conn_get(vm, &c);
759-
760-
args_get_named(vm, nargs,
761-
"object", UC_STRING, REQUIRED, &objname,
762-
"method", UC_STRING, REQUIRED, &funname,
763-
"data", UC_OBJECT, OPTIONAL, &funargs,
764-
"multiple_return", UC_BOOLEAN, OPTIONAL, &mret,
765-
"fd", 0, NAMED, &fd,
766-
"fd_cb", UC_CLOSURE, NAMED, &fdcb);
767772

768773
blob_buf_init(&c->buf, 0);
769774

770775
if (funargs)
771776
ucv_object_to_blob(funargs, &c->buf);
772777
if (fd) {
773778
fd_val = get_fd(vm, fd);
774-
if (fd_val < 0)
775-
err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid file descriptor argument");
779+
if (fd_val < 0) {
780+
rv = UBUS_STATUS_INVALID_ARGUMENT;
781+
set_error(rv, "Invalid file descriptor argument");
782+
return rv;
783+
}
776784
}
777785

778-
rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id);
779-
780-
if (rv != UBUS_STATUS_OK)
781-
err_return(rv, "Failed to resolve object name '%s'",
782-
ucv_string_get(objname));
783-
784-
res.mret = ucv_is_truish(mret);
786+
res->mret = ucv_is_truish(mret);
785787

786788
rv = ubus_invoke_async_fd(&c->ctx, id, ucv_string_get(funname),
787789
c->buf.head, &defer.request, fd_val);
788790
defer.vm = vm;
789791
defer.ctx = &c->ctx;
790792
defer.request.data_cb = uc_ubus_call_cb;
791-
defer.request.priv = &res;
793+
defer.request.priv = res;
792794
if (ucv_is_callable(fdcb)) {
793795
defer.request.fd_cb = uc_ubus_call_fd_cb;
794796
defer.registry_index = request_reg_add(vm, NULL, NULL, ucv_get(fdcb), NULL, NULL);
@@ -800,13 +802,66 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs)
800802
if (defer.request.fd_cb)
801803
request_reg_clear(vm, defer.registry_index);
802804

805+
return rv;
806+
}
807+
808+
static uc_value_t *
809+
uc_ubus_call(uc_vm_t *vm, size_t nargs)
810+
{
811+
uc_value_t *objname, *funname, *funargs, *fd, *fdcb, *mret = NULL;
812+
uc_ubus_call_res_t res = { 0 };
813+
uc_ubus_connection_t *c;
814+
enum ubus_msg_status rv;
815+
uint32_t id;
816+
817+
args_get_named(vm, nargs,
818+
"object", UC_STRING, REQUIRED, &objname,
819+
"method", UC_STRING, REQUIRED, &funname,
820+
"data", UC_OBJECT, OPTIONAL, &funargs,
821+
"multiple_return", UC_BOOLEAN, OPTIONAL, &mret,
822+
"fd", 0, NAMED, &fd,
823+
"fd_cb", UC_CLOSURE, NAMED, &fdcb);
824+
825+
conn_get(vm, &c);
826+
827+
rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id);
828+
if (rv != UBUS_STATUS_OK)
829+
err_return(rv, "Failed to resolve object name '%s'",
830+
ucv_string_get(objname));
831+
832+
rv = uc_ubus_call_common(vm, c, &res, id, funname, funargs, fd, fdcb, mret);
803833
if (rv != UBUS_STATUS_OK)
804834
err_return(rv, "Failed to invoke function '%s' on object '%s'",
805835
ucv_string_get(funname), ucv_string_get(objname));
806836

807837
ok_return(res.res);
808838
}
809839

840+
static uc_value_t *
841+
uc_ubus_chan_request(uc_vm_t *vm, size_t nargs)
842+
{
843+
uc_value_t *funname, *funargs, *fd, *fdcb, *mret = NULL;
844+
uc_ubus_call_res_t res = { 0 };
845+
uc_ubus_connection_t *c;
846+
enum ubus_msg_status rv;
847+
848+
args_get_named(vm, nargs,
849+
"method", UC_STRING, REQUIRED, &funname,
850+
"data", UC_OBJECT, OPTIONAL, &funargs,
851+
"multiple_return", UC_BOOLEAN, OPTIONAL, &mret,
852+
"fd", 0, NAMED, &fd,
853+
"fd_cb", UC_CLOSURE, NAMED, &fdcb);
854+
855+
conn_get(vm, &c);
856+
857+
rv = uc_ubus_call_common(vm, c, &res, 0, funname, funargs, fd, fdcb, mret);
858+
if (rv != UBUS_STATUS_OK)
859+
err_return(rv, "Failed to send request '%s' on channel",
860+
ucv_string_get(funname));
861+
862+
ok_return(res.res);
863+
}
864+
810865
static uc_value_t *
811866
uc_ubus_defer(uc_vm_t *vm, size_t nargs)
812867
{
@@ -2152,10 +2207,145 @@ uc_ubus_defer_abort(uc_vm_t *vm, size_t nargs)
21522207
ok_return(ucv_boolean_new(true));
21532208
}
21542209

2210+
/*
2211+
* channel related methods
2212+
* --------------------------------------------------------------------------
2213+
*/
2214+
2215+
#ifdef HAVE_UBUS_CHANNEL_SUPPORT
2216+
static int
2217+
uc_ubus_channel_req_cb(struct ubus_context *ctx, struct ubus_object *obj,
2218+
struct ubus_request_data *req, const char *method,
2219+
struct blob_attr *msg)
2220+
{
2221+
uc_ubus_connection_t *c = container_of(ctx, uc_ubus_connection_t, ctx);
2222+
uc_value_t *this, *func, *args, *reqproto;
2223+
2224+
connection_reg_get(c->vm, c->registry_index, &this, &func, NULL);
2225+
if (!ucv_is_callable(func))
2226+
return UBUS_STATUS_METHOD_NOT_FOUND;
2227+
2228+
args = blob_array_to_ucv(c->vm, blob_data(msg), blob_len(msg), true);
2229+
reqproto = ucv_object_new(c->vm);
2230+
ucv_object_add(reqproto, "args", ucv_get(args));
2231+
if (method)
2232+
ucv_object_add(reqproto, "type", ucv_get(ucv_string_new(method)));
2233+
2234+
return uc_ubus_handle_reply_common(ctx, req, c->vm, this, func, reqproto);
2235+
}
2236+
2237+
static void
2238+
uc_ubus_channel_disconnect_cb(struct ubus_context *ctx)
2239+
{
2240+
uc_ubus_connection_t *c = container_of(ctx, uc_ubus_connection_t, ctx);
2241+
uc_value_t *this, *func;
2242+
2243+
connection_reg_get(c->vm, c->registry_index, &this, NULL, &func);
2244+
if (ucv_is_callable(func)) {
2245+
uc_vm_stack_push(c->vm, ucv_get(this));
2246+
uc_vm_stack_push(c->vm, ucv_get(func));
2247+
2248+
if (uc_vm_call(c->vm, true, 0) == EXCEPTION_NONE)
2249+
ucv_put(uc_vm_stack_pop(c->vm));
2250+
else
2251+
uloop_end();
2252+
}
2253+
2254+
blob_buf_free(&c->buf);
2255+
if (c->registry_index >= 0)
2256+
connection_reg_clear(c->vm, c->registry_index);
2257+
if (c->ctx.sock.fd >= 0) {
2258+
ubus_shutdown(&c->ctx);
2259+
c->ctx.sock.fd = -1;
2260+
}
2261+
}
2262+
2263+
static uc_value_t *
2264+
uc_ubus_channel_add(uc_vm_t *vm, uc_ubus_connection_t *c, uc_value_t *cb,
2265+
uc_value_t *disconnect_cb)
2266+
{
2267+
uc_value_t *chan;
2268+
2269+
c->vm = vm;
2270+
if (c->timeout < 0)
2271+
c->timeout = 30;
2272+
2273+
chan = uc_resource_new(chan_type, c);
2274+
c->registry_index = connection_reg_add(vm, ucv_get(chan), ucv_get(cb), ucv_get(disconnect_cb));
2275+
c->ctx.connection_lost = uc_ubus_channel_disconnect_cb;
2276+
ubus_add_uloop(&c->ctx);
2277+
2278+
ok_return(chan);
2279+
}
2280+
#endif
2281+
2282+
static uc_value_t *
2283+
uc_ubus_request_new_channel(uc_vm_t *vm, size_t nargs)
2284+
{
2285+
#ifdef HAVE_UBUS_CHANNEL_SUPPORT
2286+
uc_ubus_request_t *callctx = uc_fn_thisval("ubus.request");
2287+
uc_value_t *cb, *disconnect_cb, *timeout;
2288+
uc_ubus_connection_t *c;
2289+
int fd;
2290+
2291+
if (!callctx)
2292+
err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid call context");
2293+
2294+
args_get(vm, nargs,
2295+
"cb", UC_CLOSURE, true, &cb,
2296+
"disconnect_cb", UC_CLOSURE, true, &disconnect_cb,
2297+
"timeout", UC_INTEGER, true, &timeout);
2298+
2299+
c = xalloc(sizeof(*c));
2300+
c->timeout = timeout ? ucv_int64_get(timeout) : 30;
2301+
2302+
if (ubus_channel_create(&c->ctx, &fd, cb ? uc_ubus_channel_req_cb : NULL)) {
2303+
free(c);
2304+
err_return(UBUS_STATUS_UNKNOWN_ERROR, "Unable to create ubus channel");
2305+
}
2306+
2307+
ubus_request_set_fd(callctx->ctx, &callctx->req, fd);
2308+
2309+
return uc_ubus_channel_add(vm, c, cb, disconnect_cb);
2310+
#else
2311+
err_return(UBUS_STATUS_NOT_SUPPORTED, "No ubus channel support");
2312+
#endif
2313+
}
2314+
2315+
2316+
static uc_value_t *
2317+
uc_ubus_channel_connect(uc_vm_t *vm, size_t nargs)
2318+
{
2319+
#ifdef HAVE_UBUS_CHANNEL_SUPPORT
2320+
uc_value_t *fd, *cb, *disconnect_cb, *timeout;
2321+
uc_ubus_connection_t *c;
2322+
2323+
args_get(vm, nargs,
2324+
"fd", UC_INTEGER, false, &fd,
2325+
"cb", UC_CLOSURE, true, &cb,
2326+
"disconnect_cb", UC_CLOSURE, true, &disconnect_cb,
2327+
"timeout", UC_INTEGER, true, &timeout);
2328+
2329+
c = xalloc(sizeof(*c));
2330+
c->timeout = timeout ? ucv_int64_get(timeout) : 30;
2331+
2332+
if (ubus_channel_connect(&c->ctx, ucv_int64_get(fd),
2333+
cb ? uc_ubus_channel_req_cb : NULL)) {
2334+
free(c);
2335+
err_return(UBUS_STATUS_UNKNOWN_ERROR, "Unable to create ubus channel");
2336+
}
2337+
2338+
return uc_ubus_channel_add(vm, c, cb, disconnect_cb);
2339+
#else
2340+
err_return(UBUS_STATUS_NOT_SUPPORTED, "No ubus channel support");
2341+
#endif
2342+
}
2343+
21552344

21562345
static const uc_function_list_t global_fns[] = {
21572346
{ "error", uc_ubus_error },
21582347
{ "connect", uc_ubus_connect },
2348+
{ "open_channel", uc_ubus_channel_connect },
21592349
};
21602350

21612351
static const uc_function_list_t conn_fns[] = {
@@ -2171,6 +2361,12 @@ static const uc_function_list_t conn_fns[] = {
21712361
{ "disconnect", uc_ubus_disconnect },
21722362
};
21732363

2364+
static const uc_function_list_t chan_fns[] = {
2365+
{ "request", uc_ubus_chan_request },
2366+
{ "error", uc_ubus_error },
2367+
{ "disconnect", uc_ubus_disconnect },
2368+
};
2369+
21742370
static const uc_function_list_t defer_fns[] = {
21752371
{ "await", uc_ubus_defer_await },
21762372
{ "completed", uc_ubus_defer_completed },
@@ -2189,6 +2385,7 @@ static const uc_function_list_t request_fns[] = {
21892385
{ "defer", uc_ubus_request_defer },
21902386
{ "get_fd", uc_ubus_request_get_fd },
21912387
{ "set_fd", uc_ubus_request_set_fd },
2388+
{ "new_channel", uc_ubus_request_new_channel },
21922389
};
21932390

21942391
static const uc_function_list_t notify_fns[] = {
@@ -2213,6 +2410,8 @@ static void free_connection(void *ud) {
22132410

22142411
if (conn->ctx.sock.fd >= 0)
22152412
ubus_shutdown(&conn->ctx);
2413+
if (conn->registry_index >= 0)
2414+
connection_reg_clear(conn->vm, conn->registry_index);
22162415

22172416
free(conn);
22182417
}
@@ -2291,6 +2490,7 @@ void uc_module_init(uc_vm_t *vm, uc_value_t *scope)
22912490
#endif
22922491

22932492
conn_type = uc_type_declare(vm, "ubus.connection", conn_fns, free_connection);
2493+
chan_type = uc_type_declare(vm, "ubus.channel", chan_fns, free_connection);
22942494
defer_type = uc_type_declare(vm, "ubus.deferred", defer_fns, free_deferred);
22952495
object_type = uc_type_declare(vm, "ubus.object", object_fns, free_object);
22962496
notify_type = uc_type_declare(vm, "ubus.notify", notify_fns, free_notify);

0 commit comments

Comments
 (0)