From 1793dbc009cf5121707c5009ea28fdb71a62c316 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 10 Jul 2025 15:58:31 +0300 Subject: [PATCH 1/5] sn/object: Drop unused parameter of PUT server Unused since 5076fe224e3cc7a02cbf09fb925bcaa6efbb4054. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/object.go | 1 - pkg/services/object/put/service.go | 8 -------- 2 files changed, 9 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index b9b9691913..f1e9ab4c0c 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -256,7 +256,6 @@ func initObjectService(c *cfg) { putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)), putsvc.WithObjectStorage(storageEngine{engine: ls}), putsvc.WithContainerSource(c.cnrSrc), - putsvc.WithNetworkMapSource(c.netMapSource), putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithRemoteWorkerPool(c.cfgObject.pool.putRemote), putsvc.WithLogger(c.log), diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 2a9a073890..a2c9b4dad0 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -90,8 +90,6 @@ type cfg struct { cnrSrc container.Source - netMapSrc netmap.Source - remotePool util.WorkerPool fmtValidator *object.FormatValidator @@ -169,12 +167,6 @@ func WithContainerSource(v container.Source) Option { } } -func WithNetworkMapSource(v netmap.Source) Option { - return func(c *cfg) { - c.netMapSrc = v - } -} - func WithRemoteWorkerPool(remote util.WorkerPool) Option { return func(c *cfg) { c.remotePool = remote From 9cd9e56a8d404195048f8289225f2a84859217ae Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 15 Jul 2025 10:21:29 +0300 Subject: [PATCH 2/5] internal/crypto: Fix proto version in test bearer token structure Current version changes, so tests will break on upgrade otherwise. Signed-off-by: Leonard Lyubich --- internal/crypto/tokens_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/crypto/tokens_test.go b/internal/crypto/tokens_test.go index 2db4afee4d..57607318b9 100644 --- a/internal/crypto/tokens_test.go +++ b/internal/crypto/tokens_test.go @@ -18,6 +18,7 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/session" "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/stretchr/testify/require" ) @@ -389,8 +390,11 @@ func getUnsignedNoIssuerBearerToken() bearer.Token { ), } + eACL := eacl.NewTableForContainer(cnr, rs) + eACL.SetVersion(version.New(2, 16)) + var token bearer.Token - token.SetEACLTable(eacl.NewTableForContainer(cnr, rs)) + token.SetEACLTable(eACL) token.SetIat(943083305) token.SetNbf(1362292619) token.SetExp(1922557325) From bcb7d66b90430f3e9d3982822a9adb3eb85e18a7 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 15 Jul 2025 10:41:36 +0300 Subject: [PATCH 3/5] metabase: avoid using storage group type in tests Finishes 50e8a39af63fb64d987a33c935d4ea7ca3b67627. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/metabase/version_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/local_object_storage/metabase/version_test.go b/pkg/local_object_storage/metabase/version_test.go index e079a63679..bbf116f43b 100644 --- a/pkg/local_object_storage/metabase/version_test.go +++ b/pkg/local_object_storage/metabase/version_test.go @@ -818,19 +818,19 @@ func TestMigrate3to4(t *testing.T) { {name: "empty containers only", m: map[object.Type][]uint{ object.TypeRegular: make([]uint, 3), object.TypeTombstone: make([]uint, 5), - object.TypeStorageGroup: make([]uint, 10), + object.TypeStorageGroup: make([]uint, 10), //nolint:staticcheck // storage groups are deprecated, but this is a migration test. object.TypeLock: make([]uint, 1), object.TypeLink: make([]uint, 100), }}, {name: "some containers are empty", m: map[object.Type][]uint{ object.TypeRegular: {1, 7, 0, 20}, object.TypeTombstone: {0, 15, 0}, - object.TypeStorageGroup: make([]uint, 10), + object.TypeStorageGroup: make([]uint, 10), //nolint:staticcheck // storage groups are deprecated, but this is a migration test. }}, {name: "some containers are empty", m: map[object.Type][]uint{ object.TypeRegular: {1, 7, 0, 20}, object.TypeTombstone: {0, 15, 0}, - object.TypeStorageGroup: make([]uint, 10), + object.TypeStorageGroup: make([]uint, 10), //nolint:staticcheck // storage groups are deprecated, but this is a migration test. }}, {name: "one big container", m: map[object.Type][]uint{ object.TypeRegular: {3999}, @@ -838,19 +838,19 @@ func TestMigrate3to4(t *testing.T) { {name: "big counts", m: map[object.Type][]uint{ object.TypeRegular: {200, 700, 600}, object.TypeTombstone: {20, 30}, - object.TypeStorageGroup: {10, 0, 20, 0, 30, 0, 40, 0}, + object.TypeStorageGroup: {10, 0, 20, 0, 30, 0, 40, 0}, //nolint:staticcheck // storage groups are deprecated, but this is a migration test. object.TypeLock: {1, 2, 3, 4, 5, 6, 7, 8, 9}, object.TypeLink: {99}, }}, {name: "big counts aligned", m: map[object.Type][]uint{ object.TypeRegular: {1000}, object.TypeTombstone: {500, 500, 500}, - object.TypeStorageGroup: {200, 200, 200, 200, 200}, + object.TypeStorageGroup: {200, 200, 200, 200, 200}, //nolint:staticcheck // storage groups are deprecated, but this is a migration test. }}, {name: "big counts not aligned", m: map[object.Type][]uint{ object.TypeRegular: {999, 999}, object.TypeTombstone: {999}, - object.TypeStorageGroup: {999, 999, 999}, + object.TypeStorageGroup: {999, 999, 999}, //nolint:staticcheck // storage groups are deprecated, but this is a migration test. }}, } { t.Run(tc.name, func(t *testing.T) { testMigrationV3To4(t, tc.m) }) @@ -895,7 +895,7 @@ func testMigrationV3To4(t *testing.T, mAll map[object.Type][]uint) { prefix = 0x06 case object.TypeTombstone: prefix = 0x09 - case object.TypeStorageGroup: + case object.TypeStorageGroup: //nolint:staticcheck // storage groups are deprecated, but this is a migration test. prefix = 0x08 case object.TypeLock: prefix = 0x07 From 0e019f8a26bb93f8dd9814699cd8183bbbc13e24 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 15 Jul 2025 10:30:52 +0300 Subject: [PATCH 4/5] go.mod: Upgrade `github.com/nspcc-dev/neofs-sdk-go` to the latest `version.Current()` is 2.18 in new revision. This affects server-side object HEAD/GET request handling (see fc2728c6872a469e2fe77f2b7b94091333d1201d). Also, this fixes LINK object binary put into SN local storage with duplicated payload field. Deprecated stuff will be replaced in future patches. Signed-off-by: Leonard Lyubich --- CHANGELOG.md | 3 ++- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a3c0d197ab..e374fa205a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Changelog for NeoFS Node - Metadata's signatures for extremely big objects and/or short epochs (#3391) - No object addresses in object inhuming/deleting error logs (#3450) - Flush test timing issue with object counters update (#3455) +- Incorrect binary for LINK object put into local storage (#3461) ### Changed - SN caches up to 1000 bearer token verification results until the next epoch (#3369) @@ -43,7 +44,7 @@ Changelog for NeoFS Node - Support for Inner Ring candidate fee setting (#3459) ### Updated -- `github.com/nspcc-dev/neofs-sdk-go` dependency to `v1.0.0-rc.13.0.20250623124459-a9cfab652dc0` (#3406) +- `github.com/nspcc-dev/neofs-sdk-go` dependency to `v1.0.0-rc.13.0.20250715070617-c7038b450691` (#3406, #3461) - NeoGo dependency to v0.110.1-0.20250709130255-4f05526f09f6 (#3456) ### Updating from v0.47.1 diff --git a/go.mod b/go.mod index 56fa2c8641..71075f7490 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/nspcc-dev/neo-go v0.110.1-0.20250709130255-4f05526f09f6 github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea github.com/nspcc-dev/neofs-contract v0.23.0 - github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.13.0.20250627083806-673e1845df8f + github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.13.0.20250715070617-c7038b450691 github.com/nspcc-dev/tzhash v1.8.2 github.com/olekukonko/tablewriter v0.0.5 github.com/panjf2000/ants/v2 v2.9.0 diff --git a/go.sum b/go.sum index 2a0bd2cb94..47aada56cf 100644 --- a/go.sum +++ b/go.sum @@ -203,8 +203,8 @@ github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea h1:mK github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea/go.mod h1:YzhD4EZmC9Z/PNyd7ysC7WXgIgURc9uCG1UWDeV027Y= github.com/nspcc-dev/neofs-contract v0.23.0 h1:F5ciU0wPqSbycPY8qOtb4PvgnSZBNQ5Jp9tdeVSKu4o= github.com/nspcc-dev/neofs-contract v0.23.0/go.mod h1:it6Su92UvEFQDsMOfDIXapLu0j5TQSOvkS2YdUlPdgo= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.13.0.20250627083806-673e1845df8f h1:jDqFzFwhB4sfGncaXq/NAU44UePi8DKnw4Es1OsnYMI= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.13.0.20250627083806-673e1845df8f/go.mod h1:j/NUu5iOGFkOVYM42XoC1X9DZD0/y89Pws++w5vxtQk= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.13.0.20250715070617-c7038b450691 h1:h3H+LymyO8J1gGsTLln6ID8AOVJ9IggJ5FTYNUjGPTA= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.13.0.20250715070617-c7038b450691/go.mod h1:j/NUu5iOGFkOVYM42XoC1X9DZD0/y89Pws++w5vxtQk= github.com/nspcc-dev/rfc6979 v0.2.3 h1:QNVykGZ3XjFwM/88rGfV3oj4rKNBy+nYI6jM7q19hDI= github.com/nspcc-dev/rfc6979 v0.2.3/go.mod h1:q3sCL1Ed7homjqYK8KmFSzEmm+7Ngyo7PePbZanhaDE= github.com/nspcc-dev/tzhash v1.8.2 h1:ebRCbPoEuoqrhC6sSZmrT/jI3h1SzCWakxxV6gp5QAg= From 5352c5a27ecdf06aeeda84b4a9fc0da0625f6b2a Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 14 Jul 2025 16:57:33 +0300 Subject: [PATCH 5/5] sn/object: Test data slicing and chunk placement in PUT server Signed-off-by: Leonard Lyubich --- pkg/services/object/put/service.go | 7 +- pkg/services/object/put/service_test.go | 639 ++++++++++++++++++++++++ 2 files changed, 645 insertions(+), 1 deletion(-) create mode 100644 pkg/services/object/put/service_test.go diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index a2c9b4dad0..b556a579ff 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -123,7 +123,12 @@ func NewService(transport Transport, neoFSNet NeoFSNetwork, m *meta.Meta, opts . opts[i](c) } - c.fmtValidator = object.NewFormatValidator(c.cnrClient.Morph(), neoFSNet, c.fmtValidatorOpts...) + var fmtValidatorChain object.FSChain + if c.cnrClient != nil { + fmtValidatorChain = c.cnrClient.Morph() + } + + c.fmtValidator = object.NewFormatValidator(fmtValidatorChain, neoFSNet, c.fmtValidatorOpts...) c.metaSvc = m return &Service{ diff --git a/pkg/services/object/put/service_test.go b/pkg/services/object/put/service_test.go new file mode 100644 index 0000000000..d912150be1 --- /dev/null +++ b/pkg/services/object/put/service_test.go @@ -0,0 +1,639 @@ +package putsvc + +import ( + "bytes" + "context" + "crypto/sha256" + "errors" + "fmt" + "io" + "slices" + "sync" + "testing" + + "github.com/google/uuid" + "github.com/nspcc-dev/neofs-node/internal/testutil" + clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" + objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/services/session/storage" + "github.com/nspcc-dev/neofs-sdk-go/checksum" + "github.com/nspcc-dev/neofs-sdk-go/client" + "github.com/nspcc-dev/neofs-sdk-go/container" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + neofscryptotest "github.com/nspcc-dev/neofs-sdk-go/crypto/test" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object" + protosession "github.com/nspcc-dev/neofs-sdk-go/proto/session" + apireputation "github.com/nspcc-dev/neofs-sdk-go/reputation" + "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/user" + usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" + "github.com/nspcc-dev/neofs-sdk-go/version" + "github.com/nspcc-dev/tzhash/tz" + "github.com/panjf2000/ants/v2" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protowire" + "google.golang.org/protobuf/proto" +) + +const ( + currentEpoch = 123 + maxObjectSize = 1000 +) + +func Test_Slicing_REP3(t *testing.T) { + for _, tc := range []struct { + name string + ln uint64 + }{ + {name: "no payload", ln: 0}, + {name: "1B", ln: 1}, + {name: "limit-1B", ln: maxObjectSize - 1}, + {name: "exactly limit", ln: maxObjectSize}, + {name: "limit+1b", ln: maxObjectSize + 1}, + {name: "limitX2", ln: maxObjectSize * 2}, + {name: "limitX4-1", ln: maxObjectSize + 4 - 1}, + {name: "limitX5", ln: maxObjectSize * 5}, + } { + t.Run(tc.name, func(t *testing.T) { + testSlicingREP3(t, tc.ln) + }) + } +} + +func testSlicingREP3(t *testing.T, ln uint64) { + const repNodes = 3 + const cnrReserveNodes = 2 + const outCnrNodes = 2 + + cluster := newTestClusterForRepPolicy(t, repNodes, cnrReserveNodes, outCnrNodes) + + var srcObj object.Object + srcObj.SetContainerID(cidtest.ID()) + srcObj.SetOwner(usertest.ID()) + srcObj.SetAttributes( + object.NewAttribute("attr1", "val1"), + object.NewAttribute("attr2", "val2"), + ) + srcObj.SetPayload(testutil.RandByteSlice(ln)) + + var sessionToken session.Object + sessionToken.SetID(uuid.New()) + sessionToken.SetExp(1) + sessionToken.BindContainer(cidtest.ID()) + + testThroughNode := func(t *testing.T, idx int) { + sessionToken.SetAuthKey(cluster.nodeSessions[idx].signer.Public()) + require.NoError(t, sessionToken.Sign(usertest.User())) + + storeObjectWithSession(t, cluster.nodeServices[idx], srcObj, sessionToken) + + nodeObjLists := cluster.allStoredObjects() + + var restoredObj object.Object + if ln > maxObjectSize { + restoredObj = assertSplitChain(t, maxObjectSize, ln, sessionToken, nodeObjLists[0]) + + for i := 1; i < repNodes; i++ { + require.Equal(t, nodeObjLists[0], nodeObjLists[i], i) + } + linkerOnly := []object.Object{nodeObjLists[0][len(nodeObjLists[0])-1]} + for i := repNodes; i < repNodes+cnrReserveNodes; i++ { + require.Equal(t, linkerOnly, nodeObjLists[i], i) + } + for i := repNodes + cnrReserveNodes; i < len(nodeObjLists); i++ { + require.Empty(t, nodeObjLists[i], i) + } + } else { + require.Len(t, nodeObjLists[0], 1) + restoredObj = nodeObjLists[0][0] + + for i := 1; i < repNodes; i++ { + require.Len(t, nodeObjLists[i], 1, i) + + obj := nodeObjLists[i][0] + // require.Equal(t, restoredObj, obj) can fail for empty payload because []byte{} != []byte(nil) + require.Equal(t, restoredObj.CutPayload(), obj.CutPayload()) + require.True(t, bytes.Equal(restoredObj.Payload(), obj.Payload())) + } + for i := repNodes; i < len(nodeObjLists); i++ { + require.Empty(t, nodeObjLists[i], i) + } + } + + assertObjectIntegrity(t, restoredObj) + require.True(t, bytes.Equal(srcObj.Payload(), restoredObj.Payload())) + require.Equal(t, sessionToken, *restoredObj.SessionToken()) + require.Equal(t, srcObj.GetContainerID(), restoredObj.GetContainerID()) + require.Equal(t, sessionToken.Issuer(), restoredObj.Owner()) + require.EqualValues(t, currentEpoch, restoredObj.CreationEpoch()) + require.Equal(t, object.TypeRegular, restoredObj.Type()) + require.Equal(t, srcObj.Attributes(), restoredObj.Attributes()) + require.False(t, restoredObj.HasParent()) + + cluster.resetAllStoredObjects() + } + + for i := range repNodes + cnrReserveNodes + outCnrNodes { + testThroughNode(t, i) + } +} + +func newTestClusterForRepPolicy(t *testing.T, repNodes, cnrReserveNodes, outCnrNodes uint) *testCluster { + allNodes := allocNodes([]uint{repNodes + cnrReserveNodes + outCnrNodes})[0] + cnrNodes := allNodes[:repNodes+cnrReserveNodes] + + cn := mockContainerNodes{ + unsorted: [][]netmap.NodeInfo{cnrNodes}, + sorted: [][]netmap.NodeInfo{cnrNodes}, + repCounts: []uint{repNodes}, + } + + cluster := testCluster{ + nodeServices: make(nodeServices, len(allNodes)), + nodeNetworks: make([]mockNetwork, len(allNodes)), + nodeSessions: make([]mockNodeSession, len(allNodes)), + nodeLocalStorages: make([]inMemLocalStorage, len(allNodes)), + } + + for i := range allNodes { + nodeKey := neofscryptotest.ECDSAPrivateKey() + + nodeWorkerPool, err := ants.NewPool(10, ants.WithNonblocking(true)) + require.NoError(t, err) + + cluster.nodeNetworks[i] = mockNetwork{ + mockNodeState: mockNodeState{ + epoch: currentEpoch, + }, + localPub: allNodes[i].PublicKey(), + cnrNodes: cn, + } + + cluster.nodeSessions[i] = mockNodeSession{ + signer: neofscryptotest.Signer(), + expiresAt: cluster.nodeNetworks[i].mockNodeState.epoch + 1, + } + + cluster.nodeServices[i] = NewService(cluster.nodeServices, &cluster.nodeNetworks[i], nil, + WithLogger(zaptest.NewLogger(t).With(zap.Int("node", i))), + WithKeyStorage(objutil.NewKeyStorage(&nodeKey, cluster.nodeSessions[i], &cluster.nodeNetworks[i])), + WithObjectStorage(&cluster.nodeLocalStorages[i]), + WithMaxSizeSource(mockMaxSize(maxObjectSize)), + WithContainerSource(mockContainer{}), + WithNetworkState(&cluster.nodeNetworks[i]), + WithClientConstructor(cluster.nodeServices), + WithSplitChainVerifier(mockSplitVerifier{}), + WithRemoteWorkerPool(nodeWorkerPool), + ) + } + + return &cluster +} + +type mockSplitVerifier struct{} + +func (mockSplitVerifier) VerifySplit(context.Context, cid.ID, oid.ID, []object.MeasuredObject) error { + return nil +} + +type mockContainer container.Container + +func (x mockContainer) Get(cid.ID) (container.Container, error) { + return container.Container(x), nil +} + +type mockNetwork struct { + mockNodeState + localPub []byte + cnrNodes mockContainerNodes +} + +func (*mockNetwork) CurrentBlock() uint32 { + panic("unimplemented") +} + +func (*mockNetwork) CurrentEpochDuration() uint64 { + panic("unimplemented") +} + +func (x *mockNetwork) GetContainerNodes(cid.ID) (ContainerNodes, error) { + return x.cnrNodes, nil +} + +func (x *mockNetwork) IsLocalNodePublicKey(pub []byte) bool { + return bytes.Equal(x.localPub, pub) +} + +func (*mockNetwork) GetEpochBlock(uint64) (uint32, error) { + panic("unimplemented") +} + +type mockContainerNodes struct { + unsorted [][]netmap.NodeInfo + sorted [][]netmap.NodeInfo + repCounts []uint +} + +func (x mockContainerNodes) Unsorted() [][]netmap.NodeInfo { + return x.unsorted +} + +func (x mockContainerNodes) SortForObject(oid.ID) ([][]netmap.NodeInfo, error) { + return x.sorted, nil +} + +func (x mockContainerNodes) PrimaryCounts() []uint { + return x.repCounts +} + +type mockMaxSize uint64 + +func (x mockMaxSize) MaxObjectSize() uint64 { + return uint64(x) +} + +type mockNodeSession struct { + signer neofscryptotest.VariableSigner + expiresAt uint64 +} + +func (x mockNodeSession) Get(user.ID, []byte) *storage.PrivateToken { + return storage.NewPrivateToken(&x.signer.ECDSAPrivateKey, x.expiresAt) +} + +type mockNodeState struct { + epoch uint64 +} + +func (x mockNodeState) CurrentEpoch() uint64 { + return x.epoch +} + +type inMemLocalStorage struct { + mtx sync.RWMutex + objs []object.Object +} + +func (x *inMemLocalStorage) Put(obj *object.Object, objBin []byte) error { + if objBin != nil { + got := obj.Marshal() + if len(obj.Payload()) == 0 && len(objBin) == len(got)+2 { + // this may happen because encodeReplicateRequestWithoutPayload, unlike Marshal, + // adds field tag for payload even when it is empty + emptyPayloadField := protowire.AppendTag(nil, 4, protowire.BytesType) + emptyPayloadField = protowire.AppendBytes(emptyPayloadField, nil) + if bytes.Equal(objBin[len(got):], emptyPayloadField) { + objBin = objBin[:len(got)] + } + } + if !bytes.Equal(objBin, got) { + return errors.New("binary mismatches object") + } + } + + var cp object.Object + obj.CopyTo(&cp) + + x.mtx.Lock() + x.objs = append(x.objs, cp) + x.mtx.Unlock() + + return nil +} + +func (x *inMemLocalStorage) Delete(oid.Address, uint64, []oid.ID) error { + panic("unimplemented") +} + +func (x *inMemLocalStorage) Lock(oid.Address, []oid.ID) error { + panic("unimplemented") +} + +func (x *inMemLocalStorage) IsLocked(oid.Address) (bool, error) { + panic("unimplemented") +} + +type nodeServices []*Service + +func (x nodeServices) lookupNode(node clientcore.NodeInfo) (*Service, error) { + ind := slices.IndexFunc(x, func(svc *Service) bool { + return svc.neoFSNet.IsLocalNodePublicKey(node.PublicKey()) + }) + if ind < 0 { + return nil, errors.New("unknown node") + } + return x[ind], nil +} + +func (x nodeServices) Get(node clientcore.NodeInfo) (clientcore.MultiAddressClient, error) { + svc, err := x.lookupNode(node) + if err != nil { + return nil, err + } + return (*serviceClient)(svc), nil +} + +func (x nodeServices) SendReplicationRequestToNode(_ context.Context, reqBin []byte, node clientcore.NodeInfo) ([]byte, error) { + var req protoobject.ReplicateRequest + if err := proto.Unmarshal(reqBin, &req); err != nil { + return nil, fmt.Errorf("invalid request: %w", err) + } + + if req.Object == nil { + return nil, errors.New("missing object in request") + } + + var obj object.Object + if err := obj.FromProtoMessage(req.Object); err != nil { + return nil, fmt.Errorf("invalid object in request: %w", err) + } + + svc, err := x.lookupNode(node) + if err != nil { + return nil, err + } + + if err := svc.ValidateAndStoreObjectLocally(obj); err != nil { //nolint:contextcheck + return nil, fmt.Errorf("validate and store object locally: %w", err) + } + + return nil, nil +} + +type serviceClient Service + +func (m *serviceClient) ContainerAnnounceUsedSpace(context.Context, []container.SizeEstimation, client.PrmAnnounceSpace) error { + panic("unimplemented") +} + +func (m *serviceClient) ObjectPutInit(ctx context.Context, hdr object.Object, _ user.Signer, _ client.PrmObjectPutInit) (client.ObjectWriter, error) { + stream, err := (*Service)(m).Put(ctx) + if err != nil { + return nil, err + } + + // TODO: following is needed because struct parameters privatize some data. Refactor to avoid this. + localReq := &protoobject.PutRequest{ + MetaHeader: &protosession.RequestMetaHeader{Ttl: 1}, + } + commonPrm, err := objutil.CommonPrmFromRequest(localReq) + if err != nil { + panic(err) + } + + var ip PutInitPrm + ip.WithObject(hdr.CutPayload()) + ip.WithCommonPrm(commonPrm) + + if err := stream.Init(&ip); err != nil { + return nil, err + } + + return (*testPayloadStream)(stream), nil +} + +func (m *serviceClient) ReplicateObject(context.Context, oid.ID, io.ReadSeeker, neofscrypto.Signer, bool) (*neofscrypto.Signature, error) { + panic("unimplemented") +} + +func (m *serviceClient) ObjectDelete(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectDelete) (oid.ID, error) { + panic("unimplemented") +} + +func (m *serviceClient) ObjectGetInit(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectGet) (object.Object, *client.PayloadReader, error) { + panic("unimplemented") +} + +func (m *serviceClient) ObjectHead(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectHead) (*object.Object, error) { + panic("unimplemented") +} + +func (m *serviceClient) ObjectSearchInit(context.Context, cid.ID, user.Signer, client.PrmObjectSearch) (*client.ObjectListReader, error) { + panic("unimplemented") +} + +func (m *serviceClient) SearchObjects(context.Context, cid.ID, object.SearchFilters, []string, string, neofscrypto.Signer, client.SearchObjectsOptions) ([]client.SearchResultItem, string, error) { + panic("unimplemented") +} + +func (m *serviceClient) ObjectRangeInit(context.Context, cid.ID, oid.ID, uint64, uint64, user.Signer, client.PrmObjectRange) (*client.ObjectRangeReader, error) { + panic("unimplemented") +} + +func (m *serviceClient) ObjectHash(context.Context, cid.ID, oid.ID, user.Signer, client.PrmObjectHash) ([][]byte, error) { + panic("unimplemented") +} + +func (m *serviceClient) AnnounceLocalTrust(context.Context, uint64, []apireputation.Trust, client.PrmAnnounceLocalTrust) error { + // TODO: interfaces are oversaturated. This will never be needed to server object PUT. Refactor this. + panic("unimplemented") +} + +func (m *serviceClient) AnnounceIntermediateTrust(context.Context, uint64, apireputation.PeerToPeerTrust, client.PrmAnnounceIntermediateTrust) error { + panic("unimplemented") +} + +func (m *serviceClient) ForEachGRPCConn(context.Context, func(context.Context, *grpc.ClientConn) error) error { + panic("unimplemented") +} + +type testPayloadStream Streamer + +func (x *testPayloadStream) Write(p []byte) (int, error) { + if err := (*Streamer)(x).SendChunk(new(PutChunkPrm).WithChunk(p)); err != nil { + return 0, err + } + return len(p), nil +} + +func (x *testPayloadStream) Close() error { + _, err := (*Streamer)(x).Close() + return err +} + +func (testPayloadStream) GetResult() client.ResObjectPut { + return client.ResObjectPut{} +} + +type testCluster struct { + nodeServices nodeServices + nodeNetworks []mockNetwork + nodeSessions []mockNodeSession + nodeLocalStorages []inMemLocalStorage +} + +func (x testCluster) allStoredObjects() [][]object.Object { + var res [][]object.Object + for i := range x.nodeLocalStorages { + res = append(res, x.nodeLocalStorages[i].objs) + } + return res +} + +func (x *testCluster) resetAllStoredObjects() { + for i := range x.nodeLocalStorages { + x.nodeLocalStorages[i].objs = nil + } +} + +func storeObjectWithSession(t *testing.T, svc *Service, obj object.Object, st session.Object) { + stream, err := svc.Put(context.Background()) + require.NoError(t, err) + + req := &protoobject.PutRequest{ + MetaHeader: &protosession.RequestMetaHeader{ + Ttl: 2, + SessionToken: st.ProtoMessage(), + }, + } + + commonPrm, err := objutil.CommonPrmFromRequest(req) + require.NoError(t, err) + + ip := new(PutInitPrm). + WithObject(obj.CutPayload()). + WithCommonPrm(commonPrm) + require.NoError(t, stream.Init(ip)) + + cp := new(PutChunkPrm). + WithChunk(obj.Payload()) + require.NoError(t, stream.SendChunk(cp)) + + _, err = stream.Close() + require.NoError(t, err) +} + +func assertSplitChain(t *testing.T, limit, ln uint64, sessionToken session.Object, members []object.Object) object.Object { + require.Len(t, members, splitMembersCount(limit, ln)) + + // all + for _, member := range members { + assertObjectIntegrity(t, member) + require.LessOrEqual(t, member.PayloadSize(), limit) + require.Equal(t, sessionToken, *member.SessionToken()) + require.Equal(t, sessionToken.Issuer(), member.Owner()) + require.EqualValues(t, currentEpoch, member.CreationEpoch()) + require.Empty(t, member.Attributes()) + require.True(t, member.HasParent()) + } + + // payload chunks + chunks, linker := members[:len(members)-1], members[len(members)-1] + + var gotPayload []byte + for _, chunk := range chunks { + require.Equal(t, object.TypeRegular, chunk.Type()) + + gotPayload = append(gotPayload, chunk.Payload()...) + } + require.Len(t, gotPayload, int(ln)) + + require.Zero(t, chunks[0].GetFirstID()) + require.Zero(t, chunks[0].GetPreviousID()) + for i := 1; i < len(chunks); i++ { + require.Equal(t, chunks[0].GetID(), chunks[i].GetFirstID()) + require.Equal(t, chunks[i-1].GetID(), chunks[i].GetPreviousID()) + } + + // linker + require.Equal(t, object.TypeLink, linker.Type()) + require.Equal(t, chunks[0].GetID(), linker.GetFirstID()) + require.Zero(t, linker.GetPreviousID()) + + var link object.Link + require.NoError(t, link.Unmarshal(linker.Payload())) + linkItems := link.Objects() + require.Len(t, linkItems, len(chunks)) + for i := range linkItems { + require.Equal(t, chunks[i].GetID(), linkItems[i].ObjectID()) + require.EqualValues(t, chunks[i].PayloadSize(), linkItems[i].ObjectSize()) + } + + // parent + firstParent := chunks[0].Parent() + require.NotNil(t, firstParent) + require.Zero(t, firstParent.PayloadSize()) + _, ok := firstParent.PayloadChecksum() + require.False(t, ok) + _, ok = firstParent.PayloadHomomorphicHash() + require.False(t, ok) + require.Zero(t, firstParent.GetID()) + require.Zero(t, firstParent.Signature()) + + lastParent := chunks[len(chunks)-1].Parent() + require.NotNil(t, lastParent) + require.Equal(t, lastParent, linker.Parent()) + + for i := 1; i < len(chunks)-1; i++ { + require.Nil(t, chunks[i].Parent()) + } + + var firstParentCp object.Object + firstParent.CopyTo(&firstParentCp) + firstParentCp.SetPayloadSize(lastParent.PayloadSize()) + if cs, ok := lastParent.PayloadChecksum(); ok { + firstParentCp.SetPayloadChecksum(cs) + } + if cs, ok := lastParent.PayloadHomomorphicHash(); ok { + firstParentCp.SetPayloadHomomorphicHash(cs) + } + firstParentCp.SetID(lastParent.GetID()) + firstParentCp.SetSignature(lastParent.Signature()) + require.Equal(t, firstParentCp, *lastParent) + + restored := *lastParent + restored.SetPayload(gotPayload) + + return restored +} + +func splitMembersCount(limit, ln uint64) int { + if ln <= limit { + return 1 + } + res := ln/limit + 1 // + LINK + if ln%limit != 0 { + res++ + } + return int(res) +} + +func assertObjectIntegrity(t *testing.T, obj object.Object) { + require.NoError(t, obj.CheckVerificationFields()) + + require.NotNil(t, obj.Version()) + require.Equal(t, version.Current(), *obj.Version()) + + require.NotZero(t, obj.GetContainerID()) + + require.NotZero(t, obj.Owner()) + + require.NotZero(t, obj.SessionToken()) + + payload := obj.Payload() + require.EqualValues(t, obj.PayloadSize(), len(payload)) + + cs, ok := obj.PayloadChecksum() + require.True(t, ok) + require.Equal(t, checksum.SHA256, cs.Type()) + got := sha256.Sum256(payload) + require.Equal(t, got[:], cs.Value()) + + if cs, ok := obj.PayloadHomomorphicHash(); ok { + require.Equal(t, checksum.TillichZemor, cs.Type()) + got := tz.Sum(payload) + require.Equal(t, got[:], cs.Value()) + } + + require.Zero(t, obj.Children()) + + require.Zero(t, obj.SplitID()) +}