Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 7 additions & 32 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 43 additions & 30 deletions confidence-resolver/src/resolve_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,23 @@ impl ResolveLogger {
}
}

fn with_state<F: FnOnce(&ResolveInfoState)>(&self, f: F) -> Fallible<()> {
fn with_state<F: FnOnce(&ResolveInfoState)>(&self, f: F) {
loop {
let lock = self.state.load_full();
let Ok(rg) = lock.try_read() else {
// this is lock free. If we didn't get the read lock it means checkpoint has
// swapped and acquired the write lock so we can just retry and get the next state
continue;
};
let state = rg.as_ref().or_fail()?;
f(state);
break Ok(());
// In an earlier version we failed on this Option being None, leading to flakey tests.
// The Option can be none if thread T1 has a reference to the lock, but parks before try_lock.
// In the meantime a checkpoint thread T2, swaps out the lock, takes a write lock, takes the option
// (replacing it with None) and releases the lock. Now T1 wakes up and tries and succeeds the read
// lock. This scenario is rare and as above it's sound to retry,
if let Some(state) = rg.as_ref() {
f(state);
break;
};
}
}

Expand All @@ -63,7 +69,7 @@ impl ResolveLogger {
resolve_context: &pb::Struct,
client_credential: &str,
values: &[crate::ResolvedValue<'_>],
) -> Fallible<()> {
) {
self.with_state(|state: &ResolveInfoState| {
state
.client_resolve_info
Expand Down Expand Up @@ -121,7 +127,7 @@ impl ResolveLogger {
assigned_flags: &[crate::FlagToApply],
client: &crate::Client,
sdk: &Option<crate::flags_resolver::Sdk>,
) -> Fallible<()> {
) {
self.with_state(|state: &ResolveInfoState| {
let client_info = Some(pb::ClientInfo {
client: client.client_name.to_string(),
Expand Down Expand Up @@ -181,22 +187,29 @@ impl ResolveLogger {
})
}

pub fn checkpoint(&self) -> Fallible<pb::WriteFlagLogsRequest> {
let state = {
let lock = self
.state
.swap(Arc::new(RwLock::new(Some(ResolveInfoState::default()))));
let mut wg = lock.write().or_fail()?;
wg.take().or_fail()?
};
let client_resolve_info = build_client_resolve_info(&state);
let flag_resolve_info = build_flag_resolve_info(&state);
Ok(pb::WriteFlagLogsRequest {
flag_resolve_info,
client_resolve_info,
flag_assigned: state.flag_assigned.into_iter().collect(),
telemetry_data: None,
})
pub fn checkpoint(&self) -> pb::WriteFlagLogsRequest {
let lock = self
.state
.swap(Arc::new(RwLock::new(Some(ResolveInfoState::default()))));
// the only operation we do under write-lock is take the option, and that can't panic, so lock shouldn't be poisoned,
// even so, if it some how was it's safe to still use the value.
let mut wg = lock
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner());
// also shouldn't be possible for this Option to be None as we never insert None and only one thread can swap the value out
// if this assertion somehow is faulty, returning an empty WriteFlagLogsRequest is sound.
wg.take()
.map(|state| {
let client_resolve_info = build_client_resolve_info(&state);
let flag_resolve_info = build_flag_resolve_info(&state);
pb::WriteFlagLogsRequest {
flag_resolve_info,
client_resolve_info,
flag_assigned: state.flag_assigned.into_iter().collect(),
telemetry_data: None,
}
})
.unwrap_or_default()
}
}

Expand Down Expand Up @@ -373,7 +386,7 @@ mod tests {
let cred = "clients/test/clientCredentials/test";
let rv = [];
logger.log_resolve("id", &ctx, cred, &rv);
let req = logger.checkpoint().unwrap();
let req = logger.checkpoint();
// find the client entry in the built request
let crec = req
.client_resolve_info
Expand Down Expand Up @@ -463,7 +476,7 @@ mod tests {
let cred = "clients/test/clientCredentials/test";
let rv = [];
logger.log_resolve("id", &ctx, cred, &rv);
let req = logger.checkpoint().unwrap();
let req = logger.checkpoint();
let crec = req
.client_resolve_info
.iter()
Expand Down Expand Up @@ -522,7 +535,7 @@ mod tests {

let cred = "clients/test/clientCredentials/test";
logger.log_resolve("id", &Struct::default(), cred, &rv);
let req = logger.checkpoint().unwrap();
let req = logger.checkpoint();

let flag_info = req
.flag_resolve_info
Expand Down Expand Up @@ -593,7 +606,7 @@ mod tests {

let cred = "clients/test/clientCredentials/test";
logger.log_resolve("id", &Struct::default(), cred, &rv);
let req = logger.checkpoint().unwrap();
let req = logger.checkpoint();

let flag_info = req
.flag_resolve_info
Expand Down Expand Up @@ -706,17 +719,17 @@ mod tests {
let lg = logger.clone();
let tx_thread = tx.clone();
let chk_handle = thread::spawn(move || {
for _ in 0..3 {
for _ in 0..10 {
thread::sleep(Duration::from_millis(10));
let req = lg.checkpoint().unwrap();
let _ = tx_thread.send(req);
tx_thread.send(lg.checkpoint()).unwrap();
}
});

chk_handle.join().unwrap();
done.store(true, Ordering::Relaxed);
let total_expected = handles.into_iter().map(|h| h.join().unwrap()).sum::<i64>();
let _ = tx.send(logger.checkpoint().unwrap());
// logger.checkpoint().iter().
tx.send(logger.checkpoint()).unwrap();

// Aggregate all checkpoint outputs
let mut sum_variants: i64 = 0;
Expand Down
4 changes: 3 additions & 1 deletion wasm/go-host/resolver_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ func (r *ResolverApi) consume(addr uint32) []byte {
length := binary.LittleEndian.Uint32(lenBytes) - 4

// Read data
data, _ := memory.Read(addr, length)
view, _ := memory.Read(addr, length)
// make a copy before freeing
data := append([]byte(nil), view...)

// Free memory
ctx := context.Background()
Expand Down
2 changes: 1 addition & 1 deletion wasm/node-host/src/wasm-msg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export class ApiBuilder<T = {}> {

private consume<T>(ptr:number, codec:Codec<T>): T {
const data = this.viewBuffer(ptr);
const res = codec.decode(data);
const res = codec.decode(data.slice());
this.free(ptr);
return res;
}
Expand Down
1 change: 0 additions & 1 deletion wasm/rust-guest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ prost = { version = "0.12", default-features = false }
prost-types = { version = "0.12", default-features = false }
# TODO re-export Bytes
bytes = { version = "1.4.0", default-features = false }
wee_alloc = "0.4"
arc-swap = "1.7.1"

[build-dependencies]
Expand Down
9 changes: 3 additions & 6 deletions wasm/rust-guest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ use arc_swap::ArcSwapOption;
use bytes::Bytes;
use prost::Message;

#[global_allocator]
static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;

use confidence_resolver::proto::confidence::flags::resolver::v1::{
ResolveWithStickyRequest, WriteFlagLogsRequest,
};
Expand Down Expand Up @@ -135,7 +132,7 @@ impl Host for WasmHost {
client: &Client,
sdk: &Option<Sdk>,
) {
let _ = LOGGER.log_resolve(
LOGGER.log_resolve(
resolve_id,
evaluation_context,
&client.client_credential_name,
Expand All @@ -150,7 +147,7 @@ impl Host for WasmHost {
client: &Client,
sdk: &Option<Sdk>,
) {
let _ = LOGGER.log_assigns(resolve_id, evaluation_context, assigned_flags, client, sdk);
LOGGER.log_assigns(resolve_id, evaluation_context, assigned_flags, client, sdk);
}

fn encrypt_resolve_token(token_data: &[u8], _encryption_key: &[u8]) -> Result<Vec<u8>, String> {
Expand Down Expand Up @@ -208,7 +205,7 @@ wasm_msg_guest! {
Ok((&resolve_result.resolved_value).into())
}
fn flush_logs(_request:Void) -> WasmResult<WriteFlagLogsRequest> {
LOGGER.checkpoint().map_err(|e| e.into())
Ok(LOGGER.checkpoint())
}

}
Expand Down
Loading