Skip to content

Commit

Permalink
Merge pull request #130 from flrdv/master
Browse files Browse the repository at this point in the history
Disconnect IDLE connections silently, return HTTP errors without description, other internal improvements
  • Loading branch information
flrdv authored Feb 2, 2024
2 parents 9354b01 + b782f18 commit 4c8a2e2
Show file tree
Hide file tree
Showing 14 changed files with 193 additions and 79 deletions.
26 changes: 16 additions & 10 deletions examples/combined/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ func Stressful(request *http.Request) *http.Response {
}

func main() {
s := settings.Default()
s.TCP.ReadTimeout = time.Hour

app := indigo.New(addr).
Tune(s).
AutoHTTPS(8443).
NotifyOnStart(func() {
log.Println("initialized")
})

r := inbuilt.New().
Use(middleware.LogRequests()).
Alias("/", "/static/index.html").
Expand All @@ -70,20 +80,16 @@ func main() {
r.Resource("/").
Post(IndexSay)

r.Post("/shutdown", func(request *http.Request) (_ *http.Response) {
app.GracefulStop()

return http.Code(request, status.Teapot)
})

r.Group("/hello").
Get("/world", World).
Get("/easter", Easter)

s := settings.Default()
s.TCP.ReadTimeout = time.Hour

app := indigo.New(addr).
Tune(s).
AutoHTTPS(8443).
NotifyOnStart(func() {
log.Println("initialized")
})

if err := app.Serve(r); err != nil {
log.Fatal(err)
}
Expand Down
10 changes: 0 additions & 10 deletions http/method/method.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,38 +28,28 @@ func Parse(str string) Method {
} else if str == "PUT" {
return PUT
}

return Unknown
case 4:
if str == "POST" {
return POST
} else if str == "HEAD" {
return HEAD
}

return Unknown
case 5:
if str == "PATCH" {
return PATCH
} else if str == "TRACE" {
return TRACE
}

return Unknown
case 6:
if str == "DELETE" {
return DELETE
}

return Unknown
case 7:
if str == "CONNECT" {
return CONNECT
} else if str == "OPTIONS" {
return OPTIONS
}

return Unknown
}

return Unknown
Expand Down
4 changes: 1 addition & 3 deletions http/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,7 @@ func (r *Response) Error(err error) *Response {
}

if http, ok := err.(status.HTTPError); ok {
return r.
Code(http.Code).
String(http.Message)
return r.Code(http.Code)
}

return r.
Expand Down
7 changes: 3 additions & 4 deletions http/status/codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,9 @@ func Text(code Code) Status {
}
}

