From 1bf978bfe445cac3278f660af9abfa3a59e1ce29 Mon Sep 17 00:00:00 2001
From: Gabriele Santomaggio
Date: Thu, 9 Jan 2025 16:57:41 +0100
Subject: [PATCH] wip

Signed-off-by: Gabriele Santomaggio
---
 pkg/ha/ha_consumer.go          |  1 -
 pkg/stream/client.go           | 19 ++++----
 pkg/stream/constants.go        |  1 +
 pkg/stream/consumer.go         | 85 ++++++++++++++++++++--------------
 pkg/stream/consumer_test.go    |  2 +-
 pkg/stream/coordinator.go      | 18 +------
 pkg/stream/coordinator_test.go |  1 +
 pkg/stream/environment.go      | 69 ++++++++++++---------------
 pkg/stream/listeners.go        |  6 ---
 pkg/stream/producer.go         |  8 ++--
 pkg/stream/server_frame.go     | 13 +++---
 11 files changed, 101 insertions(+), 122 deletions(-)

diff --git a/pkg/ha/ha_consumer.go b/pkg/ha/ha_consumer.go
index 24e8bb9d..4ceec9ea 100644
--- a/pkg/ha/ha_consumer.go
+++ b/pkg/ha/ha_consumer.go
@@ -88,7 +88,6 @@ func NewReliableConsumer(env *stream.Environment, streamName string,
 	logs.LogDebug("[Reliable] - creating %s", res.getInfo())
 	err := res.newConsumer()
 	if err == nil {
-		res.setStatus(StatusOpen)
 	}
 	logs.LogDebug("[Reliable] - created %s", res.getInfo())
 
diff --git a/pkg/stream/client.go b/pkg/stream/client.go
index 9e25a992..ae94d525 100644
--- a/pkg/stream/client.go
+++ b/pkg/stream/client.go
@@ -63,8 +63,8 @@ type Client struct {
 	tcpParameters     *TCPParameters
 	saslConfiguration *SaslConfiguration
 
-	mutex             *sync.Mutex
-	metadataListener  metadataListener
+	mutex *sync.Mutex
+	//metadataListener metadataListener
 	lastHeartBeat     HeartBeat
 	socketCallTimeout time.Duration
 	availableFeatures *availableFeatures
@@ -512,10 +512,10 @@ func (c *Client) Close() error {
 		}
 	}
 
-	if c.metadataListener != nil {
-		close(c.metadataListener)
-		c.metadataListener = nil
-	}
+	//if c.metadataListener != nil {
+	//	close(c.metadataListener)
+	//	c.metadataListener = nil
+	//}
 
 	c.closeHartBeat()
 	if c.getSocket().isOpen() {
@@ -747,6 +747,7 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) {
 
 	streamMetadata := streamsMetadata.Get(stream)
 	if streamMetadata.responseCode != responseCodeOk {
+
 		return nil, lookErrorCode(streamMetadata.responseCode)
 	}
 
@@ -992,12 +993,8 @@ func (c *Client) DeclareSubscriber(streamName string,
 	go func() {
 		for {
 			select {
-			case code := <-consumer.response.code:
-				if code.id == closeChannel {
-					return
-				}
-
-			case chunk, ok := <-consumer.response.chunkForConsumer:
+			case chunk, ok := <-consumer.chunkForConsumer:
 				if !ok {
 					return
 				}
diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go
index a19ab723..c760d370 100644
--- a/pkg/stream/constants.go
+++ b/pkg/stream/constants.go
@@ -119,6 +119,7 @@ const (
 	LeaderLocatorBalanced    = "balanced"
 	LeaderLocatorClientLocal = "client-local"
 	DeletePublisher          = "deletePublisher"
+	UnSubscribe              = "unSubscribe"
 
 	StreamTcpPort = "5552"
 
diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go
index e9ff54bc..8f6d8c70 100644
--- a/pkg/stream/consumer.go
+++ b/pkg/stream/consumer.go
@@ -10,12 +10,13 @@ import (
 )
 
 type Consumer struct {
-	ID              uint8
-	response        *Response
-	options         *ConsumerOptions
-	onClose         onInternalClose
-	mutex           *sync.Mutex
-	MessagesHandler MessagesHandler
+	ID               uint8
+	response         *Response
+	options          *ConsumerOptions
+	onClose          onInternalClose
+	mutex            *sync.Mutex
+	chunkForConsumer chan chunkInfo
+	MessagesHandler  MessagesHandler
 	// different form ConsumerOptions.offset. ConsumerOptions.offset is just the configuration
 	// and won't change. currentOffset is the status of the offset
 	currentOffset int64
@@ -312,52 +313,64 @@ func (consumer *Consumer) Close() error {
 	if consumer.getStatus() == closed {
 		return AlreadyClosed
 	}
-	consumer.cacheStoreOffset()
-
-	consumer.setStatus(closed)
-	_, errGet := consumer.options.client.coordinator.GetConsumerById(consumer.ID)
-	if errGet != nil {
-		return nil
-	}
-
-	length := 2 + 2 + 4 + 1
-	resp := consumer.options.client.coordinator.NewResponse(CommandUnsubscribe)
-	correlationId := resp.correlationid
-	var b = bytes.NewBuffer(make([]byte, 0, length+4))
-	writeProtocolHeader(b, length, CommandUnsubscribe,
-		correlationId)
-
-	writeByte(b, consumer.ID)
-	err := consumer.options.client.handleWrite(b.Bytes(), resp)
-	if err.Err != nil && err.isTimeout {
-		return err.Err
-	}
-
-	errC := consumer.options.client.coordinator.RemoveConsumerById(consumer.ID, Event{
+	return consumer.close(Event{
 		Command:    CommandUnsubscribe,
 		StreamName: consumer.GetStreamName(),
 		Name:       consumer.GetName(),
-		Reason:     "unSubscribe",
+		Reason:     UnSubscribe,
 		Err:        nil,
 	})
+}
 
-	if errC != nil {
-		logs.LogWarn("Error during remove consumer id:%s", errC)
+func (consumer *Consumer) close(reason Event) error {
+	if consumer.options == nil {
+		logs.LogWarn("consumer options is nil, the close will be ignored")
+		return nil
 	}
+	consumer.cacheStoreOffset()
+	consumer.setStatus(closed)
 
-	if consumer.options.client.coordinator.ConsumersCount() == 0 {
-		err := consumer.options.client.Close()
-		if err != nil {
-			return err
+	if closeHandler := consumer.GetCloseHandler(); closeHandler != nil {
+		closeHandler <- reason
+		close(consumer.closeHandler)
+		consumer.closeHandler = nil
+	}
+
+	if consumer.chunkForConsumer != nil {
+		close(consumer.chunkForConsumer)
+		consumer.chunkForConsumer = nil
+	}
+
+	if consumer.response.data != nil {
+		close(consumer.response.data)
+		consumer.response.data = nil
+	}
+
+	if reason.Reason == UnSubscribe {
+		length := 2 + 2 + 4 + 1
+		resp := consumer.options.client.coordinator.NewResponse(CommandUnsubscribe)
+		correlationId := resp.correlationid
+		var b = bytes.NewBuffer(make([]byte, 0, length+4))
+		writeProtocolHeader(b, length, CommandUnsubscribe,
+			correlationId)
+
+		writeByte(b, consumer.ID)
+		err := consumer.options.client.handleWrite(b.Bytes(), resp)
+		if err.Err != nil && err.isTimeout {
+			logs.LogWarn("error during consumer unsubscribe:%s", err.Err)
 		}
 	}
 
+	if consumer.options != nil && consumer.options.client.coordinator.ConsumersCount() == 0 {
+		_ = consumer.options.client.Close()
+	}
+
 	ch := make(chan uint8, 1)
 	ch <- consumer.ID
 	consumer.onClose(ch)
 	close(ch)
 
-	return err.Err
+	return nil
 }
 
 func (consumer *Consumer) cacheStoreOffset() {
diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go
index 49406ed8..a1436f72 100644
--- a/pkg/stream/consumer_test.go
+++ b/pkg/stream/consumer_test.go
@@ -123,7 +123,7 @@ var _ = Describe("Streaming Consumers", func() {
 		Eventually(func() int32 {
 			return atomic.LoadInt32(&commandIdRecv)
 		}, 5*time.Second).Should(Equal(int32(CommandUnsubscribe)),
-			"command received should be CommandMetadataUpdate ")
+			"command received should be unSubscribe ")
 
 		Expect(err).NotTo(HaveOccurred())
 	})
diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go
index a2d9b198..4d8b6f62 100644
--- a/pkg/stream/coordinator.go
+++ b/pkg/stream/coordinator.go
@@ -37,7 +37,6 @@ type chunkInfo struct {
 type Response struct {
 	code               chan Code
 	data               chan interface{}
-	chunkForConsumer   chan chunkInfo
 	commandDescription string
 	correlationid      int
 }
@@ -84,20 +83,8 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event)
 	if err != nil {
 		return err
 	}
-	consumer.setStatus(closed)
-	reason.StreamName = consumer.GetStreamName()
-	reason.Name = consumer.GetName()
-
-	if closeHandler := consumer.GetCloseHandler(); closeHandler != nil {
-		closeHandler <- reason
-		close(closeHandler)
-		closeHandler = nil
-	}
-
-	close(consumer.response.chunkForConsumer)
-	close(consumer.response.code)
+	return consumer.close(reason)
 
-	return nil
 }
 
 func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error {
@@ -117,7 +104,6 @@ func (coordinator *Coordinator) RemoveResponseById(id interface{}) error {
 	err = coordinator.removeById(fmt.Sprintf("%d", id), coordinator.responses)
 	close(resp.code)
 	close(resp.data)
-	close(resp.chunkForConsumer)
 	return err
 }
 
@@ -131,7 +117,6 @@ func newResponse(commandDescription string) *Response {
 	res.commandDescription = commandDescription
 	res.code = make(chan Code, 1)
 	res.data = make(chan interface{}, 1)
-	res.chunkForConsumer = make(chan chunkInfo, 100)
 	return res
 }
 
@@ -200,6 +185,7 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler,
 		lastStoredOffset:     -1, // because 0 is a valid value for the offset
 		isPromotedAsActive:   true,
 		lastAutoCommitStored: time.Now(),
+		chunkForConsumer:     make(chan chunkInfo, 100),
 	}
 	coordinator.consumers[lastId] = item
 
diff --git a/pkg/stream/coordinator_test.go b/pkg/stream/coordinator_test.go
index bfc48771..322de58a 100644
--- a/pkg/stream/coordinator_test.go
+++ b/pkg/stream/coordinator_test.go
@@ -172,6 +172,7 @@ var _ = Describe("Coordinator", func() {
 			Command:    0,
 			StreamName: "UNIT_TESTS",
 			Name:       "",
+			Reason:     "UNIT_TEST",
 			Err:        nil,
 		})
 		Expect(err).NotTo(HaveOccurred())
diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go
index 2a4c52a5..11a5cc52 100644
--- a/pkg/stream/environment.go
+++ b/pkg/stream/environment.go
@@ -515,12 +515,7 @@ func (c *Client) maybeCleanProducers(streamName string) {
 		}
 	}
 	c.mutex.Unlock()
-	if c.coordinator.ProducersCount() == 0 {
-		err := c.Close()
-		if err != nil {
-			return
-		}
-	}
+
 }
 
 func (c *Client) maybeCleanConsumers(streamName string) {
@@ -540,12 +535,6 @@ func (c *Client) maybeCleanConsumers(streamName string) {
 		}
 	}
 	c.mutex.Unlock()
-	if c.coordinator.ConsumersCount() == 0 {
-		err := c.Close()
-		if err != nil {
-			return
-		}
-	}
 }
 
 func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string,
@@ -603,18 +592,18 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
 func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client {
 	clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeOut)
-	chMeta := make(chan metaDataUpdateEvent, 1)
-	clientResult.metadataListener = chMeta
-	go func(ch <-chan metaDataUpdateEvent, cl *Client) {
-		for metaDataUpdateEvent := range ch {
-			clientResult.maybeCleanProducers(metaDataUpdateEvent.StreamName)
-			cc.maybeCleanClients()
-			if !cl.socket.isOpen() {
-				return
-			}
-		}
-
-	}(chMeta, clientResult)
+	//chMeta := make(chan metaDataUpdateEvent, 1)
+	//clientResult.metadataListener = chMeta
+	//go func(ch <-chan metaDataUpdateEvent, cl *Client) {
+	//	for metaDataUpdateEvent := range ch {
+	//		clientResult.maybeCleanProducers(metaDataUpdateEvent.StreamName)
+	//		cc.maybeCleanClients()
+	//		if !cl.socket.isOpen() {
+	//			return
+	//		}
+	//	}
+	//
+	//}(chMeta, clientResult)
 
 	cc.nextId++
 	cc.clientsPerContext[cc.nextId] = clientResult
@@ -638,18 +627,18 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro
 
 	if clientResult == nil {
 		clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout)
-		chMeta := make(chan metaDataUpdateEvent)
-		clientResult.metadataListener = chMeta
-		go func(ch <-chan metaDataUpdateEvent, cl *Client) {
-			for metaDataUpdateEvent := range ch {
-				clientResult.maybeCleanConsumers(metaDataUpdateEvent.StreamName)
-				cc.maybeCleanClients()
-				if !cl.socket.isOpen() {
-					return
-				}
-			}
-
-		}(chMeta, clientResult)
+		//chMeta := make(chan metaDataUpdateEvent)
+		//clientResult.metadataListener = chMeta
+		//go func(ch <-chan metaDataUpdateEvent, cl *Client) {
+		//	for metaDataUpdateEvent := range ch {
+		//		clientResult.maybeCleanConsumers(metaDataUpdateEvent.StreamName)
+		//		cc.maybeCleanClients()
+		//		if !cl.socket.isOpen() {
+		//			return
+		//		}
+		//	}
+		//
+		//}(chMeta, clientResult)
 
 		cc.nextId++
 		cc.clientsPerContext[cc.nextId] = clientResult
@@ -672,9 +661,11 @@ func (cc *environmentCoordinator) Close() error {
 	cc.mutexContext.Lock()
 	defer cc.mutexContext.Unlock()
 	for _, client := range cc.clientsPerContext {
-		err := client.Close()
-		if err != nil {
-			logs.LogWarn("Error during close the client, %s", err)
+		for i := range client.coordinator.producers {
+			_ = client.coordinator.producers[i].(*Producer).Close()
+		}
+		for i := range client.coordinator.consumers {
+			_ = client.coordinator.consumers[i].(*Consumer).Close()
 		}
 	}
 	return nil
diff --git a/pkg/stream/listeners.go b/pkg/stream/listeners.go
index b5c74b20..f0750226 100644
--- a/pkg/stream/listeners.go
+++ b/pkg/stream/listeners.go
@@ -8,13 +8,7 @@ type Event struct {
 	Err        error
 }
 
-type metaDataUpdateEvent struct {
-	StreamName string
-	code       uint16
-}
-
 type onInternalClose func(ch <-chan uint8)
-type metadataListener chan metaDataUpdateEvent
 type ChannelClose = <-chan Event
 type ChannelPublishConfirm chan []*ConfirmationStatus
 
diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go
index a6bdb773..b18b154c 100644
--- a/pkg/stream/producer.go
+++ b/pkg/stream/producer.go
@@ -606,9 +606,10 @@ func (producer *Producer) close(reason Event) error {
 	producer.closeConfirmationStatus()
 
 	if producer.options == nil {
+		logs.LogWarn("producer options is nil, the close will be ignored")
 		return nil
 	}
-	_ = producer.options.client.coordinator.RemoveProducerById(producer.id, reason)
+	_, _ = producer.options.client.coordinator.ExtractProducerById(producer.id)
 
 	if !producer.options.client.socket.isOpen() {
 		return fmt.Errorf("tcp connection is closed")
@@ -620,10 +621,7 @@ func (producer *Producer) close(reason Event) error {
 	}
 
 	if producer.options.client.coordinator.ProducersCount() == 0 {
-		err := producer.options.client.Close()
-		if err != nil {
-			logs.LogError("error during closing client: %s", err)
-		}
+		_ = producer.options.client.Close()
 	}
 
 	if producer.onClose != nil {
diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go
index 14d3cfb9..0f1e57f0 100644
--- a/pkg/stream/server_frame.go
+++ b/pkg/stream/server_frame.go
@@ -29,14 +29,17 @@ func logErrorCommand(error error, details string) {
 
 func (c *Client) handleResponse() {
 	buffer := bufio.NewReader(c.socket.connection)
+
 	for {
 		readerProtocol := &ReaderProtocol{}
+
 		frameLen, err := readUInt(buffer)
 		if err != nil {
 			logs.LogDebug("Read connection failed: %s", err)
 			_ = c.Close()
 			break
 		}
+
 		c.setLastHeartBeat(time.Now())
 		readerProtocol.FrameLen = frameLen
 		readerProtocol.CommandID = uShortExtractResponseCode(readUShort(buffer))
@@ -407,7 +410,7 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
 	// dispatch the messages with offset to the consumer
 	chunk.offsetMessages = batchConsumingMessages
 	if consumer.getStatus() == open {
-		consumer.response.chunkForConsumer <- chunk
+		consumer.chunkForConsumer <- chunk
 	} else {
 		logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+
 			"Messages won't dispatched", consumer.GetName(), consumer.GetStreamName())
@@ -487,12 +490,8 @@ func (c *Client) metadataUpdateFrameHandler(buffer *bufio.Reader) {
 	if code == responseCodeStreamNotAvailable {
 		stream := readString(buffer)
 		logs.LogDebug("stream %s is no longer available", stream)
-		c.mutex.Lock()
-		c.metadataListener <- metaDataUpdateEvent{
-			StreamName: stream,
-			code:       responseCodeStreamNotAvailable,
-		}
-		c.mutex.Unlock()
+		c.maybeCleanProducers(stream)
+		c.maybeCleanConsumers(stream)
 	} else {
 		//TODO handle the error, see the java code