Skip to content

Commit c0d1654

Browse files
committed
ubus: add support for channels
A channel is a context that is directly connected to a peer instead of going through ubusd. The use of this context is limited to calling ubus_invoke and receiving requests not bound to any registered object. The main use case for this is having a more stateful interaction between processes. A service using channels can attach metadata to each individual channel and keep track of its lifetime, which is not possible through the regular subscribe/notify mechanism. Using channels also improves request latency, since messages are passed directly between processes. A channel can either be opened by fd using ubus.open_channel(), or created from within a request by using req.new_channel(). When calling req.new_channel, the fd for the other side of the channel is automatically passed to the remote caller. Signed-off-by: Felix Fietkau <[email protected]>
1 parent 22b9523 commit c0d1654

File tree

2 files changed

+235
-27
lines changed

2 files changed

+235
-27
lines changed

Diff for: CMakeLists.txt

+5-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ if(UBUS_SUPPORT)
172172
set_target_properties(ubus_lib PROPERTIES OUTPUT_NAME ubus PREFIX "")
173173
target_link_options(ubus_lib PRIVATE ${UCODE_MODULE_LINK_OPTIONS})
174174
target_link_libraries(ubus_lib ${libubus} ${libblobmsg_json})
175-
list(APPEND CMAKE_REQUIRED_LIBRARIES ${libubox})
175+
list(APPEND CMAKE_REQUIRED_LIBRARIES ${libubox} ${libubus})
176176
file(WRITE "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test.c" "
177177
#include <libubus.h>
178178
int main() { return UBUS_STATUS_NO_MEMORY; }
@@ -182,9 +182,13 @@ if(UBUS_SUPPORT)
182182
"${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test.c")
183183
check_symbol_exists(uloop_fd_set_cb "libubox/uloop.h" FD_SET_CB_EXISTS)
184184
check_function_exists(uloop_timeout_remaining64 REMAINING64_FUNCTION_EXISTS)
185+
check_function_exists(ubus_channel_connect HAVE_CHANNEL_SUPPORT)
185186
if(REMAINING64_FUNCTION_EXISTS)
186187
target_compile_definitions(ubus_lib PUBLIC HAVE_ULOOP_TIMEOUT_REMAINING64)
187188
endif()
189+
if(HAVE_CHANNEL_SUPPORT)
190+
target_compile_definitions(ubus_lib PUBLIC HAVE_UBUS_CHANNEL_SUPPORT)
191+
endif()
188192
if(HAVE_NEW_UBUS_STATUS_CODES)
189193
add_definitions(-DHAVE_NEW_UBUS_STATUS_CODES)
190194
endif()

Diff for: lib/ubus.c

+230-26
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ static uc_resource_type_t *notify_type;
130130
static uc_resource_type_t *object_type;
131131
static uc_resource_type_t *defer_type;
132132
static uc_resource_type_t *conn_type;
133+
static uc_resource_type_t *chan_type;
133134

134135
static uint64_t n_cb_active;
135136
static bool have_own_uloop;
@@ -140,6 +141,9 @@ typedef struct {
140141
struct ubus_context ctx;
141142
struct blob_buf buf;
142143
int timeout;
144+
145+
uc_vm_t *vm;
146+
int registry_index;
143147
} uc_ubus_connection_t;
144148

145149
typedef struct {
@@ -292,6 +296,16 @@ _uc_reg_clear(uc_vm_t *vm, const char *key, size_t idx, size_t nptrs)
292296
}
293297

294298

299+
#define connection_reg_add(vm, conn, cb, disconnect_cb, fd) \
300+
_uc_reg_add(vm, "ubus.connections", 4, conn, cb, disconnect_cb, fd)
301+
302+
#define connection_reg_get(vm, idx, conn, cb, disconnect_cb) \
303+
_uc_reg_get(vm, "ubus.connections", idx, 3, conn, cb, disconnect_cb)
304+
305+
#define connection_reg_clear(vm, idx) \
306+
_uc_reg_clear(vm, "ubus.connections", idx, 4)
307+
308+
295309
#define request_reg_add(vm, request, cb, fdcb, conn, fd) \
296310
_uc_reg_add(vm, "ubus.requests", 5, request, cb, fdcb, conn, fd)
297311

