Skip to content

Commit

Permalink
Merge pull request #39 from vearne/develop
Browse files Browse the repository at this point in the history
Even if FindMethodInput fails, the program does not exit
  • Loading branch information
vearne authored Sep 9, 2024
2 parents def27a5 + 1218b79 commit 75abdb9
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
VERSION := v0.1.4
VERSION := v0.1.5

BIN_NAME = grpcr
CONTAINER = grpcr
Expand Down
14 changes: 10 additions & 4 deletions http2/http2.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func NewStream() *Stream {
return &s
}

func (s *Stream) toMsg(finder *PBMessageFinder) *protocol.Message {
func (s *Stream) toMsg(finder *PBMessageFinder) (*protocol.Message, error) {
var msg protocol.Message
id := uuid.Must(uuid.NewUUID())
msg.Meta.Version = 1
Expand All @@ -168,23 +168,29 @@ func (s *Stream) toMsg(finder *PBMessageFinder) *protocol.Message {
slog.Error("method is empty, this is illegal")
} else if !strings.Contains(s.Method, "grpc.reflection") {
// Note: Temporarily only handle the case where the encoding method is Protobuf
pbMsg := finder.FindMethodInputWithCache(s.Method)
err := proto.Unmarshal(s.DataBuf.Bytes(), pbMsg)
pbMsg, err := finder.FindMethodInputWithCache(s.Method)
if err != nil {
slog.Error("finder.FindMethodInputWithCache, error:%v", err)
return nil, err
}
err = proto.Unmarshal(s.DataBuf.Bytes(), pbMsg)
if err != nil {
slog.Error("method:%v, proto.Unmarshal:%v", s.Method, err)
return nil, err
}

s.Request, err = protojson.Marshal(pbMsg)
if err != nil {
slog.Error("method:%v, json.Marshal:%v", s.Method, err)
return nil, err
}
}
} else {
s.Request = s.DataBuf.Bytes()
}

msg.Data.Request = string(s.Request)
return &msg
return &msg, nil
}

func (s *Stream) Reset() {
Expand Down
43 changes: 28 additions & 15 deletions http2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"compress/gzip"
"context"
"fmt"
"github.com/fullstorydev/grpcurl"
"google.golang.org/protobuf/types/descriptorpb"
"io"
Expand Down Expand Up @@ -164,7 +165,10 @@ func (p *Processor) processFrameData(f *FrameBase) {
}

if fd.EndStream {
p.OutputChan <- stream.toMsg(p.Finder)
pMsg, pErr := stream.toMsg(p.Finder)
if pErr == nil {
p.OutputChan <- pMsg
}
stream.Reset()
}
}
Expand Down Expand Up @@ -211,7 +215,10 @@ func (p *Processor) processFrameHeader(f *FrameBase) {
}

if fh.EndStream {
p.OutputChan <- stream.toMsg(p.Finder)
pMsg, pErr := stream.toMsg(p.Finder)
if pErr == nil {
p.OutputChan <- pMsg
}
stream.Reset()
}
}
Expand Down Expand Up @@ -306,47 +313,51 @@ func NewPBMessageFinder(addr string) *PBMessageFinder {
return &f
}

func (f *PBMessageFinder) FindMethodInputWithCache(svcAndMethod string) proto.Message {
func (f *PBMessageFinder) FindMethodInputWithCache(svcAndMethod string) (proto.Message, error) {
slog.Debug("FindMethodInputWithCache, svcAndMethod:%v", svcAndMethod)

f.cacheMu.RLock()
m, ok := f.symbolMsg[svcAndMethod]
f.cacheMu.RUnlock()
if ok {
slog.Debug("FindMethodInputWithCache,svcAndMethod:%v, hit cache", svcAndMethod)
return m
return m, nil
}

msg := f.FindMethodInput(svcAndMethod)
msg, err := f.FindMethodInput(svcAndMethod)
if err != nil {
return nil, err
}

f.cacheMu.Lock()
f.symbolMsg[svcAndMethod] = msg
f.cacheMu.Unlock()
return msg
return msg, nil
}

func (f *PBMessageFinder) FindMethodInput(svcAndMethod string) proto.Message {
func (f *PBMessageFinder) FindMethodInput(svcAndMethod string) (proto.Message, error) {
slog.Debug("FindMethodInput, svcAndMethod:%v", svcAndMethod)

var cc *grpc.ClientConn
network := "tcp"
ctx := context.Background()
cc, err := grpcurl.BlockingDial(ctx, network, f.addr, nil)
if err != nil {
slog.Fatal("PBMessageFinder.FindMethodInput, addr:%v, error:%v,enable grpc reflection service",
slog.Fatal("PBMessageFinder.FindMethodInput,addr:%v,error:%v,enable grpc reflection service?",
f.addr, err)
}
refClient := grpcreflect.NewClientV1Alpha(ctx, reflectpb.NewServerReflectionClient(cc))
descSource := grpcurl.DescriptorSourceFromServer(ctx, refClient)
svc, method := parseSymbol(svcAndMethod)
slog.Debug("parseSymbol, svc:%v, method:%v", svc, method)
slog.Info("parseSymbol, svc:%v, method:%v", svc, method)
dsc, err := descSource.FindSymbol(svc)
if err != nil {
slog.Fatal("descSource.FindSymbol, service:%v, method:%v, error:%v", svc, method, err)
return nil, fmt.Errorf("descSource.FindSymbol,service:%v,method:%v,error:%w", svc, method, err)
}
sd, ok := dsc.(*desc.ServiceDescriptor)
if !ok {
slog.Fatal("FindMethodInput, error:%v", err)
return nil, fmt.Errorf("change to *desc.ServiceDescriptor,service:%v,method:%v, type:%T",
svc, method, dsc)
}
mtd := sd.FindMethodByName(method)
inputType := mtd.GetInputType()
Expand All @@ -356,18 +367,20 @@ func (f *PBMessageFinder) FindMethodInput(svcAndMethod string) proto.Message {
ConstructFileDescriptorSet(strSet, fdSet, inputType.GetFile())
prFiles, err := protodesc.NewFiles(fdSet)
if err != nil {
slog.Fatal("protodesc.NewFiles, svcAndMethod:%v, error:%v", svcAndMethod, err)
return nil, fmt.Errorf("protodesc.NewFiles,service:%v,method:%v,error:%w", svc, method, err)
}
pfd, err := prFiles.FindDescriptorByName(protoreflect.FullName(inputType.GetFullyQualifiedName()))
if err != nil {
slog.Fatal("prFiles.FindDescriptorByName, svcAndMethod:%v, error:%v", svcAndMethod, err)
return nil, fmt.Errorf("prFiles.FindDescriptorByName,service:%v,method:%v,error:%w",
svc, method, err)
}

pfmd, ok := pfd.(protoreflect.MessageDescriptor)
if !ok {
slog.Fatal("pfd.(protoreflect.MessageDescriptor), svcAndMethod:%v, type:%T", svcAndMethod, pfd)
return nil, fmt.Errorf("pfd.(protoreflect.MessageDescriptor),service:%v,method:%v,type:%T",
svc, method, pfd)
}
return dynamicpb.NewMessage(pfmd)
return dynamicpb.NewMessage(pfmd), nil
}

func ConstructFileDescriptorSet(set *StringSet, fdSet *descriptorpb.FileDescriptorSet, fd *desc.FileDescriptor) {
Expand Down

0 comments on commit 75abdb9

Please sign in to comment.