diff --git a/cmd/neofs-node/transport.go b/cmd/neofs-node/transport.go index 2382e17242..efd62526a4 100644 --- a/cmd/neofs-node/transport.go +++ b/cmd/neofs-node/transport.go @@ -5,13 +5,12 @@ import ( "fmt" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" - rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" - "github.com/nspcc-dev/neofs-api-go/v2/rpc/common" - "github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc" - "github.com/nspcc-dev/neofs-api-go/v2/rpc/message" "github.com/nspcc-dev/neofs-api-go/v2/status" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding" + "google.golang.org/grpc/encoding/proto" ) type transport struct { @@ -26,49 +25,60 @@ func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte return nil, fmt.Errorf("connect to remote node: %w", err) } - var resp replicateResponse - err = c.ExecRaw(func(c *rawclient.Client) error { + var resp objectGRPC.ReplicateResponse + err = c.ExecRaw(func(conn *grpc.ClientConn) error { // this will be changed during NeoFS API Go deprecation. Code most likely be // placed in SDK - m := common.CallMethodInfo{Service: "neo.fs.v2.object.ObjectService", Name: "Replicate"} - err = rawclient.SendUnary(c, m, rawclient.BinaryMessage(req), &resp, - rawclient.WithContext(ctx), rawclient.AllowBinarySendingOnly()) + err = conn.Invoke(ctx, objectGRPC.ObjectService_Replicate_FullMethodName, req, &resp, binaryMessageOnly) if err != nil { - return fmt.Errorf("API transport (service=%s,op=%s): %w", m.Service, m.Name, err) + return fmt.Errorf("API transport (op=%s): %w", objectGRPC.ObjectService_Replicate_FullMethodName, err) } - return resp.err + return err }) - return resp.sigs, err -} + if err != nil { + return nil, err + } -type replicateResponse struct { - sigs []byte - err error + return replicationResultFromResponse(&resp) } -func (x replicateResponse) ToGRPCMessage() grpc.Message { return new(objectGRPC.ReplicateResponse) } +// [encoding.Codec] making Marshal to accept and forward []byte messages only. +var binaryMessageOnly = grpc.ForceCodec(protoCodecBinaryRequestOnly{}) + +type protoCodecBinaryRequestOnly struct{} + +func (protoCodecBinaryRequestOnly) Name() string { + // may be any non-empty, conflicts are unlikely to arise + return "neofs_binary_sender" +} -func (x *replicateResponse) FromGRPCMessage(gm grpc.Message) error { - m, ok := gm.(*objectGRPC.ReplicateResponse) - if !ok { - return message.NewUnexpectedMessageType(gm, m) +func (protoCodecBinaryRequestOnly) Marshal(msg any) ([]byte, error) { + bMsg, ok := msg.([]byte) + if ok { + return bMsg, nil } + return nil, fmt.Errorf("message is not of type %T", bMsg) +} + +func (protoCodecBinaryRequestOnly) Unmarshal(raw []byte, msg any) error { + return encoding.GetCodec(proto.Name).Unmarshal(raw, msg) +} + +func replicationResultFromResponse(m *objectGRPC.ReplicateResponse) ([]byte, error) { var st *status.Status if mst := m.GetStatus(); mst != nil { st = new(status.Status) err := st.FromGRPCMessage(mst) if err != nil { - return fmt.Errorf("decode response status: %w", err) + return nil, fmt.Errorf("decode response status: %w", err) } } - x.err = apistatus.ErrorFromV2(st) - if x.err != nil { - return nil + err := apistatus.ErrorFromV2(st) + if err != nil { + return nil, err } - x.sigs = m.GetObjectSignature() - - return nil + return m.GetObjectSignature(), nil } diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index 7e42f31006..285a7d357b 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -4,7 +4,6 @@ import ( "context" "io" - rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" @@ -14,6 +13,7 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation" "github.com/nspcc-dev/neofs-sdk-go/user" + "google.golang.org/grpc" ) // Client is an interface of NeoFS storage @@ -30,7 +30,7 @@ type Client interface { ObjectHash(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHash) ([][]byte, error) AnnounceLocalTrust(ctx context.Context, epoch uint64, trusts []reputationSDK.Trust, prm client.PrmAnnounceLocalTrust) error AnnounceIntermediateTrust(ctx context.Context, epoch uint64, trust reputationSDK.PeerToPeerTrust, prm client.PrmAnnounceIntermediateTrust) error - ExecRaw(f func(client *rawclient.Client) error) error + ExecRaw(f func(*grpc.ClientConn) error) error Close() error } @@ -39,9 +39,9 @@ type Client interface { type MultiAddressClient interface { Client - // RawForAddress must return rawclient.Client - // for the passed network.Address. - RawForAddress(network.Address, func(cli *rawclient.Client) error) error + // RawForAddress executes op over gRPC connections to given multi-address + // endpoint-by-endpoint until success. + RawForAddress(multiAddr network.Address, op func(*grpc.ClientConn) error) error ReportError(error) } diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go index 5fc2e7cc68..fda4b2c810 100644 --- a/pkg/local_object_storage/shard/gc_test.go +++ b/pkg/local_object_storage/shard/gc_test.go @@ -9,7 +9,6 @@ import ( "testing" "time" - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" @@ -84,7 +83,7 @@ func TestGC_ExpiredObjectWithExpiredLock(t *testing.T) { cnr := cidtest.ID() var expAttr object.Attribute - expAttr.SetKey(objectV2.SysAttributeExpEpoch) + expAttr.SetKey(object.AttributeExpirationEpoch) expAttr.SetValue("1") obj := generateObjectWithCID(cnr) @@ -211,7 +210,7 @@ func TestExpiration(t *testing.T) { ch := sh.NotificationChannel() var expAttr object.Attribute - expAttr.SetKey(objectV2.SysAttributeExpEpoch) + expAttr.SetKey(object.AttributeExpirationEpoch) obj := generateObject() diff --git a/pkg/network/address.go b/pkg/network/address.go index f5e0345e81..ef79eeb829 100644 --- a/pkg/network/address.go +++ b/pkg/network/address.go @@ -8,7 +8,7 @@ import ( "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" - "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + "github.com/nspcc-dev/neofs-node/internal/uriutil" ) /* @@ -69,7 +69,7 @@ func (a *Address) FromString(s string) error { host string hasTLS bool ) - host, hasTLS, err = client.ParseURI(s) + host, hasTLS, err = uriutil.Parse(s) if err != nil { host = s } diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index ffac3e6368..28a962c361 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -21,6 +21,7 @@ import ( reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation" "github.com/nspcc-dev/neofs-sdk-go/user" "go.uber.org/zap" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -56,6 +57,20 @@ func newMultiClient(addr network.AddressGroup, opts ClientCacheOpts) *multiClien } } +type clientWrapper struct { + *client.Client +} + +func (x clientWrapper) ExecRaw(f func(*grpc.ClientConn) error) error { + return x.Client.ExecRaw(func(c *rawclient.Client) error { + conn := c.Conn() + if conn == nil { + return errors.New("missing conn") + } + return f(conn.(*grpc.ClientConn)) + }) +} + func (x *multiClient) createForAddress(addr network.Address) (clientcore.Client, error) { var ( prmInit client.PrmInit @@ -88,7 +103,7 @@ func (x *multiClient) createForAddress(addr network.Address) (clientcore.Client, return nil, fmt.Errorf("can't init SDK client: %w", err) } - return c, nil + return clientWrapper{c}, nil } // updateGroup replaces current multiClient addresses with a new group. @@ -329,7 +344,7 @@ func (x *multiClient) AnnounceIntermediateTrust(ctx context.Context, epoch uint6 }) } -func (x *multiClient) ExecRaw(f func(client *rawclient.Client) error) error { +func (x *multiClient) ExecRaw(f func(*grpc.ClientConn) error) error { return x.iterateClients(context.Background(), func(c clientcore.Client) error { return c.ExecRaw(f) }) @@ -351,7 +366,7 @@ func (x *multiClient) Close() error { return nil } -func (x *multiClient) RawForAddress(addr network.Address, f func(client *rawclient.Client) error) error { +func (x *multiClient) RawForAddress(addr network.Address, f func(*grpc.ClientConn) error) error { c, err := x.client(addr) if err != nil { return err diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index e1f6ae1e33..11740b1bfb 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -11,12 +11,13 @@ import ( "sync" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" "github.com/nspcc-dev/neofs-api-go/v2/refs" - "github.com/nspcc-dev/neofs-api-go/v2/rpc" - rpcclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + protorefs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc" "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/nspcc-dev/neofs-api-go/v2/status" + protostatus "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/network" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" @@ -30,6 +31,7 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" versionSDK "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/nspcc-dev/tzhash/tz" + "google.golang.org/grpc" ) var errWrongMessageSeq = errors.New("incorrect message sequence") @@ -101,9 +103,11 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre // perhaps it is worth highlighting the utility function in neofs-api-go // open stream - var getStream *rpc.GetResponseReader - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - getStream, err = rpc.GetObject(cli, req, rpcclient.WithContext(ctx)) + var getStream protoobject.ObjectService_GetClient + ctx, cancel := context.WithCancel(ctx) + defer cancel() + err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + getStream, err = protoobject.NewObjectServiceClient(conn).Get(ctx, req.ToGRPCMessage().(*protoobject.GetRequest)) return err }) if err != nil { @@ -112,13 +116,12 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre var ( headWas bool - resp = new(objectV2.GetResponse) localProgress int ) for { // receive message from server stream - err := getStream.Read(resp) + resp, err := getStream.Recv() if err != nil { if errors.Is(err, io.EOF) { if !headWas { @@ -138,7 +141,11 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre } // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { + resp2 := new(objectV2.GetResponse) + if err = resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { return nil, fmt.Errorf("response verification failed: %w", err) } @@ -149,18 +156,27 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre switch v := resp.GetBody().GetObjectPart().(type) { default: return nil, fmt.Errorf("unexpected object part %T", v) - case *objectV2.GetObjectPartInit: + case *protoobject.GetResponse_Body_Init_: if headWas { return nil, errWrongMessageSeq } headWas = true - obj := new(objectV2.Object) + if v == nil || v.Init == nil { + return nil, errors.New("nil header oneof field") + } - obj.SetObjectID(v.GetObjectID()) - obj.SetSignature(v.GetSignature()) - obj.SetHeader(v.GetHeader()) + m := &protoobject.Object{ + ObjectId: v.Init.ObjectId, + Signature: v.Init.Signature, + Header: v.Init.Header, + } + + obj := new(objectV2.Object) + if err := obj.FromGRPCMessage(m); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } onceHeaderSending.Do(func() { err = streamWrapper.WriteHeader(object.NewFromV2(obj)) @@ -168,7 +184,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre if err != nil { return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err) } - case *objectV2.GetObjectPartChunk: + case *protoobject.GetResponse_Body_Chunk: if !headWas { return nil, errWrongMessageSeq } @@ -187,8 +203,15 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre localProgress += len(origChunk) globalProgress += len(chunk) - case *objectV2.SplitInfo: - si := object.NewSplitInfoFromV2(v) + case *protoobject.GetResponse_Body_SplitInfo: + if v == nil || v.SplitInfo == nil { + return nil, errors.New("nil split info oneof field") + } + var si2 objectV2.SplitInfo + if err := si2.FromGRPCMessage(v.SplitInfo); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + si := object.NewSplitInfoFromV2(&si2) return nil, object.NewSplitInfoError(si) } } @@ -271,21 +294,22 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get // perhaps it is worth highlighting the utility function in neofs-api-go // open stream - var rangeStream *rpc.ObjectRangeResponseReader - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - rangeStream, err = rpc.GetObjectRange(cli, req, rpcclient.WithContext(ctx)) + var rangeStream protoobject.ObjectService_GetRangeClient + ctx, cancel := context.WithCancel(ctx) + defer cancel() + err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + rangeStream, err = protoobject.NewObjectServiceClient(conn).GetRange(ctx, req.ToGRPCMessage().(*protoobject.GetRangeRequest)) return err }) if err != nil { return nil, fmt.Errorf("could not create Get payload range stream: %w", err) } - resp := new(objectV2.GetRangeResponse) var localProgress int for { // receive message from server stream - err := rangeStream.Read(resp) + resp, err := rangeStream.Recv() if err != nil { if errors.Is(err, io.EOF) { break @@ -301,7 +325,11 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get } // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { + resp2 := new(objectV2.GetRangeResponse) + if err = resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { return nil, fmt.Errorf("could not verify %T: %w", resp, err) } @@ -312,7 +340,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get switch v := resp.GetBody().GetRangePart().(type) { case nil: return nil, fmt.Errorf("unexpected range type %T", v) - case *objectV2.GetRangePartChunk: + case *protoobject.GetRangeResponse_Body_Chunk: origChunk := v.GetChunk() chunk := chunkToSend(globalProgress, localProgress, origChunk) @@ -327,8 +355,15 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get localProgress += len(origChunk) globalProgress += len(chunk) - case *objectV2.SplitInfo: - si := object.NewSplitInfoFromV2(v) + case *protoobject.GetRangeResponse_Body_SplitInfo: + if v == nil || v.SplitInfo == nil { + return nil, errors.New("nil split info oneof field") + } + var si2 objectV2.SplitInfo + if err := si2.FromGRPCMessage(v.SplitInfo); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + si := object.NewSplitInfoFromV2(&si2) return nil, object.NewSplitInfoError(si) } @@ -435,9 +470,9 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran return nil, err } - var resp *objectV2.GetRangeHashResponse - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - resp, err = rpc.HashObjectRange(cli, req, rpcclient.WithContext(ctx)) + var resp *protoobject.GetRangeHashResponse + err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + resp, err = protoobject.NewObjectServiceClient(conn).GetRangeHash(ctx, req.ToGRPCMessage().(*protoobject.GetRangeHashRequest)) return err }) if err != nil { @@ -450,7 +485,11 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran } // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { + resp2 := new(objectV2.GetRangeHashResponse) + if err = resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { return nil, fmt.Errorf("could not verify %T: %w", resp, err) } @@ -546,9 +585,9 @@ func (s *Service) toHeadPrm(_ context.Context, req *objectV2.HeadRequest, resp * // perhaps it is worth highlighting the utility function in neofs-api-go // send Head request - var headResp *objectV2.HeadResponse - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx)) + var headResp *protoobject.HeadResponse + err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + headResp, err = protoobject.NewObjectServiceClient(conn).Head(ctx, req.ToGRPCMessage().(*protoobject.HeadRequest)) return err }) if err != nil { @@ -561,53 +600,64 @@ func (s *Service) toHeadPrm(_ context.Context, req *objectV2.HeadRequest, resp * } // verify response structure - if err := signature.VerifyServiceMessage(headResp); err != nil { + resp2 := new(objectV2.HeadResponse) + if err = resp2.FromGRPCMessage(headResp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { return nil, fmt.Errorf("response verification failed: %w", err) } - if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { + if err = checkStatus(headResp.GetMetaHeader().GetStatus()); err != nil { return nil, err } var ( - hdr *objectV2.Header - idSig *refs.Signature + hdr *protoobject.Header + idSig *protorefs.Signature ) - switch v := headResp.GetBody().GetHeaderPart().(type) { + switch v := headResp.GetBody().GetHead().(type) { case nil: return nil, fmt.Errorf("unexpected header type %T", v) - case *objectV2.ShortHeader: + case *protoobject.HeadResponse_Body_ShortHeader: if !body.GetMainOnly() { return nil, fmt.Errorf("wrong header part type: expected %T, received %T", (*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil), ) } - h := v - - hdr = new(objectV2.Header) - hdr.SetPayloadLength(h.GetPayloadLength()) - hdr.SetVersion(h.GetVersion()) - hdr.SetOwnerID(h.GetOwnerID()) - hdr.SetObjectType(h.GetObjectType()) - hdr.SetCreationEpoch(h.GetCreationEpoch()) - hdr.SetPayloadHash(h.GetPayloadHash()) - hdr.SetHomomorphicHash(h.GetHomomorphicHash()) - case *objectV2.HeaderWithSignature: + if v == nil || v.ShortHeader == nil { + return nil, errors.New("nil short header oneof field") + } + + h := v.ShortHeader + hdr = &protoobject.Header{ + Version: h.Version, + OwnerId: h.OwnerId, + CreationEpoch: h.CreationEpoch, + PayloadLength: h.PayloadLength, + PayloadHash: h.PayloadHash, + ObjectType: h.ObjectType, + HomomorphicHash: h.HomomorphicHash, + } + case *protoobject.HeadResponse_Body_Header: if body.GetMainOnly() { return nil, fmt.Errorf("wrong header part type: expected %T, received %T", (*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil), ) } - hdrWithSig := v - if hdrWithSig == nil { - return nil, errors.New("nil object part") + if v == nil || v.Header == nil { + return nil, errors.New("nil header oneof field") + } + + if v.Header.Header == nil { + return nil, errors.New("missing header") } - hdr = hdrWithSig.GetHeader() - idSig = hdrWithSig.GetSignature() + hdr = v.Header.Header + idSig = v.Header.Signature if idSig == nil { // TODO(@cthulhu-rider): #1387 use "const" error @@ -616,23 +666,39 @@ func (s *Service) toHeadPrm(_ context.Context, req *objectV2.HeadRequest, resp * binID := objAddr.Object().Marshal() + var sig2 refs.Signature + if err := sig2.FromGRPCMessage(idSig); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } var sig neofscrypto.Signature - if err := sig.ReadFromV2(*idSig); err != nil { + if err := sig.ReadFromV2(sig2); err != nil { return nil, fmt.Errorf("can't read signature: %w", err) } if !sig.Verify(binID) { return nil, errors.New("invalid object ID signature") } - case *objectV2.SplitInfo: - si := object.NewSplitInfoFromV2(v) + case *protoobject.HeadResponse_Body_SplitInfo: + if v == nil || v.SplitInfo == nil { + return nil, errors.New("nil split info oneof field") + } + var si2 objectV2.SplitInfo + if err := si2.FromGRPCMessage(v.SplitInfo); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + si := object.NewSplitInfoFromV2(&si2) return nil, object.NewSplitInfoError(si) } + mObj := &protoobject.Object{ + Signature: idSig, + Header: hdr, + } objv2 := new(objectV2.Object) - objv2.SetHeader(hdr) - objv2.SetSignature(idSig) + if err := objv2.FromGRPCMessage(mObj); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } obj := object.NewFromV2(objv2) obj.SetID(objAddr.Object()) @@ -748,7 +814,11 @@ func writeCurrentVersion(metaHdr *session.RequestMetaHeader) { metaHdr.SetVersion(versionV2) } -func checkStatus(stV2 *status.Status) error { +func checkStatus(st *protostatus.Status) error { + stV2 := new(status.Status) + if err := stV2.FromGRPCMessage(st); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } if !status.IsSuccess(stV2.Code()) { return apistatus.ErrorFromV2(stV2) } diff --git a/pkg/services/object/internal/key.go b/pkg/services/object/internal/key.go index 9e5211973e..2d86353fea 100644 --- a/pkg/services/object/internal/key.go +++ b/pkg/services/object/internal/key.go @@ -3,15 +3,15 @@ package internal import ( "bytes" - "github.com/nspcc-dev/neofs-api-go/v2/session" + session "github.com/nspcc-dev/neofs-api-go/v2/session/grpc" "github.com/nspcc-dev/neofs-node/pkg/core/client" ) // VerifyResponseKeyV2 checks if response is signed with expected key. Returns client.ErrWrongPublicKey if not. func VerifyResponseKeyV2(expectedKey []byte, resp interface { - GetVerificationHeader() *session.ResponseVerificationHeader + GetVerifyHeader() *session.ResponseVerificationHeader }) error { - if !bytes.Equal(resp.GetVerificationHeader().GetBodySignature().GetKey(), expectedKey) { + if !bytes.Equal(resp.GetVerifyHeader().GetBodySignature().GetKey(), expectedKey) { return client.ErrWrongPublicKey } diff --git a/pkg/services/object/put/v2/service.go b/pkg/services/object/put/v2/service.go index 365ff96af9..11bdd8d63b 100644 --- a/pkg/services/object/put/v2/service.go +++ b/pkg/services/object/put/v2/service.go @@ -45,6 +45,7 @@ func (s *Service) Put(ctx context.Context) (object.PutObjectStream, error) { return &streamer{ stream: stream, key: s.key, + ctx: ctx, }, nil } diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index f4516795b5..e959caa31c 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -1,21 +1,23 @@ package putsvc import ( + "context" "crypto/ecdsa" "fmt" "github.com/nspcc-dev/neofs-api-go/v2/object" - "github.com/nspcc-dev/neofs-api-go/v2/rpc" - rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" sessionV2 "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/nspcc-dev/neofs-api-go/v2/status" + protostatus "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "google.golang.org/grpc" ) type streamer struct { @@ -26,6 +28,7 @@ type streamer struct { chunks []*object.PutRequest *sizes // only for relay streams + ctx context.Context } type sizes struct { @@ -116,7 +119,6 @@ func (s *streamer) CloseAndRecv() (*object.PutResponse, error) { func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClient) error { // open stream - resp := new(object.PutResponse) key := info.PublicKey() @@ -135,10 +137,11 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien // would be nice to log otherwise }() - var stream *rpc.PutRequestWriter - - err = c.RawForAddress(addr, func(cli *rawclient.Client) error { - stream, err = rpc.PutObject(cli, resp) + var stream protoobject.ObjectService_PutClient + ctx, cancel := context.WithCancel(s.ctx) + defer cancel() + err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + stream, err = protoobject.NewObjectServiceClient(conn).Put(ctx) return err }) if err != nil { @@ -147,7 +150,7 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien } // send init part - err = stream.Write(s.init) + err = stream.Send(s.init.ToGRPCMessage().(*protoobject.PutRequest)) if err != nil { internalclient.ReportError(c, err) err = fmt.Errorf("sending the initial message to stream failed: %w", err) @@ -155,7 +158,7 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien } for i := range s.chunks { - if err = stream.Write(s.chunks[i]); err != nil { + if err = stream.Send(s.chunks[i].ToGRPCMessage().(*protoobject.PutRequest)); err != nil { internalclient.ReportError(c, err) err = fmt.Errorf("sending the chunk %d failed: %w", i, err) return @@ -163,7 +166,7 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien } // close object stream and receive response from remote node - err = stream.Close() + resp, err := stream.CloseAndRecv() if err != nil { err = fmt.Errorf("closing the stream failed: %w", err) return @@ -175,7 +178,11 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien } // verify response structure - err = signature.VerifyServiceMessage(resp) + resp2 := new(object.PutResponse) + if err = resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + err = signature.VerifyServiceMessage(resp2) if err != nil { err = fmt.Errorf("response verification failed: %w", err) } @@ -191,7 +198,11 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien return firstErr } -func checkStatus(stV2 *status.Status) error { +func checkStatus(st *protostatus.Status) error { + stV2 := new(status.Status) + if err := stV2.FromGRPCMessage(st); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } if !status.IsSuccess(stV2.Code()) { return apistatus.ErrorFromV2(stV2) } diff --git a/pkg/services/object/search/v2/util.go b/pkg/services/object/search/v2/util.go index 4b863598c4..498290a421 100644 --- a/pkg/services/object/search/v2/util.go +++ b/pkg/services/object/search/v2/util.go @@ -1,17 +1,19 @@ package searchsvc import ( + "context" "errors" "fmt" "io" "sync" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - "github.com/nspcc-dev/neofs-api-go/v2/rpc" - rpcclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + "github.com/nspcc-dev/neofs-api-go/v2/refs" "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/nspcc-dev/neofs-api-go/v2/status" + protostatus "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/network" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" @@ -22,6 +24,7 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "google.golang.org/grpc" ) func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStream) (*searchsvc.Prm, error) { @@ -81,9 +84,11 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre return nil, err } - var searchStream *rpc.SearchResponseReader - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - searchStream, err = rpc.SearchObjects(cli, req, rpcclient.WithContext(stream.Context())) + var searchStream protoobject.ObjectService_SearchClient + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() + err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + searchStream, err = protoobject.NewObjectServiceClient(conn).Search(ctx, req.ToGRPCMessage().(*protoobject.SearchRequest)) return err }) if err != nil { @@ -92,14 +97,11 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre // code below is copy-pasted from c.SearchObjects implementation, // perhaps it is worth highlighting the utility function in neofs-api-go - var ( - searchResult []oid.ID - resp = new(objectV2.SearchResponse) - ) + var searchResult []oid.ID for { // receive message from server stream - err := searchStream.Read(resp) + resp, err := searchStream.Recv() if err != nil { if errors.Is(err, io.EOF) { break @@ -114,7 +116,11 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre } // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { + resp2 := new(objectV2.SearchResponse) + if err = resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { return nil, fmt.Errorf("could not verify %T: %w", resp, err) } @@ -122,11 +128,15 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre return nil, fmt.Errorf("remote node response: %w", err) } - chunk := resp.GetBody().GetIDList() + chunk := resp.GetBody().GetIdList() var id oid.ID for i := range chunk { - err = id.ReadFromV2(chunk[i]) + var id2 refs.ObjectID + if err = id2.FromGRPCMessage(chunk[i]); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + err = id.ReadFromV2(id2) if err != nil { return nil, fmt.Errorf("invalid object ID: %w", err) } @@ -145,7 +155,11 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre return p, nil } -func checkStatus(stV2 *status.Status) error { +func checkStatus(st *protostatus.Status) error { + stV2 := new(status.Status) + if err := stV2.FromGRPCMessage(st); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } if !status.IsSuccess(stV2.Code()) { return apistatus.ErrorFromV2(stV2) }