Skip to content

Commit

Permalink
replace gobwas/ws with gorilla/websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
rkonfj committed Dec 9, 2023
1 parent 2a78ee9 commit 72387eb
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 222 deletions.
220 changes: 66 additions & 154 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,20 @@ package client
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"sync"
"time"

"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/gorilla/websocket"
"github.com/miekg/dns"
D "github.com/rkonfj/toh/dns"
"github.com/rkonfj/toh/server/api"
"github.com/rkonfj/toh/spec"
"github.com/rkonfj/toh/ws"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -163,24 +161,55 @@ func (c *TohClient) DialUDP(ctx context.Context, addr string) (net.Conn, error)
return c.DialContext(ctx, "udp", addr)
}

func (c *TohClient) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
func (c *TohClient) DialContext(ctx context.Context, network, addr string) (
conn net.Conn, err error) {
handshake := http.Header{}
handshake.Add(spec.HeaderHandshakeKey, c.options.Key)
handshake.Add(spec.HeaderHandshakeNet, network)
handshake.Add(spec.HeaderHandshakeAddr, addr)
handshake.Add(spec.HeaderHandshakeNonce, spec.NewNonce())
for k, v := range c.options.Headers {
for _, item := range v {
handshake.Add(k, item)
}
}

t1 := time.Now()
wsConn, estAddr, connEntry, err := c.dialWS(ctx, c.options.Server, handshake)
if err != nil {
return
}
logrus.Debugf("%s://%s established successfully, toh latency %s", network, addr, time.Since(t1))

if len(estAddr) == 0 {
estAddr = "0.0.0.0:0"
}
connEntry.RemoteHost = addr
connEntry.Proto = network
connEntry.RemoteAddr = estAddr
connEntry.add()

host, _port, err := net.SplitHostPort(estAddr)
port, _ := strconv.Atoi(_port)
switch network {
case "tcp", "tcp4", "tcp6":
conn, addr, err := c.dial(ctx, network, address)
if err != nil {
return nil, err
remoteAddr := &net.TCPAddr{
IP: net.ParseIP(host),
Port: port,
}
return spec.NewConn(conn, addr), nil
conn = spec.NewConn(wsConn, remoteAddr)
case "udp", "udp4", "udp6":
conn, addr, err := c.dial(ctx, network, address)
if err != nil {
return nil, err
remoteAddr := &net.UDPAddr{
IP: net.ParseIP(host),
Port: port,
}
return &spec.PacketConnWrapper{Conn: spec.NewConn(conn, addr)}, nil
conn = spec.PacketConnWrapper{Conn: spec.NewConn(wsConn, remoteAddr)}
default:
return nil, errors.New("unsupport network " + network)
err = spec.ErrUnsupportNetwork
}
return
}

func (c *TohClient) Stats() (s *api.Stats, err error) {
u, _ := url.ParseRequestURI(c.options.Server)
scheme := u.Scheme
Expand Down Expand Up @@ -220,69 +249,8 @@ func (c *TohClient) dnsExchange(dnServer string, query *dns.Msg) (resp *dns.Msg,
return
}

func (c *TohClient) dial(ctx context.Context, network, addr string) (
conn spec.StreamConn, remoteAddr net.Addr, err error) {
handshake := http.Header{}
handshake.Add(spec.HeaderHandshakeKey, c.options.Key)
handshake.Add(spec.HeaderHandshakeNet, network)
handshake.Add(spec.HeaderHandshakeAddr, addr)
handshake.Add(spec.HeaderHandshakeNonce, spec.NewNonce())
for k, v := range c.options.Headers {
for _, item := range v {
handshake.Add(k, item)
}
}

t1 := time.Now()

conn, respHeader, err := c.dialWS(ctx, c.options.Server, handshake)
if err != nil {
return
}
logrus.Debugf("%s://%s established successfully, toh latency %s", network, addr, time.Since(t1))

estAddr := respHeader.Get(spec.HeaderEstablishAddr)
if len(estAddr) == 0 {
estAddr = "0.0.0.0:0"
}
host, _port, err := net.SplitHostPort(estAddr)
port, _ := strconv.Atoi(_port)
switch network {
case "tcp", "tcp4", "tcp6":
remoteAddr = &net.TCPAddr{
IP: net.ParseIP(host),
Port: port,
}
case "udp", "udp4", "udp6":
remoteAddr = &net.UDPAddr{
IP: net.ParseIP(host),
Port: port,
}
default:
err = spec.ErrUnsupportNetwork
}
conn.(*wsConn).entry.RemoteAddr = remoteAddr.String()
conn.(*wsConn).entry.RemoteHost = addr
conn.(*wsConn).entry.Proto = network
go conn.(*wsConn).runPingLoop()
return
}

func (c *TohClient) dialWS(ctx context.Context, urlstr string, header http.Header) (
wsc *wsConn, respHeader http.Header, err error) {
respHeader = http.Header{}
var statusCode int
dialer := ws.Dialer{
NetDial: c.directNetDial,
Header: ws.HandshakeHeaderHTTP(header),
OnHeader: func(key, value []byte) (err error) {
respHeader.Add(string(key), string(value))
return
},
OnStatusError: func(status int, reason []byte, resp io.Reader) {
statusCode = status
},
}
wsc *ws.GorillaWsConn, establishAddr string, connEntry *ConnEntry, err error) {
u, err := url.Parse(urlstr)
if err != nil {
return
Expand All @@ -293,92 +261,36 @@ func (c *TohClient) dialWS(ctx context.Context, urlstr string, header http.Heade
case "https":
u.Scheme = "wss"
}
conn, _, _, err := dialer.Dial(context.Background(), u.String())
if statusCode == http.StatusUnauthorized {
dialer := websocket.Dialer{
NetDialContext: c.directNetDial,
HandshakeTimeout: 15 * time.Second,
}
conn, httpResp, err := dialer.Dial(u.String(), header)
if httpResp.StatusCode == http.StatusUnauthorized {
err = spec.ErrAuth
return
}
if err != nil {
err = fmt.Errorf("dial %s: %s", u, err)
return
}
wsc = &wsConn{
conn: conn,
keepalive: c.options.Keepalive,
connIdleTimeout: 75 * time.Second,
entry: &ConnEntry{
// Use the nonce returned by the server (some older versions of servers do not support nonce)
Nonce: spec.MustParseNonce(respHeader.Get(spec.HeaderHandshakeNonce)),
LocalAddr: conn.LocalAddr().String(),
lastRWTime: time.Now(),
ct: c.conntrack,
},
}
wsc.entry.add()
return
}

type wsConn struct {
conn net.Conn
keepalive time.Duration
connIdleTimeout time.Duration
entry *ConnEntry
}

func (c *wsConn) Read(ctx context.Context) (b []byte, err error) {
c.entry.lastRWTime = time.Now()
if dl, ok := ctx.Deadline(); ok {
c.conn.SetReadDeadline(dl)
}
b, err = wsutil.ReadServerBinary(c.conn)
if err != nil {
return
establishAddr = httpResp.Header.Get(spec.HeaderEstablishAddr)
nonce := spec.MustParseNonce(httpResp.Header.Get(spec.HeaderHandshakeNonce))
connEntry = &ConnEntry{
// Use the nonce returned by the server (some older versions of servers do not support nonce)
Nonce: nonce,
LocalAddr: conn.LocalAddr().String(),
lastRWTime: time.Now(),
ct: c.conntrack,
}
for i, v := range b {
b[i] = v ^ c.entry.Nonce
}
return
}
func (c *wsConn) Write(ctx context.Context, p []byte) error {
c.entry.lastRWTime = time.Now()
if dl, ok := ctx.Deadline(); ok {
c.conn.SetWriteDeadline(dl)
}
for i, v := range p {
p[i] = v ^ c.entry.Nonce
}
return wsutil.WriteClientBinary(c.conn, p)
}

func (c *wsConn) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}

func (c *wsConn) Close(code int, reason string) error {
ws.WriteFrame(c.conn, ws.NewCloseFrame(ws.NewCloseFrameBody(ws.StatusCode(code), reason)))
c.entry.remove()
return c.conn.Close()
}

func (c *wsConn) Ping() error {
return wsutil.WriteClientMessage(c.conn, ws.OpPing, ws.NewPingFrame([]byte{}).Payload)
}

// runPingLoop keepalive the websocket connection
func (c *wsConn) runPingLoop() {
if c.keepalive == 0 {
return
}
for {
time.Sleep(c.keepalive)
if time.Since(c.entry.lastRWTime) > c.connIdleTimeout {
logrus.Debug("ping: exited. connection reached the max idle time ", c.connIdleTimeout)
break
}
err := c.Ping()
if err != nil {
logrus.Debug("ping: ", err)
break
}
}
wsConn := ws.NewGorillaWsConn(conn, nonce)
wsConn.SetKeepalive(c.options.Keepalive)
wsConn.SetConnIdleTimeout(75 * time.Second)
wsConn.SetOnClose(func() { connEntry.remove() })
wsConn.SetOnReadWrite(func() { connEntry.lastRWTime = time.Now() })
go wsConn.RunPingLoop()
wsc = wsConn
return
}
4 changes: 4 additions & 0 deletions client/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package client
import (
"sync"
"time"

"github.com/sirupsen/logrus"
)

type ConnEntry struct {
Expand All @@ -17,10 +19,12 @@ type ConnEntry struct {

func (c *ConnEntry) add() {
c.ct.addConn(c)
logrus.Debugf("tracking %s://%s", c.Proto, c.RemoteAddr)
}

func (c *ConnEntry) remove() {
c.ct.removeConn(c)
logrus.Debugf("stop tracking %s://%s", c.Proto, c.RemoteAddr)
}

type Conntrack struct {
Expand Down
8 changes: 3 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/decred/base58 v1.0.5
github.com/dustin/go-humanize v1.0.1
github.com/gobwas/ws v1.2.1
github.com/gorilla/websocket v1.5.1
github.com/miekg/dns v1.1.54
github.com/oschwald/geoip2-golang v1.8.0
github.com/sirupsen/logrus v1.9.0
Expand All @@ -16,13 +16,11 @@ require (

require (
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/oschwald/maxminddb-golang v1.10.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.2.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/tools v0.3.0 // indirect
)
17 changes: 6 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,8 @@ github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5il
github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk=
github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/miekg/dns v1.1.54 h1:5jon9mWcb0sFJGpnI99tOMhCPyJ+RPVz5b63MQG0VWI=
Expand All @@ -39,14 +35,13 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA=
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.2.0 h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU=
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/tools v0.3.0 h1:SrNbZl6ECOS1qFzgTdQfWXZM9XBkiA6tkFrH9YSTPHM=
golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
Loading

0 comments on commit 72387eb

Please sign in to comment.