Skip to content

Commit

Permalink
Feature/rework interfaces (#7)
Browse files Browse the repository at this point in the history
* ref(create chunks) Add parallel workers

* opti(int vector) Optimize GC and memory alloc

* fix(create chunk) race condition

* fix(vector) repair incorrect insert

* ref(memusage) add field to enable memusage analysis

* ref(test) better test names

* add(makefile) add command to test race conditions

* add(ignore) untrack benchmark files

* ref(merge sort) export results to file
ref(merge sort) minimize chunks min value
ref(memusage) become an option

* ref(create chunks) Add channel with batches to sort in parallel

* opti(main) find good parameter (speed:1min10, ram:80M)

* ref(vector) Remove unnecessary sort function

* fix(test) Repair batching channel test
ref(vector) Remove useless methods

* ref(vector) Replace all empty interfaces with concrete types
new(tsv model) Allow merge sort on tsv files

* ref(allocate) Beautify slice vector setup

* Increase output buffer size

* ref(tsv vector) Remove unused struct
fix(create chunks) Fix wrong context

* Add docker setup
Remove deprecated function
  • Loading branch information
askiada authored Feb 14, 2022
1 parent 8af00c7 commit d4fcd22
Show file tree
Hide file tree
Showing 28 changed files with 1,486 additions and 323 deletions.
4 changes: 4 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
data/
*_test.go
testdata/
*.tsv
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
bench*
gen*
chunk_*.tsv
bin/
24 changes: 24 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
FROM golang:1.17-alpine AS builder

WORKDIR /go/src/github.com/askiada/external-sort

# Cache dependencies: copying go.mod/go.sum and downloading modules in their
# own layer means source-only changes do not re-download every module.
# (Previously the modules were never downloaded before COPY . ., so this
# layer gave no caching benefit.)
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN chmod -R 777 .

# Static linux/amd64 build, stripped (-w -s) and path-trimmed for
# reproducibility, so it runs on a bare alpine base image.
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
    go build \
    -a -installsuffix cgo \
    -ldflags="-w -s" \
    -trimpath \
    -o /bin/external-sort main.go

FROM alpine:latest

# CA certificates so any outbound TLS performed by the binary can verify peers.
RUN apk --no-cache add ca-certificates
COPY --from=builder /bin/external-sort /external-sort

USER root
ENTRYPOINT ["/external-sort"]
29 changes: 27 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
.PHONY: help
help: ## Show this
	@grep -E '^[0-9a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'

tag=v1.0.0
docker_image=askiada/external-sort

# Load INPUT_PATH, OUTPUT_PATH, etc. and export them to recipe sub-shells.
include ./env.list
export $(shell sed 's/=.*//' ./env.list)


.PHONY: test
test: ## Run the unit tests
	go test ./...

.PHONY: test_race
test_race: ## Run the unit tests with the race detector
	go test -race ./...

# NOTE: `run` was previously defined twice (go run vs. built binary), which
# made GNU make warn "overriding recipe" and ignore the first definition.
# Keep the single, intended definition: build then run the binary.
.PHONY: run
run: build ## Build and run the binary with env.list settings
	./bin/external-sort

.PHONY: build
build: ## Build the binary into bin/
	go build -o bin/external-sort main.go

.PHONY: build_docker
build_docker: ## Build a docker image from current git sha
	@docker build \
	--build-arg BUILDKIT_INLINE_CACHE=1 \
	-t $(docker_image):$(tag) .
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,20 @@ make test

## Show some stuff

Set the correct values in env.list file

```sh
make run
```

Print on stdout how we ordered 10 integers. The original file can be found at `data/10elems.tsv`

## Docker setup

You can set all the values in the file `env.list`

```sh
make build_docker
```

```sh
docker run --rm -it -v $(pwd):/mnt/data --env-file env.list askiada/external-sort:v1.0.0
```
6 changes: 6 additions & 0 deletions env.list
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Input file to sort and destination for the sorted output.
INPUT_PATH=./works.tsv
OUTPUT_PATH=./output.tsv
# Directory where intermediate sorted chunk files are written.
CHUNK_FOLDER=./data/chunks/
# Maximum number of elements per chunk (presumably lines — confirm against reader).
CHUNK_SIZE=1000000
# Maximum number of concurrent sort workers.
MAX_WORKERS=10
# Buffer size used when writing the final output.
OUTPUT_BUFFER_SIZE=1000
113 changes: 113 additions & 0 deletions file/batchingchannels/batching_channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package batchingchannels

import (
"context"

"github.com/askiada/external-sort/vector"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)

// BatchingChannel implements the Channel interface, with the change that instead of producing individual elements
// on Out(), it batches together the entire internal buffer each time. Trying to construct an unbuffered batching channel
// will panic, that configuration is not supported (and provides no benefit over an unbuffered NativeChannel).
type BatchingChannel struct {
	input    chan string        // individual lines fed in via In()
	output   chan vector.Vector // completed batches emitted by batchingBuffer
	buffer   vector.Vector      // batch currently being filled; handed off on output once full
	allocate *vector.Allocate   // factory used to create each fresh batch vector
	g        *errgroup.Group    // collects errors from consumer goroutines and PushBack failures
	sem      *semaphore.Weighted // bounds concurrent ProcessOut workers to maxWorker
	dCtx     context.Context    // derived context; cancelled when any group goroutine fails
	size     int                // exact number of elements per full batch
	maxWorker int64             // maximum number of concurrent consumers
}

// NewBatchingChannel builds a BatchingChannel that groups incoming lines into
// batches of exactly size elements (the final batch may be smaller) and allows
// up to maxWorker concurrent consumers in ProcessOut. It panics when size is
// zero (unbuffered batching is unsupported) or negative, and starts the
// background batching goroutine before returning.
func NewBatchingChannel(ctx context.Context, allocate *vector.Allocate, maxWorker int64, size int) *BatchingChannel {
	switch {
	case size == 0:
		panic("channels: BatchingChannel does not support unbuffered behaviour")
	case size < 0:
		panic("channels: invalid negative size in NewBatchingChannel")
	}
	group, groupCtx := errgroup.WithContext(ctx)
	bc := &BatchingChannel{
		input:     make(chan string),
		output:    make(chan vector.Vector),
		size:      size,
		allocate:  allocate,
		maxWorker: maxWorker,
		g:         group,
		sem:       semaphore.NewWeighted(maxWorker),
		dCtx:      groupCtx,
	}
	go bc.batchingBuffer()
	return bc
}

// In returns the send-only channel producers write individual lines to.
// Close (not closing this channel directly) must be used to end the stream.
func (ch *BatchingChannel) In() chan<- string {
	return ch.input
}

// Out returns a <-chan vector.Vector in order that BatchingChannel conforms to the standard Channel interface provided
// by this package, however each output value is guaranteed to be of type vector.Vector - a vector collecting the most
// recent batch of values sent on the In channel. The vector is guaranteed to not be empty or nil.
func (ch *BatchingChannel) Out() <-chan vector.Vector {
	return ch.output
}

// ProcessOut drains Out(), invoking f on each batch in its own goroutine.
// The weighted semaphore bounds concurrency to maxWorker simultaneous calls
// of f. It blocks until the output channel is closed and every worker has
// finished, then returns the first error reported by f, by a failed
// semaphore acquire (e.g. context cancellation), or by any other goroutine
// registered on the group.
func (ch *BatchingChannel) ProcessOut(f func(vector.Vector) error) error {
	for val := range ch.Out() {
		if err := ch.sem.Acquire(ch.dCtx, 1); err != nil {
			return err
		}
		val := val // capture per-iteration value for the goroutine (pre-Go 1.22 semantics)
		ch.g.Go(func() error {
			defer ch.sem.Release(1)
			return f(val)
		})
	}
	// Wait's error (or nil) is exactly what we want to return; no need for
	// the previous err-check-then-return-nil dance.
	return ch.g.Wait()
}

// Len reports the configured batch size — not the number of elements
// currently buffered.
func (ch *BatchingChannel) Len() int {
	return ch.size
}

// Cap reports the configured batch size, which is also the channel's
// effective buffering capacity.
func (ch *BatchingChannel) Cap() int {
	return ch.size
}

// Close signals end-of-input by closing the input channel. Only the producer
// side should call it; batchingBuffer then flushes any partial batch and
// closes the output channel.
func (ch *BatchingChannel) Close() {
	close(ch.input)
}

// batchingBuffer is the background loop started by NewBatchingChannel. It
// accumulates values from the input channel into a vector and sends the
// vector on the output channel each time it reaches exactly ch.size elements,
// allocating a fresh vector for the next batch (ownership of the full batch
// passes to the receiver). When the input channel is closed, any non-empty
// partial batch is flushed before output is closed.
func (ch *BatchingChannel) batchingBuffer() {
	ch.buffer = ch.allocate.Vector(ch.size, ch.allocate.Key)
	for {
		elem, open := <-ch.input
		if open {
			err := ch.buffer.PushBack(elem)
			if err != nil {
				// Surface the PushBack error through the errgroup so that
				// ProcessOut's Wait reports it; the failed element is dropped.
				ch.g.Go(func() error {
					return err
				})
			}
		} else {
			// Input closed: flush the final (possibly partial) batch.
			if ch.buffer.Len() > 0 {
				ch.output <- ch.buffer
			}
			break
		}
		if ch.buffer.Len() == ch.size {
			// Hand the full batch to the consumer and start a new one.
			ch.output <- ch.buffer
			ch.buffer = ch.allocate.Vector(ch.size, ch.allocate.Key)
		}
	}

	close(ch.output)
}
120 changes: 120 additions & 0 deletions file/batchingchannels/batching_channel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package batchingchannels_test

import (
"context"
"strconv"
"sync"
"testing"
"time"

"github.com/askiada/external-sort/file/batchingchannels"
"github.com/askiada/external-sort/vector"
"github.com/askiada/external-sort/vector/key"
"github.com/stretchr/testify/assert"
)

// Int is a minimal key.Key implementation wrapping a single integer, used to
// exercise the batching channel in these tests.
type Int struct {
	value int
}

// AllocateInt parses line as a base-10 integer and wraps it in an *Int key.
// The strconv error is returned unchanged when line is not a valid integer.
func AllocateInt(line string) (key.Key, error) {
	parsed, parseErr := strconv.Atoi(line)
	if parseErr != nil {
		return nil, parseErr
	}
	return &Int{value: parsed}, nil
}

// Get returns the wrapped integer value.
func (k *Int) Get() int {
	return k.value
}

// Less orders Int keys numerically. It assumes other is also an *Int and
// will panic on any other key.Key implementation (fine for these tests).
func (k *Int) Less(other key.Key) bool {
	return k.value < other.(*Int).value
}
// testBatches pushes the integers [0, maxI) into ch from maxIn concurrent
// producers, drains every batch through ProcessOut, and asserts that the sum
// of all received values equals the arithmetic-series total — proving each
// value was delivered exactly once, regardless of how batching split them.
func testBatches(t *testing.T, ch *batchingchannels.BatchingChannel) {
	maxI := 10000
	// Sum of 0..maxI-1.
	expectedSum := (maxI - 1) * maxI / 2
	wg := &sync.WaitGroup{}
	wgInput := &sync.WaitGroup{}

	// maxIn producers each send a disjoint contiguous slice of [0, maxI).
	maxIn := 100
	wgInput.Add(maxIn)
	for j := 0; j < maxIn; j++ {
		go func(j int) {
			defer wgInput.Done()
			for i := maxI / maxIn * j; i < maxI*(j+1)/maxIn; i++ {
				ch.In() <- strconv.Itoa(i)
			}
		}(j)
	}

	// Close the channel only after every producer has finished sending.
	go func() {
		wgInput.Wait()
		ch.Close()
	}()

	// Consumers forward each element to got; a single goroutine accumulates
	// the sum so gotSum is never written concurrently.
	got := make(chan *vector.Element, maxI)
	wgSum := &sync.WaitGroup{}
	wgSum.Add(1)
	gotSum := 0
	go func() {
		defer wgSum.Done()
		for g := range got {
			gotSum += g.Key.(*Int).Get()
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := ch.ProcessOut(func(val vector.Vector) error {
			for i := 0; i < val.Len(); i++ {
				val := val.Get(i)
				got <- val
			}
			// Small delay to encourage worker overlap for the race detector.
			time.Sleep(3 * time.Millisecond)
			return nil
		})
		if err != nil {
			panic(err)
		}
	}()
	wg.Wait()
	close(got)
	wgSum.Wait()
	assert.Equal(t, expectedSum, gotSum)
}

// TestBatchingChannel exercises end-to-end batching with two different batch
// sizes, then runs the concurrent-accessor smoke test with a batch size of one.
func TestBatchingChannel(t *testing.T) {
	allocate := vector.DefaultVector(AllocateInt)

	for _, batchSize := range []int{50, 3} {
		ch := batchingchannels.NewBatchingChannel(context.Background(), allocate, 2, batchSize)
		testBatches(t, ch)
	}

	ch := batchingchannels.NewBatchingChannel(context.Background(), allocate, 2, 1)
	testChannelConcurrentAccessors(t, "batching channel", ch)
}

// TestBatchingChannelCap verifies that Cap reports the batch size the channel
// was constructed with.
func TestBatchingChannelCap(t *testing.T) {
	allocate := vector.DefaultVector(AllocateInt)
	ch := batchingchannels.NewBatchingChannel(context.Background(), allocate, 2, 5)
	// The previous failure message ("incorrect capacity on infinite channel")
	// was copied from the upstream channels library and described the wrong
	// channel type; use the testify assertion style the rest of this file uses.
	assert.Equal(t, 5, ch.Cap())
}

// testChannelConcurrentAccessors hits the channel's accessors from several
// goroutines at once purely so `go test -race` can observe any unsynchronized
// access; it makes no assertions and does not wait for the goroutines.
func testChannelConcurrentAccessors(t *testing.T, name string, ch *batchingchannels.BatchingChannel) {
	// no asserts here, this is just for the race detector's benefit
	go ch.Len()
	go ch.Cap()

	go func() {
		ch.In() <- ""
	}()

	go func() {
		<-ch.Out()
	}()
}
Loading

0 comments on commit d4fcd22

Please sign in to comment.