@@ -512,6 +526,7 @@ uc_ubus_connect(uc_vm_t *vm, size_t nargs)
512526
"timeout", UC_INTEGER, true, &timeout);
513527

514528
c = xalloc(sizeof(*c));
529+
c->registry_index = -1;
515530
c->timeout = timeout ? ucv_int64_get(timeout) : 30;
516531

517532
if (ubus_connect_ctx(&c->ctx, socket ? ucv_string_get(socket) : NULL)) {
@@ -555,6 +570,8 @@ _conn_get(uc_vm_t *vm, uc_ubus_connection_t **conn)
555570
{
556571
uc_ubus_connection_t *c = uc_fn_thisval("ubus.connection");
557572

573+
if (!c)
574+
c = uc_fn_thisval("ubus.channel");
558575
if (!c)
559576
err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid connection context");
560577

@@ -744,51 +761,36 @@ get_fd(uc_vm_t *vm, uc_value_t *val)
744761
return (int)n;
745762
}
746763

747-
static uc_value_t *
748-
uc_ubus_call(uc_vm_t *vm, size_t nargs)
764+
static int
765+
uc_ubus_call_common(uc_vm_t *vm, uc_ubus_connection_t *c, uc_ubus_call_res_t *res,
766+
uint32_t id, uc_value_t *funname, uc_value_t *funargs,
767+
uc_value_t *fd, uc_value_t *fdcb, uc_value_t *mret)
749768
{
750-
uc_value_t *objname, *funname, *funargs, *fd, *fdcb, *mret = NULL;
751-
uc_ubus_call_res_t res = { 0 };
752769
uc_ubus_deferred_t defer = {};
753-
uc_ubus_connection_t *c;
754770
enum ubus_msg_status rv;
755771
int fd_val = -1;
756-
uint32_t id;
757-
758-
conn_get(vm, &c);
759-
760-
args_get_named(vm, nargs,
761-
"object", UC_STRING, REQUIRED, &objname,
762-
"method", UC_STRING, REQUIRED, &funname,
763-
"data", UC_OBJECT, OPTIONAL, &funargs,
764-
"multiple_return", UC_BOOLEAN, OPTIONAL, &mret,
765-
"fd", 0, NAMED, &fd,
766-
"fd_cb", UC_CLOSURE, NAMED, &fdcb);
767772

768773
blob_buf_init(&c->buf, 0);
769774

770775
if (funargs)
771776
ucv_object_to_blob(funargs, &c->buf);
772777
if (fd) {
773778
fd_val = get_fd(vm, fd);
774-
if (fd_val < 0)
775-
err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid file descriptor argument");
779+
if (fd_val < 0) {
780+
rv = UBUS_STATUS_INVALID_ARGUMENT;
781+
set_error(rv, "Invalid file descriptor argument");
782+
return rv;
783+
}
776784
}
777785

778-
rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id);
779-
780-
if (rv != UBUS_STATUS_OK)
781-
err_return(rv, "Failed to resolve object name '%s'",
782-
ucv_string_get(objname));
783-
784-
res.mret = ucv_is_truish(mret);
786+
res->mret = ucv_is_truish(mret);
785787

786788
rv = ubus_invoke_async_fd(&c->ctx, id, ucv_string_get(funname),
787789
c->buf.head, &defer.request, fd_val);
788790
defer.vm = vm;
789791
defer.ctx = &c->ctx;
790792
defer.request.data_cb = uc_ubus_call_cb;
791-
defer.request.priv = &res;
793+
defer.request.priv = res;
792794
if (ucv_is_callable(fdcb)) {
793795
defer.request.fd_cb = uc_ubus_call_fd_cb;
794796
defer.registry_index = request_reg_add(vm, NULL, NULL, ucv_get(fdcb), NULL, NULL);
@@ -800,13 +802,66 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs)
800802
if (defer.request.fd_cb)
801803
request_reg_clear(vm, defer.registry_index);
802804

805+
return rv;
806+
}
807+
808+
static uc_value_t *
809+
uc_ubus_call(uc_vm_t *vm, size_t nargs)
810+
{
811+
uc_value_t *objname, *funname, *funargs, *fd, *fdcb, *mret = NULL;
812+
uc_ubus_call_res_t res = { 0 };
813+
uc_ubus_connection_t *c;
814+
enum ubus_msg_status rv;
815+
uint32_t id;
816+
817+
args_get_named(vm, nargs,
818+
"object", UC_STRING, REQUIRED, &objname,
819+
"method", UC_STRING, REQUIRED, &funname,
820+
"data", UC_OBJECT, OPTIONAL, &funargs,
821+
"multiple_return", UC_BOOLEAN, OPTIONAL, &mret,
822+
"fd", 0, NAMED, &fd,
823+
"fd_cb", UC_CLOSURE, NAMED, &fdcb);
824+
825+
conn_get(vm, &c);
826+
827+
rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id);
828+
if (rv != UBUS_STATUS_OK)
829+
err_return(rv, "Failed to resolve object name '%s'",
830+
ucv_string_get(objname));
831+
832+
rv = uc_ubus_call_common(vm, c, &res, id, funname, funargs, fd, fdcb, mret);
803833
if (rv != UBUS_STATUS_OK)
804834
err_return(rv, "Failed to invoke function '%s' on object '%s'",
805835
ucv_string_get(funname), ucv_string_get(objname));
806836

