From 55a25a1d0992bb81490f6e1e59602f6cf5400208 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 19 Dec 2024 10:29:33 +0300 Subject: [PATCH 1/2] storage/shard: Use imported constant from SDK lib, not api-go one This was the only direct importer of to-be-deprecated `neofs-api-go/v2` module in `pkg/local_object_storage` space. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/shard/gc_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go index 5fc2e7cc68..fda4b2c810 100644 --- a/pkg/local_object_storage/shard/gc_test.go +++ b/pkg/local_object_storage/shard/gc_test.go @@ -9,7 +9,6 @@ import ( "testing" "time" - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" @@ -84,7 +83,7 @@ func TestGC_ExpiredObjectWithExpiredLock(t *testing.T) { cnr := cidtest.ID() var expAttr object.Attribute - expAttr.SetKey(objectV2.SysAttributeExpEpoch) + expAttr.SetKey(object.AttributeExpirationEpoch) expAttr.SetValue("1") obj := generateObjectWithCID(cnr) @@ -211,7 +210,7 @@ func TestExpiration(t *testing.T) { ch := sh.NotificationChannel() var expAttr object.Attribute - expAttr.SetKey(objectV2.SysAttributeExpEpoch) + expAttr.SetKey(object.AttributeExpirationEpoch) obj := generateObject() From fdc0a4275cea3ffe54f29f18847f4a99cf053ac9 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 18 Dec 2024 21:40:41 +0300 Subject: [PATCH 2/2] node/object: Refactor code making request forwarding This refactors storage node app making it to use gRPC connection to remote nodes for request forwarding without wrappers. The behavior is preserved with some error text added/changed for the better. Avoid importing `github.com/nspcc-dev/neofs-api-go/v2/rpc` packages. The only place left will naturally go with coming SDK upgrade. One step closer to whole `neofs-api-go` module's deprecation. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/transport.go | 66 +++++---- pkg/core/client/client.go | 10 +- pkg/network/address.go | 4 +- pkg/network/cache/multi.go | 21 ++- pkg/services/object/get/v2/util.go | 190 +++++++++++++++++-------- pkg/services/object/internal/key.go | 6 +- pkg/services/object/put/v2/service.go | 1 + pkg/services/object/put/v2/streamer.go | 35 +++-- pkg/services/object/search/v2/util.go | 42 ++++-- 9 files changed, 248 insertions(+), 127 deletions(-) diff --git a/cmd/neofs-node/transport.go b/cmd/neofs-node/transport.go index 2382e17242..efd62526a4 100644 --- a/cmd/neofs-node/transport.go +++ b/cmd/neofs-node/transport.go @@ -5,13 +5,12 @@ import ( "fmt" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" - rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" - "github.com/nspcc-dev/neofs-api-go/v2/rpc/common" - "github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc" - "github.com/nspcc-dev/neofs-api-go/v2/rpc/message" "github.com/nspcc-dev/neofs-api-go/v2/status" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding" + "google.golang.org/grpc/encoding/proto" ) type transport struct { @@ -26,49 +25,60 @@ func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte return nil, fmt.Errorf("connect to remote node: %w", err) } - var resp replicateResponse - err = c.ExecRaw(func(c *rawclient.Client) error { + var resp objectGRPC.ReplicateResponse + err = c.ExecRaw(func(conn *grpc.ClientConn) error { // this will be changed during NeoFS API Go deprecation. Code most likely be // placed in SDK - m := common.CallMethodInfo{Service: "neo.fs.v2.object.ObjectService", Name: "Replicate"} - err = rawclient.SendUnary(c, m, rawclient.BinaryMessage(req), &resp, - rawclient.WithContext(ctx), rawclient.AllowBinarySendingOnly()) + err = conn.Invoke(ctx, objectGRPC.ObjectService_Replicate_FullMethodName, req, &resp, binaryMessageOnly) if err != nil { - return fmt.Errorf("API transport (service=%s,op=%s): %w", m.Service, m.Name, err) + return fmt.Errorf("API transport (op=%s): %w", objectGRPC.ObjectService_Replicate_FullMethodName, err) } - return resp.err + return err }) - return resp.sigs, err -} + if err != nil { + return nil, err + } -type replicateResponse struct { - sigs []byte - err error + return replicationResultFromResponse(&resp) } -func (x replicateResponse) ToGRPCMessage() grpc.Message { return new(objectGRPC.ReplicateResponse) } +// [encoding.Codec] making Marshal to accept and forward []byte messages only. +var binaryMessageOnly = grpc.ForceCodec(protoCodecBinaryRequestOnly{}) + +type protoCodecBinaryRequestOnly struct{} + +func (protoCodecBinaryRequestOnly) Name() string { + // may be any non-empty, conflicts are unlikely to arise + return "neofs_binary_sender" +} -func (x *replicateResponse) FromGRPCMessage(gm grpc.Message) error { - m, ok := gm.(*objectGRPC.ReplicateResponse) - if !ok { - return message.NewUnexpectedMessageType(gm, m) +func (protoCodecBinaryRequestOnly) Marshal(msg any) ([]byte, error) { + bMsg, ok := msg.([]byte) + if ok { + return bMsg, nil } + return nil, fmt.Errorf("message is not of type %T", bMsg) +} + +func (protoCodecBinaryRequestOnly) Unmarshal(raw []byte, msg any) error { + return encoding.GetCodec(proto.Name).Unmarshal(raw, msg) +} + +func replicationResultFromResponse(m *objectGRPC.ReplicateResponse) ([]byte, error) { var st *status.Status if mst := m.GetStatus(); mst != nil { st = new(status.Status) err := st.FromGRPCMessage(mst) if err != nil { - return fmt.Errorf("decode response status: %w", err) + return nil, fmt.Errorf("decode response status: %w", err) } } - x.err = apistatus.ErrorFromV2(st) - if x.err != nil { - return nil + err := apistatus.ErrorFromV2(st) + if err != nil { + return nil, err } - x.sigs = m.GetObjectSignature() - - return nil + return m.GetObjectSignature(), nil } diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index 7e42f31006..285a7d357b 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -4,7 +4,6 @@ import ( "context" "io" - rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" @@ -14,6 +13,7 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation" "github.com/nspcc-dev/neofs-sdk-go/user" + "google.golang.org/grpc" ) // Client is an interface of NeoFS storage @@ -30,7 +30,7 @@ type Client interface { ObjectHash(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHash) ([][]byte, error) AnnounceLocalTrust(ctx context.Context, epoch uint64, trusts []reputationSDK.Trust, prm client.PrmAnnounceLocalTrust) error AnnounceIntermediateTrust(ctx context.Context, epoch uint64, trust reputationSDK.PeerToPeerTrust, prm client.PrmAnnounceIntermediateTrust) error - ExecRaw(f func(client *rawclient.Client) error) error + ExecRaw(f func(*grpc.ClientConn) error) error Close() error } @@ -39,9 +39,9 @@ type Client interface { type MultiAddressClient interface { Client - // RawForAddress must return rawclient.Client - // for the passed network.Address. - RawForAddress(network.Address, func(cli *rawclient.Client) error) error + // RawForAddress executes op over gRPC connections to given multi-address + // endpoint-by-endpoint until success. + RawForAddress(multiAddr network.Address, op func(*grpc.ClientConn) error) error ReportError(error) } diff --git a/pkg/network/address.go b/pkg/network/address.go index f5e0345e81..ef79eeb829 100644 --- a/pkg/network/address.go +++ b/pkg/network/address.go @@ -8,7 +8,7 @@ import ( "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" - "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + "github.com/nspcc-dev/neofs-node/internal/uriutil" ) /* @@ -69,7 +69,7 @@ func (a *Address) FromString(s string) error { host string hasTLS bool ) - host, hasTLS, err = client.ParseURI(s) + host, hasTLS, err = uriutil.Parse(s) if err != nil { host = s } diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index ffac3e6368..28a962c361 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -21,6 +21,7 @@ import ( reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation" "github.com/nspcc-dev/neofs-sdk-go/user" "go.uber.org/zap" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -56,6 +57,20 @@ func newMultiClient(addr network.AddressGroup, opts ClientCacheOpts) *multiClien } } +type clientWrapper struct { + *client.Client +} + +func (x clientWrapper) ExecRaw(f func(*grpc.ClientConn) error) error { + return x.Client.ExecRaw(func(c *rawclient.Client) error { + conn := c.Conn() + if conn == nil { + return errors.New("missing conn") + } + return f(conn.(*grpc.ClientConn)) + }) +} + func (x *multiClient) createForAddress(addr network.Address) (clientcore.Client, error) { var ( prmInit client.PrmInit @@ -88,7 +103,7 @@ func (x *multiClient) createForAddress(addr network.Address) (clientcore.Client, return nil, fmt.Errorf("can't init SDK client: %w", err) } - return c, nil + return clientWrapper{c}, nil } // updateGroup replaces current multiClient addresses with a new group. @@ -329,7 +344,7 @@ func (x *multiClient) AnnounceIntermediateTrust(ctx context.Context, epoch uint6 }) } -func (x *multiClient) ExecRaw(f func(client *rawclient.Client) error) error { +func (x *multiClient) ExecRaw(f func(*grpc.ClientConn) error) error { return x.iterateClients(context.Background(), func(c clientcore.Client) error { return c.ExecRaw(f) }) @@ -351,7 +366,7 @@ func (x *multiClient) Close() error { return nil } -func (x *multiClient) RawForAddress(addr network.Address, f func(client *rawclient.Client) error) error { +func (x *multiClient) RawForAddress(addr network.Address, f func(*grpc.ClientConn) error) error { c, err := x.client(addr) if err != nil { return err diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index e1f6ae1e33..11740b1bfb 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -11,12 +11,13 @@ import ( "sync" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" "github.com/nspcc-dev/neofs-api-go/v2/refs" - "github.com/nspcc-dev/neofs-api-go/v2/rpc" - rpcclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + protorefs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc" "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/nspcc-dev/neofs-api-go/v2/status" + protostatus "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/network" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" @@ -30,6 +31,7 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" versionSDK "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/nspcc-dev/tzhash/tz" + "google.golang.org/grpc" ) var errWrongMessageSeq = errors.New("incorrect message sequence") @@ -101,9 +103,11 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre // perhaps it is worth highlighting the utility function in neofs-api-go // open stream - var getStream *rpc.GetResponseReader - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - getStream, err = rpc.GetObject(cli, req, rpcclient.WithContext(ctx)) + var getStream protoobject.ObjectService_GetClient + ctx, cancel := context.WithCancel(ctx) + defer cancel() + err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + getStream, err = protoobject.NewObjectServiceClient(conn).Get(ctx, req.ToGRPCMessage().(*protoobject.GetRequest)) return err }) if err != nil { @@ -112,13 +116,12 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre var ( headWas bool - resp = new(objectV2.GetResponse) localProgress int ) for { // receive message from server stream - err := getStream.Read(resp) + resp, err := getStream.Recv() if err != nil { if errors.Is(err, io.EOF) { if !headWas { @@ -138,7 +141,11 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre } // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { + resp2 := new(objectV2.GetResponse) + if err = resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { return nil, fmt.Errorf("response verification failed: %w", err) } @@ -149,18 +156,27 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre switch v := resp.GetBody().GetObjectPart().(type) { default: return nil, fmt.Errorf("unexpected object part %T", v) - case *objectV2.GetObjectPartInit: + case *protoobject.GetResponse_Body_Init_: if headWas { return nil, errWrongMessageSeq } headWas = true - obj := new(objectV2.Object) + if v == nil || v.Init == nil { + return nil, errors.New("nil header oneof field") + } - obj.SetObjectID(v.GetObjectID()) - obj.SetSignature(v.GetSignature()) - obj.SetHeader(v.GetHeader()) + m := &protoobject.Object{ + ObjectId: v.Init.ObjectId, + Signature: v.Init.Signature, + Header: v.Init.Header, + } + + obj := new(objectV2.Object) + if err := obj.FromGRPCMessage(m); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } onceHeaderSending.Do(func() { err = streamWrapper.WriteHeader(object.NewFromV2(obj)) @@ -168,7 +184,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre if err != nil { return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err) } - case *objectV2.GetObjectPartChunk: + case *protoobject.GetResponse_Body_Chunk: if !headWas { return nil, errWrongMessageSeq } @@ -187,8 +203,15 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre localProgress += len(origChunk) globalProgress += len(chunk) - case *objectV2.SplitInfo: - si := object.NewSplitInfoFromV2(v) + case *protoobject.GetResponse_Body_SplitInfo: + if v == nil || v.SplitInfo == nil { + return nil, errors.New("nil split info oneof field") + } + var si2 objectV2.SplitInfo + if err := si2.FromGRPCMessage(v.SplitInfo); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + si := object.NewSplitInfoFromV2(&si2) return nil, object.NewSplitInfoError(si) } } @@ -271,21 +294,22 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get // perhaps it is worth highlighting the utility function in neofs-api-go // open stream - var rangeStream *rpc.ObjectRangeResponseReader - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - rangeStream, err = rpc.GetObjectRange(cli, req, rpcclient.WithContext(ctx)) + var rangeStream protoobject.ObjectService_GetRangeClient + ctx, cancel := context.WithCancel(ctx) + defer cancel() + err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + rangeStream, err = protoobject.NewObjectServiceClient(conn).GetRange(ctx, req.ToGRPCMessage().(*protoobject.GetRangeRequest)) return err }) if err != nil { return nil, fmt.Errorf("could not create Get payload range stream: %w", err) } - resp := new(objectV2.GetRangeResponse) var localProgress int for { // receive message from server stream - err := rangeStream.Read(resp) + resp, err := rangeStream.Recv() if err != nil { if errors.Is(err, io.EOF) { break @@ -301,7 +325,11 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get } // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { + resp2 := new(objectV2.GetRangeResponse) + if err = resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { return nil, fmt.Errorf("could not verify %T: %w", resp, err) } @@ -312,7 +340,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get switch v := resp.GetBody().GetRangePart().(type) { case nil: return nil, fmt.Errorf("unexpected range type %T", v) - case *objectV2.GetRangePartChunk: + case *protoobject.GetRangeResponse_Body_Chunk: origChunk := v.GetChunk() chunk := chunkToSend(globalProgress, localProgress, origChunk) @@ -327,8 +355,15 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get localProgress += len(origChunk) globalProgress += len(chunk) - case *objectV2.SplitInfo: - si := object.NewSplitInfoFromV2(v) + case *protoobject.GetRangeResponse_Body_SplitInfo: + if v == nil || v.SplitInfo == nil { + return nil, errors.New("nil split info oneof field") + } + var si2 objectV2.SplitInfo + if err := si2.FromGRPCMessage(v.SplitInfo); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + si := object.NewSplitInfoFromV2(&si2) return nil, object.NewSplitInfoError(si) } @@ -435,9 +470,9 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran return nil, err } - var resp *objectV2.GetRangeHashResponse - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - resp, err = rpc.HashObjectRange(cli, req, rpcclient.WithContext(ctx)) + var resp *protoobject.GetRangeHashResponse + err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + resp, err = protoobject.NewObjectServiceClient(conn).GetRangeHash(ctx, req.ToGRPCMessage().(*protoobject.GetRangeHashRequest)) return err }) if err != nil { @@ -450,7 +485,11 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran } // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { + resp2 := new(objectV2.GetRangeHashResponse) + if err = resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { return nil, fmt.Errorf("could not verify %T: %w", resp, err) } @@ -546,9 +585,9 @@ func (s *Service) toHeadPrm(_ context.Context, req *objectV2.HeadRequest, resp * // perhaps it is worth highlighting the utility function in neofs-api-go // send Head request - var headResp *objectV2.HeadResponse - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx)) + var headResp *protoobject.HeadResponse + err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + headResp, err = protoobject.NewObjectServiceClient(conn).Head(ctx, req.ToGRPCMessage().(*protoobject.HeadRequest)) return err }) if err != nil { @@ -561,53 +600,64 @@ func (s *Service) toHeadPrm(_ context.Context, req *objectV2.HeadRequest, resp * } // verify response structure - if err := signature.VerifyServiceMessage(headResp); err != nil { + resp2 := new(objectV2.HeadResponse) + if err = resp2.FromGRPCMessage(headResp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { return nil, fmt.Errorf("response verification failed: %w", err) } - if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { + if err = checkStatus(headResp.GetMetaHeader().GetStatus()); err != nil { return nil, err } var ( - hdr *objectV2.Header - idSig *refs.Signature + hdr *protoobject.Header + idSig *protorefs.Signature ) - switch v := headResp.GetBody().GetHeaderPart().(type) { + switch v := headResp.GetBody().GetHead().(type) { case nil: return nil, fmt.Errorf("unexpected header type %T", v) - case *objectV2.ShortHeader: + case *protoobject.HeadResponse_Body_ShortHeader: if !body.GetMainOnly() { return nil, fmt.Errorf("wrong header part type: expected %T, received %T", (*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil), ) } - h := v - - hdr = new(objectV2.Header) - hdr.SetPayloadLength(h.GetPayloadLength()) - hdr.SetVersion(h.GetVersion()) - hdr.SetOwnerID(h.GetOwnerID()) - hdr.SetObjectType(h.GetObjectType()) - hdr.SetCreationEpoch(h.GetCreationEpoch()) - hdr.SetPayloadHash(h.GetPayloadHash()) - hdr.SetHomomorphicHash(h.GetHomomorphicHash()) - case *objectV2.HeaderWithSignature: + if v == nil || v.ShortHeader == nil { + return nil, errors.New("nil short header oneof field") + } + + h := v.ShortHeader + hdr = &protoobject.Header{ + Version: h.Version, + OwnerId: h.OwnerId, + CreationEpoch: h.CreationEpoch, + PayloadLength: h.PayloadLength, + PayloadHash: h.PayloadHash, + ObjectType: h.ObjectType, + HomomorphicHash: h.HomomorphicHash, + } + case *protoobject.HeadResponse_Body_Header: if body.GetMainOnly() { return nil, fmt.Errorf("wrong header part type: expected %T, received %T", (*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil), ) } - hdrWithSig := v - if hdrWithSig == nil { - return nil, errors.New("nil object part") + if v == nil || v.Header == nil { + return nil, errors.New("nil header oneof field") + } + + if v.Header.Header == nil { + return nil, errors.New("missing header") } - hdr = hdrWithSig.GetHeader() - idSig = hdrWithSig.GetSignature() + hdr = v.Header.Header + idSig = v.Header.Signature if idSig == nil { // TODO(@cthulhu-rider): #1387 use "const" error @@ -616,23 +666,39 @@ func (s *Service) toHeadPrm(_ context.Context, req *objectV2.HeadRequest, resp * binID := objAddr.Object().Marshal() + var sig2 refs.Signature + if err := sig2.FromGRPCMessage(idSig); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } var sig neofscrypto.Signature - if err := sig.ReadFromV2(*idSig); err != nil { + if err := sig.ReadFromV2(sig2); err != nil { return nil, fmt.Errorf("can't read signature: %w", err) } if !sig.Verify(binID) { return nil, errors.New("invalid object ID signature") } - case *objectV2.SplitInfo: - si := object.NewSplitInfoFromV2(v) + case *protoobject.HeadResponse_Body_SplitInfo: + if v == nil || v.SplitInfo == nil { + return nil, errors.New("nil split info oneof field") + } + var si2 objectV2.SplitInfo + if err := si2.FromGRPCMessage(v.SplitInfo); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + si := object.NewSplitInfoFromV2(&si2) return nil, object.NewSplitInfoError(si) } + mObj := &protoobject.Object{ + Signature: idSig, + Header: hdr, + } objv2 := new(objectV2.Object) - objv2.SetHeader(hdr) - objv2.SetSignature(idSig) + if err := objv2.FromGRPCMessage(mObj); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } obj := object.NewFromV2(objv2) obj.SetID(objAddr.Object()) @@ -748,7 +814,11 @@ func writeCurrentVersion(metaHdr *session.RequestMetaHeader) { metaHdr.SetVersion(versionV2) } -func checkStatus(stV2 *status.Status) error { +func checkStatus(st *protostatus.Status) error { + stV2 := new(status.Status) + if err := stV2.FromGRPCMessage(st); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } if !status.IsSuccess(stV2.Code()) { return apistatus.ErrorFromV2(stV2) } diff --git a/pkg/services/object/internal/key.go b/pkg/services/object/internal/key.go index 9e5211973e..2d86353fea 100644 --- a/pkg/services/object/internal/key.go +++ b/pkg/services/object/internal/key.go @@ -3,15 +3,15 @@ package internal import ( "bytes" - "github.com/nspcc-dev/neofs-api-go/v2/session" + session "github.com/nspcc-dev/neofs-api-go/v2/session/grpc" "github.com/nspcc-dev/neofs-node/pkg/core/client" ) // VerifyResponseKeyV2 checks if response is signed with expected key. Returns client.ErrWrongPublicKey if not. func VerifyResponseKeyV2(expectedKey []byte, resp interface { - GetVerificationHeader() *session.ResponseVerificationHeader + GetVerifyHeader() *session.ResponseVerificationHeader }) error { - if !bytes.Equal(resp.GetVerificationHeader().GetBodySignature().GetKey(), expectedKey) { + if !bytes.Equal(resp.GetVerifyHeader().GetBodySignature().GetKey(), expectedKey) { return client.ErrWrongPublicKey } diff --git a/pkg/services/object/put/v2/service.go b/pkg/services/object/put/v2/service.go index 365ff96af9..11bdd8d63b 100644 --- a/pkg/services/object/put/v2/service.go +++ b/pkg/services/object/put/v2/service.go @@ -45,6 +45,7 @@ func (s *Service) Put(ctx context.Context) (object.PutObjectStream, error) { return &streamer{ stream: stream, key: s.key, + ctx: ctx, }, nil } diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index f4516795b5..e959caa31c 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -1,21 +1,23 @@ package putsvc import ( + "context" "crypto/ecdsa" "fmt" "github.com/nspcc-dev/neofs-api-go/v2/object" - "github.com/nspcc-dev/neofs-api-go/v2/rpc" - rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" sessionV2 "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/nspcc-dev/neofs-api-go/v2/status" + protostatus "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "google.golang.org/grpc" ) type streamer struct { @@ -26,6 +28,7 @@ type streamer struct { chunks []*object.PutRequest *sizes // only for relay streams + ctx context.Context } type sizes struct { @@ -116,7 +119,6 @@ func (s *streamer) CloseAndRecv() (*object.PutResponse, error) { func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClient) error { // open stream - resp := new(object.PutResponse) key := info.PublicKey() @@ -135,10 +137,11 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien // would be nice to log otherwise }() - var stream *rpc.PutRequestWriter - - err = c.RawForAddress(addr, func(cli *rawclient.Client) error { - stream, err = rpc.PutObject(cli, resp) + var stream protoobject.ObjectService_PutClient + ctx, cancel := context.WithCancel(s.ctx) + defer cancel() + err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + stream, err = protoobject.NewObjectServiceClient(conn).Put(ctx) return err }) if err != nil { @@ -147,7 +150,7 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien } // send init part - err = stream.Write(s.init) + err = stream.Send(s.init.ToGRPCMessage().(*protoobject.PutRequest)) if err != nil { internalclient.ReportError(c, err) err = fmt.Errorf("sending the initial message to stream failed: %w", err) @@ -155,7 +158,7 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien } for i := range s.chunks { - if err = stream.Write(s.chunks[i]); err != nil { + if err = stream.Send(s.chunks[i].ToGRPCMessage().(*protoobject.PutRequest)); err != nil { internalclient.ReportError(c, err) err = fmt.Errorf("sending the chunk %d failed: %w", i, err) return @@ -163,7 +166,7 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien } // close object stream and receive response from remote node - err = stream.Close() + resp, err := stream.CloseAndRecv() if err != nil { err = fmt.Errorf("closing the stream failed: %w", err) return @@ -175,7 +178,11 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien } // verify response structure - err = signature.VerifyServiceMessage(resp) + resp2 := new(object.PutResponse) + if err = resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + err = signature.VerifyServiceMessage(resp2) if err != nil { err = fmt.Errorf("response verification failed: %w", err) } @@ -191,7 +198,11 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien return firstErr } -func checkStatus(stV2 *status.Status) error { +func checkStatus(st *protostatus.Status) error { + stV2 := new(status.Status) + if err := stV2.FromGRPCMessage(st); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } if !status.IsSuccess(stV2.Code()) { return apistatus.ErrorFromV2(stV2) } diff --git a/pkg/services/object/search/v2/util.go b/pkg/services/object/search/v2/util.go index 4b863598c4..498290a421 100644 --- a/pkg/services/object/search/v2/util.go +++ b/pkg/services/object/search/v2/util.go @@ -1,17 +1,19 @@ package searchsvc import ( + "context" "errors" "fmt" "io" "sync" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - "github.com/nspcc-dev/neofs-api-go/v2/rpc" - rpcclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + "github.com/nspcc-dev/neofs-api-go/v2/refs" "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/nspcc-dev/neofs-api-go/v2/status" + protostatus "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/network" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" @@ -22,6 +24,7 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "google.golang.org/grpc" ) func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStream) (*searchsvc.Prm, error) { @@ -81,9 +84,11 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre return nil, err } - var searchStream *rpc.SearchResponseReader - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - searchStream, err = rpc.SearchObjects(cli, req, rpcclient.WithContext(stream.Context())) + var searchStream protoobject.ObjectService_SearchClient + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() + err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + searchStream, err = protoobject.NewObjectServiceClient(conn).Search(ctx, req.ToGRPCMessage().(*protoobject.SearchRequest)) return err }) if err != nil { @@ -92,14 +97,11 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre // code below is copy-pasted from c.SearchObjects implementation, // perhaps it is worth highlighting the utility function in neofs-api-go - var ( - searchResult []oid.ID - resp = new(objectV2.SearchResponse) - ) + var searchResult []oid.ID for { // receive message from server stream - err := searchStream.Read(resp) + resp, err := searchStream.Recv() if err != nil { if errors.Is(err, io.EOF) { break @@ -114,7 +116,11 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre } // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { + resp2 := new(objectV2.SearchResponse) + if err = resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { return nil, fmt.Errorf("could not verify %T: %w", resp, err) } @@ -122,11 +128,15 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre return nil, fmt.Errorf("remote node response: %w", err) } - chunk := resp.GetBody().GetIDList() + chunk := resp.GetBody().GetIdList() var id oid.ID for i := range chunk { - err = id.ReadFromV2(chunk[i]) + var id2 refs.ObjectID + if err = id2.FromGRPCMessage(chunk[i]); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + err = id.ReadFromV2(id2) if err != nil { return nil, fmt.Errorf("invalid object ID: %w", err) } @@ -145,7 +155,11 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre return p, nil } -func checkStatus(stV2 *status.Status) error { +func checkStatus(st *protostatus.Status) error { + stV2 := new(status.Status) + if err := stV2.FromGRPCMessage(st); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } if !status.IsSuccess(stV2.Code()) { return apistatus.ErrorFromV2(stV2) }