Skip to content

Commit d7e8315

Browse files
committed
ubus: add support for channels
A channel is a context that is directly connected to a peer instead of going through ubusd. The use of this context is limited to calling ubus_invoke and receiving requests not bound to any registered object. The main use case for this is having a more stateful interaction between processes. A service using channels can attach metadata to each individual channel and keep track of its lifetime, which is not possible through the regular subscribe/notify mechanism. Using channels also improves request latency, since messages are passed directly between processes. A channel can either be opened by fd using ubus.open_channel(), or created from within a request by using req.new_channel(). When calling req.new_channel, the fd for the other side of the channel is automatically passed to the remote caller. Signed-off-by: Felix Fietkau <[email protected]>
1 parent 53ad35b commit d7e8315

File tree

2 files changed

+223
-25
lines changed

2 files changed

+223
-25
lines changed

Diff for: CMakeLists.txt

+5-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ if(UBUS_SUPPORT)
172172
set_target_properties(ubus_lib PROPERTIES OUTPUT_NAME ubus PREFIX "")
173173
target_link_options(ubus_lib PRIVATE ${UCODE_MODULE_LINK_OPTIONS})
174174
target_link_libraries(ubus_lib ${libubus} ${libblobmsg_json})
175-
list(APPEND CMAKE_REQUIRED_LIBRARIES ${libubox})
175+
list(APPEND CMAKE_REQUIRED_LIBRARIES ${libubox} ${libubus})
176176
file(WRITE "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test.c" "
177177
#include <libubus.h>
178178
int main() { return UBUS_STATUS_NO_MEMORY; }
@@ -182,9 +182,13 @@ if(UBUS_SUPPORT)
182182
"${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test.c")
183183
check_symbol_exists(uloop_fd_set_cb "libubox/uloop.h" FD_SET_CB_EXISTS)
184184
check_function_exists(uloop_timeout_remaining64 REMAINING64_FUNCTION_EXISTS)
185+
check_function_exists(ubus_channel_connect HAVE_CHANNEL_SUPPORT)
185186
if(REMAINING64_FUNCTION_EXISTS)
186187
target_compile_definitions(ubus_lib PUBLIC HAVE_ULOOP_TIMEOUT_REMAINING64)
187188
endif()
189+
if(HAVE_CHANNEL_SUPPORT)
190+
target_compile_definitions(ubus_lib PUBLIC HAVE_UBUS_CHANNEL_SUPPORT)
191+
endif()
188192
if(HAVE_NEW_UBUS_STATUS_CODES)
189193
add_definitions(-DHAVE_NEW_UBUS_STATUS_CODES)
190194
endif()

Diff for: lib/ubus.c

+218-24
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ static uc_resource_type_t *notify_type;
129129
static uc_resource_type_t *object_type;
130130
static uc_resource_type_t *defer_type;
131131
static uc_resource_type_t *conn_type;
132+
static uc_resource_type_t *chan_type;
132133

133134
static uint64_t n_cb_active;
134135
static bool have_own_uloop;
@@ -139,6 +140,9 @@ typedef struct {
139140
struct ubus_context ctx;
140141
struct blob_buf buf;
141142
int timeout;
143+
144+
uc_vm_t *vm;
145+
int registry_index;
142146
} uc_ubus_connection_t;
143147

144148
typedef struct {
@@ -291,6 +295,15 @@ _uc_reg_clear(uc_vm_t *vm, const char *key, size_t idx, size_t nptrs)
291295
}
292296

293297

298+
#define connection_reg_add(vm, conn, cb, disconnect_cb) \
299+
_uc_reg_add(vm, "ubus.connections", 3, conn, cb, disconnect_cb)
300+
301+
#define connection_reg_get(vm, idx, conn, cb, disconnect_cb) \
302+
_uc_reg_get(vm, "ubus.connections", idx, 3, conn, cb, disconnect_cb)
303+
304+
#define connection_reg_clear(vm, idx) \
305+
_uc_reg_clear(vm, "ubus.connections", idx, 3)
306+
294307
#define request_reg_add(vm, request, cb, fdcb, conn) \
295308
_uc_reg_add(vm, "ubus.requests", 4, request, cb, fdcb, conn)
296309

