Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions internal/slices/slices.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,12 @@ func AllZeros(s []byte) bool {
}
return true
}

// RepeatElement returns slice of n shallow copies of e.
func RepeatElement[E any, S []E](n int, e E) S {
s := make(S, n)
for i := range s {
s[i] = e
}
return s
}
36 changes: 36 additions & 0 deletions internal/slices/slices_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package slices_test

import (
"errors"
"slices"
"strconv"
"testing"

islices "github.com/nspcc-dev/neofs-node/internal/slices"
Expand Down Expand Up @@ -57,3 +59,37 @@ func TestAllZeros(t *testing.T) {
require.False(t, islices.AllZeros(sc), i)
}
}

func TestRepeatElements(t *testing.T) {
tcs := []struct {
name string
e any
}{
{name: "int", e: 1},
{name: "bool", e: true},
{name: "error", e: errors.New("some error")},
{name: "nil", e: nil},
{name: "struct", e: struct {
i int
s string
}{
i: 1,
s: "foo",
}},
{name: "slice", e: []string{"foo", "bar"}},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
for _, n := range []int{0, 1, 10} {
t.Run(strconv.Itoa(n), func(t *testing.T) {
s := islices.RepeatElement(n, tc.e)
require.Len(t, s, n)
for i := range s {
require.Equal(t, tc.e, s[i])
}
})
}
})
}
}
30 changes: 29 additions & 1 deletion internal/testutil/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package testutil
import (
"encoding/json"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -28,6 +29,7 @@ type LogEntry struct {
// LogBuffer is a memory buffer for [zap.Logger] entries.
type LogBuffer struct {
t testing.TB
l sync.Locker
b zaptest.Buffer
}

Expand All @@ -38,6 +40,7 @@ func NewBufferedLogger(t testing.TB, minLevel zapcore.Level) (*zap.Logger, *LogB
// TODO: https://github.com/nspcc-dev/neofs-node/issues/3313
var lb LogBuffer
lb.t = t
lb.l = new(sync.Mutex)

encCfg := zap.NewProductionEncoderConfig()
encCfg.LevelKey = logLevelKey
Expand All @@ -46,7 +49,10 @@ func NewBufferedLogger(t testing.TB, minLevel zapcore.Level) (*zap.Logger, *LogB

zc := zapcore.NewCore(
zapcore.NewJSONEncoder(encCfg),
zap.CombineWriteSyncers(&lb.b),
zap.CombineWriteSyncers(lockedWriteSyncer{
l: lb.l,
w: &lb.b,
}),
minLevel,
)

Expand Down Expand Up @@ -78,7 +84,9 @@ func (x *LogBuffer) AssertContains(e LogEntry) {
}

func (x *LogBuffer) collectEntries() []LogEntry {
x.l.Lock()
lines := x.b.Lines()
x.l.Unlock()
res := make([]LogEntry, len(lines))

var err error
Expand Down Expand Up @@ -109,3 +117,23 @@ func (x *LogBuffer) collectEntries() []LogEntry {

return res
}

// [zapcore.Lock] implementation analogue.
type lockedWriteSyncer struct {
l sync.Locker
w zapcore.WriteSyncer
}

func (x lockedWriteSyncer) Write(p []byte) (int, error) {
x.l.Lock()
n, err := x.w.Write(p)
x.l.Unlock()
return n, err
}

func (x lockedWriteSyncer) Sync() error {
x.l.Lock()
err := x.w.Sync()
x.l.Unlock()
return err
}
16 changes: 16 additions & 0 deletions internal/testutil/neofs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package testutil

import (
"strconv"

"github.com/nspcc-dev/neofs-sdk-go/netmap"
)

// Nodes returns n [netmap.NodeInfo] elements with unique public keys.
func Nodes(n int) []netmap.NodeInfo {
nodes := make([]netmap.NodeInfo, n)
for i := range nodes {
nodes[i].SetPublicKey([]byte("public_key_" + strconv.Itoa(i)))
}
return nodes
}
22 changes: 22 additions & 0 deletions internal/testutil/neofs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package testutil_test

import (
"testing"

"github.com/nspcc-dev/neofs-node/internal/testutil"
"github.com/stretchr/testify/require"
)

func TestNodes(t *testing.T) {
t.Run("empty", func(t *testing.T) {
require.Empty(t, testutil.Nodes(0))
})

s := testutil.Nodes(10)

m := make(map[string]struct{})
for i := range s {
m[string(s[i].PublicKey())] = struct{}{}
}
require.Len(t, m, len(s))
}
5 changes: 1 addition & 4 deletions pkg/services/policer/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/nspcc-dev/neofs-node/pkg/core/container"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
Expand Down Expand Up @@ -193,8 +192,6 @@ type processPlacementContext struct {
}

func (p *Policer) processNodes(ctx context.Context, plc *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) {
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(plc.object.Address)

p.cfg.RLock()
headTimeout := p.headTimeout
p.cfg.RUnlock()
Expand Down Expand Up @@ -261,7 +258,7 @@ func (p *Policer) processNodes(ctx context.Context, plc *processPlacementContext

callCtx, cancel := context.WithTimeout(ctx, headTimeout)

_, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i]))
_, err := p.apiConns.headObject(callCtx, nodes[i], plc.object.Address)

cancel()

Expand Down
40 changes: 37 additions & 3 deletions pkg/services/policer/policer.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package policer

import (
"context"
"sync"
"time"

"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
Expand All @@ -21,6 +25,22 @@ type nodeLoader interface {
ObjectServiceLoad() float64
}

// interface of [replicator.Replicator] used by [Policer] for overriding in tests.
type replicatorIface interface {
HandleTask(context.Context, replicator.Task, replicator.TaskResult)
}

// interface of [engine.StorageEngine] used by [Policer] for overriding in tests.
type localStorage interface {
ListWithCursor(uint32, *engine.Cursor) ([]objectcore.AddressWithType, *engine.Cursor, error)
Delete(oid.Address) error
}

// interface of [headsvc.RemoteHeader] used by [Policer] for overriding in tests.
type apiConnections interface {
headObject(context.Context, netmapsdk.NodeInfo, oid.Address) (object.Object, error)
}

type objectsInWork struct {
m sync.RWMutex
objs map[oid.Address]struct{}
Expand Down Expand Up @@ -85,11 +105,11 @@ type cfg struct {

placementBuilder placement.Builder

remoteHeader *headsvc.RemoteHeader
apiConns apiConnections

netmapKeys netmap.AnnouncedKeys

replicator *replicator.Replicator
replicator replicatorIface

cbRedundantCopy RedundantCopyCallback

Expand Down Expand Up @@ -164,10 +184,24 @@ func WithPlacementBuilder(v placement.Builder) Option {
}
}

type remoteHeader headsvc.RemoteHeader

func (x *remoteHeader) headObject(ctx context.Context, node netmapsdk.NodeInfo, addr oid.Address) (object.Object, error) {
var p headsvc.RemoteHeadPrm
p.WithNodeInfo(node)
p.WithObjectAddress(addr)
hdr, err := (*headsvc.RemoteHeader)(x).Head(ctx, &p)
if err != nil {
return object.Object{}, err
}

return *hdr, nil
}

// WithRemoteHeader returns option to set object header receiver of Policer.
func WithRemoteHeader(v *headsvc.RemoteHeader) Option {
return func(c *cfg) {
c.remoteHeader = v
c.apiConns = (*remoteHeader)(v)
}
}

Expand Down
Loading
Loading