Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions ipa-core/src/ff/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,46 @@ pub trait Serializable: Sized {
.map_err(Into::into)
.unwrap_infallible()
}

/// This method provides the same functionality as [`Self::deserialize`] without
/// compile-time guarantees on the size of `buf`. Therefore, it is not appropriate
/// to use in production code. It is provided as convenience method
/// for tests that are ok to panic.
///
/// ## Panics
/// If the size of `buf` is not equal to `Self::Size` or if `buf` bytes
/// are not a valid representation of this instance. See [`Self::deserialize`] for
/// more details.
///
/// [`Self::deserialize`]: Self::deserialize
#[cfg(test)]
#[must_use]
fn deserialize_from_slice(buf: &[u8]) -> Self {
use typenum::Unsigned;

assert_eq!(buf.len(), Self::Size::USIZE);

let mut arr = GenericArray::default();
arr.copy_from_slice(buf);
Self::deserialize(&arr).unwrap()
}

/// This method provides the same functionality as [`Self::serialize`] without
/// compile-time guarantees on the size of `buf`. Therefore, it is not appropriate
/// to use in production code. It is provided as convenience method
/// for tests that are ok to panic.
///
/// ## Panics
/// If the size of `buf` is not equal to `Self::Size`.
#[cfg(test)]
fn serialize_to_slice(&self, buf: &mut [u8]) {
use typenum::Unsigned;

assert_eq!(buf.len(), Self::Size::USIZE);

let dest = GenericArray::<_, Self::Size>::from_mut_slice(buf);
self.serialize(dest);
}
}

pub trait ArrayAccess {
Expand Down
31 changes: 19 additions & 12 deletions ipa-core/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,17 @@ pub use gateway_exports::{Gateway, MpcReceivingEnd, SendingEnd, ShardReceivingEn
pub use prss_protocol::negotiate as negotiate_prss;
#[cfg(feature = "web-app")]
pub use transport::WrappedAxumBodyStream;
#[cfg(feature = "in-memory-infra")]
pub use transport::{
config as in_memory_config, InMemoryMpcNetwork, InMemoryShardNetwork, InMemoryTransport,
};
pub use transport::{
make_owned_handler, query, routing, ApiError, BodyStream, BytesStream, HandlerBox, HandlerRef,
HelperResponse, Identity as TransportIdentity, LengthDelimitedStream, LogErrors, NoQueryId,
NoResourceIdentifier, NoStep, QueryIdBinding, ReceiveRecords, RecordsStream, RequestHandler,
RouteParams, SingleRecordStream, StepBinding, StreamCollection, StreamKey, Transport,
WrappedBoxBodyStream,
};
#[cfg(feature = "in-memory-infra")]
pub use transport::{InMemoryMpcNetwork, InMemoryShardNetwork, InMemoryTransport};
use typenum::{Const, ToUInt, Unsigned, U8};
use x25519_dalek::PublicKey;

Expand Down Expand Up @@ -130,6 +132,20 @@ impl TryFrom<usize> for HelperIdentity {
}
}

impl TryFrom<&str> for HelperIdentity {
type Error = String;

fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
for identity in HelperIdentity::make_three() {
if identity.as_str() == value {
return Ok(identity);
}
}

Err(format!("{value} is not a valid helper identity"))
}
}