807837
ok_return(res.res);
808838
}
809839

840+
static uc_value_t *
841+
uc_ubus_chan_request(uc_vm_t *vm, size_t nargs)
842+
{
843+
uc_value_t *funname, *funargs, *fd, *fdcb, *mret = NULL;
844+
uc_ubus_call_res_t res = { 0 };
845+
uc_ubus_connection_t *c;
846+
enum ubus_msg_status rv;
847+
848+
args_get_named(vm, nargs,
849+
"method", UC_STRING, REQUIRED, &funname,
850+
"data", UC_OBJECT, OPTIONAL, &funargs,
851+
"multiple_return", UC_BOOLEAN, OPTIONAL, &mret,
852+
"fd", 0, NAMED, &fd,
853+
"fd_cb", UC_CLOSURE, NAMED, &fdcb);
854+
855+
conn_get(vm, &c);
856+
857+
rv = uc_ubus_call_common(vm, c, &res, 0, funname, funargs, fd, fdcb, mret);
858+
if (rv != UBUS_STATUS_OK)
859+
err_return(rv, "Failed to send request '%s' on channel",
860+
ucv_string_get(funname));
861+
862+
ok_return(res.res);
863+
}
864+
810865
static uc_value_t *
811866
uc_ubus_defer(uc_vm_t *vm, size_t nargs)
812867
{
@@ -2153,10 +2208,149 @@ uc_ubus_defer_abort(uc_vm_t *vm, size_t nargs)
21532208
ok_return(ucv_boolean_new(true));
21542209
}
21552210

