Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio committed Jan 9, 2025
1 parent 4082710 commit 1bf978b
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 122 deletions.
1 change: 0 additions & 1 deletion pkg/ha/ha_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func NewReliableConsumer(env *stream.Environment, streamName string,
logs.LogDebug("[Reliable] - creating %s", res.getInfo())
err := res.newConsumer()
if err == nil {

res.setStatus(StatusOpen)
}
logs.LogDebug("[Reliable] - created %s", res.getInfo())
Expand Down
19 changes: 8 additions & 11 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ type Client struct {
tcpParameters *TCPParameters
saslConfiguration *SaslConfiguration

mutex *sync.Mutex
metadataListener metadataListener
mutex *sync.Mutex
//metadataListener metadataListener
lastHeartBeat HeartBeat
socketCallTimeout time.Duration
availableFeatures *availableFeatures
Expand Down Expand Up @@ -512,10 +512,10 @@ func (c *Client) Close() error {
}
}

if c.metadataListener != nil {
close(c.metadataListener)
c.metadataListener = nil
}
//if c.metadataListener != nil {
// close(c.metadataListener)
// c.metadataListener = nil
//}
c.closeHartBeat()
if c.getSocket().isOpen() {

Expand Down Expand Up @@ -747,6 +747,7 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) {

streamMetadata := streamsMetadata.Get(stream)
if streamMetadata.responseCode != responseCodeOk {

return nil, lookErrorCode(streamMetadata.responseCode)
}

Expand Down Expand Up @@ -992,12 +993,8 @@ func (c *Client) DeclareSubscriber(streamName string,
go func() {
for {
select {
case code := <-consumer.response.code:
if code.id == closeChannel {
return
}

case chunk, ok := <-consumer.response.chunkForConsumer:
case chunk, ok := <-consumer.chunkForConsumer:
if !ok {
return
}
Expand Down
1 change: 1 addition & 0 deletions pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ const (
LeaderLocatorBalanced = "balanced"
LeaderLocatorClientLocal = "client-local"
DeletePublisher = "deletePublisher"
UnSubscribe = "unSubscribe"

StreamTcpPort = "5552"

Expand Down
85 changes: 49 additions & 36 deletions pkg/stream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
)

type Consumer struct {
ID uint8
response *Response
options *ConsumerOptions
onClose onInternalClose
mutex *sync.Mutex
MessagesHandler MessagesHandler
ID uint8
response *Response
options *ConsumerOptions
onClose onInternalClose
mutex *sync.Mutex
chunkForConsumer chan chunkInfo
MessagesHandler MessagesHandler
// different form ConsumerOptions.offset. ConsumerOptions.offset is just the configuration
// and won't change. currentOffset is the status of the offset
currentOffset int64
Expand Down Expand Up @@ -312,52 +313,64 @@ func (consumer *Consumer) Close() error {
if consumer.getStatus() == closed {
return AlreadyClosed
}
consumer.cacheStoreOffset()

consumer.setStatus(closed)
_, errGet := consumer.options.client.coordinator.GetConsumerById(consumer.ID)
if errGet != nil {
return nil
}

length := 2 + 2 + 4 + 1
resp := consumer.options.client.coordinator.NewResponse(CommandUnsubscribe)
correlationId := resp.correlationid
var b = bytes.NewBuffer(make([]byte, 0, length+4))
writeProtocolHeader(b, length, CommandUnsubscribe,
correlationId)

writeByte(b, consumer.ID)
err := consumer.options.client.handleWrite(b.Bytes(), resp)
if err.Err != nil && err.isTimeout {
return err.Err
}

errC := consumer.options.client.coordinator.RemoveConsumerById(consumer.ID, Event{
return consumer.close(Event{
Command: CommandUnsubscribe,
StreamName: consumer.GetStreamName(),
Name: consumer.GetName(),
Reason: "unSubscribe",
Reason: UnSubscribe,
Err: nil,
})
}

if errC != nil {
logs.LogWarn("Error during remove consumer id:%s", errC)
func (consumer *Consumer) close(reason Event) error {

if consumer.options == nil {
logs.LogWarn("consumer options is nil, the close will be ignored")
return nil
}
consumer.cacheStoreOffset()
consumer.setStatus(closed)

if consumer.options.client.coordinator.ConsumersCount() == 0 {
err := consumer.options.client.Close()
if err != nil {
return err
if closeHandler := consumer.GetCloseHandler(); closeHandler != nil {
closeHandler <- reason
close(consumer.closeHandler)
consumer.closeHandler = nil
}

if consumer.chunkForConsumer != nil {
close(consumer.chunkForConsumer)
consumer.chunkForConsumer = nil
}

if consumer.response.data != nil {
close(consumer.response.data)
consumer.response.data = nil
}

if reason.Reason == UnSubscribe {
length := 2 + 2 + 4 + 1
resp := consumer.options.client.coordinator.NewResponse(CommandUnsubscribe)
correlationId := resp.correlationid
var b = bytes.NewBuffer(make([]byte, 0, length+4))
writeProtocolHeader(b, length, CommandUnsubscribe,
correlationId)

writeByte(b, consumer.ID)
err := consumer.options.client.handleWrite(b.Bytes(), resp)
if err.Err != nil && err.isTimeout {
logs.LogWarn("error during consumer unsubscribe:%s", err.Err)
}
}

if consumer.options != nil && consumer.options.client.coordinator.ConsumersCount() == 0 {
_ = consumer.options.client.Close()
}

ch := make(chan uint8, 1)
ch <- consumer.ID
consumer.onClose(ch)
close(ch)
return err.Err
return nil
}

func (consumer *Consumer) cacheStoreOffset() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ var _ = Describe("Streaming Consumers", func() {
Eventually(func() int32 {
return atomic.LoadInt32(&commandIdRecv)
}, 5*time.Second).Should(Equal(int32(CommandUnsubscribe)),
"command received should be CommandMetadataUpdate ")
"command received should be unSubscribe ")

Expect(err).NotTo(HaveOccurred())
})
Expand Down
18 changes: 2 additions & 16 deletions pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type chunkInfo struct {
type Response struct {
code chan Code
data chan interface{}
chunkForConsumer chan chunkInfo
commandDescription string
correlationid int
}
Expand Down Expand Up @@ -84,20 +83,8 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event)
if err != nil {
return err
}
consumer.setStatus(closed)
reason.StreamName = consumer.GetStreamName()
reason.Name = consumer.GetName()

if closeHandler := consumer.GetCloseHandler(); closeHandler != nil {
closeHandler <- reason
close(closeHandler)
closeHandler = nil
}

close(consumer.response.chunkForConsumer)
close(consumer.response.code)
return consumer.close(reason)

return nil
}
func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error {

Expand All @@ -117,7 +104,6 @@ func (coordinator *Coordinator) RemoveResponseById(id interface{}) error {
err = coordinator.removeById(fmt.Sprintf("%d", id), coordinator.responses)
close(resp.code)
close(resp.data)
close(resp.chunkForConsumer)
return err
}

Expand All @@ -131,7 +117,6 @@ func newResponse(commandDescription string) *Response {
res.commandDescription = commandDescription
res.code = make(chan Code, 1)
res.data = make(chan interface{}, 1)
res.chunkForConsumer = make(chan chunkInfo, 100)
return res
}

Expand Down Expand Up @@ -200,6 +185,7 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler,
lastStoredOffset: -1, // because 0 is a valid value for the offset
isPromotedAsActive: true,
lastAutoCommitStored: time.Now(),
chunkForConsumer: make(chan chunkInfo, 100),
}

coordinator.consumers[lastId] = item
Expand Down
1 change: 1 addition & 0 deletions pkg/stream/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ var _ = Describe("Coordinator", func() {
Command: 0,
StreamName: "UNIT_TESTS",
Name: "",
Reason: "UNIT_TEST",
Err: nil,
})
Expect(err).NotTo(HaveOccurred())
Expand Down
69 changes: 30 additions & 39 deletions pkg/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,12 +515,7 @@ func (c *Client) maybeCleanProducers(streamName string) {
}
}
c.mutex.Unlock()
if c.coordinator.ProducersCount() == 0 {
err := c.Close()
if err != nil {
return
}
}

}

func (c *Client) maybeCleanConsumers(streamName string) {
Expand All @@ -540,12 +535,6 @@ func (c *Client) maybeCleanConsumers(streamName string) {
}
}
c.mutex.Unlock()
if c.coordinator.ConsumersCount() == 0 {
err := c.Close()
if err != nil {
return
}
}
}

func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string,
Expand Down Expand Up @@ -603,18 +592,18 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP

func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client {
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeOut)
chMeta := make(chan metaDataUpdateEvent, 1)
clientResult.metadataListener = chMeta
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
for metaDataUpdateEvent := range ch {
clientResult.maybeCleanProducers(metaDataUpdateEvent.StreamName)
cc.maybeCleanClients()
if !cl.socket.isOpen() {
return
}
}

}(chMeta, clientResult)
//chMeta := make(chan metaDataUpdateEvent, 1)
//clientResult.metadataListener = chMeta
//go func(ch <-chan metaDataUpdateEvent, cl *Client) {
// for metaDataUpdateEvent := range ch {
// clientResult.maybeCleanProducers(metaDataUpdateEvent.StreamName)
// cc.maybeCleanClients()
// if !cl.socket.isOpen() {
// return
// }
// }
//
//}(chMeta, clientResult)

cc.nextId++
cc.clientsPerContext[cc.nextId] = clientResult
Expand All @@ -638,18 +627,18 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro

if clientResult == nil {
clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout)
chMeta := make(chan metaDataUpdateEvent)
clientResult.metadataListener = chMeta
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
for metaDataUpdateEvent := range ch {
clientResult.maybeCleanConsumers(metaDataUpdateEvent.StreamName)
cc.maybeCleanClients()
if !cl.socket.isOpen() {
return
}
}

}(chMeta, clientResult)
//chMeta := make(chan metaDataUpdateEvent)
//clientResult.metadataListener = chMeta
//go func(ch <-chan metaDataUpdateEvent, cl *Client) {
// for metaDataUpdateEvent := range ch {
// clientResult.maybeCleanConsumers(metaDataUpdateEvent.StreamName)
// cc.maybeCleanClients()
// if !cl.socket.isOpen() {
// return
// }
// }
//
//}(chMeta, clientResult)

cc.nextId++
cc.clientsPerContext[cc.nextId] = clientResult
Expand All @@ -672,9 +661,11 @@ func (cc *environmentCoordinator) Close() error {
cc.mutexContext.Lock()
defer cc.mutexContext.Unlock()
for _, client := range cc.clientsPerContext {
err := client.Close()
if err != nil {
logs.LogWarn("Error during close the client, %s", err)
for i := range client.coordinator.producers {
_ = client.coordinator.producers[i].(*Producer).Close()
}
for i := range client.coordinator.consumers {
_ = client.coordinator.consumers[i].(*Consumer).Close()
}
}
return nil
Expand Down
6 changes: 0 additions & 6 deletions pkg/stream/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,7 @@ type Event struct {
Err error
}

type metaDataUpdateEvent struct {
StreamName string
code uint16
}

type onInternalClose func(ch <-chan uint8)
type metadataListener chan metaDataUpdateEvent

type ChannelClose = <-chan Event
type ChannelPublishConfirm chan []*ConfirmationStatus
8 changes: 3 additions & 5 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,9 +606,10 @@ func (producer *Producer) close(reason Event) error {
producer.closeConfirmationStatus()

if producer.options == nil {
logs.LogWarn("producer options is nil, the close will be ignored")
return nil
}
_ = producer.options.client.coordinator.RemoveProducerById(producer.id, reason)
_, _ = producer.options.client.coordinator.ExtractProducerById(producer.id)

if !producer.options.client.socket.isOpen() {
return fmt.Errorf("tcp connection is closed")
Expand All @@ -620,10 +621,7 @@ func (producer *Producer) close(reason Event) error {
}

if producer.options.client.coordinator.ProducersCount() == 0 {
err := producer.options.client.Close()
if err != nil {
logs.LogError("error during closing client: %s", err)
}
_ = producer.options.client.Close()
}

if producer.onClose != nil {
Expand Down
Loading

0 comments on commit 1bf978b

Please sign in to comment.