Skip to content

Commit adf5444

Browse files
authored
Refactor policy processing in Policer (#3553)
2 parents 0ea1472 + 9a91db4 commit adf5444

File tree

19 files changed

+240
-508
lines changed

19 files changed

+240
-508
lines changed

cmd/neofs-node/object.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"github.com/nspcc-dev/neofs-node/pkg/services/object/split"
3232
"github.com/nspcc-dev/neofs-node/pkg/services/object/tombstone"
3333
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
34-
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
3534
"github.com/nspcc-dev/neofs-node/pkg/services/policer"
3635
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
3736
truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
@@ -197,24 +196,11 @@ func initObjectService(c *cfg) {
197196
c.shared.policer = policer.New(
198197
policer.WithLogger(c.log),
199198
policer.WithLocalStorage(ls),
200-
policer.WithContainerSource(c.cnrSrc),
201-
policer.WithPlacementBuilder(
202-
placement.NewNetworkMapSourceBuilder(c.netMapSource),
203-
),
204199
policer.WithRemoteHeader(
205200
headsvc.NewRemoteHeader(keyStorage, clientConstructor),
206201
),
207-
policer.WithNetmapKeys(c),
208202
policer.WithHeadTimeout(c.appCfg.Policer.HeadTimeout),
209203
policer.WithReplicator(c.replicator),
210-
policer.WithRedundantCopyCallback(func(addr oid.Address) {
211-
err := ls.Delete(addr)
212-
if err != nil {
213-
c.log.Warn("could not inhume mark redundant copy as garbage",
214-
zap.Error(err),
215-
)
216-
}
217-
}),
218204
policer.WithMaxCapacity(c.appCfg.Policer.MaxWorkers),
219205
policer.WithPool(c.cfgObject.pool.replication),
220206
policer.WithNodeLoader(c),

pkg/local_object_storage/blobstor/internal/storagetest/iterate.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ func TestIterate(t *testing.T, cons Constructor, minSize, maxSize uint64) {
2525
objects = append(objects[:delID], objects[delID+1:]...)
2626

2727
t.Run("normal handler", func(t *testing.T) {
28-
seen := make(map[string]objectDesc)
28+
seen := make(map[oid.Address]objectDesc)
2929

3030
var objHandler = func(addr oid.Address, data []byte) error {
31-
seen[addr.String()] = objectDesc{
31+
seen[addr] = objectDesc{
3232
addr: addr,
3333
raw: data,
3434
}
@@ -39,21 +39,21 @@ func TestIterate(t *testing.T, cons Constructor, minSize, maxSize uint64) {
3939
require.NoError(t, err)
4040
require.Equal(t, len(objects), len(seen))
4141
for i := range objects {
42-
d, ok := seen[objects[i].addr.String()]
42+
d, ok := seen[objects[i].addr]
4343
require.True(t, ok)
4444
require.Equal(t, objects[i].raw, d.raw)
4545
require.Equal(t, objects[i].addr, d.addr)
4646
}
4747
})
4848

4949
t.Run("addresses", func(t *testing.T) {
50-
seen := make(map[string]objectDesc)
50+
seen := make(map[oid.Address]objectDesc)
5151

5252
var addrHandler = func(addr oid.Address) error {
5353
data, err := s.GetBytes(addr)
5454
require.NoError(t, err)
5555

56-
seen[addr.String()] = objectDesc{
56+
seen[addr] = objectDesc{
5757
addr: addr,
5858
raw: data,
5959
}
@@ -64,20 +64,20 @@ func TestIterate(t *testing.T, cons Constructor, minSize, maxSize uint64) {
6464
require.NoError(t, err)
6565
require.Equal(t, len(objects), len(seen))
6666
for i := range objects {
67-
objDesc, ok := seen[objects[i].addr.String()]
67+
objDesc, ok := seen[objects[i].addr]
6868
require.True(t, ok)
6969
require.Equal(t, objects[i].raw, objDesc.raw)
7070
}
7171
})
7272

7373
t.Run("ignore errors doesn't work for logical errors", func(t *testing.T) {
74-
seen := make(map[string]objectDesc)
74+
seen := make(map[oid.Address]objectDesc)
7575

7676
var n int
7777
var logicErr = errors.New("logic error")
7878

7979
var objHandler = func(addr oid.Address, data []byte) error {
80-
seen[addr.String()] = objectDesc{
80+
seen[addr] = objectDesc{
8181
addr: addr,
8282
raw: data,
8383
}
@@ -94,7 +94,7 @@ func TestIterate(t *testing.T, cons Constructor, minSize, maxSize uint64) {
9494
require.ErrorIs(t, err, logicErr)
9595
require.Equal(t, len(objects)/2, len(seen))
9696
for i := range objects {
97-
d, ok := seen[objects[i].addr.String()]
97+
d, ok := seen[objects[i].addr]
9898
if ok {
9999
n--
100100
require.Equal(t, objects[i].raw, d.raw)

pkg/local_object_storage/engine/select.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func (e *StorageEngine) Select(cnr cid.ID, filters object.SearchFilters) ([]oid.
2929
}
3030

3131
addrList := make([]oid.Address, 0)
32-
uniqueMap := make(map[string]struct{})
32+
uniqueMap := make(map[oid.Address]struct{})
3333

3434
for _, sh := range e.unsortedShards() {
3535
res, err := sh.Select(cnr, filters)
@@ -42,8 +42,8 @@ func (e *StorageEngine) Select(cnr cid.ID, filters object.SearchFilters) ([]oid.
4242
}
4343

4444
for _, addr := range res { // save only unique values
45-
if _, ok := uniqueMap[addr.EncodeToString()]; !ok {
46-
uniqueMap[addr.EncodeToString()] = struct{}{}
45+
if _, ok := uniqueMap[addr]; !ok {
46+
uniqueMap[addr] = struct{}{}
4747
addrList = append(addrList, addr)
4848
}
4949
}

pkg/local_object_storage/metabase/containers_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ func TestDB_Containers(t *testing.T) {
2020

2121
const N = 10
2222

23-
cids := make(map[string]int, N)
23+
cids := make(map[cid.ID]int, N)
2424

2525
for range N {
2626
obj := generateObject(t)
2727

2828
cnr := obj.GetContainerID()
2929

30-
cids[cnr.EncodeToString()] = 0
30+
cids[cnr] = 0
3131

3232
err := putBig(db, obj)
3333
require.NoError(t, err)
@@ -37,11 +37,11 @@ func TestDB_Containers(t *testing.T) {
3737
require.NoError(t, err)
3838

3939
for _, cnr := range lst {
40-
i, ok := cids[cnr.EncodeToString()]
40+
i, ok := cids[cnr]
4141
require.True(t, ok)
4242
require.Equal(t, 0, i)
4343

44-
cids[cnr.EncodeToString()] = 1
44+
cids[cnr] = 1
4545
}
4646

4747
// require.Contains not working since cnrs is a ptr slice

pkg/local_object_storage/metabase/list_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
1010
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
1111
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
12+
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
1213
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
1314
"github.com/stretchr/testify/require"
1415
"go.etcd.io/bbolt"
@@ -163,22 +164,22 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
163164

164165
const total = 5
165166

166-
expected := make(map[string]int, total)
167+
expected := make(map[oid.Address]int, total)
167168

168169
// fill metabase with objects
169170
for range total {
170171
obj := generateObject(t)
171172
err := putBig(db, obj)
172173
require.NoError(t, err)
173-
expected[object.AddressOf(obj).EncodeToString()] = 0
174+
expected[object.AddressOf(obj)] = 0
174175
}
175176

176177
// get half of the objects
177178
got, cursor, err := metaListWithCursor(db, total/2, nil)
178179
require.NoError(t, err)
179180
for _, obj := range got {
180-
if _, ok := expected[obj.Address.EncodeToString()]; ok {
181-
expected[obj.Address.EncodeToString()]++
181+
if _, ok := expected[obj.Address]; ok {
182+
expected[obj.Address]++
182183
}
183184
}
184185

@@ -196,8 +197,8 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
196197
break
197198
}
198199
for _, obj := range got {
199-
if _, ok := expected[obj.Address.EncodeToString()]; ok {
200-
expected[obj.Address.EncodeToString()]++
200+
if _, ok := expected[obj.Address]; ok {
201+
expected[obj.Address]++
201202
}
202203
}
203204
}

pkg/local_object_storage/shard/control_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func TestResyncMetabase(t *testing.T) {
153153
const objNum = 10
154154
oldVersion := version.New(2, 17)
155155

156-
mObjs := make(map[string]objAddr)
156+
mObjs := make(map[oid.Address]objAddr)
157157
locked := make([]oid.ID, 1, 2)
158158
locked[0] = oidtest.ID()
159159
cnrLocked := cidtest.ID()
@@ -177,7 +177,7 @@ func TestResyncMetabase(t *testing.T) {
177177

178178
addr := object.AddressOf(&obj)
179179

180-
mObjs[addr.EncodeToString()] = objAddr{
180+
mObjs[addr] = objAddr{
181181
obj: &obj,
182182
addr: addr,
183183
}

pkg/local_object_storage/shard/list_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/nspcc-dev/neofs-node/pkg/core/object"
77
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
88
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
9+
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
910
"github.com/stretchr/testify/require"
1011
)
1112

@@ -31,7 +32,7 @@ func testShardList(t *testing.T, sh *shard.Shard) {
3132
const C = 10
3233
const N = 5
3334

34-
objs := make(map[string]int)
35+
objs := make(map[oid.Address]int)
3536

3637
for range C {
3738
cnr := cidtest.ID()
@@ -46,7 +47,7 @@ func testShardList(t *testing.T, sh *shard.Shard) {
4647
obj.SetParentID(idParent)
4748
obj.SetParent(parent)
4849

49-
objs[object.AddressOf(obj).EncodeToString()] = 0
50+
objs[object.AddressOf(obj)] = 0
5051

5152
err := sh.Put(obj, nil)
5253
require.NoError(t, err)
@@ -57,10 +58,10 @@ func testShardList(t *testing.T, sh *shard.Shard) {
5758
require.NoError(t, err)
5859

5960
for _, objID := range res {
60-
i, ok := objs[objID.EncodeToString()]
61+
i, ok := objs[objID]
6162
require.True(t, ok)
6263
require.Equal(t, 0, i)
6364

64-
objs[objID.EncodeToString()] = 1
65+
objs[objID] = 1
6566
}
6667
}

pkg/local_object_storage/shard/metrics_test.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
1111
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
1212
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
13+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
1314
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
1415
"github.com/nspcc-dev/neofs-sdk-go/object"
1516
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
@@ -19,7 +20,7 @@ import (
1920

2021
type metricsStore struct {
2122
objectCounters map[string]uint64
22-
containerSize map[string]int64
23+
containerSize map[cid.ID]int64
2324
payloadSize int64
2425
readOnly bool
2526
}
@@ -60,7 +61,11 @@ func (m *metricsStore) SetReadonly(r bool) {
6061
}
6162

6263
func (m metricsStore) AddToContainerSize(cnr string, size int64) {
63-
m.containerSize[cnr] += size
64+
c, err := cid.DecodeString(cnr)
65+
if err != nil {
66+
return
67+
}
68+
m.containerSize[c] += size
6469
}
6570

6671
func (m *metricsStore) AddToPayloadSize(size int64) {
@@ -96,11 +101,11 @@ func TestCounters(t *testing.T) {
96101

97102
var totalPayload int64
98103

99-
expectedSizes := make(map[string]int64)
104+
expectedSizes := make(map[cid.ID]int64)
100105
for i := range oo {
101106
cnr := oo[i].GetContainerID()
102107
oSize := int64(oo[i].PayloadSize())
103-
expectedSizes[cnr.EncodeToString()] += oSize
108+
expectedSizes[cnr] += oSize
104109
totalPayload += oSize
105110
}
106111

@@ -182,17 +187,17 @@ func TestInhumeContainerCounters(t *testing.T) {
182187

183188
require.Equal(t, mm.objectCounters[physical], total)
184189
require.Equal(t, mm.objectCounters[logical], uint64(objsC2))
185-
require.Empty(t, mm.containerSize[c1.EncodeToString()])
186-
require.Equal(t, mm.containerSize[c2.EncodeToString()], sizeC2)
190+
require.Empty(t, mm.containerSize[c1])
191+
require.Equal(t, mm.containerSize[c2], sizeC2)
187192
// payload size must remain unchanged after logical removal
188193
require.Equal(t, initialPayload, mm.payloadSize)
189194

190195
require.NoError(t, sh.InhumeContainer(c2))
191196

192197
require.Equal(t, mm.objectCounters[physical], total)
193198
require.Empty(t, mm.objectCounters[logical])
194-
require.Empty(t, mm.containerSize[c1.EncodeToString()])
195-
require.Empty(t, mm.containerSize[c2.EncodeToString()])
199+
require.Empty(t, mm.containerSize[c1])
200+
require.Empty(t, mm.containerSize[c2])
196201
// payload size still unchanged
197202
require.Equal(t, initialPayload, mm.payloadSize)
198203
}
@@ -260,7 +265,7 @@ func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) {
260265
"phy": 0,
261266
"logic": 0,
262267
},
263-
containerSize: make(map[string]int64),
268+
containerSize: make(map[cid.ID]int64),
264269
}
265270

266271
sh := shard.New(

pkg/local_object_storage/writecache/flush.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,18 +119,17 @@ func (c *cache) flushScheduler() {
119119
}
120120
var sortedAddrs []addrSize
121121

122-
batchSet := make(map[string]struct{}, len(batch))
122+
batchSet := make(map[oid.Address]struct{}, len(batch))
123123
for _, bAddr := range batch {
124-
batchSet[bAddr.String()] = struct{}{}
124+
batchSet[bAddr] = struct{}{}
125125
}
126126

127127
addrs := c.objCounters.Map()
128128
for addr, size := range addrs {
129-
sAddr := addr.String()
130-
if _, exists := batchSet[sAddr]; exists {
129+
if _, exists := batchSet[addr]; exists {
131130
continue
132131
}
133-
if _, loaded := c.processingBigObjs.Load(sAddr); loaded {
132+
if _, loaded := c.processingBigObjs.Load(addr); loaded {
134133
continue
135134
}
136135
sortedAddrs = append(sortedAddrs, addrSize{addr, size})
@@ -162,7 +161,7 @@ func (c *cache) flushScheduler() {
162161
}
163162
break addrLoop
164163
case c.flushCh <- as.addr:
165-
c.processingBigObjs.Store(as.addr.String(), struct{}{})
164+
c.processingBigObjs.Store(as.addr, struct{}{})
166165
continue
167166
case <-c.closeCh:
168167
return
@@ -200,7 +199,7 @@ func (c *cache) flushWorker(id int) {
200199
err := c.flushSingle(addr, true)
201200
c.modeMtx.RUnlock()
202201

203-
c.processingBigObjs.Delete(addr.String())
202+
c.processingBigObjs.Delete(addr)
204203
if err != nil {
205204
select {
206205
case c.flushErrCh <- struct{}{}:

pkg/services/control/server/evacuate.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99

1010
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
1111
"github.com/nspcc-dev/neofs-node/pkg/services/control"
12-
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
1312
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
1413
"github.com/nspcc-dev/neofs-sdk-go/netmap"
1514
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
@@ -71,7 +70,7 @@ func (s *Server) replicate(addr oid.Address, obj *objectSDK.Object) error {
7170
return fmt.Errorf("can't build a list of container nodes: %w", err)
7271
}
7372

74-
nodes := placement.FlattenNodes(ns)
73+
nodes := slices.Concat(ns...)
7574
bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
7675
nodes = slices.DeleteFunc(nodes, func(info netmap.NodeInfo) bool {
7776
return bytes.Equal(info.PublicKey(), bs)

0 commit comments

Comments
 (0)