diff --git a/CHANGELOG.md b/CHANGELOG.md index d32cc823226..7fa4de71bda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ ## Bug Fixes - Fix a bug in the `lotus-shed indexes backfill-events` command that may result in either duplicate events being backfilled where there are existing events (such an operation *should* be idempotent) or events erroneously having duplicate `logIndex` values when queried via ETH APIs. ([filecoin-project/lotus#12567](https://github.com/filecoin-project/lotus/pull/12567)) +- Event APIs (Eth events and actor events) should only return reverted events if client queries by specific block hash / tipset. Eth and actor event subscription APIs should always return reverted events to enable accurate observation of real-time changes. ([filecoin-project/lotus#12585](https://github.com/filecoin-project/lotus/pull/12585)) + +## Improvements ## Improvements diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go index fa17d235ea9..c3bf110a3a7 100644 --- a/chain/events/filter/event.go +++ b/chain/events/filter/event.go @@ -376,8 +376,14 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) return nil } -func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, - keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (EventFilter, error) { +func (m *EventFilterManager) Fill( + ctx context.Context, + minHeight, + maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, +) (EventFilter, error) { m.mu.Lock() if m.currentHeight == 0 { // sync in progress, we haven't had an Apply @@ -407,16 +413,33 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight { // Filter needs historic events + excludeReverted := tipsetCid != cid.Undef if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil { return nil, err } } + return f, nil +} + +func (m *EventFilterManager) Install( + ctx context.Context, + minHeight, + maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, +) (EventFilter, error) { + f, err := m.Fill(ctx, minHeight, maxHeight, tipsetCid, addresses, keysWithCodec) + if err != nil { + return nil, err + } + m.mu.Lock() if m.filters == nil { m.filters = make(map[types.FilterID]EventFilter) } - m.filters[id] = f + m.filters[f.(*eventFilter).id] = f m.mu.Unlock() return f, nil diff --git a/node/impl/full/actor_events.go b/node/impl/full/actor_events.go index bb192a4cf28..4216ef3c6bf 100644 --- a/node/impl/full/actor_events.go +++ b/node/impl/full/actor_events.go @@ -32,13 +32,19 @@ type ChainAccessor interface { } type EventFilterManager interface { + Fill( + ctx context.Context, + minHeight, maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, + ) (filter.EventFilter, error) Install( ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, keysWithCodec map[string][]types.ActorEventBlock, - excludeReverted bool, ) (filter.EventFilter, error) Remove(ctx context.Context, id types.FilterID) error } @@ -102,21 +108,15 @@ func (a *ActorEventHandler) GetActorEventsRaw(ctx context.Context, evtFilter *ty return nil, err } - // Install a filter just for this call, collect events, remove the filter + // Fill a filter and collect events tipSetCid, err := params.GetTipSetCid() if err != nil { return nil, fmt.Errorf("failed to get tipset cid: %w", err) } - f, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false) + f, err := a.eventFilterManager.Fill(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields) if err != nil { return nil, err } - defer func() { - // Remove the temporary filter regardless of the original context. - if err := a.eventFilterManager.Remove(context.Background(), f.ID()); err != nil { - log.Warnf("failed to remove filter: %s", err) - } - }() return getCollected(ctx, f), nil } @@ -217,7 +217,7 @@ func (a *ActorEventHandler) SubscribeActorEventsRaw(ctx context.Context, evtFilt if err != nil { return nil, fmt.Errorf("failed to get tipset cid: %w", err) } - fm, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false) + fm, err := a.eventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields) if err != nil { return nil, err } diff --git a/node/impl/full/actor_events_test.go b/node/impl/full/actor_events_test.go index b4c4e103c0c..16bcfe06ab9 100644 --- a/node/impl/full/actor_events_test.go +++ b/node/impl/full/actor_events_test.go @@ -146,17 +146,21 @@ func TestGetActorEventsRaw(t *testing.T) { minerAddr, err := address.NewIDAddress(uint64(rng.Int63())) req.NoError(err) + c := mkCid(t, "c") + tskey := types.NewTipSetKey(c) + tsKeyCid, err := tskey.Cid() + req.NoError(err) + testCases := []struct { - name string - filter *types.ActorEventFilter - currentHeight int64 - installMinHeight int64 - installMaxHeight int64 - installTipSetKey cid.Cid - installAddresses []address.Address - installKeysWithCodec map[string][]types.ActorEventBlock - installExcludeReverted bool - expectErr string + name string + filter *types.ActorEventFilter + currentHeight int64 + installMinHeight int64 + installMaxHeight int64 + installTipSetKey cid.Cid + installAddresses []address.Address + installKeysWithCodec map[string][]types.ActorEventBlock + expectErr string }{ { name: "nil filter", @@ -179,6 +183,15 @@ func TestGetActorEventsRaw(t *testing.T) { installMinHeight: 0, installMaxHeight: maxFilterHeightRange, }, + { + name: "query for tipset key", + filter: &types.ActorEventFilter{ + TipSetKey: &tskey, + }, + installTipSetKey: tsKeyCid, + installMinHeight: 0, + installMaxHeight: 0, + }, { name: "from, no to height", filter: &types.ActorEventFilter{ @@ -222,7 +235,7 @@ func TestGetActorEventsRaw(t *testing.T) { filter := newMockFilter(ctx, t, rng, collectedEvents) if tc.expectErr == "" { - efm.expectInstall(abi.ChainEpoch(tc.installMinHeight), abi.ChainEpoch(tc.installMaxHeight), tc.installTipSetKey, tc.installAddresses, tc.installKeysWithCodec, tc.installExcludeReverted, filter) + efm.expectFill(abi.ChainEpoch(tc.installMinHeight), abi.ChainEpoch(tc.installMaxHeight), tc.installTipSetKey, tc.installAddresses, tc.installKeysWithCodec, filter) } ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, tc.currentHeight)}) @@ -239,7 +252,6 @@ func TestGetActorEventsRaw(t *testing.T) { req.NoError(err) expectedEvents := collectedToActorEvents(collectedEvents) req.Equal(expectedEvents, gotEvents) - efm.requireRemoved(filter.ID()) } }) } @@ -288,7 +300,7 @@ func TestSubscribeActorEventsRaw(t *testing.T) { allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, finishHeight) historicalEvents := allEvents[0 : (currentHeight-filterStartHeight)*eventsPerEpoch] mockFilter := newMockFilter(ctx, t, rng, historicalEvents) - mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(tc.endEpoch), cid.Undef, nil, nil, false, mockFilter) + mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(tc.endEpoch), cid.Undef, nil, nil, mockFilter) ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, currentHeight)}) req.NoError(err) @@ -449,7 +461,7 @@ func TestSubscribeActorEventsRaw_OnlyHistorical(t *testing.T) { mockFilterManager := newMockEventFilterManager(t) allEvents := makeCollectedEvents(t, rng, filterStartHeight, eventsPerEpoch, currentHeight) mockFilter := newMockFilter(ctx, t, rng, allEvents) - mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(currentHeight), cid.Undef, nil, nil, false, mockFilter) + mockFilterManager.expectInstall(abi.ChainEpoch(0), abi.ChainEpoch(currentHeight), cid.Undef, nil, nil, mockFilter) ts, err := types.NewTipSet([]*types.BlockHeader{newBlockHeader(minerAddr, currentHeight)}) req.NoError(err) @@ -619,46 +631,55 @@ type filterManagerExpectation struct { tipsetCid cid.Cid addresses []address.Address keysWithCodec map[string][]types.ActorEventBlock - excludeReverted bool returnFilter filter.EventFilter } type mockEventFilterManager struct { - t *testing.T - expectations []filterManagerExpectation - removed []types.FilterID - lk sync.Mutex + t *testing.T + installExpectations []filterManagerExpectation + fillExpectations []filterManagerExpectation + removed []types.FilterID + lk sync.Mutex } func newMockEventFilterManager(t *testing.T) *mockEventFilterManager { return &mockEventFilterManager{t: t} } -func (m *mockEventFilterManager) expectInstall( +func (m *mockEventFilterManager) expectFill( minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, keysWithCodec map[string][]types.ActorEventBlock, - excludeReverted bool, returnFilter filter.EventFilter) { m.t.Helper() - m.expectations = append(m.expectations, filterManagerExpectation{ - minHeight: minHeight, - maxHeight: maxHeight, - tipsetCid: tipsetCid, - addresses: addresses, - keysWithCodec: keysWithCodec, - excludeReverted: excludeReverted, - returnFilter: returnFilter, + m.fillExpectations = append(m.fillExpectations, filterManagerExpectation{ + minHeight: minHeight, + maxHeight: maxHeight, + tipsetCid: tipsetCid, + addresses: addresses, + keysWithCodec: keysWithCodec, + returnFilter: returnFilter, }) } -func (m *mockEventFilterManager) requireRemoved(id types.FilterID) { +func (m *mockEventFilterManager) expectInstall( + minHeight, maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, + returnFilter filter.EventFilter) { + m.t.Helper() - m.lk.Lock() - defer m.lk.Unlock() - require.Contains(m.t, m.removed, id) + m.installExpectations = append(m.installExpectations, filterManagerExpectation{ + minHeight: minHeight, + maxHeight: maxHeight, + tipsetCid: tipsetCid, + addresses: addresses, + keysWithCodec: keysWithCodec, + returnFilter: returnFilter, + }) } func (m *mockEventFilterManager) requireRemovedEventually(id types.FilterID, timeout time.Duration) { @@ -674,25 +695,43 @@ func (m *mockEventFilterManager) requireRemovedEventually(id types.FilterID, tim }, timeout, 10*time.Millisecond, "filter %x not removed", id) } +func (m *mockEventFilterManager) Fill( + _ context.Context, + minHeight, maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, +) (filter.EventFilter, error) { + + require.True(m.t, len(m.fillExpectations) > 0, "unexpected call to Fill") + exp := m.fillExpectations[0] + m.fillExpectations = m.fillExpectations[1:] + // check the expectation matches the call then return the attached filter + require.Equal(m.t, exp.minHeight, minHeight) + require.Equal(m.t, exp.maxHeight, maxHeight) + require.Equal(m.t, exp.tipsetCid, tipsetCid) + require.Equal(m.t, exp.addresses, addresses) + require.Equal(m.t, exp.keysWithCodec, keysWithCodec) + return exp.returnFilter, nil +} + func (m *mockEventFilterManager) Install( _ context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, keysWithCodec map[string][]types.ActorEventBlock, - excludeReverted bool, ) (filter.EventFilter, error) { - require.True(m.t, len(m.expectations) > 0, "unexpected call to Install") - exp := m.expectations[0] - m.expectations = m.expectations[1:] + require.True(m.t, len(m.installExpectations) > 0, "unexpected call to Install") + exp := m.installExpectations[0] + m.installExpectations = m.installExpectations[1:] // check the expectation matches the call then return the attached filter require.Equal(m.t, exp.minHeight, minHeight) require.Equal(m.t, exp.maxHeight, maxHeight) require.Equal(m.t, exp.tipsetCid, tipsetCid) require.Equal(m.t, exp.addresses, addresses) require.Equal(m.t, exp.keysWithCodec, keysWithCodec) - require.Equal(m.t, exp.excludeReverted, excludeReverted) return exp.returnFilter, nil } diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index d9ad842f992..a3164c000ad 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1727,15 +1727,13 @@ func (e *EthEventHandler) ethGetEventsForFilter(ctx context.Context, filterSpec } } - // Create a temporary filter - f, err := e.EventFilterManager.Install(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys, false) + // Fill a filter and collect events + f, err := e.EventFilterManager.Fill(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys) if err != nil { return nil, xerrors.Errorf("failed to install event filter: %w", err) } ces := f.TakeCollectedEvents(ctx) - _ = e.uninstallFilter(ctx, f) - return ces, nil } @@ -1957,7 +1955,7 @@ func (e *EthEventHandler) EthNewFilter(ctx context.Context, filterSpec *ethtypes return ethtypes.EthFilterID{}, err } - f, err := e.EventFilterManager.Install(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys, true) + f, err := e.EventFilterManager.Install(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys) if err != nil { return ethtypes.EthFilterID{}, xerrors.Errorf("failed to install event filter: %w", err) } @@ -2123,7 +2121,7 @@ func (e *EthEventHandler) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) } } - f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, addresses, keysToKeysWithCodec(keys), true) + f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, addresses, keysToKeysWithCodec(keys)) if err != nil { // clean up any previous filters added and stop the sub _, _ = e.EthUnsubscribe(ctx, sub.id)