From 513869008c56724f86f98f265ecfe744aff370d6 Mon Sep 17 00:00:00 2001 From: vearne Date: Mon, 11 Nov 2024 12:08:44 +0800 Subject: [PATCH] update http2/tcp_buffer.go --- Makefile | 2 +- http2/http2.go | 4 ++-- http2/tcp_buffer.go | 28 ++++++++++++++++++++++++++++ http2/tcp_buffer_test.go | 22 ++++++++++++++++++++++ 4 files changed, 53 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index f84e2fd..33cff10 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -VERSION := v0.2.4 +VERSION := v0.2.5 BIN_NAME = grpcr CONTAINER = grpcr diff --git a/http2/http2.go b/http2/http2.go index 20af38a..100063b 100644 --- a/http2/http2.go +++ b/http2/http2.go @@ -152,7 +152,7 @@ func (hc *Http2Conn) deal() { buf := make([]byte, HeaderSize) _, err = io.ReadFull(hc.Reader, buf) if err != nil { - slog.Error("Http2Conn.deal, ReadFull:%v", err) + slog.Warn("Http2Conn.deal, ReadFull:%v", err) break } @@ -170,7 +170,7 @@ func (hc *Http2Conn) deal() { if fb.Length > 0 { _, err = io.ReadFull(hc.Reader, buf) if err != nil { - slog.Error("Http2Conn.deal, ReadFull:%v", err) + slog.Warn("Http2Conn.deal, ReadFull:%v", err) break } } diff --git a/http2/tcp_buffer.go b/http2/tcp_buffer.go index 9dda887..1728b7a 100644 --- a/http2/tcp_buffer.go +++ b/http2/tcp_buffer.go @@ -9,6 +9,8 @@ import ( "sync/atomic" ) +const MaxWindowSize = 65536 + type TCPBuffer struct { //The number of bytes of data currently cached size atomic.Int64 @@ -55,6 +57,14 @@ func (sb *TCPBuffer) AddTCP(tcpPkg *layers.TCP) { slog.Debug("[start]SocketBuffer.addTCP, size:%v, actualCanReadSize:%v, expectedSeq:%v", sb.size.Load(), sb.actualCanReadSize.Load(), sb.expectedSeq) + // Discard packets outside the sliding window + if !validPackage(sb.expectedSeq, MaxWindowSize, tcpPkg.Seq) { + slog.Debug("[end]SocketBuffer.addTCP-discard packets outside the sliding window, "+ + "size:%v, actualCanReadSize:%v, expectedSeq:%v", + sb.size.Load(), sb.actualCanReadSize.Load(), sb.expectedSeq) + return + } + // duplicate package if sb.List.Get(tcpPkg.Seq) != nil { slog.Debug("[end]SocketBuffer.addTCP-duplicate package, size:%v, actualCanReadSize:%v, expectedSeq:%v", @@ -91,3 +101,21 @@ func (sb *TCPBuffer) AddTCP(tcpPkg *layers.TCP) { slog.Debug("[end]SocketBuffer.addTCP, size:%v, actualCanReadSize:%v, expectedSeq:%v", sb.size.Load(), sb.actualCanReadSize.Load(), sb.expectedSeq) } + +func validPackage(expectedSeq uint32, maxWindowSize uint32, pkgSeq uint32) bool { + rightBorder := (expectedSeq + maxWindowSize) % math.MaxUint32 + // case 1: sequence wrap around + if rightBorder < expectedSeq { + if (pkgSeq <= rightBorder) || (pkgSeq >= expectedSeq) { + return true + } else { + return false + } + } else { // case 2 + if pkgSeq >= expectedSeq && pkgSeq <= expectedSeq+maxWindowSize { + return true + } else { + return false + } + } +} diff --git a/http2/tcp_buffer_test.go b/http2/tcp_buffer_test.go index f8896c7..97005bc 100644 --- a/http2/tcp_buffer_test.go +++ b/http2/tcp_buffer_test.go @@ -200,3 +200,25 @@ func TestSocketBufferWrapAround3(t *testing.T) { // assert for nil (good for errors) assert.Nil(t, err) } + +func TestValidPackage(t *testing.T) { + testCases := []struct { + expectedSeq uint32 + maxWindowSize uint32 + pkgSeq uint32 + expected bool + }{ + // case 1 + {4294966995, 10000, 4294967095, true}, + {4294966995, 10000, 9500, true}, + {4294966995, 10000, 4294946995, false}, + // case 2 + {10000, 10000, 10200, true}, + {10000, 10000, 3000, false}, + {10000, 10000, 20300, false}, + } + for _, testCase := range testCases { + actual := validPackage(testCase.expectedSeq, testCase.maxWindowSize, testCase.pkgSeq) + assert.Equal(t, testCase.expected, actual, "Not consistent with expectations") + } +}