@@ -511,6 +524,7 @@ uc_ubus_connect(uc_vm_t *vm, size_t nargs)
511524
"timeout", UC_INTEGER, true, &timeout);
512525

513526
c = xalloc(sizeof(*c));
527+
c->registry_index = -1;
514528
c->timeout = timeout ? ucv_int64_get(timeout) : 30;
515529

516530
if (ubus_connect_ctx(&c->ctx, socket ? ucv_string_get(socket) : NULL)) {
@@ -554,6 +568,8 @@ _conn_get(uc_vm_t *vm, uc_ubus_connection_t **conn)
554568
{
555569
uc_ubus_connection_t *c = uc_fn_thisval("ubus.connection");
556570

571+
if (!c)
572+
c = uc_fn_thisval("ubus.channel");
557573
if (!c)
558574
err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid connection context");
559575

@@ -713,26 +729,14 @@ uc_ubus_have_uloop(void)
713729
return active;
714730
}
715731

716-
static uc_value_t *
717-
uc_ubus_call(uc_vm_t *vm, size_t nargs)
732+
static int
733+
__uc_ubus_call(uc_vm_t *vm, uc_ubus_connection_t *c, uc_ubus_call_res_t *res,
734+
uint32_t id, uc_value_t *funname, uc_value_t *funargs,
735+
uc_value_t *fd, uc_value_t *fdcb, uc_value_t *mret)
718736
{
719-
uc_value_t *objname, *funname, *funargs, *fd, *fdcb, *mret = NULL;
720-
uc_ubus_call_res_t res = { 0 };
721737
uc_ubus_deferred_t defer = {};
722-
uc_ubus_connection_t *c;
723738
enum ubus_msg_status rv;
724739
int fd_val = -1;
725-
uint32_t id;
726-
727-
conn_get(vm, &c);
728-
729-
args_get_named(vm, nargs,
730-
"object", UC_STRING, REQUIRED, &objname,
731-
"method", UC_STRING, REQUIRED, &funname,
732-
"data", UC_OBJECT, OPTIONAL, &funargs,
733-
"multiple_return", UC_BOOLEAN, OPTIONAL, &mret,
734-
"fd", UC_INTEGER, NAMED, &fd,
735-
"fd_cb", UC_CLOSURE, NAMED, &fdcb);
736740

737741
blob_buf_init(&c->buf, 0);
738742

@@ -741,20 +745,14 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs)
741745
if (fd)
742746
fd_val = ucv_int64_get(fd);
743747

744-
rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id);
745-
746-
if (rv != UBUS_STATUS_OK)
747-
err_return(rv, "Failed to resolve object name '%s'",
748-
ucv_string_get(objname));
749-
750-
res.mret = ucv_is_truish(mret);
748+
res->mret = ucv_is_truish(mret);
751749

