Skip to content

Commit

Permalink
fix(events): handle subscribe and get/fill cases separately
Browse files Browse the repository at this point in the history
* subscribe-type calls always wants future reverts but only historical reverts
  when a tipset is specified
* get-type calls only want reverts when a tipset is specified

Add a new Fill() method to avoid the unnecessary install + remove step for get-
type calls.

Closes: #12584
  • Loading branch information
rvagg committed Oct 11, 2024
1 parent 4b384ff commit 0b053cc
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 100 deletions.
56 changes: 37 additions & 19 deletions chain/events/filter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ type eventFilter struct {
keysWithCodec map[string][]types.ActorEventBlock // map of key names to a list of alternate values that may match
maxResults int // maximum number of results to collect, 0 is unlimited

mu sync.Mutex
collected []*CollectedEvent
lastTaken time.Time
ch chan<- interface{}
excludeReverted bool
mu sync.Mutex
collected []*CollectedEvent
lastTaken time.Time
ch chan<- interface{}
}

var _ Filter = (*eventFilter)(nil)
Expand Down Expand Up @@ -84,9 +83,6 @@ func (f *eventFilter) ClearSubChannel() {
}

func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver AddressResolver) error {
if f.excludeReverted && revert {
return nil
}
if !f.matchTipset(te) {
return nil
}
Expand Down Expand Up @@ -380,8 +376,14 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
return nil
}

func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (EventFilter, error) {
func (m *EventFilterManager) Fill(
ctx context.Context,
minHeight,
maxHeight abi.ChainEpoch,
tipsetCid cid.Cid,
addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock,
) (EventFilter, error) {
m.mu.Lock()
if m.currentHeight == 0 {
// sync in progress, we haven't had an Apply
Expand All @@ -400,28 +402,44 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
}

f := &eventFilter{
id: id,
minHeight: minHeight,
maxHeight: maxHeight,
tipsetCid: tipsetCid,
addresses: addresses,
keysWithCodec: keysWithCodec,
maxResults: m.MaxFilterResults,
excludeReverted: excludeReverted,
id: id,
minHeight: minHeight,
maxHeight: maxHeight,
tipsetCid: tipsetCid,
addresses: addresses,
keysWithCodec: keysWithCodec,
maxResults: m.MaxFilterResults,
}

if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight {
// Filter needs historic events
excludeReverted := tipsetCid != cid.Undef
if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil {
return nil, err
}
}

return f, nil
}

func (m *EventFilterManager) Install(
ctx context.Context,
minHeight,
maxHeight abi.ChainEpoch,
tipsetCid cid.Cid,
addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock,
) (EventFilter, error) {
f, err := m.Fill(ctx, minHeight, maxHeight, tipsetCid, addresses, keysWithCodec)
if err != nil {
return nil, err
}

m.mu.Lock()
if m.filters == nil {
m.filters = make(map[types.FilterID]EventFilter)
}
m.filters[id] = f
m.filters[f.(*eventFilter).id] = f
m.mu.Unlock()

return f, nil
Expand Down
21 changes: 10 additions & 11 deletions node/impl/full/actor_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,19 @@ type ChainAccessor interface {
}

type EventFilterManager interface {
Fill(
ctx context.Context,
minHeight, maxHeight abi.ChainEpoch,
tipsetCid cid.Cid,
addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock,
) (filter.EventFilter, error)
Install(
ctx context.Context,
minHeight, maxHeight abi.ChainEpoch,
tipsetCid cid.Cid,
addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock,
excludeReverted bool,
) (filter.EventFilter, error)
Remove(ctx context.Context, id types.FilterID) error
}
Expand Down Expand Up @@ -102,22 +108,15 @@ func (a *ActorEventHandler) GetActorEventsRaw(ctx context.Context, evtFilter *ty
return nil, err
}

// Install a filter just for this call, collect events, remove the filter
// Fill a filter and collect events
tipSetCid, err := params.GetTipSetCid()
if err != nil {
return nil, fmt.Errorf("failed to get tipset cid: %w", err)
}
excludeReverted := tipSetCid == cid.Undef
f, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, excludeReverted)
f, err := a.eventFilterManager.Fill(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields)
if err != nil {
return nil, err
}
defer func() {
// Remove the temporary filter regardless of the original context.
if err := a.eventFilterManager.Remove(context.Background(), f.ID()); err != nil {
log.Warnf("failed to remove filter: %s", err)
}
}()
return getCollected(ctx, f), nil
}

Expand Down Expand Up @@ -218,7 +217,7 @@ func (a *ActorEventHandler) SubscribeActorEventsRaw(ctx context.Context, evtFilt
if err != nil {
return nil, fmt.Errorf("failed to get tipset cid: %w", err)
}
fm, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false)
fm, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 0b053cc

Please sign in to comment.