impl From<HelperIdentity> for u8 {
fn from(value: HelperIdentity) -> Self {
value.id
Expand All @@ -138,16 +154,7 @@ impl From<HelperIdentity> for u8 {

impl Debug for HelperIdentity {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self.id {
1 => "A",
2 => "B",
3 => "C",
_ => unreachable!(),
}
)
write!(f, "{}", self.as_str())
}
}

Expand Down
148 changes: 148 additions & 0 deletions ipa-core/src/helpers/transport/in_memory/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use std::borrow::Cow;

use crate::{
helpers::{HelperIdentity, Role, RoleAssignment},
protocol::Gate,
sharding::ShardIndex,
sync::Arc,
};

pub type DynStreamInterceptor = Arc<dyn StreamInterceptor<Context = InspectContext>>;

/// The interface for stream interceptors.
///
/// It is used in test infrastructure to inspect
/// incoming streams and perform actions based on
/// their contents.
///
/// The `peek` method takes a context object and a mutable reference
/// to the data buffer. It is responsible for inspecting the data
/// and performing any necessary actions based on the context.
pub trait StreamInterceptor: Send + Sync {
/// The context type for the stream peeker.
/// See [`InspectContext`] and [`MaliciousHelperContext`] for
/// details.
type Context;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it important to have this associated type, vs. using InspectContext directly? I also wonder if there is some existing type like Addr in the transport layer that could be reused here.

It also seems like using Context in these names is potentially confusing. Maybe InspectState or StreamMetadata?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use Addr here, the tradeoff is people will be writing add.gate.unwrap() and figure out a way to get rid of generic I on Addr. I feel that we will eventually want to inspect cross-shard traffic in addition to cross-helper, so generics may not work well here.

I agree it is messier on the stream inspector implementation side, but makes the usage of it a bit simpler


/// Inspects the stream data and performs any necessary actions.
/// The `data` buffer may be modified in-place.
///
/// ## Implementation considerations
/// This method is free to mutate the `data` buffer
/// however it wants, but it needs to account for the following:
///
/// ### Prime field streams
/// Corrupting streams that send data as sequences of serialized
/// [`PrimeField`] may cause `GreaterThanPrimeError` errors at
/// the serialization layer, instead of maybe intended malicious
/// validation failures.
///
/// ### Boolean fields
/// Flipping bits in fixed-size bit strings is indistinguishable
/// from additive attacks without additional measures implemented
/// at the transport layer, like checksumming, share consistency
/// checks, etc.
fn peek(&self, ctx: &Self::Context, data: &mut Vec<u8>);
}

impl<F: Fn(&InspectContext, &mut Vec<u8>) + Send + Sync + 'static> StreamInterceptor for F {
type Context = InspectContext;

fn peek(&self, ctx: &Self::Context, data: &mut Vec<u8>) {
(self)(ctx, data);
}
}

/// The general context provided to stream inspectors.
#[derive(Debug)]
pub struct InspectContext {
/// The shard index of this instance.
/// This is `None` for non-sharded helpers.
pub shard_index: Option<ShardIndex>,
/// The MPC identity of this instance.
/// The combination (`shard_index`, `identity`)
/// uniquely identifies a single shard within
/// a multi-sharded MPC system.
pub identity: HelperIdentity,
/// Helper that will receive this stream.
pub dest: Cow<'static, str>,
/// Circuit gate this stream is tied to.
pub gate: Gate,
}

/// The no-op stream peeker, which does nothing.
/// This is used as a default value for stream
/// peekers that don't do anything.
#[inline]
#[must_use]
pub fn passthrough() -> Arc<dyn StreamInterceptor<Context = InspectContext>> {
Arc::new(|_ctx: &InspectContext, _data: &mut Vec<u8>| {})
}

/// This narrows the implementation of stream seeker
/// to a specific helper role. Only streams sent from
/// that helper will be inspected by the provided closure.
/// Other helper's streams will be left untouched.
///
/// It does not support sharded environments and will panic
/// if used in a sharded test infrastructure.
#[derive(Debug)]
pub struct MaliciousHelper<F> {
identity: HelperIdentity,
role_assignment: RoleAssignment,
inner: F,
}

impl<F: Fn(&MaliciousHelperContext, &mut Vec<u8>) + Send + Sync> MaliciousHelper<F> {
pub fn new(role: Role, role_assignment: &RoleAssignment, peeker: F) -> Arc<Self> {
Arc::new(Self {
identity: role_assignment.identity(role),
role_assignment: role_assignment.clone(),
inner: peeker,
})
}

fn context(&self, ctx: &InspectContext) -> MaliciousHelperContext {
let dest = HelperIdentity::try_from(ctx.dest.as_ref()).unwrap_or_else(|_| {
panic!(
"MaliciousServerContext::from: invalid destination: {}",
ctx.dest
)
});
let dest = self.role_assignment.role(dest);

MaliciousHelperContext {
shard_index: ctx.shard_index,
dest,
gate: ctx.gate.clone(),
}
}
}