752750
rv = ubus_invoke_async_fd(&c->ctx, id, ucv_string_get(funname),
753751
c->buf.head, &defer.request, fd_val);
754752
defer.vm = vm;
755753
defer.ctx = &c->ctx;
756754
defer.request.data_cb = uc_ubus_call_cb;
757-
defer.request.priv = &res;
755+
defer.request.priv = res;
758756
if (ucv_is_callable(fdcb)) {
759757
defer.request.fd_cb = uc_ubus_call_fd_cb;
760758
defer.registry_index = request_reg_add(vm, NULL, NULL, ucv_get(fdcb), NULL);
@@ -766,13 +764,66 @@ uc_ubus_call(uc_vm_t *vm, size_t nargs)
766764
if (defer.request.fd_cb)
767765
request_reg_clear(vm, defer.registry_index);
768766

767+
return rv;
768+
}
769+
770+
static uc_value_t *
771+
uc_ubus_call(uc_vm_t *vm, size_t nargs)
772+
{
773+
uc_value_t *objname, *funname, *funargs, *fd, *fdcb, *mret = NULL;
774+
uc_ubus_call_res_t res = { 0 };
775+
uc_ubus_connection_t *c;
776+
enum ubus_msg_status rv;
777+
uint32_t id;
778+
779+
args_get_named(vm, nargs,
780+
"object", UC_STRING, REQUIRED, &objname,
781+
"method", UC_STRING, REQUIRED, &funname,
782+
"data", UC_OBJECT, OPTIONAL, &funargs,
783+
"multiple_return", UC_BOOLEAN, OPTIONAL, &mret,
784+
"fd", UC_INTEGER, NAMED, &fd,
785+
"fd_cb", UC_CLOSURE, NAMED, &fdcb);
786+
787+
conn_get(vm, &c);
788+
789+
rv = ubus_lookup_id(&c->ctx, ucv_string_get(objname), &id);
790+
if (rv != UBUS_STATUS_OK)
791+
err_return(rv, "Failed to resolve object name '%s'",
792+
ucv_string_get(objname));
793+
794+
rv = __uc_ubus_call(vm, c, &res, id, funname, funargs, fd, fdcb, mret);
769795
if (rv != UBUS_STATUS_OK)
770796
err_return(rv, "Failed to invoke function '%s' on object '%s'",
771797
ucv_string_get(funname), ucv_string_get(objname));
772798

773799
ok_return(res.res);
774800
}
775801

802+
static uc_value_t *
803+
uc_ubus_chan_request(uc_vm_t *vm, size_t nargs)
804+
{
805+
uc_value_t *funname, *funargs, *fd, *fdcb, *mret = NULL;
806+
uc_ubus_call_res_t res = { 0 };
807+
uc_ubus_connection_t *c;
808+
enum ubus_msg_status rv;
809+
810+
args_get_named(vm, nargs,
811+
"method", UC_STRING, REQUIRED, &funname,
812+
"data", UC_OBJECT, OPTIONAL, &funargs,
813+
"multiple_return", UC_BOOLEAN, OPTIONAL, &mret,
814+
"fd", UC_INTEGER, NAMED, &fd,
815+
"fd_cb", UC_CLOSURE, NAMED, &fdcb);
816+
817+
conn_get(vm, &c);
818+
819+
rv = __uc_ubus_call(vm, c, &res, 0, funname, funargs, fd, fdcb, mret);
820+
if (rv != UBUS_STATUS_OK)
821+
err_return(rv, "Failed to send request '%s' on channel",
822+
ucv_string_get(funname));
823+
824+
ok_return(res.res);
825+
}
826+
776827
static uc_value_t *
777828
uc_ubus_defer(uc_vm_t *vm, size_t nargs)
778829
{
@@ -2115,10 +2166,143 @@ uc_ubus_defer_abort(uc_vm_t *vm, size_t nargs)
21152166
ok_return(ucv_boolean_new(true));
21162167
}
21172168

