Skip to content

Commit be9c6f4

Browse files
committed
sn/container: Listen and handle new creation and removal notifications
With nspcc-dev/neofs-contract#477, Container contract throws new `Created` and `Removed` notifications. This makes SN to listen and handle them along with the old ones. Additionally, log messages are improved and moved to info severity level. Signed-off-by: Leonard Lyubich <[email protected]>
1 parent 3760f7d commit be9c6f4

File tree

7 files changed

+197
-51
lines changed

7 files changed

+197
-51
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Changelog for NeoFS Node
77
- Expose metrics about write-cache (#3293)
88
- IR supports recently added Container contract's `create`, `remove` and `putEACL` methods now (#3282)
99
- SN attempts to make notary requests calling Container contract's `create`, `remove` and `putEACL` methods (#3282)
10+
- SN listens and handles recently added Container contract's `Created` and `Removed` notifications (#3282)
1011

1112
### Fixed
1213
- Bearer token signed not by its issuer is no longer passed (#3216)

cmd/neofs-node/container.go

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -51,43 +51,45 @@ func initContainerService(c *cfg) {
5151
}
5252

5353
if c.containerCache != nil {
54-
subscribeToContainerCreation(c, func(id cid.ID) {
55-
// read owner of the created container in order to update the reading cache.
56-
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
57-
// but don't forget about the profit of reading the new container and caching it:
58-
// creation success are most commonly tracked by polling GET op.
59-
cnr, err := c.containerCache.Get(id)
60-
if err == nil {
61-
c.containerListCache.update(cnr.Value.Owner(), id, true)
62-
} else {
63-
// unlike removal, we expect successful receive of the container
64-
// after successful creation, so logging can be useful
65-
c.log.Error("read newly created container after the notification",
66-
zap.Stringer("id", id),
67-
zap.Error(err),
68-
)
54+
subscribeToContainerCreation(c, func(id cid.ID, owner user.ID) {
55+
if owner.IsZero() {
56+
c.log.Debug("container removal event's receipt", zap.Stringer("id", id))
57+
// read owner of the created container in order to update the reading cache.
58+
cnr, err := c.containerCache.Get(id)
59+
if err == nil {
60+
c.containerListCache.update(cnr.Value.Owner(), id, true)
61+
} else {
62+
// unlike removal, we expect successful receive of the container
63+
// after successful creation, so logging can be useful
64+
c.log.Error("read newly created container after the notification", zap.Stringer("id", id), zap.Error(err))
65+
}
66+
return
6967
}
70-
71-
c.log.Debug("container creation event's receipt",
72-
zap.Stringer("id", id),
73-
)
68+
c.log.Info("caught container creation, updating cache...", zap.Stringer("id", id), zap.Stringer("owner", owner))
69+
c.containerListCache.update(owner, id, true)
7470
})
7571

76-
subscribeToContainerRemoval(c, func(id cid.ID) {
77-
// read owner of the removed container in order to update the listing cache.
78-
// It's strange to read already removed container, but we can successfully hit
79-
// the cache.
80-
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
81-
cnr, err := c.containerCache.Get(id)
82-
if err == nil {
83-
c.containerListCache.update(cnr.Value.Owner(), id, false)
72+
subscribeToContainerRemoval(c, func(id cid.ID, owner user.ID) {
73+
if owner.IsZero() {
74+
c.log.Debug("container removal event's receipt", zap.Stringer("id", id))
75+
// read owner of the removed container in order to update the listing cache.
76+
// It's strange to read already removed container, but we can successfully hit
77+
// the cache.
78+
cnr, err := c.containerCache.Get(id)
79+
if err == nil {
80+
c.containerListCache.update(cnr.Value.Owner(), id, false)
81+
}
82+
c.containerCache.handleRemoval(id)
83+
return
8484
}
85+
c.log.Info("caught container removal, updating cache...", zap.Stringer("id", id),
86+
zap.Stringer("owner", owner))
8587

88+
c.containerListCache.update(owner, id, false)
8689
c.containerCache.handleRemoval(id)
8790

88-
c.log.Debug("container removal event's receipt",
89-
zap.Stringer("id", id),
90-
)
91+
c.log.Info("successfully updated cache of the removed container",
92+
zap.Stringer("id", id))
9193
})
9294
}
9395

@@ -212,21 +214,33 @@ func registerEventParserOnceContainer(c *cfg, name string, p event.NotificationP
212214

213215
// subscribes to successful container creation. Provided handler is called asynchronously
214216
// on corresponding routine pool. MUST NOT be called concurrently with itself and other
215-
// similar functions.
216-
func subscribeToContainerCreation(c *cfg, h func(cid.ID)) {
217+
// similar functions. Owner may be zero.
218+
func subscribeToContainerCreation(c *cfg, h func(id cid.ID, owner user.ID)) {
217219
const eventNameContainerCreated = "PutSuccess"
218220
registerEventParserOnceContainer(c, eventNameContainerCreated, containerEvent.ParsePutSuccess)
219221
addContainerAsyncNotificationHandler(c, eventNameContainerCreated, func(e event.Event) {
220-
h(e.(containerEvent.PutSuccess).ID)
222+
h(e.(containerEvent.PutSuccess).ID, user.ID{})
223+
})
224+
const eventNameContainerCreatedV2 = "Created"
225+
registerEventParserOnceContainer(c, eventNameContainerCreatedV2, containerEvent.RestoreCreated)
226+
addContainerAsyncNotificationHandler(c, eventNameContainerCreatedV2, func(e event.Event) {
227+
created := e.(containerEvent.Created)
228+
h(created.ID, created.Owner)
221229
})
222230
}
223231

224-
// like subscribeToContainerCreation but for removal.
225-
func subscribeToContainerRemoval(c *cfg, h func(cid.ID)) {
232+
// like subscribeToContainerCreation but for removal. Owner may be zero.
233+
func subscribeToContainerRemoval(c *cfg, h func(id cid.ID, owner user.ID)) {
226234
const eventNameContainerRemoved = "DeleteSuccess"
227235
registerEventParserOnceContainer(c, eventNameContainerRemoved, containerEvent.ParseDeleteSuccess)
228236
addContainerAsyncNotificationHandler(c, eventNameContainerRemoved, func(e event.Event) {
229-
h(e.(containerEvent.DeleteSuccess).ID)
237+
h(e.(containerEvent.DeleteSuccess).ID, user.ID{})
238+
})
239+
const eventNameContainerRemovedV2 = "Removed"
240+
registerEventParserOnceContainer(c, eventNameContainerRemovedV2, containerEvent.RestoreRemoved)
241+
addContainerAsyncNotificationHandler(c, eventNameContainerRemovedV2, func(e event.Event) {
242+
removed := e.(containerEvent.Removed)
243+
h(removed.ID, removed.Owner)
230244
})
231245
}
232246

cmd/neofs-node/storage.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/nspcc-dev/neofs-node/pkg/util"
1818
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
1919
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
20+
"github.com/nspcc-dev/neofs-sdk-go/user"
2021
"github.com/panjf2000/ants/v2"
2122
"go.etcd.io/bbolt"
2223
"go.uber.org/zap"
@@ -37,12 +38,26 @@ func initLocalStorage(c *cfg) {
3738
ls.HandleNewEpoch(ev.(netmap.NewEpoch).EpochNumber())
3839
})
3940

40-
subscribeToContainerRemoval(c, func(id cid.ID) {
41-
err := ls.InhumeContainer(id)
42-
if err != nil {
43-
c.log.Warn("inhuming container after a chain event",
44-
zap.Stringer("cID", id), zap.Error(err))
41+
subscribeToContainerRemoval(c, func(id cid.ID, owner user.ID) {
42+
if owner.IsZero() {
43+
err := ls.InhumeContainer(id)
44+
if err != nil {
45+
c.log.Warn("inhuming container after a chain event",
46+
zap.Stringer("cID", id), zap.Error(err))
47+
}
48+
return
4549
}
50+
c.log.Info("caught container removal, marking its local objects for GC...",
51+
zap.Stringer("container", id), zap.Stringer("owner", owner))
52+
53+
if err := ls.InhumeContainer(id); err != nil {
54+
c.log.Warn("failed to mark local objects from the removed container for GC",
55+
zap.Stringer("container", id), zap.Error(err))
56+
return
57+
}
58+
59+
c.log.Info("successfully marked local objects from the removed container for GC",
60+
zap.Stringer("container", id))
4661
})
4762

4863
// allocate memory for the service;
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package container
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/nspcc-dev/neo-go/pkg/core/state"
7+
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
8+
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
9+
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
10+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
11+
"github.com/nspcc-dev/neofs-sdk-go/proto/refs"
12+
"github.com/nspcc-dev/neofs-sdk-go/user"
13+
)
14+
15+
func getItemsFromNotification(notifEvent *state.ContainedNotificationEvent, expectedNum int) ([]stackitem.Item, error) {
16+
items := notifEvent.Item.Value().([]stackitem.Item)
17+
if len(items) != expectedNum {
18+
return nil, fmt.Errorf("wrong/unsupported item num %d instead of %d", len(items), expectedNum)
19+
}
20+
return items, nil
21+
}
22+
23+
func restoreItemValue[T any](items []stackitem.Item, i int, desc string, typ stackitem.Type, f func(stackitem.Item) (T, error)) (v T, err error) {
24+
v, err = f(items[i])
25+
if err != nil {
26+
return v, fmt.Errorf("item#%d (%s, %s): %w", i, typ, desc, err)
27+
}
28+
return v, nil
29+
}
30+
31+
// Created is a container creation event thrown by Container contract.
32+
type Created struct {
33+
event.Event
34+
ID cid.ID
35+
Owner user.ID
36+
}
37+
38+
// RestoreCreated restores [Created] event from the notification one.
39+
func RestoreCreated(notifEvent *state.ContainedNotificationEvent) (event.Event, error) {
40+
items, err := getItemsFromNotification(notifEvent, 2)
41+
if err != nil {
42+
return nil, err
43+
}
44+
45+
id, err := restoreItemValue(items, 0, "ID", stackitem.ByteArrayT, client.BytesFromStackItem)
46+
if err != nil {
47+
return nil, err
48+
}
49+
owner, err := restoreItemValue(items, 1, "owner", stackitem.ByteArrayT, client.BytesFromStackItem)
50+
if err != nil {
51+
return nil, err
52+
}
53+
54+
var res Created
55+
56+
if err = res.ID.Decode(id); err != nil {
57+
return nil, fmt.Errorf("decode ID item: %w", err)
58+
}
59+
// TODO: replace message decoding after https://github.com/nspcc-dev/neofs-sdk-go/issues/669
60+
if err = res.Owner.FromProtoMessage(&refs.OwnerID{Value: owner}); err != nil {
61+
return nil, fmt.Errorf("decode owner item: %w", err)
62+
}
63+
64+
return res, nil
65+
}
66+
67+
// Removed is a container removal event thrown by Container contract.
68+
type Removed struct {
69+
event.Event
70+
ID cid.ID
71+
Owner user.ID
72+
}
73+
74+
// RestoreRemoved restores [Removed] event from the notification one.
75+
func RestoreRemoved(notifEvent *state.ContainedNotificationEvent) (event.Event, error) {
76+
items, err := getItemsFromNotification(notifEvent, 2)
77+
if err != nil {
78+
return nil, err
79+
}
80+
81+
id, err := restoreItemValue(items, 0, "ID", stackitem.ByteArrayT, client.BytesFromStackItem)
82+
if err != nil {
83+
return nil, err
84+
}
85+
owner, err := restoreItemValue(items, 1, "owner", stackitem.ByteArrayT, client.BytesFromStackItem)
86+
if err != nil {
87+
return nil, err
88+
}
89+
90+
var res Removed
91+
92+
if err = res.ID.Decode(id); err != nil {
93+
return nil, fmt.Errorf("decode ID item: %w", err)
94+
}
95+
// TODO: replace message decoding after https://github.com/nspcc-dev/neofs-sdk-go/issues/669
96+
if err = res.Owner.FromProtoMessage(&refs.OwnerID{Value: owner}); err != nil {
97+
return nil, fmt.Errorf("decode owner item: %w", err)
98+
}
99+
100+
return res, nil
101+
}

pkg/services/meta/blocks.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (m *Meta) handleBlock(ctx context.Context, b *block.Header) error {
6060
m.l.Debug("received object notification", zap.Stringer("address", oid.NewAddress(ev.cID, ev.oID)))
6161

6262
evsByStorage[st] = append(evsByStorage[st], ev)
63-
case cnrDeleteName:
63+
case cnrDeleteName, cnrRmName:
6464
ev, err := parseCnrNotification(n)
6565
if err != nil {
6666
l.Error("invalid container removal notification received", zap.Error(err))
@@ -82,7 +82,7 @@ func (m *Meta) handleBlock(ctx context.Context, b *block.Header) error {
8282
}
8383

8484
l.Debug("deleted container", zap.Stringer("cID", ev.cID))
85-
case cnrPutName:
85+
case cnrPutName, cnrCrtName:
8686
ev, err := parseCnrNotification(n)
8787
if err != nil {
8888
l.Error("invalid container notification received", zap.Error(err))

pkg/services/meta/meta.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ type Meta struct {
8080
ws wsClient
8181
blockSubID string
8282
cnrSubID string
83+
cnrCrtSubID string
8384
bCh chan *block.Header
8485
cnrPutEv chan *state.ContainedNotificationEvent
8586
epochEv chan *state.ContainedNotificationEvent
@@ -262,7 +263,7 @@ func (m *Meta) Run(ctx context.Context) error {
262263
return fmt.Errorf("block subscription: %w", err)
263264
}
264265
} else {
265-
m.cnrSubID, err = m.subscribeForNewContainers(m.cnrPutEv)
266+
err = m.subscribeForNewContainers()
266267
if err != nil {
267268
return fmt.Errorf("new container subscription: %w", err)
268269
}

pkg/services/meta/notifications.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package meta
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"math/big"
78
"slices"
@@ -22,7 +23,9 @@ import (
2223
const (
2324
objPutEvName = "ObjectPut"
2425
cnrDeleteName = "DeleteSuccess"
26+
cnrRmName = "Removed"
2527
cnrPutName = "PutSuccess"
28+
cnrCrtName = "Created"
2629
newEpochName = "NewEpoch"
2730
)
2831

@@ -49,20 +52,31 @@ func (m *Meta) unsubscribeFromBlocks() {
4952
}
5053

5154
// subscribeForNewContainers requires [Meta.cliM] to be taken.
52-
func (m *Meta) subscribeForNewContainers(ch chan<- *state.ContainedNotificationEvent) (string, error) {
55+
func (m *Meta) subscribeForNewContainers() error {
5356
m.l.Debug("subscribing for containers")
5457

5558
cnrPutEv := cnrPutName
56-
return m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &cnrPutEv}, ch)
59+
var err1 error
60+
m.cnrSubID, err1 = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &cnrPutEv}, m.cnrPutEv)
61+
62+
cnrCrtEv := cnrCrtName
63+
var err2 error
64+
m.cnrCrtSubID, err2 = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &cnrCrtEv}, m.cnrPutEv)
65+
66+
return errors.Join(err1, err2)
5767
}
5868

5969
// unsubscribeFromNewContainers requires [Meta.cliM] to be taken.
6070
func (m *Meta) unsubscribeFromNewContainers() {
6171
m.l.Debug("unsubscribing from containers")
6272

73+
if err := m.ws.Unsubscribe(m.cnrCrtSubID); err != nil {
74+
m.l.Error("could not unsubscribe from containers, ignore", zap.String("event", cnrCrtName), zap.String("sub", m.cnrCrtSubID), zap.Error(err))
75+
}
76+
6377
err := m.ws.Unsubscribe(m.cnrSubID)
6478
if err != nil {
65-
m.l.Error("could not unsubscribe from containers", zap.Error(err))
79+
m.l.Error("could not unsubscribe from containers", zap.String("event", cnrPutName), zap.String("sub", m.cnrSubID), zap.Error(err))
6680
return
6781
}
6882

@@ -190,7 +204,7 @@ func (m *Meta) reconnect(ctx context.Context) error {
190204
}
191205
} else {
192206
m.cnrPutEv = make(chan *state.ContainedNotificationEvent, notificationBuffSize)
193-
m.cnrSubID, err = m.subscribeForNewContainers(m.cnrPutEv)
207+
err = m.subscribeForNewContainers()
194208
if err != nil {
195209
m.stM.RUnlock()
196210
return fmt.Errorf("subscription for containers: %w", err)
@@ -433,7 +447,7 @@ func parseCnrNotification(ev state.ContainedNotificationEvent) (cnrEvent, error)
433447
if len(arr) != expectedNotificationArgs {
434448
return res, fmt.Errorf("unexpected number of items on stack: %d, expected: %d", len(arr), expectedNotificationArgs)
435449
}
436-
case cnrPutName:
450+
case cnrPutName, cnrCrtName, cnrRmName:
437451
const expectedNotificationArgs = 2
438452
if len(arr) != expectedNotificationArgs {
439453
return res, fmt.Errorf("unexpected number of items on stack: %d, expected: %d", len(arr), expectedNotificationArgs)
@@ -470,7 +484,7 @@ func (m *Meta) dropContainer(cID cid.ID) error {
470484
if len(m.storages) == 0 {
471485
m.cliM.Lock()
472486
m.unsubscribeFromBlocks()
473-
m.cnrSubID, err = m.subscribeForNewContainers(m.cnrPutEv)
487+
err = m.subscribeForNewContainers()
474488
m.cliM.Unlock()
475489
if err != nil {
476490
return fmt.Errorf("subscribing for new containers: %w", err)
@@ -584,7 +598,7 @@ func (m *Meta) handleEpochNotification(e uint64) error {
584598
}
585599

586600
if m.cnrSubID == "" {
587-
m.cnrSubID, err = m.subscribeForNewContainers(m.cnrPutEv)
601+
err = m.subscribeForNewContainers()
588602
if err != nil {
589603
m.cliM.Unlock()
590604
return fmt.Errorf("containers subscription: %w", err)

0 commit comments

Comments
 (0)