Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
.PHONY: all node pruntime e2e

all: node pruntime e2e

node:
cargo build --release
pruntime:
make -C standalone/pruntime
e2e:
make -C e2e/res
cd e2e && yarn build:proto

3 changes: 3 additions & 0 deletions crates/phactory/api/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ fn main() {
"MemoryUsage",
"GatekeeperStatus",
"SystemInfo",
"ContractInfo",
"SidevmInfo",
"ClusterInfo",
] {
builder = builder.type_attribute(
r#type,
Expand Down
2 changes: 1 addition & 1 deletion crates/phactory/api/proto
Submodule proto updated 1 files
+59 −0 pruntime_rpc.proto
4 changes: 0 additions & 4 deletions crates/phactory/src/bin_api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ use super::*;

// For bin_api
impl<Platform: pal::Platform + Serialize + DeserializeOwned> Phactory<Platform> {
pub fn getinfo(&self) -> String {
serde_json::to_string_pretty(&self.get_info()).unwrap_or_default()
}

pub fn sign_http_response(&self, body: &[u8]) -> Option<String> {
self.system.as_ref().map(|state| {
let bytes = wrap_content_to_sign(body, SignedContentType::RpcResponse);
Expand Down
6 changes: 5 additions & 1 deletion crates/phactory/src/contracts/pink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl Pink {
Query::InkMessage(input_data) => {
let _guard = context
.query_scheduler
.acquire(self.id(), 1)
.acquire(self.id(), context.weight)
.await
.or(Err(QueryError::ServiceUnavailable))?;

Expand Down Expand Up @@ -361,6 +361,10 @@ pub mod cluster {
pub fn remove_cluster(&mut self, cluster_id: &ContractClusterId) -> Option<Cluster> {
self.clusters.remove(cluster_id)
}

pub fn iter(&self) -> impl Iterator<Item = (&ContractClusterId, &Cluster)> {
self.clusters.iter()
}
}

#[derive(Serialize, Deserialize, Default)]
Expand Down
71 changes: 64 additions & 7 deletions crates/phactory/src/contracts/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@ use phala_mq::{traits::MessageChannel, SignedMessageChannel};
use phala_scheduler::RequestScheduler;
use runtime::BlockNumber;
use sidevm::{
service::{CommandSender, ExitReason},
service::{Command as SidevmCommand, CommandSender, ExitReason},
OcallAborted, VmId,
};

use super::pink::cluster::ClusterKeeper;
use crate::{
hex,
secret_channel::{KeyPair, SecretMessageChannel, SecretReceiver},
system::{TransactionError, TransactionResult},
types::BlockInfo,
ContractId,
ContractId, H256,
};
use phactory_api::prpc as pb;

use phala_serde_more as more;

Expand All @@ -44,6 +46,7 @@ pub struct QueryContext {
pub sidevm_handle: Option<SidevmHandle>,
pub log_handler: Option<CommandSender>,
pub query_scheduler: RequestScheduler<ContractId>,
pub weight: u32,
}

pub(crate) struct RawData(Vec<u8>);
Expand Down Expand Up @@ -103,6 +106,8 @@ impl<'de> Deserialize<'de> for SidevmHandle {
#[derive(Serialize, Deserialize)]
struct SidevmInfo {
code: Vec<u8>,
code_hash: H256,
start_time: String,
auto_restart: bool,
handle: Arc<Mutex<SidevmHandle>>,
}
Expand All @@ -118,6 +123,8 @@ pub struct FatContract {
cluster_id: phala_mq::ContractClusterId,
contract_id: phala_mq::ContractId,
sidevm_info: Option<SidevmInfo>,
weight: u32,
code_hash: Option<H256>,
}

impl FatContract {
Expand All @@ -128,6 +135,7 @@ impl FatContract {
ecdh_key: KeyPair,
cluster_id: phala_mq::ContractClusterId,
contract_id: phala_mq::ContractId,
code_hash: Option<H256>,
) -> Self {
FatContract {
contract: contract.into(),
Expand All @@ -137,6 +145,8 @@ impl FatContract {
cluster_id,
contract_id,
sidevm_info: None,
weight: 0,
code_hash,
}
}

Expand Down Expand Up @@ -230,9 +240,14 @@ impl FatContract {
bail!("Sidevm can only be started once");
}
}
let handle = do_start_sidevm(spawner, &code, self.contract_id.0)?;
let handle = do_start_sidevm(spawner, &code, self.contract_id.0, self.weight)?;

let code_hash = sp_core::blake2_256(&code).into();
let start_time = chrono::Utc::now().to_rfc3339();
self.sidevm_info = Some(SidevmInfo {
code,
code_hash,
start_time,
handle,
auto_restart,
});
Expand Down Expand Up @@ -261,7 +276,8 @@ impl FatContract {
if !need_restart {
return Ok(());
}
do_start_sidevm(spawner, &sidevm_info.code, self.contract_id.0)?
sidevm_info.start_time = chrono::Utc::now().to_rfc3339();
do_start_sidevm(spawner, &sidevm_info.code, self.contract_id.0, self.weight)?
} else {
return Ok(());
};
Expand All @@ -271,7 +287,7 @@ impl FatContract {
Ok(())
}

pub(crate) fn push_message_to_sidevm(&self, message: sidevm::service::Command) -> Result<()> {
pub(crate) fn push_message_to_sidevm(&self, message: SidevmCommand) -> Result<()> {
let handle = self
.sidevm_info
.as_ref()
Expand Down Expand Up @@ -319,20 +335,61 @@ impl FatContract {
SidevmHandle::Terminated(_) => {}
SidevmHandle::Running(tx) => {
spawner.spawn(async move {
if let Err(err) = tx.send(sidevm::service::Command::Stop).await {
if let Err(err) = tx.send(SidevmCommand::Stop).await {
error!("Failed to send stop command to sidevm: {}", err);
}
});
}
}
}
}

pub fn set_weight(&mut self, weight: u32) {
self.weight = weight;
info!("Updated weight for contarct {:?} to {}", self.id(), weight);
if let Some(SidevmHandle::Running(tx)) = self.sidevm_handle() {
if let Err(_) = tx.try_send(SidevmCommand::UpdateWeight(weight)) {
error!("Failed to update weight for sidevm, maybe it has crashed");
}
}
}
pub fn weight(&self) -> u32 {
self.weight
}

pub fn info(&self) -> pb::ContractInfo {
pb::ContractInfo {
id: hex(&self.contract_id),
weight: self.weight,
code_hash: self.code_hash.as_ref().map(hex).unwrap_or_default(),
sidevm: self.sidevm_info.as_ref().map(|info| {
let handle = info.handle.lock().unwrap().clone();
let start_time = info.start_time.clone();
let code_hash = hex(&info.code_hash);
match handle {
SidevmHandle::Running(_) => pb::SidevmInfo {
state: "running".into(),
code_hash,
start_time,
..Default::default()
},
SidevmHandle::Terminated(reason) => pb::SidevmInfo {
state: "stopped".into(),
code_hash,
start_time,
stop_reason: format!("{}", reason),
},
}
}),
}
}
}

fn do_start_sidevm(
spawner: &sidevm::service::Spawner,
code: &[u8],
id: VmId,
weight: u32,
) -> Result<Arc<Mutex<SidevmHandle>>> {
let max_memory_pages: u32 = 1024; // 64MB
let gas_per_breath = 50_000_000_000_u64; // about 20 ms bench
Expand All @@ -342,7 +399,7 @@ fn do_start_sidevm(
id,
gas_per_breath,
local_cache_ops(),
1, // TODO: set actual weight
weight,
)?;
let handle = Arc::new(Mutex::new(SidevmHandle::Running(sender)));
let cloned_handle = handle.clone();
Expand Down
4 changes: 4 additions & 0 deletions crates/phactory/src/contracts/support/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,8 @@ impl ContractsKeeper {
pub fn remove(&mut self, id: &ContractId) -> Option<FatContract> {
self.0.remove(id)
}

pub fn iter(&self) -> impl Iterator<Item=(&ContractId, &FatContract)> {
self.0.iter()
}
}
4 changes: 4 additions & 0 deletions crates/phactory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,3 +662,7 @@ fn error_msg(msg: &str) -> Value {
fn derive_key_for_checkpoint(identity_key: &[u8]) -> [u8; 16] {
sp_core::blake2_128(&(identity_key, b"/checkpoint").encode())
}

fn hex(data: impl AsRef<[u8]>) -> String {
format!("0x{}", hex_fmt::HexFmt(data))
}
78 changes: 78 additions & 0 deletions crates/phactory/src/prpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex, MutexGuard};

use crate::benchmark::Flags;
use crate::hex;
use crate::system::{chain_state, System};

use super::*;
Expand Down Expand Up @@ -757,6 +758,65 @@ impl<Platform: pal::Platform + Serialize + DeserializeOwned> Phactory<Platform>
.encode();
Ok(signature)
}

pub fn get_contract_info(
&mut self,
contract_ids: &[String],
) -> RpcResult<pb::GetContractInfoResponse> {
// TODO: use `let else`
let system = match &self.system {
None => return Ok(Default::default()),
Some(system) => system,
};
let contracts = if contract_ids.is_empty() {
system
.contracts
.iter()
.map(|(_, contract)| contract.info())
.collect()
} else {
let mut contracts = vec![];
for id in contract_ids.iter() {
let raw: [u8; 32] = try_decode_hex(&id)
.or(Err(from_display("Invalid contract id")))?
.try_into()
.or(Err(from_display("Invalid contract id")))?;
let contract = system.contracts.get(&raw.into());
// TODO: use `let else`.
let contract = match contract {
None => continue,
Some(contract) => contract,
};
contracts.push(contract.info());
}
contracts
};
Ok(pb::GetContractInfoResponse { contracts })
}

pub fn get_cluster_info(&self) -> RpcResult<pb::GetClusterInfoResponse> {
// TODO: use `let else`.
let system = match &self.system {
None => return Ok(Default::default()),
Some(system) => system,
};
let clusters = system
.contract_clusters
.iter()
.map(|(id, cluster)| {
let contracts = cluster.iter_contracts().map(hex).collect();
let ver = cluster.config.version;
let version = format!("{}.{}", ver.0, ver.1);
pb::ClusterInfo {
id: hex(id),
state_root: hex(cluster.storage.root()),
contracts,
version,
}
})
.collect();
Ok(pb::GetClusterInfoResponse { clusters })
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -1323,4 +1383,22 @@ impl<Platform: pal::Platform + Serialize + DeserializeOwned> PhactoryApi for Rpc
headers,
})
}

async fn get_contract_info(
&mut self,
request: pb::GetContractInfoRequest,
) -> Result<pb::GetContractInfoResponse, prpc::server::Error> {
self.lock_phactory().get_contract_info(&request.contract_ids)
}

async fn get_cluster_info(
&mut self,
_request: (),
) -> Result<pb::GetClusterInfoResponse, prpc::server::Error> {
self.lock_phactory().get_cluster_info()
}
}

fn try_decode_hex(hex_str: &str) -> Result<Vec<u8>, hex::FromHexError> {
hex::decode(hex_str.strip_prefix("0x").unwrap_or(hex_str))
}
Loading