Skip to content

Commit

Permalink
Increase the maximum read size of a single data packet
Browse files Browse the repository at this point in the history
  • Loading branch information
vearne committed Aug 7, 2024
1 parent 5977fb3 commit ffa4f13
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
3 changes: 2 additions & 1 deletion http2/http2.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"golang.org/x/net/http2/hpack"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -165,7 +166,7 @@ func (s *Stream) toMsg(finder *PBMessageFinder) *protocol.Message {
if codecType == CodecProtobuf {
if len(s.Method) <= 0 {
slog.Error("method is empty, this is illegal")
} else {
} else if !strings.Contains(s.Method, "grpc.reflection") {
// Note: Temporarily only handle the case where the encoding method is Protobuf
pbMsg := finder.FindMethodInputWithCache(s.Method)
err := proto.Unmarshal(s.DataBuf.Bytes(), pbMsg)
Expand Down
13 changes: 11 additions & 2 deletions http2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func (p *Processor) ProcessTCPPkg() {
continue
}

dc := pkg.DirectConn()
slog.Debug("Connection:%v, seq:%v, length:%v", &dc, pkg.TCP.Seq, len(payload))

for len(payload) >= HeaderSize {
f, err = ParseFrameBase(payload)
if err != nil {
Expand All @@ -60,8 +63,8 @@ func (p *Processor) ProcessTCPPkg() {

dc := pkg.DirectConn()
f.DirectConn = &dc
slog.Debug("Connection:%v, seq:%v, FrameType:%v, length:%v, streamID:%v",
f.DirectConn, pkg.TCP.Seq, GetFrameType(f.Type), f.Length, f.StreamID)
slog.Debug("Connection:%v, seq:%v, FrameType:%v, length:%v, len(payload):%v, streamID:%v",
f.DirectConn, pkg.TCP.Seq, GetFrameType(f.Type), f.Length, len(f.Payload), f.StreamID)

var ok bool
if _, ok = p.ConnRepository[dc]; !ok {
Expand Down Expand Up @@ -125,6 +128,9 @@ func (p *Processor) processFrameData(f *FrameBase) {
return
}

slog.Debug("processFrameData, Padded:%v, PadLength:%v, EndStream:%v, len(fd.Data):%v",
fd.Padded, fd.PadLength, fd.EndStream, len(fd.Data))

hc := p.ConnRepository[*f.DirectConn]
// Set the state of the stream
index := f.StreamID % StreamArraySize
Expand All @@ -136,6 +142,7 @@ func (p *Processor) processFrameData(f *FrameBase) {
msg, _ := fd.ParseGRPCMessage()
// Compression is turned on
if msg.PayloadFormat == compressionMade {
slog.Debug("msg.PayloadFormat == compressionMade")
// only support gzip
gzipReader, err = gzip.NewReader(bytes.NewReader(msg.EncodedMessage))
if err != nil {
Expand All @@ -149,6 +156,7 @@ func (p *Processor) processFrameData(f *FrameBase) {
}
}

slog.Debug("len(msg.EncodedMessage):%v", len(msg.EncodedMessage))
_, err = stream.DataBuf.Write(msg.EncodedMessage)
if err != nil {
slog.Error("processFrameData, gunzip error:%v", err)
Expand Down Expand Up @@ -305,6 +313,7 @@ func (f *PBMessageFinder) FindMethodInputWithCache(svcAndMethod string) proto.Me
m, ok := f.symbolMsg[svcAndMethod]
f.cacheMu.RUnlock()
if ok {
slog.Debug("FindMethodInputWithCache,svcAndMethod:%v, hit cache", svcAndMethod)
return m
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/input_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

const (
snapshotLen int32 = 1024
snapshotLen int32 = 20000
promiscuous bool = false
timeout time.Duration = 5 * time.Second
)
Expand Down

0 comments on commit ffa4f13

Please sign in to comment.