2211+
/*
2212+
* channel related methods
2213+
* --------------------------------------------------------------------------
2214+
*/
2215+
2216+
#ifdef HAVE_UBUS_CHANNEL_SUPPORT
2217+
static int
2218+
uc_ubus_channel_req_cb(struct ubus_context *ctx, struct ubus_object *obj,
2219+
struct ubus_request_data *req, const char *method,
2220+
struct blob_attr *msg)
2221+
{
2222+
uc_ubus_connection_t *c = container_of(ctx, uc_ubus_connection_t, ctx);
2223+
uc_value_t *this, *func, *args, *reqproto;
2224+
2225+
connection_reg_get(c->vm, c->registry_index, &this, &func, NULL);
2226+
if (!ucv_is_callable(func))
2227+
return UBUS_STATUS_METHOD_NOT_FOUND;
2228+
2229+
args = blob_array_to_ucv(c->vm, blob_data(msg), blob_len(msg), true);
2230+
reqproto = ucv_object_new(c->vm);
2231+
ucv_object_add(reqproto, "args", ucv_get(args));
2232+
if (method)
2233+
ucv_object_add(reqproto, "type", ucv_get(ucv_string_new(method)));
2234+
2235+
return uc_ubus_handle_reply_common(ctx, req, c->vm, this, func, reqproto);
2236+
}
2237+
2238+
static void
2239+
uc_ubus_channel_disconnect_cb(struct ubus_context *ctx)
2240+
{
2241+
uc_ubus_connection_t *c = container_of(ctx, uc_ubus_connection_t, ctx);
2242+
uc_value_t *this, *func;
2243+
2244+
connection_reg_get(c->vm, c->registry_index, &this, NULL, &func);
2245+
if (ucv_is_callable(func)) {
2246+
uc_vm_stack_push(c->vm, ucv_get(this));
2247+
uc_vm_stack_push(c->vm, ucv_get(func));
2248+
2249+
if (uc_vm_call(c->vm, true, 0) == EXCEPTION_NONE)
2250+
ucv_put(uc_vm_stack_pop(c->vm));
2251+
else
2252+
uloop_end();
2253+
}
2254+
2255+
blob_buf_free(&c->buf);
2256+
if (c->registry_index >= 0)
2257+
connection_reg_clear(c->vm, c->registry_index);
2258+
if (c->ctx.sock.fd >= 0) {
2259+
ubus_shutdown(&c->ctx);
2260+
c->ctx.sock.fd = -1;
2261+
}
2262+
}
2263+
2264+
static uc_value_t *
2265+
uc_ubus_channel_add(uc_vm_t *vm, uc_ubus_connection_t *c, uc_value_t *cb,
2266+
uc_value_t *disconnect_cb, uc_value_t *fd)
2267+
{
2268+
uc_value_t *chan;
2269+
2270+
c->vm = vm;
2271+
if (c->timeout < 0)
2272+
c->timeout = 30;
2273+
2274+
chan = uc_resource_new(chan_type, c);
2275+
c->registry_index = connection_reg_add(vm, ucv_get(chan), ucv_get(cb), ucv_get(disconnect_cb), ucv_get(fd));
2276+
c->ctx.connection_lost = uc_ubus_channel_disconnect_cb;
2277+
ubus_add_uloop(&c->ctx);
2278+
2279+
ok_return(chan);
2280+
}
2281+
#endif
2282+
2283+
static uc_value_t *
2284+
uc_ubus_request_new_channel(uc_vm_t *vm, size_t nargs)
2285+
{
2286+
#ifdef HAVE_UBUS_CHANNEL_SUPPORT
2287+
uc_ubus_request_t *callctx = uc_fn_thisval("ubus.request");
2288+
uc_value_t *cb, *disconnect_cb, *timeout;
2289+
uc_ubus_connection_t *c;
2290+
int fd;
2291+
2292+
if (!callctx)
2293+
err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid call context");
2294+
2295+
args_get(vm, nargs,
2296+
"cb", UC_CLOSURE, true, &cb,
2297+
"disconnect_cb", UC_CLOSURE, true, &disconnect_cb,
2298+
"timeout", UC_INTEGER, true, &timeout);
2299+
2300+
c = xalloc(sizeof(*c));
2301+
c->timeout = timeout ? ucv_int64_get(timeout) : 30;
2302+
2303+
if (ubus_channel_create(&c->ctx, &fd, cb ? uc_ubus_channel_req_cb : NULL)) {
2304+
free(c);
2305+
err_return(UBUS_STATUS_UNKNOWN_ERROR, "Unable to create ubus channel");
2306+
}
2307+
2308+
ubus_request_set_fd(callctx->ctx, &callctx->req, fd);
2309+
2310+
return uc_ubus_channel_add(vm, c, cb, disconnect_cb, NULL);
2311+
#else
2312+
err_return(UBUS_STATUS_NOT_SUPPORTED, "No ubus channel support");
2313+
#endif
2314+
}
2315+
2316+
2317+
static uc_value_t *
2318+
uc_ubus_channel_connect(uc_vm_t *vm, size_t nargs)
2319+
{
2320+
#ifdef HAVE_UBUS_CHANNEL_SUPPORT
2321+
uc_value_t *fd, *cb, *disconnect_cb, *timeout;
2322+
uc_ubus_connection_t *c;
2323+
int fd_val;
2324+
2325+
args_get(vm, nargs,
2326+
"fd", UC_NULL, false, &fd,
2327+
"cb", UC_CLOSURE, true, &cb,
2328+
"disconnect_cb", UC_CLOSURE, true, &disconnect_cb,
2329+
"timeout", UC_INTEGER, true, &timeout);
2330+
2331+
fd_val = get_fd(vm, fd);
2332+
if (fd_val < 0)
2333+
err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid file descriptor argument");
2334+
2335+
c = xalloc(sizeof(*c));
2336+
c->timeout = timeout ? ucv_int64_get(timeout) : 30;
2337+
2338+
if (ubus_channel_connect(&c->ctx, fd_val, cb ? uc_ubus_channel_req_cb : NULL)) {
2339+
free(c);
2340+
err_return(UBUS_STATUS_UNKNOWN_ERROR, "Unable to create ubus channel");
2341+
}
2342+
2343+
return uc_ubus_channel_add(vm, c, cb, disconnect_cb, fd);
2344+
#else
2345+
err_return(UBUS_STATUS_NOT_SUPPORTED, "No ubus channel support");
2346+
#endif
2347+
}
2348+
21562349

