diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go index 2e65e1a517..c3bf110a3a 100644 --- a/chain/events/filter/event.go +++ b/chain/events/filter/event.go @@ -46,11 +46,10 @@ type eventFilter struct { keysWithCodec map[string][]types.ActorEventBlock // map of key names to a list of alternate values that may match maxResults int // maximum number of results to collect, 0 is unlimited - mu sync.Mutex - collected []*CollectedEvent - lastTaken time.Time - ch chan<- interface{} - excludeReverted bool + mu sync.Mutex + collected []*CollectedEvent + lastTaken time.Time + ch chan<- interface{} } var _ Filter = (*eventFilter)(nil) @@ -84,9 +83,6 @@ func (f *eventFilter) ClearSubChannel() { } func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver AddressResolver) error { - if f.excludeReverted && revert { - return nil - } if !f.matchTipset(te) { return nil } @@ -380,8 +376,14 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) return nil } -func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, - keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (EventFilter, error) { +func (m *EventFilterManager) Fill( + ctx context.Context, + minHeight, + maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, +) (EventFilter, error) { m.mu.Lock() if m.currentHeight == 0 { // sync in progress, we haven't had an Apply @@ -400,28 +402,44 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a } f := &eventFilter{ - id: id, - minHeight: minHeight, - maxHeight: maxHeight, - tipsetCid: tipsetCid, - addresses: addresses, - keysWithCodec: keysWithCodec, - maxResults: m.MaxFilterResults, - excludeReverted: excludeReverted, + id: id, + minHeight: minHeight, + maxHeight: maxHeight, + tipsetCid: tipsetCid, + addresses: addresses, + keysWithCodec: keysWithCodec, + maxResults: m.MaxFilterResults, } if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight { // Filter needs historic events + excludeReverted := tipsetCid != cid.Undef if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil { return nil, err } } + return f, nil +} + +func (m *EventFilterManager) Install( + ctx context.Context, + minHeight, + maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, +) (EventFilter, error) { + f, err := m.Fill(ctx, minHeight, maxHeight, tipsetCid, addresses, keysWithCodec) + if err != nil { + return nil, err + } + m.mu.Lock() if m.filters == nil { m.filters = make(map[types.FilterID]EventFilter) } - m.filters[id] = f + m.filters[f.(*eventFilter).id] = f m.mu.Unlock() return f, nil diff --git a/node/impl/full/actor_events.go b/node/impl/full/actor_events.go index 9116cce7fd..4216ef3c6b 100644 --- a/node/impl/full/actor_events.go +++ b/node/impl/full/actor_events.go @@ -32,13 +32,19 @@ type ChainAccessor interface { } type EventFilterManager interface { + Fill( + ctx context.Context, + minHeight, maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, + ) (filter.EventFilter, error) Install( ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, keysWithCodec map[string][]types.ActorEventBlock, - excludeReverted bool, ) (filter.EventFilter, error) Remove(ctx context.Context, id types.FilterID) error } @@ -102,22 +108,15 @@ func (a *ActorEventHandler) GetActorEventsRaw(ctx context.Context, evtFilter *ty return nil, err } - // Install a filter just for this call, collect events, remove the filter + // Fill a filter and collect events tipSetCid, err := params.GetTipSetCid() if err != nil { return nil, fmt.Errorf("failed to get tipset cid: %w", err) } - excludeReverted := tipSetCid == cid.Undef - f, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, excludeReverted) + f, err := a.eventFilterManager.Fill(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields) if err != nil { return nil, err } - defer func() { - // Remove the temporary filter regardless of the original context. - if err := a.eventFilterManager.Remove(context.Background(), f.ID()); err != nil { - log.Warnf("failed to remove filter: %s", err) - } - }() return getCollected(ctx, f), nil } @@ -218,7 +217,7 @@ func (a *ActorEventHandler) SubscribeActorEventsRaw(ctx context.Context, evtFilt if err != nil { return nil, fmt.Errorf("failed to get tipset cid: %w", err) } - fm, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false) + fm, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields) if err != nil { return nil, err } diff --git a/node/impl/full/actor_events_test.go b/node/impl/full/actor_events_test.go index b4c4e103c0..d5dea04caa 100644 --- a/node/impl/full/actor_events_test.go +++ b/node/impl/full/actor_events_test.go @@ -147,16 +147,15 @@ func TestGetActorEventsRaw(t *testing.T) { req.NoError(err) testCases := []struct { - name string - filter *types.ActorEventFilter - currentHeight int64 - installMinHeight int64 - installMaxHeight int64 - installTipSetKey cid.Cid - installAddresses []address.Address - installKeysWithCodec map[string][]types.ActorEventBlock - installExcludeReverted bool - expectErr string + name string + filter *types.ActorEventFilter + currentHeight int64 + installMinHeight int64 + installMaxHeight int64 + installTipSetKey cid.Cid + installAddresses []address.Address + installKeysWithCodec map[string][]types.ActorEventBlock + expectErr string }{ { name: "nil filter", @@ -222,7 +221,7 @@ func TestGetActorEventsRaw(t *testing.T) { filter := newMockFilter(ctx, t, rng, collectedEvents) if tc.expectErr == "" { - efm.expectInstall(abi.ChainEpoch(tc.installMinHeight), abi.ChainEpoch(tc.installMaxHeight), tc.installTipSetKey, tc.installAddresses, tc.installKeysWithCodec, tc.installExcludeReverted, filter) + efm.expectFill(abi.ChainEpoch(tc.installMinHeight), abi.ChainEpoch(tc.installMaxHeight), tc.installTipSetKey, tc.installAddresses, tc.installKeysWithCodec, filter) } ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, tc.currentHeight)}) @@ -239,7 +238,6 @@ func TestGetActorEventsRaw(t *testing.T) { req.NoError(err) expectedEvents := collectedToActorEvents(collectedEvents) req.Equal(expectedEvents, gotEvents) - efm.requireRemoved(filter.ID()) } }) } @@ -288,7 +286,7 @@ func TestSubscribeActorEventsRaw(t *testing.T) { allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, finishHeight) historicalEvents := allEvents[0 : (currentHeight-filterStartHeight)*eventsPerEpoch] mockFilter := newMockFilter(ctx, t, rng, historicalEvents) - mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(tc.endEpoch), cid.Undef, nil, nil, false, mockFilter) + mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(tc.endEpoch), cid.Undef, nil, nil, mockFilter) ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, currentHeight)}) req.NoError(err) @@ -449,7 +447,7 @@ func TestSubscribeActorEventsRaw_OnlyHistorical(t *testing.T) { mockFilterManager := newMockEventFilterManager(t) allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, currentHeight) mockFilter := newMockFilter(ctx, t, rng, allEvents) - mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(currentHeight), cid.Undef, nil, nil, false, mockFilter) + mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(currentHeight), cid.Undef, nil, nil, mockFilter) ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, currentHeight)}) req.NoError(err) @@ -619,46 +617,55 @@ type filterManagerExpectation struct { tipsetCid cid.Cid addresses []address.Address keysWithCodec map[string][]types.ActorEventBlock - excludeReverted bool returnFilter filter.EventFilter } type mockEventFilterManager struct { - t *testing.T - expectations []filterManagerExpectation - removed []types.FilterID - lk sync.Mutex + t *testing.T + installExpectations []filterManagerExpectation + fillExpectations []filterManagerExpectation + removed []types.FilterID + lk sync.Mutex } func newMockEventFilterManager(t *testing.T) *mockEventFilterManager { return &mockEventFilterManager{t: t} } -func (m *mockEventFilterManager) expectInstall( +func (m *mockEventFilterManager) expectFill( minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, keysWithCodec map[string][]types.ActorEventBlock, - excludeReverted bool, returnFilter filter.EventFilter) { m.t.Helper() - m.expectations = append(m.expectations, filterManagerExpectation{ - minHeight: minHeight, - maxHeight: maxHeight, - tipsetCid: tipsetCid, - addresses: addresses, - keysWithCodec: keysWithCodec, - excludeReverted: excludeReverted, - returnFilter: returnFilter, + m.fillExpectations = append(m.fillExpectations, filterManagerExpectation{ + minHeight: minHeight, + maxHeight: maxHeight, + tipsetCid: tipsetCid, + addresses: addresses, + keysWithCodec: keysWithCodec, + returnFilter: returnFilter, }) } -func (m *mockEventFilterManager) requireRemoved(id types.FilterID) { +func (m *mockEventFilterManager) expectInstall( + minHeight, maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, + returnFilter filter.EventFilter) { + m.t.Helper() - m.lk.Lock() - defer m.lk.Unlock() - require.Contains(m.t, m.removed, id) + m.installExpectations = append(m.installExpectations, filterManagerExpectation{ + minHeight: minHeight, + maxHeight: maxHeight, + tipsetCid: tipsetCid, + addresses: addresses, + keysWithCodec: keysWithCodec, + returnFilter: returnFilter, + }) } func (m *mockEventFilterManager) requireRemovedEventually(id types.FilterID, timeout time.Duration) { @@ -674,25 +681,43 @@ func (m *mockEventFilterManager) requireRemovedEventually(id types.FilterID, tim }, timeout, 10*time.Millisecond, "filter %x not removed", id) } +func (m *mockEventFilterManager) Fill( + _ context.Context, + minHeight, maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, +) (filter.EventFilter, error) { + + require.True(m.t, len(m.fillExpectations) > 0, "unexpected call to Fill") + exp := m.fillExpectations[0] + m.fillExpectations = m.fillExpectations[1:] + // check the expectation matches the call then return the attached filter + require.Equal(m.t, exp.minHeight, minHeight) + require.Equal(m.t, exp.maxHeight, maxHeight) + require.Equal(m.t, exp.tipsetCid, tipsetCid) + require.Equal(m.t, exp.addresses, addresses) + require.Equal(m.t, exp.keysWithCodec, keysWithCodec) + return exp.returnFilter, nil +} + func (m *mockEventFilterManager) Install( _ context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, keysWithCodec map[string][]types.ActorEventBlock, - excludeReverted bool, ) (filter.EventFilter, error) { - require.True(m.t, len(m.expectations) > 0, "unexpected call to Install") - exp := m.expectations[0] - m.expectations = m.expectations[1:] + require.True(m.t, len(m.installExpectations) > 0, "unexpected call to Install") + exp := m.installExpectations[0] + m.installExpectations = m.installExpectations[1:] // check the expectation matches the call then return the attached filter require.Equal(m.t, exp.minHeight, minHeight) require.Equal(m.t, exp.maxHeight, maxHeight) require.Equal(m.t, exp.tipsetCid, tipsetCid) require.Equal(m.t, exp.addresses, addresses) require.Equal(m.t, exp.keysWithCodec, keysWithCodec) - require.Equal(m.t, exp.excludeReverted, excludeReverted) return exp.returnFilter, nil } diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index dd970a89d7..a3164c000a 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1727,16 +1727,13 @@ func (e *EthEventHandler) ethGetEventsForFilter(ctx context.Context, filterSpec } } - // Create a temporary filter - excludeReverted := pf.tipsetCid == cid.Undef - f, err := e.EventFilterManager.Install(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys, excludeReverted) + // Fill a filter and collect events + f, err := e.EventFilterManager.Fill(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys) if err != nil { return nil, xerrors.Errorf("failed to install event filter: %w", err) } ces := f.TakeCollectedEvents(ctx) - _ = e.uninstallFilter(ctx, f) - return ces, nil } @@ -1958,8 +1955,7 @@ func (e *EthEventHandler) EthNewFilter(ctx context.Context, filterSpec *ethtypes return ethtypes.EthFilterID{}, err } - excludeReverted := pf.tipsetCid == cid.Undef - f, err := e.EventFilterManager.Install(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys, excludeReverted) + f, err := e.EventFilterManager.Install(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys) if err != nil { return ethtypes.EthFilterID{}, xerrors.Errorf("failed to install event filter: %w", err) } @@ -2125,7 +2121,7 @@ func (e *EthEventHandler) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) } } - f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, addresses, keysToKeysWithCodec(keys), false) + f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, addresses, keysToKeysWithCodec(keys)) if err != nil { // clean up any previous filters added and stop the sub _, _ = e.EthUnsubscribe(ctx, sub.id)