Skip to content

Commit

Permalink
Merge pull request #47 from domainr/worker-writer
Browse files Browse the repository at this point in the history
Support concurrent access via mutex and worker goroutine
  • Loading branch information
ydnar authored Nov 23, 2021
2 parents a60562e + 12cceb3 commit 779e844
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 162 deletions.
10 changes: 4 additions & 6 deletions check.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ func (c *Conn) CheckDomainExtensions(domains []string, extData map[string]string
return nil, err
}

err = c.writeDataUnit(x)
err = c.writeRequest(x)
if err != nil {
return nil, err
}

var res Response
err = c.readResponse(&res)
res, err := c.readResponse()
if err != nil {
return nil, err
}
Expand All @@ -41,12 +40,11 @@ func (c *Conn) CheckDomainExtensions(domains []string, extData map[string]string
if err != nil {
return nil, err
}
err = c.writeDataUnit(x)
err = c.writeRequest(x)
if err != nil {
return nil, err
}
var res2 Response
err = c.readResponse(&res2)
res2, err := c.readResponse()
if err != nil {
return nil, err
}
Expand Down
200 changes: 104 additions & 96 deletions conn.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package epp

import (
"bytes"
"encoding/binary"
"encoding/xml"
"io"
"net"
"sync"
"time"
)

Expand All @@ -19,153 +19,161 @@ func IgnoreEOF(err error) error {
}

// Conn represents a single connection to an EPP server.
// This implementation is not safe for concurrent use.
// Reads and writes are serialized, so it is safe for concurrent use.
type Conn struct {
// Conn is the underlying net.Conn (usually a TLS connection).
net.Conn
buf bytes.Buffer
decoder *xml.Decoder
saved xml.Decoder

// Timeout defines the timeout for network operations.
// It must be set at initialization. Changing it after
// a connection is already opened will have no effect.
Timeout time.Duration

// m protects Greeting and LoginResult.
m sync.Mutex

// Greeting holds the last received greeting message from the server,
// indicating server name, status, data policy and capabilities.
//
// Deprecated: This field is written to upon opening a new EPP connection and should not be modified.
Greeting

// LoginResult holds the last received login response message's Result
// from the server, in which some servers might include diagnostics such
// as connection count limits.
//
// Deprecated: this field is written to by the Login method but otherwise is not used by this package.
LoginResult Result

// Timeout defines the timeout for network operations.
Timeout time.Duration
// mWrite synchronizes connection writes.
mWrite sync.Mutex

responses chan *Response
readErr error

done chan struct{}
}

// NewConn initializes an epp.Conn from a net.Conn and performs the EPP
// handshake. It reads and stores the initial EPP <greeting> message.
// https://tools.ietf.org/html/rfc5730#section-2.4
func NewConn(conn net.Conn) (*Conn, error) {
c := newConn(conn)
g, err := c.readGreeting()
if err == nil {
c.Greeting = g
}
return c, err
return NewTimeoutConn(conn, 0)
}

// NewTimeoutConn initializes an epp.Conn like NewConn, limiting the duration of network
// operations on conn using Set(Read|Write)Deadline.
func NewTimeoutConn(conn net.Conn, timeout time.Duration) (*Conn, error) {
c := newConn(conn)
c.Timeout = timeout
c := &Conn{
Conn: conn,
Timeout: timeout,
responses: make(chan *Response),
done: make(chan struct{}),
}
go c.readLoop()
g, err := c.readGreeting()
if err == nil {
c.m.Lock()
c.Greeting = g
c.m.Unlock()
}
return c, err
}

// Close sends an EPP <logout> command and closes the connection c.
func (c *Conn) Close() error {
select {
case <-c.done:
return net.ErrClosed
default:
}
c.Logout()
close(c.done)
return c.Conn.Close()
}

// newConn initializes an epp.Conn from a net.Conn.
// Used internally for testing.
func newConn(conn net.Conn) *Conn {
c := Conn{Conn: conn}
c.decoder = xml.NewDecoder(&c.buf)
c.saved = *c.decoder
return &c
// writeRequest writes a single EPP request (x) for writing on c.
// writeRequest can be called from multiple goroutines.
func (c *Conn) writeRequest(x []byte) error {
c.mWrite.Lock()
defer c.mWrite.Unlock()
if c.Timeout > 0 {
c.Conn.SetWriteDeadline(time.Now().Add(c.Timeout))
}
return writeDataUnit(c.Conn, x)
}

// reset resets the underlying xml.Decoder and bytes.Buffer,
// restoring the original state of the underlying
// xml.Decoder (pos 1, line 1, stack, etc.) using a hack.
func (c *Conn) reset() {
c.buf.Reset()
*c.decoder = c.saved // Heh.
// readResponse dequeues and returns a EPP response from c.
// It returns an error if the EPP response contains an error Result.
// readResponse can be called from multiple goroutines.
func (c *Conn) readResponse() (*Response, error) {
select {
case res := <-c.responses:
if res == nil {
return res, c.readErr
}
if res.Result.IsError() {
return nil, &res.Result
}
return res, nil
case <-c.done:
return nil, net.ErrClosed
}
}

// writeDataUnit writes a slice of bytes to c.
func (c *Conn) readLoop() {
defer close(c.responses)
timeout := c.Timeout
r := &io.LimitedReader{R: c.Conn}
decoder := xml.NewDecoder(r)
for {
if timeout > 0 {
c.Conn.SetReadDeadline(time.Now().Add(timeout))
}
n, err := readDataUnitHeader(c.Conn)
if err != nil {
c.readErr = err
return
}
r.N = int64(n)
res := &Response{}
err = IgnoreEOF(scanResponse.Scan(decoder, res))
if err != nil {
c.readErr = err
return
}
c.responses <- res
}
}

// writeDataUnit writes x to w.
// Bytes written are prefixed with 32-bit header specifying the total size
// of the data unit (message + 4 byte header), in network (big-endian) order.
// http://www.ietf.org/rfc/rfc4934.txt
func (c *Conn) writeDataUnit(x []byte) error {
func writeDataUnit(w io.Writer, x []byte) error {
logXML("<-- WRITE DATA UNIT -->", x)
s := uint32(4 + len(x))
if c.Timeout > 0 {
c.Conn.SetWriteDeadline(time.Now().Add(c.Timeout))
}
err := binary.Write(c.Conn, binary.BigEndian, s)
err := binary.Write(w, binary.BigEndian, s)
if err != nil {
return err
}
_, err = c.Conn.Write(x)
_, err = w.Write(x)
return err
}

// readResponse reads a single EPP response from c and parses the XML into req.
// It returns an error if the EPP response contains an error Result.
func (c *Conn) readResponse(res *Response) error {
err := c.readDataUnit()
if err != nil {
return err
}
err = IgnoreEOF(scanResponse.Scan(c.decoder, res))
if err != nil {
return err
}
if res.Result.IsError() {
return &res.Result
}
return nil
}

// readDataUnit reads a single EPP message from c into
// c.buf. The bytes in c.buf are valid until the next
// call to readDataUnit.
func (c *Conn) readDataUnit() error {
c.reset()
var s int32
if c.Timeout > 0 {
c.Conn.SetReadDeadline(time.Now().Add(c.Timeout))
}
err := binary.Read(c.Conn, binary.BigEndian, &s)
if err != nil {
return err
}
s -= 4 // https://tools.ietf.org/html/rfc5734#section-4
if s < 0 {
return io.ErrUnexpectedEOF
}
lr := io.LimitedReader{R: c.Conn, N: int64(s)}
n, err := c.buf.ReadFrom(&lr)
// readDataUnitHeader reads a single EPP data unit header from r, returning the payload size or an error.
// An EPP data unit is prefixed with 32-bit header specifying the total size
// of the data unit (message + 4 byte header), in network (big-endian) order.
// http://www.ietf.org/rfc/rfc4934.txt
func readDataUnitHeader(r io.Reader) (uint32, error) {
var n uint32
err := binary.Read(r, binary.BigEndian, &n)
if err != nil {
return err
return 0, err
}
if n != int64(s) || lr.N != 0 {
return io.ErrUnexpectedEOF
if n < 4 {
return 0, io.ErrUnexpectedEOF
}
logXML("<-- READ DATA UNIT -->", c.buf.Bytes())
return nil
}

func deleteRange(s, pfx, sfx []byte) []byte {
start := bytes.Index(s, pfx)
if start < 0 {
return s
}
end := bytes.Index(s[start+len(pfx):], sfx)
if end < 0 {
return s
}
end += start + len(pfx) + len(sfx)
size := len(s) - (end - start)
copy(s[start:size], s[end:])
return s[:size]
}

func deleteBufferRange(buf *bytes.Buffer, pfx, sfx []byte) {
v := deleteRange(buf.Bytes(), pfx, sfx)
buf.Truncate(len(v))
// https://tools.ietf.org/html/rfc5734#section-4
return n - 4, err
}
Loading

0 comments on commit 779e844

Please sign in to comment.