Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topic-manager improvements #297

Merged
merged 2 commits into from
Feb 10, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions doc.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//go:generate go-bindata -pkg templates -o web/templates/bindata.go web/templates/common/... web/templates/monitor/... web/templates/query/... web/templates/index/...
//go:generate mockgen -self_package github.com/lovoo/goka -package goka -destination mockstorage.go github.com/lovoo/goka/storage Storage
//go:generate mockgen -self_package github.com/lovoo/goka -package goka -destination mocks.go github.com/lovoo/goka TopicManager,Producer,Broker
//go:generate mockgen -self_package github.com/lovoo/goka -package goka -destination mockssarama.go github.com/Shopify/sarama Client
//go:generate mockgen -self_package github.com/lovoo/goka -package goka -destination mocks.go github.com/lovoo/goka TopicManager,Producer,
//go:generate mockgen -self_package github.com/lovoo/goka -package goka -destination mockssarama.go github.com/Shopify/sarama Client,ClusterAdmin

/*
Package goka is a stateful stream processing library for Apache Kafka (version 0.9+) that eases
2 changes: 2 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package goka

import (
"errors"
"fmt"
reflect "reflect"
"regexp"
@@ -13,6 +14,7 @@ var (
errBuildConsumer = "error creating Kafka consumer: %v"
errBuildProducer = "error creating Kafka producer: %v"
errApplyOptions = "error applying options: %v"
errTopicNotFound = errors.New("requested topic was not found")
)

var (
22 changes: 18 additions & 4 deletions graph.go
Original file line number Diff line number Diff line change
@@ -99,12 +99,12 @@ func (gg *GroupGraph) isOutputTopic(topic Stream) bool {

// inputs returns all input topics (tables and streams)
func (gg *GroupGraph) inputs() Edges {
return append(append(gg.inputStreams, gg.inputTables...), gg.crossTables...)
return chainEdges(gg.inputStreams, gg.inputTables, gg.crossTables)
}

// copartitioned returns all copartitioned topics (joint tables and input streams)
func (gg *GroupGraph) copartitioned() Edges {
return append(gg.inputStreams, gg.inputTables...)
return chainEdges(gg.inputStreams, gg.inputTables)
}

func (gg *GroupGraph) codec(topic string) Codec {
@@ -196,8 +196,7 @@ func (gg *GroupGraph) Validate() error {
if len(gg.inputStreams) == 0 {
return errors.New("no input stream in group graph")
}
for _, t := range append(gg.outputStreams,
append(gg.inputStreams, append(gg.inputTables, gg.crossTables...)...)...) {
for _, t := range chainEdges(gg.outputStreams, gg.inputStreams, gg.inputTables, gg.crossTables) {
if t.Topic() == loopName(gg.Group()) {
return errors.New("should not directly use loop stream")
}
@@ -219,6 +218,21 @@ type Edge interface {
// Edges is a slice of edge objects.
type Edges []Edge

// chainEdges chains edges together to avoid error-prone
// append(edges, moreEdges...) constructs in the graph
func chainEdges(edgeList ...Edges) Edges {
var sum int
for _, edges := range edgeList {
sum += len(edges)
}
chained := make(Edges, 0, sum)

for _, edges := range edgeList {
chained = append(chained, edges...)
}
return chained
}

// Topics returns the names of the topics of the edges.
func (e Edges) Topics() []string {
var t []string
7 changes: 6 additions & 1 deletion graph_test.go
Original file line number Diff line number Diff line change
@@ -74,7 +74,13 @@ func TestGroupGraph_Validate(t *testing.T) {
)
err = g.Validate()
test.AssertStringContains(t, err.Error(), "loop stream")
}

func TestGroupGraph_chainEdges(t *testing.T) {
test.AssertEqual(t, len(chainEdges()), 0)
test.AssertEqual(t, len(chainEdges(Edges{}, Edges{})), 0)
test.AssertEqual(t, chainEdges(Edges{Join("a", nil)}, Edges{}), Edges{Join("a", nil)})
test.AssertEqual(t, chainEdges(Edges{Join("a", nil)}, Edges{Join("a", nil), Join("b", nil)}), Edges{Join("a", nil), Join("a", nil), Join("b", nil)})
}

func TestGroupGraph_codec(t *testing.T) {
@@ -87,7 +93,6 @@ func TestGroupGraph_codec(t *testing.T) {
codec := g.codec(topic)
test.AssertEqual(t, codec, c)
}

}

func TestGroupGraph_callback(t *testing.T) {
106 changes: 106 additions & 0 deletions integrationtest/topicmanager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package integrationtest

import (
"crypto/rand"
"encoding/hex"
"flag"
"strings"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/lovoo/goka"
"github.com/lovoo/goka/internal/test"
)

var (
systemtest = flag.Bool("systemtest", false, "set to run systemtests that require a running kafka-version")
)

func TestTopicManagerCreate(t *testing.T) {
if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0

tm, err := goka.TopicManagerBuilderWithConfig(cfg, goka.NewTopicManagerConfig())([]string{"localhost:9092"})
test.AssertNil(t, err)

err = tm.EnsureTopicExists("test10", 4, 2, nil)
test.AssertNil(t, err)

}

// Tests the topic manager with sarama version v11 --> so it will test topic configuration using
// the sarama.ClusterAdmin
func TestTopicManager_v11(t *testing.T) {
if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0
tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.MismatchBehavior = goka.TMConfigMismatchBehaviorFail

tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{"localhost:9092"})
test.AssertNil(t, err)

client, _ := sarama.NewClient([]string{"localhost:9092"}, cfg)
admin, _ := sarama.NewClusterAdminFromClient(client)

t.Run("ensure-new-stream", func(t *testing.T) {
topic := newTopicName()

// delete topic, ignore error if it does not exist
admin.DeleteTopic(topic)

err := tm.EnsureStreamExists(topic, 10)
test.AssertNil(t, err)
time.Sleep(1 * time.Second)
// trying to create the same is fine
err = tm.EnsureStreamExists(topic, 10)
test.AssertNil(t, err)
time.Sleep(1 * time.Second)
// partitions changed - error
err = tm.EnsureStreamExists(topic, 11)
test.AssertNotNil(t, err)
})

t.Run("list-partitions", func(t *testing.T) {

var (
topic = newTopicName()
partitions []int32
err error
)
_, err = tm.Partitions(topic)
test.AssertNotNil(t, err)
test.AssertTrue(t, strings.Contains(err.Error(), "requested topic was not found"))
test.AssertEqual(t, len(partitions), 0)

tm.EnsureTableExists(topic, 123)
time.Sleep(1 * time.Second)
partitions, err = tm.Partitions(topic)
test.AssertNil(t, err)
test.AssertEqual(t, len(partitions), 123)

})

t.Run("non-existent", func(t *testing.T) {
// topic does not exist
partitions, err := tm.Partitions("non-existent-topic")
test.AssertTrue(t, len(partitions) == 0, "expected no partitions, was", partitions)
test.AssertNotNil(t, err)
})

}

func newTopicName() string {
topicBytes := make([]byte, 4)
rand.Read(topicBytes)
return hex.EncodeToString(topicBytes)
}
3 changes: 2 additions & 1 deletion internal/test/test.go
Original file line number Diff line number Diff line change
@@ -15,7 +15,8 @@ func AssertNil(t Fataler, actual interface{}) {
value := reflect.ValueOf(actual)
if value.IsValid() {
if !value.IsNil() {
t.Fatalf("Expected value to be nil, but was not nil in %s", string(debug.Stack()))

t.Fatalf("Expected value to be nil, but was not nil (%v) in %s", actual, string(debug.Stack()))
}
}
}
6 changes: 0 additions & 6 deletions mockautoconsumers.go
Original file line number Diff line number Diff line change
@@ -634,11 +634,5 @@ func (cg *MockConsumerGroup) Close() error {

// close old errs chan and create new one
close(cg.errs)
cg.errs = make(chan error)

cg.offset = 0
cg.currentGeneration = 0
cg.sessions = make(map[string]*MockConsumerGroupSession)
cg.failOnConsume = nil
return nil
}
4 changes: 2 additions & 2 deletions mockbuilder.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ type builderMock struct {
consumerGroup *MockConsumerGroup
producer *MockProducer
client *MockClient
broker *MockBroker
admin *MockClusterAdmin
}

func newBuilderMock(ctrl *gomock.Controller) *builderMock {
@@ -33,7 +33,7 @@ func newBuilderMock(ctrl *gomock.Controller) *builderMock {
tmgr: NewMockTopicManager(ctrl),
producer: NewMockProducer(ctrl),
client: NewMockClient(ctrl),
broker: NewMockBroker(ctrl),
admin: NewMockClusterAdmin(ctrl),
}
}

Loading
Oops, something went wrong.