2169+
/*
2170+
* channel related methods
2171+
* --------------------------------------------------------------------------
2172+
*/
2173+
2174+
#ifdef HAVE_UBUS_CHANNEL_SUPPORT
2175+
static int
2176+
uc_ubus_channel_req_cb(struct ubus_context *ctx, struct ubus_object *obj,
2177+
struct ubus_request_data *req, const char *method,
2178+
struct blob_attr *msg)
2179+
{
2180+
uc_ubus_connection_t *c = container_of(ctx, uc_ubus_connection_t, ctx);
2181+
uc_value_t *this, *func, *args, *reqproto;
2182+
2183+
connection_reg_get(c->vm, c->registry_index, &this, &func, NULL);
2184+
if (!ucv_is_callable(func))
2185+
return UBUS_STATUS_METHOD_NOT_FOUND;
2186+
2187+
args = blob_array_to_ucv(c->vm, blob_data(msg), blob_len(msg), true);
2188+
reqproto = ucv_object_new(c->vm);
2189+
ucv_object_add(reqproto, "args", ucv_get(args));
2190+
if (method)
2191+
ucv_object_add(reqproto, "type", ucv_get(ucv_string_new(method)));
2192+
2193+
return uc_ubus_handle_reply_common(ctx, req, c->vm, this, func, reqproto);
2194+
}
2195+
2196+
static void
2197+
uc_ubus_channel_disconnect_cb(struct ubus_context *ctx)
2198+
{
2199+
uc_ubus_connection_t *c = container_of(ctx, uc_ubus_connection_t, ctx);
2200+
uc_value_t *this, *func;
2201+
2202+
connection_reg_get(c->vm, c->registry_index, &this, NULL, &func);
2203+
if (ucv_is_callable(func)) {
2204+
uc_vm_stack_push(c->vm, ucv_get(this));
2205+
uc_vm_stack_push(c->vm, ucv_get(func));
2206+
2207+
if (uc_vm_call(c->vm, true, 0) == EXCEPTION_NONE)
2208+
ucv_put(uc_vm_stack_pop(c->vm));
2209+
}
2210+
2211+
blob_buf_free(&c->buf);
2212+
if (c->registry_index >= 0)
2213+
connection_reg_clear(c->vm, c->registry_index);
2214+
if (c->ctx.sock.fd >= 0) {
2215+
ubus_shutdown(&c->ctx);
2216+
c->ctx.sock.fd = -1;
2217+
}
2218+
}
2219+
2220+
static uc_value_t *
2221+
uc_ubus_channel_add(uc_vm_t *vm, uc_ubus_connection_t *c, uc_value_t *cb,
2222+
uc_value_t *disconnect_cb)
2223+
{
2224+
uc_value_t *chan;
2225+
2226+
c->vm = vm;
2227+
if (c->timeout < 0)
2228+
c->timeout = 30;
2229+
2230+
chan = uc_resource_new(chan_type, c);
2231+
c->registry_index = connection_reg_add(vm, ucv_get(chan), ucv_get(cb), ucv_get(disconnect_cb));
2232+
c->ctx.connection_lost = uc_ubus_channel_disconnect_cb;
2233+
ubus_add_uloop(&c->ctx);
2234+
2235+
ok_return(chan);
2236+
}
2237+
#endif
2238+
2239+
static uc_value_t *
2240+
uc_ubus_request_new_channel(uc_vm_t *vm, size_t nargs)
2241+
{
2242+
#ifdef HAVE_UBUS_CHANNEL_SUPPORT
2243+
uc_ubus_request_t *callctx = uc_fn_thisval("ubus.request");
2244+
uc_value_t *cb, *disconnect_cb, *timeout;
2245+
uc_ubus_connection_t *c;
2246+
int fd;
2247+
2248+
if (!callctx)
2249+
err_return(UBUS_STATUS_INVALID_ARGUMENT, "Invalid call context");
2250+
2251+
args_get(vm, nargs,
2252+
"cb", UC_CLOSURE, true, &cb,
2253+
"disconnect_cb", UC_CLOSURE, true, &disconnect_cb,
2254+
"timeout", UC_INTEGER, true, &timeout);
2255+
2256+
c = xalloc(sizeof(*c));
2257+
c->timeout = timeout ? ucv_int64_get(timeout) : 30;
2258+
2259+
if (ubus_channel_create(&c->ctx, &fd, cb ? uc_ubus_channel_req_cb : NULL)) {
2260+
free(c);
2261+
err_return(UBUS_STATUS_UNKNOWN_ERROR, "Unable to create ubus channel");
2262+
}
2263+
2264+
ubus_request_set_fd(callctx->ctx, &callctx->req, fd);
2265+
2266+
return uc_ubus_channel_add(vm, c, cb, disconnect_cb);
2267+
#else
2268+
err_return(UBUS_STATUS_NOT_SUPPORTED, "No ubus channel support");
2269+
#endif
2270+
}
2271+
2272+
2273+
static uc_value_t *
2274+
uc_ubus_channel_connect(uc_vm_t *vm, size_t nargs)
2275+
{
2276+
#ifdef HAVE_UBUS_CHANNEL_SUPPORT
2277+
uc_value_t *fd, *cb, *disconnect_cb, *timeout;
2278+
uc_ubus_connection_t *c;
2279+
2280+
args_get(vm, nargs,
2281+
"fd", UC_INTEGER, false, &fd,
2282+
"cb", UC_CLOSURE, true, &cb,
2283+
"disconnect_cb", UC_CLOSURE, true, &disconnect_cb,
2284+
"timeout", UC_INTEGER, true, &timeout);
2285+
2286+
c = xalloc(sizeof(*c));
2287+
c->timeout = timeout ? ucv_int64_get(timeout) : 30;
2288+
2289+
if (ubus_channel_connect(&c->ctx, ucv_int64_get(fd),
2290+
cb ? uc_ubus_channel_req_cb : NULL)) {
2291+
free(c);
2292+
err_return(UBUS_STATUS_UNKNOWN_ERROR, "Unable to create ubus channel");
2293+
}
2294+
2295+
return uc_ubus_channel_add(vm, c, cb, disconnect_cb);
2296+
#else
2297+
err_return(UBUS_STATUS_NOT_SUPPORTED, "No ubus channel support");
2298+
#endif
2299+
}
2300+
21182301

