Skip to content

Commit

Permalink
Market Actor Events (#1471)
Browse files Browse the repository at this point in the history
* publish deals event

* activate deals

* deal terminated

* deal completed event

* run CI

* rustfmt

* replace deal_id with id in market events
  • Loading branch information
aarshkshah1992 authored Nov 3, 2023
1 parent 688d404 commit 4b3eb90
Show file tree
Hide file tree
Showing 12 changed files with 227 additions and 31 deletions.
38 changes: 38 additions & 0 deletions actors/market/src/emit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use fil_actors_runtime::runtime::Runtime;
use fil_actors_runtime::{ActorError, EventBuilder};
use fvm_shared::deal::DealID;
use fvm_shared::ActorID;

/// Indicates a deal has been published.
pub fn deal_published(
rt: &impl Runtime,
client: ActorID,
provider: ActorID,
deal_id: DealID,
) -> Result<(), ActorError> {
rt.emit_event(
&EventBuilder::new()
.typ("deal-published")
.field_indexed("client", &client)
.field_indexed("provider", &provider)
.field_indexed("id", &deal_id)
.build()?,
)
}

/// Indicates a deal has been activated.
pub fn deal_activated(rt: &impl Runtime, deal_id: DealID) -> Result<(), ActorError> {
rt.emit_event(&EventBuilder::new().typ("deal-activated").field_indexed("id", &deal_id).build()?)
}

/// Indicates a deal has been terminated.
pub fn deal_terminated(rt: &impl Runtime, deal_id: DealID) -> Result<(), ActorError> {
rt.emit_event(
&EventBuilder::new().typ("deal-terminated").field_indexed("id", &deal_id).build()?,
)
}

/// Indicates a deal has been completed successfully.
pub fn deal_completed(rt: &impl Runtime, deal_id: DealID) -> Result<(), ActorError> {
rt.emit_event(&EventBuilder::new().typ("deal-completed").field_indexed("id", &deal_id).build()?)
}
19 changes: 18 additions & 1 deletion actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ mod deal;
mod state;
mod types;

pub mod emit;

#[cfg(feature = "fil-actor")]
fil_actors_runtime::wasm_trampoline!(Actor);

Expand Down Expand Up @@ -478,6 +480,13 @@ impl Actor {
.with_context_code(ExitCode::USR_ILLEGAL_ARGUMENT, || {
format!("failed to notify deal with proposal cid {}", valid_deal.cid)
})?;

emit::deal_published(
rt,
valid_deal.proposal.client.id().unwrap(),
valid_deal.proposal.provider.id().unwrap(),
new_deal_ids[i],
)?;
}

Ok(PublishStorageDealsReturn { ids: new_deal_ids, valid_deals: valid_input_bf })
Expand Down Expand Up @@ -656,6 +665,9 @@ impl Actor {
unsealed_cid: data_commitment,
});
batch_gen.add_success();
for d in p.deal_ids {
emit::deal_activated(rt, d)?;
}
}
Err(e) => {
log::warn!("failed to activate deals {:?}: {}", p.deal_ids, e);
Expand Down Expand Up @@ -728,6 +740,7 @@ impl Actor {
state.slash_epoch = params.epoch;

deal_states.push((id, state));
emit::deal_terminated(rt, id)?;
}

st.put_deal_states(rt.store(), &deal_states)?;
Expand Down Expand Up @@ -810,7 +823,7 @@ impl Actor {
})?;
}

let (slash_amount, remove_deal) =
let (slash_amount, remove_deal, complete_success) =
st.process_deal_update(rt.store(), &state, &deal, curr_epoch)?;

if slash_amount.is_negative() {
Expand Down Expand Up @@ -842,6 +855,10 @@ impl Actor {
"failed to delete deal proposal: does not exist"
));
}

if complete_success {
emit::deal_completed(rt, deal_id)?;
}
} else {
if !slash_amount.is_zero() {
return Err(actor_error!(
Expand Down
10 changes: 5 additions & 5 deletions actors/market/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ impl State {
state: &DealState,
deal: &DealProposal,
epoch: ChainEpoch,
) -> Result<(TokenAmount, bool), ActorError>
) -> Result<(TokenAmount, bool, bool), ActorError>
where
BS: Blockstore,
{
Expand All @@ -595,7 +595,7 @@ impl State {
// This would be the case that the first callback somehow triggers before it is scheduled to
// This is expected not to be able to happen
if deal.start_epoch > epoch {
return Ok((TokenAmount::zero(), false));
return Ok((TokenAmount::zero(), false, false));
}

let payment_end_epoch = if ever_slashed {
Expand Down Expand Up @@ -654,14 +654,14 @@ impl State {
self.slash_balance(store, &deal.provider, &slashed, Reason::ProviderCollateral)
.context("slashing balance")?;

return Ok((slashed, true));
return Ok((slashed, true, false));
}

if epoch >= deal.end_epoch {
self.process_deal_expired(store, deal, state)?;
return Ok((TokenAmount::zero(), true));
return Ok((TokenAmount::zero(), true, true));
}
Ok((TokenAmount::zero(), false))
Ok((TokenAmount::zero(), false, false))
}

/// Deal start deadline elapsed without appearing in a proven sector.
Expand Down
5 changes: 4 additions & 1 deletion actors/market/tests/activate_deal_failures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ fn fail_when_caller_is_not_the_provider_of_the_deal() {
deal_ids: vec![deal_id],
}],
false,
vec![],
)
.unwrap();
let res: BatchActivateDealsResult =
Expand Down Expand Up @@ -87,6 +88,7 @@ fn fail_when_deal_has_not_been_published_before() {
deal_ids: vec![DealID::from(42u32)],
}],
false,
vec![],
)
.unwrap();
let res: BatchActivateDealsResult =
Expand Down Expand Up @@ -123,6 +125,7 @@ fn fail_when_deal_has_already_been_activated() {
deal_ids: vec![deal_id],
}],
false,
vec![],
)
.unwrap();
let res: BatchActivateDealsResult =
Expand Down Expand Up @@ -169,6 +172,6 @@ fn fail_when_deal_has_already_been_expired() {
let mut st: State = rt.get_state::<State>();
st.next_id = deal_id + 1;

let res = activate_deals(&rt, sector_expiry, PROVIDER_ADDR, 0, &[deal_id]);
let res = activate_deals_for(&rt, sector_expiry, PROVIDER_ADDR, 0, &[deal_id], vec![]);
assert_eq!(res.activation_results.codes(), vec![EX_DEAL_EXPIRED])
}
9 changes: 6 additions & 3 deletions actors/market/tests/batch_activate_deals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ fn sectors_fail_and_succeed_independently_during_batch_activation() {
SectorDeals { deal_ids: vec![id_4], sector_type, sector_expiry: END_EPOCH + 2 }, // sector succeeds
];

