Skip to content

Commit

Permalink
Merge pull request #12 from moov-io/support-larger-files
Browse files Browse the repository at this point in the history
fix: allow Reader to stream larger files
  • Loading branch information
adamdecaf authored Nov 8, 2023
2 parents 750fd60 + 80a23bf commit 64e336c
Show file tree
Hide file tree
Showing 4 changed files with 103,853 additions and 44 deletions.
119 changes: 77 additions & 42 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package go_ftp

import (
"bufio"
"bytes"
"crypto/tls"
"crypto/x509"
Expand Down Expand Up @@ -179,37 +180,105 @@ func (cc *client) Open(path string) (*File, error) {
cc.mu.Lock()
defer cc.mu.Unlock()

resp, err := cc.openFilepath(path)
conn, err := cc.connection()
if err != nil {
return nil, err
}

dir, filename := filepath.Split(path)
if dir != "" {
// Jump to previous directory after command is done
wd, err := conn.CurrentDir()
if err != nil {
return nil, err
}
defer func(previous string) {
// Return to our previous directory when initially called
if cleanupErr := conn.ChangeDir(previous); cleanupErr != nil {
err = fmt.Errorf("FTP: problem with readFiles: %w", cleanupErr)
}
}(wd)

// Move into directory to run the command
if err := conn.ChangeDir(dir); err != nil {
return nil, err
}
}

resp, err := conn.Retr(filename)
if err != nil {
return nil, fmt.Errorf("retrieving %s failed: %w", path, err)
}

data, err := readResponse(resp)
if err != nil {
return nil, fmt.Errorf("reading %s failed: %w", path, err)
}

return &File{
Filename: filepath.Base(path),
Contents: data,
}, nil
}

// Reader will open the file at path and provide a reader to access its contents.
// Callers need to close the returned Contents
// Callers need to close the returned Contents.
//
// Callers should be aware that network errors while reading can occur since contents
// are streamed from the FTP server. Having multiple open readers may not be supported.
// are streamed from the FTP server. Having multiple open readers is not supported.
func (cc *client) Reader(path string) (*File, error) {
cc.mu.Lock()
defer cc.mu.Unlock()

file, err := cc.openFilepath(path)
conn, err := cc.connection()
if err != nil {
return nil, err
}
return &File{

file := &File{
Filename: filepath.Base(path),
Contents: file,
}, nil
}

dir, filename := filepath.Split(path)
if dir != "" {
// Jump to previous directory after command is done
wd, err := conn.CurrentDir()
if err != nil {
return nil, err
}

// On file Close move back to the directory we were previously in
file.cleanup = func() error {
err := conn.ChangeDir(wd)
if err != nil {
return fmt.Errorf("returning to %s failed: %w", wd, err)
}
return nil
}

// Move into directory to run the command
if err := conn.ChangeDir(dir); err != nil {
return nil, err
}
}

resp, err := conn.Retr(filename)
if err != nil {
return nil, fmt.Errorf("retrieving %s failed: %w", path, err)
}
file.Contents = resp

prev := file.cleanup
file.cleanup = func() error {
if resp != nil {
if err := resp.Close(); err != nil {
return fmt.Errorf("closing RETR %s response failed: %w", path, err)
}
}
return prev()
}

return file, nil
}

func (cc *client) Delete(path string) error {
Expand Down Expand Up @@ -379,45 +448,11 @@ func (cc *client) Walk(dir string, fn fs.WalkDirFunc) error {
return nil
}

// openFilepath issues a RETR for the file at path and returns the response
// without closing it; the caller owns the response.
//
// When path has a directory component the connection changes into that
// directory first, and a deferred call changes back to the previous working
// directory before this function returns.
//
// NOTE(review): `wd, err := conn.CurrentDir()` declares a new err inside the
// if block, and the deferred closure assigns to that inner variable rather
// than the named result, so a failed ChangeDir(previous) does not appear to
// reach the caller.
// NOTE(review): the deferred ChangeDir runs while the returned response is
// still open; confirm the FTP client tolerates a command at that point.
func (cc *client) openFilepath(path string) (resp *ftp.Response, err error) {
conn, err := cc.connection()
if err != nil {
return nil, err
}

// Split into the directory to enter and the name to retrieve from it.
dir, filename := filepath.Split(path)
if dir != "" {
// Jump to previous directory after command is done
wd, err := conn.CurrentDir()
if err != nil {
return nil, err
}
defer func(previous string) {
// Return to our previous directory when initially called
if cleanupErr := conn.ChangeDir(previous); cleanupErr != nil {
err = fmt.Errorf("FTP: problem with readFiles: %w", cleanupErr)
}
}(wd)

// Move into directory to run the command
if err := conn.ChangeDir(dir); err != nil {
return nil, err
}
}

resp, err = conn.Retr(filename)
if err != nil {
return nil, fmt.Errorf("retrieving %s failed: %w", path, err)
}

return resp, nil
}

func readResponse(resp *ftp.Response) (io.ReadCloser, error) {
defer resp.Close()

var buf bytes.Buffer
n, err := io.Copy(&buf, resp)
n, err := io.Copy(&buf, bufio.NewReader(resp))
// If there was nothing downloaded and no error then assume it's a directory.
//
// The FTP client doesn't have a STAT command, so we can't quite ensure this
Expand Down
57 changes: 56 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"io/fs"
"math/rand"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -43,6 +45,30 @@ func TestClient(t *testing.T) {
require.Equal(t, "hello world", strings.TrimSpace(buf.String()))
})

// Open buffers the whole file, so concurrent callers are expected to each
// receive the complete contents of the large fixture.
t.Run("open larger files", func(t *testing.T) {
	largerFileSize := size(t, filepath.Join("testdata", "ftp-server", "bigdata", "large.txt"))

	const iterations = 10
	var wg sync.WaitGroup
	wg.Add(iterations)
	for i := 0; i < iterations; i++ {
		go func() {
			defer wg.Done()

			// FailNow (which the require helpers call) must only be invoked
			// from the goroutine running the test, so failures in these
			// workers are reported with t.Errorf followed by a plain return.
			file, err := client.Open("/bigdata/large.txt")
			if err != nil {
				t.Errorf("opening large file: %v", err)
				return
			}

			var buf bytes.Buffer
			_, copyErr := io.Copy(&buf, file)
			// Always close, even when the copy failed, so nothing is left open.
			closeErr := file.Close()
			if copyErr != nil {
				t.Errorf("reading large file: %v", copyErr)
				return
			}
			if closeErr != nil {
				t.Errorf("closing large file: %v", closeErr)
				return
			}

			if got := buf.Len(); got != largerFileSize {
				t.Errorf("read %d bytes, expected %d", got, largerFileSize)
			}
		}()
	}
	wg.Wait()
})

t.Run("reader", func(t *testing.T) {
file, err := client.Reader("archive/old.txt")
require.NoError(t, err)
Expand All @@ -53,6 +79,23 @@ func TestClient(t *testing.T) {
require.Equal(t, "previous data", strings.TrimSpace(buf.String()))
})

t.Run("read larger files", func(t *testing.T) {
largerFileSize := size(t, filepath.Join("testdata", "ftp-server", "bigdata", "large.txt"))

// reader must process files in sequence
for i := 0; i < 10; i++ {
file, err := client.Reader("/bigdata/large.txt")
require.NoError(t, err)

var buf bytes.Buffer
_, err = io.Copy(&buf, file)
require.NoError(t, err)

require.NoError(t, file.Close())
require.Equal(t, largerFileSize, len(buf.Bytes()))
}
})

t.Run("upload and delete", func(t *testing.T) {
body := io.NopCloser(strings.NewReader("example data"))
err := client.UploadFile("new.txt", body)
Expand Down Expand Up @@ -151,7 +194,7 @@ func TestClient(t *testing.T) {
})
require.NoError(t, err)
require.ElementsMatch(t, found, []string{
"Upper/names.txt",
"Upper/names.txt", "bigdata/large.txt",
"first.txt", "second.txt", "empty.txt",
"archive/old.txt", "archive/empty2.txt",
"with-empty/EMPTY1.txt", "with-empty/empty_file2.txt",
Expand Down Expand Up @@ -256,3 +299,15 @@ func TestClient__tlsDialOption(t *testing.T) {
require.NotNil(t, client)
require.NoError(t, client.Close())
}

// size returns the length in bytes of the file at where and fails the test
// immediately if the file cannot be inspected.
func size(t *testing.T, where string) int {
	t.Helper()

	// os.Stat reads the metadata without opening the file, so there is no
	// file descriptor to remember to close.
	info, err := os.Stat(where)
	require.NoError(t, err)

	return int(info.Size())
}
11 changes: 10 additions & 1 deletion file.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type File struct {
ModTime time.Time

fileinfo fs.FileInfo

cleanup func() error
}

var _ fs.File = (&File{})
Expand All @@ -31,7 +33,14 @@ func (f *File) Close() error {
return nil
}
if f.Contents != nil {
return f.Contents.Close()
if err := f.Contents.Close(); err != nil {
return err
}
}
if f.cleanup != nil {
if err := f.cleanup(); err != nil {
return err
}
}
return nil
}
Expand Down
Loading

0 comments on commit 64e336c

Please sign in to comment.