Skip to content

Commit

Permalink
fix: Handle the errors when the listener socket is closed gracefully.
Browse files Browse the repository at this point in the history
  • Loading branch information
hessjcg committed Jan 21, 2025
1 parent d565d6e commit 1a8e52b
Showing 1 changed file with 54 additions and 40 deletions.
94 changes: 54 additions & 40 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package proxy

import (
"context"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -637,7 +638,7 @@ func (c *Client) Serve(ctx context.Context, notify func()) error {
for _, m := range c.mnts {
go func(mnt *socketMount) {
err := c.serveSocketMount(ctx, mnt)
if err != nil {
if err != nil && errors.Is() {

Check failure on line 641 in internal/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / build

not enough arguments in call to errors.Is

Check failure on line 641 in internal/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / run lint

not enough arguments in call to errors.Is

Check failure on line 641 in internal/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / run lint

not enough arguments in call to errors.Is

Check failure on line 641 in internal/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / run lint

not enough arguments in call to errors.Is

Check failure on line 641 in internal/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / run lint

not enough arguments in call to errors.Is

Check failure on line 641 in internal/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / run lint

not enough arguments in call to errors.Is

Check failure on line 641 in internal/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / FreeBSD and OpenBSD compilation check

not enough arguments in call to errors.Is

Check failure on line 641 in internal/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / Check docs are up to date

not enough arguments in call to errors.Is

Check failure on line 641 in internal/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / Run govulncheck

not enough arguments in call to errors.Is

Check failure on line 641 in internal/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / integration tests (macos-latest)

not enough arguments in call to errors.Is

Check failure on line 641 in internal/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / unit tests

not enough arguments in call to errors.Is

Check failure on line 641 in internal/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / integration tests (windows-latest)

not enough arguments in call to errors.Is

Check failure on line 641 in internal/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / integration tests (ubuntu-latest)

not enough arguments in call to errors.Is
select {
// Best effort attempt to send error.
// If this send fails, it means the reading goroutine has
Expand Down Expand Up @@ -731,50 +732,63 @@ func (c *Client) Close() error {

// serveSocketMount persistently listens to the socketMounts listener and proxies connections to a
// given Cloud SQL instance.
func (c *Client) serveSocketMount(_ context.Context, s *socketMount) error {
func (c *Client) serveSocketMount(ctx context.Context, s *socketMount) error {
for {
cConn, err := s.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
c.logger.Errorf("[%s] Error accepting connection: %v", s.inst, err)
// For transient errors, wait a small amount of time to see if it resolves itself
time.Sleep(10 * time.Millisecond)
continue
}
return err
}
// handle the connection in a separate goroutine
go func() {
c.logger.Infof("[%s] Accepted connection from %s", s.inst, cConn.RemoteAddr())

// A client has established a connection to the local socket. Before
// we initiate a connection to the Cloud SQL backend, increment the
// connection counter. If the total number of connections exceeds
// the maximum, refuse to connect and close the client connection.
count := atomic.AddUint64(&c.connCount, 1)
defer atomic.AddUint64(&c.connCount, ^uint64(0))

if c.conf.MaxConnections > 0 && count > c.conf.MaxConnections {
c.logger.Infof("max connections (%v) exceeded, refusing new connection", c.conf.MaxConnections)
if c.connRefuseNotify != nil {
go c.connRefuseNotify()
select {
case <-ctx.Done():
// If the context was canceled, do not accept any more connections,
// exit gracefully.
return nil
default:
// Wait to accept a connection. When s.Accept() returns io.EOF, exit
// gracefully.
cConn, err := s.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
c.logger.Errorf("[%s] Error accepting connection: %v", s.inst, err)
// For transient errors, wait a small amount of time to see if it resolves itself
time.Sleep(10 * time.Millisecond)
continue
} else if err == io.EOF {
// The socket was closed gracefully. Stop processing connections.
return nil
}
_ = cConn.Close()
return
return err
}

// give a max of 30 seconds to connect to the instance
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// handle the connection in a separate goroutine
go func() {
c.logger.Infof("[%s] Accepted connection from %s", s.inst, cConn.RemoteAddr())

// A client has established a connection to the local socket. Before
// we initiate a connection to the Cloud SQL backend, increment the
// connection counter. If the total number of connections exceeds
// the maximum, refuse to connect and close the client connection.
count := atomic.AddUint64(&c.connCount, 1)
defer atomic.AddUint64(&c.connCount, ^uint64(0))

if c.conf.MaxConnections > 0 && count > c.conf.MaxConnections {
c.logger.Infof("max connections (%v) exceeded, refusing new connection", c.conf.MaxConnections)
if c.connRefuseNotify != nil {
go c.connRefuseNotify()
}
_ = cConn.Close()
return
}

sConn, err := c.dialer.Dial(ctx, s.inst, s.dialOpts...)
if err != nil {
c.logger.Errorf("[%s] failed to connect to instance: %v", s.inst, err)
_ = cConn.Close()
return
}
c.proxyConn(s.inst, cConn, sConn)
}()
// give a max of 30 seconds to connect to the instance
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

sConn, err := c.dialer.Dial(ctx, s.inst, s.dialOpts...)
if err != nil {
c.logger.Errorf("[%s] failed to connect to instance: %v", s.inst, err)
_ = cConn.Close()
return
}
c.proxyConn(s.inst, cConn, sConn)
}()
}
}
}

Expand Down

0 comments on commit 1a8e52b

Please sign in to comment.