diff --git a/examples/filter_rust_msgpack/Cargo.lock b/examples/filter_rust_msgpack/Cargo.lock new file mode 100644 index 00000000000..768e9af175f --- /dev/null +++ b/examples/filter_rust_msgpack/Cargo.lock @@ -0,0 +1,388 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bumpalo" +version = "3.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "cc" +version = "1.0.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +dependencies = [ + "libc", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "chrono" +version = "0.4.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "windows-targets", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + +[[package]] +name = "filter_rust_msgpack" +version = "0.1.0" +dependencies = [ + "chrono", + "libc", + "rmp", + "rmp-serde", + "rmpv", + "serde", + "serde_bytes", + "serde_derive", + "serde_json", +] + +[[package]] +name = "iana-time-zone" +version = "0.1.59" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "itoa" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" + +[[package]] +name = "js-sys" +version = "0.3.67" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "libc" +version = "0.2.152" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" + +[[package]] +name = "log" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + +[[package]] +name = "num-traits" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +dependencies = [ + "autocfg", +] + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + +[[package]] +name = "paste" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" + +[[package]] +name = "proc-macro2" +version = "1.0.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rmp" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9860a6cc38ed1da53456442089b4dfa35e7cedaa326df63017af88385e6b20" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmp-serde" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + +[[package]] +name = "rmpv" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e0e0214a4a2b444ecce41a4025792fc31f77c7bb89c46d253953ea8c65701ec" +dependencies = [ + "num-traits", + "rmp", +] + +[[package]] +name = "ryu" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" + +[[package]] +name = "serde" +version = "1.0.196" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_bytes" +version = "0.11.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b8497c313fd43ab992087548117643f6fcd935cbf36f176ffda0aacf9591734" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_derive" +version = "1.0.196" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.113" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "syn" +version = "2.0.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "wasm-bindgen" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" diff --git a/examples/filter_rust_msgpack/Cargo.toml b/examples/filter_rust_msgpack/Cargo.toml new file mode 100644 index 00000000000..32960295ab4 --- /dev/null +++ b/examples/filter_rust_msgpack/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "filter_rust_msgpack" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +serde = { version = "*", features = ["derive", "std"] } +serde_json = "1.0" +serde_bytes = "0.11" +serde_derive = "1.0" +rmp-serde = "1.1" +rmpv = "1.0" +rmp = "0.8" +chrono = { version = "0.4", default-features = false, features = ["clock", "std", "oldtime"] } +libc = "0.2" diff --git a/examples/filter_rust_msgpack/README.md b/examples/filter_rust_msgpack/README.md new file mode 100644 index 00000000000..081460f319d --- /dev/null +++ b/examples/filter_rust_msgpack/README.md @@ -0,0 +1,59 @@ +# Fluent Bit / filter_rust_msgpack + +This source source tree provides an example of WASM program which uses msgpack format written in Rust. + +## Prerequisites + +* Rust + * rustc 1.75.0 (82e1608df 2023-12-21)) or later +* [rustup](https://rustup.rs/) (For preparing rust compiler and toolchains) + +## How to build + +Add `wasm32-unknown-unknown` target for Rust toolchain: + +```console +$ rustup target add wasm32-unknown-unknown +``` + +Then, execute _cargo build_ as follows: + +```console +$ cargo build --target wasm32-unknown-unknown --release +``` + +Finally, `*.wasm` file will be created: + +```console +$ ls target/wasm32-unknown-unknown/release/*.wasm +target/wasm32-unknown-unknown/release/filter_rust_msgpack.wasm +``` + +## How to confirm WASM filter integration + +Create fluent-bit configuration file as follows: + +```ini +[SERVICE] + Flush 1 + Daemon Off + Log_Level info + HTTP_Server Off + HTTP_Listen 0.0.0.0 + HTTP_Port 2020 + +[INPUT] + Name dummy + Tag dummy.local + +[FILTER] + Name wasm + match dummy.* + WASM_Path /path/to/filter_rust_msgpack.wasm + Function_Name rust_filter_msgpack + accessible_paths .,/path/to/fluent-bit + +[OUTPUT] + Name stdout + Match * +``` diff --git a/examples/filter_rust_msgpack/src/lib.rs b/examples/filter_rust_msgpack/src/lib.rs new file mode 100644 index 00000000000..d8fe6870856 --- /dev/null +++ b/examples/filter_rust_msgpack/src/lib.rs @@ -0,0 +1,83 @@ +// Import pure and fast msgpack library written in Rust +use rmp_serde::Serializer; +use serde::{Deserialize, Serialize}; +use rmpv::Value; + +// Import chrono library to handle time related operation conveniently +use chrono::{TimeZone, Utc}; +use std::collections::BTreeMap; +use std::io::Cursor; +use std::io::Write; +use std::os::raw::c_char; +use std::slice; +use std::str; + +#[derive(Debug, PartialEq, Deserialize, Serialize)] +struct FilteredLog { + message: String, + time: String, + tag: String, + original: BTreeMap, + lang: String, +} + +#[inline] +fn value_to_string(val: &Value) -> String { + if val.is_str() { + let into = match val { + Value::String(s) => s.clone().into_str(), + _ => unreachable!() + }; + match into { + Some(i) => i.to_string(), + None => "".to_string(), + } + } else { + format!("{}", val) + } +} + +#[no_mangle] +pub extern "C" fn rust_filter_msgpack(tag: *const c_char, tag_len: u32, time_sec: u32, time_nsec: u32, record: *const c_char, record_len: u32) -> *const u8 { + let slice_tag: &[u8] = unsafe { slice::from_raw_parts(tag as *const u8, tag_len as usize) }; + let mut vt: Vec = Vec::new(); + vt.write(slice_tag).expect("Unable to write"); + let vtag = str::from_utf8(&vt).unwrap(); + let slice_record: &[u8] = + unsafe { slice::from_raw_parts(record as *const u8, record_len as usize) }; + let de = rmpv::decode::read_value(&mut Cursor::new(slice_record)).unwrap(); + + let mut map = BTreeMap::new(); + + let binding = de.as_map().unwrap(); + let size = binding.len(); + + // Create BTreeMap to handle collection operations easily + for i in 0..size { + let (k, v) = &binding[i]; + let key = value_to_string(k); + let value = value_to_string(v); + map.insert(key, value); + } + + map.insert("platform".to_string(), "wasm".to_string()); + + let dt = Utc.timestamp_opt(time_sec as i64, time_nsec).unwrap(); + let time = dt.format("%Y-%m-%dT%H:%M:%S.%9f %z").to_string(); + let mut buf = Vec::new(); + let msg = match map.get("message") { + Some(m) => m.to_string(), + None => "None".to_string(), + }; + let val = FilteredLog { + message: msg, + time: format!("{}", time), + tag: vtag.to_owned(), + original: map, + lang: "Rust".to_string(), + }; + + let mut se = Serializer::new(&mut buf).with_struct_map(); + val.serialize(&mut se).unwrap(); + buf.as_ptr() +} diff --git a/include/fluent-bit/wasm/flb_wasm.h b/include/fluent-bit/wasm/flb_wasm.h index d3809221458..ecd3fc9d2b7 100644 --- a/include/fluent-bit/wasm/flb_wasm.h +++ b/include/fluent-bit/wasm/flb_wasm.h @@ -25,6 +25,7 @@ #include #include #include +#include /* WASM Context */ struct flb_wasm { @@ -48,6 +49,14 @@ char *flb_wasm_call_function_format_json(struct flb_wasm *fw, const char *functi const char* tag_data, size_t tag_len, struct flb_time t, const char* record_data, size_t record_len); + +int flb_wasm_format_msgpack_mode(const char *tag, int tag_len, + struct flb_log_event *log_event, + void **out_buf, size_t *out_size); +char *flb_wasm_call_function_format_msgpack(struct flb_wasm *fw, const char *function_name, + const char* tag_data, size_t tag_len, + struct flb_time t, + const char *records, size_t records_len); int flb_wasm_call_wasi_main(struct flb_wasm *fw); void flb_wasm_buffer_free(struct flb_wasm *fw); void flb_wasm_destroy(struct flb_wasm *fw); diff --git a/plugins/filter_wasm/filter_wasm.c b/plugins/filter_wasm/filter_wasm.c index e07a4b5c61b..8d652e1f85a 100644 --- a/plugins/filter_wasm/filter_wasm.c +++ b/plugins/filter_wasm/filter_wasm.c @@ -56,6 +56,7 @@ static int cb_wasm_filter(const void *data, size_t bytes, size_t json_size; int root_type; struct flb_wasm *wasm = NULL; + size_t buf_size; struct flb_filter_wasm *ctx = filter_context; struct flb_log_event_encoder log_encoder; @@ -99,23 +100,44 @@ static int cb_wasm_filter(const void *data, size_t bytes, off = log_decoder.offset; alloc_size = (off - last_off) + 128; /* JSON is larger than msgpack */ last_off = off; + switch(ctx->event_format) { + case FLB_FILTER_WASM_FMT_JSON: + /* Encode as JSON from msgpack */ + buf = flb_msgpack_to_json_str(alloc_size, log_event.body); + + if (buf) { + /* Execute WASM program */ + ret_val = flb_wasm_call_function_format_json(wasm, ctx->wasm_function_name, + tag, tag_len, + log_event.timestamp, + buf, strlen(buf)); + + flb_free(buf); + } + else { + flb_plg_error(ctx->ins, "encode as JSON from msgpack is failed"); - /* Encode as JSON from msgpack */ - buf = flb_msgpack_to_json_str(alloc_size, log_event.body); + goto on_error; + } + break; + case FLB_FILTER_WASM_FMT_MSGPACK: + ret = flb_wasm_format_msgpack_mode(tag, tag_len, + &log_event, + (void **)&buf, &buf_size); + if (ret < 0) { + flb_plg_error(ctx->ins, "format msgpack is failed"); + + goto on_error; + } - if (buf) { /* Execute WASM program */ - ret_val = flb_wasm_call_function_format_json(wasm, ctx->wasm_function_name, - tag, tag_len, - log_event.timestamp, - buf, strlen(buf)); + ret_val = flb_wasm_call_function_format_msgpack(wasm, ctx->wasm_function_name, + tag, tag_len, + log_event.timestamp, + buf, buf_size); flb_free(buf); - } - else { - flb_plg_error(ctx->ins, "encode as JSON from msgpack is failed"); - - goto on_error; + break; } if (ret_val == NULL) { /* Skip record */ @@ -123,8 +145,8 @@ static int cb_wasm_filter(const void *data, size_t bytes, continue; } - - if (strlen(ret_val) == 0) { /* Skip record */ + if (ctx->event_format == FLB_FILTER_WASM_FMT_JSON && + strlen(ret_val) == 0) { /* Skip record */ flb_plg_debug(ctx->ins, "WASM function returned empty string. Skip."); flb_free(ret_val); continue; @@ -144,16 +166,38 @@ static int cb_wasm_filter(const void *data, size_t bytes, } if (ret == FLB_EVENT_ENCODER_SUCCESS) { - /* Convert JSON payload to msgpack */ - ret = flb_pack_json(ret_val, strlen(ret_val), - &json_buf, &json_size, &root_type, NULL); + switch(ctx->event_format) { + case FLB_FILTER_WASM_FMT_JSON: + /* Convert JSON payload to msgpack */ + ret = flb_pack_json(ret_val, strlen(ret_val), + &json_buf, &json_size, &root_type, NULL); + + if (ret == 0 && root_type == JSMN_OBJECT) { + /* JSON found, pack it msgpack representation */ + ret = flb_log_event_encoder_set_body_from_raw_msgpack( + &log_encoder, + json_buf, + json_size); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(&log_encoder); + } + else { + flb_log_event_encoder_rollback_record(&log_encoder); + } + } + else { + flb_plg_error(ctx->ins, "invalid JSON format. ret: %d, buf: %s", ret, ret_val); - if (ret == 0 && root_type == JSMN_OBJECT) { - /* JSON found, pack it msgpack representation */ + flb_log_event_encoder_rollback_record(&log_encoder); + } + break; + case FLB_FILTER_WASM_FMT_MSGPACK: + /* msgpack found, pack it msgpack representation */ ret = flb_log_event_encoder_set_body_from_raw_msgpack( &log_encoder, - json_buf, - json_size); + ret_val, + strlen(ret_val)); if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_commit_record(&log_encoder); @@ -161,11 +205,8 @@ static int cb_wasm_filter(const void *data, size_t bytes, else { flb_log_event_encoder_rollback_record(&log_encoder); } - } - else { - flb_plg_error(ctx->ins, "invalid JSON format. ret: %d, buf: %s", ret, ret_val); - flb_log_event_encoder_rollback_record(&log_encoder); + break; } } else { @@ -177,9 +218,11 @@ static int cb_wasm_filter(const void *data, size_t bytes, flb_free(ret_val); } - /* release 'json_buf' if it was allocated */ - if (json_buf != NULL) { - flb_free(json_buf); + if (ctx->event_format == FLB_FILTER_WASM_FMT_JSON) { + /* release 'json_buf' if it was allocated */ + if (json_buf != NULL) { + flb_free(json_buf); + } } } @@ -288,6 +331,8 @@ static int cb_wasm_init(struct flb_filter_instance *f_ins, { struct flb_filter_wasm *ctx = NULL; int ret = -1; + const char *tmp; + int event_format; /* Allocate space for the configuration */ ctx = flb_calloc(1, sizeof(struct flb_filter_wasm)); @@ -301,6 +346,22 @@ static int cb_wasm_init(struct flb_filter_instance *f_ins, goto init_error; } + tmp = flb_filter_get_property("event_format", f_ins); + if (tmp) { + if (strcasecmp(tmp, FLB_FMT_STR_JSON) == 0) { + event_format = FLB_FILTER_WASM_FMT_JSON; + } + else if (strcasecmp(tmp, FLB_FMT_STR_MSGPACK) == 0) { + event_format = FLB_FILTER_WASM_FMT_MSGPACK; + } else { + flb_error("[filter_wasm] unknown format: %s", tmp); + goto init_error; + } + ctx->event_format = event_format; + } else { + ctx->event_format = FLB_FILTER_WASM_FMT_JSON; + } + flb_wasm_init(config); /* Set context */ @@ -323,6 +384,11 @@ static int cb_wasm_exit(void *data, struct flb_config *config) } static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "event_format", NULL, + 0, FLB_FALSE, 0, + "Sepecify the ingesting event format for wasm program" + }, { FLB_CONFIG_MAP_STR, "wasm_path", NULL, 0, FLB_TRUE, offsetof(struct flb_filter_wasm, wasm_path), diff --git a/plugins/filter_wasm/filter_wasm.h b/plugins/filter_wasm/filter_wasm.h index af803c36d46..f00b131b07f 100644 --- a/plugins/filter_wasm/filter_wasm.h +++ b/plugins/filter_wasm/filter_wasm.h @@ -29,11 +29,21 @@ #include +enum { + FLB_FILTER_WASM_FMT_JSON = 0, + FLB_FILTER_WASM_FMT_MSGPACK, + FLB_FILTER_WASM_FMT_ERROR, +}; + +#define FLB_FMT_STR_JSON "json" +#define FLB_FMT_STR_MSGPACK "msgpack" + struct flb_filter_wasm { flb_sds_t wasm_path; struct mk_list *accessible_dir_list; /* list of directories to be * accesible from WASM */ flb_sds_t wasm_function_name; + int event_format; struct flb_filter_instance *ins; struct flb_wasm *wasm; }; diff --git a/src/wasm/flb_wasm.c b/src/wasm/flb_wasm.c index f7806684fef..8c7d24bb334 100644 --- a/src/wasm/flb_wasm.c +++ b/src/wasm/flb_wasm.c @@ -28,6 +28,8 @@ #include #include +#include + #ifdef FLB_SYSTEM_WINDOWS #define STDIN_FILENO (_fileno( stdin )) #define STDOUT_FILENO (_fileno( stdout )) @@ -252,6 +254,87 @@ char *flb_wasm_call_function_format_json(struct flb_wasm *fw, const char *functi return (char *)flb_strdup(func_result); } +/* + * Msgpack Format but for WASM + * ------------------------------ + * This mode is used if the char (C string) is only permitted as UTF-8 + * environment such as Rust. + * + * { + * RECORD/MAP + * } + */ +int flb_wasm_format_msgpack_mode(const char *tag, int tag_len, + struct flb_log_event *log_event, + void **out_buf, size_t *out_size) +{ + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + msgpack_unpacked result; + + /* + * if the case, we need to compose a new outgoing buffer instead + * of use the original one. + */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + msgpack_unpacked_init(&result); + + msgpack_pack_object(&mp_pck, *log_event->body); + + *out_buf = mp_sbuf.data; + *out_size = mp_sbuf.size; + msgpack_unpacked_destroy(&result); + + return 0; +} + +char *flb_wasm_call_function_format_msgpack(struct flb_wasm *fw, const char *function_name, + const char* tag_data, size_t tag_len, + struct flb_time t, + const char *records, size_t records_len) +{ + const char *exception; + uint8_t *func_result; + wasm_function_inst_t func = NULL; + /* We should pass the length that is null terminator included into + * WASM runtime. This is why we add +1 for tag_len and record_len. + */ + fw->tag_buffer = wasm_runtime_module_dup_data(fw->module_inst, tag_data, tag_len+1); + fw->record_buffer = wasm_runtime_module_dup_data(fw->module_inst, records, records_len); + uint32_t func_args[6] = {fw->tag_buffer, tag_len, + t.tm.tv_sec, t.tm.tv_nsec, + fw->record_buffer, records_len}; + size_t args_size = sizeof(func_args) / sizeof(uint32_t); + + if (!(func = wasm_runtime_lookup_function(fw->module_inst, function_name, NULL))) { + flb_error("The %s wasm function is not found.", function_name); + return NULL; + } + + if (!wasm_runtime_call_wasm(fw->exec_env, func, args_size, func_args)) { + exception = wasm_runtime_get_exception(fw->module_inst); + flb_error("Got exception running wasm code: %s", exception); + wasm_runtime_clear_exception(fw->module_inst); + return NULL; + } + + // The return value is stored in the first element of the function argument array. + // It's a WASM pointer to null-terminated c char string. + // WAMR allows us to map WASM pointers to native pointers. + if (!wasm_runtime_validate_app_str_addr(fw->module_inst, func_args[0])) { + flb_warn("[wasm] returned value is invalid"); + return NULL; + } + func_result = wasm_runtime_addr_app_to_native(fw->module_inst, func_args[0]); + + if (func_result == NULL) { + return NULL; + } + + return (char *)flb_strdup(func_result); +} + int flb_wasm_call_wasi_main(struct flb_wasm *fw) { #if WASM_ENABLE_LIBC_WASI != 0 diff --git a/tests/runtime/data/wasm/msgpack/filter_rust_mp.wasm b/tests/runtime/data/wasm/msgpack/filter_rust_mp.wasm new file mode 100755 index 00000000000..f2f7487190a Binary files /dev/null and b/tests/runtime/data/wasm/msgpack/filter_rust_mp.wasm differ diff --git a/tests/runtime/filter_wasm.c b/tests/runtime/filter_wasm.c index 4cc1afcb807..e0e68c371d8 100644 --- a/tests/runtime/filter_wasm.c +++ b/tests/runtime/filter_wasm.c @@ -458,11 +458,73 @@ void flb_test_drop_all_records(void) } +void flb_test_append_kv_on_msgpack(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int filter_ffd; + char *output = NULL; + char *input = "[0, {\"key\":\"val\"}]"; + char *result; + struct flb_lib_out_cb cb_data; + + /* clear previous output */ + clear_output(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", FLUSH_INTERVAL, "grace", "1", NULL); + + /* Prepare output callback context*/ + cb_data.cb = callback_test; + cb_data.data = NULL; + + /* Filter */ + filter_ffd = flb_filter(ctx, (char *) "wasm", NULL); + TEST_CHECK(filter_ffd >= 0); + ret = flb_filter_set(ctx, filter_ffd, + "Match", "*", + "event_format", "msgpack", + "wasm_path", DPATH_WASM "/msgpack/filter_rust_mp.wasm", + "function_name", "rust_filter_mp", + NULL); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test.wasm.mp", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Lib output */ + out_ffd = flb_output(ctx, (char *) "lib", (void *)&cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test.wasm.mp", + "format", "json", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret==0); + + flb_lib_push(ctx, in_ffd, input, strlen(input)); + wait_with_timeout(2000, &output); + result = strstr(output, "\"platform\":\"wasm\""); + TEST_CHECK(result != NULL); + + /* clean up */ + flb_lib_free(output); + + flb_stop(ctx); + flb_destroy(ctx); +} + TEST_LIST = { {"hello_world", flb_test_helloworld}, {"append_tag", flb_test_append_tag}, {"numeric_records", flb_test_numerics_records}, {"array_contains_null", flb_test_array_contains_null}, {"drop_all_records", flb_test_drop_all_records}, + {"append_kv_on_msgpack_format", flb_test_append_kv_on_msgpack}, {NULL, NULL} }; diff --git a/tests/runtime/wasm/rust/filter_rust_mp/Cargo.lock b/tests/runtime/wasm/rust/filter_rust_mp/Cargo.lock new file mode 100644 index 00000000000..63438ee4541 --- /dev/null +++ b/tests/runtime/wasm/rust/filter_rust_mp/Cargo.lock @@ -0,0 +1,388 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bumpalo" +version = "3.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "cc" +version = "1.0.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +dependencies = [ + "libc", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "chrono" +version = "0.4.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "windows-targets", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + +[[package]] +name = "filter_rust_mp" +version = "0.1.0" +dependencies = [ + "chrono", + "libc", + "rmp", + "rmp-serde", + "rmpv", + "serde", + "serde_bytes", + "serde_derive", + "serde_json", +] + +[[package]] +name = "iana-time-zone" +version = "0.1.59" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "itoa" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" + +[[package]] +name = "js-sys" +version = "0.3.67" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "libc" +version = "0.2.152" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" + +[[package]] +name = "log" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + +[[package]] +name = "num-traits" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +dependencies = [ + "autocfg", +] + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + +[[package]] +name = "paste" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" + +[[package]] +name = "proc-macro2" +version = "1.0.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rmp" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9860a6cc38ed1da53456442089b4dfa35e7cedaa326df63017af88385e6b20" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmp-serde" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + +[[package]] +name = "rmpv" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e0e0214a4a2b444ecce41a4025792fc31f77c7bb89c46d253953ea8c65701ec" +dependencies = [ + "num-traits", + "rmp", +] + +[[package]] +name = "ryu" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" + +[[package]] +name = "serde" +version = "1.0.196" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_bytes" +version = "0.11.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b8497c313fd43ab992087548117643f6fcd935cbf36f176ffda0aacf9591734" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_derive" +version = "1.0.196" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.113" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "syn" +version = "2.0.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "wasm-bindgen" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" diff --git a/tests/runtime/wasm/rust/filter_rust_mp/Cargo.toml b/tests/runtime/wasm/rust/filter_rust_mp/Cargo.toml new file mode 100644 index 00000000000..baec6f26006 --- /dev/null +++ b/tests/runtime/wasm/rust/filter_rust_mp/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "filter_rust_mp" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +serde = { version = "*", features = ["derive", "std"] } +serde_json = "1.0" +serde_bytes = "0.11" +serde_derive = "1.0" +rmp-serde = "1.1" +rmpv = "1.0" +rmp = "0.8" +chrono = { version = "0.4", default-features = false, features = ["clock", "std", "oldtime"] } +libc = "0.2" diff --git a/tests/runtime/wasm/rust/filter_rust_mp/README.md b/tests/runtime/wasm/rust/filter_rust_mp/README.md new file mode 100644 index 00000000000..54edfc944f7 --- /dev/null +++ b/tests/runtime/wasm/rust/filter_rust_mp/README.md @@ -0,0 +1,39 @@ +# Fluent Bit / filter_rust_mp + +This source tree provides an test program of WASM program which uses msgpack format written in Rust. + +## Prerequisites + +* Rust + * rustc 1.75.0 (82e1608df 2023-12-21)) or later +* [rustup](https://rustup.rs/) (For preparing rust compiler and toolchains) + +## How to build + +Add `wasm32-unknown-unknown` target for Rust toolchain: + +```console +$ rustup target add wasm32-unknown-unknown +``` + +Then, execute _cargo build_ as follows: + +```console +$ cargo build --target wasm32-unknown-unknown --release +``` + +Finally, `*.wasm` file will be created: + +```console +$ ls target/wasm32-unknown-unknown/release/*.wasm +target/wasm32-unknown-unknown/release/filter_rust_mp.wasm +``` + +## How to put test data of WASM filter + +Testcase of Wasm filters, which is written in Rust, on fluent-bit is put under `tests/runtime/data/wasm/msgpack` directory. + +```console +$ cp target/wasm32-unknown-unknown/release/filter_rust_mp.wasm \ + /top/path/of/fluent-bit/tests/runtime/data/wasm/msgpack +``` diff --git a/tests/runtime/wasm/rust/filter_rust_mp/src/lib.rs b/tests/runtime/wasm/rust/filter_rust_mp/src/lib.rs new file mode 100644 index 00000000000..f0ee445b852 --- /dev/null +++ b/tests/runtime/wasm/rust/filter_rust_mp/src/lib.rs @@ -0,0 +1,59 @@ +// Import pure and fast msgpack library written in Rust +use rmp_serde::Serializer; +use serde::Serialize; +use rmpv::Value; + +// Import chrono library to handle time related operation conveniently +use std::collections::BTreeMap; +use std::io::Cursor; +use std::io::Write; +use std::os::raw::c_char; +use std::slice; + +#[inline] +fn value_to_string(val: &Value) -> String { + if val.is_str() { + let into = match val { + Value::String(s) => s.clone().into_str(), + _ => unreachable!() + }; + match into { + Some(i) => i.to_string(), + None => "".to_string(), + } + } else { + format!("{}", val) + } +} + +#[no_mangle] +pub extern "C" fn rust_filter_mp(tag: *const c_char, tag_len: u32, _time_sec: u32, _time_nsec: u32, record: *const c_char, record_len: u32) -> *const u8 { + let slice_tag: &[u8] = unsafe { slice::from_raw_parts(tag as *const u8, tag_len as usize) }; + let mut vt: Vec = Vec::new(); + vt.write(slice_tag).expect("Unable to write"); + let slice_record: &[u8] = + unsafe { slice::from_raw_parts(record as *const u8, record_len as usize) }; + let de = rmpv::decode::read_value(&mut Cursor::new(slice_record)).unwrap(); + + let mut map = BTreeMap::new(); + + let binding = de.as_map().unwrap(); + let size = binding.len(); + + // Create BTreeMap to handle collection operations easily + for i in 0..size { + let (k, v) = &binding[i]; + let key = value_to_string(k); + let value = value_to_string(v); + map.insert(key, value); + } + + map.insert("platform".to_string(), "wasm".to_string()); + + let mut buf = Vec::new(); + let val = map; + + let mut se = Serializer::new(&mut buf).with_struct_map(); + val.serialize(&mut se).unwrap(); + buf.as_ptr() +}