21572350
static const uc_function_list_t global_fns[] = {
21582351
{ "error", uc_ubus_error },
21592352
{ "connect", uc_ubus_connect },
2353+
{ "open_channel", uc_ubus_channel_connect },
21602354
};
21612355

21622356
static const uc_function_list_t conn_fns[] = {
@@ -2172,6 +2366,12 @@ static const uc_function_list_t conn_fns[] = {
21722366
{ "disconnect", uc_ubus_disconnect },
21732367
};
21742368

2369+
static const uc_function_list_t chan_fns[] = {
2370+
{ "request", uc_ubus_chan_request },
2371+
{ "error", uc_ubus_error },
2372+
{ "disconnect", uc_ubus_disconnect },
2373+
};
2374+
21752375
static const uc_function_list_t defer_fns[] = {
21762376
{ "await", uc_ubus_defer_await },
21772377
{ "completed", uc_ubus_defer_completed },
@@ -2190,6 +2390,7 @@ static const uc_function_list_t request_fns[] = {
21902390
{ "defer", uc_ubus_request_defer },
21912391
{ "get_fd", uc_ubus_request_get_fd },
21922392
{ "set_fd", uc_ubus_request_set_fd },
2393+
{ "new_channel", uc_ubus_request_new_channel },
21932394
};
21942395

21952396
static const uc_function_list_t notify_fns[] = {
@@ -2214,6 +2415,8 @@ static void free_connection(void *ud) {
22142415

22152416
if (conn->ctx.sock.fd >= 0)
22162417
ubus_shutdown(&conn->ctx);
2418+
if (conn->registry_index >= 0)
2419+
connection_reg_clear(conn->vm, conn->registry_index);
22172420

22182421
free(conn);
22192422
}
@@ -2292,6 +2495,7 @@ void uc_module_init(uc_vm_t *vm, uc_value_t *scope)
22922495
#endif
22932496

22942497
conn_type = uc_type_declare(vm, "ubus.connection", conn_fns, free_connection);
2498+
chan_type = uc_type_declare(vm, "ubus.channel", chan_fns, free_connection);
22952499
defer_type = uc_type_declare(vm, "ubus.deferred", defer_fns, free_deferred);
22962500
object_type = uc_type_declare(vm, "ubus.object", object_fns, free_object);
22972501
notify_type = uc_type_declare(vm, "ubus.notify", notify_fns, free_notify);

0 commit comments

Comments
 (0)