// CodeStatus returns a pre-defined line with code and status text (including
// terminating CRLF sequence) in case code is known to server, otherwise empty
// line is returned
func CodeStatus(code Code) string {
// Line returns the whole status line with code included. Be aware, that the returned string also
// has CRLF in the end
func Line(code Code) string {
switch code {
case Continue:
return "100 Continue\r\n"
Expand Down
30 changes: 16 additions & 14 deletions indi.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (a *App) getServers(addr address.Address, r router.Router) ([]*tcp.Server,
return nil, err
}

servers[i] = tcp.NewServer(sock, a.newTCPCallback(a.settings, r, listener.Encryption))
servers[i] = tcp.NewServer(sock, a.newTCPCallback(r, listener.Encryption))
}

return servers, nil
Expand All @@ -171,39 +171,41 @@ func (a *App) run(servers []*tcp.Server) error {

callIfNotNil(a.hooks.OnStart)
err := <-a.errCh
switch err {
case status.ErrGracefulShutdown:
if err == status.ErrGracefulShutdown {
// stop listening to new clients and process till the end all the old ones
tcp.PauseAll(servers)
default:
// so basically, any error here (including nil) will stop all the servers. However,
// in order to be intuitive the best choice is to send status.ErrShutdown
tcp.StopAll(servers)
}

tcp.StopAll(servers)
callIfNotNil(a.hooks.OnStop)

return err
}

// GracefulStop stops accepting new connections and waits until all the already connected clients
// disconnects
// GracefulStop stops accepting new connections, but keeps serving old ones.
//
// NOTE: the call isn't blocking. So by that, after the method returned, the server
// will be still working
func (a *App) GracefulStop() {
a.errCh <- status.ErrGracefulShutdown
}

// Stop stops the whole application immediately.
//
// NOTE: the call isn't blocking. So by that, after the method returned, the server
// will still be working
func (a *App) Stop() {
a.errCh <- status.ErrShutdown
}

func (a *App) newTCPCallback(s settings.Settings, r router.Router, enc encryption.Encryption) tcp.OnConn {
func (a *App) newTCPCallback(r router.Router, enc encryption.Encryption) tcp.OnConn {
return func(conn net.Conn) {
client := initialize.NewClient(a.settings.TCP, conn)
body := initialize.NewBody(client, s.Body)
request := initialize.NewRequest(s, conn, body)
body := initialize.NewBody(client, a.settings.Body)
request := initialize.NewRequest(a.settings, conn, body)
request.Env.Encryption = enc
trans := initialize.NewTransport(s, request)
httpServer := http.NewServer(r)
trans := initialize.NewTransport(a.settings, request)
httpServer := http.NewServer(r, a.settings.HTTP.OnDisconnect)
httpServer.Run(client, request, trans)
}
}
Expand Down
105 changes: 94 additions & 11 deletions indigo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func headersToMap(hdrs headers.Headers, keys []string) map[string]string {
return m
}

func TestServer(t *testing.T) {
func TestFirstPhase(t *testing.T) {
ch := make(chan struct{})
app := New(addr)
go func(app *App) {
Expand Down Expand Up @@ -325,7 +325,7 @@ func TestServer(t *testing.T) {

data, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, "not found", string(data))
require.Empty(t, string(data))
})

t.Run("trace", func(t *testing.T) {
Expand Down Expand Up @@ -405,13 +405,11 @@ func TestServer(t *testing.T) {
t.Run("idle disconnect", func(t *testing.T) {
conn, err := net.Dial("tcp4", addr)
require.NoError(t, err)
defer conn.Close()

response, err := io.ReadAll(conn)
require.NoError(t, err)
wantResponseLine := "HTTP/1.1 408 Request Timeout\r\n"
lf := bytes.IndexByte(response, '\n')
require.NotEqual(t, -1, lf, "http response must contain at least one LF")
require.Equal(t, wantResponseLine, string(response[:lf+1]))
require.Empty(t, response, "must no data be transmitted")
})

testCtxValue := func(t *testing.T, addr string) {
Expand Down Expand Up @@ -502,10 +500,8 @@ func TestServer(t *testing.T) {
require.NoError(t, conn.SetReadDeadline(time.Now().Add(time.Second)))
data, err := io.ReadAll(conn)
require.NoError(t, err)
n := bytes.Count(data, []byte("\r\n\r\n"))
// pipelinedRequests+1 because we're reading till io.EOF. So by that, the server
// sends us 408 Request Timeout before closing the connection
require.Equal(t, pipelinedRequests+1, n, "got less pipelined responses as expected")
n := bytes.Count(data, []byte("HTTP/1.0 200 OK\r\n"))
require.Equal(t, pipelinedRequests, n, "got less successful responses as expected")
})

t.Run("chunked body", func(t *testing.T) {
Expand All @@ -523,7 +519,94 @@ func TestServer(t *testing.T) {

t.Run("forced stop", func(t *testing.T) {
app.Stop()
chanRead(ch, 5*time.Second)
_, ok := chanRead(ch, 5*time.Second)
require.True(t, ok, "server did not shut down correctly")
})
}

func TestSecondPhase(t *testing.T) {
// second phase starts a new server instance with different configuration in order
// to cover cases, that could not be covered in the first phase

ch := make(chan struct{})
app := New(addr)
go func(app *App) {
r := getInbuiltRouter()
s := settings.Default()
s.TCP.ReadTimeout = 500 * time.Millisecond
s.HTTP.OnDisconnect = func(request *http.Request) *http.Response {
return http.Error(request, status.ErrRequestTimeout)
}
_ = app.
Tune(s).
NotifyOnStart(func() {
ch <- struct{}{}
}).
NotifyOnStop(func() {
ch <- struct{}{}
}).
Listen(altPort, encryption.Plain).
AutoHTTPS(httpsPort).
Serve(r)
}(app)

<-ch

t.Run("idle disconnect", func(t *testing.T) {
conn, err := net.Dial("tcp4", addr)
require.NoError(t, err)
defer conn.Close()

response, err := io.ReadAll(conn)
require.NoError(t, err)
wantResponseLine := "HTTP/1.1 408 Request Timeout\r\n"
lf := bytes.IndexByte(response, '\n')
require.NotEqual(t, -1, lf, "http response must contain at least one LF")
require.Equal(t, wantResponseLine, string(response[:lf+1]))
})

doRequest := func(conn net.Conn) error {
_, _ = conn.Write([]byte("GET / HTTP/1.1\r\n\r\n"))
buff := make([]byte, 4096)
_, err := conn.Read(buff)
return err
}

t.Run("graceful shutdown", func(t *testing.T) {
client := func(ch chan<- error) {
conn, err := net.Dial("tcp", addr)
ch <- err
if err != nil {
return
}

for i := 0; i < 20; i++ {
if err := doRequest(conn); err != nil {
ch <- err
return
}

time.Sleep(100 * time.Millisecond)
}

ch <- nil
}

first := make(chan error)
go client(first)
require.NoError(t, <-first)

// as in main test-case there is no way to combine this test with forced shutdown
app.GracefulStop()
time.Sleep(100 * time.Millisecond)
second := make(chan error)
go client(second)
// the socket on graceful shutdown isn't closed, it just stops accepting new connections.
// So by that, we are able to connect freely
<-second
// ...the thing is, that we'll get the read-tcp error on a try to send something
require.Error(t, <-second)
require.NoError(t, <-first)
})
}

Expand Down
18 changes: 13 additions & 5 deletions internal/server/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,46 @@ import (
"github.com/indigo-web/indigo/internal/server/tcp"
"github.com/indigo-web/indigo/internal/transport"
"github.com/indigo-web/indigo/router"
"github.com/indigo-web/indigo/settings"
"github.com/indigo-web/utils/uf"
"os"
)

type Server struct {
router router.Router
upgradePreResp *http.Response
onDisconnect settings.OnDisconnectCallback
}

func NewServer(router router.Router) *Server {
func NewServer(router router.Router, onDisconnect settings.OnDisconnectCallback) *Server {
return &Server{
router: router,
upgradePreResp: http.NewResponse(),
onDisconnect: onDisconnect,
}
}

func (h *Server) Run(client tcp.Client, req *http.Request, trans transport.Transport) {
for h.HandleRequest(client, req, trans) {
}

if h.onDisconnect != nil {
_ = trans.Write(req.Proto, req, h.onDisconnect(req), client)
}

_ = client.Close()
}

func (h *Server) HandleRequest(client tcp.Client, req *http.Request, trans transport.Transport) (ok bool) {
data, err := client.Read()
if err != nil {
if errors.Is(err, os.ErrDeadlineExceeded) {
err = status.ErrConnectionTimeout
} else {
err = status.ErrCloseConnection
return false
}

_ = trans.Write(req.Proto, req, h.router.OnError(req, err), client)
// TODO: maybe, passing the err as-is would be a better solution? However, in this case it'll
// be more difficult to recognize what exact kind of error that is.
_ = trans.Write(req.Proto, req, h.router.OnError(req, status.ErrCloseConnection), client)
return false
}

Expand Down
2 changes: 1 addition & 1 deletion internal/server/http/http_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func newServer(client tcp.Client) (*Server, *http.Request, transport.Transport)
request := initialize.NewRequest(settings.Default(), dummy.NewNopConn(), body)
trans := initialize.NewTransport(settings.Default(), request)

return NewServer(r), request, trans
return NewServer(r, nil), request, trans
}

func disperse(data []byte, n int) (parts [][]byte) {
Expand Down
Loading

0 comments on commit 4c8a2e2

Please sign in to comment.