/// Special contexts for stream inspectors
/// created with [`MaliciousHelper`].
/// It provides convenient access to the
/// destination role and assumes a single MPC
/// helper intercepting streams.
#[derive(Debug)]
pub struct MaliciousHelperContext {
/// The shard index of this instance.
/// This is `None` for non-sharded helpers.
pub shard_index: Option<ShardIndex>,
/// Helper that will receive this stream.
pub dest: Role,
/// Circuit gate this stream is tied to.
pub gate: Gate,
}

impl<F: Fn(&MaliciousHelperContext, &mut Vec<u8>) + Send + Sync> StreamInterceptor
for MaliciousHelper<F>
{
type Context = InspectContext;

fn peek(&self, ctx: &Self::Context, data: &mut Vec<u8>) {
if ctx.identity == self.identity {
(self.inner)(&self.context(ctx), data);
}
}
}
32 changes: 26 additions & 6 deletions ipa-core/src/helpers/transport/in_memory/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
pub mod config;
mod sharding;
mod transport;

use std::array;

pub use sharding::InMemoryShardNetwork;
pub use transport::Setup;
use transport::TransportConfigBuilder;

use crate::{
helpers::{HandlerRef, HelperIdentity},
helpers::{
in_memory_config::DynStreamInterceptor, transport::in_memory::config::passthrough,
HandlerRef, HelperIdentity,
},
sync::{Arc, Weak},
};

Expand All @@ -21,15 +24,32 @@ pub struct InMemoryMpcNetwork {

impl Default for InMemoryMpcNetwork {
fn default() -> Self {
Self::new(array::from_fn(|_| None))
Self::new(Self::noop_handlers())
}
}

impl InMemoryMpcNetwork {
#[must_use]
pub fn noop_handlers() -> [Option<HandlerRef>; 3] {
[None, None, None]
}

#[must_use]
pub fn new(handlers: [Option<HandlerRef>; 3]) -> Self {
let [mut first, mut second, mut third]: [_; 3] =
HelperIdentity::make_three().map(Setup::new);
Self::with_stream_interceptor(handlers, &passthrough())
}

#[must_use]
pub fn with_stream_interceptor(
handlers: [Option<HandlerRef>; 3],
interceptor: &DynStreamInterceptor,
) -> Self {
let [mut first, mut second, mut third]: [_; 3] = HelperIdentity::make_three().map(|i| {
let mut config_builder = TransportConfigBuilder::for_helper(i);
config_builder.with_interceptor(interceptor);

Setup::with_config(i, config_builder.not_sharded())
});

first.connect(&mut second);
second.connect(&mut third);
Expand Down
18 changes: 16 additions & 2 deletions ipa-core/src/helpers/transport/in_memory/sharding.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
helpers::{
transport::in_memory::transport::{InMemoryTransport, Setup},
in_memory_config::{passthrough, DynStreamInterceptor},
transport::in_memory::transport::{InMemoryTransport, Setup, TransportConfigBuilder},
HelperIdentity,
},
sharding::ShardIndex,
Expand All @@ -22,9 +23,22 @@ pub struct InMemoryShardNetwork {

impl InMemoryShardNetwork {
pub fn with_shards<I: Into<ShardIndex>>(shard_count: I) -> Self {
Self::with_stream_interceptor(shard_count, &passthrough())
}

pub fn with_stream_interceptor<I: Into<ShardIndex>>(
shard_count: I,
interceptor: &DynStreamInterceptor,
) -> Self {
let shard_count = shard_count.into();
let shard_network: [_; 3] = HelperIdentity::make_three().map(|h| {
let mut shard_connections = shard_count.iter().map(Setup::new).collect::<Vec<_>>();
let mut config_builder = TransportConfigBuilder::for_helper(h);
config_builder.with_interceptor(interceptor);

let mut shard_connections = shard_count
.iter()
.map(|i| Setup::with_config(i, config_builder.bind_to_shard(i)))
.collect::<Vec<_>>();
for i in 0..shard_connections.len() {
let (lhs, rhs) = shard_connections.split_at_mut(i);
if let Some((a, _)) = lhs.split_last_mut() {
Expand Down
Loading