eventstreamer: Add StreamFromLatest option support (#99)
* eventstreamer: Add StreamFromHead option support

* add test container support in github

* add test container support in github

* add test containers in github actions another day

* free lock on memstreamer once log is copied

* use slog for kafkastreamer

* handle context cancellation during sleep

* check for context cancellation

* clean up

* align kafka connector implementation with kafka streamer impl
andrewwormald authored Mar 3, 2025
1 parent 429ede3 commit e5125b8
Showing 12 changed files with 753 additions and 209 deletions.
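
The change in brief: a receiver can now opt out of replaying a topic's backlog by passing workflow.StreamFromLatest() when it is created; a previously committed offset for the same receiver name still takes precedence. A minimal usage sketch, grounded in the adapter test below (the package name, function wrapper, and event handling are illustrative, not part of the commit):

package example

import (
	"context"

	"github.com/luno/workflow"
)

// consumeFromLatest shows a consumer opting in to StreamFromLatest. If a
// committed offset already exists for "my-receiver", that offset wins and
// the option has no effect.
func consumeFromLatest(ctx context.Context, streamer workflow.EventStreamer, topic string) error {
	receiver, err := streamer.NewReceiver(ctx, topic, "my-receiver", workflow.StreamFromLatest())
	if err != nil {
		return err
	}
	defer receiver.Close()

	for {
		e, ack, err := receiver.Recv(ctx)
		if err != nil {
			return err
		}
		_ = e // process the event, then acknowledge it
		if err := ack(); err != nil {
			return err
		}
	}
}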
182 changes: 126 additions & 56 deletions adapters/adaptertest/eventstreaming.go
@@ -2,6 +2,7 @@ package adaptertest

import (
	"context"
	"sync"
	"testing"
	"time"

@@ -40,68 +41,137 @@ func (s SyncStatus) String() string {
	}
}

func RunEventStreamerTest(t *testing.T, factory func() workflow.EventStreamer) {
	t.Run("ReceiverOption - Ensure StreamFromLatest is implemented", func(t *testing.T) {
		streamer := factory()
		ctx := context.Background()
		topic := "test-1"
		sender, err := streamer.NewSender(ctx, topic)
		require.NoError(t, err)

		err = sender.Send(ctx, "123", 4, map[workflow.Header]string{
			workflow.HeaderTopic: topic,
		})
		require.NoError(t, err)

		err = sender.Send(ctx, "456", 5, map[workflow.Header]string{
			workflow.HeaderTopic: topic,
		})
		require.NoError(t, err)

		var wg sync.WaitGroup

		receiver, err := streamer.NewReceiver(ctx, topic, "my-receiver", workflow.StreamFromLatest())
		require.NoError(t, err)

		t.Run("Should only receive events that come in after connecting", func(t *testing.T) {
			wg.Add(1)
			go func() {
				// Send concurrently so that Recv below observes an event
				// produced after the receiver connected.
				go func() {
					err = sender.Send(ctx, "789", 5, map[workflow.Header]string{
						workflow.HeaderTopic: topic,
					})
					require.Nil(t, err)
				}()

				e, ack, err := receiver.Recv(ctx)
				require.Nil(t, err)
				require.Equal(t, "789", e.ForeignID)

				err = ack()
				require.Nil(t, err)

				wg.Done()
			}()

			wg.Wait()
		})

		err = receiver.Close()
		require.Nil(t, err)

		t.Run("StreamFromLatest should have no effect when offset is committed", func(t *testing.T) {
			err = sender.Send(ctx, "101", 5, map[workflow.Header]string{
				workflow.HeaderTopic: topic,
			})
			require.Nil(t, err)

			secondReceiver, err := streamer.NewReceiver(ctx, topic, "my-receiver", workflow.StreamFromLatest())
			require.Nil(t, err)

			// The offset committed by the first receiver takes precedence over
			// StreamFromLatest, so the event sent while no receiver was
			// connected is still received.
			e, ack, err := secondReceiver.Recv(ctx)
			require.Nil(t, err)
			require.Equal(t, "101", e.ForeignID)

			err = ack()
			require.Nil(t, err)
		})
	})

	t.Run("Acceptance test - full workflow run through", func(t *testing.T) {
		b := workflow.NewBuilder[User, SyncStatus]("sync user 2")
		b.AddStep(
			SyncStatusStarted,
			setEmail(),
			SyncStatusEmailSet,
		).WithOptions(
			workflow.PollingFrequency(time.Millisecond*200),
			workflow.ParallelCount(5),
		)
		b.AddTimeout(
			SyncStatusEmailSet,
			coolDownTimerFunc(),
			coolDownTimeout(),
			SyncStatusRegulationTimeout,
		).WithOptions(
			workflow.PollingFrequency(time.Millisecond * 200),
		)
		b.AddStep(
			SyncStatusRegulationTimeout,
			generateUserID(),
			SyncStatusCompleted,
		).WithOptions(
			workflow.PollingFrequency(time.Millisecond*200),
			workflow.ParallelCount(5),
		)

		now := time.Date(2023, time.April, 9, 8, 30, 0, 0, time.UTC)
		clock := clock_testing.NewFakeClock(now)

		wf := b.Build(
			factory(),
			memrecordstore.New(),
			memrolescheduler.New(),
			workflow.WithClock(clock),
			workflow.WithTimeoutStore(memtimeoutstore.New()),
		)

		ctx, cancel := context.WithCancel(context.Background())
		t.Cleanup(func() {
			cancel()
		})
		wf.Run(ctx)
		t.Cleanup(wf.Stop)

		foreignID := "1"
		u := User{
			CountryCode: "GB",
		}
		runId, err := wf.Trigger(ctx, foreignID, SyncStatusStarted, workflow.WithInitialValue[User, SyncStatus](&u))
		require.Nil(t, err)

		workflow.AwaitTimeoutInsert(t, wf, foreignID, runId, SyncStatusEmailSet)

		clock.Step(time.Hour)

		record, err := wf.Await(ctx, foreignID, runId, SyncStatusCompleted)
		require.Nil(t, err)

		require.Equal(t, "andrew@workflow.com", record.Object.Email)
		require.Equal(t, SyncStatusCompleted.String(), record.Status.String())
		require.NotEmpty(t, record.Object.UID)
	})
}

func setEmail() func(ctx context.Context, t *workflow.Run[User, SyncStatus]) (SyncStatus, error) {
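
With the signature change from a concrete workflow.EventStreamer to a factory, each subtest builds its own streamer. An adapter's test wiring would look roughly like this (memstreamer.New is assumed here as the in-memory adapter's constructor; substitute the adapter under test):

package example

import (
	"testing"

	"github.com/luno/workflow"
	"github.com/luno/workflow/adapters/adaptertest"
	"github.com/luno/workflow/adapters/memstreamer"
)

func TestStreamer(t *testing.T) {
	adaptertest.RunEventStreamerTest(t, func() workflow.EventStreamer {
		// Return a fresh streamer per call so subtests do not share state.
		return memstreamer.New()
	})
}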
153 changes: 126 additions & 27 deletions adapters/kafkastreamer/connector.go
@@ -2,63 +2,162 @@ package kafkastreamer

import (
	"context"
	"errors"
	"log/slog"
	"time"

	"github.com/IBM/sarama"
	"github.com/luno/workflow"
)

func NewConnector(brokers []string, c *sarama.Config, t Translator, topic string) *connector {
	return &connector{
		brokers:    brokers,
		config:     c,
		translator: t,
		topic:      topic,
	}
}

type Translator func(m *sarama.ConsumerMessage) *workflow.ConnectorEvent

type connector struct {
	brokers    []string
	config     *sarama.Config
	translator Translator
	topic      string
}

func (c *connector) Make(ctx context.Context, name string) (workflow.ConnectorConsumer, error) {
	c.config.Consumer.Offsets.Initial = sarama.OffsetOldest
	cg, err := sarama.NewConsumerGroup(c.brokers, name, c.config)
	if err != nil {
		return nil, err
	}

	consumeCtx, cancel := context.WithCancel(ctx)
	processor := newConnectorProcessor(consumeCtx, c.translator)
	go func() {
		for ctx.Err() == nil {
			err := cg.Consume(consumeCtx, []string{c.topic}, processor)
			if err != nil && errors.Is(err, context.Canceled) {
				// Exit on context cancellation
				return
			} else if err != nil {
				slog.Error("kafka consumer exited unexpectedly", "error", err.Error())

				err = wait(ctx, time.Second)
				if err != nil {
					return
				}

				continue
			}

			err = wait(ctx, time.Millisecond*250)
			if err != nil {
				return
			}
		}
	}()

	// Wait for the processor to be ready
	<-processor.ready

	return &consumer{
		name:               name,
		cancel:             cancel,
		translator:         c.translator,
		connectorProcessor: processor,
	}, nil
}

type consumer struct {
	name               string
	cancel             context.CancelFunc
	translator         Translator
	connectorProcessor *connectorProcessor
}

func (c *consumer) Recv(ctx context.Context) (*workflow.ConnectorEvent, workflow.Ack, error) {
	select {
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	case next := <-c.connectorProcessor.iterator:
		return next()
	}
}

func (c *consumer) Close() error {
	c.cancel()
	return nil
}

func newConnectorProcessor(ctx context.Context, translator Translator) *connectorProcessor {
	return &connectorProcessor{
		ctx:        ctx,
		ready:      make(chan bool, 1),
		translator: translator,
		iterator:   make(chan func() (*workflow.ConnectorEvent, workflow.Ack, error)),
	}
}

// connectorProcessor implements the sarama.ConsumerGroupHandler interface
type connectorProcessor struct {
	ctx        context.Context
	ready      chan bool
	translator Translator
	// iterator hands message-processing closures from ConsumeClaim to Recv.
	iterator chan func() (*workflow.ConnectorEvent, workflow.Ack, error)
}

func (cp *connectorProcessor) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (cp *connectorProcessor) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim processes messages from Kafka
func (cp *connectorProcessor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	cp.ready <- true

	for {
		select {
		case m := <-claim.Messages():
			select {
			case cp.iterator <- consumeConnectorEvent(session, m, cp.translator):
			case <-cp.ctx.Done():
				return cp.ctx.Err()
			}
		case <-cp.ctx.Done():
			return nil
		case <-session.Context().Done():
			return nil
		}
	}
}

func consumeConnectorEvent(
	session sarama.ConsumerGroupSession,
	m *sarama.ConsumerMessage,
	t Translator,
) func() (*workflow.ConnectorEvent, workflow.Ack, error) {
	return func() (*workflow.ConnectorEvent, workflow.Ack, error) {
		return t(m), func() error {
			session.MarkMessage(m, "")
			return nil
		}, nil
	}
}
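
Make's retry loop calls a wait helper that sits outside this hunk. A minimal sketch of a context-aware sleep with the same shape (assuming it only needs to block for the duration or end early on cancellation):

// wait blocks for d, or returns the context's error if ctx is cancelled
// first. Sketch only; the real helper lives elsewhere in the package.
func wait(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}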