21192302
static const uc_function_list_t global_fns[] = {
21202303
{ "error", uc_ubus_error },
21212304
{ "connect", uc_ubus_connect },
2305+
{ "open_channel", uc_ubus_channel_connect },
21222306
};
21232307

21242308
static const uc_function_list_t conn_fns[] = {
@@ -2134,6 +2318,12 @@ static const uc_function_list_t conn_fns[] = {
21342318
{ "disconnect", uc_ubus_disconnect },
21352319
};
21362320

2321+
static const uc_function_list_t chan_fns[] = {
2322+
{ "request", uc_ubus_chan_request },
2323+
{ "error", uc_ubus_error },
2324+
{ "disconnect", uc_ubus_disconnect },
2325+
};
2326+
21372327
static const uc_function_list_t defer_fns[] = {
21382328
{ "complete", uc_ubus_defer_complete },
21392329
{ "completed", uc_ubus_defer_completed },
@@ -2152,6 +2342,7 @@ static const uc_function_list_t request_fns[] = {
21522342
{ "defer", uc_ubus_request_defer },
21532343
{ "get_fd", uc_ubus_request_get_fd },
21542344
{ "set_fd", uc_ubus_request_set_fd },
2345+
{ "new_channel", uc_ubus_request_new_channel },
21552346
};
21562347

21572348
static const uc_function_list_t notify_fns[] = {
@@ -2176,6 +2367,8 @@ static void free_connection(void *ud) {
21762367

21772368
if (conn->ctx.sock.fd >= 0)
21782369
ubus_shutdown(&conn->ctx);
2370+
if (conn->registry_index >= 0)
2371+
connection_reg_clear(conn->vm, conn->registry_index);
21792372

21802373
free(conn);
21812374
}
@@ -2254,6 +2447,7 @@ void uc_module_init(uc_vm_t *vm, uc_value_t *scope)
22542447
#endif
22552448

22562449
conn_type = uc_type_declare(vm, "ubus.connection", conn_fns, free_connection);
2450+
chan_type = uc_type_declare(vm, "ubus.channel", chan_fns, free_connection);
22572451
defer_type = uc_type_declare(vm, "ubus.deferred", defer_fns, free_deferred);
22582452
object_type = uc_type_declare(vm, "ubus.object", object_fns, free_object);
22592453
notify_type = uc_type_declare(vm, "ubus.notify", notify_fns, free_notify);

0 commit comments

Comments
 (0)