let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false).unwrap();
let res =
batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false, vec![id_4]).unwrap();
let res: BatchActivateDealsResult =
res.unwrap().deserialize().expect("VerifyDealsForActivation failed!");

Expand Down Expand Up @@ -172,7 +173,8 @@ fn handles_sectors_empty_of_deals_gracefully() {
SectorDeals { deal_ids: vec![], sector_type, sector_expiry: END_EPOCH + 2 }, // empty sector
];

let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false).unwrap();
let res =
batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false, vec![id_1]).unwrap();
let res: BatchActivateDealsResult =
res.unwrap().deserialize().expect("VerifyDealsForActivation failed!");

Expand Down Expand Up @@ -221,7 +223,8 @@ fn fails_to_activate_sectors_containing_duplicate_deals() {
SectorDeals { deal_ids: vec![id_3], sector_type, sector_expiry: END_EPOCH },
];

let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false).unwrap();
let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false, vec![id_1, id_3])
.unwrap();
let res: BatchActivateDealsResult =
res.unwrap().deserialize().expect("VerifyDealsForActivation failed!");

Expand Down
8 changes: 8 additions & 0 deletions actors/market/tests/cron_tick_deal_expiry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use fil_actors_runtime::network::EPOCHS_IN_DAY;
use fil_actors_runtime::runtime::Policy;
use fil_actors_runtime::EventBuilder;
use fvm_shared::clock::ChainEpoch;

mod harness;
Expand Down Expand Up @@ -40,6 +41,7 @@ fn deal_is_correctly_processed_if_first_cron_after_expiry() {
// total payment = (end - start)
let current = END_EPOCH + 5;
rt.set_epoch(current);

let (pay, slashed) =
cron_tick_and_assert_balances(&rt, CLIENT_ADDR, PROVIDER_ADDR, current, deal_id);
let duration = END_EPOCH - START_EPOCH;
Expand Down Expand Up @@ -168,6 +170,9 @@ fn expired_deal_should_unlock_the_remaining_client_and_provider_locked_balance_a
let p_escrow = get_balance(&rt, &PROVIDER_ADDR).balance;

// move the current epoch so that deal is expired
rt.expect_emitted_event(
EventBuilder::new().typ("deal-completed").field_indexed("id", &deal_id).build().unwrap(),
);
rt.set_epoch(END_EPOCH + 1000);
cron_tick(&rt);

Expand Down Expand Up @@ -203,6 +208,9 @@ fn all_payments_are_made_for_a_deal_client_withdraws_collateral_and_client_accou

// move the current epoch so that deal is expired
rt.set_epoch(END_EPOCH + 100);
rt.expect_emitted_event(
EventBuilder::new().typ("deal-completed").field_indexed("id", &deal_id).build().unwrap(),
);
cron_tick(&rt);
assert_eq!(deal_proposal.client_collateral, get_balance(&rt, &CLIENT_ADDR).balance);

Expand Down
4 changes: 2 additions & 2 deletions actors/market/tests/cron_tick_deal_slashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ fn deal_is_slashed_at_the_end_epoch_should_not_be_slashed_and_should_be_consider
// as deal is considered to be expired.

rt.set_epoch(END_EPOCH);
terminate_deals(&rt, PROVIDER_ADDR, &[deal_id]);
terminate_deals_for(&rt, PROVIDER_ADDR, &[deal_id], vec![]);

// on the next cron tick, it will be processed as expired
let current = END_EPOCH + 300;
Expand Down Expand Up @@ -352,7 +352,7 @@ fn regular_payments_till_deal_expires_and_then_we_attempt_to_slash_it_but_it_wil
// as deal is considered to be expired.
let duration = END_EPOCH - current;
rt.set_epoch(END_EPOCH);
terminate_deals(&rt, PROVIDER_ADDR, &[deal_id]);
terminate_deals_for(&rt, PROVIDER_ADDR, &[deal_id], vec![]);

// next epoch for cron schedule is endEpoch + 300 ->
// setting epoch to higher than that will cause deal to be expired, payment will be made
Expand Down
Loading

0 comments on commit 4b3eb90

Please sign in to comment.